rillflow 0.1.0-alpha.1

Rillflow — a lightweight document + event store for Rust, powered by Postgres.
Documentation
use crate::{Error, Result};
use serde::Serialize;
use serde_json::Value;
use sqlx::{PgPool, Row};
use uuid::Uuid;

#[derive(Clone, Copy, Debug)]
pub enum Expected {
    Any,
    NoStream,
    Exact(i32),
}

#[derive(Clone, Debug)]
pub struct Event {
    pub typ: String,
    pub body: Value,
}

impl Event {
    pub fn new<T: Serialize>(typ: impl Into<String>, body: &T) -> Self {
        Self {
            typ: typ.into(),
            body: serde_json::to_value(body).expect("failed to serialize event body"),
        }
    }
}

pub struct Events {
    pub(crate) pool: PgPool,
}

impl Events {
    pub async fn append_stream(
        &self,
        stream_id: Uuid,
        expected: Expected,
        events: Vec<Event>,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        let current: i32 = sqlx::query_scalar::<_, Option<i32>>(
            "select max(stream_seq) from events where stream_id = $1",
        )
        .bind(stream_id)
        .fetch_one(&mut *tx)
        .await?
        .unwrap_or(0);

        match expected {
            Expected::Any => {}
            Expected::NoStream if current != 0 => return Err(Error::VersionConflict),
            Expected::Exact(value) if value != current => return Err(Error::VersionConflict),
            _ => {}
        }

        let mut seq = current;
        for event in events {
            seq += 1;
            sqlx::query(
                "insert into events (stream_id, stream_seq, event_type, body) values ($1, $2, $3, $4)"
            )
            .bind(stream_id)
            .bind(seq)
            .bind(&event.typ)
            .bind(&event.body)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        Ok(())
    }

    pub async fn read_stream(&self, stream_id: Uuid) -> Result<Vec<(i32, Event)>> {
        let rows = sqlx::query(
            "select stream_seq, event_type, body from events where stream_id = $1 order by stream_seq asc"
        )
        .bind(stream_id)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| {
                (
                    row.get::<i32, _>("stream_seq"),
                    Event {
                        typ: row.get::<String, _>("event_type"),
                        body: row.get::<Value, _>("body"),
                    },
                )
            })
            .collect())
    }
}