Skip to main content

ralph/queue/operations/batch/
generate.rs

1//! Batch task generation operations (clone and split).
2//!
3//! Responsibilities:
4//! - Batch clone multiple tasks with new IDs.
5//! - Batch split multiple tasks into child tasks.
6//!
7//! Does not handle:
8//! - Task deletion (see delete.rs).
9//! - Task filtering/selection (see filters.rs).
10//! - Task field updates (see update.rs).
11//!
12//! Assumptions/invariants:
13//! - Clone operations can source from active or done queues.
14//! - Split operations only work on tasks in the active queue.
15//! - Both operations support atomic rollback on failure.
16
17use crate::contracts::{QueueFile, TaskStatus};
18use crate::queue;
19use crate::queue::operations::{CloneTaskOptions, SplitTaskOptions, suggest_new_task_insert_index};
20use anyhow::{Result, bail};
21
22use super::{BatchOperationResult, BatchResultCollector, preprocess_batch_ids};
23
24/// Batch clone multiple tasks.
25///
26/// # Arguments
27/// * `queue` - The active queue to insert cloned tasks into
28/// * `done` - Optional done queue to search for source tasks
29/// * `task_ids` - List of task IDs to clone
30/// * `status` - Status for cloned tasks
31/// * `title_prefix` - Optional prefix for cloned task titles
32/// * `now_rfc3339` - Current timestamp for created_at/updated_at
33/// * `id_prefix` - Task ID prefix
34/// * `id_width` - Task ID numeric width
35/// * `max_dependency_depth` - Max dependency depth for validation
36/// * `continue_on_error` - If true, continue processing on individual failures
37///
38/// # Returns
39/// A `BatchOperationResult` with details of successes and failures, including created task IDs.
40#[allow(clippy::too_many_arguments)]
41pub fn batch_clone_tasks(
42    queue: &mut QueueFile,
43    done: Option<&QueueFile>,
44    task_ids: &[String],
45    status: TaskStatus,
46    title_prefix: Option<&str>,
47    now_rfc3339: &str,
48    id_prefix: &str,
49    id_width: usize,
50    max_dependency_depth: u8,
51    continue_on_error: bool,
52) -> Result<BatchOperationResult> {
53    let unique_ids = preprocess_batch_ids(task_ids, "clone")?;
54
55    // In atomic mode, validate all source tasks exist first
56    if !continue_on_error {
57        for task_id in &unique_ids {
58            let exists_in_active = queue.tasks.iter().any(|t| t.id == *task_id);
59            let exists_in_done = done.is_some_and(|d| d.tasks.iter().any(|t| t.id == *task_id));
60            if !exists_in_active && !exists_in_done {
61                bail!(
62                    "{}",
63                    crate::error_messages::source_task_not_found(task_id, true)
64                );
65            }
66        }
67    }
68
69    // Create a working copy for atomic mode
70    let original_queue = if !continue_on_error {
71        Some(queue.clone())
72    } else {
73        None
74    };
75
76    // Place the collector inside the rollback scope for atomic mode
77    let mut collector = BatchResultCollector::new(unique_ids.len(), continue_on_error, "clone");
78
79    for task_id in &unique_ids {
80        let opts = CloneTaskOptions::new(task_id, status, now_rfc3339, id_prefix, id_width)
81            .with_title_prefix(title_prefix)
82            .with_max_depth(max_dependency_depth);
83
84        match queue::operations::clone_task(queue, done, &opts) {
85            Ok((new_id, cloned_task)) => {
86                // Insert the cloned task at a good position
87                let insert_idx = suggest_new_task_insert_index(queue);
88                queue.tasks.insert(insert_idx, cloned_task);
89
90                collector.record_success(task_id.clone(), vec![new_id]);
91            }
92            Err(e) => {
93                let error_msg = e.to_string();
94                if !continue_on_error {
95                    // Rollback: restore original queue
96                    if let Some(ref original) = original_queue {
97                        *queue = original.clone();
98                    }
99                    // Use bail directly for the atomic mode error
100                    bail!(
101                        "Batch clone failed at task {}: {}. Use --continue-on-error to process remaining tasks.",
102                        task_id,
103                        error_msg
104                    );
105                }
106                // In continue-on-error mode, record the failure
107                // Note: record_failure returns Ok(()) when continue_on_error=true
108                collector.record_failure(task_id.clone(), error_msg)?;
109            }
110        }
111    }
112
113    Ok(collector.finish())
114}
115
116/// Batch split multiple tasks into child tasks.
117///
118/// # Arguments
119/// * `queue` - The active queue to modify
120/// * `task_ids` - List of task IDs to split
121/// * `number` - Number of child tasks to create per source
122/// * `status` - Status for child tasks
123/// * `title_prefix` - Optional prefix for child task titles
124/// * `distribute_plan` - Whether to distribute plan items across children
125/// * `now_rfc3339` - Current timestamp for timestamps
126/// * `id_prefix` - Task ID prefix
127/// * `id_width` - Task ID numeric width
128/// * `max_dependency_depth` - Max dependency depth for validation
129/// * `continue_on_error` - If true, continue processing on individual failures
130///
131/// # Returns
132/// A `BatchOperationResult` with details of successes and failures.
133#[allow(clippy::too_many_arguments)]
134pub fn batch_split_tasks(
135    queue: &mut QueueFile,
136    task_ids: &[String],
137    number: usize,
138    status: TaskStatus,
139    title_prefix: Option<&str>,
140    distribute_plan: bool,
141    now_rfc3339: &str,
142    id_prefix: &str,
143    id_width: usize,
144    max_dependency_depth: u8,
145    continue_on_error: bool,
146) -> Result<BatchOperationResult> {
147    if number < 2 {
148        bail!("Number of child tasks must be at least 2");
149    }
150
151    let unique_ids = preprocess_batch_ids(task_ids, "split")?;
152
153    // In atomic mode, validate all source tasks exist first
154    if !continue_on_error {
155        for task_id in &unique_ids {
156            if !queue.tasks.iter().any(|t| t.id == *task_id) {
157                bail!(
158                    "{}",
159                    crate::error_messages::source_task_not_found(task_id, false)
160                );
161            }
162        }
163    }
164
165    // Create a working copy for atomic mode
166    let original_queue = if !continue_on_error {
167        Some(queue.clone())
168    } else {
169        None
170    };
171
172    // Place the collector inside the rollback scope for atomic mode
173    let mut collector = BatchResultCollector::new(unique_ids.len(), continue_on_error, "split");
174
175    for task_id in &unique_ids {
176        let opts = SplitTaskOptions::new(task_id, number, status, now_rfc3339, id_prefix, id_width)
177            .with_title_prefix(title_prefix)
178            .with_distribute_plan(distribute_plan)
179            .with_max_depth(max_dependency_depth);
180
181        match queue::operations::split_task(queue, None, &opts) {
182            Ok((updated_source, child_tasks)) => {
183                // Find source task position
184                if let Some(idx) = queue.tasks.iter().position(|t| t.id == *task_id) {
185                    // Replace source with updated version
186                    queue.tasks[idx] = updated_source;
187
188                    // Insert children after the source
189                    let child_ids: Vec<String> = child_tasks.iter().map(|t| t.id.clone()).collect();
190                    for (i, child) in child_tasks.into_iter().enumerate() {
191                        queue.tasks.insert(idx + 1 + i, child);
192                    }
193
194                    collector.record_success(task_id.clone(), child_ids);
195                } else {
196                    // This shouldn't happen since we validated above
197                    let err_msg = "Source task disappeared during split".to_string();
198                    if !continue_on_error {
199                        if let Some(ref original) = original_queue {
200                            *queue = original.clone();
201                        }
202                        bail!("{}", err_msg);
203                    }
204                    // In continue-on-error mode, record the failure
205                    // Note: record_failure returns Ok(()) when continue_on_error=true
206                    collector.record_failure(task_id.clone(), err_msg)?;
207                }
208            }
209            Err(e) => {
210                let error_msg = e.to_string();
211                if !continue_on_error {
212                    if let Some(ref original) = original_queue {
213                        *queue = original.clone();
214                    }
215                    bail!(
216                        "Batch split failed at task {}: {}. Use --continue-on-error to process remaining tasks.",
217                        task_id,
218                        error_msg
219                    );
220                }
221                // In continue-on-error mode, record the failure
222                // Note: record_failure returns Ok(()) when continue_on_error=true
223                collector.record_failure(task_id.clone(), error_msg)?;
224            }
225        }
226    }
227
228    Ok(collector.finish())
229}