// palladium_runtime/responses.rs
use crate::sharded_map::ShardedIndexMap;
2use palladium_actor::MessagePayload;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
4use tokio::sync::oneshot;
5
/// Monotonic source of correlation ids; starts at 1 so 0 can act as a
/// "never assigned" sentinel if callers need one.
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);

/// Hands out a process-unique correlation id.
///
/// `Relaxed` ordering is sufficient: uniqueness comes from the atomic
/// read-modify-write itself, and no other memory is synchronized through
/// this counter.
pub(crate) fn next_correlation_id() -> u64 {
    let id = NEXT_CORRELATION_ID.fetch_add(1, AtomicOrdering::Relaxed);
    id
}
11
/// Error returned by [`ResponseRegistry::register`] when the registry
/// already holds its configured maximum of in-flight responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RegistryFull;

impl std::fmt::Display for RegistryFull {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("response registry is at capacity")
    }
}

// Implementing `Error` lets callers propagate this with `?` into boxed /
// anyhow-style error types instead of handling it ad hoc.
impl std::error::Error for RegistryFull {}
15
/// Tracks in-flight request/response pairs: each registered oneshot sender
/// waits under a correlation id until `complete` delivers a payload to it.
pub(crate) struct ResponseRegistry {
    // Correlation id -> sender for the waiting caller.
    pending: ShardedIndexMap<u64, oneshot::Sender<MessagePayload>>,
    // Maximum number of simultaneously pending entries.
    capacity: usize,
    // Entry count kept separately because the sharded map presumably has no
    // cheap global len() — TODO confirm against ShardedIndexMap's API.
    count: AtomicUsize,
}
22
23impl ResponseRegistry {
24 pub(crate) fn new(capacity: usize) -> Self {
25 Self {
26 pending: ShardedIndexMap::new(),
27 capacity,
28 count: AtomicUsize::new(0),
29 }
30 }
31
32 pub(crate) fn register(
35 &self,
36 tx: oneshot::Sender<MessagePayload>,
37 ) -> Result<u64, RegistryFull> {
38 if self.count.load(AtomicOrdering::Relaxed) >= self.capacity {
40 return Err(RegistryFull);
41 }
42
43 let id = next_correlation_id();
44 self.count.fetch_add(1, AtomicOrdering::Relaxed);
45 self.pending.insert(id, tx);
46 Ok(id)
47 }
48
49 pub(crate) fn complete(&self, correlation_id: u64, payload: MessagePayload) {
51 if let Some(tx) = self.pending.remove(&correlation_id) {
52 self.count.fetch_sub(1, AtomicOrdering::Relaxed);
53 tx.send(payload).ok();
54 }
55 }
56
57 pub(crate) fn cancel(&self, correlation_id: u64) {
59 if self.pending.remove(&correlation_id).is_some() {
60 self.count.fetch_sub(1, AtomicOrdering::Relaxed);
61 }
62 }
63
64 pub(crate) fn drain(&self) {
66 self.pending.clear();
67 self.count.store(0, AtomicOrdering::Relaxed);
68 }
69}