use crate::stream::{Stream, StreamMessage, Streamable};
use actix::prelude::*;
use futures::FutureExt;
use std::fmt; use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
struct GetValue<A>(PhantomData<A>)
where
A: 'static + Send + Clone + Unpin + fmt::Debug;
impl<A> Message for GetValue<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = Option<A>;
}
struct SetValue<A>(A)
where
A: 'static + Send + Clone + Unpin + fmt::Debug;
impl<A> Message for SetValue<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = ();
}
struct SubscribeDiscrete<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
recipient: Recipient<StreamMessage<A>>,
}
impl<A> Message for SubscribeDiscrete<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = ();
}
struct SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
current_value: Arc<Mutex<Option<A>>>,
subscribers: Vec<Recipient<StreamMessage<A>>>,
}
impl<A> SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
fn new(initial_value: Option<A>) -> Self {
SignalActor {
current_value: Arc::new(Mutex::new(initial_value)),
subscribers: Vec::new(),
}
}
}
impl<A> Actor for SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Context = Context<Self>;
}
impl<A> Handler<GetValue<A>> for SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = Option<A>;
fn handle(&mut self, _msg: GetValue<A>, _ctx: &mut Context<Self>) -> Self::Result {
self.current_value.lock().unwrap().clone()
}
}
impl<A> Handler<SetValue<A>> for SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = ();
fn handle(&mut self, msg: SetValue<A>, _ctx: &mut Context<Self>) -> Self::Result {
let mut value_guard = self.current_value.lock().unwrap();
*value_guard = Some(msg.0.clone());
self.subscribers
.retain(|sub| sub.try_send(StreamMessage::Element(msg.0.clone())).is_ok());
}
}
impl<A> Handler<SubscribeDiscrete<A>> for SignalActor<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
type Result = ();
fn handle(&mut self, msg: SubscribeDiscrete<A>, _ctx: &mut Context<Self>) -> Self::Result {
if let Some(val) = self.current_value.lock().unwrap().as_ref() {
let _ = msg.recipient.try_send(StreamMessage::Element(val.clone()));
}
self.subscribers.push(msg.recipient);
}
}
#[derive(Clone)]
pub struct Signal<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug,
{
actor_addr: Addr<SignalActor<A>>,
}
impl<A> Signal<A>
where
A: 'static + Send + Clone + Unpin + fmt::Debug + Streamable, {
pub fn new(initial_value: A) -> Self {
Signal {
actor_addr: SignalActor::new(Some(initial_value)).start(),
}
}
pub fn constant(value: A) -> Self {
Signal::new(value)
}
pub fn empty() -> Self {
Signal {
actor_addr: SignalActor::new(None).start(),
}
}
pub async fn get_value(&self) -> Option<A> {
self.actor_addr
.send(GetValue(PhantomData))
.await
.unwrap_or(None)
}
pub fn set_value(&self, value: A) {
self.actor_addr.do_send(SetValue(value));
}
pub fn discrete(&self) -> Stream<A> {
let actor_addr_clone = self.actor_addr.clone();
let setup_fn = Box::new(move |downstream_recipient: Recipient<StreamMessage<A>>| {
async move {
actor_addr_clone.do_send(SubscribeDiscrete {
recipient: downstream_recipient,
});
Ok(())
}
.boxed() });
Stream {
setup_fn,
_phantom: PhantomData,
}
}
}