cloud_copy/
cli.rs

1//! Utility code for CLI implementations.
2
3use std::collections::HashMap;
4use std::fmt;
5
6use chrono::TimeDelta;
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::{self};
9use tokio_util::sync::CancellationToken;
10use tracing::Span;
11use tracing::warn_span;
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13use tracing_indicatif::style::ProgressStyle;
14
15use crate::TransferEvent;
16
/// Adds human-readable formatting to [`TimeDelta`].
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub trait TimeDeltaExt {
    /// Gets a [`fmt::Display`] implementation that renders the value in
    /// English as a combination of days, hours, minutes, and seconds.
    fn english(&self) -> impl fmt::Display;
}
24
25impl TimeDeltaExt for TimeDelta {
26    fn english(&self) -> impl fmt::Display {
27        struct Display(TimeDelta);
28
29        impl fmt::Display for Display {
30            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31                if self.0.num_seconds() == 0 {
32                    return write!(f, "0 seconds");
33                }
34
35                let days = self.0.num_days();
36                let hours = self.0.num_hours() - (days * 24);
37                let minutes = self.0.num_minutes() - (days * 24 * 60) - (hours * 60);
38                let seconds = self.0.num_seconds()
39                    - -(days * 24 * 60 * 60)
40                    - (hours * 60 * 60)
41                    - (minutes * 60);
42
43                if days > 0 {
44                    write!(f, "{days} day{s}", s = if days == 1 { "" } else { "s" })?;
45                }
46
47                if hours > 0 {
48                    if days > 0 {
49                        write!(f, ", ")?;
50                    }
51
52                    write!(f, "{hours} hour{s}", s = if hours == 1 { "" } else { "s" })?;
53                }
54
55                if minutes > 0 {
56                    if days > 0 || hours > 0 {
57                        write!(f, ", ")?;
58                    }
59
60                    write!(
61                        f,
62                        "{minutes} minute{s}",
63                        s = if minutes == 1 { "" } else { "s" }
64                    )?;
65                }
66
67                if seconds > 0 {
68                    if days > 0 || hours > 0 || minutes > 0 {
69                        write!(f, ", ")?;
70                    }
71
72                    write!(
73                        f,
74                        "{seconds} second{s}",
75                        s = if seconds == 1 { "" } else { "s" }
76                    )?;
77                }
78
79                Ok(())
80            }
81        }
82
83        Display(*self)
84    }
85}
86
/// Represents statistics from transferring files.
///
/// The default value has every counter set to zero.
// Deliberately not gated behind its own `cfg(feature = "cli")`: `handle_events`
// in this module returns this type without such a gate, so gating only the
// struct would break the build whenever the feature is disabled.
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferStats {
    /// The number of files that were transferred.
    pub files: usize,
    /// The total number of bytes transferred for all files.
    pub bytes: u64,
}
97
98/// Handles events that may occur during a copy operation.
99///
100/// This is responsible for showing and updating progress bars for files
101/// being transferred.
102///
103/// Used from CLI implementations.
104///
105/// Returns `None` if the event stream lagged and reliable statistics aren't
106/// available.
107///
108/// Returns `Some` if the event stream did not lag and reliable statistics are
109/// available.
110#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
111pub async fn handle_events(
112    mut events: broadcast::Receiver<TransferEvent>,
113    cancel: CancellationToken,
114) -> Option<TransferStats> {
115    struct BlockTransferState {
116        /// The number of bytes that were transferred for the block.
117        transferred: u64,
118    }
119
120    struct TransferState {
121        /// The progress bar to display for a transfer.
122        bar: Span,
123        /// The total number of bytes transferred.
124        transferred: u64,
125        /// Block transfer state.
126        block_transfers: HashMap<u64, BlockTransferState>,
127    }
128
129    let mut indeterminate = None;
130    let mut transfers = HashMap::new();
131    let mut stats = TransferStats::default();
132
133    loop {
134        tokio::select! {
135            _ = cancel.cancelled() => break,
136            event = events.recv() => match event {
137                Ok(event) if indeterminate.is_none() => match event {
138                    TransferEvent::TransferStarted { id, path, size, .. } => {
139                        let bar = warn_span!("progress");
140
141                        let style = match size {
142                            Some(size) => {
143                                bar.pb_set_length(size);
144                                ProgressStyle::with_template(
145                                    "[{elapsed_precise:.cyan/blue}] {bar:40.cyan/blue} \
146                                    {bytes:.cyan/blue} / {total_bytes:.cyan/blue} \
147                                    ({bytes_per_sec:.cyan/blue}) [ETA {eta_precise:.cyan/blue}]: \
148                                    {msg}",
149                                )
150                                .unwrap()
151                            }
152                            None => ProgressStyle::with_template(
153                                "[{elapsed_precise:.cyan/blue}] {spinner:.cyan/blue} \
154                                {bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}): {msg}",
155                            )
156                            .unwrap(),
157                        };
158
159                        bar.pb_set_style(&style);
160                        bar.pb_set_message(path.to_str().unwrap_or("<path not UTF-8>"));
161                        bar.pb_start();
162                        transfers.insert(
163                            id,
164                            TransferState {
165                                bar,
166                                transferred: 0,
167                                block_transfers: HashMap::new(),
168                            },
169                        );
170                    }
171                    TransferEvent::BlockStarted { id, block, .. } => {
172                        if let Some(transfer) = transfers.get_mut(&id) {
173                            transfer
174                                .block_transfers
175                                .insert(block, BlockTransferState { transferred: 0 });
176                        }
177                    }
178                    TransferEvent::BlockProgress {
179                        id,
180                        block,
181                        transferred,
182                    } => {
183                        if let Some(transfer) = transfers.get_mut(&id)
184                            && let Some(block) = transfer.block_transfers.get_mut(&block)
185                        {
186                            transfer.transferred += transferred - block.transferred;
187                            block.transferred = transferred;
188                            transfer.bar.pb_set_position(transfer.transferred);
189                        }
190                    }
191                    TransferEvent::BlockCompleted { id, block, failed } => {
192                        if let Some(transfer) = transfers.get_mut(&id)
193                            && let Some(block) = transfer.block_transfers.get_mut(&block)
194                        {
195                            if failed {
196                                transfer.transferred -= block.transferred;
197                            }
198
199                            transfer.bar.pb_set_position(transfer.transferred);
200                        }
201                    }
202                    TransferEvent::TransferCompleted { id, failed } => {
203                        if let Some(transfer) = transfers.remove(&id)
204                            && !failed
205                        {
206                            stats.files += 1;
207                            stats.bytes += transfer.transferred;
208                        }
209                    }
210                },
211                Ok(_) => continue,
212                Err(RecvError::Closed) => break,
213                Err(RecvError::Lagged(_)) => {
214                    // Clear the state to remove existing progress bars
215                    transfers.clear();
216
217                    // Show a single spinner progress bar for the remainder of the stream
218                    let bar = warn_span!("progress");
219                    bar.pb_set_style(
220                        &ProgressStyle::with_template(
221                            "{spinner:.cyan/blue} transfer progress is unavailable due to missed events",
222                        )
223                        .unwrap(),
224                    );
225                    bar.pb_start();
226
227                    indeterminate = Some(bar);
228                }
229            }
230        }
231    }
232
233    if indeterminate.is_none() {
234        Some(stats)
235    } else {
236        None
237    }
238}