use crate::Task;
use chrono::{DateTime, Local, SecondsFormat};
use std::{
env, fmt::Display, marker::PhantomData, ops::Add, path::Path, process::Command, time::Instant,
};
mod flowchart;
pub use flowchart::Graph;
#[derive(thiserror::Error, Debug)]
pub enum ModelError {
#[error("no actors found in the model")]
NoActors,
#[error("failed to join the task")]
TaskError(#[from] tokio::task::JoinError),
#[error("Actor IO inconsistency")]
ActorIO(#[from] crate::ActorError),
}
type Result<T> = std::result::Result<T, ModelError>;
pub enum Unknown {}
pub enum Ready {}
pub enum Running {}
pub enum Completed {}
type Actors = Vec<Box<dyn Task>>;
pub struct Model<State> {
name: Option<String>,
actors: Option<Actors>,
task_handles: Option<Vec<tokio::task::JoinHandle<()>>>,
state: PhantomData<State>,
start: Instant,
}
impl<S> Display for Model<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(
f,
"{} [{},{:?}]",
self.name.as_ref().unwrap_or(&"ACTOR MODEL".to_string()),
self.n_actors(),
self.n_io()
)?;
if let Some(actors) = &self.actors {
for actor in actors {
write!(f, " {}", actor)?;
}
}
Ok(())
}
}
impl<S> Model<S> {
pub fn inspect(self) -> Self {
println!("{self}");
self
}
pub fn n_io(&self) -> (usize, usize) {
if let Some(ref actors) = self.actors {
actors
.iter()
.fold((0usize, 0usize), |(mut i, mut o), actor| {
i += actor.n_inputs();
o += actor.n_outputs();
(i, o)
})
} else {
(0, 0)
}
}
pub fn n_actors(&self) -> usize {
self.actors.as_ref().map_or(0, |actors| actors.len())
}
}
#[doc(hidden)]
pub trait UnknownOrReady {}
impl UnknownOrReady for Unknown {}
impl UnknownOrReady for Ready {}
impl<State> Model<State>
where
State: UnknownOrReady,
{
pub fn graph(&self) -> Option<Graph> {
self.actors
.as_ref()
.map(|actors| Graph::new(actors.iter().map(|a| a.as_plain()).collect()))
}
pub fn flowchart(self) -> Self {
let name = self
.name
.clone()
.unwrap_or_else(|| "integrated_model".to_string());
let root_env = env::var("DATA_REPO").unwrap_or_else(|_| ".".to_string());
let path = Path::new(&root_env).join(&name);
if let Some(graph) = self.graph() {
match graph.to_dot(path.with_extension("dot")) {
Ok(_) => {
if let Err(e) = Command::new("neato")
.arg("-Gstart=rand")
.arg("-Tsvg")
.arg("-O")
.arg(path.with_extension("dot").to_str().unwrap())
.output()
{
println!(
"Failed to convert Graphviz dot file {path:?} to SVG image with {e}"
)
}
}
Err(e) => println!("Failed to write Graphviz dot file {path:?} with {e}"),
}
}
self
}
}
impl Model<Unknown> {
pub fn new(actors: Actors) -> Self {
Self {
name: None,
actors: Some(actors),
task_handles: None,
state: PhantomData,
start: Instant::now(),
}
}
pub fn name<S: Into<String>>(self, name: S) -> Self {
Self {
name: Some(name.into()),
..self
}
}
pub fn check(self) -> Result<Model<Ready>> {
let (n_inputs, n_outputs) = self.n_io();
assert_eq!(
n_inputs, n_outputs,
"I/O #({},{}) don't match, did you forget to add some actors to the model?",
n_inputs, n_outputs
);
match self.actors {
Some(ref actors) => {
let mut inputs_hashes = vec![];
let mut outputs_hashes = vec![];
for actor in actors {
actor.check_inputs()?;
actor.check_outputs()?;
inputs_hashes.append(&mut actor.inputs_hashes());
outputs_hashes.append(&mut actor.outputs_hashes());
}
let hashes_diff = outputs_hashes
.into_iter()
.zip(inputs_hashes.into_iter())
.map(|(o, i)| o as i128 - i as i128)
.sum::<i128>();
assert_eq!(hashes_diff,0i128,
"I/O hashes difference: expected 0, found {}, did you forget to add some actors to the model?",
hashes_diff);
Ok(Model::<Ready> {
name: self.name,
actors: self.actors,
task_handles: None,
state: PhantomData,
start: Instant::now(),
})
}
None => Err(ModelError::NoActors),
}
}
}
impl Model<Ready> {
pub fn run(mut self) -> Model<Running> {
let now: DateTime<Local> = Local::now();
println!(
"[{}<{}>] LAUNCHED",
self.name
.as_ref()
.unwrap_or(&String::from("Model"))
.to_uppercase(),
now.to_rfc3339_opts(SecondsFormat::Secs, true),
);
let mut actors = self.actors.take().unwrap();
let mut task_handles = vec![];
while let Some(mut actor) = actors.pop() {
task_handles.push(tokio::spawn(async move {
actor.task().await;
}));
}
Model::<Running> {
name: self.name,
actors: None,
task_handles: Some(task_handles),
state: PhantomData,
start: Instant::now(),
}
}
}
impl Model<Running> {
pub async fn wait(mut self) -> Result<Model<Completed>> {
let task_handles = self.task_handles.take().unwrap();
for task_handle in task_handles.into_iter() {
task_handle.await?;
}
let elapsed_time = Instant::now().duration_since(self.start);
let now: DateTime<Local> = Local::now();
println!(
"[{}<{}>] COMPLETED in {}",
self.name
.as_ref()
.unwrap_or(&String::from("Model"))
.to_uppercase(),
now.to_rfc3339_opts(SecondsFormat::Secs, true),
humantime::format_duration(elapsed_time)
);
Ok(Model::<Completed> {
name: self.name,
actors: None,
task_handles: None,
state: PhantomData,
start: Instant::now(),
})
}
}
use std::future::{Future, IntoFuture};
use std::pin::Pin;
pub type ModelCompleted = Pin<
Box<dyn Future<Output = std::result::Result<Model<Completed>, ModelError>> + Send + 'static>,
>;
impl IntoFuture for Model<Running> {
type IntoFuture = ModelCompleted;
type Output = <ModelCompleted as Future>::Output;
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.wait())
}
}
impl Add for Model<Unknown> {
type Output = Model<Unknown>;
fn add(self, rhs: Self) -> Self::Output {
match (self.actors, rhs.actors) {
(None, None) => Model::new(vec![]),
(None, Some(b)) => Model::new(b),
(Some(a), None) => Model::new(a),
(Some(mut a), Some(mut b)) => {
a.append(&mut b);
Model::new(a)
}
}
}
}