use crate::client::ProgressCallback;
use crate::ProgressEvent;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
/// Statistics for one stream over one reporting interval.
///
/// The optional fields are `None` when the corresponding metric is not available for the
/// report; which metrics apply to which kind of test is decided by the producer of the report.
#[derive(Debug, Clone)]
pub struct IntervalReport {
    /// Stream identifier; printed in the leading `[id]` column of the console output.
    pub stream_id: usize,
    /// Start of the interval (the console output prints it in seconds).
    pub interval_start: Duration,
    /// End of the interval (the console output prints it in seconds).
    pub interval_end: Duration,
    /// Byte count for the interval; scaled to KBytes/MBytes/GBytes when printed.
    pub bytes: u64,
    /// Throughput for the interval, in bits per second.
    pub bits_per_second: f64,
    /// Packet count, if available. When present, the console output prints it as the
    /// trailing column in place of the retransmit/cwnd columns.
    pub packets: Option<u64>,
    /// Jitter in milliseconds, if available.
    pub jitter_ms: Option<f64>,
    /// Number of lost packets, if available.
    pub lost_packets: Option<u64>,
    /// Loss as a percentage, if available.
    pub lost_percent: Option<f64>,
    /// Retransmit count, if available.
    pub retransmits: Option<u64>,
    /// Congestion window, if available. The console output divides it by 1024 and labels the
    /// result KBytes, so the value is presumably in bytes (confirm against the producer).
    pub cwnd: Option<u64>,
}
/// Message carried on the interval-reporting channel.
#[derive(Debug, Clone)]
pub enum IntervalMessage {
    /// A finished interval's statistics.
    Report(IntervalReport),
    /// End-of-stream marker; `run_reporter_task` stops its receive loop when it sees this.
    Complete,
}
/// Cloneable sending handle for [`IntervalMessage`]s.
///
/// Wraps the sending half of an unbounded tokio channel; its methods send synchronously
/// (no `.await`).
#[derive(Clone)]
pub struct IntervalReporter {
    /// Sending half of the channel created in `IntervalReporter::new`.
    sender: mpsc::UnboundedSender<IntervalMessage>,
}
impl IntervalReporter {
    /// Builds a reporter and returns it together with the receiving half of its channel.
    ///
    /// The returned receiver has the type that `run_reporter_task` accepts.
    pub fn new() -> (Self, mpsc::UnboundedReceiver<IntervalMessage>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Self { sender: tx }, rx)
    }

    /// Queues `report` as an [`IntervalMessage::Report`].
    pub fn report(&self, report: IntervalReport) {
        self.push(IntervalMessage::Report(report));
    }

    /// Queues the [`IntervalMessage::Complete`] marker.
    pub fn complete(&self) {
        self.push(IntervalMessage::Complete);
    }

    /// Sends `message` on the channel on a best-effort basis.
    ///
    /// The send result is deliberately discarded: per the tokio documentation, sending on an
    /// unbounded channel fails only when the receiving half has been closed or dropped, and
    /// in that case there is nobody left to report to.
    fn push(&self, message: IntervalMessage) {
        let _ = self.sender.send(message);
    }
}
/// Consumes interval messages from `receiver` until the stream ends.
///
/// For every [`IntervalMessage::Report`]:
///
/// 1. if `callback` is set, it is invoked with a `ProgressEvent::IntervalUpdate` built from
///    the report (the report's `stream_id` and `cwnd` are not part of that event);
/// 2. unless `json_mode` is `true`, one line is printed through `format_interval_output`.
///
/// The function returns when [`IntervalMessage::Complete`] is received or when `recv`
/// yields `None`.
pub async fn run_reporter_task(
    mut receiver: mpsc::UnboundedReceiver<IntervalMessage>,
    json_mode: bool,
    callback: Option<Arc<dyn ProgressCallback>>,
) {
    loop {
        let report = match receiver.recv().await {
            Some(IntervalMessage::Report(report)) => report,
            // An explicit end marker and an exhausted channel both finish the task.
            Some(IntervalMessage::Complete) | None => break,
        };

        // Notify the callback before printing, so observers see the update first.
        if let Some(cb) = callback.as_ref() {
            cb.on_progress(ProgressEvent::IntervalUpdate {
                interval_start: report.interval_start,
                interval_end: report.interval_end,
                bytes: report.bytes,
                bits_per_second: report.bits_per_second,
                packets: report.packets,
                jitter_ms: report.jitter_ms,
                lost_packets: report.lost_packets,
                lost_percent: report.lost_percent,
                retransmits: report.retransmits,
            });
        }

        // In JSON mode the per-interval console line is suppressed.
        if json_mode {
            continue;
        }
        format_interval_output(&report);
    }
}
/// Prints one human-readable line describing `report` to stdout.
///
/// Every line starts with the same columns:
/// `[id] start-end sec <transfer> <unit> <bitrate> <unit>`.
/// The trailing columns depend on which optional metrics are present, in this order of
/// precedence:
///
/// 1. `packets` present: the packet count;
/// 2. `retransmits` and `cwnd` both present: retransmits, then `cwnd / 1024` labelled `KBytes`;
/// 3. only `retransmits` present: retransmits;
/// 4. otherwise: no trailing columns.
///
/// Transfer and bitrate are both scaled with decimal (power-of-1000) K/M/G tiers. The bitrate
/// has a `Kbits/sec` tier so that sub-megabit rates stay readable instead of rounding toward
/// `0.0 Mbits/sec`.
fn format_interval_output(report: &IntervalReport) {
    let bytes = report.bytes as f64;
    let (transfer_val, transfer_unit) = if report.bytes >= 1_000_000_000 {
        (bytes / 1_000_000_000.0, "GBytes")
    } else if report.bytes >= 1_000_000 {
        (bytes / 1_000_000.0, "MBytes")
    } else {
        (bytes / 1_000.0, "KBytes")
    };

    // Mirror the three transfer tiers above; without the Kbits tier a slow interval would
    // show a non-zero transfer next to a bitrate of 0.0.
    let bps = report.bits_per_second;
    let (bitrate_val, bitrate_unit) = if bps >= 1_000_000_000.0 {
        (bps / 1_000_000_000.0, "Gbits/sec")
    } else if bps >= 1_000_000.0 {
        (bps / 1_000_000.0, "Mbits/sec")
    } else {
        (bps / 1_000.0, "Kbits/sec")
    };

    // Columns shared by every variant of the line.
    let common = format!(
        "[{:3}] {:4.2}-{:4.2} sec {:6.2} {:>7} {:6.1} {:>10}",
        report.stream_id,
        report.interval_start.as_secs_f64(),
        report.interval_end.as_secs_f64(),
        transfer_val,
        transfer_unit,
        bitrate_val,
        bitrate_unit
    );

    // Variant-specific trailing columns; the packet count takes precedence over
    // retransmit/cwnd information.
    let extra = match (report.packets, report.retransmits, report.cwnd) {
        (Some(packets), _, _) => format!(" {:6}", packets),
        // NOTE(review): cwnd is scaled by 1024 while the transfer column uses 1000, yet both
        // are labelled KBytes. Kept as-is; confirm which convention is intended.
        (None, Some(retr), Some(cwnd_val)) => {
            format!(" {:4} {:5} KBytes", retr, cwnd_val / 1024)
        }
        (None, Some(retr), None) => format!(" {:4}", retr),
        (None, None, _) => String::new(),
    };

    println!("{}{}", common, extra);
}