Skip to main content

actionqueue_cli/cmd/
submit.rs

1//! Submit command execution path.
2
3use std::path::{Path, PathBuf};
4use std::str::FromStr;
5
6use actionqueue_core::ids::TaskId;
7use actionqueue_core::mutation::{
8    DurabilityPolicy, MutationAuthority, MutationCommand, RunCreateCommand, TaskCreateCommand,
9};
10use actionqueue_core::task::constraints::TaskConstraints;
11use actionqueue_core::task::metadata::TaskMetadata;
12use actionqueue_core::task::run_policy::RunPolicy;
13use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
14use serde_json::json;
15
16use crate::args::SubmitArgs;
17use crate::cmd::{now_unix_seconds, resolve_data_dir, CliError, CommandOutput};
18
19/// Executes submit command flow.
20pub fn run(args: SubmitArgs) -> Result<CommandOutput, CliError> {
21    let data_dir = resolve_data_dir(args.data_dir.as_deref());
22    let task_id = TaskId::from_str(&args.task_id).map_err(|error| {
23        CliError::validation(
24            "invalid_task_id",
25            format!("invalid task id '{}': {error}", args.task_id),
26        )
27    })?;
28
29    let run_policy = parse_run_policy(&args.run_policy)?;
30    let constraints = parse_constraints(args.constraints.as_deref())?;
31    let metadata = parse_metadata(args.metadata.as_deref())?;
32    let payload = load_payload(args.payload_path.as_deref())?;
33    let now = now_unix_seconds()?;
34
35    let task_payload = match args.content_type.clone() {
36        Some(ct) => TaskPayload::with_content_type(payload, ct),
37        None => TaskPayload::new(payload),
38    };
39
40    let task_spec = TaskSpec::new(task_id, task_payload, run_policy.clone(), constraints, metadata)
41        .map_err(|error| {
42            CliError::validation(
43                "task_spec_invalid",
44                format!("submit task spec failed validation: {error}"),
45            )
46        })?;
47
48    let recovery = actionqueue_storage::recovery::bootstrap::load_projection_from_storage(
49        &data_dir,
50    )
51    .map_err(|error| {
52        CliError::runtime(
53            "storage_bootstrap_failed",
54            format!("unable to load storage projection: {error}"),
55        )
56    })?;
57
58    let mut authority = actionqueue_storage::mutation::StorageMutationAuthority::new(
59        recovery.wal_writer,
60        recovery.projection,
61    );
62
63    submit_task_create(&mut authority, task_spec.clone(), now)?;
64
65    let clock = actionqueue_engine::time::clock::SystemClock;
66    let derivation =
67        actionqueue_engine::derive::derive_runs(&clock, task_id, task_spec.run_policy(), 0, now)
68            .map_err(|error| {
69                CliError::validation(
70                    "run_derivation_failed",
71                    format!("submit run derivation failed: {error}"),
72                )
73            })?;
74
75    let runs_created = derivation.derived().len();
76    for run in derivation.into_derived() {
77        submit_run_create(&mut authority, run)?;
78    }
79
80    let latest_sequence = authority.projection().latest_sequence();
81    if args.json {
82        return Ok(CommandOutput::Json(json!({
83            "command": "submit",
84            "task_id": task_id.to_string(),
85            "run_policy": format_run_policy(run_policy),
86            "runs_created": runs_created,
87            "latest_sequence": latest_sequence,
88            "data_dir": data_dir.display().to_string(),
89        })));
90    }
91
92    let lines = [
93        "command=submit".to_string(),
94        format!("task_id={task_id}"),
95        format!("run_policy={}", format_run_policy(run_policy)),
96        format!("runs_created={runs_created}"),
97        format!("latest_sequence={latest_sequence}"),
98        format!("data_dir={}", data_dir.display()),
99    ];
100    Ok(CommandOutput::Text(lines.join("\n")))
101}
102
103fn submit_task_create(
104    authority: &mut actionqueue_storage::mutation::StorageMutationAuthority<
105        actionqueue_storage::wal::InstrumentedWalWriter<
106            actionqueue_storage::wal::fs_writer::WalFsWriter,
107        >,
108        actionqueue_storage::recovery::reducer::ReplayReducer,
109    >,
110    task_spec: TaskSpec,
111    timestamp: u64,
112) -> Result<(), CliError> {
113    let sequence = next_sequence(authority)?;
114    let command =
115        MutationCommand::TaskCreate(TaskCreateCommand::new(sequence, task_spec, timestamp));
116    authority
117        .submit_command(command, DurabilityPolicy::Immediate)
118        .map_err(map_authority_error)
119        .map(|_| ())
120}
121
122fn submit_run_create(
123    authority: &mut actionqueue_storage::mutation::StorageMutationAuthority<
124        actionqueue_storage::wal::InstrumentedWalWriter<
125            actionqueue_storage::wal::fs_writer::WalFsWriter,
126        >,
127        actionqueue_storage::recovery::reducer::ReplayReducer,
128    >,
129    run_instance: actionqueue_core::run::RunInstance,
130) -> Result<(), CliError> {
131    let sequence = next_sequence(authority)?;
132    let command = MutationCommand::RunCreate(RunCreateCommand::new(sequence, run_instance));
133    authority
134        .submit_command(command, DurabilityPolicy::Immediate)
135        .map_err(map_authority_error)
136        .map(|_| ())
137}
138
139fn next_sequence(
140    authority: &actionqueue_storage::mutation::StorageMutationAuthority<
141        actionqueue_storage::wal::InstrumentedWalWriter<
142            actionqueue_storage::wal::fs_writer::WalFsWriter,
143        >,
144        actionqueue_storage::recovery::reducer::ReplayReducer,
145    >,
146) -> Result<u64, CliError> {
147    authority
148        .projection()
149        .latest_sequence()
150        .checked_add(1)
151        .ok_or_else(|| CliError::runtime("sequence_overflow", "next WAL sequence overflowed u64"))
152}
153
154fn map_authority_error(
155    error: actionqueue_storage::mutation::MutationAuthorityError<
156        actionqueue_storage::recovery::reducer::ReplayReducerError,
157    >,
158) -> CliError {
159    match error {
160        actionqueue_storage::mutation::MutationAuthorityError::Validation(validation) => {
161            CliError::validation("mutation_validation_failed", validation.to_string())
162        }
163        actionqueue_storage::mutation::MutationAuthorityError::Append(append) => {
164            CliError::runtime("wal_append_failed", append.to_string())
165        }
166        actionqueue_storage::mutation::MutationAuthorityError::PartialDurability {
167            sequence,
168            flush_error,
169        } => CliError::runtime(
170            "wal_partial_durability",
171            format!("append succeeded at sequence {sequence} but flush failed: {flush_error}"),
172        ),
173        actionqueue_storage::mutation::MutationAuthorityError::Apply { sequence, source } => {
174            CliError::runtime(
175                "projection_apply_failed",
176                format!(
177                    "projection apply failed after durable append sequence {sequence}: {source}"
178                ),
179            )
180        }
181    }
182}
183
184fn parse_run_policy(raw: &str) -> Result<RunPolicy, CliError> {
185    if raw.eq_ignore_ascii_case("once") {
186        return Ok(RunPolicy::Once);
187    }
188
189    // Cron parsing: format is cron:EXPRESSION — split on first colon only.
190    #[cfg(feature = "workflow")]
191    {
192        let parts: Vec<&str> = raw.splitn(2, ':').collect();
193        if parts.len() == 2 && parts[0].eq_ignore_ascii_case("cron") {
194            return RunPolicy::cron(parts[1])
195                .map_err(|error| CliError::validation("invalid_run_policy", error.to_string()));
196        }
197    }
198
199    // Repeat: repeat:N:SECONDS
200    let parts: Vec<&str> = raw.split(':').collect();
201    if parts.len() == 3 && parts[0].eq_ignore_ascii_case("repeat") {
202        let count = parts[1].parse::<u32>().map_err(|error| {
203            CliError::validation(
204                "invalid_run_policy",
205                format!("invalid repeat count '{}': {error}", parts[1]),
206            )
207        })?;
208        let interval_secs = parts[2].parse::<u64>().map_err(|error| {
209            CliError::validation(
210                "invalid_run_policy",
211                format!("invalid repeat interval '{}': {error}", parts[2]),
212            )
213        })?;
214        return RunPolicy::repeat(count, interval_secs)
215            .map_err(|error| CliError::validation("invalid_run_policy", error.to_string()));
216    }
217
218    Err(CliError::validation(
219        "invalid_run_policy",
220        format!(
221            "unsupported run policy '{raw}', expected 'once', 'repeat:N:SECONDS'{}",
222            if cfg!(feature = "workflow") { ", or 'cron:EXPRESSION'" } else { "" }
223        ),
224    ))
225}
226
227fn format_run_policy(policy: RunPolicy) -> String {
228    match policy {
229        RunPolicy::Once => "once".to_string(),
230        RunPolicy::Repeat(ref rp) => {
231            format!("repeat:{}:{}", rp.count(), rp.interval_secs())
232        }
233        #[cfg(feature = "workflow")]
234        RunPolicy::Cron(ref cp) => {
235            format!("cron:{}", cp.expression())
236        }
237    }
238}
239
240fn parse_constraints(raw: Option<&str>) -> Result<TaskConstraints, CliError> {
241    match raw {
242        None => Ok(TaskConstraints::default()),
243        Some(source) => {
244            let json = read_inline_or_file(source)?;
245            serde_json::from_str::<TaskConstraints>(&json).map_err(|error| {
246                CliError::validation(
247                    "invalid_constraints_json",
248                    format!("failed to parse constraints JSON: {error}"),
249                )
250            })
251        }
252    }
253}
254
255fn parse_metadata(raw: Option<&str>) -> Result<TaskMetadata, CliError> {
256    match raw {
257        None => Ok(TaskMetadata::default()),
258        Some(source) => {
259            let json = read_inline_or_file(source)?;
260            serde_json::from_str::<TaskMetadata>(&json).map_err(|error| {
261                CliError::validation(
262                    "invalid_metadata_json",
263                    format!("failed to parse metadata JSON: {error}"),
264                )
265            })
266        }
267    }
268}
269
270fn load_payload(payload_path: Option<&Path>) -> Result<Vec<u8>, CliError> {
271    match payload_path {
272        None => Ok(Vec::new()),
273        Some(path) => std::fs::read(path).map_err(|error| {
274            CliError::validation(
275                "payload_read_failed",
276                format!("unable to read payload '{}': {error}", path.display()),
277            )
278        }),
279    }
280}
281
282fn read_inline_or_file(raw: &str) -> Result<String, CliError> {
283    if let Some(stripped) = raw.strip_prefix('@') {
284        let path = PathBuf::from(stripped);
285        return std::fs::read_to_string(&path).map_err(|error| {
286            CliError::validation(
287                "json_source_read_failed",
288                format!("unable to read JSON source '{}': {error}", path.display()),
289            )
290        });
291    }
292
293    Ok(raw.to_string())
294}