Skip to main content

claude_pool/
store.rs

//! Pluggable storage backend for pool state.
//!
//! The [`PoolStore`] trait abstracts where task and slot records live.
//! [`InMemoryStore`] keeps everything in-process; [`JsonFileStore`] persists
//! state to JSON files on disk so it survives restarts.
6
7use std::path::{Path, PathBuf};
8
9use async_trait::async_trait;
10use dashmap::DashMap;
11
12use crate::error::Result;
13use crate::types::*;
14
/// Trait for storing and retrieving pool state.
///
/// A store holds two independent collections of records: tasks, addressed by
/// [`TaskId`], and slots, addressed by [`SlotId`]. Every operation is async and
/// returns a [`Result`] so that implementations are free to perform I/O.
///
/// Implementations must be `Send + Sync` for use in async contexts.
#[async_trait]
pub trait PoolStore: Send + Sync {
    /// Insert or update a task record.
    ///
    /// The record is stored under `record.id`. The implementations in this
    /// file replace any record already stored under that id.
    async fn put_task(&self, record: TaskRecord) -> Result<()>;

    /// Get a task by ID.
    ///
    /// The implementations in this file return `Ok(None)` when no task with
    /// that id is stored; a missing record is not reported as an error.
    async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>>;

    /// List tasks matching an optional filter.
    ///
    /// In the implementations in this file, every criterion that is set on
    /// `filter` must match for a task to be returned, and criteria left as
    /// `None` are ignored:
    ///
    /// - `state`: the task's state must equal it.
    /// - `slot_id`: the task must be assigned to exactly that slot.
    /// - `tags`: the task must carry at least one of the listed tags (so an
    ///   empty tag list matches no task).
    ///
    /// Records are returned in whatever order the underlying map or directory
    /// yields them; no sorting is applied.
    async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>>;

    /// Delete a task record.
    ///
    /// The implementations in this file return `Ok(true)` if a record was
    /// removed and `Ok(false)` if no record with that id existed.
    async fn delete_task(&self, id: &TaskId) -> Result<bool>;

    /// Insert or update a slot record.
    ///
    /// The record is stored under `record.id`. The implementations in this
    /// file replace any record already stored under that id.
    async fn put_slot(&self, record: SlotRecord) -> Result<()>;

    /// Get a slot by ID.
    ///
    /// The implementations in this file return `Ok(None)` when no slot with
    /// that id is stored; a missing record is not reported as an error.
    async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>>;

    /// List all slots.
    ///
    /// Records are returned in whatever order the underlying map or directory
    /// yields them; no sorting is applied by the implementations in this file.
    async fn list_slots(&self) -> Result<Vec<SlotRecord>>;

    /// Delete a slot record.
    ///
    /// The implementations in this file return `Ok(true)` if a record was
    /// removed and `Ok(false)` if no record with that id existed.
    async fn delete_slot(&self, id: &SlotId) -> Result<bool>;
}
44
/// In-memory store using [`DashMap`] for concurrent access.
///
/// All data is lost when the process exits. Suitable for single-session
/// usage and development.
#[derive(Debug, Default)]
pub struct InMemoryStore {
    /// Task records, keyed by the inner string of their [`TaskId`].
    tasks: DashMap<String, TaskRecord>,
    /// Slot records, keyed by the inner string of their [`SlotId`].
    slots: DashMap<String, SlotRecord>,
}
54
55impl InMemoryStore {
56    /// Create a new empty in-memory store.
57    pub fn new() -> Self {
58        Self::default()
59    }
60}
61
62#[async_trait]
63impl PoolStore for InMemoryStore {
64    async fn put_task(&self, record: TaskRecord) -> Result<()> {
65        self.tasks.insert(record.id.0.clone(), record);
66        Ok(())
67    }
68
69    async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>> {
70        Ok(self.tasks.get(&id.0).map(|r| r.value().clone()))
71    }
72
73    async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>> {
74        let tasks: Vec<TaskRecord> = self
75            .tasks
76            .iter()
77            .map(|r| r.value().clone())
78            .filter(|t| {
79                if let Some(state) = filter.state
80                    && t.state != state
81                {
82                    return false;
83                }
84                if let Some(ref wid) = filter.slot_id
85                    && t.slot_id.as_ref() != Some(wid)
86                {
87                    return false;
88                }
89                if let Some(ref tags) = filter.tags
90                    && !tags.iter().any(|tag| t.tags.contains(tag))
91                {
92                    return false;
93                }
94                true
95            })
96            .collect();
97        Ok(tasks)
98    }
99
100    async fn delete_task(&self, id: &TaskId) -> Result<bool> {
101        Ok(self.tasks.remove(&id.0).is_some())
102    }
103
104    async fn put_slot(&self, record: SlotRecord) -> Result<()> {
105        self.slots.insert(record.id.0.clone(), record);
106        Ok(())
107    }
108
109    async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>> {
110        Ok(self.slots.get(&id.0).map(|r| r.value().clone()))
111    }
112
113    async fn list_slots(&self) -> Result<Vec<SlotRecord>> {
114        Ok(self.slots.iter().map(|r| r.value().clone()).collect())
115    }
116
117    async fn delete_slot(&self, id: &SlotId) -> Result<bool> {
118        Ok(self.slots.remove(&id.0).is_some())
119    }
120}
121
/// JSON file-backed store for durable pool state.
///
/// Tasks are stored as individual JSON files under `{dir}/tasks/{id}.json`
/// and slots under `{dir}/slots/{id}.json`. This makes state inspectable
/// with standard tools and survives process restarts.
#[derive(Debug)]
pub struct JsonFileStore {
    /// Root directory of the store; the `tasks/` and `slots/` subdirectories
    /// live directly beneath it.
    dir: PathBuf,
}
131
132impl JsonFileStore {
133    /// Create a new JSON file store rooted at `dir`.
134    ///
135    /// Creates the directory structure (`tasks/`, `slots/`) if it does not exist.
136    pub async fn new(dir: impl Into<PathBuf>) -> Result<Self> {
137        let dir = dir.into();
138        tokio::fs::create_dir_all(dir.join("tasks"))
139            .await
140            .map_err(|e| crate::Error::Store(format!("failed to create tasks dir: {e}")))?;
141        tokio::fs::create_dir_all(dir.join("slots"))
142            .await
143            .map_err(|e| crate::Error::Store(format!("failed to create slots dir: {e}")))?;
144        Ok(Self { dir })
145    }
146
147    /// Get the base directory.
148    pub fn dir(&self) -> &Path {
149        &self.dir
150    }
151
152    fn task_path(&self, id: &TaskId) -> PathBuf {
153        self.dir.join("tasks").join(format!("{}.json", id.0))
154    }
155
156    fn slot_path(&self, id: &SlotId) -> PathBuf {
157        self.dir.join("slots").join(format!("{}.json", id.0))
158    }
159}
160
161#[async_trait]
162impl PoolStore for JsonFileStore {
163    async fn put_task(&self, record: TaskRecord) -> Result<()> {
164        let path = self.task_path(&record.id);
165        let json = serde_json::to_string_pretty(&record)?;
166        tokio::fs::write(&path, json).await.map_err(|e| {
167            crate::Error::Store(format!("failed to write task {}: {e}", path.display()))
168        })?;
169        Ok(())
170    }
171
172    async fn get_task(&self, id: &TaskId) -> Result<Option<TaskRecord>> {
173        let path = self.task_path(id);
174        match tokio::fs::read_to_string(&path).await {
175            Ok(contents) => {
176                let record: TaskRecord = serde_json::from_str(&contents)?;
177                Ok(Some(record))
178            }
179            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
180            Err(e) => Err(crate::Error::Store(format!(
181                "failed to read task {}: {e}",
182                path.display()
183            ))),
184        }
185    }
186
187    async fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<TaskRecord>> {
188        let tasks_dir = self.dir.join("tasks");
189        let mut entries = tokio::fs::read_dir(&tasks_dir)
190            .await
191            .map_err(|e| crate::Error::Store(format!("failed to read tasks dir: {e}")))?;
192
193        let mut tasks = Vec::new();
194        while let Some(entry) = entries
195            .next_entry()
196            .await
197            .map_err(|e| crate::Error::Store(format!("failed to read dir entry: {e}")))?
198        {
199            let path = entry.path();
200            if path.extension().and_then(|e| e.to_str()) != Some("json") {
201                continue;
202            }
203
204            let contents = tokio::fs::read_to_string(&path).await.map_err(|e| {
205                crate::Error::Store(format!("failed to read {}: {e}", path.display()))
206            })?;
207            let record: TaskRecord = serde_json::from_str(&contents)?;
208
209            // Apply filter.
210            if let Some(state) = filter.state
211                && record.state != state
212            {
213                continue;
214            }
215            if let Some(ref wid) = filter.slot_id
216                && record.slot_id.as_ref() != Some(wid)
217            {
218                continue;
219            }
220            if let Some(ref tags) = filter.tags
221                && !tags.iter().any(|tag| record.tags.contains(tag))
222            {
223                continue;
224            }
225            tasks.push(record);
226        }
227
228        Ok(tasks)
229    }
230
231    async fn delete_task(&self, id: &TaskId) -> Result<bool> {
232        let path = self.task_path(id);
233        match tokio::fs::remove_file(&path).await {
234            Ok(()) => Ok(true),
235            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
236            Err(e) => Err(crate::Error::Store(format!(
237                "failed to delete task {}: {e}",
238                path.display()
239            ))),
240        }
241    }
242
243    async fn put_slot(&self, record: SlotRecord) -> Result<()> {
244        let path = self.slot_path(&record.id);
245        let json = serde_json::to_string_pretty(&record)?;
246        tokio::fs::write(&path, json).await.map_err(|e| {
247            crate::Error::Store(format!("failed to write slot {}: {e}", path.display()))
248        })?;
249        Ok(())
250    }
251
252    async fn get_slot(&self, id: &SlotId) -> Result<Option<SlotRecord>> {
253        let path = self.slot_path(id);
254        match tokio::fs::read_to_string(&path).await {
255            Ok(contents) => {
256                let record: SlotRecord = serde_json::from_str(&contents)?;
257                Ok(Some(record))
258            }
259            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
260            Err(e) => Err(crate::Error::Store(format!(
261                "failed to read slot {}: {e}",
262                path.display()
263            ))),
264        }
265    }
266
267    async fn list_slots(&self) -> Result<Vec<SlotRecord>> {
268        let slots_dir = self.dir.join("slots");
269        let mut entries = tokio::fs::read_dir(&slots_dir)
270            .await
271            .map_err(|e| crate::Error::Store(format!("failed to read slots dir: {e}")))?;
272
273        let mut slots = Vec::new();
274        while let Some(entry) = entries
275            .next_entry()
276            .await
277            .map_err(|e| crate::Error::Store(format!("failed to read dir entry: {e}")))?
278        {
279            let path = entry.path();
280            if path.extension().and_then(|e| e.to_str()) != Some("json") {
281                continue;
282            }
283
284            let contents = tokio::fs::read_to_string(&path).await.map_err(|e| {
285                crate::Error::Store(format!("failed to read {}: {e}", path.display()))
286            })?;
287            let record: SlotRecord = serde_json::from_str(&contents)?;
288            slots.push(record);
289        }
290
291        Ok(slots)
292    }
293
294    async fn delete_slot(&self, id: &SlotId) -> Result<bool> {
295        let path = self.slot_path(id);
296        match tokio::fs::remove_file(&path).await {
297            Ok(()) => Ok(true),
298            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
299            Err(e) => Err(crate::Error::Store(format!(
300                "failed to delete slot {}: {e}",
301                path.display()
302            ))),
303        }
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    // --- Shared fixtures ---

    /// Build a task record with the given id, prompt, and state; every other
    /// field is empty, `None`, or zero (except `max_rejections`, which is 3).
    fn make_task_record(id: &str, prompt: &str, state: TaskState) -> TaskRecord {
        TaskRecord {
            id: TaskId(id.into()),
            prompt: prompt.into(),
            state,
            slot_id: None,
            result: None,
            tags: vec![],
            config: None,
            review_required: false,
            max_rejections: 3,
            rejection_count: 0,
            original_prompt: None,
            created_at_ms: None,
            started_at_ms: None,
            completed_at_ms: None,
        }
    }

    /// Build an idle slot record with the given id and default configuration.
    fn make_slot_record(id: &str) -> SlotRecord {
        SlotRecord {
            id: SlotId(id.into()),
            state: SlotState::Idle,
            config: SlotConfig::default(),
            current_task: None,
            session_id: None,
            tasks_completed: 0,
            cost_microdollars: 0,
            restart_count: 0,
            worktree_path: None,
            mcp_config_path: None,
        }
    }

    // --- InMemoryStore tests ---

    #[tokio::test]
    async fn task_crud() {
        let store = InMemoryStore::new();
        let id = TaskId("t-1".into());

        let record = TaskRecord {
            tags: vec!["testing".into()],
            ..make_task_record("t-1", "write tests", TaskState::Pending)
        };

        store.put_task(record).await.unwrap();

        let fetched = store.get_task(&id).await.unwrap().unwrap();
        assert_eq!(fetched.prompt, "write tests");
        assert_eq!(fetched.state, TaskState::Pending);

        let all = store.list_tasks(&TaskFilter::default()).await.unwrap();
        assert_eq!(all.len(), 1);

        let deleted = store.delete_task(&id).await.unwrap();
        assert!(deleted);
        assert!(store.get_task(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn slot_crud() {
        let store = InMemoryStore::new();
        let id = SlotId("w-0".into());

        store.put_slot(make_slot_record("w-0")).await.unwrap();

        let fetched = store.get_slot(&id).await.unwrap().unwrap();
        assert_eq!(fetched.state, SlotState::Idle);

        let all = store.list_slots().await.unwrap();
        assert_eq!(all.len(), 1);

        let deleted = store.delete_slot(&id).await.unwrap();
        assert!(deleted);
        assert!(store.get_slot(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn task_filter_by_state() {
        let store = InMemoryStore::new();

        // One pending task followed by two completed ones.
        for i in 0..3 {
            let state = if i == 0 {
                TaskState::Pending
            } else {
                TaskState::Completed
            };
            store
                .put_task(make_task_record(
                    &format!("t-{i}"),
                    &format!("task {i}"),
                    state,
                ))
                .await
                .unwrap();
        }

        let pending = store
            .list_tasks(&TaskFilter {
                state: Some(TaskState::Pending),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(pending.len(), 1);

        let completed = store
            .list_tasks(&TaskFilter {
                state: Some(TaskState::Completed),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(completed.len(), 2);
    }

    // --- JsonFileStore tests ---

    #[tokio::test]
    async fn json_file_store_task_crud() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();
        let id = TaskId("jfs-t-1".into());

        let record = make_task_record("jfs-t-1", "write tests", TaskState::Pending);
        store.put_task(record).await.unwrap();

        // File should exist on disk.
        assert!(store.task_path(&id).exists());

        let fetched = store.get_task(&id).await.unwrap().unwrap();
        assert_eq!(fetched.prompt, "write tests");
        assert_eq!(fetched.state, TaskState::Pending);

        let all = store.list_tasks(&TaskFilter::default()).await.unwrap();
        assert_eq!(all.len(), 1);

        let deleted = store.delete_task(&id).await.unwrap();
        assert!(deleted);
        assert!(store.get_task(&id).await.unwrap().is_none());
        assert!(!store.task_path(&id).exists());
    }

    #[tokio::test]
    async fn json_file_store_slot_crud() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();
        let id = SlotId("jfs-s-0".into());

        let record = make_slot_record("jfs-s-0");
        store.put_slot(record).await.unwrap();

        assert!(store.slot_path(&id).exists());

        let fetched = store.get_slot(&id).await.unwrap().unwrap();
        assert_eq!(fetched.state, SlotState::Idle);

        let all = store.list_slots().await.unwrap();
        assert_eq!(all.len(), 1);

        let deleted = store.delete_slot(&id).await.unwrap();
        assert!(deleted);
        assert!(store.get_slot(&id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn json_file_store_task_filter() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();

        store
            .put_task(make_task_record("jfs-f-0", "task 0", TaskState::Pending))
            .await
            .unwrap();
        store
            .put_task(make_task_record("jfs-f-1", "task 1", TaskState::Completed))
            .await
            .unwrap();
        store
            .put_task(make_task_record("jfs-f-2", "task 2", TaskState::Completed))
            .await
            .unwrap();

        let pending = store
            .list_tasks(&TaskFilter {
                state: Some(TaskState::Pending),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(pending.len(), 1);

        let completed = store
            .list_tasks(&TaskFilter {
                state: Some(TaskState::Completed),
                ..Default::default()
            })
            .await
            .unwrap();
        assert_eq!(completed.len(), 2);
    }

    #[tokio::test]
    async fn json_file_store_delete_nonexistent() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();

        let deleted = store.delete_task(&TaskId("nope".into())).await.unwrap();
        assert!(!deleted);

        let deleted = store.delete_slot(&SlotId("nope".into())).await.unwrap();
        assert!(!deleted);
    }

    #[tokio::test]
    async fn json_file_store_put_overwrites() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();
        let id = TaskId("jfs-o-1".into());

        store
            .put_task(make_task_record("jfs-o-1", "first", TaskState::Pending))
            .await
            .unwrap();
        store
            .put_task(make_task_record("jfs-o-1", "second", TaskState::Completed))
            .await
            .unwrap();

        // The second put replaces the first rather than adding a record.
        let fetched = store.get_task(&id).await.unwrap().unwrap();
        assert_eq!(fetched.prompt, "second");
        assert_eq!(fetched.state, TaskState::Completed);

        let all = store.list_tasks(&TaskFilter::default()).await.unwrap();
        assert_eq!(all.len(), 1);
    }

    #[tokio::test]
    async fn json_file_store_list_ignores_non_json() {
        let dir = tempfile::tempdir().unwrap();
        let store = JsonFileStore::new(dir.path()).await.unwrap();

        store
            .put_task(make_task_record("jfs-n-1", "real task", TaskState::Pending))
            .await
            .unwrap();
        store.put_slot(make_slot_record("jfs-n-s")).await.unwrap();

        // Stray files without a `.json` extension must not be parsed.
        std::fs::write(dir.path().join("tasks").join("notes.txt"), "not json").unwrap();
        std::fs::write(dir.path().join("slots").join("notes.txt"), "not json").unwrap();

        let tasks = store.list_tasks(&TaskFilter::default()).await.unwrap();
        assert_eq!(tasks.len(), 1);

        let slots = store.list_slots().await.unwrap();
        assert_eq!(slots.len(), 1);
    }

    #[tokio::test]
    async fn json_file_store_survives_reopen() {
        let dir = tempfile::tempdir().unwrap();

        // Write with one store instance.
        {
            let store = JsonFileStore::new(dir.path()).await.unwrap();
            store
                .put_task(make_task_record(
                    "persist-1",
                    "durable task",
                    TaskState::Pending,
                ))
                .await
                .unwrap();
            store
                .put_slot(make_slot_record("persist-s-0"))
                .await
                .unwrap();
        }

        // Read with a fresh store instance.
        {
            let store = JsonFileStore::new(dir.path()).await.unwrap();
            let task = store
                .get_task(&TaskId("persist-1".into()))
                .await
                .unwrap()
                .unwrap();
            assert_eq!(task.prompt, "durable task");

            let slot = store
                .get_slot(&SlotId("persist-s-0".into()))
                .await
                .unwrap()
                .unwrap();
            assert_eq!(slot.state, SlotState::Idle);
        }
    }
}