Skip to main content

palladium_runtime/
responses.rs

1use crate::sharded_map::ShardedIndexMap;
2use palladium_actor::MessagePayload;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
4use tokio::sync::oneshot;
5
/// Process-wide counter backing [`next_correlation_id`]; it starts at 1, so
/// the first ID handed out is 1.
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);

/// Hands out the next correlation ID.
///
/// Returns the counter's current value and advances it by one in a single
/// atomic step. Relaxed ordering is used: only the counter value itself is
/// exchanged here, no other memory is published through it.
pub(crate) fn next_correlation_id() -> u64 {
    NEXT_CORRELATION_ID.fetch_add(1, AtomicOrdering::Relaxed)
}
11
/// Error returned when the response registry is at capacity.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// printed and propagated with `?` into boxed or wrapped error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegistryFull;

impl std::fmt::Display for RegistryFull {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("response registry is at capacity")
    }
}

impl std::error::Error for RegistryFull {}
15
16/// Manages pending `ask()` requests and their correlation IDs (REQ-019, REQ-072).
17pub(crate) struct ResponseRegistry {
18    pending: ShardedIndexMap<u64, oneshot::Sender<MessagePayload>>,
19    capacity: usize,
20    count: AtomicUsize,
21}
22
23impl ResponseRegistry {
24    pub(crate) fn new(capacity: usize) -> Self {
25        Self {
26            pending: ShardedIndexMap::new(),
27            capacity,
28            count: AtomicUsize::new(0),
29        }
30    }
31
32    /// Register a one-shot sender for a new ask request.  Returns the
33    /// correlation ID.
34    pub(crate) fn register(
35        &self,
36        tx: oneshot::Sender<MessagePayload>,
37    ) -> Result<u64, RegistryFull> {
38        // Optimistic check.
39        if self.count.load(AtomicOrdering::Relaxed) >= self.capacity {
40            return Err(RegistryFull);
41        }
42
43        let id = next_correlation_id();
44        self.count.fetch_add(1, AtomicOrdering::Relaxed);
45        self.pending.insert(id, tx);
46        Ok(id)
47    }
48
49    /// Complete an ask request by sending the response payload.
50    pub(crate) fn complete(&self, correlation_id: u64, payload: MessagePayload) {
51        if let Some(tx) = self.pending.remove(&correlation_id) {
52            self.count.fetch_sub(1, AtomicOrdering::Relaxed);
53            tx.send(payload).ok();
54        }
55    }
56
57    /// Cancel an ask request (e.g., on timeout).
58    pub(crate) fn cancel(&self, correlation_id: u64) {
59        if self.pending.remove(&correlation_id).is_some() {
60            self.count.fetch_sub(1, AtomicOrdering::Relaxed);
61        }
62    }
63
64    /// Drain all pending asks, resolving them with an error (used on shutdown).
65    pub(crate) fn drain(&self) {
66        self.pending.clear();
67        self.count.store(0, AtomicOrdering::Relaxed);
68    }
69}