Skip to main content

sift_queue/queue/
mod.rs

1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
/// Statuses a queue item may carry; `Queue::update` rejects any other value.
pub const VALID_STATUSES: &[&str] = &[
    "pending",
    "in_progress",
    "closed",
];

/// Source types accepted when an item is pushed (checked during validation on add).
pub const VALID_SOURCE_TYPES: &[&str] = &[
    "diff",
    "file",
    "text",
    "directory",
];
16
17// ── Types ───────────────────────────────────────────────────────────────────
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21    pub id: String,
22
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub title: Option<String>,
25
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub description: Option<String>,
28
29    pub status: String,
30
31    pub sources: Vec<Source>,
32
33    pub metadata: serde_json::Value,
34
35    /// Always serialized, even when null.
36    pub session_id: Option<String>,
37
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub worktree: Option<Worktree>,
40
41    #[serde(skip_serializing_if = "is_empty_vec")]
42    #[serde(default)]
43    pub blocked_by: Vec<String>,
44
45    #[serde(skip_serializing_if = "is_empty_json_vec")]
46    #[serde(default)]
47    pub errors: Vec<serde_json::Value>,
48
49    pub created_at: String,
50    pub updated_at: String,
51}
52
/// serde `skip_serializing_if` predicate: true when the string list has no entries.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
56
57fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
58    v.is_empty()
59}
60
61/// We need the JSON field order to match Ruby exactly.
62/// Ruby `to_h` outputs: id, (title if present), (description if present),
63/// status, sources, metadata, session_id, created_at, updated_at,
64/// (worktree if present), (blocked_by if non-empty), (errors if non-empty).
65///
66/// serde by default serializes in struct field order, so we order the fields
67/// to match. But Ruby puts title BEFORE status when present, and worktree/
68/// blocked_by/errors AFTER updated_at. Let's use a custom serializer.
69impl Item {
70    pub fn to_json_value(&self) -> serde_json::Value {
71        let mut map = serde_json::Map::new();
72
73        map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
74
75        // title/description go right after id, before status (only if present)
76        if let Some(ref title) = self.title {
77            map.insert(
78                "title".to_string(),
79                serde_json::Value::String(title.clone()),
80            );
81        }
82        if let Some(ref description) = self.description {
83            map.insert(
84                "description".to_string(),
85                serde_json::Value::String(description.clone()),
86            );
87        }
88
89        map.insert(
90            "status".to_string(),
91            serde_json::Value::String(self.status.clone()),
92        );
93
94        map.insert(
95            "sources".to_string(),
96            serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
97        );
98
99        map.insert("metadata".to_string(), self.metadata.clone());
100
101        map.insert(
102            "session_id".to_string(),
103            match &self.session_id {
104                Some(s) => serde_json::Value::String(s.clone()),
105                None => serde_json::Value::Null,
106            },
107        );
108
109        map.insert(
110            "created_at".to_string(),
111            serde_json::Value::String(self.created_at.clone()),
112        );
113        map.insert(
114            "updated_at".to_string(),
115            serde_json::Value::String(self.updated_at.clone()),
116        );
117
118        // worktree after updated_at, only if present
119        if let Some(ref wt) = self.worktree {
120            map.insert("worktree".to_string(), wt.to_json_value());
121        }
122
123        // blocked_by after worktree, only if non-empty
124        if !self.blocked_by.is_empty() {
125            map.insert(
126                "blocked_by".to_string(),
127                serde_json::Value::Array(
128                    self.blocked_by
129                        .iter()
130                        .map(|s| serde_json::Value::String(s.clone()))
131                        .collect(),
132                ),
133            );
134        }
135
136        // errors after blocked_by, only if non-empty
137        if !self.errors.is_empty() {
138            map.insert(
139                "errors".to_string(),
140                serde_json::Value::Array(self.errors.clone()),
141            );
142        }
143
144        serde_json::Value::Object(map)
145    }
146
147    pub fn to_json_string(&self) -> String {
148        self.to_json_value().to_string()
149    }
150
151    pub fn pending(&self) -> bool {
152        self.status == "pending"
153    }
154
155    pub fn blocked(&self) -> bool {
156        !self.blocked_by.is_empty()
157    }
158
159    pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
160        if !self.pending() {
161            return false;
162        }
163        if !self.blocked() {
164            return true;
165        }
166        match pending_ids {
167            None => true,
168            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
169        }
170    }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174pub struct Source {
175    #[serde(rename = "type")]
176    pub type_: String,
177
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub path: Option<String>,
180
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub content: Option<String>,
183
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub session_id: Option<String>,
186}
187
188impl Source {
189    pub fn to_json_value(&self) -> serde_json::Value {
190        let mut map = serde_json::Map::new();
191        map.insert(
192            "type".to_string(),
193            serde_json::Value::String(self.type_.clone()),
194        );
195        if let Some(ref path) = self.path {
196            map.insert(
197                "path".to_string(),
198                serde_json::Value::String(path.clone()),
199            );
200        }
201        if let Some(ref content) = self.content {
202            map.insert(
203                "content".to_string(),
204                serde_json::Value::String(content.clone()),
205            );
206        }
207        if let Some(ref session_id) = self.session_id {
208            map.insert(
209                "session_id".to_string(),
210                serde_json::Value::String(session_id.clone()),
211            );
212        }
213        serde_json::Value::Object(map)
214    }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
218pub struct Worktree {
219    #[serde(skip_serializing_if = "Option::is_none")]
220    pub path: Option<String>,
221    #[serde(skip_serializing_if = "Option::is_none")]
222    pub branch: Option<String>,
223}
224
225impl Worktree {
226    pub fn to_json_value(&self) -> serde_json::Value {
227        let mut map = serde_json::Map::new();
228        if let Some(ref path) = self.path {
229            map.insert(
230                "path".to_string(),
231                serde_json::Value::String(path.clone()),
232            );
233        }
234        if let Some(ref branch) = self.branch {
235            map.insert(
236                "branch".to_string(),
237                serde_json::Value::String(branch.clone()),
238            );
239        }
240        serde_json::Value::Object(map)
241    }
242}
243
244// ── Queue ───────────────────────────────────────────────────────────────────
245
/// Handle to a queue stored in a single file, one JSON item per line.
///
/// The handle holds only the file path; the file is opened and locked for
/// each operation.
#[derive(Debug, Clone)]
pub struct Queue {
    /// Location of the queue file.
    pub path: PathBuf,
}
249
250impl Queue {
251    pub fn new(path: PathBuf) -> Self {
252        Self { path }
253    }
254
255    /// Add a new item to the queue. Returns the created Item.
256    pub fn push(
257        &self,
258        sources: Vec<Source>,
259        title: Option<String>,
260        metadata: serde_json::Value,
261        session_id: Option<String>,
262        blocked_by: Vec<String>,
263    ) -> Result<Item> {
264        self.validate_sources(&sources)?;
265        self.push_with_description(sources, title, None, metadata, session_id, blocked_by)
266    }
267
268    /// Add a new item to the queue with description support.
269    pub fn push_with_description(
270        &self,
271        sources: Vec<Source>,
272        title: Option<String>,
273        description: Option<String>,
274        metadata: serde_json::Value,
275        session_id: Option<String>,
276        blocked_by: Vec<String>,
277    ) -> Result<Item> {
278        let has_metadata = match &metadata {
279            serde_json::Value::Object(map) => !map.is_empty(),
280            _ => true,
281        };
282
283        if sources.is_empty() && title.is_none() && description.is_none() && !has_metadata {
284            anyhow::bail!("Item requires at least one source, title, description, or metadata");
285        }
286        self.validate_source_types(&sources)?;
287
288        self.with_exclusive_lock(|f| {
289            let existing = read_items(f, &self.path);
290            let existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
291
292            let now = now_iso8601();
293            let item = Item {
294                id: generate_id(&existing_ids),
295                title,
296                description,
297                status: "pending".to_string(),
298                sources,
299                metadata,
300                session_id,
301                worktree: None,
302                blocked_by,
303                errors: Vec::new(),
304                created_at: now.clone(),
305                updated_at: now,
306            };
307
308            // Append to end of file
309            f.seek(std::io::SeekFrom::End(0))?;
310            writeln!(f, "{}", item.to_json_string())?;
311            f.flush()?;
312
313            Ok(item)
314        })
315    }
316
317    /// Get all items, skipping corrupt lines with warnings.
318    pub fn all(&self) -> Vec<Item> {
319        if !self.path.exists() {
320            return Vec::new();
321        }
322        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
323            Ok(items) => items,
324            Err(_) => Vec::new(),
325        }
326    }
327
328    /// Find an item by ID.
329    pub fn find(&self, id: &str) -> Option<Item> {
330        self.all().into_iter().find(|item| item.id == id)
331    }
332
333    /// Filter items by status (optional).
334    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
335        let items = self.all();
336        match status {
337            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
338            None => items,
339        }
340    }
341
342    /// Return pending items that are not blocked by any pending item.
343    pub fn ready(&self) -> Vec<Item> {
344        let items = self.all();
345        let pending_ids: HashSet<String> = items
346            .iter()
347            .filter(|i| i.pending())
348            .map(|i| i.id.clone())
349            .collect();
350        items
351            .into_iter()
352            .filter(|item| item.ready(Some(&pending_ids)))
353            .collect()
354    }
355
356    /// Update an item by ID. Returns the updated Item or None.
357    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
358        self.with_exclusive_lock(|f| {
359            let mut items = read_items(f, &self.path);
360            let index = items.iter().position(|item| item.id == id);
361            let index = match index {
362                Some(i) => i,
363                None => return Ok(None),
364            };
365
366            let item = &mut items[index];
367
368            if let Some(status) = &attrs.status {
369                if !VALID_STATUSES.contains(&status.as_str()) {
370                    anyhow::bail!(
371                        "Invalid status: {}. Valid: {}",
372                        status,
373                        VALID_STATUSES.join(", ")
374                    );
375                }
376                item.status = status.clone();
377            }
378            if let Some(title) = attrs.title {
379                item.title = Some(title);
380            }
381            if let Some(description) = attrs.description {
382                item.description = Some(description);
383            }
384            if let Some(metadata) = attrs.metadata {
385                item.metadata = metadata;
386            }
387            if let Some(session_id) = attrs.session_id {
388                item.session_id = Some(session_id);
389            }
390            if let Some(blocked_by) = attrs.blocked_by {
391                item.blocked_by = blocked_by;
392            }
393            if let Some(sources) = attrs.sources {
394                item.sources = sources;
395            }
396
397            item.updated_at = now_iso8601();
398
399            let updated = item.clone();
400            rewrite_items(f, &items)?;
401            Ok(Some(updated))
402        })
403    }
404
405    /// Remove an item by ID. Returns the removed Item or None.
406    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
407        self.with_exclusive_lock(|f| {
408            let mut items = read_items(f, &self.path);
409            let index = items.iter().position(|item| item.id == id);
410            match index {
411                Some(i) => {
412                    let removed = items.remove(i);
413                    rewrite_items(f, &items)?;
414                    Ok(Some(removed))
415                }
416                None => Ok(None),
417            }
418        })
419    }
420
421    // ── Private helpers ─────────────────────────────────────────────────
422
423    fn validate_sources(&self, sources: &[Source]) -> Result<()> {
424        if sources.is_empty() {
425            anyhow::bail!("Sources cannot be empty");
426        }
427        self.validate_source_types(sources)
428    }
429
430    fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
431        for source in sources {
432            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
433                anyhow::bail!(
434                    "Invalid source type: {}. Valid: {}",
435                    source.type_,
436                    VALID_SOURCE_TYPES.join(", ")
437                );
438            }
439        }
440        Ok(())
441    }
442
443    fn ensure_directory(&self) -> Result<()> {
444        if let Some(parent) = self.path.parent() {
445            if !parent.exists() {
446                fs::create_dir_all(parent)?;
447            }
448        }
449        Ok(())
450    }
451
452    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
453    where
454        F: FnOnce(&mut File) -> Result<T>,
455    {
456        self.ensure_directory()?;
457        let mut file = OpenOptions::new()
458            .read(true)
459            .write(true)
460            .create(true)
461            .truncate(false)
462            .open(&self.path)
463            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
464
465        flock(&file.as_fd(), FlockOperation::LockExclusive)
466            .with_context(|| "Failed to acquire exclusive lock")?;
467
468        let result = f(&mut file);
469
470        // Lock released when file is dropped
471        result
472    }
473
474    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
475    where
476        F: FnOnce(&mut File) -> Result<T>,
477    {
478        let mut file = File::open(&self.path)
479            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
480
481        flock(&file.as_fd(), FlockOperation::LockShared)
482            .with_context(|| "Failed to acquire shared lock")?;
483
484        f(&mut file)
485    }
486}
487
488/// Read items from an open file, skipping corrupt lines.
489fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
490    file.seek(std::io::SeekFrom::Start(0)).ok();
491    let reader = BufReader::new(file);
492    let mut items = Vec::new();
493
494    for (line_num, line) in reader.lines().enumerate() {
495        let line = match line {
496            Ok(l) => l,
497            Err(_) => continue,
498        };
499        let trimmed = line.trim();
500        if trimmed.is_empty() {
501            continue;
502        }
503        match serde_json::from_str::<Item>(trimmed) {
504            Ok(item) => items.push(item),
505            Err(e) => {
506                eprintln!(
507                    "Warning: Skipping corrupt line {} in {}: {}",
508                    line_num + 1,
509                    path.display(),
510                    e
511                );
512            }
513        }
514    }
515    items
516}
517
518/// Truncate and rewrite all items to the file.
519fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
520    file.seek(std::io::SeekFrom::Start(0))?;
521    file.set_len(0)?;
522    for item in items {
523        writeln!(file, "{}", item.to_json_string())?;
524    }
525    file.flush()?;
526    Ok(())
527}
528
529/// Generate a 3-char alphanumeric ID that doesn't collide with existing.
530fn generate_id(existing_ids: &HashSet<String>) -> String {
531    let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
532    let mut rng = rand::thread_rng();
533    loop {
534        let id: String = (0..3).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
535        if !existing_ids.contains(&id) {
536            return id;
537        }
538    }
539}
540
541/// Current UTC time in ISO 8601 with millisecond precision.
542fn now_iso8601() -> String {
543    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
544}
545
546/// Attributes for updating an item.
547#[derive(Default)]
548pub struct UpdateAttrs {
549    pub status: Option<String>,
550    pub title: Option<String>,
551    pub description: Option<String>,
552    pub metadata: Option<serde_json::Value>,
553    pub session_id: Option<String>,
554    pub blocked_by: Option<Vec<String>>,
555    pub sources: Option<Vec<Source>>,
556}