1use std::collections::HashMap;
4use std::fmt;
5
6use chrono::TimeDelta;
7use tokio::sync::broadcast::error::RecvError;
8use tokio::sync::broadcast::{self};
9use tokio_util::sync::CancellationToken;
10use tracing::Span;
11use tracing::warn_span;
12use tracing_indicatif::span_ext::IndicatifSpanExt;
13use tracing_indicatif::style::ProgressStyle;
14
15use crate::TransferEvent;
16
/// Extension trait that renders a duration-like value as English text.
///
/// Implementors return a lightweight [`fmt::Display`] adapter rather than an
/// allocated `String`, so the text is only produced when it is formatted.
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
pub trait TimeDeltaExt {
    /// Returns a value whose [`fmt::Display`] output describes `self` in
    /// English.
    fn english(&self) -> impl fmt::Display;
}
24
25impl TimeDeltaExt for TimeDelta {
26 fn english(&self) -> impl fmt::Display {
27 struct Display(TimeDelta);
28
29 impl fmt::Display for Display {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 if self.0.num_seconds() == 0 {
32 return write!(f, "0 seconds");
33 }
34
35 let days = self.0.num_days();
36 let hours = self.0.num_hours() - (days * 24);
37 let minutes = self.0.num_minutes() - (days * 24 * 60) - (hours * 60);
38 let seconds = self.0.num_seconds()
39 - -(days * 24 * 60 * 60)
40 - (hours * 60 * 60)
41 - (minutes * 60);
42
43 if days > 0 {
44 write!(f, "{days} day{s}", s = if days == 1 { "" } else { "s" })?;
45 }
46
47 if hours > 0 {
48 if days > 0 {
49 write!(f, ", ")?;
50 }
51
52 write!(f, "{hours} hour{s}", s = if hours == 1 { "" } else { "s" })?;
53 }
54
55 if minutes > 0 {
56 if days > 0 || hours > 0 {
57 write!(f, ", ")?;
58 }
59
60 write!(
61 f,
62 "{minutes} minute{s}",
63 s = if minutes == 1 { "" } else { "s" }
64 )?;
65 }
66
67 if seconds > 0 {
68 if days > 0 || hours > 0 || minutes > 0 {
69 write!(f, ", ")?;
70 }
71
72 write!(
73 f,
74 "{seconds} second{s}",
75 s = if seconds == 1 { "" } else { "s" }
76 )?;
77 }
78
79 Ok(())
80 }
81 }
82
83 Display(*self)
84 }
85}
86
/// Summary statistics for a set of transfers.
#[cfg(feature = "cli")]
#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
#[derive(Clone, Copy, Debug, Default)]
pub struct TransferStats {
    /// The number of files that were transferred.
    pub files: usize,
    /// The total number of bytes that were transferred.
    pub bytes: u64,
}
97
98#[cfg_attr(docsrs, doc(cfg(feature = "cli")))]
111pub async fn handle_events(
112 mut events: broadcast::Receiver<TransferEvent>,
113 cancel: CancellationToken,
114) -> Option<TransferStats> {
115 struct BlockTransferState {
116 transferred: u64,
118 }
119
120 struct TransferState {
121 bar: Span,
123 transferred: u64,
125 block_transfers: HashMap<u64, BlockTransferState>,
127 }
128
129 let mut indeterminate = None;
130 let mut transfers = HashMap::new();
131 let mut stats = TransferStats::default();
132
133 loop {
134 tokio::select! {
135 _ = cancel.cancelled() => break,
136 event = events.recv() => match event {
137 Ok(event) if indeterminate.is_none() => match event {
138 TransferEvent::TransferStarted { id, path, size, .. } => {
139 let bar = warn_span!("progress");
140
141 let style = match size {
142 Some(size) => {
143 bar.pb_set_length(size);
144 ProgressStyle::with_template(
145 "[{elapsed_precise:.cyan/blue}] {bar:40.cyan/blue} \
146 {bytes:.cyan/blue} / {total_bytes:.cyan/blue} \
147 ({bytes_per_sec:.cyan/blue}) [ETA {eta_precise:.cyan/blue}]: \
148 {msg}",
149 )
150 .unwrap()
151 }
152 None => ProgressStyle::with_template(
153 "[{elapsed_precise:.cyan/blue}] {spinner:.cyan/blue} \
154 {bytes:.cyan/blue} ({bytes_per_sec:.cyan/blue}): {msg}",
155 )
156 .unwrap(),
157 };
158
159 bar.pb_set_style(&style);
160 bar.pb_set_message(path.to_str().unwrap_or("<path not UTF-8>"));
161 bar.pb_start();
162 transfers.insert(
163 id,
164 TransferState {
165 bar,
166 transferred: 0,
167 block_transfers: HashMap::new(),
168 },
169 );
170 }
171 TransferEvent::BlockStarted { id, block, .. } => {
172 if let Some(transfer) = transfers.get_mut(&id) {
173 transfer
174 .block_transfers
175 .insert(block, BlockTransferState { transferred: 0 });
176 }
177 }
178 TransferEvent::BlockProgress {
179 id,
180 block,
181 transferred,
182 } => {
183 if let Some(transfer) = transfers.get_mut(&id)
184 && let Some(block) = transfer.block_transfers.get_mut(&block)
185 {
186 transfer.transferred += transferred - block.transferred;
187 block.transferred = transferred;
188 transfer.bar.pb_set_position(transfer.transferred);
189 }
190 }
191 TransferEvent::BlockCompleted { id, block, failed } => {
192 if let Some(transfer) = transfers.get_mut(&id)
193 && let Some(block) = transfer.block_transfers.get_mut(&block)
194 {
195 if failed {
196 transfer.transferred -= block.transferred;
197 }
198
199 transfer.bar.pb_set_position(transfer.transferred);
200 }
201 }
202 TransferEvent::TransferCompleted { id, failed } => {
203 if let Some(transfer) = transfers.remove(&id)
204 && !failed
205 {
206 stats.files += 1;
207 stats.bytes += transfer.transferred;
208 }
209 }
210 },
211 Ok(_) => continue,
212 Err(RecvError::Closed) => break,
213 Err(RecvError::Lagged(_)) => {
214 transfers.clear();
216
217 let bar = warn_span!("progress");
219 bar.pb_set_style(
220 &ProgressStyle::with_template(
221 "{spinner:.cyan/blue} transfer progress is unavailable due to missed events",
222 )
223 .unwrap(),
224 );
225 bar.pb_start();
226
227 indeterminate = Some(bar);
228 }
229 }
230 }
231 }
232
233 if indeterminate.is_none() {
234 Some(stats)
235 } else {
236 None
237 }
238}