cdk_sql_common/
common.rs

1use std::fmt::Debug;
2use std::future::Future;
3use std::time::Instant;
4
5use cdk_common::database::Error;
6
7use crate::database::DatabaseExecutor;
8use crate::stmt::query;
9
/// Duration, in milliseconds, above which a database operation is reported as a
/// `[SLOW QUERY]` warning by the helpers in this file. The comparison is strict
/// (`>`), and the `u128` type matches what `Duration::as_millis` returns.
const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
11
12/// Run a database operation and log slow operations, it also converts and logs any error with a
13/// given info for more context. This function is expecting a synchronous database operation
14#[inline(always)]
15pub fn run_db_operation_sync<F, E, E1, T>(
16    info: &str,
17    operation: F,
18    error_map: E,
19) -> Result<T, Error>
20where
21    F: FnOnce() -> Result<T, E1>,
22    E1: Debug,
23    E: FnOnce(E1) -> Error,
24{
25    let start = Instant::now();
26
27    tracing::trace!("Running db operation {}", info);
28
29    let result = operation().map_err(|e| {
30        tracing::error!("Query {} failed with error {:?}", info, e);
31        error_map(e)
32    });
33
34    let duration = start.elapsed();
35    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
36        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
37    }
38
39    result
40}
41
42/// Run a database operation and log slow operations, it also converts and logs any error with a
43/// given info for more context
44#[inline(always)]
45pub async fn run_db_operation<Fut, E, E1, T>(
46    info: &str,
47    operation: Fut,
48    error_map: E,
49) -> Result<T, Error>
50where
51    Fut: Future<Output = Result<T, E1>>,
52    E1: Debug,
53    E: FnOnce(E1) -> Error,
54{
55    let start = Instant::now();
56
57    tracing::trace!("Running db operation {}", info);
58
59    let result = operation.await.map_err(|e| {
60        tracing::error!("Query {} failed with error {:?}", info, e);
61        error_map(e)
62    });
63
64    let duration = start.elapsed();
65    if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
66        tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
67    }
68
69    result
70}
71
72/// Migrates the migration generated by `build.rs`
73#[inline(always)]
74pub async fn migrate<C>(
75    conn: &C,
76    db_prefix: &str,
77    migrations: &[(&str, &str, &str)],
78) -> Result<(), Error>
79where
80    C: DatabaseExecutor,
81{
82    query(
83        r#"
84           CREATE TABLE IF NOT EXISTS migrations (
85               name TEXT PRIMARY KEY,
86               applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
87           )
88           "#,
89    )?
90    .execute(conn)
91    .await?;
92
93    // Apply each migration if it hasn’t been applied yet
94    for (prefix, name, sql) in migrations {
95        if !prefix.is_empty() && *prefix != db_prefix {
96            continue;
97        }
98
99        let is_missing = query("SELECT name FROM migrations WHERE name = :name")?
100            .bind("name", name)
101            .pluck(conn)
102            .await?
103            .is_none();
104
105        if is_missing {
106            query(sql)?.batch(conn).await?;
107            query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
108                .bind("name", name)
109                .execute(conn)
110                .await?;
111        }
112    }
113
114    Ok(())
115}