1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
11pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
14pub const VALID_DISPLAY_STATUSES: &[&str] = &["pending", "blocked", "in_progress", "closed"];
16
17pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
19
20pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26pub struct Item {
27 pub id: String,
28
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub title: Option<String>,
31
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub description: Option<String>,
34
35 pub status: String,
36
37 #[serde(skip_serializing_if = "Option::is_none")]
38 #[serde(default)]
39 pub priority: Option<u8>,
40
41 pub sources: Vec<Source>,
42
43 pub metadata: serde_json::Value,
44
45 pub created_at: String,
46 pub updated_at: String,
47
48 #[serde(skip_serializing_if = "is_empty_vec")]
49 #[serde(default)]
50 pub blocked_by: Vec<String>,
51
52 #[serde(skip_serializing_if = "is_empty_json_vec")]
53 #[serde(default)]
54 pub errors: Vec<serde_json::Value>,
55}
56
57fn is_empty_vec(v: &[String]) -> bool {
58 v.is_empty()
59}
60
61fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
62 v.is_empty()
63}
64
65impl Item {
73 pub fn to_json_value(&self) -> serde_json::Value {
74 serde_json::to_value(self).expect("item serialization should succeed")
75 }
76
77 pub fn to_json_string(&self) -> String {
78 serde_json::to_string(self).expect("item serialization should succeed")
79 }
80
81 pub fn pending(&self) -> bool {
82 self.status == "pending"
83 }
84
85 pub fn blocked(&self) -> bool {
86 !self.blocked_by.is_empty()
87 }
88
89 pub fn computed_status(&self, open_ids: Option<&HashSet<String>>) -> String {
95 if !self.pending() || !self.blocked() {
96 return self.status.clone();
97 }
98
99 match open_ids {
100 None => "blocked".to_string(),
101 Some(ids) => {
102 if self.blocked_by.iter().any(|id| ids.contains(id)) {
103 "blocked".to_string()
104 } else {
105 self.status.clone()
106 }
107 }
108 }
109 }
110
111 pub fn with_computed_status(&self, open_ids: Option<&HashSet<String>>) -> Self {
112 let mut item = self.clone();
113 item.status = self.computed_status(open_ids);
114 item
115 }
116
117 pub fn ready(&self, open_ids: Option<&HashSet<String>>) -> bool {
118 if !self.pending() {
119 return false;
120 }
121 if !self.blocked() {
122 return true;
123 }
124 match open_ids {
125 None => true,
126 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
127 }
128 }
129}
130
131pub fn parse_priority_value(input: &str) -> Result<u8> {
132 let trimmed = input.trim();
133
134 let priority = trimmed
135 .parse::<u8>()
136 .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
137
138 if !VALID_PRIORITY_RANGE.contains(&priority) {
139 anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
140 }
141
142 Ok(priority)
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
146pub struct Source {
147 #[serde(rename = "type")]
148 pub type_: String,
149
150 #[serde(skip_serializing_if = "Option::is_none")]
151 pub path: Option<String>,
152
153 #[serde(skip_serializing_if = "Option::is_none")]
154 pub content: Option<String>,
155}
156
157impl Source {
158 pub fn to_json_value(&self) -> serde_json::Value {
159 serde_json::to_value(self).expect("source serialization should succeed")
160 }
161}
162
163pub struct Queue {
166 pub path: PathBuf,
167}
168
169pub struct NewItem {
170 pub sources: Vec<Source>,
171 pub title: Option<String>,
172 pub description: Option<String>,
173 pub priority: Option<u8>,
174 pub metadata: serde_json::Value,
175 pub blocked_by: Vec<String>,
176}
177
178impl Queue {
179 pub fn new(path: PathBuf) -> Self {
180 Self { path }
181 }
182
183 pub fn push(
185 &self,
186 sources: Vec<Source>,
187 title: Option<String>,
188 description: Option<String>,
189 priority: Option<u8>,
190 metadata: serde_json::Value,
191 blocked_by: Vec<String>,
192 ) -> Result<Item> {
193 let new_item = NewItem {
194 sources,
195 title,
196 description,
197 priority,
198 metadata,
199 blocked_by,
200 };
201
202 let mut items = self.push_many_with_description(vec![new_item])?;
203 Ok(items.remove(0))
204 }
205
206 pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
208 if items.is_empty() {
209 anyhow::bail!("At least one item is required");
210 }
211
212 for item in &items {
213 self.validate_new_item(item)?;
214 }
215
216 self.with_exclusive_lock(|f| {
217 let existing = read_items(f, &self.path);
218 let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
219 let now = now_iso8601();
220 let mut created = Vec::with_capacity(items.len());
221
222 f.seek(std::io::SeekFrom::End(0))?;
223 for new_item in items {
224 let id = generate_id(&existing_ids);
225 existing_ids.insert(id.clone());
226
227 let item = Item {
228 id,
229 title: new_item.title,
230 description: new_item.description,
231 status: "pending".to_string(),
232 priority: new_item.priority,
233 sources: new_item.sources,
234 metadata: new_item.metadata,
235 created_at: now.clone(),
236 updated_at: now.clone(),
237 blocked_by: new_item.blocked_by,
238 errors: Vec::new(),
239 };
240
241 writeln!(f, "{}", item.to_json_string())?;
242 created.push(item);
243 }
244 f.flush()?;
245
246 Ok(created)
247 })
248 }
249
250 pub fn all(&self) -> Vec<Item> {
252 if !self.path.exists() {
253 return Vec::new();
254 }
255 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
256 Ok(items) => items,
257 Err(_) => Vec::new(),
258 }
259 }
260
261 pub fn find(&self, id: &str) -> Option<Item> {
263 self.all().into_iter().find(|item| item.id == id)
264 }
265
266 pub fn open_ids(&self) -> HashSet<String> {
268 open_ids_for_items(&self.all())
269 }
270
271 pub fn item_with_computed_status(&self, item: Item) -> Item {
273 let open_ids = self.open_ids();
274 item.with_computed_status(Some(&open_ids))
275 }
276
277 pub fn items_with_computed_status(&self, items: Vec<Item>) -> Vec<Item> {
279 let open_ids = self.open_ids();
280 items
281 .into_iter()
282 .map(|item| item.with_computed_status(Some(&open_ids)))
283 .collect()
284 }
285
286 pub fn all_with_computed_status(&self) -> Vec<Item> {
288 let items = self.all();
289 let open_ids = open_ids_for_items(&items);
290 items
291 .into_iter()
292 .map(|item| item.with_computed_status(Some(&open_ids)))
293 .collect()
294 }
295
296 pub fn find_with_computed_status(&self, id: &str) -> Option<Item> {
298 let items = self.all();
299 let open_ids = open_ids_for_items(&items);
300 items
301 .into_iter()
302 .find(|item| item.id == id)
303 .map(|item| item.with_computed_status(Some(&open_ids)))
304 }
305
306 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
313 let items = self.all();
314 match status {
315 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
316 None => items,
317 }
318 }
319
320 pub fn ready(&self) -> Vec<Item> {
322 let items = self.all();
323 let open_ids: HashSet<String> = items
324 .iter()
325 .filter(|i| i.status != "closed")
326 .map(|i| i.id.clone())
327 .collect();
328 items
329 .into_iter()
330 .filter(|item| item.ready(Some(&open_ids)))
331 .collect()
332 }
333
334 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
336 self.with_exclusive_lock(|f| {
337 let mut items = read_items(f, &self.path);
338 let index = items.iter().position(|item| item.id == id);
339 let index = match index {
340 Some(i) => i,
341 None => return Ok(None),
342 };
343
344 let original = items[index].clone();
345 let item = &mut items[index];
346
347 if let Some(status) = &attrs.status {
348 if !VALID_STATUSES.contains(&status.as_str()) {
349 anyhow::bail!(
350 "Invalid status: {}. Valid: {}",
351 status,
352 VALID_STATUSES.join(", ")
353 );
354 }
355 item.status = status.clone();
356 }
357 if let Some(title) = attrs.title {
358 item.title = Some(title);
359 }
360 if let Some(description) = attrs.description {
361 item.description = Some(description);
362 }
363 if let Some(priority) = attrs.priority {
364 if let Some(value) = priority {
365 validate_priority(value)?;
366 }
367 item.priority = priority;
368 }
369 if let Some(metadata) = attrs.metadata {
370 item.metadata = metadata;
371 }
372 if let Some(blocked_by) = attrs.blocked_by {
373 validate_blocked_by(id, &blocked_by)?;
374 item.blocked_by = blocked_by;
375 }
376 if let Some(sources) = attrs.sources {
377 self.validate_source_types(&sources)?;
378 item.sources = sources;
379 }
380
381 if *item == original {
382 return Ok(Some(original));
383 }
384
385 item.updated_at = now_iso8601();
386
387 let updated = item.clone();
388 rewrite_items(f, &items)?;
389 Ok(Some(updated))
390 })
391 }
392
393 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
395 self.with_exclusive_lock(|f| {
396 let mut items = read_items(f, &self.path);
397 let index = items.iter().position(|item| item.id == id);
398 match index {
399 Some(i) => {
400 let removed = items.remove(i);
401 rewrite_items(f, &items)?;
402 Ok(Some(removed))
403 }
404 None => Ok(None),
405 }
406 })
407 }
408
409 fn validate_new_item(&self, item: &NewItem) -> Result<()> {
412 if let Some(priority) = item.priority {
413 validate_priority(priority)?;
414 }
415
416 if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
417 anyhow::bail!("Item requires at least one source, title, or description");
418 }
419
420 self.validate_source_types(&item.sources)
421 }
422
423 fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
424 for source in sources {
425 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
426 anyhow::bail!(
427 "Invalid source type: {}. Valid: {}",
428 source.type_,
429 VALID_SOURCE_TYPES.join(", ")
430 );
431 }
432 }
433 Ok(())
434 }
435
436 fn ensure_directory(&self) -> Result<()> {
437 if let Some(parent) = self.path.parent() {
438 if !parent.exists() {
439 fs::create_dir_all(parent)?;
440 }
441 }
442 Ok(())
443 }
444
445 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
446 where
447 F: FnOnce(&mut File) -> Result<T>,
448 {
449 self.ensure_directory()?;
450 let mut file = OpenOptions::new()
451 .read(true)
452 .write(true)
453 .create(true)
454 .truncate(false)
455 .open(&self.path)
456 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
457
458 flock(&file.as_fd(), FlockOperation::LockExclusive)
459 .with_context(|| "Failed to acquire exclusive lock")?;
460
461 let result = f(&mut file);
462
463 result
465 }
466
467 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
468 where
469 F: FnOnce(&mut File) -> Result<T>,
470 {
471 let mut file = File::open(&self.path)
472 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
473
474 flock(&file.as_fd(), FlockOperation::LockShared)
475 .with_context(|| "Failed to acquire shared lock")?;
476
477 f(&mut file)
478 }
479}
480
481fn open_ids_for_items(items: &[Item]) -> HashSet<String> {
482 items
483 .iter()
484 .filter(|item| item.status != "closed")
485 .map(|item| item.id.clone())
486 .collect()
487}
488
489fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
491 file.seek(std::io::SeekFrom::Start(0)).ok();
492 let reader = BufReader::new(file);
493 let mut items = Vec::new();
494
495 for (line_num, line) in reader.lines().enumerate() {
496 let line = match line {
497 Ok(l) => l,
498 Err(_) => continue,
499 };
500 let trimmed = line.trim();
501 if trimmed.is_empty() {
502 continue;
503 }
504 match serde_json::from_str::<Item>(trimmed) {
505 Ok(item) => items.push(item),
506 Err(e) => {
507 eprintln!(
508 "Warning: Skipping corrupt line {} in {}: {}",
509 line_num + 1,
510 path.display(),
511 e
512 );
513 }
514 }
515 }
516 items
517}
518
519fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
521 file.seek(std::io::SeekFrom::Start(0))?;
522 file.set_len(0)?;
523 for item in items {
524 writeln!(file, "{}", item.to_json_string())?;
525 }
526 file.flush()?;
527 Ok(())
528}
529
530fn generate_id(existing_ids: &HashSet<String>) -> String {
532 let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
533 let mut rng = rand::thread_rng();
534 loop {
535 let id: String = (0..3)
536 .map(|_| chars[rng.gen_range(0..chars.len())])
537 .collect();
538 if !existing_ids.contains(&id) {
539 return id;
540 }
541 }
542}
543
544fn now_iso8601() -> String {
546 chrono::Utc::now()
547 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
548 .to_string()
549}
550
551fn validate_priority(priority: u8) -> Result<()> {
552 if VALID_PRIORITY_RANGE.contains(&priority) {
553 Ok(())
554 } else {
555 anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
556 }
557}
558
559fn validate_blocked_by(item_id: &str, blocked_by: &[String]) -> Result<()> {
560 if blocked_by.iter().any(|blocker_id| blocker_id == item_id) {
561 anyhow::bail!("Item cannot block itself: {}", item_id);
562 }
563 Ok(())
564}
565
566#[derive(Default)]
568pub struct UpdateAttrs {
569 pub status: Option<String>,
570 pub title: Option<String>,
571 pub description: Option<String>,
572 pub priority: Option<Option<u8>>,
573 pub metadata: Option<serde_json::Value>,
574 pub blocked_by: Option<Vec<String>>,
575 pub sources: Option<Vec<Source>>,
576}