1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
/// Status strings accepted by [`Queue::update`]; new items start as `"pending"`.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
/// Values allowed in [`Source::type_`] when sources are validated by the queue.
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21 pub id: String,
22
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub title: Option<String>,
25
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub description: Option<String>,
28
29 pub status: String,
30
31 pub sources: Vec<Source>,
32
33 pub metadata: serde_json::Value,
34
35 pub session_id: Option<String>,
37
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub worktree: Option<Worktree>,
40
41 #[serde(skip_serializing_if = "is_empty_vec")]
42 #[serde(default)]
43 pub blocked_by: Vec<String>,
44
45 #[serde(skip_serializing_if = "is_empty_json_vec")]
46 #[serde(default)]
47 pub errors: Vec<serde_json::Value>,
48
49 pub created_at: String,
50 pub updated_at: String,
51}
52
/// Serde `skip_serializing_if` predicate: `true` when the string list has no entries.
fn is_empty_vec(values: &[String]) -> bool {
    values.is_empty()
}
56
57fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
58 v.is_empty()
59}
60
61impl Item {
70 pub fn to_json_value(&self) -> serde_json::Value {
71 let mut map = serde_json::Map::new();
72
73 map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
74
75 if let Some(ref title) = self.title {
77 map.insert(
78 "title".to_string(),
79 serde_json::Value::String(title.clone()),
80 );
81 }
82 if let Some(ref description) = self.description {
83 map.insert(
84 "description".to_string(),
85 serde_json::Value::String(description.clone()),
86 );
87 }
88
89 map.insert(
90 "status".to_string(),
91 serde_json::Value::String(self.status.clone()),
92 );
93
94 map.insert(
95 "sources".to_string(),
96 serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
97 );
98
99 map.insert("metadata".to_string(), self.metadata.clone());
100
101 map.insert(
102 "session_id".to_string(),
103 match &self.session_id {
104 Some(s) => serde_json::Value::String(s.clone()),
105 None => serde_json::Value::Null,
106 },
107 );
108
109 map.insert(
110 "created_at".to_string(),
111 serde_json::Value::String(self.created_at.clone()),
112 );
113 map.insert(
114 "updated_at".to_string(),
115 serde_json::Value::String(self.updated_at.clone()),
116 );
117
118 if let Some(ref wt) = self.worktree {
120 map.insert("worktree".to_string(), wt.to_json_value());
121 }
122
123 if !self.blocked_by.is_empty() {
125 map.insert(
126 "blocked_by".to_string(),
127 serde_json::Value::Array(
128 self.blocked_by
129 .iter()
130 .map(|s| serde_json::Value::String(s.clone()))
131 .collect(),
132 ),
133 );
134 }
135
136 if !self.errors.is_empty() {
138 map.insert(
139 "errors".to_string(),
140 serde_json::Value::Array(self.errors.clone()),
141 );
142 }
143
144 serde_json::Value::Object(map)
145 }
146
147 pub fn to_json_string(&self) -> String {
148 self.to_json_value().to_string()
149 }
150
151 pub fn pending(&self) -> bool {
152 self.status == "pending"
153 }
154
155 pub fn blocked(&self) -> bool {
156 !self.blocked_by.is_empty()
157 }
158
159 pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
160 if !self.pending() {
161 return false;
162 }
163 if !self.blocked() {
164 return true;
165 }
166 match pending_ids {
167 None => true,
168 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
169 }
170 }
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174pub struct Source {
175 #[serde(rename = "type")]
176 pub type_: String,
177
178 #[serde(skip_serializing_if = "Option::is_none")]
179 pub path: Option<String>,
180
181 #[serde(skip_serializing_if = "Option::is_none")]
182 pub content: Option<String>,
183
184 #[serde(skip_serializing_if = "Option::is_none")]
185 pub session_id: Option<String>,
186}
187
188impl Source {
189 pub fn to_json_value(&self) -> serde_json::Value {
190 let mut map = serde_json::Map::new();
191 map.insert(
192 "type".to_string(),
193 serde_json::Value::String(self.type_.clone()),
194 );
195 if let Some(ref path) = self.path {
196 map.insert(
197 "path".to_string(),
198 serde_json::Value::String(path.clone()),
199 );
200 }
201 if let Some(ref content) = self.content {
202 map.insert(
203 "content".to_string(),
204 serde_json::Value::String(content.clone()),
205 );
206 }
207 if let Some(ref session_id) = self.session_id {
208 map.insert(
209 "session_id".to_string(),
210 serde_json::Value::String(session_id.clone()),
211 );
212 }
213 serde_json::Value::Object(map)
214 }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
218pub struct Worktree {
219 #[serde(skip_serializing_if = "Option::is_none")]
220 pub path: Option<String>,
221 #[serde(skip_serializing_if = "Option::is_none")]
222 pub branch: Option<String>,
223}
224
225impl Worktree {
226 pub fn to_json_value(&self) -> serde_json::Value {
227 let mut map = serde_json::Map::new();
228 if let Some(ref path) = self.path {
229 map.insert(
230 "path".to_string(),
231 serde_json::Value::String(path.clone()),
232 );
233 }
234 if let Some(ref branch) = self.branch {
235 map.insert(
236 "branch".to_string(),
237 serde_json::Value::String(branch.clone()),
238 );
239 }
240 serde_json::Value::Object(map)
241 }
242}
243
244pub struct Queue {
247 pub path: PathBuf,
248}
249
250impl Queue {
251 pub fn new(path: PathBuf) -> Self {
252 Self { path }
253 }
254
255 pub fn push(
257 &self,
258 sources: Vec<Source>,
259 title: Option<String>,
260 metadata: serde_json::Value,
261 session_id: Option<String>,
262 blocked_by: Vec<String>,
263 ) -> Result<Item> {
264 self.validate_sources(&sources)?;
265 self.push_with_description(sources, title, None, metadata, session_id, blocked_by)
266 }
267
268 pub fn push_with_description(
270 &self,
271 sources: Vec<Source>,
272 title: Option<String>,
273 description: Option<String>,
274 metadata: serde_json::Value,
275 session_id: Option<String>,
276 blocked_by: Vec<String>,
277 ) -> Result<Item> {
278 let has_metadata = match &metadata {
279 serde_json::Value::Object(map) => !map.is_empty(),
280 _ => true,
281 };
282
283 if sources.is_empty() && title.is_none() && description.is_none() && !has_metadata {
284 anyhow::bail!("Item requires at least one source, title, description, or metadata");
285 }
286 self.validate_source_types(&sources)?;
287
288 self.with_exclusive_lock(|f| {
289 let existing = read_items(f, &self.path);
290 let existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
291
292 let now = now_iso8601();
293 let item = Item {
294 id: generate_id(&existing_ids),
295 title,
296 description,
297 status: "pending".to_string(),
298 sources,
299 metadata,
300 session_id,
301 worktree: None,
302 blocked_by,
303 errors: Vec::new(),
304 created_at: now.clone(),
305 updated_at: now,
306 };
307
308 f.seek(std::io::SeekFrom::End(0))?;
310 writeln!(f, "{}", item.to_json_string())?;
311 f.flush()?;
312
313 Ok(item)
314 })
315 }
316
317 pub fn all(&self) -> Vec<Item> {
319 if !self.path.exists() {
320 return Vec::new();
321 }
322 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
323 Ok(items) => items,
324 Err(_) => Vec::new(),
325 }
326 }
327
328 pub fn find(&self, id: &str) -> Option<Item> {
330 self.all().into_iter().find(|item| item.id == id)
331 }
332
333 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
335 let items = self.all();
336 match status {
337 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
338 None => items,
339 }
340 }
341
342 pub fn ready(&self) -> Vec<Item> {
344 let items = self.all();
345 let pending_ids: HashSet<String> = items
346 .iter()
347 .filter(|i| i.pending())
348 .map(|i| i.id.clone())
349 .collect();
350 items
351 .into_iter()
352 .filter(|item| item.ready(Some(&pending_ids)))
353 .collect()
354 }
355
356 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
358 self.with_exclusive_lock(|f| {
359 let mut items = read_items(f, &self.path);
360 let index = items.iter().position(|item| item.id == id);
361 let index = match index {
362 Some(i) => i,
363 None => return Ok(None),
364 };
365
366 let item = &mut items[index];
367
368 if let Some(status) = &attrs.status {
369 if !VALID_STATUSES.contains(&status.as_str()) {
370 anyhow::bail!(
371 "Invalid status: {}. Valid: {}",
372 status,
373 VALID_STATUSES.join(", ")
374 );
375 }
376 item.status = status.clone();
377 }
378 if let Some(title) = attrs.title {
379 item.title = Some(title);
380 }
381 if let Some(description) = attrs.description {
382 item.description = Some(description);
383 }
384 if let Some(metadata) = attrs.metadata {
385 item.metadata = metadata;
386 }
387 if let Some(session_id) = attrs.session_id {
388 item.session_id = Some(session_id);
389 }
390 if let Some(blocked_by) = attrs.blocked_by {
391 item.blocked_by = blocked_by;
392 }
393 if let Some(sources) = attrs.sources {
394 item.sources = sources;
395 }
396
397 item.updated_at = now_iso8601();
398
399 let updated = item.clone();
400 rewrite_items(f, &items)?;
401 Ok(Some(updated))
402 })
403 }
404
405 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
407 self.with_exclusive_lock(|f| {
408 let mut items = read_items(f, &self.path);
409 let index = items.iter().position(|item| item.id == id);
410 match index {
411 Some(i) => {
412 let removed = items.remove(i);
413 rewrite_items(f, &items)?;
414 Ok(Some(removed))
415 }
416 None => Ok(None),
417 }
418 })
419 }
420
421 fn validate_sources(&self, sources: &[Source]) -> Result<()> {
424 if sources.is_empty() {
425 anyhow::bail!("Sources cannot be empty");
426 }
427 self.validate_source_types(sources)
428 }
429
430 fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
431 for source in sources {
432 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
433 anyhow::bail!(
434 "Invalid source type: {}. Valid: {}",
435 source.type_,
436 VALID_SOURCE_TYPES.join(", ")
437 );
438 }
439 }
440 Ok(())
441 }
442
443 fn ensure_directory(&self) -> Result<()> {
444 if let Some(parent) = self.path.parent() {
445 if !parent.exists() {
446 fs::create_dir_all(parent)?;
447 }
448 }
449 Ok(())
450 }
451
452 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
453 where
454 F: FnOnce(&mut File) -> Result<T>,
455 {
456 self.ensure_directory()?;
457 let mut file = OpenOptions::new()
458 .read(true)
459 .write(true)
460 .create(true)
461 .truncate(false)
462 .open(&self.path)
463 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
464
465 flock(&file.as_fd(), FlockOperation::LockExclusive)
466 .with_context(|| "Failed to acquire exclusive lock")?;
467
468 let result = f(&mut file);
469
470 result
472 }
473
474 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
475 where
476 F: FnOnce(&mut File) -> Result<T>,
477 {
478 let mut file = File::open(&self.path)
479 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
480
481 flock(&file.as_fd(), FlockOperation::LockShared)
482 .with_context(|| "Failed to acquire shared lock")?;
483
484 f(&mut file)
485 }
486}
487
488fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
490 file.seek(std::io::SeekFrom::Start(0)).ok();
491 let reader = BufReader::new(file);
492 let mut items = Vec::new();
493
494 for (line_num, line) in reader.lines().enumerate() {
495 let line = match line {
496 Ok(l) => l,
497 Err(_) => continue,
498 };
499 let trimmed = line.trim();
500 if trimmed.is_empty() {
501 continue;
502 }
503 match serde_json::from_str::<Item>(trimmed) {
504 Ok(item) => items.push(item),
505 Err(e) => {
506 eprintln!(
507 "Warning: Skipping corrupt line {} in {}: {}",
508 line_num + 1,
509 path.display(),
510 e
511 );
512 }
513 }
514 }
515 items
516}
517
518fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
520 file.seek(std::io::SeekFrom::Start(0))?;
521 file.set_len(0)?;
522 for item in items {
523 writeln!(file, "{}", item.to_json_string())?;
524 }
525 file.flush()?;
526 Ok(())
527}
528
529fn generate_id(existing_ids: &HashSet<String>) -> String {
531 let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
532 let mut rng = rand::thread_rng();
533 loop {
534 let id: String = (0..3).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
535 if !existing_ids.contains(&id) {
536 return id;
537 }
538 }
539}
540
541fn now_iso8601() -> String {
543 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
544}
545
546#[derive(Default)]
548pub struct UpdateAttrs {
549 pub status: Option<String>,
550 pub title: Option<String>,
551 pub description: Option<String>,
552 pub metadata: Option<serde_json::Value>,
553 pub session_id: Option<String>,
554 pub blocked_by: Option<Vec<String>>,
555 pub sources: Option<Vec<Source>>,
556}