1use anyhow::{Context, Result};
2use rand::Rng;
3use rustix::fs::{flock, FlockOperation};
4use serde::{Deserialize, Serialize};
5use std::collections::HashSet;
6use std::fs::{self, File, OpenOptions};
7use std::io::{BufRead, BufReader, Seek, Write};
8use std::os::fd::AsFd;
9use std::path::{Path, PathBuf};
10
/// Status values an item may be set to through `Queue::update`.
///
/// New items are created with the status `"pending"`.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
13
/// Values accepted for a source's `type_` when sources are validated
/// (see `Queue::validate_sources`).
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20pub struct Item {
21 pub id: String,
22
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub title: Option<String>,
25
26 pub status: String,
27
28 pub sources: Vec<Source>,
29
30 pub metadata: serde_json::Value,
31
32 pub session_id: Option<String>,
34
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub worktree: Option<Worktree>,
37
38 #[serde(skip_serializing_if = "is_empty_vec")]
39 #[serde(default)]
40 pub blocked_by: Vec<String>,
41
42 #[serde(skip_serializing_if = "is_empty_json_vec")]
43 #[serde(default)]
44 pub errors: Vec<serde_json::Value>,
45
46 pub created_at: String,
47 pub updated_at: String,
48}
49
/// Serde `skip_serializing_if` predicate: true when the string list has no
/// elements.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
53
54fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
55 v.is_empty()
56}
57
58impl Item {
67 pub fn to_json_value(&self) -> serde_json::Value {
68 let mut map = serde_json::Map::new();
69
70 map.insert("id".to_string(), serde_json::Value::String(self.id.clone()));
71
72 if let Some(ref title) = self.title {
74 map.insert(
75 "title".to_string(),
76 serde_json::Value::String(title.clone()),
77 );
78 }
79
80 map.insert(
81 "status".to_string(),
82 serde_json::Value::String(self.status.clone()),
83 );
84
85 map.insert(
86 "sources".to_string(),
87 serde_json::Value::Array(self.sources.iter().map(|s| s.to_json_value()).collect()),
88 );
89
90 map.insert("metadata".to_string(), self.metadata.clone());
91
92 map.insert(
93 "session_id".to_string(),
94 match &self.session_id {
95 Some(s) => serde_json::Value::String(s.clone()),
96 None => serde_json::Value::Null,
97 },
98 );
99
100 map.insert(
101 "created_at".to_string(),
102 serde_json::Value::String(self.created_at.clone()),
103 );
104 map.insert(
105 "updated_at".to_string(),
106 serde_json::Value::String(self.updated_at.clone()),
107 );
108
109 if let Some(ref wt) = self.worktree {
111 map.insert("worktree".to_string(), wt.to_json_value());
112 }
113
114 if !self.blocked_by.is_empty() {
116 map.insert(
117 "blocked_by".to_string(),
118 serde_json::Value::Array(
119 self.blocked_by
120 .iter()
121 .map(|s| serde_json::Value::String(s.clone()))
122 .collect(),
123 ),
124 );
125 }
126
127 if !self.errors.is_empty() {
129 map.insert(
130 "errors".to_string(),
131 serde_json::Value::Array(self.errors.clone()),
132 );
133 }
134
135 serde_json::Value::Object(map)
136 }
137
138 pub fn to_json_string(&self) -> String {
139 self.to_json_value().to_string()
140 }
141
142 pub fn pending(&self) -> bool {
143 self.status == "pending"
144 }
145
146 pub fn blocked(&self) -> bool {
147 !self.blocked_by.is_empty()
148 }
149
150 pub fn ready(&self, pending_ids: Option<&HashSet<String>>) -> bool {
151 if !self.pending() {
152 return false;
153 }
154 if !self.blocked() {
155 return true;
156 }
157 match pending_ids {
158 None => true,
159 Some(ids) => self.blocked_by.iter().all(|id| !ids.contains(id)),
160 }
161 }
162}
163
/// One input attached to an `Item`.
///
/// The kind is serialized under the JSON key `"type"`; each optional field is
/// omitted from the output when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Source {
    /// Kind of source; validation accepts the values in `VALID_SOURCE_TYPES`.
    #[serde(rename = "type")]
    pub type_: String,

    /// Optional path.
    /// NOTE(review): presumably the location for path-based kinds — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,

    /// Optional inline content.
    /// NOTE(review): presumably the body for text-like kinds — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,

    /// Optional session identifier associated with this source.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
178
179impl Source {
180 pub fn to_json_value(&self) -> serde_json::Value {
181 let mut map = serde_json::Map::new();
182 map.insert(
183 "type".to_string(),
184 serde_json::Value::String(self.type_.clone()),
185 );
186 if let Some(ref path) = self.path {
187 map.insert(
188 "path".to_string(),
189 serde_json::Value::String(path.clone()),
190 );
191 }
192 if let Some(ref content) = self.content {
193 map.insert(
194 "content".to_string(),
195 serde_json::Value::String(content.clone()),
196 );
197 }
198 if let Some(ref session_id) = self.session_id {
199 map.insert(
200 "session_id".to_string(),
201 serde_json::Value::String(session_id.clone()),
202 );
203 }
204 serde_json::Value::Object(map)
205 }
206}
207
/// Worktree information that can be attached to an `Item`.
///
/// Both fields are optional and are omitted from the JSON when `None`.
/// NOTE(review): presumably describes a git worktree — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Worktree {
    /// Optional path of the worktree.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    /// Optional branch name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
}
215
216impl Worktree {
217 pub fn to_json_value(&self) -> serde_json::Value {
218 let mut map = serde_json::Map::new();
219 if let Some(ref path) = self.path {
220 map.insert(
221 "path".to_string(),
222 serde_json::Value::String(path.clone()),
223 );
224 }
225 if let Some(ref branch) = self.branch {
226 map.insert(
227 "branch".to_string(),
228 serde_json::Value::String(branch.clone()),
229 );
230 }
231 serde_json::Value::Object(map)
232 }
233}
234
/// Handle to a queue stored in a single file, one JSON-encoded `Item` per
/// line. The file is opened and locked anew for every operation.
pub struct Queue {
    /// Location of the backing file.
    pub path: PathBuf,
}
240
241impl Queue {
242 pub fn new(path: PathBuf) -> Self {
243 Self { path }
244 }
245
246 pub fn push(
248 &self,
249 sources: Vec<Source>,
250 title: Option<String>,
251 metadata: serde_json::Value,
252 session_id: Option<String>,
253 blocked_by: Vec<String>,
254 ) -> Result<Item> {
255 self.validate_sources(&sources)?;
256
257 self.with_exclusive_lock(|f| {
258 let existing = read_items(f, &self.path);
259 let existing_ids: HashSet<String> = existing.iter().map(|i| i.id.clone()).collect();
260
261 let now = now_iso8601();
262 let item = Item {
263 id: generate_id(&existing_ids),
264 title,
265 status: "pending".to_string(),
266 sources,
267 metadata,
268 session_id,
269 worktree: None,
270 blocked_by,
271 errors: Vec::new(),
272 created_at: now.clone(),
273 updated_at: now,
274 };
275
276 f.seek(std::io::SeekFrom::End(0))?;
278 writeln!(f, "{}", item.to_json_string())?;
279 f.flush()?;
280
281 Ok(item)
282 })
283 }
284
285 pub fn all(&self) -> Vec<Item> {
287 if !self.path.exists() {
288 return Vec::new();
289 }
290 match self.with_shared_lock(|f| Ok(read_items(f, &self.path))) {
291 Ok(items) => items,
292 Err(_) => Vec::new(),
293 }
294 }
295
296 pub fn find(&self, id: &str) -> Option<Item> {
298 self.all().into_iter().find(|item| item.id == id)
299 }
300
301 pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
303 let items = self.all();
304 match status {
305 Some(s) => items.into_iter().filter(|item| item.status == s).collect(),
306 None => items,
307 }
308 }
309
310 pub fn ready(&self) -> Vec<Item> {
312 let items = self.all();
313 let pending_ids: HashSet<String> = items
314 .iter()
315 .filter(|i| i.pending())
316 .map(|i| i.id.clone())
317 .collect();
318 items
319 .into_iter()
320 .filter(|item| item.ready(Some(&pending_ids)))
321 .collect()
322 }
323
324 pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
326 self.with_exclusive_lock(|f| {
327 let mut items = read_items(f, &self.path);
328 let index = items.iter().position(|item| item.id == id);
329 let index = match index {
330 Some(i) => i,
331 None => return Ok(None),
332 };
333
334 let item = &mut items[index];
335
336 if let Some(status) = &attrs.status {
337 if !VALID_STATUSES.contains(&status.as_str()) {
338 anyhow::bail!(
339 "Invalid status: {}. Valid: {}",
340 status,
341 VALID_STATUSES.join(", ")
342 );
343 }
344 item.status = status.clone();
345 }
346 if let Some(title) = attrs.title {
347 item.title = Some(title);
348 }
349 if let Some(metadata) = attrs.metadata {
350 item.metadata = metadata;
351 }
352 if let Some(session_id) = attrs.session_id {
353 item.session_id = Some(session_id);
354 }
355 if let Some(blocked_by) = attrs.blocked_by {
356 item.blocked_by = blocked_by;
357 }
358 if let Some(sources) = attrs.sources {
359 item.sources = sources;
360 }
361
362 item.updated_at = now_iso8601();
363
364 let updated = item.clone();
365 rewrite_items(f, &items)?;
366 Ok(Some(updated))
367 })
368 }
369
370 pub fn remove(&self, id: &str) -> Result<Option<Item>> {
372 self.with_exclusive_lock(|f| {
373 let mut items = read_items(f, &self.path);
374 let index = items.iter().position(|item| item.id == id);
375 match index {
376 Some(i) => {
377 let removed = items.remove(i);
378 rewrite_items(f, &items)?;
379 Ok(Some(removed))
380 }
381 None => Ok(None),
382 }
383 })
384 }
385
386 fn validate_sources(&self, sources: &[Source]) -> Result<()> {
389 if sources.is_empty() {
390 anyhow::bail!("Sources cannot be empty");
391 }
392 for source in sources {
393 if !VALID_SOURCE_TYPES.contains(&source.type_.as_str()) {
394 anyhow::bail!(
395 "Invalid source type: {}. Valid: {}",
396 source.type_,
397 VALID_SOURCE_TYPES.join(", ")
398 );
399 }
400 }
401 Ok(())
402 }
403
404 fn ensure_directory(&self) -> Result<()> {
405 if let Some(parent) = self.path.parent() {
406 if !parent.exists() {
407 fs::create_dir_all(parent)?;
408 }
409 }
410 Ok(())
411 }
412
413 fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
414 where
415 F: FnOnce(&mut File) -> Result<T>,
416 {
417 self.ensure_directory()?;
418 let mut file = OpenOptions::new()
419 .read(true)
420 .write(true)
421 .create(true)
422 .truncate(false)
423 .open(&self.path)
424 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
425
426 flock(&file.as_fd(), FlockOperation::LockExclusive)
427 .with_context(|| "Failed to acquire exclusive lock")?;
428
429 let result = f(&mut file);
430
431 result
433 }
434
435 fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
436 where
437 F: FnOnce(&mut File) -> Result<T>,
438 {
439 let mut file = File::open(&self.path)
440 .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
441
442 flock(&file.as_fd(), FlockOperation::LockShared)
443 .with_context(|| "Failed to acquire shared lock")?;
444
445 f(&mut file)
446 }
447}
448
449fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
451 file.seek(std::io::SeekFrom::Start(0)).ok();
452 let reader = BufReader::new(file);
453 let mut items = Vec::new();
454
455 for (line_num, line) in reader.lines().enumerate() {
456 let line = match line {
457 Ok(l) => l,
458 Err(_) => continue,
459 };
460 let trimmed = line.trim();
461 if trimmed.is_empty() {
462 continue;
463 }
464 match serde_json::from_str::<Item>(trimmed) {
465 Ok(item) => items.push(item),
466 Err(e) => {
467 eprintln!(
468 "Warning: Skipping corrupt line {} in {}: {}",
469 line_num + 1,
470 path.display(),
471 e
472 );
473 }
474 }
475 }
476 items
477}
478
479fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
481 file.seek(std::io::SeekFrom::Start(0))?;
482 file.set_len(0)?;
483 for item in items {
484 writeln!(file, "{}", item.to_json_string())?;
485 }
486 file.flush()?;
487 Ok(())
488}
489
490fn generate_id(existing_ids: &HashSet<String>) -> String {
492 let chars: Vec<char> = ('a'..='z').chain('0'..='9').collect();
493 let mut rng = rand::thread_rng();
494 loop {
495 let id: String = (0..3).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
496 if !existing_ids.contains(&id) {
497 return id;
498 }
499 }
500}
501
502fn now_iso8601() -> String {
504 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string()
505}
506
/// Set of changes for `Queue::update`.
///
/// A field set to `Some` replaces the corresponding field of the item; a
/// field left as `None` keeps the item's current value. The default value
/// changes nothing.
#[derive(Default)]
pub struct UpdateAttrs {
    /// New status; must be one of `VALID_STATUSES`.
    pub status: Option<String>,
    /// New title.
    pub title: Option<String>,
    /// Replacement metadata.
    pub metadata: Option<serde_json::Value>,
    /// New session id.
    pub session_id: Option<String>,
    /// Replacement list of blocker ids.
    pub blocked_by: Option<Vec<String>>,
    /// Replacement list of sources.
    pub sources: Option<Vec<Source>>,
}