gmt_dos_actors/
model.rs

1/*!
2# Actors model
3
4The module implements the high-level integrated model interface.
5The model is built from a collection of [actor]s.
6
7The model has 4 states:
8 1. [Unknown]: model state at its creation
9 2. [Ready]: model state after successfully performing runtime checks on the inputs and outputs of all the actors; the model can move to the [Ready] state only from the [Unknown] state
10 3. [Running]: model state while all the actors are performing their respective tasks, the model can move to the [Running] state only from the [Ready] state
11 4. [Completed]: model state after the successful completion of the tasks of all the actors; the model can move to the [Completed] state only from the [Running] state
12
13# Example
14
15A 3-actor model with [Signals], [Sampler] and [Logging] clients is built with:
16```
17use gmt_dos_actors::prelude::*;
18use gmt_dos_clients::signals::Signals;
19use gmt_dos_clients::sampler::Sampler;
20use gmt_dos_clients::logging::Logging;
21use interface::UID;
22
23let mut source: Initiator<_> = Signals::new(1, 100).into();
24#[derive(UID)]
25enum Source {};
26let mut sampler: Actor<_, 1, 10> = Sampler::<Vec<f64>, Source>::default().into();
27let logging = Logging::<f64>::default().into_arcx();
28let mut sink = Terminator::<_, 10>::new(logging);
29```
30`sampler` decimates `source` with a 1:10 ratio.
31The `source` connects to the `sampler` using the empty enum type `Source` as the data identifier.
32The source data is then logged into the client of the `sink` actor.
33```
34# use gmt_dos_actors::prelude::*;
35# use gmt_dos_clients::signals::Signals;
36# use gmt_dos_clients::sampler::Sampler;
37# use gmt_dos_clients::logging::Logging;
38# use interface::UID;
39# let mut source: Initiator<_> = Signals::new(1, 100).into();
40# #[derive(UID)]
41# enum Source {};
42# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
43# let logging = Logging::<f64>::default().into_arcx();
44# let mut sink = Terminator::<_>::new(logging);
45source.add_output().build::<Source>().into_input(&mut sampler);
46sampler.add_output().build::<Source>().into_input(&mut sink);
47```
48A [model](mod@crate::model) is built from the set of actors:
49```
50# use gmt_dos_actors::prelude::*;
51# use gmt_dos_clients::signals::Signals;
52# use gmt_dos_clients::sampler::Sampler;
53# use gmt_dos_clients::logging::Logging;
54# use interface::UID;
55# let mut source: Initiator<_> = Signals::new(1, 100).into();
56# #[derive(UID)]
57# enum Source {};
58# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
59# let logging = Logging::<f64>::default().into_arcx();
60# let mut sink = Terminator::<_>::new(logging.clone());
61# source.add_output().build::<Source>().into_input(&mut sampler);
62# sampler.add_output().build::<Source>().into_input(&mut sink);
63Model::new(vec![Box::new(source), Box::new(sampler), Box::new(sink)]);
64```
65The actors are checked for input/output consistency:
66```
67# use gmt_dos_actors::prelude::*;
68# use gmt_dos_clients::signals::Signals;
69# use gmt_dos_clients::sampler::Sampler;
70# use gmt_dos_clients::logging::Logging;
71# use interface::UID;
72# let mut source: Initiator<_> = Signals::new(1, 100).into();
73# #[derive(UID)]
74# enum Source {};
75# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
76# let logging = Logging::<f64>::default().into_arcx();
77# let mut sink = Terminator::<_>::new(logging.clone());
78# source.add_output().build::<Source>().into_input(&mut sampler);
79# sampler.add_output().build::<Source>().into_input(&mut sink);
80Model::new(vec![Box::new(source), Box::new(sampler), Box::new(sink)])
81       .check()?;
82# Ok::<(), gmt_dos_actors::model::ModelError>(())
83```
84The model runs the actor tasks:
85```
86# tokio_test::block_on(async {
87# use gmt_dos_actors::prelude::*;
88# use gmt_dos_clients::signals::Signals;
89# use gmt_dos_clients::sampler::Sampler;
90# use gmt_dos_clients::logging::Logging;
91# use interface::UID;
92# let mut source: Initiator<_> = Signals::new(1, 100).into();
93# #[derive(UID)]
94# enum Source {};
95# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
96# let logging = Logging::<f64>::default().into_arcx();
97# let mut sink = Terminator::<_>::new(logging.clone());
98# source.add_output().build::<Source>().into_input(&mut sampler);
99# sampler.add_output().build::<Source>().into_input(&mut sink);
100Model::new(vec![Box::new(source), Box::new(sampler), Box::new(sink)])
101       .check()?
102       .run();
103# Ok::<(), gmt_dos_actors::model::ModelError>(())
104# });
105```
106and wait for the tasks to finish:
107```
108# tokio_test::block_on(async {
109# use gmt_dos_actors::prelude::*;
110# use gmt_dos_clients::signals::Signals;
111# use gmt_dos_clients::sampler::Sampler;
112# use gmt_dos_clients::logging::Logging;
113# use interface::UID;
114# let mut source: Initiator<_> = Signals::new(1, 100).into();
115# #[derive(UID)]
116# enum Source {};
117# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
118# let logging = Logging::<f64>::default().into_arcx();
119# let mut sink = Terminator::<_>::new(logging.clone());
120# source.add_output().build::<Source>().into_input(&mut sampler);
121# sampler.add_output().build::<Source>().into_input(&mut sink);
122Model::new(vec![Box::new(source), Box::new(sampler), Box::new(sink)])
123       .check()?
124       .run()
125       .wait()
126       .await?;
127# Ok::<(), gmt_dos_actors::model::ModelError>(())
128# });
129```
130Once the model has run to completion, the data from `logging` is read with:
131```
132# tokio_test::block_on(async {
133# use gmt_dos_actors::prelude::*;
134# use gmt_dos_clients::signals::Signals;
135# use gmt_dos_clients::sampler::Sampler;
136# use gmt_dos_clients::logging::Logging;
137# use interface::UID;
138# let mut source: Initiator<_> = Signals::new(1, 100).into();
139# #[derive(UID)]
140# enum Source {};
141# let mut sampler: Actor<_> = Sampler::<Vec<f64>, Source>::default().into();
142# let logging = Logging::<f64>::default().into_arcx();
143# let mut sink = Terminator::<_>::new(logging.clone());
144# source.add_output().build::<Source>().into_input(&mut sampler);
145# sampler.add_output().build::<Source>().into_input(&mut sink);
146# Model::new(vec![Box::new(source), Box::new(sampler), Box::new(sink)])
147#       .check()?
148#       .run()
149#       .wait()
150#       .await?;
151let data: &[f64]  = &logging.lock().await;
152# Ok::<(), gmt_dos_actors::model::ModelError>(())
153# });
154```
155
156[Actor]: crate::actor::Actor
157[Write]: interface::Write
158[Read]: interface::Read
159[Update]: interface::Update
160[Model]: crate::model::Model
161[Mutex]: tokio::sync::Mutex
162[Arc]: std::sync::Arc
163[Arcmutex]: crate::ArcMutex
164[into_arcx]: crate::ArcMutex::into_arcx
165[Signals]: https://docs.rs/gmt_dos-clients/latest/gmt_dos_clients/signals/struct.Signals.html
166[Sampler]: https://docs.rs/gmt_dos-clients/latest/gmt_dos_clients/sampler/struct.Sampler.html
167[Logging]: https://docs.rs/gmt_dos-clients/latest/gmt_dos_clients/logging/struct.Logging.html
168*/
169
170use crate::framework::model::{CheckError, Task, TaskError};
171use std::{fmt::Display, marker::PhantomData, time::Instant};
172
173mod flowchart;
174use tokio::task::JoinHandle;
175
176#[derive(thiserror::Error, Debug)]
177pub enum ModelError {
178    #[error("no actors found in the model")]
179    NoActors,
180    #[error("failed to join the task")]
181    TaskError(#[from] tokio::task::JoinError),
182    #[error("Actor IO inconsistency")]
183    ActorIO(#[from] crate::ActorError),
184    #[error("error in Task implementation")]
185    Task(#[from] Box<TaskError>),
186    #[error("error in Check implementation")]
187    Check(#[from] Box<CheckError>),
188}
189
190type Result<T> = std::result::Result<T, ModelError>;
191
/// State marker: the [Model] has just been created and is not checked yet
pub enum Unknown {}
/// State marker: the [Model] passed the runtime checks and can be run
pub enum Ready {}
/// State marker: the actors of the [Model] are performing their tasks
pub enum Running {}
/// State marker: all the actors of the [Model] have completed their tasks
pub enum Completed {}
200
201type Actors = Vec<Box<dyn Task>>;
202
203/// Actor model
204pub struct Model<State> {
205    pub(crate) name: Option<String>,
206    pub(crate) actors: Option<Actors>,
207    pub(crate) task_handles: Option<Vec<JoinHandle<std::result::Result<(), TaskError>>>>,
208    pub(crate) state: PhantomData<State>,
209    pub(crate) start: Instant,
210    pub(crate) verbose: bool,
211    pub(crate) elapsed_time: f64,
212}
213
214impl<S> Display for Model<S> {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        writeln!(
217            f,
218            "{} [{},{:?}]",
219            self.name.as_ref().unwrap_or(&"ACTOR MODEL".to_string()),
220            self.n_actors(),
221            self.n_io()
222        )?;
223        if let Some(actors) = &self.actors {
224            for actor in actors {
225                write!(f, " {}", actor)?;
226            }
227        }
228        Ok(())
229    }
230}
231
232impl<S> Model<S> {
233    /// Prints some informations about the model and the actors within
234    pub fn inspect(self) -> Self {
235        println!("{self}");
236        self
237    }
238    /// Returns the total number of inputs and the total number of outputs
239    ///
240    /// Both numbers should be the same
241    pub fn n_io(&self) -> (usize, usize) {
242        if let Some(ref actors) = self.actors {
243            actors
244                .iter()
245                .fold((0usize, 0usize), |(mut i, mut o), actor| {
246                    i += actor.n_inputs();
247                    o += actor.n_outputs();
248                    (i, o)
249                })
250        } else {
251            (0, 0)
252        }
253    }
254    /// Returns the number of actors
255    pub fn n_actors(&self) -> usize {
256        self.actors.as_ref().map_or(0, |actors| actors.len())
257    }
258    pub fn get_name(&self) -> String {
259        self.name.clone().unwrap_or("model".to_string())
260    }
261    pub fn elapsed_time(&self) -> std::time::Duration {
262        std::time::Duration::from_secs_f64(self.elapsed_time)
263    }
264}
265
266#[doc(hidden)]
267pub trait UnknownOrReady {}
268impl UnknownOrReady for Unknown {}
269impl UnknownOrReady for Ready {}
270
271mod plain;
272pub mod ready;
273pub mod running;
274pub mod unknown;
275pub use plain::PlainModel;
276
277// mod task;