use crate::stream::{CloneableStreamable, Stream, StreamMessage};
use actix::prelude::*;
use futures::channel::oneshot; use futures::FutureExt; use std::collections::HashMap;
use std::marker::PhantomData;
use tokio::task; use uuid::Uuid;
/// Actor message carrying one item to be broadcast to the topic's subscribers.
///
/// Handled by `TopicActor`; the handler produces no reply value.
#[derive(Message)]
#[rtype(result = "()")]
struct PublishItem<A: CloneableStreamable> {
    /// The value to deliver; it is cloned once per subscriber by the handler.
    item: A,
}
/// Actor message asking the topic to register a new subscriber.
///
/// The reply is `Ok(id)` with the identifier assigned to the subscription, or
/// `Err(())` when the topic actor has already been closed.
#[derive(Message)]
#[rtype(result = "Result<Uuid, ()>")]
struct SubscribeMsg<A: CloneableStreamable> {
    /// Destination that will receive the topic's `StreamMessage`s.
    recipient: Recipient<StreamMessage<A>>,
}
/// Actor message asking the topic actor to stop itself.
///
/// Carries no payload and produces no reply value.
#[derive(Message)]
#[rtype(result = "()")]
struct CloseTopicMsg;
/// Actor that owns the subscriber list of a topic and fans items out to it.
pub struct TopicActor<A: CloneableStreamable + 'static> {
    /// Live subscriptions, keyed by the random id handed out on subscribe.
    subscribers: HashMap<Uuid, Recipient<StreamMessage<A>>>,
    /// Set once the actor begins stopping; publishes and subscribes are
    /// rejected from then on.
    closed: bool,
    /// Marker for the item type `A`.
    _phantom_a: PhantomData<A>,
}
impl<A: CloneableStreamable + 'static> TopicActor<A> {
    /// Creates an open topic actor with no subscribers.
    pub fn new() -> Self {
        Self {
            closed: false,
            subscribers: HashMap::new(),
            _phantom_a: PhantomData,
        }
    }
}
impl<A> Default for TopicActor<A>
where
A: CloneableStreamable + 'static,
{
fn default() -> Self {
Self::new()
}
}
/// Actor lifecycle: the only hook with behaviour is `stopping`, which shuts
/// the topic down for all subscribers.
impl<A: CloneableStreamable + 'static> Actor for TopicActor<A> {
    type Context = Context<Self>;

    /// Nothing to do on start.
    fn started(&mut self, _ctx: &mut Self::Context) {}

    /// Marks the topic closed, notifies every subscriber with
    /// `StreamMessage::End`, and lets the actor stop.
    ///
    /// Delivery of the end marker is best-effort: a failed `try_send` is
    /// ignored because the subscriber is being dropped anyway.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        self.closed = true;
        // Draining both visits and empties the map, leaving no subscribers.
        for (_id, recipient) in self.subscribers.drain() {
            let _ = recipient.try_send(StreamMessage::End);
        }
        Running::Stop
    }

    /// Nothing to do after stop.
    fn stopped(&mut self, _ctx: &mut Self::Context) {}
}
/// Broadcasts a published item to every current subscriber.
impl<A> Handler<PublishItem<A>> for TopicActor<A>
where
    A: CloneableStreamable + 'static,
{
    type Result = ();

    /// Sends a clone of `msg.item` to each subscriber as
    /// `StreamMessage::Element`.
    ///
    /// Does nothing once the topic is closed. Any subscriber whose `try_send`
    /// fails is removed from the subscriber list and receives no later items.
    fn handle(&mut self, msg: PublishItem<A>, _ctx: &mut Context<Self>) {
        if self.closed {
            return;
        }
        // Deliver and prune in one pass: `retain` keeps exactly the
        // subscribers that accepted the item, with no temporary list of
        // failed ids.
        // NOTE(review): `try_send` presumably fails for a full mailbox as well
        // as a closed one, so a slow subscriber is dropped too — confirm this
        // is intended.
        self.subscribers.retain(|_id, sub| {
            sub.try_send(StreamMessage::Element(msg.item.clone()))
                .is_ok()
        });
    }
}
/// Registers new subscribers on the topic.
impl<A: CloneableStreamable + 'static> Handler<SubscribeMsg<A>> for TopicActor<A> {
    type Result = Result<Uuid, ()>;

    /// Stores `msg.recipient` under a freshly generated v4 UUID and returns
    /// that id, or returns `Err(())` when the topic is already closed.
    fn handle(&mut self, msg: SubscribeMsg<A>, _ctx: &mut Context<Self>) -> Self::Result {
        if self.closed {
            Err(())
        } else {
            let subscription_id = Uuid::new_v4();
            self.subscribers.insert(subscription_id, msg.recipient);
            Ok(subscription_id)
        }
    }
}
/// Handles close requests by stopping the actor.
impl<A: CloneableStreamable + 'static> Handler<CloseTopicMsg> for TopicActor<A> {
    type Result = ();

    /// Asks the actor context to stop; subscriber notification happens in the
    /// actor's `stopping` hook.
    fn handle(&mut self, _msg: CloseTopicMsg, ctx: &mut Context<Self>) {
        ctx.stop();
    }
}
/// Cloneable handle to a running [`TopicActor`].
///
/// Clones share the same actor address, so they all refer to the same topic.
#[derive(Clone)]
pub struct Topic<A: CloneableStreamable + 'static> {
    /// Address of the actor that owns the subscriber list.
    actor_addr: Addr<TopicActor<A>>,
}
impl<A: CloneableStreamable + 'static> Topic<A> {
    /// Starts a new [`TopicActor`] and returns a handle to it.
    pub fn new() -> Self {
        let actor_addr = TopicActor::<A>::new().start();
        Self { actor_addr }
    }

    /// Queues `item` for broadcast to the topic's subscribers.
    ///
    /// The message is sent with `do_send`, so no reply is awaited.
    pub fn publish(&self, item: A) {
        let message = PublishItem { item };
        self.actor_addr.do_send(message);
    }

    /// Builds a [`Stream`] that subscribes to this topic when it is set up.
    ///
    /// The stream's setup function receives the downstream recipient, spawns a
    /// local task that sends a `SubscribeMsg` to the topic actor, and returns a
    /// boxed future. That future resolves to `Ok(())` once the actor accepts
    /// the subscription, or to `Err(description)` when the actor rejects it,
    /// the message cannot be delivered, or the task ends without reporting.
    ///
    /// NOTE(review): `task::spawn_local` presumably needs a Tokio `LocalSet`
    /// context (as provided on an actix arbiter) — confirm for other callers.
    pub fn subscribe(&self) -> Stream<A> {
        let topic_addr = self.actor_addr.clone();

        let setup_fn = Box::new(move |recipient: Recipient<StreamMessage<A>>| {
            let (reply_tx, reply_rx) = oneshot::channel::<Result<(), String>>();
            let addr = topic_addr.clone();

            // The request to the actor runs in its own local task and reports
            // its outcome through the oneshot channel.
            task::spawn_local(async move {
                let outcome = match addr.send(SubscribeMsg { recipient }).await {
                    Ok(Ok(_subscription_id)) => Ok(()),
                    Ok(Err(())) => Err(String::from(
                        "Subscription failed: Topic actor reported an internal error.",
                    )),
                    Err(_mailbox_error) => Err(String::from(
                        "Subscription failed: Could not send message to Topic actor (MailboxError).",
                    )),
                };
                // The receiver may already be gone; nothing useful to do then.
                let _ = reply_tx.send(outcome);
            });

            async move {
                // A dropped sender means the task ended without reporting.
                reply_rx.await.unwrap_or_else(|_canceled| {
                    Err(String::from(
                        "Subscription channel closed unexpectedly; task may have panicked.",
                    ))
                })
            }
            .boxed()
        });

        Stream {
            setup_fn,
            _phantom: PhantomData,
        }
    }

    /// Asks the topic actor to stop.
    ///
    /// The request is sent with `do_send`, so no reply is awaited.
    pub fn close(&self) {
        self.actor_addr.do_send(CloseTopicMsg);
    }
}
impl<A> Default for Topic<A>
where
A: CloneableStreamable + 'static,
{
fn default() -> Self {
Self::new()
}
}