use crate::stream::streamable::CloneableStreamable;
use actix::prelude::*;
use futures::channel::oneshot;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
/// Reasons a send can be rejected.
///
/// Both variants carry the rejected item so the caller gets it back.
#[derive(Debug, PartialEq, Eq)]
pub enum ChannelSendError<A: CloneableStreamable> {
    /// The buffer already held `capacity` items when the send was handled.
    Full(A),
    /// The channel was closed, or no reply came back for the send.
    Closed(A),
}
/// Reasons a receive can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum ChannelReceiveError {
    /// No item was available.
    ///
    /// NOTE(review): nothing in this file produces this variant; presumably it
    /// is meant for a non-blocking receive defined elsewhere — confirm.
    Empty,
    /// The channel is closed and no buffered item was left to hand out.
    Closed,
}
/// Actor request to enqueue `item`.
///
/// The actix-level result is `()`; the real outcome is reported through `tx`.
#[derive(Message)]
#[rtype(result = "()")]
struct SendMessage<A: CloneableStreamable> {
    /// The value to append to the channel buffer.
    item: A,
    /// Reply channel: `Ok(())` on success, otherwise the error carrying `item`.
    tx: oneshot::Sender<Result<(), ChannelSendError<A>>>,
}
/// Actor request for the next buffered item.
///
/// The actix-level result is `()`; the item (or the failure) is delivered
/// through `tx`, possibly later if the buffer is empty when the request is
/// handled.
#[derive(Message)]
#[rtype(result = "()")]
struct ReceiveMessage<A: CloneableStreamable> {
    /// Reply channel on which the item or a receive error is delivered.
    tx: oneshot::Sender<Result<A, ChannelReceiveError>>,
}
/// Actor request to mark the channel as closed. Produces no reply value.
#[derive(Message)]
#[rtype(result = "()")]
struct CloseMessage;
/// Actor query answered with the number of items currently buffered.
#[derive(Message)]
#[rtype(result = "usize")]
struct SizeMessage;
/// Actor query answered with `true` once the channel has been closed.
#[derive(Message)]
#[rtype(result = "bool")]
struct IsClosedMessage;
/// Actor that owns the state of one bounded channel.
///
/// All mutations happen inside message handlers, so the fields are only ever
/// touched through `&mut self`.
#[derive(Debug)]
struct ChannelActor<A: CloneableStreamable + 'static> {
    /// Items that were sent but not yet received; the oldest is at the front.
    buffer: VecDeque<A>,
    /// Maximum number of items `buffer` may hold before sends are rejected.
    capacity: usize,
    /// Set once the channel is closed; new sends are rejected from then on.
    closed: bool,
    /// Reply channels of receivers that asked while the buffer was empty,
    /// in arrival order.
    pending_receives: VecDeque<oneshot::Sender<Result<A, ChannelReceiveError>>>,
}
impl<A: CloneableStreamable + 'static> ChannelActor<A> {
    /// Creates the state for an open, empty channel.
    ///
    /// A `capacity` of `0` is raised to `1`, so the buffer can always hold at
    /// least one item.
    fn new(capacity: usize) -> Self {
        let effective_capacity = capacity.max(1);
        ChannelActor {
            buffer: VecDeque::with_capacity(effective_capacity),
            capacity: effective_capacity,
            closed: false,
            pending_receives: VecDeque::new(),
        }
    }

    /// Hands buffered items to parked receivers, oldest item to oldest
    /// receiver, until either the buffer or the waiting list runs dry.
    ///
    /// A parked receiver may have dropped its end of the reply channel in the
    /// meantime. In that case `send` gives the value back, and the item is
    /// returned to the front of the buffer so it goes to the next receiver in
    /// its original order instead of being lost.
    fn try_satisfy_pending_receive(&mut self) {
        while !self.buffer.is_empty() {
            let waiting_tx = match self.pending_receives.pop_front() {
                Some(tx) => tx,
                None => break,
            };
            if let Some(item) = self.buffer.pop_front() {
                // `Err` carries back exactly what we tried to send, i.e. `Ok(item)`.
                if let Err(Ok(unclaimed)) = waiting_tx.send(Ok(item)) {
                    self.buffer.push_front(unclaimed);
                }
            }
        }
    }
}
impl<A: CloneableStreamable + 'static> Actor for ChannelActor<A> {
    type Context = Context<Self>;

    /// Nothing to set up when the actor starts.
    fn started(&mut self, _ctx: &mut Self::Context) {}

    /// Marks the channel closed and fails every parked receiver with
    /// `ChannelReceiveError::Closed`, then lets the actor stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        self.closed = true;
        for waiter in self.pending_receives.drain(..) {
            // The receiver may already be gone; nothing to do in that case.
            let _ = waiter.send(Err(ChannelReceiveError::Closed));
        }
        Running::Stop
    }
}
impl<A: CloneableStreamable + 'static> Handler<SendMessage<A>> for ChannelActor<A> {
type Result = ();
fn handle(&mut self, msg: SendMessage<A>, _ctx: &mut Context<Self>) {
if self.closed {
let _ = msg.tx.send(Err(ChannelSendError::Closed(msg.item)));
return;
}
if self.buffer.len() >= self.capacity {
let _ = msg.tx.send(Err(ChannelSendError::Full(msg.item)));
return;
}
self.buffer.push_back(msg.item);
self.try_satisfy_pending_receive(); let _ = msg.tx.send(Ok(()));
}
}
impl<A: CloneableStreamable + 'static> Handler<ReceiveMessage<A>> for ChannelActor<A> {
    type Result = ();

    /// Answers a receive request.
    ///
    /// * If an item is buffered, the oldest one is delivered right away, even
    ///   when the channel is already closed.
    /// * Otherwise, a closed channel answers `ChannelReceiveError::Closed`.
    /// * Otherwise the reply channel is parked until an item arrives or the
    ///   channel closes.
    fn handle(&mut self, msg: ReceiveMessage<A>, _ctx: &mut Context<Self>) {
        if let Some(item) = self.buffer.pop_front() {
            // If the requester already dropped its receiver, `send` returns the
            // value; put it back at the front so the item is not lost and the
            // ordering is kept for the next receiver.
            if let Err(Ok(unclaimed)) = msg.tx.send(Ok(item)) {
                self.buffer.push_front(unclaimed);
            }
            return;
        }
        if self.closed {
            let _ = msg.tx.send(Err(ChannelReceiveError::Closed));
            return;
        }
        self.pending_receives.push_back(msg.tx);
    }
}
impl<A: CloneableStreamable + 'static> Handler<CloseMessage> for ChannelActor<A> {
    type Result = ();

    /// Closes the channel and fails every parked receiver with
    /// `ChannelReceiveError::Closed`. Closing an already closed channel does
    /// nothing. Buffered items are left in place.
    fn handle(&mut self, _msg: CloseMessage, _ctx: &mut Context<Self>) {
        if self.closed {
            return;
        }
        self.closed = true;
        for waiter in self.pending_receives.drain(..) {
            // The receiver may already be gone; nothing to do in that case.
            let _ = waiter.send(Err(ChannelReceiveError::Closed));
        }
    }
}
impl<A: CloneableStreamable + 'static> Handler<SizeMessage> for ChannelActor<A> {
    type Result = usize;

    /// Reports how many items are buffered at the moment the query is handled.
    fn handle(&mut self, _msg: SizeMessage, _ctx: &mut Context<Self>) -> usize {
        let buffered = self.buffer.len();
        buffered
    }
}
impl<A: CloneableStreamable + 'static> Handler<IsClosedMessage> for ChannelActor<A> {
    type Result = bool;

    /// Reports whether the channel is closed at the moment the query is handled.
    fn handle(&mut self, _msg: IsClosedMessage, _ctx: &mut Context<Self>) -> bool {
        let closed = self.closed;
        closed
    }
}
/// Handle to a bounded channel whose state lives in a `ChannelActor`.
///
/// Cloning the handle clones the actor address, so every clone talks to the
/// same underlying channel.
#[derive(Debug)]
pub struct Channel<A: CloneableStreamable> {
    /// Address of the actor holding the buffer and the parked receivers.
    actor_addr: Addr<ChannelActor<A>>,
    /// Marker tying the handle to its item type.
    _phantom_a: PhantomData<A>,
}
impl<A: CloneableStreamable> Clone for Channel<A> {
    /// Returns another handle to the same channel actor; no items are copied.
    fn clone(&self) -> Self {
        let actor_addr = self.actor_addr.clone();
        Self {
            actor_addr,
            _phantom_a: PhantomData,
        }
    }
}
impl<A: CloneableStreamable + 'static> Channel<A> {
    /// Starts a channel actor buffering at most `capacity` items and returns a
    /// handle to it. A `capacity` of `0` is treated as `1`.
    ///
    /// NOTE(review): `start()` presumably requires a running actix system on
    /// the calling thread — confirm against the callers.
    pub fn new(capacity: usize) -> Self {
        Channel {
            actor_addr: ChannelActor::new(capacity).start(),
            _phantom_a: PhantomData,
        }
    }

    /// Offers `item` to the channel and waits for the actor's verdict.
    ///
    /// This does not wait for free space: a full buffer is reported as
    /// `ChannelSendError::Full(item)`.
    ///
    /// # Errors
    ///
    /// * `ChannelSendError::Full` — the buffer was at capacity.
    /// * `ChannelSendError::Closed` — the channel was closed, or the reply
    ///   channel was dropped without an answer.
    pub async fn send(&self, item: A) -> Result<(), ChannelSendError<A>> {
        // The message takes ownership of `item`, so keep a copy to hand back
        // if no reply ever arrives.
        let fallback = item.clone();
        let (reply_tx, reply_rx) = oneshot::channel();
        self.actor_addr.do_send(SendMessage { item, tx: reply_tx });
        reply_rx
            .await
            .unwrap_or_else(|_| Err(ChannelSendError::Closed(fallback)))
    }

    /// Waits for the next item.
    ///
    /// Resolves as soon as an item is available; items still buffered when the
    /// channel is closed are delivered before any error.
    ///
    /// # Errors
    ///
    /// `ChannelReceiveError::Closed` — the channel is closed with nothing left
    /// to deliver, or the reply channel was dropped without an answer.
    pub async fn receive(&self) -> Result<A, ChannelReceiveError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.actor_addr.do_send(ReceiveMessage { tx: reply_tx });
        reply_rx.await.unwrap_or(Err(ChannelReceiveError::Closed))
    }

    /// Asks the actor to close the channel. Returns without waiting for the
    /// request to be processed.
    pub fn close(&self) {
        self.actor_addr.do_send(CloseMessage);
    }

    /// Returns the number of buffered items, or `0` if the actor could not be
    /// queried.
    pub async fn size(&self) -> usize {
        match self.actor_addr.send(SizeMessage).await {
            Ok(buffered) => buffered,
            Err(_) => 0,
        }
    }

    /// Returns whether the channel is closed; an actor that could not be
    /// queried counts as closed.
    pub async fn is_closed(&self) -> bool {
        match self.actor_addr.send(IsClosedMessage).await {
            Ok(closed) => closed,
            Err(_) => true,
        }
    }
}