actionqueue_workflow/submission.rs
1//! Validated dynamic task creation from handlers.
2//!
3//! Provides [`SubmissionChannel`] — a typed mpsc channel through which handlers
4//! can propose new tasks. The dispatch loop owns all mutations: proposals flow
5//! through the channel and are validated through the mutation authority before
6//! being WAL-appended. Handlers cannot bypass the mutation authority.
7//!
8//! # Invariant preservation
9//!
10//! This design preserves ActionQueue Invariant 5: "External extensions cannot mutate
11//! persisted state directly." Handlers propose via [`SubmissionChannel::submit`];
12//! the dispatch loop validates and commits.
13//!
14//! # Submission semantics
15//!
16//! Submissions are fire-and-forget from the handler's perspective:
//! - If the channel is closed (dispatch loop shut down), the submission is
//!   dropped and a warning is logged — no error is returned to the handler.
19//! - The dispatch loop processes submissions on the next tick, not immediately.
20//! - Handlers must not assume submissions are visible before they return.
21
22use std::sync::Arc;
23
24use actionqueue_core::ids::TaskId;
25use actionqueue_core::task::task_spec::TaskSpec;
26use actionqueue_executor_local::handler::TaskSubmissionPort;
27use tokio::sync::mpsc;
28
29/// A proposed task submission from a handler.
30///
31/// Created by [`SubmissionChannel::submit`] and consumed by the dispatch loop.
32/// Fields are private to preserve the validated-construction pattern used
33/// throughout ActionQueue; use [`into_parts`][TaskSubmission::into_parts] to
34/// destructure for processing.
35#[derive(Debug)]
36pub struct TaskSubmission {
37 /// The task specification to create. Should have `parent_task_id` set
38 /// (via [`TaskSpec::with_parent`]) to associate this child with the
39 /// Coordinator's task in the hierarchy.
40 task_spec: TaskSpec,
41 /// Prerequisite task IDs that must reach terminal success before this
42 /// task's runs are promoted to Ready. Empty means no dependencies.
43 dependencies: Vec<TaskId>,
44}
45
46impl TaskSubmission {
47 /// Creates a new task submission.
48 pub fn new(task_spec: TaskSpec, dependencies: Vec<TaskId>) -> Self {
49 Self { task_spec, dependencies }
50 }
51
52 /// Returns a reference to the task specification.
53 pub fn task_spec(&self) -> &TaskSpec {
54 &self.task_spec
55 }
56
57 /// Returns the dependency list.
58 pub fn dependencies(&self) -> &[TaskId] {
59 &self.dependencies
60 }
61
62 /// Consumes the submission, returning its parts.
63 pub fn into_parts(self) -> (TaskSpec, Vec<TaskId>) {
64 (self.task_spec, self.dependencies)
65 }
66}
67
68/// Concrete submission channel backed by a tokio mpsc channel.
69///
70/// Created by the dispatch loop and cloned (cheaply via `Arc`) into each
71/// `ExecutorContext`. Implements [`TaskSubmissionPort`] so it can be passed
72/// as `Arc<dyn TaskSubmissionPort>` without coupling executor-local to tokio.
73pub struct SubmissionChannel {
74 tx: mpsc::UnboundedSender<TaskSubmission>,
75}
76
77impl TaskSubmissionPort for SubmissionChannel {
78 fn submit(&self, task_spec: TaskSpec, dependencies: Vec<TaskId>) {
79 let task_id = task_spec.id();
80 if let Err(_e) = self.tx.send(TaskSubmission::new(task_spec, dependencies)) {
81 tracing::warn!(
82 %task_id,
83 "submission dropped: dispatch loop receiver closed"
84 );
85 }
86 }
87}
88
89/// Receiving end of the submission channel (held by the dispatch loop).
90pub struct SubmissionReceiver(mpsc::UnboundedReceiver<TaskSubmission>);
91
92impl SubmissionReceiver {
93 /// Attempts to receive a pending submission without blocking.
94 ///
95 /// Returns `Some` if a submission is pending, `None` if the queue is
96 /// empty or the sender side has disconnected.
97 pub fn try_recv(&mut self) -> Option<TaskSubmission> {
98 match self.0.try_recv() {
99 Ok(submission) => Some(submission),
100 Err(mpsc::error::TryRecvError::Empty) => None,
101 Err(mpsc::error::TryRecvError::Disconnected) => {
102 tracing::debug!("submission channel disconnected");
103 None
104 }
105 }
106 }
107}
108
109/// Creates a matched pair of [`SubmissionChannel`] (handler side) and
110/// [`SubmissionReceiver`] (dispatch loop side).
111pub fn submission_channel() -> (Arc<SubmissionChannel>, SubmissionReceiver) {
112 let (tx, rx) = mpsc::unbounded_channel();
113 (Arc::new(SubmissionChannel { tx }), SubmissionReceiver(rx))
114}