forest/utils/io/
progress_log.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! It can often take time to perform some operations in Forest and we would like to have a way for logging progress.
5//!
6//! Previously we used progress bars thanks to the [`indicatif`](https://crates.io/crates/indicatif) library but we had a few issues with them:
7//! - They behaved poorly together with regular logging
8//! - They were too verbose and printed even for very small tasks (less than 5 seconds)
9//! - They were only used when connected to a TTY and not written in log files
10//!
//! This led us to develop our own logging code.
12//! This module provides two new types for logging progress that are [`WithProgress`] and [`WithProgressRaw`].
//! The main goal of [`WithProgressRaw`] is to keep an API similar to the previous progress-bar one, so that we could remove the [`indicatif`](https://crates.io/crates/indicatif) dependency;
//! gradually, however, we would like to move to something better and use the [`WithProgress`] type.
15//! The [`WithProgress`] type will provide a way to wrap user code while handling logging presentation details.
//! [`WithProgress`] is meant to eventually wrap iterators, streams and read/write types. Right now it only wraps async readers.
17//!
18//! # Example
19//! ```
20//! use tokio_test::block_on;
21//! use tokio::io::AsyncBufReadExt;
22//! use forest::doctest_private::WithProgress;
23//! block_on(async {
24//!     let data: String = "some very big string".into();
25//!     let mut reader = tokio::io::BufReader::new(data.as_bytes());
//!     let len = 0; // Compute the total read length or find a way to estimate it
27//!     // We just need to wrap our reader and use the wrapped version
28//!     let reader_wp = tokio::io::BufReader::new(WithProgress::wrap_sync_read_with_callback("reading", reader, len, None));
29//!     let mut stream = reader_wp.lines();
30//!     while let Some(line) = stream.next_line().await.unwrap() {
31//!         // Do something with the line
32//!     }
33//! })
34//! ```
35//! # Future work
//! - Progressively add and move to new APIs (iterators, streams), and remove deprecated usages of [`WithProgressRaw`]
//! - Add a more accurate ETA, etc.
38
39use human_bytes::human_bytes;
40use humantime::format_duration;
41use std::time::{Duration, Instant};
42
43use educe::Educe;
44use pin_project_lite::pin_project;
45use std::io;
46use std::pin::Pin;
47use std::sync::Arc;
48use std::task::{Context, Poll};
49use tokio::io::ReadBuf;
50
/// Minimum time that must pass between two consecutive progress log lines (five seconds).
const UPDATE_FREQUENCY: Duration = Duration::from_secs(5);
52
pin_project! {
    /// Wraps an `inner` value (e.g. an async reader) and tracks, via [`Progress`], how much
    /// data has passed through it, periodically logging a progress line.
    #[derive(Debug, Clone)]
    pub struct WithProgress<Inner> {
        // The wrapped value; structurally pinned so `poll_*` calls can be forwarded to it.
        #[pin]
        inner: Inner,
        // Counter and logging state updated as data flows through `inner`.
        progress: Progress,
    }
}
61
62impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for WithProgress<R> {
63    fn poll_read(
64        self: Pin<&mut Self>,
65        cx: &mut Context<'_>,
66        buf: &mut ReadBuf<'_>,
67    ) -> Poll<io::Result<()>> {
68        let prev_len = buf.filled().len() as u64;
69        let this = self.project();
70        if let Poll::Ready(e) = this.inner.poll_read(cx, buf) {
71            this.progress.inc(buf.filled().len() as u64 - prev_len);
72            Poll::Ready(e)
73        } else {
74            Poll::Pending
75        }
76    }
77}
78
79impl<S> WithProgress<S> {
80    pub fn wrap_sync_read_with_callback(
81        message: &str,
82        read: S,
83        total_items: u64,
84        callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
85    ) -> WithProgress<S> {
86        WithProgress {
87            inner: read,
88            progress: Progress::new(message)
89                .with_callback(callback)
90                .with_total(total_items),
91        }
92    }
93
94    pub fn bytes(mut self) -> Self {
95        self.progress.item_type = ItemType::Bytes;
96        self
97    }
98}
99
/// Counts completed items (or bytes) and periodically logs a human-readable progress line.
///
/// Lines are rate-limited by `UPDATE_FREQUENCY`. Each one reports the amount completed,
/// the total and percentage when a non-zero total is known, the rate since the previous
/// line, and the elapsed time.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct Progress {
    /// Number of items (or bytes) processed so far.
    completed_items: u64,
    /// Expected total; `None` or `Some(0)` means no total/percentage is printed.
    total_items: Option<u64>,
    /// Value of `completed_items` when the last line was logged; baseline for the rate.
    last_logged_items: u64,
    /// When tracking started; used for the elapsed-time part of the line.
    start: Instant,
    /// When the last line was logged; used for rate-limiting and the rate computation.
    last_logged: Instant,
    /// Label printed at the start of every progress line.
    message: String,
    /// Whether amounts are formatted as bytes or as plain item counts.
    item_type: ItemType,
    /// Optional sink receiving a copy of every emitted line (excluded from `Debug`).
    #[educe(Debug(ignore))]
    callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
}
113
/// Selects how `Progress` formats amounts and rates.
#[derive(Debug, Clone, Copy)]
enum ItemType {
    /// Plain counts, e.g. `12 / 1200` and `2 items/s`.
    Items,
    /// Byte sizes, e.g. `12.4 MiB / 1.2 GiB` and `1.5 MiB/s`.
    Bytes,
}
119
120impl Progress {
121    fn new(message: &str) -> Self {
122        let now = Instant::now();
123        Self {
124            completed_items: 0,
125            last_logged_items: 0,
126            total_items: None,
127            start: now,
128            last_logged: now,
129            message: message.into(),
130            item_type: ItemType::Items,
131            callback: None,
132        }
133    }
134
135    fn with_callback(mut self, callback: Option<Arc<dyn Fn(String) + Sync + Send>>) -> Self {
136        self.callback = callback;
137        self
138    }
139
140    fn with_total(mut self, total: u64) -> Self {
141        self.total_items = Some(total);
142        self
143    }
144
145    fn inc(&mut self, value: u64) {
146        self.completed_items += value;
147
148        self.emit_log_if_required();
149    }
150
151    #[cfg(test)]
152    fn set(&mut self, value: u64) {
153        self.completed_items = value;
154
155        self.emit_log_if_required();
156    }
157
158    // Example output:
159    //
160    // Bytes, with total: 12.4 MiB / 1.2 GiB, 1%, 1.5 MiB/s, elapsed time: 8m 12s
161    // Bytes, without total: 12.4 MiB, 1.5 MiB/s, elapsed time: 8m 12s
162    // Items, with total: 12 / 1200, 1%, 1.5 items/s, elapsed time: 8m 12s
163    // Items, without total: 12, 1.5 items/s, elapsed time: 8m 12s
164    fn msg(&self, now: Instant) -> String {
165        let message = &self.message;
166        let elapsed_secs = (now - self.start).as_secs_f64();
167        let elapsed_duration = format_duration(Duration::from_secs(elapsed_secs as u64));
168        // limit minimum duration to 0.1s to avoid inifinities.
169        let seconds_since_last_msg = (now - self.last_logged).as_secs_f64().max(0.1);
170
171        let at = match self.item_type {
172            ItemType::Bytes => human_bytes(self.completed_items as f64),
173            ItemType::Items => self.completed_items.to_string(),
174        };
175
176        let total = if let Some(total) = self.total_items {
177            let mut output = String::new();
178            if total > 0 {
179                output += " / ";
180                output += &match self.item_type {
181                    ItemType::Bytes => human_bytes(total as f64),
182                    ItemType::Items => total.to_string(),
183                };
184                output += &format!(", {}%", self.completed_items * 100 / total);
185            }
186            output
187        } else {
188            String::new()
189        };
190
191        let diff = (self.completed_items - self.last_logged_items) as f64 / seconds_since_last_msg;
192        let speed = match self.item_type {
193            ItemType::Bytes => format!("{}/s", human_bytes(diff)),
194            ItemType::Items => format!("{diff:.0} items/s"),
195        };
196
197        format!("{message} {at}{total}, {speed}, elapsed time: {elapsed_duration}")
198    }
199
200    fn emit_log_if_required(&mut self) {
201        let now = Instant::now();
202        if (now - self.last_logged) > UPDATE_FREQUENCY {
203            let msg = self.msg(now);
204            if let Some(cb) = self.callback.as_ref() {
205                cb(msg.clone());
206            }
207
208            tracing::info!(
209                target: "forest::progress",
210                "{}",
211                msg
212            );
213            self.last_logged = now;
214            self.last_logged_items = self.completed_items;
215        }
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    // Checks `Progress::msg` with byte formatting. `now` is pinned to `progress.start`,
    // which `new` also uses for `last_logged`. So unless `set` happens to emit a log (only
    // after `UPDATE_FREQUENCY`), the offset passed to `msg` is both the elapsed time and
    // the interval used for the rate.
    #[test]
    fn test_progress_msg_bytes() {
        let mut progress = Progress::new("test");
        let now = progress.start;
        progress.item_type = ItemType::Bytes;
        progress.total_items = Some(1024 * 1024 * 1024);
        progress.set(1024 * 1024 * 1024);
        progress.last_logged_items = 1024 * 1024 * 1024 / 2;
        // Going from 512MiB to 1GiB (a 512MiB delta) in 1s should show 512MiB/s
        assert_eq!(
            progress.msg(now + Duration::from_secs(1)),
            "test 1 GiB / 1 GiB, 100%, 512 MiB/s, elapsed time: 1s"
        );

        progress.set(1024 * 1024 * 1024 / 2);
        progress.last_logged_items = 1024 * 1024 * 128;
        // Going from 128MiB to 512MiB in 125s should show (512MiB-128MiB)/125s = ~3.1 MiB/s
        assert_eq!(
            progress.msg(now + Duration::from_secs(125)),
            "test 512 MiB / 1 GiB, 50%, 3.1 MiB/s, elapsed time: 2m 5s"
        );

        progress.set(1024 * 1024 * 1024 / 10);
        progress.last_logged_items = 1024 * 1024;
        // Going from 1MiB to 102.4MiB in 10s should show (102.4MiB-1MiB)/10s = ~10.1 MiB/s
        assert_eq!(
            progress.msg(now + Duration::from_secs(10)),
            "test 102.4 MiB / 1 GiB, 9%, 10.1 MiB/s, elapsed time: 10s"
        );
    }

    // Same scenarios with plain item counts; the rate is rounded to whole items/s.
    #[test]
    fn test_progress_msg_items() {
        let mut progress = Progress::new("test");
        let now = progress.start;
        progress.item_type = ItemType::Items;
        progress.total_items = Some(1024);
        progress.set(1024);
        progress.last_logged_items = 1024 / 2;
        // (1024 - 512) items in 1s => 512 items/s
        assert_eq!(
            progress.msg(now + Duration::from_secs(1)),
            "test 1024 / 1024, 100%, 512 items/s, elapsed time: 1s"
        );

        progress.set(1024 / 2);
        progress.last_logged_items = 1024 / 3;
        // (512 - 341) items in 125s = ~1.37 items/s, printed with no decimals => 1 items/s
        assert_eq!(
            progress.msg(now + Duration::from_secs(125)),
            "test 512 / 1024, 50%, 1 items/s, elapsed time: 2m 5s"
        );

        progress.set(1024 / 10);
        progress.last_logged_items = 0;
        // 102 items in 10s = 10.2 items/s => 10 items/s; 102 * 100 / 1024 = 9%
        assert_eq!(
            progress.msg(now + Duration::from_secs(10)),
            "test 102 / 1024, 9%, 10 items/s, elapsed time: 10s"
        );
    }
}
279        );
280    }
281}