1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
use std::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex, MutexGuard, PoisonError,
};

use crate::{Envelope, Message};

/// Creates a new message queue and returns its writing and reading halves.
///
/// The queue is backed by a `std::sync::mpsc` channel carrying
/// [`Envelope<Msg>`] values. The writer can be cloned to obtain additional
/// producers; the reader keeps its receiving state behind a mutex so it can
/// be used through a shared reference.
pub fn queue<Msg: Message>() -> (QueueWriter<Msg>, QueueReader<Msg>) {
    let (tx, rx) = channel::<Envelope<Msg>>();
    let writer = QueueWriter { tx };
    let reader = QueueReader {
        inner: Mutex::new(QueueReaderInner {
            rx,
            next_item: None,
        }),
    };
    (writer, reader)
}

/// Producer half of a queue created by [`queue`].
pub struct QueueWriter<Msg: Message> {
    tx: Sender<Envelope<Msg>>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `Msg: Clone` bound, but cloning a writer only clones the channel sender and
// never a message.
impl<Msg: Message> Clone for QueueWriter<Msg> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<Msg: Message> QueueWriter<Msg> {
    /// Appends `msg` to the queue.
    ///
    /// # Errors
    ///
    /// Returns an [`EnqueueError`] handing the envelope back to the caller
    /// when the underlying channel rejects the send, which `mpsc` does once
    /// the receiving side has been dropped.
    pub fn try_enqueue(&self, msg: Envelope<Msg>) -> EnqueueResult<Msg> {
        self.tx.send(msg).map_err(|e| EnqueueError { msg: e.0 })
    }
}

/// Consumer half of a queue created by [`queue`].
pub struct QueueReader<Msg: Message> {
    inner: Mutex<QueueReaderInner<Msg>>,
}

/// Receiving state guarded by the reader's mutex.
struct QueueReaderInner<Msg: Message> {
    rx: Receiver<Envelope<Msg>>,
    /// Item already pulled off the channel by `has_msgs` and not yet handed
    /// out; it is always returned before anything newer from `rx`.
    next_item: Option<Envelope<Msg>>,
}

impl<Msg: Message> QueueReader<Msg> {
    /// Locks the receiving state, recovering the guard if the mutex is
    /// poisoned.
    ///
    /// The only panic raised in this module while the guard is held is the
    /// one in `dequeue` on a disconnected channel, and it happens after
    /// `next_item` has been observed empty, so the guarded state is still
    /// consistent. Recovering keeps `try_dequeue` and `has_msgs` usable
    /// instead of turning every later call into a panic.
    fn lock_inner(&self) -> MutexGuard<'_, QueueReaderInner<Msg>> {
        self.inner.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Removes and returns the next envelope, blocking until one is
    /// available.
    ///
    /// The internal lock is held while waiting, so other calls on the same
    /// reader wait until this one returns.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports disconnection, i.e. the queue is empty
    /// and every [`QueueWriter`] has been dropped.
    // NOTE(review): the lint suppression is kept from the original; presumably
    // the blocking variant is unused in some build configurations — confirm.
    #[allow(dead_code)]
    pub fn dequeue(&self) -> Envelope<Msg> {
        let mut inner = self.lock_inner();
        match inner.next_item.take() {
            Some(item) => item,
            None => inner
                .rx
                .recv()
                .expect("queue disconnected: all writers dropped while waiting for a message"),
        }
    }

    /// Removes and returns the next envelope without blocking.
    ///
    /// # Errors
    ///
    /// Returns [`QueueEmpty`] when no envelope can be received right now.
    /// A disconnected channel is reported the same way as an empty one.
    pub fn try_dequeue(&self) -> DequeueResult<Envelope<Msg>> {
        let mut inner = self.lock_inner();
        match inner.next_item.take() {
            Some(item) => Ok(item),
            None => inner.rx.try_recv().map_err(|_| QueueEmpty),
        }
    }

    /// Returns `true` if at least one envelope is ready to be dequeued.
    ///
    /// The channel cannot be inspected without receiving, so a successful
    /// probe moves one envelope into the reader's internal slot; the next
    /// `dequeue` / `try_dequeue` call returns that envelope first.
    pub fn has_msgs(&self) -> bool {
        let mut inner = self.lock_inner();
        if inner.next_item.is_some() {
            return true;
        }
        match inner.rx.try_recv() {
            Ok(item) => {
                inner.next_item = Some(item);
                true
            }
            Err(_) => false,
        }
    }
}

/// Error returned when a message could not be enqueued; carries the rejected
/// message so the caller can recover it.
#[derive(Clone, Debug)]
pub struct EnqueueError<T> {
    pub msg: T,
}

/// Result of [`QueueWriter::try_enqueue`].
pub type EnqueueResult<Msg> = Result<(), EnqueueError<Envelope<Msg>>>;

/// Error returned by [`QueueReader::try_dequeue`] when nothing could be
/// received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueEmpty;

/// Result of a non-blocking dequeue.
pub type DequeueResult<Msg> = Result<Msg, QueueEmpty>;