1use std::collections::HashMap;
4use std::fmt;
5
6use chrono::TimeDelta;
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::{self};
9use tokio_util::sync::CancellationToken;
10use tracing::Span;
11use tracing::warn;
12use tracing::warn_span;
13use tracing_indicatif::span_ext::IndicatifSpanExt;
14use tracing_indicatif::style::ProgressStyle;
15
16use crate::TransferEvent;
17
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
/// Extension trait for rendering a value as human-readable English text.
///
/// This file implements it for [`TimeDelta`].
pub trait TimeDeltaExt {
    /// Returns a value whose [`fmt::Display`] output is the English
    /// description of `self`.
    fn english(&self) -> impl fmt::Display;
}
25
26impl TimeDeltaExt for TimeDelta {
27 fn english(&self) -> impl fmt::Display {
28 struct Display(TimeDelta);
29
30 impl fmt::Display for Display {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 if self.0.num_seconds() == 0 {
33 return write!(f, "0 seconds");
34 }
35
36 let days = self.0.num_days();
37 let hours = self.0.num_hours() - (days * 24);
38 let minutes = self.0.num_minutes() - (days * 24 * 60) - (hours * 60);
39 let seconds = self.0.num_seconds()
40 - -(days * 24 * 60 * 60)
41 - (hours * 60 * 60)
42 - (minutes * 60);
43
44 if days > 0 {
45 write!(f, "{days} day{s}", s = if days == 1 { "" } else { "s" })?;
46 }
47
48 if hours > 0 {
49 if days > 0 {
50 write!(f, ", ")?;
51 }
52
53 write!(f, "{hours} hour{s}", s = if hours == 1 { "" } else { "s" })?;
54 }
55
56 if minutes > 0 {
57 if days > 0 || hours > 0 {
58 write!(f, ", ")?;
59 }
60
61 write!(
62 f,
63 "{minutes} minute{s}",
64 s = if minutes == 1 { "" } else { "s" }
65 )?;
66 }
67
68 if seconds > 0 {
69 if days > 0 || hours > 0 || minutes > 0 {
70 write!(f, ", ")?;
71 }
72
73 write!(
74 f,
75 "{seconds} second{s}",
76 s = if seconds == 1 { "" } else { "s" }
77 )?;
78 }
79
80 Ok(())
81 }
82 }
83
84 Display(*self)
85 }
86}
87
#[cfg(feature = "cli")]
/// Totals accumulated by [`handle_events`] while it processes an event stream.
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
#[derive(Clone, Copy, Debug, Default)]
pub struct TransferStats {
    /// Number of transfers that completed without being reported as failed.
    pub files: usize,
    /// Sum of the bytes counted for those transfers.
    pub bytes: u64,
}
98
99#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
106pub async fn handle_events(
107 mut events: broadcast::Receiver<TransferEvent>,
108 cancel: CancellationToken,
109) -> TransferStats {
110 struct BlockTransferState {
111 transferred: u64,
113 }
114
115 struct TransferState {
116 bar: Span,
118 transferred: u64,
120 block_transfers: HashMap<u64, BlockTransferState>,
122 }
123
124 let mut transfers = HashMap::new();
125 let mut warned = false;
126 let mut stats = TransferStats::default();
127
128 loop {
129 tokio::select! {
130 _ = cancel.cancelled() => break,
131 event = events.recv() => match event {
132 Ok(TransferEvent::TransferStarted { id, path, size, .. }) => {
133 let bar = warn_span!("progress");
134
135 let style = match size {
136 Some(size) => {
137 bar.pb_set_length(size);
138 ProgressStyle::with_template(
139 "[{elapsed_precise:.cyan/blue}] {bar:40.cyan/blue} {bytes:.cyan/blue} / {total_bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}) [ETA {eta_precise:.cyan/blue}]: {msg}",
140 ).unwrap()
141 }
142 None => {
143 ProgressStyle::with_template(
144 "[{elapsed_precise:.cyan/blue}] {spinner:.cyan/blue} {bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}): {msg}",
145 ).unwrap()
146 }
147 };
148
149 bar.pb_set_style(&style);
150 bar.pb_set_message(path.to_str().unwrap_or("<path not UTF-8>"));
151 bar.pb_start();
152 transfers.insert(id, TransferState { bar, transferred: 0, block_transfers: HashMap::new() });
153 }
154 Ok(TransferEvent::BlockStarted { id, block, .. }) => {
155 if let Some(transfer) = transfers.get_mut(&id) {
156 transfer.block_transfers.insert(block, BlockTransferState { transferred: 0 });
157 }
158 }
159 Ok(TransferEvent::BlockProgress { id, block, transferred }) => {
160 if let Some(transfer) = transfers.get_mut(&id)
161 && let Some(block) = transfer.block_transfers.get_mut(&block) {
162 transfer.transferred += transferred - block.transferred;
163 block.transferred = transferred;
164 transfer.bar.pb_set_position(transfer.transferred);
165 }
166 }
167 Ok(TransferEvent::BlockCompleted { id, block, failed }) => {
168 if let Some(transfer) = transfers.get_mut(&id)
169 && let Some(block) = transfer.block_transfers.get_mut(&block) {
170 if failed {
171 transfer.transferred -= block.transferred;
172 }
173
174 transfer.bar.pb_set_position(transfer.transferred);
175 }
176 }
177 Ok(TransferEvent::TransferCompleted { id, failed }) => {
178 if let Some(transfer) = transfers.remove(&id) && !failed {
179 stats.files += 1;
180 stats.bytes += transfer.transferred;
181 }
182 }
183 Err(RecvError::Closed) => break,
184 Err(RecvError::Lagged(_)) => {
185 if !warned {
186 warn!("event stream is lagging: progress may be incorrect");
187 warned = true;
188 }
189 }
190 }
191 }
192 }
193
194 stats
195}