rillflow 0.1.0-alpha.8

Rillflow — a lightweight document + event store for Rust, powered by Postgres.
Documentation
use rillflow::{
    Aggregate, AggregateRepository, SessionContext, Store, events::AppendOptions,
    store::TenantStrategy,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;

/// Customer document used throughout this example.
///
/// `main` stores instances of this type through a session and also uses one as
/// the payload of the `CustomerRegistered` event.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct Customer {
    /// Customer identifier; `main` passes the same UUID as the key when storing
    /// the document and when enqueueing events.
    id: Uuid,
    /// Contact e-mail address.
    email: String,
    /// Tier label; the example uses `"starter"` and `"pro"`.
    tier: String,
}

/// Aggregate state that counts `CustomerVisited` events.
#[derive(Clone, Debug, Default, Serialize)]
struct VisitCounter {
    /// Number of `CustomerVisited` envelopes applied so far.
    total: i32,
}

/// Folds event envelopes into a running visit count.
impl Aggregate for VisitCounter {
    /// Starts with no recorded visits (the derived `Default` yields `total == 0`).
    fn new() -> Self {
        Self::default()
    }

    /// Counts the envelope when its type is `CustomerVisited`; every other
    /// event type leaves the counter untouched.
    fn apply(&mut self, env: &rillflow::EventEnvelope) {
        if env.typ != "CustomerVisited" {
            return;
        }
        self.total += 1;
    }

    /// Reports the visit total as the aggregate version.
    fn version(&self) -> i32 {
        let Self { total } = self;
        *total
    }
}

/// Walks through the session API end to end: builds a store with session
/// defaults, saves a customer document together with events, repeats a write
/// with the session context's tenant set to `acme`, commits events through the
/// aggregate helper, and finally prints the envelopes read back from the
/// customer's stream.
///
/// Reads the connection string from `DATABASE_URL`, falling back to a local
/// Postgres URL. Any error returned by a store or session call is propagated.
#[tokio::main]
async fn main() -> rillflow::Result<()> {
    // Use the local default whenever DATABASE_URL cannot be read.
    let url = match std::env::var("DATABASE_URL") {
        Ok(value) => value,
        Err(_) => String::from("postgres://postgres:postgres@localhost:5432/postgres"),
    };

    // Append options and session context handed to the store builder below.
    let append_defaults = AppendOptions {
        headers: Some(json!({"source": "session-example"})),
        causation_id: None,
        correlation_id: None,
    };
    let base_context = SessionContext::builder()
        .tenant("default")
        .headers(json!({"environment": "dev"}));

    let store = Store::builder(&url)
        .session_defaults(append_defaults)
        .session_context(base_context)
        .tenant_strategy(TenantStrategy::SchemaPerTenant)
        .session_advisory_locks(true)
        .build()
        .await?;

    // Provision both tenants used by the example, in the same order as before.
    for tenant in ["default", "acme"] {
        store.ensure_tenant(tenant).await?;
    }

    rillflow::testing::migrate_core_schema(store.pool()).await?;

    let customer_id = Uuid::new_v4();

    // Step 1: store the new customer and enqueue its registration event,
    // then flush both with a single `save_changes` call.
    let mut registration = store.session();
    let customer = Customer {
        id: customer_id,
        email: "alice@example.com".into(),
        tier: "starter".into(),
    };
    registration.store(customer_id, &customer)?;
    let registered = rillflow::Event::new("CustomerRegistered", &customer);
    registration.enqueue_event(customer_id, rillflow::Expected::Any, registered)?;
    registration.save_changes().await?;

    // Step 2: same customer with only the tier changed, written in a fresh
    // session that carries an idempotency key, extra headers and a
    // correlation id.
    let upgraded = Customer {
        tier: "pro".into(),
        ..customer.clone()
    };
    let mut upgrade = store.session();
    upgrade.store(customer_id, &upgraded)?;
    upgrade.set_event_idempotency_key("req-upgrade-1");
    upgrade.context_mut().merge_headers(json!({"upgraded": true}));
    upgrade.context_mut().correlation_id = Some(Uuid::new_v4());
    let tier_changed = rillflow::Event::new("CustomerTierChanged", &json!({"tier": "pro"}));
    upgrade.enqueue_event(customer_id, rillflow::Expected::Any, tier_changed)?;
    upgrade.save_changes().await?;

    // Step 3: store the original customer document with the session
    // context's tenant set to "acme".
    let mut acme = store.session();
    acme.context_mut().tenant = Some("acme".into());
    acme.store(customer_id, &customer)?;
    acme.save_changes().await?;

    // Step 4: queue visit events through the aggregate helper on a session
    // whose tenant is explicitly "default".
    let mut visits = store.session();
    visits.context_mut().tenant = Some("default".into());
    let repo = AggregateRepository::new(store.events());
    let mut visit_aggregates = visits.aggregates(&repo);
    let stream_id = customer_id;

    visit_aggregates.commit(
        stream_id,
        rillflow::Expected::Any,
        vec![rillflow::Event::new("CustomerVisited", &json!({}))],
    )?;
    visit_aggregates.commit_and_snapshot(
        stream_id,
        &VisitCounter::default(),
        vec![rillflow::Event::new("CustomerVisited", &json!({}))],
        2,
    )?;
    visits.save_changes().await?;

    // Read the customer's stream back and print (sequence, type, headers)
    // for every envelope.
    let summary: Vec<_> = store
        .events()
        .read_stream_envelopes(customer_id)
        .await?
        .into_iter()
        .map(|envelope| (envelope.stream_seq, envelope.typ, envelope.headers))
        .collect();

    println!("events: {:?}", summary);

    Ok(())
}