Skip to main content

strontium_core/
transport.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::time::Duration;
3
4#[derive(Debug, Default)]
5pub struct NetworkClock {
6    nanos: AtomicU64,
7}
8
9impl NetworkClock {
10    pub fn new() -> Self {
11        Self {
12            nanos: AtomicU64::new(0),
13        }
14    }
15
16    pub fn now(&self) -> Duration {
17        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
18    }
19
20    pub fn set(&self, now: Duration) {
21        self.nanos.store(
22            now.as_nanos().min(u64::MAX as u128) as u64,
23            Ordering::Relaxed,
24        );
25    }
26
27    pub fn advance(&self, delta: Duration) {
28        let add = delta.as_nanos().min(u64::MAX as u128) as u64;
29        self.nanos.fetch_add(add, Ordering::Relaxed);
30    }
31}
32
33#[derive(Debug, Default)]
34pub struct NetworkDeliveryReport {
35    pub delivered: usize,
36    pub mailbox_full: usize,
37    pub unroutable: usize,
38    pub deserialize_errors: usize,
39    pub invalid_source: usize,
40}
41
42pub struct RoutedActorEvent<Address, Envelope, Payload> {
43    pub from: Address,
44    pub to: Address,
45    pub envelope: Envelope,
46    pub payload: Payload,
47}
48
49pub enum RouteOutcome {
50    Delivered,
51    MailboxFull,
52    Unroutable,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum DeserializeError {
57    InvalidEnvelope,
58    InvalidPayload,
59    UnsupportedVersion,
60    CorruptData,
61}
62
63pub trait DispatchRegistry<Engine, Address, Envelope, Payload>: Clone {
64    fn is_local(&self, addr: Address) -> bool;
65    fn register_remote_source(&self, source_engine: &Engine, source_addr: Address);
66    fn validate_source(&self, envelope: &Envelope, source_engine: &Engine) -> bool;
67    fn deserialize_payload(
68        &self,
69        envelope: &Envelope,
70        payload: Payload,
71    ) -> Result<Payload, DeserializeError>;
72    fn route(&self, envelope: Envelope, payload: Payload) -> RouteOutcome;
73}
74
75pub fn dispatch_ready_actor_events<Engine, Registry, Address, Envelope, Payload, V>(
76    events: Vec<RoutedActorEvent<Address, Envelope, Payload>>,
77    registries: &[(Engine, Registry)],
78    validate_batch: V,
79) -> NetworkDeliveryReport
80where
81    Engine: Clone,
82    Registry: DispatchRegistry<Engine, Address, Envelope, Payload>,
83    Address: Copy,
84    Envelope: Copy,
85    V: FnOnce(&[Envelope]),
86{
87    let envelopes: Vec<_> = events.iter().map(|event| event.envelope).collect();
88    validate_batch(&envelopes);
89
90    let mut report = NetworkDeliveryReport::default();
91
92    for event in events {
93        let RoutedActorEvent {
94            from,
95            to,
96            envelope,
97            payload,
98        } = event;
99
100        let Some((dest_engine, registry)) = registries
101            .iter()
102            .find(|(_, reg)| reg.is_local(to))
103            .map(|(engine, reg)| (engine.clone(), reg.clone()))
104        else {
105            report.unroutable += 1;
106            continue;
107        };
108
109        let source_engine = registries
110            .iter()
111            .find(|(_, reg)| reg.is_local(from))
112            .map(|(engine, _)| engine.clone());
113
114        if let Some(source_engine) = source_engine {
115            registry.register_remote_source(&source_engine, from);
116            if !registry.validate_source(&envelope, &source_engine) {
117                report.invalid_source += 1;
118                continue;
119            }
120        }
121
122        let payload = match registry.deserialize_payload(&envelope, payload) {
123            Ok(payload) => payload,
124            Err(_error) => {
125                report.deserialize_errors += 1;
126                continue;
127            }
128        };
129
130        match registry.route(envelope, payload) {
131            RouteOutcome::Delivered => {
132                let _ = dest_engine;
133                report.delivered += 1;
134            }
135            RouteOutcome::MailboxFull => report.mailbox_full += 1,
136            RouteOutcome::Unroutable => report.unroutable += 1,
137        }
138    }
139
140    report
141}