alun-db 0.1.0

Alun database: Row-based CRUD, transactions, hooks, SQL templates, migrations
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
/// Db 门面 —— 统一数据库访问入口
///
/// 所有操作通过 Db 统一入口,内部按数据库类型分发。

use sqlx::{PgPool, MySqlPool, SqlitePool, AnyPool, Column, Row as SqlxRow};
use crate::{Row, DbResult, DbError, IdKind};
use alun_core::PageQuery;
use serde_json::{Value, Number};

/// 抽象数据库连接池 —— 统一封装四种后端连接池
///
/// 通过 `factory::create_db()` 根据配置自动选择对应的变体创建。
#[derive(Clone)]
pub enum DbPool {
    /// PostgreSQL 连接池(`sqlx::PgPool`)
    Postgres(PgPool),
    /// MySQL 连接池(`sqlx::MySqlPool`)
    Mysql(MySqlPool),
    /// SQLite 连接池(`sqlx::SqlitePool`)
    Sqlite(SqlitePool),
    /// 运行时确定数据库类型的通用连接池(`sqlx::AnyPool`)
    Any(AnyPool),
}

/// Db 门面 —— 配置驱动的数据库访问入口
///
/// 封装 PostgreSQL/MySQL/SQLite 三种后端,提供统一的 CRUD 接口。
/// 通过 `factory::create_db()` 从 `DatabaseConfig` 自动创建并连接测试。
///
/// # 示例
///
/// ```ignore
/// let db = create_db(&config.database).await?;
/// let user = db.find_by_id("user", 1).await?;
/// ```
#[derive(Clone)]
pub struct Db {
    /// 数据库连接池
    pool: DbPool,
}

// ── 每个数据库类型的查询/写入实现宏 ──

/// 为指定数据库类型生成后端查询/写入函数,包含列级 Row 转换
macro_rules! impl_db_ops {
    ($pool_ty:ty, $db_mod:ident) => {
        paste::paste! {
            fn [<typed_row_to_row_ $db_mod:snake>](
                row: &<sqlx::$db_mod as sqlx::Database>::Row
            ) -> Row {
                let mut r = Row::default();
                for col in row.columns() {
                    let name = col.name().to_string();
                    let idx: usize = col.ordinal();
                    if let Ok(v) = row.try_get::<i64, usize>(idx) {
                        r.data.insert(name, Value::Number(v.into()));
                    } else if let Ok(v) = row.try_get::<i32, usize>(idx) {
                        r.data.insert(name, Value::Number((v as i64).into()));
                    } else if let Ok(v) = row.try_get::<i16, usize>(idx) {
                        r.data.insert(name, Value::Number((v as i64).into()));
                    } else if let Ok(v) = row.try_get::<String, usize>(idx) {
                        r.data.insert(name, Value::String(v));
                    } else if let Ok(v) = row.try_get::<sqlx::types::Uuid, usize>(idx) {
                        r.data.insert(name, Value::String(v.to_string()));
                    } else if let Ok(v) = row.try_get::<f64, usize>(idx) {
                        if let Some(n) = Number::from_f64(v) {
                            r.data.insert(name, Value::Number(n));
                        }
                    } else if let Ok(v) = row.try_get::<bool, usize>(idx) {
                        r.data.insert(name, Value::Bool(v));
                    }
                }
                r.mark_all_changed();
                r
            }

            async fn [<query_one_ $pool_ty:snake>](
                pool: &$pool_ty, sql: &str, params: &[&str],
            ) -> DbResult<Option<Row>> {
                let mut q = sqlx::query::<sqlx::$db_mod>(sql);
                for p in params { q = q.bind(*p); }
                Ok(q.fetch_optional(pool).await?.as_ref()
                    .map([<typed_row_to_row_ $db_mod:snake>]))
            }

            async fn [<query_all_ $pool_ty:snake>](
                pool: &$pool_ty, sql: &str, params: &[&str],
            ) -> DbResult<Vec<Row>> {
                let mut q = sqlx::query::<sqlx::$db_mod>(sql);
                for p in params { q = q.bind(*p); }
                let rows = q.fetch_all(pool).await?;
                Ok(rows.iter().map([<typed_row_to_row_ $db_mod:snake>]).collect())
            }

            async fn [<count_ $pool_ty:snake>](
                pool: &$pool_ty, sql: &str, params: &[&str],
            ) -> DbResult<u64> {
                let mut q = sqlx::query_scalar::<sqlx::$db_mod, i64>(sql);
                for p in params { q = q.bind(*p); }
                Ok(q.fetch_optional(pool).await?.unwrap_or(0) as u64)
            }

            async fn [<execute_ $pool_ty:snake>](
                pool: &$pool_ty, sql: &str, params: &[&str],
            ) -> DbResult<u64> {
                let mut q = sqlx::query::<sqlx::$db_mod>(sql);
                for p in params { q = q.bind(*p); }
                q.execute(pool).await.map_err(DbError::from).map(|r| r.rows_affected())
            }
        }
    };
}

impl_db_ops!(PgPool, Postgres);
impl_db_ops!(MySqlPool, MySql);
impl_db_ops!(SqlitePool, Sqlite);

async fn query_one_any(pool: &AnyPool, sql: &str, params: &[&str]) -> DbResult<Option<Row>> {
    let mut q = sqlx::query(sql);
    for p in params { q = q.bind(*p); }
    Ok(q.fetch_optional(pool).await?.as_ref().map(typed_row_to_row_any))
}

async fn query_all_any(pool: &AnyPool, sql: &str, params: &[&str]) -> DbResult<Vec<Row>> {
    let mut q = sqlx::query(sql);
    for p in params { q = q.bind(*p); }
    let rows = q.fetch_all(pool).await?;
    Ok(rows.iter().map(typed_row_to_row_any).collect())
}

fn typed_row_to_row_any(row: &sqlx::any::AnyRow) -> Row {
    let mut r = Row::default();
    for col in row.columns() {
        let name = col.name().to_string();
        let idx: usize = col.ordinal();
        if let Ok(v) = row.try_get::<i64, usize>(idx) {
            r.data.insert(name, Value::Number(v.into()));
        } else if let Ok(v) = row.try_get::<i32, usize>(idx) {
            r.data.insert(name, Value::Number((v as i64).into()));
        } else if let Ok(v) = row.try_get::<String, usize>(idx) {
            r.data.insert(name, Value::String(v));
        } else if let Ok(v) = row.try_get::<f64, usize>(idx) {
            if let Some(n) = Number::from_f64(v) {
                r.data.insert(name, Value::Number(n));
            }
        } else if let Ok(v) = row.try_get::<bool, usize>(idx) {
            r.data.insert(name, Value::Bool(v));
        }
    }
    r.mark_all_changed();
    r
}

async fn count_any(pool: &AnyPool, sql: &str, params: &[&str]) -> DbResult<u64> {
    let mut q = sqlx::query_scalar::<sqlx::Any, i64>(sql);
    for p in params { q = q.bind(*p); }
    Ok(q.fetch_optional(pool).await?.unwrap_or(0) as u64)
}

async fn execute_any(pool: &AnyPool, sql: &str, params: &[&str]) -> DbResult<u64> {
    let mut q = sqlx::query(sql);
    for p in params { q = q.bind(*p); }
    Ok(q.execute(pool).await.map_err(DbError::from)?.rows_affected())
}

impl Db {
    /// 从 PostgreSQL 连接池创建 Db 实例
    pub fn postgres(pool: PgPool) -> Self { Self { pool: DbPool::Postgres(pool) } }
    /// 从 MySQL 连接池创建 Db 实例
    pub fn mysql(pool: MySqlPool) -> Self { Self { pool: DbPool::Mysql(pool) } }
    /// 从 SQLite 连接池创建 Db 实例
    pub fn sqlite(pool: SqlitePool) -> Self { Self { pool: DbPool::Sqlite(pool) } }

    /// 获取 PostgreSQL 连接池引用(非 PG 则 panic)
    pub fn pg_pool(&self) -> &PgPool { match &self.pool { DbPool::Postgres(p) => p, _ => panic!("不是 PG"), } }
    /// 获取 MySQL 连接池引用(非 MySQL 则 panic)
    pub fn mysql_pool(&self) -> &MySqlPool { match &self.pool { DbPool::Mysql(p) => p, _ => panic!("不是 MySQL"), } }
    /// 获取 SQLite 连接池引用(非 SQLite 则 panic)
    pub fn sqlite_pool(&self) -> &SqlitePool { match &self.pool { DbPool::Sqlite(p) => p, _ => panic!("不是 SQLite"), } }

    // ── 查询 ─────────────────────────────────────────

    /// 按主键 ID 查询单条记录(主键默认 `id`)
    ///
    /// 自动识别 ID 类型(UUID / 整数 / 字符串),添加正确的 SQL 类型转换后缀。
    ///
    /// # 参数
    ///
    /// - `table`: 表名
    /// - `id`: 主键值(支持 `i64`、`&str`、`Uuid` 等实现了 `Into<Value>` 的类型)
    ///
    /// # 返回值
    ///
    /// - `Ok(Some(row))`: 记录存在
    /// - `Ok(None)`: 记录不存在
    /// - `Err(_)`: 数据库错误
    pub async fn find_by_id(&self, table: &str, id: impl Into<serde_json::Value>) -> DbResult<Option<Row>> {
        let value: serde_json::Value = id.into();
        let pk = "id";
        let id_str = value_to_string(&value);
        let sql = format!("SELECT * FROM {} WHERE {}=$1{}", table, pk, id_cast(&value));
        let params = vec![id_str.as_str()];
        self.query_one(&sql, &params).await
    }

    /// 执行原始 SQL 查询(单条),返回 `Option<Row>`
    ///
    /// 参数使用 `$1`、`$2` 占位符,按顺序绑定。
    pub async fn query_one(&self, sql: &str, params: &[&str]) -> DbResult<Option<Row>> {
        match &self.pool {
            DbPool::Postgres(pool) => query_one_pg_pool(pool, sql, params).await,
            DbPool::Mysql(pool)    => query_one_my_sql_pool(pool, sql, params).await,
            DbPool::Sqlite(pool)   => query_one_sqlite_pool(pool, sql, params).await,
            DbPool::Any(pool)      => query_one_any(pool, sql, params).await,
        }
    }

    /// 执行原始 SQL 查询(多条),返回 `Vec<Row>`
    pub async fn query(&self, sql: &str, params: &[&str]) -> DbResult<Vec<Row>> {
        match &self.pool {
            DbPool::Postgres(pool) => query_all_pg_pool(pool, sql, params).await,
            DbPool::Mysql(pool)    => query_all_my_sql_pool(pool, sql, params).await,
            DbPool::Sqlite(pool)   => query_all_sqlite_pool(pool, sql, params).await,
            DbPool::Any(pool)      => query_all_any(pool, sql, params).await,
        }
    }

    /// 分页查询:自动包裹 COUNT 和 LIMIT/OFFSET
    ///
    /// 返回 `(数据列表, 总条数)`。传入的 SQL 应为无 LIMIT/OFFSET 的完整查询。
    pub async fn query_page(&self, sql: &str, params: &[&str], page: &PageQuery) -> DbResult<(Vec<Row>, u64)> {
        let count_sql = format!("SELECT COUNT(*) as cnt FROM ({}) AS _count_sub", sql);
        let total = self.count(&count_sql, params).await?;
        let page_sql = format!("{} LIMIT {} OFFSET {}", sql, page.limit(), page.offset());
        let rows = self.query(&page_sql, params).await?;
        Ok((rows, total))
    }

    /// 执行 COUNT 查询,返回行数(自动转换为 u64)
    pub async fn count(&self, sql: &str, params: &[&str]) -> DbResult<u64> {
        match &self.pool {
            DbPool::Postgres(pool) => count_pg_pool(pool, sql, params).await,
            DbPool::Mysql(pool)    => count_my_sql_pool(pool, sql, params).await,
            DbPool::Sqlite(pool)   => count_sqlite_pool(pool, sql, params).await,
            DbPool::Any(pool)      => count_any(pool, sql, params).await,
        }
    }

    // ── 增删改 ───────────────────────────────────────

    /// 插入单条记录(Row 需设置 `table` 和字段值)
    ///
    /// PostgreSQL 使用 `RETURNING *` 直接返回插入后的完整行;
    /// MySQL/SQLite 插入后通过主键回查。
    ///
    /// # 错误
    ///
    /// - Row 缺少表名 → `Argument` 错误
    /// - 没有变更字段 → `Argument` 错误
    pub async fn insert(&self, row: &Row) -> DbResult<Row> {
        let table = row.table.as_deref().ok_or(DbError::Argument("Row 缺少表名".into()))?;
        let columns: Vec<&String> = row.changes.iter().collect();
        if columns.is_empty() { return Err(DbError::Argument("没有变更的字段".into())); }

        let placeholders: Vec<String> = columns.iter().enumerate().map(|(i, c)| {
            let cast = row.data.get(*c).map(|v| value_cast(v)).unwrap_or("");
            format!("${}{}", i + 1, cast)
        }).collect();
        let col_str = columns.iter().map(|c| c.as_str()).collect::<Vec<_>>().join(", ");
        let values: Vec<String> = columns.iter()
            .filter_map(|c| row.data.get(*c)).map(value_to_string).collect();
        let val_refs: Vec<&str> = values.iter().map(|s| s.as_str()).collect();

        if matches!(&self.pool, DbPool::Postgres(_)) {
            let sql = format!("INSERT INTO {} ({}) VALUES ({}) RETURNING *", table, col_str, placeholders.join(", "));
            self.query_one(&sql, &val_refs).await?.ok_or_else(|| DbError::Other("INSERT 返回空".into()))
        } else {
            let sql = format!("INSERT INTO {} ({}) VALUES ({})", table, col_str, placeholders.join(", "));
            self.execute(&sql, &val_refs).await?;
            let pk_val = row.data.get("id");
            match pk_val {
                Some(v) => self.find_by_id(table, v.clone()).await?.ok_or(DbError::Other("INSERT 后查不到".into())),
                None => Err(DbError::Argument("非 PG 数据库需 Row 含主键".into())),
            }
        }
    }

    /// 批量插入记录,返回受影响行数
    ///
    /// 所有 Row 必须来自同一张表且变更字段一致。
    pub async fn batch_insert(&self, rows: &[Row]) -> DbResult<u64> {
        if rows.is_empty() { return Ok(0); }
        let table = rows[0].table.as_deref().ok_or(DbError::Argument("Row 缺少表名".into()))?;
        let columns: Vec<&String> = rows[0].changes.iter().collect();
        if columns.is_empty() { return Err(DbError::Argument("没有变更的字段".into())); }

        let col_names = columns.iter().map(|c| c.as_str()).collect::<Vec<_>>().join(", ");
        let mut all_params: Vec<String> = Vec::new();
        let mut groups: Vec<String> = Vec::new();
        for (ri, row) in rows.iter().enumerate() {
            let offset = ri * columns.len();
            let ph: Vec<String> = columns.iter().enumerate().map(|(ci, c)| {
                let cast = row.data.get(*c).map(|v| value_cast(v)).unwrap_or("");
                format!("${}{}", offset + ci + 1, cast)
            }).collect();
            groups.push(format!("({})", ph.join(", ")));
            for c in &columns {
                all_params.push(row.data.get(*c).map(value_to_string).unwrap_or_default());
            }
        }
        let sql = format!("INSERT INTO {} ({}) VALUES {}", table, col_names, groups.join(", "));
        let val_refs: Vec<&str> = all_params.iter().map(|s| s.as_str()).collect();
        self.execute(&sql, &val_refs).await
    }

    /// 更新单条记录(根据 Row 的 `changes` 和主键)
    ///
    /// 仅更新 `changes` 中标记的字段,主键必须存在于 `data` 中。
    /// PostgreSQL 使用 `RETURNING *` 返回更新后的行。
    pub async fn update(&self, row: &Row) -> DbResult<Option<Row>> {
        let table = row.table.as_deref().ok_or(DbError::Argument("Row 缺少表名".into()))?;
        let sets: Vec<String> = row.changes.iter().enumerate()
            .map(|(i, col)| {
                let cast = row.data.get(col).map(|v| value_cast(v)).unwrap_or("");
                format!("{} = ${}{}", col, i + 1, cast)
            }).collect();
        let pk = row.primary_keys.first().map(|s| s.as_str()).unwrap_or("id");
        let id_value = row.data.get(pk).ok_or(DbError::Argument("Row 缺少主键".into()))?;

        let mut params: Vec<String> = row.changes.iter()
            .filter_map(|c| row.data.get(c)).map(value_to_string).collect();
        params.push(value_to_string(id_value));
        let val_refs: Vec<&str> = params.iter().map(|s| s.as_str()).collect();

        let id_cast_sql = id_cast(id_value);
        if matches!(&self.pool, DbPool::Postgres(_)) {
            let sql = format!("UPDATE {} SET {} WHERE {}=${}{} RETURNING *",
                table, sets.join(", "), pk, row.changes.len() + 1, id_cast_sql);
            self.query_one(&sql, &val_refs).await
        } else {
            let sql = format!("UPDATE {} SET {} WHERE {}=${}{}",
                table, sets.join(", "), pk, row.changes.len() + 1, id_cast_sql);
            let n = self.execute(&sql, &val_refs).await?;
            if n > 0 { self.find_by_id(table, id_value.clone()).await } else { Ok(None) }
        }
    }

    /// 批量更新(按 WHERE 条件),返回受影响行数
    ///
    /// - `sets`: 要更新的字段值(Row 含 changes)
    /// - `where_sql`: WHERE 子句(不含 `WHERE` 关键字),参数使用 `$1` 占位符
    /// - `where_params`: WHERE 子句的参数值
    pub async fn batch_update(&self, table: &str, sets: &Row, where_sql: &str, where_params: &[&str]) -> DbResult<u64> {
        if sets.changes.is_empty() { return Err(DbError::Argument("没有要更新的字段".into())); }
        let set_clauses: Vec<String> = sets.changes.iter().enumerate()
            .map(|(i, col)| {
                let cast = sets.data.get(col).map(|v| value_cast(v)).unwrap_or("");
                format!("{} = ${}{}", col, i + 1, cast)
            }).collect();
        let set_values: Vec<String> = sets.changes.iter()
            .filter_map(|c| sets.data.get(c)).map(value_to_string).collect();

        let offset = sets.changes.len();
        let adjusted_where = adjust_param_indices_with_casts(where_sql, offset, where_params);
        let sql = format!("UPDATE {} SET {} WHERE {}", table, set_clauses.join(", "), adjusted_where);
        let mut all: Vec<String> = set_values;
        all.extend(where_params.iter().map(|s| s.to_string()));
        let val_refs: Vec<&str> = all.iter().map(|s| s.as_str()).collect();
        self.execute(&sql, &val_refs).await
    }

    /// 按主键删除记录,返回是否成功删除
    pub async fn delete_by_id(&self, table: &str, id: impl Into<serde_json::Value>) -> DbResult<bool> {
        let value: serde_json::Value = id.into();
        let pk = "id";
        let id_str = value_to_string(&value);
        let sql = format!("DELETE FROM {} WHERE {}=$1{}",
            table, pk, id_cast(&value));
        let n = self.execute(&sql, &[&id_str]).await?;
        Ok(n > 0)
    }

    /// 批量按主键删除,返回受影响行数
    pub async fn batch_delete_by_ids(&self, table: &str, ids: &[impl AsRef<str>]) -> DbResult<u64> {
        if ids.is_empty() { return Ok(0); }
        let is_uuid = ids.first().map(|id| {
            let s = id.as_ref();
            s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4
        }).unwrap_or(false);
        let cast = if is_uuid { "::uuid" } else { "" };
        let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${}{}", i, cast)).collect();
        let sql = format!("DELETE FROM {} WHERE id IN ({})", table, placeholders.join(", "));
        let params: Vec<&str> = ids.iter().map(|id| id.as_ref()).collect();
        self.execute(&sql, &params).await
    }

    /// 执行 INSERT/UPDATE/DELETE 等写操作,返回受影响行数
    pub async fn execute(&self, sql: &str, params: &[&str]) -> DbResult<u64> {
        match &self.pool {
            DbPool::Postgres(pool) => execute_pg_pool(pool, sql, params).await,
            DbPool::Mysql(pool)    => execute_my_sql_pool(pool, sql, params).await,
            DbPool::Sqlite(pool)   => execute_sqlite_pool(pool, sql, params).await,
            DbPool::Any(pool)      => execute_any(pool, sql, params).await,
        }
    }

    // ── 事务 ─────────────────────────────────────────

    /// 在事务闭包中执行操作,自动 BEGIN/COMMIT/ROLLBACK
    ///
    /// 闭包返回 `Ok` 则 COMMIT,返回 `Err` 则 ROLLBACK。
    /// 可通过 `ActiveTx::set_rollback_only()` 强制回滚。
    ///
    /// # 示例
    ///
    /// ```ignore
    /// db.transaction(|mut tx| async move {
    ///     let user = Row::table("user").id(1);
    ///     tx.execute("UPDATE user SET name=$1 WHERE id=$2", &["Alice", "1"]).await?;
    ///     (tx, Ok(()))
    /// }).await?;
    /// ```
    pub async fn transaction<F, Fut, T>(&self, f: F) -> DbResult<T>
    where
        F: FnOnce(crate::tx::ActiveTx) -> Fut + Send,
        Fut: std::future::Future<Output = (crate::tx::ActiveTx, DbResult<T>)> + Send,
        T: Send,
    {
        let mut rollback_only = false;
        crate::tx::execute_transaction(&self.pool, crate::tx::Isolation::ReadCommitted, &mut rollback_only, f).await
    }
}

// ── 辅助函数 ───────────────────────────────────────

pub(crate) fn value_to_string(v: &serde_json::Value) -> String {
    match v {
        serde_json::Value::String(s) => s.clone(),
        serde_json::Value::Number(n) => n.to_string(),
        serde_json::Value::Bool(b) => b.to_string(),
        serde_json::Value::Null => String::new(),
        other => other.to_string(),
    }
}

fn adjust_param_indices_with_casts(sql: &str, offset: usize, params: &[&str]) -> String {
    let re = regex::Regex::new(r"\$(\d+)").unwrap();
    if offset == 0 {
        return re.replace_all(sql, |caps: &regex::Captures| {
            let n: usize = caps[1].parse().unwrap_or(0);
            let cast = params.get(n.wrapping_sub(1)).map(|v| {
                let s: &str = v;
                if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 { "::uuid" }
                else if s.parse::<i64>().is_ok() { "::bigint" }
                else if s.parse::<f64>().is_ok() { "::double precision" }
                else { "" }
            }).unwrap_or("");
            format!("${}{}", n, cast)
        }).to_string();
    }
    re.replace_all(sql, |caps: &regex::Captures| {
        let n: usize = caps[1].parse().unwrap_or(0);
        let cast = params.get(n.wrapping_sub(1)).map(|v| {
            let s: &str = v;
            if s.len() == 36 && s.chars().filter(|&c| c == '-').count() == 4 { "::uuid" }
            else if s.parse::<i64>().is_ok() { "::bigint" }
            else if s.parse::<f64>().is_ok() { "::double precision" }
            else { "" }
        }).unwrap_or("");
        format!("${}{}", n + offset, cast)
    }).to_string()
}

fn id_cast(value: &Value) -> &'static str {
    match IdKind::detect(value) {
        IdKind::Uuid => "::uuid",
        IdKind::I64 => "::bigint",
        _ => "",
    }
}

fn value_cast(value: &Value) -> &'static str {
    match value {
        Value::Object(_) | Value::Array(_) => "::jsonb",
        Value::String(s) => {
            if is_inet_format(s) {
                "::inet"
            } else {
                match IdKind::detect(value) {
                    IdKind::Uuid => "::uuid",
                    IdKind::I64 => "::bigint",
                    IdKind::F64 => "::double precision",
                    IdKind::Bool => "::boolean",
                    _ => "",
                }
            }
        }
        _ => match IdKind::detect(value) {
            IdKind::Uuid => "::uuid",
            IdKind::I64 => "::bigint",
            IdKind::F64 => "::double precision",
            IdKind::Bool => "::boolean",
            _ => "",
        },
    }
}

fn is_inet_format(s: &str) -> bool {
    if s.is_empty() {
        return false;
    }
    let parts: Vec<&str> = s.split('.').collect();
    if parts.len() == 4 && parts.iter().all(|p| p.parse::<u8>().is_ok()) {
        return true;
    }
    if s.contains("::") {
        return true;
    }
    if s.contains(':') {
        let parts: Vec<&str> = s.split(':').collect();
        if parts.len() >= 2 && parts.len() <= 8 {
            return parts.iter().all(|p| p.is_empty() || u16::from_str_radix(p, 16).is_ok());
        }
    }
    false
}