forest/utils/io/
progress_log.rs1use educe::Educe;
40use human_repr::HumanCount as _;
41use humantime::format_duration;
42use pin_project_lite::pin_project;
43use std::io;
44use std::pin::Pin;
45use std::sync::Arc;
46use std::task::{Context, Poll};
47use std::time::{Duration, Instant};
48use tokio::io::ReadBuf;
49
/// Minimum time that must elapse since the previous progress message before a new one is emitted.
const UPDATE_FREQUENCY: Duration = Duration::from_secs(5);
51
52pin_project! {
53 #[derive(Debug, Clone)]
54 pub struct WithProgress<Inner> {
55 #[pin]
56 inner: Inner,
57 progress: Progress,
58 }
59}
60
61impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for WithProgress<R> {
62 fn poll_read(
63 self: Pin<&mut Self>,
64 cx: &mut Context<'_>,
65 buf: &mut ReadBuf<'_>,
66 ) -> Poll<io::Result<()>> {
67 let prev_len = buf.filled().len() as u64;
68 let this = self.project();
69 if let Poll::Ready(e) = this.inner.poll_read(cx, buf) {
70 this.progress.inc(buf.filled().len() as u64 - prev_len);
71 Poll::Ready(e)
72 } else {
73 Poll::Pending
74 }
75 }
76}
77
78impl<S> WithProgress<S> {
79 pub fn wrap_sync_read_with_callback(
80 message: &str,
81 read: S,
82 total_items: u64,
83 callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
84 ) -> WithProgress<S> {
85 WithProgress {
86 inner: read,
87 progress: Progress::new(message)
88 .with_callback(callback)
89 .with_total(total_items),
90 }
91 }
92
93 pub fn bytes(mut self) -> Self {
94 self.progress.item_type = ItemType::Bytes;
95 self
96 }
97}
98
99#[derive(Clone, Educe)]
100#[educe(Debug)]
101pub struct Progress {
102 completed_items: u64,
103 total_items: Option<u64>,
104 last_logged_items: u64,
105 start: Instant,
106 last_logged: Instant,
107 message: String,
108 item_type: ItemType,
109 #[educe(Debug(ignore))]
110 callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
111}
112
/// Unit in which progress counters are rendered.
#[derive(Clone, Copy, Debug)]
enum ItemType {
    /// Counters are byte sizes and are rendered in human-readable form.
    Bytes,
    /// Counters are plain item numbers and are rendered as integers.
    Items,
}
118
119impl Progress {
120 fn new(message: &str) -> Self {
121 let now = Instant::now();
122 Self {
123 completed_items: 0,
124 last_logged_items: 0,
125 total_items: None,
126 start: now,
127 last_logged: now,
128 message: message.into(),
129 item_type: ItemType::Items,
130 callback: None,
131 }
132 }
133
134 fn with_callback(mut self, callback: Option<Arc<dyn Fn(String) + Sync + Send>>) -> Self {
135 self.callback = callback;
136 self
137 }
138
139 fn with_total(mut self, total: u64) -> Self {
140 self.total_items = Some(total);
141 self
142 }
143
144 fn inc(&mut self, value: u64) {
145 self.completed_items += value;
146
147 self.emit_log_if_required();
148 }
149
150 #[cfg(test)]
151 fn set(&mut self, value: u64) {
152 self.completed_items = value;
153
154 self.emit_log_if_required();
155 }
156
157 fn msg(&self, now: Instant) -> String {
164 let message = &self.message;
165 let elapsed_secs = (now - self.start).as_secs_f64();
166 let elapsed_duration = format_duration(Duration::from_secs(elapsed_secs as u64));
167 let seconds_since_last_msg = (now - self.last_logged).as_secs_f64().max(0.1);
169
170 let at = match self.item_type {
171 ItemType::Bytes => self.completed_items.human_count_bytes().to_string(),
172 ItemType::Items => self.completed_items.to_string(),
173 };
174
175 let total = if let Some(total) = self.total_items {
176 let mut output = String::new();
177 if total > 0 {
178 output += " / ";
179 output += &match self.item_type {
180 ItemType::Bytes => total.human_count_bytes().to_string(),
181 ItemType::Items => total.to_string(),
182 };
183 output += &format!(", {}%", self.completed_items * 100 / total);
184 }
185 output
186 } else {
187 String::new()
188 };
189
190 let diff = (self.completed_items - self.last_logged_items) as f64 / seconds_since_last_msg;
191 let speed = match self.item_type {
192 ItemType::Bytes => format!("{}/s", diff.human_count_bytes()),
193 ItemType::Items => format!("{diff:.0} items/s"),
194 };
195
196 format!("{message} {at}{total}, {speed}, elapsed time: {elapsed_duration}")
197 }
198
199 fn emit_log_if_required(&mut self) {
200 let now = Instant::now();
201 if (now - self.last_logged) > UPDATE_FREQUENCY {
202 let msg = self.msg(now);
203 if let Some(cb) = self.callback.as_ref() {
204 cb(msg.clone());
205 }
206
207 tracing::info!(
208 target: "forest::progress",
209 "{}",
210 msg
211 );
212 self.last_logged = now;
213 self.last_logged_items = self.completed_items;
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    const MIB: u64 = 1024 * 1024;
    const GIB: u64 = 1024 * MIB;

    /// Messages for a byte-counting tracker with a 1 GiB total.
    #[test]
    fn test_progress_msg_bytes() {
        let mut progress = Progress::new("test");
        let start = progress.start;
        progress.item_type = ItemType::Bytes;
        progress.total_items = Some(GIB);

        // Everything done; 512 MiB were completed over the last second.
        progress.set(GIB);
        progress.last_logged_items = GIB / 2;
        assert_eq!(
            progress.msg(start + Duration::from_secs(1)),
            "test 1 GiB / 1 GiB, 100%, 512 MiB/s, elapsed time: 1s"
        );

        // Half done; 384 MiB were completed over 125 seconds.
        progress.set(GIB / 2);
        progress.last_logged_items = 128 * MIB;
        assert_eq!(
            progress.msg(start + Duration::from_secs(125)),
            "test 512 MiB / 1 GiB, 50%, 3.1 MiB/s, elapsed time: 2m 5s"
        );

        // A tenth done; the percentage is truncated, not rounded.
        progress.set(GIB / 10);
        progress.last_logged_items = MIB;
        assert_eq!(
            progress.msg(start + Duration::from_secs(10)),
            "test 102.4 MiB / 1 GiB, 9%, 10.1 MiB/s, elapsed time: 10s"
        );
    }

    /// Messages for an item-counting tracker with a total of 1024 items.
    #[test]
    fn test_progress_msg_items() {
        let item_total: u64 = 1024;

        let mut progress = Progress::new("test");
        let start = progress.start;
        progress.item_type = ItemType::Items;
        progress.total_items = Some(item_total);

        // Everything done; 512 items were completed over the last second.
        progress.set(item_total);
        progress.last_logged_items = item_total / 2;
        assert_eq!(
            progress.msg(start + Duration::from_secs(1)),
            "test 1024 / 1024, 100%, 512 items/s, elapsed time: 1s"
        );

        // Half done; 171 items were completed over 125 seconds.
        progress.set(item_total / 2);
        progress.last_logged_items = item_total / 3;
        assert_eq!(
            progress.msg(start + Duration::from_secs(125)),
            "test 512 / 1024, 50%, 1 items/s, elapsed time: 2m 5s"
        );

        // A tenth done, nothing logged before.
        progress.set(item_total / 10);
        progress.last_logged_items = 0;
        assert_eq!(
            progress.msg(start + Duration::from_secs(10)),
            "test 102 / 1024, 9%, 10 items/s, elapsed time: 10s"
        );
    }
}