1use crate::DBQuery;
2use crate::cache::CacheAble;
3use crate::mem::RowData;
4use crate::mem::stub::StubMDB;
5use csv::Reader;
6use enum_dispatch::enum_dispatch;
7use lazy_static::lazy_static;
8use orion_error::ErrorOwe;
9use orion_error::ErrorWith;
10use orion_error::ToStructError;
11use orion_error::UvsFrom;
12use r2d2_sqlite::SqliteConnectionManager;
13use rusqlite::OpenFlags;
14use rusqlite::Params;
15use rusqlite::ToSql;
16use rusqlite::types::ToSqlOutput;
17use rusqlite::types::Value;
18use std::path::PathBuf;
19use wp_error::KnowledgeReason;
20use wp_error::KnowledgeResult;
21use wp_log::debug_kdb;
22use wp_log::info_kdb;
23use wp_log::warn_kdb;
24use wp_model_core::model;
25use wp_model_core::model::DataField;
26
27use super::AnyResult;
28use super::SqlNamedParam;
29use crate::loader::ProviderKind;
30use crate::runtime::MetadataCacheScope;
31
32lazy_static! {
33 pub static ref MEM_SQLITE_INS: r2d2::Pool<SqliteConnectionManager> =
38 r2d2::Pool::builder()
39 .max_size(1)
40 .build(SqliteConnectionManager::memory())
41 .expect("init SQLite memory pool (size=1) failed");
42}
43
/// Handle to a SQLite database reached through an r2d2 connection pool.
///
/// Cloning the handle clones the pool handle stored in `conn`.
#[derive(Debug, Clone)]
pub struct MemDB {
    // Pool that every query/DDL method of this type checks a connection out of.
    conn: r2d2::Pool<SqliteConnectionManager>,
}
48
/// Database backend selector; `DBQuery` is implemented for it through
/// `enum_dispatch`, forwarding each call to the active variant.
#[derive(Debug, Clone)]
#[enum_dispatch(DBQuery)]
pub enum MDBEnum {
    /// Stub backend.
    Stub(StubMDB),
    /// Backend served by a [`MemDB`].
    Use(MemDB),
}
55impl Default for MDBEnum {
56 fn default() -> Self {
57 MDBEnum::Stub(StubMDB {})
58 }
59}
60impl MDBEnum {
61 pub fn global() -> Self {
62 MDBEnum::Use(MemDB::global())
63 }
64 pub fn load_test() -> AnyResult<()> {
65 MemDB::load_test()?;
66 Ok(())
67 }
68}
69
70pub fn cache_query<const N: usize, P: Params>(
71 db: &MDBEnum,
72 sql: &str,
73 c_params: &[DataField; N],
74 q_params: P,
75 cache: &mut impl CacheAble<DataField, RowData, N>,
76) -> RowData {
77 crate::cache_util::cache_query_impl(c_params, cache, || db.query_row_params(sql, q_params))
78}
79impl ToSql for SqlNamedParam {
80 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
81 match self.0.get_value() {
82 model::Value::Bool(v) => Ok(ToSqlOutput::Owned(Value::Integer(if *v { 1 } else { 0 }))),
83 model::Value::Null => Ok(ToSqlOutput::Owned(Value::Null)),
84 model::Value::Chars(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
85 model::Value::Symbol(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
86 model::Value::Time(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
87 model::Value::Digit(v) => Ok(ToSqlOutput::Owned(Value::Integer(*v))),
88 model::Value::Hex(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
89 model::Value::Float(v) => Ok(ToSqlOutput::Owned(Value::Real(*v))),
90 model::Value::IpNet(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
91 model::Value::IpAddr(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
92 model::Value::Ignore(_) => Ok(ToSqlOutput::Owned(Value::Null)),
93 model::Value::Obj(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
94 model::Value::Array(v) => Ok(ToSqlOutput::Owned(Value::Text(format!("{:?}", v)))),
95 model::Value::Domain(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
96 model::Value::Url(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
97 model::Value::Email(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
98 model::Value::IdCard(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
99 model::Value::MobilePhone(v) => Ok(ToSqlOutput::Owned(Value::Text(v.0.to_string()))),
100 }
101 }
102}
103
104impl DBQuery for MemDB {
105 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
106 let conn = self.conn.get().owe_res().want("get memdb connect")?;
107 let _ = crate::sqlite_ext::register_builtin(&conn);
108 super::query_util::query_cached(&conn, sql, [])
109 }
110
111 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
112 let conn = self.conn.get().owe_res().want("get memdb connect")?;
113 let _ = crate::sqlite_ext::register_builtin(&conn);
115 super::query_util::query_first_row_cached(&conn, sql, [])
116 }
117
118 fn query_row_params<P: Params>(&self, sql: &str, params: P) -> KnowledgeResult<RowData> {
119 debug_kdb!("[memdb] query_row_params: {}", sql);
120 let conn = self.conn.get().owe_res()?;
121 let _ = crate::sqlite_ext::register_builtin(&conn);
123 super::query_util::query_first_row_cached(&conn, sql, params)
124 }
125
126 fn query_row_tdos<P: Params>(
127 &self,
128 _sql: &str,
129 _params: &[DataField; 2],
130 ) -> KnowledgeResult<RowData> {
131 todo!();
135 }
136}
137impl MemDB {
138 pub fn query_with_scope(
139 &self,
140 scope: &MetadataCacheScope,
141 sql: &str,
142 ) -> KnowledgeResult<Vec<RowData>> {
143 let conn = self.conn.get().owe_res().want("get memdb connect")?;
144 let _ = crate::sqlite_ext::register_builtin(&conn);
145 super::query_util::query_cached_with_scope(
146 &conn,
147 scope,
148 Some(ProviderKind::SqliteAuthority),
149 sql,
150 [],
151 )
152 }
153
154 pub fn query_row_with_scope(
155 &self,
156 scope: &MetadataCacheScope,
157 sql: &str,
158 ) -> KnowledgeResult<RowData> {
159 let conn = self.conn.get().owe_res().want("get memdb connect")?;
160 let _ = crate::sqlite_ext::register_builtin(&conn);
161 super::query_util::query_first_row_cached_with_scope(
162 &conn,
163 scope,
164 Some(ProviderKind::SqliteAuthority),
165 sql,
166 [],
167 )
168 }
169
170 pub fn query_fields_with_scope(
171 &self,
172 scope: &MetadataCacheScope,
173 sql: &str,
174 params: &[DataField],
175 ) -> KnowledgeResult<Vec<RowData>> {
176 let conn = self.conn.get().owe_res().want("get memdb connect")?;
177 let _ = crate::sqlite_ext::register_builtin(&conn);
178 let named_params = params
179 .iter()
180 .cloned()
181 .map(SqlNamedParam)
182 .collect::<Vec<_>>();
183 let refs: Vec<(&str, &dyn ToSql)> = named_params
184 .iter()
185 .map(|param| (param.0.get_name(), param as &dyn ToSql))
186 .collect();
187 super::query_util::query_cached_with_scope(
188 &conn,
189 scope,
190 Some(ProviderKind::SqliteAuthority),
191 sql,
192 refs.as_slice(),
193 )
194 }
195
196 pub fn query_named_fields_with_scope(
197 &self,
198 scope: &MetadataCacheScope,
199 sql: &str,
200 params: &[DataField],
201 ) -> KnowledgeResult<RowData> {
202 self.query_fields_with_scope(scope, sql, params)
203 .map(|rows| rows.into_iter().next().unwrap_or_default())
204 }
205
206 pub fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
207 let conn = self.conn.get().owe_res().want("get memdb connect")?;
208 let _ = crate::sqlite_ext::register_builtin(&conn);
209 let named_params = params
210 .iter()
211 .cloned()
212 .map(SqlNamedParam)
213 .collect::<Vec<_>>();
214 let refs: Vec<(&str, &dyn ToSql)> = named_params
215 .iter()
216 .map(|param| (param.0.get_name(), param as &dyn ToSql))
217 .collect();
218 super::query_util::query_cached(&conn, sql, refs.as_slice())
219 }
220
221 pub fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
222 self.query_fields(sql, params)
223 .map(|rows| rows.into_iter().next().unwrap_or_default())
224 }
225
226 pub fn instance() -> Self {
227 let manager = SqliteConnectionManager::memory();
229 let pool = r2d2::Pool::builder()
230 .max_size(1)
231 .build(manager)
232 .expect("init SQLite memory pool (size=1) failed");
233 Self { conn: pool }
234 }
235 pub fn shared_pool(max_size: u32) -> AnyResult<Self> {
238 let uri = "file:wp_knowledge_shm?mode=memory&cache=shared";
241 let manager = SqliteConnectionManager::file(uri).with_flags(
242 OpenFlags::SQLITE_OPEN_READ_WRITE
243 | OpenFlags::SQLITE_OPEN_CREATE
244 | OpenFlags::SQLITE_OPEN_URI,
245 );
246 let pool = r2d2::Pool::builder().max_size(max_size).build(manager)?;
247 Ok(Self { conn: pool })
248 }
249
250 pub fn new_file(
252 path: &str,
253 max_size: u32,
254 flags: rusqlite::OpenFlags,
255 ) -> KnowledgeResult<Self> {
256 let manager = r2d2_sqlite::SqliteConnectionManager::file(path).with_flags(flags);
257 let pool = r2d2::Pool::builder()
258 .max_size(max_size)
259 .build(manager)
260 .owe_res()?;
261 Ok(Self { conn: pool })
262 }
263 pub fn with_conn<T, F: FnOnce(&rusqlite::Connection) -> AnyResult<T>>(
268 &self,
269 f: F,
270 ) -> AnyResult<T> {
271 let pooled = self.conn.get()?;
272 let conn_ref: &rusqlite::Connection = &pooled;
273 f(conn_ref)
274 }
275
276 pub fn table_create(&self, sql: &str) -> anyhow::Result<()> {
277 let conn = self.conn.get()?;
278 conn.execute(sql, ())?;
279 debug_kdb!("crate table: {} ", sql);
280 Ok(())
281 }
282 pub fn execute(&self, sql: &str) -> anyhow::Result<()> {
283 let conn = self.conn.get()?;
284 conn.execute(sql, ())?;
285 debug_kdb!("execute: {} ", sql);
286 Ok(())
287 }
288
289 pub fn table_clean(&self, sql: &str) -> anyhow::Result<()> {
290 let conn = self.conn.get()?;
291 conn.execute(sql, ())?;
292 debug_kdb!("clean table: {} ", sql);
293 Ok(())
294 }
295
296 pub fn table_load(
297 &self,
298 sql: &str,
299 csv_path: PathBuf,
300 cols: Vec<usize>,
301 max: usize,
302 ) -> AnyResult<usize> {
303 info_kdb!("load table data in {}", csv_path.display());
304 if !csv_path.exists() {
305 warn_kdb!("{} not find, load knowdb failed", csv_path.display());
306 return Ok(0);
307 }
308 let mut rdr = Reader::from_path(&csv_path)?;
309 let conn = self.conn.get()?;
310 let mut load_cnt: usize = 0;
311 let mut stmt = conn.prepare(sql)?;
313 for (idx, result) in rdr.records().enumerate() {
314 if load_cnt >= max {
315 break;
316 }
317 let record = result.map_err(|e| {
318 anyhow::anyhow!("read csv record failed at line {}: {}", idx + 1, e)
319 })?;
320
321 if let Some(max_col) = cols.iter().max()
323 && *max_col >= record.len()
324 {
325 return Err(anyhow::anyhow!(
326 "csv has insufficient columns at line {}: need index {}, got {} columns",
327 idx + 1,
328 *max_col,
329 record.len()
330 ));
331 }
332
333 let mut vec: Vec<&str> = Vec::with_capacity(cols.len());
335 for &ci in &cols {
336 let v = record
337 .get(ci)
338 .ok_or_else(|| anyhow::anyhow!("line {} col {} missing", idx + 1, ci))?;
339 vec.push(v);
340 }
341 let params = rusqlite::params_from_iter(vec);
342 stmt.execute(params)?;
343 load_cnt += 1;
344 }
345 info_kdb!("from {} load data cnt: {}", csv_path.display(), load_cnt);
346 Ok(load_cnt)
347 }
348
349 pub fn check_data(&self, table: &str, scope: (usize, usize)) -> KnowledgeResult<usize> {
350 let conn = self.conn.get().owe_res()?;
351 let count_sql = format!("select count(*) from {}", table);
352 let count: usize = conn
353 .query_row(count_sql.as_str(), (), |row| row.get(0))
354 .owe_rule()?;
355 if count >= scope.0 {
356 Ok(count)
357 } else {
358 Err(KnowledgeReason::from_conf()
359 .to_err()
360 .with_detail("table data less")
361 .with(("table", table))
362 .with(("count", count.to_string())))
363
364 }
372 }
373
374 pub fn global() -> Self {
375 Self {
376 conn: MEM_SQLITE_INS.clone(),
377 }
378 }
379 pub fn load_test() -> AnyResult<Self> {
380 let db = Self::global();
381 debug_kdb!("[memdb] load_test invoked");
382 db.table_create(EXAMPLE_CREATE_SQL)?;
383 let csv = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src/mem/dict/example.csv");
385 let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
386 db.table_load(EXAMPLE_INSERT_SQL, csv, vec![0, 1], 100)?;
387 if let Ok(cnt) = db.check_data("example", (1, usize::MAX)) {
389 debug_kdb!("[memdb] example rows loaded = {}", cnt);
390 }
391 Ok(db)
392 }
393}
/// DDL for the `example` table; a no-op when the table already exists
/// (`IF NOT EXISTS`).
pub const EXAMPLE_CREATE_SQL: &str = r#"CREATE TABLE IF NOT EXISTS example (
 id INTEGER PRIMARY KEY,
 name TEXT NOT NULL,
 pinying TEXT NOT NULL
 )"#;
/// Deletes every row of the `example` table.
pub const EXAMPLE_CLEAN_SQL: &str = "DELETE FROM example";
/// Inserts one `example` row; `?1` binds `name` and `?2` binds `pinying`.
pub const EXAMPLE_INSERT_SQL: &str = r#"INSERT INTO example(name,pinying) VALUES (?1, ?2 ) "#;
401
402#[cfg(test)]
403mod tests {
404
405 use std::{fs::File, io::Read};
406
407 use super::*;
408 use crate::mem::ToSqlParams;
410 use anyhow::Context;
411 use orion_conf::EnvTomlLoad;
412 use orion_variate::EnvDict;
413 use serde::Serialize;
414 use std::fs;
415 use wp_data_fmt::{Csv, RecordFormatter};
416
417 #[test]
418 fn test_load() -> AnyResult<()> {
419 let db = MemDB::instance();
420 db.table_create(EXAMPLE_CREATE_SQL)?;
421 let loaded = db.table_load(
422 EXAMPLE_INSERT_SQL,
423 PathBuf::from("src/mem/dict/example.csv"),
424 vec![0, 1],
425 100,
426 )?;
427 assert_eq!(loaded, 10);
428 let fmt = Csv::default();
429 let tdos = db.query_row("select * from example;")?;
430 for obj in tdos {
431 println!("{}", fmt.fmt_field(&obj.into()));
432 }
433 Ok(())
434 }
435
436 #[test]
437 fn test_csv_off_by_one() -> AnyResult<()> {
438 let db = MemDB::instance();
439 db.table_create(EXAMPLE_CREATE_SQL)?;
440 let loaded = db.table_load(
442 EXAMPLE_INSERT_SQL,
443 PathBuf::from("src/mem/dict/example.csv"),
444 vec![0, 1],
445 1,
446 )?;
447 assert_eq!(loaded, 1);
448 Ok(())
449 }
450
451 #[test]
452 fn test_row_null_mapping() -> AnyResult<()> {
453 let db = MemDB::instance();
454 db.execute("CREATE TABLE tnull (v TEXT)")?;
455 db.execute("INSERT INTO tnull (v) VALUES (NULL)")?;
456 let row = db.query_row("SELECT v FROM tnull")?;
457 assert_eq!(row.len(), 1);
458 assert_eq!(row[0].get_name(), "v");
459 assert!(matches!(row[0].get_value(), model::Value::Null));
461 Ok(())
462 }
463
464 #[test]
465 fn test_row_blob_mapping() -> AnyResult<()> {
466 let db = MemDB::instance();
467 db.execute("CREATE TABLE tblob (b BLOB)")?;
468 db.execute("INSERT INTO tblob (b) VALUES (X'414243')")?;
470 let row = db.query_row("SELECT b FROM tblob")?;
471 assert_eq!(row.len(), 1);
472 assert_eq!(row[0].get_name(), "b");
473 assert_eq!(row[0].to_string(), "chars(ABC)");
475 Ok(())
476 }
477
478 #[test]
479 fn test_csv_missing_column_error() -> AnyResult<()> {
480 use std::fs;
481 use std::io::Write;
482 let db = MemDB::instance();
483 db.table_create(EXAMPLE_CREATE_SQL)?;
484 let mut path = std::env::temp_dir();
486 path.push("wp_knowledge_csv_missing_col.csv");
487 {
488 let mut f = fs::File::create(&path)?;
489 writeln!(f, "name")?;
490 writeln!(f, "only_one_col")?;
491 }
492 let res = db.table_load(
493 EXAMPLE_INSERT_SQL,
494 path.clone(),
495 vec![0, 1], 10,
497 );
498 assert!(res.is_err());
499 let e = format!("{}", res.err().unwrap());
500 assert!(e.contains("line"));
501 assert!(e.contains("insufficient columns"));
502 let _ = fs::remove_file(&path);
504 Ok(())
505 }
506
507 #[test]
508 fn test_global_persistence_across_handles() -> AnyResult<()> {
509 {
511 let db1 = MemDB::global();
512 db1.execute("CREATE TABLE IF NOT EXISTS gtest (v TEXT)")?;
513 db1.execute("INSERT INTO gtest (v) VALUES ('ok')")?;
514 }
515 {
517 let db2 = MemDB::global();
518 let rows = db2.query_row("SELECT v FROM gtest")?;
519 assert_eq!(rows.len(), 1);
520 assert_eq!(rows[0].to_string(), "chars(ok)");
521 }
522 Ok(())
523 }
524
525 #[test]
526 fn test_init_by_conf() -> AnyResult<()> {
527 let db = MemDB::global();
528 db.table_create(EXAMPLE_CREATE_SQL)?;
529 let _ = db.table_clean(EXAMPLE_CLEAN_SQL);
530 db.table_load(
531 EXAMPLE_INSERT_SQL,
532 PathBuf::from("src/mem/dict/example.csv"),
533 vec![0, 1],
534 100,
535 )?;
536 Ok(())
537 }
538
539 #[test]
542 fn test_alter_level() -> anyhow::Result<()> {
543 let db = MemDB::global();
544 let _ = db.execute("DROP TABLE IF EXISTS alert_cat_level");
546 db.table_create(
547 r#"CREATE TABLE IF NOT EXISTS alert_cat_level (
548 id INTEGER PRIMARY KEY,
549 log_type TEXT NOT NULL,
550 level1_code TEXT NOT NULL,
551 level1_name TEXT NOT NULL,
552 level2_code TEXT NOT NULL,
553 level2_name TEXT NOT NULL,
554 original_code TEXT NOT NULL,
555 original_name TEXT NOT NULL
556 )"#,
557 )?;
558 let _ = db.table_clean("DELETE FROM alert_cat_level");
559 db.table_load(
560 r#"INSERT INTO alert_cat_level (log_type, level1_code, level1_name, level2_code, level2_name, original_code, original_name) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
561 PathBuf::from("src/mem/dict/event_cat_level.csv"),
562 vec![0, 1, 2, 3, 4, 5, 6],
563 2000,
564 )?;
565
566 let sql = "select level1_code from alert_cat_level where log_type = :log_type and original_code = :code ";
567 let result = db.query_row_params(
568 sql,
570 &[(":log_type", "app_log"), (":code", "00000002")],
571 )?;
572 assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
573
574 let px = [
575 SqlNamedParam(DataField::from_chars(":code", "00000002")),
576 SqlNamedParam(DataField::from_chars(":log_type", "app_log")),
577 ];
578
579 let p = px.to_params();
580 let result = db.query_row_params(sql, &p)?;
581 assert_eq!(result, vec![DataField::from_chars("level1_code", "105")]);
582
583 Ok(())
584 }
585
586 #[test]
587 fn test_tosql_bind_various_types() -> AnyResult<()> {
588 use chrono::NaiveDate;
589 use std::net::{IpAddr, Ipv4Addr};
590 use wp_model_core::model::types::value::ObjectValue;
591 use wp_model_core::model::{DateTimeValue, HexT};
592
593 let db = MemDB::instance();
594 db.execute("CREATE TABLE p (v)")?;
595
596 {
598 let sql = "INSERT INTO p (v) VALUES (:v)";
599 let p = [SqlNamedParam(DataField::from_bool(":v", true))];
600 db.query_row_params(sql, &p.to_params())?;
601 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
602 assert!(matches!(row[0].get_value(), model::Value::Digit(1)));
603 }
604 {
606 let sql = "INSERT INTO p (v) VALUES (:v)";
607 let p = [SqlNamedParam(DataField::new(
608 model::DataType::default(),
609 ":v",
610 model::Value::Null,
611 ))];
612 db.query_row_params(sql, &p.to_params())?;
613 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
614 assert!(matches!(row[0].get_value(), model::Value::Null));
615 }
616 {
618 let dt: DateTimeValue = NaiveDate::from_ymd_opt(2023, 1, 1)
619 .unwrap()
620 .and_hms_opt(0, 0, 0)
621 .unwrap();
622 let sql = "INSERT INTO p (v) VALUES (:v)";
623 let p = [SqlNamedParam(DataField::from_time(":v", dt))];
624 db.query_row_params(sql, &p.to_params())?;
625 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
626 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
627 }
628 {
630 let sql = "INSERT INTO p (v) VALUES (:v)";
631 let p = [SqlNamedParam(DataField::from_hex(":v", HexT(0xABCD)))];
632 db.query_row_params(sql, &p.to_params())?;
633 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
634 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
635 }
636 {
638 let sql = "INSERT INTO p (v) VALUES (:v)";
639 let p = [SqlNamedParam(DataField::from_ip(
640 ":v",
641 IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)),
642 ))];
643 db.query_row_params(sql, &p.to_params())?;
644 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
645 assert_eq!(row[0].to_string(), "chars(1.2.3.4)");
646 }
647 {
649 let mut obj = ObjectValue::new();
650 obj.insert("k".to_string(), DataField::from_chars("", "v"));
651 let sql = "INSERT INTO p (v) VALUES (:v)";
652 let p = [SqlNamedParam(DataField::from_obj(":v", obj))];
653 db.query_row_params(sql, &p.to_params())?;
654 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
655 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
656 }
657 {
659 let arr = vec![DataField::from_chars("", "a"), DataField::from_digit("", 1)];
660 let sql = "INSERT INTO p (v) VALUES (:v)";
661 let p = [SqlNamedParam(DataField::from_arr(":v", arr))];
662 db.query_row_params(sql, &p.to_params())?;
663 let row = db.query_row("SELECT v FROM p ORDER BY rowid DESC LIMIT 1")?;
664 assert!(matches!(row[0].get_value(), model::Value::Chars(_)));
665 }
666 Ok(())
667 }
668
669 #[test]
670 fn test_column_alias_names() -> AnyResult<()> {
671 let db = MemDB::instance();
672 db.execute("CREATE TABLE ctest (a INTEGER, b TEXT)")?;
674 db.execute("INSERT INTO ctest (a,b) VALUES (42,'x')")?;
675 let row = db.query_row("SELECT a AS 'the number', b AS 'the text' FROM ctest LIMIT 1")?;
676 assert_eq!(row.len(), 2);
677 assert_eq!(row[0].get_name(), "the number");
678 assert_eq!(row[1].get_name(), "the text");
679 Ok(())
680 }
681
682 #[test]
683 fn test_concurrent_inserts() -> AnyResult<()> {
684 use std::thread;
685 let db = MemDB::global();
686 db.execute("CREATE TABLE IF NOT EXISTS concur (v INTEGER)")?;
687 let threads: Vec<_> = (0..4)
688 .map(|_| {
689 thread::spawn(|| {
690 let dbt = MemDB::global();
691 for _ in 0..10 {
692 let _ = dbt.execute("INSERT INTO concur (v) VALUES (1)");
693 }
694 })
695 })
696 .collect();
697 for t in threads {
698 t.join().unwrap();
699 }
700 let row = db.query_row("SELECT SUM(v) AS total FROM concur")?;
701 assert_eq!(row[0].to_string(), "digit(40)");
703 Ok(())
704 }
705
706 #[test]
707 fn test_query_returns_all_rows() -> AnyResult<()> {
708 let db = MemDB::instance();
709 db.execute("CREATE TABLE multi (id INTEGER, name TEXT)")?;
710 let rows = db.query("SELECT * FROM multi")?;
711 assert!(rows.is_empty(), "empty table should return empty vec");
712 db.execute("INSERT INTO multi (id, name) VALUES (1, 'alice')")?;
713 db.execute("INSERT INTO multi (id, name) VALUES (2, 'bob')")?;
714 db.execute("INSERT INTO multi (id, name) VALUES (3, 'charlie')")?;
715
716 let rows = db.query("SELECT id, name FROM multi ORDER BY id")?;
717 assert_eq!(rows.len(), 3, "should return all 3 rows");
718
719 Ok(())
720 }
721
722 #[allow(dead_code)]
723 fn load_toml_conf<T: serde::de::DeserializeOwned>(path: &str) -> AnyResult<T> {
724 let mut f = File::open(path).with_context(|| format!("conf file not found: {}", path))?;
725 let mut buffer = Vec::with_capacity(10240);
726 f.read_to_end(&mut buffer).expect("read conf file error");
727 let conf_data = String::from_utf8(buffer)?;
728 let dict = EnvDict::new();
729 let conf: T = T::env_parse_toml(conf_data.as_str(), &dict)?;
730 Ok(conf)
731 }
732
733 #[allow(dead_code)]
734 fn export_toml_local<T: Serialize>(val: &T, path: &str) -> AnyResult<()> {
735 let data = toml::to_string_pretty(val)?;
736 if let Some(parent) = std::path::Path::new(path).parent() {
737 fs::create_dir_all(parent)?;
738 }
739 fs::write(path, data)?;
740 Ok(())
741 }
742}