enact_core/streaming/
jsonl_state_store.rs1use crate::kernel::persistence::{ExecutionSnapshot, StateStore, StorageBackend};
8use crate::kernel::{ExecutionId, TenantId};
9use async_trait::async_trait;
10use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tracing::debug;
15
16pub struct JsonlStateStore {
20 base_dir: PathBuf,
22}
23
24impl JsonlStateStore {
25 pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
27 fs::create_dir_all(base_dir.join("snapshots")).await?;
29 fs::create_dir_all(base_dir.join("kv")).await?;
30
31 Ok(Self { base_dir })
32 }
33
34 fn snapshot_path(&self, execution_id: &ExecutionId) -> PathBuf {
36 self.base_dir
37 .join("snapshots")
38 .join(format!("{}.json", execution_id.as_str()))
39 }
40
41 fn kv_path(&self, key: &str) -> PathBuf {
43 let safe_key = key.replace(['/', ':', '\\'], "_");
45 self.base_dir.join("kv").join(format!("{}.json", safe_key))
46 }
47}
48
49impl std::fmt::Debug for JsonlStateStore {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("JsonlStateStore")
52 .field("base_dir", &self.base_dir)
53 .finish()
54 }
55}
56
57#[async_trait]
58impl StorageBackend for JsonlStateStore {
59 fn name(&self) -> &str {
60 "jsonl"
61 }
62
63 fn requires_network(&self) -> bool {
64 false
65 }
66
67 async fn health_check(&self) -> anyhow::Result<()> {
68 if !self.base_dir.exists() {
70 anyhow::bail!("State store directory does not exist: {:?}", self.base_dir);
71 }
72 Ok(())
73 }
74
75 async fn shutdown(&self) -> anyhow::Result<()> {
76 Ok(())
77 }
78}
79
80#[async_trait]
81impl StateStore for JsonlStateStore {
82 async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()> {
83 let path = self.snapshot_path(&snapshot.execution_id);
84
85 let json = serde_json::to_string_pretty(&snapshot)?;
86 fs::write(&path, json).await?;
87
88 debug!(
89 "Saved snapshot for execution {} to {:?}",
90 snapshot.execution_id.as_str(),
91 path
92 );
93
94 Ok(())
95 }
96
97 async fn load_snapshot(
98 &self,
99 execution_id: &ExecutionId,
100 ) -> anyhow::Result<Option<ExecutionSnapshot>> {
101 let path = self.snapshot_path(execution_id);
102
103 if !path.exists() {
104 return Ok(None);
105 }
106
107 let json = fs::read_to_string(&path).await?;
108 let snapshot: ExecutionSnapshot = serde_json::from_str(&json)?;
109
110 debug!(
111 "Loaded snapshot for execution {} from {:?}",
112 execution_id.as_str(),
113 path
114 );
115
116 Ok(Some(snapshot))
117 }
118
119 async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
120 let path = self.snapshot_path(execution_id);
121
122 if path.exists() {
123 fs::remove_file(&path).await?;
124 debug!(
125 "Deleted snapshot for execution {} from {:?}",
126 execution_id.as_str(),
127 path
128 );
129 }
130
131 Ok(())
132 }
133
134 async fn set(&self, key: &str, value: &[u8], _ttl: Option<Duration>) -> anyhow::Result<()> {
135 let path = self.kv_path(key);
136
137 let entry = serde_json::json!({
139 "key": key,
140 "value": BASE64.encode(value),
141 "timestamp": chrono::Utc::now().to_rfc3339(),
142 });
143
144 let json = serde_json::to_string_pretty(&entry)?;
145 fs::write(&path, json).await?;
146
147 debug!("Set key {} to {:?}", key, path);
149
150 Ok(())
151 }
152
153 async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
154 let path = self.kv_path(key);
155
156 if !path.exists() {
157 return Ok(None);
158 }
159
160 let json = fs::read_to_string(&path).await?;
161 let entry: serde_json::Value = serde_json::from_str(&json)?;
162
163 if let Some(value_str) = entry.get("value").and_then(|v| v.as_str()) {
164 let value = BASE64.decode(value_str)?;
165 return Ok(Some(value));
166 }
167
168 Ok(None)
169 }
170
171 async fn delete(&self, key: &str) -> anyhow::Result<()> {
172 let path = self.kv_path(key);
173
174 if path.exists() {
175 fs::remove_file(&path).await?;
176 debug!("Deleted key {} from {:?}", key, path);
177 }
178
179 Ok(())
180 }
181
182 async fn list_snapshots(
183 &self,
184 _tenant_id: &TenantId,
185 limit: usize,
186 ) -> anyhow::Result<Vec<ExecutionId>> {
187 let snapshot_dir = self.base_dir.join("snapshots");
188 let mut execution_ids = Vec::new();
189
190 let mut entries = fs::read_dir(&snapshot_dir).await?;
191 while let Some(entry) = entries.next_entry().await? {
192 let path = entry.path();
193 if path.extension().map(|e| e == "json").unwrap_or(false) {
194 if let Some(id_str) = path.file_stem().and_then(|s| s.to_str()) {
195 execution_ids.push(ExecutionId::from(id_str));
196 }
197 }
198 if execution_ids.len() >= limit {
199 break;
200 }
201 }
202
203 Ok(execution_ids)
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::kernel::ExecutionState;
211 use tempfile::tempdir;
212
213 #[tokio::test]
214 async fn test_jsonl_state_store_snapshot() {
215 let dir = tempdir().unwrap();
216 let store = JsonlStateStore::new(dir.path().to_path_buf())
217 .await
218 .unwrap();
219
220 let exec_id = ExecutionId::new();
221 let tenant_id = TenantId::from("test");
222 let snapshot =
223 ExecutionSnapshot::new(exec_id.clone(), tenant_id, ExecutionState::Running, 5);
224
225 store.save_snapshot(snapshot.clone()).await.unwrap();
227
228 let loaded = store.load_snapshot(&exec_id).await.unwrap();
230 assert!(loaded.is_some());
231 let loaded = loaded.unwrap();
232 assert_eq!(loaded.execution_id, exec_id);
233 assert_eq!(loaded.state, ExecutionState::Running);
234 }
235
236 #[tokio::test]
237 async fn test_jsonl_state_store_kv() {
238 let dir = tempdir().unwrap();
239 let store = JsonlStateStore::new(dir.path().to_path_buf())
240 .await
241 .unwrap();
242
243 let key = "test:key:123";
244 let value = b"hello world";
245
246 store.set(key, value, None).await.unwrap();
248
249 let loaded = store.get(key).await.unwrap();
251 assert!(loaded.is_some());
252 assert_eq!(loaded.unwrap(), value);
253
254 store.delete(key).await.unwrap();
256 let loaded = store.get(key).await.unwrap();
257 assert!(loaded.is_none());
258 }
259}