use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use rustc_hash::FxHashSet;
use serde::Serialize;
use crate::SimulationContext;
pub struct UnboundedQueue<T> {
items: RefCell<VecDeque<T>>,
send_ticket: Ticket,
receive_ticket: Ticket,
dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
ctx: SimulationContext,
}
impl<T> UnboundedQueue<T> {
pub(crate) fn new(ctx: SimulationContext) -> Self {
ctx.register_key_getter_for::<ConsumerNotify>(|notify| notify.ticket_id);
Self {
items: RefCell::new(VecDeque::new()),
send_ticket: Ticket::new(),
receive_ticket: Ticket::new(),
dropped_tickets: Rc::new(RefCell::new(FxHashSet::default())),
ctx,
}
}
pub fn put(&self, item: T) {
self.send_ticket.next();
let mut dropped_tickets = self.dropped_tickets.borrow_mut();
while dropped_tickets.remove(&self.send_ticket.value()) {
self.send_ticket.next();
}
self.items.borrow_mut().push_back(item);
if self.receive_ticket.is_after(&self.send_ticket) {
self.ctx.emit_self_now(ConsumerNotify {
ticket_id: self.send_ticket.value(),
});
}
}
pub async fn take(&self) -> T {
self.receive_ticket.next();
ElementFutureWrapper::from_future(
async {
if self.items.borrow().is_empty() {
self.ctx
.recv_event_by_key_from_self::<ConsumerNotify>(self.receive_ticket.value())
.await;
}
self.items.borrow_mut().pop_front().unwrap()
},
self.receive_ticket.value(),
self.dropped_tickets.clone(),
)
.await
}
}
type TicketID = u64;
#[derive(Serialize, Clone)]
struct ConsumerNotify {
ticket_id: TicketID,
}
struct Ticket {
value: RefCell<TicketID>,
}
impl Ticket {
fn new() -> Self {
Self { value: RefCell::new(0) }
}
fn next(&self) {
*self.value.borrow_mut() += 1;
}
fn is_after(&self, other: &Self) -> bool {
*self.value.borrow() >= *other.value.borrow()
}
fn value(&self) -> TicketID {
*self.value.borrow()
}
}
struct ElementFutureWrapper<'a, T> {
element_future: Pin<Box<dyn Future<Output = T> + 'a>>,
ticket_id: TicketID,
dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
completed: bool,
}
impl<'a, T> ElementFutureWrapper<'a, T> {
fn from_future(
element_future: impl Future<Output = T> + 'a,
ticket_id: TicketID,
dropped_tickets: Rc<RefCell<FxHashSet<TicketID>>>,
) -> Self {
Self {
element_future: Box::pin(element_future),
ticket_id,
dropped_tickets,
completed: false,
}
}
}
impl<'a, T> Future for ElementFutureWrapper<'a, T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.element_future.as_mut().poll(cx) {
Poll::Ready(output) => {
self.completed = true;
Poll::Ready(output)
}
Poll::Pending => Poll::Pending,
}
}
type Output = T;
}
impl<'a, T> Drop for ElementFutureWrapper<'a, T> {
fn drop(&mut self) {
if !self.completed {
self.dropped_tickets.borrow_mut().insert(self.ticket_id);
}
}
}