use crate::file_transfer_record::FileTransferRecord;
use crate::inner::group_state::RecordEntry;
use crate::inner::scheduler_state::SchedulerState;
use crate::inner::task_callbacks::ProgressCb;
use crate::transfer_status::TransferStatus;
use std::panic::{catch_unwind, AssertUnwindSafe};
pub(crate) fn invoke_progress_cb(cb: &ProgressCb, dto: FileTransferRecord) {
let ret = catch_unwind(AssertUnwindSafe(|| cb(dto)));
if ret.is_err() {
crate::meow_flow_log!(
"callback",
"progress callback panicked; panic suppressed to protect scheduler"
);
}
}
pub(crate) fn emit_global_progress(state: &SchedulerState, dto: FileTransferRecord) {
let listeners: Vec<ProgressCb> = match state.global_progress_listener().read() {
Ok(g) => g.iter().map(|(_, cb)| cb.clone()).collect(),
Err(_) => {
crate::meow_flow_log!(
"emit_global_progress",
"global listener lock poisoned; skip progress broadcast"
);
return;
}
};
crate::meow_flow_log!(
"emit_global_progress",
"broadcast start: listener_count={} task_id={:?}",
listeners.len(),
dto.task_id()
);
for cb in listeners {
invoke_progress_cb(&cb, dto.clone());
}
}
pub(crate) fn emit_status(
state: &SchedulerState,
entry: &RecordEntry,
status: TransferStatus,
transferred: u64,
total: u64,
) {
crate::meow_flow_log!(
"emit_status",
"status emit start: task_id={:?} status={:?} transferred={} total={}",
entry.inner().task_id(),
status,
transferred,
total
);
let inner = entry.inner();
let dto = FileTransferRecord::new(
inner.task_id(),
inner.file_sign().to_string(),
inner.file_name().to_string(),
total,
if total == 0 {
0.0
} else {
transferred as f32 / total as f32
},
status,
inner.direction(),
);
if let Some(cb) = &entry.callbacks().progress_cb() {
invoke_progress_cb(cb, dto.clone());
}
emit_global_progress(state, dto);
}