Skip to main content

wp_knowledge/mem/
memdb.rs

1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::error::{KnowReason, KnowledgeResult};
4use crate::mem::RowData;
5use crate::mem::stub::StubMDB;
6use csv::Reader;
7use enum_dispatch::enum_dispatch;
8use lazy_static::lazy_static;
9use orion_error::conversion::ErrorWith;
10use orion_error::conversion::{SourceRawErr, ToStructError};
11use r2d2_sqlite::SqliteConnectionManager;
12use rusqlite::OpenFlags;
13use rusqlite::Params;
14use rusqlite::ToSql;
15use rusqlite::types::ToSqlOutput;
16use rusqlite::types::Value;
17use std::path::PathBuf;
18use wp_log::debug_kdb;
19use wp_log::info_kdb;
20use wp_log::warn_kdb;
21use wp_model_core::model;
22use wp_model_core::model::DataField;
23
24use super::SqlNamedParam;
25use crate::loader::ProviderKind;
26use crate::runtime::MetadataCacheScope;
27
28lazy_static! {
29    // Important: Use a single SQLite in-memory connection so schema/data persist across calls.
30    // r2d2 with `memory()` creates isolated DBs per connection; limit pool to size=1 to reuse
31    // the same connection and avoid "no such table" issues when different checkouts observe
32    // different ephemeral databases.
33    pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
34        r2d2::Pool::builder()
35            .max_size(1)
36            .build(SqliteConnectionManager::memory())
37            .expect("init SQLite memory pool (size=1) failed");
38}
39
40#[derive(Debug, Clone)]
41pub struct MemDB {
42    conn: r2d2::Pool<SqliteConnectionManager>,
43}
44
45#[derive(Debug, Clone)]
46#[enum_dispatch(DBQuery)]
47pub enum MDBEnum {
48    Stub(StubMDB),
49    Use(MemDB),
50}
51impl Default for MDBEnum {
52    fn default() -> Self {
53        MDBEnum::Stub(StubMDB {})
54    }
55}
56impl MDBEnum {
57    pub fn global() -> Self {
58        MDBEnum::Use(MemDB::global())
59    }
60    pub fn load_test() -> KnowledgeResult<()> {
61        MemDB::load_test()?;
62        Ok(())
63    }
64}
65
66pub fn cache_query<const N: usize, P: Params>(
67    db: &MDBEnum,
68    sql: &str,
69    c_params: &[DataField; N],
70    q_params: P,
71    cache: &mut impl CacheAble<DataField, RowData, N>,
72) -> RowData {
73    crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
74}
75impl ToSql for SqlNamedParam {
76    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
77        match self.0.get_value() {
78            model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
79            model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
80            model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
81            model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
82            model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
83            model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
84            model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85            model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
86            model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87            model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
88            model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
89            model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
90            model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
91            model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
92            model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
93            model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
94            model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
95            model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96        }
97    }
98}
99
100impl DBQuery for MemDB {
101    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
102        let conn = self
103            .conn
104            .get()
105            .source_raw_err(KnowReason::from_res(), "source error")
106            .doing("get memdb connect")?;
107        let _ = crate::sqlite_ext::register_builtin(&conn);
108        super::query_util::query_cached(&conn, sql, [])
109    }
110
111    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
112        let conn = self
113            .conn
114            .get()
115            .source_raw_err(KnowReason::from_res(), "source error")
116            .doing("get memdb connect")?;
117        // Ensure SQLite UDFs are available on this connection (ip4_int/cidr4_* etc.)
118        let _ = crate::sqlite_ext::register_builtin(&conn);
119        super::query_util::query_first_row_cached(&conn, sql, [])
120    }
121
122    fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
123        debug_kdb!("[memdb] query_row_params: {}", sql);
124        let conn = self
125            .conn
126            .get()
127            .source_raw_err(KnowReason::from_res(), "source error")?;
128        // Ensure SQLite UDFs are available on this connection
129        let _ = crate::sqlite_ext::register_builtin(&conn);
130        super::query_util::query_first_row_cached(&conn, sql, params)
131    }
132
133    fn query_row_tdos<P: Params>(
134        &self,
135        _sql: &str,
136        _params: &[DataField; 2],
137    ) -> KnowledgeResult<RowData> {
138        //let data: [TDOParams; 2] = [TDOParams(&params[0]), TDOParams(&params[1])];
139        //params.iter().for_each(|x| data.push(TDOParams(x)));
140        //self.query_row_params(sql, data)
141        todo!();
142    }
143}
144impl MemDB {
145    pub fn query_with_scope(
146        &self,
147        scope: &MetadataCacheScope,
148        sql: &str,
149    ) -> KnowledgeResult<Vec<RowData>> {
150        let conn = self
151            .conn
152            .get()
153            .source_raw_err(KnowReason::from_res(), "source error")
154            .doing("get memdb connect")?;
155        let _ = crate::sqlite_ext::register_builtin(&conn);
156        super::query_util::query_cached_with_scope(
157            &conn,
158            scope,
159            Some(ProviderKind::SqliteAuthority),
160            sql,
161            [],
162        )
163    }
164
165    pub fn query_row_with_scope(
166        &self,
167        scope: &MetadataCacheScope,
168        sql: &str,
169    ) -> KnowledgeResult<RowData> {
170        let conn = self
171            .conn
172            .get()
173            .source_raw_err(KnowReason::from_res(), "source error")
174            .doing("get memdb connect")?;
175        let _ = crate::sqlite_ext::register_builtin(&conn);
176        super::query_util::query_first_row_cached_with_scope(
177            &conn,
178            scope,
179            Some(ProviderKind::SqliteAuthority),
180            sql,
181            [],
182        )
183    }
184
185    pub fn query_fields_with_scope(
186        &self,
187        scope: &MetadataCacheScope,
188        sql: &str,
189        params: &[DataField],
190    ) -> KnowledgeResult<Vec<RowData>> {
191        let conn = self
192            .conn
193            .get()
194            .source_raw_err(KnowReason::from_res(), "source error")
195            .doing("get memdb connect")?;
196        let _ = crate::sqlite_ext::register_builtin(&conn);
197        let named_params = params
198            .iter()
199            .cloned()
200            .map(SqlNamedParam)
201            .collect::<Vec<_>>();
202        let refs: Vec<(&str, &dyn ToSql)> = named_params
203            .iter()
204            .map(|param| (param.0.get_name(), param as &dyn ToSql))
205            .collect();
206        super::query_util::query_cached_with_scope(
207            &conn,
208            scope,
209            Some(ProviderKind::SqliteAuthority),
210            sql,
211            refs.as_slice(),
212        )
213    }
214
215    pub fn query_named_fields_with_scope(
216        &self,
217        scope: &MetadataCacheScope,
218        sql: &str,
219        params: &[DataField],
220    ) -> KnowledgeResult<RowData> {
221        self.query_fields_with_scope(scope, sql, params)
222            .map(|rows| rows.into_iter().next().unwrap_or_default())
223    }
224
225    pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
226        let conn = self
227            .conn
228            .get()
229            .source_raw_err(KnowReason::from_res(), "source error")
230            .doing("get memdb connect")?;
231        let _ = crate::sqlite_ext::register_builtin(&conn);
232        let named_params = params
233            .iter()
234            .cloned()
235            .map(SqlNamedParam)
236            .collect::<Vec<_>>();
237        let refs: Vec<(&str, &dyn ToSql)> = named_params
238            .iter()
239            .map(|param| (param.0.get_name(), param as &dyn ToSql))
240            .collect();
241        super::query_util::query_cached(&conn, sql, refs.as_slice())
242    }
243
244    pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
245        self.query_fields(sql, params)
246            .map(|rows| rows.into_iter().next().unwrap_or_default())
247    }
248
249    pub fn instance() -> Self {
250        // Provide a single-connection pool for a consistent in-memory DB view
251        let manager = SqliteConnectionManager::memory();
252        let pool = r2d2::Pool::builder()
253            .max_size(1)
254            .build(manager)
255            .expect("init SQLite memory pool (size=1) failed");
256        Self { conn: pool }
257    }
258    /// Experimental: shared in-memory SQLite via URI with a pool size > 1.
259    /// Requires SQLite compiled with shared-cache support.
260    pub fn shared_pool(max_size: u32) -> KnowledgeResult<Self> {
261        // Shared in-memory URI. Every connection to this URI shares same DB.
262        // Note: this depends on platform SQLite features.
263        let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
264        let manager = SqliteConnectionManager::file(uri).with_flags(
265            OpenFlags::SQLITE_OPEN_READ_WRITE
266                | OpenFlags::SQLITE_OPEN_CREATE
267                | OpenFlags::SQLITE_OPEN_URI,
268        );
269        let pool = r2d2::Pool::builder()
270            .max_size(max_size)
271            .build(manager)
272            .source_raw_err(KnowReason::from_res(), "source error")?;
273        Ok(Self { conn: pool })
274    }
275
276    /// Create a MemDB backed by a file path with custom flags and pool size.
277    pub fn new_file(
278        path: &str,
279        max_size: u32,
280        flags: rusqlite::OpenFlags,
281    ) -> KnowledgeResult<Self> {
282        let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
283        let pool = r2d2::Pool::builder()
284            .max_size(max_size)
285            .build(manager)
286            .source_raw_err(KnowReason::from_res(), "source error")?;
287        Ok(Self { conn: pool })
288    }
289    // V1 init_load_by_conf removed: use loader::build_authority_from_knowdb for V2
290
291    /// Execute a closure with a checked-out SQLite connection from the pool.
292    /// Useful for one-time prepared statements or specialized operations.
293    pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> anyhow::Result<T>>(
294        &self,
295        f: F,
296    ) -> anyhow::Result<T> {
297        let pooled = self.conn.get()?;
298        let conn_ref: &rusqlite::Connection = &pooled;
299        f(conn_ref)
300    }
301
302    pub fn table_create(&self, sql: &str) -> KnowledgeResult<()> {
303        let conn = self
304            .conn
305            .get()
306            .source_raw_err(KnowReason::from_res(), "source error")?;
307        conn.execute(sql, ())
308            .source_raw_err(KnowReason::from_rule(), "source error")?;
309        debug_kdb!("crate table: {} ", sql);
310        Ok(())
311    }
312    pub fn execute(&self, sql: &str) -> KnowledgeResult<()> {
313        let conn = self
314            .conn
315            .get()
316            .source_raw_err(KnowReason::from_res(), "source error")?;
317        conn.execute(sql, ())
318            .source_raw_err(KnowReason::from_rule(), "source error")?;
319        debug_kdb!("execute: {} ", sql);
320        Ok(())
321    }
322
323    pub fn table_clean(&self, sql: &str) -> KnowledgeResult<()> {
324        let conn = self
325            .conn
326            .get()
327            .source_raw_err(KnowReason::from_res(), "source error")?;
328        conn.execute(sql, ())
329            .source_raw_err(KnowReason::from_rule(), "source error")?;
330        debug_kdb!("clean table: {} ", sql);
331        Ok(())
332    }
333
334    pub fn table_load(
335        &self,
336        sql: &str,
337        csv_path: PathBuf,
338        cols: Vec<usize>,
339        max: usize,
340    ) -> KnowledgeResult<usize> {
341        info_kdb!("load table data in {}", csv_path.display());
342        if !csv_path.exists() {
343            warn_kdb!("{} not find, load knowdb failed", csv_path.display());
344            return Ok(0);
345        }
346        let mut rdr =
347            Reader::from_path(&csv_path).source_raw_err(KnowReason::from_res(), "source error")?;
348        let conn = self
349            .conn
350            .get()
351            .source_raw_err(KnowReason::from_res(), "source error")?;
352        let mut load_cnt: usize = 0;
353        // Prepare once outside loop for performance
354        let mut stmt = conn
355            .prepare(sql)
356            .source_raw_err(KnowReason::from_rule(), "source error")?;
357        for (idx, result) in rdr.records().enumerate() {
358            if load_cnt >= max {
359                break;
360            }
361            let record = result.map_err(|e| {
362                KnowReason::from_rule().to_err().with_detail(format!(
363                    "read csv record failed at line {}: {}",
364                    idx + 1,
365                    e
366                ))
367            })?;
368
369            // Basic bounds check to avoid panic on bad column indices
370            if let Some(max_col) = cols.iter().max()
371                && *max_col >= record.len()
372            {
373                return Err(KnowReason::from_rule().to_err().with_detail(format!(
374                    "csv has insufficient columns at line {}: need index {}, got {} columns",
375                    idx + 1,
376                    *max_col,
377                    record.len()
378                )));
379            }
380
381            // Unified dynamic binding (strict): any missing column is an error
382            let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
383            for &ci in &cols {
384                let v = record.get(ci).ok_or_else(|| {
385                    KnowReason::from_rule().to_err().with_detail(format!(
386                        "line {} col {} missing",
387                        idx + 1,
388                        ci
389                    ))
390                })?;
391                vec.push(v);
392            }
393            let params = rusqlite::params_from_iter(vec);
394            stmt.execute(params)
395                .source_raw_err(KnowReason::from_rule(), "source error")?;
396            load_cnt += 1;
397        }
398        info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
399        Ok(load_cnt)
400    }
401
402    pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
403        let conn = self
404            .conn
405            .get()
406            .source_raw_err(KnowReason::from_res(), "source error")?;
407        let count_sql = format!("select count(*) from {}", table);
408        let count: usize = conn
409            .query_row(count_sql.as_str(), (), |row| row.get(0))
410            .source_raw_err(KnowReason::from_rule(), "source error")?;
411        if count >= scope.0 {
412            Ok(count)
413        } else {
414            Err(KnowReason::from_conf()
415                .to_err()
416                .with_detail("table data less")
417                .with_context(("table", table))
418                .with_context(("count", count.to_string())))
419
420            /*
421            Err(anyhow!(
422                "data less! , load data count {} <= min {}",
423                count,
424                scope.0,
425            ))
426            */
427        }
428    }
429
430    pub fn global() -> Self {
431        Self {
432            conn: MEM_SQLITE_INS.clone(),
433        }
434    }
435    pub fn load_test() -> KnowledgeResult<Self> {
436        let db = Self::global();
437        debug_kdb!("[memdb] load_test invoked");
438        db.table_create(EXAMPLE_CREATE_SQL)?;
439        // 通过 crate 根目录定位测试字典,避免 cwd 影响
440        let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
441        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
442        db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
443        // quick sanity check
444        if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
445            debug_kdb!("[memdb] example rows loaded = {}", cnt);
446        }
447        Ok(db)
448    }
449}
/// DDL for the `example` table used by `load_test` and the unit tests.
pub const EXAMPLE_CREATE_SQL: &str = r"CREATE TABLE IF NOT EXISTS example (
    id   INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    pinying TEXT NOT NULL
    )";
/// Removes every row from the `example` table.
pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
/// Inserts one `example` row from two positional parameters (`name`, `pinying`).
pub const EXAMPLE_INSERT_SQL: &str = "INSERT INTO example(name,pinying) VALUES (?1, ?2 ) ";
457
458#[cfg(test)]
459mod tests {
460
461    use std::{fs::File, io::Read};
462
463    use super::*;
464    // V1 TableConf removed
465    use crate::error::{KnowReason, KnowledgeResult};
466    use crate::mem::ToSqlParams;
467
468    use orion_conf::EnvTomlLoad;
469    use orion_error::conversion::SourceErr;
470    use orion_variate::EnvDict;
471    use serde::Serialize;
472    use std::fs;
473    use wp_data_fmt::{Csv, RecordFormatter};
474
475    #[test]
476    fn test_load() -> KnowledgeResult<()> {
477        let db = MemDB::instance();
478        db.table_create(EXAMPLE_CREATE_SQL)?;
479        let loaded = db.table_load(
480            EXAMPLE_INSERT_SQL,
481            PathBuf::from("src/mem/dict/example.csv"),
482            vec![0, 1],
483            100,
484        )?;
485        assert_eq!(loaded, 10);
486        let fmt = Csv::default();
487        let tdos = db.query_row("select * from example;")?;
488        for obj in tdos {
489            println!("{}", fmt.fmt_field(&obj.into()));
490        }
491        Ok(())
492    }
493
494    #[test]
495    fn test_csv_off_by_one() -> KnowledgeResult<()> {
496        let db = MemDB::instance();
497        db.table_create(EXAMPLE_CREATE_SQL)?;
498        // Expect only 1 row loaded when max=1 (no off-by-one)
499        let loaded = db.table_load(
500            EXAMPLE_INSERT_SQL,
501            PathBuf::from("src/mem/dict/example.csv"),
502            vec![0, 1],
503            1,
504        )?;
505        assert_eq!(loaded, 1);
506        Ok(())
507    }
508
509    #[test]
510    fn test_row_null_mapping() -> KnowledgeResult<()> {
511        let db = MemDB::instance();
512        db.execute("CREATE TABLE tnull (v TEXT)")?;
513        db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
514        let row = db.query_row("SELECT v FROM tnull")?;
515        assert_eq!(row.len(), 1);
516        assert_eq!(row[0].get_name(), "v");
517        // Ensure NULL becomes a Value::Null rather than panic
518        assert!(matches!(row[0].get_value(), model::Value::Null));
519        Ok(())
520    }
521
522    #[test]
523    fn test_row_blob_mapping() -> KnowledgeResult<()> {
524        let db = MemDB::instance();
525        db.execute("CREATE TABLE tblob (b BLOB)")?;
526        // Insert ASCII 'ABC' as blob
527        db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
528        let row = db.query_row("SELECT b FROM tblob")?;
529        assert_eq!(row.len(), 1);
530        assert_eq!(row[0].get_name(), "b");
531        // lossy utf8 decode should yield "ABC"
532        assert_eq!(row[0].to_string(), "chars(ABC)");
533        Ok(())
534    }
535
536    #[test]
537    fn test_csv_missing_column_error() -> KnowledgeResult<()> {
538        use std::fs;
539        use std::io::Write;
540        let db = MemDB::instance();
541        db.table_create(EXAMPLE_CREATE_SQL)?;
542        // Create a temp csv with only 1 column per row
543        let mut path = std::env::temp_dir();
544        path.push("wp_knowledge_csv_missing_col.csv");
545        {
546            let mut f =
547                fs::File::create(&path).source_raw_err(KnowReason::from_res(), "source error")?;
548            writeln!(f, "name").source_raw_err(KnowReason::from_res(), "source error")?;
549            writeln!(f, "only_one_col").source_raw_err(KnowReason::from_res(), "source error")?;
550        }
551        let res = db.table_load(
552            EXAMPLE_INSERT_SQL,
553            path.clone(),
554            vec![0, 1], // request 2 columns but csv has 1
555            10,
556        );
557        assert!(res.is_err());
558        let e = format!("{}", res.err().unwrap());
559        assert!(e.contains("line"));
560        assert!(e.contains("insufficient columns"));
561        // cleanup
562        let _ = fs::remove_file(&path);
563        Ok(())
564    }
565
566    #[test]
567    fn test_global_persistence_across_handles() -> KnowledgeResult<()> {
568        // Create table via one global handle
569        {
570            let db1 = MemDB::global();
571            db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
572            db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
573        }
574        // Read via a new global handle; should see the same in-memory DB
575        {
576            let db2 = MemDB::global();
577            let rows = db2.query_row("SELECT v FROM gtest")?;
578            assert_eq!(rows.len(), 1);
579            assert_eq!(rows[0].to_string(), "chars(ok)");
580        }
581        Ok(())
582    }
583
584    #[test]
585    fn test_init_by_conf() -> KnowledgeResult<()> {
586        let db = MemDB::global();
587        db.table_create(EXAMPLE_CREATE_SQL)?;
588        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
589        db.table_load(
590            EXAMPLE_INSERT_SQL,
591            PathBuf::from("src/mem/dict/example.csv"),
592            vec![0, 1],
593            100,
594        )?;
595        Ok(())
596    }
597
598    // V1 conf serde test removed
599
600    #[test]
601    fn test_alter_level() -> KnowledgeResult<()> {
602        let db = MemDB::global();
603        // ensure clean state across global in-memory handle
604        let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
605        db.table_create(
606            r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
607                id   INTEGER PRIMARY KEY,
608                log_type TEXT NOT NULL,
609                level1_code TEXT NOT NULL,
610                level1_name TEXT NOT NULL,
611                level2_code TEXT NOT NULL,
612                level2_name TEXT NOT NULL,
613                original_code TEXT NOT NULL,
614                original_name TEXT NOT NULL
615            )"#,
616        )?;
617        let _ = db.table_clean("DELETE FROM alert_cat_level");
618        db.table_load(
619            r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
620            PathBuf::from("src/mem/dict/event_cat_level.csv"),
621            vec![0, 1, 2, 3, 4, 5, 6],
622            2000,
623        )?;
624
625        let sql = "select level1_code from alert_cat_level where log_type = :log_type  and  original_code = :code ";
626        let result = db.query_row_params(
627            //"select level1_code from alert_cat_level where log_type = 'jowto_server_alert_log' and original_code = '00000002'",
628            sql,
629            &[(":log_type", "app_log"), (":code", "00000002")],
630        )?;
631        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
632
633        let px = [
634            SqlNamedParam(DataField::from_chars(":code", "00000002")),
635            SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
636        ];
637
638        let p = px.to_params();
639        let result = db.query_row_params(sql, &p)?;
640        assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
641
642        Ok(())
643    }
644
645    #[test]
646    fn test_tosql_bind_various_types() -> KnowledgeResult<()> {
647        use chrono::NaiveDate;
648        use std::net::{IpAddr, Ipv4Addr};
649        use wp_model_core::model::types::value::ObjectValue;
650        use wp_model_core::model::{DateTimeValue, HexT};
651
652        let db = MemDB::instance();
653        db.execute("CREATE TABLE p (v)")?;
654
655        // Bool -> integer 1
656        {
657            let sql = "INSERT INTO p (v) VALUES (:v)";
658            let p = [SqlNamedParam(DataField::from_bool(":v", true))];
659            db.query_row_params(sql, &p.to_params())?;
660            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
661            assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
662        }
663        // Null
664        {
665            let sql = "INSERT INTO p (v) VALUES (:v)";
666            let p = [SqlNamedParam(DataField::new(
667                model::DataType::default(),
668                ":v",
669                model::Value::Null,
670            ))];
671            db.query_row_params(sql, &p.to_params())?;
672            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
673            assert!(matches!(row[0].get_value(), model::Value::Null));
674        }
675        // Time -> text
676        {
677            let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
678                .unwrap()
679                .and_hms_opt(0, 0, 0)
680                .unwrap();
681            let sql = "INSERT INTO p (v) VALUES (:v)";
682            let p = [SqlNamedParam(DataField::from_time(":v", dt))];
683            db.query_row_params(sql, &p.to_params())?;
684            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
685            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
686        }
687        // Hex -> text
688        {
689            let sql = "INSERT INTO p (v) VALUES (:v)";
690            let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
691            db.query_row_params(sql, &p.to_params())?;
692            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
693            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
694        }
695        // IpAddr -> text
696        {
697            let sql = "INSERT INTO p (v) VALUES (:v)";
698            let p = [SqlNamedParam(DataField::from_ip(
699                ":v",
700                IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
701            ))];
702            db.query_row_params(sql, &p.to_params())?;
703            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
704            assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
705        }
706        // Obj -> text (debug)
707        {
708            let mut obj = ObjectValue::new();
709            obj.insert("k".to_string(), DataField::from_chars("", "v"));
710            let sql = "INSERT INTO p (v) VALUES (:v)";
711            let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
712            db.query_row_params(sql, &p.to_params())?;
713            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
714            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
715        }
716        // Array -> text (debug)
717        {
718            let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
719            let sql = "INSERT INTO p (v) VALUES (:v)";
720            let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
721            db.query_row_params(sql, &p.to_params())?;
722            let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
723            assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
724        }
725        Ok(())
726    }
727
728    #[test]
729    fn test_column_alias_names() -> KnowledgeResult<()> {
730        let db = MemDB::instance();
731        // Create a simple one-shot table/view using alias
732        db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
733        db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
734        let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
735        assert_eq!(row.len(), 2);
736        assert_eq!(row[0].get_name(), "the number");
737        assert_eq!(row[1].get_name(), "the text");
738        Ok(())
739    }
740
741    #[test]
742    fn test_concurrent_inserts() -> KnowledgeResult<()> {
743        use std::thread;
744        let db = MemDB::global();
745        db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
746        let threads: Vec<_> = (0..4)
747            .map(|_| {
748                thread::spawn(|| {
749                    let dbt = MemDB::global();
750                    for _ in 0..10 {
751                        let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
752                    }
753                })
754            })
755            .collect();
756        for t in threads {
757            t.join().unwrap();
758        }
759        let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
760        // total should be 40
761        assert_eq!(row[0].to_string(), "digit(40)");
762        Ok(())
763    }
764
765    #[test]
766    fn test_query_returns_all_rows() -> KnowledgeResult<()> {
767        let db = MemDB::instance();
768        db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
769        let rows = db.query("SELECT * FROM multi")?;
770        assert!(rows.is_empty(), "empty table should return empty vec");
771        db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
772        db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
773        db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
774
775        let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
776        assert_eq!(rows.len(), 3, "should return all 3 rows");
777
778        Ok(())
779    }
780
781    #[allow(dead_code)]
782    fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> KnowledgeResult<T> {
783        let mut f = File::open(path)
784            .source_raw_err(KnowReason::from_res(), "source error")
785            .doing(format!("conf file not found: {}", path))?;
786        let mut buffer = Vec::with_capacity(10240);
787        f.read_to_end(&mut buffer)
788            .source_raw_err(KnowReason::from_res(), "source error")?;
789        let conf_data =
790            String::from_utf8(buffer).source_raw_err(KnowReason::from_rule(), "source error")?;
791        let dict = EnvDict::new();
792        let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)
793            .source_err(KnowReason::from_conf(), "parse toml config")?;
794        Ok(conf)
795    }
796
797    #[allow(dead_code)]
798    fn export_toml_local<T: Serialize>(val: &T, path: &str) -> KnowledgeResult<()> {
799        let data =
800            toml::to_string_pretty(val).source_raw_err(KnowReason::from_rule(), "source error")?;
801        if let Some(parent) = std::path::Path::new(path).parent() {
802            fs::create_dir_all(parent).source_raw_err(KnowReason::from_res(), "source error")?;
803        }
804        fs::write(path, data).source_raw_err(KnowReason::from_res(), "source error")?;
805        Ok(())
806    }
807}