1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::HashMap,
6 collections::hash_map::DefaultHasher,
7 hash::{Hash, Hasher},
8};
9
10use crate::error::KnowledgeResult;
11use async_trait::async_trait;
12use rusqlite::ToSql;
13use rusqlite::{Connection, OpenFlags};
14use wp_log::{info_ctrl, warn_kdb};
15use wp_model_core::model::DataField;
16
17use crate::cache::CacheAble;
18use crate::loader::{ProviderKind, SqlProviderKind, parse_knowdb_conf};
19use crate::mem::RowData;
20use crate::mem::memdb::MemDB;
21use crate::mem::thread_clone::ThreadClonedMDB;
22use crate::mysql::{MySqlProvider, MySqlProviderConfig};
23use crate::param::named_params_to_fields;
24use crate::postgres::{PostgresProvider, PostgresProviderConfig};
25use crate::runtime::{
26 CachePolicy, CachedRedisValue, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor,
27 QueryRequest, QueryResponse, RedisCacheKey, RedisCmdTag, RuntimeSnapshot, runtime,
28};
29use crate::telemetry::{
30 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
31 telemetry, telemetry_enabled,
32};
33
/// Executor over a [`MemDB`] that tags every query with a fixed metadata-cache scope.
///
/// Built by `init_mem_provider`, which fills `metadata_scope` with the datasource id
/// derived from the `"memdb"` seed and the generation handed out by the runtime.
struct MemProvider {
    /// Database the queries run against.
    db: MemDB,
    /// Scope passed to every `*_with_scope` call on `db`.
    metadata_scope: MetadataCacheScope,
}
38
39#[async_trait]
40impl ProviderExecutor for ThreadClonedMDB {
41 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
42 ThreadClonedMDB::query_with_scope(self, sql)
43 }
44
45 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
46 ThreadClonedMDB::query_fields_with_scope(self, sql, params)
47 }
48
49 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
50 ThreadClonedMDB::query_row_with_scope(self, sql)
51 }
52
53 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
54 ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
55 }
56}
57
58#[async_trait]
59impl ProviderExecutor for MemProvider {
60 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
61 self.db.query_with_scope(&self.metadata_scope, sql)
62 }
63
64 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
65 self.db
66 .query_fields_with_scope(&self.metadata_scope, sql, params)
67 }
68
69 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
70 self.db.query_row_with_scope(&self.metadata_scope, sql)
71 }
72
73 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
74 self.db
75 .query_named_fields_with_scope(&self.metadata_scope, sql, params)
76 }
77}
78
/// [`ProviderExecutor`] for PostgreSQL: every sync and async method forwards to the
/// method of the same name on [`PostgresProvider`], so the async variants use the
/// provider's own async implementations instead of the trait defaults.
///
/// NOTE(review): the fully qualified `PostgresProvider::name(self, ..)` calls rely on
/// `PostgresProvider` having inherent methods with these names. If one disappeared, the
/// path would resolve to this trait method instead and recurse. Keep the explicit form.
#[async_trait]
impl ProviderExecutor for PostgresProvider {
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query(self, sql)
    }

    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields(self, sql, params)
    }

    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row(self, sql)
    }

    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields(self, sql, params)
    }

    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_async(self, sql).await
    }

    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields_async(self, sql, params).await
    }

    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row_async(self, sql).await
    }

    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields_async(self, sql, params).await
    }
}
121
/// [`ProviderExecutor`] for MySQL: every sync and async method forwards to the method
/// of the same name on [`MySqlProvider`], so the async variants use the provider's own
/// async implementations instead of the trait defaults.
///
/// NOTE(review): the fully qualified `MySqlProvider::name(self, ..)` calls rely on
/// `MySqlProvider` having inherent methods with these names. If one disappeared, the
/// path would resolve to this trait method instead and recurse. Keep the explicit form.
#[async_trait]
impl ProviderExecutor for MySqlProvider {
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query(self, sql)
    }

    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields(self, sql, params)
    }

    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row(self, sql)
    }

    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields(self, sql, params)
    }

    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_async(self, sql).await
    }

    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields_async(self, sql, params).await
    }

    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row_async(self, sql).await
    }

    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields_async(self, sql, params).await
    }
}
164
165fn install_provider<F>(
166 kind: ProviderKind,
167 datasource_id: DatasourceId,
168 build: F,
169) -> KnowledgeResult<()>
170where
171 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
172{
173 runtime().install_provider(kind, datasource_id, build)?;
174 Ok(())
175}
176
177fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
178 DatasourceId::from_seed(kind, seed)
179}
180
181pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
182 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
183 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
184 Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
185 authority_uri,
186 datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
187 generation.0,
188 )))
189 })
190}
191
192pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
193 install_provider(
194 ProviderKind::SqliteAuthority,
195 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
196 |generation| {
197 Ok(Arc::new(MemProvider {
198 db: memdb,
199 metadata_scope: MetadataCacheScope {
200 datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
201 generation,
202 },
203 }))
204 },
205 )
206}
207
208pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
209 init_postgres_provider_with_config(
210 PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
211 )
212}
213
214pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
215 let connection_uri = config.connection_uri().to_string();
216 let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
217 install_provider(
218 ProviderKind::Postgres,
219 datasource_id.clone(),
220 |generation| {
221 let provider = PostgresProvider::connect(
222 &config,
223 MetadataCacheScope {
224 datasource_id: datasource_id.clone(),
225 generation,
226 },
227 )?;
228 Ok(Arc::new(provider))
229 },
230 )
231}
232
233pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
234 init_mysql_provider_with_config(
235 MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
236 )
237}
238
239pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
240 let connection_uri = config.connection_uri().to_string();
241 let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
242 install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
243 let provider = MySqlProvider::connect(
244 &config,
245 MetadataCacheScope {
246 datasource_id: datasource_id.clone(),
247 generation,
248 },
249 )?;
250 Ok(Arc::new(provider))
251 })
252}
253
254pub fn redis_bf_exists(key: &str, item: &str) -> KnowledgeResult<bool> {
263 if let Some(generation) = current_generation() {
264 let ck = redis_cache_key(RedisCmdTag::BfExists, generation, key, &[item]);
265 if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
266 return Ok(v);
267 }
268 let result = crate::redis::bf_exists("knowdb", key, item)?;
269 runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
270 return Ok(result);
271 }
272 crate::redis::bf_exists("knowdb", key, item)
273}
274
275pub fn redis_hget(key: &str, field: &str) -> KnowledgeResult<Option<String>> {
279 if let Some(generation) = current_generation() {
280 let ck = redis_cache_key(RedisCmdTag::HGet, generation, key, &[field]);
281 if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
282 return Ok(v.clone());
283 }
284 let result = crate::redis::hget("knowdb", key, field)?;
285 runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
286 return Ok(result);
287 }
288 crate::redis::hget("knowdb", key, field)
289}
290
291pub fn redis_get(key: &str) -> KnowledgeResult<Option<String>> {
295 if let Some(generation) = current_generation() {
296 let ck = redis_cache_key(RedisCmdTag::Get, generation, key, &[]);
297 if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
298 return Ok(v.clone());
299 }
300 let result = crate::redis::get("knowdb", key)?;
301 runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
302 return Ok(result);
303 }
304 crate::redis::get("knowdb", key)
305}
306
307pub fn redis_set_exists(key: &str, member: &str) -> KnowledgeResult<bool> {
309 if let Some(generation) = current_generation() {
310 let ck = redis_cache_key(RedisCmdTag::SetExists, generation, key, &[member]);
311 if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
312 return Ok(v);
313 }
314 let result = crate::redis::set_exists("knowdb", key, member)?;
315 runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
316 return Ok(result);
317 }
318 crate::redis::set_exists("knowdb", key, member)
319}
320
321pub fn redis_bf_add(key: &str, items: &[&str]) -> KnowledgeResult<Vec<bool>> {
326 let owned: Vec<String> = items.iter().map(|s| s.to_string()).collect();
327 crate::redis::bf_madd("knowdb", key, &owned)
328}
329
330pub fn redis_bf_create(key: &str, error_rate: f64, capacity: i64) -> KnowledgeResult<()> {
336 crate::redis::bf_reserve("knowdb", key, error_rate, capacity)
337}
338
339pub(crate) fn register_fun_map(map: HashMap<String, crate::loader::FunSpec>) {
345 crate::fun::register_fun_map(map);
346}
347
348pub fn external_exists(service: &str, arg: &str) -> KnowledgeResult<bool> {
350 let spec = crate::fun::resolve_spec(service, true)?;
351 let redis_key = spec.key.as_deref().unwrap_or(arg);
352 if spec.enabled()
353 && let Some(generation) = current_generation()
354 {
355 let ck = redis_cache_key(RedisCmdTag::BfExists, generation, redis_key, &[arg]);
356 if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
357 return Ok(v);
358 }
359 let result = crate::fun::external_exists(service, arg)?;
360 if let Some(ttl) = spec.ttl_ms {
361 runtime().redis_cache_put_with_ttl(ck, CachedRedisValue::Bool(result), ttl);
362 } else {
363 runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
364 }
365 return Ok(result);
366 }
367 crate::fun::external_exists(service, arg)
368}
369
370pub fn external_value(service: &str, arg: &str) -> KnowledgeResult<Option<String>> {
372 let spec = crate::fun::resolve_spec(service, false)?;
373 let redis_key = spec.key.as_deref().unwrap_or(arg);
374 if spec.enabled()
375 && let Some(generation) = current_generation()
376 {
377 let ck = redis_cache_key(RedisCmdTag::Get, generation, redis_key, &[arg]);
378 if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
379 return Ok(v.clone());
380 }
381 let result = crate::fun::external_value(service, arg)?;
382 if let Some(ttl) = spec.ttl_ms {
383 runtime().redis_cache_put_with_ttl(
384 ck,
385 CachedRedisValue::OptString(result.clone()),
386 ttl,
387 );
388 } else {
389 runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
390 }
391 return Ok(result);
392 }
393 crate::fun::external_value(service, arg)
394}
395
396#[allow(dead_code)]
397pub(crate) fn redis_ping() -> KnowledgeResult<bool> {
398 crate::redis::ping_blocking("knowdb")
399}
400
401#[allow(dead_code)]
402pub(crate) fn redis_close() -> KnowledgeResult<()> {
403 crate::redis::close(Some("knowdb"))
404}
405
406fn redis_cache_key(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
407 use std::collections::hash_map::DefaultHasher;
408 use std::hash::{Hash, Hasher};
409 let mut hasher = DefaultHasher::new();
410 key.hash(&mut hasher);
411 let key_hash = hasher.finish();
412 let mut hasher = DefaultHasher::new();
413 for arg in args {
414 arg.hash(&mut hasher);
415 }
416 let args_hash = hasher.finish();
417 RedisCacheKey {
418 generation,
419 cmd_tag: cmd,
420 key_hash,
421 args_hash,
422 }
423}
424
425pub fn current_generation() -> Option<u64> {
428 runtime()
429 .current_generation()
430 .map(|generation| generation.0)
431}
432
433pub fn runtime_snapshot() -> RuntimeSnapshot {
434 runtime().snapshot()
435}
436
437pub fn install_runtime_telemetry(
438 telemetry_impl: Arc<dyn KnowledgeTelemetry>,
439) -> Arc<dyn KnowledgeTelemetry> {
440 install_telemetry(telemetry_impl)
441}
442
443pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
444 runtime()
445 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
446 .map(QueryResponse::into_rows)
447}
448
449pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
450 runtime()
451 .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
452 .await
453 .map(QueryResponse::into_rows)
454}
455
456pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
457 runtime()
458 .execute(&QueryRequest::first_row(
459 sql,
460 Vec::new(),
461 CachePolicy::Bypass,
462 ))
463 .map(QueryResponse::into_row)
464}
465
466pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
467 runtime()
468 .execute_async(&QueryRequest::first_row(
469 sql,
470 Vec::new(),
471 CachePolicy::Bypass,
472 ))
473 .await
474 .map(QueryResponse::into_row)
475}
476
477pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
478 runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
479}
480
481pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
482 runtime()
483 .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
484 .await
485}
486
487pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
488 query_fields(sql, params)
489}
490
491pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
492 query_fields_async(sql, params).await
493}
494
495pub fn query_named<'a>(
496 sql: &str,
497 params: &'a [(&'a str, &'a dyn ToSql)],
498) -> KnowledgeResult<RowData> {
499 let fields = named_params_to_fields(params)?;
500 query_fields(sql, &fields)
501}
502
503pub fn cache_query_fields<const N: usize>(
504 sql: &str,
505 c_params: &[DataField; N],
506 query_params: &[DataField],
507 cache: &mut impl CacheAble<DataField, RowData, N>,
508) -> RowData {
509 cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
510}
511
512pub fn cache_query_fields_with_scope<const N: usize>(
513 sql: &str,
514 local_cache_scope: u64,
515 c_params: &[DataField; N],
516 query_params: &[DataField],
517 cache: &mut impl CacheAble<DataField, RowData, N>,
518) -> RowData {
519 if let Some(generation) = current_generation() {
520 cache.prepare_generation(generation);
521 }
522 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
523 runtime().record_local_cache_hit();
524 if telemetry_enabled() {
525 telemetry().on_cache(&CacheTelemetryEvent {
526 layer: CacheLayer::Local,
527 outcome: CacheOutcome::Hit,
528 provider_kind: runtime().current_provider_kind(),
529 });
530 }
531 return hit.clone();
532 }
533 runtime().record_local_cache_miss();
534 if telemetry_enabled() {
535 telemetry().on_cache(&CacheTelemetryEvent {
536 layer: CacheLayer::Local,
537 outcome: CacheOutcome::Miss,
538 provider_kind: runtime().current_provider_kind(),
539 });
540 }
541
542 match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
543 Ok(row) => {
544 cache.save_scoped(local_cache_scope, c_params, row.clone());
545 row
546 }
547 Err(err) => {
548 warn_kdb!("[kdb] query error: {}", err);
549 Vec::new()
550 }
551 }
552}
553
554pub async fn cache_query_fields_async<const N: usize>(
555 sql: &str,
556 c_params: &[DataField; N],
557 query_params: &[DataField],
558 cache: &mut impl CacheAble<DataField, RowData, N>,
559) -> RowData {
560 cache_query_fields_async_with_scope(
561 sql,
562 stable_hash(sql),
563 c_params,
564 || query_params.to_vec(),
565 cache,
566 )
567 .await
568}
569
570pub async fn cache_query_fields_async_with<const N: usize, F>(
571 sql: &str,
572 c_params: &[DataField; N],
573 build_query_params: F,
574 cache: &mut impl CacheAble<DataField, RowData, N>,
575) -> RowData
576where
577 F: FnOnce() -> Vec<DataField>,
578{
579 cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
580 .await
581}
582
583pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
584 sql: &str,
585 local_cache_scope: u64,
586 c_params: &[DataField; N],
587 build_query_params: F,
588 cache: &mut impl CacheAble<DataField, RowData, N>,
589) -> RowData
590where
591 F: FnOnce() -> Vec<DataField>,
592{
593 if let Some(generation) = current_generation() {
594 cache.prepare_generation(generation);
595 }
596 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
597 runtime().record_local_cache_hit();
598 if telemetry_enabled() {
599 telemetry().on_cache(&CacheTelemetryEvent {
600 layer: CacheLayer::Local,
601 outcome: CacheOutcome::Hit,
602 provider_kind: runtime().current_provider_kind(),
603 });
604 }
605 return hit.clone();
606 }
607 runtime().record_local_cache_miss();
608 if telemetry_enabled() {
609 telemetry().on_cache(&CacheTelemetryEvent {
610 layer: CacheLayer::Local,
611 outcome: CacheOutcome::Miss,
612 provider_kind: runtime().current_provider_kind(),
613 });
614 }
615
616 let query_params = build_query_params();
617 match runtime()
618 .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
619 .await
620 {
621 Ok(row) => {
622 cache.save_scoped(local_cache_scope, c_params, row.clone());
623 row
624 }
625 Err(err) => {
626 warn_kdb!("[kdb] query error: {}", err);
627 Vec::new()
628 }
629 }
630}
631
632pub fn cache_query<const N: usize>(
633 sql: &str,
634 c_params: &[DataField; N],
635 named_params: &[(&str, &dyn ToSql)],
636 cache: &mut impl CacheAble<DataField, RowData, N>,
637) -> RowData {
638 let query_fields = match named_params_to_fields(named_params) {
639 Ok(fields) => fields,
640 Err(err) => {
641 warn_kdb!("[kdb] query param conversion error: {}", err);
642 return Vec::new();
643 }
644 };
645
646 cache_query_fields(sql, c_params, &query_fields, cache)
647}
648
649pub async fn cache_query_async<const N: usize>(
650 sql: &str,
651 c_params: &[DataField; N],
652 named_params: &[(&str, &dyn ToSql)],
653 cache: &mut impl CacheAble<DataField, RowData, N>,
654) -> RowData {
655 let query_fields = match named_params_to_fields(named_params) {
656 Ok(fields) => fields,
657 Err(err) => {
658 warn_kdb!("[kdb] query param conversion error: {}", err);
659 return Vec::new();
660 }
661 };
662
663 cache_query_fields_async(sql, c_params, &query_fields, cache).await
664}
665
666fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
667 if let Ok(conn) = Connection::open_with_flags(
668 authority_uri,
669 OpenFlags::SQLITE_OPEN_READ_WRITE
670 | OpenFlags::SQLITE_OPEN_CREATE
671 | OpenFlags::SQLITE_OPEN_URI,
672 ) {
673 let _ = conn.execute_batch(
674 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
675 );
676 }
677 Ok(())
678}
679
680pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
681 ensure_wal(authority_uri)?;
682 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
683 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
684 init_mem_provider(mem)
685}
686
687pub fn init_thread_cloned_from_knowdb(
688 root: &Path,
689 knowdb_conf: &Path,
690 authority_uri: &str,
691 dict: &orion_variate::EnvDict,
692) -> KnowledgeResult<()> {
693 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
694 if let Some(provider_cfg) = conf.provider() {
695 if let Some(sqldb) = provider_cfg.sqldb {
697 match sqldb.kind {
698 SqlProviderKind::Postgres => {
699 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
700 init_postgres_provider_with_config(
701 PostgresProviderConfig::new(sqldb.connection_uri)
702 .with_pool_size(sqldb.pool_size)
703 .with_min_connections(sqldb.min_connections)
704 .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
705 .with_idle_timeout_ms(sqldb.idle_timeout_ms)
706 .with_max_lifetime_ms(sqldb.max_lifetime_ms),
707 )?;
708 runtime().configure_result_cache(
709 conf.cache.enabled,
710 conf.cache.capacity,
711 Duration::from_millis(conf.cache.ttl_ms.max(1)),
712 );
713 return Ok(());
714 }
715 SqlProviderKind::Mysql => {
716 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
717 init_mysql_provider_with_config(
718 MySqlProviderConfig::new(sqldb.connection_uri)
719 .with_pool_size(sqldb.pool_size)
720 .with_min_connections(sqldb.min_connections)
721 .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
722 .with_idle_timeout_ms(sqldb.idle_timeout_ms)
723 .with_max_lifetime_ms(sqldb.max_lifetime_ms),
724 )?;
725 runtime().configure_result_cache(
726 conf.cache.enabled,
727 conf.cache.capacity,
728 Duration::from_millis(conf.cache.ttl_ms.max(1)),
729 );
730 return Ok(());
731 }
732 }
733 }
734 if let Some(redis_cfg) = provider_cfg.redis {
736 info_ctrl!("init redis knowdb provider({}) ", conf_abs.display(),);
737 crate::redis::init_with_opts(
738 "knowdb",
739 &redis_cfg.connection_uri,
740 redis_cfg.pool_size,
741 redis_cfg.command_timeout_ms,
742 )?;
743 runtime().configure_redis_cache(conf.cache.enabled, conf.cache.capacity);
744 register_fun_map(conf.fun.clone());
745 return Ok(());
746 }
747 }
748
749 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
750 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
751 let path_part = rest.split('?').next().unwrap_or(rest);
752 format!("file:{}?mode=ro&uri=true", path_part)
753 } else {
754 authority_uri.to_string()
755 };
756
757 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
758 init_thread_cloned_from_authority(&ro_uri)?;
759 runtime().configure_result_cache(
760 conf.cache.enabled,
761 conf.cache.capacity,
762 Duration::from_millis(conf.cache.ttl_ms.max(1)),
763 );
764 Ok(())
765}
766
/// Hashes `value` with a freshly seeded [`DefaultHasher`].
///
/// The result is deterministic within one build of the program, which is all the
/// local-cache scopes need. `DefaultHasher`'s algorithm is not guaranteed across Rust
/// releases, so the value must not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
772
773#[cfg(test)]
774mod tests {
775 use super::*;
776 use crate::cache::FieldQueryCache;
777 use crate::error::KnowReason;
778 use crate::mem::memdb::MemDB;
779 use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
780 use crate::runtime::fields_to_params;
781 use crate::telemetry::{
782 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
783 ReloadTelemetryEvent, reset_telemetry,
784 };
785 use orion_error::conversion::ToStructError;
786 use orion_variate::EnvDict;
787 use std::fs;
788 use std::hint::black_box;
789 use std::path::PathBuf;
790 use std::sync::atomic::{AtomicU64, Ordering};
791 use std::time::{Duration, Instant};
792
    /// Telemetry sink for tests that counts every event it receives, one atomic counter
    /// per (layer, outcome) pair plus reload and query outcomes.
    #[derive(Default)]
    struct TestTelemetry {
        /// Reload events reported as `ReloadOutcome::Success`.
        reload_success: AtomicU64,
        /// Reload events reported as `ReloadOutcome::Failure`.
        reload_failure: AtomicU64,
        /// `CacheLayer::Local` hits.
        local_hits: AtomicU64,
        /// `CacheLayer::Local` misses.
        local_misses: AtomicU64,
        /// `CacheLayer::Result` hits.
        result_hits: AtomicU64,
        /// `CacheLayer::Result` misses.
        result_misses: AtomicU64,
        /// `CacheLayer::Metadata` hits.
        metadata_hits: AtomicU64,
        /// `CacheLayer::Metadata` misses.
        metadata_misses: AtomicU64,
        /// Query events with `success == true`.
        query_success: AtomicU64,
        /// Query events with `success == false`.
        query_failure: AtomicU64,
    }
806
807 impl KnowledgeTelemetry for TestTelemetry {
808 fn on_cache(&self, event: &CacheTelemetryEvent) {
809 let counter = match (event.layer, event.outcome) {
810 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
811 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
812 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
813 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
814 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
815 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
816 };
817 counter.fetch_add(1, Ordering::Relaxed);
818 }
819
820 fn on_reload(&self, event: &ReloadTelemetryEvent) {
821 let counter = match event.outcome {
822 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
823 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
824 };
825 counter.fetch_add(1, Ordering::Relaxed);
826 }
827
828 fn on_query(&self, event: &QueryTelemetryEvent) {
829 let counter = if event.success {
830 &self.query_success
831 } else {
832 &self.query_failure
833 };
834 counter.fetch_add(1, Ordering::Relaxed);
835 }
836 }
837
838 fn perf_env_usize(key: &str, default: usize) -> usize {
839 std::env::var(key)
840 .ok()
841 .and_then(|value| value.parse::<usize>().ok())
842 .unwrap_or(default)
843 }
844
845 fn seed_perf_provider(rows: usize) {
846 let db = MemDB::instance();
847 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
848 .expect("create perf_kv");
849 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
850 for id in 1..=rows {
851 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
852 db.execute(sql.as_str()).expect("insert perf_kv row");
853 }
854 db.execute("COMMIT").expect("commit perf_kv load");
855 init_mem_provider(db).expect("init perf provider");
856 }
857
858 #[tokio::test(flavor = "current_thread")]
859 async fn query_async_works_with_mem_provider() {
860 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
861 let db = MemDB::instance();
862 db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
863 .expect("create async_kv");
864 db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
865 .expect("insert async_kv row");
866 init_mem_provider(db).expect("init mem provider");
867
868 let row = query_fields_async(
869 "SELECT value FROM async_kv WHERE id=:id",
870 &[DataField::from_digit(":id", 1)],
871 )
872 .await
873 .expect("query async row");
874 assert_eq!(row.len(), 1);
875 assert_eq!(row[0].to_string(), "chars(hello)");
876 }
877
    /// One pre-built operation of the cache benchmark workload.
    #[derive(Clone)]
    struct PerfQuery {
        /// Local-cache key (`id`) used by the local-cache scenario.
        cache_key: [DataField; 1],
        /// Bind value (`:id`) for the perf_kv lookup.
        query_params: [DataField; 1],
        /// Same lookup with `CachePolicy::Bypass`.
        bypass_req: QueryRequest,
        /// Same lookup with `CachePolicy::UseGlobal`.
        global_req: QueryRequest,
    }
885
886 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
887 (0..ops)
888 .map(|idx| {
889 let id = ((idx * 17) % hotset + 1) as i64;
890 let cache_key = [DataField::from_digit("id", id)];
891 let query_params = [DataField::from_digit(":id", id)];
892 let bypass_req = QueryRequest::first_row(
893 "SELECT value FROM perf_kv WHERE id=:id",
894 fields_to_params(&query_params),
895 CachePolicy::Bypass,
896 );
897 let global_req = QueryRequest::first_row(
898 "SELECT value FROM perf_kv WHERE id=:id",
899 fields_to_params(&query_params),
900 CachePolicy::UseGlobal,
901 );
902 PerfQuery {
903 cache_key,
904 query_params,
905 bypass_req,
906 global_req,
907 }
908 })
909 .collect()
910 }
911
    /// Per-scenario change in the runtime's cache counters (after minus before).
    #[derive(Debug, Clone, Copy)]
    struct PerfCounters {
        /// Global result-cache hits.
        result_hits: u64,
        /// Global result-cache misses.
        result_misses: u64,
        /// Caller-owned local-cache hits.
        local_hits: u64,
        /// Caller-owned local-cache misses.
        local_misses: u64,
        /// Metadata-cache hits.
        metadata_hits: u64,
        /// Metadata-cache misses.
        metadata_misses: u64,
    }
921
    /// Outcome of one benchmark scenario.
    #[derive(Debug, Clone)]
    struct PerfResult {
        /// Scenario label printed in the report.
        name: &'static str,
        /// Wall-clock time spent executing the workload.
        elapsed: Duration,
        /// Number of operations executed.
        ops: usize,
        /// Cache-counter deltas observed during the run.
        counters: PerfCounters,
    }
929
930 impl PerfResult {
931 fn qps(&self) -> f64 {
932 let secs = self.elapsed.as_secs_f64();
933 if secs == 0.0 {
934 self.ops as f64
935 } else {
936 self.ops as f64 / secs
937 }
938 }
939 }
940
941 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
942 PerfCounters {
943 result_hits: after
944 .result_cache_hits
945 .saturating_sub(before.result_cache_hits),
946 result_misses: after
947 .result_cache_misses
948 .saturating_sub(before.result_cache_misses),
949 local_hits: after
950 .local_cache_hits
951 .saturating_sub(before.local_cache_hits),
952 local_misses: after
953 .local_cache_misses
954 .saturating_sub(before.local_cache_misses),
955 metadata_hits: after
956 .metadata_cache_hits
957 .saturating_sub(before.metadata_cache_hits),
958 metadata_misses: after
959 .metadata_cache_misses
960 .saturating_sub(before.metadata_cache_misses),
961 }
962 }
963
964 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
965 seed_perf_provider(rows);
966 let before = runtime_snapshot();
967 let started = Instant::now();
968 for item in workload {
969 let row = runtime()
970 .execute(&item.bypass_req)
971 .expect("execute bypass request")
972 .into_row();
973 black_box(row);
974 }
975 let elapsed = started.elapsed();
976 let after = runtime_snapshot();
977 PerfResult {
978 name: "bypass",
979 elapsed,
980 ops: workload.len(),
981 counters: snapshot_delta(&before, &after),
982 }
983 }
984
985 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
986 seed_perf_provider(rows);
987 let before = runtime_snapshot();
988 let started = Instant::now();
989 for item in workload {
990 let row = runtime()
991 .execute(&item.global_req)
992 .expect("execute global-cache request")
993 .into_row();
994 black_box(row);
995 }
996 let elapsed = started.elapsed();
997 let after = runtime_snapshot();
998 PerfResult {
999 name: "global_cache",
1000 elapsed,
1001 ops: workload.len(),
1002 counters: snapshot_delta(&before, &after),
1003 }
1004 }
1005
1006 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
1007 seed_perf_provider(rows);
1008 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
1009 let before = runtime_snapshot();
1010 let started = Instant::now();
1011 for item in workload {
1012 let row = cache_query_fields(
1013 "SELECT value FROM perf_kv WHERE id=:id",
1014 &item.cache_key,
1015 &item.query_params,
1016 &mut cache,
1017 );
1018 black_box(row);
1019 }
1020 let elapsed = started.elapsed();
1021 let after = runtime_snapshot();
1022 PerfResult {
1023 name: "local_cache",
1024 elapsed,
1025 ops: workload.len(),
1026 counters: snapshot_delta(&before, &after),
1027 }
1028 }
1029
1030 fn print_perf_result(result: &PerfResult) {
1031 eprintln!(
1032 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
1033 result.name,
1034 result.elapsed.as_millis(),
1035 result.qps(),
1036 result.counters.result_hits,
1037 result.counters.result_misses,
1038 result.counters.local_hits,
1039 result.counters.local_misses,
1040 result.counters.metadata_hits,
1041 result.counters.metadata_misses,
1042 );
1043 }
1044
1045 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
1046 use rand::{Rng, rng};
1047 let rnd: u64 = rng().next_u64();
1048 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
1049 }
1050
1051 fn write_minimal_knowdb_with_cache(
1052 root: &Path,
1053 enabled: bool,
1054 capacity: usize,
1055 ttl_ms: u64,
1056 ) -> std::path::PathBuf {
1057 let models = root.join("models").join("knowledge");
1058 let example_dir = models.join("example");
1059 fs::create_dir_all(&example_dir).expect("create knowdb models/example");
1060 fs::write(
1061 models.join("knowdb.toml"),
1062 format!(
1063 r#"
1064version = 2
1065base_dir = "."
1066
1067[cache]
1068enabled = {enabled}
1069capacity = {capacity}
1070ttl_ms = {ttl_ms}
1071
1072[csv]
1073has_header = false
1074
1075[[tables]]
1076name = "example"
1077columns.by_index = [0,1]
1078"#
1079 ),
1080 )
1081 .expect("write knowdb.toml");
1082 fs::write(
1083 example_dir.join("create.sql"),
1084 r#"
1085CREATE TABLE IF NOT EXISTS {table} (
1086 id INTEGER PRIMARY KEY,
1087 value TEXT NOT NULL
1088);
1089"#,
1090 )
1091 .expect("write create.sql");
1092 fs::write(
1093 example_dir.join("insert.sql"),
1094 "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
1095 )
1096 .expect("write insert.sql");
1097 fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
1098 models.join("knowdb.toml")
1099 }
1100
1101 fn write_provider_only_knowdb_with_cache(
1102 root: &Path,
1103 provider_kind: &str,
1104 connection_uri: &str,
1105 enabled: bool,
1106 capacity: usize,
1107 ttl_ms: u64,
1108 ) -> std::path::PathBuf {
1109 let models = root.join("models").join("knowledge");
1110 fs::create_dir_all(&models).expect("create knowdb models");
1111 fs::write(
1112 models.join("knowdb.toml"),
1113 format!(
1114 r#"
1115version = 2
1116base_dir = "."
1117
1118[cache]
1119enabled = {enabled}
1120capacity = {capacity}
1121ttl_ms = {ttl_ms}
1122
1123[provider.sqldb]
1124kind = "{provider_kind}"
1125connection_uri = "{connection_uri}"
1126"#
1127 ),
1128 )
1129 .expect("write provider knowdb.toml");
1130 models.join("knowdb.toml")
1131 }
1132
1133 fn restore_default_result_cache_config() {
1134 runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
1135 }
1136
1137 #[test]
1138 fn provider_can_be_replaced() {
1139 let _guard = crate::runtime::runtime_test_guard()
1140 .lock()
1141 .expect("provider test guard");
1142 let db1 = MemDB::instance();
1143 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1144 .expect("create table in db1");
1145 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1146 .expect("seed db1");
1147 init_mem_provider(db1).expect("init provider db1");
1148 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
1149 assert_eq!(row[0].to_string(), "chars(first)");
1150
1151 let db2 = MemDB::instance();
1152 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1153 .expect("create table in db2");
1154 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1155 .expect("seed db2");
1156 init_mem_provider(db2).expect("replace provider with db2");
1157 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
1158 assert_eq!(row[0].to_string(), "chars(second)");
1159 }
1160
1161 #[test]
1162 fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
1163 let _guard = crate::runtime::runtime_test_guard()
1164 .lock()
1165 .expect("provider test guard");
1166 COLNAME_CACHE.write().expect("metadata cache lock").clear();
1167
1168 let db = MemDB::instance();
1169 db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
1170 .expect("create table");
1171 db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
1172 .expect("seed table");
1173
1174 let old_scope = MetadataCacheScope {
1175 datasource_id: DatasourceId("sqlite:old".to_string()),
1176 generation: Generation(1),
1177 };
1178 let new_scope = MetadataCacheScope {
1179 datasource_id: DatasourceId("sqlite:new".to_string()),
1180 generation: Generation(2),
1181 };
1182 let old_provider = MemProvider {
1183 db: db.clone(),
1184 metadata_scope: old_scope.clone(),
1185 };
1186
1187 install_provider(
1188 ProviderKind::SqliteAuthority,
1189 new_scope.datasource_id.clone(),
1190 |_generation| {
1191 Ok(Arc::new(MemProvider {
1192 db: db.clone(),
1193 metadata_scope: new_scope.clone(),
1194 }))
1195 },
1196 )
1197 .expect("install new provider");
1198
1199 let row = old_provider
1200 .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1201 .expect("old provider query");
1202 assert_eq!(row[0].to_string(), "chars(scope-old)");
1203
1204 let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1205 assert!(cache.contains(&metadata_cache_key_for_scope(
1206 &old_scope,
1207 "SELECT value FROM cache_scope_t WHERE id = 1",
1208 )));
1209 assert!(!cache.contains(&metadata_cache_key_for_scope(
1210 &new_scope,
1211 "SELECT value FROM cache_scope_t WHERE id = 1",
1212 )));
1213 }
1214
1215 #[tokio::test(flavor = "current_thread")]
1216 async fn async_query_uses_runtime_bridge() {
1217 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1218 let db = MemDB::instance();
1219 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1220 .expect("create table");
1221 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1222 .expect("seed table");
1223 init_mem_provider(db).expect("init provider");
1224
1225 let row = query_row_async("SELECT value FROM t WHERE id = 1")
1226 .await
1227 .expect("async query row");
1228 assert_eq!(row[0].to_string(), "chars(async-first)");
1229 }
1230
1231 #[tokio::test(flavor = "current_thread")]
1232 async fn async_cache_query_fields_hits_local_cache() {
1233 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1234 let db = MemDB::instance();
1235 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1236 .expect("create table");
1237 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1238 .expect("seed table");
1239 init_mem_provider(db).expect("init provider");
1240
1241 let key = [DataField::from_digit("id", 1)];
1242 let params = [DataField::from_digit(":id", 1)];
1243 let mut cache = FieldQueryCache::default();
1244
1245 let first = cache_query_fields_async(
1246 "SELECT value FROM t WHERE id=:id",
1247 &key,
1248 ¶ms,
1249 &mut cache,
1250 )
1251 .await;
1252 let second = cache_query_fields_async(
1253 "SELECT value FROM t WHERE id=:id",
1254 &key,
1255 ¶ms,
1256 &mut cache,
1257 )
1258 .await;
1259
1260 assert_eq!(first[0].to_string(), "chars(async-cache)");
1261 assert_eq!(second[0].to_string(), "chars(async-cache)");
1262 }
1263
1264 #[test]
1265 fn local_cache_is_cleared_when_generation_changes() {
1266 let _guard = crate::runtime::runtime_test_guard()
1267 .lock()
1268 .expect("provider test guard");
1269 let db1 = MemDB::instance();
1270 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1271 .expect("create table in db1");
1272 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1273 .expect("seed db1");
1274 init_mem_provider(db1).expect("init provider db1");
1275
1276 let key = [DataField::from_digit("id", 1)];
1277 let params = [DataField::from_digit(":id", 1)];
1278 let mut cache = FieldQueryCache::default();
1279 let row = cache_query_fields(
1280 "SELECT value FROM t WHERE id=:id",
1281 &key,
1282 ¶ms,
1283 &mut cache,
1284 );
1285 assert_eq!(row[0].to_string(), "chars(first)");
1286
1287 let db2 = MemDB::instance();
1288 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1289 .expect("create table in db2");
1290 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1291 .expect("seed db2");
1292 init_mem_provider(db2).expect("replace provider with db2");
1293
1294 let row = cache_query_fields(
1295 "SELECT value FROM t WHERE id=:id",
1296 &key,
1297 ¶ms,
1298 &mut cache,
1299 );
1300 assert_eq!(row[0].to_string(), "chars(second)");
1301 }
1302
1303 #[test]
1304 fn local_cache_is_scoped_by_sql_text() {
1305 let _guard = crate::runtime::runtime_test_guard()
1306 .lock()
1307 .expect("provider test guard");
1308 let db = MemDB::instance();
1309 db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
1310 .expect("create t1");
1311 db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
1312 .expect("create t2");
1313 db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
1314 .expect("seed t1");
1315 db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
1316 .expect("seed t2");
1317 init_mem_provider(db).expect("init provider");
1318
1319 let key = [DataField::from_digit("id", 1)];
1320 let params = [DataField::from_digit(":id", 1)];
1321 let mut cache = FieldQueryCache::default();
1322
1323 let row = cache_query_fields(
1324 "SELECT value FROM t1 WHERE id=:id",
1325 &key,
1326 ¶ms,
1327 &mut cache,
1328 );
1329 assert_eq!(row[0].to_string(), "chars(first)");
1330
1331 let row = cache_query_fields(
1332 "SELECT value FROM t2 WHERE id=:id",
1333 &key,
1334 ¶ms,
1335 &mut cache,
1336 );
1337 assert_eq!(row[0].to_string(), "chars(second)");
1338 }
1339
1340 #[test]
1341 fn runtime_snapshot_tracks_generation_and_cache_size() {
1342 let _guard = crate::runtime::runtime_test_guard()
1343 .lock()
1344 .expect("provider test guard");
1345 let db = MemDB::instance();
1346 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1347 .expect("create table");
1348 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1349 .expect("seed table");
1350 init_mem_provider(db).expect("init provider");
1351
1352 let mut cache = FieldQueryCache::default();
1353 let key = [DataField::from_digit("id", 1)];
1354 let params = [DataField::from_digit(":id", 1)];
1355 let row = cache_query_fields(
1356 "SELECT value FROM t WHERE id=:id",
1357 &key,
1358 ¶ms,
1359 &mut cache,
1360 );
1361 assert_eq!(row[0].to_string(), "chars(first)");
1362
1363 let snapshot = runtime_snapshot();
1364 assert!(matches!(
1365 snapshot.provider_kind,
1366 Some(ProviderKind::SqliteAuthority)
1367 ));
1368 assert!(snapshot.generation.is_some());
1369 assert!(snapshot.result_cache_len >= 1);
1370 assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
1371 assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
1372 assert!(snapshot.reload_successes >= 1);
1373 }
1374
1375 #[test]
1376 fn metadata_cache_is_scoped_by_generation() {
1377 let _guard = crate::runtime::runtime_test_guard()
1378 .lock()
1379 .expect("provider test guard");
1380 let sql = "SELECT value FROM t WHERE id = 1";
1381 let before = runtime_snapshot().metadata_cache_len;
1382
1383 let db1 = MemDB::instance();
1384 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1385 .expect("create table in db1");
1386 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1387 .expect("seed db1");
1388 init_mem_provider(db1).expect("init provider db1");
1389 let row = query_row(sql).expect("query db1");
1390 assert_eq!(row[0].to_string(), "chars(first)");
1391 let after_first = runtime_snapshot().metadata_cache_len;
1392 assert!(
1393 after_first > before,
1394 "metadata cache did not record first generation entry: before={before} after_first={after_first}"
1395 );
1396
1397 let db2 = MemDB::instance();
1398 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1399 .expect("create table in db2");
1400 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1401 .expect("seed db2");
1402 init_mem_provider(db2).expect("replace provider with db2");
1403 let row = query_row(sql).expect("query db2");
1404 assert_eq!(row[0].to_string(), "chars(second)");
1405 let after_second = runtime_snapshot().metadata_cache_len;
1406 assert!(
1407 after_second > after_first,
1408 "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
1409 );
1410 }
1411
1412 #[test]
1413 fn failed_provider_reload_keeps_previous_provider() {
1414 let _guard = crate::runtime::runtime_test_guard()
1415 .lock()
1416 .expect("provider test guard");
1417 let db1 = MemDB::instance();
1418 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1419 .expect("create table in db1");
1420 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1421 .expect("seed db1");
1422 init_mem_provider(db1).expect("init provider db1");
1423 let before_generation = current_generation();
1424
1425 let reload_err = install_provider(
1426 ProviderKind::SqliteAuthority,
1427 datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
1428 |_generation| {
1429 Err(KnowReason::from_logic()
1430 .to_err()
1431 .with_detail("expected reload failure"))
1432 },
1433 );
1434 assert!(reload_err.is_err());
1435
1436 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1437 assert_eq!(row[0].to_string(), "chars(first)");
1438 assert_eq!(current_generation(), before_generation);
1439 }
1440
1441 #[test]
1442 fn runtime_snapshot_records_cache_counters() {
1443 let _guard = crate::runtime::runtime_test_guard()
1444 .lock()
1445 .expect("provider test guard");
1446 let db = MemDB::instance();
1447 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1448 .expect("create table");
1449 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1450 .expect("seed table");
1451 init_mem_provider(db).expect("init provider");
1452
1453 let before = runtime_snapshot();
1454 let mut cache = FieldQueryCache::default();
1455 let key = [DataField::from_digit("id", 1)];
1456 let params = [DataField::from_digit(":id", 1)];
1457 let row = cache_query_fields(
1458 "SELECT value FROM t WHERE id=:id",
1459 &key,
1460 ¶ms,
1461 &mut cache,
1462 );
1463 assert_eq!(row[0].to_string(), "chars(first)");
1464 let row = cache_query_fields(
1465 "SELECT value FROM t WHERE id=:id",
1466 &key,
1467 ¶ms,
1468 &mut cache,
1469 );
1470 assert_eq!(row[0].to_string(), "chars(first)");
1471
1472 let after = runtime_snapshot();
1473 assert!(after.local_cache_hits > before.local_cache_hits);
1474 assert!(after.local_cache_misses > before.local_cache_misses);
1475 assert!(after.result_cache_misses > before.result_cache_misses);
1476 assert!(after.metadata_cache_misses > before.metadata_cache_misses);
1477 }
1478
1479 #[test]
1480 fn telemetry_receives_reload_cache_and_query_events() {
1481 let _guard = crate::runtime::runtime_test_guard()
1482 .lock()
1483 .expect("provider test guard");
1484 let telemetry_impl = Arc::new(TestTelemetry::default());
1485 let previous = install_runtime_telemetry(telemetry_impl.clone());
1486
1487 let db = MemDB::instance();
1488 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1489 .expect("create table");
1490 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1491 .expect("seed table");
1492 init_mem_provider(db).expect("init provider");
1493
1494 let mut cache = FieldQueryCache::default();
1495 let key = [DataField::from_digit("id", 1)];
1496 let params = [DataField::from_digit(":id", 1)];
1497 let row = cache_query_fields(
1498 "SELECT value FROM t WHERE id=:id",
1499 &key,
1500 ¶ms,
1501 &mut cache,
1502 );
1503 assert_eq!(row[0].to_string(), "chars(first)");
1504 let row = cache_query_fields(
1505 "SELECT value FROM t WHERE id=:id",
1506 &key,
1507 ¶ms,
1508 &mut cache,
1509 );
1510 assert_eq!(row[0].to_string(), "chars(first)");
1511
1512 let reload_err = install_provider(
1513 ProviderKind::SqliteAuthority,
1514 datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
1515 |_generation| {
1516 Err(KnowReason::from_logic()
1517 .to_err()
1518 .with_detail("expected telemetry reload failure"))
1519 },
1520 );
1521 assert!(reload_err.is_err());
1522
1523 install_runtime_telemetry(previous);
1524 reset_telemetry();
1525
1526 assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
1527 assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
1528 assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
1529 assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
1530 assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
1531 assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
1532 assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
1533 }
1534
1535 #[test]
1536 #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
1537 fn cache_perf_reports_cache_vs_no_cache() {
1538 let _guard = crate::runtime::runtime_test_guard()
1539 .lock()
1540 .expect("provider test guard");
1541 let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
1542 let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
1543 let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
1544 let workload = build_perf_workload(ops, hotset);
1545
1546 eprintln!(
1547 "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
1548 rows, ops, hotset
1549 );
1550
1551 let bypass = run_bypass_perf(&workload, rows);
1552 let global = run_global_cache_perf(&workload, rows);
1553 let local = run_local_cache_perf(&workload, rows);
1554
1555 print_perf_result(&bypass);
1556 print_perf_result(&global);
1557 print_perf_result(&local);
1558
1559 eprintln!(
1560 "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1561 bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1562 bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1563 );
1564
1565 assert_eq!(bypass.counters.result_hits, 0);
1566 assert_eq!(bypass.counters.result_misses, 0);
1567 assert_eq!(bypass.counters.local_hits, 0);
1568 assert_eq!(bypass.counters.local_misses, 0);
1569 assert!(global.counters.result_hits > 0);
1570 assert!(global.counters.result_misses > 0);
1571 assert_eq!(global.counters.local_hits, 0);
1572 assert_eq!(global.counters.local_misses, 0);
1573 assert!(local.counters.local_hits > 0);
1574 assert!(local.counters.local_misses > 0);
1575 assert!(local.counters.result_misses > 0);
1576 }
1577
1578 #[test]
1579 fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1580 let _guard = crate::runtime::runtime_test_guard()
1581 .lock()
1582 .expect("provider test guard");
1583 let root = uniq_cache_cfg_tmp_dir();
1584 let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1585 let auth_file = root.join(".run").join("authority.sqlite");
1586 fs::create_dir_all(auth_file.parent().expect("authority parent"))
1587 .expect("create authority parent");
1588 let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1589
1590 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1591 .expect("init knowdb with cache config");
1592
1593 let snapshot = runtime_snapshot();
1594 assert!(snapshot.result_cache_enabled);
1595 assert_eq!(snapshot.result_cache_capacity, 7);
1596 assert_eq!(snapshot.result_cache_ttl_ms, 5);
1597
1598 let req = QueryRequest::first_row(
1599 "SELECT value FROM example WHERE id=:id",
1600 fields_to_params(&[DataField::from_digit(":id", 1)]),
1601 CachePolicy::UseGlobal,
1602 );
1603 let before = runtime_snapshot();
1604 let row = runtime()
1605 .execute(&req)
1606 .expect("first result-cache query")
1607 .into_row();
1608 assert_eq!(row[0].to_string(), "chars(alpha)");
1609 let row = runtime()
1610 .execute(&req)
1611 .expect("second result-cache query")
1612 .into_row();
1613 assert_eq!(row[0].to_string(), "chars(alpha)");
1614 std::thread::sleep(Duration::from_millis(12));
1615 let row = runtime()
1616 .execute(&req)
1617 .expect("expired result-cache query")
1618 .into_row();
1619 assert_eq!(row[0].to_string(), "chars(alpha)");
1620 let after = runtime_snapshot();
1621
1622 assert!(after.result_cache_hits > before.result_cache_hits);
1623 assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1624
1625 restore_default_result_cache_config();
1626 let _ = fs::remove_dir_all(&root);
1627 }
1628
1629 #[test]
1630 fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1631 let _guard = crate::runtime::runtime_test_guard()
1632 .lock()
1633 .expect("provider test guard");
1634 let root = uniq_cache_cfg_tmp_dir();
1635 let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1636 let auth_file = root.join(".run").join("authority.sqlite");
1637 fs::create_dir_all(auth_file.parent().expect("authority parent"))
1638 .expect("create authority parent");
1639 let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1640
1641 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1642 .expect("init knowdb with cache disabled");
1643
1644 let snapshot = runtime_snapshot();
1645 assert!(!snapshot.result_cache_enabled);
1646 assert_eq!(snapshot.result_cache_capacity, 3);
1647 assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1648
1649 let req = QueryRequest::first_row(
1650 "SELECT value FROM example WHERE id=:id",
1651 fields_to_params(&[DataField::from_digit(":id", 1)]),
1652 CachePolicy::UseGlobal,
1653 );
1654 let before = runtime_snapshot();
1655 let _ = runtime()
1656 .execute(&req)
1657 .expect("first bypassed result-cache query");
1658 let _ = runtime()
1659 .execute(&req)
1660 .expect("second bypassed result-cache query");
1661 let after = runtime_snapshot();
1662
1663 assert_eq!(after.result_cache_hits, before.result_cache_hits);
1664 assert_eq!(after.result_cache_misses, before.result_cache_misses);
1665
1666 restore_default_result_cache_config();
1667 let _ = fs::remove_dir_all(&root);
1668 }
1669
1670 #[test]
1671 fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1672 let _guard = crate::runtime::runtime_test_guard()
1673 .lock()
1674 .expect("provider test guard");
1675 restore_default_result_cache_config();
1676
1677 let db = MemDB::instance();
1678 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1679 .expect("create table");
1680 db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1681 .expect("seed table");
1682 init_mem_provider(db).expect("init provider");
1683
1684 let before = runtime_snapshot();
1685 let root = uniq_cache_cfg_tmp_dir();
1686 let conf_path = write_provider_only_knowdb_with_cache(
1687 &root,
1688 "mysql",
1689 "not-a-valid-mysql-url",
1690 false,
1691 3,
1692 5,
1693 );
1694 let authority_uri = format!(
1695 "file:{}?mode=rwc&uri=true",
1696 root.join("unused.sqlite").display()
1697 );
1698
1699 let err =
1700 init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1701 assert!(err.is_err());
1702
1703 let after = runtime_snapshot();
1704 assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1705 assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1706 assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1707
1708 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1709 assert_eq!(row[0].to_string(), "chars(first)");
1710
1711 let _ = fs::remove_dir_all(&root);
1712 }
1713}