cloud_copy/
cli.rs

1//! Utility code for CLI implementations.
2
3use std::collections::HashMap;
4use std::fmt;
5
6use chrono::TimeDelta;
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::{self};
9use tokio_util::sync::CancellationToken;
10use tracing::Span;
11use tracing::warn;
12use tracing::warn_span;
13use tracing_indicatif::span_ext::IndicatifSpanExt;
14use tracing_indicatif::style::ProgressStyle;
15
16use crate::TransferEvent;
17
/// Adds human-readable formatting helpers to [`TimeDelta`].
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub trait TimeDeltaExt {
    /// Produces a [`fmt::Display`] adapter that renders the value in English
    /// as days, hours, minutes, and seconds.
    fn english(&self) -> impl fmt::Display;
}
25
26impl TimeDeltaExt for TimeDelta {
27    fn english(&self) -> impl fmt::Display {
28        struct Display(TimeDelta);
29
30        impl fmt::Display for Display {
31            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32                if self.0.num_seconds() == 0 {
33                    return write!(f, "0 seconds");
34                }
35
36                let days = self.0.num_days();
37                let hours = self.0.num_hours() - (days * 24);
38                let minutes = self.0.num_minutes() - (days * 24 * 60) - (hours * 60);
39                let seconds = self.0.num_seconds()
40                    - -(days * 24 * 60 * 60)
41                    - (hours * 60 * 60)
42                    - (minutes * 60);
43
44                if days > 0 {
45                    write!(f, "{days} day{s}", s = if days == 1 { "" } else { "s" })?;
46                }
47
48                if hours > 0 {
49                    if days > 0 {
50                        write!(f, ", ")?;
51                    }
52
53                    write!(f, "{hours} hour{s}", s = if hours == 1 { "" } else { "s" })?;
54                }
55
56                if minutes > 0 {
57                    if days > 0 || hours > 0 {
58                        write!(f, ", ")?;
59                    }
60
61                    write!(
62                        f,
63                        "{minutes} minute{s}",
64                        s = if minutes == 1 { "" } else { "s" }
65                    )?;
66                }
67
68                if seconds > 0 {
69                    if days > 0 || hours > 0 || minutes > 0 {
70                        write!(f, ", ")?;
71                    }
72
73                    write!(
74                        f,
75                        "{seconds} second{s}",
76                        s = if seconds == 1 { "" } else { "s" }
77                    )?;
78                }
79
80                Ok(())
81            }
82        }
83
84        Display(*self)
85    }
86}
87
/// Represents statistics from transferring files.
///
/// The default value has zero files and zero bytes.
// NOTE: no hard `#[cfg(feature = "cli")]` gate here; `handle_events` in this
// module returns this type unconditionally, so the two must be compiled
// together.
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferStats {
    /// The number of files that were transferred.
    pub files: usize,
    /// The total number of bytes transferred for all files.
    pub bytes: u64,
}
98
99/// Handles events that may occur during a copy operation.
100///
101/// This is responsible for showing and updating progress bars for files
102/// being transferred.
103///
104/// Used from CLI implementations.
105#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
106pub async fn handle_events(
107    mut events: broadcast::Receiver<TransferEvent>,
108    cancel: CancellationToken,
109) -> TransferStats {
110    struct BlockTransferState {
111        /// The number of bytes that were transferred for the block.
112        transferred: u64,
113    }
114
115    struct TransferState {
116        /// The progress bar to display for a transfer.
117        bar: Span,
118        /// The total number of bytes transferred.
119        transferred: u64,
120        /// Block transfer state.
121        block_transfers: HashMap<u64, BlockTransferState>,
122    }
123
124    let mut transfers = HashMap::new();
125    let mut warned = false;
126    let mut stats = TransferStats::default();
127
128    loop {
129        tokio::select! {
130            _ = cancel.cancelled() => break,
131            event = events.recv() => match event {
132                Ok(TransferEvent::TransferStarted { id, path, size, .. }) => {
133                    let bar = warn_span!("progress");
134
135                    let style = match size {
136                        Some(size) => {
137                            bar.pb_set_length(size);
138                            ProgressStyle::with_template(
139                                "[{elapsed_precise:.cyan/blue}] {bar:40.cyan/blue} {bytes:.cyan/blue} / {total_bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}) [ETA {eta_precise:.cyan/blue}]: {msg}",
140                            ).unwrap()
141                        }
142                        None => {
143                            ProgressStyle::with_template(
144                                "[{elapsed_precise:.cyan/blue}] {spinner:.cyan/blue} {bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}): {msg}",
145                            ).unwrap()
146                        }
147                    };
148
149                    bar.pb_set_style(&style);
150                    bar.pb_set_message(path.to_str().unwrap_or("<path not UTF-8>"));
151                    bar.pb_start();
152                    transfers.insert(id, TransferState { bar, transferred: 0, block_transfers: HashMap::new() });
153                }
154                Ok(TransferEvent::BlockStarted { id, block, .. }) => {
155                    if let Some(transfer) = transfers.get_mut(&id) {
156                        transfer.block_transfers.insert(block, BlockTransferState { transferred: 0 });
157                    }
158                }
159                Ok(TransferEvent::BlockProgress { id, block, transferred }) => {
160                    if let Some(transfer) = transfers.get_mut(&id)
161                        && let Some(block) = transfer.block_transfers.get_mut(&block) {
162                            transfer.transferred += transferred - block.transferred;
163                            block.transferred = transferred;
164                            transfer.bar.pb_set_position(transfer.transferred);
165                        }
166                }
167                Ok(TransferEvent::BlockCompleted { id, block, failed }) => {
168                    if let Some(transfer) = transfers.get_mut(&id)
169                        && let Some(block) = transfer.block_transfers.get_mut(&block) {
170                            if failed {
171                                transfer.transferred -= block.transferred;
172                            }
173
174                            transfer.bar.pb_set_position(transfer.transferred);
175                        }
176                }
177                Ok(TransferEvent::TransferCompleted { id, failed }) => {
178                    if let Some(transfer) = transfers.remove(&id) && !failed {
179                        stats.files += 1;
180                        stats.bytes += transfer.transferred;
181                    }
182                }
183                Err(RecvError::Closed) => break,
184                Err(RecvError::Lagged(_)) => {
185                    if !warned {
186                        warn!("event stream is lagging: progress may be incorrect");
187                        warned = true;
188                    }
189                }
190            }
191        }
192    }
193
194    stats
195}