// sift_queue/queue/mod.rs

1use anyhow::{Context, Result};
2use rustix::fs::{flock, FlockOperation};
3use serde::{Deserialize, Serialize};
4use std::collections::HashSet;
5use std::fs::{self, File, OpenOptions};
6use std::io::{BufRead, BufReader, Seek, Write};
7use std::os::fd::AsFd;
8use std::path::{Path, PathBuf};
9
/// Lifecycle statuses that may be written to the queue file.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];

/// Statuses that may appear in list/show output. `blocked` is never persisted;
/// it is derived at read time for pending items that still have open blockers.
pub const VALID_DISPLAY_STATUSES: &[&str] = &["pending", "blocked", "in_progress", "closed"];

/// Source `type` values accepted when items are pushed or their sources updated.
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];

/// Inclusive range accepted for the first-class `priority` field.
pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = std::ops::RangeInclusive::new(0, 4);

/// Symbols used for generated IDs: the ten digits plus lowercase letters,
/// omitting `i`, `l`, `o`, and `u` (the same set as Crockford's base32).
const ID_ALPHABET: [char; 32] = [
    // digits
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    // letters except i, l, o, u
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'm', 'n', 'p', 'q', 'r', 's',
    't', 'v', 'w', 'x', 'y', 'z',
];

/// Shortest ID ever generated; longer IDs kick in as the queue fills up.
const MIN_GENERATED_ID_LENGTH: usize = 3;

/// Grow the generated ID length once existing IDs reach this percentage of the
/// current ID space. 10% is intentional for now; reduce this if collision
/// retries become too frequent in production.
const ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT: usize = 10;
29
30// ── Types ───────────────────────────────────────────────────────────────────
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
33pub struct Item {
34    pub id: String,
35
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub title: Option<String>,
38
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub description: Option<String>,
41
42    pub status: String,
43
44    #[serde(skip_serializing_if = "Option::is_none")]
45    #[serde(default)]
46    pub priority: Option<u8>,
47
48    pub sources: Vec<Source>,
49
50    pub metadata: serde_json::Value,
51
52    pub created_at: String,
53    pub updated_at: String,
54
55    #[serde(skip_serializing_if = "is_empty_vec")]
56    #[serde(default)]
57    pub blocked_by: Vec<String>,
58
59    #[serde(skip_serializing_if = "is_empty_json_vec")]
60    #[serde(default)]
61    pub errors: Vec<serde_json::Value>,
62}
63
/// serde `skip_serializing_if` predicate: true when the string list has no entries.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
67
68fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
69    v.is_empty()
70}
71
72/// Serialize items through serde so the struct definition is the source of
73/// truth for JSON output.
74///
75/// Note: this serializes the item's current `status` field verbatim. Callers
76/// that want display/view semantics (for example, surfacing `blocked` as a
77/// computed status in read-oriented output) must first call
78/// `with_computed_status(...)`.
79impl Item {
80    pub fn to_json_value(&self) -> serde_json::Value {
81        serde_json::to_value(self).expect("item serialization should succeed")
82    }
83
84    pub fn to_json_string(&self) -> String {
85        serde_json::to_string(self).expect("item serialization should succeed")
86    }
87
88    pub fn pending(&self) -> bool {
89        self.status == "pending"
90    }
91
92    pub fn blocked(&self) -> bool {
93        !self.blocked_by.is_empty()
94    }
95
96    /// Compute the visible status for read-oriented views.
97    ///
98    /// Persisted lifecycle status remains `pending|in_progress|closed`, but
99    /// pending items with open blockers are surfaced as `blocked` in display
100    /// output.
101    pub fn computed_status(&self, open_ids: Option<&HashSet<String>>) -> String {
102        if !self.pending() || !self.blocked() {
103            return self.status.clone();
104        }
105
106        match open_ids {
107            None => "blocked".to_string(),
108            Some(ids) => {
109                if self.blocked_by.iter().any(|id| ids.contains(id)) {
110                    "blocked".to_string()
111                } else {
112                    self.status.clone()
113                }
114            }
115        }
116    }
117
118    pub fn with_computed_status(&self, open_ids: Option<&HashSet<String>>) -> Self {
119        let mut item = self.clone();
120        item.status = self.computed_status(open_ids);
121        item
122    }
123
124    pub fn ready(&self, open_ids: Option<&HashSet<String>>) -> bool {
125        if !self.pending() {
126            return false;
127        }
128        if !self.blocked() {
129            return true;
130        }
131        match open_ids {
132            None => true,
133            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
134        }
135    }
136}
137
138pub fn parse_priority_value(input: &str) -> Result<u8> {
139    let trimmed = input.trim();
140
141    let priority = trimmed
142        .parse::<u8>()
143        .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
144
145    if !VALID_PRIORITY_RANGE.contains(&priority) {
146        anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
147    }
148
149    Ok(priority)
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
153pub struct Source {
154    #[serde(rename = "type")]
155    pub type_: String,
156
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub path: Option<String>,
159
160    #[serde(skip_serializing_if = "Option::is_none")]
161    pub content: Option<String>,
162}
163
164impl Source {
165    pub fn to_json_value(&self) -> serde_json::Value {
166        serde_json::to_value(self).expect("source serialization should succeed")
167    }
168}
169
170// ── Queue ───────────────────────────────────────────────────────────────────
171
/// Handle to a line-delimited JSON queue file.
///
/// Holds only the path; each operation opens and `flock`s the file itself.
pub struct Queue {
    /// Location of the queue file. Its parent directory is created on the
    /// first write if missing.
    pub path: PathBuf,
}
175
176pub struct NewItem {
177    pub sources: Vec<Source>,
178    pub title: Option<String>,
179    pub description: Option<String>,
180    pub priority: Option<u8>,
181    pub metadata: serde_json::Value,
182    pub blocked_by: Vec<String>,
183}
184
185impl Queue {
186    pub fn new(path: PathBuf) -> Self {
187        Self { path }
188    }
189
190    /// Add a new item to the queue. Returns the created Item.
191    pub fn push(
192        &self,
193        sources: Vec<Source>,
194        title: Option<String>,
195        description: Option<String>,
196        priority: Option<u8>,
197        metadata: serde_json::Value,
198        blocked_by: Vec<String>,
199    ) -> Result<Item> {
200        let new_item = NewItem {
201            sources,
202            title,
203            description,
204            priority,
205            metadata,
206            blocked_by,
207        };
208
209        let mut items = self.push_many_with_description(vec![new_item])?;
210        Ok(items.remove(0))
211    }
212
213    /// Add many new items to the queue under a single lock.
214    pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
215        if items.is_empty() {
216            anyhow::bail!("At least one item is required");
217        }
218
219        for item in &items {
220            self.validate_new_item(item)?;
221        }
222
223        self.with_exclusive_lock(|f| {
224            let existing = read_items(f, &self.path);
225            let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
226            let now = now_iso8601();
227            let mut created = Vec::with_capacity(items.len());
228
229            f.seek(std::io::SeekFrom::End(0))?;
230            for new_item in items {
231                let id = generate_id(&existing_ids);
232                existing_ids.insert(id.clone());
233
234                let item = Item {
235                    id,
236                    title: new_item.title,
237                    description: new_item.description,
238                    status: "pending".to_string(),
239                    priority: new_item.priority,
240                    sources: new_item.sources,
241                    metadata: new_item.metadata,
242                    created_at: now.clone(),
243                    updated_at: now.clone(),
244                    blocked_by: new_item.blocked_by,
245                    errors: Vec::new(),
246                };
247
248                writeln!(f, "{}", item.to_json_string())?;
249                created.push(item);
250            }
251            f.flush()?;
252
253            Ok(created)
254        })
255    }
256
257    /// Get all items, skipping corrupt lines with warnings.
258    pub fn all(&self) -> Vec<Item> {
259        if !self.path.exists() {
260            return Vec::new();
261        }
262        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
263            Ok(items) => items,
264            Err(_) => Vec::new(),
265        }
266    }
267
268    /// Find an item by ID.
269    pub fn find(&self, id: &str) -> Option<Item> {
270        self.all().into_iter().find(|item| item.id == id)
271    }
272
273    /// Return IDs for all non-closed items currently in the queue.
274    pub fn open_ids(&self) -> HashSet<String> {
275        open_ids_for_items(&self.all())
276    }
277
278    /// Apply computed/display status semantics to a single item.
279    pub fn item_with_computed_status(&self, item: Item) -> Item {
280        let open_ids = self.open_ids();
281        item.with_computed_status(Some(&open_ids))
282    }
283
284    /// Apply computed/display status semantics to a collection of items.
285    pub fn items_with_computed_status(&self, items: Vec<Item>) -> Vec<Item> {
286        let open_ids = self.open_ids();
287        items
288            .into_iter()
289            .map(|item| item.with_computed_status(Some(&open_ids)))
290            .collect()
291    }
292
293    /// Load all items and apply computed/display status semantics.
294    pub fn all_with_computed_status(&self) -> Vec<Item> {
295        let items = self.all();
296        let open_ids = open_ids_for_items(&items);
297        items
298            .into_iter()
299            .map(|item| item.with_computed_status(Some(&open_ids)))
300            .collect()
301    }
302
303    /// Find an item by ID and apply computed/display status semantics.
304    pub fn find_with_computed_status(&self, id: &str) -> Option<Item> {
305        let items = self.all();
306        let open_ids = open_ids_for_items(&items);
307        items
308            .into_iter()
309            .find(|item| item.id == id)
310            .map(|item| item.with_computed_status(Some(&open_ids)))
311    }
312
313    /// Filter items by persisted lifecycle status (optional).
314    ///
315    /// This method intentionally does not apply computed/display status
316    /// semantics. Callers that want view-oriented filtering (for example,
317    /// treating `blocked` as a visible status in list/show output) should load
318    /// items, call `with_computed_status(...)`, and then filter those results.
319    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
320        let items = self.all();
321        match status {
322            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
323            None => items,
324        }
325    }
326
327    /// Return pending items that are not blocked by any non-closed item.
328    pub fn ready(&self) -> Vec<Item> {
329        let items = self.all();
330        let open_ids: HashSet<String> = items
331            .iter()
332            .filter(|i| i.status != "closed")
333            .map(|i| i.id.clone())
334            .collect();
335        items
336            .into_iter()
337            .filter(|item| item.ready(Some(&open_ids)))
338            .collect()
339    }
340
341    /// Update an item by ID. Returns the updated Item or None.
342    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
343        self.with_exclusive_lock(|f| {
344            let mut items = read_items(f, &self.path);
345            let index = items.iter().position(|item| item.id == id);
346            let index = match index {
347                Some(i) => i,
348                None => return Ok(None),
349            };
350
351            let original = items[index].clone();
352            let item = &mut items[index];
353
354            if let Some(status) = &attrs.status {
355                if !VALID_STATUSES.contains(&status.as_str()) {
356                    anyhow::bail!(
357                        "Invalid status: {}. Valid: {}",
358                        status,
359                        VALID_STATUSES.join(", ")
360                    );
361                }
362                item.status = status.clone();
363            }
364            if let Some(title) = attrs.title {
365                item.title = Some(title);
366            }
367            if let Some(description) = attrs.description {
368                item.description = Some(description);
369            }
370            if let Some(priority) = attrs.priority {
371                if let Some(value) = priority {
372                    validate_priority(value)?;
373                }
374                item.priority = priority;
375            }
376            if let Some(metadata) = attrs.metadata {
377                item.metadata = metadata;
378            }
379            if let Some(blocked_by) = attrs.blocked_by {
380                validate_blocked_by(id, &blocked_by)?;
381                item.blocked_by = blocked_by;
382            }
383            if let Some(sources) = attrs.sources {
384                self.validate_source_types(&sources)?;
385                item.sources = sources;
386            }
387
388            if *item == original {
389                return Ok(Some(original));
390            }
391
392            item.updated_at = now_iso8601();
393
394            let updated = item.clone();
395            rewrite_items(f, &items)?;
396            Ok(Some(updated))
397        })
398    }
399
400    /// Remove an item by ID. Returns the removed Item or None.
401    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
402        self.with_exclusive_lock(|f| {
403            let mut items = read_items(f, &self.path);
404            let index = items.iter().position(|item| item.id == id);
405            match index {
406                Some(i) => {
407                    let removed = items.remove(i);
408                    rewrite_items(f, &items)?;
409                    Ok(Some(removed))
410                }
411                None => Ok(None),
412            }
413        })
414    }
415
416    // ── Private helpers ─────────────────────────────────────────────────
417
418    fn validate_new_item(&self, item: &NewItem) -> Result<()> {
419        if let Some(priority) = item.priority {
420            validate_priority(priority)?;
421        }
422
423        if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
424            anyhow::bail!("Item requires at least one source, title, or description");
425        }
426
427        self.validate_source_types(&item.sources)
428    }
429
430    fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
431        for source in sources {
432            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
433                anyhow::bail!(
434                    "Invalid source type: {}. Valid: {}",
435                    source.type_,
436                    VALID_SOURCE_TYPES.join(", ")
437                );
438            }
439        }
440        Ok(())
441    }
442
443    fn ensure_directory(&self) -> Result<()> {
444        if let Some(parent) = self.path.parent() {
445            if !parent.exists() {
446                fs::create_dir_all(parent)?;
447            }
448        }
449        Ok(())
450    }
451
452    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
453    where
454        F: FnOnce(&mut File) -> Result<T>,
455    {
456        self.ensure_directory()?;
457        let mut file = OpenOptions::new()
458            .read(true)
459            .write(true)
460            .create(true)
461            .truncate(false)
462            .open(&self.path)
463            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
464
465        flock(&file.as_fd(), FlockOperation::LockExclusive)
466            .with_context(|| "Failed to acquire exclusive lock")?;
467
468        let result = f(&mut file);
469
470        // Lock released when file is dropped
471        result
472    }
473
474    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
475    where
476        F: FnOnce(&mut File) -> Result<T>,
477    {
478        let mut file = File::open(&self.path)
479            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
480
481        flock(&file.as_fd(), FlockOperation::LockShared)
482            .with_context(|| "Failed to acquire shared lock")?;
483
484        f(&mut file)
485    }
486}
487
488fn open_ids_for_items(items: &[Item]) -> HashSet<String> {
489    items
490        .iter()
491        .filter(|item| item.status != "closed")
492        .map(|item| item.id.clone())
493        .collect()
494}
495
496/// Read items from an open file, skipping corrupt lines.
497fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
498    file.seek(std::io::SeekFrom::Start(0)).ok();
499    let reader = BufReader::new(file);
500    let mut items = Vec::new();
501
502    for (line_num, line) in reader.lines().enumerate() {
503        let line = match line {
504            Ok(l) => l,
505            Err(_) => continue,
506        };
507        let trimmed = line.trim();
508        if trimmed.is_empty() {
509            continue;
510        }
511        match serde_json::from_str::<Item>(trimmed) {
512            Ok(item) => items.push(item),
513            Err(e) => {
514                eprintln!(
515                    "Warning: Skipping corrupt line {} in {}: {}",
516                    line_num + 1,
517                    path.display(),
518                    e
519                );
520            }
521        }
522    }
523    items
524}
525
526/// Truncate and rewrite all items to the file.
527fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
528    file.seek(std::io::SeekFrom::Start(0))?;
529    file.set_len(0)?;
530    for item in items {
531        writeln!(file, "{}", item.to_json_string())?;
532    }
533    file.flush()?;
534    Ok(())
535}
536
537/// Generate a random lowercase ID using adaptive length and retry on collision.
538fn generate_id(existing_ids: &HashSet<String>) -> String {
539    let length = generated_id_length(existing_ids.len());
540
541    loop {
542        let id = nanoid::nanoid!(length, &ID_ALPHABET);
543        if !existing_ids.contains(&id) {
544            return id;
545        }
546    }
547}
548
549fn generated_id_length(active_or_reserved_ids: usize) -> usize {
550    let mut length = MIN_GENERATED_ID_LENGTH;
551
552    loop {
553        let total_space = id_space_for_length(length);
554        let occupancy_threshold = total_space * ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT / 100;
555
556        if active_or_reserved_ids < occupancy_threshold {
557            return length;
558        }
559
560        length += 1;
561    }
562}
563
564fn id_space_for_length(length: usize) -> usize {
565    ID_ALPHABET
566        .len()
567        .checked_pow(length as u32)
568        .expect("generated ID space should fit in usize")
569}
570
571/// Current UTC time in ISO 8601 with millisecond precision.
572fn now_iso8601() -> String {
573    chrono::Utc::now()
574        .format("%Y-%m-%dT%H:%M:%S%.3fZ")
575        .to_string()
576}
577
578fn validate_priority(priority: u8) -> Result<()> {
579    if VALID_PRIORITY_RANGE.contains(&priority) {
580        Ok(())
581    } else {
582        anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
583    }
584}
585
586fn validate_blocked_by(item_id: &str, blocked_by: &[String]) -> Result<()> {
587    if blocked_by.iter().any(|blocker_id| blocker_id == item_id) {
588        anyhow::bail!("Item cannot block itself: {}", item_id);
589    }
590    Ok(())
591}
592
593/// Attributes for updating an item.
594#[derive(Default)]
595pub struct UpdateAttrs {
596    pub status: Option<String>,
597    pub title: Option<String>,
598    pub description: Option<String>,
599    pub priority: Option<Option<u8>>,
600    pub metadata: Option<serde_json::Value>,
601    pub blocked_by: Option<Vec<String>>,
602    pub sources: Option<Vec<Source>>,
603}
604
#[cfg(test)]
mod tests {
    use super::{generated_id_length, id_space_for_length};

    /// Each extra character multiplies the space by the 32-symbol alphabet.
    #[test]
    fn id_space_matches_base32_lengths() {
        let cases: [(usize, usize); 4] = [
            (3, 32_768),
            (4, 1_048_576),
            (5, 33_554_432),
            (6, 1_073_741_824),
        ];
        for (length, expected) in cases {
            assert_eq!(id_space_for_length(length), expected, "length {length}");
        }
    }

    /// Small queues get the minimum length right up to the 10% threshold.
    #[test]
    fn generated_ids_start_at_three_characters() {
        for count in [0usize, 3_275] {
            assert_eq!(generated_id_length(count), 3, "count {count}");
        }
    }

    /// Crossing 10% of each length's space bumps the length by one.
    #[test]
    fn generated_id_length_grows_at_documented_thresholds() {
        let cases: [(usize, usize); 4] = [
            (3_276, 4),
            (104_857, 5),
            (3_355_443, 6),
            (107_374_182, 7),
        ];
        for (count, expected) in cases {
            assert_eq!(generated_id_length(count), expected, "count {count}");
        }
    }
}