// ralph/queue/operations/batch/generate.rs
1//! Batch task generation operations (clone and split).
2//!
3//! Responsibilities:
4//! - Batch clone multiple tasks with new IDs.
5//! - Batch split multiple tasks into child tasks.
6//!
7//! Does not handle:
8//! - Task deletion (see delete.rs).
9//! - Task filtering/selection (see filters.rs).
10//! - Task field updates (see update.rs).
11//!
12//! Assumptions/invariants:
13//! - Clone operations can source from active or done queues.
14//! - Split operations only work on tasks in the active queue.
15//! - Both operations support atomic rollback on failure.
16
17use crate::contracts::{QueueFile, TaskStatus};
18use crate::queue;
19use crate::queue::operations::{CloneTaskOptions, SplitTaskOptions, suggest_new_task_insert_index};
20use anyhow::{Result, bail};
21
22use super::{BatchOperationResult, BatchResultCollector, preprocess_batch_ids};
23
24/// Batch clone multiple tasks.
25///
26/// # Arguments
27/// * `queue` - The active queue to insert cloned tasks into
28/// * `done` - Optional done queue to search for source tasks
29/// * `task_ids` - List of task IDs to clone
30/// * `status` - Status for cloned tasks
31/// * `title_prefix` - Optional prefix for cloned task titles
32/// * `now_rfc3339` - Current timestamp for created_at/updated_at
33/// * `id_prefix` - Task ID prefix
34/// * `id_width` - Task ID numeric width
35/// * `max_dependency_depth` - Max dependency depth for validation
36/// * `continue_on_error` - If true, continue processing on individual failures
37///
38/// # Returns
39/// A `BatchOperationResult` with details of successes and failures, including created task IDs.
40#[allow(clippy::too_many_arguments)]
41pub fn batch_clone_tasks(
42 queue: &mut QueueFile,
43 done: Option<&QueueFile>,
44 task_ids: &[String],
45 status: TaskStatus,
46 title_prefix: Option<&str>,
47 now_rfc3339: &str,
48 id_prefix: &str,
49 id_width: usize,
50 max_dependency_depth: u8,
51 continue_on_error: bool,
52) -> Result<BatchOperationResult> {
53 let unique_ids = preprocess_batch_ids(task_ids, "clone")?;
54
55 // In atomic mode, validate all source tasks exist first
56 if !continue_on_error {
57 for task_id in &unique_ids {
58 let exists_in_active = queue.tasks.iter().any(|t| t.id == *task_id);
59 let exists_in_done = done.is_some_and(|d| d.tasks.iter().any(|t| t.id == *task_id));
60 if !exists_in_active && !exists_in_done {
61 bail!(
62 "{}",
63 crate::error_messages::source_task_not_found(task_id, true)
64 );
65 }
66 }
67 }
68
69 // Create a working copy for atomic mode
70 let original_queue = if !continue_on_error {
71 Some(queue.clone())
72 } else {
73 None
74 };
75
76 // Place the collector inside the rollback scope for atomic mode
77 let mut collector = BatchResultCollector::new(unique_ids.len(), continue_on_error, "clone");
78
79 for task_id in &unique_ids {
80 let opts = CloneTaskOptions::new(task_id, status, now_rfc3339, id_prefix, id_width)
81 .with_title_prefix(title_prefix)
82 .with_max_depth(max_dependency_depth);
83
84 match queue::operations::clone_task(queue, done, &opts) {
85 Ok((new_id, cloned_task)) => {
86 // Insert the cloned task at a good position
87 let insert_idx = suggest_new_task_insert_index(queue);
88 queue.tasks.insert(insert_idx, cloned_task);
89
90 collector.record_success(task_id.clone(), vec![new_id]);
91 }
92 Err(e) => {
93 let error_msg = e.to_string();
94 if !continue_on_error {
95 // Rollback: restore original queue
96 if let Some(ref original) = original_queue {
97 *queue = original.clone();
98 }
99 // Use bail directly for the atomic mode error
100 bail!(
101 "Batch clone failed at task {}: {}. Use --continue-on-error to process remaining tasks.",
102 task_id,
103 error_msg
104 );
105 }
106 // In continue-on-error mode, record the failure
107 // Note: record_failure returns Ok(()) when continue_on_error=true
108 collector.record_failure(task_id.clone(), error_msg)?;
109 }
110 }
111 }
112
113 Ok(collector.finish())
114}
115
116/// Batch split multiple tasks into child tasks.
117///
118/// # Arguments
119/// * `queue` - The active queue to modify
120/// * `task_ids` - List of task IDs to split
121/// * `number` - Number of child tasks to create per source
122/// * `status` - Status for child tasks
123/// * `title_prefix` - Optional prefix for child task titles
124/// * `distribute_plan` - Whether to distribute plan items across children
125/// * `now_rfc3339` - Current timestamp for timestamps
126/// * `id_prefix` - Task ID prefix
127/// * `id_width` - Task ID numeric width
128/// * `max_dependency_depth` - Max dependency depth for validation
129/// * `continue_on_error` - If true, continue processing on individual failures
130///
131/// # Returns
132/// A `BatchOperationResult` with details of successes and failures.
133#[allow(clippy::too_many_arguments)]
134pub fn batch_split_tasks(
135 queue: &mut QueueFile,
136 task_ids: &[String],
137 number: usize,
138 status: TaskStatus,
139 title_prefix: Option<&str>,
140 distribute_plan: bool,
141 now_rfc3339: &str,
142 id_prefix: &str,
143 id_width: usize,
144 max_dependency_depth: u8,
145 continue_on_error: bool,
146) -> Result<BatchOperationResult> {
147 if number < 2 {
148 bail!("Number of child tasks must be at least 2");
149 }
150
151 let unique_ids = preprocess_batch_ids(task_ids, "split")?;
152
153 // In atomic mode, validate all source tasks exist first
154 if !continue_on_error {
155 for task_id in &unique_ids {
156 if !queue.tasks.iter().any(|t| t.id == *task_id) {
157 bail!(
158 "{}",
159 crate::error_messages::source_task_not_found(task_id, false)
160 );
161 }
162 }
163 }
164
165 // Create a working copy for atomic mode
166 let original_queue = if !continue_on_error {
167 Some(queue.clone())
168 } else {
169 None
170 };
171
172 // Place the collector inside the rollback scope for atomic mode
173 let mut collector = BatchResultCollector::new(unique_ids.len(), continue_on_error, "split");
174
175 for task_id in &unique_ids {
176 let opts = SplitTaskOptions::new(task_id, number, status, now_rfc3339, id_prefix, id_width)
177 .with_title_prefix(title_prefix)
178 .with_distribute_plan(distribute_plan)
179 .with_max_depth(max_dependency_depth);
180
181 match queue::operations::split_task(queue, None, &opts) {
182 Ok((updated_source, child_tasks)) => {
183 // Find source task position
184 if let Some(idx) = queue.tasks.iter().position(|t| t.id == *task_id) {
185 // Replace source with updated version
186 queue.tasks[idx] = updated_source;
187
188 // Insert children after the source
189 let child_ids: Vec<String> = child_tasks.iter().map(|t| t.id.clone()).collect();
190 for (i, child) in child_tasks.into_iter().enumerate() {
191 queue.tasks.insert(idx + 1 + i, child);
192 }
193
194 collector.record_success(task_id.clone(), child_ids);
195 } else {
196 // This shouldn't happen since we validated above
197 let err_msg = "Source task disappeared during split".to_string();
198 if !continue_on_error {
199 if let Some(ref original) = original_queue {
200 *queue = original.clone();
201 }
202 bail!("{}", err_msg);
203 }
204 // In continue-on-error mode, record the failure
205 // Note: record_failure returns Ok(()) when continue_on_error=true
206 collector.record_failure(task_id.clone(), err_msg)?;
207 }
208 }
209 Err(e) => {
210 let error_msg = e.to_string();
211 if !continue_on_error {
212 if let Some(ref original) = original_queue {
213 *queue = original.clone();
214 }
215 bail!(
216 "Batch split failed at task {}: {}. Use --continue-on-error to process remaining tasks.",
217 task_id,
218 error_msg
219 );
220 }
221 // In continue-on-error mode, record the failure
222 // Note: record_failure returns Ok(()) when continue_on_error=true
223 collector.record_failure(task_id.clone(), error_msg)?;
224 }
225 }
226 }
227
228 Ok(collector.finish())
229}