1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
11pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
14pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
19
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
23pub struct Item {
24 pub id: String,
25
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub title: Option<String>,
28
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub description: Option<String>,
31
32 pub status: String,
33
34 #[serde(skip_serializing_if = "Option::is_none")]
35 #[serde(default)]
36 pub priority: Option<u8>,
37
38 pub sources: Vec<Source>,
39
40 pub metadata: serde_json::Value,
41
42 pub created_at: String,
43 pub updated_at: String,
44
45 #[serde(skip_serializing_if = "is_empty_vec")]
46 #[serde(default)]
47 pub blocked_by: Vec<String>,
48
49 #[serde(skip_serializing_if = "is_empty_json_vec")]
50 #[serde(default)]
51 pub errors: Vec<serde_json::Value>,
52}
53
54fn is_empty_vec(v: &[String]) -> bool {
55 v.is_empty()
56}
57
58fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
59 v.is_empty()
60}
61
62impl Item {
65 pub fn to_json_value(&self) -> serde_json::Value {
66 serde_json::to_value(self).expect("item serialization should succeed")
67 }
68
69 pub fn to_json_string(&self) -> String {
70 serde_json::to_string(self).expect("item serialization should succeed")
71 }
72
73 pub fn pending(&self) -> bool {
74 self.status == "pending"
75 }
76
77 pub fn blocked(&self) -> bool {
78 !self.blocked_by.is_empty()
79 }
80
81 pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
82 if !self.pending() {
83 return false;
84 }
85 if !self.blocked() {
86 return true;
87 }
88 match pending_ids {
89 None => true,
90 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
91 }
92 }
93}
94
95pub fn parse_priority_value(input: &str) -> Result<u8> {
96 let trimmed = input.trim();
97
98 let priority = trimmed
99 .parse::<u8>()
100 .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
101
102 if !VALID_PRIORITY_RANGE.contains(&priority) {
103 anyhow::bail!("Invalid priority: {input}. Valid: 0-4");
104 }
105
106 Ok(priority)
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
110pub struct Source {
111 #[serde(rename = "type")]
112 pub type_: String,
113
114 #[serde(skip_serializing_if = "Option::is_none")]
115 pub path: Option<String>,
116
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub content: Option<String>,
119}
120
121impl Source {
122 pub fn to_json_value(&self) -> serde_json::Value {
123 serde_json::to_value(self).expect("source serialization should succeed")
124 }
125}
126
127pub struct Queue {
130 pub path: PathBuf,
131}
132
133pub struct NewItem {
134 pub sources: Vec<Source>,
135 pub title: Option<String>,
136 pub description: Option<String>,
137 pub priority: Option<u8>,
138 pub metadata: serde_json::Value,
139 pub blocked_by: Vec<String>,
140}
141
142impl Queue {
143 pub fn new(path: PathBuf) -> Self {
144 Self { path }
145 }
146
147 pub fn push(
149 &self,
150 sources: Vec<Source>,
151 title: Option<String>,
152 priority: Option<u8>,
153 metadata: serde_json::Value,
154 blocked_by: Vec<String>,
155 ) -> Result<Item> {
156 self.validate_sources(&sources)?;
157 self.push_with_description(sources, title, None, priority, metadata, blocked_by)
158 }
159
160 pub fn push_with_description(
162 &self,
163 sources: Vec<Source>,
164 title: Option<String>,
165 description: Option<String>,
166 priority: Option<u8>,
167 metadata: serde_json::Value,
168 blocked_by: Vec<String>,
169 ) -> Result<Item> {
170 let new_item = NewItem {
171 sources,
172 title,
173 description,
174 priority,
175 metadata,
176 blocked_by,
177 };
178
179 let mut items = self.push_many_with_description(vec![new_item])?;
180 Ok(items.remove(0))
181 }
182
183 pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
185 if items.is_empty() {
186 anyhow::bail!("At least one item is required");
187 }
188
189 for item in &items {
190 self.validate_new_item(item)?;
191 }
192
193 self.with_exclusive_lock(|f| {
194 let existing = read_items(f, &self.path);
195 let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
196 let now = now_iso8601();
197 let mut created = Vec::with_capacity(items.len());
198
199 f.seek(std::io::SeekFrom::End(0))?;
200 for new_item in items {
201 let id = generate_id(&existing_ids);
202 existing_ids.insert(id.clone());
203
204 let item = Item {
205 id,
206 title: new_item.title,
207 description: new_item.description,
208 status: "pending".to_string(),
209 priority: new_item.priority,
210 sources: new_item.sources,
211 metadata: new_item.metadata,
212 created_at: now.clone(),
213 updated_at: now.clone(),
214 blocked_by: new_item.blocked_by,
215 errors: Vec::new(),
216 };
217
218 writeln!(f, "{}", item.to_json_string())?;
219 created.push(item);
220 }
221 f.flush()?;
222
223 Ok(created)
224 })
225 }
226
227 pub fn all(&self) -> Vec<Item> {
229 if !self.path.exists() {
230 return Vec::new();
231 }
232 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
233 Ok(items) => items,
234 Err(_) => Vec::new(),
235 }
236 }
237
238 pub fn find(&self, id: &str) -> Option<Item> {
240 self.all().into_iter().find(|item| item.id == id)
241 }
242
243 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
245 let items = self.all();
246 match status {
247 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
248 None => items,
249 }
250 }
251
252 pub fn ready(&self) -> Vec<Item> {
254 let items = self.all();
255 let pending_ids: HashSet<String> = items
256 .iter()
257 .filter(|i| i.pending())
258 .map(|i| i.id.clone())
259 .collect();
260 items
261 .into_iter()
262 .filter(|item| item.ready(Some(&pending_ids)))
263 .collect()
264 }
265
266 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
268 self.with_exclusive_lock(|f| {
269 let mut items = read_items(f, &self.path);
270 let index = items.iter().position(|item| item.id == id);
271 let index = match index {
272 Some(i) => i,
273 None => return Ok(None),
274 };
275
276 let item = &mut items[index];
277
278 if let Some(status) = &attrs.status {
279 if !VALID_STATUSES.contains(&status.as_str()) {
280 anyhow::bail!(
281 "Invalid status: {}. Valid: {}",
282 status,
283 VALID_STATUSES.join(", ")
284 );
285 }
286 item.status = status.clone();
287 }
288 if let Some(title) = attrs.title {
289 item.title = Some(title);
290 }
291 if let Some(description) = attrs.description {
292 item.description = Some(description);
293 }
294 if let Some(priority) = attrs.priority {
295 if let Some(value) = priority {
296 validate_priority(value)?;
297 }
298 item.priority = priority;
299 }
300 if let Some(metadata) = attrs.metadata {
301 item.metadata = metadata;
302 }
303 if let Some(blocked_by) = attrs.blocked_by {
304 item.blocked_by = blocked_by;
305 }
306 if let Some(sources) = attrs.sources {
307 item.sources = sources;
308 }
309
310 item.updated_at = now_iso8601();
311
312 let updated = item.clone();
313 rewrite_items(f, &items)?;
314 Ok(Some(updated))
315 })
316 }
317
318 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
320 self.with_exclusive_lock(|f| {
321 let mut items = read_items(f, &self.path);
322 let index = items.iter().position(|item| item.id == id);
323 match index {
324 Some(i) => {
325 let removed = items.remove(i);
326 rewrite_items(f, &items)?;
327 Ok(Some(removed))
328 }
329 None => Ok(None),
330 }
331 })
332 }
333
334 fn validate_sources(&self, sources: &[Source]) -> Result<()> {
337 if sources.is_empty() {
338 anyhow::bail!("Sources cannot be empty");
339 }
340 self.validate_source_types(sources)
341 }
342
343 fn validate_new_item(&self, item: &NewItem) -> Result<()> {
344 if let Some(priority) = item.priority {
345 validate_priority(priority)?;
346 }
347
348 if item.sources.is_empty() && item.title.is_none() && item.description.is_none() {
349 anyhow::bail!("Item requires at least one source, title, or description");
350 }
351
352 self.validate_source_types(&item.sources)
353 }
354
355 fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
356 for source in sources {
357 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
358 anyhow::bail!(
359 "Invalid source type: {}. Valid: {}",
360 source.type_,
361 VALID_SOURCE_TYPES.join(", ")
362 );
363 }
364 }
365 Ok(())
366 }
367
368 fn ensure_directory(&self) -> Result<()> {
369 if let Some(parent) = self.path.parent() {
370 if !parent.exists() {
371 fs::create_dir_all(parent)?;
372 }
373 }
374 Ok(())
375 }
376
377 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
378 where
379 F: FnOnce(&mut File) -> Result<T>,
380 {
381 self.ensure_directory()?;
382 let mut file = OpenOptions::new()
383 .read(true)
384 .write(true)
385 .create(true)
386 .truncate(false)
387 .open(&self.path)
388 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
389
390 flock(&file.as_fd(), FlockOperation::LockExclusive)
391 .with_context(|| "Failed to acquire exclusive lock")?;
392
393 let result = f(&mut file);
394
395 result
397 }
398
399 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
400 where
401 F: FnOnce(&mut File) -> Result<T>,
402 {
403 let mut file = File::open(&self.path)
404 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
405
406 flock(&file.as_fd(), FlockOperation::LockShared)
407 .with_context(|| "Failed to acquire shared lock")?;
408
409 f(&mut file)
410 }
411}
412
413fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
415 file.seek(std::io::SeekFrom::Start(0)).ok();
416 let reader = BufReader::new(file);
417 let mut items = Vec::new();
418
419 for (line_num, line) in reader.lines().enumerate() {
420 let line = match line {
421 Ok(l) => l,
422 Err(_) => continue,
423 };
424 let trimmed = line.trim();
425 if trimmed.is_empty() {
426 continue;
427 }
428 match serde_json::from_str::<Item>(trimmed) {
429 Ok(item) => items.push(item),
430 Err(e) => {
431 eprintln!(
432 "Warning: Skipping corrupt line {} in {}: {}",
433 line_num + 1,
434 path.display(),
435 e
436 );
437 }
438 }
439 }
440 items
441}
442
443fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
445 file.seek(std::io::SeekFrom::Start(0))?;
446 file.set_len(0)?;
447 for item in items {
448 writeln!(file, "{}", item.to_json_string())?;
449 }
450 file.flush()?;
451 Ok(())
452}
453
454fn generate_id(existing_ids: &HashSet<String>) -> String {
456 let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
457 let mut rng = rand::thread_rng();
458 loop {
459 let id: String = (0..3)
460 .map(|_| chars[rng.gen_range(0..chars.len())])
461 .collect();
462 if !existing_ids.contains(&id) {
463 return id;
464 }
465 }
466}
467
468fn now_iso8601() -> String {
470 chrono::Utc::now()
471 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
472 .to_string()
473}
474
475fn validate_priority(priority: u8) -> Result<()> {
476 if VALID_PRIORITY_RANGE.contains(&priority) {
477 Ok(())
478 } else {
479 anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
480 }
481}
482
483#[derive(Default)]
485pub struct UpdateAttrs {
486 pub status: Option<String>,
487 pub title: Option<String>,
488 pub description: Option<String>,
489 pub priority: Option<Option<u8>>,
490 pub metadata: Option<serde_json::Value>,
491 pub blocked_by: Option<Vec<String>>,
492 pub sources: Option<Vec<Source>>,
493}