// rillflow 0.1.0-alpha.8
//
// Rillflow — a lightweight document + event store for Rust, powered by Postgres.
// Documentation
use crate::{Error, Result, metrics};
use serde::Serialize;
use serde_json::Value;
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;

/// Optimistic-concurrency expectation checked against a stream's current
/// version before events are appended.
///
/// The "current version" is the highest `stream_seq` stored for the stream,
/// or `0` when the stream has no events yet.
#[derive(Clone, Copy, Debug)]
pub enum Expected {
    /// Skip the version check entirely.
    Any,
    /// Require that the stream has no events yet (current version is `0`).
    NoStream,
    /// Require that the current version equals the given value.
    Exact(i32),
}

/// A single event: a type name plus a JSON body.
#[derive(Clone, Debug)]
pub struct Event {
    /// Event type name; written to / read from the `event_type` column.
    pub typ: String,
    /// JSON body; written to / read from the `body` column.
    pub body: Value,
}

impl Event {
    /// Builds an event of type `typ` whose body is `body` serialized to JSON.
    ///
    /// # Panics
    ///
    /// Panics if `body` cannot be converted into a [`serde_json::Value`].
    pub fn new<T: Serialize>(typ: impl Into<String>, body: &T) -> Self {
        let typ = typ.into();
        let body = serde_json::to_value(body).expect("failed to serialize event body");
        Self { typ, body }
    }
}

/// Optional metadata applied to every event written by one append call.
#[derive(Default, Clone, Debug)]
pub struct AppendOptions {
    /// JSON headers stored with each event; when `None`, an empty JSON
    /// object is stored instead.
    pub headers: Option<Value>,
    /// Value written to the `causation_id` column of each event.
    pub causation_id: Option<Uuid>,
    /// Value written to the `correlation_id` column of each event.
    pub correlation_id: Option<Uuid>,
}

/// Handle for reading and appending events in the `events` table.
#[derive(Clone)]
pub struct Events {
    /// Connection pool used for all queries and transactions.
    pub(crate) pool: PgPool,
    /// When `true`, appends first take `pg_advisory_xact_lock` on a hash of
    /// the stream id.
    pub(crate) use_advisory_lock: bool,
}

/// A stored event together with the metadata columns of its `events` row.
#[derive(Clone, Debug)]
pub struct EventEnvelope {
    /// Value of the `global_seq` column.
    pub global_seq: i64,
    /// Stream the event belongs to (`stream_id` column).
    pub stream_id: Uuid,
    /// Position of the event within its stream (`stream_seq` column).
    pub stream_seq: i32,
    /// Event type name (`event_type` column).
    pub typ: String,
    /// JSON body (`body` column).
    pub body: Value,
    /// JSON headers (`headers` column).
    pub headers: Value,
    /// Value of the `causation_id` column, if any.
    pub causation_id: Option<Uuid>,
    /// Value of the `correlation_id` column, if any.
    pub correlation_id: Option<Uuid>,
    /// Value of the `created_at` column.
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl Events {
    pub fn with_advisory_locks(mut self) -> Self {
        self.use_advisory_lock = true;
        self
    }

    /// Appends `events` to `stream_id` using default [`AppendOptions`].
    ///
    /// Delegates to [`Events::append_with`]; see it for the version check
    /// and error behaviour.
    pub async fn append_stream(
        &self,
        stream_id: Uuid,
        expected: Expected,
        events: Vec<Event>,
    ) -> Result<()> {
        let default_opts = AppendOptions::default();
        self.append_with(stream_id, expected, events, &default_opts)
            .await
    }

    /// Appends `events` to `stream_id`, attaching the given headers and
    /// causation / correlation ids to every event.
    ///
    /// `headers` is cloned into an [`AppendOptions`] and the call is
    /// forwarded to [`Events::append_with`].
    pub async fn append_stream_with_headers(
        &self,
        stream_id: Uuid,
        expected: Expected,
        events: Vec<Event>,
        headers: &Value,
        causation_id: Option<Uuid>,
        correlation_id: Option<Uuid>,
    ) -> Result<()> {
        self.append_with(
            stream_id,
            expected,
            events,
            &AppendOptions {
                headers: Some(headers.clone()),
                causation_id,
                correlation_id,
            },
        )
        .await
    }

    /// Loads every event of `stream_id`, ordered by ascending `stream_seq`.
    ///
    /// Each item pairs the event's `stream_seq` with its type and body.
    /// A stream without events yields an empty vector.
    pub async fn read_stream(&self, stream_id: Uuid) -> Result<Vec<(i32, Event)>> {
        let fetched: Vec<(i32, String, Value)> = sqlx::query_as(
            "select stream_seq, event_type, body from events where stream_id = $1 order by stream_seq asc",
        )
        .bind(stream_id)
        .fetch_all(&self.pool)
        .await?;

        let mut stream = Vec::with_capacity(fetched.len());
        for (stream_seq, typ, body) in fetched {
            stream.push((stream_seq, Event { typ, body }));
        }
        Ok(stream)
    }

    /// Loads every event of `stream_id` with its metadata columns, ordered
    /// by ascending `stream_seq`.
    pub async fn read_stream_envelopes(&self, stream_id: Uuid) -> Result<Vec<EventEnvelope>> {
        // One fetched row; element order matches the select list below.
        #[allow(clippy::type_complexity)]
        type Row = (
            i64,
            Uuid,
            i32,
            String,
            Value,
            Value,
            Option<Uuid>,
            Option<Uuid>,
            chrono::DateTime<chrono::Utc>,
        );

        let fetched: Vec<Row> = sqlx::query_as(
            r#"select global_seq, stream_id, stream_seq, event_type, body, headers, causation_id, correlation_id, created_at
                from events where stream_id = $1 order by stream_seq asc"#,
        )
        .bind(stream_id)
        .fetch_all(&self.pool)
        .await?;

        let mut envelopes = Vec::with_capacity(fetched.len());
        for row in fetched {
            envelopes.push(EventEnvelope {
                global_seq: row.0,
                stream_id: row.1,
                stream_seq: row.2,
                typ: row.3,
                body: row.4,
                headers: row.5,
                causation_id: row.6,
                correlation_id: row.7,
                created_at: row.8,
            });
        }
        Ok(envelopes)
    }

    pub async fn append_with(
        &self,
        stream_id: Uuid,
        expected: Expected,
        events: Vec<Event>,
        opts: &AppendOptions,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;
        let res = self
            .append_with_tx_internal(&mut tx, stream_id, expected, &events, opts)
            .await;
        match res {
            Ok(()) => {
                tx.commit().await?;
                metrics::record_event_appends(None, events.len() as u64);
                Ok(())
            }
            Err(err) => {
                metrics::record_event_conflict(None);
                Err(err)
            }
        }
    }

    async fn append_with_tx_internal(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        stream_id: Uuid,
        expected: Expected,
        events: &[Event],
        opts: &AppendOptions,
    ) -> Result<()> {
        if events.is_empty() {
            return Ok(());
        }

        if self.use_advisory_lock {
            let key = stream_id.to_string();
            sqlx::query("select pg_advisory_xact_lock(hashtext($1)::bigint)")
                .bind(&key)
                .execute(&mut **tx)
                .await?;
        }

        let current: i32 = sqlx::query_scalar::<_, Option<i32>>(
            "select max(stream_seq) from events where stream_id = $1",
        )
        .bind(stream_id)
        .fetch_one(&mut **tx)
        .await?
        .unwrap_or(0);

        match expected {
            Expected::Any => {}
            Expected::NoStream if current != 0 => return Err(Error::VersionConflict),
            Expected::Exact(value) if value != current => return Err(Error::VersionConflict),
            _ => {}
        }

        let mut seq = current;
        let headers = opts
            .headers
            .clone()
            .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
        for event in events {
            seq += 1;
            let res = sqlx::query(
                r#"insert into events (stream_id, stream_seq, event_type, body, headers, causation_id, correlation_id)
                    values ($1, $2, $3, $4, $5, $6, $7)"#,
            )
            .bind(stream_id)
            .bind(seq)
            .bind(&event.typ)
            .bind(&event.body)
            .bind(&headers)
            .bind(opts.causation_id)
            .bind(opts.correlation_id)
            .execute(&mut **tx)
            .await;

            if let Err(e) = res {
                if let sqlx::Error::Database(db_err) = &e {
                    let msg = db_err.message().to_lowercase();
                    if msg.contains("events_idemp_key_uq")
                        || msg.contains("unique") && msg.contains("idempotency_key")
                    {
                        return Err(Error::IdempotencyConflict);
                    }
                }
                return Err(e.into());
            }
        }
        Ok(())
    }

    /// Crate-internal entry point for appending inside a transaction owned
    /// by the caller.
    ///
    /// Forwards to `append_with_tx_internal`; the caller remains responsible
    /// for committing or rolling back `tx`.
    pub(crate) async fn append_with_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        stream_id: Uuid,
        expected: Expected,
        events: &[Event],
        opts: &AppendOptions,
    ) -> Result<()> {
        let pending = self.append_with_tx_internal(tx, stream_id, expected, events, opts);
        pending.await
    }

    /// Starts an [`AppendBuilder`] for `stream_id`.
    ///
    /// The builder begins with no events, default [`AppendOptions`] and
    /// [`Expected::Any`], and holds its own clone of this handle.
    pub fn builder(&self, stream_id: Uuid) -> AppendBuilder {
        let inner = self.clone();
        AppendBuilder {
            inner,
            stream_id,
            expected: Expected::Any,
            opts: AppendOptions::default(),
            events: Vec::new(),
        }
    }
}

/// Fluent builder that collects events and options for one append call.
pub struct AppendBuilder {
    /// Store handle the append is sent through.
    inner: Events,
    /// Target stream.
    stream_id: Uuid,
    /// Version expectation passed to the append.
    expected: Expected,
    /// Headers and causation / correlation ids applied to every event.
    opts: AppendOptions,
    /// Events queued so far, in insertion order.
    events: Vec<Event>,
}

impl AppendBuilder {
    /// Sets the version expectation checked before the events are written.
    pub fn expected(mut self, expected: Expected) -> Self {
        self.expected = expected;
        self
    }

    /// Replaces the headers stored with every event.
    ///
    /// This overwrites anything set earlier, including a key added through
    /// [`AppendBuilder::idempotency_key`]; call `idempotency_key` after
    /// `headers` to keep both.
    pub fn headers(mut self, headers: serde_json::Value) -> Self {
        self.opts.headers = Some(headers);
        self
    }

    /// Stores `key` under `"idempotency_key"` in the headers object.
    ///
    /// Missing headers and JSON `null` headers are both treated as an empty
    /// object, so the key is never lost in those cases. If the headers were
    /// explicitly set to some other non-object JSON value (string, number,
    /// array, bool) there is no place to put the key and the headers are
    /// left unchanged.
    pub fn idempotency_key(mut self, key: impl Into<String>) -> Self {
        let mut headers = match self.opts.headers.take() {
            // `null` carries no information, so start from an empty object
            // rather than silently dropping the key.
            None | Some(Value::Null) => Value::Object(serde_json::Map::new()),
            Some(existing) => existing,
        };
        if let Value::Object(map) = &mut headers {
            map.insert("idempotency_key".to_string(), Value::String(key.into()));
        }
        self.opts.headers = Some(headers);
        self
    }

    /// Sets the causation id written with every event.
    pub fn causation_id(mut self, id: Uuid) -> Self {
        self.opts.causation_id = Some(id);
        self
    }

    /// Sets the correlation id written with every event.
    pub fn correlation_id(mut self, id: Uuid) -> Self {
        self.opts.correlation_id = Some(id);
        self
    }

    /// Queues a single event after those already added.
    pub fn push(mut self, event: Event) -> Self {
        self.events.push(event);
        self
    }

    /// Queues every event yielded by `iter`, preserving its order.
    pub fn extend<I: IntoIterator<Item = Event>>(mut self, iter: I) -> Self {
        self.events.extend(iter);
        self
    }

    /// Performs the append via [`Events::append_with`], consuming the builder.
    ///
    /// With no queued events this returns `Ok(())` without touching the
    /// database.
    pub async fn send(self) -> Result<()> {
        self.inner
            .append_with(self.stream_id, self.expected, self.events, &self.opts)
            .await
    }
}