Skip to main content

actionqueue_workflow/
submission.rs

//! Validated dynamic task creation from handlers.
//!
//! Provides [`SubmissionChannel`] — a typed mpsc channel through which handlers
//! can propose new tasks. The dispatch loop owns all mutations: proposals flow
//! through the channel and are validated through the mutation authority before
//! being WAL-appended. Handlers cannot bypass the mutation authority.
//!
//! # Invariant preservation
//!
//! This design preserves ActionQueue Invariant 5: "External extensions cannot mutate
//! persisted state directly." Handlers propose via [`SubmissionChannel::submit`];
//! the dispatch loop validates and commits.
//!
//! # Submission semantics
//!
//! Submissions are fire-and-forget from the handler's perspective:
//! - If the channel is closed (dispatch loop shut down), the submission is
//!   dropped and a warning is logged — handlers have no error path for this case.
//! - The dispatch loop processes submissions on the next tick, not immediately.
//! - Handlers must not assume submissions are visible before they return.
21
22use std::sync::Arc;
23
24use actionqueue_core::ids::TaskId;
25use actionqueue_core::task::task_spec::TaskSpec;
26use actionqueue_executor_local::handler::TaskSubmissionPort;
27use tokio::sync::mpsc;
28
29/// A proposed task submission from a handler.
30///
31/// Created by [`SubmissionChannel::submit`] and consumed by the dispatch loop.
32/// Fields are private to preserve the validated-construction pattern used
33/// throughout ActionQueue; use [`into_parts`][TaskSubmission::into_parts] to
34/// destructure for processing.
35#[derive(Debug)]
36pub struct TaskSubmission {
37    /// The task specification to create. Should have `parent_task_id` set
38    /// (via [`TaskSpec::with_parent`]) to associate this child with the
39    /// Coordinator's task in the hierarchy.
40    task_spec: TaskSpec,
41    /// Prerequisite task IDs that must reach terminal success before this
42    /// task's runs are promoted to Ready. Empty means no dependencies.
43    dependencies: Vec<TaskId>,
44}
45
46impl TaskSubmission {
47    /// Creates a new task submission.
48    pub fn new(task_spec: TaskSpec, dependencies: Vec<TaskId>) -> Self {
49        Self { task_spec, dependencies }
50    }
51
52    /// Returns a reference to the task specification.
53    pub fn task_spec(&self) -> &TaskSpec {
54        &self.task_spec
55    }
56
57    /// Returns the dependency list.
58    pub fn dependencies(&self) -> &[TaskId] {
59        &self.dependencies
60    }
61
62    /// Consumes the submission, returning its parts.
63    pub fn into_parts(self) -> (TaskSpec, Vec<TaskId>) {
64        (self.task_spec, self.dependencies)
65    }
66}
67
68/// Concrete submission channel backed by a tokio mpsc channel.
69///
70/// Created by the dispatch loop and cloned (cheaply via `Arc`) into each
71/// `ExecutorContext`. Implements [`TaskSubmissionPort`] so it can be passed
72/// as `Arc<dyn TaskSubmissionPort>` without coupling executor-local to tokio.
73pub struct SubmissionChannel {
74    tx: mpsc::UnboundedSender<TaskSubmission>,
75}
76
77impl TaskSubmissionPort for SubmissionChannel {
78    fn submit(&self, task_spec: TaskSpec, dependencies: Vec<TaskId>) {
79        let task_id = task_spec.id();
80        if let Err(_e) = self.tx.send(TaskSubmission::new(task_spec, dependencies)) {
81            tracing::warn!(
82                %task_id,
83                "submission dropped: dispatch loop receiver closed"
84            );
85        }
86    }
87}
88
89/// Receiving end of the submission channel (held by the dispatch loop).
90pub struct SubmissionReceiver(mpsc::UnboundedReceiver<TaskSubmission>);
91
92impl SubmissionReceiver {
93    /// Attempts to receive a pending submission without blocking.
94    ///
95    /// Returns `Some` if a submission is pending, `None` if the queue is
96    /// empty or the sender side has disconnected.
97    pub fn try_recv(&mut self) -> Option<TaskSubmission> {
98        match self.0.try_recv() {
99            Ok(submission) => Some(submission),
100            Err(mpsc::error::TryRecvError::Empty) => None,
101            Err(mpsc::error::TryRecvError::Disconnected) => {
102                tracing::debug!("submission channel disconnected");
103                None
104            }
105        }
106    }
107}
108
109/// Creates a matched pair of [`SubmissionChannel`] (handler side) and
110/// [`SubmissionReceiver`] (dispatch loop side).
111pub fn submission_channel() -> (Arc<SubmissionChannel>, SubmissionReceiver) {
112    let (tx, rx) = mpsc::unbounded_channel();
113    (Arc::new(SubmissionChannel { tx }), SubmissionReceiver(rx))
114}