rns_embedded_runtime/node_parts/
next_poll_result_locked.rs1fn next_poll_result_locked(state: &mut NodeState, subscription_id: u64) -> PollResult {
2 let Some(subscription) = state.subscriptions.get_mut(&subscription_id) else {
3 return PollResult::Closed;
4 };
5
6 if let Some(signal) = subscription.pending_signals.pop_front() {
7 return match signal {
8 PendingSignal::NodeStopped => PollResult::NodeStopped,
9 PendingSignal::NodeRestarted { epoch } => PollResult::NodeRestarted { epoch },
10 };
11 }
12
13 let Some(first_event) = state.event_log.front() else {
14 return PollResult::Timeout;
15 };
16
17 if subscription.next_event_id < first_event.event_id {
18 subscription.next_event_id = first_event.event_id;
19 return PollResult::Gap { next_event_id: first_event.event_id };
20 }
21
22 let Some(event) =
23 state.event_log.iter().find(|event| event.event_id >= subscription.next_event_id).cloned()
24 else {
25 return PollResult::Timeout;
26 };
27
28 if subscription.next_event_id < event.event_id {
29 subscription.next_event_id = event.event_id;
30 return PollResult::Gap { next_event_id: event.event_id };
31 }
32
33 subscription.next_event_id = event.event_id.saturating_add(1);
34 PollResult::Event(event)
35}
36
37fn append_runtime_events_locked(state: &mut NodeState, events: Vec<RuntimeEvent>) {
38 let now_ms = state.last_now_ms;
39 for event in events {
40 match event {
41 RuntimeEvent::LifecycleChanged { to, .. } => push_event_locked(
42 state,
43 NodeEventKind::StatusChanged {
44 run_state: NodeRunState::Running,
45 lifecycle_state: Some(to),
46 },
47 None,
48 now_ms,
49 ),
50 RuntimeEvent::FrameSent { kind, sequence, bytes } => push_event_locked(
51 state,
52 NodeEventKind::PacketSent { frame_kind: kind, sequence, bytes },
53 Some(u64::from(sequence)),
54 now_ms,
55 ),
56 RuntimeEvent::FrameReceived { kind, sequence, bytes } => push_event_locked(
57 state,
58 NodeEventKind::PacketReceived { frame_kind: kind, sequence, bytes },
59 Some(u64::from(sequence)),
60 now_ms,
61 ),
62 RuntimeEvent::FrameDeferred { kind, sequence, error }
63 | RuntimeEvent::FrameRejected { kind, sequence, error } => push_event_locked(
64 state,
65 NodeEventKind::Error { error: NodeError::from(error), frame_kind: kind, sequence },
66 Some(u64::from(sequence)),
67 now_ms,
68 ),
69 RuntimeEvent::Bootstrapped { replay_floor } => push_event_locked(
70 state,
71 NodeEventKind::Extension {
72 extension_id: NODE_EXTENSION_ID_BOOTSTRAPPED,
73 value0: replay_floor,
74 value1: 0,
75 },
76 None,
77 now_ms,
78 ),
79 RuntimeEvent::AnnounceQueued { sequence }
80 | RuntimeEvent::MessageQueued { sequence, .. } => {
81 push_event_locked(
82 state,
83 NodeEventKind::Extension {
84 extension_id: NODE_EXTENSION_ID_MESSAGE_QUEUED,
85 value0: u64::from(sequence),
86 value1: 0,
87 },
88 Some(u64::from(sequence)),
89 now_ms,
90 );
91 }
92 RuntimeEvent::AnnounceReceived { sequence, bytes }
93 | RuntimeEvent::LxmfMessageReceived { sequence, body_bytes: bytes, .. } => {
94 push_event_locked(
95 state,
96 NodeEventKind::Extension {
97 extension_id: NODE_EXTENSION_ID_RECEIVED_SUMMARY,
98 value0: u64::from(sequence),
99 value1: bytes as u64,
100 },
101 Some(u64::from(sequence)),
102 now_ms,
103 );
104 }
105 }
106 }
107}
108
109fn push_event_locked(
110 state: &mut NodeState,
111 kind: NodeEventKind,
112 operation_id: Option<u64>,
113 occurred_at_ms: u64,
114) {
115 if let NodeEventKind::Extension { extension_id, .. } = &kind {
116 debug_assert!(is_valid_extension_id(*extension_id));
117 }
118 if state.event_log.len() >= state.event_capacity {
119 state.event_log.pop_front();
120 }
121 state.event_log.push_back(NodeEvent {
122 event_id: state.next_event_id,
123 epoch: state.epoch,
124 occurred_at_ms,
125 operation_id,
126 kind,
127 });
128 state.next_event_id = state.next_event_id.saturating_add(1);
129}
130
/// Checks whether the node may be progressed manually.
///
/// # Errors
///
/// Returns [`NodeError::ModeConflict`] when a driver is present in `state`
/// and has not been asked to stop. Succeeds when there is no driver or the
/// driver's `stop_requested` flag is set.
#[cfg(feature = "std")]
fn ensure_manual_progression_allowed(state: &NodeState) -> Result<(), NodeError> {
    match state.driver.as_ref() {
        Some(driver) if !driver.stop_requested => Err(NodeError::ModeConflict),
        _ => Ok(()),
    }
}
138
139#[cfg(not(feature = "std"))]
140fn ensure_manual_progression_allowed(_state: &NodeState) -> Result<(), NodeError> {
141 Ok(())
142}
143
144pub fn is_valid_extension_id(extension_id: u32) -> bool {
145 matches!(
146 extension_id,
147 NODE_EXTENSION_ID_BOOTSTRAPPED
148 | NODE_EXTENSION_ID_MESSAGE_QUEUED
149 | NODE_EXTENSION_ID_RECEIVED_SUMMARY
150 )
151}
152
153fn signal_generation_change(state: &mut NodeState, epoch: u64) {
154 let next_event_id = state.next_event_id;
155 for subscription in state.subscriptions.values_mut() {
156 subscription.next_event_id = next_event_id;
157 subscription.pending_signals.push_back(PendingSignal::NodeRestarted { epoch });
158 }
159}
160
161fn signal_stopped(state: &mut NodeState) {
162 let next_event_id = state.next_event_id;
163 for subscription in state.subscriptions.values_mut() {
164 subscription.next_event_id = next_event_id;
165 subscription.pending_signals.push_back(PendingSignal::NodeStopped);
166 }
167}
168
/// Runs one driver iteration for the generation identified by `epoch`.
///
/// Returns `true` after ticking the session and recording its outcome, in
/// which case all waiters on `inner.condvar` are notified once the state lock
/// has been released. Returns `false` (without notifying) when:
/// - `inner.state.lock()` returns an error,
/// - no driver is installed, the driver was asked to stop, or the driver
///   belongs to a different epoch,
/// - there is no session, or the session belongs to a different epoch.
///
/// A failing `session.tick` does not end the loop: the error is logged as an
/// `Error` event with `frame_kind` and `sequence` set to 0.
#[cfg(feature = "std")]
fn driver_tick(inner: &Arc<StdNodeInner>, epoch: u64) -> bool {
    {
        let Ok(mut state) = inner.state.lock() else {
            return false;
        };

        // Time is measured relative to the driver's start instant.
        let now_ms = match state.driver.as_ref() {
            Some(driver) if !driver.stop_requested && driver.epoch == epoch => {
                driver.start_instant.elapsed().as_millis() as u64
            }
            _ => return false,
        };

        let tick_outcome = match state.session.as_mut() {
            Some(session) if session.epoch == epoch => session.tick(now_ms),
            _ => return false,
        };

        state.last_now_ms = now_ms;
        match tick_outcome {
            Ok(events) => append_runtime_events_locked(&mut state, events),
            Err(err) => push_event_locked(
                &mut state,
                NodeEventKind::Error { error: err, frame_kind: 0, sequence: 0 },
                None,
                now_ms,
            ),
        }
        // The guard goes out of scope here, before waiters are woken.
    }

    inner.condvar.notify_all();
    true
}
211
/// Flags the driver (if any) as stop-requested and takes its join handle.
///
/// Returns `None` when there is no driver or its handle was already taken.
/// The handle is only handed back, not joined, by this function.
#[cfg(feature = "std")]
fn stop_driver_locked(state: &mut NodeState) -> Option<JoinHandle<()>> {
    state.driver.as_mut().and_then(|driver| {
        driver.stop_requested = true;
        driver.handle.take()
    })
}
218
/// Waits for the driver thread to finish when a handle is supplied.
///
/// The join result is discarded on purpose, so a panic in the joined thread
/// is not propagated to the caller. Passing `None` does nothing.
#[cfg(feature = "std")]
fn join_driver(handle: Option<JoinHandle<()>>) {
    let _ = handle.map(|worker| worker.join());
}
225
/// Returns another `Arc` handle to the same shared node internals
/// (`std` build).
#[cfg(feature = "std")]
fn clone_inner(inner: &Arc<StdNodeInner>) -> Arc<StdNodeInner> {
    Arc::clone(inner)
}
230
231#[cfg(not(feature = "std"))]
232fn clone_inner(inner: &Rc<RefCell<NodeState>>) -> Rc<RefCell<NodeState>> {
233 Rc::clone(inner)
234}