1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::error::{KnowReason, KnowledgeResult};
4use crate::mem::RowData;
5use crate::mem::stub::StubMDB;
6use csv::Reader;
7use enum_dispatch::enum_dispatch;
8use lazy_static::lazy_static;
9use orion_error::conversion::ErrorWith;
10use orion_error::conversion::{SourceRawErr, ToStructError};
11use r2d2_sqlite::SqliteConnectionManager;
12use rusqlite::OpenFlags;
13use rusqlite::Params;
14use rusqlite::ToSql;
15use rusqlite::types::ToSqlOutput;
16use rusqlite::types::Value;
17use std::path::PathBuf;
18use wp_log::debug_kdb;
19use wp_log::info_kdb;
20use wp_log::warn_kdb;
21use wp_model_core::model;
22use wp_model_core::model::DataField;
23
24use super::SqlNamedParam;
25use crate::loader::ProviderKind;
26use crate::runtime::MetadataCacheScope;
27
28lazy_static! {
29 pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
34 r2d2::Pool::builder()
35 .max_size(1)
36 .build(SqliteConnectionManager::memory())
37 .expect("init SQLite memory pool (size=1) failed");
38}
39
40#[derive(Debug, Clone)]
41pub struct MemDB {
42 conn: r2d2::Pool<SqliteConnectionManager>,
43}
44
45#[derive(Debug, Clone)]
46#[enum_dispatch(DBQuery)]
47pub enum MDBEnum {
48 Stub(StubMDB),
49 Use(MemDB),
50}
51impl Default for MDBEnum {
52 fn default() -> Self {
53 MDBEnum::Stub(StubMDB {})
54 }
55}
56impl MDBEnum {
57 pub fn global() -> Self {
58 MDBEnum::Use(MemDB::global())
59 }
60 pub fn load_test() -> KnowledgeResult<()> {
61 MemDB::load_test()?;
62 Ok(())
63 }
64}
65
66pub fn cache_query<const N: usize, P: Params>(
67 db: &MDBEnum,
68 sql: &str,
69 c_params: &[DataField; N],
70 q_params: P,
71 cache: &mut impl CacheAble<DataField, RowData, N>,
72) -> RowData {
73 crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
74}
75impl ToSql for SqlNamedParam {
76 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
77 match self.0.get_value() {
78 model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
79 model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
80 model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
81 model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
82 model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
83 model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
84 model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85 model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
86 model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87 model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
88 model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
89 model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
90 model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
91 model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
92 model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
93 model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
94 model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
95 model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96 }
97 }
98}
99
100impl DBQuery for MemDB {
101 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
102 let conn = self
103 .conn
104 .get()
105 .source_raw_err(KnowReason::from_res(), "source error")
106 .doing("get memdb connect")?;
107 let _ = crate::sqlite_ext::register_builtin(&conn);
108 super::query_util::query_cached(&conn, sql, [])
109 }
110
111 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
112 let conn = self
113 .conn
114 .get()
115 .source_raw_err(KnowReason::from_res(), "source error")
116 .doing("get memdb connect")?;
117 let _ = crate::sqlite_ext::register_builtin(&conn);
119 super::query_util::query_first_row_cached(&conn, sql, [])
120 }
121
122 fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
123 debug_kdb!("[memdb] query_row_params: {}", sql);
124 let conn = self
125 .conn
126 .get()
127 .source_raw_err(KnowReason::from_res(), "source error")?;
128 let _ = crate::sqlite_ext::register_builtin(&conn);
130 super::query_util::query_first_row_cached(&conn, sql, params)
131 }
132
133 fn query_row_tdos<P: Params>(
134 &self,
135 _sql: &str,
136 _params: &[DataField; 2],
137 ) -> KnowledgeResult<RowData> {
138 todo!();
142 }
143}
144impl MemDB {
145 pub fn query_with_scope(
146 &self,
147 scope: &MetadataCacheScope,
148 sql: &str,
149 ) -> KnowledgeResult<Vec<RowData>> {
150 let conn = self
151 .conn
152 .get()
153 .source_raw_err(KnowReason::from_res(), "source error")
154 .doing("get memdb connect")?;
155 let _ = crate::sqlite_ext::register_builtin(&conn);
156 super::query_util::query_cached_with_scope(
157 &conn,
158 scope,
159 Some(ProviderKind::SqliteAuthority),
160 sql,
161 [],
162 )
163 }
164
165 pub fn query_row_with_scope(
166 &self,
167 scope: &MetadataCacheScope,
168 sql: &str,
169 ) -> KnowledgeResult<RowData> {
170 let conn = self
171 .conn
172 .get()
173 .source_raw_err(KnowReason::from_res(), "source error")
174 .doing("get memdb connect")?;
175 let _ = crate::sqlite_ext::register_builtin(&conn);
176 super::query_util::query_first_row_cached_with_scope(
177 &conn,
178 scope,
179 Some(ProviderKind::SqliteAuthority),
180 sql,
181 [],
182 )
183 }
184
185 pub fn query_fields_with_scope(
186 &self,
187 scope: &MetadataCacheScope,
188 sql: &str,
189 params: &[DataField],
190 ) -> KnowledgeResult<Vec<RowData>> {
191 let conn = self
192 .conn
193 .get()
194 .source_raw_err(KnowReason::from_res(), "source error")
195 .doing("get memdb connect")?;
196 let _ = crate::sqlite_ext::register_builtin(&conn);
197 let named_params = params
198 .iter()
199 .cloned()
200 .map(SqlNamedParam)
201 .collect::<Vec<_>>();
202 let refs: Vec<(&str, &dyn ToSql)> = named_params
203 .iter()
204 .map(|param| (param.0.get_name(), param as &dyn ToSql))
205 .collect();
206 super::query_util::query_cached_with_scope(
207 &conn,
208 scope,
209 Some(ProviderKind::SqliteAuthority),
210 sql,
211 refs.as_slice(),
212 )
213 }
214
215 pub fn query_named_fields_with_scope(
216 &self,
217 scope: &MetadataCacheScope,
218 sql: &str,
219 params: &[DataField],
220 ) -> KnowledgeResult<RowData> {
221 self.query_fields_with_scope(scope, sql, params)
222 .map(|rows| rows.into_iter().next().unwrap_or_default())
223 }
224
225 pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
226 let conn = self
227 .conn
228 .get()
229 .source_raw_err(KnowReason::from_res(), "source error")
230 .doing("get memdb connect")?;
231 let _ = crate::sqlite_ext::register_builtin(&conn);
232 let named_params = params
233 .iter()
234 .cloned()
235 .map(SqlNamedParam)
236 .collect::<Vec<_>>();
237 let refs: Vec<(&str, &dyn ToSql)> = named_params
238 .iter()
239 .map(|param| (param.0.get_name(), param as &dyn ToSql))
240 .collect();
241 super::query_util::query_cached(&conn, sql, refs.as_slice())
242 }
243
244 pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
245 self.query_fields(sql, params)
246 .map(|rows| rows.into_iter().next().unwrap_or_default())
247 }
248
249 pub fn instance() -> Self {
250 let manager = SqliteConnectionManager::memory();
252 let pool = r2d2::Pool::builder()
253 .max_size(1)
254 .build(manager)
255 .expect("init SQLite memory pool (size=1) failed");
256 Self { conn: pool }
257 }
258 pub fn shared_pool(max_size: u32) -> KnowledgeResult<Self> {
261 let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
264 let manager = SqliteConnectionManager::file(uri).with_flags(
265 OpenFlags::SQLITE_OPEN_READ_WRITE
266 | OpenFlags::SQLITE_OPEN_CREATE
267 | OpenFlags::SQLITE_OPEN_URI,
268 );
269 let pool = r2d2::Pool::builder()
270 .max_size(max_size)
271 .build(manager)
272 .source_raw_err(KnowReason::from_res(), "source error")?;
273 Ok(Self { conn: pool })
274 }
275
276 pub fn new_file(
278 path: &str,
279 max_size: u32,
280 flags: rusqlite::OpenFlags,
281 ) -> KnowledgeResult<Self> {
282 let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
283 let pool = r2d2::Pool::builder()
284 .max_size(max_size)
285 .build(manager)
286 .source_raw_err(KnowReason::from_res(), "source error")?;
287 Ok(Self { conn: pool })
288 }
289 pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> anyhow::Result<T>>(
294 &self,
295 f: F,
296 ) -> anyhow::Result<T> {
297 let pooled = self.conn.get()?;
298 let conn_ref: &rusqlite::Connection = &pooled;
299 f(conn_ref)
300 }
301
302 pub fn table_create(&self, sql: &str) -> KnowledgeResult<()> {
303 let conn = self
304 .conn
305 .get()
306 .source_raw_err(KnowReason::from_res(), "source error")?;
307 conn.execute(sql, ())
308 .source_raw_err(KnowReason::from_rule(), "source error")?;
309 debug_kdb!("crate table: {} ", sql);
310 Ok(())
311 }
312 pub fn execute(&self, sql: &str) -> KnowledgeResult<()> {
313 let conn = self
314 .conn
315 .get()
316 .source_raw_err(KnowReason::from_res(), "source error")?;
317 conn.execute(sql, ())
318 .source_raw_err(KnowReason::from_rule(), "source error")?;
319 debug_kdb!("execute: {} ", sql);
320 Ok(())
321 }
322
323 pub fn table_clean(&self, sql: &str) -> KnowledgeResult<()> {
324 let conn = self
325 .conn
326 .get()
327 .source_raw_err(KnowReason::from_res(), "source error")?;
328 conn.execute(sql, ())
329 .source_raw_err(KnowReason::from_rule(), "source error")?;
330 debug_kdb!("clean table: {} ", sql);
331 Ok(())
332 }
333
334 pub fn table_load(
335 &self,
336 sql: &str,
337 csv_path: PathBuf,
338 cols: Vec<usize>,
339 max: usize,
340 ) -> KnowledgeResult<usize> {
341 info_kdb!("load table data in {}", csv_path.display());
342 if !csv_path.exists() {
343 warn_kdb!("{} not find, load knowdb failed", csv_path.display());
344 return Ok(0);
345 }
346 let mut rdr =
347 Reader::from_path(&csv_path).source_raw_err(KnowReason::from_res(), "source error")?;
348 let conn = self
349 .conn
350 .get()
351 .source_raw_err(KnowReason::from_res(), "source error")?;
352 let mut load_cnt: usize = 0;
353 let mut stmt = conn
355 .prepare(sql)
356 .source_raw_err(KnowReason::from_rule(), "source error")?;
357 for (idx, result) in rdr.records().enumerate() {
358 if load_cnt >= max {
359 break;
360 }
361 let record = result.map_err(|e| {
362 KnowReason::from_rule().to_err().with_detail(format!(
363 "read csv record failed at line {}: {}",
364 idx + 1,
365 e
366 ))
367 })?;
368
369 if let Some(max_col) = cols.iter().max()
371 && *max_col >= record.len()
372 {
373 return Err(KnowReason::from_rule().to_err().with_detail(format!(
374 "csv has insufficient columns at line {}: need index {}, got {} columns",
375 idx + 1,
376 *max_col,
377 record.len()
378 )));
379 }
380
381 let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
383 for &ci in &cols {
384 let v = record.get(ci).ok_or_else(|| {
385 KnowReason::from_rule().to_err().with_detail(format!(
386 "line {} col {} missing",
387 idx + 1,
388 ci
389 ))
390 })?;
391 vec.push(v);
392 }
393 let params = rusqlite::params_from_iter(vec);
394 stmt.execute(params)
395 .source_raw_err(KnowReason::from_rule(), "source error")?;
396 load_cnt += 1;
397 }
398 info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
399 Ok(load_cnt)
400 }
401
402 pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
403 let conn = self
404 .conn
405 .get()
406 .source_raw_err(KnowReason::from_res(), "source error")?;
407 let count_sql = format!("select count(*) from {}", table);
408 let count: usize = conn
409 .query_row(count_sql.as_str(), (), |row| row.get(0))
410 .source_raw_err(KnowReason::from_rule(), "source error")?;
411 if count >= scope.0 {
412 Ok(count)
413 } else {
414 Err(KnowReason::from_conf()
415 .to_err()
416 .with_detail("table data less")
417 .with_context(("table", table))
418 .with_context(("count", count.to_string())))
419
420 }
428 }
429
430 pub fn global() -> Self {
431 Self {
432 conn: MEM_SQLITE_INS.clone(),
433 }
434 }
435 pub fn load_test() -> KnowledgeResult<Self> {
436 let db = Self::global();
437 debug_kdb!("[memdb] load_test invoked");
438 db.table_create(EXAMPLE_CREATE_SQL)?;
439 let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
441 let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
442 db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
443 if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
445 debug_kdb!("[memdb] example rows loaded = {}", cnt);
446 }
447 Ok(db)
448 }
449}
450pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
451 id INTEGER PRIMARY KEY,
452 name TEXT NOT NULL,
453 pinying TEXT NOT NULL
454 )"#;
455pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
456pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
457
458#[cfg(test)]
459mod tests {
460
461 use std::{fs::File, io::Read};
462
463 use super::*;
464 use crate::error::{KnowReason, KnowledgeResult};
466 use crate::mem::ToSqlParams;
467
468 use orion_conf::EnvTomlLoad;
469 use orion_error::conversion::SourceErr;
470 use orion_variate::EnvDict;
471 use serde::Serialize;
472 use std::fs;
473 use wp_data_fmt::{Csv, RecordFormatter};
474
475 #[test]
476 fn test_load() -> KnowledgeResult<()> {
477 let db = MemDB::instance();
478 db.table_create(EXAMPLE_CREATE_SQL)?;
479 let loaded = db.table_load(
480 EXAMPLE_INSERT_SQL,
481 PathBuf::from("src/mem/dict/example.csv"),
482 vec![0, 1],
483 100,
484 )?;
485 assert_eq!(loaded, 10);
486 let fmt = Csv::default();
487 let tdos = db.query_row("select * from example;")?;
488 for obj in tdos {
489 println!("{}", fmt.fmt_field(&obj.into()));
490 }
491 Ok(())
492 }
493
494 #[test]
495 fn test_csv_off_by_one() -> KnowledgeResult<()> {
496 let db = MemDB::instance();
497 db.table_create(EXAMPLE_CREATE_SQL)?;
498 let loaded = db.table_load(
500 EXAMPLE_INSERT_SQL,
501 PathBuf::from("src/mem/dict/example.csv"),
502 vec![0, 1],
503 1,
504 )?;
505 assert_eq!(loaded, 1);
506 Ok(())
507 }
508
509 #[test]
510 fn test_row_null_mapping() -> KnowledgeResult<()> {
511 let db = MemDB::instance();
512 db.execute("CREATE TABLE tnull (v TEXT)")?;
513 db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
514 let row = db.query_row("SELECT v FROM tnull")?;
515 assert_eq!(row.len(), 1);
516 assert_eq!(row[0].get_name(), "v");
517 assert!(matches!(row[0].get_value(), model::Value::Null));
519 Ok(())
520 }
521
522 #[test]
523 fn test_row_blob_mapping() -> KnowledgeResult<()> {
524 let db = MemDB::instance();
525 db.execute("CREATE TABLE tblob (b BLOB)")?;
526 db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
528 let row = db.query_row("SELECT b FROM tblob")?;
529 assert_eq!(row.len(), 1);
530 assert_eq!(row[0].get_name(), "b");
531 assert_eq!(row[0].to_string(), "chars(ABC)");
533 Ok(())
534 }
535
536 #[test]
537 fn test_csv_missing_column_error() -> KnowledgeResult<()> {
538 use std::fs;
539 use std::io::Write;
540 let db = MemDB::instance();
541 db.table_create(EXAMPLE_CREATE_SQL)?;
542 let mut path = std::env::temp_dir();
544 path.push("wp_knowledge_csv_missing_col.csv");
545 {
546 let mut f =
547 fs::File::create(&path).source_raw_err(KnowReason::from_res(), "source error")?;
548 writeln!(f, "name").source_raw_err(KnowReason::from_res(), "source error")?;
549 writeln!(f, "only_one_col").source_raw_err(KnowReason::from_res(), "source error")?;
550 }
551 let res = db.table_load(
552 EXAMPLE_INSERT_SQL,
553 path.clone(),
554 vec![0, 1], 10,
556 );
557 assert!(res.is_err());
558 let e = format!("{}", res.err().unwrap());
559 assert!(e.contains("line"));
560 assert!(e.contains("insufficient columns"));
561 let _ = fs::remove_file(&path);
563 Ok(())
564 }
565
566 #[test]
567 fn test_global_persistence_across_handles() -> KnowledgeResult<()> {
568 {
570 let db1 = MemDB::global();
571 db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
572 db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
573 }
574 {
576 let db2 = MemDB::global();
577 let rows = db2.query_row("SELECT v FROM gtest")?;
578 assert_eq!(rows.len(), 1);
579 assert_eq!(rows[0].to_string(), "chars(ok)");
580 }
581 Ok(())
582 }
583
584 #[test]
585 fn test_init_by_conf() -> KnowledgeResult<()> {
586 let db = MemDB::global();
587 db.table_create(EXAMPLE_CREATE_SQL)?;
588 let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
589 db.table_load(
590 EXAMPLE_INSERT_SQL,
591 PathBuf::from("src/mem/dict/example.csv"),
592 vec![0, 1],
593 100,
594 )?;
595 Ok(())
596 }
597
598 #[test]
601 fn test_alter_level() -> KnowledgeResult<()> {
602 let db = MemDB::global();
603 let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
605 db.table_create(
606 r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
607 id INTEGER PRIMARY KEY,
608 log_type TEXT NOT NULL,
609 level1_code TEXT NOT NULL,
610 level1_name TEXT NOT NULL,
611 level2_code TEXT NOT NULL,
612 level2_name TEXT NOT NULL,
613 original_code TEXT NOT NULL,
614 original_name TEXT NOT NULL
615 )"#,
616 )?;
617 let _ = db.table_clean("DELETE FROM alert_cat_level");
618 db.table_load(
619 r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
620 PathBuf::from("src/mem/dict/event_cat_level.csv"),
621 vec![0, 1, 2, 3, 4, 5, 6],
622 2000,
623 )?;
624
625 let sql = "select level1_code from alert_cat_level where log_type = :log_type and original_code = :code ";
626 let result = db.query_row_params(
627 sql,
629 &[(":log_type", "app_log"), (":code", "00000002")],
630 )?;
631 assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
632
633 let px = [
634 SqlNamedParam(DataField::from_chars(":code", "00000002")),
635 SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
636 ];
637
638 let p = px.to_params();
639 let result = db.query_row_params(sql, &p)?;
640 assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
641
642 Ok(())
643 }
644
645 #[test]
646 fn test_tosql_bind_various_types() -> KnowledgeResult<()> {
647 use chrono::NaiveDate;
648 use std::net::{IpAddr, Ipv4Addr};
649 use wp_model_core::model::types::value::ObjectValue;
650 use wp_model_core::model::{DateTimeValue, HexT};
651
652 let db = MemDB::instance();
653 db.execute("CREATE TABLE p (v)")?;
654
655 {
657 let sql = "INSERT INTO p (v) VALUES (:v)";
658 let p = [SqlNamedParam(DataField::from_bool(":v", true))];
659 db.query_row_params(sql, &p.to_params())?;
660 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
661 assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
662 }
663 {
665 let sql = "INSERT INTO p (v) VALUES (:v)";
666 let p = [SqlNamedParam(DataField::new(
667 model::DataType::default(),
668 ":v",
669 model::Value::Null,
670 ))];
671 db.query_row_params(sql, &p.to_params())?;
672 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
673 assert!(matches!(row[0].get_value(), model::Value::Null));
674 }
675 {
677 let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
678 .unwrap()
679 .and_hms_opt(0, 0, 0)
680 .unwrap();
681 let sql = "INSERT INTO p (v) VALUES (:v)";
682 let p = [SqlNamedParam(DataField::from_time(":v", dt))];
683 db.query_row_params(sql, &p.to_params())?;
684 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
685 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
686 }
687 {
689 let sql = "INSERT INTO p (v) VALUES (:v)";
690 let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
691 db.query_row_params(sql, &p.to_params())?;
692 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
693 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
694 }
695 {
697 let sql = "INSERT INTO p (v) VALUES (:v)";
698 let p = [SqlNamedParam(DataField::from_ip(
699 ":v",
700 IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
701 ))];
702 db.query_row_params(sql, &p.to_params())?;
703 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
704 assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
705 }
706 {
708 let mut obj = ObjectValue::new();
709 obj.insert("k".to_string(), DataField::from_chars("", "v"));
710 let sql = "INSERT INTO p (v) VALUES (:v)";
711 let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
712 db.query_row_params(sql, &p.to_params())?;
713 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
714 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
715 }
716 {
718 let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
719 let sql = "INSERT INTO p (v) VALUES (:v)";
720 let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
721 db.query_row_params(sql, &p.to_params())?;
722 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
723 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
724 }
725 Ok(())
726 }
727
728 #[test]
729 fn test_column_alias_names() -> KnowledgeResult<()> {
730 let db = MemDB::instance();
731 db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
733 db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
734 let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
735 assert_eq!(row.len(), 2);
736 assert_eq!(row[0].get_name(), "the number");
737 assert_eq!(row[1].get_name(), "the text");
738 Ok(())
739 }
740
741 #[test]
742 fn test_concurrent_inserts() -> KnowledgeResult<()> {
743 use std::thread;
744 let db = MemDB::global();
745 db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
746 let threads: Vec<_> = (0..4)
747 .map(|_| {
748 thread::spawn(|| {
749 let dbt = MemDB::global();
750 for _ in 0..10 {
751 let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
752 }
753 })
754 })
755 .collect();
756 for t in threads {
757 t.join().unwrap();
758 }
759 let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
760 assert_eq!(row[0].to_string(), "digit(40)");
762 Ok(())
763 }
764
765 #[test]
766 fn test_query_returns_all_rows() -> KnowledgeResult<()> {
767 let db = MemDB::instance();
768 db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
769 let rows = db.query("SELECT * FROM multi")?;
770 assert!(rows.is_empty(), "empty table should return empty vec");
771 db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
772 db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
773 db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
774
775 let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
776 assert_eq!(rows.len(), 3, "should return all 3 rows");
777
778 Ok(())
779 }
780
781 #[allow(dead_code)]
782 fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> KnowledgeResult<T> {
783 let mut f = File::open(path)
784 .source_raw_err(KnowReason::from_res(), "source error")
785 .doing(format!("conf file not found: {}", path))?;
786 let mut buffer = Vec::with_capacity(10240);
787 f.read_to_end(&mut buffer)
788 .source_raw_err(KnowReason::from_res(), "source error")?;
789 let conf_data =
790 String::from_utf8(buffer).source_raw_err(KnowReason::from_rule(), "source error")?;
791 let dict = EnvDict::new();
792 let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)
793 .source_err(KnowReason::from_conf(), "parse toml config")?;
794 Ok(conf)
795 }
796
797 #[allow(dead_code)]
798 fn export_toml_local<T: Serialize>(val: &T, path: &str) -> KnowledgeResult<()> {
799 let data =
800 toml::to_string_pretty(val).source_raw_err(KnowReason::from_rule(), "source error")?;
801 if let Some(parent) = std::path::Path::new(path).parent() {
802 fs::create_dir_all(parent).source_raw_err(KnowReason::from_res(), "source error")?;
803 }
804 fs::write(path, data).source_raw_err(KnowReason::from_res(), "source error")?;
805 Ok(())
806 }
807}