agent_sdk_store_file/
tool_execution.rs1use std::path::PathBuf;
2
3use agent_sdk_core::{
4 AgentError, EffectId, IdempotencyKey, JournalCursor, RunId, ToolCallId, ToolExecutionStore,
5 ToolExecutionStoreCursor, ToolExecutionStoreRecord,
6};
7
8use crate::util::{read_json_lines, remove_file_if_exists, root_join, safe_segment, store_error};
9
/// Tool execution store that keeps its records in files beneath a root directory.
#[derive(Debug, Clone)]
pub struct FileToolExecutionStore {
    /// Base directory under which the per-run record files are placed.
    root: PathBuf,
}
15
16impl FileToolExecutionStore {
17 pub fn new(root: impl Into<PathBuf>) -> Self {
19 Self { root: root.into() }
20 }
21
22 fn run_path(&self, run_id: &RunId) -> PathBuf {
23 root_join(
24 &self.root,
25 &[
26 "tool_execution".to_string(),
27 safe_segment(run_id.as_str()),
28 "records.ndjson".to_string(),
29 ],
30 )
31 }
32
33 fn all_records(&self, run_id: &RunId) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
34 let mut records = read_json_lines::<ToolExecutionStoreRecord>(&self.run_path(run_id))?;
35 records.sort_by_key(|record| record.journal_seq);
36 Ok(records)
37 }
38
39 fn records_from_all_runs(&self) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
40 let root = self.root.join("tool_execution");
41 if !root.exists() {
42 return Ok(Vec::new());
43 }
44 let mut matches = Vec::new();
45 for entry in std::fs::read_dir(root).map_err(|error| store_error(error.to_string()))? {
46 let entry = entry.map_err(|error| store_error(error.to_string()))?;
47 let path = entry.path().join("records.ndjson");
48 matches.extend(read_json_lines::<ToolExecutionStoreRecord>(&path)?);
49 }
50 matches.sort_by(|left, right| {
51 left.run_id
52 .as_str()
53 .cmp(right.run_id.as_str())
54 .then(left.journal_seq.cmp(&right.journal_seq))
55 });
56 Ok(matches)
57 }
58}
59
60impl ToolExecutionStore for FileToolExecutionStore {
61 fn put_tool_execution_record(
62 &self,
63 record: ToolExecutionStoreRecord,
64 ) -> Result<ToolExecutionStoreCursor, AgentError> {
65 let mut records = self.all_records(&record.run_id)?;
66 records.retain(|existing| {
67 existing.tool_call_id != record.tool_call_id
68 || existing.journal_seq != record.journal_seq
69 });
70 records.push(record.clone());
71 records.sort_by_key(|record| record.journal_seq);
72 let path = self.run_path(&record.run_id);
73 remove_file_if_exists(&path)?;
74 for record in &records {
75 crate::util::append_json_line(&path, record)?;
76 }
77 Ok(ToolExecutionStoreCursor::new(records.len() as u64))
78 }
79
80 fn records_for_run(&self, run_id: &RunId) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
81 self.all_records(run_id)
82 }
83
84 fn record_for_tool_call(
85 &self,
86 run_id: &RunId,
87 tool_call_id: &ToolCallId,
88 ) -> Result<Option<ToolExecutionStoreRecord>, AgentError> {
89 Ok(self
90 .all_records(run_id)?
91 .into_iter()
92 .rev()
93 .find(|record| &record.tool_call_id == tool_call_id))
94 }
95
96 fn records_for_idempotency_key(
97 &self,
98 idempotency_key: &IdempotencyKey,
99 ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
100 Ok(self
101 .records_from_all_runs()?
102 .into_iter()
103 .filter(|record| record.idempotency_key.as_ref() == Some(idempotency_key))
104 .collect())
105 }
106
107 fn records_for_effect_id(
108 &self,
109 effect_id: &EffectId,
110 ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
111 Ok(self
112 .records_from_all_runs()?
113 .into_iter()
114 .filter(|record| record.effect_id.as_ref() == Some(effect_id))
115 .collect())
116 }
117
118 fn records_after_journal_seq(
119 &self,
120 run_id: &RunId,
121 journal_seq: u64,
122 ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
123 Ok(self
124 .all_records(run_id)?
125 .into_iter()
126 .filter(|record| record.journal_seq > journal_seq)
127 .collect())
128 }
129
130 fn records_in_journal_cursor_range(
131 &self,
132 run_id: &RunId,
133 after: Option<&JournalCursor>,
134 through: Option<&JournalCursor>,
135 ) -> Result<Vec<ToolExecutionStoreRecord>, AgentError> {
136 Ok(self
137 .all_records(run_id)?
138 .into_iter()
139 .filter(|record| record.is_in_journal_cursor_range(after, through))
140 .collect())
141 }
142}