use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread::JoinHandle;
use crate::error::{InnerErrorCode, MeowError};
use crate::file_transfer_record::FileTransferRecord;
use crate::ids::TaskId;
use crate::inner::task_callbacks::{CompleteCb, ProgressCb};
use crate::transfer_status::TransferStatus;
pub(crate) const CALLBACK_QUEUE_CAPACITY: usize = 2048;
pub(crate) enum CbJob {
Progress {
cb: ProgressCb,
dto: FileTransferRecord,
},
Complete {
cb: CompleteCb,
task_id: TaskId,
payload: Option<String>,
},
}
#[derive(Clone)]
pub(crate) struct CbSubmit {
tx: SyncSender<CbJob>,
}
impl CbSubmit {
pub(crate) fn submit_progress(&self, cb: ProgressCb, dto: FileTransferRecord) {
let is_sampling_frame = matches!(dto.status(), TransferStatus::Transmission);
if is_sampling_frame {
match self.tx.try_send(CbJob::Progress { cb, dto }) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
crate::meow_flow_log!(
"cb_dispatcher",
"transmission frame dropped: callback queue full"
);
}
Err(TrySendError::Disconnected(_)) => {
crate::meow_flow_log!(
"cb_dispatcher",
"transmission frame dropped: dispatcher already shut down"
);
}
}
return;
}
if let Err(e) = self.tx.send(CbJob::Progress { cb, dto }) {
crate::meow_flow_log!(
"cb_dispatcher",
"terminal progress frame undelivered: dispatcher gone ({e})"
);
}
}
pub(crate) fn submit_complete(&self, cb: CompleteCb, task_id: TaskId, payload: Option<String>) {
if let Err(e) = self.tx.send(CbJob::Complete {
cb,
task_id,
payload,
}) {
crate::meow_flow_log!(
"cb_dispatcher",
"complete callback undelivered: dispatcher gone ({e})"
);
}
}
}
pub(crate) struct CbDispatcherJoin {
handle: Option<JoinHandle<()>>,
}
impl CbDispatcherJoin {
pub(crate) fn join(&mut self) {
if let Some(h) = self.handle.take() {
let _ = h.join();
}
}
}
impl Drop for CbDispatcherJoin {
fn drop(&mut self) {
self.join();
}
}
pub(crate) fn start() -> Result<(CbSubmit, CbDispatcherJoin), MeowError> {
let (tx, rx) = sync_channel::<CbJob>(CALLBACK_QUEUE_CAPACITY);
let handle = std::thread::Builder::new()
.name("rusty-cat-cb".into())
.spawn(move || {
while let Ok(job) = rx.recv() {
match job {
CbJob::Progress { cb, dto } => {
let ret = catch_unwind(AssertUnwindSafe(|| cb(dto)));
if ret.is_err() {
crate::meow_flow_log!(
"cb_dispatcher",
"progress callback panicked; isolated in dispatcher thread"
);
}
}
CbJob::Complete {
cb,
task_id,
payload,
} => {
let ret = catch_unwind(AssertUnwindSafe(|| cb(task_id, payload)));
if ret.is_err() {
crate::meow_flow_log!(
"cb_dispatcher",
"complete callback panicked; isolated in dispatcher thread"
);
}
}
}
}
})
.map_err(|e| {
MeowError::from_source(
InnerErrorCode::RuntimeCreationFailedError,
"spawn rusty-cat callback dispatcher thread failed",
e,
)
})?;
Ok((
CbSubmit { tx },
CbDispatcherJoin {
handle: Some(handle),
},
))
}