use crate::{
actor::plain::{PlainActor, PlainOutput},
Task,
};
use chrono::{DateTime, Local, SecondsFormat};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
env,
fs::File,
hash::{Hash, Hasher},
io::Write,
marker::PhantomData,
path::Path,
process::Command,
time::Instant,
};
#[derive(thiserror::Error, Debug)]
pub enum ModelError {
#[error("no actors found in the model")]
NoActors,
#[error("failed to join the task")]
TaskError(#[from] tokio::task::JoinError),
#[error("Actor IO inconsistency")]
ActorIO(#[from] crate::ActorError),
}
type Result<T> = std::result::Result<T, ModelError>;
pub enum Unknown {}
pub enum Ready {}
pub enum Running {}
pub enum Completed {}
type Actors = Vec<Box<dyn Task>>;
pub struct Model<State> {
name: Option<String>,
actors: Option<Actors>,
task_handles: Option<Vec<tokio::task::JoinHandle<()>>>,
state: PhantomData<State>,
start: Instant,
}
#[doc(hidden)]
pub trait UnknownOrReady {}
impl UnknownOrReady for Unknown {}
impl UnknownOrReady for Ready {}
impl<State> Model<State>
where
State: UnknownOrReady,
{
pub fn graph(&self) -> Option<Graph> {
self.actors
.as_ref()
.map(|actors| Graph::new(actors.iter().map(|a| a.as_plain()).collect()))
}
pub fn flowchart(self) -> Self {
let name = self
.name
.clone()
.unwrap_or_else(|| "integrated_model".to_string());
let root_env = env::var("DATA_REPO").unwrap_or_else(|_| ".".to_string());
let path = Path::new(&root_env).join(&name);
if let Some(graph) = self.graph() {
match graph.to_dot(path.with_extension("dot")) {
Ok(_) => {
if let Err(e) = Command::new("neato")
.arg("-Gstart=rand")
.arg("-Tsvg")
.arg("-O")
.arg(path.with_extension("dot").to_str().unwrap())
.output()
{
println!(
"Failed to convert Graphviz dot file {path:?} to SVG image with {e}"
)
}
}
Err(e) => println!("Failed to write Graphviz dot file {path:?} with {e}"),
}
}
self
}
}
impl Model<Unknown> {
pub fn new(actors: Actors) -> Self {
Self {
name: None,
actors: Some(actors),
task_handles: None,
state: PhantomData,
start: Instant::now(),
}
}
pub fn name<S: Into<String>>(self, name: S) -> Self {
Self {
name: Some(name.into()),
..self
}
}
pub fn check(self) -> Result<Model<Ready>> {
match self.actors {
Some(ref actors) => {
for actor in actors {
actor.check_inputs()?;
actor.check_outputs()?;
}
Ok(Model::<Ready> {
name: self.name,
actors: self.actors,
task_handles: None,
state: PhantomData,
start: Instant::now(),
})
}
None => Err(ModelError::NoActors),
}
}
}
impl Model<Ready> {
pub fn run(mut self) -> Model<Running> {
let now: DateTime<Local> = Local::now();
println!(
"[{}<{}>] LAUNCHED",
self.name
.as_ref()
.unwrap_or(&String::from("Model"))
.to_uppercase(),
now.to_rfc3339_opts(SecondsFormat::Secs, true),
);
let mut actors = self.actors.take().unwrap();
let mut task_handles = vec![];
while let Some(mut actor) = actors.pop() {
task_handles.push(tokio::spawn(async move {
actor.task().await;
}));
}
Model::<Running> {
name: self.name,
actors: None,
task_handles: Some(task_handles),
state: PhantomData,
start: Instant::now(),
}
}
}
impl Model<Running> {
pub async fn wait(mut self) -> Result<Model<Completed>> {
let task_handles = self.task_handles.take().unwrap();
for task_handle in task_handles.into_iter() {
task_handle.await?;
}
let elapsed_time = Instant::now().duration_since(self.start);
let now: DateTime<Local> = Local::now();
println!(
"[{}<{}>] COMPLETED in {}",
self.name
.as_ref()
.unwrap_or(&String::from("Model"))
.to_uppercase(),
now.to_rfc3339_opts(SecondsFormat::Secs, true),
humantime::format_duration(elapsed_time)
);
Ok(Model::<Completed> {
name: self.name,
actors: None,
task_handles: None,
state: PhantomData,
start: Instant::now(),
})
}
}
#[derive(Debug)]
pub struct Graph {
actors: Vec<PlainActor>,
}
impl Graph {
fn new(actors: Vec<PlainActor>) -> Self {
let mut hasher = DefaultHasher::new();
let mut actors = actors;
actors.iter_mut().for_each(|actor| {
actor.client = actor
.client
.replace("::Controller", "")
.split('<')
.next()
.unwrap()
.split("::")
.last()
.unwrap()
.to_string();
actor.hash(&mut hasher);
actor.hash = hasher.finish();
});
Self { actors }
}
pub fn to_string(&self) -> String {
use PlainOutput::*;
let mut lookup: HashMap<usize, usize> = HashMap::new();
let mut colors = (1usize..=8).cycle();
let outputs: Vec<_> = self
.actors
.iter()
.filter_map(|actor| {
actor.outputs.as_ref().map(|outputs| {
outputs
.iter()
.map(|output| {
let color = lookup
.entry(actor.outputs_rate)
.or_insert_with(|| colors.next().unwrap());
match output {
Bootstrap(output) => format!(
"{0} -> {1} [color={2}, style=bold];",
actor.hash, output.hash, color
),
Regular(output) => format!(
"{0} -> {1} [color={2}];",
actor.hash, output.hash, color
),
}
})
.collect::<Vec<String>>()
})
})
.flatten()
.collect();
let inputs: Vec<_> = self
.actors
.iter()
.filter_map(|actor| {
actor.inputs.as_ref().map(|inputs| {
inputs
.iter()
.map(|input| {
let color = lookup
.entry(actor.inputs_rate)
.or_insert_with(|| colors.next().unwrap());
format!(
r#"{0} -> {1} [label="{2}", color={3}];"#,
input.hash,
actor.hash,
input.name.split("::").last().unwrap(),
color
)
})
.collect::<Vec<String>>()
})
})
.flatten()
.collect();
format!(
r#"
digraph G {{
overlap = scale;
splines = true;
bgcolor = gray24;
{{node [shape=box, width=1.5, style="rounded,filled", fillcolor=lightgray]; {};}}
node [shape=point, fillcolor=gray24, color=lightgray];
/* Outputs */
{{
edge [arrowhead=none,colorscheme=dark28];
{}
}}
/* Inputs */
{{
edge [arrowhead=vee,fontsize=9, fontcolor=lightgray, labelfloat=true,colorscheme=dark28]
{}
}}
}}
"#,
self.actors
.iter()
.map(|actor| format!(r#"{} [label="{}"]"#, actor.hash, actor.client))
.collect::<Vec<String>>()
.join("; "),
outputs.join("\n"),
inputs.join("\n"),
)
}
pub fn to_dot<P: AsRef<Path>>(
&self,
path: P,
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let mut file = File::create(path)?;
write!(&mut file, "{}", self.to_string())?;
Ok(())
}
}