Skip to main content

agent_sdk/task/
store.rs

1use std::path::PathBuf;
2
3use chrono::Utc;
4use tracing::{debug, info, warn};
5
6use crate::error::{AgentId, SdkError, SdkResult, TaskId};
7use crate::types::task::{Task, TaskResult, TaskStatus};
8
9use super::file_lock::FileLock;
10
11/// File-backed task store. Tasks live in status-based directories beneath
12/// the configured base directory:
13/// `{base}/{pending,in_progress,completed,failed}/{task_id}.json`
14pub struct TaskStore {
15    base_dir: PathBuf,
16}
17
18impl TaskStore {
19    pub fn new(base_dir: PathBuf) -> Self {
20        Self { base_dir }
21    }
22
23    pub fn init(&self) -> SdkResult<()> {
24        for subdir in &["pending", "in_progress", "completed", "failed"] {
25            std::fs::create_dir_all(self.tasks_dir().join(subdir))?;
26        }
27        Ok(())
28    }
29
30    fn tasks_dir(&self) -> PathBuf {
31        self.base_dir.clone()
32    }
33
34    fn task_path(&self, status_dir: &str, task_id: TaskId) -> PathBuf {
35        self.tasks_dir()
36            .join(status_dir)
37            .join(format!("{}.json", task_id))
38    }
39
40    fn lock_path(&self, status_dir: &str, task_id: TaskId) -> PathBuf {
41        self.tasks_dir()
42            .join(status_dir)
43            .join(format!("{}.lock", task_id))
44    }
45
46    pub fn create_task(&self, task: &Task) -> SdkResult<()> {
47        let path = self.task_path("pending", task.id);
48        let json = serde_json::to_string_pretty(task)?;
49        std::fs::write(&path, json)?;
50        debug!(task_id = %task.id, "Created task: {}", task.title);
51        Ok(())
52    }
53
54    pub fn try_claim_next(
55        &self,
56        agent_id: AgentId,
57        agent_name: &str,
58        completed_task_ids: &[TaskId],
59    ) -> SdkResult<Option<Task>> {
60        let pending_dir = self.tasks_dir().join("pending");
61
62        let mut candidates: Vec<(u32, chrono::DateTime<Utc>, PathBuf)> = Vec::new();
63        for entry in std::fs::read_dir(&pending_dir)? {
64            let entry = match entry {
65                Ok(e) => e,
66                Err(_) => continue,
67            };
68            let path = entry.path();
69            if path.extension().map(|ext| ext == "json").unwrap_or(false) {
70                let content = match std::fs::read_to_string(&path) {
71                    Ok(c) => c,
72                    Err(_) => continue,
73                };
74                let task: Task = match serde_json::from_str(&content) {
75                    Ok(t) => t,
76                    Err(_) => continue,
77                };
78                candidates.push((task.priority, task.created_at, path));
79            }
80        }
81
82        // Lower priority value means higher priority.
83        candidates.sort_by_key(|(priority, created_at, _)| (*priority, *created_at));
84
85        for (_, _, path) in candidates {
86            let task_id_str = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
87            let task_id: TaskId = match task_id_str.parse() {
88                Ok(id) => id,
89                Err(_) => continue,
90            };
91
92            let lock_path = self.lock_path("pending", task_id);
93            let lock = match FileLock::try_acquire(&lock_path)? {
94                Some(lock) => lock,
95                None => continue,
96            };
97
98            let content = std::fs::read_to_string(&path)?;
99            let mut task: Task = serde_json::from_str(&content)?;
100
101            let deps_resolved = task
102                .dependencies
103                .iter()
104                .all(|dep_id| completed_task_ids.contains(dep_id));
105
106            if !deps_resolved {
107                drop(lock);
108                continue;
109            }
110
111            if let Some(assignee) = assigned_teammate(&task.context) {
112                if assignee != agent_name {
113                    drop(lock);
114                    continue;
115                }
116            }
117
118            task.status = TaskStatus::Claimed {
119                agent_id,
120                at: Utc::now(),
121            };
122            task.updated_at = Utc::now();
123
124            let new_path = self.task_path("in_progress", task_id);
125            let json = serde_json::to_string_pretty(&task)?;
126            std::fs::write(&path, &json)?;
127            std::fs::rename(&path, &new_path)?;
128
129            let new_lock_path = self.lock_path("in_progress", task_id);
130            let _ = std::fs::rename(&lock_path, &new_lock_path);
131
132            info!(task_id = %task_id, agent_id = %agent_id, "Task claimed: {}", task.title);
133            drop(lock);
134            return Ok(Some(task));
135        }
136
137        Ok(None)
138    }
139
140    pub fn mark_in_progress(&self, task_id: TaskId, agent_id: AgentId) -> SdkResult<()> {
141        let path = self.task_path("in_progress", task_id);
142        let content = std::fs::read_to_string(&path)?;
143        let mut task: Task = serde_json::from_str(&content)?;
144
145        task.status = TaskStatus::InProgress {
146            agent_id,
147            started_at: Utc::now(),
148        };
149        task.updated_at = Utc::now();
150
151        let json = serde_json::to_string_pretty(&task)?;
152        std::fs::write(&path, json)?;
153        Ok(())
154    }
155
156    pub fn complete_task(
157        &self,
158        task_id: TaskId,
159        agent_id: AgentId,
160        result: TaskResult,
161    ) -> SdkResult<()> {
162        let src = self.task_path("in_progress", task_id);
163        let content = std::fs::read_to_string(&src)?;
164        let mut task: Task = serde_json::from_str(&content)?;
165
166        task.status = TaskStatus::Completed {
167            agent_id,
168            completed_at: Utc::now(),
169        };
170        task.result = Some(result);
171        task.updated_at = Utc::now();
172
173        let dst = self.task_path("completed", task_id);
174        let json = serde_json::to_string_pretty(&task)?;
175        std::fs::write(&src, &json)?;
176        std::fs::rename(&src, &dst)?;
177
178        let _ = std::fs::remove_file(self.lock_path("in_progress", task_id));
179
180        info!(task_id = %task_id, "Task completed");
181        Ok(())
182    }
183
184    pub fn fail_task(
185        &self,
186        task_id: TaskId,
187        agent_id: AgentId,
188        error: String,
189    ) -> SdkResult<()> {
190        let src = self.task_path("in_progress", task_id);
191        let content = std::fs::read_to_string(&src)?;
192        let mut task: Task = serde_json::from_str(&content)?;
193
194        task.retry_count += 1;
195        task.updated_at = Utc::now();
196
197        if task.retry_count < task.max_retries {
198            task.status = TaskStatus::Pending;
199            let dst = self.task_path("pending", task_id);
200            let json = serde_json::to_string_pretty(&task)?;
201            std::fs::write(&src, &json)?;
202            std::fs::rename(&src, &dst)?;
203            warn!(task_id = %task_id, retry = task.retry_count, "Task failed, retrying");
204        } else {
205            task.status = TaskStatus::Failed {
206                agent_id,
207                error,
208                failed_at: Utc::now(),
209            };
210            let dst = self.task_path("failed", task_id);
211            let json = serde_json::to_string_pretty(&task)?;
212            std::fs::write(&src, &json)?;
213            std::fs::rename(&src, &dst)?;
214            warn!(task_id = %task_id, "Task permanently failed");
215        }
216
217        let _ = std::fs::remove_file(self.lock_path("in_progress", task_id));
218        Ok(())
219    }
220
221    pub fn read_task(&self, task_id: TaskId) -> SdkResult<Task> {
222        for dir in &["pending", "in_progress", "completed", "failed"] {
223            let path = self.task_path(dir, task_id);
224            if path.exists() {
225                let content = std::fs::read_to_string(&path)?;
226                return Ok(serde_json::from_str(&content)?);
227            }
228        }
229        Err(SdkError::TaskNotFound { task_id })
230    }
231
232    pub fn list_all_tasks(&self) -> SdkResult<Vec<Task>> {
233        let mut tasks = Vec::new();
234        for dir in &["pending", "in_progress", "completed", "failed"] {
235            tasks.extend(self.list_tasks_in_dir(dir)?);
236        }
237        Ok(tasks)
238    }
239
240    pub fn list_tasks_in_dir(&self, status_dir: &str) -> SdkResult<Vec<Task>> {
241        let dir = self.tasks_dir().join(status_dir);
242        if !dir.exists() {
243            return Ok(Vec::new());
244        }
245
246        let mut tasks = Vec::new();
247        for entry in std::fs::read_dir(&dir)? {
248            let entry = entry?;
249            let path = entry.path();
250            if path.extension().map(|e| e == "json").unwrap_or(false) {
251                let content = std::fs::read_to_string(&path)?;
252                match serde_json::from_str::<Task>(&content) {
253                    Ok(task) => tasks.push(task),
254                    Err(e) => warn!("Failed to parse task file {:?}: {}", path, e),
255                }
256            }
257        }
258        Ok(tasks)
259    }
260
261    pub fn completed_task_ids(&self) -> SdkResult<Vec<TaskId>> {
262        Ok(self
263            .list_tasks_in_dir("completed")?
264            .into_iter()
265            .map(|t| t.id)
266            .collect())
267    }
268
269    pub fn recover_orphaned_tasks(&self) -> SdkResult<usize> {
270        let in_progress = self.list_tasks_in_dir("in_progress")?;
271        let mut recovered = 0;
272
273        for mut task in in_progress {
274            task.status = TaskStatus::Pending;
275            task.retry_count += 1;
276            task.updated_at = Utc::now();
277
278            let src = self.task_path("in_progress", task.id);
279            let dst = self.task_path("pending", task.id);
280            let json = serde_json::to_string_pretty(&task)?;
281            std::fs::write(&src, &json)?;
282            std::fs::rename(&src, &dst)?;
283
284            let _ = std::fs::remove_file(self.lock_path("in_progress", task.id));
285
286            recovered += 1;
287            info!(task_id = %task.id, "Recovered orphaned task");
288        }
289
290        Ok(recovered)
291    }
292
293    pub fn summary(&self) -> SdkResult<TaskSummary> {
294        Ok(TaskSummary {
295            pending: self.list_tasks_in_dir("pending")?.len(),
296            in_progress: self.list_tasks_in_dir("in_progress")?.len(),
297            completed: self.list_tasks_in_dir("completed")?.len(),
298            failed: self.list_tasks_in_dir("failed")?.len(),
299        })
300    }
301}
302
303fn assigned_teammate(context: &serde_json::Value) -> Option<&str> {
304    context.get("assigned_teammate").and_then(|v| v.as_str())
305}
306
307#[derive(Debug, Clone)]
308pub struct TaskSummary {
309    pub pending: usize,
310    pub in_progress: usize,
311    pub completed: usize,
312    pub failed: usize,
313}
314
315impl TaskSummary {
316    pub fn total(&self) -> usize {
317        self.pending + self.in_progress + self.completed + self.failed
318    }
319
320    pub fn is_done(&self) -> bool {
321        self.pending == 0 && self.in_progress == 0
322    }
323}