// rns_embedded_runtime/node_parts/eventsubscription.rs
eventsubscription.rs1pub struct EventSubscription {
2 inner: SharedNode,
3 subscription_id: u64,
4}
5
6impl Clone for EventSubscription {
7 fn clone(&self) -> Self {
8 Self { inner: clone_inner(&self.inner), subscription_id: self.subscription_id }
9 }
10}
11
12impl EmbeddedNode {
13 pub fn new() -> Self {
14 #[cfg(feature = "std")]
15 let inner = Arc::new(StdNodeInner {
16 state: Mutex::new(NodeState::default()),
17 condvar: Condvar::new(),
18 });
19
20 #[cfg(not(feature = "std"))]
21 let inner = Rc::new(RefCell::new(NodeState::default()));
22
23 Self { inner }
24 }
25
26 pub fn start(&self, config: NodeConfig) -> Result<(), NodeError> {
27 let next_epoch = self.with_state(|state| {
28 if state.session.is_some() {
29 return Err(NodeError::AlreadyRunning);
30 }
31 let epoch = state.epoch.saturating_add(1);
32 let session = RuntimeSession::new(epoch, &config)?;
33 state.epoch = epoch;
34 state.session = Some(session);
35 state.event_capacity = config.runtime.max_events;
36 state.last_now_ms = 0;
37 signal_generation_change(state, epoch);
38 push_event_locked(
39 state,
40 NodeEventKind::StatusChanged {
41 run_state: NodeRunState::Running,
42 lifecycle_state: Some(NodeLifecycleState::Boot),
43 },
44 None,
45 0,
46 );
47 Ok(epoch)
48 })?;
49 self.notify_waiters();
50 #[cfg(feature = "std")]
51 self.start_driver(next_epoch);
52 #[cfg(not(feature = "std"))]
53 let _ = next_epoch;
54 Ok(())
55 }
56
57 pub fn stop(&self) -> Result<(), NodeError> {
58 #[cfg(feature = "std")]
59 let handle = self.with_state(|state| {
60 let handle = stop_driver_locked(state);
61 if state.session.is_some() {
62 state.session = None;
63 signal_stopped(state);
64 push_event_locked(
65 state,
66 NodeEventKind::StatusChanged {
67 run_state: NodeRunState::Stopped,
68 lifecycle_state: None,
69 },
70 None,
71 state.last_now_ms,
72 );
73 }
74 Ok(handle)
75 })?;
76
77 #[cfg(not(feature = "std"))]
78 self.with_state(|state| {
79 if state.session.is_some() {
80 state.session = None;
81 signal_stopped(state);
82 push_event_locked(
83 state,
84 NodeEventKind::StatusChanged {
85 run_state: NodeRunState::Stopped,
86 lifecycle_state: None,
87 },
88 None,
89 state.last_now_ms,
90 );
91 }
92 Ok(())
93 })?;
94
95 self.notify_waiters();
96
97 #[cfg(feature = "std")]
98 join_driver(handle);
99
100 Ok(())
101 }
102
103 pub fn restart(&self, config: NodeConfig) -> Result<(), NodeError> {
104 self.stop()?;
105 self.start(config)
106 }
107
108 pub fn get_status(&self) -> NodeStatus {
109 self.with_state_read(|state| {
110 state.session.as_ref().map_or(
111 NodeStatus {
112 run_state: NodeRunState::Stopped,
113 epoch: state.epoch,
114 lifecycle_state: None,
115 pending_outbound: 0,
116 stats: RuntimeStats::default(),
117 log_level: state.log_level,
118 },
119 |session| session.status(state.log_level),
120 )
121 })
122 }
123
124 pub fn send(
125 &self,
126 destination: [u8; 16],
127 data: &[u8],
128 _options: SendOptions,
129 ) -> Result<NodeOperationReceipt, NodeError> {
130 let receipt = self.with_state(|state| {
131 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
132 if !session.has_outbound_capacity(1) {
133 return Err(NodeError::QueuePressure);
134 }
135 let sequence = session.queue_message(destination, data)?;
136 Ok(NodeOperationReceipt {
137 operation: NodeOperationKind::Send,
138 operation_id: u64::from(sequence),
139 epoch: session.epoch,
140 accepted_bytes: data.len(),
141 queued: true,
142 target_count: 1,
143 })
144 })?;
145 self.notify_waiters();
146 Ok(receipt)
147 }
148
149 pub fn broadcast(
150 &self,
151 data: &[u8],
152 options: BroadcastOptions,
153 ) -> Result<NodeOperationReceipt, NodeError> {
154 if options.destinations.is_empty() {
155 return Err(NodeError::InvalidConfig);
156 }
157 let receipt = self.with_state(|state| {
158 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
159 if !session.has_outbound_capacity(options.destinations.len()) {
160 return Err(NodeError::QueuePressure);
161 }
162 let mut last_sequence = 0_u64;
163 for destination in &options.destinations {
164 last_sequence = u64::from(session.queue_message(*destination, data)?);
165 }
166 Ok(NodeOperationReceipt {
167 operation: NodeOperationKind::Broadcast,
168 operation_id: last_sequence,
169 epoch: session.epoch,
170 accepted_bytes: data.len(),
171 queued: true,
172 target_count: u32::try_from(options.destinations.len()).unwrap_or(u32::MAX),
173 })
174 })?;
175 self.notify_waiters();
176 Ok(receipt)
177 }
178
179 pub fn set_log_level(&self, level: NodeLogLevel) -> Result<(), NodeError> {
180 self.with_state(|state| {
181 state.log_level = level;
182 push_event_locked(
183 state,
184 NodeEventKind::Log { level, code: 0 },
185 None,
186 state.last_now_ms,
187 );
188 Ok(())
189 })?;
190 self.notify_waiters();
191 Ok(())
192 }
193
194 pub fn subscribe_events(&self) -> Result<EventSubscription, NodeError> {
195 let subscription_id = self.with_state(|state| {
196 let id = state.next_subscription_id;
197 state.next_subscription_id = state.next_subscription_id.saturating_add(1);
198 state.subscriptions.insert(
199 id,
200 SubscriptionState {
201 next_event_id: state.next_event_id,
202 pending_signals: VecDeque::new(),
203 },
204 );
205 Ok(id)
206 })?;
207
208 Ok(EventSubscription { inner: clone_inner(&self.inner), subscription_id })
209 }
210
211 pub fn tick(&self, now_ms: u64) -> Result<(), NodeError> {
212 self.with_state(|state| {
213 ensure_manual_progression_allowed(state)?;
214 let events = {
215 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
216 session.tick(now_ms)?
217 };
218 state.last_now_ms = now_ms;
219 append_runtime_events_locked(state, events);
220 Ok(())
221 })?;
222 self.notify_waiters();
223 Ok(())
224 }
225
226 pub fn set_link_state(&self, state_value: LinkState) -> Result<(), NodeError> {
227 self.with_state(|state| {
228 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
229 session.backend.set_link_state(state_value);
230 Ok(())
231 })
232 }
233
234 pub fn set_network_provisioned(&self, provisioned: bool) -> Result<(), NodeError> {
235 self.with_state(|state| {
236 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
237 session.runtime.set_network_provisioned(provisioned);
238 Ok(())
239 })
240 }
241
242 pub fn set_ble_recovery_active(&self, active: bool) -> Result<(), NodeError> {
243 self.with_state(|state| {
244 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
245 session.runtime.set_ble_recovery_active(active);
246 Ok(())
247 })
248 }
249
250 pub fn push_inbound_wire(&self, bytes: &[u8]) -> Result<(), NodeError> {
251 self.with_state(|state| {
252 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
253 session.backend.push_inbound_wire(bytes)
254 })
255 }
256
257 pub fn take_outbound_wire(&self) -> Result<Option<Vec<u8>>, NodeError> {
258 self.with_state(|state| {
259 let session = state.session.as_mut().ok_or(NodeError::NotRunning)?;
260 Ok(session.backend.take_outbound_wire())
261 })
262 }
263
264 pub fn link_state(&self) -> Result<LinkState, NodeError> {
265 self.with_state_read_result(|state| {
266 let session = state.session.as_ref().ok_or(NodeError::NotRunning)?;
267 Ok(session.backend.link_state())
268 })
269 }
270
271 pub fn capability_supports_blocking_next(&self) -> bool {
272 cfg!(feature = "std")
273 }
274
275 pub fn capability_supports_managed_runtime(&self) -> bool {
276 cfg!(feature = "std")
277 }
278
279 pub fn capability_supports_event_gap_signaling(&self) -> bool {
280 true
281 }
282
283 #[cfg(feature = "std")]
284 fn start_driver(&self, epoch: u64) {
285 let start_instant = Instant::now();
286 if let Ok(mut state) = self.inner.state.lock() {
287 state.driver =
288 Some(DriverState { epoch, stop_requested: false, start_instant, handle: None });
289 }
290
291 let inner = Arc::clone(&self.inner);
292 let handle = thread::spawn(move || loop {
293 let continue_running = driver_tick(&inner, epoch);
294 if !continue_running {
295 break;
296 }
297 thread::sleep(DRIVER_TICK_SLEEP);
298 });
299
300 if let Ok(mut state) = self.inner.state.lock() {
301 if let Some(driver) = state.driver.as_mut() {
302 if driver.epoch == epoch {
303 driver.handle = Some(handle);
304 return;
305 }
306 }
307 }
308
309 let _ = handle.join();
310 }
311
312 #[cfg(feature = "std")]
313 fn notify_waiters(&self) {
314 self.inner.condvar.notify_all();
315 }
316
317 #[cfg(not(feature = "std"))]
318 fn notify_waiters(&self) {}
319
320 #[cfg(feature = "std")]
321 fn with_state<R>(
322 &self,
323 f: impl FnOnce(&mut NodeState) -> Result<R, NodeError>,
324 ) -> Result<R, NodeError> {
325 let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
326 f(&mut state)
327 }
328
329 #[cfg(not(feature = "std"))]
330 fn with_state<R>(
331 &self,
332 f: impl FnOnce(&mut NodeState) -> Result<R, NodeError>,
333 ) -> Result<R, NodeError> {
334 let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
335 f(&mut state)
336 }
337
338 #[cfg(feature = "std")]
339 fn with_state_read<R>(&self, f: impl FnOnce(&NodeState) -> R) -> R {
340 let state = self.inner.state.lock().expect("node state poisoned");
341 f(&state)
342 }
343
344 #[cfg(not(feature = "std"))]
345 fn with_state_read<R>(&self, f: impl FnOnce(&NodeState) -> R) -> R {
346 let state = self.inner.borrow();
347 f(&state)
348 }
349
350 #[cfg(feature = "std")]
351 fn with_state_read_result<R>(
352 &self,
353 f: impl FnOnce(&NodeState) -> Result<R, NodeError>,
354 ) -> Result<R, NodeError> {
355 let state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
356 f(&state)
357 }
358
359 #[cfg(not(feature = "std"))]
360 fn with_state_read_result<R>(
361 &self,
362 f: impl FnOnce(&NodeState) -> Result<R, NodeError>,
363 ) -> Result<R, NodeError> {
364 let state = self.inner.try_borrow().map_err(|_| NodeError::InternalError)?;
365 f(&state)
366 }
367}
368
369impl EventSubscription {
370 pub fn next(&self, timeout_ms: u64) -> Result<PollResult, NodeError> {
371 #[cfg(feature = "std")]
372 {
373 if timeout_ms > MAX_BLOCKING_TIMEOUT_MS {
374 return Ok(PollResult::Timeout);
375 }
376
377 let Some(deadline) = Instant::now().checked_add(Duration::from_millis(timeout_ms))
378 else {
379 return Ok(PollResult::Timeout);
380 };
381 let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
382 loop {
383 let result = next_poll_result_locked(&mut state, self.subscription_id);
384 if !matches!(result, PollResult::Timeout) || timeout_ms == 0 {
385 return Ok(result);
386 }
387 let now = Instant::now();
388 if now >= deadline {
389 return Ok(PollResult::Timeout);
390 }
391 let wait = deadline.saturating_duration_since(now);
392 let (next_state, _) = self
393 .inner
394 .condvar
395 .wait_timeout(state, wait)
396 .map_err(|_| NodeError::InternalError)?;
397 state = next_state;
398 }
399 }
400
401 #[cfg(not(feature = "std"))]
402 {
403 let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
404 let _ = timeout_ms;
405 Ok(next_poll_result_locked(&mut state, self.subscription_id))
406 }
407 }
408
409 pub fn close(&self) -> Result<(), NodeError> {
410 #[cfg(feature = "std")]
411 {
412 let mut state = self.inner.state.lock().map_err(|_| NodeError::InternalError)?;
413 state.subscriptions.remove(&self.subscription_id);
414 self.inner.condvar.notify_all();
415 Ok(())
416 }
417
418 #[cfg(not(feature = "std"))]
419 {
420 let mut state = self.inner.try_borrow_mut().map_err(|_| NodeError::InternalError)?;
421 state.subscriptions.remove(&self.subscription_id);
422 Ok(())
423 }
424 }
425}