forest/utils/io/
progress_log.rs1use human_bytes::human_bytes;
40use humantime::format_duration;
41use std::time::{Duration, Instant};
42
43use educe::Educe;
44use pin_project_lite::pin_project;
45use std::io;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use tokio::io::ReadBuf;
50
51const UPDATE_FREQUENCY: Duration = Duration::from_millis(5000);
52
53pin_project! {
54 #[derive(Debug, Clone)]
55 pub struct WithProgress<Inner> {
56 #[pin]
57 inner: Inner,
58 progress: Progress,
59 }
60}
61
62impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for WithProgress<R> {
63 fn poll_read(
64 self: Pin<&mut Self>,
65 cx: &mut Context<'_>,
66 buf: &mut ReadBuf<'_>,
67 ) -> Poll<io::Result<()>> {
68 let prev_len = buf.filled().len() as u64;
69 let this = self.project();
70 if let Poll::Ready(e) = this.inner.poll_read(cx, buf) {
71 this.progress.inc(buf.filled().len() as u64 - prev_len);
72 Poll::Ready(e)
73 } else {
74 Poll::Pending
75 }
76 }
77}
78
79impl<S> WithProgress<S> {
80 pub fn wrap_sync_read_with_callback(
81 message: &str,
82 read: S,
83 total_items: u64,
84 callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
85 ) -> WithProgress<S> {
86 WithProgress {
87 inner: read,
88 progress: Progress::new(message)
89 .with_callback(callback)
90 .with_total(total_items),
91 }
92 }
93
94 pub fn bytes(mut self) -> Self {
95 self.progress.item_type = ItemType::Bytes;
96 self
97 }
98}
99
100#[derive(Clone, Educe)]
101#[educe(Debug)]
102pub struct Progress {
103 completed_items: u64,
104 total_items: Option<u64>,
105 last_logged_items: u64,
106 start: Instant,
107 last_logged: Instant,
108 message: String,
109 item_type: ItemType,
110 #[educe(Debug(ignore))]
111 callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
112}
113
114#[derive(Debug, Clone, Copy)]
115enum ItemType {
116 Bytes,
117 Items,
118}
119
120impl Progress {
121 fn new(message: &str) -> Self {
122 let now = Instant::now();
123 Self {
124 completed_items: 0,
125 last_logged_items: 0,
126 total_items: None,
127 start: now,
128 last_logged: now,
129 message: message.into(),
130 item_type: ItemType::Items,
131 callback: None,
132 }
133 }
134
135 fn with_callback(mut self, callback: Option<Arc<dyn Fn(String) + Sync + Send>>) -> Self {
136 self.callback = callback;
137 self
138 }
139
140 fn with_total(mut self, total: u64) -> Self {
141 self.total_items = Some(total);
142 self
143 }
144
145 fn inc(&mut self, value: u64) {
146 self.completed_items += value;
147
148 self.emit_log_if_required();
149 }
150
151 #[cfg(test)]
152 fn set(&mut self, value: u64) {
153 self.completed_items = value;
154
155 self.emit_log_if_required();
156 }
157
158 fn msg(&self, now: Instant) -> String {
165 let message = &self.message;
166 let elapsed_secs = (now - self.start).as_secs_f64();
167 let elapsed_duration = format_duration(Duration::from_secs(elapsed_secs as u64));
168 let seconds_since_last_msg = (now - self.last_logged).as_secs_f64().max(0.1);
170
171 let at = match self.item_type {
172 ItemType::Bytes => human_bytes(self.completed_items as f64),
173 ItemType::Items => self.completed_items.to_string(),
174 };
175
176 let total = if let Some(total) = self.total_items {
177 let mut output = String::new();
178 if total > 0 {
179 output += " / ";
180 output += &match self.item_type {
181 ItemType::Bytes => human_bytes(total as f64),
182 ItemType::Items => total.to_string(),
183 };
184 output += &format!(", {}%", self.completed_items * 100 / total);
185 }
186 output
187 } else {
188 String::new()
189 };
190
191 let diff = (self.completed_items - self.last_logged_items) as f64 / seconds_since_last_msg;
192 let speed = match self.item_type {
193 ItemType::Bytes => format!("{}/s", human_bytes(diff)),
194 ItemType::Items => format!("{diff:.0} items/s"),
195 };
196
197 format!("{message} {at}{total}, {speed}, elapsed time: {elapsed_duration}")
198 }
199
200 fn emit_log_if_required(&mut self) {
201 let now = Instant::now();
202 if (now - self.last_logged) > UPDATE_FREQUENCY {
203 let msg = self.msg(now);
204 if let Some(cb) = self.callback.as_ref() {
205 cb(msg.clone());
206 }
207
208 tracing::info!(
209 target: "forest::progress",
210 "{}",
211 msg
212 );
213 self.last_logged = now;
214 self.last_logged_items = self.completed_items;
215 }
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222
223 #[test]
224 fn test_progress_msg_bytes() {
225 let mut progress = Progress::new("test");
226 let now = progress.start;
227 progress.item_type = ItemType::Bytes;
228 progress.total_items = Some(1024 * 1024 * 1024);
229 progress.set(1024 * 1024 * 1024);
230 progress.last_logged_items = 1024 * 1024 * 1024 / 2;
231 assert_eq!(
233 progress.msg(now + Duration::from_secs(1)),
234 "test 1 GiB / 1 GiB, 100%, 512 MiB/s, elapsed time: 1s"
235 );
236
237 progress.set(1024 * 1024 * 1024 / 2);
238 progress.last_logged_items = 1024 * 1024 * 128;
239 assert_eq!(
241 progress.msg(now + Duration::from_secs(125)),
242 "test 512 MiB / 1 GiB, 50%, 3.1 MiB/s, elapsed time: 2m 5s"
243 );
244
245 progress.set(1024 * 1024 * 1024 / 10);
246 progress.last_logged_items = 1024 * 1024;
247 assert_eq!(
249 progress.msg(now + Duration::from_secs(10)),
250 "test 102.4 MiB / 1 GiB, 9%, 10.1 MiB/s, elapsed time: 10s"
251 );
252 }
253
254 #[test]
255 fn test_progress_msg_items() {
256 let mut progress = Progress::new("test");
257 let now = progress.start;
258 progress.item_type = ItemType::Items;
259 progress.total_items = Some(1024);
260 progress.set(1024);
261 progress.last_logged_items = 1024 / 2;
262 assert_eq!(
263 progress.msg(now + Duration::from_secs(1)),
264 "test 1024 / 1024, 100%, 512 items/s, elapsed time: 1s"
265 );
266
267 progress.set(1024 / 2);
268 progress.last_logged_items = 1024 / 3;
269 assert_eq!(
270 progress.msg(now + Duration::from_secs(125)),
271 "test 512 / 1024, 50%, 1 items/s, elapsed time: 2m 5s"
272 );
273
274 progress.set(1024 / 10);
275 progress.last_logged_items = 0;
276 assert_eq!(
277 progress.msg(now + Duration::from_secs(10)),
278 "test 102 / 1024, 9%, 10 items/s, elapsed time: 10s"
279 );
280 }
281}