Skip to main content

ralph/queue/operations/
transaction.rs

1//! Transaction-style task mutation helpers.
2//!
3//! Responsibilities:
4//! - Define structured task-mutation requests that can apply multiple field edits atomically.
5//! - Enforce optimistic-lock checks against `updated_at` when requested by callers.
6//! - Reuse existing edit primitives while providing all-or-nothing mutation semantics.
7//!
8//! Does not handle:
9//! - Queue persistence or lock acquisition.
10//! - CLI argument parsing or JSON IO.
11//! - Terminal archive moves across queue/done files.
12//!
13//! Invariants/assumptions:
14//! - Requests target tasks in the active queue only.
15//! - Atomic requests leave the caller's queue untouched when any mutation fails.
16//! - `expected_updated_at` compares canonical RFC3339 instants, not source formatting.
17
18use crate::contracts::QueueFile;
19use crate::queue::TaskEditKey;
20use anyhow::{Context, Result, anyhow, bail};
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub struct TaskMutationRequest {
25    #[serde(default = "task_mutation_request_version")]
26    pub version: u8,
27    #[serde(default = "task_mutation_request_atomic_default")]
28    pub atomic: bool,
29    #[serde(default)]
30    pub tasks: Vec<TaskMutationSpec>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
34pub struct TaskMutationSpec {
35    pub task_id: String,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub expected_updated_at: Option<String>,
38    #[serde(default)]
39    pub edits: Vec<TaskFieldEdit>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct TaskFieldEdit {
44    pub field: String,
45    #[serde(default)]
46    pub value: String,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50pub struct TaskMutationReport {
51    #[serde(default = "task_mutation_request_version")]
52    pub version: u8,
53    pub atomic: bool,
54    pub tasks: Vec<TaskMutationTaskReport>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
58pub struct TaskMutationTaskReport {
59    pub task_id: String,
60    pub applied_edits: usize,
61}
62
63#[derive(Debug, thiserror::Error)]
64pub enum TaskMutationError {
65    #[error("Task mutation request must include at least one task.")]
66    EmptyRequest,
67    #[error("Task mutation for {task_id} must include at least one edit.")]
68    EmptyTaskEdits { task_id: String },
69    #[error(
70        "Task mutation conflict for {task_id}: expected updated_at {expected}, found {actual}."
71    )]
72    OptimisticConflict {
73        task_id: String,
74        expected: String,
75        actual: String,
76    },
77    #[error(
78        "Task mutation conflict for {task_id}: expected updated_at {expected}, but the task has no updated_at."
79    )]
80    MissingActualTimestamp { task_id: String, expected: String },
81}
82
/// Schema version used as the serde default for request/report `version` fields and stamped
/// on every report produced by this module.
const fn task_mutation_request_version() -> u8 {
    const CURRENT_VERSION: u8 = 1;
    CURRENT_VERSION
}
86
/// Serde default for `TaskMutationRequest::atomic`: requests are atomic unless they opt out.
const fn task_mutation_request_atomic_default() -> bool {
    const ATOMIC_BY_DEFAULT: bool = true;
    ATOMIC_BY_DEFAULT
}
90
91#[allow(clippy::too_many_arguments)]
92pub fn apply_task_mutation_request(
93    queue: &mut QueueFile,
94    done: Option<&QueueFile>,
95    request: &TaskMutationRequest,
96    now_rfc3339: &str,
97    id_prefix: &str,
98    id_width: usize,
99    max_dependency_depth: u8,
100) -> Result<TaskMutationReport> {
101    if request.tasks.is_empty() {
102        return Err(TaskMutationError::EmptyRequest.into());
103    }
104
105    if request.atomic {
106        let mut working = queue.clone();
107        let report = apply_request_into_queue(
108            &mut working,
109            done,
110            request,
111            now_rfc3339,
112            id_prefix,
113            id_width,
114            max_dependency_depth,
115        )?;
116        *queue = working;
117        return Ok(report);
118    }
119
120    apply_request_into_queue(
121        queue,
122        done,
123        request,
124        now_rfc3339,
125        id_prefix,
126        id_width,
127        max_dependency_depth,
128    )
129}
130
131#[allow(clippy::too_many_arguments)]
132fn apply_request_into_queue(
133    queue: &mut QueueFile,
134    done: Option<&QueueFile>,
135    request: &TaskMutationRequest,
136    now_rfc3339: &str,
137    id_prefix: &str,
138    id_width: usize,
139    max_dependency_depth: u8,
140) -> Result<TaskMutationReport> {
141    let mut reports = Vec::with_capacity(request.tasks.len());
142
143    for task in &request.tasks {
144        if task.edits.is_empty() {
145            return Err(TaskMutationError::EmptyTaskEdits {
146                task_id: task.task_id.trim().to_string(),
147            }
148            .into());
149        }
150
151        ensure_expected_updated_at(queue, task)?;
152
153        for edit in &task.edits {
154            let key = edit.field.parse::<TaskEditKey>().with_context(|| {
155                format!(
156                    "Invalid task mutation field '{}' for task {}",
157                    edit.field, task.task_id
158                )
159            })?;
160            super::edit::apply_task_edit(
161                queue,
162                done,
163                &task.task_id,
164                key,
165                &edit.value,
166                now_rfc3339,
167                id_prefix,
168                id_width,
169                max_dependency_depth,
170            )?;
171        }
172
173        reports.push(TaskMutationTaskReport {
174            task_id: task.task_id.trim().to_string(),
175            applied_edits: task.edits.len(),
176        });
177    }
178
179    Ok(TaskMutationReport {
180        version: task_mutation_request_version(),
181        atomic: request.atomic,
182        tasks: reports,
183    })
184}
185
186fn ensure_expected_updated_at(queue: &QueueFile, task: &TaskMutationSpec) -> Result<()> {
187    let Some(expected) = task.expected_updated_at.as_ref() else {
188        return Ok(());
189    };
190
191    let task_id = task.task_id.trim();
192    if task_id.is_empty() {
193        bail!("Task mutation is missing task_id.");
194    }
195
196    let current = queue
197        .tasks
198        .iter()
199        .find(|candidate| candidate.id.trim() == task_id)
200        .ok_or_else(|| anyhow!("{}", crate::error_messages::task_not_found(task_id)))?;
201
202    let expected_trimmed = expected.trim();
203    let expected_dt = crate::timeutil::parse_rfc3339(expected_trimmed)
204        .with_context(|| format!("parse expected updated_at for task {}", task_id))?;
205
206    match current.updated_at.as_deref().map(str::trim) {
207        Some(actual)
208            if crate::timeutil::parse_rfc3339(actual)
209                .map(|actual_dt| actual_dt == expected_dt)
210                .unwrap_or(false) =>
211        {
212            Ok(())
213        }
214        Some(actual) => Err(TaskMutationError::OptimisticConflict {
215            task_id: task_id.to_string(),
216            expected: expected_trimmed.to_string(),
217            actual: actual.to_string(),
218        }
219        .into()),
220        None => Err(TaskMutationError::MissingActualTimestamp {
221            task_id: task_id.to_string(),
222            expected: expected_trimmed.to_string(),
223        }
224        .into()),
225    }
226}