palladium_runtime/
responses.rs1use crate::sharded_map::ShardedIndexMap;
2use palladium_actor::MessagePayload;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
4use tokio::sync::oneshot;
5
/// Shared counter from which correlation IDs are drawn; the first ID issued is 1.
static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);

/// Hands out the next correlation ID.
///
/// Every call returns the counter's current value and advances it by one, so
/// successive calls yield consecutive IDs (the atomic add wraps on `u64`
/// overflow).
pub(crate) fn next_correlation_id() -> u64 {
    // Only the read-modify-write itself has to be atomic here; no other memory
    // is published through this counter, hence the relaxed ordering.
    let counter = &NEXT_CORRELATION_ID;
    counter.fetch_add(1, AtomicOrdering::Relaxed)
}
11
/// Error marker returned by `ResponseRegistry::register` when the number of
/// pending responses has already reached the registry's capacity.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RegistryFull;
15
16#[derive(Debug)]
17pub(crate) struct PendingResponse {
18 pub(crate) type_tag: u64,
19 pub(crate) payload: MessagePayload,
20}
21
22pub(crate) struct ResponseRegistry {
24 pending: ShardedIndexMap<u64, oneshot::Sender<PendingResponse>>,
25 capacity: usize,
26 count: AtomicUsize,
27}
28
29impl ResponseRegistry {
30 pub(crate) fn new(capacity: usize) -> Self {
31 Self {
32 pending: ShardedIndexMap::new(),
33 capacity,
34 count: AtomicUsize::new(0),
35 }
36 }
37
38 pub(crate) fn register(
41 &self,
42 tx: oneshot::Sender<PendingResponse>,
43 ) -> Result<u64, RegistryFull> {
44 if self.count.load(AtomicOrdering::Relaxed) >= self.capacity {
46 return Err(RegistryFull);
47 }
48
49 let id = next_correlation_id();
50 self.count.fetch_add(1, AtomicOrdering::Relaxed);
51 self.pending.insert(id, tx);
52 Ok(id)
53 }
54
55 pub(crate) fn complete(&self, correlation_id: u64, type_tag: u64, payload: MessagePayload) {
57 if let Some(tx) = self.pending.remove(&correlation_id) {
58 self.count.fetch_sub(1, AtomicOrdering::Relaxed);
59 tx.send(PendingResponse { type_tag, payload }).ok();
60 }
61 }
62
63 pub(crate) fn try_complete(
68 &self,
69 correlation_id: u64,
70 type_tag: u64,
71 payload: MessagePayload,
72 ) -> Result<(), PendingResponse> {
73 if let Some(tx) = self.pending.remove(&correlation_id) {
74 self.count.fetch_sub(1, AtomicOrdering::Relaxed);
75 tx.send(PendingResponse { type_tag, payload }).ok();
76 Ok(())
77 } else {
78 Err(PendingResponse { type_tag, payload })
79 }
80 }
81
82 pub(crate) fn cancel(&self, correlation_id: u64) {
84 if self.pending.remove(&correlation_id).is_some() {
85 self.count.fetch_sub(1, AtomicOrdering::Relaxed);
86 }
87 }
88
89 pub(crate) fn drain(&self) {
91 self.pending.clear();
92 self.count.store(0, AtomicOrdering::Relaxed);
93 }
94}