1use crate::{SyncState, SyncStats};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::{RwLock, broadcast};
6
/// Identifies one logical queue: a repository root paired with a tool name.
///
/// Equality and hashing cover both fields, so the type can serve as a
/// `HashMap` key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct QueueKey {
    /// Repository root the queue is scoped to.
    pub repo_root: String,
    /// Tool name the queue is scoped to.
    pub tool: String,
}
12
13impl QueueKey {
14 pub fn new(repo_root: impl Into<String>, tool: impl Into<String>) -> Self {
15 Self {
16 repo_root: repo_root.into(),
17 tool: tool.into(),
18 }
19 }
20}
21
22#[derive(Debug, Clone)]
23pub struct SyncJob {
24 pub id: String,
25 pub key: QueueKey,
26 pub directory: String,
27 pub force: bool,
28 pub state: SyncState,
29 pub queued_at: Instant,
30 pub started_at: Option<Instant>,
31 pub completed_at: Option<Instant>,
32 pub stats: Option<SyncStats>,
33 pub error: Option<String>,
34 state_tx: broadcast::Sender<SyncState>,
35}
36
37impl SyncJob {
38 pub fn new(key: QueueKey, directory: String, force: bool) -> Self {
39 let (state_tx, _) = broadcast::channel(16);
40 Self {
41 id: uuid::Uuid::new_v4().to_string(),
42 key,
43 directory,
44 force,
45 state: SyncState::Queued,
46 queued_at: Instant::now(),
47 started_at: None,
48 completed_at: None,
49 stats: None,
50 error: None,
51 state_tx,
52 }
53 }
54
55 pub fn subscribe(&self) -> broadcast::Receiver<SyncState> {
56 self.state_tx.subscribe()
57 }
58
59 fn set_state(&mut self, new_state: SyncState) {
60 self.state = new_state;
61 let _ = self.state_tx.send(new_state);
62 }
63
64 pub fn start(&mut self) {
65 self.started_at = Some(Instant::now());
66 self.set_state(SyncState::Running);
67 }
68
69 pub fn complete(&mut self, job_stats: SyncStats) {
70 self.completed_at = Some(Instant::now());
71 self.stats = Some(job_stats);
72 self.set_state(SyncState::Done);
73 }
74
75 pub fn fail(&mut self, error: String) {
76 self.completed_at = Some(Instant::now());
77 self.error = Some(error);
78 self.set_state(SyncState::Error);
79 }
80
81 #[allow(clippy::cast_possible_truncation)]
82 pub fn queued_at_ms(&self) -> u64 {
83 self.queued_at.elapsed().as_millis() as u64
84 }
85}
86
87pub struct SyncQueue {
88 jobs: Arc<RwLock<HashMap<String, SyncJob>>>,
89 pending: Arc<RwLock<HashMap<QueueKey, String>>>,
90}
91
92impl SyncQueue {
93 pub fn new() -> Self {
94 Self {
95 jobs: Arc::new(RwLock::new(HashMap::new())),
96 pending: Arc::new(RwLock::new(HashMap::new())),
97 }
98 }
99
100 pub async fn enqueue(
101 &self,
102 repo_root: &str,
103 tool: &str,
104 directory: &str,
105 force: bool,
106 ) -> (String, bool) {
107 let key = QueueKey::new(repo_root, tool);
108
109 {
110 let pending = self.pending.read().await;
111 if let Some(existing_id) = pending.get(&key) {
112 let jobs = self.jobs.read().await;
113 if let Some(job) = jobs.get(existing_id)
114 && job.state == SyncState::Queued
115 && !force
116 {
117 return (existing_id.clone(), false);
118 }
119 }
120 }
121
122 let job = SyncJob::new(key.clone(), directory.to_string(), force);
123 let id = job.id.clone();
124
125 self.jobs.write().await.insert(id.clone(), job);
126 self.pending.write().await.insert(key, id.clone());
127
128 (id, true)
129 }
130
131 pub async fn get(&self, id: &str) -> Option<SyncJob> {
132 self.jobs.read().await.get(id).cloned()
133 }
134
135 pub async fn get_pending(&self, key: &QueueKey) -> Option<SyncJob> {
136 let id = self.pending.read().await.get(key).cloned()?;
137 self.jobs.read().await.get(&id).cloned()
138 }
139
140 #[allow(clippy::significant_drop_tightening)]
141 pub async fn start(&self, id: &str) -> bool {
142 let mut jobs = self.jobs.write().await;
143 if let Some(job) = jobs.get_mut(id)
144 && job.state == SyncState::Queued
145 {
146 job.start();
147 return true;
148 }
149 false
150 }
151
152 pub async fn complete(&self, id: &str, job_stats: SyncStats) {
153 let key = {
154 let mut jobs = self.jobs.write().await;
155 jobs.get_mut(id).map(|job| {
156 job.complete(job_stats);
157 job.key.clone()
158 })
159 };
160
161 if let Some(key) = key {
162 let mut pending = self.pending.write().await;
163 if pending.get(&key).is_some_and(|pid| pid == id) {
164 pending.remove(&key);
165 }
166 }
167 }
168
169 pub async fn fail(&self, id: &str, error: String) {
170 let key = {
171 let mut jobs = self.jobs.write().await;
172 jobs.get_mut(id).map(|job| {
173 job.fail(error);
174 job.key.clone()
175 })
176 };
177
178 if let Some(key) = key {
179 let mut pending = self.pending.write().await;
180 if pending.get(&key).is_some_and(|pid| pid == id) {
181 pending.remove(&key);
182 }
183 }
184 }
185
186 #[allow(clippy::significant_drop_tightening)]
187 pub async fn wait(&self, id: &str, timeout: Duration) -> Option<SyncState> {
188 let mut rx = {
189 let jobs = self.jobs.read().await;
190 let job = jobs.get(id)?;
191
192 if job.state == SyncState::Done || job.state == SyncState::Error {
193 return Some(job.state);
194 }
195
196 job.subscribe()
197 };
198
199 let deadline = Instant::now() + timeout;
200
201 loop {
202 let remaining = deadline.saturating_duration_since(Instant::now());
203 if remaining.is_zero() {
204 return None;
205 }
206
207 match tokio::time::timeout(remaining, rx.recv()).await {
208 Ok(Ok(new_state)) => {
209 if new_state == SyncState::Done || new_state == SyncState::Error {
210 return Some(new_state);
211 }
212 }
213 Ok(Err(_)) | Err(_) => return None,
214 }
215 }
216 }
217
218 pub async fn list_queues(&self) -> Vec<crate::QueueInfo> {
219 let pending = self.pending.read().await;
220 let jobs = self.jobs.read().await;
221
222 pending
223 .iter()
224 .filter_map(|(key, id)| {
225 let job = jobs.get(id)?;
226 let active = (job.state == SyncState::Running).then(|| id.clone());
227 let pending_count = u32::from(job.state == SyncState::Queued);
228 Some(crate::QueueInfo {
229 repo_root: key.repo_root.clone(),
230 tool: key.tool.clone(),
231 pending: pending_count,
232 active,
233 })
234 })
235 .collect()
236 }
237
238 pub async fn cleanup_old(&self, max_age: Duration) {
239 let now = Instant::now();
240 let mut jobs = self.jobs.write().await;
241
242 jobs.retain(|_, job| {
243 job.completed_at
244 .is_none_or(|completed_at| now.duration_since(completed_at) < max_age)
245 });
246 }
247}
248
249impl Default for SyncQueue {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Tool name shared by most tests.
    const DECISIONS: &str = "decisions";
    /// Directory that goes with the `decisions` tool.
    const DECISIONS_DIR: &str = ".ixchel/decisions";

    /// Enqueues a `decisions` job for `repo_root` and returns `(id, is_new)`.
    async fn enqueue_decisions(queue: &SyncQueue, repo_root: &str, force: bool) -> (String, bool) {
        queue
            .enqueue(repo_root, DECISIONS, DECISIONS_DIR, force)
            .await
    }

    #[tokio::test]
    async fn test_enqueue_new_job() {
        let queue = SyncQueue::new();
        let (id, is_new) = enqueue_decisions(&queue, "/repo", false).await;

        assert!(is_new);
        assert!(!id.is_empty());

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.state, SyncState::Queued);
        assert_eq!(job.key.repo_root, "/repo");
        assert_eq!(job.key.tool, "decisions");
    }

    #[tokio::test]
    async fn test_enqueue_coalesces_duplicate() {
        let queue = SyncQueue::new();
        let (first_id, first_is_new) = enqueue_decisions(&queue, "/repo", false).await;
        let (second_id, second_is_new) = enqueue_decisions(&queue, "/repo", false).await;

        assert!(first_is_new);
        assert!(!second_is_new);
        assert_eq!(first_id, second_id);
    }

    #[tokio::test]
    async fn test_enqueue_force_creates_new() {
        let queue = SyncQueue::new();
        let (first_id, _) = enqueue_decisions(&queue, "/repo", false).await;
        let (forced_id, forced_is_new) = enqueue_decisions(&queue, "/repo", true).await;

        assert!(forced_is_new);
        assert_ne!(first_id, forced_id);
    }

    #[tokio::test]
    async fn test_different_repos_separate_queues() {
        let queue = SyncQueue::new();
        let (first_id, first_is_new) = enqueue_decisions(&queue, "/repo1", false).await;
        let (second_id, second_is_new) = enqueue_decisions(&queue, "/repo2", false).await;

        assert!(first_is_new);
        assert!(second_is_new);
        assert_ne!(first_id, second_id);
    }

    #[tokio::test]
    async fn test_job_lifecycle() {
        let queue = SyncQueue::new();
        let (id, _) = enqueue_decisions(&queue, "/repo", false).await;

        assert!(queue.start(&id).await);

        let running = queue.get(&id).await.unwrap();
        assert_eq!(running.state, SyncState::Running);

        let stats = SyncStats {
            files_scanned: 10,
            files_updated: 2,
            duration_ms: 100,
        };
        queue.complete(&id, stats).await;

        let finished = queue.get(&id).await.unwrap();
        assert_eq!(finished.state, SyncState::Done);
        assert!(finished.stats.is_some());
    }

    #[tokio::test]
    async fn test_wait_already_complete() {
        let queue = SyncQueue::new();
        let (id, _) = enqueue_decisions(&queue, "/repo", false).await;

        queue.start(&id).await;
        queue.complete(&id, SyncStats::default()).await;

        let outcome = queue.wait(&id, Duration::from_millis(100)).await;
        assert_eq!(outcome, Some(SyncState::Done));
    }

    #[tokio::test]
    async fn test_list_queues() {
        let queue = SyncQueue::new();
        enqueue_decisions(&queue, "/repo1", false).await;
        queue
            .enqueue("/repo2", "issues", ".ixchel/issues", false)
            .await;

        let queues = queue.list_queues().await;
        assert_eq!(queues.len(), 2);
    }
}