Skip to main content

wp_knowledge/mem/
memdb.rs

1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::mem::RowData;
4use crate::mem::stub::StubMDB;
5use csv::Reader;
6use enum_dispatch::enum_dispatch;
7use lazy_static::lazy_static;
8use orion_error::ErrorOwe;
9use orion_error::ErrorWith;
10use orion_error::ToStructError;
11use orion_error::UvsFrom;
12use r2d2_sqlite::SqliteConnectionManager;
13use rusqlite::OpenFlags;
14use rusqlite::Params;
15use rusqlite::ToSql;
16use rusqlite::types::ToSqlOutput;
17use rusqlite::types::Value;
18use std::path::PathBuf;
19use wp_error::KnowledgeReason;
20use wp_error::KnowledgeResult;
21use wp_log::debug_kdb;
22use wp_log::info_kdb;
23use wp_log::warn_kdb;
24use wp_model_core::model;
25use wp_model_core::model::DataField;
26
27use super::AnyResult;
28use super::SqlNamedParam;
29
lazy_static! {
    // Process-wide pool that backs `MemDB::global()`; built lazily on first access.
    //
    // Important: Use a single SQLite in-memory connection so schema/data persist across calls.
    // r2d2 with `memory()` creates isolated DBs per connection; limit pool to size=1 to reuse
    // the same connection and avoid "no such table" issues when different checkouts observe
    // different ephemeral databases.
    //
    // Panics (via `expect`) if the pool cannot be built.
    pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
        r2d2::Pool::builder()
            .max_size(1)
            .build(SqliteConnectionManager::memory())
            .expect("init SQLite memory pool (size=1) failed");
}
41
/// SQLite-backed knowledge store accessed through an r2d2 connection pool.
///
/// Depending on the constructor used, the pool points at a private in-memory
/// database, the process-wide in-memory database, a shared-cache in-memory URI,
/// or a database file.
#[derive(Debug, Clone)]
pub struct MemDB {
    // Connection pool; every operation checks a connection out of it.
    conn: r2d2::Pool<SqliteConnectionManager>,
}
46
/// Closed set of `DBQuery` backends; `enum_dispatch` generates the forwarding
/// `DBQuery` implementation for this enum.
#[derive(Debug, Clone)]
#[enum_dispatch(DBQuery)]
pub enum MDBEnum {
    /// Stub backend (also the `Default` variant).
    Stub(StubMDB),
    /// Real SQLite-backed store.
    Use(MemDB),
}
53impl Default for MDBEnum {
54    fn default() -> Self {
55        MDBEnum::Stub(StubMDB {})
56    }
57}
58impl MDBEnum {
59    pub fn global() -> Self {
60        MDBEnum::Use(MemDB::global())
61    }
62    pub fn load_test() -> AnyResult<()> {
63        MemDB::load_test()?;
64        Ok(())
65    }
66}
67
68pub fn cache_query<const N: usize, P: Params>(
69    db: &MDBEnum,
70    sql: &str,
71    c_params: &[DataField; N],
72    q_params: P,
73    cache: &mut impl CacheAble<DataField, RowData, N>,
74) -> RowData {
75    crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
76}
77impl ToSql for SqlNamedParam {
78    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
79        match self.0.get_value() {
80            model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
81            model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
82            model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
83            model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
84            model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85            model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
86            model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87            model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
88            model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
89            model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
90            model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
91            model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
92            model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
93            model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
94            model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
95            model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96            model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
97            model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
98        }
99    }
100}
101
102impl DBQuery for MemDB {
103    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
104        let conn = self.conn.get().owe_res().want("get memdb connect")?;
105        let _ = crate::sqlite_ext::register_builtin(&conn);
106        super::query_util::query_cached(&conn, sql, [])
107    }
108
109    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
110        let conn = self.conn.get().owe_res().want("get memdb connect")?;
111        // Ensure SQLite UDFs are available on this connection (ip4_int/cidr4_* etc.)
112        let _ = crate::sqlite_ext::register_builtin(&conn);
113        super::query_util::query_first_row_cached(&conn, sql, [])
114    }
115
116    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
117        debug_kdb!("[memdb] query_row_params: {}", sql);
118        let conn = self.conn.get().owe_res()?;
119        // Ensure SQLite UDFs are available on this connection
120        let _ = crate::sqlite_ext::register_builtin(&conn);
121        super::query_util::query_first_row_cached(&conn, sql, params)
122    }
123
124    fn query_row_tdos<P: Params>(
125        &self,
126        _sql: &str,
127        _params: &[DataField; 2],
128    ) -> KnowledgeResult<RowData> {
129        //let data: [TDOParams; 2] = [TDOParams(&params[0]), TDOParams(&params[1])];
130        //params.iter().for_each(|x| data.push(TDOParams(x)));
131        //self.query_row_params(sql, data)
132        todo!();
133    }
134}
135impl MemDB {
136    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
137        let conn = self.conn.get().owe_res().want("get memdb connect")?;
138        let _ = crate::sqlite_ext::register_builtin(&conn);
139        let named_params = params
140            .iter()
141            .cloned()
142            .map(SqlNamedParam)
143            .collect::<Vec<_>>();
144        let refs: Vec<(&str, &dyn ToSql)> = named_params
145            .iter()
146            .map(|param| (param.0.get_name(), param as &dyn ToSql))
147            .collect();
148        super::query_util::query_cached(&conn, sql, refs.as_slice())
149    }
150
151    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
152        self.query_fields(sql, params)
153            .map(|rows| rows.into_iter().next().unwrap_or_default())
154    }
155
156    pub fn instance() -> Self {
157        // Provide a single-connection pool for a consistent in-memory DB view
158        let manager = SqliteConnectionManager::memory();
159        let pool = r2d2::Pool::builder()
160            .max_size(1)
161            .build(manager)
162            .expect("init SQLite memory pool (size=1) failed");
163        Self { conn: pool }
164    }
165    /// Experimental: shared in-memory SQLite via URI with a pool size > 1.
166    /// Requires SQLite compiled with shared-cache support.
167    pub fn shared_pool(max_size: u32) -> AnyResult<Self> {
168        // Shared in-memory URI. Every connection to this URI shares same DB.
169        // Note: this depends on platform SQLite features.
170        let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
171        let manager = SqliteConnectionManager::file(uri).with_flags(
172            OpenFlags::SQLITE_OPEN_READ_WRITE
173                | OpenFlags::SQLITE_OPEN_CREATE
174                | OpenFlags::SQLITE_OPEN_URI,
175        );
176        let pool = r2d2::Pool::builder().max_size(max_size).build(manager)?;
177        Ok(Self { conn: pool })
178    }
179
180    /// Create a MemDB backed by a file path with custom flags and pool size.
181    pub fn new_file(
182        path: &str,
183        max_size: u32,
184        flags: rusqlite::OpenFlags,
185    ) -> KnowledgeResult<Self> {
186        let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
187        let pool = r2d2::Pool::builder()
188            .max_size(max_size)
189            .build(manager)
190            .owe_res()?;
191        Ok(Self { conn: pool })
192    }
193    // V1 init_load_by_conf removed: use loader::build_authority_from_knowdb for V2
194
195    /// Execute a closure with a checked-out SQLite connection from the pool.
196    /// Useful for one-time prepared statements or specialized operations.
197    pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> AnyResult<T>>(
198        &self,
199        f: F,
200    ) -> AnyResult<T> {
201        let pooled = self.conn.get()?;
202        let conn_ref: &rusqlite::Connection = &pooled;
203        f(conn_ref)
204    }
205
206    pub fn table_create(&self, sql: &str) -> anyhow::Result<()> {
207        let conn = self.conn.get()?;
208        conn.execute(sql, ())?;
209        debug_kdb!("crate table: {} ", sql);
210        Ok(())
211    }
212    pub fn execute(&self, sql: &str) -> anyhow::Result<()> {
213        let conn = self.conn.get()?;
214        conn.execute(sql, ())?;
215        debug_kdb!("execute: {} ", sql);
216        Ok(())
217    }
218
219    pub fn table_clean(&self, sql: &str) -> anyhow::Result<()> {
220        let conn = self.conn.get()?;
221        conn.execute(sql, ())?;
222        debug_kdb!("clean table: {} ", sql);
223        Ok(())
224    }
225
226    pub fn table_load(
227        &self,
228        sql: &str,
229        csv_path: PathBuf,
230        cols: Vec<usize>,
231        max: usize,
232    ) -> AnyResult<usize> {
233        info_kdb!("load table data in {}", csv_path.display());
234        if !csv_path.exists() {
235            warn_kdb!("{} not find, load knowdb failed", csv_path.display());
236            return Ok(0);
237        }
238        let mut rdr = Reader::from_path(&csv_path)?;
239        let conn = self.conn.get()?;
240        let mut load_cnt: usize = 0;
241        // Prepare once outside loop for performance
242        let mut stmt = conn.prepare(sql)?;
243        for (idx, result) in rdr.records().enumerate() {
244            if load_cnt >= max {
245                break;
246            }
247            let record = result.map_err(|e| {
248                anyhow::anyhow!("read csv record failed at line {}: {}", idx + 1, e)
249            })?;
250
251            // Basic bounds check to avoid panic on bad column indices
252            if let Some(max_col) = cols.iter().max()
253                && *max_col >= record.len()
254            {
255                return Err(anyhow::anyhow!(
256                    "csv has insufficient columns at line {}: need index {}, got {} columns",
257                    idx + 1,
258                    *max_col,
259                    record.len()
260                ));
261            }
262
263            // Unified dynamic binding (strict): any missing column is an error
264            let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
265            for &ci in &cols {
266                let v = record
267                    .get(ci)
268                    .ok_or_else(|| anyhow::anyhow!("line {} col {} missing", idx + 1, ci))?;
269                vec.push(v);
270            }
271            let params = rusqlite::params_from_iter(vec);
272            stmt.execute(params)?;
273            load_cnt += 1;
274        }
275        info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
276        Ok(load_cnt)
277    }
278
279    pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
280        let conn = self.conn.get().owe_res()?;
281        let count_sql = format!("select count(*) from {}", table);
282        let count: usize = conn
283            .query_row(count_sql.as_str(), (), |row| row.get(0))
284            .owe_rule()?;
285        if count >= scope.0 {
286            Ok(count)
287        } else {
288            Err(KnowledgeReason::from_conf()
289                .to_err()
290                .with_detail("table data less")
291                .with(("table", table))
292                .with(("count", count.to_string())))
293
294            /*
295            Err(anyhow!(
296                "data less! , load data count {} <= min {}",
297                count,
298                scope.0,
299            ))
300            */
301        }
302    }
303
304    pub fn global() -> Self {
305        Self {
306            conn: MEM_SQLITE_INS.clone(),
307        }
308    }
309    pub fn load_test() -> AnyResult<Self> {
310        let db = Self::global();
311        debug_kdb!("[memdb] load_test invoked");
312        db.table_create(EXAMPLE_CREATE_SQL)?;
313        // 通过 crate 根目录定位测试字典,避免 cwd 影响
314        let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
315        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
316        db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
317        // quick sanity check
318        if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
319            debug_kdb!("[memdb] example rows loaded = {}", cnt);
320        }
321        Ok(db)
322    }
323}
/// DDL for the `example` table used by `MemDB::load_test` and the unit tests.
/// `id` is an integer primary key that the insert statement below does not set.
pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
    id   INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    pinying TEXT NOT NULL
    )"#;
/// Removes every row from the `example` table.
pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
/// Inserts one `example` row; `?1` binds `name` and `?2` binds `pinying`.
pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
331
332#[cfg(test)]
333mod tests {
334
335    use std::{fs::File, io::Read};
336
337    use super::*;
338    // V1 TableConf removed
339    use crate::mem::ToSqlParams;
340    use anyhow::Context;
341    use orion_conf::EnvTomlLoad;
342    use orion_variate::EnvDict;
343    use serde::Serialize;
344    use std::fs;
345    use wp_data_fmt::{Csv, RecordFormatter};
346
347    #[test]
348    fn test_load() -> AnyResult<()> {
349        let db = MemDB::instance();
350        db.table_create(EXAMPLE_CREATE_SQL)?;
351        let loaded = db.table_load(
352            EXAMPLE_INSERT_SQL,
353            PathBuf::from("src/mem/dict/example.csv"),
354            vec![0, 1],
355            100,
356        )?;
357        assert_eq!(loaded, 10);
358        let fmt = Csv::default();
359        let tdos = db.query_row("select * from example;")?;
360        for obj in tdos {
361            println!("{}", fmt.fmt_field(&obj.into()));
362        }
363        Ok(())
364    }
365
366    #[test]
367    fn test_csv_off_by_one() -> AnyResult<()> {
368        let db = MemDB::instance();
369        db.table_create(EXAMPLE_CREATE_SQL)?;
370        // Expect only 1 row loaded when max=1 (no off-by-one)
371        let loaded = db.table_load(
372            EXAMPLE_INSERT_SQL,
373            PathBuf::from("src/mem/dict/example.csv"),
374            vec![0, 1],
375            1,
376        )?;
377        assert_eq!(loaded, 1);
378        Ok(())
379    }
380
381    #[test]
382    fn test_row_null_mapping() -> AnyResult<()> {
383        let db = MemDB::instance();
384        db.execute("CREATE TABLE tnull (v TEXT)")?;
385        db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
386        let row = db.query_row("SELECT v FROM tnull")?;
387        assert_eq!(row.len(), 1);
388        assert_eq!(row[0].get_name(), "v");
389        // Ensure NULL becomes a Value::Null rather than panic
390        assert!(matches!(row[0].get_value(), model::Value::Null));
391        Ok(())
392    }
393
394    #[test]
395    fn test_row_blob_mapping() -> AnyResult<()> {
396        let db = MemDB::instance();
397        db.execute("CREATE TABLE tblob (b BLOB)")?;
398        // Insert ASCII 'ABC' as blob
399        db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
400        let row = db.query_row("SELECT b FROM tblob")?;
401        assert_eq!(row.len(), 1);
402        assert_eq!(row[0].get_name(), "b");
403        // lossy utf8 decode should yield "ABC"
404        assert_eq!(row[0].to_string(), "chars(ABC)");
405        Ok(())
406    }
407
408    #[test]
409    fn test_csv_missing_column_error() -> AnyResult<()> {
410        use std::fs;
411        use std::io::Write;
412        let db = MemDB::instance();
413        db.table_create(EXAMPLE_CREATE_SQL)?;
414        // Create a temp csv with only 1 column per row
415        let mut path = std::env::temp_dir();
416        path.push("wp_knowledge_csv_missing_col.csv");
417        {
418            let mut f = fs::File::create(&path)?;
419            writeln!(f, "name")?;
420            writeln!(f, "only_one_col")?;
421        }
422        let res = db.table_load(
423            EXAMPLE_INSERT_SQL,
424            path.clone(),
425            vec![0, 1], // request 2 columns but csv has 1
426            10,
427        );
428        assert!(res.is_err());
429        let e = format!("{}", res.err().unwrap());
430        assert!(e.contains("line"));
431        assert!(e.contains("insufficient columns"));
432        // cleanup
433        let _ = fs::remove_file(&path);
434        Ok(())
435    }
436
437    #[test]
438    fn test_global_persistence_across_handles() -> AnyResult<()> {
439        // Create table via one global handle
440        {
441            let db1 = MemDB::global();
442            db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
443            db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
444        }
445        // Read via a new global handle; should see the same in-memory DB
446        {
447            let db2 = MemDB::global();
448            let rows = db2.query_row("SELECT v FROM gtest")?;
449            assert_eq!(rows.len(), 1);
450            assert_eq!(rows[0].to_string(), "chars(ok)");
451        }
452        Ok(())
453    }
454
455    #[test]
456    fn test_init_by_conf() -> AnyResult<()> {
457        let db = MemDB::global();
458        db.table_create(EXAMPLE_CREATE_SQL)?;
459        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
460        db.table_load(
461            EXAMPLE_INSERT_SQL,
462            PathBuf::from("src/mem/dict/example.csv"),
463            vec![0, 1],
464            100,
465        )?;
466        Ok(())
467    }
468
469    // V1 conf serde test removed
470
    /// Loads the alert category dictionary into the global store and looks up a
    /// `level1_code` with named parameters, once with plain `(&str, &str)` pairs
    /// and once with `SqlNamedParam` wrappers converted via `to_params()`.
    #[test]
    fn test_alter_level() -> anyhow::Result<()> {
        let db = MemDB::global();
        // ensure clean state across global in-memory handle
        let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
        db.table_create(
            r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
                id   INTEGER PRIMARY KEY,
                log_type TEXT NOT NULL,
                level1_code TEXT NOT NULL,
                level1_name TEXT NOT NULL,
                level2_code TEXT NOT NULL,
                level2_name TEXT NOT NULL,
                original_code TEXT NOT NULL,
                original_name TEXT NOT NULL
            )"#,
        )?;
        let _ = db.table_clean("DELETE FROM alert_cat_level");
        // CSV columns 0..=6 map one-to-one onto placeholders ?1..?7.
        db.table_load(
            r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
            PathBuf::from("src/mem/dict/event_cat_level.csv"),
            vec![0, 1, 2, 3, 4, 5, 6],
            2000,
        )?;

        let sql = "select level1_code from alert_cat_level where log_type = :log_type  and  original_code = :code ";
        // First lookup: named parameters given as plain string pairs.
        let result = db.query_row_params(
            //"select level1_code from alert_cat_level where log_type = 'jowto_server_alert_log' and original_code = '00000002'",
            sql,
            &[(":log_type", "app_log"), (":code", "00000002")],
        )?;
        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);

        // Second lookup: the same parameters wrapped as `SqlNamedParam`, listed
        // in a different order than they appear in the SQL.
        let px = [
            SqlNamedParam(DataField::from_chars(":code", "00000002")),
            SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
        ];

        let p = px.to_params();
        let result = db.query_row_params(sql, &p)?;
        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);

        Ok(())
    }
515
    /// Round-trips several `model::Value` variants through the `ToSql` impl of
    /// `SqlNamedParam`: each value is inserted into an untyped column and the
    /// most recently inserted row is read back to check its stored form.
    #[test]
    fn test_tosql_bind_various_types() -> AnyResult<()> {
        use chrono::NaiveDate;
        use std::net::{IpAddr, Ipv4Addr};
        use wp_model_core::model::types::value::ObjectValue;
        use wp_model_core::model::{DateTimeValue, HexT};

        let db = MemDB::instance();
        // Column `v` has no declared type, so SQLite keeps the bound value's own type.
        db.execute("CREATE TABLE p (v)")?;

        // Bool -> integer 1
        {
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_bool(":v", true))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
        }
        // Null
        {
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::new(
                model::DataType::default(),
                ":v",
                model::Value::Null,
            ))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Null));
        }
        // Time -> text
        {
            let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
                .unwrap()
                .and_hms_opt(0, 0, 0)
                .unwrap();
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_time(":v", dt))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
        }
        // Hex -> text
        {
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
        }
        // IpAddr -> text
        {
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_ip(
                ":v",
                IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
            ))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
        }
        // Obj -> text (debug)
        {
            let mut obj = ObjectValue::new();
            obj.insert("k".to_string(), DataField::from_chars("", "v"));
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
        }
        // Array -> text (debug)
        {
            let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
            let sql = "INSERT INTO p (v) VALUES (:v)";
            let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
            db.query_row_params(sql, &p.to_params())?;
            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
        }
        Ok(())
    }
598
599    #[test]
600    fn test_column_alias_names() -> AnyResult<()> {
601        let db = MemDB::instance();
602        // Create a simple one-shot table/view using alias
603        db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
604        db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
605        let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
606        assert_eq!(row.len(), 2);
607        assert_eq!(row[0].get_name(), "the number");
608        assert_eq!(row[1].get_name(), "the text");
609        Ok(())
610    }
611
612    #[test]
613    fn test_concurrent_inserts() -> AnyResult<()> {
614        use std::thread;
615        let db = MemDB::global();
616        db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
617        let threads: Vec<_> = (0..4)
618            .map(|_| {
619                thread::spawn(|| {
620                    let dbt = MemDB::global();
621                    for _ in 0..10 {
622                        let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
623                    }
624                })
625            })
626            .collect();
627        for t in threads {
628            t.join().unwrap();
629        }
630        let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
631        // total should be 40
632        assert_eq!(row[0].to_string(), "digit(40)");
633        Ok(())
634    }
635
636    #[test]
637    fn test_query_returns_all_rows() -> AnyResult<()> {
638        let db = MemDB::instance();
639        db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
640        let rows = db.query("SELECT * FROM multi")?;
641        assert!(rows.is_empty(), "empty table should return empty vec");
642        db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
643        db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
644        db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
645
646        let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
647        assert_eq!(rows.len(), 3, "should return all 3 rows");
648
649        Ok(())
650    }
651
652    #[allow(dead_code)]
653    fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> AnyResult<T> {
654        let mut f = File::open(path).with_context(|| format!("conf file not found: {}", path))?;
655        let mut buffer = Vec::with_capacity(10240);
656        f.read_to_end(&mut buffer).expect("read conf file error");
657        let conf_data = String::from_utf8(buffer)?;
658        let dict = EnvDict::new();
659        let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)?;
660        Ok(conf)
661    }
662
663    #[allow(dead_code)]
664    fn export_toml_local<T: Serialize>(val: &T, path: &str) -> AnyResult<()> {
665        let data = toml::to_string_pretty(val)?;
666        if let Some(parent) = std::path::Path::new(path).parent() {
667            fs::create_dir_all(parent)?;
668        }
669        fs::write(path, data)?;
670        Ok(())
671    }
672}