Skip to main content

sift_queue/queue/
mod.rs

1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
11/// Valid status values for queue items.
12pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
14/// Valid source types accepted by `push` (used for validation on add).
15pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17// ── Types ───────────────────────────────────────────────────────────────────
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21    pub id: String,
22
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub title: Option<String>,
25
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub description: Option<String>,
28
29    pub status: String,
30
31    pub sources: Vec<Source>,
32
33    pub metadata: serde_json::Value,
34
35    /// Always serialized, even when null.
36    pub session_id: Option<String>,
37
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub worktree: Option<Worktree>,
40
41    #[serde(skip_serializing_if = "is_empty_vec")]
42    #[serde(default)]
43    pub blocked_by: Vec<String>,
44
45    #[serde(skip_serializing_if = "is_empty_json_vec")]
46    #[serde(default)]
47    pub errors: Vec<serde_json::Value>,
48
49    pub created_at: String,
50    pub updated_at: String,
51}
52
53fn is_empty_vec(v: &[String]) -> bool {
54    v.is_empty()
55}
56
57fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
58    v.is_empty()
59}
60
61/// We need the JSON field order to match Ruby exactly.
62/// Ruby `to_h` outputs: id, (title if present), (description if present),
63/// status, sources, metadata, session_id, created_at, updated_at,
64/// (worktree if present), (blocked_by if non-empty), (errors if non-empty).
65///
66/// serde by default serializes in struct field order, so we order the fields
67/// to match. But Ruby puts title BEFORE status when present, and worktree/
68/// blocked_by/errors AFTER updated_at. Let's use a custom serializer.
69impl Item {
70    pub fn to_json_value(&self) -> serde_json::Value {
71        let mut map = serde_json::Map::new();
72
73        map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
74
75        // title/description go right after id, before status (only if present)
76        if let Some(ref title) = self.title {
77            map.insert(
78                "title".to_string(),
79                serde_json::Value::String(title.clone()),
80            );
81        }
82        if let Some(ref description) = self.description {
83            map.insert(
84                "description".to_string(),
85                serde_json::Value::String(description.clone()),
86            );
87        }
88
89        map.insert(
90            "status".to_string(),
91            serde_json::Value::String(self.status.clone()),
92        );
93
94        map.insert(
95            "sources".to_string(),
96            serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
97        );
98
99        map.insert("metadata".to_string(), self.metadata.clone());
100
101        map.insert(
102            "session_id".to_string(),
103            match &self.session_id {
104                Some(s) => serde_json::Value::String(s.clone()),
105                None => serde_json::Value::Null,
106            },
107        );
108
109        map.insert(
110            "created_at".to_string(),
111            serde_json::Value::String(self.created_at.clone()),
112        );
113        map.insert(
114            "updated_at".to_string(),
115            serde_json::Value::String(self.updated_at.clone()),
116        );
117
118        // worktree after updated_at, only if present
119        if let Some(ref wt) = self.worktree {
120            map.insert("worktree".to_string(), wt.to_json_value());
121        }
122
123        // blocked_by after worktree, only if non-empty
124        if !self.blocked_by.is_empty() {
125            map.insert(
126                "blocked_by".to_string(),
127                serde_json::Value::Array(
128                    self.blocked_by
129                        .iter()
130                        .map(|s| serde_json::Value::String(s.clone()))
131                        .collect(),
132                ),
133            );
134        }
135
136        // errors after blocked_by, only if non-empty
137        if !self.errors.is_empty() {
138            map.insert(
139                "errors".to_string(),
140                serde_json::Value::Array(self.errors.clone()),
141            );
142        }
143
144        serde_json::Value::Object(map)
145    }
146
147    pub fn to_json_string(&self) -> String {
148        self.to_json_value().to_string()
149    }
150
151    pub fn pending(&self) -> bool {
152        self.status == "pending"
153    }
154
155    pub fn blocked(&self) -> bool {
156        !self.blocked_by.is_empty()
157    }
158
159    pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
160        if !self.pending() {
161            return false;
162        }
163        if !self.blocked() {
164            return true;
165        }
166        match pending_ids {
167            None => true,
168            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
169        }
170    }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174pub struct Source {
175    #[serde(rename = "type")]
176    pub type_: String,
177
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub path: Option<String>,
180
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub content: Option<String>,
183
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub session_id: Option<String>,
186}
187
188impl Source {
189    pub fn to_json_value(&self) -> serde_json::Value {
190        let mut map = serde_json::Map::new();
191        map.insert(
192            "type".to_string(),
193            serde_json::Value::String(self.type_.clone()),
194        );
195        if let Some(ref path) = self.path {
196            map.insert("path".to_string(), serde_json::Value::String(path.clone()));
197        }
198        if let Some(ref content) = self.content {
199            map.insert(
200                "content".to_string(),
201                serde_json::Value::String(content.clone()),
202            );
203        }
204        if let Some(ref session_id) = self.session_id {
205            map.insert(
206                "session_id".to_string(),
207                serde_json::Value::String(session_id.clone()),
208            );
209        }
210        serde_json::Value::Object(map)
211    }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
215pub struct Worktree {
216    #[serde(skip_serializing_if = "Option::is_none")]
217    pub path: Option<String>,
218    #[serde(skip_serializing_if = "Option::is_none")]
219    pub branch: Option<String>,
220}
221
222impl Worktree {
223    pub fn to_json_value(&self) -> serde_json::Value {
224        let mut map = serde_json::Map::new();
225        if let Some(ref path) = self.path {
226            map.insert("path".to_string(), serde_json::Value::String(path.clone()));
227        }
228        if let Some(ref branch) = self.branch {
229            map.insert(
230                "branch".to_string(),
231                serde_json::Value::String(branch.clone()),
232            );
233        }
234        serde_json::Value::Object(map)
235    }
236}
237
238// ── Queue ───────────────────────────────────────────────────────────────────
239
240pub struct Queue {
241    pub path: PathBuf,
242}
243
244pub struct NewItem {
245    pub sources: Vec<Source>,
246    pub title: Option<String>,
247    pub description: Option<String>,
248    pub metadata: serde_json::Value,
249    pub session_id: Option<String>,
250    pub blocked_by: Vec<String>,
251}
252
253impl Queue {
254    pub fn new(path: PathBuf) -> Self {
255        Self { path }
256    }
257
258    /// Add a new item to the queue. Returns the created Item.
259    pub fn push(
260        &self,
261        sources: Vec<Source>,
262        title: Option<String>,
263        metadata: serde_json::Value,
264        session_id: Option<String>,
265        blocked_by: Vec<String>,
266    ) -> Result<Item> {
267        self.validate_sources(&sources)?;
268        self.push_with_description(sources, title, None, metadata, session_id, blocked_by)
269    }
270
271    /// Add a new item to the queue with description support.
272    pub fn push_with_description(
273        &self,
274        sources: Vec<Source>,
275        title: Option<String>,
276        description: Option<String>,
277        metadata: serde_json::Value,
278        session_id: Option<String>,
279        blocked_by: Vec<String>,
280    ) -> Result<Item> {
281        let new_item = NewItem {
282            sources,
283            title,
284            description,
285            metadata,
286            session_id,
287            blocked_by,
288        };
289
290        let mut items = self.push_many_with_description(vec![new_item])?;
291        Ok(items.remove(0))
292    }
293
294    /// Add many new items to the queue under a single lock.
295    pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
296        if items.is_empty() {
297            anyhow::bail!("At least one item is required");
298        }
299
300        for item in &items {
301            self.validate_new_item(item)?;
302        }
303
304        self.with_exclusive_lock(|f| {
305            let existing = read_items(f, &self.path);
306            let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
307            let now = now_iso8601();
308            let mut created = Vec::with_capacity(items.len());
309
310            f.seek(std::io::SeekFrom::End(0))?;
311            for new_item in items {
312                let id = generate_id(&existing_ids);
313                existing_ids.insert(id.clone());
314
315                let item = Item {
316                    id,
317                    title: new_item.title,
318                    description: new_item.description,
319                    status: "pending".to_string(),
320                    sources: new_item.sources,
321                    metadata: new_item.metadata,
322                    session_id: new_item.session_id,
323                    worktree: None,
324                    blocked_by: new_item.blocked_by,
325                    errors: Vec::new(),
326                    created_at: now.clone(),
327                    updated_at: now.clone(),
328                };
329
330                writeln!(f, "{}", item.to_json_string())?;
331                created.push(item);
332            }
333            f.flush()?;
334
335            Ok(created)
336        })
337    }
338
339    /// Get all items, skipping corrupt lines with warnings.
340    pub fn all(&self) -> Vec<Item> {
341        if !self.path.exists() {
342            return Vec::new();
343        }
344        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
345            Ok(items) => items,
346            Err(_) => Vec::new(),
347        }
348    }
349
350    /// Find an item by ID.
351    pub fn find(&self, id: &str) -> Option<Item> {
352        self.all().into_iter().find(|item| item.id == id)
353    }
354
355    /// Filter items by status (optional).
356    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
357        let items = self.all();
358        match status {
359            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
360            None => items,
361        }
362    }
363
364    /// Return pending items that are not blocked by any pending item.
365    pub fn ready(&self) -> Vec<Item> {
366        let items = self.all();
367        let pending_ids: HashSet<String> = items
368            .iter()
369            .filter(|i| i.pending())
370            .map(|i| i.id.clone())
371            .collect();
372        items
373            .into_iter()
374            .filter(|item| item.ready(Some(&pending_ids)))
375            .collect()
376    }
377
378    /// Update an item by ID. Returns the updated Item or None.
379    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
380        self.with_exclusive_lock(|f| {
381            let mut items = read_items(f, &self.path);
382            let index = items.iter().position(|item| item.id == id);
383            let index = match index {
384                Some(i) => i,
385                None => return Ok(None),
386            };
387
388            let item = &mut items[index];
389
390            if let Some(status) = &attrs.status {
391                if !VALID_STATUSES.contains(&status.as_str()) {
392                    anyhow::bail!(
393                        "Invalid status: {}. Valid: {}",
394                        status,
395                        VALID_STATUSES.join(", ")
396                    );
397                }
398                item.status = status.clone();
399            }
400            if let Some(title) = attrs.title {
401                item.title = Some(title);
402            }
403            if let Some(description) = attrs.description {
404                item.description = Some(description);
405            }
406            if let Some(metadata) = attrs.metadata {
407                item.metadata = metadata;
408            }
409            if let Some(session_id) = attrs.session_id {
410                item.session_id = Some(session_id);
411            }
412            if let Some(blocked_by) = attrs.blocked_by {
413                item.blocked_by = blocked_by;
414            }
415            if let Some(sources) = attrs.sources {
416                item.sources = sources;
417            }
418
419            item.updated_at = now_iso8601();
420
421            let updated = item.clone();
422            rewrite_items(f, &items)?;
423            Ok(Some(updated))
424        })
425    }
426
427    /// Remove an item by ID. Returns the removed Item or None.
428    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
429        self.with_exclusive_lock(|f| {
430            let mut items = read_items(f, &self.path);
431            let index = items.iter().position(|item| item.id == id);
432            match index {
433                Some(i) => {
434                    let removed = items.remove(i);
435                    rewrite_items(f, &items)?;
436                    Ok(Some(removed))
437                }
438                None => Ok(None),
439            }
440        })
441    }
442
443    // ── Private helpers ─────────────────────────────────────────────────
444
445    fn validate_sources(&self, sources: &[Source]) -> Result<()> {
446        if sources.is_empty() {
447            anyhow::bail!("Sources cannot be empty");
448        }
449        self.validate_source_types(sources)
450    }
451
452    fn validate_new_item(&self, item: &NewItem) -> Result<()> {
453        let has_metadata = match &item.metadata {
454            serde_json::Value::Object(map) => !map.is_empty(),
455            _ => true,
456        };
457
458        if item.sources.is_empty()
459            && item.title.is_none()
460            && item.description.is_none()
461            && !has_metadata
462        {
463            anyhow::bail!("Item requires at least one source, title, description, or metadata");
464        }
465
466        self.validate_source_types(&item.sources)
467    }
468
469    fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
470        for source in sources {
471            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
472                anyhow::bail!(
473                    "Invalid source type: {}. Valid: {}",
474                    source.type_,
475                    VALID_SOURCE_TYPES.join(", ")
476                );
477            }
478        }
479        Ok(())
480    }
481
482    fn ensure_directory(&self) -> Result<()> {
483        if let Some(parent) = self.path.parent() {
484            if !parent.exists() {
485                fs::create_dir_all(parent)?;
486            }
487        }
488        Ok(())
489    }
490
491    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
492    where
493        F: FnOnce(&mut File) -> Result<T>,
494    {
495        self.ensure_directory()?;
496        let mut file = OpenOptions::new()
497            .read(true)
498            .write(true)
499            .create(true)
500            .truncate(false)
501            .open(&self.path)
502            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
503
504        flock(&file.as_fd(), FlockOperation::LockExclusive)
505            .with_context(|| "Failed to acquire exclusive lock")?;
506
507        let result = f(&mut file);
508
509        // Lock released when file is dropped
510        result
511    }
512
513    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
514    where
515        F: FnOnce(&mut File) -> Result<T>,
516    {
517        let mut file = File::open(&self.path)
518            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
519
520        flock(&file.as_fd(), FlockOperation::LockShared)
521            .with_context(|| "Failed to acquire shared lock")?;
522
523        f(&mut file)
524    }
525}
526
527/// Read items from an open file, skipping corrupt lines.
528fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
529    file.seek(std::io::SeekFrom::Start(0)).ok();
530    let reader = BufReader::new(file);
531    let mut items = Vec::new();
532
533    for (line_num, line) in reader.lines().enumerate() {
534        let line = match line {
535            Ok(l) => l,
536            Err(_) => continue,
537        };
538        let trimmed = line.trim();
539        if trimmed.is_empty() {
540            continue;
541        }
542        match serde_json::from_str::<Item>(trimmed) {
543            Ok(item) => items.push(item),
544            Err(e) => {
545                eprintln!(
546                    "Warning: Skipping corrupt line {} in {}: {}",
547                    line_num + 1,
548                    path.display(),
549                    e
550                );
551            }
552        }
553    }
554    items
555}
556
557/// Truncate and rewrite all items to the file.
558fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
559    file.seek(std::io::SeekFrom::Start(0))?;
560    file.set_len(0)?;
561    for item in items {
562        writeln!(file, "{}", item.to_json_string())?;
563    }
564    file.flush()?;
565    Ok(())
566}
567
568/// Generate a 3-char alphanumeric ID that doesn't collide with existing.
569fn generate_id(existing_ids: &HashSet<String>) -> String {
570    let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
571    let mut rng = rand::thread_rng();
572    loop {
573        let id: String = (0..3)
574            .map(|_| chars[rng.gen_range(0..chars.len())])
575            .collect();
576        if !existing_ids.contains(&id) {
577            return id;
578        }
579    }
580}
581
582/// Current UTC time in ISO 8601 with millisecond precision.
583fn now_iso8601() -> String {
584    chrono::Utc::now()
585        .format("%Y-%m-%dT%H:%M:%S%.3fZ")
586        .to_string()
587}
588
589/// Attributes for updating an item.
590#[derive(Default)]
591pub struct UpdateAttrs {
592    pub status: Option<String>,
593    pub title: Option<String>,
594    pub description: Option<String>,
595    pub metadata: Option<serde_json::Value>,
596    pub session_id: Option<String>,
597    pub blocked_by: Option<Vec<String>>,
598    pub sources: Option<Vec<Source>>,
599}