Skip to main content

jacquard_host_support/
mailbox.rs

//! Bounded transport ingress mailbox for staging raw ingress events before
//! they are stamped with Jacquard logical time.
//!
//! The mailbox is created via `transport_ingress_mailbox(capacity)`, which
//! returns three handles that together cover the full host-side lifecycle:
//! - `TransportIngressSender` — cloneable write handle used by the transport
//!   driver to emit raw ingress events (from any thread when the `std`
//!   feature is enabled).
//! - `TransportIngressReceiver` — single-owner drain handle used by the host
//!   bridge to collect and stamp events before routing.
//! - `TransportIngressNotifier` — cloneable generation-stamp handle that lets a
//!   bridge or scheduler observe whether the mailbox has changed since a
//!   previously taken snapshot. It offers blocking waits without polling
//!   (`std` on non-wasm32 targets only) and runtime-agnostic async waiting via
//!   [`TransportIngressNotifier::changed`].
//!
//! `TransportIngressClass` distinguishes payload frames from control frames.
//! Payload overflow is fail-open: excess frames are discarded and counted in
//! `TransportIngressDrain::dropped_payload_count`.
//! Control overflow is fail-closed: `ControlIngressOverflow` is returned so
//! the caller can take corrective action.
20
21use alloc::{collections::VecDeque, vec::Vec};
22use core::{
23    fmt,
24    future::Future,
25    mem,
26    pin::Pin,
27    task::{Context, Poll, Waker},
28};
29
30#[cfg(not(feature = "std"))]
31use alloc::rc::Rc;
32#[cfg(not(feature = "std"))]
33use core::cell::RefCell;
34#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
35use std::sync::Condvar;
36#[cfg(feature = "std")]
37use std::sync::{Arc, Mutex, MutexGuard};
38
39use jacquard_core::TransportIngressEvent;
40use jacquard_macros::public_model;
41use serde::{Deserialize, Serialize};
42
43#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
44use jacquard_core::DurationMs;
45
/// Class of a raw ingress event, chosen by the caller of
/// [`TransportIngressSender::emit`].
///
/// The class only matters when the mailbox is already at capacity: a
/// `Payload` event is counted and dropped, while a `Control` event is rejected
/// with [`ControlIngressOverflow`].
#[public_model]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportIngressClass {
    /// Droppable traffic; an overflow is recorded in the drop counter.
    Payload,
    /// Traffic whose overflow is reported back to the sender as an error.
    Control,
}
52
/// Successful result of [`TransportIngressSender::emit`].
#[public_model]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TransportIngressSendOutcome {
    /// The event was appended to the mailbox queue.
    Enqueued,
    /// The mailbox was full and the payload-class event was discarded; the
    /// drop is reflected in the next drain's `dropped_payload_count`.
    DroppedPayload,
}
59
/// Everything removed from the mailbox by one call to
/// [`TransportIngressReceiver::drain`].
#[public_model]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransportIngressDrain {
    /// Queued events, oldest first (the order in which they were enqueued).
    pub events: Vec<TransportIngressEvent>,
    /// Number of payload-class events dropped because the mailbox was full
    /// since the previous drain; the counter is reset by each drain.
    pub dropped_payload_count: u64,
}
66
/// Error returned by [`TransportIngressSender::emit`] when a control-class
/// event arrives while the mailbox is already at capacity. The event is not
/// enqueued.
#[public_model]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ControlIngressOverflow;
70
71impl fmt::Display for ControlIngressOverflow {
72    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
73        formatter.write_str("control ingress queue is full")
74    }
75}
76
// Marks the overflow as a standard error type; only compiled when the `std`
// feature is enabled because this impl names `std::error::Error`.
#[cfg(feature = "std")]
impl std::error::Error for ControlIngressOverflow {}
79
/// Mutable mailbox contents shared by the sender, receiver and notifier
/// handles. Always accessed through `SharedMailbox::with_state` (or the mutex
/// guard directly in the blocking wait paths).
#[derive(Default)]
struct MailboxState {
    /// Pending events in arrival order.
    events: VecDeque<TransportIngressEvent>,
    /// Payload events discarded because the queue was full; reset by `drain`.
    dropped_payload_count: u64,
    /// Change counter bumped whenever an event is enqueued or a payload is
    /// dropped; notifier snapshots are compared against it.
    generation: u64,
    /// Single slot for the waker of the most recently polled
    /// `TransportIngressChanged` future; a later registration replaces it.
    waiter: Option<Waker>,
}
87
// Reference-counted handle to the shared mailbox: atomically counted (`Arc`)
// when built with `std`.
#[cfg(feature = "std")]
type SharedMailboxHandle = Arc<SharedMailbox>;

// Without `std` the mailbox falls back to a non-atomic `Rc`.
#[cfg(not(feature = "std"))]
type SharedMailboxHandle = Rc<SharedMailbox>;
93
/// Backing object referenced by every handle of one mailbox.
struct SharedMailbox {
    /// Interior-mutable container for the `MailboxState`.
    storage: MailboxStorage,
    /// Maximum number of queued events; `emit` treats the queue as full once
    /// its length reaches this value.
    capacity: usize,
    /// Hook used to wake blocked waiters after the state changed.
    notifier: MailboxChangeNotifier,
}
99
/// `std` storage: the state is guarded by a mutex.
#[cfg(feature = "std")]
struct MailboxStorage {
    state: Mutex<MailboxState>,
}

/// Storage without `std`: the state lives in a `RefCell`, so overlapping
/// mutable access panics at runtime instead of blocking.
#[cfg(not(feature = "std"))]
struct MailboxStorage {
    state: RefCell<MailboxState>,
}
109
/// Change notifier for `std` builds on non-wasm32 targets: wraps the condition
/// variable that the blocking `wait_for_change*` methods wait on.
#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
struct MailboxChangeNotifier {
    changed: Condvar,
}

/// Placeholder notifier for every other configuration, where the blocking
/// wait methods are not compiled and there is nothing to signal.
#[cfg(not(all(feature = "std", not(target_arch = "wasm32"))))]
#[derive(Clone, Copy, Debug, Default)]
struct MailboxChangeNotifier;
118
/// Abstraction over "wake whoever is blocked waiting for a mailbox change",
/// implemented once per build configuration by `MailboxChangeNotifier`.
trait TransportIngressWake {
    /// Wakes all blocked waiters, if the configuration has any.
    fn wake_ingress_waiters(&self);
}
122
#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
impl TransportIngressWake for MailboxChangeNotifier {
    /// Wakes every thread currently blocked on the condition variable.
    fn wake_ingress_waiters(&self) {
        self.changed.notify_all();
    }
}

#[cfg(not(all(feature = "std", not(target_arch = "wasm32"))))]
impl TransportIngressWake for MailboxChangeNotifier {
    /// No-op: this configuration has no condition variable to signal.
    fn wake_ingress_waiters(&self) {}
}
134
135impl SharedMailbox {
136    fn bump_generation(state: &mut MailboxState) {
137        state.generation = state.generation.saturating_add(1);
138    }
139
140    fn take_waiter(state: &mut MailboxState) -> Option<Waker> {
141        state.waiter.take()
142    }
143
144    fn wake_waiter(waiter: Option<Waker>) {
145        if let Some(waiter) = waiter {
146            waiter.wake();
147        }
148    }
149
150    fn notify_changed(&self) {
151        self.notifier.wake_ingress_waiters();
152    }
153
154    #[cfg(feature = "std")]
155    fn with_state<Output>(&self, operation: impl FnOnce(&mut MailboxState) -> Output) -> Output {
156        let mut guard = self.lock_state();
157        operation(&mut guard)
158    }
159
160    #[cfg(feature = "std")]
161    fn lock_state(&self) -> MutexGuard<'_, MailboxState> {
162        self.storage
163            .state
164            .lock()
165            .unwrap_or_else(|poisoned| poisoned.into_inner())
166    }
167
168    #[cfg(not(feature = "std"))]
169    fn with_state<Output>(&self, operation: impl FnOnce(&mut MailboxState) -> Output) -> Output {
170        let mut guard = self.storage.state.borrow_mut();
171        operation(&mut guard)
172    }
173
174    #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
175    #[expect(
176        clippy::disallowed_types,
177        reason = "Condvar and thread-parking APIs require std::time::Duration internally"
178    )]
179    fn std_duration(duration_ms: DurationMs) -> std::time::Duration {
180        std::time::Duration::from_millis(u64::from(duration_ms.0))
181    }
182}
183
/// Cloneable write handle; every clone feeds the same shared mailbox.
#[derive(Clone)]
pub struct TransportIngressSender {
    shared: SharedMailboxHandle,
}
188
/// Drain handle for the mailbox. It is not `Clone`, so the one instance
/// returned by `transport_ingress_mailbox` is the only receiver.
pub struct TransportIngressReceiver {
    shared: SharedMailboxHandle,
}
192
/// Cloneable handle for observing the mailbox generation counter: take a
/// snapshot, then test or wait for the counter to differ from it.
#[derive(Clone)]
pub struct TransportIngressNotifier {
    shared: SharedMailboxHandle,
}
197
/// Future returned by [`TransportIngressNotifier::changed`]; it resolves to the
/// current generation once that generation differs from `snapshot`.
pub struct TransportIngressChanged<'a> {
    /// Notifier whose mailbox is being observed.
    notifier: &'a TransportIngressNotifier,
    /// Generation value the caller has already seen.
    snapshot: u64,
}
202
203#[must_use]
204pub fn transport_ingress_mailbox(
205    capacity: usize,
206) -> (
207    TransportIngressSender,
208    TransportIngressReceiver,
209    TransportIngressNotifier,
210) {
211    assert!(
212        capacity > 0,
213        "transport ingress mailbox capacity must be non-zero"
214    );
215    let shared = new_shared_mailbox(capacity);
216    (
217        TransportIngressSender {
218            shared: shared.clone(),
219        },
220        TransportIngressReceiver {
221            shared: shared.clone(),
222        },
223        TransportIngressNotifier { shared },
224    )
225}
226
/// Builds the `std` flavour of the shared mailbox: empty state behind a mutex,
/// wrapped in an `Arc`.
#[cfg(feature = "std")]
fn new_shared_mailbox(capacity: usize) -> SharedMailboxHandle {
    let storage = MailboxStorage {
        state: Mutex::new(MailboxState::default()),
    };
    let notifier = new_change_notifier();
    Arc::new(SharedMailbox {
        storage,
        capacity,
        notifier,
    })
}
237
/// Creates the condvar-backed notifier used by `std` builds on non-wasm32
/// targets.
#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
fn new_change_notifier() -> MailboxChangeNotifier {
    let changed = Condvar::new();
    MailboxChangeNotifier { changed }
}

/// Creates the unit notifier used by `std` builds on wasm32, where nothing
/// blocks on a condition variable.
#[cfg(all(feature = "std", target_arch = "wasm32"))]
fn new_change_notifier() -> MailboxChangeNotifier {
    MailboxChangeNotifier {}
}
249
250#[cfg(not(feature = "std"))]
251fn new_shared_mailbox(capacity: usize) -> SharedMailboxHandle {
252    Rc::new(SharedMailbox {
253        storage: MailboxStorage {
254            state: RefCell::new(MailboxState::default()),
255        },
256        capacity,
257        notifier: MailboxChangeNotifier,
258    })
259}
260
261impl TransportIngressSender {
262    pub fn emit(
263        &self,
264        class: TransportIngressClass,
265        event: TransportIngressEvent,
266    ) -> Result<TransportIngressSendOutcome, ControlIngressOverflow> {
267        let (result, waiter) = self.shared.with_state(|state| {
268            if state.events.len() >= self.shared.capacity {
269                if class == TransportIngressClass::Payload {
270                    state.dropped_payload_count = state.dropped_payload_count.saturating_add(1);
271                    SharedMailbox::bump_generation(state);
272                    let waiter = SharedMailbox::take_waiter(state);
273                    return (Ok(TransportIngressSendOutcome::DroppedPayload), waiter);
274                }
275                return (Err(ControlIngressOverflow), None);
276            }
277
278            state.events.push_back(event);
279            SharedMailbox::bump_generation(state);
280            let waiter = SharedMailbox::take_waiter(state);
281            (Ok(TransportIngressSendOutcome::Enqueued), waiter)
282        });
283        if waiter.is_some() || result.is_ok() {
284            self.shared.notify_changed();
285        }
286        SharedMailbox::wake_waiter(waiter);
287        result
288    }
289}
290
291impl TransportIngressReceiver {
292    #[must_use]
293    pub fn drain(&mut self) -> TransportIngressDrain {
294        self.shared.with_state(|state| TransportIngressDrain {
295            events: state.events.drain(..).collect(),
296            dropped_payload_count: mem::take(&mut state.dropped_payload_count),
297        })
298    }
299}
300
301impl TransportIngressNotifier {
302    #[must_use]
303    pub fn snapshot(&self) -> u64 {
304        self.shared.with_state(|state| state.generation)
305    }
306
307    #[must_use]
308    pub fn has_changed_since(&self, snapshot: u64) -> bool {
309        self.snapshot() != snapshot
310    }
311
312    #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
313    pub fn wait_for_change(&self, snapshot: u64) {
314        let mut guard = self.shared.lock_state();
315        while guard.generation == snapshot {
316            guard = self
317                .shared
318                .notifier
319                .changed
320                .wait(guard)
321                .unwrap_or_else(|poisoned| poisoned.into_inner());
322        }
323    }
324
325    #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
326    #[must_use]
327    pub fn wait_for_change_within_ms(&self, snapshot: u64, wait_ms: DurationMs) -> bool {
328        let guard = self.shared.lock_state();
329        let std_wait = SharedMailbox::std_duration(wait_ms);
330        let (guard, _) = self
331            .shared
332            .notifier
333            .changed
334            .wait_timeout_while(guard, std_wait, |state| state.generation == snapshot)
335            .unwrap_or_else(|poisoned| poisoned.into_inner());
336        guard.generation != snapshot
337    }
338
339    #[must_use]
340    pub fn changed(&self, snapshot: u64) -> TransportIngressChanged<'_> {
341        TransportIngressChanged {
342            notifier: self,
343            snapshot,
344        }
345    }
346}
347
348impl Future for TransportIngressChanged<'_> {
349    type Output = u64;
350
351    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
352        self.notifier.shared.with_state(|state| {
353            if state.generation != self.snapshot {
354                return Poll::Ready(state.generation);
355            }
356
357            match &state.waiter {
358                Some(waiter) if waiter.will_wake(cx.waker()) => {}
359                _ => {
360                    state.waiter = Some(cx.waker().clone());
361                }
362            }
363            Poll::Pending
364        })
365    }
366}
367
#[cfg(test)]
mod tests {
    use std::{
        future::Future,
        sync::Arc,
        task::{Context, Poll, Wake, Waker},
    };

    // Threads, barriers and the atomic wake flag are only used by tests that
    // are compiled out on wasm32, so their imports carry the same gate.
    #[cfg(not(target_arch = "wasm32"))]
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Barrier,
        },
        thread,
    };

    use jacquard_core::{ByteCount, EndpointLocator, NodeId, TransportKind};

    #[cfg(not(target_arch = "wasm32"))]
    use jacquard_core::DurationMs;

    use super::{transport_ingress_mailbox, TransportIngressClass, TransportIngressSendOutcome};

    /// Builds a small payload event whose fields are all derived from `byte`.
    fn payload(byte: u8) -> jacquard_core::TransportIngressEvent {
        jacquard_core::TransportIngressEvent::PayloadReceived {
            from_node_id: NodeId([byte; 32]),
            endpoint: jacquard_core::LinkEndpoint::new(
                TransportKind::WifiAware,
                EndpointLocator::Opaque(vec![byte]),
                ByteCount(64),
            ),
            payload: vec![byte],
        }
    }

    /// Converts a millisecond duration for the std sleep/park APIs.
    #[cfg(not(target_arch = "wasm32"))]
    #[expect(
        clippy::disallowed_types,
        reason = "std thread sleep and park APIs require std::time::Duration in tests"
    )]
    fn std_duration(duration_ms: DurationMs) -> std::time::Duration {
        std::time::Duration::from_millis(u64::from(duration_ms.0))
    }

    #[test]
    fn payload_overflow_is_accounted_for_explicitly() {
        let (sender, mut receiver, _) = transport_ingress_mailbox(1);

        assert_eq!(
            sender
                .emit(TransportIngressClass::Payload, payload(1))
                .expect("enqueue payload"),
            TransportIngressSendOutcome::Enqueued
        );
        assert_eq!(
            sender
                .emit(TransportIngressClass::Payload, payload(2))
                .expect("drop payload"),
            TransportIngressSendOutcome::DroppedPayload
        );

        let drain = receiver.drain();
        assert_eq!(drain.events.len(), 1);
        assert_eq!(drain.dropped_payload_count, 1);
    }

    #[test]
    fn control_path_overflow_fails_closed() {
        let (sender, _, _) = transport_ingress_mailbox(1);

        sender
            .emit(TransportIngressClass::Control, payload(1))
            .expect("enqueue control");
        let error = sender
            .emit(TransportIngressClass::Control, payload(2))
            .expect_err("control overflow must fail closed");

        assert_eq!(error.to_string(), "control ingress queue is full");
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_timeout_reports_when_no_change_arrives() {
        let (_, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();

        assert!(!notifier.wait_for_change_within_ms(snapshot, DurationMs(5)));
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_timeout_reports_when_change_arrives() {
        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();

        thread::spawn(move || {
            thread::sleep(std_duration(DurationMs(5)));
            sender
                .emit(TransportIngressClass::Payload, payload(7))
                .expect("enqueue payload");
        });

        assert!(notifier.wait_for_change_within_ms(snapshot, DurationMs(50)));
    }

    #[test]
    fn receiver_drain_clears_events_and_drop_counts() {
        let (sender, mut receiver, _) = transport_ingress_mailbox(2);
        sender
            .emit(TransportIngressClass::Payload, payload(1))
            .expect("enqueue payload");
        sender
            .emit(TransportIngressClass::Payload, payload(2))
            .expect("enqueue payload");

        let first = receiver.drain();
        assert_eq!(first.events.len(), 2);
        assert_eq!(first.dropped_payload_count, 0);

        let second = receiver.drain();
        assert!(second.events.is_empty());
        assert_eq!(second.dropped_payload_count, 0);
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_wakes_after_ingress_change() {
        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let start = Arc::new(Barrier::new(2));
        let ready = Arc::clone(&start);
        let wait_notifier = notifier.clone();

        let handle = thread::spawn(move || {
            ready.wait();
            wait_notifier.wait_for_change(snapshot);
        });

        start.wait();
        sender
            .emit(TransportIngressClass::Payload, payload(9))
            .expect("enqueue payload");

        handle.join().expect("notifier waiter");
        assert!(notifier.has_changed_since(snapshot));
    }

    // Gated like the other thread-based tests: it spawns, sleeps and parks
    // threads, none of which is imported or defined for wasm32.
    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn changed_future_wakes_after_ingress_change() {
        #[derive(Debug)]
        struct FlagWaker {
            woke: Arc<AtomicBool>,
            thread: thread::Thread,
        }

        impl Wake for FlagWaker {
            fn wake(self: Arc<Self>) {
                self.woke.store(true, Ordering::SeqCst);
                self.thread.unpark();
            }

            fn wake_by_ref(self: &Arc<Self>) {
                self.woke.store(true, Ordering::SeqCst);
                self.thread.unpark();
            }
        }

        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let woke = Arc::new(AtomicBool::new(false));
        let waker = Waker::from(Arc::new(FlagWaker {
            woke: Arc::clone(&woke),
            thread: thread::current(),
        }));
        let mut context = Context::from_waker(&waker);
        let mut changed = Box::pin(notifier.changed(snapshot));

        assert!(matches!(changed.as_mut().poll(&mut context), Poll::Pending));

        thread::spawn(move || {
            thread::sleep(std_duration(DurationMs(5)));
            sender
                .emit(TransportIngressClass::Payload, payload(8))
                .expect("enqueue payload");
        });

        // Park until the waker sets the flag; the timeout only bounds each
        // individual park so a missed unpark cannot hang the loop check.
        while !woke.load(Ordering::SeqCst) {
            thread::park_timeout(std_duration(DurationMs(50)));
        }

        assert!(matches!(
            changed.as_mut().poll(&mut context),
            Poll::Ready(_)
        ));
    }

    #[test]
    fn changed_future_keeps_single_waiter_slot() {
        #[derive(Debug)]
        struct NoopWaker;

        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let (_, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let first_waker = Waker::from(Arc::new(NoopWaker));
        let second_waker = Waker::from(Arc::new(NoopWaker));
        let mut first_context = Context::from_waker(&first_waker);
        let mut second_context = Context::from_waker(&second_waker);
        let mut first = Box::pin(notifier.changed(snapshot));
        let mut second = Box::pin(notifier.changed(snapshot));

        assert!(matches!(
            first.as_mut().poll(&mut first_context),
            Poll::Pending
        ));
        assert!(matches!(
            second.as_mut().poll(&mut second_context),
            Poll::Pending
        ));

        notifier.shared.with_state(|state| {
            assert!(state.waiter.is_some());
        });
    }
}