Skip to main content

forest/utils/io/
progress_log.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! It can often take time to perform some operations in Forest and we would like to have a way for logging progress.
5//!
6//! Previously we used progress bars thanks to the [`indicatif`](https://crates.io/crates/indicatif) library but we had a few issues with them:
7//! - They behaved poorly together with regular logging
8//! - They were too verbose and printed even for very small tasks (less than 5 seconds)
9//! - They were only used when connected to a TTY and not written in log files
10//!
//! This led us to develop our own logging code.
12//! This module provides two new types for logging progress that are [`WithProgress`] and [`WithProgressRaw`].
//! The main goal of [`WithProgressRaw`] is to keep an API similar to the previous progress-bar one so we could remove the [`indicatif`](https://crates.io/crates/indicatif) dependency,
14//! but, gradually, we would like to move to something better and use the [`WithProgress`] type.
15//! The [`WithProgress`] type will provide a way to wrap user code while handling logging presentation details.
16//! [`WithProgress`] is a wrapper that should extend to Iterators, Streams, Read/Write types. Right now it only wraps async reads.
17//!
18//! # Example
19//! ```
20//! use tokio_test::block_on;
21//! use tokio::io::AsyncBufReadExt;
22//! use forest::doctest_private::WithProgress;
23//! block_on(async {
24//!     let data: String = "some very big string".into();
25//!     let mut reader = tokio::io::BufReader::new(data.as_bytes());
//!     let len = 0; // Compute the total read length or find a way to estimate it
27//!     // We just need to wrap our reader and use the wrapped version
28//!     let reader_wp = tokio::io::BufReader::new(WithProgress::wrap_sync_read_with_callback("reading", reader, len, None));
29//!     let mut stream = reader_wp.lines();
30//!     while let Some(line) = stream.next_line().await.unwrap() {
31//!         // Do something with the line
32//!     }
33//! })
34//! ```
35//! # Future work
//! - Add and progressively move to the new API (Iterator, Streams), and remove deprecated usage of [`WithProgressRaw`]
37//! - Add a more accurate ETA etc
38
39use educe::Educe;
40use human_repr::HumanCount as _;
41use humantime::format_duration;
42use pin_project_lite::pin_project;
43use std::io;
44use std::pin::Pin;
45use std::sync::Arc;
46use std::task::{Context, Poll};
47use std::time::{Duration, Instant};
48use tokio::io::ReadBuf;
49
/// Minimum spacing between two progress messages: a new message is only
/// emitted once strictly more than this has elapsed since the previous one.
const UPDATE_FREQUENCY: Duration = Duration::from_secs(5);
51
pin_project! {
    /// Wrapper that forwards to an inner value while logging progress.
    ///
    /// Right now progress is only recorded for async reads (see the
    /// `tokio::io::AsyncRead` implementation below).
    #[derive(Debug, Clone)]
    pub struct WithProgress<Inner> {
        // The wrapped value. It is structurally pinned so `poll_read` can be
        // forwarded to it through the pin projection.
        #[pin]
        inner: Inner,
        // Counters, timing and formatting state for the progress messages.
        progress: Progress,
    }
}
60
61impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for WithProgress<R> {
62    fn poll_read(
63        self: Pin<&mut Self>,
64        cx: &mut Context<'_>,
65        buf: &mut ReadBuf<'_>,
66    ) -> Poll<io::Result<()>> {
67        let prev_len = buf.filled().len() as u64;
68        let this = self.project();
69        if let Poll::Ready(e) = this.inner.poll_read(cx, buf) {
70            this.progress.inc(buf.filled().len() as u64 - prev_len);
71            Poll::Ready(e)
72        } else {
73            Poll::Pending
74        }
75    }
76}
77
78impl<S> WithProgress<S> {
79    pub fn wrap_sync_read_with_callback(
80        message: &str,
81        read: S,
82        total_items: u64,
83        callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
84    ) -> WithProgress<S> {
85        WithProgress {
86            inner: read,
87            progress: Progress::new(message)
88                .with_callback(callback)
89                .with_total(total_items),
90        }
91    }
92
93    pub fn bytes(mut self) -> Self {
94        self.progress.item_type = ItemType::Bytes;
95        self
96    }
97}
98
/// Progress counters plus the state needed to format and emit progress messages.
///
/// Messages go to `tracing` (target `forest::progress`) and, if set, to an
/// extra callback. They are only emitted when more than `UPDATE_FREQUENCY`
/// has passed since the previous message.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct Progress {
    /// Number of items (or bytes) completed so far.
    completed_items: u64,
    /// Expected total. `None` and `Some(0)` are both displayed as "unknown".
    total_items: Option<u64>,
    /// Value of `completed_items` when the last message was emitted; used as
    /// the baseline for the rate.
    last_logged_items: u64,
    /// When tracking started; used for the elapsed-time field.
    start: Instant,
    /// When the last message was emitted (initially the start time).
    last_logged: Instant,
    /// Label printed at the beginning of each message.
    message: String,
    /// Whether counts are rendered as byte sizes or plain item counts.
    item_type: ItemType,
    /// Optional extra receiver for each formatted message. It is excluded from
    /// `Debug` because `dyn Fn` does not implement `Debug`.
    #[educe(Debug(ignore))]
    callback: Option<Arc<dyn Fn(String) + Send + Sync>>,
}
112
/// Unit used to render progress counts and rates.
#[derive(Clone, Copy, Debug)]
enum ItemType {
    /// Byte sizes, e.g. `12.4 MiB` and `1.5 MiB/s`.
    Bytes,
    /// Plain counts, e.g. `12` and `2 items/s`.
    Items,
}
118
119impl Progress {
120    fn new(message: &str) -> Self {
121        let now = Instant::now();
122        Self {
123            completed_items: 0,
124            last_logged_items: 0,
125            total_items: None,
126            start: now,
127            last_logged: now,
128            message: message.into(),
129            item_type: ItemType::Items,
130            callback: None,
131        }
132    }
133
134    fn with_callback(mut self, callback: Option<Arc<dyn Fn(String) + Sync + Send>>) -> Self {
135        self.callback = callback;
136        self
137    }
138
139    fn with_total(mut self, total: u64) -> Self {
140        self.total_items = Some(total);
141        self
142    }
143
144    fn inc(&mut self, value: u64) {
145        self.completed_items += value;
146
147        self.emit_log_if_required();
148    }
149
150    #[cfg(test)]
151    fn set(&mut self, value: u64) {
152        self.completed_items = value;
153
154        self.emit_log_if_required();
155    }
156
157    // Example output:
158    //
159    // Bytes, with total: 12.4 MiB / 1.2 GiB, 1%, 1.5 MiB/s, elapsed time: 8m 12s
160    // Bytes, without total: 12.4 MiB, 1.5 MiB/s, elapsed time: 8m 12s
161    // Items, with total: 12 / 1200, 1%, 1.5 items/s, elapsed time: 8m 12s
162    // Items, without total: 12, 1.5 items/s, elapsed time: 8m 12s
163    fn msg(&self, now: Instant) -> String {
164        let message = &self.message;
165        let elapsed_secs = (now - self.start).as_secs_f64();
166        let elapsed_duration = format_duration(Duration::from_secs(elapsed_secs as u64));
167        // limit minimum duration to 0.1s to avoid inifinities.
168        let seconds_since_last_msg = (now - self.last_logged).as_secs_f64().max(0.1);
169
170        let at = match self.item_type {
171            ItemType::Bytes => self.completed_items.human_count_bytes().to_string(),
172            ItemType::Items => self.completed_items.to_string(),
173        };
174
175        let total = if let Some(total) = self.total_items {
176            let mut output = String::new();
177            if total > 0 {
178                output += " / ";
179                output += &match self.item_type {
180                    ItemType::Bytes => total.human_count_bytes().to_string(),
181                    ItemType::Items => total.to_string(),
182                };
183                output += &format!(", {}%", self.completed_items * 100 / total);
184            }
185            output
186        } else {
187            String::new()
188        };
189
190        let diff = (self.completed_items - self.last_logged_items) as f64 / seconds_since_last_msg;
191        let speed = match self.item_type {
192            ItemType::Bytes => format!("{}/s", diff.human_count_bytes()),
193            ItemType::Items => format!("{diff:.0} items/s"),
194        };
195
196        format!("{message} {at}{total}, {speed}, elapsed time: {elapsed_duration}")
197    }
198
199    fn emit_log_if_required(&mut self) {
200        let now = Instant::now();
201        if (now - self.last_logged) > UPDATE_FREQUENCY {
202            let msg = self.msg(now);
203            if let Some(cb) = self.callback.as_ref() {
204                cb(msg.clone());
205            }
206
207            tracing::info!(
208                target: "forest::progress",
209                "{}",
210                msg
211            );
212            self.last_logged = now;
213            self.last_logged_items = self.completed_items;
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    const MIB: u64 = 1024 * 1024;
    const GIB: u64 = 1024 * MIB;

    /// Creates a `Progress` labelled "test" with the given unit and total,
    /// returned together with its start instant.
    fn tracker(item_type: ItemType, total: u64) -> (Progress, Instant) {
        let mut progress = Progress::new("test");
        let start = progress.start;
        progress.item_type = item_type;
        progress.total_items = Some(total);
        (progress, start)
    }

    /// Sets the completed count, overrides the last-logged count, and renders
    /// the message as if `secs` seconds had passed since `start`.
    fn render(
        progress: &mut Progress,
        start: Instant,
        completed: u64,
        last_logged: u64,
        secs: u64,
    ) -> String {
        progress.set(completed);
        progress.last_logged_items = last_logged;
        progress.msg(start + Duration::from_secs(secs))
    }

    #[test]
    fn test_progress_msg_bytes() {
        let (mut progress, start) = tracker(ItemType::Bytes, GIB);

        // 512 MiB -> 1 GiB in 1s: 512 MiB/s
        assert_eq!(
            render(&mut progress, start, GIB, GIB / 2, 1),
            "test 1 GiB / 1 GiB, 100%, 512 MiB/s, elapsed time: 1s"
        );

        // 128 MiB -> 512 MiB in 125s: 384 MiB / 125s = ~3.1 MiB/s
        assert_eq!(
            render(&mut progress, start, GIB / 2, 128 * MIB, 125),
            "test 512 MiB / 1 GiB, 50%, 3.1 MiB/s, elapsed time: 2m 5s"
        );

        // 1 MiB -> 102.4 MiB in 10s: 101.4 MiB / 10s = ~10.1 MiB/s
        assert_eq!(
            render(&mut progress, start, GIB / 10, MIB, 10),
            "test 102.4 MiB / 1 GiB, 9%, 10.1 MiB/s, elapsed time: 10s"
        );
    }

    #[test]
    fn test_progress_msg_items() {
        let (mut progress, start) = tracker(ItemType::Items, 1024);

        assert_eq!(
            render(&mut progress, start, 1024, 1024 / 2, 1),
            "test 1024 / 1024, 100%, 512 items/s, elapsed time: 1s"
        );

        assert_eq!(
            render(&mut progress, start, 1024 / 2, 1024 / 3, 125),
            "test 512 / 1024, 50%, 1 items/s, elapsed time: 2m 5s"
        );

        assert_eq!(
            render(&mut progress, start, 1024 / 10, 0, 10),
            "test 102 / 1024, 9%, 10 items/s, elapsed time: 10s"
        );
    }
}