use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[derive(Debug)]
#[non_exhaustive]
pub enum TransferError {
Cancelled,
Network(std::io::Error),
Rpc {
code: i32,
name: String,
},
FloodWait { seconds: u64 },
Checkpoint(std::io::Error),
Other(crate::InvocationError),
}
impl fmt::Display for TransferError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Cancelled => write!(f, "transfer cancelled by caller"),
Self::Network(e) => write!(f, "network error: {e}"),
Self::Rpc { code, name } => write!(f, "Telegram error ({code}): {name}"),
Self::FloodWait { seconds } => {
write!(
f,
"Telegram rate limit reached. Retry after {seconds} seconds."
)
}
Self::Checkpoint(e) => write!(f, "checkpoint I/O error: {e}"),
Self::Other(e) => write!(f, "{e}"),
}
}
}
impl std::error::Error for TransferError {}
impl From<TransferError> for crate::InvocationError {
fn from(e: TransferError) -> Self {
match e {
TransferError::Cancelled => {
crate::InvocationError::Deserialize("transfer cancelled by caller".into())
}
TransferError::Network(io) => crate::InvocationError::Io(io),
TransferError::Checkpoint(io) => crate::InvocationError::Io(io),
TransferError::Rpc { code, name } => crate::InvocationError::Rpc(crate::RpcError {
code,
name,
value: None,
}),
TransferError::FloodWait { seconds } => crate::InvocationError::Rpc(crate::RpcError {
code: 420,
name: format!("FLOOD_WAIT_{seconds}"),
value: Some(seconds as u32),
}),
TransferError::Other(e) => e,
}
}
}
impl From<crate::InvocationError> for TransferError {
fn from(e: crate::InvocationError) -> Self {
match &e {
crate::InvocationError::Io(_) => {
if let crate::InvocationError::Io(io) = e {
return TransferError::Network(io);
}
unreachable!()
}
crate::InvocationError::Dropped => TransferError::Network(std::io::Error::new(
std::io::ErrorKind::ConnectionReset,
"connection dropped",
)),
crate::InvocationError::Rpc(rpc) => {
if rpc.code == 420 {
return TransferError::FloodWait {
seconds: rpc.value.unwrap_or(0) as u64,
};
}
TransferError::Rpc {
code: rpc.code,
name: rpc.name.clone(),
}
}
crate::InvocationError::Deserialize(s) if s.contains("cancel") => {
TransferError::Cancelled
}
_ => TransferError::Other(e),
}
}
}
#[derive(Clone, Debug)]
pub struct TransferHandle {
inner: Arc<TransferState>,
}
#[derive(Debug)]
struct TransferState {
paused: AtomicBool,
cancelled: AtomicBool,
bytes_done: AtomicU64,
total: AtomicU64,
start_ms: AtomicU64, }
impl TransferHandle {
pub fn new() -> Self {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
inner: Arc::new(TransferState {
paused: AtomicBool::new(false),
cancelled: AtomicBool::new(false),
bytes_done: AtomicU64::new(0),
total: AtomicU64::new(0),
start_ms: AtomicU64::new(now),
}),
}
}
pub fn pause(&self) {
self.inner.paused.store(true, Ordering::Release);
}
pub fn resume(&self) {
self.inner.paused.store(false, Ordering::Release);
}
pub fn cancel(&self) {
self.inner.cancelled.store(true, Ordering::Release);
}
pub fn is_paused(&self) -> bool {
self.inner.paused.load(Ordering::Acquire)
}
pub fn is_cancelled(&self) -> bool {
self.inner.cancelled.load(Ordering::Acquire)
}
pub fn progress(&self) -> TransferProgress {
let done = self.inner.bytes_done.load(Ordering::Relaxed);
let total = self.inner.total.load(Ordering::Relaxed);
let start_ms = self.inner.start_ms.load(Ordering::Relaxed);
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let elapsed_ms = now_ms.saturating_sub(start_ms).max(1);
TransferProgress {
done,
total,
elapsed_ms,
}
}
pub(crate) fn set_total(&self, total: u64) {
self.inner.total.store(total, Ordering::Relaxed);
}
pub(crate) fn add_bytes(&self, n: u64) {
self.inner.bytes_done.fetch_add(n, Ordering::Relaxed);
}
pub(crate) fn reset_start(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
self.inner.start_ms.store(now, Ordering::Relaxed);
}
pub async fn poll_pause_cancel(&self) -> Result<(), TransferError> {
loop {
if self.is_cancelled() {
tracing::debug!(target: "ferogram::transfer", "transfer cancelled by caller");
return Err(TransferError::Cancelled);
}
if !self.is_paused() {
return Ok(());
}
tracing::trace!(target: "ferogram::transfer", "transfer paused, waiting");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
impl Default for TransferHandle {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy)]
pub struct TransferProgress {
pub done: u64,
pub total: u64,
pub elapsed_ms: u64,
}
impl TransferProgress {
pub fn percent(&self) -> f64 {
if self.total == 0 {
return 0.0;
}
(self.done as f64 / self.total as f64 * 100.0).min(100.0)
}
pub fn speed_bps(&self) -> u64 {
let elapsed_s = self.elapsed_ms.max(1) as f64 / 1000.0;
(self.done as f64 / elapsed_s) as u64
}
pub fn eta_secs(&self) -> u64 {
if self.total == 0 || self.done >= self.total {
return 0;
}
let remaining = self.total - self.done;
let speed = self.speed_bps().max(1);
remaining / speed
}
pub fn speed_human(&self) -> String {
let bps = self.speed_bps();
if bps >= 1024 * 1024 {
format!("{:.1} MB/s", bps as f64 / (1024.0 * 1024.0))
} else if bps >= 1024 {
format!("{:.1} KB/s", bps as f64 / 1024.0)
} else {
format!("{bps} B/s")
}
}
pub fn bytes_human(&self) -> String {
format!("{} / {}", fmt_bytes(self.done), fmt_bytes(self.total))
}
}
fn fmt_bytes(b: u64) -> String {
if b >= 1024 * 1024 * 1024 {
format!("{:.1} GB", b as f64 / (1024.0 * 1024.0 * 1024.0))
} else if b >= 1024 * 1024 {
format!("{:.1} MB", b as f64 / (1024.0 * 1024.0))
} else if b >= 1024 {
format!("{:.1} KB", b as f64 / 1024.0)
} else {
format!("{b} B")
}
}