Skip to main content

rns_embedded_runtime/lib_parts/
module_prelude.rs

1use alloc::{collections::VecDeque, vec::Vec};
2
3pub use constants::*;
4
5pub use node::{
6    BleNodeBackendConfig, BroadcastOptions, CaptureDefaults, EmbeddedNode, EventSubscription,
7    NodeBackendConfig, NodeConfig, NodeError, NodeEvent, NodeEventKind, NodeLifecycleState,
8    NodeLogLevel, NodeOperationKind, NodeOperationReceipt, NodeRunState, NodeStatus,
9    NodeTransportMode, PollResult, SendOptions, TcpClientConfig, TcpServerConfig,
10};
11
12use rns_embedded_core::{
13    lxmf_min::{decode_envelope, encode_envelope, MinimalEnvelope},
14    packet::PacketFrame,
15    replay::ReplayWindow,
16    store::EmbeddedStore,
17    transport::{EmbeddedTransport, LinkState},
18    EmbeddedError, EmbeddedResult,
19};
20
21#[derive(Debug, Clone, Copy, Eq, PartialEq)]
22pub struct RuntimeConfig {
23    pub store_identity: [u8; 32],
24    pub lxmf_address: [u8; 16],
25    pub node_mode: NodeTransportMode,
26    pub announce_interval_ms: u64,
27    pub max_outbound_queue: usize,
28    pub max_events: usize,
29    pub capture_defaults: CaptureDefaults,
30}
31
32impl Default for RuntimeConfig {
33    fn default() -> Self {
34        Self {
35            store_identity: [0x11; 32],
36            lxmf_address: [0x22; 16],
37            node_mode: NodeTransportMode::BleOnly,
38            announce_interval_ms: DEFAULT_ANNOUNCE_INTERVAL_MS,
39            max_outbound_queue: 8,
40            max_events: 32,
41            capture_defaults: CaptureDefaults::default(),
42        }
43    }
44}
45
/// Saturating counters maintained by the runtime; all start at zero.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub struct RuntimeStats {
    /// Announce frames placed on the outbound queue.
    pub announces_queued: u32,
    /// Frames the transport accepted for sending.
    pub outbound_sent: u32,
    /// Send attempts that failed (whether the frame was kept or dropped).
    pub outbound_deferred: u32,
    /// Inbound frames that passed the replay checks.
    pub inbound_accepted: u32,
    /// Inbound frames rejected by the replay checks.
    pub inbound_rejected: u32,
    /// Accepted inbound frames of the announce kind.
    pub announces_received: u32,
    /// Accepted inbound LXMF messages whose envelope decoded successfully.
    pub lxmf_messages_received: u32,
}
56
57#[derive(Debug, Clone, Copy, Eq, PartialEq)]
58pub enum RuntimeEvent {
59    Bootstrapped {
60        replay_floor: u64,
61    },
62    AnnounceQueued {
63        sequence: u32,
64    },
65    MessageQueued {
66        sequence: u32,
67        bytes: usize,
68    },
69    FrameSent {
70        kind: u8,
71        sequence: u32,
72        bytes: usize,
73    },
74    FrameDeferred {
75        kind: u8,
76        sequence: u32,
77        error: EmbeddedError,
78    },
79    FrameReceived {
80        kind: u8,
81        sequence: u32,
82        bytes: usize,
83    },
84    AnnounceReceived {
85        sequence: u32,
86        bytes: usize,
87    },
88    LxmfMessageReceived {
89        sequence: u32,
90        source: [u8; 16],
91        destination: [u8; 16],
92        body_bytes: usize,
93    },
94    FrameRejected {
95        kind: u8,
96        sequence: u32,
97        error: EmbeddedError,
98    },
99    LifecycleChanged {
100        from: NodeLifecycleState,
101        to: NodeLifecycleState,
102    },
103}
104
105pub struct EmbeddedNodeRuntime {
106    config: RuntimeConfig,
107    replay_floor: u64,
108    replay_window: ReplayWindow,
109    next_sequence: u32,
110    outbound: VecDeque<PacketFrame>,
111    events: VecDeque<RuntimeEvent>,
112    last_announce_at_ms: Option<u64>,
113    bootstrapped: bool,
114    stats: RuntimeStats,
115    network_provisioned: bool,
116    ble_recovery_active: bool,
117    lifecycle_state: NodeLifecycleState,
118}
119
120impl EmbeddedNodeRuntime {
121    pub fn new(config: RuntimeConfig) -> EmbeddedResult<Self> {
122        if config.store_identity == [0; 32] || config.lxmf_address == [0; 16] {
123            return Err(EmbeddedError::InvalidInput);
124        }
125        if config.max_outbound_queue == 0 || config.max_events == 0 {
126            return Err(EmbeddedError::InvalidArgument);
127        }
128        Ok(Self {
129            config,
130            replay_floor: 0,
131            replay_window: ReplayWindow::new(),
132            next_sequence: 1,
133            outbound: VecDeque::new(),
134            events: VecDeque::new(),
135            last_announce_at_ms: None,
136            bootstrapped: false,
137            stats: RuntimeStats {
138                announces_queued: 0,
139                outbound_sent: 0,
140                outbound_deferred: 0,
141                inbound_accepted: 0,
142                inbound_rejected: 0,
143                announces_received: 0,
144                lxmf_messages_received: 0,
145            },
146            network_provisioned: false,
147            ble_recovery_active: false,
148            lifecycle_state: NodeLifecycleState::Boot,
149        })
150    }
151
152    pub fn bootstrap<S: EmbeddedStore>(&mut self, store: &S) -> EmbeddedResult<()> {
153        let replay_floor = store.load_replay_floor(&self.config.store_identity)?;
154        self.replay_floor = replay_floor;
155        self.bootstrapped = true;
156        self.push_event(RuntimeEvent::Bootstrapped { replay_floor });
157        Ok(())
158    }
159
160    pub fn tick<T: EmbeddedTransport, S: EmbeddedStore>(
161        &mut self,
162        now_ms: u64,
163        transport: &mut T,
164        store: &mut S,
165    ) -> EmbeddedResult<()> {
166        if !self.bootstrapped {
167            self.bootstrap(store)?;
168        }
169
170        self.update_lifecycle(transport.link_state());
171
172        if transport.link_state() == LinkState::Up
173            && self.announce_due(now_ms)
174            && !self.has_queued_kind(FRAME_KIND_ANNOUNCE)
175        {
176            let sequence = self.queue_announce()?;
177            self.last_announce_at_ms = Some(now_ms);
178            self.stats.announces_queued = self.stats.announces_queued.saturating_add(1);
179            self.push_event(RuntimeEvent::AnnounceQueued { sequence });
180        }
181
182        self.poll_inbound(transport, store)?;
183        self.flush_outbound(transport)?;
184        Ok(())
185    }
186
187    pub fn queue_message(&mut self, destination: [u8; 16], body: &[u8]) -> EmbeddedResult<u32> {
188        let sequence = self.peek_next_sequence();
189        let envelope = MinimalEnvelope {
190            source: self.config.lxmf_address,
191            destination,
192            sequence: u64::from(sequence),
193            body: body.to_vec(),
194        };
195        let payload = encode_envelope(&envelope)?;
196        let frame = PacketFrame::new(FRAME_KIND_LXMF_MESSAGE, sequence, payload)?;
197        self.enqueue_frame(frame)?;
198        self.push_event(RuntimeEvent::MessageQueued { sequence, bytes: body.len() });
199        Ok(sequence)
200    }
201
202    pub fn pending_outbound_len(&self) -> usize {
203        self.outbound.len()
204    }
205
206    pub fn config(&self) -> RuntimeConfig {
207        self.config
208    }
209
210    pub fn lifecycle_state(&self) -> NodeLifecycleState {
211        self.lifecycle_state
212    }
213
214    pub fn set_network_provisioned(&mut self, provisioned: bool) {
215        self.network_provisioned = provisioned;
216    }
217
218    pub fn set_ble_recovery_active(&mut self, active: bool) {
219        self.ble_recovery_active = active;
220    }
221
222    pub fn stats(&self) -> RuntimeStats {
223        self.stats
224    }
225
226    pub fn drain_events(&mut self) -> Vec<RuntimeEvent> {
227        self.events.drain(..).collect()
228    }
229
230    fn poll_inbound<T: EmbeddedTransport, S: EmbeddedStore>(
231        &mut self,
232        transport: &mut T,
233        store: &mut S,
234    ) -> EmbeddedResult<()> {
235        for _ in 0..8 {
236            let Some(frame) = transport.poll_frame()? else {
237                break;
238            };
239            let sequence = u64::from(frame.sequence);
240            if sequence <= self.replay_floor || !self.replay_window.accept(sequence) {
241                self.stats.inbound_rejected = self.stats.inbound_rejected.saturating_add(1);
242                self.push_event(RuntimeEvent::FrameRejected {
243                    kind: frame.kind,
244                    sequence: frame.sequence,
245                    error: EmbeddedError::ReplayRejected,
246                });
247                continue;
248            }
249
250            self.replay_floor = sequence;
251            store.save_replay_floor(&self.config.store_identity, self.replay_floor)?;
252            self.stats.inbound_accepted = self.stats.inbound_accepted.saturating_add(1);
253            self.push_event(RuntimeEvent::FrameReceived {
254                kind: frame.kind,
255                sequence: frame.sequence,
256                bytes: frame.payload.len(),
257            });
258            self.handle_inbound_frame(frame)?;
259        }
260        Ok(())
261    }
262
263    fn handle_inbound_frame(&mut self, frame: PacketFrame) -> EmbeddedResult<()> {
264        match frame.kind {
265            FRAME_KIND_ANNOUNCE => {
266                self.stats.announces_received = self.stats.announces_received.saturating_add(1);
267                self.push_event(RuntimeEvent::AnnounceReceived {
268                    sequence: frame.sequence,
269                    bytes: frame.payload.len(),
270                });
271            }
272            FRAME_KIND_LXMF_MESSAGE => {
273                let envelope = decode_envelope(&frame.payload)?;
274                self.stats.lxmf_messages_received =
275                    self.stats.lxmf_messages_received.saturating_add(1);
276                self.push_event(RuntimeEvent::LxmfMessageReceived {
277                    sequence: frame.sequence,
278                    source: envelope.source,
279                    destination: envelope.destination,
280                    body_bytes: envelope.body.len(),
281                });
282                if envelope.destination == self.config.lxmf_address {
283                    let mut response_body = b"pong:".to_vec();
284                    response_body.extend_from_slice(&envelope.body);
285                    self.queue_message(envelope.source, &response_body)?;
286                }
287            }
288            FRAME_KIND_TEST_PING => {
289                let mut payload = b"pong:".to_vec();
290                payload.extend_from_slice(&frame.payload);
291                let sequence = self.peek_next_sequence();
292                let response = PacketFrame::new(FRAME_KIND_TEST_PONG, sequence, payload)?;
293                self.enqueue_frame(response)?;
294            }
295            _ => {}
296        }
297        Ok(())
298    }
299
300    fn flush_outbound<T: EmbeddedTransport>(&mut self, transport: &mut T) -> EmbeddedResult<()> {
301        if transport.link_state() != LinkState::Up {
302            return Ok(());
303        }
304
305        while let Some(frame) = self.outbound.front().cloned() {
306            match transport.send_frame(&frame) {
307                Ok(()) => {
308                    self.outbound.pop_front();
309                    self.stats.outbound_sent = self.stats.outbound_sent.saturating_add(1);
310                    self.push_event(RuntimeEvent::FrameSent {
311                        kind: frame.kind,
312                        sequence: frame.sequence,
313                        bytes: frame.payload.len(),
314                    });
315                }
316                Err(error @ (EmbeddedError::Backpressure | EmbeddedError::Disconnected)) => {
317                    self.stats.outbound_deferred = self.stats.outbound_deferred.saturating_add(1);
318                    self.push_event(RuntimeEvent::FrameDeferred {
319                        kind: frame.kind,
320                        sequence: frame.sequence,
321                        error,
322                    });
323                    break;
324                }
325                Err(error) => {
326                    self.outbound.pop_front();
327                    self.stats.outbound_deferred = self.stats.outbound_deferred.saturating_add(1);
328                    self.push_event(RuntimeEvent::FrameDeferred {
329                        kind: frame.kind,
330                        sequence: frame.sequence,
331                        error,
332                    });
333                    return Err(error);
334                }
335            }
336        }
337        Ok(())
338    }
339
340    fn queue_announce(&mut self) -> EmbeddedResult<u32> {
341        let sequence = self.peek_next_sequence();
342        let frame =
343            PacketFrame::new(FRAME_KIND_ANNOUNCE, sequence, self.config.store_identity.to_vec())?;
344        self.enqueue_frame(frame)?;
345        Ok(sequence)
346    }
347
348    fn enqueue_frame(&mut self, frame: PacketFrame) -> EmbeddedResult<()> {
349        if self.outbound.len() >= self.config.max_outbound_queue {
350            return Err(EmbeddedError::Backpressure);
351        }
352        self.outbound.push_back(frame);
353        self.next_sequence = self.next_sequence.saturating_add(1);
354        Ok(())
355    }
356
357    fn has_queued_kind(&self, kind: u8) -> bool {
358        self.outbound.iter().any(|frame| frame.kind == kind)
359    }
360
361    fn announce_due(&self, now_ms: u64) -> bool {
362        match self.last_announce_at_ms {
363            None => true,
364            Some(last) => now_ms.saturating_sub(last) >= self.config.announce_interval_ms,
365        }
366    }
367
368    fn update_lifecycle(&mut self, link_state: LinkState) {
369        let next = if !self.bootstrapped {
370            NodeLifecycleState::Boot
371        } else if self.ble_recovery_active {
372            NodeLifecycleState::BleRecovery
373        } else {
374            match self.config.node_mode {
375                NodeTransportMode::BleOnly => NodeLifecycleState::Unprovisioned,
376                NodeTransportMode::TcpClient | NodeTransportMode::TcpServer => {
377                    if !self.network_provisioned {
378                        NodeLifecycleState::Unprovisioned
379                    } else {
380                        match link_state {
381                            LinkState::Up => NodeLifecycleState::TcpOnline,
382                            LinkState::Connecting => NodeLifecycleState::FailureReconnect,
383                            LinkState::Down => NodeLifecycleState::ProvisionedOffline,
384                        }
385                    }
386                }
387            }
388        };
389
390        if self.lifecycle_state != next {
391            let from = self.lifecycle_state;
392            self.lifecycle_state = next;
393            self.push_event(RuntimeEvent::LifecycleChanged { from, to: next });
394        }
395    }
396
397    fn peek_next_sequence(&self) -> u32 {
398        self.next_sequence
399    }
400
401    fn push_event(&mut self, event: RuntimeEvent) {
402        if self.events.len() >= self.config.max_events {
403            self.events.pop_front();
404        }
405        self.events.push_back(event);
406    }
407}