Skip to main content

enact_core/streaming/
jsonl_state_store.rs

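//! JSONL State Store - File-based state snapshot storage
//!
//! Stores execution snapshots and key-value state as individual JSON files,
//! one pretty-printed JSON document per file (not line-delimited records).
//! - Snapshots: `{dir}/snapshots/{execution_id}.json`
//! - Key-values: `{dir}/kv/{key}.json`, with `/`, `:` and `\` in the key replaced by `_`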
6
7use crate::kernel::persistence::{ExecutionSnapshot, StateStore, StorageBackend};
8use crate::kernel::{ExecutionId, TenantId};
9use async_trait::async_trait;
10use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tracing::debug;
15
/// File-backed state store rooted at a single directory.
///
/// Every execution snapshot and every key-value entry is kept in its own JSON
/// file beneath `base_dir`, in the `snapshots/` and `kv/` subdirectories
/// respectively.
pub struct JsonlStateStore {
    /// Root directory under which all state files are kept
    base_dir: PathBuf,
}
23
24impl JsonlStateStore {
25    /// Create a new JSONL state store at the given directory
26    pub async fn new(base_dir: PathBuf) -> anyhow::Result<Self> {
27        // Ensure directories exist
28        fs::create_dir_all(base_dir.join("snapshots")).await?;
29        fs::create_dir_all(base_dir.join("kv")).await?;
30
31        Ok(Self { base_dir })
32    }
33
34    /// Get the file path for a snapshot
35    fn snapshot_path(&self, execution_id: &ExecutionId) -> PathBuf {
36        self.base_dir
37            .join("snapshots")
38            .join(format!("{}.json", execution_id.as_str()))
39    }
40
41    /// Get the file path for a key-value entry
42    fn kv_path(&self, key: &str) -> PathBuf {
43        // Sanitize key for filesystem (replace special chars)
44        let safe_key = key.replace(['/', ':', '\\'], "_");
45        self.base_dir.join("kv").join(format!("{}.json", safe_key))
46    }
47}
48
49impl std::fmt::Debug for JsonlStateStore {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("JsonlStateStore")
52            .field("base_dir", &self.base_dir)
53            .finish()
54    }
55}
56
57#[async_trait]
58impl StorageBackend for JsonlStateStore {
59    fn name(&self) -> &str {
60        "jsonl"
61    }
62
63    fn requires_network(&self) -> bool {
64        false
65    }
66
67    async fn health_check(&self) -> anyhow::Result<()> {
68        // Check that directories exist and are writable
69        if !self.base_dir.exists() {
70            anyhow::bail!("State store directory does not exist: {:?}", self.base_dir);
71        }
72        Ok(())
73    }
74
75    async fn shutdown(&self) -> anyhow::Result<()> {
76        Ok(())
77    }
78}
79
80#[async_trait]
81impl StateStore for JsonlStateStore {
82    async fn save_snapshot(&self, snapshot: ExecutionSnapshot) -> anyhow::Result<()> {
83        let path = self.snapshot_path(&snapshot.execution_id);
84
85        let json = serde_json::to_string_pretty(&snapshot)?;
86        fs::write(&path, json).await?;
87
88        debug!(
89            "Saved snapshot for execution {} to {:?}",
90            snapshot.execution_id.as_str(),
91            path
92        );
93
94        Ok(())
95    }
96
97    async fn load_snapshot(
98        &self,
99        execution_id: &ExecutionId,
100    ) -> anyhow::Result<Option<ExecutionSnapshot>> {
101        let path = self.snapshot_path(execution_id);
102
103        if !path.exists() {
104            return Ok(None);
105        }
106
107        let json = fs::read_to_string(&path).await?;
108        let snapshot: ExecutionSnapshot = serde_json::from_str(&json)?;
109
110        debug!(
111            "Loaded snapshot for execution {} from {:?}",
112            execution_id.as_str(),
113            path
114        );
115
116        Ok(Some(snapshot))
117    }
118
119    async fn delete_snapshot(&self, execution_id: &ExecutionId) -> anyhow::Result<()> {
120        let path = self.snapshot_path(execution_id);
121
122        if path.exists() {
123            fs::remove_file(&path).await?;
124            debug!(
125                "Deleted snapshot for execution {} from {:?}",
126                execution_id.as_str(),
127                path
128            );
129        }
130
131        Ok(())
132    }
133
134    async fn set(&self, key: &str, value: &[u8], _ttl: Option<Duration>) -> anyhow::Result<()> {
135        let path = self.kv_path(key);
136
137        // Store as JSON with metadata
138        let entry = serde_json::json!({
139            "key": key,
140            "value": BASE64.encode(value),
141            "timestamp": chrono::Utc::now().to_rfc3339(),
142        });
143
144        let json = serde_json::to_string_pretty(&entry)?;
145        fs::write(&path, json).await?;
146
147        // Note: TTL is not implemented for JSONL store (would need background cleanup)
148        debug!("Set key {} to {:?}", key, path);
149
150        Ok(())
151    }
152
153    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
154        let path = self.kv_path(key);
155
156        if !path.exists() {
157            return Ok(None);
158        }
159
160        let json = fs::read_to_string(&path).await?;
161        let entry: serde_json::Value = serde_json::from_str(&json)?;
162
163        if let Some(value_str) = entry.get("value").and_then(|v| v.as_str()) {
164            let value = BASE64.decode(value_str)?;
165            return Ok(Some(value));
166        }
167
168        Ok(None)
169    }
170
171    async fn delete(&self, key: &str) -> anyhow::Result<()> {
172        let path = self.kv_path(key);
173
174        if path.exists() {
175            fs::remove_file(&path).await?;
176            debug!("Deleted key {} from {:?}", key, path);
177        }
178
179        Ok(())
180    }
181
182    async fn list_snapshots(
183        &self,
184        _tenant_id: &TenantId,
185        limit: usize,
186    ) -> anyhow::Result<Vec<ExecutionId>> {
187        let snapshot_dir = self.base_dir.join("snapshots");
188        let mut execution_ids = Vec::new();
189
190        let mut entries = fs::read_dir(&snapshot_dir).await?;
191        while let Some(entry) = entries.next_entry().await? {
192            let path = entry.path();
193            if path.extension().map(|e| e == "json").unwrap_or(false) {
194                if let Some(id_str) = path.file_stem().and_then(|s| s.to_str()) {
195                    execution_ids.push(ExecutionId::from(id_str));
196                }
197            }
198            if execution_ids.len() >= limit {
199                break;
200            }
201        }
202
203        Ok(execution_ids)
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::ExecutionState;
    use tempfile::tempdir;

    /// A saved snapshot can be loaded back with the same id and state.
    #[tokio::test]
    async fn test_jsonl_state_store_snapshot() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().to_path_buf();
        let store = JsonlStateStore::new(root).await.unwrap();

        let exec_id = ExecutionId::new();
        let snapshot = ExecutionSnapshot::new(
            exec_id.clone(),
            TenantId::from("test"),
            ExecutionState::Running,
            5,
        );

        // Write the snapshot out...
        store.save_snapshot(snapshot.clone()).await.unwrap();

        // ...and read it back.
        let loaded = store.load_snapshot(&exec_id).await.unwrap();
        assert!(loaded.is_some());
        let restored = loaded.unwrap();
        assert_eq!(restored.execution_id, exec_id);
        assert_eq!(restored.state, ExecutionState::Running);
    }

    /// A key-value entry survives set/get and disappears after delete.
    #[tokio::test]
    async fn test_jsonl_state_store_kv() {
        let tmp = tempdir().unwrap();
        let root = tmp.path().to_path_buf();
        let store = JsonlStateStore::new(root).await.unwrap();

        let key = "test:key:123";
        let value = b"hello world";

        // Store the value, then fetch it again.
        store.set(key, value, None).await.unwrap();
        let fetched = store.get(key).await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap(), value);

        // Once deleted, the key must no longer resolve.
        store.delete(key).await.unwrap();
        let after_delete = store.get(key).await.unwrap();
        assert!(after_delete.is_none());
    }
}