rns-embedded-runtime 0.5.1

Runtime support layer for embedded Reticulum transports and alloc-backed targets.
Documentation
/// Computes the next poll outcome for the subscription `subscription_id`.
///
/// Outcomes, in priority order:
/// 1. `PollResult::Closed` when no such subscription exists.
/// 2. The oldest pending signal (`NodeStopped` / `NodeRestarted`), if one is queued.
/// 3. `PollResult::Timeout` when the event log is empty or holds nothing at or past
///    the subscription's cursor.
/// 4. `PollResult::Gap` when the cursor points before the first available event; the
///    cursor is moved to that event's id so the following poll can deliver it.
/// 5. `PollResult::Event` with a clone of the event at the cursor, after which the
///    cursor advances by one (saturating).
fn next_poll_result_locked(state: &mut NodeState, subscription_id: u64) -> PollResult {
    let Some(sub) = state.subscriptions.get_mut(&subscription_id) else {
        return PollResult::Closed;
    };

    // Queued signals take precedence over regular events.
    match sub.pending_signals.pop_front() {
        Some(PendingSignal::NodeStopped) => return PollResult::NodeStopped,
        Some(PendingSignal::NodeRestarted { epoch }) => return PollResult::NodeRestarted { epoch },
        None => {}
    }

    let cursor = sub.next_event_id;

    let oldest_id = match state.event_log.front() {
        Some(oldest) => oldest.event_id,
        None => return PollResult::Timeout,
    };
    if cursor < oldest_id {
        // The cursor points before the oldest retained event: report the jump.
        sub.next_event_id = oldest_id;
        return PollResult::Gap { next_event_id: oldest_id };
    }

    match state.event_log.iter().find(|candidate| candidate.event_id >= cursor) {
        None => PollResult::Timeout,
        Some(found) if cursor < found.event_id => {
            // The log skips past the cursor; resynchronise before delivering.
            sub.next_event_id = found.event_id;
            PollResult::Gap { next_event_id: found.event_id }
        }
        Some(found) => {
            sub.next_event_id = found.event_id.saturating_add(1);
            PollResult::Event(found.clone())
        }
    }
}

fn append_runtime_events_locked(state: &mut NodeState, events: Vec<RuntimeEvent>) {
    let now_ms = state.last_now_ms;
    for event in events {
        match event {
            RuntimeEvent::LifecycleChanged { to, .. } => push_event_locked(
                state,
                NodeEventKind::StatusChanged {
                    run_state: NodeRunState::Running,
                    lifecycle_state: Some(to),
                },
                None,
                now_ms,
            ),
            RuntimeEvent::FrameSent { kind, sequence, bytes } => push_event_locked(
                state,
                NodeEventKind::PacketSent { frame_kind: kind, sequence, bytes },
                Some(u64::from(sequence)),
                now_ms,
            ),
            RuntimeEvent::FrameReceived { kind, sequence, bytes } => push_event_locked(
                state,
                NodeEventKind::PacketReceived { frame_kind: kind, sequence, bytes },
                Some(u64::from(sequence)),
                now_ms,
            ),
            RuntimeEvent::FrameDeferred { kind, sequence, error }
            | RuntimeEvent::FrameRejected { kind, sequence, error } => push_event_locked(
                state,
                NodeEventKind::Error { error: NodeError::from(error), frame_kind: kind, sequence },
                Some(u64::from(sequence)),
                now_ms,
            ),
            RuntimeEvent::Bootstrapped { replay_floor } => push_event_locked(
                state,
                NodeEventKind::Extension {
                    extension_id: NODE_EXTENSION_ID_BOOTSTRAPPED,
                    value0: replay_floor,
                    value1: 0,
                },
                None,
                now_ms,
            ),
            RuntimeEvent::AnnounceQueued { sequence }
            | RuntimeEvent::MessageQueued { sequence, .. } => {
                push_event_locked(
                    state,
                    NodeEventKind::Extension {
                        extension_id: NODE_EXTENSION_ID_MESSAGE_QUEUED,
                        value0: u64::from(sequence),
                        value1: 0,
                    },
                    Some(u64::from(sequence)),
                    now_ms,
                );
            }
            RuntimeEvent::AnnounceReceived { sequence, bytes }
            | RuntimeEvent::LxmfMessageReceived { sequence, body_bytes: bytes, .. } => {
                push_event_locked(
                    state,
                    NodeEventKind::Extension {
                        extension_id: NODE_EXTENSION_ID_RECEIVED_SUMMARY,
                        value0: u64::from(sequence),
                        value1: bytes as u64,
                    },
                    Some(u64::from(sequence)),
                    now_ms,
                );
            }
        }
    }
}

/// Appends one event to `state.event_log`, assigning it the next event id.
///
/// - In debug builds, asserts that an `Extension` event carries an id accepted by
///   `is_valid_extension_id`.
/// - If the log already holds `state.event_capacity` entries (or more), the oldest
///   entry is dropped before the new one is appended.
/// - The event records `state.epoch` and the supplied `occurred_at_ms` /
///   `operation_id`.
/// - `state.next_event_id` is advanced with saturating arithmetic.
fn push_event_locked(
    state: &mut NodeState,
    kind: NodeEventKind,
    operation_id: Option<u64>,
    occurred_at_ms: u64,
) {
    if let NodeEventKind::Extension { extension_id, .. } = &kind {
        debug_assert!(is_valid_extension_id(*extension_id));
    }

    // Evict a single oldest entry when the log is full.
    let log_is_full = state.event_log.len() >= state.event_capacity;
    if log_is_full {
        state.event_log.pop_front();
    }

    // Allocate the id first, then build and store the event.
    let event_id = state.next_event_id;
    state.next_event_id = event_id.saturating_add(1);

    let record = NodeEvent { event_id, epoch: state.epoch, occurred_at_ms, operation_id, kind };
    state.event_log.push_back(record);
}

/// Checks whether the node may be advanced manually.
///
/// Returns `Err(NodeError::ModeConflict)` when a driver is present and has not been
/// asked to stop; returns `Ok(())` when there is no driver or its `stop_requested`
/// flag is set.
#[cfg(feature = "std")]
fn ensure_manual_progression_allowed(state: &NodeState) -> Result<(), NodeError> {
    match state.driver.as_ref() {
        Some(active) if !active.stop_requested => Err(NodeError::ModeConflict),
        _ => Ok(()),
    }
}

/// Variant compiled when the `std` feature is disabled: the check always succeeds
/// and `_state` is not inspected.
#[cfg(not(feature = "std"))]
fn ensure_manual_progression_allowed(_state: &NodeState) -> Result<(), NodeError> {
    Ok(())
}

pub fn is_valid_extension_id(extension_id: u32) -> bool {
    matches!(
        extension_id,
        NODE_EXTENSION_ID_BOOTSTRAPPED
            | NODE_EXTENSION_ID_MESSAGE_QUEUED
            | NODE_EXTENSION_ID_RECEIVED_SUMMARY
    )
}

/// Queues a `PendingSignal::NodeRestarted { epoch }` on every subscription and moves
/// each subscription's cursor to the current `state.next_event_id`, so events already
/// in the log are not delivered to it afterwards.
fn signal_generation_change(state: &mut NodeState, epoch: u64) {
    let resume_from = state.next_event_id;
    state.subscriptions.values_mut().for_each(|sub| {
        sub.next_event_id = resume_from;
        sub.pending_signals.push_back(PendingSignal::NodeRestarted { epoch });
    });
}

/// Queues a `PendingSignal::NodeStopped` on every subscription and moves each
/// subscription's cursor to the current `state.next_event_id`, so events already in
/// the log are not delivered to it afterwards.
fn signal_stopped(state: &mut NodeState) {
    let resume_from = state.next_event_id;
    state.subscriptions.values_mut().for_each(|sub| {
        sub.next_event_id = resume_from;
        sub.pending_signals.push_back(PendingSignal::NodeStopped);
    });
}

/// Performs one driver iteration for the driver generation identified by `epoch`.
///
/// While holding the state lock it ticks the session with the time elapsed since the
/// driver's `start_instant`, records the resulting events in the event log, and then
/// (after the lock is released) wakes everything waiting on `inner.condvar`.
///
/// Returns `false` — without notifying the condvar — when:
/// - `inner.state.lock()` fails,
/// - no driver is installed,
/// - the driver has `stop_requested` set or belongs to a different epoch, or
/// - there is no session whose epoch matches `epoch`.
///
/// Returns `true` otherwise, including when `session.tick` fails: the failure is
/// recorded as a `NodeEventKind::Error` event and the driver keeps running.
#[cfg(feature = "std")]
fn driver_tick(inner: &Arc<StdNodeInner>, epoch: u64) -> bool {
    // The guard lives only inside this block so the lock is released before
    // `notify_all` is called below.
    let keep_running = {
        let mut state = match inner.state.lock() {
            Ok(state) => state,
            Err(_) => return false,
        };

        let Some(driver) = state.driver.as_ref() else {
            return false;
        };
        // Bail out if a stop was requested or this tick belongs to a different epoch.
        if driver.stop_requested || driver.epoch != epoch {
            return false;
        }
        // Milliseconds elapsed since the driver's `start_instant`.
        let now_ms = driver.start_instant.elapsed().as_millis() as u64;

        let events = match state.session.as_mut() {
            Some(session) if session.epoch == epoch => match session.tick(now_ms) {
                Ok(events) => events,
                Err(err) => {
                    // Report the failed tick as an error event (no frame kind or
                    // sequence is available, so both are zero) and carry on with an
                    // empty batch.
                    state.last_now_ms = now_ms;
                    push_event_locked(
                        &mut state,
                        NodeEventKind::Error { error: err, frame_kind: 0, sequence: 0 },
                        None,
                        now_ms,
                    );
                    Vec::new()
                }
            },
            _ => return false,
        };

        // `append_runtime_events_locked` stamps events with `last_now_ms`, so update
        // it before appending.
        state.last_now_ms = now_ms;
        append_runtime_events_locked(&mut state, events);
        true
    };

    inner.condvar.notify_all();

    keep_running
}

/// Marks the installed driver (if any) as stop-requested and takes its join handle.
///
/// Returns `None` when no driver is installed or when the driver's handle has already
/// been taken; otherwise returns the handle so the caller can join the thread.
#[cfg(feature = "std")]
fn stop_driver_locked(state: &mut NodeState) -> Option<JoinHandle<()>> {
    state.driver.as_mut().and_then(|active| {
        active.stop_requested = true;
        active.handle.take()
    })
}

#[cfg(feature = "std")]
/// Blocks until the driver thread behind `handle` has finished.
///
/// Does nothing when `handle` is `None`. The result of `join` is discarded, so a
/// driver thread that panicked does not propagate its panic to the caller.
fn join_driver(handle: Option<JoinHandle<()>>) {
    let Some(worker) = handle else {
        return;
    };
    // Deliberately ignore the join result (an `Err` means the thread panicked).
    let _ = worker.join();
}

/// Returns another `Arc` handle to the same `StdNodeInner` (`std` build); the inner
/// value itself is not copied.
#[cfg(feature = "std")]
fn clone_inner(inner: &Arc<StdNodeInner>) -> Arc<StdNodeInner> {
    Arc::clone(inner)
}

/// Returns another `Rc` handle to the same `RefCell<NodeState>` (build without the
/// `std` feature); the inner value itself is not copied.
#[cfg(not(feature = "std"))]
fn clone_inner(inner: &Rc<RefCell<NodeState>>) -> Rc<RefCell<NodeState>> {
    Rc::clone(inner)
}