use std::sync::Arc;
use actionqueue_core::ids::TaskId;
use actionqueue_core::task::task_spec::TaskSpec;
use actionqueue_executor_local::handler::TaskSubmissionPort;
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct TaskSubmission {
task_spec: TaskSpec,
dependencies: Vec<TaskId>,
}
impl TaskSubmission {
pub fn new(task_spec: TaskSpec, dependencies: Vec<TaskId>) -> Self {
Self { task_spec, dependencies }
}
pub fn task_spec(&self) -> &TaskSpec {
&self.task_spec
}
pub fn dependencies(&self) -> &[TaskId] {
&self.dependencies
}
pub fn into_parts(self) -> (TaskSpec, Vec<TaskId>) {
(self.task_spec, self.dependencies)
}
}
pub struct SubmissionChannel {
tx: mpsc::UnboundedSender<TaskSubmission>,
}
impl TaskSubmissionPort for SubmissionChannel {
fn submit(&self, task_spec: TaskSpec, dependencies: Vec<TaskId>) {
let task_id = task_spec.id();
if let Err(_e) = self.tx.send(TaskSubmission::new(task_spec, dependencies)) {
tracing::warn!(
%task_id,
"submission dropped: dispatch loop receiver closed"
);
}
}
}
pub struct SubmissionReceiver(mpsc::UnboundedReceiver<TaskSubmission>);
impl SubmissionReceiver {
pub fn try_recv(&mut self) -> Option<TaskSubmission> {
match self.0.try_recv() {
Ok(submission) => Some(submission),
Err(mpsc::error::TryRecvError::Empty) => None,
Err(mpsc::error::TryRecvError::Disconnected) => {
tracing::debug!("submission channel disconnected");
None
}
}
}
}
pub fn submission_channel() -> (Arc<SubmissionChannel>, SubmissionReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
(Arc::new(SubmissionChannel { tx }), SubmissionReceiver(rx))
}