//! palladium_runtime/multi_core/transport.rs — cross-core routing transport.
1use std::sync::Arc;
2
3use palladium_actor::{AddrHash, Envelope, MessagePayload};
4use palladium_transport::{
5 InProcessTransport, MailboxSender, Transport, TransportError, TransportFuture,
6};
7
8use crate::placement::PlacementMap;
9use crate::ring_buffer::{CacheLineAtomicBool, InterCoreQueue};
10
/// Cross-core routing transport backed by [`InterCoreQueue`] ring buffers.
///
/// When the destination actor lives on a different core, the message is pushed
/// to `rings[my_core][dest_core]` instead of being delivered directly. The
/// receiving core's event loop is responsible for draining its inbound rings.
///
/// Falls back to the local [`InProcessTransport`] when the destination is on
/// the same core or has no placement entry.
///
/// After each successful cross-core push the sender signals the destination
/// core's entry in the shared `has_work` flag array (cache-line padded to
/// prevent false sharing). The drain task on the destination core uses this
/// flag to break out of its spin window early and to skip the expensive
/// `yield_now()` / Tokio wakeup when more inter-core work is already in flight.
pub struct InterCoreTransport {
    /// Index of the core this transport instance runs on. Used both to pick
    /// the sending row `rings[my_core]` and to detect same-core destinations.
    pub(crate) my_core: usize,
    /// N×N ring-buffer matrix: `rings[src][dst]` carries messages from core
    /// `src` to core `dst`. Each instance only ever pushes into its own row
    /// (presumably making each ring single-producer — confirm in ring_buffer.rs).
    pub(crate) rings: Arc<Vec<Vec<InterCoreQueue>>>,
    /// Actor address (path hash) -> core index map, consulted on every send.
    pub(crate) placement: Arc<PlacementMap>,
    /// Core-local fallback transport for same-core or unplaced destinations.
    pub(crate) local: Arc<InProcessTransport>,
    /// Per-core work-available flags shared across all `InterCoreTransport`
    /// instances. `has_work[dst]` is set (Release) after a successful push
    /// into the `dst` core's inbound ring.
    pub(crate) has_work: Arc<Vec<CacheLineAtomicBool>>,
    /// Per-core idle state. `true` means destination core is idle/parked.
    /// Senders flip `true -> false` to gate wake signals to one per idle period.
    pub(crate) core_idle: Arc<Vec<CacheLineAtomicBool>>,
}
38
39impl Transport for InterCoreTransport {
40 fn try_send(&self, envelope: Envelope, payload: MessagePayload) -> Result<(), TransportError> {
41 let dest_core = self
42 .placement
43 .get(&envelope.destination.path_hash())
44 .map(|v| *v);
45
46 if let Some(core) = dest_core {
47 if core != self.my_core {
48 let msg = palladium_transport::MailboxMessage { envelope, payload };
49 return if self.rings[self.my_core][core].push(msg) {
50 // Idle->active transition gate: only signal has_work once
51 // per idle period for this destination core.
52 if self.core_idle[core].take() {
53 self.has_work[core].signal();
54 }
55 Ok(())
56 } else {
57 Err(TransportError::MailboxFull)
58 };
59 }
60 }
61 // If same core or unknown, use local transport.
62 self.local.try_send(envelope, payload)
63 }
64
65 fn send<'a>(&'a self, envelope: Envelope, payload: MessagePayload) -> TransportFuture<'a> {
66 Box::pin(async move { self.try_send(envelope, payload) })
67 }
68
69 fn register(&self, _addr: AddrHash, _mailbox: MailboxSender) -> Result<(), TransportError> {
70 // Registration is handled by the core-local InProcessTransport.
71 Ok(())
72 }
73
74 fn unregister(&self, _addr: AddrHash) -> Result<(), TransportError> {
75 Ok(())
76 }
77
78 fn can_route(&self, destination: AddrHash) -> bool {
79 // InterCoreTransport can attempt to route any address by checking the placement map.
80 // It acts as a fallback for non-local addresses.
81 self.placement.contains_key(&destination.path_hash())
82 }
83}
84
85impl InterCoreTransport {
86 /// Create a new `InterCoreTransport` owning an N×N ring buffer matrix and
87 /// a private `has_work` flag array.
88 ///
89 /// `capacity` is the number of messages each ring can hold before
90 /// returning [`TransportError::MailboxFull`].
91 ///
92 /// When multiple transport instances need to **share** the same flag array
93 /// (i.e. cross-core signalling in `MultiCoreEngine`), construct the struct
94 /// directly using the public fields rather than calling this constructor.
95 pub fn new(
96 my_core: usize,
97 num_cores: usize,
98 capacity: usize,
99 placement: Arc<PlacementMap>,
100 local: Arc<InProcessTransport>,
101 ) -> Self {
102 let rings = (0..num_cores)
103 .map(|_| {
104 (0..num_cores)
105 .map(|_| InterCoreQueue::new(capacity))
106 .collect()
107 })
108 .collect();
109 let has_work = Arc::new(
110 (0..num_cores)
111 .map(|_| CacheLineAtomicBool::new(false))
112 .collect(),
113 );
114 let core_idle = Arc::new(
115 (0..num_cores)
116 .map(|_| CacheLineAtomicBool::new(true))
117 .collect(),
118 );
119 Self {
120 my_core,
121 rings: Arc::new(rings),
122 placement,
123 local,
124 has_work,
125 core_idle,
126 }
127 }
128
129 /// Deliver a message, routing cross-core via the ring buffer if needed.
130 pub fn try_deliver(
131 &self,
132 envelope: palladium_actor::Envelope,
133 payload: palladium_actor::MessagePayload,
134 ) -> Result<(), TransportError> {
135 let dest_core = self
136 .placement
137 .get(&envelope.destination.path_hash())
138 .map(|v| *v);
139 if let Some(core) = dest_core {
140 if core != self.my_core {
141 let msg = palladium_transport::MailboxMessage { envelope, payload };
142 return if self.rings[self.my_core][core].push(msg) {
143 if self.core_idle[core].take() {
144 self.has_work[core].signal();
145 }
146 Ok(())
147 } else {
148 Err(TransportError::MailboxFull)
149 };
150 }
151 }
152 self.local.try_deliver(envelope, payload)
153 }
154}