rillflow 0.1.0-alpha.8

Rillflow — a lightweight document + event store for Rust, powered by Postgres.
Documentation
use anyhow::Result;
use rillflow::{
    Aggregate, AggregateRepository, Event, Expected, Store,
    snapshotter::{AggregateFolder, Snapshotter, SnapshotterConfig},
};
use serde_json::json;
use testcontainers::{
    GenericImage, ImageExt,
    core::{IntoContainerPort, WaitFor},
    runners::AsyncRunner,
};
use uuid::Uuid;

#[derive(Default, serde::Serialize)]
struct Counter(i32);

impl Aggregate for Counter {
    fn new() -> Self {
        Self(0)
    }
    fn apply(&mut self, env: &rillflow::EventEnvelope) {
        if env.typ == "Inc" {
            self.0 += 1;
        }
    }
    fn version(&self) -> i32 {
        self.0
    }
}

#[tokio::test]
async fn snapshotter_writes_snapshots_for_long_streams() -> Result<()> {
    let image = GenericImage::new("postgres", "16-alpine")
        .with_exposed_port(5432.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "database system is ready to accept connections",
        ))
        .with_env_var("POSTGRES_USER", "postgres")
        .with_env_var("POSTGRES_PASSWORD", "postgres");
    let container = image.start().await?;
    let host = container.get_host().await?;
    let port = container.get_host_port_ipv4(5432).await?;
    let url = format!("postgres://postgres:postgres@{host}:{port}/postgres?sslmode=disable");

    let store = Store::connect(&url).await?;
    rillflow::testing::migrate_core_schema(store.pool()).await?;

    let stream = Uuid::new_v4();
    let events: Vec<_> = (0..150).map(|_| Event::new("Inc", &json!({}))).collect();
    store
        .events()
        .append_stream(stream, Expected::Any, events)
        .await?;

    let repo = AggregateRepository::new(store.events());
    let folder = AggregateFolder::<Counter>::new(repo);
    let snapshotter = Snapshotter::new(
        store.pool().clone(),
        std::sync::Arc::new(folder),
        SnapshotterConfig {
            threshold_events: 100,
            ..Default::default()
        },
    );
    let n = snapshotter.tick_once().await?;
    assert!(n >= 1);

    let (ver,): (i32,) = sqlx::query_as("select version from snapshots where stream_id = $1")
        .bind(stream)
        .fetch_one(store.pool())
        .await?;
    assert!(ver >= 100);

    Ok(())
}