1use crate::{
2 ble::{BleShimConfig, BleShimTransport},
3 constants::DEFAULT_CAPTURE_MAX_BYTES,
4 RuntimeConfig, RuntimeEvent, RuntimeStats,
5};
6
7use alloc::{
8 collections::{BTreeMap, VecDeque},
9 string::String,
10 vec::Vec,
11};
12
13use rns_embedded_core::{store::JournaledEmbeddedStore, transport::LinkState, EmbeddedError};
14
15#[cfg(feature = "std")]
16use alloc::sync::Arc;
17
18#[cfg(not(feature = "std"))]
19use alloc::rc::Rc;
20
21#[cfg(not(feature = "std"))]
22use core::cell::RefCell;
23
24#[cfg(feature = "std")]
25use std::{
26 net::{SocketAddr, TcpListener, ToSocketAddrs},
27 sync::{Condvar, Mutex},
28 thread::{self, JoinHandle},
29 time::{Duration, Instant},
30};
31
32#[cfg(feature = "std")]
33use crate::tcp::TcpEmbeddedTransport;
34
/// Driver tick period in milliseconds; the source value for `DRIVER_TICK_SLEEP`.
#[cfg(feature = "std")]
const DRIVER_TICK_MS: u64 = 25;

/// `DRIVER_TICK_MS` expressed as a `Duration`.
#[cfg(feature = "std")]
const DRIVER_TICK_SLEEP: Duration = Duration::from_millis(DRIVER_TICK_MS);

/// Upper bound for blocking timeouts in milliseconds: `u32::MAX` widened to `u64`.
// The `as` cast only widens, so no value is lost.
#[cfg(feature = "std")]
const MAX_BLOCKING_TIMEOUT_MS: u64 = u32::MAX as u64;

/// Extension id `1` ("bootstrapped").
// NOTE(review): presumably carried in `NodeEventKind::Extension::extension_id` — confirm at the emit site.
pub const NODE_EXTENSION_ID_BOOTSTRAPPED: u32 = 1;

/// Extension id `2` ("message queued").
// NOTE(review): presumably carried in `NodeEventKind::Extension::extension_id` — confirm at the emit site.
pub const NODE_EXTENSION_ID_MESSAGE_QUEUED: u32 = 2;

/// Extension id `3` ("received summary").
// NOTE(review): presumably carried in `NodeEventKind::Extension::extension_id` — confirm at the emit site.
pub const NODE_EXTENSION_ID_RECEIVED_SUMMARY: u32 = 3;

/// MTU hint handed to every TCP transport created in this module.
#[cfg(feature = "std")]
const DEFAULT_TCP_MTU_HINT: u16 = 1024;
54
/// Transport a node is configured to run over.
///
/// `RuntimeSession::new` compares `RuntimeConfig::node_mode` against these
/// variants and rejects a backend configuration that does not match.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeTransportMode {
    /// Required by `NodeBackendConfig::Ble`.
    BleOnly,
    /// Required by `NodeBackendConfig::TcpClient`.
    TcpClient,
    /// Required by `NodeBackendConfig::TcpServer`.
    TcpServer,
}
61
/// Lifecycle phase of a node, as surfaced through `NodeStatus::lifecycle_state`
/// and `NodeEventKind::StatusChanged`.
///
/// The value is produced by the runtime (`lifecycle_state()`); the transition
/// rules between phases are not defined by this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeLifecycleState {
    Boot,
    Unprovisioned,
    ProvisionedOffline,
    TcpOnline,
    BleRecovery,
    FailureReconnect,
}
71
/// Default limits applied to captures.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CaptureDefaults {
    /// Maximum capture size in bytes; the `Default` impl uses
    /// `DEFAULT_CAPTURE_MAX_BYTES`.
    pub max_bytes: u32,
}
76
77impl Default for CaptureDefaults {
78 fn default() -> Self {
79 Self { max_bytes: DEFAULT_CAPTURE_MAX_BYTES }
80 }
81}
82
/// BLE backend settings exposed at the node level.
///
/// Each field is copied one-to-one into the same-named field of
/// `BleShimConfig` when the BLE transport is created.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BleNodeBackendConfig {
    /// Forwarded to `BleShimConfig::mtu_hint`.
    pub mtu_hint: u16,
    /// Forwarded to `BleShimConfig::max_inbound_frames`.
    pub max_inbound_frames: usize,
    /// Forwarded to `BleShimConfig::max_outbound_frames`.
    pub max_outbound_frames: usize,
    /// Forwarded to `BleShimConfig::ordered_delivery`.
    pub ordered_delivery: bool,
}
90
91impl Default for BleNodeBackendConfig {
92 fn default() -> Self {
93 let config = BleShimConfig::default();
94 Self {
95 mtu_hint: config.mtu_hint,
96 max_inbound_frames: config.max_inbound_frames,
97 max_outbound_frames: config.max_outbound_frames,
98 ordered_delivery: config.ordered_delivery,
99 }
100 }
101}
102
103impl From<&BleNodeBackendConfig> for BleShimConfig {
104 fn from(value: &BleNodeBackendConfig) -> Self {
105 Self {
106 mtu_hint: value.mtu_hint,
107 max_inbound_frames: value.max_inbound_frames,
108 max_outbound_frames: value.max_outbound_frames,
109 ordered_delivery: value.ordered_delivery,
110 }
111 }
112}
113
/// Settings for the outbound TCP client backend.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TcpClientConfig {
    /// Host name or address; resolved together with `port` by `resolve_tcp_addr`.
    pub host: String,
    /// Remote TCP port.
    pub port: u16,
    /// Reconnect backoff values in milliseconds.
    // NOTE(review): not read in the visible part of this file — confirm where the schedule is consumed.
    pub reconnect_backoff_ms: Vec<u64>,
}
120
/// Settings for the inbound TCP server backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TcpServerConfig {
    /// Port the listener is bound to (on `0.0.0.0`, see `tcp_server_transport`).
    pub listen_port: u16,
}
125
126#[derive(Debug, Clone, Eq, PartialEq)]
127pub enum NodeBackendConfig {
128 Ble(BleNodeBackendConfig),
129 #[cfg(feature = "std")]
130 TcpClient(TcpClientConfig),
131 #[cfg(feature = "std")]
132 TcpServer(TcpServerConfig),
133}
134
135#[derive(Debug, Clone, Eq, PartialEq)]
136pub struct NodeConfig {
137 pub runtime: RuntimeConfig,
138 pub backend: NodeBackendConfig,
139}
140
141impl Default for NodeConfig {
142 fn default() -> Self {
143 Self {
144 runtime: RuntimeConfig::default(),
145 backend: NodeBackendConfig::Ble(BleNodeBackendConfig::default()),
146 }
147 }
148}
149
/// Whether the node currently has a running session.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRunState {
    Stopped,
    /// Reported by `RuntimeSession::status`.
    Running,
}
155
/// Log level attached to the node and to `NodeEventKind::Log` events.
///
/// No ordering is derived; levels can only be compared for equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeLogLevel {
    Error,
    Warn,
    /// Level used by `NodeState::default()`.
    Info,
    Debug,
    Trace,
}
164
/// Kind of operation a `NodeOperationReceipt` refers to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeOperationKind {
    Send,
    Broadcast,
}
170
171#[derive(Debug, Clone, Copy, Eq, PartialEq)]
172pub struct NodeOperationReceipt {
173 pub operation: NodeOperationKind,
174 pub operation_id: u64,
175 pub epoch: u64,
176 pub accepted_bytes: usize,
177 pub queued: bool,
178 pub target_count: u32,
179}
180
/// Options for a send operation; currently a field-less marker type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendOptions;
183
/// Options for a broadcast operation.
///
/// `Default` yields an empty destination list.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BroadcastOptions {
    /// 16-byte destination identifiers.
    pub destinations: Vec<[u8; 16]>,
}
188
189#[derive(Debug, Clone, Copy, Eq, PartialEq)]
190pub struct NodeStatus {
191 pub run_state: NodeRunState,
192 pub epoch: u64,
193 pub lifecycle_state: Option<NodeLifecycleState>,
194 pub pending_outbound: usize,
195 pub stats: RuntimeStats,
196 pub log_level: NodeLogLevel,
197}
198
/// Errors reported by the node API.
///
/// `EmbeddedError` values are folded into these variants by the
/// `From<EmbeddedError>` impl; the per-variant notes list that mapping and the
/// direct uses visible in this module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeError {
    /// Bad configuration; also `EmbeddedError::{InvalidInput, InvalidArgument, Unsupported}`.
    InvalidConfig,
    /// Socket bind/accept failure in `tcp_server_transport`.
    IoError,
    /// `EmbeddedError::Disconnected`.
    NetworkError,
    /// `EmbeddedError::{IntegrityFailure, ChecksumMismatch, IdempotencyConflict,
    /// ReplayRejected, SeqGap, NotFound, InvalidCursor}`.
    ReticulumError,
    AlreadyRunning,
    NotRunning,
    /// `EmbeddedError::Timeout`.
    Timeout,
    /// `EmbeddedError::{StorageCorruption, InvalidState}`.
    InternalError,
    /// Operation not supported by the active backend (e.g. pushing inbound
    /// wire bytes into the TCP backend).
    ModeConflict,
    SubscriptionClosed,
    NodeRestarted,
    EventGap,
    /// `EmbeddedError::Backpressure`.
    QueuePressure,
}
215
216impl From<EmbeddedError> for NodeError {
217 fn from(value: EmbeddedError) -> Self {
218 match value {
219 EmbeddedError::InvalidInput
220 | EmbeddedError::InvalidArgument
221 | EmbeddedError::Unsupported => Self::InvalidConfig,
222 EmbeddedError::Timeout => Self::Timeout,
223 EmbeddedError::Backpressure => Self::QueuePressure,
224 EmbeddedError::Disconnected => Self::NetworkError,
225 EmbeddedError::IntegrityFailure
226 | EmbeddedError::ChecksumMismatch
227 | EmbeddedError::IdempotencyConflict
228 | EmbeddedError::ReplayRejected
229 | EmbeddedError::SeqGap
230 | EmbeddedError::NotFound
231 | EmbeddedError::InvalidCursor => Self::ReticulumError,
232 EmbeddedError::StorageCorruption | EmbeddedError::InvalidState => Self::InternalError,
233 }
234 }
235}
236
237#[derive(Debug, Clone, Eq, PartialEq)]
238pub enum NodeEventKind {
239 StatusChanged { run_state: NodeRunState, lifecycle_state: Option<NodeLifecycleState> },
240 Log { level: NodeLogLevel, code: u32 },
241 Error { error: NodeError, frame_kind: u8, sequence: u32 },
242 PacketReceived { frame_kind: u8, sequence: u32, bytes: usize },
243 PacketSent { frame_kind: u8, sequence: u32, bytes: usize },
244 Extension { extension_id: u32, value0: u64, value1: u64 },
245}
246
247#[derive(Debug, Clone, Eq, PartialEq)]
248pub struct NodeEvent {
249 pub event_id: u64,
250 pub epoch: u64,
251 pub occurred_at_ms: u64,
252 pub operation_id: Option<u64>,
253 pub kind: NodeEventKind,
254}
255
256#[derive(Debug, Clone, Eq, PartialEq)]
257pub enum PollResult {
258 Event(NodeEvent),
259 Timeout,
260 Closed,
261 Gap { next_event_id: u64 },
262 NodeStopped,
263 NodeRestarted { epoch: u64 },
264}
265
266enum NodeBackend {
267 Ble(BleShimTransport),
268 #[cfg(feature = "std")]
269 Tcp(TcpEmbeddedTransport),
270}
271
272impl NodeBackend {
273 fn set_link_state(&mut self, state: LinkState) {
274 match self {
275 Self::Ble(transport) => transport.set_link_state(state),
276 #[cfg(feature = "std")]
277 Self::Tcp(_) => {}
278 }
279 }
280
281 fn push_inbound_wire(&mut self, bytes: &[u8]) -> Result<(), NodeError> {
282 match self {
283 Self::Ble(transport) => transport.push_inbound_wire(bytes).map_err(NodeError::from),
284 #[cfg(feature = "std")]
285 Self::Tcp(_) => {
286 let _ = bytes;
287 Err(NodeError::ModeConflict)
288 }
289 }
290 }
291
292 fn take_outbound_wire(&mut self) -> Option<Vec<u8>> {
293 match self {
294 Self::Ble(transport) => transport.take_outbound_wire(),
295 #[cfg(feature = "std")]
296 Self::Tcp(_) => None,
297 }
298 }
299
300 fn link_state(&self) -> LinkState {
301 match self {
302 Self::Ble(transport) => {
303 rns_embedded_core::transport::EmbeddedTransport::link_state(transport)
304 }
305 #[cfg(feature = "std")]
306 Self::Tcp(transport) => {
307 rns_embedded_core::transport::EmbeddedTransport::link_state(transport)
308 }
309 }
310 }
311}
312
/// Resolves the client's `host`/`port` pair and returns the first address the
/// resolver yields.
///
/// # Errors
///
/// `NodeError::InvalidConfig` when resolution fails or produces no address.
#[cfg(feature = "std")]
fn resolve_tcp_addr(config: &TcpClientConfig) -> Result<SocketAddr, NodeError> {
    let endpoint = (config.host.as_str(), config.port);

    let mut candidates = match endpoint.to_socket_addrs() {
        Ok(iter) => iter,
        Err(_) => return Err(NodeError::InvalidConfig),
    };

    match candidates.next() {
        Some(addr) => Ok(addr),
        None => Err(NodeError::InvalidConfig),
    }
}
321
/// Binds `0.0.0.0:<listen_port>`, waits for one inbound connection and wraps
/// it in a `TcpEmbeddedTransport`.
///
/// The call blocks in `accept()` until a peer connects; the listener is
/// dropped when the function returns, so only that single connection is
/// ever accepted.
///
/// # Errors
///
/// * `NodeError::IoError` when binding or accepting fails.
/// * The converted transport error when `from_stream` fails.
#[cfg(feature = "std")]
fn tcp_server_transport(config: &TcpServerConfig) -> Result<TcpEmbeddedTransport, NodeError> {
    let bind_addr = SocketAddr::from(([0, 0, 0, 0], config.listen_port));

    let listener = match TcpListener::bind(bind_addr) {
        Ok(listener) => listener,
        Err(_) => return Err(NodeError::IoError),
    };

    let stream = match listener.accept() {
        Ok((stream, _peer)) => stream,
        Err(_) => return Err(NodeError::IoError),
    };

    let transport = TcpEmbeddedTransport::from_stream(stream, DEFAULT_TCP_MTU_HINT)?;
    Ok(transport)
}
329
330struct RuntimeSession {
331 epoch: u64,
332 runtime: crate::EmbeddedNodeRuntime,
333 store: JournaledEmbeddedStore,
334 backend: NodeBackend,
335}
336
337impl RuntimeSession {
338 fn new(epoch: u64, config: &NodeConfig) -> Result<Self, NodeError> {
339 let runtime = crate::EmbeddedNodeRuntime::new(config.runtime).map_err(NodeError::from)?;
340 let backend = match &config.backend {
341 NodeBackendConfig::Ble(ble) => {
342 if config.runtime.node_mode != NodeTransportMode::BleOnly {
343 return Err(NodeError::InvalidConfig);
344 }
345 NodeBackend::Ble(
346 BleShimTransport::new(BleShimConfig::from(ble)).map_err(NodeError::from)?,
347 )
348 }
349 #[cfg(feature = "std")]
350 NodeBackendConfig::TcpClient(tcp) => {
351 if config.runtime.node_mode != NodeTransportMode::TcpClient {
352 return Err(NodeError::InvalidConfig);
353 }
354 let addr = resolve_tcp_addr(tcp)?;
355 NodeBackend::Tcp(
356 TcpEmbeddedTransport::connect(addr, DEFAULT_TCP_MTU_HINT)
357 .map_err(NodeError::from)?,
358 )
359 }
360 #[cfg(feature = "std")]
361 NodeBackendConfig::TcpServer(tcp) => {
362 if config.runtime.node_mode != NodeTransportMode::TcpServer {
363 return Err(NodeError::InvalidConfig);
364 }
365 NodeBackend::Tcp(tcp_server_transport(tcp)?)
366 }
367 };
368
369 Ok(Self { epoch, runtime, store: JournaledEmbeddedStore::new(), backend })
370 }
371
372 fn tick(&mut self, now_ms: u64) -> Result<Vec<RuntimeEvent>, NodeError> {
373 match &mut self.backend {
374 NodeBackend::Ble(transport) => {
375 self.runtime.tick(now_ms, transport, &mut self.store).map_err(NodeError::from)?
376 }
377 #[cfg(feature = "std")]
378 NodeBackend::Tcp(transport) => {
379 self.runtime.tick(now_ms, transport, &mut self.store).map_err(NodeError::from)?
380 }
381 }
382 Ok(self.runtime.drain_events())
383 }
384
385 fn queue_message(&mut self, destination: [u8; 16], data: &[u8]) -> Result<u32, NodeError> {
386 self.runtime.queue_message(destination, data).map_err(NodeError::from)
387 }
388
389 fn has_outbound_capacity(&self, needed_slots: usize) -> bool {
390 let capacity = self.runtime.config().max_outbound_queue;
391 let used = self.runtime.pending_outbound_len();
392 capacity.saturating_sub(used) >= needed_slots
393 }
394
395 fn status(&self, log_level: NodeLogLevel) -> NodeStatus {
396 NodeStatus {
397 run_state: NodeRunState::Running,
398 epoch: self.epoch,
399 lifecycle_state: Some(self.runtime.lifecycle_state()),
400 pending_outbound: self.runtime.pending_outbound_len(),
401 stats: self.runtime.stats(),
402 log_level,
403 }
404 }
405}
406
/// Out-of-band notification queued for a subscription
/// (see `SubscriptionState::pending_signals`).
#[derive(Clone, Debug)]
enum PendingSignal {
    NodeStopped,
    /// Carries the epoch associated with the restart.
    NodeRestarted { epoch: u64 },
}
412
413struct SubscriptionState {
414 next_event_id: u64,
415 pending_signals: VecDeque<PendingSignal>,
416}
417
/// Bookkeeping for the driver thread (`std` builds only).
// NOTE(review): the driver loop is outside the visible part of the file;
// field notes below follow the names and types only.
#[cfg(feature = "std")]
struct DriverState {
    /// Epoch associated with this driver.
    epoch: u64,
    /// Flag asking the driver to stop.
    stop_requested: bool,
    /// Instant captured for this driver, per the name at its start.
    start_instant: Instant,
    /// Join handle of the driver thread, when one is held.
    handle: Option<JoinHandle<()>>,
}
425
426struct NodeState {
427 epoch: u64,
428 session: Option<RuntimeSession>,
429 log_level: NodeLogLevel,
430 next_event_id: u64,
431 next_subscription_id: u64,
432 last_now_ms: u64,
433 event_capacity: usize,
434 event_log: VecDeque<NodeEvent>,
435 subscriptions: BTreeMap<u64, SubscriptionState>,
436 #[cfg(feature = "std")]
437 driver: Option<DriverState>,
438}
439
440impl Default for NodeState {
441 fn default() -> Self {
442 Self {
443 epoch: 0,
444 session: None,
445 log_level: NodeLogLevel::Info,
446 next_event_id: 1,
447 next_subscription_id: 1,
448 last_now_ms: 0,
449 event_capacity: RuntimeConfig::default().max_events,
450 event_log: VecDeque::new(),
451 subscriptions: BTreeMap::new(),
452 #[cfg(feature = "std")]
453 driver: None,
454 }
455 }
456}
457
/// Shared allocation behind `SharedNode` on `std` builds: the node state
/// inside a mutex, with a condition variable stored next to it.
#[cfg(feature = "std")]
struct StdNodeInner {
    /// Node state; access goes through the mutex.
    state: Mutex<NodeState>,
    /// Condition variable kept alongside `state`.
    condvar: Condvar,
}
463
464#[cfg(feature = "std")]
465type SharedNode = Arc<StdNodeInner>;
466
467#[cfg(not(feature = "std"))]
468type SharedNode = Rc<RefCell<NodeState>>;
469
470pub struct EmbeddedNode {
471 inner: SharedNode,
472}
473
474impl Clone for EmbeddedNode {
475 fn clone(&self) -> Self {
476 Self { inner: clone_inner(&self.inner) }
477 }
478}
479
480impl Default for EmbeddedNode {
481 fn default() -> Self {
482 Self::new()
483 }
484}