use std::{any::Any, fmt, future::Future};
use tokio::sync::mpsc;
use crate::{
BoxFuture, Effects, Instant, Name, OutputEffect, Receiver, Resources, SendData, Sender, StageBuildRef, StageRef,
stage_ref::StageStateRef, types::MpscSender,
};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy, serde::Serialize, serde::Deserialize)]
pub struct ScheduleId(Instant, u64);
impl ScheduleId {
pub(crate) fn new(id: u64, instant: Instant) -> Self {
Self(instant, id)
}
pub fn time(&self) -> Instant {
self.0
}
}
impl fmt::Display for ScheduleId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "id {} at {}", self.1, self.0)
}
}
pub trait StageGraph {
fn stage<Msg, St, F, Fut>(&mut self, name: impl AsRef<str>, f: F) -> StageBuildRef<Msg, St, Box<dyn Any + Send>>
where
F: FnMut(St, Msg, Effects<Msg>) -> Fut + 'static + Send,
Fut: Future<Output = St> + 'static + Send,
Msg: SendData + serde::de::DeserializeOwned,
St: SendData;
fn wire_up<Msg, St>(
&mut self,
stage: StageBuildRef<Msg, St, Box<dyn Any + Send>>,
state: St,
) -> StageStateRef<Msg, St>
where
Msg: SendData + serde::de::DeserializeOwned,
St: SendData;
fn contramap<Original: SendData, Mapped: SendData>(
&mut self,
stage_ref: impl AsRef<StageRef<Original>>,
new_name: impl AsRef<str>,
transform: impl Fn(Mapped) -> Original + 'static + Send,
) -> StageRef<Mapped>;
fn preload<Msg: SendData>(
&mut self,
stage: impl AsRef<StageRef<Msg>>,
messages: impl IntoIterator<Item = Msg>,
) -> Result<(), Box<dyn SendData>>;
fn input<Msg: SendData>(&mut self, stage: impl AsRef<StageRef<Msg>>) -> Sender<Msg>;
fn output<Msg>(&mut self, name: impl AsRef<str>, send_queue_size: usize) -> (StageRef<Msg>, Receiver<Msg>)
where
Msg: SendData + PartialEq + serde::Serialize + serde::de::DeserializeOwned,
{
let name = Name::from(name.as_ref());
let (sender, rx) = mpsc::channel(send_queue_size);
let tx = MpscSender { sender };
let output = self.stage(name, async |tx: MpscSender<Msg>, msg: Msg, eff| {
eff.external(OutputEffect::new(eff.me().name().clone(), msg, tx.clone())).await;
tx
});
let output = self.wire_up(output, tx);
(output.without_state(), Receiver::new(rx))
}
fn resources(&self) -> &Resources;
}
pub trait StageGraphRunning {
fn is_terminated(&self) -> bool;
fn termination(&self) -> BoxFuture<'static, ()>;
}
pub fn stage_name(counter: &mut usize, prefix: &str) -> Name {
*counter += 1;
Name::from(&*format!("{}-{}", prefix, counter))
}