1use std::path::{Path, PathBuf};
8
9use async_trait::async_trait;
10use dashmap::DashMap;
11
12use crate::error::Result;
13use crate::types::*;
14
15#[async_trait]
19pub trait PoolStore: Send + Sync {
20 async fn put_task(&self, record: TaskRecord) -> Result<()>;
22
23 async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>>;
25
26 async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>>;
28
29 async fn delete_task(&self, id: &TaskId) -> Result<bool>;
31
32 async fn put_slot(&self, record: SlotRecord) -> Result<()>;
34
35 async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>>;
37
38 async fn list_slots(&self) -> Result<Vec<SlotRecord>>;
40
41 async fn delete_slot(&self, id: &SlotId) -> Result<bool>;
43}
44
45#[derive(Debug, Default)]
50pub struct InMemoryStore {
51 tasks: DashMap<String, TaskRecord>,
52 slots: DashMap<String, SlotRecord>,
53}
54
55impl InMemoryStore {
56 pub fn new() -> Self {
58 Self::default()
59 }
60}
61
62#[async_trait]
63impl PoolStore for InMemoryStore {
64 async fn put_task(&self, record: TaskRecord) -> Result<()> {
65 self.tasks.insert(record.id.0.clone(), record);
66 Ok(())
67 }
68
69 async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>> {
70 Ok(self.tasks.get(&id.0).map(|r| r.value().clone()))
71 }
72
73 async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>> {
74 let tasks: Vec<TaskRecord> = self
75 .tasks
76 .iter()
77 .map(|r| r.value().clone())
78 .filter(|t| {
79 if let Some(state) = filter.state
80 && t.state != state
81 {
82 return false;
83 }
84 if let Some(ref wid) = filter.slot_id
85 && t.slot_id.as_ref() != Some(wid)
86 {
87 return false;
88 }
89 if let Some(ref tags) = filter.tags
90 && !tags.iter().any(|tag| t.tags.contains(tag))
91 {
92 return false;
93 }
94 true
95 })
96 .collect();
97 Ok(tasks)
98 }
99
100 async fn delete_task(&self, id: &TaskId) -> Result<bool> {
101 Ok(self.tasks.remove(&id.0).is_some())
102 }
103
104 async fn put_slot(&self, record: SlotRecord) -> Result<()> {
105 self.slots.insert(record.id.0.clone(), record);
106 Ok(())
107 }
108
109 async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>> {
110 Ok(self.slots.get(&id.0).map(|r| r.value().clone()))
111 }
112
113 async fn list_slots(&self) -> Result<Vec<SlotRecord>> {
114 Ok(self.slots.iter().map(|r| r.value().clone()).collect())
115 }
116
117 async fn delete_slot(&self, id: &SlotId) -> Result<bool> {
118 Ok(self.slots.remove(&id.0).is_some())
119 }
120}
121
122#[derive(Debug)]
128pub struct JsonFileStore {
129 dir: PathBuf,
130}
131
132impl JsonFileStore {
133 pub async fn new(dir: impl Into<PathBuf>) -> Result<Self> {
137 let dir = dir.into();
138 tokio::fs::create_dir_all(dir.join("tasks"))
139 .await
140 .map_err(|e| crate::Error::Store(format!("failed to create tasks dir: {e}")))?;
141 tokio::fs::create_dir_all(dir.join("slots"))
142 .await
143 .map_err(|e| crate::Error::Store(format!("failed to create slots dir: {e}")))?;
144 Ok(Self { dir })
145 }
146
147 pub fn dir(&self) -> &Path {
149 &self.dir
150 }
151
152 fn task_path(&self, id: &TaskId) -> PathBuf {
153 self.dir.join("tasks").join(format!("{}.json", id.0))
154 }
155
156 fn slot_path(&self, id: &SlotId) -> PathBuf {
157 self.dir.join("slots").join(format!("{}.json", id.0))
158 }
159}
160
161#[async_trait]
162impl PoolStore for JsonFileStore {
163 async fn put_task(&self, record: TaskRecord) -> Result<()> {
164 let path = self.task_path(&record.id);
165 let json = serde_json::to_string_pretty(&record)?;
166 tokio::fs::write(&path, json).await.map_err(|e| {
167 crate::Error::Store(format!("failed to write task {}: {e}", path.display()))
168 })?;
169 Ok(())
170 }
171
172 async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>> {
173 let path = self.task_path(id);
174 match tokio::fs::read_to_string(&path).await {
175 Ok(contents) => {
176 let record: TaskRecord = serde_json::from_str(&contents)?;
177 Ok(Some(record))
178 }
179 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
180 Err(e) => Err(crate::Error::Store(format!(
181 "failed to read task {}: {e}",
182 path.display()
183 ))),
184 }
185 }
186
187 async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>> {
188 let tasks_dir = self.dir.join("tasks");
189 let mut entries = tokio::fs::read_dir(&tasks_dir)
190 .await
191 .map_err(|e| crate::Error::Store(format!("failed to read tasks dir: {e}")))?;
192
193 let mut tasks = Vec::new();
194 while let Some(entry) = entries
195 .next_entry()
196 .await
197 .map_err(|e| crate::Error::Store(format!("failed to read dir entry: {e}")))?
198 {
199 let path = entry.path();
200 if path.extension().and_then(|e| e.to_str()) != Some("json") {
201 continue;
202 }
203
204 let contents = tokio::fs::read_to_string(&path).await.map_err(|e| {
205 crate::Error::Store(format!("failed to read {}: {e}", path.display()))
206 })?;
207 let record: TaskRecord = serde_json::from_str(&contents)?;
208
209 if let Some(state) = filter.state
211 && record.state != state
212 {
213 continue;
214 }
215 if let Some(ref wid) = filter.slot_id
216 && record.slot_id.as_ref() != Some(wid)
217 {
218 continue;
219 }
220 if let Some(ref tags) = filter.tags
221 && !tags.iter().any(|tag| record.tags.contains(tag))
222 {
223 continue;
224 }
225 tasks.push(record);
226 }
227
228 Ok(tasks)
229 }
230
231 async fn delete_task(&self, id: &TaskId) -> Result<bool> {
232 let path = self.task_path(id);
233 match tokio::fs::remove_file(&path).await {
234 Ok(()) => Ok(true),
235 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
236 Err(e) => Err(crate::Error::Store(format!(
237 "failed to delete task {}: {e}",
238 path.display()
239 ))),
240 }
241 }
242
243 async fn put_slot(&self, record: SlotRecord) -> Result<()> {
244 let path = self.slot_path(&record.id);
245 let json = serde_json::to_string_pretty(&record)?;
246 tokio::fs::write(&path, json).await.map_err(|e| {
247 crate::Error::Store(format!("failed to write slot {}: {e}", path.display()))
248 })?;
249 Ok(())
250 }
251
252 async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>> {
253 let path = self.slot_path(id);
254 match tokio::fs::read_to_string(&path).await {
255 Ok(contents) => {
256 let record: SlotRecord = serde_json::from_str(&contents)?;
257 Ok(Some(record))
258 }
259 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
260 Err(e) => Err(crate::Error::Store(format!(
261 "failed to read slot {}: {e}",
262 path.display()
263 ))),
264 }
265 }
266
267 async fn list_slots(&self) -> Result<Vec<SlotRecord>> {
268 let slots_dir = self.dir.join("slots");
269 let mut entries = tokio::fs::read_dir(&slots_dir)
270 .await
271 .map_err(|e| crate::Error::Store(format!("failed to read slots dir: {e}")))?;
272
273 let mut slots = Vec::new();
274 while let Some(entry) = entries
275 .next_entry()
276 .await
277 .map_err(|e| crate::Error::Store(format!("failed to read dir entry: {e}")))?
278 {
279 let path = entry.path();
280 if path.extension().and_then(|e| e.to_str()) != Some("json") {
281 continue;
282 }
283
284 let contents = tokio::fs::read_to_string(&path).await.map_err(|e| {
285 crate::Error::Store(format!("failed to read {}: {e}", path.display()))
286 })?;
287 let record: SlotRecord = serde_json::from_str(&contents)?;
288 slots.push(record);
289 }
290
291 Ok(slots)
292 }
293
294 async fn delete_slot(&self, id: &SlotId) -> Result<bool> {
295 let path = self.slot_path(id);
296 match tokio::fs::remove_file(&path).await {
297 Ok(()) => Ok(true),
298 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
299 Err(e) => Err(crate::Error::Store(format!(
300 "failed to delete slot {}: {e}",
301 path.display()
302 ))),
303 }
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310
311 #[tokio::test]
312 async fn task_crud() {
313 let store = InMemoryStore::new();
314 let id = TaskId("t-1".into());
315
316 let record = TaskRecord {
317 id: id.clone(),
318 prompt: "write tests".into(),
319 state: TaskState::Pending,
320 slot_id: None,
321 result: None,
322 tags: vec!["testing".into()],
323 config: None,
324 review_required: false,
325 max_rejections: 3,
326 rejection_count: 0,
327 original_prompt: None,
328 created_at_ms: None,
329 started_at_ms: None,
330 completed_at_ms: None,
331 };
332
333 store.put_task(record).await.unwrap();
334
335 let fetched = store.get_task(&id).await.unwrap().unwrap();
336 assert_eq!(fetched.prompt, "write tests");
337 assert_eq!(fetched.state, TaskState::Pending);
338
339 let all = store.list_tasks(&TaskFilter::default()).await.unwrap();
340 assert_eq!(all.len(), 1);
341
342 let deleted = store.delete_task(&id).await.unwrap();
343 assert!(deleted);
344 assert!(store.get_task(&id).await.unwrap().is_none());
345 }
346
347 #[tokio::test]
348 async fn slot_crud() {
349 let store = InMemoryStore::new();
350 let id = SlotId("w-0".into());
351
352 let record = SlotRecord {
353 id: id.clone(),
354 state: SlotState::Idle,
355 config: SlotConfig::default(),
356 current_task: None,
357 session_id: None,
358 tasks_completed: 0,
359 cost_microdollars: 0,
360 restart_count: 0,
361 worktree_path: None,
362 mcp_config_path: None,
363 };
364
365 store.put_slot(record).await.unwrap();
366
367 let fetched = store.get_slot(&id).await.unwrap().unwrap();
368 assert_eq!(fetched.state, SlotState::Idle);
369
370 let all = store.list_slots().await.unwrap();
371 assert_eq!(all.len(), 1);
372
373 let deleted = store.delete_slot(&id).await.unwrap();
374 assert!(deleted);
375 assert!(store.get_slot(&id).await.unwrap().is_none());
376 }
377
378 #[tokio::test]
379 async fn task_filter_by_state() {
380 let store = InMemoryStore::new();
381
382 for i in 0..3 {
383 let state = if i == 0 {
384 TaskState::Pending
385 } else {
386 TaskState::Completed
387 };
388 store
389 .put_task(TaskRecord {
390 id: TaskId(format!("t-{i}")),
391 prompt: format!("task {i}"),
392 state,
393 slot_id: None,
394 result: None,
395 tags: vec![],
396 config: None,
397 review_required: false,
398 max_rejections: 3,
399 rejection_count: 0,
400 original_prompt: None,
401 created_at_ms: None,
402 started_at_ms: None,
403 completed_at_ms: None,
404 })
405 .await
406 .unwrap();
407 }
408
409 let pending = store
410 .list_tasks(&TaskFilter {
411 state: Some(TaskState::Pending),
412 ..Default::default()
413 })
414 .await
415 .unwrap();
416 assert_eq!(pending.len(), 1);
417
418 let completed = store
419 .list_tasks(&TaskFilter {
420 state: Some(TaskState::Completed),
421 ..Default::default()
422 })
423 .await
424 .unwrap();
425 assert_eq!(completed.len(), 2);
426 }
427
428 fn make_task_record(id: &str, prompt: &str, state: TaskState) -> TaskRecord {
431 TaskRecord {
432 id: TaskId(id.into()),
433 prompt: prompt.into(),
434 state,
435 slot_id: None,
436 result: None,
437 tags: vec![],
438 config: None,
439 review_required: false,
440 max_rejections: 3,
441 rejection_count: 0,
442 original_prompt: None,
443 created_at_ms: None,
444 started_at_ms: None,
445 completed_at_ms: None,
446 }
447 }
448
449 fn make_slot_record(id: &str) -> SlotRecord {
450 SlotRecord {
451 id: SlotId(id.into()),
452 state: SlotState::Idle,
453 config: SlotConfig::default(),
454 current_task: None,
455 session_id: None,
456 tasks_completed: 0,
457 cost_microdollars: 0,
458 restart_count: 0,
459 worktree_path: None,
460 mcp_config_path: None,
461 }
462 }
463
464 #[tokio::test]
465 async fn json_file_store_task_crud() {
466 let dir = tempfile::tempdir().unwrap();
467 let store = JsonFileStore::new(dir.path()).await.unwrap();
468 let id = TaskId("jfs-t-1".into());
469
470 let record = make_task_record("jfs-t-1", "write tests", TaskState::Pending);
471 store.put_task(record).await.unwrap();
472
473 assert!(store.task_path(&id).exists());
475
476 let fetched = store.get_task(&id).await.unwrap().unwrap();
477 assert_eq!(fetched.prompt, "write tests");
478 assert_eq!(fetched.state, TaskState::Pending);
479
480 let all = store.list_tasks(&TaskFilter::default()).await.unwrap();
481 assert_eq!(all.len(), 1);
482
483 let deleted = store.delete_task(&id).await.unwrap();
484 assert!(deleted);
485 assert!(store.get_task(&id).await.unwrap().is_none());
486 assert!(!store.task_path(&id).exists());
487 }
488
489 #[tokio::test]
490 async fn json_file_store_slot_crud() {
491 let dir = tempfile::tempdir().unwrap();
492 let store = JsonFileStore::new(dir.path()).await.unwrap();
493 let id = SlotId("jfs-s-0".into());
494
495 let record = make_slot_record("jfs-s-0");
496 store.put_slot(record).await.unwrap();
497
498 assert!(store.slot_path(&id).exists());
499
500 let fetched = store.get_slot(&id).await.unwrap().unwrap();
501 assert_eq!(fetched.state, SlotState::Idle);
502
503 let all = store.list_slots().await.unwrap();
504 assert_eq!(all.len(), 1);
505
506 let deleted = store.delete_slot(&id).await.unwrap();
507 assert!(deleted);
508 assert!(store.get_slot(&id).await.unwrap().is_none());
509 }
510
511 #[tokio::test]
512 async fn json_file_store_task_filter() {
513 let dir = tempfile::tempdir().unwrap();
514 let store = JsonFileStore::new(dir.path()).await.unwrap();
515
516 store
517 .put_task(make_task_record("jfs-f-0", "task 0", TaskState::Pending))
518 .await
519 .unwrap();
520 store
521 .put_task(make_task_record("jfs-f-1", "task 1", TaskState::Completed))
522 .await
523 .unwrap();
524 store
525 .put_task(make_task_record("jfs-f-2", "task 2", TaskState::Completed))
526 .await
527 .unwrap();
528
529 let pending = store
530 .list_tasks(&TaskFilter {
531 state: Some(TaskState::Pending),
532 ..Default::default()
533 })
534 .await
535 .unwrap();
536 assert_eq!(pending.len(), 1);
537
538 let completed = store
539 .list_tasks(&TaskFilter {
540 state: Some(TaskState::Completed),
541 ..Default::default()
542 })
543 .await
544 .unwrap();
545 assert_eq!(completed.len(), 2);
546 }
547
548 #[tokio::test]
549 async fn json_file_store_delete_nonexistent() {
550 let dir = tempfile::tempdir().unwrap();
551 let store = JsonFileStore::new(dir.path()).await.unwrap();
552
553 let deleted = store.delete_task(&TaskId("nope".into())).await.unwrap();
554 assert!(!deleted);
555
556 let deleted = store.delete_slot(&SlotId("nope".into())).await.unwrap();
557 assert!(!deleted);
558 }
559
560 #[tokio::test]
561 async fn json_file_store_survives_reopen() {
562 let dir = tempfile::tempdir().unwrap();
563
564 {
566 let store = JsonFileStore::new(dir.path()).await.unwrap();
567 store
568 .put_task(make_task_record(
569 "persist-1",
570 "durable task",
571 TaskState::Pending,
572 ))
573 .await
574 .unwrap();
575 store
576 .put_slot(make_slot_record("persist-s-0"))
577 .await
578 .unwrap();
579 }
580
581 {
583 let store = JsonFileStore::new(dir.path()).await.unwrap();
584 let task = store
585 .get_task(&TaskId("persist-1".into()))
586 .await
587 .unwrap()
588 .unwrap();
589 assert_eq!(task.prompt, "durable task");
590
591 let slot = store
592 .get_slot(&SlotId("persist-s-0".into()))
593 .await
594 .unwrap()
595 .unwrap();
596 assert_eq!(slot.state, SlotState::Idle);
597 }
598 }
599}