Skip to main content

wp_knowledge/mem/
memdb.rs

1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::mem::RowData;
4use crate::mem::stub::StubMDB;
5use csv::Reader;
6use enum_dispatch::enum_dispatch;
7use lazy_static::lazy_static;
8use orion_error::ErrorOwe;
9use orion_error::ErrorWith;
10use orion_error::ToStructError;
11use orion_error::UvsFrom;
12use r2d2_sqlite::SqliteConnectionManager;
13use rusqlite::OpenFlags;
14use rusqlite::Params;
15use rusqlite::ToSql;
16use rusqlite::types::ToSqlOutput;
17use rusqlite::types::Value;
18use std::path::PathBuf;
19use wp_error::KnowledgeReason;
20use wp_error::KnowledgeResult;
21use wp_log::debug_kdb;
22use wp_log::info_kdb;
23use wp_log::warn_kdb;
24use wp_model_core::model;
25use wp_model_core::model::DataField;
26
27use super::AnyResult;
28use super::SqlNamedParam;
29
30lazy_static! {
31    // Important: Use a single SQLite in-memory connection so schema/data persist across calls.
32    // r2d2 with `memory()` creates isolated DBs per connection; limit pool to size=1 to reuse
33    // the same connection and avoid "no such table" issues when different checkouts observe
34    // different ephemeral databases.
35    pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
36        r2d2::Pool::builder()
37            .max_size(1)
38            .build(SqliteConnectionManager::memory())
39            .expect("init SQLite memory pool (size=1) failed");
40}
41
42#[derive(Debug, Clone)]
43pub struct MemDB {
44    conn: r2d2::Pool<SqliteConnectionManager>,
45}
46
47#[derive(Debug, Clone)]
48#[enum_dispatch(DBQuery)]
49pub enum MDBEnum {
50    Stub(StubMDB),
51    Use(MemDB),
52}
53impl Default for MDBEnum {
54    fn default() -> Self {
55        MDBEnum::Stub(StubMDB {})
56    }
57}
58impl MDBEnum {
59    pub fn global() -> Self {
60        MDBEnum::Use(MemDB::global())
61    }
62    pub fn load_test() -> AnyResult<()> {
63        MemDB::load_test()?;
64        Ok(())
65    }
66}
67
68pub fn cache_query<const N: usize, P: Params>(
69    db: &MDBEnum,
70    sql: &str,
71    c_params: &[DataField; N],
72    q_params: P,
73    cache: &mut impl CacheAble<DataField, RowData, N>,
74) -> RowData {
75    crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
76}
77impl ToSql for SqlNamedParam {
78    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
79        match self.0.get_value() {
80            model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
81            model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
82            model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
83            model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
84            model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85            model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
86            model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87            model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
88            model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
89            model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
90            model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
91            model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
92            model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
93            model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
94            model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
95            model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96            model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
97            model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
98        }
99    }
100}
101
102impl DBQuery for MemDB {
103    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
104        let conn = self.conn.get().owe_res().want("get memdb connect")?;
105        let _ = crate::sqlite_ext::register_builtin(&conn);
106        super::query_util::query_cached(&conn, sql, [])
107    }
108
109    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
110        let conn = self.conn.get().owe_res().want("get memdb connect")?;
111        // Ensure SQLite UDFs are available on this connection (ip4_int/cidr4_* etc.)
112        let _ = crate::sqlite_ext::register_builtin(&conn);
113        super::query_util::query_first_row_cached(&conn, sql, [])
114    }
115
116    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
117        debug_kdb!("[memdb] query_row_params: {}", sql);
118        let conn = self.conn.get().owe_res()?;
119        // Ensure SQLite UDFs are available on this connection
120        let _ = crate::sqlite_ext::register_builtin(&conn);
121        super::query_util::query_first_row_cached(&conn, sql, params)
122    }
123
124    fn query_cipher(&self, table: &str) -> KnowledgeResult<Vec<String>> {
125        let sql = format!("select value from {}", table);
126        let conn = self.conn.get().owe_res()?;
127        let mut stmt = conn.prepare(&sql).owe_rule()?;
128        let mut rows = stmt.query([]).owe_rule()?;
129        let mut result = Vec::new();
130        while let Some(row) = rows.next().owe_rule()? {
131            let x = row.get_ref(0).owe_rule()?;
132            if let rusqlite::types::ValueRef::Text(val) = x {
133                result.push(String::from_utf8(val.to_vec()).owe_conf()?);
134            }
135        }
136
137        Ok(result)
138    }
139
140    fn query_row_tdos<P: Params>(
141        &self,
142        _sql: &str,
143        _params: &[DataField; 2],
144    ) -> KnowledgeResult<RowData> {
145        //let data: [TDOParams; 2] = [TDOParams(&params[0]), TDOParams(&params[1])];
146        //params.iter().for_each(|x| data.push(TDOParams(x)));
147        //self.query_row_params(sql, data)
148        todo!();
149    }
150}
151impl MemDB {
152    pub fn instance() -> Self {
153        // Provide a single-connection pool for a consistent in-memory DB view
154        let manager = SqliteConnectionManager::memory();
155        let pool = r2d2::Pool::builder()
156            .max_size(1)
157            .build(manager)
158            .expect("init SQLite memory pool (size=1) failed");
159        Self { conn: pool }
160    }
161    /// Experimental: shared in-memory SQLite via URI with a pool size > 1.
162    /// Requires SQLite compiled with shared-cache support.
163    pub fn shared_pool(max_size: u32) -> AnyResult<Self> {
164        // Shared in-memory URI. Every connection to this URI shares same DB.
165        // Note: this depends on platform SQLite features.
166        let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
167        let manager = SqliteConnectionManager::file(uri).with_flags(
168            OpenFlags::SQLITE_OPEN_READ_WRITE
169                | OpenFlags::SQLITE_OPEN_CREATE
170                | OpenFlags::SQLITE_OPEN_URI,
171        );
172        let pool = r2d2::Pool::builder().max_size(max_size).build(manager)?;
173        Ok(Self { conn: pool })
174    }
175
176    /// Create a MemDB backed by a file path with custom flags and pool size.
177    pub fn new_file(
178        path: &str,
179        max_size: u32,
180        flags: rusqlite::OpenFlags,
181    ) -> KnowledgeResult<Self> {
182        let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
183        let pool = r2d2::Pool::builder()
184            .max_size(max_size)
185            .build(manager)
186            .owe_res()?;
187        Ok(Self { conn: pool })
188    }
189    // V1 init_load_by_conf removed: use loader::build_authority_from_knowdb for V2
190
191    /// Execute a closure with a checked-out SQLite connection from the pool.
192    /// Useful for one-time prepared statements or specialized operations.
193    pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> AnyResult<T>>(
194        &self,
195        f: F,
196    ) -> AnyResult<T> {
197        let pooled = self.conn.get()?;
198        let conn_ref: &rusqlite::Connection = &pooled;
199        f(conn_ref)
200    }
201
202    pub fn table_create(&self, sql: &str) -> anyhow::Result<()> {
203        let conn = self.conn.get()?;
204        conn.execute(sql, ())?;
205        debug_kdb!("crate table: {} ", sql);
206        Ok(())
207    }
208    pub fn execute(&self, sql: &str) -> anyhow::Result<()> {
209        let conn = self.conn.get()?;
210        conn.execute(sql, ())?;
211        debug_kdb!("execute: {} ", sql);
212        Ok(())
213    }
214
215    pub fn table_clean(&self, sql: &str) -> anyhow::Result<()> {
216        let conn = self.conn.get()?;
217        conn.execute(sql, ())?;
218        debug_kdb!("clean table: {} ", sql);
219        Ok(())
220    }
221
222    pub fn table_load(
223        &self,
224        sql: &str,
225        csv_path: PathBuf,
226        cols: Vec<usize>,
227        max: usize,
228    ) -> AnyResult<usize> {
229        info_kdb!("load table data in {}", csv_path.display());
230        if !csv_path.exists() {
231            warn_kdb!("{} not find, load knowdb failed", csv_path.display());
232            return Ok(0);
233        }
234        let mut rdr = Reader::from_path(&csv_path)?;
235        let conn = self.conn.get()?;
236        let mut load_cnt: usize = 0;
237        // Prepare once outside loop for performance
238        let mut stmt = conn.prepare(sql)?;
239        for (idx, result) in rdr.records().enumerate() {
240            if load_cnt >= max {
241                break;
242            }
243            let record = result.map_err(|e| {
244                anyhow::anyhow!("read csv record failed at line {}: {}", idx + 1, e)
245            })?;
246
247            // Basic bounds check to avoid panic on bad column indices
248            if let Some(max_col) = cols.iter().max()
249                && *max_col >= record.len()
250            {
251                return Err(anyhow::anyhow!(
252                    "csv has insufficient columns at line {}: need index {}, got {} columns",
253                    idx + 1,
254                    *max_col,
255                    record.len()
256                ));
257            }
258
259            // Unified dynamic binding (strict): any missing column is an error
260            let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
261            for &ci in &cols {
262                let v = record
263                    .get(ci)
264                    .ok_or_else(|| anyhow::anyhow!("line {} col {} missing", idx + 1, ci))?;
265                vec.push(v);
266            }
267            let params = rusqlite::params_from_iter(vec);
268            stmt.execute(params)?;
269            load_cnt += 1;
270        }
271        info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
272        Ok(load_cnt)
273    }
274
275    pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
276        let conn = self.conn.get().owe_res()?;
277        let count_sql = format!("select count(*) from {}", table);
278        let count: usize = conn
279            .query_row(count_sql.as_str(), (), |row| row.get(0))
280            .owe_rule()?;
281        if count >= scope.0 {
282            Ok(count)
283        } else {
284            Err(KnowledgeReason::from_conf()
285                .to_err()
286                .with_detail("table data less")
287                .with(("table", table))
288                .with(("count", count.to_string())))
289
290            /*
291            Err(anyhow!(
292                "data less! , load data count {} <= min {}",
293                count,
294                scope.0,
295            ))
296            */
297        }
298    }
299
300    pub fn global() -> Self {
301        Self {
302            conn: MEM_SQLITE_INS.clone(),
303        }
304    }
305    pub fn load_test() -> AnyResult<Self> {
306        let db = Self::global();
307        debug_kdb!("[memdb] load_test invoked");
308        db.table_create(EXAMPLE_CREATE_SQL)?;
309        // 通过 crate 根目录定位测试字典,避免 cwd 影响
310        let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
311        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
312        db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
313        // quick sanity check
314        if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
315            debug_kdb!("[memdb] example rows loaded = {}", cnt);
316        }
317        Ok(db)
318    }
319}
320pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
321    id   INTEGER PRIMARY KEY,
322    name TEXT NOT NULL,
323    pinying TEXT NOT NULL
324    )"#;
325pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
326pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
327
328#[cfg(test)]
329mod tests {
330
331    use std::{fs::File, io::Read};
332
333    use super::*;
334    // V1 TableConf removed
335    use crate::mem::ToSqlParams;
336    use anyhow::Context;
337    use orion_conf::EnvTomlLoad;
338    use orion_variate::EnvDict;
339    use serde::Serialize;
340    use std::fs;
341    use wp_data_fmt::{Csv, RecordFormatter};
342
343    #[test]
344    fn test_load() -> AnyResult<()> {
345        let db = MemDB::instance();
346        db.table_create(EXAMPLE_CREATE_SQL)?;
347        let loaded = db.table_load(
348            EXAMPLE_INSERT_SQL,
349            PathBuf::from("src/mem/dict/example.csv"),
350            vec![0, 1],
351            100,
352        )?;
353        assert_eq!(loaded, 10);
354        let fmt = Csv::default();
355        let tdos = db.query_row("select * from example;")?;
356        for obj in tdos {
357            println!("{}", fmt.fmt_field(&obj.into()));
358        }
359        Ok(())
360    }
361
362    #[test]
363    fn test_csv_off_by_one() -> AnyResult<()> {
364        let db = MemDB::instance();
365        db.table_create(EXAMPLE_CREATE_SQL)?;
366        // Expect only 1 row loaded when max=1 (no off-by-one)
367        let loaded = db.table_load(
368            EXAMPLE_INSERT_SQL,
369            PathBuf::from("src/mem/dict/example.csv"),
370            vec![0, 1],
371            1,
372        )?;
373        assert_eq!(loaded, 1);
374        Ok(())
375    }
376
377    #[test]
378    fn test_row_null_mapping() -> AnyResult<()> {
379        let db = MemDB::instance();
380        db.execute("CREATE TABLE tnull (v TEXT)")?;
381        db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
382        let row = db.query_row("SELECT v FROM tnull")?;
383        assert_eq!(row.len(), 1);
384        assert_eq!(row[0].get_name(), "v");
385        // Ensure NULL becomes a Value::Null rather than panic
386        assert!(matches!(row[0].get_value(), model::Value::Null));
387        Ok(())
388    }
389
390    #[test]
391    fn test_row_blob_mapping() -> AnyResult<()> {
392        let db = MemDB::instance();
393        db.execute("CREATE TABLE tblob (b BLOB)")?;
394        // Insert ASCII 'ABC' as blob
395        db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
396        let row = db.query_row("SELECT b FROM tblob")?;
397        assert_eq!(row.len(), 1);
398        assert_eq!(row[0].get_name(), "b");
399        // lossy utf8 decode should yield "ABC"
400        assert_eq!(row[0].to_string(), "chars(ABC)");
401        Ok(())
402    }
403
404    #[test]
405    fn test_csv_missing_column_error() -> AnyResult<()> {
406        use std::fs;
407        use std::io::Write;
408        let db = MemDB::instance();
409        db.table_create(EXAMPLE_CREATE_SQL)?;
410        // Create a temp csv with only 1 column per row
411        let mut path = std::env::temp_dir();
412        path.push("wp_knowledge_csv_missing_col.csv");
413        {
414            let mut f = fs::File::create(&path)?;
415            writeln!(f, "name")?;
416            writeln!(f, "only_one_col")?;
417        }
418        let res = db.table_load(
419            EXAMPLE_INSERT_SQL,
420            path.clone(),
421            vec![0, 1], // request 2 columns but csv has 1
422            10,
423        );
424        assert!(res.is_err());
425        let e = format!("{}", res.err().unwrap());
426        assert!(e.contains("line"));
427        assert!(e.contains("insufficient columns"));
428        // cleanup
429        let _ = fs::remove_file(&path);
430        Ok(())
431    }
432
433    #[test]
434    fn test_global_persistence_across_handles() -> AnyResult<()> {
435        // Create table via one global handle
436        {
437            let db1 = MemDB::global();
438            db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
439            db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
440        }
441        // Read via a new global handle; should see the same in-memory DB
442        {
443            let db2 = MemDB::global();
444            let rows = db2.query_row("SELECT v FROM gtest")?;
445            assert_eq!(rows.len(), 1);
446            assert_eq!(rows[0].to_string(), "chars(ok)");
447        }
448        Ok(())
449    }
450
451    #[test]
452    fn test_init_by_conf() -> AnyResult<()> {
453        let db = MemDB::global();
454        db.table_create(EXAMPLE_CREATE_SQL)?;
455        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
456        db.table_load(
457            EXAMPLE_INSERT_SQL,
458            PathBuf::from("src/mem/dict/example.csv"),
459            vec![0, 1],
460            100,
461        )?;
462        Ok(())
463    }
464
465    // V1 conf serde test removed
466
467    #[test]
468    fn test_alter_level() -> anyhow::Result<()> {
469        let db = MemDB::global();
470        // ensure clean state across global in-memory handle
471        let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
472        db.table_create(
473            r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
474                id   INTEGER PRIMARY KEY,
475                log_type TEXT NOT NULL,
476                level1_code TEXT NOT NULL,
477                level1_name TEXT NOT NULL,
478                level2_code TEXT NOT NULL,
479                level2_name TEXT NOT NULL,
480                original_code TEXT NOT NULL,
481                original_name TEXT NOT NULL
482            )"#,
483        )?;
484        let _ = db.table_clean("DELETE FROM alert_cat_level");
485        db.table_load(
486            r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
487            PathBuf::from("src/mem/dict/event_cat_level.csv"),
488            vec![0, 1, 2, 3, 4, 5, 6],
489            2000,
490        )?;
491
492        let sql = "select level1_code from alert_cat_level where log_type = :log_type  and  original_code = :code ";
493        let result = db.query_row_params(
494            //"select level1_code from alert_cat_level where log_type = 'jowto_server_alert_log' and original_code = '00000002'",
495            sql,
496            &[(":log_type", "app_log"), (":code", "00000002")],
497        )?;
498        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
499
500        let px = [
501            SqlNamedParam(DataField::from_chars(":code", "00000002")),
502            SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
503        ];
504
505        let p = px.to_params();
506        let result = db.query_row_params(sql, &p)?;
507        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
508
509        Ok(())
510    }
511
512    #[test]
513    fn test_tosql_bind_various_types() -> AnyResult<()> {
514        use chrono::NaiveDate;
515        use std::net::{IpAddr, Ipv4Addr};
516        use wp_model_core::model::types::value::ObjectValue;
517        use wp_model_core::model::{DateTimeValue, HexT};
518
519        let db = MemDB::instance();
520        db.execute("CREATE TABLE p (v)")?;
521
522        // Bool -> integer 1
523        {
524            let sql = "INSERT INTO p (v) VALUES (:v)";
525            let p = [SqlNamedParam(DataField::from_bool(":v", true))];
526            db.query_row_params(sql, &p.to_params())?;
527            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
528            assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
529        }
530        // Null
531        {
532            let sql = "INSERT INTO p (v) VALUES (:v)";
533            let p = [SqlNamedParam(DataField::new(
534                model::DataType::default(),
535                ":v",
536                model::Value::Null,
537            ))];
538            db.query_row_params(sql, &p.to_params())?;
539            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
540            assert!(matches!(row[0].get_value(), model::Value::Null));
541        }
542        // Time -> text
543        {
544            let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
545                .unwrap()
546                .and_hms_opt(0, 0, 0)
547                .unwrap();
548            let sql = "INSERT INTO p (v) VALUES (:v)";
549            let p = [SqlNamedParam(DataField::from_time(":v", dt))];
550            db.query_row_params(sql, &p.to_params())?;
551            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
552            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
553        }
554        // Hex -> text
555        {
556            let sql = "INSERT INTO p (v) VALUES (:v)";
557            let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
558            db.query_row_params(sql, &p.to_params())?;
559            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
560            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
561        }
562        // IpAddr -> text
563        {
564            let sql = "INSERT INTO p (v) VALUES (:v)";
565            let p = [SqlNamedParam(DataField::from_ip(
566                ":v",
567                IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
568            ))];
569            db.query_row_params(sql, &p.to_params())?;
570            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
571            assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
572        }
573        // Obj -> text (debug)
574        {
575            let mut obj = ObjectValue::new();
576            obj.insert("k".to_string(), DataField::from_chars("", "v"));
577            let sql = "INSERT INTO p (v) VALUES (:v)";
578            let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
579            db.query_row_params(sql, &p.to_params())?;
580            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
581            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
582        }
583        // Array -> text (debug)
584        {
585            let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
586            let sql = "INSERT INTO p (v) VALUES (:v)";
587            let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
588            db.query_row_params(sql, &p.to_params())?;
589            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
590            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
591        }
592        Ok(())
593    }
594
595    #[test]
596    fn test_column_alias_names() -> AnyResult<()> {
597        let db = MemDB::instance();
598        // Create a simple one-shot table/view using alias
599        db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
600        db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
601        let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
602        assert_eq!(row.len(), 2);
603        assert_eq!(row[0].get_name(), "the number");
604        assert_eq!(row[1].get_name(), "the text");
605        Ok(())
606    }
607
608    #[test]
609    fn test_query_cipher_basic() -> AnyResult<()> {
610        let db = MemDB::instance();
611        db.execute("CREATE TABLE cipher (value TEXT)")?;
612        db.execute("INSERT INTO cipher (value) VALUES ('A')")?;
613        db.execute("INSERT INTO cipher (value) VALUES ('B')")?;
614        let vals = db.query_cipher("cipher")?;
615        assert!(vals.contains(&"A".to_string()));
616        assert!(vals.contains(&"B".to_string()));
617        Ok(())
618    }
619
620    #[test]
621    fn test_concurrent_inserts() -> AnyResult<()> {
622        use std::thread;
623        let db = MemDB::global();
624        db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
625        let threads: Vec<_> = (0..4)
626            .map(|_| {
627                thread::spawn(|| {
628                    let dbt = MemDB::global();
629                    for _ in 0..10 {
630                        let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
631                    }
632                })
633            })
634            .collect();
635        for t in threads {
636            t.join().unwrap();
637        }
638        let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
639        // total should be 40
640        assert_eq!(row[0].to_string(), "digit(40)");
641        Ok(())
642    }
643
644    #[test]
645    fn test_query_returns_all_rows() -> AnyResult<()> {
646        let db = MemDB::instance();
647        db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
648        let rows = db.query("SELECT * FROM multi")?;
649        assert!(rows.is_empty(), "empty table should return empty vec");
650        db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
651        db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
652        db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
653
654        let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
655        assert_eq!(rows.len(), 3, "should return all 3 rows");
656
657        Ok(())
658    }
659
660    #[allow(dead_code)]
661    fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> AnyResult<T> {
662        let mut f = File::open(path).with_context(|| format!("conf file not found: {}", path))?;
663        let mut buffer = Vec::with_capacity(10240);
664        f.read_to_end(&mut buffer).expect("read conf file error");
665        let conf_data = String::from_utf8(buffer)?;
666        let dict = EnvDict::new();
667        let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)?;
668        Ok(conf)
669    }
670
671    #[allow(dead_code)]
672    fn export_toml_local<T: Serialize>(val: &T, path: &str) -> AnyResult<()> {
673        let data = toml::to_string_pretty(val)?;
674        if let Some(parent) = std::path::Path::new(path).parent() {
675            fs::create_dir_all(parent)?;
676        }
677        fs::write(path, data)?;
678        Ok(())
679    }
680}