1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::error::{KnowledgeResult, Reason};
4use crate::mem::RowData;
5use crate::mem::stub::StubMDB;
6use csv::Reader;
7use enum_dispatch::enum_dispatch;
8use lazy_static::lazy_static;
9use orion_error::ErrorWith;
10use orion_error::UvsFrom;
11use orion_error::compat_traits::ErrorOweBase;
12use orion_error::conversion::ToStructError;
13use r2d2_sqlite::SqliteConnectionManager;
14use rusqlite::OpenFlags;
15use rusqlite::Params;
16use rusqlite::ToSql;
17use rusqlite::types::ToSqlOutput;
18use rusqlite::types::Value;
19use std::path::PathBuf;
20use wp_log::debug_kdb;
21use wp_log::info_kdb;
22use wp_log::warn_kdb;
23use wp_model_core::model;
24use wp_model_core::model::DataField;
25
26use super::SqlNamedParam;
27use crate::loader::ProviderKind;
28use crate::runtime::MetadataCacheScope;
29
30lazy_static! {
31 pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
36 r2d2::Pool::builder()
37 .max_size(1)
38 .build(SqliteConnectionManager::memory())
39 .expect("init SQLite memory pool (size=1) failed");
40}
41
42#[derive(Debug, Clone)]
43pub struct MemDB {
44 conn: r2d2::Pool<SqliteConnectionManager>,
45}
46
47#[derive(Debug, Clone)]
48#[enum_dispatch(DBQuery)]
49pub enum MDBEnum {
50 Stub(StubMDB),
51 Use(MemDB),
52}
53impl Default for MDBEnum {
54 fn default() -> Self {
55 MDBEnum::Stub(StubMDB {})
56 }
57}
58impl MDBEnum {
59 pub fn global() -> Self {
60 MDBEnum::Use(MemDB::global())
61 }
62 pub fn load_test() -> KnowledgeResult<()> {
63 MemDB::load_test()?;
64 Ok(())
65 }
66}
67
68pub fn cache_query<const N: usize, P: Params>(
69 db: &MDBEnum,
70 sql: &str,
71 c_params: &[DataField; N],
72 q_params: P,
73 cache: &mut impl CacheAble<DataField, RowData, N>,
74) -> RowData {
75 crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
76}
77impl ToSql for SqlNamedParam {
78 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
79 match self.0.get_value() {
80 model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
81 model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
82 model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
83 model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
84 model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85 model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
86 model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87 model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
88 model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
89 model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
90 model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
91 model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
92 model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
93 model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
94 model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
95 model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96 model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
97 model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
98 }
99 }
100}
101
102impl DBQuery for MemDB {
103 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
104 let conn = self
105 .conn
106 .get()
107 .owe(Reason::from_res())
108 .doing("get memdb connect")?;
109 let _ = crate::sqlite_ext::register_builtin(&conn);
110 super::query_util::query_cached(&conn, sql, [])
111 }
112
113 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
114 let conn = self
115 .conn
116 .get()
117 .owe(Reason::from_res())
118 .doing("get memdb connect")?;
119 let _ = crate::sqlite_ext::register_builtin(&conn);
121 super::query_util::query_first_row_cached(&conn, sql, [])
122 }
123
124 fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
125 debug_kdb!("[memdb] query_row_params: {}", sql);
126 let conn = self.conn.get().owe(Reason::from_res())?;
127 let _ = crate::sqlite_ext::register_builtin(&conn);
129 super::query_util::query_first_row_cached(&conn, sql, params)
130 }
131
132 fn query_row_tdos<P: Params>(
133 &self,
134 _sql: &str,
135 _params: &[DataField; 2],
136 ) -> KnowledgeResult<RowData> {
137 todo!();
141 }
142}
143impl MemDB {
144 pub fn query_with_scope(
145 &self,
146 scope: &MetadataCacheScope,
147 sql: &str,
148 ) -> KnowledgeResult<Vec<RowData>> {
149 let conn = self
150 .conn
151 .get()
152 .owe(Reason::from_res())
153 .doing("get memdb connect")?;
154 let _ = crate::sqlite_ext::register_builtin(&conn);
155 super::query_util::query_cached_with_scope(
156 &conn,
157 scope,
158 Some(ProviderKind::SqliteAuthority),
159 sql,
160 [],
161 )
162 }
163
164 pub fn query_row_with_scope(
165 &self,
166 scope: &MetadataCacheScope,
167 sql: &str,
168 ) -> KnowledgeResult<RowData> {
169 let conn = self
170 .conn
171 .get()
172 .owe(Reason::from_res())
173 .doing("get memdb connect")?;
174 let _ = crate::sqlite_ext::register_builtin(&conn);
175 super::query_util::query_first_row_cached_with_scope(
176 &conn,
177 scope,
178 Some(ProviderKind::SqliteAuthority),
179 sql,
180 [],
181 )
182 }
183
184 pub fn query_fields_with_scope(
185 &self,
186 scope: &MetadataCacheScope,
187 sql: &str,
188 params: &[DataField],
189 ) -> KnowledgeResult<Vec<RowData>> {
190 let conn = self
191 .conn
192 .get()
193 .owe(Reason::from_res())
194 .doing("get memdb connect")?;
195 let _ = crate::sqlite_ext::register_builtin(&conn);
196 let named_params = params
197 .iter()
198 .cloned()
199 .map(SqlNamedParam)
200 .collect::<Vec<_>>();
201 let refs: Vec<(&str, &dyn ToSql)> = named_params
202 .iter()
203 .map(|param| (param.0.get_name(), param as &dyn ToSql))
204 .collect();
205 super::query_util::query_cached_with_scope(
206 &conn,
207 scope,
208 Some(ProviderKind::SqliteAuthority),
209 sql,
210 refs.as_slice(),
211 )
212 }
213
214 pub fn query_named_fields_with_scope(
215 &self,
216 scope: &MetadataCacheScope,
217 sql: &str,
218 params: &[DataField],
219 ) -> KnowledgeResult<RowData> {
220 self.query_fields_with_scope(scope, sql, params)
221 .map(|rows| rows.into_iter().next().unwrap_or_default())
222 }
223
224 pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
225 let conn = self
226 .conn
227 .get()
228 .owe(Reason::from_res())
229 .doing("get memdb connect")?;
230 let _ = crate::sqlite_ext::register_builtin(&conn);
231 let named_params = params
232 .iter()
233 .cloned()
234 .map(SqlNamedParam)
235 .collect::<Vec<_>>();
236 let refs: Vec<(&str, &dyn ToSql)> = named_params
237 .iter()
238 .map(|param| (param.0.get_name(), param as &dyn ToSql))
239 .collect();
240 super::query_util::query_cached(&conn, sql, refs.as_slice())
241 }
242
243 pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
244 self.query_fields(sql, params)
245 .map(|rows| rows.into_iter().next().unwrap_or_default())
246 }
247
248 pub fn instance() -> Self {
249 let manager = SqliteConnectionManager::memory();
251 let pool = r2d2::Pool::builder()
252 .max_size(1)
253 .build(manager)
254 .expect("init SQLite memory pool (size=1) failed");
255 Self { conn: pool }
256 }
257 pub fn shared_pool(max_size: u32) -> KnowledgeResult<Self> {
260 let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
263 let manager = SqliteConnectionManager::file(uri).with_flags(
264 OpenFlags::SQLITE_OPEN_READ_WRITE
265 | OpenFlags::SQLITE_OPEN_CREATE
266 | OpenFlags::SQLITE_OPEN_URI,
267 );
268 let pool = r2d2::Pool::builder()
269 .max_size(max_size)
270 .build(manager)
271 .owe(Reason::from_res())?;
272 Ok(Self { conn: pool })
273 }
274
275 pub fn new_file(
277 path: &str,
278 max_size: u32,
279 flags: rusqlite::OpenFlags,
280 ) -> KnowledgeResult<Self> {
281 let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
282 let pool = r2d2::Pool::builder()
283 .max_size(max_size)
284 .build(manager)
285 .owe(Reason::from_res())?;
286 Ok(Self { conn: pool })
287 }
288 pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> anyhow::Result<T>>(
293 &self,
294 f: F,
295 ) -> anyhow::Result<T> {
296 let pooled = self.conn.get()?;
297 let conn_ref: &rusqlite::Connection = &pooled;
298 f(conn_ref)
299 }
300
301 pub fn table_create(&self, sql: &str) -> KnowledgeResult<()> {
302 let conn = self.conn.get().owe(Reason::from_res())?;
303 conn.execute(sql, ()).owe(Reason::from_rule())?;
304 debug_kdb!("crate table: {} ", sql);
305 Ok(())
306 }
307 pub fn execute(&self, sql: &str) -> KnowledgeResult<()> {
308 let conn = self.conn.get().owe(Reason::from_res())?;
309 conn.execute(sql, ()).owe(Reason::from_rule())?;
310 debug_kdb!("execute: {} ", sql);
311 Ok(())
312 }
313
314 pub fn table_clean(&self, sql: &str) -> KnowledgeResult<()> {
315 let conn = self.conn.get().owe(Reason::from_res())?;
316 conn.execute(sql, ()).owe(Reason::from_rule())?;
317 debug_kdb!("clean table: {} ", sql);
318 Ok(())
319 }
320
321 pub fn table_load(
322 &self,
323 sql: &str,
324 csv_path: PathBuf,
325 cols: Vec<usize>,
326 max: usize,
327 ) -> KnowledgeResult<usize> {
328 info_kdb!("load table data in {}", csv_path.display());
329 if !csv_path.exists() {
330 warn_kdb!("{} not find, load knowdb failed", csv_path.display());
331 return Ok(0);
332 }
333 let mut rdr = Reader::from_path(&csv_path).owe(Reason::from_res())?;
334 let conn = self.conn.get().owe(Reason::from_res())?;
335 let mut load_cnt: usize = 0;
336 let mut stmt = conn.prepare(sql).owe(Reason::from_rule())?;
338 for (idx, result) in rdr.records().enumerate() {
339 if load_cnt >= max {
340 break;
341 }
342 let record = result.map_err(|e| {
343 Reason::from_rule().to_err().with_detail(format!(
344 "read csv record failed at line {}: {}",
345 idx + 1,
346 e
347 ))
348 })?;
349
350 if let Some(max_col) = cols.iter().max()
352 && *max_col >= record.len()
353 {
354 return Err(Reason::from_rule().to_err().with_detail(format!(
355 "csv has insufficient columns at line {}: need index {}, got {} columns",
356 idx + 1,
357 *max_col,
358 record.len()
359 )));
360 }
361
362 let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
364 for &ci in &cols {
365 let v = record.get(ci).ok_or_else(|| {
366 Reason::from_rule().to_err().with_detail(format!(
367 "line {} col {} missing",
368 idx + 1,
369 ci
370 ))
371 })?;
372 vec.push(v);
373 }
374 let params = rusqlite::params_from_iter(vec);
375 stmt.execute(params).owe(Reason::from_rule())?;
376 load_cnt += 1;
377 }
378 info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
379 Ok(load_cnt)
380 }
381
382 pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
383 let conn = self.conn.get().owe(Reason::from_res())?;
384 let count_sql = format!("select count(*) from {}", table);
385 let count: usize = conn
386 .query_row(count_sql.as_str(), (), |row| row.get(0))
387 .owe(Reason::from_rule())?;
388 if count >= scope.0 {
389 Ok(count)
390 } else {
391 Err(Reason::from_conf()
392 .to_err()
393 .with_detail("table data less")
394 .with_context(("table", table))
395 .with_context(("count", count.to_string())))
396
397 }
405 }
406
407 pub fn global() -> Self {
408 Self {
409 conn: MEM_SQLITE_INS.clone(),
410 }
411 }
412 pub fn load_test() -> KnowledgeResult<Self> {
413 let db = Self::global();
414 debug_kdb!("[memdb] load_test invoked");
415 db.table_create(EXAMPLE_CREATE_SQL)?;
416 let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
418 let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
419 db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
420 if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
422 debug_kdb!("[memdb] example rows loaded = {}", cnt);
423 }
424 Ok(db)
425 }
426}
/// DDL for the `example` table used by the bundled sample data set.
pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    pinying TEXT NOT NULL
    )"#;
/// Removes every row from the `example` table.
pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
/// Inserts one `example` row from two positional parameters (`name`, `pinying`).
pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
434
#[cfg(test)]
mod tests {

    use std::{fs::File, io::Read};

    use super::*;
    use crate::error::{KnowledgeResult, Reason};
    use crate::mem::ToSqlParams;

    use orion_conf::EnvTomlLoad;
    use orion_error::UvsFrom;
    use orion_error::compat_traits::ErrorOweBase;
    use orion_variate::EnvDict;
    use serde::Serialize;
    use std::fs;
    use wp_data_fmt::{Csv, RecordFormatter};

    /// Path of the bundled example CSV, relative to the crate root.
    fn example_csv() -> PathBuf {
        PathBuf::from("src/mem/dict/example.csv")
    }

    /// Binds `field` to an insert into table `p` and reads back the newest row.
    fn insert_and_fetch_last(db: &MemDB, field: DataField) -> KnowledgeResult<RowData> {
        let bound = [SqlNamedParam(field)];
        db.query_row_params("INSERT INTO p (v) VALUES (:v)", &bound.to_params())?;
        db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")
    }

    /// Loading the example CSV inserts all ten of its records.
    #[test]
    fn test_load() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.table_create(EXAMPLE_CREATE_SQL)?;
        let loaded = db.table_load(EXAMPLE_INSERT_SQL, example_csv(), vec![0, 1], 100)?;
        assert_eq!(loaded, 10);
        let fmt = Csv::default();
        let first_row = db.query_row("select * from example;")?;
        for field in first_row {
            println!("{}", fmt.fmt_field(&field.into()));
        }
        Ok(())
    }

    /// A `max` of one inserts exactly one record.
    #[test]
    fn test_csv_off_by_one() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.table_create(EXAMPLE_CREATE_SQL)?;
        let loaded = db.table_load(EXAMPLE_INSERT_SQL, example_csv(), vec![0, 1], 1)?;
        assert_eq!(loaded, 1);
        Ok(())
    }

    /// SQL `NULL` comes back as `Value::Null` under the column's name.
    #[test]
    fn test_row_null_mapping() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.execute("CREATE TABLE tnull (v TEXT)")?;
        db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
        let row = db.query_row("SELECT v FROM tnull")?;
        assert_eq!(row.len(), 1);
        let field = &row[0];
        assert_eq!(field.get_name(), "v");
        assert!(matches!(field.get_value(), model::Value::Null));
        Ok(())
    }

    /// A blob holding ASCII bytes comes back as a chars field.
    #[test]
    fn test_row_blob_mapping() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.execute("CREATE TABLE tblob (b BLOB)")?;
        db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
        let row = db.query_row("SELECT b FROM tblob")?;
        assert_eq!(row.len(), 1);
        let field = &row[0];
        assert_eq!(field.get_name(), "b");
        assert_eq!(field.to_string(), "chars(ABC)");
        Ok(())
    }

    /// A record narrower than the requested columns is reported as an error.
    #[test]
    fn test_csv_missing_column_error() -> KnowledgeResult<()> {
        use std::fs;
        use std::io::Write;
        let db = MemDB::instance();
        db.table_create(EXAMPLE_CREATE_SQL)?;
        let path = std::env::temp_dir().join("wp_knowledge_csv_missing_col.csv");
        {
            // Scope the handle so the file is closed before it is loaded.
            let mut out = fs::File::create(&path).owe(Reason::from_res())?;
            writeln!(out, "name").owe(Reason::from_res())?;
            writeln!(out, "only_one_col").owe(Reason::from_res())?;
        }
        let outcome = db.table_load(EXAMPLE_INSERT_SQL, path.clone(), vec![0, 1], 10);
        assert!(outcome.is_err());
        let message = format!("{}", outcome.err().unwrap());
        assert!(message.contains("line"));
        assert!(message.contains("insufficient columns"));
        let _ = fs::remove_file(&path);
        Ok(())
    }

    /// Data written through one global handle is visible through another.
    #[test]
    fn test_global_persistence_across_handles() -> KnowledgeResult<()> {
        {
            let writer = MemDB::global();
            writer.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
            writer.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
        }
        {
            let reader = MemDB::global();
            let row = reader.query_row("SELECT v FROM gtest")?;
            assert_eq!(row.len(), 1);
            assert_eq!(row[0].to_string(), "chars(ok)");
        }
        Ok(())
    }

    /// The example table can be created, cleaned and loaded on the global DB.
    #[test]
    fn test_init_by_conf() -> KnowledgeResult<()> {
        let db = MemDB::global();
        db.table_create(EXAMPLE_CREATE_SQL)?;
        let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
        db.table_load(EXAMPLE_INSERT_SQL, example_csv(), vec![0, 1], 100)?;
        Ok(())
    }

    /// Named parameters resolve the same row whether given as string pairs or
    /// as `SqlNamedParam` fields.
    #[test]
    fn test_alter_level() -> KnowledgeResult<()> {
        let db = MemDB::global();
        let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
        db.table_create(
            r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
    id INTEGER PRIMARY KEY,
    log_type TEXT NOT NULL,
    level1_code TEXT NOT NULL,
    level1_name TEXT NOT NULL,
    level2_code TEXT NOT NULL,
    level2_name TEXT NOT NULL,
    original_code TEXT NOT NULL,
    original_name TEXT NOT NULL
    )"#,
        )?;
        let _ = db.table_clean("DELETE FROM alert_cat_level");
        db.table_load(
            r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
            PathBuf::from("src/mem/dict/event_cat_level.csv"),
            vec![0, 1, 2, 3, 4, 5, 6],
            2000,
        )?;

        let sql = "select level1_code from alert_cat_level where log_type = :log_type and original_code = :code ";
        let expected = vec![DataField::from_chars("level1_code", "105")];

        let by_pairs = db.query_row_params(
            sql,
            &[(":log_type", "app_log"), (":code", "00000002")],
        )?;
        assert_eq!(by_pairs, expected);

        let px = [
            SqlNamedParam(DataField::from_chars(":code", "00000002")),
            SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
        ];
        let p = px.to_params();
        let by_fields = db.query_row_params(sql, &p)?;
        assert_eq!(by_fields, expected);

        Ok(())
    }

    /// Each supported value kind binds and reads back as the expected kind.
    #[test]
    fn test_tosql_bind_various_types() -> KnowledgeResult<()> {
        use chrono::NaiveDate;
        use std::net::{IpAddr, Ipv4Addr};
        use wp_model_core::model::types::value::ObjectValue;
        use wp_model_core::model::{DateTimeValue, HexT};

        let db = MemDB::instance();
        db.execute("CREATE TABLE p (v)")?;

        // bool -> integer 1
        let row = insert_and_fetch_last(&db, DataField::from_bool(":v", true))?;
        assert!(matches!(row[0].get_value(), model::Value::Digit(1)));

        // null -> null
        let null_field = DataField::new(model::DataType::default(), ":v", model::Value::Null);
        let row = insert_and_fetch_last(&db, null_field)?;
        assert!(matches!(row[0].get_value(), model::Value::Null));

        // time -> text
        let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap();
        let row = insert_and_fetch_last(&db, DataField::from_time(":v", dt))?;
        assert!(matches!(row[0].get_value(), model::Value::Chars(_)));

        // hex -> text
        let row = insert_and_fetch_last(&db, DataField::from_hex(":v", HexT(0xABCD)))?;
        assert!(matches!(row[0].get_value(), model::Value::Chars(_)));

        // ip -> text
        let ip = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        let row = insert_and_fetch_last(&db, DataField::from_ip(":v", ip))?;
        assert_eq!(row[0].to_string(), "chars(1.2.3.4)");

        // object -> text
        let mut obj = ObjectValue::new();
        obj.insert("k".to_string(), DataField::from_chars("", "v"));
        let row = insert_and_fetch_last(&db, DataField::from_obj(":v", obj))?;
        assert!(matches!(row[0].get_value(), model::Value::Chars(_)));

        // array -> text
        let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
        let row = insert_and_fetch_last(&db, DataField::from_arr(":v", arr))?;
        assert!(matches!(row[0].get_value(), model::Value::Chars(_)));

        Ok(())
    }

    /// Column aliases (including ones with spaces) become the field names.
    #[test]
    fn test_column_alias_names() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
        db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
        let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
        assert_eq!(row.len(), 2);
        let names: Vec<&str> = row.iter().map(|field| field.get_name()).collect();
        assert_eq!(names[0], "the number");
        assert_eq!(names[1], "the text");
        Ok(())
    }

    /// Four threads inserting ten rows each through the global DB yield 40.
    #[test]
    fn test_concurrent_inserts() -> KnowledgeResult<()> {
        use std::thread;
        let db = MemDB::global();
        db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
        let mut workers = Vec::new();
        for _ in 0..4 {
            workers.push(thread::spawn(|| {
                let dbt = MemDB::global();
                for _ in 0..10 {
                    let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
                }
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }
        let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
        assert_eq!(row[0].to_string(), "digit(40)");
        Ok(())
    }

    /// `query` returns an empty vec for an empty table and every row otherwise.
    #[test]
    fn test_query_returns_all_rows() -> KnowledgeResult<()> {
        let db = MemDB::instance();
        db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
        let rows = db.query("SELECT * FROM multi")?;
        assert!(rows.is_empty(), "empty table should return empty vec");
        db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
        db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
        db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;

        let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
        assert_eq!(rows.len(), 3, "should return all 3 rows");

        Ok(())
    }

    /// Reads the TOML file at `path` and parses it with an empty `EnvDict`.
    #[allow(dead_code)]
    fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> KnowledgeResult<T> {
        let mut file = File::open(path)
            .owe(Reason::from_res())
            .doing(format!("conf file not found: {}", path))?;
        let mut raw = Vec::with_capacity(10240);
        file.read_to_end(&mut raw).owe(Reason::from_res())?;
        let text = String::from_utf8(raw).owe(Reason::from_rule())?;
        let dict = EnvDict::new();
        let conf: T = T::env_parse_toml(text.as_str(), &dict).owe(Reason::from_conf())?;
        Ok(conf)
    }

    /// Serialises `val` as pretty TOML into `path`, creating parent
    /// directories when the path has a parent.
    #[allow(dead_code)]
    fn export_toml_local<T: Serialize>(val: &T, path: &str) -> KnowledgeResult<()> {
        let rendered = toml::to_string_pretty(val).owe(Reason::from_rule())?;
        if let Some(parent) = std::path::Path::new(path).parent() {
            fs::create_dir_all(parent).owe(Reason::from_res())?;
        }
        fs::write(path, rendered).owe(Reason::from_res())?;
        Ok(())
    }
}