use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use super::event::Event;
use super::state::Funnel;
use crate::warehouse::funnel::{append_event, load_all_events};
use crate::warehouse::iceberg::IcebergWarehouse;
pub struct Store {
wh: Arc<IcebergWarehouse>,
root: PathBuf,
pub funnel: Funnel,
}
impl Store {
pub fn default_root(workspace_root: &Path) -> PathBuf {
workspace_root.join(".nornir").join("warehouse")
}
pub fn open(root: impl Into<PathBuf>) -> Result<Self> {
let root = root.into();
let wh = Arc::new(
IcebergWarehouse::open(&root)
.with_context(|| format!("open iceberg warehouse at {}", root.display()))?,
);
let events = wh
.block_on(async { load_all_events(&wh).await })
.with_context(|| format!("load funnel_events from {}", root.display()))?;
let mut funnel = Funnel::default();
for (i, ev) in events.iter().enumerate() {
funnel
.apply(ev)
.with_context(|| format!("replay funnel event {} of {}", i + 1, events.len()))?;
}
Ok(Self { wh, root, funnel })
}
pub fn record(&mut self, event: Event) -> Result<()> {
self.funnel
.apply(&event)
.context("validate event against current funnel state")?;
self.wh
.block_on(async { append_event(&self.wh, &event).await })
.context("append event to iceberg")?;
Ok(())
}
pub fn root(&self) -> &Path {
&self.root
}
pub fn warehouse(&self) -> &Arc<IcebergWarehouse> {
&self.wh
}
pub async fn open_async(root: impl Into<PathBuf>) -> Result<Self> {
let root = root.into();
tokio::task::spawn_blocking(move || Self::open(root))
.await
.context("join blocking task for Store::open")?
}
pub async fn record_async(&mut self, event: Event) -> Result<()> {
self.funnel
.apply(&event)
.context("validate event against current funnel state")?;
let wh = Arc::clone(&self.wh);
let ev = event;
tokio::task::spawn_blocking(move || {
wh.block_on(async { append_event(&wh, &ev).await })
})
.await
.context("join blocking task for append_event")??;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::funnel::event::Event;
use crate::funnel::ids::IdeaId;
use chrono::Utc;
use tempfile::tempdir;
#[test]
fn roundtrip_idea_then_replay() {
let dir = tempdir().unwrap();
let root = dir.path().join("wh");
{
let mut s = Store::open(&root).unwrap();
s.record(Event::IdeaSubmitted {
id: IdeaId::seq(1),
source: "test".into(),
text: "first idea".into(),
refs: vec![],
ts: Utc::now(),
})
.unwrap();
assert_eq!(s.funnel.ideas.len(), 1);
}
let s2 = Store::open(&root).unwrap();
assert_eq!(s2.funnel.ideas.len(), 1, "ideas should persist across reopen");
assert!(s2.funnel.ideas.contains_key(&IdeaId::seq(1)));
}
}