Skip to main content

agent_teams/task/
mod.rs

1//! Task management: trait, file-based implementation, and dependency graph.
2
3pub mod graph;
4
5use std::path::{Path, PathBuf};
6
7use async_trait::async_trait;
8
9use crate::error::{Error, Result};
10use crate::models::task::{
11    CreateTaskRequest, TaskFile, TaskFilter, TaskStatus, TaskUpdate,
12};
13use crate::util::atomic_write::atomic_write_json;
14use crate::util::file_lock::FileLock;
15use crate::util::id_gen::next_task_id;
16use crate::util::validate_name;
17
18pub use graph::DependencyGraph;
19
20/// Trait for task CRUD operations.
21#[async_trait]
22pub trait TaskManager: Send + Sync {
23    /// Create a new task in the given team.
24    async fn create_task(&self, team: &str, req: CreateTaskRequest) -> Result<TaskFile>;
25
26    /// Update an existing task.
27    async fn update_task(&self, team: &str, id: &str, update: TaskUpdate) -> Result<TaskFile>;
28
29    /// Get a task by ID.
30    async fn get_task(&self, team: &str, id: &str) -> Result<TaskFile>;
31
32    /// List tasks, optionally filtered.
33    async fn list_tasks(&self, team: &str, filter: Option<TaskFilter>) -> Result<Vec<TaskFile>>;
34
35    /// Delete a task by ID.
36    async fn delete_task(&self, team: &str, id: &str) -> Result<()>;
37}
38
39/// File-based task manager storing tasks as `{base_dir}/{team}/{id}.json`.
40pub struct FileTaskManager {
41    base_dir: PathBuf,
42}
43
44impl FileTaskManager {
45    /// Create a new `FileTaskManager` with a custom base directory.
46    pub fn new(base_dir: PathBuf) -> Self {
47        Self { base_dir }
48    }
49
50    /// Create a `FileTaskManager` using the default `~/.claude/tasks` directory.
51    pub fn default_dir() -> Result<Self> {
52        let home = dirs::home_dir().ok_or_else(|| {
53            Error::Other("Could not determine home directory".into())
54        })?;
55        Ok(Self::new(home.join(".claude").join("tasks")))
56    }
57
58    /// Directory for a specific team's tasks.
59    fn task_dir(&self, team: &str) -> PathBuf {
60        self.base_dir.join(team)
61    }
62
63    /// File path for a specific task.
64    fn task_path(&self, team: &str, id: &str) -> PathBuf {
65        self.task_dir(team).join(format!("{id}.json"))
66    }
67
68    /// Lock file path for a team's task directory.
69    fn lock_path(&self, team: &str) -> PathBuf {
70        self.task_dir(team).join(".lock")
71    }
72
73    /// Read a task from disk (caller must hold the lock).
74    fn read_task_at(path: &Path, team: &str, id: &str) -> Result<TaskFile> {
75        let data = std::fs::read_to_string(path).map_err(|e| {
76            if e.kind() == std::io::ErrorKind::NotFound {
77                Error::TaskNotFound {
78                    team: team.into(),
79                    id: id.into(),
80                }
81            } else {
82                Error::Io(e)
83            }
84        })?;
85        let task: TaskFile = serde_json::from_str(&data)?;
86        Ok(task)
87    }
88
89    /// Read all tasks in a team directory (caller must hold the lock).
90    fn read_all_tasks_in(dir: &Path) -> Result<Vec<TaskFile>> {
91        if !dir.exists() {
92            return Ok(vec![]);
93        }
94
95        let mut tasks = Vec::new();
96        for entry in std::fs::read_dir(dir)? {
97            let entry = entry?;
98            let name = entry.file_name();
99            let name = name.to_string_lossy();
100            // Only parse numeric .json files
101            if let Some(stem) = name.strip_suffix(".json")
102                && stem.parse::<u64>().is_ok()
103            {
104                let data = std::fs::read_to_string(entry.path())?;
105                let task: TaskFile = serde_json::from_str(&data)?;
106                tasks.push(task);
107            }
108        }
109
110        Ok(tasks)
111    }
112
113    /// Apply a `TaskUpdate` to an existing `TaskFile`, validating transitions.
114    fn apply_update(task: &mut TaskFile, update: &TaskUpdate) -> Result<()> {
115        // Status transition validation
116        if let Some(new_status) = update.status {
117            if !task.status.can_transition_to(new_status) {
118                return Err(Error::InvalidStatusTransition {
119                    from: task.status.to_string(),
120                    to: new_status.to_string(),
121                });
122            }
123            task.status = new_status;
124        }
125
126        if let Some(ref subject) = update.subject {
127            task.subject.clone_from(subject);
128        }
129        if let Some(ref desc) = update.description {
130            task.description = Some(desc.clone());
131        }
132        if let Some(ref af) = update.active_form {
133            task.active_form = Some(af.clone());
134        }
135        if let Some(ref owner) = update.owner {
136            task.owner = Some(owner.clone());
137        }
138
139        // Merge dependency additions
140        if let Some(ref add_blocks) = update.add_blocks {
141            for id in add_blocks {
142                if !task.blocks.contains(id) {
143                    task.blocks.push(id.clone());
144                }
145            }
146        }
147        if let Some(ref add_blocked_by) = update.add_blocked_by {
148            for id in add_blocked_by {
149                if !task.blocked_by.contains(id) {
150                    task.blocked_by.push(id.clone());
151                }
152            }
153        }
154
155        // Merge metadata
156        if let Some(ref new_meta) = update.metadata
157            && let Some(obj) = new_meta.as_object()
158        {
159            let existing = task
160                .metadata
161                .get_or_insert_with(|| serde_json::json!({}));
162            if let Some(existing_obj) = existing.as_object_mut() {
163                for (k, v) in obj {
164                    if v.is_null() {
165                        existing_obj.remove(k);
166                    } else {
167                        existing_obj.insert(k.clone(), v.clone());
168                    }
169                }
170            }
171        }
172
173        Ok(())
174    }
175
176    /// When a task is completed, remove it from other tasks' `blocked_by` lists.
177    fn cascade_completion(dir: &Path, completed_id: &str) -> Result<()> {
178        if !dir.exists() {
179            return Ok(());
180        }
181
182        for entry in std::fs::read_dir(dir)? {
183            let entry = entry?;
184            let path = entry.path();
185            let name = entry.file_name();
186            let name = name.to_string_lossy();
187
188            if let Some(stem) = name.strip_suffix(".json")
189                && stem.parse::<u64>().is_ok()
190                && stem != completed_id
191            {
192                let data = std::fs::read_to_string(&path)?;
193                let mut task: TaskFile = serde_json::from_str(&data)?;
194
195                if task.blocked_by.contains(&completed_id.to_string()) {
196                    task.blocked_by.retain(|id| id != completed_id);
197                    atomic_write_json(&path, &task)?;
198                }
199            }
200        }
201
202        Ok(())
203    }
204}
205
206#[async_trait]
207impl TaskManager for FileTaskManager {
208    async fn create_task(&self, team: &str, req: CreateTaskRequest) -> Result<TaskFile> {
209        validate_name(team)?;
210
211        let dir = self.task_dir(team);
212        let lock_path = self.lock_path(team);
213
214        tokio::task::spawn_blocking(move || {
215            std::fs::create_dir_all(&dir)?;
216
217            let _lock = FileLock::acquire(&lock_path)?;
218
219            let id = next_task_id(&dir)?;
220
221            let task = TaskFile {
222                id: id.clone(),
223                subject: req.subject,
224                description: req.description,
225                active_form: req.active_form,
226                status: TaskStatus::Pending,
227                owner: None,
228                blocks: vec![],
229                blocked_by: vec![],
230                metadata: req.metadata,
231            };
232
233            let path = dir.join(format!("{id}.json"));
234            atomic_write_json(&path, &task)?;
235
236            Ok(task)
237        })
238        .await
239        .map_err(|e| Error::JoinError(format!("{e}")))?
240    }
241
242    async fn update_task(&self, team: &str, id: &str, update: TaskUpdate) -> Result<TaskFile> {
243        validate_name(team)?;
244        validate_name(id)?;
245
246        let dir = self.task_dir(team);
247        let lock_path = self.lock_path(team);
248        let task_path = self.task_path(team, id);
249        let team = team.to_string();
250        let id = id.to_string();
251
252        tokio::task::spawn_blocking(move || {
253            std::fs::create_dir_all(&dir)?;
254
255            // Phase 1: Lock
256            let _lock = FileLock::acquire(&lock_path)?;
257
258            // Phase 2: Read
259            let mut task = Self::read_task_at(&task_path, &team, &id)?;
260
261            // Phase 3: Validate + Mutate
262            let was_not_completed = task.status != TaskStatus::Completed;
263            Self::apply_update(&mut task, &update)?;
264            let is_now_completed = task.status == TaskStatus::Completed;
265
266            // Phase 4: Atomic write
267            atomic_write_json(&task_path, &task)?;
268
269            // Cascade: if task just became completed, unblock dependents
270            if was_not_completed && is_now_completed {
271                Self::cascade_completion(&dir, &id)?;
272            }
273
274            Ok(task)
275        })
276        .await
277        .map_err(|e| Error::JoinError(format!("{e}")))?
278    }
279
280    async fn get_task(&self, team: &str, id: &str) -> Result<TaskFile> {
281        validate_name(team)?;
282        validate_name(id)?;
283
284        let dir = self.task_dir(team);
285        let lock_path = self.lock_path(team);
286        let task_path = self.task_path(team, id);
287        let team = team.to_string();
288        let id = id.to_string();
289
290        tokio::task::spawn_blocking(move || {
291            std::fs::create_dir_all(&dir)?;
292
293            let _lock = FileLock::acquire(&lock_path)?;
294            Self::read_task_at(&task_path, &team, &id)
295        })
296        .await
297        .map_err(|e| Error::JoinError(format!("{e}")))?
298    }
299
300    async fn list_tasks(&self, team: &str, filter: Option<TaskFilter>) -> Result<Vec<TaskFile>> {
301        validate_name(team)?;
302
303        let dir = self.task_dir(team);
304        let lock_path = self.lock_path(team);
305
306        tokio::task::spawn_blocking(move || {
307            std::fs::create_dir_all(&dir)?;
308
309            let _lock = FileLock::acquire(&lock_path)?;
310            let mut tasks = Self::read_all_tasks_in(&dir)?;
311
312            if let Some(filter) = filter {
313                if let Some(status) = filter.status {
314                    tasks.retain(|t| t.status == status);
315                }
316                if let Some(ref owner) = filter.owner {
317                    tasks.retain(|t| t.owner.as_deref() == Some(owner.as_str()));
318                }
319                if filter.unblocked_only {
320                    tasks.retain(|t| t.blocked_by.is_empty());
321                }
322            }
323
324            // Sort by ID numerically for consistent ordering
325            tasks.sort_by(|a, b| {
326                let a_num = a.id.parse::<u64>().unwrap_or(u64::MAX);
327                let b_num = b.id.parse::<u64>().unwrap_or(u64::MAX);
328                a_num.cmp(&b_num)
329            });
330
331            Ok(tasks)
332        })
333        .await
334        .map_err(|e| Error::JoinError(format!("{e}")))?
335    }
336
337    async fn delete_task(&self, team: &str, id: &str) -> Result<()> {
338        validate_name(team)?;
339        validate_name(id)?;
340
341        let dir = self.task_dir(team);
342        let lock_path = self.lock_path(team);
343        let task_path = self.task_path(team, id);
344        let team = team.to_string();
345        let id = id.to_string();
346
347        tokio::task::spawn_blocking(move || {
348            std::fs::create_dir_all(&dir)?;
349
350            let _lock = FileLock::acquire(&lock_path)?;
351
352            // Read to verify it exists
353            let _task = Self::read_task_at(&task_path, &team, &id)?;
354
355            std::fs::remove_file(&task_path)?;
356
357            // Also remove from other tasks' blocked_by
358            Self::cascade_completion(&dir, &id)?;
359
360            Ok(())
361        })
362        .await
363        .map_err(|e| Error::JoinError(format!("{e}")))?
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370
371    fn make_manager(dir: &Path) -> FileTaskManager {
372        FileTaskManager::new(dir.to_path_buf())
373    }
374
375    #[tokio::test]
376    async fn create_task_basic() {
377        let dir = tempfile::tempdir().unwrap();
378        let mgr = make_manager(dir.path());
379
380        let task = mgr
381            .create_task(
382                "test-team",
383                CreateTaskRequest {
384                    subject: "Fix bug".into(),
385                    description: Some("A nasty bug".into()),
386                    active_form: None,
387                    metadata: None,
388                },
389            )
390            .await
391            .unwrap();
392
393        assert_eq!(task.id, "1");
394        assert_eq!(task.subject, "Fix bug");
395        assert_eq!(task.status, TaskStatus::Pending);
396        assert!(task.owner.is_none());
397    }
398
399    #[tokio::test]
400    async fn create_multiple_tasks_increments_id() {
401        let dir = tempfile::tempdir().unwrap();
402        let mgr = make_manager(dir.path());
403
404        let t1 = mgr
405            .create_task(
406                "team",
407                CreateTaskRequest {
408                    subject: "Task 1".into(),
409                    description: None,
410                    active_form: None,
411                    metadata: None,
412                },
413            )
414            .await
415            .unwrap();
416
417        let t2 = mgr
418            .create_task(
419                "team",
420                CreateTaskRequest {
421                    subject: "Task 2".into(),
422                    description: None,
423                    active_form: None,
424                    metadata: None,
425                },
426            )
427            .await
428            .unwrap();
429
430        assert_eq!(t1.id, "1");
431        assert_eq!(t2.id, "2");
432    }
433
434    #[tokio::test]
435    async fn get_task_not_found() {
436        let dir = tempfile::tempdir().unwrap();
437        let mgr = make_manager(dir.path());
438
439        let err = mgr.get_task("team", "999").await.unwrap_err();
440        assert!(err.to_string().contains("not found"));
441    }
442
443    #[tokio::test]
444    async fn update_task_status_transition() {
445        let dir = tempfile::tempdir().unwrap();
446        let mgr = make_manager(dir.path());
447
448        let task = mgr
449            .create_task(
450                "team",
451                CreateTaskRequest {
452                    subject: "Work item".into(),
453                    description: None,
454                    active_form: None,
455                    metadata: None,
456                },
457            )
458            .await
459            .unwrap();
460
461        // pending -> in_progress
462        let updated = mgr
463            .update_task(
464                "team",
465                &task.id,
466                TaskUpdate {
467                    status: Some(TaskStatus::InProgress),
468                    owner: Some("agent-1".into()),
469                    ..Default::default()
470                },
471            )
472            .await
473            .unwrap();
474
475        assert_eq!(updated.status, TaskStatus::InProgress);
476        assert_eq!(updated.owner.as_deref(), Some("agent-1"));
477
478        // in_progress -> completed
479        let completed = mgr
480            .update_task(
481                "team",
482                &task.id,
483                TaskUpdate {
484                    status: Some(TaskStatus::Completed),
485                    ..Default::default()
486                },
487            )
488            .await
489            .unwrap();
490
491        assert_eq!(completed.status, TaskStatus::Completed);
492    }
493
494    #[tokio::test]
495    async fn update_task_invalid_transition() {
496        let dir = tempfile::tempdir().unwrap();
497        let mgr = make_manager(dir.path());
498
499        let task = mgr
500            .create_task(
501                "team",
502                CreateTaskRequest {
503                    subject: "Work".into(),
504                    description: None,
505                    active_form: None,
506                    metadata: None,
507                },
508            )
509            .await
510            .unwrap();
511
512        // pending -> completed is not allowed (must go through in_progress)
513        let err = mgr
514            .update_task(
515                "team",
516                &task.id,
517                TaskUpdate {
518                    status: Some(TaskStatus::Completed),
519                    ..Default::default()
520                },
521            )
522            .await
523            .unwrap_err();
524
525        assert!(err.to_string().contains("Invalid task status transition"));
526    }
527
528    #[tokio::test]
529    async fn delete_task_removes_file() {
530        let dir = tempfile::tempdir().unwrap();
531        let mgr = make_manager(dir.path());
532
533        let task = mgr
534            .create_task(
535                "team",
536                CreateTaskRequest {
537                    subject: "To delete".into(),
538                    description: None,
539                    active_form: None,
540                    metadata: None,
541                },
542            )
543            .await
544            .unwrap();
545
546        mgr.delete_task("team", &task.id).await.unwrap();
547
548        let err = mgr.get_task("team", &task.id).await.unwrap_err();
549        assert!(err.to_string().contains("not found"));
550    }
551
552    #[tokio::test]
553    async fn list_tasks_with_filter() {
554        let dir = tempfile::tempdir().unwrap();
555        let mgr = make_manager(dir.path());
556
557        let t1 = mgr
558            .create_task(
559                "team",
560                CreateTaskRequest {
561                    subject: "Task 1".into(),
562                    description: None,
563                    active_form: None,
564                    metadata: None,
565                },
566            )
567            .await
568            .unwrap();
569
570        mgr.create_task(
571            "team",
572            CreateTaskRequest {
573                subject: "Task 2".into(),
574                description: None,
575                active_form: None,
576                metadata: None,
577            },
578        )
579        .await
580        .unwrap();
581
582        // Move t1 to in_progress
583        mgr.update_task(
584            "team",
585            &t1.id,
586            TaskUpdate {
587                status: Some(TaskStatus::InProgress),
588                owner: Some("agent-1".into()),
589                ..Default::default()
590            },
591        )
592        .await
593        .unwrap();
594
595        // List all
596        let all = mgr.list_tasks("team", None).await.unwrap();
597        assert_eq!(all.len(), 2);
598
599        // Filter by status
600        let pending = mgr
601            .list_tasks(
602                "team",
603                Some(TaskFilter {
604                    status: Some(TaskStatus::Pending),
605                    ..Default::default()
606                }),
607            )
608            .await
609            .unwrap();
610        assert_eq!(pending.len(), 1);
611        assert_eq!(pending[0].subject, "Task 2");
612
613        // Filter by owner
614        let owned = mgr
615            .list_tasks(
616                "team",
617                Some(TaskFilter {
618                    owner: Some("agent-1".into()),
619                    ..Default::default()
620                }),
621            )
622            .await
623            .unwrap();
624        assert_eq!(owned.len(), 1);
625        assert_eq!(owned[0].subject, "Task 1");
626    }
627
628    #[tokio::test]
629    async fn completion_cascades_to_blocked_by() {
630        let dir = tempfile::tempdir().unwrap();
631        let mgr = make_manager(dir.path());
632
633        let t1 = mgr
634            .create_task(
635                "team",
636                CreateTaskRequest {
637                    subject: "Prerequisite".into(),
638                    description: None,
639                    active_form: None,
640                    metadata: None,
641                },
642            )
643            .await
644            .unwrap();
645
646        let t2 = mgr
647            .create_task(
648                "team",
649                CreateTaskRequest {
650                    subject: "Dependent".into(),
651                    description: None,
652                    active_form: None,
653                    metadata: None,
654                },
655            )
656            .await
657            .unwrap();
658
659        // Make t2 blocked by t1
660        mgr.update_task(
661            "team",
662            &t2.id,
663            TaskUpdate {
664                add_blocked_by: Some(vec![t1.id.clone()]),
665                ..Default::default()
666            },
667        )
668        .await
669        .unwrap();
670
671        // Verify blocked
672        let t2_read = mgr.get_task("team", &t2.id).await.unwrap();
673        assert_eq!(t2_read.blocked_by, vec![t1.id.clone()]);
674
675        // Complete t1: pending -> in_progress -> completed
676        mgr.update_task(
677            "team",
678            &t1.id,
679            TaskUpdate {
680                status: Some(TaskStatus::InProgress),
681                ..Default::default()
682            },
683        )
684        .await
685        .unwrap();
686
687        mgr.update_task(
688            "team",
689            &t1.id,
690            TaskUpdate {
691                status: Some(TaskStatus::Completed),
692                ..Default::default()
693            },
694        )
695        .await
696        .unwrap();
697
698        // t2 should no longer be blocked
699        let t2_unblocked = mgr.get_task("team", &t2.id).await.unwrap();
700        assert!(
701            t2_unblocked.blocked_by.is_empty(),
702            "t2 should be unblocked after t1 completed, but blocked_by = {:?}",
703            t2_unblocked.blocked_by
704        );
705    }
706
707    #[tokio::test]
708    async fn update_task_metadata_merge() {
709        let dir = tempfile::tempdir().unwrap();
710        let mgr = make_manager(dir.path());
711
712        let task = mgr
713            .create_task(
714                "team",
715                CreateTaskRequest {
716                    subject: "Meta task".into(),
717                    description: None,
718                    active_form: None,
719                    metadata: Some(serde_json::json!({"key1": "val1", "key2": "val2"})),
720                },
721            )
722            .await
723            .unwrap();
724
725        // Merge: update key1, delete key2, add key3
726        let updated = mgr
727            .update_task(
728                "team",
729                &task.id,
730                TaskUpdate {
731                    metadata: Some(serde_json::json!({
732                        "key1": "updated",
733                        "key2": null,
734                        "key3": "new"
735                    })),
736                    ..Default::default()
737                },
738            )
739            .await
740            .unwrap();
741
742        let meta = updated.metadata.unwrap();
743        assert_eq!(meta["key1"], "updated");
744        assert!(meta.get("key2").is_none());
745        assert_eq!(meta["key3"], "new");
746    }
747
748    #[tokio::test]
749    async fn path_traversal_rejected() {
750        let dir = tempfile::tempdir().unwrap();
751        let mgr = make_manager(dir.path());
752
753        // Team name with path separator
754        let err = mgr
755            .create_task(
756                "../escape",
757                CreateTaskRequest {
758                    subject: "Bad".into(),
759                    description: None,
760                    active_form: None,
761                    metadata: None,
762                },
763            )
764            .await
765            .unwrap_err();
766        assert!(matches!(err, Error::InvalidName { .. }));
767
768        // Empty team name
769        let err = mgr
770            .create_task(
771                "",
772                CreateTaskRequest {
773                    subject: "Bad".into(),
774                    description: None,
775                    active_form: None,
776                    metadata: None,
777                },
778            )
779            .await
780            .unwrap_err();
781        assert!(matches!(err, Error::InvalidName { .. }));
782
783        // Task ID with path separator in get_task
784        // First create a valid task so we have a valid team
785        mgr.create_task(
786            "team",
787            CreateTaskRequest {
788                subject: "Good".into(),
789                description: None,
790                active_form: None,
791                metadata: None,
792            },
793        )
794        .await
795        .unwrap();
796
797        let err = mgr.get_task("team", "../1").await.unwrap_err();
798        assert!(matches!(err, Error::InvalidName { .. }));
799
800        let err = mgr.delete_task("team", "..").await.unwrap_err();
801        assert!(matches!(err, Error::InvalidName { .. }));
802    }
803}