use crossbeam::channel::unbounded;
use zrx_storage::Storage;
use zrx_store::Collection;
use crate::scheduler::action::{Action, Options};
use crate::scheduler::signal::Diff;
use crate::scheduler::signal::{Id, Scope, Value};
use super::error::Result;
use super::graph::{Descriptor, Handler, Node, Source, Worker};
use super::Builder;
/// Subscriber, bundling an action with the settings used to register it.
///
/// A subscriber carries the action itself, the [`Options`] that are handed to
/// the handler together with the action, and an optional [`Storage`] keyed by
/// [`Scope`] that holds values of the action's output type. When no storage
/// is set, the builder falls back to a default one on registration.
pub struct Subscriber<'a, I, A: Action<I>> {
    /// Action to register.
    action: A,
    /// Options passed to the handler alongside the action.
    options: Options,
    /// Storage for the action's outputs, if one was provided.
    storage: Option<Storage<Scope<I>, A::Output<'a>>>,
}
impl<'a, I, A> Subscriber<'a, I, A>
where
    A: Action<I>,
{
    /// Creates a subscriber for the given action.
    ///
    /// The subscriber starts with [`Options::default`] and without a storage;
    /// both can be replaced through [`Subscriber::with_options`] and
    /// [`Subscriber::with_storage`].
    #[inline]
    pub fn new(action: A) -> Self {
        let options = Options::default();
        Self { action, options, storage: None }
    }

    /// Returns the subscriber with its options replaced by `options`.
    #[inline]
    #[must_use]
    pub fn with_options(self, options: Options) -> Self {
        // Everything except the options is carried over unchanged
        Self { options, ..self }
    }

    /// Returns the subscriber with `storage` wrapped into a [`Storage`].
    ///
    /// Any storage that was set before is replaced.
    #[inline]
    #[must_use]
    pub fn with_storage<S>(self, storage: S) -> Self
    where
        S: Collection<Scope<I>, A::Output<'a>>,
    {
        let storage = Some(Storage::new(storage));
        Self { storage, ..self }
    }
}
impl<I> Builder<I>
where
I: Id,
{
pub fn add<'a, N, S, A>(&mut self, nodes: N, subscriber: S) -> Result<usize>
where
N: IntoIterator<Item = usize>,
S: Into<Subscriber<'a, I, A>>,
A: Action<I> + 'static,
{
let Subscriber { action, options, storage } = subscriber.into();
let handler = Handler::new(action, options);
let target = self.graph.add_node(Node::new(
Descriptor::of::<A::Output<'a>>(),
Worker::new(handler),
));
let n = self.storages.insert(storage.unwrap_or_default());
debug_assert_eq!(n, target);
for source in nodes {
self.graph.add_edge(source, target)?;
}
Ok(target)
}
pub fn add_source<T>(&mut self) -> usize
where
T: Value,
{
let (sender, receiver) = unbounded::<Diff<I, T>>();
let handler = Handler::new(receiver, Options::default());
let target = self.graph.add_node(Node::new(
Descriptor::of::<T>(),
Source::new(handler, sender),
));
let n = self.storages.insert(Storage::<Scope<I>, T>::default());
debug_assert_eq!(n, target);
target
}
}
impl<I, A> From<A> for Subscriber<'_, I, A>
where
A: Action<I>,
{
#[inline]
fn from(action: A) -> Self {
Self::new(action)
}
}