1use alloc::{collections::VecDeque, vec::Vec};
2
3pub use constants::*;
4
5pub use node::{
6 BleNodeBackendConfig, BroadcastOptions, CaptureDefaults, EmbeddedNode, EventSubscription,
7 NodeBackendConfig, NodeConfig, NodeError, NodeEvent, NodeEventKind, NodeLifecycleState,
8 NodeLogLevel, NodeOperationKind, NodeOperationReceipt, NodeRunState, NodeStatus,
9 NodeTransportMode, PollResult, SendOptions, TcpClientConfig, TcpServerConfig,
10};
11
12use rns_embedded_core::{
13 lxmf_min::{decode_envelope, encode_envelope, MinimalEnvelope},
14 packet::PacketFrame,
15 replay::ReplayWindow,
16 store::EmbeddedStore,
17 transport::{EmbeddedTransport, LinkState},
18 EmbeddedError, EmbeddedResult,
19};
20
21#[derive(Debug, Clone, Copy, Eq, PartialEq)]
22pub struct RuntimeConfig {
23 pub store_identity: [u8; 32],
24 pub lxmf_address: [u8; 16],
25 pub node_mode: NodeTransportMode,
26 pub announce_interval_ms: u64,
27 pub max_outbound_queue: usize,
28 pub max_events: usize,
29 pub capture_defaults: CaptureDefaults,
30}
31
32impl Default for RuntimeConfig {
33 fn default() -> Self {
34 Self {
35 store_identity: [0x11; 32],
36 lxmf_address: [0x22; 16],
37 node_mode: NodeTransportMode::BleOnly,
38 announce_interval_ms: DEFAULT_ANNOUNCE_INTERVAL_MS,
39 max_outbound_queue: 8,
40 max_events: 32,
41 capture_defaults: CaptureDefaults::default(),
42 }
43 }
44}
45
46#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
47pub struct RuntimeStats {
48 pub announces_queued: u32,
49 pub outbound_sent: u32,
50 pub outbound_deferred: u32,
51 pub inbound_accepted: u32,
52 pub inbound_rejected: u32,
53 pub announces_received: u32,
54 pub lxmf_messages_received: u32,
55}
56
57#[derive(Debug, Clone, Copy, Eq, PartialEq)]
58pub enum RuntimeEvent {
59 Bootstrapped {
60 replay_floor: u64,
61 },
62 AnnounceQueued {
63 sequence: u32,
64 },
65 MessageQueued {
66 sequence: u32,
67 bytes: usize,
68 },
69 FrameSent {
70 kind: u8,
71 sequence: u32,
72 bytes: usize,
73 },
74 FrameDeferred {
75 kind: u8,
76 sequence: u32,
77 error: EmbeddedError,
78 },
79 FrameReceived {
80 kind: u8,
81 sequence: u32,
82 bytes: usize,
83 },
84 AnnounceReceived {
85 sequence: u32,
86 bytes: usize,
87 },
88 LxmfMessageReceived {
89 sequence: u32,
90 source: [u8; 16],
91 destination: [u8; 16],
92 body_bytes: usize,
93 },
94 FrameRejected {
95 kind: u8,
96 sequence: u32,
97 error: EmbeddedError,
98 },
99 LifecycleChanged {
100 from: NodeLifecycleState,
101 to: NodeLifecycleState,
102 },
103}
104
105pub struct EmbeddedNodeRuntime {
106 config: RuntimeConfig,
107 replay_floor: u64,
108 replay_window: ReplayWindow,
109 next_sequence: u32,
110 outbound: VecDeque<PacketFrame>,
111 events: VecDeque<RuntimeEvent>,
112 last_announce_at_ms: Option<u64>,
113 bootstrapped: bool,
114 stats: RuntimeStats,
115 network_provisioned: bool,
116 ble_recovery_active: bool,
117 lifecycle_state: NodeLifecycleState,
118}
119
120impl EmbeddedNodeRuntime {
121 pub fn new(config: RuntimeConfig) -> EmbeddedResult<Self> {
122 if config.store_identity == [0; 32] || config.lxmf_address == [0; 16] {
123 return Err(EmbeddedError::InvalidInput);
124 }
125 if config.max_outbound_queue == 0 || config.max_events == 0 {
126 return Err(EmbeddedError::InvalidArgument);
127 }
128 Ok(Self {
129 config,
130 replay_floor: 0,
131 replay_window: ReplayWindow::new(),
132 next_sequence: 1,
133 outbound: VecDeque::new(),
134 events: VecDeque::new(),
135 last_announce_at_ms: None,
136 bootstrapped: false,
137 stats: RuntimeStats {
138 announces_queued: 0,
139 outbound_sent: 0,
140 outbound_deferred: 0,
141 inbound_accepted: 0,
142 inbound_rejected: 0,
143 announces_received: 0,
144 lxmf_messages_received: 0,
145 },
146 network_provisioned: false,
147 ble_recovery_active: false,
148 lifecycle_state: NodeLifecycleState::Boot,
149 })
150 }
151
152 pub fn bootstrap<S: EmbeddedStore>(&mut self, store: &S) -> EmbeddedResult<()> {
153 let replay_floor = store.load_replay_floor(&self.config.store_identity)?;
154 self.replay_floor = replay_floor;
155 self.bootstrapped = true;
156 self.push_event(RuntimeEvent::Bootstrapped { replay_floor });
157 Ok(())
158 }
159
160 pub fn tick<T: EmbeddedTransport, S: EmbeddedStore>(
161 &mut self,
162 now_ms: u64,
163 transport: &mut T,
164 store: &mut S,
165 ) -> EmbeddedResult<()> {
166 if !self.bootstrapped {
167 self.bootstrap(store)?;
168 }
169
170 self.update_lifecycle(transport.link_state());
171
172 if transport.link_state() == LinkState::Up
173 && self.announce_due(now_ms)
174 && !self.has_queued_kind(FRAME_KIND_ANNOUNCE)
175 {
176 let sequence = self.queue_announce()?;
177 self.last_announce_at_ms = Some(now_ms);
178 self.stats.announces_queued = self.stats.announces_queued.saturating_add(1);
179 self.push_event(RuntimeEvent::AnnounceQueued { sequence });
180 }
181
182 self.poll_inbound(transport, store)?;
183 self.flush_outbound(transport)?;
184 Ok(())
185 }
186
187 pub fn queue_message(&mut self, destination: [u8; 16], body: &[u8]) -> EmbeddedResult<u32> {
188 let sequence = self.peek_next_sequence();
189 let envelope = MinimalEnvelope {
190 source: self.config.lxmf_address,
191 destination,
192 sequence: u64::from(sequence),
193 body: body.to_vec(),
194 };
195 let payload = encode_envelope(&envelope)?;
196 let frame = PacketFrame::new(FRAME_KIND_LXMF_MESSAGE, sequence, payload)?;
197 self.enqueue_frame(frame)?;
198 self.push_event(RuntimeEvent::MessageQueued { sequence, bytes: body.len() });
199 Ok(sequence)
200 }
201
202 pub fn pending_outbound_len(&self) -> usize {
203 self.outbound.len()
204 }
205
206 pub fn config(&self) -> RuntimeConfig {
207 self.config
208 }
209
210 pub fn lifecycle_state(&self) -> NodeLifecycleState {
211 self.lifecycle_state
212 }
213
214 pub fn set_network_provisioned(&mut self, provisioned: bool) {
215 self.network_provisioned = provisioned;
216 }
217
218 pub fn set_ble_recovery_active(&mut self, active: bool) {
219 self.ble_recovery_active = active;
220 }
221
222 pub fn stats(&self) -> RuntimeStats {
223 self.stats
224 }
225
226 pub fn drain_events(&mut self) -> Vec<RuntimeEvent> {
227 self.events.drain(..).collect()
228 }
229
230 fn poll_inbound<T: EmbeddedTransport, S: EmbeddedStore>(
231 &mut self,
232 transport: &mut T,
233 store: &mut S,
234 ) -> EmbeddedResult<()> {
235 for _ in 0..8 {
236 let Some(frame) = transport.poll_frame()? else {
237 break;
238 };
239 let sequence = u64::from(frame.sequence);
240 if sequence <= self.replay_floor || !self.replay_window.accept(sequence) {
241 self.stats.inbound_rejected = self.stats.inbound_rejected.saturating_add(1);
242 self.push_event(RuntimeEvent::FrameRejected {
243 kind: frame.kind,
244 sequence: frame.sequence,
245 error: EmbeddedError::ReplayRejected,
246 });
247 continue;
248 }
249
250 self.replay_floor = sequence;
251 store.save_replay_floor(&self.config.store_identity, self.replay_floor)?;
252 self.stats.inbound_accepted = self.stats.inbound_accepted.saturating_add(1);
253 self.push_event(RuntimeEvent::FrameReceived {
254 kind: frame.kind,
255 sequence: frame.sequence,
256 bytes: frame.payload.len(),
257 });
258 self.handle_inbound_frame(frame)?;
259 }
260 Ok(())
261 }
262
263 fn handle_inbound_frame(&mut self, frame: PacketFrame) -> EmbeddedResult<()> {
264 match frame.kind {
265 FRAME_KIND_ANNOUNCE => {
266 self.stats.announces_received = self.stats.announces_received.saturating_add(1);
267 self.push_event(RuntimeEvent::AnnounceReceived {
268 sequence: frame.sequence,
269 bytes: frame.payload.len(),
270 });
271 }
272 FRAME_KIND_LXMF_MESSAGE => {
273 let envelope = decode_envelope(&frame.payload)?;
274 self.stats.lxmf_messages_received =
275 self.stats.lxmf_messages_received.saturating_add(1);
276 self.push_event(RuntimeEvent::LxmfMessageReceived {
277 sequence: frame.sequence,
278 source: envelope.source,
279 destination: envelope.destination,
280 body_bytes: envelope.body.len(),
281 });
282 if envelope.destination == self.config.lxmf_address {
283 let mut response_body = b"pong:".to_vec();
284 response_body.extend_from_slice(&envelope.body);
285 self.queue_message(envelope.source, &response_body)?;
286 }
287 }
288 FRAME_KIND_TEST_PING => {
289 let mut payload = b"pong:".to_vec();
290 payload.extend_from_slice(&frame.payload);
291 let sequence = self.peek_next_sequence();
292 let response = PacketFrame::new(FRAME_KIND_TEST_PONG, sequence, payload)?;
293 self.enqueue_frame(response)?;
294 }
295 _ => {}
296 }
297 Ok(())
298 }
299
300 fn flush_outbound<T: EmbeddedTransport>(&mut self, transport: &mut T) -> EmbeddedResult<()> {
301 if transport.link_state() != LinkState::Up {
302 return Ok(());
303 }
304
305 while let Some(frame) = self.outbound.front().cloned() {
306 match transport.send_frame(&frame) {
307 Ok(()) => {
308 self.outbound.pop_front();
309 self.stats.outbound_sent = self.stats.outbound_sent.saturating_add(1);
310 self.push_event(RuntimeEvent::FrameSent {
311 kind: frame.kind,
312 sequence: frame.sequence,
313 bytes: frame.payload.len(),
314 });
315 }
316 Err(error @ (EmbeddedError::Backpressure | EmbeddedError::Disconnected)) => {
317 self.stats.outbound_deferred = self.stats.outbound_deferred.saturating_add(1);
318 self.push_event(RuntimeEvent::FrameDeferred {
319 kind: frame.kind,
320 sequence: frame.sequence,
321 error,
322 });
323 break;
324 }
325 Err(error) => {
326 self.outbound.pop_front();
327 self.stats.outbound_deferred = self.stats.outbound_deferred.saturating_add(1);
328 self.push_event(RuntimeEvent::FrameDeferred {
329 kind: frame.kind,
330 sequence: frame.sequence,
331 error,
332 });
333 return Err(error);
334 }
335 }
336 }
337 Ok(())
338 }
339
340 fn queue_announce(&mut self) -> EmbeddedResult<u32> {
341 let sequence = self.peek_next_sequence();
342 let frame =
343 PacketFrame::new(FRAME_KIND_ANNOUNCE, sequence, self.config.store_identity.to_vec())?;
344 self.enqueue_frame(frame)?;
345 Ok(sequence)
346 }
347
348 fn enqueue_frame(&mut self, frame: PacketFrame) -> EmbeddedResult<()> {
349 if self.outbound.len() >= self.config.max_outbound_queue {
350 return Err(EmbeddedError::Backpressure);
351 }
352 self.outbound.push_back(frame);
353 self.next_sequence = self.next_sequence.saturating_add(1);
354 Ok(())
355 }
356
357 fn has_queued_kind(&self, kind: u8) -> bool {
358 self.outbound.iter().any(|frame| frame.kind == kind)
359 }
360
361 fn announce_due(&self, now_ms: u64) -> bool {
362 match self.last_announce_at_ms {
363 None => true,
364 Some(last) => now_ms.saturating_sub(last) >= self.config.announce_interval_ms,
365 }
366 }
367
368 fn update_lifecycle(&mut self, link_state: LinkState) {
369 let next = if !self.bootstrapped {
370 NodeLifecycleState::Boot
371 } else if self.ble_recovery_active {
372 NodeLifecycleState::BleRecovery
373 } else {
374 match self.config.node_mode {
375 NodeTransportMode::BleOnly => NodeLifecycleState::Unprovisioned,
376 NodeTransportMode::TcpClient | NodeTransportMode::TcpServer => {
377 if !self.network_provisioned {
378 NodeLifecycleState::Unprovisioned
379 } else {
380 match link_state {
381 LinkState::Up => NodeLifecycleState::TcpOnline,
382 LinkState::Connecting => NodeLifecycleState::FailureReconnect,
383 LinkState::Down => NodeLifecycleState::ProvisionedOffline,
384 }
385 }
386 }
387 }
388 };
389
390 if self.lifecycle_state != next {
391 let from = self.lifecycle_state;
392 self.lifecycle_state = next;
393 self.push_event(RuntimeEvent::LifecycleChanged { from, to: next });
394 }
395 }
396
397 fn peek_next_sequence(&self) -> u32 {
398 self.next_sequence
399 }
400
401 fn push_event(&mut self, event: RuntimeEvent) {
402 if self.events.len() >= self.config.max_events {
403 self.events.pop_front();
404 }
405 self.events.push_back(event);
406 }
407}