strontium_core/
transport.rs1use std::sync::atomic::{AtomicU64, Ordering};
2use std::time::Duration;
3
4#[derive(Debug, Default)]
5pub struct NetworkClock {
6 nanos: AtomicU64,
7}
8
9impl NetworkClock {
10 pub fn new() -> Self {
11 Self {
12 nanos: AtomicU64::new(0),
13 }
14 }
15
16 pub fn now(&self) -> Duration {
17 Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
18 }
19
20 pub fn set(&self, now: Duration) {
21 self.nanos.store(
22 now.as_nanos().min(u64::MAX as u128) as u64,
23 Ordering::Relaxed,
24 );
25 }
26
27 pub fn advance(&self, delta: Duration) {
28 let add = delta.as_nanos().min(u64::MAX as u128) as u64;
29 self.nanos.fetch_add(add, Ordering::Relaxed);
30 }
31}
32
33#[derive(Debug, Default)]
34pub struct NetworkDeliveryReport {
35 pub delivered: usize,
36 pub mailbox_full: usize,
37 pub unroutable: usize,
38 pub deserialize_errors: usize,
39 pub invalid_source: usize,
40}
41
42pub struct RoutedActorEvent<Address, Envelope, Payload> {
43 pub from: Address,
44 pub to: Address,
45 pub envelope: Envelope,
46 pub payload: Payload,
47}
48
49pub enum RouteOutcome {
50 Delivered,
51 MailboxFull,
52 Unroutable,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum DeserializeError {
57 InvalidEnvelope,
58 InvalidPayload,
59 UnsupportedVersion,
60 CorruptData,
61}
62
63pub trait DispatchRegistry<Engine, Address, Envelope, Payload>: Clone {
64 fn is_local(&self, addr: Address) -> bool;
65 fn register_remote_source(&self, source_engine: &Engine, source_addr: Address);
66 fn validate_source(&self, envelope: &Envelope, source_engine: &Engine) -> bool;
67 fn deserialize_payload(
68 &self,
69 envelope: &Envelope,
70 payload: Payload,
71 ) -> Result<Payload, DeserializeError>;
72 fn route(&self, envelope: Envelope, payload: Payload) -> RouteOutcome;
73}
74
75pub fn dispatch_ready_actor_events<Engine, Registry, Address, Envelope, Payload, V>(
76 events: Vec<RoutedActorEvent<Address, Envelope, Payload>>,
77 registries: &[(Engine, Registry)],
78 validate_batch: V,
79) -> NetworkDeliveryReport
80where
81 Engine: Clone,
82 Registry: DispatchRegistry<Engine, Address, Envelope, Payload>,
83 Address: Copy,
84 Envelope: Copy,
85 V: FnOnce(&[Envelope]),
86{
87 let envelopes: Vec<_> = events.iter().map(|event| event.envelope).collect();
88 validate_batch(&envelopes);
89
90 let mut report = NetworkDeliveryReport::default();
91
92 for event in events {
93 let RoutedActorEvent {
94 from,
95 to,
96 envelope,
97 payload,
98 } = event;
99
100 let Some((dest_engine, registry)) = registries
101 .iter()
102 .find(|(_, reg)| reg.is_local(to))
103 .map(|(engine, reg)| (engine.clone(), reg.clone()))
104 else {
105 report.unroutable += 1;
106 continue;
107 };
108
109 let source_engine = registries
110 .iter()
111 .find(|(_, reg)| reg.is_local(from))
112 .map(|(engine, _)| engine.clone());
113
114 if let Some(source_engine) = source_engine {
115 registry.register_remote_source(&source_engine, from);
116 if !registry.validate_source(&envelope, &source_engine) {
117 report.invalid_source += 1;
118 continue;
119 }
120 }
121
122 let payload = match registry.deserialize_payload(&envelope, payload) {
123 Ok(payload) => payload,
124 Err(_error) => {
125 report.deserialize_errors += 1;
126 continue;
127 }
128 };
129
130 match registry.route(envelope, payload) {
131 RouteOutcome::Delivered => {
132 let _ = dest_engine;
133 report.delivered += 1;
134 }
135 RouteOutcome::MailboxFull => report.mailbox_full += 1,
136 RouteOutcome::Unroutable => report.unroutable += 1,
137 }
138 }
139
140 report
141}