Skip to main content

sift_queue/queue/
mod.rs

1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
/// Valid status values for queue items.
///
/// `Queue::push` creates every item as `"pending"`; `Queue::update` rejects
/// any status that is not listed here.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];

/// Valid source types accepted by `push` (used for validation on add).
///
/// A source whose `type` is not in this list makes `Queue::push` fail before
/// the queue file is opened.
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17// ── Types ───────────────────────────────────────────────────────────────────
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21    pub id: String,
22
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub title: Option<String>,
25
26    pub status: String,
27
28    pub sources: Vec<Source>,
29
30    pub metadata: serde_json::Value,
31
32    /// Always serialized, even when null.
33    pub session_id: Option<String>,
34
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub worktree: Option<Worktree>,
37
38    #[serde(skip_serializing_if = "is_empty_vec")]
39    #[serde(default)]
40    pub blocked_by: Vec<String>,
41
42    #[serde(skip_serializing_if = "is_empty_json_vec")]
43    #[serde(default)]
44    pub errors: Vec<serde_json::Value>,
45
46    pub created_at: String,
47    pub updated_at: String,
48}
49
/// serde `skip_serializing_if` predicate: true when the string list has no
/// entries.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
53
54fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
55    v.is_empty()
56}
57
58/// We need the JSON field order to match Ruby exactly.
59/// Ruby `to_h` outputs: id, (title if present), status, sources, metadata,
60/// session_id, created_at, updated_at, (worktree if present),
61/// (blocked_by if non-empty), (errors if non-empty).
62///
63/// serde by default serializes in struct field order, so we order the fields
64/// to match. But Ruby puts title BEFORE status when present, and worktree/
65/// blocked_by/errors AFTER updated_at. Let's use a custom serializer.
66impl Item {
67    pub fn to_json_value(&self) -> serde_json::Value {
68        let mut map = serde_json::Map::new();
69
70        map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
71
72        // title goes right after id, before status (only if present)
73        if let Some(ref title) = self.title {
74            map.insert(
75                "title".to_string(),
76                serde_json::Value::String(title.clone()),
77            );
78        }
79
80        map.insert(
81            "status".to_string(),
82            serde_json::Value::String(self.status.clone()),
83        );
84
85        map.insert(
86            "sources".to_string(),
87            serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
88        );
89
90        map.insert("metadata".to_string(), self.metadata.clone());
91
92        map.insert(
93            "session_id".to_string(),
94            match &self.session_id {
95                Some(s) => serde_json::Value::String(s.clone()),
96                None => serde_json::Value::Null,
97            },
98        );
99
100        map.insert(
101            "created_at".to_string(),
102            serde_json::Value::String(self.created_at.clone()),
103        );
104        map.insert(
105            "updated_at".to_string(),
106            serde_json::Value::String(self.updated_at.clone()),
107        );
108
109        // worktree after updated_at, only if present
110        if let Some(ref wt) = self.worktree {
111            map.insert("worktree".to_string(), wt.to_json_value());
112        }
113
114        // blocked_by after worktree, only if non-empty
115        if !self.blocked_by.is_empty() {
116            map.insert(
117                "blocked_by".to_string(),
118                serde_json::Value::Array(
119                    self.blocked_by
120                        .iter()
121                        .map(|s| serde_json::Value::String(s.clone()))
122                        .collect(),
123                ),
124            );
125        }
126
127        // errors after blocked_by, only if non-empty
128        if !self.errors.is_empty() {
129            map.insert(
130                "errors".to_string(),
131                serde_json::Value::Array(self.errors.clone()),
132            );
133        }
134
135        serde_json::Value::Object(map)
136    }
137
138    pub fn to_json_string(&self) -> String {
139        self.to_json_value().to_string()
140    }
141
142    pub fn pending(&self) -> bool {
143        self.status == "pending"
144    }
145
146    pub fn blocked(&self) -> bool {
147        !self.blocked_by.is_empty()
148    }
149
150    pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
151        if !self.pending() {
152            return false;
153        }
154        if !self.blocked() {
155            return true;
156        }
157        match pending_ids {
158            None => true,
159            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
160        }
161    }
162}
163
/// One input attached to an [`Item`].
///
/// Only `type` is mandatory; the remaining fields are optional and are left
/// out of the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Source {
    /// Source kind, stored under the JSON key `type`. `Queue::push` requires
    /// it to be one of `VALID_SOURCE_TYPES`.
    #[serde(rename = "type")]
    pub type_: String,

    /// Optional path for the source.
    /// NOTE(review): presumably used by the path-based source types — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,

    /// Optional inline content for the source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,

    /// Optional session identifier associated with this source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
178
179impl Source {
180    pub fn to_json_value(&self) -> serde_json::Value {
181        let mut map = serde_json::Map::new();
182        map.insert(
183            "type".to_string(),
184            serde_json::Value::String(self.type_.clone()),
185        );
186        if let Some(ref path) = self.path {
187            map.insert(
188                "path".to_string(),
189                serde_json::Value::String(path.clone()),
190            );
191        }
192        if let Some(ref content) = self.content {
193            map.insert(
194                "content".to_string(),
195                serde_json::Value::String(content.clone()),
196            );
197        }
198        if let Some(ref session_id) = self.session_id {
199            map.insert(
200                "session_id".to_string(),
201                serde_json::Value::String(session_id.clone()),
202            );
203        }
204        serde_json::Value::Object(map)
205    }
206}
207
/// Optional worktree information attached to an [`Item`].
///
/// Nothing in this file populates it: `Queue::push` stores `None` and
/// `UpdateAttrs` has no worktree field, so values only arrive by being read
/// from the queue file.
/// NOTE(review): presumably describes a git worktree — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Worktree {
    /// Optional worktree path; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    /// Optional branch name; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
}
215
216impl Worktree {
217    pub fn to_json_value(&self) -> serde_json::Value {
218        let mut map = serde_json::Map::new();
219        if let Some(ref path) = self.path {
220            map.insert(
221                "path".to_string(),
222                serde_json::Value::String(path.clone()),
223            );
224        }
225        if let Some(ref branch) = self.branch {
226            map.insert(
227                "branch".to_string(),
228                serde_json::Value::String(branch.clone()),
229            );
230        }
231        serde_json::Value::Object(map)
232    }
233}
234
235// ── Queue ───────────────────────────────────────────────────────────────────
236
/// Handle to a queue stored as a JSON-lines file at `path`.
///
/// The handle holds only the path; the file is opened and locked afresh for
/// every operation, so cloning a `Queue` yields another handle to the same
/// file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Queue {
    /// Location of the queue file.
    pub path: PathBuf,
}
240
241impl Queue {
242    pub fn new(path: PathBuf) -> Self {
243        Self { path }
244    }
245
246    /// Add a new item to the queue. Returns the created Item.
247    pub fn push(
248        &self,
249        sources: Vec<Source>,
250        title: Option<String>,
251        metadata: serde_json::Value,
252        session_id: Option<String>,
253        blocked_by: Vec<String>,
254    ) -> Result<Item> {
255        self.validate_sources(&sources)?;
256
257        self.with_exclusive_lock(|f| {
258            let existing = read_items(f, &self.path);
259            let existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
260
261            let now = now_iso8601();
262            let item = Item {
263                id: generate_id(&existing_ids),
264                title,
265                status: "pending".to_string(),
266                sources,
267                metadata,
268                session_id,
269                worktree: None,
270                blocked_by,
271                errors: Vec::new(),
272                created_at: now.clone(),
273                updated_at: now,
274            };
275
276            // Append to end of file
277            f.seek(std::io::SeekFrom::End(0))?;
278            writeln!(f, "{}", item.to_json_string())?;
279            f.flush()?;
280
281            Ok(item)
282        })
283    }
284
285    /// Get all items, skipping corrupt lines with warnings.
286    pub fn all(&self) -> Vec<Item> {
287        if !self.path.exists() {
288            return Vec::new();
289        }
290        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
291            Ok(items) => items,
292            Err(_) => Vec::new(),
293        }
294    }
295
296    /// Find an item by ID.
297    pub fn find(&self, id: &str) -> Option<Item> {
298        self.all().into_iter().find(|item| item.id == id)
299    }
300
301    /// Filter items by status (optional).
302    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
303        let items = self.all();
304        match status {
305            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
306            None => items,
307        }
308    }
309
310    /// Return pending items that are not blocked by any pending item.
311    pub fn ready(&self) -> Vec<Item> {
312        let items = self.all();
313        let pending_ids: HashSet<String> = items
314            .iter()
315            .filter(|i| i.pending())
316            .map(|i| i.id.clone())
317            .collect();
318        items
319            .into_iter()
320            .filter(|item| item.ready(Some(&pending_ids)))
321            .collect()
322    }
323
324    /// Update an item by ID. Returns the updated Item or None.
325    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
326        self.with_exclusive_lock(|f| {
327            let mut items = read_items(f, &self.path);
328            let index = items.iter().position(|item| item.id == id);
329            let index = match index {
330                Some(i) => i,
331                None => return Ok(None),
332            };
333
334            let item = &mut items[index];
335
336            if let Some(status) = &attrs.status {
337                if !VALID_STATUSES.contains(&status.as_str()) {
338                    anyhow::bail!(
339                        "Invalid status: {}. Valid: {}",
340                        status,
341                        VALID_STATUSES.join(", ")
342                    );
343                }
344                item.status = status.clone();
345            }
346            if let Some(title) = attrs.title {
347                item.title = Some(title);
348            }
349            if let Some(metadata) = attrs.metadata {
350                item.metadata = metadata;
351            }
352            if let Some(session_id) = attrs.session_id {
353                item.session_id = Some(session_id);
354            }
355            if let Some(blocked_by) = attrs.blocked_by {
356                item.blocked_by = blocked_by;
357            }
358            if let Some(sources) = attrs.sources {
359                item.sources = sources;
360            }
361
362            item.updated_at = now_iso8601();
363
364            let updated = item.clone();
365            rewrite_items(f, &items)?;
366            Ok(Some(updated))
367        })
368    }
369
370    /// Remove an item by ID. Returns the removed Item or None.
371    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
372        self.with_exclusive_lock(|f| {
373            let mut items = read_items(f, &self.path);
374            let index = items.iter().position(|item| item.id == id);
375            match index {
376                Some(i) => {
377                    let removed = items.remove(i);
378                    rewrite_items(f, &items)?;
379                    Ok(Some(removed))
380                }
381                None => Ok(None),
382            }
383        })
384    }
385
386    // ── Private helpers ─────────────────────────────────────────────────
387
388    fn validate_sources(&self, sources: &[Source]) -> Result<()> {
389        if sources.is_empty() {
390            anyhow::bail!("Sources cannot be empty");
391        }
392        for source in sources {
393            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
394                anyhow::bail!(
395                    "Invalid source type: {}. Valid: {}",
396                    source.type_,
397                    VALID_SOURCE_TYPES.join(", ")
398                );
399            }
400        }
401        Ok(())
402    }
403
404    fn ensure_directory(&self) -> Result<()> {
405        if let Some(parent) = self.path.parent() {
406            if !parent.exists() {
407                fs::create_dir_all(parent)?;
408            }
409        }
410        Ok(())
411    }
412
413    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
414    where
415        F: FnOnce(&mut File) -> Result<T>,
416    {
417        self.ensure_directory()?;
418        let mut file = OpenOptions::new()
419            .read(true)
420            .write(true)
421            .create(true)
422            .truncate(false)
423            .open(&self.path)
424            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
425
426        flock(&file.as_fd(), FlockOperation::LockExclusive)
427            .with_context(|| "Failed to acquire exclusive lock")?;
428
429        let result = f(&mut file);
430
431        // Lock released when file is dropped
432        result
433    }
434
435    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
436    where
437        F: FnOnce(&mut File) -> Result<T>,
438    {
439        let mut file = File::open(&self.path)
440            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
441
442        flock(&file.as_fd(), FlockOperation::LockShared)
443            .with_context(|| "Failed to acquire shared lock")?;
444
445        f(&mut file)
446    }
447}
448
449/// Read items from an open file, skipping corrupt lines.
450fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
451    file.seek(std::io::SeekFrom::Start(0)).ok();
452    let reader = BufReader::new(file);
453    let mut items = Vec::new();
454
455    for (line_num, line) in reader.lines().enumerate() {
456        let line = match line {
457            Ok(l) => l,
458            Err(_) => continue,
459        };
460        let trimmed = line.trim();
461        if trimmed.is_empty() {
462            continue;
463        }
464        match serde_json::from_str::<Item>(trimmed) {
465            Ok(item) => items.push(item),
466            Err(e) => {
467                eprintln!(
468                    "Warning: Skipping corrupt line {} in {}: {}",
469                    line_num + 1,
470                    path.display(),
471                    e
472                );
473            }
474        }
475    }
476    items
477}
478
479/// Truncate and rewrite all items to the file.
480fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
481    file.seek(std::io::SeekFrom::Start(0))?;
482    file.set_len(0)?;
483    for item in items {
484        writeln!(file, "{}", item.to_json_string())?;
485    }
486    file.flush()?;
487    Ok(())
488}
489
490/// Generate a 3-char alphanumeric ID that doesn't collide with existing.
491fn generate_id(existing_ids: &HashSet<String>) -> String {
492    let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
493    let mut rng = rand::thread_rng();
494    loop {
495        let id: String = (0..3).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
496        if !existing_ids.contains(&id) {
497            return id;
498        }
499    }
500}
501
502/// Current UTC time in ISO 8601 with millisecond precision.
503fn now_iso8601() -> String {
504    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
505}
506
507/// Attributes for updating an item.
508#[derive(Default)]
509pub struct UpdateAttrs {
510    pub status: Option<String>,
511    pub title: Option<String>,
512    pub metadata: Option<serde_json::Value>,
513    pub session_id: Option<String>,
514    pub blocked_by: Option<Vec<String>>,
515    pub sources: Option<Vec<Source>>,
516}