Skip to main content

sift_queue/queue/
mod.rs

1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
11/// Valid persisted status values for queue items.
12pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
14/// Valid display status values surfaced by list/show output.
15pub const VALID_DISPLAY_STATUSES: &[&str] = &["pending", "blocked", "in_progress", "closed"];
16
17/// Valid source types accepted by `push` (used for validation on add).
18pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
19
20/// Valid priority range accepted by first-class priority fields.
21pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
22
23// ── Types ───────────────────────────────────────────────────────────────────
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26pub struct Item {
27    pub id: String,
28
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub title: Option<String>,
31
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub description: Option<String>,
34
35    pub status: String,
36
37    #[serde(skip_serializing_if = "Option::is_none")]
38    #[serde(default)]
39    pub priority: Option<u8>,
40
41    pub sources: Vec<Source>,
42
43    pub metadata: serde_json::Value,
44
45    pub created_at: String,
46    pub updated_at: String,
47
48    #[serde(skip_serializing_if = "is_empty_vec")]
49    #[serde(default)]
50    pub blocked_by: Vec<String>,
51
52    #[serde(skip_serializing_if = "is_empty_json_vec")]
53    #[serde(default)]
54    pub errors: Vec<serde_json::Value>,
55}
56
57fn is_empty_vec(v: &[String]) -> bool {
58    v.is_empty()
59}
60
61fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
62    v.is_empty()
63}
64
65/// Serialize items through serde so the struct definition is the source of
66/// truth for JSON output.
67///
68/// Note: this serializes the item's current `status` field verbatim. Callers
69/// that want display/view semantics (for example, surfacing `blocked` as a
70/// computed status in read-oriented output) must first call
71/// `with_computed_status(...)`.
72impl Item {
73    pub fn to_json_value(&self) -> serde_json::Value {
74        serde_json::to_value(self).expect("item serialization should succeed")
75    }
76
77    pub fn to_json_string(&self) -> String {
78        serde_json::to_string(self).expect("item serialization should succeed")
79    }
80
81    pub fn pending(&self) -> bool {
82        self.status == "pending"
83    }
84
85    pub fn blocked(&self) -> bool {
86        !self.blocked_by.is_empty()
87    }
88
89    /// Compute the visible status for read-oriented views.
90    ///
91    /// Persisted lifecycle status remains `pending|in_progress|closed`, but
92    /// pending items with open blockers are surfaced as `blocked` in display
93    /// output.
94    pub fn computed_status(&self, open_ids: Option<&HashSet<String>>) -> String {
95        if !self.pending() || !self.blocked() {
96            return self.status.clone();
97        }
98
99        match open_ids {
100            None => "blocked".to_string(),
101            Some(ids) => {
102                if self.blocked_by.iter().any(|id| ids.contains(id)) {
103                    "blocked".to_string()
104                } else {
105                    self.status.clone()
106                }
107            }
108        }
109    }
110
111    pub fn with_computed_status(&self, open_ids: Option<&HashSet<String>>) -> Self {
112        let mut item = self.clone();
113        item.status = self.computed_status(open_ids);
114        item
115    }
116
117    pub fn ready(&self, open_ids: Option<&HashSet<String>>) -> bool {
118        if !self.pending() {
119            return false;
120        }
121        if !self.blocked() {
122            return true;
123        }
124        match open_ids {
125            None => true,
126            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
127        }
128    }
129}
130
131pub fn parse_priority_value(input: &str) -> Result<u8> {
132    let trimmed = input.trim();
133
134    let priority = trimmed
135        .parse::<u8>()
136        .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
137
138    if !VALID_PRIORITY_RANGE.contains(&priority) {
139        anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
140    }
141
142    Ok(priority)
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
146pub struct Source {
147    #[serde(rename = "type")]
148    pub type_: String,
149
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub path: Option<String>,
152
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub content: Option<String>,
155}
156
157impl Source {
158    pub fn to_json_value(&self) -> serde_json::Value {
159        serde_json::to_value(self).expect("source serialization should succeed")
160    }
161}
162
163// ── Queue ───────────────────────────────────────────────────────────────────
164
165pub struct Queue {
166    pub path: PathBuf,
167}
168
169pub struct NewItem {
170    pub sources: Vec<Source>,
171    pub title: Option<String>,
172    pub description: Option<String>,
173    pub priority: Option<u8>,
174    pub metadata: serde_json::Value,
175    pub blocked_by: Vec<String>,
176}
177
178impl Queue {
179    pub fn new(path: PathBuf) -> Self {
180        Self { path }
181    }
182
183    /// Add a new item to the queue. Returns the created Item.
184    pub fn push(
185        &self,
186        sources: Vec<Source>,
187        title: Option<String>,
188        description: Option<String>,
189        priority: Option<u8>,
190        metadata: serde_json::Value,
191        blocked_by: Vec<String>,
192    ) -> Result<Item> {
193        let new_item = NewItem {
194            sources,
195            title,
196            description,
197            priority,
198            metadata,
199            blocked_by,
200        };
201
202        let mut items = self.push_many_with_description(vec![new_item])?;
203        Ok(items.remove(0))
204    }
205
206    /// Add many new items to the queue under a single lock.
207    pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
208        if items.is_empty() {
209            anyhow::bail!("At least one item is required");
210        }
211
212        for item in &items {
213            self.validate_new_item(item)?;
214        }
215
216        self.with_exclusive_lock(|f| {
217            let existing = read_items(f, &self.path);
218            let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
219            let now = now_iso8601();
220            let mut created = Vec::with_capacity(items.len());
221
222            f.seek(std::io::SeekFrom::End(0))?;
223            for new_item in items {
224                let id = generate_id(&existing_ids);
225                existing_ids.insert(id.clone());
226
227                let item = Item {
228                    id,
229                    title: new_item.title,
230                    description: new_item.description,
231                    status: "pending".to_string(),
232                    priority: new_item.priority,
233                    sources: new_item.sources,
234                    metadata: new_item.metadata,
235                    created_at: now.clone(),
236                    updated_at: now.clone(),
237                    blocked_by: new_item.blocked_by,
238                    errors: Vec::new(),
239                };
240
241                writeln!(f, "{}", item.to_json_string())?;
242                created.push(item);
243            }
244            f.flush()?;
245
246            Ok(created)
247        })
248    }
249
250    /// Get all items, skipping corrupt lines with warnings.
251    pub fn all(&self) -> Vec<Item> {
252        if !self.path.exists() {
253            return Vec::new();
254        }
255        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
256            Ok(items) => items,
257            Err(_) => Vec::new(),
258        }
259    }
260
261    /// Find an item by ID.
262    pub fn find(&self, id: &str) -> Option<Item> {
263        self.all().into_iter().find(|item| item.id == id)
264    }
265
266    /// Return IDs for all non-closed items currently in the queue.
267    pub fn open_ids(&self) -> HashSet<String> {
268        open_ids_for_items(&self.all())
269    }
270
271    /// Apply computed/display status semantics to a single item.
272    pub fn item_with_computed_status(&self, item: Item) -> Item {
273        let open_ids = self.open_ids();
274        item.with_computed_status(Some(&open_ids))
275    }
276
277    /// Apply computed/display status semantics to a collection of items.
278    pub fn items_with_computed_status(&self, items: Vec<Item>) -> Vec<Item> {
279        let open_ids = self.open_ids();
280        items
281            .into_iter()
282            .map(|item| item.with_computed_status(Some(&open_ids)))
283            .collect()
284    }
285
286    /// Load all items and apply computed/display status semantics.
287    pub fn all_with_computed_status(&self) -> Vec<Item> {
288        let items = self.all();
289        let open_ids = open_ids_for_items(&items);
290        items
291            .into_iter()
292            .map(|item| item.with_computed_status(Some(&open_ids)))
293            .collect()
294    }
295
296    /// Find an item by ID and apply computed/display status semantics.
297    pub fn find_with_computed_status(&self, id: &str) -> Option<Item> {
298        let items = self.all();
299        let open_ids = open_ids_for_items(&items);
300        items
301            .into_iter()
302            .find(|item| item.id == id)
303            .map(|item| item.with_computed_status(Some(&open_ids)))
304    }
305
306    /// Filter items by persisted lifecycle status (optional).
307    ///
308    /// This method intentionally does not apply computed/display status
309    /// semantics. Callers that want view-oriented filtering (for example,
310    /// treating `blocked` as a visible status in list/show output) should load
311    /// items, call `with_computed_status(...)`, and then filter those results.
312    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
313        let items = self.all();
314        match status {
315            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
316            None => items,
317        }
318    }
319
320    /// Return pending items that are not blocked by any non-closed item.
321    pub fn ready(&self) -> Vec<Item> {
322        let items = self.all();
323        let open_ids: HashSet<String> = items
324            .iter()
325            .filter(|i| i.status != "closed")
326            .map(|i| i.id.clone())
327            .collect();
328        items
329            .into_iter()
330            .filter(|item| item.ready(Some(&open_ids)))
331            .collect()
332    }
333
334    /// Update an item by ID. Returns the updated Item or None.
335    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
336        self.with_exclusive_lock(|f| {
337            let mut items = read_items(f, &self.path);
338            let index = items.iter().position(|item| item.id == id);
339            let index = match index {
340                Some(i) => i,
341                None => return Ok(None),
342            };
343
344            let original = items[index].clone();
345            let item = &mut items[index];
346
347            if let Some(status) = &attrs.status {
348                if !VALID_STATUSES.contains(&status.as_str()) {
349                    anyhow::bail!(
350                        "Invalid status: {}. Valid: {}",
351                        status,
352                        VALID_STATUSES.join(", ")
353                    );
354                }
355                item.status = status.clone();
356            }
357            if let Some(title) = attrs.title {
358                item.title = Some(title);
359            }
360            if let Some(description) = attrs.description {
361                item.description = Some(description);
362            }
363            if let Some(priority) = attrs.priority {
364                if let Some(value) = priority {
365                    validate_priority(value)?;
366                }
367                item.priority = priority;
368            }
369            if let Some(metadata) = attrs.metadata {
370                item.metadata = metadata;
371            }
372            if let Some(blocked_by) = attrs.blocked_by {
373                validate_blocked_by(id, &blocked_by)?;
374                item.blocked_by = blocked_by;
375            }
376            if let Some(sources) = attrs.sources {
377                self.validate_source_types(&sources)?;
378                item.sources = sources;
379            }
380
381            if *item == original {
382                return Ok(Some(original));
383            }
384
385            item.updated_at = now_iso8601();
386
387            let updated = item.clone();
388            rewrite_items(f, &items)?;
389            Ok(Some(updated))
390        })
391    }
392
393    /// Remove an item by ID. Returns the removed Item or None.
394    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
395        self.with_exclusive_lock(|f| {
396            let mut items = read_items(f, &self.path);
397            let index = items.iter().position(|item| item.id == id);
398            match index {
399                Some(i) => {
400                    let removed = items.remove(i);
401                    rewrite_items(f, &items)?;
402                    Ok(Some(removed))
403                }
404                None => Ok(None),
405            }
406        })
407    }
408
409    // ── Private helpers ─────────────────────────────────────────────────
410
411    fn validate_new_item(&self, item: &NewItem) -> Result<()> {
412        if let Some(priority) = item.priority {
413            validate_priority(priority)?;
414        }
415
416        if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
417            anyhow::bail!("Item requires at least one source, title, or description");
418        }
419
420        self.validate_source_types(&item.sources)
421    }
422
423    fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
424        for source in sources {
425            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
426                anyhow::bail!(
427                    "Invalid source type: {}. Valid: {}",
428                    source.type_,
429                    VALID_SOURCE_TYPES.join(", ")
430                );
431            }
432        }
433        Ok(())
434    }
435
436    fn ensure_directory(&self) -> Result<()> {
437        if let Some(parent) = self.path.parent() {
438            if !parent.exists() {
439                fs::create_dir_all(parent)?;
440            }
441        }
442        Ok(())
443    }
444
445    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
446    where
447        F: FnOnce(&mut File) -> Result<T>,
448    {
449        self.ensure_directory()?;
450        let mut file = OpenOptions::new()
451            .read(true)
452            .write(true)
453            .create(true)
454            .truncate(false)
455            .open(&self.path)
456            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
457
458        flock(&file.as_fd(), FlockOperation::LockExclusive)
459            .with_context(|| "Failed to acquire exclusive lock")?;
460
461        let result = f(&mut file);
462
463        // Lock released when file is dropped
464        result
465    }
466
467    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
468    where
469        F: FnOnce(&mut File) -> Result<T>,
470    {
471        let mut file = File::open(&self.path)
472            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
473
474        flock(&file.as_fd(), FlockOperation::LockShared)
475            .with_context(|| "Failed to acquire shared lock")?;
476
477        f(&mut file)
478    }
479}
480
481fn open_ids_for_items(items: &[Item]) -> HashSet<String> {
482    items
483        .iter()
484        .filter(|item| item.status != "closed")
485        .map(|item| item.id.clone())
486        .collect()
487}
488
489/// Read items from an open file, skipping corrupt lines.
490fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
491    file.seek(std::io::SeekFrom::Start(0)).ok();
492    let reader = BufReader::new(file);
493    let mut items = Vec::new();
494
495    for (line_num, line) in reader.lines().enumerate() {
496        let line = match line {
497            Ok(l) => l,
498            Err(_) => continue,
499        };
500        let trimmed = line.trim();
501        if trimmed.is_empty() {
502            continue;
503        }
504        match serde_json::from_str::<Item>(trimmed) {
505            Ok(item) => items.push(item),
506            Err(e) => {
507                eprintln!(
508                    "Warning: Skipping corrupt line {} in {}: {}",
509                    line_num + 1,
510                    path.display(),
511                    e
512                );
513            }
514        }
515    }
516    items
517}
518
519/// Truncate and rewrite all items to the file.
520fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
521    file.seek(std::io::SeekFrom::Start(0))?;
522    file.set_len(0)?;
523    for item in items {
524        writeln!(file, "{}", item.to_json_string())?;
525    }
526    file.flush()?;
527    Ok(())
528}
529
530/// Generate a 3-char alphanumeric ID that doesn't collide with existing.
531fn generate_id(existing_ids: &HashSet<String>) -> String {
532    let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
533    let mut rng = rand::thread_rng();
534    loop {
535        let id: String = (0..3)
536            .map(|_| chars[rng.gen_range(0..chars.len())])
537            .collect();
538        if !existing_ids.contains(&id) {
539            return id;
540        }
541    }
542}
543
544/// Current UTC time in ISO 8601 with millisecond precision.
545fn now_iso8601() -> String {
546    chrono::Utc::now()
547        .format("%Y-%m-%dT%H:%M:%S%.3fZ")
548        .to_string()
549}
550
551fn validate_priority(priority: u8) -> Result<()> {
552    if VALID_PRIORITY_RANGE.contains(&priority) {
553        Ok(())
554    } else {
555        anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
556    }
557}
558
559fn validate_blocked_by(item_id: &str, blocked_by: &[String]) -> Result<()> {
560    if blocked_by.iter().any(|blocker_id| blocker_id == item_id) {
561        anyhow::bail!("Item cannot block itself: {}", item_id);
562    }
563    Ok(())
564}
565
566/// Attributes for updating an item.
567#[derive(Default)]
568pub struct UpdateAttrs {
569    pub status: Option<String>,
570    pub title: Option<String>,
571    pub description: Option<String>,
572    pub priority: Option<Option<u8>>,
573    pub metadata: Option<serde_json::Value>,
574    pub blocked_by: Option<Vec<String>>,
575    pub sources: Option<Vec<Source>>,
576}