1use alloc::{collections::VecDeque, vec::Vec};
22use core::{
23 fmt,
24 future::Future,
25 mem,
26 pin::Pin,
27 task::{Context, Poll, Waker},
28};
29
30#[cfg(not(feature = "std"))]
31use alloc::rc::Rc;
32#[cfg(not(feature = "std"))]
33use core::cell::RefCell;
34#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
35use std::sync::Condvar;
36#[cfg(feature = "std")]
37use std::sync::{Arc, Mutex, MutexGuard};
38
39use jacquard_core::TransportIngressEvent;
40use jacquard_macros::public_model;
41use serde::{Deserialize, Serialize};
42
43#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
44use jacquard_core::DurationMs;
45
46#[public_model]
47#[derive(Clone, Copy, Debug, PartialEq, Eq)]
48pub enum TransportIngressClass {
49 Payload,
50 Control,
51}
52
53#[public_model]
54#[derive(Clone, Copy, Debug, PartialEq, Eq)]
55pub enum TransportIngressSendOutcome {
56 Enqueued,
57 DroppedPayload,
58}
59
60#[public_model]
61#[derive(Clone, Debug, PartialEq, Eq)]
62pub struct TransportIngressDrain {
63 pub events: Vec<TransportIngressEvent>,
64 pub dropped_payload_count: u64,
65}
66
67#[public_model]
68#[derive(Clone, Copy, Debug, PartialEq, Eq)]
69pub struct ControlIngressOverflow;
70
71impl fmt::Display for ControlIngressOverflow {
72 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
73 formatter.write_str("control ingress queue is full")
74 }
75}
76
77#[cfg(feature = "std")]
78impl std::error::Error for ControlIngressOverflow {}
79
80#[derive(Default)]
81struct MailboxState {
82 events: VecDeque<TransportIngressEvent>,
83 dropped_payload_count: u64,
84 generation: u64,
85 waiter: Option<Waker>,
86}
87
88#[cfg(feature = "std")]
89type SharedMailboxHandle = Arc<SharedMailbox>;
90
91#[cfg(not(feature = "std"))]
92type SharedMailboxHandle = Rc<SharedMailbox>;
93
94struct SharedMailbox {
95 storage: MailboxStorage,
96 capacity: usize,
97 notifier: MailboxChangeNotifier,
98}
99
100#[cfg(feature = "std")]
101struct MailboxStorage {
102 state: Mutex<MailboxState>,
103}
104
105#[cfg(not(feature = "std"))]
106struct MailboxStorage {
107 state: RefCell<MailboxState>,
108}
109
/// Notifier backed by a condition variable (`std` builds that do not target wasm32).
#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
struct MailboxChangeNotifier {
    /// Signalled after a send changes the mailbox generation.
    changed: Condvar,
}

/// Zero-sized stand-in for builds that do not compile the blocking wait methods.
#[cfg(any(not(feature = "std"), target_arch = "wasm32"))]
#[derive(Clone, Copy, Debug, Default)]
struct MailboxChangeNotifier;
118
119trait TransportIngressWake {
120 fn wake_ingress_waiters(&self);
121}
122
123#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
124impl TransportIngressWake for MailboxChangeNotifier {
125 fn wake_ingress_waiters(&self) {
126 self.changed.notify_all();
127 }
128}
129
130#[cfg(not(all(feature = "std", not(target_arch = "wasm32"))))]
131impl TransportIngressWake for MailboxChangeNotifier {
132 fn wake_ingress_waiters(&self) {}
133}
134
135impl SharedMailbox {
136 fn bump_generation(state: &mut MailboxState) {
137 state.generation = state.generation.saturating_add(1);
138 }
139
140 fn take_waiter(state: &mut MailboxState) -> Option<Waker> {
141 state.waiter.take()
142 }
143
144 fn wake_waiter(waiter: Option<Waker>) {
145 if let Some(waiter) = waiter {
146 waiter.wake();
147 }
148 }
149
150 fn notify_changed(&self) {
151 self.notifier.wake_ingress_waiters();
152 }
153
154 #[cfg(feature = "std")]
155 fn with_state<Output>(&self, operation: impl FnOnce(&mut MailboxState) -> Output) -> Output {
156 let mut guard = self.lock_state();
157 operation(&mut guard)
158 }
159
160 #[cfg(feature = "std")]
161 fn lock_state(&self) -> MutexGuard<'_, MailboxState> {
162 self.storage
163 .state
164 .lock()
165 .unwrap_or_else(|poisoned| poisoned.into_inner())
166 }
167
168 #[cfg(not(feature = "std"))]
169 fn with_state<Output>(&self, operation: impl FnOnce(&mut MailboxState) -> Output) -> Output {
170 let mut guard = self.storage.state.borrow_mut();
171 operation(&mut guard)
172 }
173
174 #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
175 #[expect(
176 clippy::disallowed_types,
177 reason = "Condvar and thread-parking APIs require std::time::Duration internally"
178 )]
179 fn std_duration(duration_ms: DurationMs) -> std::time::Duration {
180 std::time::Duration::from_millis(u64::from(duration_ms.0))
181 }
182}
183
184#[derive(Clone)]
185pub struct TransportIngressSender {
186 shared: SharedMailboxHandle,
187}
188
189pub struct TransportIngressReceiver {
190 shared: SharedMailboxHandle,
191}
192
193#[derive(Clone)]
194pub struct TransportIngressNotifier {
195 shared: SharedMailboxHandle,
196}
197
198pub struct TransportIngressChanged<'a> {
199 notifier: &'a TransportIngressNotifier,
200 snapshot: u64,
201}
202
203#[must_use]
204pub fn transport_ingress_mailbox(
205 capacity: usize,
206) -> (
207 TransportIngressSender,
208 TransportIngressReceiver,
209 TransportIngressNotifier,
210) {
211 assert!(
212 capacity > 0,
213 "transport ingress mailbox capacity must be non-zero"
214 );
215 let shared = new_shared_mailbox(capacity);
216 (
217 TransportIngressSender {
218 shared: shared.clone(),
219 },
220 TransportIngressReceiver {
221 shared: shared.clone(),
222 },
223 TransportIngressNotifier { shared },
224 )
225}
226
/// Builds an empty, mutex-backed mailbox with the given capacity (`std` builds).
#[cfg(feature = "std")]
fn new_shared_mailbox(capacity: usize) -> SharedMailboxHandle {
    let storage = MailboxStorage {
        state: Mutex::new(MailboxState::default()),
    };
    let notifier = new_change_notifier();
    Arc::new(SharedMailbox {
        storage,
        capacity,
        notifier,
    })
}
237
/// Creates the condvar-backed notifier (`std` builds that do not target wasm32).
#[cfg(all(feature = "std", not(target_arch = "wasm32")))]
fn new_change_notifier() -> MailboxChangeNotifier {
    let changed = Condvar::new();
    MailboxChangeNotifier { changed }
}

/// Creates the zero-sized notifier (`std` builds targeting wasm32).
#[cfg(all(feature = "std", target_arch = "wasm32"))]
fn new_change_notifier() -> MailboxChangeNotifier {
    MailboxChangeNotifier
}
249
250#[cfg(not(feature = "std"))]
251fn new_shared_mailbox(capacity: usize) -> SharedMailboxHandle {
252 Rc::new(SharedMailbox {
253 storage: MailboxStorage {
254 state: RefCell::new(MailboxState::default()),
255 },
256 capacity,
257 notifier: MailboxChangeNotifier,
258 })
259}
260
261impl TransportIngressSender {
262 pub fn emit(
263 &self,
264 class: TransportIngressClass,
265 event: TransportIngressEvent,
266 ) -> Result<TransportIngressSendOutcome, ControlIngressOverflow> {
267 let (result, waiter) = self.shared.with_state(|state| {
268 if state.events.len() >= self.shared.capacity {
269 if class == TransportIngressClass::Payload {
270 state.dropped_payload_count = state.dropped_payload_count.saturating_add(1);
271 SharedMailbox::bump_generation(state);
272 let waiter = SharedMailbox::take_waiter(state);
273 return (Ok(TransportIngressSendOutcome::DroppedPayload), waiter);
274 }
275 return (Err(ControlIngressOverflow), None);
276 }
277
278 state.events.push_back(event);
279 SharedMailbox::bump_generation(state);
280 let waiter = SharedMailbox::take_waiter(state);
281 (Ok(TransportIngressSendOutcome::Enqueued), waiter)
282 });
283 if waiter.is_some() || result.is_ok() {
284 self.shared.notify_changed();
285 }
286 SharedMailbox::wake_waiter(waiter);
287 result
288 }
289}
290
291impl TransportIngressReceiver {
292 #[must_use]
293 pub fn drain(&mut self) -> TransportIngressDrain {
294 self.shared.with_state(|state| TransportIngressDrain {
295 events: state.events.drain(..).collect(),
296 dropped_payload_count: mem::take(&mut state.dropped_payload_count),
297 })
298 }
299}
300
301impl TransportIngressNotifier {
302 #[must_use]
303 pub fn snapshot(&self) -> u64 {
304 self.shared.with_state(|state| state.generation)
305 }
306
307 #[must_use]
308 pub fn has_changed_since(&self, snapshot: u64) -> bool {
309 self.snapshot() != snapshot
310 }
311
312 #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
313 pub fn wait_for_change(&self, snapshot: u64) {
314 let mut guard = self.shared.lock_state();
315 while guard.generation == snapshot {
316 guard = self
317 .shared
318 .notifier
319 .changed
320 .wait(guard)
321 .unwrap_or_else(|poisoned| poisoned.into_inner());
322 }
323 }
324
325 #[cfg(all(feature = "std", not(target_arch = "wasm32")))]
326 #[must_use]
327 pub fn wait_for_change_within_ms(&self, snapshot: u64, wait_ms: DurationMs) -> bool {
328 let guard = self.shared.lock_state();
329 let std_wait = SharedMailbox::std_duration(wait_ms);
330 let (guard, _) = self
331 .shared
332 .notifier
333 .changed
334 .wait_timeout_while(guard, std_wait, |state| state.generation == snapshot)
335 .unwrap_or_else(|poisoned| poisoned.into_inner());
336 guard.generation != snapshot
337 }
338
339 #[must_use]
340 pub fn changed(&self, snapshot: u64) -> TransportIngressChanged<'_> {
341 TransportIngressChanged {
342 notifier: self,
343 snapshot,
344 }
345 }
346}
347
348impl Future for TransportIngressChanged<'_> {
349 type Output = u64;
350
351 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
352 self.notifier.shared.with_state(|state| {
353 if state.generation != self.snapshot {
354 return Poll::Ready(state.generation);
355 }
356
357 match &state.waiter {
358 Some(waiter) if waiter.will_wake(cx.waker()) => {}
359 _ => {
360 state.waiter = Some(cx.waker().clone());
361 }
362 }
363 Poll::Pending
364 })
365 }
366}
367
#[cfg(test)]
mod tests {
    use std::{
        future::Future,
        sync::Arc,
        task::{Context, Poll, Wake, Waker},
    };

    // Only the thread-based tests need these, and those tests are compiled
    // out on wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Barrier,
        },
        thread,
    };

    use jacquard_core::{ByteCount, EndpointLocator, NodeId, TransportKind};

    #[cfg(not(target_arch = "wasm32"))]
    use jacquard_core::DurationMs;

    use super::{transport_ingress_mailbox, TransportIngressClass, TransportIngressSendOutcome};

    /// Builds a payload-received event whose node id, locator, and payload are
    /// all derived from `byte`.
    fn payload(byte: u8) -> jacquard_core::TransportIngressEvent {
        jacquard_core::TransportIngressEvent::PayloadReceived {
            from_node_id: NodeId([byte; 32]),
            endpoint: jacquard_core::LinkEndpoint::new(
                TransportKind::WifiAware,
                EndpointLocator::Opaque(vec![byte]),
                ByteCount(64),
            ),
            payload: vec![byte],
        }
    }

    /// Converts a millisecond duration into the `std::time::Duration` that
    /// thread sleep and park APIs expect.
    #[cfg(not(target_arch = "wasm32"))]
    #[expect(
        clippy::disallowed_types,
        reason = "std thread sleep and park APIs require std::time::Duration in tests"
    )]
    fn std_duration(duration_ms: DurationMs) -> std::time::Duration {
        std::time::Duration::from_millis(u64::from(duration_ms.0))
    }

    #[test]
    fn payload_overflow_is_accounted_for_explicitly() {
        let (sender, mut receiver, _) = transport_ingress_mailbox(1);

        assert_eq!(
            sender
                .emit(TransportIngressClass::Payload, payload(1))
                .expect("enqueue payload"),
            TransportIngressSendOutcome::Enqueued
        );
        assert_eq!(
            sender
                .emit(TransportIngressClass::Payload, payload(2))
                .expect("drop payload"),
            TransportIngressSendOutcome::DroppedPayload
        );

        let drain = receiver.drain();
        assert_eq!(drain.events.len(), 1);
        assert_eq!(drain.dropped_payload_count, 1);
    }

    #[test]
    fn control_path_overflow_fails_closed() {
        let (sender, _, _) = transport_ingress_mailbox(1);

        sender
            .emit(TransportIngressClass::Control, payload(1))
            .expect("enqueue control");
        let error = sender
            .emit(TransportIngressClass::Control, payload(2))
            .expect_err("control overflow must fail closed");

        assert_eq!(error.to_string(), "control ingress queue is full");
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_timeout_reports_when_no_change_arrives() {
        let (_, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();

        assert!(!notifier.wait_for_change_within_ms(snapshot, DurationMs(5)));
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_timeout_reports_when_change_arrives() {
        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();

        thread::spawn(move || {
            thread::sleep(std_duration(DurationMs(5)));
            sender
                .emit(TransportIngressClass::Payload, payload(7))
                .expect("enqueue payload");
        });

        assert!(notifier.wait_for_change_within_ms(snapshot, DurationMs(50)));
    }

    #[test]
    fn receiver_drain_clears_events_and_drop_counts() {
        let (sender, mut receiver, _) = transport_ingress_mailbox(2);
        sender
            .emit(TransportIngressClass::Payload, payload(1))
            .expect("enqueue payload");
        sender
            .emit(TransportIngressClass::Payload, payload(2))
            .expect("enqueue payload");

        let first = receiver.drain();
        assert_eq!(first.events.len(), 2);
        assert_eq!(first.dropped_payload_count, 0);

        let second = receiver.drain();
        assert!(second.events.is_empty());
        assert_eq!(second.dropped_payload_count, 0);
    }

    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn notifier_wakes_after_ingress_change() {
        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let start = Arc::new(Barrier::new(2));
        let ready = Arc::clone(&start);
        let wait_notifier = notifier.clone();

        let handle = thread::spawn(move || {
            ready.wait();
            wait_notifier.wait_for_change(snapshot);
        });

        start.wait();
        sender
            .emit(TransportIngressClass::Payload, payload(9))
            .expect("enqueue payload");

        handle.join().expect("notifier waiter");
        assert!(notifier.has_changed_since(snapshot));
    }

    // Gated like the other thread-based tests: it relies on `thread`,
    // `DurationMs`, and `std_duration`, none of which exist on wasm32.
    #[test]
    #[cfg(not(target_arch = "wasm32"))]
    fn changed_future_wakes_after_ingress_change() {
        /// Waker that records the wake in a flag and unparks the polling thread.
        #[derive(Debug)]
        struct FlagWaker {
            woke: Arc<AtomicBool>,
            thread: thread::Thread,
        }

        impl Wake for FlagWaker {
            fn wake(self: Arc<Self>) {
                self.woke.store(true, Ordering::SeqCst);
                self.thread.unpark();
            }

            fn wake_by_ref(self: &Arc<Self>) {
                self.woke.store(true, Ordering::SeqCst);
                self.thread.unpark();
            }
        }

        let (sender, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let woke = Arc::new(AtomicBool::new(false));
        let waker = Waker::from(Arc::new(FlagWaker {
            woke: Arc::clone(&woke),
            thread: thread::current(),
        }));
        let mut context = Context::from_waker(&waker);
        let mut changed = Box::pin(notifier.changed(snapshot));

        assert!(matches!(changed.as_mut().poll(&mut context), Poll::Pending));

        thread::spawn(move || {
            thread::sleep(std_duration(DurationMs(5)));
            sender
                .emit(TransportIngressClass::Payload, payload(8))
                .expect("enqueue payload");
        });

        // `park_timeout` may return early, so loop until the flag is set.
        while !woke.load(Ordering::SeqCst) {
            thread::park_timeout(std_duration(DurationMs(50)));
        }

        assert!(matches!(
            changed.as_mut().poll(&mut context),
            Poll::Ready(_)
        ));
    }

    #[test]
    fn changed_future_keeps_single_waiter_slot() {
        /// Waker that ignores wake requests.
        #[derive(Debug)]
        struct NoopWaker;

        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let (_, _, notifier) = transport_ingress_mailbox(1);
        let snapshot = notifier.snapshot();
        let first_waker = Waker::from(Arc::new(NoopWaker));
        let second_waker = Waker::from(Arc::new(NoopWaker));
        let mut first_context = Context::from_waker(&first_waker);
        let mut second_context = Context::from_waker(&second_waker);
        let mut first = Box::pin(notifier.changed(snapshot));
        let mut second = Box::pin(notifier.changed(snapshot));

        assert!(matches!(
            first.as_mut().poll(&mut first_context),
            Poll::Pending
        ));
        assert!(matches!(
            second.as_mut().poll(&mut second_context),
            Poll::Pending
        ));

        notifier.shared.with_state(|state| {
            assert!(state.waiter.is_some());
        });
    }
}