1use anyhow::{Context, Result};
2use rustix::fs::{flock, FlockOperation};
3use serde::{Deserialize, Serialize};
4use std::collections::HashSet;
5use std::fs::{self, File, OpenOptions};
6use std::io::{BufRead, BufReader, Seek, Write};
7use std::os::fd::AsFd;
8use std::path::{Path, PathBuf};
9
/// Statuses that may be stored on an item; `Queue::update` rejects any other value.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];

/// Stored statuses plus `"blocked"`, the derived value that
/// `Item::computed_status` can return for a pending item with blockers.
/// Not referenced elsewhere in the visible part of this file.
pub const VALID_DISPLAY_STATUSES: &[&str] = &["pending", "blocked", "in_progress", "closed"];

/// Accepted values for `Source::type_`, checked by `Queue::validate_source_types`.
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];

/// Inclusive range of accepted item priorities.
pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
21
/// Characters used for generated item IDs: the ten digits plus the lowercase
/// letters with `i`, `l`, `o`, and `u` left out (32 symbols in total).
const ID_ALPHABET: [char; 32] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j',
    'k', 'm', 'n', 'p', 'q', 'r', 's', 't', 'v', 'w', 'x', 'y', 'z',
];
/// Shortest ID length that `generated_id_length` starts from.
const MIN_GENERATED_ID_LENGTH: usize = 3;
/// Percentage of the ID space at a given length that may be occupied before
/// `generated_id_length` moves on to the next length.
const ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT: usize = 10;
29
/// One queue entry, serialized as a single JSON object per line of the queue file.
///
/// Field order and the serde attributes below determine the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Item {
    /// Identifier generated by `generate_id` when the item is pushed.
    pub id: String,

    /// Optional title; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub title: Option<String>,

    /// Optional description; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,

    /// Stored status. New items start as `"pending"`; `Queue::update` only
    /// accepts values listed in `VALID_STATUSES`.
    pub status: String,

    /// Optional priority; omitted when `None` and defaulted to `None` when the
    /// key is missing on read.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(default)]
    pub priority: Option<u8>,

    /// Sources attached to the item.
    pub sources: Vec<Source>,

    /// Arbitrary JSON supplied by the caller.
    pub metadata: serde_json::Value,

    /// Timestamp string written by `now_iso8601` when the item is created.
    pub created_at: String,
    /// Timestamp string refreshed by `Queue::update` whenever the item changes.
    pub updated_at: String,

    /// IDs of the items this one waits on; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "is_empty_vec")]
    #[serde(default)]
    pub blocked_by: Vec<String>,

    /// Free-form JSON entries, omitted from the JSON when empty. New items
    /// start with an empty list; nothing in the visible code appends to it.
    #[serde(skip_serializing_if = "is_empty_json_vec")]
    #[serde(default)]
    pub errors: Vec<serde_json::Value>,
}
63
/// Predicate for serde's `skip_serializing_if`: true when the string list has no entries.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
67
68fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
69 v.is_empty()
70}
71
72impl Item {
80 pub fn to_json_value(&self) -> serde_json::Value {
81 serde_json::to_value(self).expect("item serialization should succeed")
82 }
83
84 pub fn to_json_string(&self) -> String {
85 serde_json::to_string(self).expect("item serialization should succeed")
86 }
87
88 pub fn pending(&self) -> bool {
89 self.status == "pending"
90 }
91
92 pub fn blocked(&self) -> bool {
93 !self.blocked_by.is_empty()
94 }
95
96 pub fn computed_status(&self, open_ids: Option<&HashSet<String>>) -> String {
102 if !self.pending() || !self.blocked() {
103 return self.status.clone();
104 }
105
106 match open_ids {
107 None => "blocked".to_string(),
108 Some(ids) => {
109 if self.blocked_by.iter().any(|id| ids.contains(id)) {
110 "blocked".to_string()
111 } else {
112 self.status.clone()
113 }
114 }
115 }
116 }
117
118 pub fn with_computed_status(&self, open_ids: Option<&HashSet<String>>) -> Self {
119 let mut item = self.clone();
120 item.status = self.computed_status(open_ids);
121 item
122 }
123
124 pub fn ready(&self, open_ids: Option<&HashSet<String>>) -> bool {
125 if !self.pending() {
126 return false;
127 }
128 if !self.blocked() {
129 return true;
130 }
131 match open_ids {
132 None => true,
133 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
134 }
135 }
136}
137
138pub fn parse_priority_value(input: &str) -> Result<u8> {
139 let trimmed = input.trim();
140
141 let priority = trimmed
142 .parse::<u8>()
143 .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
144
145 if !VALID_PRIORITY_RANGE.contains(&priority) {
146 anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
147 }
148
149 Ok(priority)
150}
151
/// A piece of input attached to an item.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Source {
    /// Kind of source, serialized under the JSON key `"type"`.
    /// `Queue::validate_source_types` accepts only the values in `VALID_SOURCE_TYPES`.
    #[serde(rename = "type")]
    pub type_: String,

    /// Optional path; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,

    /// Optional inline content; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
}
163
164impl Source {
165 pub fn to_json_value(&self) -> serde_json::Value {
166 serde_json::to_value(self).expect("source serialization should succeed")
167 }
168}
169
/// Handle to a queue stored as one JSON item per line in a single file.
pub struct Queue {
    /// Location of the queue file. Write operations create it (and its parent
    /// directory) on demand.
    pub path: PathBuf,
}
175
/// Caller-supplied fields for an item about to be pushed. The ID, status, and
/// timestamps are filled in by `Queue::push_many_with_description`.
pub struct NewItem {
    /// Sources to attach; each `type_` must be listed in `VALID_SOURCE_TYPES`.
    pub sources: Vec<Source>,
    /// Optional title.
    pub title: Option<String>,
    /// Optional description.
    pub description: Option<String>,
    /// Optional priority; must lie in `VALID_PRIORITY_RANGE` when present.
    pub priority: Option<u8>,
    /// Arbitrary JSON stored with the item.
    pub metadata: serde_json::Value,
    /// IDs of items that block the new one.
    pub blocked_by: Vec<String>,
}
184
185impl Queue {
186 pub fn new(path: PathBuf) -> Self {
187 Self { path }
188 }
189
190 pub fn push(
192 &self,
193 sources: Vec<Source>,
194 title: Option<String>,
195 description: Option<String>,
196 priority: Option<u8>,
197 metadata: serde_json::Value,
198 blocked_by: Vec<String>,
199 ) -> Result<Item> {
200 let new_item = NewItem {
201 sources,
202 title,
203 description,
204 priority,
205 metadata,
206 blocked_by,
207 };
208
209 let mut items = self.push_many_with_description(vec![new_item])?;
210 Ok(items.remove(0))
211 }
212
213 pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
215 if items.is_empty() {
216 anyhow::bail!("At least one item is required");
217 }
218
219 for item in &items {
220 self.validate_new_item(item)?;
221 }
222
223 self.with_exclusive_lock(|f| {
224 let existing = read_items(f, &self.path);
225 let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
226 let now = now_iso8601();
227 let mut created = Vec::with_capacity(items.len());
228
229 f.seek(std::io::SeekFrom::End(0))?;
230 for new_item in items {
231 let id = generate_id(&existing_ids);
232 existing_ids.insert(id.clone());
233
234 let item = Item {
235 id,
236 title: new_item.title,
237 description: new_item.description,
238 status: "pending".to_string(),
239 priority: new_item.priority,
240 sources: new_item.sources,
241 metadata: new_item.metadata,
242 created_at: now.clone(),
243 updated_at: now.clone(),
244 blocked_by: new_item.blocked_by,
245 errors: Vec::new(),
246 };
247
248 writeln!(f, "{}", item.to_json_string())?;
249 created.push(item);
250 }
251 f.flush()?;
252
253 Ok(created)
254 })
255 }
256
257 pub fn all(&self) -> Vec<Item> {
259 if !self.path.exists() {
260 return Vec::new();
261 }
262 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
263 Ok(items) => items,
264 Err(_) => Vec::new(),
265 }
266 }
267
268 pub fn find(&self, id: &str) -> Option<Item> {
270 self.all().into_iter().find(|item| item.id == id)
271 }
272
273 pub fn open_ids(&self) -> HashSet<String> {
275 open_ids_for_items(&self.all())
276 }
277
278 pub fn item_with_computed_status(&self, item: Item) -> Item {
280 let open_ids = self.open_ids();
281 item.with_computed_status(Some(&open_ids))
282 }
283
284 pub fn items_with_computed_status(&self, items: Vec<Item>) -> Vec<Item> {
286 let open_ids = self.open_ids();
287 items
288 .into_iter()
289 .map(|item| item.with_computed_status(Some(&open_ids)))
290 .collect()
291 }
292
293 pub fn all_with_computed_status(&self) -> Vec<Item> {
295 let items = self.all();
296 let open_ids = open_ids_for_items(&items);
297 items
298 .into_iter()
299 .map(|item| item.with_computed_status(Some(&open_ids)))
300 .collect()
301 }
302
303 pub fn find_with_computed_status(&self, id: &str) -> Option<Item> {
305 let items = self.all();
306 let open_ids = open_ids_for_items(&items);
307 items
308 .into_iter()
309 .find(|item| item.id == id)
310 .map(|item| item.with_computed_status(Some(&open_ids)))
311 }
312
313 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
320 let items = self.all();
321 match status {
322 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
323 None => items,
324 }
325 }
326
327 pub fn ready(&self) -> Vec<Item> {
329 let items = self.all();
330 let open_ids: HashSet<String> = items
331 .iter()
332 .filter(|i| i.status != "closed")
333 .map(|i| i.id.clone())
334 .collect();
335 items
336 .into_iter()
337 .filter(|item| item.ready(Some(&open_ids)))
338 .collect()
339 }
340
341 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
343 self.with_exclusive_lock(|f| {
344 let mut items = read_items(f, &self.path);
345 let index = items.iter().position(|item| item.id == id);
346 let index = match index {
347 Some(i) => i,
348 None => return Ok(None),
349 };
350
351 let original = items[index].clone();
352 let item = &mut items[index];
353
354 if let Some(status) = &attrs.status {
355 if !VALID_STATUSES.contains(&status.as_str()) {
356 anyhow::bail!(
357 "Invalid status: {}. Valid: {}",
358 status,
359 VALID_STATUSES.join(", ")
360 );
361 }
362 item.status = status.clone();
363 }
364 if let Some(title) = attrs.title {
365 item.title = Some(title);
366 }
367 if let Some(description) = attrs.description {
368 item.description = Some(description);
369 }
370 if let Some(priority) = attrs.priority {
371 if let Some(value) = priority {
372 validate_priority(value)?;
373 }
374 item.priority = priority;
375 }
376 if let Some(metadata) = attrs.metadata {
377 item.metadata = metadata;
378 }
379 if let Some(blocked_by) = attrs.blocked_by {
380 validate_blocked_by(id, &blocked_by)?;
381 item.blocked_by = blocked_by;
382 }
383 if let Some(sources) = attrs.sources {
384 self.validate_source_types(&sources)?;
385 item.sources = sources;
386 }
387
388 if *item == original {
389 return Ok(Some(original));
390 }
391
392 item.updated_at = now_iso8601();
393
394 let updated = item.clone();
395 rewrite_items(f, &items)?;
396 Ok(Some(updated))
397 })
398 }
399
400 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
402 self.with_exclusive_lock(|f| {
403 let mut items = read_items(f, &self.path);
404 let index = items.iter().position(|item| item.id == id);
405 match index {
406 Some(i) => {
407 let removed = items.remove(i);
408 rewrite_items(f, &items)?;
409 Ok(Some(removed))
410 }
411 None => Ok(None),
412 }
413 })
414 }
415
416 fn validate_new_item(&self, item: &NewItem) -> Result<()> {
419 if let Some(priority) = item.priority {
420 validate_priority(priority)?;
421 }
422
423 if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
424 anyhow::bail!("Item requires at least one source, title, or description");
425 }
426
427 self.validate_source_types(&item.sources)
428 }
429
430 fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
431 for source in sources {
432 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
433 anyhow::bail!(
434 "Invalid source type: {}. Valid: {}",
435 source.type_,
436 VALID_SOURCE_TYPES.join(", ")
437 );
438 }
439 }
440 Ok(())
441 }
442
443 fn ensure_directory(&self) -> Result<()> {
444 if let Some(parent) = self.path.parent() {
445 if !parent.exists() {
446 fs::create_dir_all(parent)?;
447 }
448 }
449 Ok(())
450 }
451
452 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
453 where
454 F: FnOnce(&mut File) -> Result<T>,
455 {
456 self.ensure_directory()?;
457 let mut file = OpenOptions::new()
458 .read(true)
459 .write(true)
460 .create(true)
461 .truncate(false)
462 .open(&self.path)
463 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
464
465 flock(&file.as_fd(), FlockOperation::LockExclusive)
466 .with_context(|| "Failed to acquire exclusive lock")?;
467
468 let result = f(&mut file);
469
470 result
472 }
473
474 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
475 where
476 F: FnOnce(&mut File) -> Result<T>,
477 {
478 let mut file = File::open(&self.path)
479 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
480
481 flock(&file.as_fd(), FlockOperation::LockShared)
482 .with_context(|| "Failed to acquire shared lock")?;
483
484 f(&mut file)
485 }
486}
487
488fn open_ids_for_items(items: &[Item]) -> HashSet<String> {
489 items
490 .iter()
491 .filter(|item| item.status != "closed")
492 .map(|item| item.id.clone())
493 .collect()
494}
495
496fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
498 file.seek(std::io::SeekFrom::Start(0)).ok();
499 let reader = BufReader::new(file);
500 let mut items = Vec::new();
501
502 for (line_num, line) in reader.lines().enumerate() {
503 let line = match line {
504 Ok(l) => l,
505 Err(_) => continue,
506 };
507 let trimmed = line.trim();
508 if trimmed.is_empty() {
509 continue;
510 }
511 match serde_json::from_str::<Item>(trimmed) {
512 Ok(item) => items.push(item),
513 Err(e) => {
514 eprintln!(
515 "Warning: Skipping corrupt line {} in {}: {}",
516 line_num + 1,
517 path.display(),
518 e
519 );
520 }
521 }
522 }
523 items
524}
525
526fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
528 file.seek(std::io::SeekFrom::Start(0))?;
529 file.set_len(0)?;
530 for item in items {
531 writeln!(file, "{}", item.to_json_string())?;
532 }
533 file.flush()?;
534 Ok(())
535}
536
537fn generate_id(existing_ids: &HashSet<String>) -> String {
539 let length = generated_id_length(existing_ids.len());
540
541 loop {
542 let id = nanoid::nanoid!(length, &ID_ALPHABET);
543 if !existing_ids.contains(&id) {
544 return id;
545 }
546 }
547}
548
549fn generated_id_length(active_or_reserved_ids: usize) -> usize {
550 let mut length = MIN_GENERATED_ID_LENGTH;
551
552 loop {
553 let total_space = id_space_for_length(length);
554 let occupancy_threshold = total_space * ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT / 100;
555
556 if active_or_reserved_ids < occupancy_threshold {
557 return length;
558 }
559
560 length += 1;
561 }
562}
563
564fn id_space_for_length(length: usize) -> usize {
565 ID_ALPHABET
566 .len()
567 .checked_pow(length as u32)
568 .expect("generated ID space should fit in usize")
569}
570
571fn now_iso8601() -> String {
573 chrono::Utc::now()
574 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
575 .to_string()
576}
577
578fn validate_priority(priority: u8) -> Result<()> {
579 if VALID_PRIORITY_RANGE.contains(&priority) {
580 Ok(())
581 } else {
582 anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
583 }
584}
585
586fn validate_blocked_by(item_id: &str, blocked_by: &[String]) -> Result<()> {
587 if blocked_by.iter().any(|blocker_id| blocker_id == item_id) {
588 anyhow::bail!("Item cannot block itself: {}", item_id);
589 }
590 Ok(())
591}
592
/// Changes to apply in `Queue::update`. A field left as `None` keeps the
/// item's current value.
#[derive(Default)]
pub struct UpdateAttrs {
    /// New stored status; must be one of `VALID_STATUSES`.
    pub status: Option<String>,
    /// New title. There is no way to clear an existing title through this field.
    pub title: Option<String>,
    /// New description. There is no way to clear an existing description
    /// through this field.
    pub description: Option<String>,
    /// `Some(Some(p))` sets the priority (validated against
    /// `VALID_PRIORITY_RANGE`), `Some(None)` clears it, `None` leaves it alone.
    pub priority: Option<Option<u8>>,
    /// Replacement metadata value.
    pub metadata: Option<serde_json::Value>,
    /// Replacement blocker list; must not contain the item's own ID.
    pub blocked_by: Option<Vec<String>>,
    /// Replacement sources; each type must be listed in `VALID_SOURCE_TYPES`.
    pub sources: Option<Vec<Source>>,
}
604
// Unit tests for the ID-length arithmetic; they do not touch the file system.
#[cfg(test)]
mod tests {
    use super::{generated_id_length, id_space_for_length};

    // The ID space for a length is 32 raised to that length.
    #[test]
    fn id_space_matches_base32_lengths() {
        assert_eq!(id_space_for_length(3), 32_768);
        assert_eq!(id_space_for_length(4), 1_048_576);
        assert_eq!(id_space_for_length(5), 33_554_432);
        assert_eq!(id_space_for_length(6), 1_073_741_824);
    }

    // Below 10% of 32^3 (3_276 after rounding down) the minimum length is kept.
    #[test]
    fn generated_ids_start_at_three_characters() {
        assert_eq!(generated_id_length(0), 3);
        assert_eq!(generated_id_length(3_275), 3);
    }

    // Each input is exactly the rounded-down 10% threshold of the previous
    // length, which is the first count that forces one more character.
    #[test]
    fn generated_id_length_grows_at_documented_thresholds() {
        assert_eq!(generated_id_length(3_276), 4);
        assert_eq!(generated_id_length(104_857), 5);
        assert_eq!(generated_id_length(3_355_443), 6);
        assert_eq!(generated_id_length(107_374_182), 7);
    }
}