Skip to main content

rns_embedded_runtime/node_parts/
ants.rs

1use crate::{
2    ble::{BleShimConfig, BleShimTransport},
3    constants::DEFAULT_CAPTURE_MAX_BYTES,
4    RuntimeConfig, RuntimeEvent, RuntimeStats,
5};
6
7use alloc::{
8    collections::{BTreeMap, VecDeque},
9    string::String,
10    vec::Vec,
11};
12
13use rns_embedded_core::{store::JournaledEmbeddedStore, transport::LinkState, EmbeddedError};
14
15#[cfg(feature = "std")]
16use alloc::sync::Arc;
17
18#[cfg(not(feature = "std"))]
19use alloc::rc::Rc;
20
21#[cfg(not(feature = "std"))]
22use core::cell::RefCell;
23
24#[cfg(feature = "std")]
25use std::{
26    net::{SocketAddr, TcpListener, ToSocketAddrs},
27    sync::{Condvar, Mutex},
28    thread::{self, JoinHandle},
29    time::{Duration, Instant},
30};
31
32#[cfg(feature = "std")]
33use crate::tcp::TcpEmbeddedTransport;
34
35#[cfg(feature = "std")]
36const DRIVER_TICK_MS: u64 = 25;
37
38#[cfg(feature = "std")]
39const DRIVER_TICK_SLEEP: Duration = Duration::from_millis(DRIVER_TICK_MS);
40
41// FFI callers can pass arbitrarily large timeouts; clamp the blocking surface to a
42// practical upper bound so impossible waits degrade into a safe timeout result.
43#[cfg(feature = "std")]
44const MAX_BLOCKING_TIMEOUT_MS: u64 = u32::MAX as u64;
45
46pub const NODE_EXTENSION_ID_BOOTSTRAPPED: u32 = 1;
47
48pub const NODE_EXTENSION_ID_MESSAGE_QUEUED: u32 = 2;
49
50pub const NODE_EXTENSION_ID_RECEIVED_SUMMARY: u32 = 3;
51
52#[cfg(feature = "std")]
53const DEFAULT_TCP_MTU_HINT: u16 = 1024;
54
55#[derive(Debug, Clone, Copy, Eq, PartialEq)]
56pub enum NodeTransportMode {
57    BleOnly,
58    TcpClient,
59    TcpServer,
60}
61
62#[derive(Debug, Clone, Copy, Eq, PartialEq)]
63pub enum NodeLifecycleState {
64    Boot,
65    Unprovisioned,
66    ProvisionedOffline,
67    TcpOnline,
68    BleRecovery,
69    FailureReconnect,
70}
71
72#[derive(Debug, Clone, Copy, Eq, PartialEq)]
73pub struct CaptureDefaults {
74    pub max_bytes: u32,
75}
76
77impl Default for CaptureDefaults {
78    fn default() -> Self {
79        Self { max_bytes: DEFAULT_CAPTURE_MAX_BYTES }
80    }
81}
82
83#[derive(Debug, Clone, Eq, PartialEq)]
84pub struct BleNodeBackendConfig {
85    pub mtu_hint: u16,
86    pub max_inbound_frames: usize,
87    pub max_outbound_frames: usize,
88    pub ordered_delivery: bool,
89}
90
91impl Default for BleNodeBackendConfig {
92    fn default() -> Self {
93        let config = BleShimConfig::default();
94        Self {
95            mtu_hint: config.mtu_hint,
96            max_inbound_frames: config.max_inbound_frames,
97            max_outbound_frames: config.max_outbound_frames,
98            ordered_delivery: config.ordered_delivery,
99        }
100    }
101}
102
103impl From<&BleNodeBackendConfig> for BleShimConfig {
104    fn from(value: &BleNodeBackendConfig) -> Self {
105        Self {
106            mtu_hint: value.mtu_hint,
107            max_inbound_frames: value.max_inbound_frames,
108            max_outbound_frames: value.max_outbound_frames,
109            ordered_delivery: value.ordered_delivery,
110        }
111    }
112}
113
114#[derive(Debug, Clone, Eq, PartialEq)]
115pub struct TcpClientConfig {
116    pub host: String,
117    pub port: u16,
118    pub reconnect_backoff_ms: Vec<u64>,
119}
120
121#[derive(Debug, Clone, Copy, Eq, PartialEq)]
122pub struct TcpServerConfig {
123    pub listen_port: u16,
124}
125
126#[derive(Debug, Clone, Eq, PartialEq)]
127pub enum NodeBackendConfig {
128    Ble(BleNodeBackendConfig),
129    #[cfg(feature = "std")]
130    TcpClient(TcpClientConfig),
131    #[cfg(feature = "std")]
132    TcpServer(TcpServerConfig),
133}
134
135#[derive(Debug, Clone, Eq, PartialEq)]
136pub struct NodeConfig {
137    pub runtime: RuntimeConfig,
138    pub backend: NodeBackendConfig,
139}
140
141impl Default for NodeConfig {
142    fn default() -> Self {
143        Self {
144            runtime: RuntimeConfig::default(),
145            backend: NodeBackendConfig::Ble(BleNodeBackendConfig::default()),
146        }
147    }
148}
149
150#[derive(Debug, Clone, Copy, Eq, PartialEq)]
151pub enum NodeRunState {
152    Stopped,
153    Running,
154}
155
156#[derive(Debug, Clone, Copy, Eq, PartialEq)]
157pub enum NodeLogLevel {
158    Error,
159    Warn,
160    Info,
161    Debug,
162    Trace,
163}
164
165#[derive(Debug, Clone, Copy, Eq, PartialEq)]
166pub enum NodeOperationKind {
167    Send,
168    Broadcast,
169}
170
171#[derive(Debug, Clone, Copy, Eq, PartialEq)]
172pub struct NodeOperationReceipt {
173    pub operation: NodeOperationKind,
174    pub operation_id: u64,
175    pub epoch: u64,
176    pub accepted_bytes: usize,
177    pub queued: bool,
178    pub target_count: u32,
179}
180
181#[derive(Debug, Clone, Copy, Eq, PartialEq)]
182pub struct SendOptions;
183
184#[derive(Debug, Clone, Eq, PartialEq, Default)]
185pub struct BroadcastOptions {
186    pub destinations: Vec<[u8; 16]>,
187}
188
189#[derive(Debug, Clone, Copy, Eq, PartialEq)]
190pub struct NodeStatus {
191    pub run_state: NodeRunState,
192    pub epoch: u64,
193    pub lifecycle_state: Option<NodeLifecycleState>,
194    pub pending_outbound: usize,
195    pub stats: RuntimeStats,
196    pub log_level: NodeLogLevel,
197}
198
199#[derive(Debug, Clone, Eq, PartialEq)]
200pub enum NodeError {
201    InvalidConfig,
202    IoError,
203    NetworkError,
204    ReticulumError,
205    AlreadyRunning,
206    NotRunning,
207    Timeout,
208    InternalError,
209    ModeConflict,
210    SubscriptionClosed,
211    NodeRestarted,
212    EventGap,
213    QueuePressure,
214}
215
216impl From<EmbeddedError> for NodeError {
217    fn from(value: EmbeddedError) -> Self {
218        match value {
219            EmbeddedError::InvalidInput
220            | EmbeddedError::InvalidArgument
221            | EmbeddedError::Unsupported => Self::InvalidConfig,
222            EmbeddedError::Timeout => Self::Timeout,
223            EmbeddedError::Backpressure => Self::QueuePressure,
224            EmbeddedError::Disconnected => Self::NetworkError,
225            EmbeddedError::IntegrityFailure
226            | EmbeddedError::ChecksumMismatch
227            | EmbeddedError::IdempotencyConflict
228            | EmbeddedError::ReplayRejected
229            | EmbeddedError::SeqGap
230            | EmbeddedError::NotFound
231            | EmbeddedError::InvalidCursor => Self::ReticulumError,
232            EmbeddedError::StorageCorruption | EmbeddedError::InvalidState => Self::InternalError,
233        }
234    }
235}
236
237#[derive(Debug, Clone, Eq, PartialEq)]
238pub enum NodeEventKind {
239    StatusChanged { run_state: NodeRunState, lifecycle_state: Option<NodeLifecycleState> },
240    Log { level: NodeLogLevel, code: u32 },
241    Error { error: NodeError, frame_kind: u8, sequence: u32 },
242    PacketReceived { frame_kind: u8, sequence: u32, bytes: usize },
243    PacketSent { frame_kind: u8, sequence: u32, bytes: usize },
244    Extension { extension_id: u32, value0: u64, value1: u64 },
245}
246
247#[derive(Debug, Clone, Eq, PartialEq)]
248pub struct NodeEvent {
249    pub event_id: u64,
250    pub epoch: u64,
251    pub occurred_at_ms: u64,
252    pub operation_id: Option<u64>,
253    pub kind: NodeEventKind,
254}
255
256#[derive(Debug, Clone, Eq, PartialEq)]
257pub enum PollResult {
258    Event(NodeEvent),
259    Timeout,
260    Closed,
261    Gap { next_event_id: u64 },
262    NodeStopped,
263    NodeRestarted { epoch: u64 },
264}
265
266enum NodeBackend {
267    Ble(BleShimTransport),
268    #[cfg(feature = "std")]
269    Tcp(TcpEmbeddedTransport),
270}
271
272impl NodeBackend {
273    fn set_link_state(&mut self, state: LinkState) {
274        match self {
275            Self::Ble(transport) => transport.set_link_state(state),
276            #[cfg(feature = "std")]
277            Self::Tcp(_) => {}
278        }
279    }
280
281    fn push_inbound_wire(&mut self, bytes: &[u8]) -> Result<(), NodeError> {
282        match self {
283            Self::Ble(transport) => transport.push_inbound_wire(bytes).map_err(NodeError::from),
284            #[cfg(feature = "std")]
285            Self::Tcp(_) => {
286                let _ = bytes;
287                Err(NodeError::ModeConflict)
288            }
289        }
290    }
291
292    fn take_outbound_wire(&mut self) -> Option<Vec<u8>> {
293        match self {
294            Self::Ble(transport) => transport.take_outbound_wire(),
295            #[cfg(feature = "std")]
296            Self::Tcp(_) => None,
297        }
298    }
299
300    fn link_state(&self) -> LinkState {
301        match self {
302            Self::Ble(transport) => {
303                rns_embedded_core::transport::EmbeddedTransport::link_state(transport)
304            }
305            #[cfg(feature = "std")]
306            Self::Tcp(transport) => {
307                rns_embedded_core::transport::EmbeddedTransport::link_state(transport)
308            }
309        }
310    }
311}
312
313#[cfg(feature = "std")]
314fn resolve_tcp_addr(config: &TcpClientConfig) -> Result<SocketAddr, NodeError> {
315    (config.host.as_str(), config.port)
316        .to_socket_addrs()
317        .map_err(|_| NodeError::InvalidConfig)?
318        .next()
319        .ok_or(NodeError::InvalidConfig)
320}
321
322#[cfg(feature = "std")]
323fn tcp_server_transport(config: &TcpServerConfig) -> Result<TcpEmbeddedTransport, NodeError> {
324    let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], config.listen_port)))
325        .map_err(|_| NodeError::IoError)?;
326    let (stream, _) = listener.accept().map_err(|_| NodeError::IoError)?;
327    TcpEmbeddedTransport::from_stream(stream, DEFAULT_TCP_MTU_HINT).map_err(NodeError::from)
328}
329
330struct RuntimeSession {
331    epoch: u64,
332    runtime: crate::EmbeddedNodeRuntime,
333    store: JournaledEmbeddedStore,
334    backend: NodeBackend,
335}
336
337impl RuntimeSession {
338    fn new(epoch: u64, config: &NodeConfig) -> Result<Self, NodeError> {
339        let runtime = crate::EmbeddedNodeRuntime::new(config.runtime).map_err(NodeError::from)?;
340        let backend = match &config.backend {
341            NodeBackendConfig::Ble(ble) => {
342                if config.runtime.node_mode != NodeTransportMode::BleOnly {
343                    return Err(NodeError::InvalidConfig);
344                }
345                NodeBackend::Ble(
346                    BleShimTransport::new(BleShimConfig::from(ble)).map_err(NodeError::from)?,
347                )
348            }
349            #[cfg(feature = "std")]
350            NodeBackendConfig::TcpClient(tcp) => {
351                if config.runtime.node_mode != NodeTransportMode::TcpClient {
352                    return Err(NodeError::InvalidConfig);
353                }
354                let addr = resolve_tcp_addr(tcp)?;
355                NodeBackend::Tcp(
356                    TcpEmbeddedTransport::connect(addr, DEFAULT_TCP_MTU_HINT)
357                        .map_err(NodeError::from)?,
358                )
359            }
360            #[cfg(feature = "std")]
361            NodeBackendConfig::TcpServer(tcp) => {
362                if config.runtime.node_mode != NodeTransportMode::TcpServer {
363                    return Err(NodeError::InvalidConfig);
364                }
365                NodeBackend::Tcp(tcp_server_transport(tcp)?)
366            }
367        };
368
369        Ok(Self { epoch, runtime, store: JournaledEmbeddedStore::new(), backend })
370    }
371
372    fn tick(&mut self, now_ms: u64) -> Result<Vec<RuntimeEvent>, NodeError> {
373        match &mut self.backend {
374            NodeBackend::Ble(transport) => {
375                self.runtime.tick(now_ms, transport, &mut self.store).map_err(NodeError::from)?
376            }
377            #[cfg(feature = "std")]
378            NodeBackend::Tcp(transport) => {
379                self.runtime.tick(now_ms, transport, &mut self.store).map_err(NodeError::from)?
380            }
381        }
382        Ok(self.runtime.drain_events())
383    }
384
385    fn queue_message(&mut self, destination: [u8; 16], data: &[u8]) -> Result<u32, NodeError> {
386        self.runtime.queue_message(destination, data).map_err(NodeError::from)
387    }
388
389    fn has_outbound_capacity(&self, needed_slots: usize) -> bool {
390        let capacity = self.runtime.config().max_outbound_queue;
391        let used = self.runtime.pending_outbound_len();
392        capacity.saturating_sub(used) >= needed_slots
393    }
394
395    fn status(&self, log_level: NodeLogLevel) -> NodeStatus {
396        NodeStatus {
397            run_state: NodeRunState::Running,
398            epoch: self.epoch,
399            lifecycle_state: Some(self.runtime.lifecycle_state()),
400            pending_outbound: self.runtime.pending_outbound_len(),
401            stats: self.runtime.stats(),
402            log_level,
403        }
404    }
405}
406
407#[derive(Debug, Clone)]
408enum PendingSignal {
409    NodeStopped,
410    NodeRestarted { epoch: u64 },
411}
412
413struct SubscriptionState {
414    next_event_id: u64,
415    pending_signals: VecDeque<PendingSignal>,
416}
417
418#[cfg(feature = "std")]
419struct DriverState {
420    epoch: u64,
421    stop_requested: bool,
422    start_instant: Instant,
423    handle: Option<JoinHandle<()>>,
424}
425
426struct NodeState {
427    epoch: u64,
428    session: Option<RuntimeSession>,
429    log_level: NodeLogLevel,
430    next_event_id: u64,
431    next_subscription_id: u64,
432    last_now_ms: u64,
433    event_capacity: usize,
434    event_log: VecDeque<NodeEvent>,
435    subscriptions: BTreeMap<u64, SubscriptionState>,
436    #[cfg(feature = "std")]
437    driver: Option<DriverState>,
438}
439
440impl Default for NodeState {
441    fn default() -> Self {
442        Self {
443            epoch: 0,
444            session: None,
445            log_level: NodeLogLevel::Info,
446            next_event_id: 1,
447            next_subscription_id: 1,
448            last_now_ms: 0,
449            event_capacity: RuntimeConfig::default().max_events,
450            event_log: VecDeque::new(),
451            subscriptions: BTreeMap::new(),
452            #[cfg(feature = "std")]
453            driver: None,
454        }
455    }
456}
457
458#[cfg(feature = "std")]
459struct StdNodeInner {
460    state: Mutex<NodeState>,
461    condvar: Condvar,
462}
463
464#[cfg(feature = "std")]
465type SharedNode = Arc<StdNodeInner>;
466
467#[cfg(not(feature = "std"))]
468type SharedNode = Rc<RefCell<NodeState>>;
469
470pub struct EmbeddedNode {
471    inner: SharedNode,
472}
473
474impl Clone for EmbeddedNode {
475    fn clone(&self) -> Self {
476        Self { inner: clone_inner(&self.inner) }
477    }
478}
479
480impl Default for EmbeddedNode {
481    fn default() -> Self {
482        Self::new()
483    }
484}