1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
11pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
14pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21 pub id: String,
22
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub title: Option<String>,
25
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub description: Option<String>,
28
29 pub status: String,
30
31 pub sources: Vec<Source>,
32
33 pub metadata: serde_json::Value,
34
35 pub session_id: Option<String>,
37
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub worktree: Option<Worktree>,
40
41 #[serde(skip_serializing_if = "is_empty_vec")]
42 #[serde(default)]
43 pub blocked_by: Vec<String>,
44
45 #[serde(skip_serializing_if = "is_empty_json_vec")]
46 #[serde(default)]
47 pub errors: Vec<serde_json::Value>,
48
49 pub created_at: String,
50 pub updated_at: String,
51}
52
53fn is_empty_vec(v: &[String]) -> bool {
54 v.is_empty()
55}
56
57fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
58 v.is_empty()
59}
60
61impl Item {
70 pub fn to_json_value(&self) -> serde_json::Value {
71 let mut map = serde_json::Map::new();
72
73 map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
74
75 if let Some(ref title) = self.title {
77 map.insert(
78 "title".to_string(),
79 serde_json::Value::String(title.clone()),
80 );
81 }
82 if let Some(ref description) = self.description {
83 map.insert(
84 "description".to_string(),
85 serde_json::Value::String(description.clone()),
86 );
87 }
88
89 map.insert(
90 "status".to_string(),
91 serde_json::Value::String(self.status.clone()),
92 );
93
94 map.insert(
95 "sources".to_string(),
96 serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
97 );
98
99 map.insert("metadata".to_string(), self.metadata.clone());
100
101 map.insert(
102 "session_id".to_string(),
103 match &self.session_id {
104 Some(s) => serde_json::Value::String(s.clone()),
105 None => serde_json::Value::Null,
106 },
107 );
108
109 map.insert(
110 "created_at".to_string(),
111 serde_json::Value::String(self.created_at.clone()),
112 );
113 map.insert(
114 "updated_at".to_string(),
115 serde_json::Value::String(self.updated_at.clone()),
116 );
117
118 if let Some(ref wt) = self.worktree {
120 map.insert("worktree".to_string(), wt.to_json_value());
121 }
122
123 if !self.blocked_by.is_empty() {
125 map.insert(
126 "blocked_by".to_string(),
127 serde_json::Value::Array(
128 self.blocked_by
129 .iter()
130 .map(|s| serde_json::Value::String(s.clone()))
131 .collect(),
132 ),
133 );
134 }
135
136 if !self.errors.is_empty() {
138 map.insert(
139 "errors".to_string(),
140 serde_json::Value::Array(self.errors.clone()),
141 );
142 }
143
144 serde_json::Value::Object(map)
145 }
146
147 pub fn to_json_string(&self) -> String {
148 self.to_json_value().to_string()
149 }
150
151 pub fn pending(&self) -> bool {
152 self.status == "pending"
153 }
154
155 pub fn blocked(&self) -> bool {
156 !self.blocked_by.is_empty()
157 }
158
159 pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
160 if !self.pending() {
161 return false;
162 }
163 if !self.blocked() {
164 return true;
165 }
166 match pending_ids {
167 None => true,
168 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
169 }
170 }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174pub struct Source {
175 #[serde(rename = "type")]
176 pub type_: String,
177
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub path: Option<String>,
180
181 #[serde(skip_serializing_if = "Option::is_none")]
182 pub content: Option<String>,
183
184 #[serde(skip_serializing_if = "Option::is_none")]
185 pub session_id: Option<String>,
186}
187
188impl Source {
189 pub fn to_json_value(&self) -> serde_json::Value {
190 let mut map = serde_json::Map::new();
191 map.insert(
192 "type".to_string(),
193 serde_json::Value::String(self.type_.clone()),
194 );
195 if let Some(ref path) = self.path {
196 map.insert("path".to_string(), serde_json::Value::String(path.clone()));
197 }
198 if let Some(ref content) = self.content {
199 map.insert(
200 "content".to_string(),
201 serde_json::Value::String(content.clone()),
202 );
203 }
204 if let Some(ref session_id) = self.session_id {
205 map.insert(
206 "session_id".to_string(),
207 serde_json::Value::String(session_id.clone()),
208 );
209 }
210 serde_json::Value::Object(map)
211 }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
215pub struct Worktree {
216 #[serde(skip_serializing_if = "Option::is_none")]
217 pub path: Option<String>,
218 #[serde(skip_serializing_if = "Option::is_none")]
219 pub branch: Option<String>,
220}
221
222impl Worktree {
223 pub fn to_json_value(&self) -> serde_json::Value {
224 let mut map = serde_json::Map::new();
225 if let Some(ref path) = self.path {
226 map.insert("path".to_string(), serde_json::Value::String(path.clone()));
227 }
228 if let Some(ref branch) = self.branch {
229 map.insert(
230 "branch".to_string(),
231 serde_json::Value::String(branch.clone()),
232 );
233 }
234 serde_json::Value::Object(map)
235 }
236}
237
238pub struct Queue {
241 pub path: PathBuf,
242}
243
244pub struct NewItem {
245 pub sources: Vec<Source>,
246 pub title: Option<String>,
247 pub description: Option<String>,
248 pub metadata: serde_json::Value,
249 pub session_id: Option<String>,
250 pub blocked_by: Vec<String>,
251}
252
253impl Queue {
254 pub fn new(path: PathBuf) -> Self {
255 Self { path }
256 }
257
258 pub fn push(
260 &self,
261 sources: Vec<Source>,
262 title: Option<String>,
263 metadata: serde_json::Value,
264 session_id: Option<String>,
265 blocked_by: Vec<String>,
266 ) -> Result<Item> {
267 self.validate_sources(&sources)?;
268 self.push_with_description(sources, title, None, metadata, session_id, blocked_by)
269 }
270
271 pub fn push_with_description(
273 &self,
274 sources: Vec<Source>,
275 title: Option<String>,
276 description: Option<String>,
277 metadata: serde_json::Value,
278 session_id: Option<String>,
279 blocked_by: Vec<String>,
280 ) -> Result<Item> {
281 let new_item = NewItem {
282 sources,
283 title,
284 description,
285 metadata,
286 session_id,
287 blocked_by,
288 };
289
290 let mut items = self.push_many_with_description(vec![new_item])?;
291 Ok(items.remove(0))
292 }
293
294 pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
296 if items.is_empty() {
297 anyhow::bail!("At least one item is required");
298 }
299
300 for item in &items {
301 self.validate_new_item(item)?;
302 }
303
304 self.with_exclusive_lock(|f| {
305 let existing = read_items(f, &self.path);
306 let mut existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
307 let now = now_iso8601();
308 let mut created = Vec::with_capacity(items.len());
309
310 f.seek(std::io::SeekFrom::End(0))?;
311 for new_item in items {
312 let id = generate_id(&existing_ids);
313 existing_ids.insert(id.clone());
314
315 let item = Item {
316 id,
317 title: new_item.title,
318 description: new_item.description,
319 status: "pending".to_string(),
320 sources: new_item.sources,
321 metadata: new_item.metadata,
322 session_id: new_item.session_id,
323 worktree: None,
324 blocked_by: new_item.blocked_by,
325 errors: Vec::new(),
326 created_at: now.clone(),
327 updated_at: now.clone(),
328 };
329
330 writeln!(f, "{}", item.to_json_string())?;
331 created.push(item);
332 }
333 f.flush()?;
334
335 Ok(created)
336 })
337 }
338
339 pub fn all(&self) -> Vec<Item> {
341 if !self.path.exists() {
342 return Vec::new();
343 }
344 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
345 Ok(items) => items,
346 Err(_) => Vec::new(),
347 }
348 }
349
350 pub fn find(&self, id: &str) -> Option<Item> {
352 self.all().into_iter().find(|item| item.id == id)
353 }
354
355 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
357 let items = self.all();
358 match status {
359 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
360 None => items,
361 }
362 }
363
364 pub fn ready(&self) -> Vec<Item> {
366 let items = self.all();
367 let pending_ids: HashSet<String> = items
368 .iter()
369 .filter(|i| i.pending())
370 .map(|i| i.id.clone())
371 .collect();
372 items
373 .into_iter()
374 .filter(|item| item.ready(Some(&pending_ids)))
375 .collect()
376 }
377
378 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
380 self.with_exclusive_lock(|f| {
381 let mut items = read_items(f, &self.path);
382 let index = items.iter().position(|item| item.id == id);
383 let index = match index {
384 Some(i) => i,
385 None => return Ok(None),
386 };
387
388 let item = &mut items[index];
389
390 if let Some(status) = &attrs.status {
391 if !VALID_STATUSES.contains(&status.as_str()) {
392 anyhow::bail!(
393 "Invalid status: {}. Valid: {}",
394 status,
395 VALID_STATUSES.join(", ")
396 );
397 }
398 item.status = status.clone();
399 }
400 if let Some(title) = attrs.title {
401 item.title = Some(title);
402 }
403 if let Some(description) = attrs.description {
404 item.description = Some(description);
405 }
406 if let Some(metadata) = attrs.metadata {
407 item.metadata = metadata;
408 }
409 if let Some(session_id) = attrs.session_id {
410 item.session_id = Some(session_id);
411 }
412 if let Some(blocked_by) = attrs.blocked_by {
413 item.blocked_by = blocked_by;
414 }
415 if let Some(sources) = attrs.sources {
416 item.sources = sources;
417 }
418
419 item.updated_at = now_iso8601();
420
421 let updated = item.clone();
422 rewrite_items(f, &items)?;
423 Ok(Some(updated))
424 })
425 }
426
427 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
429 self.with_exclusive_lock(|f| {
430 let mut items = read_items(f, &self.path);
431 let index = items.iter().position(|item| item.id == id);
432 match index {
433 Some(i) => {
434 let removed = items.remove(i);
435 rewrite_items(f, &items)?;
436 Ok(Some(removed))
437 }
438 None => Ok(None),
439 }
440 })
441 }
442
443 fn validate_sources(&self, sources: &[Source]) -> Result<()> {
446 if sources.is_empty() {
447 anyhow::bail!("Sources cannot be empty");
448 }
449 self.validate_source_types(sources)
450 }
451
452 fn validate_new_item(&self, item: &NewItem) -> Result<()> {
453 let has_metadata = match &item.metadata {
454 serde_json::Value::Object(map) => !map.is_empty(),
455 _ => true,
456 };
457
458 if item.sources.is_empty()
459 && item.title.is_none()
460 && item.description.is_none()
461 && !has_metadata
462 {
463 anyhow::bail!("Item requires at least one source, title, description, or metadata");
464 }
465
466 self.validate_source_types(&item.sources)
467 }
468
469 fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
470 for source in sources {
471 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
472 anyhow::bail!(
473 "Invalid source type: {}. Valid: {}",
474 source.type_,
475 VALID_SOURCE_TYPES.join(", ")
476 );
477 }
478 }
479 Ok(())
480 }
481
482 fn ensure_directory(&self) -> Result<()> {
483 if let Some(parent) = self.path.parent() {
484 if !parent.exists() {
485 fs::create_dir_all(parent)?;
486 }
487 }
488 Ok(())
489 }
490
491 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
492 where
493 F: FnOnce(&mut File) -> Result<T>,
494 {
495 self.ensure_directory()?;
496 let mut file = OpenOptions::new()
497 .read(true)
498 .write(true)
499 .create(true)
500 .truncate(false)
501 .open(&self.path)
502 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
503
504 flock(&file.as_fd(), FlockOperation::LockExclusive)
505 .with_context(|| "Failed to acquire exclusive lock")?;
506
507 let result = f(&mut file);
508
509 result
511 }
512
513 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
514 where
515 F: FnOnce(&mut File) -> Result<T>,
516 {
517 let mut file = File::open(&self.path)
518 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
519
520 flock(&file.as_fd(), FlockOperation::LockShared)
521 .with_context(|| "Failed to acquire shared lock")?;
522
523 f(&mut file)
524 }
525}
526
527fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
529 file.seek(std::io::SeekFrom::Start(0)).ok();
530 let reader = BufReader::new(file);
531 let mut items = Vec::new();
532
533 for (line_num, line) in reader.lines().enumerate() {
534 let line = match line {
535 Ok(l) => l,
536 Err(_) => continue,
537 };
538 let trimmed = line.trim();
539 if trimmed.is_empty() {
540 continue;
541 }
542 match serde_json::from_str::<Item>(trimmed) {
543 Ok(item) => items.push(item),
544 Err(e) => {
545 eprintln!(
546 "Warning: Skipping corrupt line {} in {}: {}",
547 line_num + 1,
548 path.display(),
549 e
550 );
551 }
552 }
553 }
554 items
555}
556
557fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
559 file.seek(std::io::SeekFrom::Start(0))?;
560 file.set_len(0)?;
561 for item in items {
562 writeln!(file, "{}", item.to_json_string())?;
563 }
564 file.flush()?;
565 Ok(())
566}
567
568fn generate_id(existing_ids: &HashSet<String>) -> String {
570 let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
571 let mut rng = rand::thread_rng();
572 loop {
573 let id: String = (0..3)
574 .map(|_| chars[rng.gen_range(0..chars.len())])
575 .collect();
576 if !existing_ids.contains(&id) {
577 return id;
578 }
579 }
580}
581
582fn now_iso8601() -> String {
584 chrono::Utc::now()
585 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
586 .to_string()
587}
588
589#[derive(Default)]
591pub struct UpdateAttrs {
592 pub status: Option<String>,
593 pub title: Option<String>,
594 pub description: Option<String>,
595 pub metadata: Option<serde_json::Value>,
596 pub session_id: Option<String>,
597 pub blocked_by: Option<Vec<String>>,
598 pub sources: Option<Vec<Source>>,
599}