// lazy_async_promise/immediatevalueprogress.rs
use crate::{BoxedSendError, DirectCacheAccess, Progress};
use crate::{ImmediateValuePromise, ImmediateValueState};
use std::borrow::Cow;
use std::time::Instant;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
7
8#[derive(Debug)]
11pub struct Status<M> {
12 pub time: Instant,
14 pub progress: Progress,
16 pub message: M,
18}
19
20impl<M> Status<M> {
21 pub fn new(progress: Progress, message: M) -> Self {
23 Self {
24 progress,
25 message,
26 time: Instant::now(),
27 }
28 }
29}
30
31pub type StringStatus = Status<Cow<'static, str>>;
33
34impl StringStatus {
35 pub fn from_str(progress: Progress, static_message: &'static str) -> Self {
37 StringStatus {
38 message: Cow::Borrowed(static_message),
39 time: Instant::now(),
40 progress,
41 }
42 }
43 pub fn from_string(progress: Progress, message: String) -> Self {
45 StringStatus {
46 message: Cow::Owned(message),
47 time: Instant::now(),
48 progress,
49 }
50 }
51}
52
53pub struct ProgressTrackedImValProm<T: Send, M> {
84 promise: ImmediateValuePromise<T>,
85 status: Vec<Status<M>>,
86 receiver: Receiver<Status<M>>,
87}
88
89impl<T: Send + 'static, M> ProgressTrackedImValProm<T, M> {
90 pub fn new(
92 creator: impl FnOnce(Sender<Status<M>>) -> ImmediateValuePromise<T>,
93 buffer: usize,
94 ) -> Self {
95 let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
96 ProgressTrackedImValProm {
97 receiver,
98 status: Vec::new(),
99 promise: creator(sender),
100 }
101 }
102
103 pub fn status_history(&self) -> &[Status<M>] {
105 &self.status
106 }
107
108 pub fn last_status(&self) -> Option<&Status<M>> {
110 self.status.last()
111 }
112
113 pub fn finished(&self) -> bool {
115 self.promise.get_value().is_some()
116 }
117
118 pub fn poll_state(&mut self) -> &ImmediateValueState<T> {
120 while let Ok(msg) = self.receiver.try_recv() {
121 self.status.push(msg);
122 }
123 self.promise.poll_state()
124 }
125
126 pub fn get_progress(&self) -> Progress {
128 self.status.last().map(|p| p.progress).unwrap_or_default()
129 }
130}
131
132impl<T: Send + 'static, M> DirectCacheAccess<T, BoxedSendError> for ProgressTrackedImValProm<T, M> {
133 fn get_value_mut(&mut self) -> Option<&mut T> {
134 self.promise.get_value_mut()
135 }
136 fn get_value(&self) -> Option<&T> {
137 self.promise.get_value()
138 }
139 fn get_result(&self) -> Option<Result<&T, &BoxedSendError>> {
140 self.promise.get_result()
141 }
142 fn take_value(&mut self) -> Option<T> {
143 self.promise.take_value()
144 }
145 fn take_result(&mut self) -> Option<Result<T, BoxedSendError>> {
146 self.promise.take_result()
147 }
148}
#[cfg(test)]
mod test {
    use super::*;
    use crate::ImmediateValuePromise;
    use std::time::Duration;

    /// Drives one promise through three status updates and checks the polled
    /// state, the collected history and the direct cache accessors.
    #[tokio::test]
    async fn basic_usage_cycle() {
        let mut oneshot_progress = ProgressTrackedImValProm::new(
            |s| {
                ImmediateValuePromise::new(async move {
                    s.send(StringStatus::from_str(
                        Progress::from_percent(0.0),
                        "Initializing",
                    ))
                    .await
                    .unwrap();
                    tokio::time::sleep(Duration::from_millis(25)).await;
                    s.send(StringStatus::new(
                        Progress::from_percent(50.0),
                        "processing".into(),
                    ))
                    .await
                    .unwrap();
                    tokio::time::sleep(Duration::from_millis(25)).await;

                    s.send(StringStatus::from_string(
                        Progress::from_percent(100.0),
                        String::from("Done"),
                    ))
                    .await
                    .unwrap();
                    Ok(34)
                })
            },
            2000,
        );
        assert!(matches!(
            oneshot_progress.poll_state(),
            ImmediateValueState::Updating
        ));
        assert!(!oneshot_progress.finished());

        assert_eq!(*oneshot_progress.get_progress(), 0.0);
        // The task sleeps 2 x 25 ms; 100 ms leaves it time to complete.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = oneshot_progress.poll_state();
        assert_eq!(*oneshot_progress.get_progress(), 1.0);

        match oneshot_progress.poll_state() {
            ImmediateValueState::Success(val) => assert_eq!(*val, 34),
            _ => panic!("promise should have resolved successfully"),
        }
        assert!(oneshot_progress.finished());
        let history = oneshot_progress.status_history();
        assert_eq!(history.len(), 3);

        // Direct cache access: read, mutate in place, then take the value out.
        let val = oneshot_progress.get_value().unwrap();
        assert_eq!(*val, 34);
        let val = oneshot_progress.get_value_mut().unwrap();
        *val = 33;
        assert_eq!(*oneshot_progress.get_value().unwrap(), 33);
        let val = oneshot_progress.take_value().unwrap();
        assert_eq!(val, 33);
        assert!(oneshot_progress.get_value().is_none());
    }
}