factstr-sqlite 0.5.1

Embedded SQLite store for FACTSTR with append, query, streams, and durable streams.
Documentation
use sqlx::SqlitePool;

pub(crate) const STORE_FORMAT_VERSION: &str = "1";
pub(crate) const APPEND_BATCH_BOUNDARY_FORMAT_KEY: &str = "append_batch_boundary_format";
pub(crate) const APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1: &str = "sparse_v1";

pub(crate) async fn initialize_schema(pool: &SqlitePool) -> Result<(), sqlx::Error> {
    let is_new_database = sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'events'",
    )
    .fetch_one(pool)
    .await?
        == 0;

    sqlx::query(
        "CREATE TABLE IF NOT EXISTS events (
            sequence_number INTEGER PRIMARY KEY,
            occurred_at TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query(
        "CREATE TABLE IF NOT EXISTS append_batches (
            first_sequence_number INTEGER PRIMARY KEY,
            last_sequence_number INTEGER NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query(
        "CREATE TABLE IF NOT EXISTS store_metadata (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query(
        "CREATE TABLE IF NOT EXISTS subscriber_cursors (
            subscriber_id TEXT PRIMARY KEY,
            event_query TEXT NOT NULL,
            last_processed_sequence_number INTEGER NOT NULL
        )",
    )
    .execute(pool)
    .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type)")
        .execute(pool)
        .await?;

    sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_occurred_at ON events(occurred_at)")
        .execute(pool)
        .await?;

    sqlx::query(
        "INSERT INTO store_metadata (key, value)
         VALUES ('store_format_version', ?1)
         ON CONFLICT(key) DO NOTHING",
    )
    .bind(STORE_FORMAT_VERSION)
    .execute(pool)
    .await?;

    if is_new_database {
        sqlx::query(
            "INSERT INTO store_metadata (key, value)
             VALUES (?1, ?2)",
        )
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_KEY)
        .bind(APPEND_BATCH_BOUNDARY_FORMAT_SPARSE_V1)
        .execute(pool)
        .await?;
    }

    Ok(())
}