1use std::fmt::Debug;
2use std::future::Future;
3use std::time::Instant;
4
5use cdk_common::database::Error;
6
7use crate::database::DatabaseExecutor;
8use crate::stmt::query;
9
10const SLOW_QUERY_THRESHOLD_MS: u128 = 20;
11
12#[inline(always)]
15pub fn run_db_operation_sync<F, E, E1, T>(
16 info: &str,
17 operation: F,
18 error_map: E,
19) -> Result<T, Error>
20where
21 F: FnOnce() -> Result<T, E1>,
22 E1: Debug,
23 E: FnOnce(E1) -> Error,
24{
25 let start = Instant::now();
26
27 tracing::trace!("Running db operation {}", info);
28
29 let result = operation().map_err(|e| {
30 tracing::error!("Query {} failed with error {:?}", info, e);
31 error_map(e)
32 });
33
34 let duration = start.elapsed();
35 if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
36 tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
37 }
38
39 result
40}
41
42#[inline(always)]
45pub async fn run_db_operation<Fut, E, E1, T>(
46 info: &str,
47 operation: Fut,
48 error_map: E,
49) -> Result<T, Error>
50where
51 Fut: Future<Output = Result<T, E1>>,
52 E1: Debug,
53 E: FnOnce(E1) -> Error,
54{
55 let start = Instant::now();
56
57 tracing::trace!("Running db operation {}", info);
58
59 let result = operation.await.map_err(|e| {
60 tracing::error!("Query {} failed with error {:?}", info, e);
61 error_map(e)
62 });
63
64 let duration = start.elapsed();
65 if duration.as_millis() > SLOW_QUERY_THRESHOLD_MS {
66 tracing::warn!("[SLOW QUERY] Took {} ms: {}", duration.as_millis(), info);
67 }
68
69 result
70}
71
72#[inline(always)]
74pub async fn migrate<C>(
75 conn: &C,
76 db_prefix: &str,
77 migrations: &[(&str, &str, &str)],
78) -> Result<(), Error>
79where
80 C: DatabaseExecutor,
81{
82 query(
83 r#"
84 CREATE TABLE IF NOT EXISTS migrations (
85 name TEXT PRIMARY KEY,
86 applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
87 )
88 "#,
89 )?
90 .execute(conn)
91 .await?;
92
93 for (prefix, name, sql) in migrations {
95 if !prefix.is_empty() && *prefix != db_prefix {
96 continue;
97 }
98
99 let is_missing = query("SELECT name FROM migrations WHERE name = :name")?
100 .bind("name", name)
101 .pluck(conn)
102 .await?
103 .is_none();
104
105 if is_missing {
106 query(sql)?.batch(conn).await?;
107 query(r#"INSERT INTO migrations (name) VALUES (:name)"#)?
108 .bind("name", name)
109 .execute(conn)
110 .await?;
111 }
112 }
113
114 Ok(())
115}