1use crate::contracts::QueueFile;
19use crate::queue::TaskEditKey;
20use anyhow::{Context, Result, anyhow, bail};
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub struct TaskMutationRequest {
25 #[serde(default = "task_mutation_request_version")]
26 pub version: u8,
27 #[serde(default = "task_mutation_request_atomic_default")]
28 pub atomic: bool,
29 #[serde(default)]
30 pub tasks: Vec<TaskMutationSpec>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
34pub struct TaskMutationSpec {
35 pub task_id: String,
36 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub expected_updated_at: Option<String>,
38 #[serde(default)]
39 pub edits: Vec<TaskFieldEdit>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct TaskFieldEdit {
44 pub field: String,
45 #[serde(default)]
46 pub value: String,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct TaskMutationReport {
51 #[serde(default = "task_mutation_request_version")]
52 pub version: u8,
53 pub atomic: bool,
54 pub tasks: Vec<TaskMutationTaskReport>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
58pub struct TaskMutationTaskReport {
59 pub task_id: String,
60 pub applied_edits: usize,
61}
62
63#[derive(Debug, thiserror::Error)]
64pub enum TaskMutationError {
65 #[error("Task mutation request must include at least one task.")]
66 EmptyRequest,
67 #[error("Task mutation for {task_id} must include at least one edit.")]
68 EmptyTaskEdits { task_id: String },
69 #[error(
70 "Task mutation conflict for {task_id}: expected updated_at {expected}, found {actual}."
71 )]
72 OptimisticConflict {
73 task_id: String,
74 expected: String,
75 actual: String,
76 },
77 #[error(
78 "Task mutation conflict for {task_id}: expected updated_at {expected}, but the task has no updated_at."
79 )]
80 MissingActualTimestamp { task_id: String, expected: String },
81}
82
/// Returns the version number used as the serde default for the `version`
/// field of requests and reports, and stamped into every generated report.
const fn task_mutation_request_version() -> u8 {
    const CURRENT_VERSION: u8 = 1;
    CURRENT_VERSION
}
86
/// Returns the serde default for `TaskMutationRequest::atomic`: requests are
/// atomic unless the payload says otherwise.
const fn task_mutation_request_atomic_default() -> bool {
    const ATOMIC_BY_DEFAULT: bool = true;
    ATOMIC_BY_DEFAULT
}
90
91#[allow(clippy::too_many_arguments)]
92pub fn apply_task_mutation_request(
93 queue: &mut QueueFile,
94 done: Option<&QueueFile>,
95 request: &TaskMutationRequest,
96 now_rfc3339: &str,
97 id_prefix: &str,
98 id_width: usize,
99 max_dependency_depth: u8,
100) -> Result<TaskMutationReport> {
101 if request.tasks.is_empty() {
102 return Err(TaskMutationError::EmptyRequest.into());
103 }
104
105 if request.atomic {
106 let mut working = queue.clone();
107 let report = apply_request_into_queue(
108 &mut working,
109 done,
110 request,
111 now_rfc3339,
112 id_prefix,
113 id_width,
114 max_dependency_depth,
115 )?;
116 *queue = working;
117 return Ok(report);
118 }
119
120 apply_request_into_queue(
121 queue,
122 done,
123 request,
124 now_rfc3339,
125 id_prefix,
126 id_width,
127 max_dependency_depth,
128 )
129}
130
131#[allow(clippy::too_many_arguments)]
132fn apply_request_into_queue(
133 queue: &mut QueueFile,
134 done: Option<&QueueFile>,
135 request: &TaskMutationRequest,
136 now_rfc3339: &str,
137 id_prefix: &str,
138 id_width: usize,
139 max_dependency_depth: u8,
140) -> Result<TaskMutationReport> {
141 let mut reports = Vec::with_capacity(request.tasks.len());
142
143 for task in &request.tasks {
144 if task.edits.is_empty() {
145 return Err(TaskMutationError::EmptyTaskEdits {
146 task_id: task.task_id.trim().to_string(),
147 }
148 .into());
149 }
150
151 ensure_expected_updated_at(queue, task)?;
152
153 for edit in &task.edits {
154 let key = edit.field.parse::<TaskEditKey>().with_context(|| {
155 format!(
156 "Invalid task mutation field '{}' for task {}",
157 edit.field, task.task_id
158 )
159 })?;
160 super::edit::apply_task_edit(
161 queue,
162 done,
163 &task.task_id,
164 key,
165 &edit.value,
166 now_rfc3339,
167 id_prefix,
168 id_width,
169 max_dependency_depth,
170 )?;
171 }
172
173 reports.push(TaskMutationTaskReport {
174 task_id: task.task_id.trim().to_string(),
175 applied_edits: task.edits.len(),
176 });
177 }
178
179 Ok(TaskMutationReport {
180 version: task_mutation_request_version(),
181 atomic: request.atomic,
182 tasks: reports,
183 })
184}
185
186fn ensure_expected_updated_at(queue: &QueueFile, task: &TaskMutationSpec) -> Result<()> {
187 let Some(expected) = task.expected_updated_at.as_ref() else {
188 return Ok(());
189 };
190
191 let task_id = task.task_id.trim();
192 if task_id.is_empty() {
193 bail!("Task mutation is missing task_id.");
194 }
195
196 let current = queue
197 .tasks
198 .iter()
199 .find(|candidate| candidate.id.trim() == task_id)
200 .ok_or_else(|| anyhow!("{}", crate::error_messages::task_not_found(task_id)))?;
201
202 let expected_trimmed = expected.trim();
203 let expected_dt = crate::timeutil::parse_rfc3339(expected_trimmed)
204 .with_context(|| format!("parse expected updated_at for task {}", task_id))?;
205
206 match current.updated_at.as_deref().map(str::trim) {
207 Some(actual)
208 if crate::timeutil::parse_rfc3339(actual)
209 .map(|actual_dt| actual_dt == expected_dt)
210 .unwrap_or(false) =>
211 {
212 Ok(())
213 }
214 Some(actual) => Err(TaskMutationError::OptimisticConflict {
215 task_id: task_id.to_string(),
216 expected: expected_trimmed.to_string(),
217 actual: actual.to_string(),
218 }
219 .into()),
220 None => Err(TaskMutationError::MissingActualTimestamp {
221 task_id: task_id.to_string(),
222 expected: expected_trimmed.to_string(),
223 }
224 .into()),
225 }
226}