cdk-sql-common 0.16.0-rc.0

Generic SQL storage backend for CDK
Documentation
use std::fmt::Debug;
use std::future::Future;
use std::time::Instant;

use cdk_common::database::Error;

use crate::database::DatabaseExecutor;
use crate::stmt::query;

/// Duration threshold, in milliseconds, for slow-operation logging.
///
/// The helpers in this file compare the elapsed time of a database operation
/// against this value and emit a `[SLOW QUERY]` warning when it is strictly
/// exceeded.
const SLOW_QUERY_THRESHOLD_MS: u128 = 20;

/// Run a database operation and log slow operations, it also converts and logs any error with a
/// given info for more context. This function is expecting a synchronous database operation
#[inline(always)]
pub fn run_db_operation_sync<F, E, E1, T>(
    info: &str,
    operation: F,
    error_map: E,
) -> Result<T, Error>
where
    F: FnOnce() -> Result<T, E1>,
    E1: Debug,
    E: FnOnce(E1) -> Error,
{
    let start = Instant::now();

    tracing::trace!("Running db operation {}", info);

    let result = operation().map_err(|e| {
        tracing::error!("Query {} failed with error {:?}", info, e);
        error_map(e)
    });

    let duration = start.elapsed();
    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
    }

    result
}

/// Run a database operation and log slow operations, it also converts and logs any error with a
/// given info for more context
#[inline(always)]
pub async fn run_db_operation<Fut, E, E1, T>(
    info: &str,
    operation: Fut,
    error_map: E,
) -> Result<T, Error>
where
    Fut: Future<Output = Result<T, E1>>,
    E1: Debug,
    E: FnOnce(E1) -> Error,
{
    let start = Instant::now();

    tracing::trace!("Running db operation {}", info);

    let result = operation.await.map_err(|e| {
        tracing::error!("Query {} failed with error {:?}", info, e);
        error_map(e)
    });

    let duration = start.elapsed();
    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
    }

    result
}

/// Applies the pending migrations generated by `build.rs`.
///
/// Each entry of `migrations` is a `(prefix, name, sql)` tuple. An entry is
/// considered only when its prefix is empty or equal to `db_prefix`. Applied
/// migrations are tracked by name in a `migrations` table, which is created if it
/// does not exist yet; an entry whose name is already recorded there is skipped.
///
/// # Errors
///
/// Returns the first error raised while preparing or running any statement.
#[inline(always)]
pub async fn migrate<C>(
    conn: &C,
    db_prefix: &str,
    migrations: &[(&str, &str, &str)],
) -> Result<(), Error>
where
    C: DatabaseExecutor,
{
    // Make sure the bookkeeping table is present before consulting it.
    let create_tracking_table = query(
        r#"
           CREATE TABLE IF NOT EXISTS migrations (
               name TEXT PRIMARY KEY,
               applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
           )
           "#,
    )?;
    create_tracking_table.execute(conn).await?;

    // Keep only entries without a prefix or whose prefix matches this database.
    let relevant = migrations
        .iter()
        .filter(|(prefix, _, _)| prefix.is_empty() || *prefix == db_prefix);

    for (_, name, sql) in relevant {
        let recorded = query("SELECT name FROM migrations WHERE name = :name")?
            .bind("name", name)
            .pluck(conn)
            .await?;

        if recorded.is_none() {
            // Run the migration script, then record it as applied.
            let script = query(sql)?;
            script.batch(conn).await?;

            let record = query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
                .bind("name", name);
            record.execute(conn).await?;
        }
    }

    Ok(())
}