factstr-postgres 0.5.1

PostgreSQL store for FACTSTR with append, query, streams, and durable streams.
Documentation
#![allow(dead_code)]

use std::env;
use std::sync::atomic::{AtomicU64, Ordering};

use factstr_postgres::{PostgresBootstrapOptions, PostgresStore};
use sqlx::{Connection, Executor, PgConnection, Row};
use time::OffsetDateTime;
use time::macros::format_description;
use url::Url;

static NEXT_SCHEMA_ID: AtomicU64 = AtomicU64::new(1);

#[allow(dead_code)]
pub(crate) fn run_store_test<TestFn>(test: TestFn)
where
    TestFn: FnOnce(Box<dyn Fn() -> PostgresStore>),
{
    let temporary_schema = TemporarySchema::new();
    let store_database_url = temporary_schema.database_url().to_owned();
    test(Box::new(move || {
        PostgresStore::connect(&store_database_url).expect("postgres store should connect")
    }));
}

pub(crate) struct TemporarySchema {
    base_database_url: String,
    schema_name: String,
    store_database_url: String,
}

pub(crate) struct TemporaryDatabase {
    server_url: String,
    database_name: String,
    database_url: String,
}

impl TemporarySchema {
    pub(crate) fn new() -> Self {
        create_temporary_schema()
    }

    pub(crate) fn database_url(&self) -> &str {
        &self.store_database_url
    }

    pub(crate) fn create_store(&self) -> PostgresStore {
        PostgresStore::connect(&self.store_database_url).expect("postgres store should connect")
    }
}

impl Drop for TemporarySchema {
    fn drop(&mut self) {
        admin_runtime().block_on(async {
            let Ok(mut connection) = PgConnection::connect(&self.base_database_url).await else {
                return;
            };

            let drop_statement = format!("DROP SCHEMA IF EXISTS \"{}\" CASCADE", self.schema_name);
            let _ = connection.execute(drop_statement.as_str()).await;
        });
    }
}

impl TemporaryDatabase {
    pub(crate) fn new() -> Self {
        create_temporary_database()
    }

    pub(crate) fn database_url(&self) -> &str {
        &self.database_url
    }

    pub(crate) fn bootstrap_options(&self) -> PostgresBootstrapOptions {
        PostgresBootstrapOptions {
            server_url: self.server_url.clone(),
            database_name: self.database_name.clone(),
        }
    }

    pub(crate) fn server_url(&self) -> &str {
        &self.server_url
    }
}

impl Drop for TemporaryDatabase {
    fn drop(&mut self) {
        admin_runtime().block_on(async {
            let Ok(mut connection) = PgConnection::connect(&self.server_url).await else {
                return;
            };

            let drop_statement = format!(
                "DROP DATABASE IF EXISTS {}",
                quote_postgres_identifier(&self.database_name)
            );
            let _ = connection.execute(drop_statement.as_str()).await;
        });
    }
}

#[allow(dead_code)]
pub(crate) fn durable_stream_cursor(
    store_database_url: &str,
    durable_stream_id: &str,
) -> (String, u64) {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        let row = sqlx::query(
            "SELECT event_query, last_processed_sequence_number
             FROM durable_stream_cursors
             WHERE durable_stream_id = $1",
        )
        .bind(durable_stream_id)
        .fetch_one(&mut connection)
        .await
        .expect("durable stream cursor should exist");

        (
            row.get::<String, _>("event_query"),
            row.get::<i64, _>("last_processed_sequence_number") as u64,
        )
    })
}

#[allow(dead_code)]
pub(crate) fn clear_append_batches(store_database_url: &str) {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        connection
            .execute("DELETE FROM append_batches")
            .await
            .expect("append_batches should be cleared");
    });
}

#[allow(dead_code)]
pub(crate) fn insert_append_batch(
    store_database_url: &str,
    first_sequence_number: u64,
    last_sequence_number: u64,
) {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        sqlx::query(
            "INSERT INTO append_batches (first_sequence_number, last_sequence_number)
             VALUES ($1, $2)",
        )
        .bind(first_sequence_number as i64)
        .bind(last_sequence_number as i64)
        .execute(&mut connection)
        .await
        .expect("append batch should be inserted");
    });
}

#[allow(dead_code)]
pub(crate) fn append_batch_rows(store_database_url: &str) -> Vec<(u64, u64)> {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        sqlx::query(
            "SELECT first_sequence_number, last_sequence_number
             FROM append_batches
             ORDER BY first_sequence_number ASC",
        )
        .fetch_all(&mut connection)
        .await
        .expect("append batch lookup should succeed")
        .into_iter()
        .map(|row| {
            (
                row.get::<i64, _>("first_sequence_number") as u64,
                row.get::<i64, _>("last_sequence_number") as u64,
            )
        })
        .collect()
    })
}

#[allow(dead_code)]
pub(crate) fn metadata_value(store_database_url: &str, key: &str) -> Option<String> {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        sqlx::query_scalar::<_, String>("SELECT value FROM store_metadata WHERE key = $1")
            .bind(key)
            .fetch_optional(&mut connection)
            .await
            .expect("store metadata lookup should succeed")
    })
}

#[allow(dead_code)]
pub(crate) fn delete_metadata_key(store_database_url: &str, key: &str) {
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(store_database_url)
            .await
            .expect("postgres test connection should succeed");

        sqlx::query("DELETE FROM store_metadata WHERE key = $1")
            .bind(key)
            .execute(&mut connection)
            .await
            .expect("store metadata delete should succeed");
    });
}

fn database_url() -> String {
    env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set to run factstr-postgres integration tests")
}

fn unique_schema_name() -> String {
    let timestamp = OffsetDateTime::now_utc()
        .format(format_description!(
            "[year]_[month]_[day]_[hour][minute][second]_[subsecond digits:6]_utc"
        ))
        .expect("timestamp should format");
    let next_id = NEXT_SCHEMA_ID.fetch_add(1, Ordering::Relaxed);

    format!("factstr_test_{timestamp}_{next_id}")
}

fn create_temporary_schema() -> TemporarySchema {
    let base_database_url = database_url();
    let schema_name = unique_schema_name();
    admin_runtime().block_on(async {
        let mut connection = PgConnection::connect(&base_database_url)
            .await
            .expect("postgres test connection should succeed");

        connection
            .execute(format!("CREATE SCHEMA \"{schema_name}\"").as_str())
            .await
            .expect("test schema should be created");
    });

    TemporarySchema {
        store_database_url: schema_database_url(&base_database_url, &schema_name),
        base_database_url,
        schema_name,
    }
}

fn admin_runtime() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("tokio runtime should build")
}

fn schema_database_url(base_database_url: &str, schema_name: &str) -> String {
    let mut url = Url::parse(base_database_url).expect("DATABASE_URL should be a valid URL");
    url.query_pairs_mut()
        .append_pair("options", &format!("--search_path={schema_name}"));
    url.into()
}

fn create_temporary_database() -> TemporaryDatabase {
    let base_database_url = database_url();
    let database_name = unique_database_name();

    TemporaryDatabase {
        server_url: postgres_admin_url(&base_database_url),
        database_url: database_url_with_name(&base_database_url, &database_name),
        database_name,
    }
}

fn unique_database_name() -> String {
    let timestamp = OffsetDateTime::now_utc()
        .format(format_description!(
            "[year]_[month]_[day]_[hour][minute][second]_[subsecond digits:6]_utc"
        ))
        .expect("timestamp should format");
    let next_id = NEXT_SCHEMA_ID.fetch_add(1, Ordering::Relaxed);

    format!("factstr_bootstrap_{timestamp}_{next_id}")
}

fn postgres_admin_url(base_database_url: &str) -> String {
    database_url_with_name(base_database_url, "postgres")
}

fn database_url_with_name(base_database_url: &str, database_name: &str) -> String {
    let mut url = Url::parse(base_database_url).expect("DATABASE_URL should be a valid URL");
    url.set_path(&format!("/{database_name}"));
    url.into()
}

pub(crate) fn database_url_with_query_parameter(
    base_database_url: &str,
    key: &str,
    value: &str,
) -> String {
    let mut url = Url::parse(base_database_url).expect("DATABASE_URL should be a valid URL");
    url.query_pairs_mut().append_pair(key, value);
    url.into()
}

fn quote_postgres_identifier(identifier: &str) -> String {
    assert!(!identifier.is_empty(), "identifier should not be empty");
    assert!(
        !identifier.contains('\0'),
        "identifier should not contain null bytes"
    );

    format!("\"{}\"", identifier.replace('"', "\"\""))
}