1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::hash_map::DefaultHasher,
6 hash::{Hash, Hasher},
7};
8
9use crate::error::KnowledgeResult;
10use async_trait::async_trait;
11use rusqlite::ToSql;
12use rusqlite::{Connection, OpenFlags};
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, SqlProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25 CachePolicy, CachedRedisValue, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor,
26 QueryRequest, QueryResponse, RedisCacheKey, RedisCmdTag, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30 telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34 db: MemDB,
35 metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41 ThreadClonedMDB::query_with_scope(self, sql)
42 }
43
44 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45 ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46 }
47
48 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49 ThreadClonedMDB::query_row_with_scope(self, sql)
50 }
51
52 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53 ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54 }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60 self.db.query_with_scope(&self.metadata_scope, sql)
61 }
62
63 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64 self.db
65 .query_fields_with_scope(&self.metadata_scope, sql, params)
66 }
67
68 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69 self.db.query_row_with_scope(&self.metadata_scope, sql)
70 }
71
72 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73 self.db
74 .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75 }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81 PostgresProvider::query(self, sql)
82 }
83
84 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85 PostgresProvider::query_fields(self, sql, params)
86 }
87
88 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89 PostgresProvider::query_row(self, sql)
90 }
91
92 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93 PostgresProvider::query_named_fields(self, sql, params)
94 }
95
96 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97 PostgresProvider::query_async(self, sql).await
98 }
99
100 async fn query_fields_async(
101 &self,
102 sql: &str,
103 params: &[DataField],
104 ) -> KnowledgeResult<Vec<RowData>> {
105 PostgresProvider::query_fields_async(self, sql, params).await
106 }
107
108 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109 PostgresProvider::query_row_async(self, sql).await
110 }
111
112 async fn query_named_fields_async(
113 &self,
114 sql: &str,
115 params: &[DataField],
116 ) -> KnowledgeResult<RowData> {
117 PostgresProvider::query_named_fields_async(self, sql, params).await
118 }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124 MySqlProvider::query(self, sql)
125 }
126
127 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128 MySqlProvider::query_fields(self, sql, params)
129 }
130
131 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132 MySqlProvider::query_row(self, sql)
133 }
134
135 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136 MySqlProvider::query_named_fields(self, sql, params)
137 }
138
139 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140 MySqlProvider::query_async(self, sql).await
141 }
142
143 async fn query_fields_async(
144 &self,
145 sql: &str,
146 params: &[DataField],
147 ) -> KnowledgeResult<Vec<RowData>> {
148 MySqlProvider::query_fields_async(self, sql, params).await
149 }
150
151 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152 MySqlProvider::query_row_async(self, sql).await
153 }
154
155 async fn query_named_fields_async(
156 &self,
157 sql: &str,
158 params: &[DataField],
159 ) -> KnowledgeResult<RowData> {
160 MySqlProvider::query_named_fields_async(self, sql, params).await
161 }
162}
163
164fn install_provider<F>(
165 kind: ProviderKind,
166 datasource_id: DatasourceId,
167 build: F,
168) -> KnowledgeResult<()>
169where
170 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172 runtime().install_provider(kind, datasource_id, build)?;
173 Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177 DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183 Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184 authority_uri,
185 datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186 generation.0,
187 )))
188 })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192 install_provider(
193 ProviderKind::SqliteAuthority,
194 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195 |generation| {
196 Ok(Arc::new(MemProvider {
197 db: memdb,
198 metadata_scope: MetadataCacheScope {
199 datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200 generation,
201 },
202 }))
203 },
204 )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208 init_postgres_provider_with_config(
209 PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210 )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214 let connection_uri = config.connection_uri().to_string();
215 let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216 install_provider(
217 ProviderKind::Postgres,
218 datasource_id.clone(),
219 |generation| {
220 let provider = PostgresProvider::connect(
221 &config,
222 MetadataCacheScope {
223 datasource_id: datasource_id.clone(),
224 generation,
225 },
226 )?;
227 Ok(Arc::new(provider))
228 },
229 )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233 init_mysql_provider_with_config(
234 MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235 )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239 let connection_uri = config.connection_uri().to_string();
240 let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241 install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242 let provider = MySqlProvider::connect(
243 &config,
244 MetadataCacheScope {
245 datasource_id: datasource_id.clone(),
246 generation,
247 },
248 )?;
249 Ok(Arc::new(provider))
250 })
251}
252
253pub fn redis_bf_exists(key: &str, item: &str) -> KnowledgeResult<bool> {
262 if let Some(generation) = current_generation() {
263 let ck = redis_cache_key(RedisCmdTag::BfExists, generation, key, &[item]);
264 if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck, key) {
265 return Ok(v);
266 }
267 let result = crate::redis::bf_exists("knowdb", key, item)?;
268 runtime().redis_cache_put(ck, key, CachedRedisValue::Bool(result));
269 return Ok(result);
270 }
271 crate::redis::bf_exists("knowdb", key, item)
272}
273
274pub fn redis_hget(key: &str, field: &str) -> KnowledgeResult<Option<String>> {
278 if let Some(generation) = current_generation() {
279 let ck = redis_cache_key(RedisCmdTag::HGet, generation, key, &[field]);
280 if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck, key) {
281 return Ok(v.clone());
282 }
283 let result = crate::redis::hget("knowdb", key, field)?;
284 runtime().redis_cache_put(ck, key, CachedRedisValue::OptString(result.clone()));
285 return Ok(result);
286 }
287 crate::redis::hget("knowdb", key, field)
288}
289
290pub fn redis_get(key: &str) -> KnowledgeResult<Option<String>> {
294 if let Some(generation) = current_generation() {
295 let ck = redis_cache_key(RedisCmdTag::Get, generation, key, &[]);
296 if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck, key) {
297 return Ok(v.clone());
298 }
299 let result = crate::redis::get("knowdb", key)?;
300 runtime().redis_cache_put(ck, key, CachedRedisValue::OptString(result.clone()));
301 return Ok(result);
302 }
303 crate::redis::get("knowdb", key)
304}
305
306pub fn redis_set_exists(key: &str, member: &str) -> KnowledgeResult<bool> {
308 if let Some(generation) = current_generation() {
309 let ck = redis_cache_key(RedisCmdTag::SetExists, generation, key, &[member]);
310 if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck, key) {
311 return Ok(v);
312 }
313 let result = crate::redis::set_exists("knowdb", key, member)?;
314 runtime().redis_cache_put(ck, key, CachedRedisValue::Bool(result));
315 return Ok(result);
316 }
317 crate::redis::set_exists("knowdb", key, member)
318}
319
320pub fn redis_bf_add(key: &str, items: &[&str]) -> KnowledgeResult<Vec<bool>> {
325 let owned: Vec<String> = items.iter().map(|s| s.to_string()).collect();
326 crate::redis::bf_madd("knowdb", key, &owned)
327}
328
329pub fn redis_bf_create(key: &str, error_rate: f64, capacity: i64) -> KnowledgeResult<()> {
335 crate::redis::bf_reserve("knowdb", key, error_rate, capacity)
336}
337
338#[allow(dead_code)]
339pub(crate) fn redis_ping() -> KnowledgeResult<bool> {
340 crate::redis::ping_blocking("knowdb")
341}
342
343#[allow(dead_code)]
344pub(crate) fn redis_close() -> KnowledgeResult<()> {
345 crate::redis::close(Some("knowdb"))
346}
347
348fn redis_cache_key(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
349 use std::collections::hash_map::DefaultHasher;
350 use std::hash::{Hash, Hasher};
351 let mut hasher = DefaultHasher::new();
352 key.hash(&mut hasher);
353 let key_hash = hasher.finish();
354 let mut hasher = DefaultHasher::new();
355 for arg in args {
356 arg.hash(&mut hasher);
357 }
358 let args_hash = hasher.finish();
359 RedisCacheKey {
360 generation,
361 cmd_tag: cmd,
362 key_hash,
363 args_hash,
364 }
365}
366
367pub fn current_generation() -> Option<u64> {
370 runtime()
371 .current_generation()
372 .map(|generation| generation.0)
373}
374
375pub fn runtime_snapshot() -> RuntimeSnapshot {
376 runtime().snapshot()
377}
378
379pub fn install_runtime_telemetry(
380 telemetry_impl: Arc<dyn KnowledgeTelemetry>,
381) -> Arc<dyn KnowledgeTelemetry> {
382 install_telemetry(telemetry_impl)
383}
384
385pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
386 runtime()
387 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
388 .map(QueryResponse::into_rows)
389}
390
391pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
392 runtime()
393 .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
394 .await
395 .map(QueryResponse::into_rows)
396}
397
398pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
399 runtime()
400 .execute(&QueryRequest::first_row(
401 sql,
402 Vec::new(),
403 CachePolicy::Bypass,
404 ))
405 .map(QueryResponse::into_row)
406}
407
408pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
409 runtime()
410 .execute_async(&QueryRequest::first_row(
411 sql,
412 Vec::new(),
413 CachePolicy::Bypass,
414 ))
415 .await
416 .map(QueryResponse::into_row)
417}
418
419pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
420 runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
421}
422
423pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
424 runtime()
425 .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
426 .await
427}
428
429pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
430 query_fields(sql, params)
431}
432
433pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
434 query_fields_async(sql, params).await
435}
436
437pub fn query_named<'a>(
438 sql: &str,
439 params: &'a [(&'a str, &'a dyn ToSql)],
440) -> KnowledgeResult<RowData> {
441 let fields = named_params_to_fields(params)?;
442 query_fields(sql, &fields)
443}
444
445pub fn cache_query_fields<const N: usize>(
446 sql: &str,
447 c_params: &[DataField; N],
448 query_params: &[DataField],
449 cache: &mut impl CacheAble<DataField, RowData, N>,
450) -> RowData {
451 cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
452}
453
454pub fn cache_query_fields_with_scope<const N: usize>(
455 sql: &str,
456 local_cache_scope: u64,
457 c_params: &[DataField; N],
458 query_params: &[DataField],
459 cache: &mut impl CacheAble<DataField, RowData, N>,
460) -> RowData {
461 if let Some(generation) = current_generation() {
462 cache.prepare_generation(generation);
463 }
464 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
465 runtime().record_local_cache_hit();
466 if telemetry_enabled() {
467 telemetry().on_cache(&CacheTelemetryEvent {
468 layer: CacheLayer::Local,
469 outcome: CacheOutcome::Hit,
470 provider_kind: runtime().current_provider_kind(),
471 });
472 }
473 return hit.clone();
474 }
475 runtime().record_local_cache_miss();
476 if telemetry_enabled() {
477 telemetry().on_cache(&CacheTelemetryEvent {
478 layer: CacheLayer::Local,
479 outcome: CacheOutcome::Miss,
480 provider_kind: runtime().current_provider_kind(),
481 });
482 }
483
484 match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
485 Ok(row) => {
486 cache.save_scoped(local_cache_scope, c_params, row.clone());
487 row
488 }
489 Err(err) => {
490 warn_kdb!("[kdb] query error: {}", err);
491 Vec::new()
492 }
493 }
494}
495
496pub async fn cache_query_fields_async<const N: usize>(
497 sql: &str,
498 c_params: &[DataField; N],
499 query_params: &[DataField],
500 cache: &mut impl CacheAble<DataField, RowData, N>,
501) -> RowData {
502 cache_query_fields_async_with_scope(
503 sql,
504 stable_hash(sql),
505 c_params,
506 || query_params.to_vec(),
507 cache,
508 )
509 .await
510}
511
512pub async fn cache_query_fields_async_with<const N: usize, F>(
513 sql: &str,
514 c_params: &[DataField; N],
515 build_query_params: F,
516 cache: &mut impl CacheAble<DataField, RowData, N>,
517) -> RowData
518where
519 F: FnOnce() -> Vec<DataField>,
520{
521 cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
522 .await
523}
524
525pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
526 sql: &str,
527 local_cache_scope: u64,
528 c_params: &[DataField; N],
529 build_query_params: F,
530 cache: &mut impl CacheAble<DataField, RowData, N>,
531) -> RowData
532where
533 F: FnOnce() -> Vec<DataField>,
534{
535 if let Some(generation) = current_generation() {
536 cache.prepare_generation(generation);
537 }
538 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
539 runtime().record_local_cache_hit();
540 if telemetry_enabled() {
541 telemetry().on_cache(&CacheTelemetryEvent {
542 layer: CacheLayer::Local,
543 outcome: CacheOutcome::Hit,
544 provider_kind: runtime().current_provider_kind(),
545 });
546 }
547 return hit.clone();
548 }
549 runtime().record_local_cache_miss();
550 if telemetry_enabled() {
551 telemetry().on_cache(&CacheTelemetryEvent {
552 layer: CacheLayer::Local,
553 outcome: CacheOutcome::Miss,
554 provider_kind: runtime().current_provider_kind(),
555 });
556 }
557
558 let query_params = build_query_params();
559 match runtime()
560 .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
561 .await
562 {
563 Ok(row) => {
564 cache.save_scoped(local_cache_scope, c_params, row.clone());
565 row
566 }
567 Err(err) => {
568 warn_kdb!("[kdb] query error: {}", err);
569 Vec::new()
570 }
571 }
572}
573
574pub fn cache_query<const N: usize>(
575 sql: &str,
576 c_params: &[DataField; N],
577 named_params: &[(&str, &dyn ToSql)],
578 cache: &mut impl CacheAble<DataField, RowData, N>,
579) -> RowData {
580 let query_fields = match named_params_to_fields(named_params) {
581 Ok(fields) => fields,
582 Err(err) => {
583 warn_kdb!("[kdb] query param conversion error: {}", err);
584 return Vec::new();
585 }
586 };
587
588 cache_query_fields(sql, c_params, &query_fields, cache)
589}
590
591pub async fn cache_query_async<const N: usize>(
592 sql: &str,
593 c_params: &[DataField; N],
594 named_params: &[(&str, &dyn ToSql)],
595 cache: &mut impl CacheAble<DataField, RowData, N>,
596) -> RowData {
597 let query_fields = match named_params_to_fields(named_params) {
598 Ok(fields) => fields,
599 Err(err) => {
600 warn_kdb!("[kdb] query param conversion error: {}", err);
601 return Vec::new();
602 }
603 };
604
605 cache_query_fields_async(sql, c_params, &query_fields, cache).await
606}
607
608fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
609 if let Ok(conn) = Connection::open_with_flags(
610 authority_uri,
611 OpenFlags::SQLITE_OPEN_READ_WRITE
612 | OpenFlags::SQLITE_OPEN_CREATE
613 | OpenFlags::SQLITE_OPEN_URI,
614 ) {
615 let _ = conn.execute_batch(
616 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
617 );
618 }
619 Ok(())
620}
621
622pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
623 ensure_wal(authority_uri)?;
624 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
625 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
626 init_mem_provider(mem)
627}
628
629pub fn init_thread_cloned_from_knowdb(
630 root: &Path,
631 knowdb_conf: &Path,
632 authority_uri: &str,
633 dict: &orion_variate::EnvDict,
634) -> KnowledgeResult<()> {
635 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
636 if let Some(provider_cfg) = conf.provider() {
637 if let Some(sqldb) = provider_cfg.sqldb {
639 match sqldb.kind {
640 SqlProviderKind::Postgres => {
641 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
642 init_postgres_provider_with_config(
643 PostgresProviderConfig::new(sqldb.connection_uri)
644 .with_pool_size(sqldb.pool_size)
645 .with_min_connections(sqldb.min_connections)
646 .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
647 .with_idle_timeout_ms(sqldb.idle_timeout_ms)
648 .with_max_lifetime_ms(sqldb.max_lifetime_ms),
649 )?;
650 runtime().configure_result_cache(
651 conf.cache.enabled,
652 conf.cache.capacity,
653 Duration::from_millis(conf.cache.ttl_ms.max(1)),
654 );
655 return Ok(());
656 }
657 SqlProviderKind::Mysql => {
658 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
659 init_mysql_provider_with_config(
660 MySqlProviderConfig::new(sqldb.connection_uri)
661 .with_pool_size(sqldb.pool_size)
662 .with_min_connections(sqldb.min_connections)
663 .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
664 .with_idle_timeout_ms(sqldb.idle_timeout_ms)
665 .with_max_lifetime_ms(sqldb.max_lifetime_ms),
666 )?;
667 runtime().configure_result_cache(
668 conf.cache.enabled,
669 conf.cache.capacity,
670 Duration::from_millis(conf.cache.ttl_ms.max(1)),
671 );
672 return Ok(());
673 }
674 }
675 }
676 if let Some(redis_cfg) = provider_cfg.redis {
678 info_ctrl!("init redis knowdb provider({}) ", conf_abs.display(),);
679 crate::redis::init_with_opts(
680 "knowdb",
681 &redis_cfg.connection_uri,
682 redis_cfg.pool_size,
683 redis_cfg.command_timeout_ms,
684 )?;
685 runtime().configure_redis_cache(
686 conf.cache.enabled,
687 conf.cache.capacity,
688 conf.cache.redis_key_map(),
689 );
690 return Ok(());
691 }
692 }
693
694 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
695 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
696 let path_part = rest.split('?').next().unwrap_or(rest);
697 format!("file:{}?mode=ro&uri=true", path_part)
698 } else {
699 authority_uri.to_string()
700 };
701
702 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
703 init_thread_cloned_from_authority(&ro_uri)?;
704 runtime().configure_result_cache(
705 conf.cache.enabled,
706 conf.cache.capacity,
707 Duration::from_millis(conf.cache.ttl_ms.max(1)),
708 );
709 Ok(())
710}
711
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// The same input yields the same value for every call within one build of
/// the program; the standard library does not specify the algorithm, so the
/// value should not be persisted or compared across Rust releases.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
717
718#[cfg(test)]
719mod tests {
720 use super::*;
721 use crate::cache::FieldQueryCache;
722 use crate::error::KnowReason;
723 use crate::mem::memdb::MemDB;
724 use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
725 use crate::runtime::fields_to_params;
726 use crate::telemetry::{
727 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
728 ReloadTelemetryEvent, reset_telemetry,
729 };
730 use orion_error::conversion::ToStructError;
731 use orion_variate::EnvDict;
732 use std::fs;
733 use std::hint::black_box;
734 use std::path::PathBuf;
735 use std::sync::atomic::{AtomicU64, Ordering};
736 use std::time::{Duration, Instant};
737
738 #[derive(Default)]
739 struct TestTelemetry {
740 reload_success: AtomicU64,
741 reload_failure: AtomicU64,
742 local_hits: AtomicU64,
743 local_misses: AtomicU64,
744 result_hits: AtomicU64,
745 result_misses: AtomicU64,
746 metadata_hits: AtomicU64,
747 metadata_misses: AtomicU64,
748 query_success: AtomicU64,
749 query_failure: AtomicU64,
750 }
751
752 impl KnowledgeTelemetry for TestTelemetry {
753 fn on_cache(&self, event: &CacheTelemetryEvent) {
754 let counter = match (event.layer, event.outcome) {
755 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
756 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
757 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
758 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
759 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
760 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
761 };
762 counter.fetch_add(1, Ordering::Relaxed);
763 }
764
765 fn on_reload(&self, event: &ReloadTelemetryEvent) {
766 let counter = match event.outcome {
767 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
768 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
769 };
770 counter.fetch_add(1, Ordering::Relaxed);
771 }
772
773 fn on_query(&self, event: &QueryTelemetryEvent) {
774 let counter = if event.success {
775 &self.query_success
776 } else {
777 &self.query_failure
778 };
779 counter.fetch_add(1, Ordering::Relaxed);
780 }
781 }
782
783 fn perf_env_usize(key: &str, default: usize) -> usize {
784 std::env::var(key)
785 .ok()
786 .and_then(|value| value.parse::<usize>().ok())
787 .unwrap_or(default)
788 }
789
790 fn seed_perf_provider(rows: usize) {
791 let db = MemDB::instance();
792 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
793 .expect("create perf_kv");
794 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
795 for id in 1..=rows {
796 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
797 db.execute(sql.as_str()).expect("insert perf_kv row");
798 }
799 db.execute("COMMIT").expect("commit perf_kv load");
800 init_mem_provider(db).expect("init perf provider");
801 }
802
803 #[tokio::test(flavor = "current_thread")]
804 async fn query_async_works_with_mem_provider() {
805 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
806 let db = MemDB::instance();
807 db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
808 .expect("create async_kv");
809 db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
810 .expect("insert async_kv row");
811 init_mem_provider(db).expect("init mem provider");
812
813 let row = query_fields_async(
814 "SELECT value FROM async_kv WHERE id=:id",
815 &[DataField::from_digit(":id", 1)],
816 )
817 .await
818 .expect("query async row");
819 assert_eq!(row.len(), 1);
820 assert_eq!(row[0].to_string(), "chars(hello)");
821 }
822
823 #[derive(Clone)]
824 struct PerfQuery {
825 cache_key: [DataField; 1],
826 query_params: [DataField; 1],
827 bypass_req: QueryRequest,
828 global_req: QueryRequest,
829 }
830
831 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
832 (0..ops)
833 .map(|idx| {
834 let id = ((idx * 17) % hotset + 1) as i64;
835 let cache_key = [DataField::from_digit("id", id)];
836 let query_params = [DataField::from_digit(":id", id)];
837 let bypass_req = QueryRequest::first_row(
838 "SELECT value FROM perf_kv WHERE id=:id",
839 fields_to_params(&query_params),
840 CachePolicy::Bypass,
841 );
842 let global_req = QueryRequest::first_row(
843 "SELECT value FROM perf_kv WHERE id=:id",
844 fields_to_params(&query_params),
845 CachePolicy::UseGlobal,
846 );
847 PerfQuery {
848 cache_key,
849 query_params,
850 bypass_req,
851 global_req,
852 }
853 })
854 .collect()
855 }
856
857 #[derive(Debug, Clone, Copy)]
858 struct PerfCounters {
859 result_hits: u64,
860 result_misses: u64,
861 local_hits: u64,
862 local_misses: u64,
863 metadata_hits: u64,
864 metadata_misses: u64,
865 }
866
867 #[derive(Debug, Clone)]
868 struct PerfResult {
869 name: &'static str,
870 elapsed: Duration,
871 ops: usize,
872 counters: PerfCounters,
873 }
874
875 impl PerfResult {
876 fn qps(&self) -> f64 {
877 let secs = self.elapsed.as_secs_f64();
878 if secs == 0.0 {
879 self.ops as f64
880 } else {
881 self.ops as f64 / secs
882 }
883 }
884 }
885
886 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
887 PerfCounters {
888 result_hits: after
889 .result_cache_hits
890 .saturating_sub(before.result_cache_hits),
891 result_misses: after
892 .result_cache_misses
893 .saturating_sub(before.result_cache_misses),
894 local_hits: after
895 .local_cache_hits
896 .saturating_sub(before.local_cache_hits),
897 local_misses: after
898 .local_cache_misses
899 .saturating_sub(before.local_cache_misses),
900 metadata_hits: after
901 .metadata_cache_hits
902 .saturating_sub(before.metadata_cache_hits),
903 metadata_misses: after
904 .metadata_cache_misses
905 .saturating_sub(before.metadata_cache_misses),
906 }
907 }
908
909 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
910 seed_perf_provider(rows);
911 let before = runtime_snapshot();
912 let started = Instant::now();
913 for item in workload {
914 let row = runtime()
915 .execute(&item.bypass_req)
916 .expect("execute bypass request")
917 .into_row();
918 black_box(row);
919 }
920 let elapsed = started.elapsed();
921 let after = runtime_snapshot();
922 PerfResult {
923 name: "bypass",
924 elapsed,
925 ops: workload.len(),
926 counters: snapshot_delta(&before, &after),
927 }
928 }
929
930 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
931 seed_perf_provider(rows);
932 let before = runtime_snapshot();
933 let started = Instant::now();
934 for item in workload {
935 let row = runtime()
936 .execute(&item.global_req)
937 .expect("execute global-cache request")
938 .into_row();
939 black_box(row);
940 }
941 let elapsed = started.elapsed();
942 let after = runtime_snapshot();
943 PerfResult {
944 name: "global_cache",
945 elapsed,
946 ops: workload.len(),
947 counters: snapshot_delta(&before, &after),
948 }
949 }
950
951 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
952 seed_perf_provider(rows);
953 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
954 let before = runtime_snapshot();
955 let started = Instant::now();
956 for item in workload {
957 let row = cache_query_fields(
958 "SELECT value FROM perf_kv WHERE id=:id",
959 &item.cache_key,
960 &item.query_params,
961 &mut cache,
962 );
963 black_box(row);
964 }
965 let elapsed = started.elapsed();
966 let after = runtime_snapshot();
967 PerfResult {
968 name: "local_cache",
969 elapsed,
970 ops: workload.len(),
971 counters: snapshot_delta(&before, &after),
972 }
973 }
974
975 fn print_perf_result(result: &PerfResult) {
976 eprintln!(
977 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
978 result.name,
979 result.elapsed.as_millis(),
980 result.qps(),
981 result.counters.result_hits,
982 result.counters.result_misses,
983 result.counters.local_hits,
984 result.counters.local_misses,
985 result.counters.metadata_hits,
986 result.counters.metadata_misses,
987 );
988 }
989
990 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
991 use rand::{Rng, rng};
992 let rnd: u64 = rng().next_u64();
993 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
994 }
995
996 fn write_minimal_knowdb_with_cache(
997 root: &Path,
998 enabled: bool,
999 capacity: usize,
1000 ttl_ms: u64,
1001 ) -> std::path::PathBuf {
1002 let models = root.join("models").join("knowledge");
1003 let example_dir = models.join("example");
1004 fs::create_dir_all(&example_dir).expect("create knowdb models/example");
1005 fs::write(
1006 models.join("knowdb.toml"),
1007 format!(
1008 r#"
1009version = 2
1010base_dir = "."
1011
1012[cache]
1013enabled = {enabled}
1014capacity = {capacity}
1015ttl_ms = {ttl_ms}
1016
1017[csv]
1018has_header = false
1019
1020[[tables]]
1021name = "example"
1022columns.by_index = [0,1]
1023"#
1024 ),
1025 )
1026 .expect("write knowdb.toml");
1027 fs::write(
1028 example_dir.join("create.sql"),
1029 r#"
1030CREATE TABLE IF NOT EXISTS {table} (
1031 id INTEGER PRIMARY KEY,
1032 value TEXT NOT NULL
1033);
1034"#,
1035 )
1036 .expect("write create.sql");
1037 fs::write(
1038 example_dir.join("insert.sql"),
1039 "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
1040 )
1041 .expect("write insert.sql");
1042 fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
1043 models.join("knowdb.toml")
1044 }
1045
1046 fn write_provider_only_knowdb_with_cache(
1047 root: &Path,
1048 provider_kind: &str,
1049 connection_uri: &str,
1050 enabled: bool,
1051 capacity: usize,
1052 ttl_ms: u64,
1053 ) -> std::path::PathBuf {
1054 let models = root.join("models").join("knowledge");
1055 fs::create_dir_all(&models).expect("create knowdb models");
1056 fs::write(
1057 models.join("knowdb.toml"),
1058 format!(
1059 r#"
1060version = 2
1061base_dir = "."
1062
1063[cache]
1064enabled = {enabled}
1065capacity = {capacity}
1066ttl_ms = {ttl_ms}
1067
1068[provider.sqldb]
1069kind = "{provider_kind}"
1070connection_uri = "{connection_uri}"
1071"#
1072 ),
1073 )
1074 .expect("write provider knowdb.toml");
1075 models.join("knowdb.toml")
1076 }
1077
1078 fn restore_default_result_cache_config() {
1079 runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
1080 }
1081
1082 #[test]
1083 fn provider_can_be_replaced() {
1084 let _guard = crate::runtime::runtime_test_guard()
1085 .lock()
1086 .expect("provider test guard");
1087 let db1 = MemDB::instance();
1088 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1089 .expect("create table in db1");
1090 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1091 .expect("seed db1");
1092 init_mem_provider(db1).expect("init provider db1");
1093 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
1094 assert_eq!(row[0].to_string(), "chars(first)");
1095
1096 let db2 = MemDB::instance();
1097 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1098 .expect("create table in db2");
1099 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1100 .expect("seed db2");
1101 init_mem_provider(db2).expect("replace provider with db2");
1102 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
1103 assert_eq!(row[0].to_string(), "chars(second)");
1104 }
1105
1106 #[test]
1107 fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
1108 let _guard = crate::runtime::runtime_test_guard()
1109 .lock()
1110 .expect("provider test guard");
1111 COLNAME_CACHE.write().expect("metadata cache lock").clear();
1112
1113 let db = MemDB::instance();
1114 db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
1115 .expect("create table");
1116 db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
1117 .expect("seed table");
1118
1119 let old_scope = MetadataCacheScope {
1120 datasource_id: DatasourceId("sqlite:old".to_string()),
1121 generation: Generation(1),
1122 };
1123 let new_scope = MetadataCacheScope {
1124 datasource_id: DatasourceId("sqlite:new".to_string()),
1125 generation: Generation(2),
1126 };
1127 let old_provider = MemProvider {
1128 db: db.clone(),
1129 metadata_scope: old_scope.clone(),
1130 };
1131
1132 install_provider(
1133 ProviderKind::SqliteAuthority,
1134 new_scope.datasource_id.clone(),
1135 |_generation| {
1136 Ok(Arc::new(MemProvider {
1137 db: db.clone(),
1138 metadata_scope: new_scope.clone(),
1139 }))
1140 },
1141 )
1142 .expect("install new provider");
1143
1144 let row = old_provider
1145 .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1146 .expect("old provider query");
1147 assert_eq!(row[0].to_string(), "chars(scope-old)");
1148
1149 let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1150 assert!(cache.contains(&metadata_cache_key_for_scope(
1151 &old_scope,
1152 "SELECT value FROM cache_scope_t WHERE id = 1",
1153 )));
1154 assert!(!cache.contains(&metadata_cache_key_for_scope(
1155 &new_scope,
1156 "SELECT value FROM cache_scope_t WHERE id = 1",
1157 )));
1158 }
1159
1160 #[tokio::test(flavor = "current_thread")]
1161 async fn async_query_uses_runtime_bridge() {
1162 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1163 let db = MemDB::instance();
1164 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1165 .expect("create table");
1166 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1167 .expect("seed table");
1168 init_mem_provider(db).expect("init provider");
1169
1170 let row = query_row_async("SELECT value FROM t WHERE id = 1")
1171 .await
1172 .expect("async query row");
1173 assert_eq!(row[0].to_string(), "chars(async-first)");
1174 }
1175
1176 #[tokio::test(flavor = "current_thread")]
1177 async fn async_cache_query_fields_hits_local_cache() {
1178 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1179 let db = MemDB::instance();
1180 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1181 .expect("create table");
1182 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1183 .expect("seed table");
1184 init_mem_provider(db).expect("init provider");
1185
1186 let key = [DataField::from_digit("id", 1)];
1187 let params = [DataField::from_digit(":id", 1)];
1188 let mut cache = FieldQueryCache::default();
1189
1190 let first = cache_query_fields_async(
1191 "SELECT value FROM t WHERE id=:id",
1192 &key,
1193 ¶ms,
1194 &mut cache,
1195 )
1196 .await;
1197 let second = cache_query_fields_async(
1198 "SELECT value FROM t WHERE id=:id",
1199 &key,
1200 ¶ms,
1201 &mut cache,
1202 )
1203 .await;
1204
1205 assert_eq!(first[0].to_string(), "chars(async-cache)");
1206 assert_eq!(second[0].to_string(), "chars(async-cache)");
1207 }
1208
1209 #[test]
1210 fn local_cache_is_cleared_when_generation_changes() {
1211 let _guard = crate::runtime::runtime_test_guard()
1212 .lock()
1213 .expect("provider test guard");
1214 let db1 = MemDB::instance();
1215 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1216 .expect("create table in db1");
1217 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1218 .expect("seed db1");
1219 init_mem_provider(db1).expect("init provider db1");
1220
1221 let key = [DataField::from_digit("id", 1)];
1222 let params = [DataField::from_digit(":id", 1)];
1223 let mut cache = FieldQueryCache::default();
1224 let row = cache_query_fields(
1225 "SELECT value FROM t WHERE id=:id",
1226 &key,
1227 ¶ms,
1228 &mut cache,
1229 );
1230 assert_eq!(row[0].to_string(), "chars(first)");
1231
1232 let db2 = MemDB::instance();
1233 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1234 .expect("create table in db2");
1235 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1236 .expect("seed db2");
1237 init_mem_provider(db2).expect("replace provider with db2");
1238
1239 let row = cache_query_fields(
1240 "SELECT value FROM t WHERE id=:id",
1241 &key,
1242 ¶ms,
1243 &mut cache,
1244 );
1245 assert_eq!(row[0].to_string(), "chars(second)");
1246 }
1247
1248 #[test]
1249 fn local_cache_is_scoped_by_sql_text() {
1250 let _guard = crate::runtime::runtime_test_guard()
1251 .lock()
1252 .expect("provider test guard");
1253 let db = MemDB::instance();
1254 db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
1255 .expect("create t1");
1256 db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
1257 .expect("create t2");
1258 db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
1259 .expect("seed t1");
1260 db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
1261 .expect("seed t2");
1262 init_mem_provider(db).expect("init provider");
1263
1264 let key = [DataField::from_digit("id", 1)];
1265 let params = [DataField::from_digit(":id", 1)];
1266 let mut cache = FieldQueryCache::default();
1267
1268 let row = cache_query_fields(
1269 "SELECT value FROM t1 WHERE id=:id",
1270 &key,
1271 ¶ms,
1272 &mut cache,
1273 );
1274 assert_eq!(row[0].to_string(), "chars(first)");
1275
1276 let row = cache_query_fields(
1277 "SELECT value FROM t2 WHERE id=:id",
1278 &key,
1279 ¶ms,
1280 &mut cache,
1281 );
1282 assert_eq!(row[0].to_string(), "chars(second)");
1283 }
1284
1285 #[test]
1286 fn runtime_snapshot_tracks_generation_and_cache_size() {
1287 let _guard = crate::runtime::runtime_test_guard()
1288 .lock()
1289 .expect("provider test guard");
1290 let db = MemDB::instance();
1291 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1292 .expect("create table");
1293 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1294 .expect("seed table");
1295 init_mem_provider(db).expect("init provider");
1296
1297 let mut cache = FieldQueryCache::default();
1298 let key = [DataField::from_digit("id", 1)];
1299 let params = [DataField::from_digit(":id", 1)];
1300 let row = cache_query_fields(
1301 "SELECT value FROM t WHERE id=:id",
1302 &key,
1303 ¶ms,
1304 &mut cache,
1305 );
1306 assert_eq!(row[0].to_string(), "chars(first)");
1307
1308 let snapshot = runtime_snapshot();
1309 assert!(matches!(
1310 snapshot.provider_kind,
1311 Some(ProviderKind::SqliteAuthority)
1312 ));
1313 assert!(snapshot.generation.is_some());
1314 assert!(snapshot.result_cache_len >= 1);
1315 assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
1316 assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
1317 assert!(snapshot.reload_successes >= 1);
1318 }
1319
1320 #[test]
1321 fn metadata_cache_is_scoped_by_generation() {
1322 let _guard = crate::runtime::runtime_test_guard()
1323 .lock()
1324 .expect("provider test guard");
1325 let sql = "SELECT value FROM t WHERE id = 1";
1326 let before = runtime_snapshot().metadata_cache_len;
1327
1328 let db1 = MemDB::instance();
1329 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1330 .expect("create table in db1");
1331 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1332 .expect("seed db1");
1333 init_mem_provider(db1).expect("init provider db1");
1334 let row = query_row(sql).expect("query db1");
1335 assert_eq!(row[0].to_string(), "chars(first)");
1336 let after_first = runtime_snapshot().metadata_cache_len;
1337 assert!(
1338 after_first > before,
1339 "metadata cache did not record first generation entry: before={before} after_first={after_first}"
1340 );
1341
1342 let db2 = MemDB::instance();
1343 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1344 .expect("create table in db2");
1345 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1346 .expect("seed db2");
1347 init_mem_provider(db2).expect("replace provider with db2");
1348 let row = query_row(sql).expect("query db2");
1349 assert_eq!(row[0].to_string(), "chars(second)");
1350 let after_second = runtime_snapshot().metadata_cache_len;
1351 assert!(
1352 after_second > after_first,
1353 "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
1354 );
1355 }
1356
1357 #[test]
1358 fn failed_provider_reload_keeps_previous_provider() {
1359 let _guard = crate::runtime::runtime_test_guard()
1360 .lock()
1361 .expect("provider test guard");
1362 let db1 = MemDB::instance();
1363 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1364 .expect("create table in db1");
1365 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1366 .expect("seed db1");
1367 init_mem_provider(db1).expect("init provider db1");
1368 let before_generation = current_generation();
1369
1370 let reload_err = install_provider(
1371 ProviderKind::SqliteAuthority,
1372 datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
1373 |_generation| {
1374 Err(KnowReason::from_logic()
1375 .to_err()
1376 .with_detail("expected reload failure"))
1377 },
1378 );
1379 assert!(reload_err.is_err());
1380
1381 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1382 assert_eq!(row[0].to_string(), "chars(first)");
1383 assert_eq!(current_generation(), before_generation);
1384 }
1385
1386 #[test]
1387 fn runtime_snapshot_records_cache_counters() {
1388 let _guard = crate::runtime::runtime_test_guard()
1389 .lock()
1390 .expect("provider test guard");
1391 let db = MemDB::instance();
1392 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1393 .expect("create table");
1394 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1395 .expect("seed table");
1396 init_mem_provider(db).expect("init provider");
1397
1398 let before = runtime_snapshot();
1399 let mut cache = FieldQueryCache::default();
1400 let key = [DataField::from_digit("id", 1)];
1401 let params = [DataField::from_digit(":id", 1)];
1402 let row = cache_query_fields(
1403 "SELECT value FROM t WHERE id=:id",
1404 &key,
1405 ¶ms,
1406 &mut cache,
1407 );
1408 assert_eq!(row[0].to_string(), "chars(first)");
1409 let row = cache_query_fields(
1410 "SELECT value FROM t WHERE id=:id",
1411 &key,
1412 ¶ms,
1413 &mut cache,
1414 );
1415 assert_eq!(row[0].to_string(), "chars(first)");
1416
1417 let after = runtime_snapshot();
1418 assert!(after.local_cache_hits > before.local_cache_hits);
1419 assert!(after.local_cache_misses > before.local_cache_misses);
1420 assert!(after.result_cache_misses > before.result_cache_misses);
1421 assert!(after.metadata_cache_misses > before.metadata_cache_misses);
1422 }
1423
1424 #[test]
1425 fn telemetry_receives_reload_cache_and_query_events() {
1426 let _guard = crate::runtime::runtime_test_guard()
1427 .lock()
1428 .expect("provider test guard");
1429 let telemetry_impl = Arc::new(TestTelemetry::default());
1430 let previous = install_runtime_telemetry(telemetry_impl.clone());
1431
1432 let db = MemDB::instance();
1433 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1434 .expect("create table");
1435 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1436 .expect("seed table");
1437 init_mem_provider(db).expect("init provider");
1438
1439 let mut cache = FieldQueryCache::default();
1440 let key = [DataField::from_digit("id", 1)];
1441 let params = [DataField::from_digit(":id", 1)];
1442 let row = cache_query_fields(
1443 "SELECT value FROM t WHERE id=:id",
1444 &key,
1445 ¶ms,
1446 &mut cache,
1447 );
1448 assert_eq!(row[0].to_string(), "chars(first)");
1449 let row = cache_query_fields(
1450 "SELECT value FROM t WHERE id=:id",
1451 &key,
1452 ¶ms,
1453 &mut cache,
1454 );
1455 assert_eq!(row[0].to_string(), "chars(first)");
1456
1457 let reload_err = install_provider(
1458 ProviderKind::SqliteAuthority,
1459 datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
1460 |_generation| {
1461 Err(KnowReason::from_logic()
1462 .to_err()
1463 .with_detail("expected telemetry reload failure"))
1464 },
1465 );
1466 assert!(reload_err.is_err());
1467
1468 install_runtime_telemetry(previous);
1469 reset_telemetry();
1470
1471 assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
1472 assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
1473 assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
1474 assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
1475 assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
1476 assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
1477 assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
1478 }
1479
1480 #[test]
1481 #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
1482 fn cache_perf_reports_cache_vs_no_cache() {
1483 let _guard = crate::runtime::runtime_test_guard()
1484 .lock()
1485 .expect("provider test guard");
1486 let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
1487 let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
1488 let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
1489 let workload = build_perf_workload(ops, hotset);
1490
1491 eprintln!(
1492 "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
1493 rows, ops, hotset
1494 );
1495
1496 let bypass = run_bypass_perf(&workload, rows);
1497 let global = run_global_cache_perf(&workload, rows);
1498 let local = run_local_cache_perf(&workload, rows);
1499
1500 print_perf_result(&bypass);
1501 print_perf_result(&global);
1502 print_perf_result(&local);
1503
1504 eprintln!(
1505 "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1506 bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1507 bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1508 );
1509
1510 assert_eq!(bypass.counters.result_hits, 0);
1511 assert_eq!(bypass.counters.result_misses, 0);
1512 assert_eq!(bypass.counters.local_hits, 0);
1513 assert_eq!(bypass.counters.local_misses, 0);
1514 assert!(global.counters.result_hits > 0);
1515 assert!(global.counters.result_misses > 0);
1516 assert_eq!(global.counters.local_hits, 0);
1517 assert_eq!(global.counters.local_misses, 0);
1518 assert!(local.counters.local_hits > 0);
1519 assert!(local.counters.local_misses > 0);
1520 assert!(local.counters.result_misses > 0);
1521 }
1522
1523 #[test]
1524 fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1525 let _guard = crate::runtime::runtime_test_guard()
1526 .lock()
1527 .expect("provider test guard");
1528 let root = uniq_cache_cfg_tmp_dir();
1529 let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1530 let auth_file = root.join(".run").join("authority.sqlite");
1531 fs::create_dir_all(auth_file.parent().expect("authority parent"))
1532 .expect("create authority parent");
1533 let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1534
1535 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1536 .expect("init knowdb with cache config");
1537
1538 let snapshot = runtime_snapshot();
1539 assert!(snapshot.result_cache_enabled);
1540 assert_eq!(snapshot.result_cache_capacity, 7);
1541 assert_eq!(snapshot.result_cache_ttl_ms, 5);
1542
1543 let req = QueryRequest::first_row(
1544 "SELECT value FROM example WHERE id=:id",
1545 fields_to_params(&[DataField::from_digit(":id", 1)]),
1546 CachePolicy::UseGlobal,
1547 );
1548 let before = runtime_snapshot();
1549 let row = runtime()
1550 .execute(&req)
1551 .expect("first result-cache query")
1552 .into_row();
1553 assert_eq!(row[0].to_string(), "chars(alpha)");
1554 let row = runtime()
1555 .execute(&req)
1556 .expect("second result-cache query")
1557 .into_row();
1558 assert_eq!(row[0].to_string(), "chars(alpha)");
1559 std::thread::sleep(Duration::from_millis(12));
1560 let row = runtime()
1561 .execute(&req)
1562 .expect("expired result-cache query")
1563 .into_row();
1564 assert_eq!(row[0].to_string(), "chars(alpha)");
1565 let after = runtime_snapshot();
1566
1567 assert!(after.result_cache_hits > before.result_cache_hits);
1568 assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1569
1570 restore_default_result_cache_config();
1571 let _ = fs::remove_dir_all(&root);
1572 }
1573
1574 #[test]
1575 fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1576 let _guard = crate::runtime::runtime_test_guard()
1577 .lock()
1578 .expect("provider test guard");
1579 let root = uniq_cache_cfg_tmp_dir();
1580 let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1581 let auth_file = root.join(".run").join("authority.sqlite");
1582 fs::create_dir_all(auth_file.parent().expect("authority parent"))
1583 .expect("create authority parent");
1584 let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1585
1586 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1587 .expect("init knowdb with cache disabled");
1588
1589 let snapshot = runtime_snapshot();
1590 assert!(!snapshot.result_cache_enabled);
1591 assert_eq!(snapshot.result_cache_capacity, 3);
1592 assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1593
1594 let req = QueryRequest::first_row(
1595 "SELECT value FROM example WHERE id=:id",
1596 fields_to_params(&[DataField::from_digit(":id", 1)]),
1597 CachePolicy::UseGlobal,
1598 );
1599 let before = runtime_snapshot();
1600 let _ = runtime()
1601 .execute(&req)
1602 .expect("first bypassed result-cache query");
1603 let _ = runtime()
1604 .execute(&req)
1605 .expect("second bypassed result-cache query");
1606 let after = runtime_snapshot();
1607
1608 assert_eq!(after.result_cache_hits, before.result_cache_hits);
1609 assert_eq!(after.result_cache_misses, before.result_cache_misses);
1610
1611 restore_default_result_cache_config();
1612 let _ = fs::remove_dir_all(&root);
1613 }
1614
1615 #[test]
1616 fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1617 let _guard = crate::runtime::runtime_test_guard()
1618 .lock()
1619 .expect("provider test guard");
1620 restore_default_result_cache_config();
1621
1622 let db = MemDB::instance();
1623 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1624 .expect("create table");
1625 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1626 .expect("seed table");
1627 init_mem_provider(db).expect("init provider");
1628
1629 let before = runtime_snapshot();
1630 let root = uniq_cache_cfg_tmp_dir();
1631 let conf_path = write_provider_only_knowdb_with_cache(
1632 &root,
1633 "mysql",
1634 "not-a-valid-mysql-url",
1635 false,
1636 3,
1637 5,
1638 );
1639 let authority_uri = format!(
1640 "file:{}?mode=rwc&uri=true",
1641 root.join("unused.sqlite").display()
1642 );
1643
1644 let err =
1645 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1646 assert!(err.is_err());
1647
1648 let after = runtime_snapshot();
1649 assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1650 assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1651 assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1652
1653 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1654 assert_eq!(row[0].to_string(), "chars(first)");
1655
1656 let _ = fs::remove_dir_all(&root);
1657 }
1658}