lazy_async_promise/
immediatevalueprogress.rs

1use crate::{BoxedSendError, DirectCacheAccess, Progress};
2use crate::{ImmediateValuePromise, ImmediateValueState};
3use std::borrow::Cow;
4use std::time::Instant;
5use tokio::sync::mpsc::Receiver;
6use tokio::sync::mpsc::Sender;
7
8/// A status update struct containing the issue-date, progress and a message
9/// You can use any struct that can be transferred via tokio mpsc channels.
10#[derive(Debug)]
11pub struct Status<M> {
12    /// Time when this status was created
13    pub time: Instant,
14    /// Current progress
15    pub progress: Progress,
16    /// Message
17    pub message: M,
18}
19
20impl<M> Status<M> {
21    /// Create a new status message with `now` as timestamp
22    pub fn new(progress: Progress, message: M) -> Self {
23        Self {
24            progress,
25            message,
26            time: Instant::now(),
27        }
28    }
29}
30
31/// This [`Status`] typedef allows to use both: `&'static str` and `String` in a message
32pub type StringStatus = Status<Cow<'static, str>>;
33
34impl StringStatus {
35    /// create a [`StringStatus`] from a `&'static str`
36    pub fn from_str(progress: Progress, static_message: &'static str) -> Self {
37        StringStatus {
38            message: Cow::Borrowed(static_message),
39            time: Instant::now(),
40            progress,
41        }
42    }
43    /// create a [`StringStatus`] from a `String`
44    pub fn from_string(progress: Progress, message: String) -> Self {
45        StringStatus {
46            message: Cow::Owned(message),
47            time: Instant::now(),
48            progress,
49        }
50    }
51}
52
53/// # A progress and status enabling wrapper for [`ImmediateValuePromise`]
54/// This struct allows to use the [`Progress`] type and any kind of status message
55/// You can use this to set a computation progress and optionally attach any kind of status message.
56/// Assume your action runs  for an extended period of time and you want to inform the user about the state:
57///```rust, no_run
58///use std::borrow::Cow;
59///use std::time::Duration;
60///use lazy_async_promise::{ImmediateValueState, ImmediateValuePromise, Progress, ProgressTrackedImValProm, StringStatus};
61///let mut oneshot_progress = ProgressTrackedImValProm::new( |s| { ImmediateValuePromise::new(
62///  async move {
63///  //send some initial status
64///    s.send(StringStatus::new(
65///      Progress::from_percent(0.0),
66///      "Initializing".into(),
67///    )).await.unwrap();
68///    // do some long running operation
69///    for i in 0..100 {
70///      tokio::time::sleep(Duration::from_millis(50)).await;
71///      s.send(StringStatus::new(
72///        Progress::from_percent(i as f64),
73///        Cow::Borrowed("In progress"))).await.unwrap();
74///    }
75///    Ok(34)
76///  })}, 2000);
77///  assert!(matches!(
78///    oneshot_progress.poll_state(),
79///    ImmediateValueState::Updating));
80///   //waiting and polling will yield "In progress" now :)
81/// ```
82///
83pub struct ProgressTrackedImValProm<T: Send, M> {
84    promise: ImmediateValuePromise<T>,
85    status: Vec<Status<M>>,
86    receiver: Receiver<Status<M>>,
87}
88
89impl<T: Send + 'static, M> ProgressTrackedImValProm<T, M> {
90    /// create a new Progress tracked immediate value promise.
91    pub fn new(
92        creator: impl FnOnce(Sender<Status<M>>) -> ImmediateValuePromise<T>,
93        buffer: usize,
94    ) -> Self {
95        let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
96        ProgressTrackedImValProm {
97            receiver,
98            status: Vec::new(),
99            promise: creator(sender),
100        }
101    }
102
103    /// Slice of all recorded [`Status`] changes
104    pub fn status_history(&self) -> &[Status<M>] {
105        &self.status
106    }
107
108    /// Get the last [`Status`] if there is any
109    pub fn last_status(&self) -> Option<&Status<M>> {
110        self.status.last()
111    }
112
113    /// Is our future already finished?
114    pub fn finished(&self) -> bool {
115        self.promise.get_value().is_some()
116    }
117
118    /// Poll the state and process the messages
119    pub fn poll_state(&mut self) -> &ImmediateValueState<T> {
120        while let Ok(msg) = self.receiver.try_recv() {
121            self.status.push(msg);
122        }
123        self.promise.poll_state()
124    }
125
126    /// Get the current progress
127    pub fn get_progress(&self) -> Progress {
128        self.status.last().map(|p| p.progress).unwrap_or_default()
129    }
130}
131
132impl<T: Send + 'static, M> DirectCacheAccess<T, BoxedSendError> for ProgressTrackedImValProm<T, M> {
133    fn get_value_mut(&mut self) -> Option<&mut T> {
134        self.promise.get_value_mut()
135    }
136    fn get_value(&self) -> Option<&T> {
137        self.promise.get_value()
138    }
139    fn get_result(&self) -> Option<Result<&T, &BoxedSendError>> {
140        self.promise.get_result()
141    }
142    fn take_value(&mut self) -> Option<T> {
143        self.promise.take_value()
144    }
145    fn take_result(&mut self) -> Option<Result<T, BoxedSendError>> {
146        self.promise.take_result()
147    }
148}
149#[cfg(test)]
150mod test {
151    use super::*;
152    use crate::ImmediateValuePromise;
153    use std::time::Duration;
154    #[tokio::test]
155    async fn basic_usage_cycle() {
156        let mut oneshot_progress = ProgressTrackedImValProm::new(
157            |s| {
158                ImmediateValuePromise::new(async move {
159                    s.send(StringStatus::from_str(
160                        Progress::from_percent(0.0),
161                        "Initializing",
162                    ))
163                    .await
164                    .unwrap();
165                    tokio::time::sleep(Duration::from_millis(25)).await;
166                    s.send(StringStatus::new(
167                        Progress::from_percent(50.0),
168                        "processing".into(),
169                    ))
170                    .await
171                    .unwrap();
172                    tokio::time::sleep(Duration::from_millis(25)).await;
173
174                    s.send(StringStatus::from_string(
175                        Progress::from_percent(100.0),
176                        format!("Done"),
177                    ))
178                    .await
179                    .unwrap();
180                    Ok(34)
181                })
182            },
183            2000,
184        );
185        assert!(matches!(
186            oneshot_progress.poll_state(),
187            ImmediateValueState::Updating
188        ));
189        assert!(!oneshot_progress.finished());
190
191        assert_eq!(*oneshot_progress.get_progress(), 0.0);
192        tokio::time::sleep(Duration::from_millis(100)).await;
193        let _ = oneshot_progress.poll_state();
194        assert_eq!(*oneshot_progress.get_progress(), 1.0);
195        let result = oneshot_progress.poll_state();
196
197        if let ImmediateValueState::Success(val) = result {
198            assert_eq!(*val, 34);
199        } else {
200            unreachable!();
201        }
202        // check finished
203        assert!(oneshot_progress.finished());
204        let history = oneshot_progress.status_history();
205        assert_eq!(history.len(), 3);
206
207        // check direct cache access trait
208        let val = oneshot_progress.get_value().unwrap();
209        assert_eq!(*val, 34);
210        let val = oneshot_progress.get_value_mut().unwrap();
211        *val = 33;
212        assert_eq!(*oneshot_progress.get_value().unwrap(), 33);
213        let val = oneshot_progress.take_value().unwrap();
214        assert_eq!(val, 33);
215        assert!(oneshot_progress.get_value().is_none());
216    }
217}