#![warn(rust_2018_idioms)]
#![deny(
missing_docs,
rustdoc::broken_intra_doc_links,
rustdoc::invalid_rust_codeblocks,
missing_debug_implementations,
unreachable_pub,
unreachable_patterns,
unused,
unused_results,
unused_qualifications,
while_true,
trivial_casts,
trivial_bounds,
trivial_numeric_casts,
unconditional_panic,
unsafe_code,
clippy::all
)]
pub mod compat;
pub mod joint;
pub mod prelude;
pub(crate) mod scheduler;
pub mod sink;
pub mod stream;
pub use async_trait::async_trait;
use compat::{Sendable, SendableFusedStream, SendableWrapper};
use joint::{JointActor, JointClient};
pub use scheduler::Scheduler;
use scheduler::{ActorRunner, ActorStream};
use sink::{SinkActor, SinkClient};
use stream::{StreamActor, StreamClient};
pub use tokio::sync::oneshot::{
channel as oneshot_channel, Receiver as OneshotReceiver, Sender as OneshotSender,
};
use std::marker::PhantomData;
use futures::StreamExt;
use tokio::sync::{
broadcast,
mpsc::{unbounded_channel, UnboundedSender},
};
#[async_trait]
pub trait ActorState: 'static + Send + Sized {
type ActorType;
type Permanence;
type Message: Sendable;
type Output: Sendable + Clone;
#[allow(unused_variables)]
async fn start_up(&mut self, scheduler: &mut Scheduler<Self>) {}
async fn process(&mut self, scheduler: &mut Scheduler<Self>, msg: Self::Message);
#[allow(unused_variables)]
async fn finalize(self, scheduler: &mut Scheduler<Self>) {}
}
#[derive(Debug)]
pub struct Permanent;
#[derive(Debug)]
pub struct Transient;
#[allow(missing_debug_implementations)]
pub struct ActorBuilder<T, A: ActorState> {
ty: PhantomData<T>,
send: UnboundedSender<A::Message>,
#[allow(clippy::type_complexity)]
broadcast: Option<(
broadcast::Sender<SendableWrapper<A::Output>>,
broadcast::Receiver<SendableWrapper<A::Output>>,
)>,
recv: Vec<ActorStream<A::Message>>,
state: A,
}
impl<T, A> ActorBuilder<T, A>
where
A: ActorState<ActorType = T>,
{
pub fn new(state: A) -> Self {
let (send, recv) = unbounded_channel();
let recv = vec![recv.into()];
Self {
state,
send,
recv,
broadcast: None,
ty: PhantomData,
}
}
pub fn attach_stream<S, I>(&mut self, stream: S)
where
S: SendableFusedStream<Item = I>,
I: Into<A::Message>,
{
self.recv
.push(ActorStream::Secondary(Box::new(stream.map(|m| m.into()))));
}
}
impl<A> ActorBuilder<SinkActor, A>
where
A: ActorState<ActorType = SinkActor>,
{
pub fn client(&self) -> SinkClient<A::Permanence, A::Message> {
SinkClient::new(self.send.clone())
}
pub fn launch(self) -> SinkClient<A::Permanence, A::Message> {
let Self {
send, recv, state, ..
} = self;
let mut runner = ActorRunner::new(state);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.launch();
SinkClient::new(send)
}
}
impl<A> ActorBuilder<StreamActor, A>
where
A: ActorState<ActorType = StreamActor>,
{
pub fn client(&mut self) -> StreamClient<A::Output> {
let (_, broad) = self
.broadcast
.get_or_insert_with(|| broadcast::channel(100));
StreamClient::new(broad.resubscribe())
}
pub fn launch<S>(self, stream: S) -> StreamClient<A::Output>
where
S: SendableFusedStream<Item = A::Message>,
{
let Self {
mut recv,
state,
broadcast,
..
} = self;
let (broad, sub) = broadcast.unwrap_or_else(|| broadcast::channel(100));
recv.push(ActorStream::Secondary(Box::new(stream)));
let mut runner = ActorRunner::new(state);
runner.add_broadcaster(broad);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.launch();
StreamClient::new(sub)
}
}
impl<A> ActorBuilder<JointActor, A>
where
A: ActorState<ActorType = JointActor>,
{
pub fn stream_client(&self) -> StreamClient<A::Output> {
StreamClient::new(self.broadcast.as_ref().unwrap().1.resubscribe())
}
pub fn sink_client(&self) -> SinkClient<A::Permanence, A::Message> {
SinkClient::new(self.send.clone())
}
pub fn client(&self) -> SinkClient<A::Permanence, A::Message> {
SinkClient::new(self.send.clone())
}
pub fn launch_with_stream<S>(
mut self,
stream: S,
) -> JointClient<A::Permanence, A::Message, A::Output>
where
S: SendableFusedStream<Item = A::Message>,
{
self.attach_stream(stream);
self.launch()
}
pub fn launch(self) -> JointClient<A::Permanence, A::Message, A::Output> {
let Self {
send,
recv,
state,
broadcast,
..
} = self;
let (broad, sub) = broadcast.unwrap_or_else(|| broadcast::channel(100));
let mut runner = ActorRunner::new(state);
recv.into_iter().for_each(|r| runner.add_stream(r));
runner.add_broadcaster(broad);
runner.launch();
let sink = SinkClient::new(send);
let stream = StreamClient::new(sub);
JointClient::new(sink, stream)
}
}