simcore/async_mode/
queue.rs

1//! Queue for producer-consumer communication between asynchronous tasks.
2
3use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::future::Future;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll};
9
10use rustc_hash::FxHashSet;
11use serde::Serialize;
12
13use crate::SimulationContext;
14
15/// A simple implementation of unbounded multi-producer multi-consumer queue with items of type `T`.
16///
17/// The items are guarantied to be delivered to consumers in the order of [`take`](UnboundedQueue::take) calls.
18pub struct UnboundedQueue<T> {
19    items: RefCell<VecDeque<T>>,
20    send_ticket: Ticket,
21    receive_ticket: Ticket,
22    dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
23    ctx: SimulationContext,
24}
25
26impl<T> UnboundedQueue<T> {
27    pub(crate) fn new(ctx: SimulationContext) -> Self {
28        ctx.register_key_getter_for::<ConsumerNotify>(|notify| notify.ticket_id);
29        Self {
30            items: RefCell::new(VecDeque::new()),
31            send_ticket: Ticket::new(),
32            receive_ticket: Ticket::new(),
33            dropped_tickets: Rc::new(RefCell::new(FxHashSet::default())),            
34            ctx,
35        }
36    }
37
38    /// Inserts the specified item into the queue without blocking.
39    pub fn put(&self, item: T) {
40        self.send_ticket.next();
41        let mut dropped_tickets = self.dropped_tickets.borrow_mut();
42        while dropped_tickets.remove(&self.send_ticket.value()) {
43            self.send_ticket.next();
44        }
45        self.items.borrow_mut().push_back(item);
46        // notify awaiting consumer if needed
47        if self.receive_ticket.is_after(&self.send_ticket) {
48            self.ctx.emit_self_now(ConsumerNotify {
49                ticket_id: self.send_ticket.value(),
50            });
51        }
52    }
53
54    /// Removes the head of the queue and returns it, waiting if necessary until an item becomes available.
55    ///
56    /// This function is asynchronous and its result (future) must be awaited.
57    /// If multiple consumers are waiting for item, the items will be delivered in the order of [`take`](Self::take) calls.
58    pub async fn take(&self) -> T {
59        self.receive_ticket.next();
60        ElementFutureWrapper::from_future(
61            async {
62                // wait for notification from producer side if the queue is empty
63                if self.items.borrow().is_empty() {
64                    self.ctx
65                        .recv_event_by_key_from_self::<ConsumerNotify>(self.receive_ticket.value())
66                        .await;
67                }
68                self.items.borrow_mut().pop_front().unwrap()
69            },
70            self.receive_ticket.value(),
71            self.dropped_tickets.clone(),
72        )
73        .await
74    }
75}
76
/// Numeric identifier of a ticket issued by the queue to a `put` or `take` call.
type TicketID = u64;
78
79#[derive(Serialize, Clone)]
80struct ConsumerNotify {
81    ticket_id: TicketID,
82}
83
84struct Ticket {
85    value: RefCell<TicketID>,
86}
87
88impl Ticket {
89    fn new() -> Self {
90        Self { value: RefCell::new(0) }
91    }
92
93    fn next(&self) {
94        *self.value.borrow_mut() += 1;
95    }
96
97    fn is_after(&self, other: &Self) -> bool {
98        *self.value.borrow() >= *other.value.borrow()
99    }
100
101    fn value(&self) -> TicketID {
102        *self.value.borrow()
103    }
104}
105
106struct ElementFutureWrapper<'a, T> {
107    element_future: Pin<Box<dyn Future<Output = T> + 'a>>,
108    ticket_id: TicketID,
109    dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,    
110    completed: bool,
111}
112
113impl<'a, T> ElementFutureWrapper<'a, T> {
114    fn from_future(
115        element_future: impl Future<Output = T> + 'a,
116        ticket_id: TicketID,
117        dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
118    ) -> Self {
119        Self {
120            element_future: Box::pin(element_future),
121            ticket_id,
122            dropped_tickets,            
123            completed: false,
124        }
125    }
126}
127
128impl<'a, T> Future for ElementFutureWrapper<'a, T> {
129    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        match self.element_future.as_mut().poll(cx) {
131            Poll::Ready(output) => {
132                self.completed = true;
133                Poll::Ready(output)
134            }
135            Poll::Pending => Poll::Pending,
136        }
137    }
138
139    type Output = T;
140}
141
142impl<'a, T> Drop for ElementFutureWrapper<'a, T> {
143    fn drop(&mut self) {
144        if !self.completed {
145            self.dropped_tickets.borrow_mut().insert(self.ticket_id);
146        }
147    }
148}