eventdbx 1.1.1

An event-sourced, key-value, write-side database system.
Documentation
use std::{collections::BTreeMap, fs};

use rusqlite::types::Value;
use rusqlite::{Connection, params_from_iter};

use crate::{
    config::SqlitePluginConfig,
    error::{EventError, Result},
    schema::AggregateSchema,
    store::{AggregateState, EventRecord},
};

use super::{
    Plugin,
    util::{quote_identifier, sanitize_identifier},
};

pub(super) struct SqlitePlugin {
    config: SqlitePluginConfig,
}

impl SqlitePlugin {
    pub(super) fn new(config: SqlitePluginConfig) -> Self {
        Self { config }
    }

    fn ensure_database(&self) -> Result<Connection> {
        if let Some(parent) = self.config.path.parent() {
            if !parent.exists() {
                fs::create_dir_all(parent).map_err(|err| EventError::Storage(err.to_string()))?;
            }
        }

        Connection::open(&self.config.path).map_err(|err| EventError::Storage(err.to_string()))
    }

    fn ensure_table(
        conn: &Connection,
        aggregate: &str,
        schema: Option<&AggregateSchema>,
        state: &BTreeMap<String, String>,
    ) -> Result<(String, Vec<(String, String)>)> {
        let table_name = sanitize_identifier(aggregate);
        let quoted_table = quote_identifier(&table_name);

        let create_sql = format!(
            "CREATE TABLE IF NOT EXISTS {table} (
                id TEXT PRIMARY KEY,
                version INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                created_by TEXT,
                updated_at TEXT NOT NULL,
                updated_by TEXT
            )",
            table = quoted_table
        );
        conn.execute(&create_sql, [])
            .map_err(|err| EventError::Storage(err.to_string()))?;

        for (column, ty) in [
            ("created_at", "TEXT"),
            ("created_by", "TEXT"),
            ("updated_at", "TEXT"),
            ("updated_by", "TEXT"),
        ] {
            let alter_sql = format!(
                "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column} {ty}",
                table = quoted_table,
                column = quote_identifier(column),
                ty = ty
            );
            conn.execute(&alter_sql, [])
                .map_err(|err| EventError::Storage(err.to_string()))?;
        }

        let mut fields = BTreeMap::new();
        if let Some(schema) = schema {
            for event_schema in schema.events.values() {
                for field in &event_schema.fields {
                    fields.insert(field.clone(), ());
                }
            }
        }
        for key in state.keys() {
            fields.insert(key.clone(), ());
        }

        let mut column_names = Vec::new();
        for field in fields.keys() {
            let column = sanitize_identifier(field);
            let quoted_column = quote_identifier(&column);
            let alter_sql = format!(
                "ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {column} TEXT",
                table = quoted_table,
                column = quoted_column
            );
            conn.execute(&alter_sql, [])
                .map_err(|err| EventError::Storage(err.to_string()))?;
            column_names.push((column, field.clone()));
        }

        Ok((quoted_table, column_names))
    }
}

impl Plugin for SqlitePlugin {
    fn name(&self) -> &'static str {
        "sqlite"
    }

    fn notify_event(
        &self,
        record: &EventRecord,
        state: &AggregateState,
        schema: Option<&AggregateSchema>,
    ) -> Result<()> {
        let conn = self.ensure_database()?;
        let (table, field_columns) =
            Self::ensure_table(&conn, &record.aggregate_type, schema, &state.state)?;

        let created_at = record.metadata.created_at.to_rfc3339();
        let updated_at = record.metadata.created_at.to_rfc3339();
        let actor_id = record
            .metadata
            .issued_by
            .as_ref()
            .map(|claims| claims.user.clone());
        let actor_value = actor_id.clone().map(Value::from).unwrap_or(Value::Null);

        let mut columns = vec![
            "id".to_string(),
            "version".to_string(),
            "created_at".to_string(),
            "created_by".to_string(),
            "updated_at".to_string(),
            "updated_by".to_string(),
        ];

        let mut params = vec![
            Value::from(state.aggregate_id.clone()),
            Value::from(state.version as i64),
            Value::from(created_at),
            actor_value.clone(),
            Value::from(updated_at),
            actor_value,
        ];

        for (column, original) in &field_columns {
            columns.push(column.clone());
            let value = state
                .state
                .get(original)
                .cloned()
                .map(Value::from)
                .unwrap_or(Value::Null);
            params.push(value);
        }

        let quoted_columns: Vec<String> = columns
            .iter()
            .map(|column| quote_identifier(column))
            .collect();
        let placeholders: Vec<String> =
            (1..=columns.len()).map(|idx| format!("?{}", idx)).collect();
        let assignments: Vec<String> = columns
            .iter()
            .zip(quoted_columns.iter())
            .filter_map(|(name, quoted)| match name.as_str() {
                "id" | "created_at" | "created_by" => None,
                _ => Some(format!("{quoted} = excluded.{name}")),
            })
            .collect();

        let insert_sql = format!(
            "INSERT INTO {table} ({columns}) VALUES ({placeholders}) \
             ON CONFLICT(id) DO UPDATE SET {assignments}",
            table = table,
            columns = quoted_columns.join(", "),
            placeholders = placeholders.join(", "),
            assignments = assignments.join(", ")
        );

        conn.execute(&insert_sql, params_from_iter(params))
            .map_err(|err| EventError::Storage(err.to_string()))?;

        Ok(())
    }
}