Skip to main content

ix_daemon/
queue.rs

1use crate::{SyncState, SyncStats};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::{RwLock, broadcast};
6
7#[derive(Debug, Clone, Hash, PartialEq, Eq)]
8pub struct QueueKey {
9    pub repo_root: String,
10    pub tool: String,
11}
12
13impl QueueKey {
14    pub fn new(repo_root: impl Into<String>, tool: impl Into<String>) -> Self {
15        Self {
16            repo_root: repo_root.into(),
17            tool: tool.into(),
18        }
19    }
20}
21
22#[derive(Debug, Clone)]
23pub struct SyncJob {
24    pub id: String,
25    pub key: QueueKey,
26    pub directory: String,
27    pub force: bool,
28    pub state: SyncState,
29    pub queued_at: Instant,
30    pub started_at: Option<Instant>,
31    pub completed_at: Option<Instant>,
32    pub stats: Option<SyncStats>,
33    pub error: Option<String>,
34    state_tx: broadcast::Sender<SyncState>,
35}
36
37impl SyncJob {
38    pub fn new(key: QueueKey, directory: String, force: bool) -> Self {
39        let (state_tx, _) = broadcast::channel(16);
40        Self {
41            id: uuid::Uuid::new_v4().to_string(),
42            key,
43            directory,
44            force,
45            state: SyncState::Queued,
46            queued_at: Instant::now(),
47            started_at: None,
48            completed_at: None,
49            stats: None,
50            error: None,
51            state_tx,
52        }
53    }
54
55    pub fn subscribe(&self) -> broadcast::Receiver<SyncState> {
56        self.state_tx.subscribe()
57    }
58
59    fn set_state(&mut self, new_state: SyncState) {
60        self.state = new_state;
61        let _ = self.state_tx.send(new_state);
62    }
63
64    pub fn start(&mut self) {
65        self.started_at = Some(Instant::now());
66        self.set_state(SyncState::Running);
67    }
68
69    pub fn complete(&mut self, job_stats: SyncStats) {
70        self.completed_at = Some(Instant::now());
71        self.stats = Some(job_stats);
72        self.set_state(SyncState::Done);
73    }
74
75    pub fn fail(&mut self, error: String) {
76        self.completed_at = Some(Instant::now());
77        self.error = Some(error);
78        self.set_state(SyncState::Error);
79    }
80
81    #[allow(clippy::cast_possible_truncation)]
82    pub fn queued_at_ms(&self) -> u64 {
83        self.queued_at.elapsed().as_millis() as u64
84    }
85}
86
87pub struct SyncQueue {
88    jobs: Arc<RwLock<HashMap<String, SyncJob>>>,
89    pending: Arc<RwLock<HashMap<QueueKey, String>>>,
90}
91
92impl SyncQueue {
93    pub fn new() -> Self {
94        Self {
95            jobs: Arc::new(RwLock::new(HashMap::new())),
96            pending: Arc::new(RwLock::new(HashMap::new())),
97        }
98    }
99
100    pub async fn enqueue(
101        &self,
102        repo_root: &str,
103        tool: &str,
104        directory: &str,
105        force: bool,
106    ) -> (String, bool) {
107        let key = QueueKey::new(repo_root, tool);
108
109        {
110            let pending = self.pending.read().await;
111            if let Some(existing_id) = pending.get(&key) {
112                let jobs = self.jobs.read().await;
113                if let Some(job) = jobs.get(existing_id)
114                    && job.state == SyncState::Queued
115                    && !force
116                {
117                    return (existing_id.clone(), false);
118                }
119            }
120        }
121
122        let job = SyncJob::new(key.clone(), directory.to_string(), force);
123        let id = job.id.clone();
124
125        self.jobs.write().await.insert(id.clone(), job);
126        self.pending.write().await.insert(key, id.clone());
127
128        (id, true)
129    }
130
131    pub async fn get(&self, id: &str) -> Option<SyncJob> {
132        self.jobs.read().await.get(id).cloned()
133    }
134
135    pub async fn get_pending(&self, key: &QueueKey) -> Option<SyncJob> {
136        let id = self.pending.read().await.get(key).cloned()?;
137        self.jobs.read().await.get(&id).cloned()
138    }
139
140    #[allow(clippy::significant_drop_tightening)]
141    pub async fn start(&self, id: &str) -> bool {
142        let mut jobs = self.jobs.write().await;
143        if let Some(job) = jobs.get_mut(id)
144            && job.state == SyncState::Queued
145        {
146            job.start();
147            return true;
148        }
149        false
150    }
151
152    pub async fn complete(&self, id: &str, job_stats: SyncStats) {
153        let key = {
154            let mut jobs = self.jobs.write().await;
155            jobs.get_mut(id).map(|job| {
156                job.complete(job_stats);
157                job.key.clone()
158            })
159        };
160
161        if let Some(key) = key {
162            let mut pending = self.pending.write().await;
163            if pending.get(&key).is_some_and(|pid| pid == id) {
164                pending.remove(&key);
165            }
166        }
167    }
168
169    pub async fn fail(&self, id: &str, error: String) {
170        let key = {
171            let mut jobs = self.jobs.write().await;
172            jobs.get_mut(id).map(|job| {
173                job.fail(error);
174                job.key.clone()
175            })
176        };
177
178        if let Some(key) = key {
179            let mut pending = self.pending.write().await;
180            if pending.get(&key).is_some_and(|pid| pid == id) {
181                pending.remove(&key);
182            }
183        }
184    }
185
186    #[allow(clippy::significant_drop_tightening)]
187    pub async fn wait(&self, id: &str, timeout: Duration) -> Option<SyncState> {
188        let mut rx = {
189            let jobs = self.jobs.read().await;
190            let job = jobs.get(id)?;
191
192            if job.state == SyncState::Done || job.state == SyncState::Error {
193                return Some(job.state);
194            }
195
196            job.subscribe()
197        };
198
199        let deadline = Instant::now() + timeout;
200
201        loop {
202            let remaining = deadline.saturating_duration_since(Instant::now());
203            if remaining.is_zero() {
204                return None;
205            }
206
207            match tokio::time::timeout(remaining, rx.recv()).await {
208                Ok(Ok(new_state)) => {
209                    if new_state == SyncState::Done || new_state == SyncState::Error {
210                        return Some(new_state);
211                    }
212                }
213                Ok(Err(_)) | Err(_) => return None,
214            }
215        }
216    }
217
218    pub async fn list_queues(&self) -> Vec<crate::QueueInfo> {
219        let pending = self.pending.read().await;
220        let jobs = self.jobs.read().await;
221
222        pending
223            .iter()
224            .filter_map(|(key, id)| {
225                let job = jobs.get(id)?;
226                let active = (job.state == SyncState::Running).then(|| id.clone());
227                let pending_count = u32::from(job.state == SyncState::Queued);
228                Some(crate::QueueInfo {
229                    repo_root: key.repo_root.clone(),
230                    tool: key.tool.clone(),
231                    pending: pending_count,
232                    active,
233                })
234            })
235            .collect()
236    }
237
238    pub async fn cleanup_old(&self, max_age: Duration) {
239        let now = Instant::now();
240        let mut jobs = self.jobs.write().await;
241
242        jobs.retain(|_, job| {
243            job.completed_at
244                .is_none_or(|completed_at| now.duration_since(completed_at) < max_age)
245        });
246    }
247}
248
249impl Default for SyncQueue {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    #[tokio::test]
260    async fn test_enqueue_new_job() {
261        let queue = SyncQueue::new();
262        let (id, is_new) = queue
263            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
264            .await;
265
266        assert!(is_new);
267        assert!(!id.is_empty());
268
269        let job = queue.get(&id).await.unwrap();
270        assert_eq!(job.state, SyncState::Queued);
271        assert_eq!(job.key.repo_root, "/repo");
272        assert_eq!(job.key.tool, "decisions");
273    }
274
275    #[tokio::test]
276    async fn test_enqueue_coalesces_duplicate() {
277        let queue = SyncQueue::new();
278        let (id1, is_new1) = queue
279            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
280            .await;
281        let (id2, is_new2) = queue
282            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
283            .await;
284
285        assert!(is_new1);
286        assert!(!is_new2);
287        assert_eq!(id1, id2);
288    }
289
290    #[tokio::test]
291    async fn test_enqueue_force_creates_new() {
292        let queue = SyncQueue::new();
293        let (id1, _) = queue
294            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
295            .await;
296        let (id2, is_new2) = queue
297            .enqueue("/repo", "decisions", ".ixchel/decisions", true)
298            .await;
299
300        assert!(is_new2);
301        assert_ne!(id1, id2);
302    }
303
304    #[tokio::test]
305    async fn test_different_repos_separate_queues() {
306        let queue = SyncQueue::new();
307        let (id1, is_new1) = queue
308            .enqueue("/repo1", "decisions", ".ixchel/decisions", false)
309            .await;
310        let (id2, is_new2) = queue
311            .enqueue("/repo2", "decisions", ".ixchel/decisions", false)
312            .await;
313
314        assert!(is_new1);
315        assert!(is_new2);
316        assert_ne!(id1, id2);
317    }
318
319    #[tokio::test]
320    async fn test_job_lifecycle() {
321        let queue = SyncQueue::new();
322        let (id, _) = queue
323            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
324            .await;
325
326        assert!(queue.start(&id).await);
327
328        let job = queue.get(&id).await.unwrap();
329        assert_eq!(job.state, SyncState::Running);
330
331        queue
332            .complete(
333                &id,
334                SyncStats {
335                    files_scanned: 10,
336                    files_updated: 2,
337                    duration_ms: 100,
338                },
339            )
340            .await;
341
342        let job = queue.get(&id).await.unwrap();
343        assert_eq!(job.state, SyncState::Done);
344        assert!(job.stats.is_some());
345    }
346
347    #[tokio::test]
348    async fn test_wait_already_complete() {
349        let queue = SyncQueue::new();
350        let (id, _) = queue
351            .enqueue("/repo", "decisions", ".ixchel/decisions", false)
352            .await;
353
354        queue.start(&id).await;
355        queue.complete(&id, SyncStats::default()).await;
356
357        let result = queue.wait(&id, Duration::from_millis(100)).await;
358        assert_eq!(result, Some(SyncState::Done));
359    }
360
361    #[tokio::test]
362    async fn test_list_queues() {
363        let queue = SyncQueue::new();
364        queue
365            .enqueue("/repo1", "decisions", ".ixchel/decisions", false)
366            .await;
367        queue
368            .enqueue("/repo2", "issues", ".ixchel/issues", false)
369            .await;
370
371        let queues = queue.list_queues().await;
372        assert_eq!(queues.len(), 2);
373    }
374}