Skip to main content

palladium_runtime/multi_core/
transport.rs

1use std::sync::Arc;
2
3use palladium_actor::{AddrHash, Envelope, MessagePayload};
4use palladium_transport::{
5    InProcessTransport, MailboxSender, Transport, TransportError, TransportFuture,
6};
7
8use crate::placement::PlacementMap;
9use crate::ring_buffer::{CacheLineAtomicBool, InterCoreQueue};
10
/// Cross-core routing transport backed by [`InterCoreQueue`] ring buffers.
///
/// When the destination actor lives on a different core, the message is pushed
/// to `rings[my_core][dest_core]` instead of being delivered directly. The
/// receiving core's event loop is responsible for draining its inbound rings.
///
/// Falls back to the local [`InProcessTransport`] when the destination is on
/// the same core or has no placement entry.
///
/// After a successful cross-core push the sender calls
/// `core_idle[dest_core].take()` and, only when that returns `true`, signals
/// the destination core's entry in the shared `has_work` flag array
/// (cache-line padded to prevent false sharing). Pushes that find the idle
/// flag already taken do not signal again.  The drain task on the destination
/// core uses `has_work` to break out of its spin window early and to skip the
/// expensive `yield_now()` / Tokio wakeup when more inter-core work is
/// already in flight.
pub struct InterCoreTransport {
    /// Index of the core this transport sends from; used as the first
    /// (source) index into `rings`.
    pub(crate) my_core: usize,
    /// Ring buffer matrix indexed as `rings[source_core][destination_core]`.
    pub(crate) rings: Arc<Vec<Vec<InterCoreQueue>>>,
    /// Lookup from a destination's path hash to the core index it is placed
    /// on. A missing entry means the message is handed to `local`.
    pub(crate) placement: Arc<PlacementMap>,
    /// Core-local transport used for same-core and unplaced destinations.
    pub(crate) local: Arc<InProcessTransport>,
    /// Per-core work-available flags shared across all `InterCoreTransport`
    /// instances.  `has_work[dst]` is signalled after a successful push into
    /// the `dst` core's inbound ring, gated by `core_idle[dst]` (the memory
    /// ordering is whatever `CacheLineAtomicBool::signal` provides).
    pub(crate) has_work: Arc<Vec<CacheLineAtomicBool>>,
    /// Per-core idle state. `true` means destination core is idle/parked.
    /// Senders flip `true -> false` to gate wake signals to one per idle period.
    pub(crate) core_idle: Arc<Vec<CacheLineAtomicBool>>,
}
38
39impl Transport for InterCoreTransport {
40    fn try_send(&self, envelope: Envelope, payload: MessagePayload) -> Result<(), TransportError> {
41        let dest_core = self
42            .placement
43            .get(&envelope.destination.path_hash())
44            .map(|v| *v);
45
46        if let Some(core) = dest_core {
47            if core != self.my_core {
48                let msg = palladium_transport::MailboxMessage { envelope, payload };
49                return if self.rings[self.my_core][core].push(msg) {
50                    // Idle->active transition gate: only signal has_work once
51                    // per idle period for this destination core.
52                    if self.core_idle[core].take() {
53                        self.has_work[core].signal();
54                    }
55                    Ok(())
56                } else {
57                    Err(TransportError::MailboxFull)
58                };
59            }
60        }
61        // If same core or unknown, use local transport.
62        self.local.try_send(envelope, payload)
63    }
64
65    fn send<'a>(&'a self, envelope: Envelope, payload: MessagePayload) -> TransportFuture<'a> {
66        Box::pin(async move { self.try_send(envelope, payload) })
67    }
68
69    fn register(&self, _addr: AddrHash, _mailbox: MailboxSender) -> Result<(), TransportError> {
70        // Registration is handled by the core-local InProcessTransport.
71        Ok(())
72    }
73
74    fn unregister(&self, _addr: AddrHash) -> Result<(), TransportError> {
75        Ok(())
76    }
77
78    fn can_route(&self, destination: AddrHash) -> bool {
79        // InterCoreTransport can attempt to route any address by checking the placement map.
80        // It acts as a fallback for non-local addresses.
81        self.placement.contains_key(&destination.path_hash())
82    }
83}
84
85impl InterCoreTransport {
86    /// Create a new `InterCoreTransport` owning an N×N ring buffer matrix and
87    /// a private `has_work` flag array.
88    ///
89    /// `capacity` is the number of messages each ring can hold before
90    /// returning [`TransportError::MailboxFull`].
91    ///
92    /// When multiple transport instances need to **share** the same flag array
93    /// (i.e. cross-core signalling in `MultiCoreEngine`), construct the struct
94    /// directly using the public fields rather than calling this constructor.
95    pub fn new(
96        my_core: usize,
97        num_cores: usize,
98        capacity: usize,
99        placement: Arc<PlacementMap>,
100        local: Arc<InProcessTransport>,
101    ) -> Self {
102        let rings = (0..num_cores)
103            .map(|_| {
104                (0..num_cores)
105                    .map(|_| InterCoreQueue::new(capacity))
106                    .collect()
107            })
108            .collect();
109        let has_work = Arc::new(
110            (0..num_cores)
111                .map(|_| CacheLineAtomicBool::new(false))
112                .collect(),
113        );
114        let core_idle = Arc::new(
115            (0..num_cores)
116                .map(|_| CacheLineAtomicBool::new(true))
117                .collect(),
118        );
119        Self {
120            my_core,
121            rings: Arc::new(rings),
122            placement,
123            local,
124            has_work,
125            core_idle,
126        }
127    }
128
129    /// Deliver a message, routing cross-core via the ring buffer if needed.
130    pub fn try_deliver(
131        &self,
132        envelope: palladium_actor::Envelope,
133        payload: palladium_actor::MessagePayload,
134    ) -> Result<(), TransportError> {
135        let dest_core = self
136            .placement
137            .get(&envelope.destination.path_hash())
138            .map(|v| *v);
139        if let Some(core) = dest_core {
140            if core != self.my_core {
141                let msg = palladium_transport::MailboxMessage { envelope, payload };
142                return if self.rings[self.my_core][core].push(msg) {
143                    if self.core_idle[core].take() {
144                        self.has_work[core].signal();
145                    }
146                    Ok(())
147                } else {
148                    Err(TransportError::MailboxFull)
149                };
150            }
151        }
152        self.local.try_deliver(envelope, payload)
153    }
154}