1pub mod graph;
4
5use std::path::{Path, PathBuf};
6
7use async_trait::async_trait;
8
9use crate::error::{Error, Result};
10use crate::models::task::{
11 CreateTaskRequest, TaskFile, TaskFilter, TaskStatus, TaskUpdate,
12};
13use crate::util::atomic_write::atomic_write_json;
14use crate::util::file_lock::FileLock;
15use crate::util::id_gen::next_task_id;
16use crate::util::validate_name;
17
18pub use graph::DependencyGraph;
19
/// Asynchronous create/read/update/delete interface over the tasks of a team.
///
/// Every method takes the team name as its first argument; methods that
/// address a single task additionally take that task's id. Implementors must
/// be `Send + Sync`.
#[async_trait]
pub trait TaskManager: Send + Sync {
    /// Creates a new task for `team` from `req` and returns the stored task.
    async fn create_task(&self, team: &str, req: CreateTaskRequest) -> Result<TaskFile>;

    /// Applies `update` to task `id` of `team` and returns the updated task.
    async fn update_task(&self, team: &str, id: &str, update: TaskUpdate) -> Result<TaskFile>;

    /// Returns task `id` of `team`.
    async fn get_task(&self, team: &str, id: &str) -> Result<TaskFile>;

    /// Returns the tasks of `team`, restricted by `filter` when one is given.
    async fn list_tasks(&self, team: &str, filter: Option<TaskFilter>) -> Result<Vec<TaskFile>>;

    /// Deletes task `id` of `team`.
    async fn delete_task(&self, team: &str, id: &str) -> Result<()>;
}
38
/// [`TaskManager`] implementation that stores each task as a JSON file on disk.
///
/// Tasks of a team live in `<base_dir>/<team>/<id>.json`; a `.lock` file in the
/// same directory is used to serialize access to that team's tasks.
#[derive(Debug, Clone)]
pub struct FileTaskManager {
    /// Root directory under which one sub-directory per team is kept.
    base_dir: PathBuf,
}
43
44impl FileTaskManager {
45 pub fn new(base_dir: PathBuf) -> Self {
47 Self { base_dir }
48 }
49
50 pub fn default_dir() -> Result<Self> {
52 let home = dirs::home_dir().ok_or_else(|| {
53 Error::Other("Could not determine home directory".into())
54 })?;
55 Ok(Self::new(home.join(".claude").join("tasks")))
56 }
57
58 fn task_dir(&self, team: &str) -> PathBuf {
60 self.base_dir.join(team)
61 }
62
63 fn task_path(&self, team: &str, id: &str) -> PathBuf {
65 self.task_dir(team).join(format!("{id}.json"))
66 }
67
68 fn lock_path(&self, team: &str) -> PathBuf {
70 self.task_dir(team).join(".lock")
71 }
72
73 fn read_task_at(path: &Path, team: &str, id: &str) -> Result<TaskFile> {
75 let data = std::fs::read_to_string(path).map_err(|e| {
76 if e.kind() == std::io::ErrorKind::NotFound {
77 Error::TaskNotFound {
78 team: team.into(),
79 id: id.into(),
80 }
81 } else {
82 Error::Io(e)
83 }
84 })?;
85 let task: TaskFile = serde_json::from_str(&data)?;
86 Ok(task)
87 }
88
89 fn read_all_tasks_in(dir: &Path) -> Result<Vec<TaskFile>> {
91 if !dir.exists() {
92 return Ok(vec![]);
93 }
94
95 let mut tasks = Vec::new();
96 for entry in std::fs::read_dir(dir)? {
97 let entry = entry?;
98 let name = entry.file_name();
99 let name = name.to_string_lossy();
100 if let Some(stem) = name.strip_suffix(".json")
102 && stem.parse::<u64>().is_ok()
103 {
104 let data = std::fs::read_to_string(entry.path())?;
105 let task: TaskFile = serde_json::from_str(&data)?;
106 tasks.push(task);
107 }
108 }
109
110 Ok(tasks)
111 }
112
113 fn apply_update(task: &mut TaskFile, update: &TaskUpdate) -> Result<()> {
115 if let Some(new_status) = update.status {
117 if !task.status.can_transition_to(new_status) {
118 return Err(Error::InvalidStatusTransition {
119 from: task.status.to_string(),
120 to: new_status.to_string(),
121 });
122 }
123 task.status = new_status;
124 }
125
126 if let Some(ref subject) = update.subject {
127 task.subject.clone_from(subject);
128 }
129 if let Some(ref desc) = update.description {
130 task.description = Some(desc.clone());
131 }
132 if let Some(ref af) = update.active_form {
133 task.active_form = Some(af.clone());
134 }
135 if let Some(ref owner) = update.owner {
136 task.owner = Some(owner.clone());
137 }
138
139 if let Some(ref add_blocks) = update.add_blocks {
141 for id in add_blocks {
142 if !task.blocks.contains(id) {
143 task.blocks.push(id.clone());
144 }
145 }
146 }
147 if let Some(ref add_blocked_by) = update.add_blocked_by {
148 for id in add_blocked_by {
149 if !task.blocked_by.contains(id) {
150 task.blocked_by.push(id.clone());
151 }
152 }
153 }
154
155 if let Some(ref new_meta) = update.metadata
157 && let Some(obj) = new_meta.as_object()
158 {
159 let existing = task
160 .metadata
161 .get_or_insert_with(|| serde_json::json!({}));
162 if let Some(existing_obj) = existing.as_object_mut() {
163 for (k, v) in obj {
164 if v.is_null() {
165 existing_obj.remove(k);
166 } else {
167 existing_obj.insert(k.clone(), v.clone());
168 }
169 }
170 }
171 }
172
173 Ok(())
174 }
175
176 fn cascade_completion(dir: &Path, completed_id: &str) -> Result<()> {
178 if !dir.exists() {
179 return Ok(());
180 }
181
182 for entry in std::fs::read_dir(dir)? {
183 let entry = entry?;
184 let path = entry.path();
185 let name = entry.file_name();
186 let name = name.to_string_lossy();
187
188 if let Some(stem) = name.strip_suffix(".json")
189 && stem.parse::<u64>().is_ok()
190 && stem != completed_id
191 {
192 let data = std::fs::read_to_string(&path)?;
193 let mut task: TaskFile = serde_json::from_str(&data)?;
194
195 if task.blocked_by.contains(&completed_id.to_string()) {
196 task.blocked_by.retain(|id| id != completed_id);
197 atomic_write_json(&path, &task)?;
198 }
199 }
200 }
201
202 Ok(())
203 }
204}
205
206#[async_trait]
207impl TaskManager for FileTaskManager {
208 async fn create_task(&self, team: &str, req: CreateTaskRequest) -> Result<TaskFile> {
209 validate_name(team)?;
210
211 let dir = self.task_dir(team);
212 let lock_path = self.lock_path(team);
213
214 tokio::task::spawn_blocking(move || {
215 std::fs::create_dir_all(&dir)?;
216
217 let _lock = FileLock::acquire(&lock_path)?;
218
219 let id = next_task_id(&dir)?;
220
221 let task = TaskFile {
222 id: id.clone(),
223 subject: req.subject,
224 description: req.description,
225 active_form: req.active_form,
226 status: TaskStatus::Pending,
227 owner: None,
228 blocks: vec![],
229 blocked_by: vec![],
230 metadata: req.metadata,
231 };
232
233 let path = dir.join(format!("{id}.json"));
234 atomic_write_json(&path, &task)?;
235
236 Ok(task)
237 })
238 .await
239 .map_err(|e| Error::JoinError(format!("{e}")))?
240 }
241
242 async fn update_task(&self, team: &str, id: &str, update: TaskUpdate) -> Result<TaskFile> {
243 validate_name(team)?;
244 validate_name(id)?;
245
246 let dir = self.task_dir(team);
247 let lock_path = self.lock_path(team);
248 let task_path = self.task_path(team, id);
249 let team = team.to_string();
250 let id = id.to_string();
251
252 tokio::task::spawn_blocking(move || {
253 std::fs::create_dir_all(&dir)?;
254
255 let _lock = FileLock::acquire(&lock_path)?;
257
258 let mut task = Self::read_task_at(&task_path, &team, &id)?;
260
261 let was_not_completed = task.status != TaskStatus::Completed;
263 Self::apply_update(&mut task, &update)?;
264 let is_now_completed = task.status == TaskStatus::Completed;
265
266 atomic_write_json(&task_path, &task)?;
268
269 if was_not_completed && is_now_completed {
271 Self::cascade_completion(&dir, &id)?;
272 }
273
274 Ok(task)
275 })
276 .await
277 .map_err(|e| Error::JoinError(format!("{e}")))?
278 }
279
280 async fn get_task(&self, team: &str, id: &str) -> Result<TaskFile> {
281 validate_name(team)?;
282 validate_name(id)?;
283
284 let dir = self.task_dir(team);
285 let lock_path = self.lock_path(team);
286 let task_path = self.task_path(team, id);
287 let team = team.to_string();
288 let id = id.to_string();
289
290 tokio::task::spawn_blocking(move || {
291 std::fs::create_dir_all(&dir)?;
292
293 let _lock = FileLock::acquire(&lock_path)?;
294 Self::read_task_at(&task_path, &team, &id)
295 })
296 .await
297 .map_err(|e| Error::JoinError(format!("{e}")))?
298 }
299
300 async fn list_tasks(&self, team: &str, filter: Option<TaskFilter>) -> Result<Vec<TaskFile>> {
301 validate_name(team)?;
302
303 let dir = self.task_dir(team);
304 let lock_path = self.lock_path(team);
305
306 tokio::task::spawn_blocking(move || {
307 std::fs::create_dir_all(&dir)?;
308
309 let _lock = FileLock::acquire(&lock_path)?;
310 let mut tasks = Self::read_all_tasks_in(&dir)?;
311
312 if let Some(filter) = filter {
313 if let Some(status) = filter.status {
314 tasks.retain(|t| t.status == status);
315 }
316 if let Some(ref owner) = filter.owner {
317 tasks.retain(|t| t.owner.as_deref() == Some(owner.as_str()));
318 }
319 if filter.unblocked_only {
320 tasks.retain(|t| t.blocked_by.is_empty());
321 }
322 }
323
324 tasks.sort_by(|a, b| {
326 let a_num = a.id.parse::<u64>().unwrap_or(u64::MAX);
327 let b_num = b.id.parse::<u64>().unwrap_or(u64::MAX);
328 a_num.cmp(&b_num)
329 });
330
331 Ok(tasks)
332 })
333 .await
334 .map_err(|e| Error::JoinError(format!("{e}")))?
335 }
336
337 async fn delete_task(&self, team: &str, id: &str) -> Result<()> {
338 validate_name(team)?;
339 validate_name(id)?;
340
341 let dir = self.task_dir(team);
342 let lock_path = self.lock_path(team);
343 let task_path = self.task_path(team, id);
344 let team = team.to_string();
345 let id = id.to_string();
346
347 tokio::task::spawn_blocking(move || {
348 std::fs::create_dir_all(&dir)?;
349
350 let _lock = FileLock::acquire(&lock_path)?;
351
352 let _task = Self::read_task_at(&task_path, &team, &id)?;
354
355 std::fs::remove_file(&task_path)?;
356
357 Self::cascade_completion(&dir, &id)?;
359
360 Ok(())
361 })
362 .await
363 .map_err(|e| Error::JoinError(format!("{e}")))?
364 }
365}
366
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager rooted at `dir`.
    fn make_manager(dir: &Path) -> FileTaskManager {
        FileTaskManager::new(dir.to_path_buf())
    }

    /// Request that carries only a subject; every optional field is unset.
    fn request(subject: &str) -> CreateTaskRequest {
        CreateTaskRequest {
            subject: subject.into(),
            description: None,
            active_form: None,
            metadata: None,
        }
    }

    /// Update that changes nothing but the status.
    fn status_update(status: TaskStatus) -> TaskUpdate {
        TaskUpdate {
            status: Some(status),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn create_task_basic() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let req = CreateTaskRequest {
            description: Some("A nasty bug".into()),
            ..request("Fix bug")
        };
        let task = mgr.create_task("test-team", req).await.unwrap();

        assert_eq!(task.id, "1");
        assert_eq!(task.subject, "Fix bug");
        assert_eq!(task.status, TaskStatus::Pending);
        assert!(task.owner.is_none());
    }

    #[tokio::test]
    async fn create_multiple_tasks_increments_id() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let first = mgr.create_task("team", request("Task 1")).await.unwrap();
        let second = mgr.create_task("team", request("Task 2")).await.unwrap();

        assert_eq!(first.id, "1");
        assert_eq!(second.id, "2");
    }

    #[tokio::test]
    async fn get_task_not_found() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let err = mgr.get_task("team", "999").await.unwrap_err();
        assert!(err.to_string().contains("not found"));
    }

    #[tokio::test]
    async fn update_task_status_transition() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let task = mgr.create_task("team", request("Work item")).await.unwrap();

        // Pending -> InProgress, claiming the task at the same time.
        let claim = TaskUpdate {
            owner: Some("agent-1".into()),
            ..status_update(TaskStatus::InProgress)
        };
        let updated = mgr.update_task("team", &task.id, claim).await.unwrap();

        assert_eq!(updated.status, TaskStatus::InProgress);
        assert_eq!(updated.owner.as_deref(), Some("agent-1"));

        // InProgress -> Completed.
        let completed = mgr
            .update_task("team", &task.id, status_update(TaskStatus::Completed))
            .await
            .unwrap();

        assert_eq!(completed.status, TaskStatus::Completed);
    }

    #[tokio::test]
    async fn update_task_invalid_transition() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let task = mgr.create_task("team", request("Work")).await.unwrap();

        // Pending -> Completed is expected to be rejected.
        let err = mgr
            .update_task("team", &task.id, status_update(TaskStatus::Completed))
            .await
            .unwrap_err();

        assert!(err.to_string().contains("Invalid task status transition"));
    }

    #[tokio::test]
    async fn delete_task_removes_file() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let task = mgr.create_task("team", request("To delete")).await.unwrap();

        mgr.delete_task("team", &task.id).await.unwrap();

        let err = mgr.get_task("team", &task.id).await.unwrap_err();
        assert!(err.to_string().contains("not found"));
    }

    #[tokio::test]
    async fn list_tasks_with_filter() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let first = mgr.create_task("team", request("Task 1")).await.unwrap();
        mgr.create_task("team", request("Task 2")).await.unwrap();

        // Move the first task to InProgress and give it an owner.
        let claim = TaskUpdate {
            owner: Some("agent-1".into()),
            ..status_update(TaskStatus::InProgress)
        };
        mgr.update_task("team", &first.id, claim).await.unwrap();

        // No filter: both tasks are returned.
        let all = mgr.list_tasks("team", None).await.unwrap();
        assert_eq!(all.len(), 2);

        // Status filter: only the untouched task is still pending.
        let by_status = TaskFilter {
            status: Some(TaskStatus::Pending),
            ..Default::default()
        };
        let pending = mgr.list_tasks("team", Some(by_status)).await.unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].subject, "Task 2");

        // Owner filter: only the claimed task matches.
        let by_owner = TaskFilter {
            owner: Some("agent-1".into()),
            ..Default::default()
        };
        let owned = mgr.list_tasks("team", Some(by_owner)).await.unwrap();
        assert_eq!(owned.len(), 1);
        assert_eq!(owned[0].subject, "Task 1");
    }

    #[tokio::test]
    async fn completion_cascades_to_blocked_by() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let prerequisite = mgr
            .create_task("team", request("Prerequisite"))
            .await
            .unwrap();
        let dependent = mgr.create_task("team", request("Dependent")).await.unwrap();

        // Make the dependent task wait for the prerequisite.
        let block = TaskUpdate {
            add_blocked_by: Some(vec![prerequisite.id.clone()]),
            ..Default::default()
        };
        mgr.update_task("team", &dependent.id, block).await.unwrap();

        let blocked = mgr.get_task("team", &dependent.id).await.unwrap();
        assert_eq!(blocked.blocked_by, vec![prerequisite.id.clone()]);

        // Drive the prerequisite through InProgress to Completed.
        for status in [TaskStatus::InProgress, TaskStatus::Completed] {
            mgr.update_task("team", &prerequisite.id, status_update(status))
                .await
                .unwrap();
        }

        let t2_unblocked = mgr.get_task("team", &dependent.id).await.unwrap();
        assert!(
            t2_unblocked.blocked_by.is_empty(),
            "t2 should be unblocked after t1 completed, but blocked_by = {:?}",
            t2_unblocked.blocked_by
        );
    }

    #[tokio::test]
    async fn update_task_metadata_merge() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        let req = CreateTaskRequest {
            metadata: Some(serde_json::json!({"key1": "val1", "key2": "val2"})),
            ..request("Meta task")
        };
        let task = mgr.create_task("team", req).await.unwrap();

        // Overwrite key1, delete key2 via null, add key3.
        let patch = TaskUpdate {
            metadata: Some(serde_json::json!({
                "key1": "updated",
                "key2": null,
                "key3": "new"
            })),
            ..Default::default()
        };
        let updated = mgr.update_task("team", &task.id, patch).await.unwrap();

        let meta = updated.metadata.unwrap();
        assert_eq!(meta["key1"], "updated");
        assert!(meta.get("key2").is_none());
        assert_eq!(meta["key3"], "new");
    }

    #[tokio::test]
    async fn path_traversal_rejected() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = make_manager(tmp.path());

        // Team names that would escape the base directory, or are empty.
        let err = mgr
            .create_task("../escape", request("Bad"))
            .await
            .unwrap_err();
        assert!(matches!(err, Error::InvalidName { .. }));

        let err = mgr.create_task("", request("Bad")).await.unwrap_err();
        assert!(matches!(err, Error::InvalidName { .. }));

        // A valid task must exist so the id checks below are not trivially
        // satisfied by a missing team.
        mgr.create_task("team", request("Good")).await.unwrap();

        // Task ids are validated as well.
        let err = mgr.get_task("team", "../1").await.unwrap_err();
        assert!(matches!(err, Error::InvalidName { .. }));

        let err = mgr.delete_task("team", "..").await.unwrap_err();
        assert!(matches!(err, Error::InvalidName { .. }));
    }
}