Skip to main content

sift_queue/queue/
mod.rs

1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
/// Valid status values for queue items.
///
/// `Queue::update` rejects any status string that is not in this list.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];

/// Valid source types accepted by `push` (used for validation on add).
///
/// Compared against `Source::type_` exactly (case-sensitive string match).
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];

/// Valid priority range accepted by first-class priority fields.
///
/// The range is inclusive on both ends, so `0` and `4` are both accepted.
pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
19
20// ── Types ───────────────────────────────────────────────────────────────────
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23pub struct Item {
24    pub id: String,
25
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub title: Option<String>,
28
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub description: Option<String>,
31
32    pub status: String,
33
34    #[serde(skip_serializing_if = "Option::is_none")]
35    #[serde(default)]
36    pub priority: Option<u8>,
37
38    pub sources: Vec<Source>,
39
40    pub metadata: serde_json::Value,
41
42    pub created_at: String,
43    pub updated_at: String,
44
45    #[serde(skip_serializing_if = "is_empty_vec")]
46    #[serde(default)]
47    pub blocked_by: Vec<String>,
48
49    #[serde(skip_serializing_if = "is_empty_json_vec")]
50    #[serde(default)]
51    pub errors: Vec<serde_json::Value>,
52}
53
/// Serde `skip_serializing_if` predicate: true when the string list has no entries.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
57
58fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
59    v.is_empty()
60}
61
62/// Serialize items through serde so the struct definition is the source of
63/// truth for JSON output.
64impl Item {
65    pub fn to_json_value(&self) -> serde_json::Value {
66        serde_json::to_value(self).expect("item serialization should succeed")
67    }
68
69    pub fn to_json_string(&self) -> String {
70        serde_json::to_string(self).expect("item serialization should succeed")
71    }
72
73    pub fn pending(&self) -> bool {
74        self.status == "pending"
75    }
76
77    pub fn blocked(&self) -> bool {
78        !self.blocked_by.is_empty()
79    }
80
81    pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
82        if !self.pending() {
83            return false;
84        }
85        if !self.blocked() {
86            return true;
87        }
88        match pending_ids {
89            None => true,
90            Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
91        }
92    }
93}
94
95pub fn parse_priority_value(input: &str) -> Result<u8> {
96    let trimmed = input.trim();
97
98    let priority = trimmed
99        .parse::<u8>()
100        .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
101
102    if !VALID_PRIORITY_RANGE.contains(&priority) {
103        anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
104    }
105
106    Ok(priority)
107}
108
/// A source attached to a queue item.
///
/// `path` and `content` are both optional and are left out of the JSON output
/// when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Source {
    /// Source type, serialized under the JSON key `"type"`
    /// (validated against `VALID_SOURCE_TYPES` when items are added).
    #[serde(rename = "type")]
    pub type_: String,

    /// Optional path associated with the source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,

    /// Optional inline content associated with the source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
}
120
121impl Source {
122    pub fn to_json_value(&self) -> serde_json::Value {
123        serde_json::to_value(self).expect("source serialization should succeed")
124    }
125}
126
127// ── Queue ───────────────────────────────────────────────────────────────────
128
/// Handle to a queue stored in the file at `path`.
///
/// The file is opened and locked anew for each operation; the struct itself
/// holds only the path.
pub struct Queue {
    /// Location of the queue file.
    pub path: PathBuf,
}
132
/// Caller-supplied fields for an item that is about to be added to the queue.
///
/// The remaining `Item` fields (id, status, timestamps, errors) are filled in
/// by `Queue::push_many_with_description`.
pub struct NewItem {
    /// Sources to attach; may be empty when a title or description is given.
    pub sources: Vec<Source>,
    /// Optional title.
    pub title: Option<String>,
    /// Optional description.
    pub description: Option<String>,
    /// Optional priority; validated against `VALID_PRIORITY_RANGE` when present.
    pub priority: Option<u8>,
    /// Free-form JSON metadata stored as-is.
    pub metadata: serde_json::Value,
    /// IDs of items that block this one.
    pub blocked_by: Vec<String>,
}
141
142impl Queue {
143    pub fn new(path: PathBuf) -> Self {
144        Self { path }
145    }
146
147    /// Add a new item to the queue. Returns the created Item.
148    pub fn push(
149        &self,
150        sources: Vec<Source>,
151        title: Option<String>,
152        priority: Option<u8>,
153        metadata: serde_json::Value,
154        blocked_by: Vec<String>,
155    ) -> Result<Item> {
156        self.validate_sources(&sources)?;
157        self.push_with_description(sources, title, None, priority, metadata, blocked_by)
158    }
159
160    /// Add a new item to the queue with description support.
161    pub fn push_with_description(
162        &self,
163        sources: Vec<Source>,
164        title: Option<String>,
165        description: Option<String>,
166        priority: Option<u8>,
167        metadata: serde_json::Value,
168        blocked_by: Vec<String>,
169    ) -> Result<Item> {
170        let new_item = NewItem {
171            sources,
172            title,
173            description,
174            priority,
175            metadata,
176            blocked_by,
177        };
178
179        let mut items = self.push_many_with_description(vec![new_item])?;
180        Ok(items.remove(0))
181    }
182
183    /// Add many new items to the queue under a single lock.
184    pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
185        if items.is_empty() {
186            anyhow::bail!("At least one item is required");
187        }
188
189        for item in &items {
190            self.validate_new_item(item)?;
191        }
192
193        self.with_exclusive_lock(|f| {
194            let existing = read_items(f, &self.path);
195            let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
196            let now = now_iso8601();
197            let mut created = Vec::with_capacity(items.len());
198
199            f.seek(std::io::SeekFrom::End(0))?;
200            for new_item in items {
201                let id = generate_id(&existing_ids);
202                existing_ids.insert(id.clone());
203
204                let item = Item {
205                    id,
206                    title: new_item.title,
207                    description: new_item.description,
208                    status: "pending".to_string(),
209                    priority: new_item.priority,
210                    sources: new_item.sources,
211                    metadata: new_item.metadata,
212                    created_at: now.clone(),
213                    updated_at: now.clone(),
214                    blocked_by: new_item.blocked_by,
215                    errors: Vec::new(),
216                };
217
218                writeln!(f, "{}", item.to_json_string())?;
219                created.push(item);
220            }
221            f.flush()?;
222
223            Ok(created)
224        })
225    }
226
227    /// Get all items, skipping corrupt lines with warnings.
228    pub fn all(&self) -> Vec<Item> {
229        if !self.path.exists() {
230            return Vec::new();
231        }
232        match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
233            Ok(items) => items,
234            Err(_) => Vec::new(),
235        }
236    }
237
238    /// Find an item by ID.
239    pub fn find(&self, id: &str) -> Option<Item> {
240        self.all().into_iter().find(|item| item.id == id)
241    }
242
243    /// Filter items by status (optional).
244    pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
245        let items = self.all();
246        match status {
247            Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
248            None => items,
249        }
250    }
251
252    /// Return pending items that are not blocked by any pending item.
253    pub fn ready(&self) -> Vec<Item> {
254        let items = self.all();
255        let pending_ids: HashSet<String> = items
256            .iter()
257            .filter(|i| i.pending())
258            .map(|i| i.id.clone())
259            .collect();
260        items
261            .into_iter()
262            .filter(|item| item.ready(Some(&pending_ids)))
263            .collect()
264    }
265
266    /// Update an item by ID. Returns the updated Item or None.
267    pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
268        self.with_exclusive_lock(|f| {
269            let mut items = read_items(f, &self.path);
270            let index = items.iter().position(|item| item.id == id);
271            let index = match index {
272                Some(i) => i,
273                None => return Ok(None),
274            };
275
276            let item = &mut items[index];
277
278            if let Some(status) = &attrs.status {
279                if !VALID_STATUSES.contains(&status.as_str()) {
280                    anyhow::bail!(
281                        "Invalid status: {}. Valid: {}",
282                        status,
283                        VALID_STATUSES.join(", ")
284                    );
285                }
286                item.status = status.clone();
287            }
288            if let Some(title) = attrs.title {
289                item.title = Some(title);
290            }
291            if let Some(description) = attrs.description {
292                item.description = Some(description);
293            }
294            if let Some(priority) = attrs.priority {
295                if let Some(value) = priority {
296                    validate_priority(value)?;
297                }
298                item.priority = priority;
299            }
300            if let Some(metadata) = attrs.metadata {
301                item.metadata = metadata;
302            }
303            if let Some(blocked_by) = attrs.blocked_by {
304                item.blocked_by = blocked_by;
305            }
306            if let Some(sources) = attrs.sources {
307                item.sources = sources;
308            }
309
310            item.updated_at = now_iso8601();
311
312            let updated = item.clone();
313            rewrite_items(f, &items)?;
314            Ok(Some(updated))
315        })
316    }
317
318    /// Remove an item by ID. Returns the removed Item or None.
319    pub fn remove(&self, id: &str) -> Result<Option<Item>> {
320        self.with_exclusive_lock(|f| {
321            let mut items = read_items(f, &self.path);
322            let index = items.iter().position(|item| item.id == id);
323            match index {
324                Some(i) => {
325                    let removed = items.remove(i);
326                    rewrite_items(f, &items)?;
327                    Ok(Some(removed))
328                }
329                None => Ok(None),
330            }
331        })
332    }
333
334    // ── Private helpers ─────────────────────────────────────────────────
335
336    fn validate_sources(&self, sources: &[Source]) -> Result<()> {
337        if sources.is_empty() {
338            anyhow::bail!("Sources cannot be empty");
339        }
340        self.validate_source_types(sources)
341    }
342
343    fn validate_new_item(&self, item: &NewItem) -> Result<()> {
344        if let Some(priority) = item.priority {
345            validate_priority(priority)?;
346        }
347
348        if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
349            anyhow::bail!("Item requires at least one source, title, or description");
350        }
351
352        self.validate_source_types(&item.sources)
353    }
354
355    fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
356        for source in sources {
357            if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
358                anyhow::bail!(
359                    "Invalid source type: {}. Valid: {}",
360                    source.type_,
361                    VALID_SOURCE_TYPES.join(", ")
362                );
363            }
364        }
365        Ok(())
366    }
367
368    fn ensure_directory(&self) -> Result<()> {
369        if let Some(parent) = self.path.parent() {
370            if !parent.exists() {
371                fs::create_dir_all(parent)?;
372            }
373        }
374        Ok(())
375    }
376
377    fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
378    where
379        F: FnOnce(&mut File) -> Result<T>,
380    {
381        self.ensure_directory()?;
382        let mut file = OpenOptions::new()
383            .read(true)
384            .write(true)
385            .create(true)
386            .truncate(false)
387            .open(&self.path)
388            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
389
390        flock(&file.as_fd(), FlockOperation::LockExclusive)
391            .with_context(|| "Failed to acquire exclusive lock")?;
392
393        let result = f(&mut file);
394
395        // Lock released when file is dropped
396        result
397    }
398
399    fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
400    where
401        F: FnOnce(&mut File) -> Result<T>,
402    {
403        let mut file = File::open(&self.path)
404            .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
405
406        flock(&file.as_fd(), FlockOperation::LockShared)
407            .with_context(|| "Failed to acquire shared lock")?;
408
409        f(&mut file)
410    }
411}
412
413/// Read items from an open file, skipping corrupt lines.
414fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
415    file.seek(std::io::SeekFrom::Start(0)).ok();
416    let reader = BufReader::new(file);
417    let mut items = Vec::new();
418
419    for (line_num, line) in reader.lines().enumerate() {
420        let line = match line {
421            Ok(l) => l,
422            Err(_) => continue,
423        };
424        let trimmed = line.trim();
425        if trimmed.is_empty() {
426            continue;
427        }
428        match serde_json::from_str::<Item>(trimmed) {
429            Ok(item) => items.push(item),
430            Err(e) => {
431                eprintln!(
432                    "Warning: Skipping corrupt line {} in {}: {}",
433                    line_num + 1,
434                    path.display(),
435                    e
436                );
437            }
438        }
439    }
440    items
441}
442
443/// Truncate and rewrite all items to the file.
444fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
445    file.seek(std::io::SeekFrom::Start(0))?;
446    file.set_len(0)?;
447    for item in items {
448        writeln!(file, "{}", item.to_json_string())?;
449    }
450    file.flush()?;
451    Ok(())
452}
453
454/// Generate a 3-char alphanumeric ID that doesn't collide with existing.
455fn generate_id(existing_ids: &HashSet<String>) -> String {
456    let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
457    let mut rng = rand::thread_rng();
458    loop {
459        let id: String = (0..3)
460            .map(|_| chars[rng.gen_range(0..chars.len())])
461            .collect();
462        if !existing_ids.contains(&id) {
463            return id;
464        }
465    }
466}
467
468/// Current UTC time in ISO 8601 with millisecond precision.
469fn now_iso8601() -> String {
470    chrono::Utc::now()
471        .format("%Y-%m-%dT%H:%M:%S%.3fZ")
472        .to_string()
473}
474
475fn validate_priority(priority: u8) -> Result<()> {
476    if VALID_PRIORITY_RANGE.contains(&priority) {
477        Ok(())
478    } else {
479        anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
480    }
481}
482
/// Attributes for updating an item.
///
/// Every field is optional: `None` leaves the corresponding item field
/// unchanged, `Some(..)` replaces it. `Default` yields an update that changes
/// nothing except the item's `updated_at` timestamp.
#[derive(Default)]
pub struct UpdateAttrs {
    /// New status; must be one of `VALID_STATUSES`.
    pub status: Option<String>,
    /// New title.
    pub title: Option<String>,
    /// New description.
    pub description: Option<String>,
    /// Priority change: `Some(Some(p))` sets it (validated against
    /// `VALID_PRIORITY_RANGE`), `Some(None)` clears it, `None` leaves it as is.
    pub priority: Option<Option<u8>>,
    /// Replacement metadata value.
    pub metadata: Option<serde_json::Value>,
    /// Replacement list of blocker IDs.
    pub blocked_by: Option<Vec<String>>,
    /// Replacement list of sources.
    pub sources: Option<Vec<Source>>,
}