use crate::transfer_status::TransferStatus;
use crate::inner::scheduler_state::SchedulerState;
use crate::inner::worker_event::WorkerEvent;
/// Applies one [`WorkerEvent`] to the scheduler bookkeeping held in `state`.
///
/// Per variant:
///
/// * `Progress` – when a group is registered for `key`, records `next_offset`
///   in the offsets table and emits `TransferStatus::Transmission`. When no
///   group is registered the event only gets logged; nothing is written.
/// * `Completed` – drops `key` from the active and paused sets, removes the
///   group and, if one existed, unregisters the leader task id, emits
///   `TransferStatus::Complete` and invokes the entry's completion callback
///   (when present) with `completion_payload`. The offsets entry is removed
///   at the end.
/// * `Failed` – same teardown as `Completed`, but emits
///   `TransferStatus::Failed(error)` with the last recorded offset (`0` when
///   none was recorded) and the entry's own total size; no callback is invoked.
/// * `Canceled` – drops `key` from the active set. If `key` is in the paused
///   set, the group and its offset are left untouched; otherwise the group is
///   torn down and `TransferStatus::Canceled` is emitted.
///
/// The body contains no `.await`, although the function is declared `async`.
pub(crate) async fn handle_worker_event(event: WorkerEvent, state: &mut SchedulerState) {
    match event {
        WorkerEvent::Progress {
            key,
            next_offset,
            total_size,
        } => {
            crate::meow_flow_log!(
                "worker_event",
                "progress: key={:?} next_offset={} total_size={}",
                key,
                next_offset,
                total_size
            );
            // Progress for a key without a registered group must not leave an
            // offsets entry behind, so bail out before touching the table.
            if !state.groups().contains_key(&key) {
                return;
            }
            state.offsets_mut().insert(key.clone(), next_offset);
            // The group is looked up again only after the mutable borrow for
            // the offset write above has ended.
            if let Some(live_group) = state.groups().get(&key) {
                crate::inner::exec_impl::emit::emit_status(
                    state,
                    live_group.entry(),
                    TransferStatus::Transmission,
                    next_offset,
                    total_size,
                );
            }
        }
        WorkerEvent::Completed {
            key,
            total_size,
            completion_payload,
        } => {
            crate::meow_flow_log!(
                "worker_event",
                "completed: key={:?} total_size={}",
                key,
                total_size
            );
            state.active_mut().remove(&key);
            state.paused_set_mut().remove(&key);
            // The final offset is recorded while the status is emitted and is
            // dropped again at the end of this arm.
            state.offsets_mut().insert(key.clone(), total_size);
            if let Some(finished) = state.groups_mut().remove(&key) {
                let leader_task_id = finished.leader_inner().task_id();
                state.task_id_to_dedupe_mut().remove(&leader_task_id);
                let entry_task_id = finished.entry().inner().task_id();
                crate::inner::exec_impl::emit::emit_status(
                    state,
                    finished.entry(),
                    TransferStatus::Complete,
                    total_size,
                    total_size,
                );
                if let Some(on_complete) = finished.entry().callbacks().complete_cb() {
                    crate::inner::exec_impl::emit::invoke_complete_cb(
                        state,
                        on_complete,
                        entry_task_id,
                        completion_payload,
                    );
                }
            }
            state.offsets_mut().remove(&key);
        }
        WorkerEvent::Failed { key, error } => {
            crate::meow_flow_log!("worker_event", "failed: key={:?} error={}", key, error);
            state.active_mut().remove(&key);
            state.paused_set_mut().remove(&key);
            if let Some(failed) = state.groups_mut().remove(&key) {
                let leader_task_id = failed.leader_inner().task_id();
                state.task_id_to_dedupe_mut().remove(&leader_task_id);
                // Report the last recorded offset, or 0 when none was recorded.
                let reached = state.offsets().get(&key).map_or(0, |recorded| *recorded);
                let expected_total = failed.entry().inner().total_size();
                crate::inner::exec_impl::emit::emit_status(
                    state,
                    failed.entry(),
                    TransferStatus::Failed(error),
                    reached,
                    expected_total,
                );
            }
            state.offsets_mut().remove(&key);
        }
        WorkerEvent::Canceled { key } => {
            crate::meow_flow_log!("worker_event", "canceled: key={:?}", key);
            state.active_mut().remove(&key);
            if state.paused_set().contains(&key) {
                // Pause-driven cancellation: group and offset stay in place.
                crate::meow_flow_log!(
                    "worker_event",
                    "canceled from pause flow; keep group for resume: key={:?}",
                    key
                );
            } else {
                if let Some(canceled) = state.groups_mut().remove(&key) {
                    let leader_task_id = canceled.leader_inner().task_id();
                    state.task_id_to_dedupe_mut().remove(&leader_task_id);
                    // Report the last recorded offset, or 0 when none was recorded.
                    let reached = state.offsets().get(&key).map_or(0, |recorded| *recorded);
                    let expected_total = canceled.entry().inner().total_size();
                    crate::inner::exec_impl::emit::emit_status(
                        state,
                        canceled.entry(),
                        TransferStatus::Canceled,
                        reached,
                        expected_total,
                    );
                }
                state.offsets_mut().remove(&key);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, RwLock};

    use super::handle_worker_event;
    use crate::dflt::default_http_transfer::default_breakpoint_arcs;
    use crate::http_breakpoint::BreakpointDownloadHttpConfig;
    use crate::inner::cb_dispatcher;
    use crate::inner::group_state::{GroupState, RecordEntry};
    use crate::inner::inner_task::InnerTask;
    use crate::inner::scheduler_state::SchedulerState;
    use crate::inner::task_callbacks::TaskCallbacks;
    use crate::inner::worker_event::WorkerEvent;
    use crate::inner::UniqueId;
    use crate::up_pounce_builder::UploadPounceBuilder;

    /// Builds a `SchedulerState` holding exactly one registered upload group.
    ///
    /// Returns the state, the group's dedupe key and the task's total size.
    /// The state starts with an offsets entry of `0` for the key and a
    /// `task_id -> key` mapping for the task.
    async fn live_upload_state() -> (SchedulerState, UniqueId, u64) {
        // Tests run in parallel by default and each one calls this helper, so
        // a timestamp alone is not guaranteed to be unique on a coarse clock.
        // The process id and a per-process sequence number make the fixture
        // path collision-free, so one test cannot delete another's file.
        static FIXTURE_SEQ: AtomicUsize = AtomicUsize::new(0);
        let seq = FIXTURE_SEQ.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock")
            .as_nanos();
        let mut path = std::env::temp_dir();
        path.push(format!("rusty_cat_offsets_teardown_{pid}_{ts}_{seq}.bin"));
        std::fs::write(&path, vec![7u8; 4096]).expect("write fixture");

        let pounce = UploadPounceBuilder::new("teardown.bin", &path, 1024)
            .with_url("https://placeholder/upload")
            .build()
            .expect("build pounce");
        let (def_up, def_down) = default_breakpoint_arcs();
        let inner = InnerTask::from_pounce(
            pounce,
            BreakpointDownloadHttpConfig::default(),
            None,
            def_up,
            def_down,
        )
        .await
        .expect("from_pounce");
        // Best-effort cleanup: the fixture is not touched again by this helper.
        let _ = std::fs::remove_file(&path);

        let key = inner.dedupe_key();
        let total = inner.total_size();
        let (cb_submit, cb_join) = cb_dispatcher::start().expect("start dispatcher");
        // NOTE(review): the second value is deliberately never dropped;
        // presumably it is a join handle for the dispatcher — confirm.
        std::mem::forget(cb_join);

        let mut state = SchedulerState::new(1, 1, Arc::new(RwLock::new(Vec::new())), cb_submit);
        state
            .task_id_to_dedupe_mut()
            .insert(inner.task_id(), key.clone());
        state.offsets_mut().insert(key.clone(), 0);
        let entry = RecordEntry::new(inner.clone(), TaskCallbacks::empty());
        state
            .groups_mut()
            .insert(key.clone(), GroupState::new(inner.clone(), entry));
        (state, key, total)
    }

    /// A `Progress` event arriving after `Completed` has torn the group down
    /// must not re-create an offsets entry for that key.
    #[tokio::test]
    async fn stray_progress_after_complete_creates_no_orphan_offset() {
        let (mut state, key, total) = live_upload_state().await;
        assert!(state.groups().contains_key(&key));

        handle_worker_event(
            WorkerEvent::Completed {
                key: key.clone(),
                total_size: total,
                completion_payload: None,
            },
            &mut state,
        )
        .await;
        assert!(
            !state.groups().contains_key(&key),
            "group must be removed after Completed"
        );
        assert!(
            state.offsets().get(&key).is_none(),
            "offsets entry must be cleared after Completed"
        );

        handle_worker_event(
            WorkerEvent::Progress {
                key: key.clone(),
                next_offset: 512,
                total_size: total,
            },
            &mut state,
        )
        .await;
        assert!(
            state.offsets().get(&key).is_none(),
            "stray Progress after teardown must not resurrect an orphan offsets entry"
        );
    }

    /// A `Progress` event for a registered group stores the reported offset.
    #[tokio::test]
    async fn progress_for_live_group_persists_offset() {
        let (mut state, key, total) = live_upload_state().await;

        handle_worker_event(
            WorkerEvent::Progress {
                key: key.clone(),
                next_offset: 1024,
                total_size: total,
            },
            &mut state,
        )
        .await;
        assert_eq!(
            state.offsets().get(&key).copied(),
            Some(1024),
            "progress for a live group must persist its contiguous offset"
        );
    }
}