use crate::kernel::persistence::StateStore;
use crate::streaming::{
EventLog, EventStore, InMemoryEventStore, JsonlEventStore, JsonlStateStore,
};
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct EventStoreConfig {
pub store_type: String,
pub path: Option<String>,
pub dsn: Option<String>,
}
#[derive(Debug, Clone)]
pub struct StateStoreConfig {
pub store_type: String,
pub path: Option<String>,
pub dsn: Option<String>,
}
pub async fn create_event_store(
config: &EventStoreConfig,
base_dir: &Path,
) -> anyhow::Result<Arc<dyn EventStore>> {
match config.store_type.as_str() {
"jsonl" => {
let path = match &config.path {
Some(p) => base_dir.join(p),
None => base_dir.join("events"),
};
let store = JsonlEventStore::new(path).await?;
Ok(Arc::new(store))
}
"memory" => Ok(Arc::new(InMemoryEventStore::new())),
"sqlite" => {
tracing::warn!("SQLite event store not implemented, falling back to JSONL");
let path = match &config.path {
Some(p) => base_dir.join(p),
None => base_dir.join("events"),
};
let store = JsonlEventStore::new(path).await?;
Ok(Arc::new(store))
}
other => anyhow::bail!("Unknown event store type: {}", other),
}
}
pub async fn create_state_store(
config: &StateStoreConfig,
base_dir: &Path,
) -> anyhow::Result<Arc<dyn StateStore>> {
match config.store_type.as_str() {
"jsonl" => {
let path = match &config.path {
Some(p) => base_dir.join(p),
None => base_dir.join("state"),
};
let store = JsonlStateStore::new(path).await?;
Ok(Arc::new(store))
}
"sqlite" => {
tracing::warn!("SQLite state store not implemented, falling back to JSONL");
let path = match &config.path {
Some(p) => base_dir.join(p),
None => base_dir.join("state"),
};
let store = JsonlStateStore::new(path).await?;
Ok(Arc::new(store))
}
other => anyhow::bail!("Unknown state store type: {}", other),
}
}
pub async fn create_event_log(
config: &EventStoreConfig,
base_dir: &Path,
) -> anyhow::Result<EventLog> {
let store = create_event_store(config, base_dir).await?;
Ok(EventLog::new(store))
}
#[derive(Clone)]
pub struct StorageContext {
pub event_store: Arc<dyn EventStore>,
pub state_store: Arc<dyn StateStore>,
pub base_dir: PathBuf,
}
impl StorageContext {
pub async fn new(
event_config: &EventStoreConfig,
state_config: &StateStoreConfig,
base_dir: PathBuf,
) -> anyhow::Result<Self> {
let event_store = create_event_store(event_config, &base_dir).await?;
let state_store = create_state_store(state_config, &base_dir).await?;
Ok(Self {
event_store,
state_store,
base_dir,
})
}
pub fn in_memory() -> Self {
Self {
event_store: Arc::new(InMemoryEventStore::new()),
state_store: Arc::new(InMemoryStateStore::new()),
base_dir: PathBuf::from("."),
}
}
pub fn event_log(&self) -> EventLog {
EventLog::new(self.event_store.clone())
}
}
#[derive(Debug, Default)]
pub struct InMemoryStateStore {
snapshots: std::sync::RwLock<
std::collections::HashMap<String, crate::kernel::persistence::ExecutionSnapshot>,
>,
kv: std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>,
}
impl InMemoryStateStore {
pub fn new() -> Self {
Self::default()
}
}
#[async_trait::async_trait]
impl crate::kernel::persistence::StorageBackend for InMemoryStateStore {
fn name(&self) -> &str {
"memory"
}
fn requires_network(&self) -> bool {
false
}
async fn health_check(&self) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait::async_trait]
impl StateStore for InMemoryStateStore {
async fn save_snapshot(
&self,
snapshot: crate::kernel::persistence::ExecutionSnapshot,
) -> anyhow::Result<()> {
let mut snapshots = self
.snapshots
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
snapshots.insert(snapshot.execution_id.as_str().to_string(), snapshot);
Ok(())
}
async fn load_snapshot(
&self,
execution_id: &crate::kernel::ExecutionId,
) -> anyhow::Result<Option<crate::kernel::persistence::ExecutionSnapshot>> {
let snapshots = self
.snapshots
.read()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
Ok(snapshots.get(execution_id.as_str()).cloned())
}
async fn delete_snapshot(
&self,
execution_id: &crate::kernel::ExecutionId,
) -> anyhow::Result<()> {
let mut snapshots = self
.snapshots
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
snapshots.remove(execution_id.as_str());
Ok(())
}
async fn set(
&self,
key: &str,
value: &[u8],
_ttl: Option<std::time::Duration>,
) -> anyhow::Result<()> {
let mut kv = self
.kv
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
kv.insert(key.to_string(), value.to_vec());
Ok(())
}
async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
let kv = self
.kv
.read()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
Ok(kv.get(key).cloned())
}
async fn delete(&self, key: &str) -> anyhow::Result<()> {
let mut kv = self
.kv
.write()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
kv.remove(key);
Ok(())
}
async fn list_snapshots(
&self,
_tenant_id: &crate::kernel::TenantId,
limit: usize,
) -> anyhow::Result<Vec<crate::kernel::ExecutionId>> {
let snapshots = self
.snapshots
.read()
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))?;
Ok(snapshots
.keys()
.take(limit)
.map(|k| crate::kernel::ExecutionId::from(k.as_str()))
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn test_create_jsonl_event_store() {
let dir = tempdir().unwrap();
let config = EventStoreConfig {
store_type: "jsonl".to_string(),
path: Some("events".to_string()),
dsn: None,
};
let _store = create_event_store(&config, dir.path()).await.unwrap();
assert!(dir.path().join("events").exists());
}
#[tokio::test]
async fn test_create_storage_context() {
let dir = tempdir().unwrap();
let event_config = EventStoreConfig {
store_type: "jsonl".to_string(),
path: Some("events".to_string()),
dsn: None,
};
let state_config = StateStoreConfig {
store_type: "jsonl".to_string(),
path: Some("state".to_string()),
dsn: None,
};
let _ctx = StorageContext::new(&event_config, &state_config, dir.path().to_path_buf())
.await
.unwrap();
assert!(dir.path().join("events").exists());
assert!(dir.path().join("state").exists());
}
}