Skip to main content

rns_embedded_runtime/node_parts/
next_poll_result_locked.rs

1fn next_poll_result_locked(state: &mut NodeState, subscription_id: u64) -> PollResult {
2    let Some(subscription) = state.subscriptions.get_mut(&subscription_id) else {
3        return PollResult::Closed;
4    };
5
6    if let Some(signal) = subscription.pending_signals.pop_front() {
7        return match signal {
8            PendingSignal::NodeStopped => PollResult::NodeStopped,
9            PendingSignal::NodeRestarted { epoch } => PollResult::NodeRestarted { epoch },
10        };
11    }
12
13    let Some(first_event) = state.event_log.front() else {
14        return PollResult::Timeout;
15    };
16
17    if subscription.next_event_id < first_event.event_id {
18        subscription.next_event_id = first_event.event_id;
19        return PollResult::Gap { next_event_id: first_event.event_id };
20    }
21
22    let Some(event) =
23        state.event_log.iter().find(|event| event.event_id >= subscription.next_event_id).cloned()
24    else {
25        return PollResult::Timeout;
26    };
27
28    if subscription.next_event_id < event.event_id {
29        subscription.next_event_id = event.event_id;
30        return PollResult::Gap { next_event_id: event.event_id };
31    }
32
33    subscription.next_event_id = event.event_id.saturating_add(1);
34    PollResult::Event(event)
35}
36
37fn append_runtime_events_locked(state: &mut NodeState, events: Vec<RuntimeEvent>) {
38    let now_ms = state.last_now_ms;
39    for event in events {
40        match event {
41            RuntimeEvent::LifecycleChanged { to, .. } => push_event_locked(
42                state,
43                NodeEventKind::StatusChanged {
44                    run_state: NodeRunState::Running,
45                    lifecycle_state: Some(to),
46                },
47                None,
48                now_ms,
49            ),
50            RuntimeEvent::FrameSent { kind, sequence, bytes } => push_event_locked(
51                state,
52                NodeEventKind::PacketSent { frame_kind: kind, sequence, bytes },
53                Some(u64::from(sequence)),
54                now_ms,
55            ),
56            RuntimeEvent::FrameReceived { kind, sequence, bytes } => push_event_locked(
57                state,
58                NodeEventKind::PacketReceived { frame_kind: kind, sequence, bytes },
59                Some(u64::from(sequence)),
60                now_ms,
61            ),
62            RuntimeEvent::FrameDeferred { kind, sequence, error }
63            | RuntimeEvent::FrameRejected { kind, sequence, error } => push_event_locked(
64                state,
65                NodeEventKind::Error { error: NodeError::from(error), frame_kind: kind, sequence },
66                Some(u64::from(sequence)),
67                now_ms,
68            ),
69            RuntimeEvent::Bootstrapped { replay_floor } => push_event_locked(
70                state,
71                NodeEventKind::Extension {
72                    extension_id: NODE_EXTENSION_ID_BOOTSTRAPPED,
73                    value0: replay_floor,
74                    value1: 0,
75                },
76                None,
77                now_ms,
78            ),
79            RuntimeEvent::AnnounceQueued { sequence }
80            | RuntimeEvent::MessageQueued { sequence, .. } => {
81                push_event_locked(
82                    state,
83                    NodeEventKind::Extension {
84                        extension_id: NODE_EXTENSION_ID_MESSAGE_QUEUED,
85                        value0: u64::from(sequence),
86                        value1: 0,
87                    },
88                    Some(u64::from(sequence)),
89                    now_ms,
90                );
91            }
92            RuntimeEvent::AnnounceReceived { sequence, bytes }
93            | RuntimeEvent::LxmfMessageReceived { sequence, body_bytes: bytes, .. } => {
94                push_event_locked(
95                    state,
96                    NodeEventKind::Extension {
97                        extension_id: NODE_EXTENSION_ID_RECEIVED_SUMMARY,
98                        value0: u64::from(sequence),
99                        value1: bytes as u64,
100                    },
101                    Some(u64::from(sequence)),
102                    now_ms,
103                );
104            }
105        }
106    }
107}
108
109fn push_event_locked(
110    state: &mut NodeState,
111    kind: NodeEventKind,
112    operation_id: Option<u64>,
113    occurred_at_ms: u64,
114) {
115    if let NodeEventKind::Extension { extension_id, .. } = &kind {
116        debug_assert!(is_valid_extension_id(*extension_id));
117    }
118    if state.event_log.len() >= state.event_capacity {
119        state.event_log.pop_front();
120    }
121    state.event_log.push_back(NodeEvent {
122        event_id: state.next_event_id,
123        epoch: state.epoch,
124        occurred_at_ms,
125        operation_id,
126        kind,
127    });
128    state.next_event_id = state.next_event_id.saturating_add(1);
129}
130
131#[cfg(feature = "std")]
132fn ensure_manual_progression_allowed(state: &NodeState) -> Result<(), NodeError> {
133    if state.driver.as_ref().is_some_and(|driver| !driver.stop_requested) {
134        return Err(NodeError::ModeConflict);
135    }
136    Ok(())
137}
138
139#[cfg(not(feature = "std"))]
140fn ensure_manual_progression_allowed(_state: &NodeState) -> Result<(), NodeError> {
141    Ok(())
142}
143
144pub fn is_valid_extension_id(extension_id: u32) -> bool {
145    matches!(
146        extension_id,
147        NODE_EXTENSION_ID_BOOTSTRAPPED
148            | NODE_EXTENSION_ID_MESSAGE_QUEUED
149            | NODE_EXTENSION_ID_RECEIVED_SUMMARY
150    )
151}
152
153fn signal_generation_change(state: &mut NodeState, epoch: u64) {
154    let next_event_id = state.next_event_id;
155    for subscription in state.subscriptions.values_mut() {
156        subscription.next_event_id = next_event_id;
157        subscription.pending_signals.push_back(PendingSignal::NodeRestarted { epoch });
158    }
159}
160
161fn signal_stopped(state: &mut NodeState) {
162    let next_event_id = state.next_event_id;
163    for subscription in state.subscriptions.values_mut() {
164        subscription.next_event_id = next_event_id;
165        subscription.pending_signals.push_back(PendingSignal::NodeStopped);
166    }
167}
168
169#[cfg(feature = "std")]
170fn driver_tick(inner: &Arc<StdNodeInner>, epoch: u64) -> bool {
171    let keep_running = {
172        let mut state = match inner.state.lock() {
173            Ok(state) => state,
174            Err(_) => return false,
175        };
176
177        let Some(driver) = state.driver.as_ref() else {
178            return false;
179        };
180        if driver.stop_requested || driver.epoch != epoch {
181            return false;
182        }
183        let now_ms = driver.start_instant.elapsed().as_millis() as u64;
184
185        let events = match state.session.as_mut() {
186            Some(session) if session.epoch == epoch => match session.tick(now_ms) {
187                Ok(events) => events,
188                Err(err) => {
189                    state.last_now_ms = now_ms;
190                    push_event_locked(
191                        &mut state,
192                        NodeEventKind::Error { error: err, frame_kind: 0, sequence: 0 },
193                        None,
194                        now_ms,
195                    );
196                    Vec::new()
197                }
198            },
199            _ => return false,
200        };
201
202        state.last_now_ms = now_ms;
203        append_runtime_events_locked(&mut state, events);
204        true
205    };
206
207    inner.condvar.notify_all();
208
209    keep_running
210}
211
212#[cfg(feature = "std")]
213fn stop_driver_locked(state: &mut NodeState) -> Option<JoinHandle<()>> {
214    let driver = state.driver.as_mut()?;
215    driver.stop_requested = true;
216    driver.handle.take()
217}
218
219#[cfg(feature = "std")]
220fn join_driver(handle: Option<JoinHandle<()>>) {
221    if let Some(handle) = handle {
222        let _ = handle.join();
223    }
224}
225
226#[cfg(feature = "std")]
227fn clone_inner(inner: &Arc<StdNodeInner>) -> Arc<StdNodeInner> {
228    Arc::clone(inner)
229}
230
231#[cfg(not(feature = "std"))]
232fn clone_inner(inner: &Rc<RefCell<NodeState>>) -> Rc<RefCell<NodeState>> {
233    Rc::clone(inner)
234}