1use std::path::{Path, PathBuf};
4use std::str::FromStr;
5
6use actionqueue_core::ids::TaskId;
7use actionqueue_core::mutation::{
8 DurabilityPolicy, MutationAuthority, MutationCommand, RunCreateCommand, TaskCreateCommand,
9};
10use actionqueue_core::task::constraints::TaskConstraints;
11use actionqueue_core::task::metadata::TaskMetadata;
12use actionqueue_core::task::run_policy::RunPolicy;
13use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
14use serde_json::json;
15
16use crate::args::SubmitArgs;
17use crate::cmd::{now_unix_seconds, resolve_data_dir, CliError, CommandOutput};
18
19pub fn run(args: SubmitArgs) -> Result<CommandOutput, CliError> {
21 let data_dir = resolve_data_dir(args.data_dir.as_deref());
22 let task_id = TaskId::from_str(&args.task_id).map_err(|error| {
23 CliError::validation(
24 "invalid_task_id",
25 format!("invalid task id '{}': {error}", args.task_id),
26 )
27 })?;
28
29 let run_policy = parse_run_policy(&args.run_policy)?;
30 let constraints = parse_constraints(args.constraints.as_deref())?;
31 let metadata = parse_metadata(args.metadata.as_deref())?;
32 let payload = load_payload(args.payload_path.as_deref())?;
33 let now = now_unix_seconds()?;
34
35 let task_payload = match args.content_type.clone() {
36 Some(ct) => TaskPayload::with_content_type(payload, ct),
37 None => TaskPayload::new(payload),
38 };
39
40 let task_spec = TaskSpec::new(task_id, task_payload, run_policy.clone(), constraints, metadata)
41 .map_err(|error| {
42 CliError::validation(
43 "task_spec_invalid",
44 format!("submit task spec failed validation: {error}"),
45 )
46 })?;
47
48 let recovery = actionqueue_storage::recovery::bootstrap::load_projection_from_storage(
49 &data_dir,
50 )
51 .map_err(|error| {
52 CliError::runtime(
53 "storage_bootstrap_failed",
54 format!("unable to load storage projection: {error}"),
55 )
56 })?;
57
58 let mut authority = actionqueue_storage::mutation::StorageMutationAuthority::new(
59 recovery.wal_writer,
60 recovery.projection,
61 );
62
63 submit_task_create(&mut authority, task_spec.clone(), now)?;
64
65 let clock = actionqueue_engine::time::clock::SystemClock;
66 let derivation =
67 actionqueue_engine::derive::derive_runs(&clock, task_id, task_spec.run_policy(), 0, now)
68 .map_err(|error| {
69 CliError::validation(
70 "run_derivation_failed",
71 format!("submit run derivation failed: {error}"),
72 )
73 })?;
74
75 let runs_created = derivation.derived().len();
76 for run in derivation.into_derived() {
77 submit_run_create(&mut authority, run)?;
78 }
79
80 let latest_sequence = authority.projection().latest_sequence();
81 if args.json {
82 return Ok(CommandOutput::Json(json!({
83 "command": "submit",
84 "task_id": task_id.to_string(),
85 "run_policy": format_run_policy(run_policy),
86 "runs_created": runs_created,
87 "latest_sequence": latest_sequence,
88 "data_dir": data_dir.display().to_string(),
89 })));
90 }
91
92 let lines = [
93 "command=submit".to_string(),
94 format!("task_id={task_id}"),
95 format!("run_policy={}", format_run_policy(run_policy)),
96 format!("runs_created={runs_created}"),
97 format!("latest_sequence={latest_sequence}"),
98 format!("data_dir={}", data_dir.display()),
99 ];
100 Ok(CommandOutput::Text(lines.join("\n")))
101}
102
103fn submit_task_create(
104 authority: &mut actionqueue_storage::mutation::StorageMutationAuthority<
105 actionqueue_storage::wal::InstrumentedWalWriter<
106 actionqueue_storage::wal::fs_writer::WalFsWriter,
107 >,
108 actionqueue_storage::recovery::reducer::ReplayReducer,
109 >,
110 task_spec: TaskSpec,
111 timestamp: u64,
112) -> Result<(), CliError> {
113 let sequence = next_sequence(authority)?;
114 let command =
115 MutationCommand::TaskCreate(TaskCreateCommand::new(sequence, task_spec, timestamp));
116 authority
117 .submit_command(command, DurabilityPolicy::Immediate)
118 .map_err(map_authority_error)
119 .map(|_| ())
120}
121
122fn submit_run_create(
123 authority: &mut actionqueue_storage::mutation::StorageMutationAuthority<
124 actionqueue_storage::wal::InstrumentedWalWriter<
125 actionqueue_storage::wal::fs_writer::WalFsWriter,
126 >,
127 actionqueue_storage::recovery::reducer::ReplayReducer,
128 >,
129 run_instance: actionqueue_core::run::RunInstance,
130) -> Result<(), CliError> {
131 let sequence = next_sequence(authority)?;
132 let command = MutationCommand::RunCreate(RunCreateCommand::new(sequence, run_instance));
133 authority
134 .submit_command(command, DurabilityPolicy::Immediate)
135 .map_err(map_authority_error)
136 .map(|_| ())
137}
138
139fn next_sequence(
140 authority: &actionqueue_storage::mutation::StorageMutationAuthority<
141 actionqueue_storage::wal::InstrumentedWalWriter<
142 actionqueue_storage::wal::fs_writer::WalFsWriter,
143 >,
144 actionqueue_storage::recovery::reducer::ReplayReducer,
145 >,
146) -> Result<u64, CliError> {
147 authority
148 .projection()
149 .latest_sequence()
150 .checked_add(1)
151 .ok_or_else(|| CliError::runtime("sequence_overflow", "next WAL sequence overflowed u64"))
152}
153
154fn map_authority_error(
155 error: actionqueue_storage::mutation::MutationAuthorityError<
156 actionqueue_storage::recovery::reducer::ReplayReducerError,
157 >,
158) -> CliError {
159 match error {
160 actionqueue_storage::mutation::MutationAuthorityError::Validation(validation) => {
161 CliError::validation("mutation_validation_failed", validation.to_string())
162 }
163 actionqueue_storage::mutation::MutationAuthorityError::Append(append) => {
164 CliError::runtime("wal_append_failed", append.to_string())
165 }
166 actionqueue_storage::mutation::MutationAuthorityError::PartialDurability {
167 sequence,
168 flush_error,
169 } => CliError::runtime(
170 "wal_partial_durability",
171 format!("append succeeded at sequence {sequence} but flush failed: {flush_error}"),
172 ),
173 actionqueue_storage::mutation::MutationAuthorityError::Apply { sequence, source } => {
174 CliError::runtime(
175 "projection_apply_failed",
176 format!(
177 "projection apply failed after durable append sequence {sequence}: {source}"
178 ),
179 )
180 }
181 }
182}
183
184fn parse_run_policy(raw: &str) -> Result<RunPolicy, CliError> {
185 if raw.eq_ignore_ascii_case("once") {
186 return Ok(RunPolicy::Once);
187 }
188
189 #[cfg(feature = "workflow")]
191 {
192 let parts: Vec<&str> = raw.splitn(2, ':').collect();
193 if parts.len() == 2 && parts[0].eq_ignore_ascii_case("cron") {
194 return RunPolicy::cron(parts[1])
195 .map_err(|error| CliError::validation("invalid_run_policy", error.to_string()));
196 }
197 }
198
199 let parts: Vec<&str> = raw.split(':').collect();
201 if parts.len() == 3 && parts[0].eq_ignore_ascii_case("repeat") {
202 let count = parts[1].parse::<u32>().map_err(|error| {
203 CliError::validation(
204 "invalid_run_policy",
205 format!("invalid repeat count '{}': {error}", parts[1]),
206 )
207 })?;
208 let interval_secs = parts[2].parse::<u64>().map_err(|error| {
209 CliError::validation(
210 "invalid_run_policy",
211 format!("invalid repeat interval '{}': {error}", parts[2]),
212 )
213 })?;
214 return RunPolicy::repeat(count, interval_secs)
215 .map_err(|error| CliError::validation("invalid_run_policy", error.to_string()));
216 }
217
218 Err(CliError::validation(
219 "invalid_run_policy",
220 format!(
221 "unsupported run policy '{raw}', expected 'once', 'repeat:N:SECONDS'{}",
222 if cfg!(feature = "workflow") { ", or 'cron:EXPRESSION'" } else { "" }
223 ),
224 ))
225}
226
227fn format_run_policy(policy: RunPolicy) -> String {
228 match policy {
229 RunPolicy::Once => "once".to_string(),
230 RunPolicy::Repeat(ref rp) => {
231 format!("repeat:{}:{}", rp.count(), rp.interval_secs())
232 }
233 #[cfg(feature = "workflow")]
234 RunPolicy::Cron(ref cp) => {
235 format!("cron:{}", cp.expression())
236 }
237 }
238}
239
240fn parse_constraints(raw: Option<&str>) -> Result<TaskConstraints, CliError> {
241 match raw {
242 None => Ok(TaskConstraints::default()),
243 Some(source) => {
244 let json = read_inline_or_file(source)?;
245 serde_json::from_str::<TaskConstraints>(&json).map_err(|error| {
246 CliError::validation(
247 "invalid_constraints_json",
248 format!("failed to parse constraints JSON: {error}"),
249 )
250 })
251 }
252 }
253}
254
255fn parse_metadata(raw: Option<&str>) -> Result<TaskMetadata, CliError> {
256 match raw {
257 None => Ok(TaskMetadata::default()),
258 Some(source) => {
259 let json = read_inline_or_file(source)?;
260 serde_json::from_str::<TaskMetadata>(&json).map_err(|error| {
261 CliError::validation(
262 "invalid_metadata_json",
263 format!("failed to parse metadata JSON: {error}"),
264 )
265 })
266 }
267 }
268}
269
270fn load_payload(payload_path: Option<&Path>) -> Result<Vec<u8>, CliError> {
271 match payload_path {
272 None => Ok(Vec::new()),
273 Some(path) => std::fs::read(path).map_err(|error| {
274 CliError::validation(
275 "payload_read_failed",
276 format!("unable to read payload '{}': {error}", path.display()),
277 )
278 }),
279 }
280}
281
282fn read_inline_or_file(raw: &str) -> Result<String, CliError> {
283 if let Some(stripped) = raw.strip_prefix('@') {
284 let path = PathBuf::from(stripped);
285 return std::fs::read_to_string(&path).map_err(|error| {
286 CliError::validation(
287 "json_source_read_failed",
288 format!("unable to read JSON source '{}': {error}", path.display()),
289 )
290 });
291 }
292
293 Ok(raw.to_string())
294}