gmt_dos_actors/model/
running.rs

use super::{Completed, Model, ModelError, Result, Running};
use crate::{
    framework::model::TaskError::FromActor,
    ActorError::{Disconnected, DropRecv, DropSend},
};
use chrono::{DateTime, Local, SecondsFormat};
use std::{
    future::{Future, IntoFuture},
    marker::PhantomData,
    pin::Pin,
    time::Instant,
};
13
14impl Model<Running> {
15    /// Waits for the task of each actor to finish
16    pub async fn wait(mut self) -> Result<Model<Completed>> {
17        let task_handles = self.task_handles.take().unwrap();
18        for task_handle in task_handles.into_iter() {
19            // task_handle.await?.map_err(|e| Box::new(e))?;
20            match task_handle.await? {
21                Ok(_) => {
22                    log::debug!(
23                        "{} succesfully completed",
24                        self.name.as_ref().unwrap_or(&String::from("Model"))
25                    );
26                    Ok(())
27                }
28                Err(FromActor(Disconnected(msg))) => {
29                    log::debug!("{} has been disconnected", msg);
30                    Ok(())
31                }
32                Err(FromActor(DropRecv { msg, .. })) => {
33                    log::debug!("{} has been dropped", msg);
34                    Ok(())
35                }
36                Err(FromActor(DropSend { msg, .. })) => {
37                    log::debug!("{} has been dropped", msg);
38                    Ok(())
39                }
40                Err(e) => Err(e),
41            }
42            .map_err(Box::new)?;
43        }
44        let elapsed_time = Instant::now().duration_since(self.start);
45        let now: DateTime<Local> = Local::now();
46        self.verbose.then(|| {
47            eprintln!(
48                "[{}<{}>] COMPLETED in {}",
49                self.name
50                    .as_ref()
51                    .unwrap_or(&String::from("Model"))
52                    .to_uppercase(),
53                now.to_rfc3339_opts(SecondsFormat::Secs, true),
54                humantime::format_duration(elapsed_time)
55            )
56        });
57        Ok(Model::<Completed> {
58            name: self.name,
59            actors: None,
60            task_handles: None,
61            state: PhantomData,
62            start: Instant::now(),
63            verbose: self.verbose,
64            elapsed_time: elapsed_time.as_secs_f64(),
65        })
66    }
67}
68
69pub type ModelCompleted = Pin<
70    Box<dyn Future<Output = std::result::Result<Model<Completed>, ModelError>> + Send + 'static>,
71>;
72impl IntoFuture for Model<Running> {
73    type IntoFuture = ModelCompleted;
74    type Output = <ModelCompleted as Future>::Output;
75    fn into_future(self) -> Self::IntoFuture {
76        Box::pin(self.wait())
77    }
78}