use crate::file_transfer_record::FileTransferRecord;
use crate::ids::TaskId;
use crate::inner::group_state::RecordEntry;
use crate::inner::scheduler_state::SchedulerState;
use crate::inner::task_callbacks::{CompleteCb, ProgressCb};
use crate::transfer_status::TransferStatus;
/// Forwards one progress record to a single progress callback.
///
/// A clone of `cb` and the `dto` are passed to `submit_progress` on the
/// submitter returned by `state.cb_submit()`. When `cb_submit()` returns
/// `None`, nothing is submitted: the situation is logged and `dto` is dropped.
pub(crate) fn invoke_progress_cb(state: &SchedulerState, cb: &ProgressCb, dto: FileTransferRecord) {
    match state.cb_submit() {
        Some(dispatcher) => {
            dispatcher.submit_progress(cb.clone(), dto);
        }
        None => {
            // No submitter available; per the log text this is the closing path.
            crate::meow_flow_log!(
                "callback",
                "progress callback skipped: dispatcher already taken (closing)"
            );
        }
    }
}
/// Forwards a completion notification to a single completion callback.
///
/// A clone of `cb`, the `task_id` and the optional `payload` are passed to
/// `submit_complete` on the submitter returned by `state.cb_submit()`. When
/// `cb_submit()` returns `None`, nothing is submitted and the skip is logged.
pub(crate) fn invoke_complete_cb(
    state: &SchedulerState,
    cb: &CompleteCb,
    task_id: TaskId,
    payload: Option<String>,
) {
    match state.cb_submit() {
        Some(dispatcher) => {
            dispatcher.submit_complete(cb.clone(), task_id, payload);
        }
        None => {
            // No submitter available; per the log text this is the closing path.
            crate::meow_flow_log!(
                "callback",
                "complete callback skipped: dispatcher already taken (closing)"
            );
        }
    }
}
/// Broadcasts `dto` to every registered global progress listener.
///
/// The listener callbacks are cloned into a local `Vec` while the read lock
/// from `state.global_progress_listener()` is held; the lock is released
/// before any callback is submitted. If acquiring the read lock fails
/// (poisoned), the broadcast is skipped after logging. Each listener receives
/// its own clone of `dto` via [`invoke_progress_cb`].
pub(crate) fn emit_global_progress(state: &SchedulerState, dto: FileTransferRecord) {
    // Snapshot the callbacks; the guard is consumed (and released) inside `map`.
    let snapshot: Option<Vec<ProgressCb>> = state
        .global_progress_listener()
        .read()
        .ok()
        .map(|guard| guard.iter().map(|(_, cb)| cb.clone()).collect());

    let Some(listeners) = snapshot else {
        crate::meow_flow_log!(
            "emit_global_progress",
            "global listener lock poisoned; skip progress broadcast"
        );
        return;
    };

    crate::meow_flow_log!(
        "emit_global_progress",
        "broadcast start: listener_count={} task_id={:?}",
        listeners.len(),
        dto.task_id()
    );

    listeners
        .into_iter()
        .for_each(|cb| invoke_progress_cb(state, &cb, dto.clone()));
}
/// Builds a [`FileTransferRecord`] for `entry` and publishes it.
///
/// The record carries the entry's task id, file sign, file name and
/// direction, together with `total`, `status` and a progress fraction of
/// `transferred / total` (computed in `f32`; `0.0` when `total` is zero).
/// The record is first sent to the entry's own progress callback, if one is
/// set, and then to the global progress listeners via
/// [`emit_global_progress`].
pub(crate) fn emit_status(
    state: &SchedulerState,
    entry: &RecordEntry,
    status: TransferStatus,
    transferred: u64,
    total: u64,
) {
    let inner = entry.inner();
    crate::meow_flow_log!(
        "emit_status",
        "status emit start: task_id={:?} status={:?} transferred={} total={}",
        inner.task_id(),
        status,
        transferred,
        total
    );

    // Guard against division by zero when the total size is zero.
    let progress: f32 = if total == 0 {
        0.0
    } else {
        transferred as f32 / total as f32
    };

    let dto = FileTransferRecord::new(
        inner.task_id(),
        inner.file_sign_arc(),
        inner.file_name_arc(),
        total,
        progress,
        status,
        inner.direction(),
    );

    // Per-task callback first, then the global broadcast consumes the record.
    if let Some(cb) = &entry.callbacks().progress_cb() {
        invoke_progress_cb(state, cb, dto.clone());
    }
    emit_global_progress(state, dto);
}