Skip to main content

palladium_runtime/
responses.rs

1use crate::sharded_map::ShardedIndexMap;
2use palladium_actor::MessagePayload;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
4use tokio::sync::oneshot;
5
/// Process-wide counter backing [`next_correlation_id`]; starts at 1.
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);

/// Hand out the next correlation ID.
///
/// Returns the counter's current value and advances it by one, so successive
/// calls yield 1, 2, 3, ...
pub(crate) fn next_correlation_id() -> u64 {
    const STEP: u64 = 1;
    // The atomic read-modify-write gives every caller a distinct value; no
    // other memory is published through this counter, hence `Relaxed`.
    NEXT_CORRELATION_ID.fetch_add(STEP, AtomicOrdering::Relaxed)
}
11
/// Error returned when the response registry is at capacity.
///
/// Implements [`std::error::Error`] so it can be boxed or propagated with `?`
/// alongside other error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegistryFull;

impl std::fmt::Display for RegistryFull {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("response registry is at capacity")
    }
}

impl std::error::Error for RegistryFull {}
15
/// A response to an ask, as carried over the one-shot channel held by the
/// response registry.  The same value is handed back to the caller when a
/// completion attempt finds no pending ask for the correlation ID.
#[derive(Debug)]
pub(crate) struct PendingResponse {
    /// Tag accompanying the payload.
    // NOTE(review): presumably identifies the payload's message type so the
    // receiver can decode it -- confirm against the callers.
    pub(crate) type_tag: u64,
    /// The response payload itself.
    pub(crate) payload: MessagePayload,
}
21
22/// Manages pending `ask()` requests and their correlation IDs (REQ-019, REQ-072).
23pub(crate) struct ResponseRegistry {
24    pending: ShardedIndexMap<u64, oneshot::Sender<PendingResponse>>,
25    capacity: usize,
26    count: AtomicUsize,
27}
28
29impl ResponseRegistry {
30    pub(crate) fn new(capacity: usize) -> Self {
31        Self {
32            pending: ShardedIndexMap::new(),
33            capacity,
34            count: AtomicUsize::new(0),
35        }
36    }
37
38    /// Register a one-shot sender for a new ask request.  Returns the
39    /// correlation ID.
40    pub(crate) fn register(
41        &self,
42        tx: oneshot::Sender<PendingResponse>,
43    ) -> Result<u64, RegistryFull> {
44        // Optimistic check.
45        if self.count.load(AtomicOrdering::Relaxed) >= self.capacity {
46            return Err(RegistryFull);
47        }
48
49        let id = next_correlation_id();
50        self.count.fetch_add(1, AtomicOrdering::Relaxed);
51        self.pending.insert(id, tx);
52        Ok(id)
53    }
54
55    /// Complete an ask request by sending the response payload.
56    pub(crate) fn complete(&self, correlation_id: u64, type_tag: u64, payload: MessagePayload) {
57        if let Some(tx) = self.pending.remove(&correlation_id) {
58            self.count.fetch_sub(1, AtomicOrdering::Relaxed);
59            tx.send(PendingResponse { type_tag, payload }).ok();
60        }
61    }
62
63    /// Attempt to complete an ask request.  Returns `Ok(())` if the
64    /// correlation ID was found and the payload was delivered, or
65    /// `Err(payload)` if no pending ask exists for this ID (e.g. the ask
66    /// originated on a different engine).
67    pub(crate) fn try_complete(
68        &self,
69        correlation_id: u64,
70        type_tag: u64,
71        payload: MessagePayload,
72    ) -> Result<(), PendingResponse> {
73        if let Some(tx) = self.pending.remove(&correlation_id) {
74            self.count.fetch_sub(1, AtomicOrdering::Relaxed);
75            tx.send(PendingResponse { type_tag, payload }).ok();
76            Ok(())
77        } else {
78            Err(PendingResponse { type_tag, payload })
79        }
80    }
81
82    /// Cancel an ask request (e.g., on timeout).
83    pub(crate) fn cancel(&self, correlation_id: u64) {
84        if self.pending.remove(&correlation_id).is_some() {
85            self.count.fetch_sub(1, AtomicOrdering::Relaxed);
86        }
87    }
88
89    /// Drain all pending asks, resolving them with an error (used on shutdown).
90    pub(crate) fn drain(&self) {
91        self.pending.clear();
92        self.count.store(0, AtomicOrdering::Relaxed);
93    }
94}