Skip to main content

agent_sdk_store_file/
tool_execution.rs

1use std::path::PathBuf;
2
3use agent_sdk_core::{
4    AgentError, EffectId, IdempotencyKey, JournalCursor, RunId, ToolCallId, ToolExecutionStore,
5    ToolExecutionStoreCursor, ToolExecutionStoreRecord,
6};
7
8use crate::util::{read_json_lines, remove_file_if_exists, root_join, safe_segment, store_error};
9
/// Rebuildable tool-execution projection store that keeps its records on the filesystem.
#[derive(Debug, Clone)]
pub struct FileToolExecutionStore {
    /// Base directory handed to the constructor; record paths are derived from it.
    root: PathBuf,
}
15
16impl FileToolExecutionStore {
17    /// Creates a tool-execution projection store rooted under the provided path.
18    pub fn new(root: impl Into<PathBuf>) -> Self {
19        Self { root: root.into() }
20    }
21
22    fn run_path(&self, run_id: &RunId) -> PathBuf {
23        root_join(
24            &self.root,
25            &[
26                "tool_execution".to_string(),
27                safe_segment(run_id.as_str()),
28                "records.ndjson".to_string(),
29            ],
30        )
31    }
32
33    fn all_records(&self, run_id: &RunId) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
34        let mut records = read_json_lines::<ToolExecutionStoreRecord>(&self.run_path(run_id))?;
35        records.sort_by_key(|record| record.journal_seq);
36        Ok(records)
37    }
38
39    fn records_from_all_runs(&self) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
40        let root = self.root.join("tool_execution");
41        if !root.exists() {
42            return Ok(Vec::new());
43        }
44        let mut matches = Vec::new();
45        for entry in std::fs::read_dir(root).map_err(|error| store_error(error.to_string()))? {
46            let entry = entry.map_err(|error| store_error(error.to_string()))?;
47            let path = entry.path().join("records.ndjson");
48            matches.extend(read_json_lines::<ToolExecutionStoreRecord>(&path)?);
49        }
50        matches.sort_by(|left, right| {
51            left.run_id
52                .as_str()
53                .cmp(right.run_id.as_str())
54                .then(left.journal_seq.cmp(&right.journal_seq))
55        });
56        Ok(matches)
57    }
58}
59
60impl ToolExecutionStore for FileToolExecutionStore {
61    fn put_tool_execution_record(
62        &self,
63        record: ToolExecutionStoreRecord,
64    ) -> Result<ToolExecutionStoreCursor, AgentError> {
65        let mut records = self.all_records(&record.run_id)?;
66        records.retain(|existing| {
67            existing.tool_call_id != record.tool_call_id
68                || existing.journal_seq != record.journal_seq
69        });
70        records.push(record.clone());
71        records.sort_by_key(|record| record.journal_seq);
72        let path = self.run_path(&record.run_id);
73        remove_file_if_exists(&path)?;
74        for record in &records {
75            crate::util::append_json_line(&path, record)?;
76        }
77        Ok(ToolExecutionStoreCursor::new(records.len() as u64))
78    }
79
80    fn records_for_run(&self, run_id: &RunId) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
81        self.all_records(run_id)
82    }
83
84    fn record_for_tool_call(
85        &self,
86        run_id: &RunId,
87        tool_call_id: &ToolCallId,
88    ) -> Result<Option<ToolExecutionStoreRecord>, AgentError> {
89        Ok(self
90            .all_records(run_id)?
91            .into_iter()
92            .rev()
93            .find(|record| &record.tool_call_id == tool_call_id))
94    }
95
96    fn records_for_idempotency_key(
97        &self,
98        idempotency_key: &IdempotencyKey,
99    ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
100        Ok(self
101            .records_from_all_runs()?
102            .into_iter()
103            .filter(|record| record.idempotency_key.as_ref() == Some(idempotency_key))
104            .collect())
105    }
106
107    fn records_for_effect_id(
108        &self,
109        effect_id: &EffectId,
110    ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
111        Ok(self
112            .records_from_all_runs()?
113            .into_iter()
114            .filter(|record| record.effect_id.as_ref() == Some(effect_id))
115            .collect())
116    }
117
118    fn records_after_journal_seq(
119        &self,
120        run_id: &RunId,
121        journal_seq: u64,
122    ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
123        Ok(self
124            .all_records(run_id)?
125            .into_iter()
126            .filter(|record| record.journal_seq > journal_seq)
127            .collect())
128    }
129
130    fn records_in_journal_cursor_range(
131        &self,
132        run_id: &RunId,
133        after: Option<&JournalCursor>,
134        through: Option<&JournalCursor>,
135    ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
136        Ok(self
137            .all_records(run_id)?
138            .into_iter()
139            .filter(|record| record.is_in_journal_cursor_range(after, through))
140            .collect())
141    }
142}