1use std::path::PathBuf;
2
3use chrono::Utc;
4use tracing::{debug, info, warn};
5
6use crate::error::{AgentId, SdkError, SdkResult, TaskId};
7use crate::types::task::{Task, TaskResult, TaskStatus};
8
9use super::file_lock::FileLock;
10
/// File-system backed store for tasks.
///
/// Each task is kept as a `<task_id>.json` file inside one of four status
/// sub-directories of `base_dir`: `pending`, `in_progress`, `completed` or
/// `failed`. Moving a task between states is done by moving its file.
#[derive(Debug, Clone)]
pub struct TaskStore {
    /// Directory that contains the status sub-directories.
    base_dir: PathBuf,
}
17
18impl TaskStore {
19 pub fn new(base_dir: PathBuf) -> Self {
20 Self { base_dir }
21 }
22
23 pub fn init(&self) -> SdkResult<()> {
24 for subdir in &["pending", "in_progress", "completed", "failed"] {
25 std::fs::create_dir_all(self.tasks_dir().join(subdir))?;
26 }
27 Ok(())
28 }
29
30 fn tasks_dir(&self) -> PathBuf {
31 self.base_dir.clone()
32 }
33
34 fn task_path(&self, status_dir: &str, task_id: TaskId) -> PathBuf {
35 self.tasks_dir()
36 .join(status_dir)
37 .join(format!("{}.json", task_id))
38 }
39
40 fn lock_path(&self, status_dir: &str, task_id: TaskId) -> PathBuf {
41 self.tasks_dir()
42 .join(status_dir)
43 .join(format!("{}.lock", task_id))
44 }
45
46 pub fn create_task(&self, task: &Task) -> SdkResult<()> {
47 let path = self.task_path("pending", task.id);
48 let json = serde_json::to_string_pretty(task)?;
49 std::fs::write(&path, json)?;
50 debug!(task_id = %task.id, "Created task: {}", task.title);
51 Ok(())
52 }
53
54 pub fn try_claim_next(
55 &self,
56 agent_id: AgentId,
57 agent_name: &str,
58 completed_task_ids: &[TaskId],
59 ) -> SdkResult<Option<Task>> {
60 let pending_dir = self.tasks_dir().join("pending");
61
62 let mut candidates: Vec<(u32, chrono::DateTime<Utc>, PathBuf)> = Vec::new();
63 for entry in std::fs::read_dir(&pending_dir)? {
64 let entry = match entry {
65 Ok(e) => e,
66 Err(_) => continue,
67 };
68 let path = entry.path();
69 if path.extension().map(|ext| ext == "json").unwrap_or(false) {
70 let content = match std::fs::read_to_string(&path) {
71 Ok(c) => c,
72 Err(_) => continue,
73 };
74 let task: Task = match serde_json::from_str(&content) {
75 Ok(t) => t,
76 Err(_) => continue,
77 };
78 candidates.push((task.priority, task.created_at, path));
79 }
80 }
81
82 candidates.sort_by_key(|(priority, created_at, _)| (*priority, *created_at));
84
85 for (_, _, path) in candidates {
86 let task_id_str = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
87 let task_id: TaskId = match task_id_str.parse() {
88 Ok(id) => id,
89 Err(_) => continue,
90 };
91
92 let lock_path = self.lock_path("pending", task_id);
93 let lock = match FileLock::try_acquire(&lock_path)? {
94 Some(lock) => lock,
95 None => continue,
96 };
97
98 let content = std::fs::read_to_string(&path)?;
99 let mut task: Task = serde_json::from_str(&content)?;
100
101 let deps_resolved = task
102 .dependencies
103 .iter()
104 .all(|dep_id| completed_task_ids.contains(dep_id));
105
106 if !deps_resolved {
107 drop(lock);
108 continue;
109 }
110
111 if let Some(assignee) = assigned_teammate(&task.context) {
112 if assignee != agent_name {
113 drop(lock);
114 continue;
115 }
116 }
117
118 task.status = TaskStatus::Claimed {
119 agent_id,
120 at: Utc::now(),
121 };
122 task.updated_at = Utc::now();
123
124 let new_path = self.task_path("in_progress", task_id);
125 let json = serde_json::to_string_pretty(&task)?;
126 std::fs::write(&path, &json)?;
127 std::fs::rename(&path, &new_path)?;
128
129 let new_lock_path = self.lock_path("in_progress", task_id);
130 let _ = std::fs::rename(&lock_path, &new_lock_path);
131
132 info!(task_id = %task_id, agent_id = %agent_id, "Task claimed: {}", task.title);
133 drop(lock);
134 return Ok(Some(task));
135 }
136
137 Ok(None)
138 }
139
140 pub fn mark_in_progress(&self, task_id: TaskId, agent_id: AgentId) -> SdkResult<()> {
141 let path = self.task_path("in_progress", task_id);
142 let content = std::fs::read_to_string(&path)?;
143 let mut task: Task = serde_json::from_str(&content)?;
144
145 task.status = TaskStatus::InProgress {
146 agent_id,
147 started_at: Utc::now(),
148 };
149 task.updated_at = Utc::now();
150
151 let json = serde_json::to_string_pretty(&task)?;
152 std::fs::write(&path, json)?;
153 Ok(())
154 }
155
156 pub fn complete_task(
157 &self,
158 task_id: TaskId,
159 agent_id: AgentId,
160 result: TaskResult,
161 ) -> SdkResult<()> {
162 let src = self.task_path("in_progress", task_id);
163 let content = std::fs::read_to_string(&src)?;
164 let mut task: Task = serde_json::from_str(&content)?;
165
166 task.status = TaskStatus::Completed {
167 agent_id,
168 completed_at: Utc::now(),
169 };
170 task.result = Some(result);
171 task.updated_at = Utc::now();
172
173 let dst = self.task_path("completed", task_id);
174 let json = serde_json::to_string_pretty(&task)?;
175 std::fs::write(&src, &json)?;
176 std::fs::rename(&src, &dst)?;
177
178 let _ = std::fs::remove_file(self.lock_path("in_progress", task_id));
179
180 info!(task_id = %task_id, "Task completed");
181 Ok(())
182 }
183
184 pub fn fail_task(
185 &self,
186 task_id: TaskId,
187 agent_id: AgentId,
188 error: String,
189 ) -> SdkResult<()> {
190 let src = self.task_path("in_progress", task_id);
191 let content = std::fs::read_to_string(&src)?;
192 let mut task: Task = serde_json::from_str(&content)?;
193
194 task.retry_count += 1;
195 task.updated_at = Utc::now();
196
197 if task.retry_count < task.max_retries {
198 task.status = TaskStatus::Pending;
199 let dst = self.task_path("pending", task_id);
200 let json = serde_json::to_string_pretty(&task)?;
201 std::fs::write(&src, &json)?;
202 std::fs::rename(&src, &dst)?;
203 warn!(task_id = %task_id, retry = task.retry_count, "Task failed, retrying");
204 } else {
205 task.status = TaskStatus::Failed {
206 agent_id,
207 error,
208 failed_at: Utc::now(),
209 };
210 let dst = self.task_path("failed", task_id);
211 let json = serde_json::to_string_pretty(&task)?;
212 std::fs::write(&src, &json)?;
213 std::fs::rename(&src, &dst)?;
214 warn!(task_id = %task_id, "Task permanently failed");
215 }
216
217 let _ = std::fs::remove_file(self.lock_path("in_progress", task_id));
218 Ok(())
219 }
220
221 pub fn read_task(&self, task_id: TaskId) -> SdkResult<Task> {
222 for dir in &["pending", "in_progress", "completed", "failed"] {
223 let path = self.task_path(dir, task_id);
224 if path.exists() {
225 let content = std::fs::read_to_string(&path)?;
226 return Ok(serde_json::from_str(&content)?);
227 }
228 }
229 Err(SdkError::TaskNotFound { task_id })
230 }
231
232 pub fn list_all_tasks(&self) -> SdkResult<Vec<Task>> {
233 let mut tasks = Vec::new();
234 for dir in &["pending", "in_progress", "completed", "failed"] {
235 tasks.extend(self.list_tasks_in_dir(dir)?);
236 }
237 Ok(tasks)
238 }
239
240 pub fn list_tasks_in_dir(&self, status_dir: &str) -> SdkResult<Vec<Task>> {
241 let dir = self.tasks_dir().join(status_dir);
242 if !dir.exists() {
243 return Ok(Vec::new());
244 }
245
246 let mut tasks = Vec::new();
247 for entry in std::fs::read_dir(&dir)? {
248 let entry = entry?;
249 let path = entry.path();
250 if path.extension().map(|e| e == "json").unwrap_or(false) {
251 let content = std::fs::read_to_string(&path)?;
252 match serde_json::from_str::<Task>(&content) {
253 Ok(task) => tasks.push(task),
254 Err(e) => warn!("Failed to parse task file {:?}: {}", path, e),
255 }
256 }
257 }
258 Ok(tasks)
259 }
260
261 pub fn completed_task_ids(&self) -> SdkResult<Vec<TaskId>> {
262 Ok(self
263 .list_tasks_in_dir("completed")?
264 .into_iter()
265 .map(|t| t.id)
266 .collect())
267 }
268
269 pub fn recover_orphaned_tasks(&self) -> SdkResult<usize> {
270 let in_progress = self.list_tasks_in_dir("in_progress")?;
271 let mut recovered = 0;
272
273 for mut task in in_progress {
274 task.status = TaskStatus::Pending;
275 task.retry_count += 1;
276 task.updated_at = Utc::now();
277
278 let src = self.task_path("in_progress", task.id);
279 let dst = self.task_path("pending", task.id);
280 let json = serde_json::to_string_pretty(&task)?;
281 std::fs::write(&src, &json)?;
282 std::fs::rename(&src, &dst)?;
283
284 let _ = std::fs::remove_file(self.lock_path("in_progress", task.id));
285
286 recovered += 1;
287 info!(task_id = %task.id, "Recovered orphaned task");
288 }
289
290 Ok(recovered)
291 }
292
293 pub fn summary(&self) -> SdkResult<TaskSummary> {
294 Ok(TaskSummary {
295 pending: self.list_tasks_in_dir("pending")?.len(),
296 in_progress: self.list_tasks_in_dir("in_progress")?.len(),
297 completed: self.list_tasks_in_dir("completed")?.len(),
298 failed: self.list_tasks_in_dir("failed")?.len(),
299 })
300 }
301}
302
303fn assigned_teammate(context: &serde_json::Value) -> Option<&str> {
304 context.get("assigned_teammate").and_then(|v| v.as_str())
305}
306
/// Per-status task counts.
#[derive(Debug, Clone)]
pub struct TaskSummary {
    /// Number of pending tasks.
    pub pending: usize,
    /// Number of tasks in progress.
    pub in_progress: usize,
    /// Number of completed tasks.
    pub completed: usize,
    /// Number of failed tasks.
    pub failed: usize,
}

impl TaskSummary {
    /// Sum of the four counters.
    pub fn total(&self) -> usize {
        [self.pending, self.in_progress, self.completed, self.failed]
            .iter()
            .sum()
    }

    /// `true` when no task is pending and no task is in progress.
    pub fn is_done(&self) -> bool {
        matches!((self.pending, self.in_progress), (0, 0))
    }
}