ralph/queue/operations/
archive.rs1use super::backfill_terminal_completed_at;
4use crate::contracts::{QueueFile, TaskStatus};
5use crate::queue::{load_queue, load_queue_or_default, save_queue, validation};
6use crate::timeutil;
7use anyhow::Result;
8use std::path::Path;
9
/// Outcome of an archive pass over the active queue.
#[derive(Clone, Debug)]
pub struct ArchiveReport {
    /// Trimmed IDs of the tasks that were moved from the active queue to the
    /// done queue, in the order they were encountered.
    pub moved_ids: Vec<String>,
}
14
15pub fn archive_terminal_tasks_older_than_days_in_memory(
21 active: &mut QueueFile,
22 done: &mut QueueFile,
23 now_rfc3339: &str,
24 after_days: u32,
25) -> Result<ArchiveReport> {
26 if after_days == 0 {
27 return archive_terminal_tasks_in_memory(active, done, now_rfc3339);
28 }
29
30 let now = timeutil::parse_rfc3339(now_rfc3339)?;
31 let cutoff = now - time::Duration::days(after_days as i64);
32
33 let mut moved_ids = Vec::new();
34 let mut remaining = Vec::with_capacity(active.tasks.len());
35
36 for mut task in active.tasks.drain(..) {
37 if matches!(task.status, TaskStatus::Done | TaskStatus::Rejected) {
38 let eligible = task
39 .completed_at
40 .as_deref()
41 .and_then(timeutil::parse_rfc3339_opt)
42 .filter(|dt| dt.offset() == time::UtcOffset::UTC)
43 .is_some_and(|dt| dt <= cutoff);
44
45 if eligible {
46 task.updated_at = Some(now_rfc3339.to_string());
47 moved_ids.push(task.id.trim().to_string());
48 done.tasks.push(task);
49 } else {
50 remaining.push(task);
51 }
52 } else {
53 remaining.push(task);
54 }
55 }
56
57 active.tasks = remaining;
58 Ok(ArchiveReport { moved_ids })
59}
60
61pub fn maybe_archive_terminal_tasks_in_memory(
66 active: &mut QueueFile,
67 done: &mut QueueFile,
68 now_rfc3339: &str,
69 after_days: Option<u32>,
70) -> Result<ArchiveReport> {
71 match after_days {
72 None => Ok(ArchiveReport {
73 moved_ids: Vec::new(),
74 }),
75 Some(days) => {
76 archive_terminal_tasks_older_than_days_in_memory(active, done, now_rfc3339, days)
77 }
78 }
79}
80
81pub fn archive_terminal_tasks_older_than_days(
87 queue_path: &Path,
88 done_path: &Path,
89 id_prefix: &str,
90 id_width: usize,
91 max_dependency_depth: u8,
92 after_days: u32,
93) -> Result<ArchiveReport> {
94 let mut active = load_queue(queue_path)?;
95 let mut done = load_queue_or_default(done_path)?;
96
97 let now = timeutil::now_utc_rfc3339()?;
98 let report =
99 archive_terminal_tasks_older_than_days_in_memory(&mut active, &mut done, &now, after_days)?;
100
101 let backfilled_done = backfill_terminal_completed_at(&mut done, &now) > 0;
103
104 let warnings = validation::validate_queue_set(
105 &active,
106 Some(&done),
107 id_prefix,
108 id_width,
109 max_dependency_depth,
110 )?;
111 validation::log_warnings(&warnings);
112
113 if report.moved_ids.is_empty() && !backfilled_done {
114 return Ok(report);
115 }
116
117 if report.moved_ids.is_empty() {
119 save_queue(done_path, &done)?;
120 return Ok(report);
121 }
122
123 save_queue(done_path, &done)?;
124 save_queue(queue_path, &active)?;
125 Ok(report)
126}
127
128pub fn archive_terminal_tasks_in_memory(
130 active: &mut QueueFile,
131 done: &mut QueueFile,
132 now_rfc3339: &str,
133) -> Result<ArchiveReport> {
134 let now = super::validate::parse_rfc3339_utc(now_rfc3339)?;
135 let mut moved_ids = Vec::new();
136 let mut remaining = Vec::with_capacity(active.tasks.len());
137
138 for mut task in active.tasks.drain(..) {
139 if matches!(task.status, TaskStatus::Done | TaskStatus::Rejected) {
140 if task
141 .completed_at
142 .as_ref()
143 .is_none_or(|t| t.trim().is_empty())
144 {
145 task.completed_at = Some(now.clone());
146 }
147 task.updated_at = Some(now.clone());
148 moved_ids.push(task.id.trim().to_string());
149 done.tasks.push(task);
150 } else {
151 remaining.push(task);
152 }
153 }
154
155 active.tasks = remaining;
156
157 Ok(ArchiveReport { moved_ids })
158}
159
160pub fn archive_terminal_tasks(
162 queue_path: &Path,
163 done_path: &Path,
164 id_prefix: &str,
165 id_width: usize,
166 max_dependency_depth: u8,
167) -> Result<ArchiveReport> {
168 let mut active = load_queue(queue_path)?;
169 let mut done = load_queue_or_default(done_path)?;
170
171 let now = timeutil::now_utc_rfc3339()?;
172 let report = archive_terminal_tasks_in_memory(&mut active, &mut done, &now)?;
173 let backfilled_done = backfill_terminal_completed_at(&mut done, &now) > 0;
174
175 let warnings = validation::validate_queue_set(
177 &active,
178 Some(&done),
179 id_prefix,
180 id_width,
181 max_dependency_depth,
182 )?;
183 validation::log_warnings(&warnings);
184
185 if report.moved_ids.is_empty() && !backfilled_done {
186 return Ok(report);
187 }
188
189 if report.moved_ids.is_empty() {
190 save_queue(done_path, &done)?;
191 return Ok(report);
192 }
193
194 save_queue(done_path, &done)?;
195 save_queue(queue_path, &active)?;
196
197 Ok(report)
198}