Skip to main content

enact_core/storage/
mod.rs

1//! Storage Factory - Create storage backends from configuration
2//!
3//! This module provides factory functions to create EventStore and StateStore
4//! instances based on configuration. It reads from `config.yaml` and creates
5//! the appropriate backend (jsonl, sqlite, etc.).
6//!
7//! ## Supported Backends
8//!
9//! - **jsonl**: File-based JSONL storage (default, good for testing/development)
10//! - **sqlite**: SQLite-based storage (planned)
11//! - **memory**: In-memory storage (for testing only)
12
13use crate::kernel::persistence::StateStore;
14use crate::streaming::{
15    EventLog, EventStore, InMemoryEventStore, JsonlEventStore, JsonlStateStore,
16};
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19
20/// Storage configuration (mirrors enact_config::storage::EventStore)
21#[derive(Debug, Clone)]
22pub struct EventStoreConfig {
23    pub store_type: String,
24    pub path: Option<String>,
25    pub dsn: Option<String>,
26}
27
28/// Storage configuration (mirrors enact_config::storage::StateStore)
29#[derive(Debug, Clone)]
30pub struct StateStoreConfig {
31    pub store_type: String,
32    pub path: Option<String>,
33    pub dsn: Option<String>,
34}
35
36/// Create an EventStore from configuration
37///
38/// # Arguments
39/// * `config` - Event store configuration
40/// * `base_dir` - Base directory (typically ENACT_HOME) for relative paths
41///
42/// # Returns
43/// An Arc-wrapped EventStore implementation
44pub async fn create_event_store(
45    config: &EventStoreConfig,
46    base_dir: &Path,
47) -> anyhow::Result<Arc<dyn EventStore>> {
48    match config.store_type.as_str() {
49        "jsonl" => {
50            let path = match &config.path {
51                Some(p) => base_dir.join(p),
52                None => base_dir.join("events"),
53            };
54            let store = JsonlEventStore::new(path).await?;
55            Ok(Arc::new(store))
56        }
57        "memory" => Ok(Arc::new(InMemoryEventStore::new())),
58        "sqlite" => {
59            // For now, fallback to JSONL since SQLite isn't implemented
60            tracing::warn!("SQLite event store not implemented, falling back to JSONL");
61            let path = match &config.path {
62                Some(p) => base_dir.join(p),
63                None => base_dir.join("events"),
64            };
65            let store = JsonlEventStore::new(path).await?;
66            Ok(Arc::new(store))
67        }
68        other => anyhow::bail!("Unknown event store type: {}", other),
69    }
70}
71
72/// Create a StateStore from configuration
73///
74/// # Arguments
75/// * `config` - State store configuration
76/// * `base_dir` - Base directory (typically ENACT_HOME) for relative paths
77///
78/// # Returns
79/// An Arc-wrapped StateStore implementation
80pub async fn create_state_store(
81    config: &StateStoreConfig,
82    base_dir: &Path,
83) -> anyhow::Result<Arc<dyn StateStore>> {
84    match config.store_type.as_str() {
85        "jsonl" => {
86            let path = match &config.path {
87                Some(p) => base_dir.join(p),
88                None => base_dir.join("state"),
89            };
90            let store = JsonlStateStore::new(path).await?;
91            Ok(Arc::new(store))
92        }
93        "sqlite" => {
94            // For now, fallback to JSONL since SQLite isn't implemented
95            tracing::warn!("SQLite state store not implemented, falling back to JSONL");
96            let path = match &config.path {
97                Some(p) => base_dir.join(p),
98                None => base_dir.join("state"),
99            };
100            let store = JsonlStateStore::new(path).await?;
101            Ok(Arc::new(store))
102        }
103        other => anyhow::bail!("Unknown state store type: {}", other),
104    }
105}
106
107/// Create an EventLog with the configured backend
108///
109/// This is a convenience function that creates an EventStore and wraps it in an EventLog.
110pub async fn create_event_log(
111    config: &EventStoreConfig,
112    base_dir: &Path,
113) -> anyhow::Result<EventLog> {
114    let store = create_event_store(config, base_dir).await?;
115    Ok(EventLog::new(store))
116}
117
118/// Storage context holding all configured stores
119///
120/// Use this to pass storage configuration throughout the application.
121#[derive(Clone)]
122pub struct StorageContext {
123    pub event_store: Arc<dyn EventStore>,
124    pub state_store: Arc<dyn StateStore>,
125    pub base_dir: PathBuf,
126}
127
128impl StorageContext {
129    /// Create a new storage context from configuration
130    pub async fn new(
131        event_config: &EventStoreConfig,
132        state_config: &StateStoreConfig,
133        base_dir: PathBuf,
134    ) -> anyhow::Result<Self> {
135        let event_store = create_event_store(event_config, &base_dir).await?;
136        let state_store = create_state_store(state_config, &base_dir).await?;
137
138        Ok(Self {
139            event_store,
140            state_store,
141            base_dir,
142        })
143    }
144
145    /// Create an in-memory storage context (for testing)
146    pub fn in_memory() -> Self {
147        Self {
148            event_store: Arc::new(InMemoryEventStore::new()),
149            state_store: Arc::new(InMemoryStateStore::new()),
150            base_dir: PathBuf::from("."),
151        }
152    }
153
154    /// Get the event log
155    pub fn event_log(&self) -> EventLog {
156        EventLog::new(self.event_store.clone())
157    }
158}
159
160/// In-memory state store for testing
161#[derive(Debug, Default)]
162pub struct InMemoryStateStore {
163    snapshots: std::sync::RwLock<
164        std::collections::HashMap<String, crate::kernel::persistence::ExecutionSnapshot>,
165    >,
166    kv: std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>,
167}
168
169impl InMemoryStateStore {
170    pub fn new() -> Self {
171        Self::default()
172    }
173}
174
175#[async_trait::async_trait]
176impl crate::kernel::persistence::StorageBackend for InMemoryStateStore {
177    fn name(&self) -> &str {
178        "memory"
179    }
180
181    fn requires_network(&self) -> bool {
182        false
183    }
184
185    async fn health_check(&self) -> anyhow::Result<()> {
186        Ok(())
187    }
188}
189
190#[async_trait::async_trait]
191impl StateStore for InMemoryStateStore {
192    async fn save_snapshot(
193        &self,
194        snapshot: crate::kernel::persistence::ExecutionSnapshot,
195    ) -> anyhow::Result<()> {
196        let mut snapshots = self
197            .snapshots
198            .write()
199            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
200        snapshots.insert(snapshot.execution_id.as_str().to_string(), snapshot);
201        Ok(())
202    }
203
204    async fn load_snapshot(
205        &self,
206        execution_id: &crate::kernel::ExecutionId,
207    ) -> anyhow::Result<Option<crate::kernel::persistence::ExecutionSnapshot>> {
208        let snapshots = self
209            .snapshots
210            .read()
211            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
212        Ok(snapshots.get(execution_id.as_str()).cloned())
213    }
214
215    async fn delete_snapshot(
216        &self,
217        execution_id: &crate::kernel::ExecutionId,
218    ) -> anyhow::Result<()> {
219        let mut snapshots = self
220            .snapshots
221            .write()
222            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
223        snapshots.remove(execution_id.as_str());
224        Ok(())
225    }
226
227    async fn set(
228        &self,
229        key: &str,
230        value: &[u8],
231        _ttl: Option<std::time::Duration>,
232    ) -> anyhow::Result<()> {
233        let mut kv = self
234            .kv
235            .write()
236            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
237        kv.insert(key.to_string(), value.to_vec());
238        Ok(())
239    }
240
241    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
242        let kv = self
243            .kv
244            .read()
245            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
246        Ok(kv.get(key).cloned())
247    }
248
249    async fn delete(&self, key: &str) -> anyhow::Result<()> {
250        let mut kv = self
251            .kv
252            .write()
253            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
254        kv.remove(key);
255        Ok(())
256    }
257
258    async fn list_snapshots(
259        &self,
260        _tenant_id: &crate::kernel::TenantId,
261        limit: usize,
262    ) -> anyhow::Result<Vec<crate::kernel::ExecutionId>> {
263        let snapshots = self
264            .snapshots
265            .read()
266            .map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
267        Ok(snapshots
268            .keys()
269            .take(limit)
270            .map(|k| crate::kernel::ExecutionId::from(k.as_str()))
271            .collect())
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use tempfile::tempdir;
279
280    #[tokio::test]
281    async fn test_create_jsonl_event_store() {
282        let dir = tempdir().unwrap();
283        let config = EventStoreConfig {
284            store_type: "jsonl".to_string(),
285            path: Some("events".to_string()),
286            dsn: None,
287        };
288
289        let _store = create_event_store(&config, dir.path()).await.unwrap();
290
291        // Verify directory was created
292        assert!(dir.path().join("events").exists());
293    }
294
295    #[tokio::test]
296    async fn test_create_storage_context() {
297        let dir = tempdir().unwrap();
298        let event_config = EventStoreConfig {
299            store_type: "jsonl".to_string(),
300            path: Some("events".to_string()),
301            dsn: None,
302        };
303        let state_config = StateStoreConfig {
304            store_type: "jsonl".to_string(),
305            path: Some("state".to_string()),
306            dsn: None,
307        };
308
309        let _ctx = StorageContext::new(&event_config, &state_config, dir.path().to_path_buf())
310            .await
311            .unwrap();
312
313        assert!(dir.path().join("events").exists());
314        assert!(dir.path().join("state").exists());
315    }
316}