Skip to main content

jacquard_host_support/
dispatch.rs

//! Bounded, fail-closed dispatch mailbox for host-owned outbound work.
//!
//! This helper complements the raw ingress mailbox: hosts or bridges that
//! need to enqueue outbound commands from synchronous capability handlers can
//! call `dispatch_mailbox(capacity)` to obtain:
//! - `DispatchSender<T>` — cloneable bounded enqueue handle
//! - `DispatchReceiver<T>` — single-owner drain/inspection handle
//!
//! The mailbox is generic over `T` and stays transport-neutral. It does not
//! assign Jacquard time or ordering, and it does not interpret the queued work.
11
12use alloc::{collections::VecDeque, vec::Vec};
13use core::fmt;
14
15#[cfg(not(feature = "std"))]
16use alloc::rc::Rc;
17#[cfg(not(feature = "std"))]
18use core::cell::RefCell;
19#[cfg(feature = "std")]
20use std::sync::{Arc, Mutex};
21
22use jacquard_macros::public_model;
23use serde::{Deserialize, Serialize};
24
25#[public_model]
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum DispatchSendOutcome {
28    Enqueued,
29}
30
31#[public_model]
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub struct DispatchOverflow;
34
35impl fmt::Display for DispatchOverflow {
36    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
37        formatter.write_str("dispatch queue is full")
38    }
39}
40
// Only available with the `std` feature, because the impl names `std::error::Error`.
#[cfg(feature = "std")]
impl std::error::Error for DispatchOverflow {}
43
44struct SharedDispatch<T> {
45    queue: DispatchQueue<T>,
46    capacity: usize,
47}
48
49#[cfg(feature = "std")]
50type SharedDispatchHandle<T> = Arc<SharedDispatch<T>>;
51
52#[cfg(not(feature = "std"))]
53type SharedDispatchHandle<T> = Rc<SharedDispatch<T>>;
54
/// Interior-mutable FIFO storage for pending items (`RefCell` without `std`).
#[cfg(not(feature = "std"))]
type DispatchQueue<T> = RefCell<VecDeque<T>>;

/// Interior-mutable FIFO storage for pending items (`Mutex` with `std`).
#[cfg(feature = "std")]
type DispatchQueue<T> = Mutex<VecDeque<T>>;
60
61#[derive(Clone)]
62pub struct DispatchSender<T> {
63    shared: SharedDispatchHandle<T>,
64}
65
66pub struct DispatchReceiver<T> {
67    shared: SharedDispatchHandle<T>,
68}
69
/// Builds an empty, mutex-guarded queue (`std` build).
#[cfg(feature = "std")]
fn new_queue<T>() -> DispatchQueue<T> {
    // `Mutex<VecDeque<T>>::default()` wraps an empty `VecDeque`.
    Mutex::default()
}
74
75#[cfg(not(feature = "std"))]
76fn new_queue<T>() -> DispatchQueue<T> {
77    RefCell::new(VecDeque::new())
78}
79
/// Runs `operation` with exclusive access to the queue and returns its result
/// (`std` build).
///
/// A poisoned mutex is not treated as an error: the guard is recovered from
/// the poison error and the queue is used as it stands.
#[cfg(feature = "std")]
fn with_queue<T, Output>(
    queue: &DispatchQueue<T>,
    operation: impl FnOnce(&mut VecDeque<T>) -> Output,
) -> Output {
    let mut items = match queue.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    operation(&mut items)
}
90
91#[cfg(not(feature = "std"))]
92fn with_queue<T, Output>(
93    queue: &DispatchQueue<T>,
94    operation: impl FnOnce(&mut VecDeque<T>) -> Output,
95) -> Output {
96    let mut guard = queue.borrow_mut();
97    operation(&mut guard)
98}
99
100#[must_use]
101pub fn dispatch_mailbox<T>(capacity: usize) -> (DispatchSender<T>, DispatchReceiver<T>) {
102    assert!(capacity > 0, "dispatch mailbox capacity must be non-zero");
103    let shared = SharedDispatchHandle::new(SharedDispatch {
104        queue: new_queue(),
105        capacity,
106    });
107    (
108        DispatchSender {
109            shared: shared.clone(),
110        },
111        DispatchReceiver { shared },
112    )
113}
114
115impl<T> DispatchSender<T> {
116    pub fn send(&self, item: T) -> Result<DispatchSendOutcome, DispatchOverflow> {
117        with_queue(&self.shared.queue, |queue| {
118            if queue.len() >= self.shared.capacity {
119                return Err(DispatchOverflow);
120            }
121            queue.push_back(item);
122            Ok(DispatchSendOutcome::Enqueued)
123        })
124    }
125}
126
127impl<T> DispatchReceiver<T> {
128    #[must_use]
129    pub fn drain(&mut self) -> Vec<T> {
130        with_queue(&self.shared.queue, |queue| queue.drain(..).collect())
131    }
132
133    #[must_use]
134    pub fn pending_len(&self) -> usize {
135        with_queue(&self.shared.queue, |queue| queue.len())
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::{dispatch_mailbox, DispatchSendOutcome};

    /// With capacity 1, the first send succeeds and the second is rejected
    /// with the overflow error.
    #[test]
    fn dispatch_mailbox_fails_closed_when_full() {
        let (sender, _) = dispatch_mailbox(1);

        let first = sender.send(1).expect("enqueue command");
        assert_eq!(first, DispatchSendOutcome::Enqueued);

        let overflow = sender
            .send(2)
            .expect_err("queue should fail closed when full");
        assert_eq!(overflow.to_string(), "dispatch queue is full");
    }

    /// Items come back from `drain` in the order they were sent, and the
    /// queue is empty afterwards.
    #[test]
    fn dispatch_mailbox_drains_in_fifo_order() {
        let (sender, mut receiver) = dispatch_mailbox(4);
        sender.send(1).expect("enqueue first");
        sender.send(2).expect("enqueue second");

        let items = receiver.drain();
        let remaining = receiver.pending_len();

        assert_eq!(items, vec![1, 2]);
        assert_eq!(remaining, 0);
    }
}