Skip to main content

rns_embedded_runtime/node_parts/
eventsubscription.rs

1pub struct EventSubscription {
2    inner: SharedNode,
3    subscription_id: u64,
4}
5
6impl Clone for EventSubscription {
7    fn clone(&self) -> Self {
8        Self { inner: clone_inner(&self.inner), subscription_id: self.subscription_id }
9    }
10}
11
12impl EmbeddedNode {
13    pub fn new() -> Self {
14        #[cfg(feature = "std")]
15        let inner = Arc::new(StdNodeInner {
16            state: Mutex::new(NodeState::default()),
17            condvar: Condvar::new(),
18        });
19
20        #[cfg(not(feature = "std"))]
21        let inner = Rc::new(RefCell::new(NodeState::default()));
22
23        Self { inner }
24    }
25
26    pub fn start(&self, config: NodeConfig) -> Result<(), NodeError> {
27        let next_epoch = self.with_state(|state| {
28            if state.session.is_some() {
29                return Err(NodeError::AlreadyRunning);
30            }
31            let epoch = state.epoch.saturating_add(1);
32            let session = RuntimeSession::new(epoch, &config)?;
33            state.epoch = epoch;
34            state.session = Some(session);
35            state.event_capacity = config.runtime.max_events;
36            state.last_now_ms = 0;
37            signal_generation_change(state, epoch);
38            push_event_locked(
39                state,
40                NodeEventKind::StatusChanged {
41                    run_state: NodeRunState::Running,
42                    lifecycle_state: Some(NodeLifecycleState::Boot),
43                },
44                None,
45                0,
46            );
47            Ok(epoch)
48        })?;
49        self.notify_waiters();
50        #[cfg(feature = "std")]
51        self.start_driver(next_epoch);
52        #[cfg(not(feature = "std"))]
53        let _ = next_epoch;
54        Ok(())
55    }
56
57    pub fn stop(&self) -> Result<(), NodeError> {
58        #[cfg(feature = "std")]
59        let handle = self.with_state(|state| {
60            let handle = stop_driver_locked(state);
61            if state.session.is_some() {
62                state.session = None;
63                signal_stopped(state);
64                push_event_locked(
65                    state,
66                    NodeEventKind::StatusChanged {
67                        run_state: NodeRunState::Stopped,
68                        lifecycle_state: None,
69                    },
70                    None,
71                    state.last_now_ms,
72                );
73            }
74            Ok(handle)
75        })?;
76
77        #[cfg(not(feature = "std"))]
78        self.with_state(|state| {
79            if state.session.is_some() {
80                state.session = None;
81                signal_stopped(state);
82                push_event_locked(
83                    state,
84                    NodeEventKind::StatusChanged {
85                        run_state: NodeRunState::Stopped,
86                        lifecycle_state: None,
87                    },
88                    None,
89                    state.last_now_ms,
90                );
91            }
92            Ok(())
93        })?;
94
95        self.notify_waiters();
96
97        #[cfg(feature = "std")]
98        join_driver(handle);
99
100        Ok(())
101    }
102
103    pub fn restart(&self, config: NodeConfig) -> Result<(), NodeError> {
104        self.stop()?;
105        self.start(config)
106    }
107
108    pub fn get_status(&self) -> NodeStatus {
109        self.with_state_read(|state| {
110            state.session.as_ref().map_or(
111                NodeStatus {
112                    run_state: NodeRunState::Stopped,
113                    epoch: state.epoch,
114                    lifecycle_state: None,
115                    pending_outbound: 0,
116                    stats: RuntimeStats::default(),
117                    log_level: state.log_level,
118                },
119                |session| session.status(state.log_level),
120            )
121        })
122    }
123
124    pub fn send(
125        &self,
126        destination: [u8; 16],
127        data: &[u8],
128        _options: SendOptions,
129    ) -> Result<NodeOperationReceipt, NodeError> {
130        let receipt = self.with_state(|state| {
131            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
132            if !session.has_outbound_capacity(1) {
133                return Err(NodeError::QueuePressure);
134            }
135            let sequence = session.queue_message(destination, data)?;
136            Ok(NodeOperationReceipt {
137                operation: NodeOperationKind::Send,
138                operation_id: u64::from(sequence),
139                epoch: session.epoch,
140                accepted_bytes: data.len(),
141                queued: true,
142                target_count: 1,
143            })
144        })?;
145        self.notify_waiters();
146        Ok(receipt)
147    }
148
149    pub fn broadcast(
150        &self,
151        data: &[u8],
152        options: BroadcastOptions,
153    ) -> Result<NodeOperationReceipt, NodeError> {
154        if options.destinations.is_empty() {
155            return Err(NodeError::InvalidConfig);
156        }
157        let receipt = self.with_state(|state| {
158            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
159            if !session.has_outbound_capacity(options.destinations.len()) {
160                return Err(NodeError::QueuePressure);
161            }
162            let mut last_sequence = 0_u64;
163            for destination in &options.destinations {
164                last_sequence = u64::from(session.queue_message(*destination, data)?);
165            }
166            Ok(NodeOperationReceipt {
167                operation: NodeOperationKind::Broadcast,
168                operation_id: last_sequence,
169                epoch: session.epoch,
170                accepted_bytes: data.len(),
171                queued: true,
172                target_count: u32::try_from(options.destinations.len()).unwrap_or(u32::MAX),
173            })
174        })?;
175        self.notify_waiters();
176        Ok(receipt)
177    }
178
179    pub fn set_log_level(&self, level: NodeLogLevel) -> Result<(), NodeError> {
180        self.with_state(|state| {
181            state.log_level = level;
182            push_event_locked(
183                state,
184                NodeEventKind::Log { level, code: 0 },
185                None,
186                state.last_now_ms,
187            );
188            Ok(())
189        })?;
190        self.notify_waiters();
191        Ok(())
192    }
193
194    pub fn subscribe_events(&self) -> Result<EventSubscription, NodeError> {
195        let subscription_id = self.with_state(|state| {
196            let id = state.next_subscription_id;
197            state.next_subscription_id = state.next_subscription_id.saturating_add(1);
198            state.subscriptions.insert(
199                id,
200                SubscriptionState {
201                    next_event_id: state.next_event_id,
202                    pending_signals: VecDeque::new(),
203                },
204            );
205            Ok(id)
206        })?;
207
208        Ok(EventSubscription { inner: clone_inner(&self.inner), subscription_id })
209    }
210
211    pub fn tick(&self, now_ms: u64) -> Result<(), NodeError> {
212        self.with_state(|state| {
213            ensure_manual_progression_allowed(state)?;
214            let events = {
215                let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
216                session.tick(now_ms)?
217            };
218            state.last_now_ms = now_ms;
219            append_runtime_events_locked(state, events);
220            Ok(())
221        })?;
222        self.notify_waiters();
223        Ok(())
224    }
225
226    pub fn set_link_state(&self, state_value: LinkState) -> Result<(), NodeError> {
227        self.with_state(|state| {
228            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
229            session.backend.set_link_state(state_value);
230            Ok(())
231        })
232    }
233
234    pub fn set_network_provisioned(&self, provisioned: bool) -> Result<(), NodeError> {
235        self.with_state(|state| {
236            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
237            session.runtime.set_network_provisioned(provisioned);
238            Ok(())
239        })
240    }
241
242    pub fn set_ble_recovery_active(&self, active: bool) -> Result<(), NodeError> {
243        self.with_state(|state| {
244            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
245            session.runtime.set_ble_recovery_active(active);
246            Ok(())
247        })
248    }
249
250    pub fn push_inbound_wire(&self, bytes: &[u8]) -> Result<(), NodeError> {
251        self.with_state(|state| {
252            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
253            session.backend.push_inbound_wire(bytes)
254        })
255    }
256
257    pub fn take_outbound_wire(&self) -> Result<Option<Vec<u8>>, NodeError> {
258        self.with_state(|state| {
259            let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
260            Ok(session.backend.take_outbound_wire())
261        })
262    }
263
264    pub fn link_state(&self) -> Result<LinkState, NodeError> {
265        self.with_state_read_result(|state| {
266            let session = state.session.as_ref().ok_or(NodeError::NotRunning)?;
267            Ok(session.backend.link_state())
268        })
269    }
270
271    pub fn capability_supports_blocking_next(&self) -> bool {
272        cfg!(feature = "std")
273    }
274
275    pub fn capability_supports_managed_runtime(&self) -> bool {
276        cfg!(feature = "std")
277    }
278
279    pub fn capability_supports_event_gap_signaling(&self) -> bool {
280        true
281    }
282
283    #[cfg(feature = "std")]
284    fn start_driver(&self, epoch: u64) {
285        let start_instant = Instant::now();
286        if let Ok(mut state) = self.inner.state.lock() {
287            state.driver =
288                Some(DriverState { epoch, stop_requested: false, start_instant, handle: None });
289        }
290
291        let inner = Arc::clone(&self.inner);
292        let handle = thread::spawn(move || loop {
293            let continue_running = driver_tick(&inner, epoch);
294            if !continue_running {
295                break;
296            }
297            thread::sleep(DRIVER_TICK_SLEEP);
298        });
299
300        if let Ok(mut state) = self.inner.state.lock() {
301            if let Some(driver) = state.driver.as_mut() {
302                if driver.epoch == epoch {
303                    driver.handle = Some(handle);
304                    return;
305                }
306            }
307        }
308
309        let _ = handle.join();
310    }
311
312    #[cfg(feature = "std")]
313    fn notify_waiters(&self) {
314        self.inner.condvar.notify_all();
315    }
316
317    #[cfg(not(feature = "std"))]
318    fn notify_waiters(&self) {}
319
320    #[cfg(feature = "std")]
321    fn with_state<R>(
322        &self,
323        f: impl FnOnce(&mut NodeState) -> Result<R, NodeError>,
324    ) -> Result<R, NodeError> {
325        let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
326        f(&mut state)
327    }
328
329    #[cfg(not(feature = "std"))]
330    fn with_state<R>(
331        &self,
332        f: impl FnOnce(&mut NodeState) -> Result<R, NodeError>,
333    ) -> Result<R, NodeError> {
334        let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
335        f(&mut state)
336    }
337
338    #[cfg(feature = "std")]
339    fn with_state_read<R>(&self, f: impl FnOnce(&NodeState) -> R) -> R {
340        let state = self.inner.state.lock().expect("node state poisoned");
341        f(&state)
342    }
343
344    #[cfg(not(feature = "std"))]
345    fn with_state_read<R>(&self, f: impl FnOnce(&NodeState) -> R) -> R {
346        let state = self.inner.borrow();
347        f(&state)
348    }
349
350    #[cfg(feature = "std")]
351    fn with_state_read_result<R>(
352        &self,
353        f: impl FnOnce(&NodeState) -> Result<R, NodeError>,
354    ) -> Result<R, NodeError> {
355        let state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
356        f(&state)
357    }
358
359    #[cfg(not(feature = "std"))]
360    fn with_state_read_result<R>(
361        &self,
362        f: impl FnOnce(&NodeState) -> Result<R, NodeError>,
363    ) -> Result<R, NodeError> {
364        let state = self.inner.try_borrow().map_err(|_| NodeError::InternalError)?;
365        f(&state)
366    }
367}
368
369impl EventSubscription {
370    pub fn next(&self, timeout_ms: u64) -> Result<PollResult, NodeError> {
371        #[cfg(feature = "std")]
372        {
373            if timeout_ms > MAX_BLOCKING_TIMEOUT_MS {
374                return Ok(PollResult::Timeout);
375            }
376
377            let Some(deadline) = Instant::now().checked_add(Duration::from_millis(timeout_ms))
378            else {
379                return Ok(PollResult::Timeout);
380            };
381            let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
382            loop {
383                let result = next_poll_result_locked(&mut state, self.subscription_id);
384                if !matches!(result, PollResult::Timeout) || timeout_ms == 0 {
385                    return Ok(result);
386                }
387                let now = Instant::now();
388                if now >= deadline {
389                    return Ok(PollResult::Timeout);
390                }
391                let wait = deadline.saturating_duration_since(now);
392                let (next_state, _) = self
393                    .inner
394                    .condvar
395                    .wait_timeout(state, wait)
396                    .map_err(|_| NodeError::InternalError)?;
397                state = next_state;
398            }
399        }
400
401        #[cfg(not(feature = "std"))]
402        {
403            let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
404            let _ = timeout_ms;
405            Ok(next_poll_result_locked(&mut state, self.subscription_id))
406        }
407    }
408
409    pub fn close(&self) -> Result<(), NodeError> {
410        #[cfg(feature = "std")]
411        {
412            let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
413            state.subscriptions.remove(&self.subscription_id);
414            self.inner.condvar.notify_all();
415            Ok(())
416        }
417
418        #[cfg(not(feature = "std"))]
419        {
420            let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
421            state.subscriptions.remove(&self.subscription_id);
422            Ok(())
423        }
424    }
425}