simcore/async_mode/
queue.rs1use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::future::Future;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll};
9
10use rustc_hash::FxHashSet;
11use serde::Serialize;
12
13use crate::SimulationContext;
14
15pub struct UnboundedQueue<T> {
19 items: RefCell<VecDeque<T>>,
20 send_ticket: Ticket,
21 receive_ticket: Ticket,
22 dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
23 ctx: SimulationContext,
24}
25
26impl<T> UnboundedQueue<T> {
27 pub(crate) fn new(ctx: SimulationContext) -> Self {
28 ctx.register_key_getter_for::<ConsumerNotify>(|notify| notify.ticket_id);
29 Self {
30 items: RefCell::new(VecDeque::new()),
31 send_ticket: Ticket::new(),
32 receive_ticket: Ticket::new(),
33 dropped_tickets: Rc::new(RefCell::new(FxHashSet::default())),
34 ctx,
35 }
36 }
37
38 pub fn put(&self, item: T) {
40 self.send_ticket.next();
41 let mut dropped_tickets = self.dropped_tickets.borrow_mut();
42 while dropped_tickets.remove(&self.send_ticket.value()) {
43 self.send_ticket.next();
44 }
45 self.items.borrow_mut().push_back(item);
46 if self.receive_ticket.is_after(&self.send_ticket) {
48 self.ctx.emit_self_now(ConsumerNotify {
49 ticket_id: self.send_ticket.value(),
50 });
51 }
52 }
53
54 pub async fn take(&self) -> T {
59 self.receive_ticket.next();
60 ElementFutureWrapper::from_future(
61 async {
62 if self.items.borrow().is_empty() {
64 self.ctx
65 .recv_event_by_key_from_self::<ConsumerNotify>(self.receive_ticket.value())
66 .await;
67 }
68 self.items.borrow_mut().pop_front().unwrap()
69 },
70 self.receive_ticket.value(),
71 self.dropped_tickets.clone(),
72 )
73 .await
74 }
75}
76
77type TicketID = u64;
78
79#[derive(Serialize, Clone)]
80struct ConsumerNotify {
81 ticket_id: TicketID,
82}
83
84struct Ticket {
85 value: RefCell<TicketID>,
86}
87
88impl Ticket {
89 fn new() -> Self {
90 Self { value: RefCell::new(0) }
91 }
92
93 fn next(&self) {
94 *self.value.borrow_mut() += 1;
95 }
96
97 fn is_after(&self, other: &Self) -> bool {
98 *self.value.borrow() >= *other.value.borrow()
99 }
100
101 fn value(&self) -> TicketID {
102 *self.value.borrow()
103 }
104}
105
106struct ElementFutureWrapper<'a, T> {
107 element_future: Pin<Box<dyn Future<Output = T> + 'a>>,
108 ticket_id: TicketID,
109 dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
110 completed: bool,
111}
112
113impl<'a, T> ElementFutureWrapper<'a, T> {
114 fn from_future(
115 element_future: impl Future<Output = T> + 'a,
116 ticket_id: TicketID,
117 dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
118 ) -> Self {
119 Self {
120 element_future: Box::pin(element_future),
121 ticket_id,
122 dropped_tickets,
123 completed: false,
124 }
125 }
126}
127
128impl<'a, T> Future for ElementFutureWrapper<'a, T> {
129 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 match self.element_future.as_mut().poll(cx) {
131 Poll::Ready(output) => {
132 self.completed = true;
133 Poll::Ready(output)
134 }
135 Poll::Pending => Poll::Pending,
136 }
137 }
138
139 type Output = T;
140}
141
142impl<'a, T> Drop for ElementFutureWrapper<'a, T> {
143 fn drop(&mut self) {
144 if !self.completed {
145 self.dropped_tickets.borrow_mut().insert(self.ticket_id);
146 }
147 }
148}