Skip to main content

wp_knowledge/mem/
memdb.rs

1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::mem::RowData;
4use crate::mem::stub::StubMDB;
5use csv::Reader;
6use enum_dispatch::enum_dispatch;
7use lazy_static::lazy_static;
8use orion_error::ErrorOwe;
9use orion_error::ErrorWith;
10use orion_error::ToStructError;
11use orion_error::UvsFrom;
12use r2d2_sqlite::SqliteConnectionManager;
13use rusqlite::OpenFlags;
14use rusqlite::Params;
15use rusqlite::ToSql;
16use rusqlite::types::ToSqlOutput;
17use rusqlite::types::Value;
18use std::path::PathBuf;
19use wp_error::KnowledgeReason;
20use wp_error::KnowledgeResult;
21use wp_log::debug_kdb;
22use wp_log::info_kdb;
23use wp_log::warn_kdb;
24use wp_model_core::model;
25use wp_model_core::model::DataField;
26
27use super::AnyResult;
28use super::SqlNamedParam;
29use crate::loader::ProviderKind;
30use crate::runtime::MetadataCacheScope;
31
32lazy_static! {
33    // Important: Use a single SQLite in-memory connection so schema/data persist across calls.
34    // r2d2 with `memory()` creates isolated DBs per connection; limit pool to size=1 to reuse
35    // the same connection and avoid "no such table" issues when different checkouts observe
36    // different ephemeral databases.
37    pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
38        r2d2::Pool::builder()
39            .max_size(1)
40            .build(SqliteConnectionManager::memory())
41            .expect("init SQLite memory pool (size=1) failed");
42}
43
44#[derive(Debug, Clone)]
45pub struct MemDB {
46    conn: r2d2::Pool<SqliteConnectionManager>,
47}
48
49#[derive(Debug, Clone)]
50#[enum_dispatch(DBQuery)]
51pub enum MDBEnum {
52    Stub(StubMDB),
53    Use(MemDB),
54}
55impl Default for MDBEnum {
56    fn default() -> Self {
57        MDBEnum::Stub(StubMDB {})
58    }
59}
60impl MDBEnum {
61    pub fn global() -> Self {
62        MDBEnum::Use(MemDB::global())
63    }
64    pub fn load_test() -> AnyResult<()> {
65        MemDB::load_test()?;
66        Ok(())
67    }
68}
69
70pub fn cache_query<const N: usize, P: Params>(
71    db: &MDBEnum,
72    sql: &str,
73    c_params: &[DataField; N],
74    q_params: P,
75    cache: &mut impl CacheAble<DataField, RowData, N>,
76) -> RowData {
77    crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
78}
79impl ToSql for SqlNamedParam {
80    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
81        match self.0.get_value() {
82            model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
83            model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
84            model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85            model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
86            model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87            model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
88            model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
89            model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
90            model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
91            model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
92            model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
93            model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
94            model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
95            model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96            model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
97            model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
98            model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
99            model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
100        }
101    }
102}
103
104impl DBQuery for MemDB {
105    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
106        let conn = self.conn.get().owe_res().want("get memdb connect")?;
107        let _ = crate::sqlite_ext::register_builtin(&conn);
108        super::query_util::query_cached(&conn, sql, [])
109    }
110
111    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
112        let conn = self.conn.get().owe_res().want("get memdb connect")?;
113        // Ensure SQLite UDFs are available on this connection (ip4_int/cidr4_* etc.)
114        let _ = crate::sqlite_ext::register_builtin(&conn);
115        super::query_util::query_first_row_cached(&conn, sql, [])
116    }
117
118    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
119        debug_kdb!("[memdb] query_row_params: {}", sql);
120        let conn = self.conn.get().owe_res()?;
121        // Ensure SQLite UDFs are available on this connection
122        let _ = crate::sqlite_ext::register_builtin(&conn);
123        super::query_util::query_first_row_cached(&conn, sql, params)
124    }
125
126    fn query_row_tdos<P: Params>(
127        &self,
128        _sql: &str,
129        _params: &[DataField; 2],
130    ) -> KnowledgeResult<RowData> {
131        //let data: [TDOParams; 2] = [TDOParams(&params[0]), TDOParams(&params[1])];
132        //params.iter().for_each(|x| data.push(TDOParams(x)));
133        //self.query_row_params(sql, data)
134        todo!();
135    }
136}
137impl MemDB {
138    pub fn query_with_scope(
139        &self,
140        scope: &MetadataCacheScope,
141        sql: &str,
142    ) -> KnowledgeResult<Vec<RowData>> {
143        let conn = self.conn.get().owe_res().want("get memdb connect")?;
144        let _ = crate::sqlite_ext::register_builtin(&conn);
145        super::query_util::query_cached_with_scope(
146            &conn,
147            scope,
148            Some(ProviderKind::SqliteAuthority),
149            sql,
150            [],
151        )
152    }
153
154    pub fn query_row_with_scope(
155        &self,
156        scope: &MetadataCacheScope,
157        sql: &str,
158    ) -> KnowledgeResult<RowData> {
159        let conn = self.conn.get().owe_res().want("get memdb connect")?;
160        let _ = crate::sqlite_ext::register_builtin(&conn);
161        super::query_util::query_first_row_cached_with_scope(
162            &conn,
163            scope,
164            Some(ProviderKind::SqliteAuthority),
165            sql,
166            [],
167        )
168    }
169
170    pub fn query_fields_with_scope(
171        &self,
172        scope: &MetadataCacheScope,
173        sql: &str,
174        params: &[DataField],
175    ) -> KnowledgeResult<Vec<RowData>> {
176        let conn = self.conn.get().owe_res().want("get memdb connect")?;
177        let _ = crate::sqlite_ext::register_builtin(&conn);
178        let named_params = params
179            .iter()
180            .cloned()
181            .map(SqlNamedParam)
182            .collect::<Vec<_>>();
183        let refs: Vec<(&str, &dyn ToSql)> = named_params
184            .iter()
185            .map(|param| (param.0.get_name(), param as &dyn ToSql))
186            .collect();
187        super::query_util::query_cached_with_scope(
188            &conn,
189            scope,
190            Some(ProviderKind::SqliteAuthority),
191            sql,
192            refs.as_slice(),
193        )
194    }
195
196    pub fn query_named_fields_with_scope(
197        &self,
198        scope: &MetadataCacheScope,
199        sql: &str,
200        params: &[DataField],
201    ) -> KnowledgeResult<RowData> {
202        self.query_fields_with_scope(scope, sql, params)
203            .map(|rows| rows.into_iter().next().unwrap_or_default())
204    }
205
206    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
207        let conn = self.conn.get().owe_res().want("get memdb connect")?;
208        let _ = crate::sqlite_ext::register_builtin(&conn);
209        let named_params = params
210            .iter()
211            .cloned()
212            .map(SqlNamedParam)
213            .collect::<Vec<_>>();
214        let refs: Vec<(&str, &dyn ToSql)> = named_params
215            .iter()
216            .map(|param| (param.0.get_name(), param as &dyn ToSql))
217            .collect();
218        super::query_util::query_cached(&conn, sql, refs.as_slice())
219    }
220
221    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
222        self.query_fields(sql, params)
223            .map(|rows| rows.into_iter().next().unwrap_or_default())
224    }
225
226    pub fn instance() -> Self {
227        // Provide a single-connection pool for a consistent in-memory DB view
228        let manager = SqliteConnectionManager::memory();
229        let pool = r2d2::Pool::builder()
230            .max_size(1)
231            .build(manager)
232            .expect("init SQLite memory pool (size=1) failed");
233        Self { conn: pool }
234    }
235    /// Experimental: shared in-memory SQLite via URI with a pool size > 1.
236    /// Requires SQLite compiled with shared-cache support.
237    pub fn shared_pool(max_size: u32) -> AnyResult<Self> {
238        // Shared in-memory URI. Every connection to this URI shares same DB.
239        // Note: this depends on platform SQLite features.
240        let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
241        let manager = SqliteConnectionManager::file(uri).with_flags(
242            OpenFlags::SQLITE_OPEN_READ_WRITE
243                | OpenFlags::SQLITE_OPEN_CREATE
244                | OpenFlags::SQLITE_OPEN_URI,
245        );
246        let pool = r2d2::Pool::builder().max_size(max_size).build(manager)?;
247        Ok(Self { conn: pool })
248    }
249
250    /// Create a MemDB backed by a file path with custom flags and pool size.
251    pub fn new_file(
252        path: &str,
253        max_size: u32,
254        flags: rusqlite::OpenFlags,
255    ) -> KnowledgeResult<Self> {
256        let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
257        let pool = r2d2::Pool::builder()
258            .max_size(max_size)
259            .build(manager)
260            .owe_res()?;
261        Ok(Self { conn: pool })
262    }
263    // V1 init_load_by_conf removed: use loader::build_authority_from_knowdb for V2
264
265    /// Execute a closure with a checked-out SQLite connection from the pool.
266    /// Useful for one-time prepared statements or specialized operations.
267    pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> AnyResult<T>>(
268        &self,
269        f: F,
270    ) -> AnyResult<T> {
271        let pooled = self.conn.get()?;
272        let conn_ref: &rusqlite::Connection = &pooled;
273        f(conn_ref)
274    }
275
276    pub fn table_create(&self, sql: &str) -> anyhow::Result<()> {
277        let conn = self.conn.get()?;
278        conn.execute(sql, ())?;
279        debug_kdb!("crate table: {} ", sql);
280        Ok(())
281    }
282    pub fn execute(&self, sql: &str) -> anyhow::Result<()> {
283        let conn = self.conn.get()?;
284        conn.execute(sql, ())?;
285        debug_kdb!("execute: {} ", sql);
286        Ok(())
287    }
288
289    pub fn table_clean(&self, sql: &str) -> anyhow::Result<()> {
290        let conn = self.conn.get()?;
291        conn.execute(sql, ())?;
292        debug_kdb!("clean table: {} ", sql);
293        Ok(())
294    }
295
296    pub fn table_load(
297        &self,
298        sql: &str,
299        csv_path: PathBuf,
300        cols: Vec<usize>,
301        max: usize,
302    ) -> AnyResult<usize> {
303        info_kdb!("load table data in {}", csv_path.display());
304        if !csv_path.exists() {
305            warn_kdb!("{} not find, load knowdb failed", csv_path.display());
306            return Ok(0);
307        }
308        let mut rdr = Reader::from_path(&csv_path)?;
309        let conn = self.conn.get()?;
310        let mut load_cnt: usize = 0;
311        // Prepare once outside loop for performance
312        let mut stmt = conn.prepare(sql)?;
313        for (idx, result) in rdr.records().enumerate() {
314            if load_cnt >= max {
315                break;
316            }
317            let record = result.map_err(|e| {
318                anyhow::anyhow!("read csv record failed at line {}: {}", idx + 1, e)
319            })?;
320
321            // Basic bounds check to avoid panic on bad column indices
322            if let Some(max_col) = cols.iter().max()
323                && *max_col >= record.len()
324            {
325                return Err(anyhow::anyhow!(
326                    "csv has insufficient columns at line {}: need index {}, got {} columns",
327                    idx + 1,
328                    *max_col,
329                    record.len()
330                ));
331            }
332
333            // Unified dynamic binding (strict): any missing column is an error
334            let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
335            for &ci in &cols {
336                let v = record
337                    .get(ci)
338                    .ok_or_else(|| anyhow::anyhow!("line {} col {} missing", idx + 1, ci))?;
339                vec.push(v);
340            }
341            let params = rusqlite::params_from_iter(vec);
342            stmt.execute(params)?;
343            load_cnt += 1;
344        }
345        info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
346        Ok(load_cnt)
347    }
348
349    pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
350        let conn = self.conn.get().owe_res()?;
351        let count_sql = format!("select count(*) from {}", table);
352        let count: usize = conn
353            .query_row(count_sql.as_str(), (), |row| row.get(0))
354            .owe_rule()?;
355        if count >= scope.0 {
356            Ok(count)
357        } else {
358            Err(KnowledgeReason::from_conf()
359                .to_err()
360                .with_detail("table data less")
361                .with(("table", table))
362                .with(("count", count.to_string())))
363
364            /*
365            Err(anyhow!(
366                "data less! , load data count {} <= min {}",
367                count,
368                scope.0,
369            ))
370            */
371        }
372    }
373
374    pub fn global() -> Self {
375        Self {
376            conn: MEM_SQLITE_INS.clone(),
377        }
378    }
379    pub fn load_test() -> AnyResult<Self> {
380        let db = Self::global();
381        debug_kdb!("[memdb] load_test invoked");
382        db.table_create(EXAMPLE_CREATE_SQL)?;
383        // 通过 crate 根目录定位测试字典,避免 cwd 影响
384        let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
385        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
386        db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
387        // quick sanity check
388        if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
389            debug_kdb!("[memdb] example rows loaded = {}", cnt);
390        }
391        Ok(db)
392    }
393}
394pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
395    id   INTEGER PRIMARY KEY,
396    name TEXT NOT NULL,
397    pinying TEXT NOT NULL
398    )"#;
399pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
400pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
401
402#[cfg(test)]
403mod tests {
404
405    use std::{fs::File, io::Read};
406
407    use super::*;
408    // V1 TableConf removed
409    use crate::mem::ToSqlParams;
410    use anyhow::Context;
411    use orion_conf::EnvTomlLoad;
412    use orion_variate::EnvDict;
413    use serde::Serialize;
414    use std::fs;
415    use wp_data_fmt::{Csv, RecordFormatter};
416
417    #[test]
418    fn test_load() -> AnyResult<()> {
419        let db = MemDB::instance();
420        db.table_create(EXAMPLE_CREATE_SQL)?;
421        let loaded = db.table_load(
422            EXAMPLE_INSERT_SQL,
423            PathBuf::from("src/mem/dict/example.csv"),
424            vec![0, 1],
425            100,
426        )?;
427        assert_eq!(loaded, 10);
428        let fmt = Csv::default();
429        let tdos = db.query_row("select * from example;")?;
430        for obj in tdos {
431            println!("{}", fmt.fmt_field(&obj.into()));
432        }
433        Ok(())
434    }
435
436    #[test]
437    fn test_csv_off_by_one() -> AnyResult<()> {
438        let db = MemDB::instance();
439        db.table_create(EXAMPLE_CREATE_SQL)?;
440        // Expect only 1 row loaded when max=1 (no off-by-one)
441        let loaded = db.table_load(
442            EXAMPLE_INSERT_SQL,
443            PathBuf::from("src/mem/dict/example.csv"),
444            vec![0, 1],
445            1,
446        )?;
447        assert_eq!(loaded, 1);
448        Ok(())
449    }
450
451    #[test]
452    fn test_row_null_mapping() -> AnyResult<()> {
453        let db = MemDB::instance();
454        db.execute("CREATE TABLE tnull (v TEXT)")?;
455        db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
456        let row = db.query_row("SELECT v FROM tnull")?;
457        assert_eq!(row.len(), 1);
458        assert_eq!(row[0].get_name(), "v");
459        // Ensure NULL becomes a Value::Null rather than panic
460        assert!(matches!(row[0].get_value(), model::Value::Null));
461        Ok(())
462    }
463
464    #[test]
465    fn test_row_blob_mapping() -> AnyResult<()> {
466        let db = MemDB::instance();
467        db.execute("CREATE TABLE tblob (b BLOB)")?;
468        // Insert ASCII 'ABC' as blob
469        db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
470        let row = db.query_row("SELECT b FROM tblob")?;
471        assert_eq!(row.len(), 1);
472        assert_eq!(row[0].get_name(), "b");
473        // lossy utf8 decode should yield "ABC"
474        assert_eq!(row[0].to_string(), "chars(ABC)");
475        Ok(())
476    }
477
478    #[test]
479    fn test_csv_missing_column_error() -> AnyResult<()> {
480        use std::fs;
481        use std::io::Write;
482        let db = MemDB::instance();
483        db.table_create(EXAMPLE_CREATE_SQL)?;
484        // Create a temp csv with only 1 column per row
485        let mut path = std::env::temp_dir();
486        path.push("wp_knowledge_csv_missing_col.csv");
487        {
488            let mut f = fs::File::create(&path)?;
489            writeln!(f, "name")?;
490            writeln!(f, "only_one_col")?;
491        }
492        let res = db.table_load(
493            EXAMPLE_INSERT_SQL,
494            path.clone(),
495            vec![0, 1], // request 2 columns but csv has 1
496            10,
497        );
498        assert!(res.is_err());
499        let e = format!("{}", res.err().unwrap());
500        assert!(e.contains("line"));
501        assert!(e.contains("insufficient columns"));
502        // cleanup
503        let _ = fs::remove_file(&path);
504        Ok(())
505    }
506
507    #[test]
508    fn test_global_persistence_across_handles() -> AnyResult<()> {
509        // Create table via one global handle
510        {
511            let db1 = MemDB::global();
512            db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
513            db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
514        }
515        // Read via a new global handle; should see the same in-memory DB
516        {
517            let db2 = MemDB::global();
518            let rows = db2.query_row("SELECT v FROM gtest")?;
519            assert_eq!(rows.len(), 1);
520            assert_eq!(rows[0].to_string(), "chars(ok)");
521        }
522        Ok(())
523    }
524
525    #[test]
526    fn test_init_by_conf() -> AnyResult<()> {
527        let db = MemDB::global();
528        db.table_create(EXAMPLE_CREATE_SQL)?;
529        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
530        db.table_load(
531            EXAMPLE_INSERT_SQL,
532            PathBuf::from("src/mem/dict/example.csv"),
533            vec![0, 1],
534            100,
535        )?;
536        Ok(())
537    }
538
539    // V1 conf serde test removed
540
541    #[test]
542    fn test_alter_level() -> anyhow::Result<()> {
543        let db = MemDB::global();
544        // ensure clean state across global in-memory handle
545        let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
546        db.table_create(
547            r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
548                id   INTEGER PRIMARY KEY,
549                log_type TEXT NOT NULL,
550                level1_code TEXT NOT NULL,
551                level1_name TEXT NOT NULL,
552                level2_code TEXT NOT NULL,
553                level2_name TEXT NOT NULL,
554                original_code TEXT NOT NULL,
555                original_name TEXT NOT NULL
556            )"#,
557        )?;
558        let _ = db.table_clean("DELETE FROM alert_cat_level");
559        db.table_load(
560            r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
561            PathBuf::from("src/mem/dict/event_cat_level.csv"),
562            vec![0, 1, 2, 3, 4, 5, 6],
563            2000,
564        )?;
565
566        let sql = "select level1_code from alert_cat_level where log_type = :log_type  and  original_code = :code ";
567        let result = db.query_row_params(
568            //"select level1_code from alert_cat_level where log_type = 'jowto_server_alert_log' and original_code = '00000002'",
569            sql,
570            &[(":log_type", "app_log"), (":code", "00000002")],
571        )?;
572        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
573
574        let px = [
575            SqlNamedParam(DataField::from_chars(":code", "00000002")),
576            SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
577        ];
578
579        let p = px.to_params();
580        let result = db.query_row_params(sql, &p)?;
581        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
582
583        Ok(())
584    }
585
586    #[test]
587    fn test_tosql_bind_various_types() -> AnyResult<()> {
588        use chrono::NaiveDate;
589        use std::net::{IpAddr, Ipv4Addr};
590        use wp_model_core::model::types::value::ObjectValue;
591        use wp_model_core::model::{DateTimeValue, HexT};
592
593        let db = MemDB::instance();
594        db.execute("CREATE TABLE p (v)")?;
595
596        // Bool -> integer 1
597        {
598            let sql = "INSERT INTO p (v) VALUES (:v)";
599            let p = [SqlNamedParam(DataField::from_bool(":v", true))];
600            db.query_row_params(sql, &p.to_params())?;
601            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
602            assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
603        }
604        // Null
605        {
606            let sql = "INSERT INTO p (v) VALUES (:v)";
607            let p = [SqlNamedParam(DataField::new(
608                model::DataType::default(),
609                ":v",
610                model::Value::Null,
611            ))];
612            db.query_row_params(sql, &p.to_params())?;
613            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
614            assert!(matches!(row[0].get_value(), model::Value::Null));
615        }
616        // Time -> text
617        {
618            let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
619                .unwrap()
620                .and_hms_opt(0, 0, 0)
621                .unwrap();
622            let sql = "INSERT INTO p (v) VALUES (:v)";
623            let p = [SqlNamedParam(DataField::from_time(":v", dt))];
624            db.query_row_params(sql, &p.to_params())?;
625            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
626            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
627        }
628        // Hex -> text
629        {
630            let sql = "INSERT INTO p (v) VALUES (:v)";
631            let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
632            db.query_row_params(sql, &p.to_params())?;
633            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
634            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
635        }
636        // IpAddr -> text
637        {
638            let sql = "INSERT INTO p (v) VALUES (:v)";
639            let p = [SqlNamedParam(DataField::from_ip(
640                ":v",
641                IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
642            ))];
643            db.query_row_params(sql, &p.to_params())?;
644            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
645            assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
646        }
647        // Obj -> text (debug)
648        {
649            let mut obj = ObjectValue::new();
650            obj.insert("k".to_string(), DataField::from_chars("", "v"));
651            let sql = "INSERT INTO p (v) VALUES (:v)";
652            let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
653            db.query_row_params(sql, &p.to_params())?;
654            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
655            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
656        }
657        // Array -> text (debug)
658        {
659            let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
660            let sql = "INSERT INTO p (v) VALUES (:v)";
661            let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
662            db.query_row_params(sql, &p.to_params())?;
663            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
664            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
665        }
666        Ok(())
667    }
668
669    #[test]
670    fn test_column_alias_names() -> AnyResult<()> {
671        let db = MemDB::instance();
672        // Create a simple one-shot table/view using alias
673        db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
674        db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
675        let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
676        assert_eq!(row.len(), 2);
677        assert_eq!(row[0].get_name(), "the number");
678        assert_eq!(row[1].get_name(), "the text");
679        Ok(())
680    }
681
682    #[test]
683    fn test_concurrent_inserts() -> AnyResult<()> {
684        use std::thread;
685        let db = MemDB::global();
686        db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
687        let threads: Vec<_> = (0..4)
688            .map(|_| {
689                thread::spawn(|| {
690                    let dbt = MemDB::global();
691                    for _ in 0..10 {
692                        let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
693                    }
694                })
695            })
696            .collect();
697        for t in threads {
698            t.join().unwrap();
699        }
700        let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
701        // total should be 40
702        assert_eq!(row[0].to_string(), "digit(40)");
703        Ok(())
704    }
705
706    #[test]
707    fn test_query_returns_all_rows() -> AnyResult<()> {
708        let db = MemDB::instance();
709        db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
710        let rows = db.query("SELECT * FROM multi")?;
711        assert!(rows.is_empty(), "empty table should return empty vec");
712        db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
713        db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
714        db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
715
716        let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
717        assert_eq!(rows.len(), 3, "should return all 3 rows");
718
719        Ok(())
720    }
721
722    #[allow(dead_code)]
723    fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> AnyResult<T> {
724        let mut f = File::open(path).with_context(|| format!("conf file not found: {}", path))?;
725        let mut buffer = Vec::with_capacity(10240);
726        f.read_to_end(&mut buffer).expect("read conf file error");
727        let conf_data = String::from_utf8(buffer)?;
728        let dict = EnvDict::new();
729        let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)?;
730        Ok(conf)
731    }
732
733    #[allow(dead_code)]
734    fn export_toml_local<T: Serialize>(val: &T, path: &str) -> AnyResult<()> {
735        let data = toml::to_string_pretty(val)?;
736        if let Some(parent) = std::path::Path::new(path).parent() {
737            fs::create_dir_all(parent)?;
738        }
739        fs::write(path, data)?;
740        Ok(())
741    }
742}