1use crate::kernel::persistence::StateStore;
14use crate::streaming::{
15 EventLog, EventStore, InMemoryEventStore, JsonlEventStore, JsonlStateStore,
16};
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
/// Settings that select and locate the event store backend.
///
/// Interpreted by `create_event_store`, which dispatches on `store_type`.
#[derive(Clone, Debug)]
pub struct EventStoreConfig {
    /// Backend selector. `create_event_store` recognises `"jsonl"`, `"memory"`
    /// and `"sqlite"` (the latter currently falls back to JSONL); any other
    /// value is rejected with an error.
    pub store_type: String,
    /// Location of the JSONL store, joined onto the base directory passed to
    /// the factory. When `None`, the factory uses `events`.
    pub path: Option<String>,
    /// Connection string. Not read by any factory in this file — presumably
    /// reserved for a database-backed store (TODO confirm).
    pub dsn: Option<String>,
}
27
/// Settings that select and locate the state store backend.
///
/// Interpreted by `create_state_store`, which dispatches on `store_type`;
/// see that function for the accepted values.
#[derive(Clone, Debug)]
pub struct StateStoreConfig {
    /// Backend selector matched by `create_state_store`.
    pub store_type: String,
    /// Location of the JSONL store, joined onto the base directory passed to
    /// the factory. When `None`, the factory uses `state`.
    pub path: Option<String>,
    /// Connection string. Not read by any factory in this file — presumably
    /// reserved for a database-backed store (TODO confirm).
    pub dsn: Option<String>,
}
35
36pub async fn create_event_store(
45 config: &EventStoreConfig,
46 base_dir: &Path,
47) -> anyhow::Result<Arc<dyn EventStore>> {
48 match config.store_type.as_str() {
49 "jsonl" => {
50 let path = match &config.path {
51 Some(p) => base_dir.join(p),
52 None => base_dir.join("events"),
53 };
54 let store = JsonlEventStore::new(path).await?;
55 Ok(Arc::new(store))
56 }
57 "memory" => Ok(Arc::new(InMemoryEventStore::new())),
58 "sqlite" => {
59 tracing::warn!("SQLite event store not implemented, falling back to JSONL");
61 let path = match &config.path {
62 Some(p) => base_dir.join(p),
63 None => base_dir.join("events"),
64 };
65 let store = JsonlEventStore::new(path).await?;
66 Ok(Arc::new(store))
67 }
68 other => anyhow::bail!("Unknown event store type: {}", other),
69 }
70}
71
72pub async fn create_state_store(
81 config: &StateStoreConfig,
82 base_dir: &Path,
83) -> anyhow::Result<Arc<dyn StateStore>> {
84 match config.store_type.as_str() {
85 "jsonl" => {
86 let path = match &config.path {
87 Some(p) => base_dir.join(p),
88 None => base_dir.join("state"),
89 };
90 let store = JsonlStateStore::new(path).await?;
91 Ok(Arc::new(store))
92 }
93 "sqlite" => {
94 tracing::warn!("SQLite state store not implemented, falling back to JSONL");
96 let path = match &config.path {
97 Some(p) => base_dir.join(p),
98 None => base_dir.join("state"),
99 };
100 let store = JsonlStateStore::new(path).await?;
101 Ok(Arc::new(store))
102 }
103 other => anyhow::bail!("Unknown state store type: {}", other),
104 }
105}
106
107pub async fn create_event_log(
111 config: &EventStoreConfig,
112 base_dir: &Path,
113) -> anyhow::Result<EventLog> {
114 let store = create_event_store(config, base_dir).await?;
115 Ok(EventLog::new(store))
116}
117
118#[derive(Clone)]
122pub struct StorageContext {
123 pub event_store: Arc<dyn EventStore>,
124 pub state_store: Arc<dyn StateStore>,
125 pub base_dir: PathBuf,
126}
127
128impl StorageContext {
129 pub async fn new(
131 event_config: &EventStoreConfig,
132 state_config: &StateStoreConfig,
133 base_dir: PathBuf,
134 ) -> anyhow::Result<Self> {
135 let event_store = create_event_store(event_config, &base_dir).await?;
136 let state_store = create_state_store(state_config, &base_dir).await?;
137
138 Ok(Self {
139 event_store,
140 state_store,
141 base_dir,
142 })
143 }
144
145 pub fn in_memory() -> Self {
147 Self {
148 event_store: Arc::new(InMemoryEventStore::new()),
149 state_store: Arc::new(InMemoryStateStore::new()),
150 base_dir: PathBuf::from("."),
151 }
152 }
153
154 pub fn event_log(&self) -> EventLog {
156 EventLog::new(self.event_store.clone())
157 }
158}
159
160#[derive(Debug, Default)]
162pub struct InMemoryStateStore {
163 snapshots: std::sync::RwLock<
164 std::collections::HashMap<String, crate::kernel::persistence::ExecutionSnapshot>,
165 >,
166 kv: std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>,
167}
168
169impl InMemoryStateStore {
170 pub fn new() -> Self {
171 Self::default()
172 }
173}
174
175#[async_trait::async_trait]
176impl crate::kernel::persistence::StorageBackend for InMemoryStateStore {
177 fn name(&self) -> &str {
178 "memory"
179 }
180
181 fn requires_network(&self) -> bool {
182 false
183 }
184
185 async fn health_check(&self) -> anyhow::Result<()> {
186 Ok(())
187 }
188}
189
190#[async_trait::async_trait]
191impl StateStore for InMemoryStateStore {
192 async fn save_snapshot(
193 &self,
194 snapshot: crate::kernel::persistence::ExecutionSnapshot,
195 ) -> anyhow::Result<()> {
196 let mut snapshots = self
197 .snapshots
198 .write()
199 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
200 snapshots.insert(snapshot.execution_id.as_str().to_string(), snapshot);
201 Ok(())
202 }
203
204 async fn load_snapshot(
205 &self,
206 execution_id: &crate::kernel::ExecutionId,
207 ) -> anyhow::Result<Option<crate::kernel::persistence::ExecutionSnapshot>> {
208 let snapshots = self
209 .snapshots
210 .read()
211 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
212 Ok(snapshots.get(execution_id.as_str()).cloned())
213 }
214
215 async fn delete_snapshot(
216 &self,
217 execution_id: &crate::kernel::ExecutionId,
218 ) -> anyhow::Result<()> {
219 let mut snapshots = self
220 .snapshots
221 .write()
222 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
223 snapshots.remove(execution_id.as_str());
224 Ok(())
225 }
226
227 async fn set(
228 &self,
229 key: &str,
230 value: &[u8],
231 _ttl: Option<std::time::Duration>,
232 ) -> anyhow::Result<()> {
233 let mut kv = self
234 .kv
235 .write()
236 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
237 kv.insert(key.to_string(), value.to_vec());
238 Ok(())
239 }
240
241 async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
242 let kv = self
243 .kv
244 .read()
245 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
246 Ok(kv.get(key).cloned())
247 }
248
249 async fn delete(&self, key: &str) -> anyhow::Result<()> {
250 let mut kv = self
251 .kv
252 .write()
253 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
254 kv.remove(key);
255 Ok(())
256 }
257
258 async fn list_snapshots(
259 &self,
260 _tenant_id: &crate::kernel::TenantId,
261 limit: usize,
262 ) -> anyhow::Result<Vec<crate::kernel::ExecutionId>> {
263 let snapshots = self
264 .snapshots
265 .read()
266 .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
267 Ok(snapshots
268 .keys()
269 .take(limit)
270 .map(|k| crate::kernel::ExecutionId::from(k.as_str()))
271 .collect())
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// JSONL event-store configuration pointing at the relative `events` path.
    fn jsonl_event_config() -> EventStoreConfig {
        EventStoreConfig {
            store_type: "jsonl".to_string(),
            path: Some("events".to_string()),
            dsn: None,
        }
    }

    /// JSONL state-store configuration pointing at the relative `state` path.
    fn jsonl_state_config() -> StateStoreConfig {
        StateStoreConfig {
            store_type: "jsonl".to_string(),
            path: Some("state".to_string()),
            dsn: None,
        }
    }

    /// Creating a JSONL event store must leave `events` present under the
    /// base directory.
    #[tokio::test]
    async fn test_create_jsonl_event_store() {
        let tmp = tempdir().unwrap();
        let root = tmp.path();

        let _store = create_event_store(&jsonl_event_config(), root)
            .await
            .unwrap();

        assert!(root.join("events").exists());
    }

    /// Building a full context must leave both `events` and `state` present
    /// under the base directory.
    #[tokio::test]
    async fn test_create_storage_context() {
        let tmp = tempdir().unwrap();
        let root = tmp.path();

        let _ctx = StorageContext::new(
            &jsonl_event_config(),
            &jsonl_state_config(),
            root.to_path_buf(),
        )
        .await
        .unwrap();

        assert!(root.join("events").exists());
        assert!(root.join("state").exists());
    }
}