jacquard_host_support/
dispatch.rs1use alloc::{collections::VecDeque, vec::Vec};
13use core::fmt;
14
15#[cfg(not(feature = "std"))]
16use alloc::rc::Rc;
17#[cfg(not(feature = "std"))]
18use core::cell::RefCell;
19#[cfg(feature = "std")]
20use std::sync::{Arc, Mutex};
21
22use jacquard_macros::public_model;
23use serde::{Deserialize, Serialize};
24
25#[public_model]
26#[derive(Clone, Copy, Debug, PartialEq, Eq)]
27pub enum DispatchSendOutcome {
28 Enqueued,
29}
30
31#[public_model]
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub struct DispatchOverflow;
34
35impl fmt::Display for DispatchOverflow {
36 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
37 formatter.write_str("dispatch queue is full")
38 }
39}
40
// `std::error::Error` is only nameable when the `std` feature is enabled, so
// the impl is gated on it. It relies on the `Debug` derive and the `Display`
// impl of `DispatchOverflow` and overrides none of the trait's methods.
#[cfg(feature = "std")]
impl std::error::Error for DispatchOverflow {}
43
44struct SharedDispatch<T> {
45 queue: DispatchQueue<T>,
46 capacity: usize,
47}
48
49#[cfg(feature = "std")]
50type SharedDispatchHandle<T> = Arc<SharedDispatch<T>>;
51
52#[cfg(not(feature = "std"))]
53type SharedDispatchHandle<T> = Rc<SharedDispatch<T>>;
54
55#[cfg(feature = "std")]
56type DispatchQueue<T> = Mutex<VecDeque<T>>;
57
58#[cfg(not(feature = "std"))]
59type DispatchQueue<T> = RefCell<VecDeque<T>>;
60
61#[derive(Clone)]
62pub struct DispatchSender<T> {
63 shared: SharedDispatchHandle<T>,
64}
65
66pub struct DispatchReceiver<T> {
67 shared: SharedDispatchHandle<T>,
68}
69
70#[cfg(feature = "std")]
71fn new_queue<T>() -> DispatchQueue<T> {
72 Mutex::new(VecDeque::new())
73}
74
75#[cfg(not(feature = "std"))]
76fn new_queue<T>() -> DispatchQueue<T> {
77 RefCell::new(VecDeque::new())
78}
79
80#[cfg(feature = "std")]
81fn with_queue<T, Output>(
82 queue: &DispatchQueue<T>,
83 operation: impl FnOnce(&mut VecDeque<T>) -> Output,
84) -> Output {
85 let mut guard = queue
86 .lock()
87 .unwrap_or_else(|poisoned| poisoned.into_inner());
88 operation(&mut guard)
89}
90
91#[cfg(not(feature = "std"))]
92fn with_queue<T, Output>(
93 queue: &DispatchQueue<T>,
94 operation: impl FnOnce(&mut VecDeque<T>) -> Output,
95) -> Output {
96 let mut guard = queue.borrow_mut();
97 operation(&mut guard)
98}
99
100#[must_use]
101pub fn dispatch_mailbox<T>(capacity: usize) -> (DispatchSender<T>, DispatchReceiver<T>) {
102 assert!(capacity > 0, "dispatch mailbox capacity must be non-zero");
103 let shared = SharedDispatchHandle::new(SharedDispatch {
104 queue: new_queue(),
105 capacity,
106 });
107 (
108 DispatchSender {
109 shared: shared.clone(),
110 },
111 DispatchReceiver { shared },
112 )
113}
114
115impl<T> DispatchSender<T> {
116 pub fn send(&self, item: T) -> Result<DispatchSendOutcome, DispatchOverflow> {
117 with_queue(&self.shared.queue, |queue| {
118 if queue.len() >= self.shared.capacity {
119 return Err(DispatchOverflow);
120 }
121 queue.push_back(item);
122 Ok(DispatchSendOutcome::Enqueued)
123 })
124 }
125}
126
127impl<T> DispatchReceiver<T> {
128 #[must_use]
129 pub fn drain(&mut self) -> Vec<T> {
130 with_queue(&self.shared.queue, |queue| queue.drain(..).collect())
131 }
132
133 #[must_use]
134 pub fn pending_len(&self) -> usize {
135 with_queue(&self.shared.queue, |queue| queue.len())
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::{dispatch_mailbox, DispatchSendOutcome};

    /// A mailbox at capacity rejects further sends with the overflow error
    /// instead of growing.
    #[test]
    fn dispatch_mailbox_fails_closed_when_full() {
        let (sender, _) = dispatch_mailbox(1);

        let accepted = sender.send(1).expect("enqueue command");
        assert_eq!(accepted, DispatchSendOutcome::Enqueued);

        let rejected = sender
            .send(2)
            .expect_err("queue should fail closed when full");
        assert_eq!(rejected.to_string(), "dispatch queue is full");
    }

    /// Draining yields items in the order they were sent and leaves the
    /// mailbox empty.
    #[test]
    fn dispatch_mailbox_drains_in_fifo_order() {
        let (sender, mut receiver) = dispatch_mailbox(4);
        sender.send(1).expect("enqueue first");
        sender.send(2).expect("enqueue second");

        let drained = receiver.drain();
        let remaining = receiver.pending_len();

        assert_eq!(drained, [1, 2]);
        assert_eq!(remaining, 0);
    }
}