gmt_dos_actors/model/
running.rs1use super::{Completed, Model, ModelError, Result, Running};
2use crate::{
3 framework::model::TaskError::FromActor,
4 ActorError::{Disconnected, DropRecv, DropSend},
5};
6use chrono::{DateTime, Local, SecondsFormat};
7use std::{
8 future::{Future, IntoFuture},
9 marker::PhantomData,
10 pin::Pin,
11 time::Instant,
12};
13
14impl Model<Running> {
15 pub async fn wait(mut self) -> Result<Model<Completed>> {
17 let task_handles = self.task_handles.take().unwrap();
18 for task_handle in task_handles.into_iter() {
19 match task_handle.await? {
21 Ok(_) => {
22 log::debug!(
23 "{} succesfully completed",
24 self.name.as_ref().unwrap_or(&String::from("Model"))
25 );
26 Ok(())
27 }
28 Err(FromActor(Disconnected(msg))) => {
29 log::debug!("{} has been disconnected", msg);
30 Ok(())
31 }
32 Err(FromActor(DropRecv { msg, .. })) => {
33 log::debug!("{} has been dropped", msg);
34 Ok(())
35 }
36 Err(FromActor(DropSend { msg, .. })) => {
37 log::debug!("{} has been dropped", msg);
38 Ok(())
39 }
40 Err(e) => Err(e),
41 }
42 .map_err(Box::new)?;
43 }
44 let elapsed_time = Instant::now().duration_since(self.start);
45 let now: DateTime<Local> = Local::now();
46 self.verbose.then(|| {
47 eprintln!(
48 "[{}<{}>] COMPLETED in {}",
49 self.name
50 .as_ref()
51 .unwrap_or(&String::from("Model"))
52 .to_uppercase(),
53 now.to_rfc3339_opts(SecondsFormat::Secs, true),
54 humantime::format_duration(elapsed_time)
55 )
56 });
57 Ok(Model::<Completed> {
58 name: self.name,
59 actors: None,
60 task_handles: None,
61 state: PhantomData,
62 start: Instant::now(),
63 verbose: self.verbose,
64 elapsed_time: elapsed_time.as_secs_f64(),
65 })
66 }
67}
68
69pub type ModelCompleted = Pin<
70 Box<dyn Future<Output = std::result::Result<Model<Completed>, ModelError>> + Send + 'static>,
71>;
72impl IntoFuture for Model<Running> {
73 type IntoFuture = ModelCompleted;
74 type Output = <ModelCompleted as Future>::Output;
75 fn into_future(self) -> Self::IntoFuture {
76 Box::pin(self.wait())
77 }
78}