use crate::stream::streamable::CloneableStreamable;
use crate::stream::{Stream, StreamMessage};
use actix::prelude::*;
use futures::channel::oneshot;
use futures::future::BoxFuture; use futures::FutureExt; use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use tokio::sync::mpsc; use tokio::task;
#[derive(Debug)]
pub enum QueueOfferError<A: CloneableStreamable> {
Full(A),
Closed(A),
}
#[derive(Debug)]
pub enum QueuePollError {
Empty,
Closed,
}
#[derive(Message)]
#[rtype(result = "()")] struct OfferMessage<A: CloneableStreamable> {
item: A,
tx: oneshot::Sender<Result<(), QueueOfferError<A>>>,
}
#[derive(Message)]
#[rtype(result = "()")] struct PollMessage<A: CloneableStreamable> {
tx: oneshot::Sender<Result<A, QueuePollError>>,
}
#[derive(Message)]
#[rtype(result = "usize")]
struct SizeMessage;
#[derive(Message)]
#[rtype(result = "bool")]
struct IsEmptyMessage;
#[derive(Message)]
#[rtype(result = "bool")]
struct IsFullMessage;
#[derive(Message)]
#[rtype(result = "bool")]
struct IsClosedMessage;
#[derive(Message)]
#[rtype(result = "()")]
struct CloseMessage;
#[derive(Message)]
#[rtype(result = "()")] struct ConsumeMessage<A: CloneableStreamable> {
tx: oneshot::Sender<Option<Stream<A>>>,
_phantom_a: PhantomData<A>,
}
struct QueueActor<A: CloneableStreamable + 'static> {
buffer: VecDeque<A>,
capacity: usize,
closed: bool,
pending_polls: VecDeque<oneshot::Sender<Result<A, QueuePollError>>>,
}
impl<A: CloneableStreamable + 'static> QueueActor<A> {
fn new(capacity: usize) -> Self {
QueueActor {
buffer: VecDeque::with_capacity(capacity),
capacity,
closed: false,
pending_polls: VecDeque::new(),
}
}
fn try_satisfy_pending_poll(&mut self) {
if !self.buffer.is_empty() {
if let Some(waiting_tx) = self.pending_polls.pop_front() {
if let Some(item) = self.buffer.pop_front() {
let _ = waiting_tx.send(Ok(item));
}
}
}
}
}
impl<A: CloneableStreamable + 'static> Actor for QueueActor<A> {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
if !self.closed {
self.closed = true;
}
while let Some(tx) = self.pending_polls.pop_front() {
let _ = tx.send(Err(QueuePollError::Closed));
}
Running::Stop
}
}
impl<A: CloneableStreamable + 'static> Handler<OfferMessage<A>> for QueueActor<A> {
type Result = ();
fn handle(&mut self, msg: OfferMessage<A>, _ctx: &mut Context<Self>) {
if self.closed {
let _ = msg.tx.send(Err(QueueOfferError::Closed(msg.item)));
return;
}
if self.buffer.len() >= self.capacity {
let _ = msg.tx.send(Err(QueueOfferError::Full(msg.item)));
return;
}
self.buffer.push_back(msg.item);
self.try_satisfy_pending_poll();
let _ = msg.tx.send(Ok(()));
}
}
impl<A: CloneableStreamable + 'static> Handler<PollMessage<A>> for QueueActor<A> {
type Result = ();
fn handle(&mut self, msg: PollMessage<A>, _ctx: &mut Context<Self>) {
if !self.buffer.is_empty() {
let item = self.buffer.pop_front().unwrap();
let _ = msg.tx.send(Ok(item));
return;
}
if self.closed {
let _ = msg.tx.send(Err(QueuePollError::Closed));
return;
}
self.pending_polls.push_back(msg.tx);
}
}
impl<A: CloneableStreamable + 'static> Handler<SizeMessage> for QueueActor<A> {
type Result = usize;
fn handle(&mut self, _msg: SizeMessage, _ctx: &mut Context<Self>) -> Self::Result {
self.buffer.len()
}
}
impl<A: CloneableStreamable + 'static> Handler<IsEmptyMessage> for QueueActor<A> {
type Result = bool;
fn handle(&mut self, _msg: IsEmptyMessage, _ctx: &mut Context<Self>) -> Self::Result {
self.buffer.is_empty()
}
}
impl<A: CloneableStreamable + 'static> Handler<IsFullMessage> for QueueActor<A> {
type Result = bool;
fn handle(&mut self, _msg: IsFullMessage, _ctx: &mut Context<Self>) -> Self::Result {
self.buffer.len() >= self.capacity
}
}
impl<A: CloneableStreamable + 'static> Handler<IsClosedMessage> for QueueActor<A> {
type Result = bool;
fn handle(&mut self, _msg: IsClosedMessage, _ctx: &mut Context<Self>) -> Self::Result {
self.closed
}
}
impl<A: CloneableStreamable + 'static> Handler<CloseMessage> for QueueActor<A> {
type Result = ();
fn handle(&mut self, _msg: CloseMessage, _ctx: &mut Context<Self>) {
if !self.closed {
self.closed = true;
while let Some(tx) = self.pending_polls.pop_front() {
let _ = tx.send(Err(QueuePollError::Closed));
}
}
}
}
impl<A: CloneableStreamable + 'static> Handler<ConsumeMessage<A>> for QueueActor<A> {
type Result = ();
fn handle(&mut self, msg: ConsumeMessage<A>, _ctx: &mut Context<Self>) {
if self.closed && self.buffer.is_empty() {
let _ = msg.tx.send(None);
return;
}
let items: Vec<A> = self.buffer.drain(..).collect();
if !self.closed {
self.closed = true;
}
while let Some(poll_tx) = self.pending_polls.pop_front() {
let _ = poll_tx.send(Err(QueuePollError::Closed));
}
let stream_setup_fn = move |downstream_recipient: Recipient<StreamMessage<A>>| -> BoxFuture<'static, Result<(), String>> {
async move {
for item in items {
if downstream_recipient.try_send(StreamMessage::Element(item)).is_err() {
return Err("Downstream recipient closed during static queue item processing".to_string());
}
}
Ok(())
}
.boxed()
};
let stream = Stream {
setup_fn: Box::new(stream_setup_fn),
_phantom: PhantomData,
};
let _ = msg.tx.send(Some(stream));
}
}
#[derive(Debug)]
pub struct VuoQueue<A: CloneableStreamable> {
actor_addr: Addr<QueueActor<A>>,
_phantom_a: PhantomData<A>,
}
impl<A: CloneableStreamable> Clone for VuoQueue<A> {
fn clone(&self) -> Self {
VuoQueue {
actor_addr: self.actor_addr.clone(),
_phantom_a: PhantomData,
}
}
}
impl<A: CloneableStreamable + 'static> VuoQueue<A> {
pub fn new(capacity: usize) -> Self {
let effective_capacity = if capacity == 0 { 1 } else { capacity };
let actor_addr = QueueActor::new(effective_capacity).start();
VuoQueue {
actor_addr,
_phantom_a: PhantomData,
}
}
pub async fn offer(&self, item: A) -> Result<(), QueueOfferError<A>> {
let (tx, rx) = oneshot::channel();
self.actor_addr.do_send(OfferMessage {
item: item.clone(),
tx,
});
match rx.await {
Ok(res) => res,
Err(_) => Err(QueueOfferError::Closed(item)),
}
}
pub async fn poll(&self) -> Result<A, QueuePollError> {
let (tx, rx) = oneshot::channel();
self.actor_addr.do_send(PollMessage { tx });
match rx.await {
Ok(res) => res,
Err(_) => Err(QueuePollError::Closed),
}
}
pub async fn size(&self) -> usize {
self.actor_addr.send(SizeMessage).await.unwrap_or(0)
}
pub async fn is_empty(&self) -> bool {
self.actor_addr.send(IsEmptyMessage).await.unwrap_or(true)
}
pub async fn is_full(&self) -> bool {
self.actor_addr.send(IsFullMessage).await.unwrap_or(false)
}
pub async fn is_closed(&self) -> bool {
self.actor_addr.send(IsClosedMessage).await.unwrap_or(true)
}
pub fn close(&self) {
self.actor_addr.do_send(CloseMessage);
}
pub async fn consume(self) -> Option<Stream<A>> {
let (tx, rx) = oneshot::channel();
self.actor_addr.do_send(ConsumeMessage {
tx,
_phantom_a: PhantomData,
});
rx.await.unwrap_or(None)
}
pub fn dequeue_stream(&self) -> Stream<A> {
let queue_for_polling_task = self.clone();
const MPSC_BUFFER_SIZE: usize = 16; let (mpsc_sender, mpsc_receiver) = mpsc::channel::<StreamMessage<A>>(MPSC_BUFFER_SIZE);
task::spawn_local(async move {
loop {
match queue_for_polling_task.poll().await {
Ok(item) => {
if mpsc_sender
.send(StreamMessage::Element(item))
.await
.is_err()
{
break;
}
}
Err(QueuePollError::Closed) | Err(QueuePollError::Empty) => {
let _ = mpsc_sender.send(StreamMessage::End).await;
break; }
}
}
});
let setup_fn_closure = move |downstream_actix_recipient: Recipient<StreamMessage<A>>| -> BoxFuture<'static, Result<(), String>> {
let mut mutable_mpsc_receiver = mpsc_receiver;
async move {
loop {
match mutable_mpsc_receiver.recv().await {
Some(StreamMessage::Element(item)) => {
if downstream_actix_recipient.try_send(StreamMessage::Element(item)).is_err() {
return Err(String::from("Downstream Actix recipient is gone"));
}
}
Some(StreamMessage::End) => {
let _ = downstream_actix_recipient.try_send(StreamMessage::End);
return Ok(());
}
None => {
let _ = downstream_actix_recipient.try_send(StreamMessage::End);
return Ok(());
}
}
}
}
.boxed() };
Stream {
setup_fn: Box::new(setup_fn_closure),
_phantom: PhantomData,
}
}
}