1use crate::file_lock::FileLock;
18use crate::task::{Task, TaskStatus};
19use std::io;
20use std::path::Path;
21use tracing::warn;
22
23pub struct TaskStore {
25 path: std::path::PathBuf,
26 tasks: Vec<Task>,
27 lock: FileLock,
28}
29
30fn parse_task_line(line: &str) -> Option<Task> {
32 match serde_json::from_str(line) {
33 Ok(task) => Some(task),
34 Err(e) => {
35 warn!(
36 error = %e,
37 line = line.chars().take(200).collect::<String>(),
38 "Skipping malformed task line in JSONL"
39 );
40 None
41 }
42 }
43}
44
45impl TaskStore {
46 pub fn load(path: &Path) -> io::Result<Self> {
53 let lock = FileLock::new(path)?;
54 let _guard = lock.shared()?;
55
56 let tasks = if path.exists() {
57 let content = std::fs::read_to_string(path)?;
58 content
59 .lines()
60 .filter(|line| !line.trim().is_empty())
61 .filter_map(|line| parse_task_line(line))
62 .collect()
63 } else {
64 Vec::new()
65 };
66
67 Ok(Self {
68 path: path.to_path_buf(),
69 tasks,
70 lock,
71 })
72 }
73
74 pub fn save(&self) -> io::Result<()> {
79 let _guard = self.lock.exclusive()?;
80
81 if let Some(parent) = self.path.parent() {
82 std::fs::create_dir_all(parent)?;
83 }
84 let content: String = self
85 .tasks
86 .iter()
87 .map(|t| {
88 serde_json::to_string(t).map_err(|e| {
89 io::Error::new(
90 io::ErrorKind::InvalidData,
91 format!("task serialization failed: {e}"),
92 )
93 })
94 })
95 .collect::<Result<Vec<_>, _>>()?
96 .join("\n");
97 std::fs::write(
98 &self.path,
99 if content.is_empty() {
100 String::new()
101 } else {
102 content + "\n"
103 },
104 )
105 }
106
107 pub fn reload(&mut self) -> io::Result<()> {
112 let _guard = self.lock.shared()?;
113
114 self.tasks = if self.path.exists() {
115 let content = std::fs::read_to_string(&self.path)?;
116 content
117 .lines()
118 .filter(|line| !line.trim().is_empty())
119 .filter_map(|line| parse_task_line(line))
120 .collect()
121 } else {
122 Vec::new()
123 };
124
125 Ok(())
126 }
127
128 pub fn with_exclusive_lock<F, T>(&mut self, f: F) -> io::Result<T>
143 where
144 F: FnOnce(&mut Self) -> T,
145 {
146 let _guard = self.lock.exclusive()?;
147
148 self.tasks = if self.path.exists() {
150 let content = std::fs::read_to_string(&self.path)?;
151 content
152 .lines()
153 .filter(|line| !line.trim().is_empty())
154 .filter_map(|line| parse_task_line(line))
155 .collect()
156 } else {
157 Vec::new()
158 };
159
160 let result = f(self);
162
163 if let Some(parent) = self.path.parent() {
165 std::fs::create_dir_all(parent)?;
166 }
167 let content: String = self
168 .tasks
169 .iter()
170 .map(|t| {
171 serde_json::to_string(t).map_err(|e| {
172 io::Error::new(
173 io::ErrorKind::InvalidData,
174 format!("task serialization failed: {e}"),
175 )
176 })
177 })
178 .collect::<Result<Vec<_>, _>>()?
179 .join("\n");
180 std::fs::write(
181 &self.path,
182 if content.is_empty() {
183 String::new()
184 } else {
185 content + "\n"
186 },
187 )?;
188
189 Ok(result)
190 }
191
192 pub fn add(&mut self, task: Task) -> &Task {
194 self.tasks.push(task);
195 self.tasks.last().unwrap()
196 }
197
198 pub fn get(&self, id: &str) -> Option<&Task> {
200 self.tasks.iter().find(|t| t.id == id)
201 }
202
203 pub fn get_by_key(&self, key: &str) -> Option<&Task> {
205 self.tasks.iter().find(|t| t.key.as_deref() == Some(key))
206 }
207
208 pub fn get_mut(&mut self, id: &str) -> Option<&mut Task> {
210 self.tasks.iter_mut().find(|t| t.id == id)
211 }
212
213 pub fn get_by_key_mut(&mut self, key: &str) -> Option<&mut Task> {
215 self.tasks
216 .iter_mut()
217 .find(|t| t.key.as_deref() == Some(key))
218 }
219
220 pub fn close(&mut self, id: &str) -> Option<&Task> {
222 if let Some(task) = self.get_mut(id) {
223 task.status = TaskStatus::Closed;
224 task.closed = Some(chrono::Utc::now().to_rfc3339());
225 return self.get(id);
226 }
227 None
228 }
229
230 pub fn start(&mut self, id: &str) -> Option<&Task> {
232 if let Some(task) = self.get_mut(id) {
233 task.start();
234 return self.get(id);
235 }
236 None
237 }
238
239 pub fn fail(&mut self, id: &str) -> Option<&Task> {
241 if let Some(task) = self.get_mut(id) {
242 task.status = TaskStatus::Failed;
243 task.closed = Some(chrono::Utc::now().to_rfc3339());
244 return self.get(id);
245 }
246 None
247 }
248
249 pub fn reopen(&mut self, id: &str) -> Option<&Task> {
251 if let Some(task) = self.get_mut(id) {
252 task.reopen();
253 return self.get(id);
254 }
255 None
256 }
257
258 pub fn ensure(&mut self, task: Task) -> &Task {
263 if let Some(key) = task.key.as_deref()
264 && let Some(existing_idx) = self
265 .tasks
266 .iter()
267 .position(|existing| existing.key.as_deref() == Some(key))
268 {
269 let existing = &mut self.tasks[existing_idx];
270 existing.title = task.title;
271 existing.priority = task.priority;
272 if task.description.is_some() {
273 existing.description = task.description;
274 }
275 if !task.blocked_by.is_empty() {
276 existing.blocked_by = task.blocked_by;
277 }
278 return &self.tasks[existing_idx];
279 }
280
281 self.tasks.push(task);
282 self.tasks.last().unwrap()
283 }
284
285 pub fn all(&self) -> &[Task] {
287 &self.tasks
288 }
289
290 pub fn open(&self) -> Vec<&Task> {
292 self.tasks
293 .iter()
294 .filter(|t| t.status != TaskStatus::Closed)
295 .collect()
296 }
297
298 pub fn ready(&self) -> Vec<&Task> {
300 self.tasks
301 .iter()
302 .filter(|t| t.is_ready(&self.tasks))
303 .collect()
304 }
305
306 pub fn has_open_tasks(&self) -> bool {
310 self.tasks.iter().any(|t| t.status != TaskStatus::Closed)
311 }
312
313 pub fn has_pending_tasks(&self) -> bool {
318 self.tasks.iter().any(|t| !t.status.is_terminal())
319 }
320}
321
322#[cfg(test)]
323mod tests {
324 use super::*;
325 use tempfile::TempDir;
326
327 #[test]
328 fn test_load_nonexistent_file() {
329 let tmp = TempDir::new().unwrap();
330 let path = tmp.path().join("tasks.jsonl");
331 let store = TaskStore::load(&path).unwrap();
332 assert_eq!(store.all().len(), 0);
333 }
334
335 #[test]
336 fn test_add_and_save() {
337 let tmp = TempDir::new().unwrap();
338 let path = tmp.path().join("tasks.jsonl");
339
340 let mut store = TaskStore::load(&path).unwrap();
341 let task = Task::new("Test task".to_string(), 1);
342 store.add(task);
343 store.save().unwrap();
344
345 let loaded = TaskStore::load(&path).unwrap();
346 assert_eq!(loaded.all().len(), 1);
347 assert_eq!(loaded.all()[0].title, "Test task");
348 }
349
350 #[test]
351 fn test_get_task() {
352 let tmp = TempDir::new().unwrap();
353 let path = tmp.path().join("tasks.jsonl");
354 let mut store = TaskStore::load(&path).unwrap();
355 let task = Task::new("Test".to_string(), 1);
356 let id = task.id.clone();
357 store.add(task);
358
359 assert!(store.get(&id).is_some());
360 assert_eq!(store.get(&id).unwrap().title, "Test");
361 }
362
363 #[test]
364 fn test_get_task_by_key() {
365 let tmp = TempDir::new().unwrap();
366 let path = tmp.path().join("tasks.jsonl");
367 let mut store = TaskStore::load(&path).unwrap();
368 let task = Task::new("Test".to_string(), 1).with_key(Some("phase:design".to_string()));
369 store.add(task);
370
371 assert!(store.get_by_key("phase:design").is_some());
372 assert_eq!(store.get_by_key("phase:design").unwrap().title, "Test");
373 }
374
375 #[test]
376 fn test_close_task() {
377 let tmp = TempDir::new().unwrap();
378 let path = tmp.path().join("tasks.jsonl");
379 let mut store = TaskStore::load(&path).unwrap();
380 let task = Task::new("Test".to_string(), 1);
381 let id = task.id.clone();
382 store.add(task);
383
384 let closed = store.close(&id).unwrap();
385 assert_eq!(closed.status, TaskStatus::Closed);
386 assert!(closed.closed.is_some());
387 }
388
389 #[test]
390 fn test_start_task() {
391 let tmp = TempDir::new().unwrap();
392 let path = tmp.path().join("tasks.jsonl");
393 let mut store = TaskStore::load(&path).unwrap();
394 let task = Task::new("Test".to_string(), 1);
395 let id = task.id.clone();
396 store.add(task);
397
398 let started = store.start(&id).unwrap();
399 assert_eq!(started.status, TaskStatus::InProgress);
400 assert!(started.started.is_some());
401 }
402
403 #[test]
404 fn test_reopen_task() {
405 let tmp = TempDir::new().unwrap();
406 let path = tmp.path().join("tasks.jsonl");
407 let mut store = TaskStore::load(&path).unwrap();
408 let task = Task::new("Test".to_string(), 1);
409 let id = task.id.clone();
410 store.add(task);
411 store.close(&id);
412
413 let reopened = store.reopen(&id).unwrap();
414 assert_eq!(reopened.status, TaskStatus::Open);
415 assert!(reopened.closed.is_none());
416 }
417
418 #[test]
419 fn test_open_tasks() {
420 let tmp = TempDir::new().unwrap();
421 let path = tmp.path().join("tasks.jsonl");
422 let mut store = TaskStore::load(&path).unwrap();
423
424 let task1 = Task::new("Open 1".to_string(), 1);
425 store.add(task1);
426
427 let mut task2 = Task::new("Closed".to_string(), 1);
428 task2.status = TaskStatus::Closed;
429 store.add(task2);
430
431 assert_eq!(store.open().len(), 1);
432 }
433
434 #[test]
435 fn test_ready_tasks() {
436 let tmp = TempDir::new().unwrap();
437 let path = tmp.path().join("tasks.jsonl");
438 let mut store = TaskStore::load(&path).unwrap();
439
440 let task1 = Task::new("Ready".to_string(), 1);
441 let id1 = task1.id.clone();
442 store.add(task1);
443
444 let mut task2 = Task::new("Blocked".to_string(), 1);
445 task2.blocked_by.push(id1);
446 store.add(task2);
447
448 let ready = store.ready();
449 assert_eq!(ready.len(), 1);
450 assert_eq!(ready[0].title, "Ready");
451 }
452
453 #[test]
454 fn test_ensure_deduplicates_by_key() {
455 let tmp = TempDir::new().unwrap();
456 let path = tmp.path().join("tasks.jsonl");
457 let mut store = TaskStore::load(&path).unwrap();
458
459 let first = Task::new("First".to_string(), 1).with_key(Some("impl:task-01".to_string()));
460 let second = Task::new("Second".to_string(), 3).with_key(Some("impl:task-01".to_string()));
461
462 let id = store.ensure(first).id.clone();
463 let deduped_id = store.ensure(second).id.clone();
464 let deduped = store
465 .get_by_key("impl:task-01")
466 .expect("deduped task should exist");
467
468 assert_eq!(store.all().len(), 1);
469 assert_eq!(deduped_id, id);
470 assert_eq!(deduped.title, "Second");
471 assert_eq!(deduped.priority, 3);
472 }
473
474 #[test]
475 fn test_has_open_tasks() {
476 let tmp = TempDir::new().unwrap();
477 let path = tmp.path().join("tasks.jsonl");
478 let mut store = TaskStore::load(&path).unwrap();
479
480 assert!(!store.has_open_tasks());
481
482 let task = Task::new("Test".to_string(), 1);
483 store.add(task);
484
485 assert!(store.has_open_tasks());
486 }
487
488 #[test]
489 fn test_has_pending_tasks_excludes_failed() {
490 let tmp = TempDir::new().unwrap();
491 let path = tmp.path().join("tasks.jsonl");
492 let mut store = TaskStore::load(&path).unwrap();
493
494 assert!(!store.has_pending_tasks());
496
497 let task1 = Task::new("Open task".to_string(), 1);
499 store.add(task1);
500 assert!(store.has_pending_tasks());
501
502 let id = store.all()[0].id.clone();
504 store.close(&id);
505 assert!(!store.has_pending_tasks());
506 }
507
508 #[test]
509 fn test_has_pending_tasks_failed_is_terminal() {
510 let tmp = TempDir::new().unwrap();
511 let path = tmp.path().join("tasks.jsonl");
512 let mut store = TaskStore::load(&path).unwrap();
513
514 let task = Task::new("Failed task".to_string(), 1);
516 store.add(task);
517 let id = store.all()[0].id.clone();
518 store.fail(&id);
519
520 assert!(!store.has_pending_tasks());
522
523 assert!(store.has_open_tasks());
525 }
526
527 #[test]
528 fn test_reload() {
529 let tmp = TempDir::new().unwrap();
530 let path = tmp.path().join("tasks.jsonl");
531
532 let mut store1 = TaskStore::load(&path).unwrap();
534 store1.add(Task::new("Task 1".to_string(), 1));
535 store1.save().unwrap();
536
537 let mut store2 = TaskStore::load(&path).unwrap();
539 store2.add(Task::new("Task 2".to_string(), 1));
540 store2.save().unwrap();
541
542 store1.reload().unwrap();
544 assert_eq!(store1.all().len(), 2);
545 }
546
547 #[test]
548 fn test_with_exclusive_lock() {
549 let tmp = TempDir::new().unwrap();
550 let path = tmp.path().join("tasks.jsonl");
551
552 let mut store = TaskStore::load(&path).unwrap();
553
554 store
556 .with_exclusive_lock(|s| {
557 s.add(Task::new("Atomic task".to_string(), 1));
558 })
559 .unwrap();
560
561 let loaded = TaskStore::load(&path).unwrap();
563 assert_eq!(loaded.all().len(), 1);
564 assert_eq!(loaded.all()[0].title, "Atomic task");
565 }
566
567 #[test]
568 fn test_concurrent_writes_with_lock() {
569 use std::sync::{Arc, Barrier};
570 use std::thread;
571
572 let tmp = TempDir::new().unwrap();
573 let path = tmp.path().join("tasks.jsonl");
574 let path_clone = path.clone();
575
576 let barrier = Arc::new(Barrier::new(2));
577 let barrier_clone = barrier.clone();
578
579 let handle1 = thread::spawn(move || {
581 let mut store = TaskStore::load(&path).unwrap();
582 barrier.wait();
583
584 store
585 .with_exclusive_lock(|s| {
586 s.add(Task::new("Task from thread 1".to_string(), 1));
587 })
588 .unwrap();
589 });
590
591 let handle2 = thread::spawn(move || {
593 let mut store = TaskStore::load(&path_clone).unwrap();
594 barrier_clone.wait();
595
596 store
597 .with_exclusive_lock(|s| {
598 s.add(Task::new("Task from thread 2".to_string(), 1));
599 })
600 .unwrap();
601 });
602
603 handle1.join().unwrap();
604 handle2.join().unwrap();
605
606 let final_store = TaskStore::load(tmp.path().join("tasks.jsonl").as_ref()).unwrap();
608 assert_eq!(final_store.all().len(), 2);
609 }
610
611 #[test]
612 fn test_load_skips_malformed_lines() {
613 let tmp = TempDir::new().unwrap();
614 let path = tmp.path().join("tasks.jsonl");
615
616 let mut store = TaskStore::load(&path).unwrap();
618 let task = Task::new("Valid task".to_string(), 1);
619 store.add(task);
620 store.save().unwrap();
621
622 let mut content = std::fs::read_to_string(&path).unwrap();
624 content.push_str("this is not json\n");
625 content.push_str("{\"broken\": true}\n");
626 std::fs::write(&path, content).unwrap();
627
628 let loaded = TaskStore::load(&path).unwrap();
630 assert_eq!(loaded.all().len(), 1);
631 assert_eq!(loaded.all()[0].title, "Valid task");
632 }
633}