1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::hash_map::DefaultHasher,
6 hash::{Hash, Hasher},
7};
8
9use crate::error::KnowledgeResult;
10use async_trait::async_trait;
11use rusqlite::ToSql;
12use rusqlite::{Connection, OpenFlags};
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25 CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26 QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30 telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34 db: MemDB,
35 metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41 ThreadClonedMDB::query_with_scope(self, sql)
42 }
43
44 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45 ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46 }
47
48 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49 ThreadClonedMDB::query_row_with_scope(self, sql)
50 }
51
52 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53 ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54 }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60 self.db.query_with_scope(&self.metadata_scope, sql)
61 }
62
63 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64 self.db
65 .query_fields_with_scope(&self.metadata_scope, sql, params)
66 }
67
68 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69 self.db.query_row_with_scope(&self.metadata_scope, sql)
70 }
71
72 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73 self.db
74 .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75 }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81 PostgresProvider::query(self, sql)
82 }
83
84 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85 PostgresProvider::query_fields(self, sql, params)
86 }
87
88 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89 PostgresProvider::query_row(self, sql)
90 }
91
92 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93 PostgresProvider::query_named_fields(self, sql, params)
94 }
95
96 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97 PostgresProvider::query_async(self, sql).await
98 }
99
100 async fn query_fields_async(
101 &self,
102 sql: &str,
103 params: &[DataField],
104 ) -> KnowledgeResult<Vec<RowData>> {
105 PostgresProvider::query_fields_async(self, sql, params).await
106 }
107
108 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109 PostgresProvider::query_row_async(self, sql).await
110 }
111
112 async fn query_named_fields_async(
113 &self,
114 sql: &str,
115 params: &[DataField],
116 ) -> KnowledgeResult<RowData> {
117 PostgresProvider::query_named_fields_async(self, sql, params).await
118 }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124 MySqlProvider::query(self, sql)
125 }
126
127 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128 MySqlProvider::query_fields(self, sql, params)
129 }
130
131 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132 MySqlProvider::query_row(self, sql)
133 }
134
135 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136 MySqlProvider::query_named_fields(self, sql, params)
137 }
138
139 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140 MySqlProvider::query_async(self, sql).await
141 }
142
143 async fn query_fields_async(
144 &self,
145 sql: &str,
146 params: &[DataField],
147 ) -> KnowledgeResult<Vec<RowData>> {
148 MySqlProvider::query_fields_async(self, sql, params).await
149 }
150
151 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152 MySqlProvider::query_row_async(self, sql).await
153 }
154
155 async fn query_named_fields_async(
156 &self,
157 sql: &str,
158 params: &[DataField],
159 ) -> KnowledgeResult<RowData> {
160 MySqlProvider::query_named_fields_async(self, sql, params).await
161 }
162}
163
164fn install_provider<F>(
165 kind: ProviderKind,
166 datasource_id: DatasourceId,
167 build: F,
168) -> KnowledgeResult<()>
169where
170 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172 runtime().install_provider(kind, datasource_id, build)?;
173 Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177 DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183 Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184 authority_uri,
185 datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186 generation.0,
187 )))
188 })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192 install_provider(
193 ProviderKind::SqliteAuthority,
194 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195 |generation| {
196 Ok(Arc::new(MemProvider {
197 db: memdb,
198 metadata_scope: MetadataCacheScope {
199 datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200 generation,
201 },
202 }))
203 },
204 )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208 init_postgres_provider_with_config(
209 PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210 )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214 let connection_uri = config.connection_uri().to_string();
215 let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216 install_provider(
217 ProviderKind::Postgres,
218 datasource_id.clone(),
219 |generation| {
220 let provider = PostgresProvider::connect(
221 &config,
222 MetadataCacheScope {
223 datasource_id: datasource_id.clone(),
224 generation,
225 },
226 )?;
227 Ok(Arc::new(provider))
228 },
229 )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233 init_mysql_provider_with_config(
234 MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235 )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239 let connection_uri = config.connection_uri().to_string();
240 let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241 install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242 let provider = MySqlProvider::connect(
243 &config,
244 MetadataCacheScope {
245 datasource_id: datasource_id.clone(),
246 generation,
247 },
248 )?;
249 Ok(Arc::new(provider))
250 })
251}
252
253pub fn current_generation() -> Option<u64> {
254 runtime()
255 .current_generation()
256 .map(|generation| generation.0)
257}
258
259pub fn runtime_snapshot() -> RuntimeSnapshot {
260 runtime().snapshot()
261}
262
263pub fn install_runtime_telemetry(
264 telemetry_impl: Arc<dyn KnowledgeTelemetry>,
265) -> Arc<dyn KnowledgeTelemetry> {
266 install_telemetry(telemetry_impl)
267}
268
269pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
270 runtime()
271 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
272 .map(QueryResponse::into_rows)
273}
274
275pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
276 runtime()
277 .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
278 .await
279 .map(QueryResponse::into_rows)
280}
281
282pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
283 runtime()
284 .execute(&QueryRequest::first_row(
285 sql,
286 Vec::new(),
287 CachePolicy::Bypass,
288 ))
289 .map(QueryResponse::into_row)
290}
291
292pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
293 runtime()
294 .execute_async(&QueryRequest::first_row(
295 sql,
296 Vec::new(),
297 CachePolicy::Bypass,
298 ))
299 .await
300 .map(QueryResponse::into_row)
301}
302
303pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
304 runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
305}
306
307pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
308 runtime()
309 .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
310 .await
311}
312
313pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
314 query_fields(sql, params)
315}
316
317pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
318 query_fields_async(sql, params).await
319}
320
321pub fn query_named<'a>(
322 sql: &str,
323 params: &'a [(&'a str, &'a dyn ToSql)],
324) -> KnowledgeResult<RowData> {
325 let fields = named_params_to_fields(params)?;
326 query_fields(sql, &fields)
327}
328
329pub fn cache_query_fields<const N: usize>(
330 sql: &str,
331 c_params: &[DataField; N],
332 query_params: &[DataField],
333 cache: &mut impl CacheAble<DataField, RowData, N>,
334) -> RowData {
335 cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
336}
337
338pub fn cache_query_fields_with_scope<const N: usize>(
339 sql: &str,
340 local_cache_scope: u64,
341 c_params: &[DataField; N],
342 query_params: &[DataField],
343 cache: &mut impl CacheAble<DataField, RowData, N>,
344) -> RowData {
345 if let Some(generation) = current_generation() {
346 cache.prepare_generation(generation);
347 }
348 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
349 runtime().record_local_cache_hit();
350 if telemetry_enabled() {
351 telemetry().on_cache(&CacheTelemetryEvent {
352 layer: CacheLayer::Local,
353 outcome: CacheOutcome::Hit,
354 provider_kind: runtime().current_provider_kind(),
355 });
356 }
357 return hit.clone();
358 }
359 runtime().record_local_cache_miss();
360 if telemetry_enabled() {
361 telemetry().on_cache(&CacheTelemetryEvent {
362 layer: CacheLayer::Local,
363 outcome: CacheOutcome::Miss,
364 provider_kind: runtime().current_provider_kind(),
365 });
366 }
367
368 match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
369 Ok(row) => {
370 cache.save_scoped(local_cache_scope, c_params, row.clone());
371 row
372 }
373 Err(err) => {
374 warn_kdb!("[kdb] query error: {}", err);
375 Vec::new()
376 }
377 }
378}
379
380pub async fn cache_query_fields_async<const N: usize>(
381 sql: &str,
382 c_params: &[DataField; N],
383 query_params: &[DataField],
384 cache: &mut impl CacheAble<DataField, RowData, N>,
385) -> RowData {
386 cache_query_fields_async_with_scope(
387 sql,
388 stable_hash(sql),
389 c_params,
390 || query_params.to_vec(),
391 cache,
392 )
393 .await
394}
395
396pub async fn cache_query_fields_async_with<const N: usize, F>(
397 sql: &str,
398 c_params: &[DataField; N],
399 build_query_params: F,
400 cache: &mut impl CacheAble<DataField, RowData, N>,
401) -> RowData
402where
403 F: FnOnce() -> Vec<DataField>,
404{
405 cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
406 .await
407}
408
409pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
410 sql: &str,
411 local_cache_scope: u64,
412 c_params: &[DataField; N],
413 build_query_params: F,
414 cache: &mut impl CacheAble<DataField, RowData, N>,
415) -> RowData
416where
417 F: FnOnce() -> Vec<DataField>,
418{
419 if let Some(generation) = current_generation() {
420 cache.prepare_generation(generation);
421 }
422 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
423 runtime().record_local_cache_hit();
424 if telemetry_enabled() {
425 telemetry().on_cache(&CacheTelemetryEvent {
426 layer: CacheLayer::Local,
427 outcome: CacheOutcome::Hit,
428 provider_kind: runtime().current_provider_kind(),
429 });
430 }
431 return hit.clone();
432 }
433 runtime().record_local_cache_miss();
434 if telemetry_enabled() {
435 telemetry().on_cache(&CacheTelemetryEvent {
436 layer: CacheLayer::Local,
437 outcome: CacheOutcome::Miss,
438 provider_kind: runtime().current_provider_kind(),
439 });
440 }
441
442 let query_params = build_query_params();
443 match runtime()
444 .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
445 .await
446 {
447 Ok(row) => {
448 cache.save_scoped(local_cache_scope, c_params, row.clone());
449 row
450 }
451 Err(err) => {
452 warn_kdb!("[kdb] query error: {}", err);
453 Vec::new()
454 }
455 }
456}
457
458pub fn cache_query<const N: usize>(
459 sql: &str,
460 c_params: &[DataField; N],
461 named_params: &[(&str, &dyn ToSql)],
462 cache: &mut impl CacheAble<DataField, RowData, N>,
463) -> RowData {
464 let query_fields = match named_params_to_fields(named_params) {
465 Ok(fields) => fields,
466 Err(err) => {
467 warn_kdb!("[kdb] query param conversion error: {}", err);
468 return Vec::new();
469 }
470 };
471
472 cache_query_fields(sql, c_params, &query_fields, cache)
473}
474
475pub async fn cache_query_async<const N: usize>(
476 sql: &str,
477 c_params: &[DataField; N],
478 named_params: &[(&str, &dyn ToSql)],
479 cache: &mut impl CacheAble<DataField, RowData, N>,
480) -> RowData {
481 let query_fields = match named_params_to_fields(named_params) {
482 Ok(fields) => fields,
483 Err(err) => {
484 warn_kdb!("[kdb] query param conversion error: {}", err);
485 return Vec::new();
486 }
487 };
488
489 cache_query_fields_async(sql, c_params, &query_fields, cache).await
490}
491
492fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
493 if let Ok(conn) = Connection::open_with_flags(
494 authority_uri,
495 OpenFlags::SQLITE_OPEN_READ_WRITE
496 | OpenFlags::SQLITE_OPEN_CREATE
497 | OpenFlags::SQLITE_OPEN_URI,
498 ) {
499 let _ = conn.execute_batch(
500 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
501 );
502 }
503 Ok(())
504}
505
506pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
507 ensure_wal(authority_uri)?;
508 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
509 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
510 init_mem_provider(mem)
511}
512
513pub fn init_thread_cloned_from_knowdb(
514 root: &Path,
515 knowdb_conf: &Path,
516 authority_uri: &str,
517 dict: &orion_variate::EnvDict,
518) -> KnowledgeResult<()> {
519 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
520 if let Some(provider) = conf.provider {
521 match provider.kind {
522 ProviderKind::Postgres => {
523 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
524 init_postgres_provider_with_config(
525 PostgresProviderConfig::new(provider.connection_uri)
526 .with_pool_size(provider.pool_size)
527 .with_min_connections(provider.min_connections)
528 .with_acquire_timeout_ms(provider.acquire_timeout_ms)
529 .with_idle_timeout_ms(provider.idle_timeout_ms)
530 .with_max_lifetime_ms(provider.max_lifetime_ms),
531 )?;
532 runtime().configure_result_cache(
533 conf.cache.enabled,
534 conf.cache.capacity,
535 Duration::from_millis(conf.cache.ttl_ms.max(1)),
536 );
537 return Ok(());
538 }
539 ProviderKind::Mysql => {
540 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
541 init_mysql_provider_with_config(
542 MySqlProviderConfig::new(provider.connection_uri)
543 .with_pool_size(provider.pool_size)
544 .with_min_connections(provider.min_connections)
545 .with_acquire_timeout_ms(provider.acquire_timeout_ms)
546 .with_idle_timeout_ms(provider.idle_timeout_ms)
547 .with_max_lifetime_ms(provider.max_lifetime_ms),
548 )?;
549 runtime().configure_result_cache(
550 conf.cache.enabled,
551 conf.cache.capacity,
552 Duration::from_millis(conf.cache.ttl_ms.max(1)),
553 );
554 return Ok(());
555 }
556 ProviderKind::SqliteAuthority => {}
557 }
558 }
559
560 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
561 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
562 let path_part = rest.split('?').next().unwrap_or(rest);
563 format!("file:{}?mode=ro&uri=true", path_part)
564 } else {
565 authority_uri.to_string()
566 };
567
568 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
569 init_thread_cloned_from_authority(&ro_uri)?;
570 runtime().configure_result_cache(
571 conf.cache.enabled,
572 conf.cache.capacity,
573 Duration::from_millis(conf.cache.ttl_ms.max(1)),
574 );
575 Ok(())
576}
577
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// Used to derive the local cache scope from SQL text: equal strings yield
/// equal scopes.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use crate::cache::FieldQueryCache;
588 use crate::error::Reason;
589 use crate::mem::memdb::MemDB;
590 use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
591 use crate::runtime::fields_to_params;
592 use crate::telemetry::{
593 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
594 ReloadTelemetryEvent, reset_telemetry,
595 };
596 use orion_error::UvsFrom;
597 use orion_error::conversion::ToStructError;
598 use orion_variate::EnvDict;
599 use std::fs;
600 use std::hint::black_box;
601 use std::path::PathBuf;
602 use std::sync::atomic::{AtomicU64, Ordering};
603 use std::time::{Duration, Instant};
604
605 #[derive(Default)]
606 struct TestTelemetry {
607 reload_success: AtomicU64,
608 reload_failure: AtomicU64,
609 local_hits: AtomicU64,
610 local_misses: AtomicU64,
611 result_hits: AtomicU64,
612 result_misses: AtomicU64,
613 metadata_hits: AtomicU64,
614 metadata_misses: AtomicU64,
615 query_success: AtomicU64,
616 query_failure: AtomicU64,
617 }
618
619 impl KnowledgeTelemetry for TestTelemetry {
620 fn on_cache(&self, event: &CacheTelemetryEvent) {
621 let counter = match (event.layer, event.outcome) {
622 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
623 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
624 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
625 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
626 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
627 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
628 };
629 counter.fetch_add(1, Ordering::Relaxed);
630 }
631
632 fn on_reload(&self, event: &ReloadTelemetryEvent) {
633 let counter = match event.outcome {
634 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
635 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
636 };
637 counter.fetch_add(1, Ordering::Relaxed);
638 }
639
640 fn on_query(&self, event: &QueryTelemetryEvent) {
641 let counter = if event.success {
642 &self.query_success
643 } else {
644 &self.query_failure
645 };
646 counter.fetch_add(1, Ordering::Relaxed);
647 }
648 }
649
650 fn perf_env_usize(key: &str, default: usize) -> usize {
651 std::env::var(key)
652 .ok()
653 .and_then(|value| value.parse::<usize>().ok())
654 .unwrap_or(default)
655 }
656
657 fn seed_perf_provider(rows: usize) {
658 let db = MemDB::instance();
659 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
660 .expect("create perf_kv");
661 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
662 for id in 1..=rows {
663 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
664 db.execute(sql.as_str()).expect("insert perf_kv row");
665 }
666 db.execute("COMMIT").expect("commit perf_kv load");
667 init_mem_provider(db).expect("init perf provider");
668 }
669
670 #[tokio::test(flavor = "current_thread")]
671 async fn query_async_works_with_mem_provider() {
672 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
673 let db = MemDB::instance();
674 db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
675 .expect("create async_kv");
676 db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
677 .expect("insert async_kv row");
678 init_mem_provider(db).expect("init mem provider");
679
680 let row = query_fields_async(
681 "SELECT value FROM async_kv WHERE id=:id",
682 &[DataField::from_digit(":id", 1)],
683 )
684 .await
685 .expect("query async row");
686 assert_eq!(row.len(), 1);
687 assert_eq!(row[0].to_string(), "chars(hello)");
688 }
689
690 #[derive(Clone)]
691 struct PerfQuery {
692 cache_key: [DataField; 1],
693 query_params: [DataField; 1],
694 bypass_req: QueryRequest,
695 global_req: QueryRequest,
696 }
697
698 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
699 (0..ops)
700 .map(|idx| {
701 let id = ((idx * 17) % hotset + 1) as i64;
702 let cache_key = [DataField::from_digit("id", id)];
703 let query_params = [DataField::from_digit(":id", id)];
704 let bypass_req = QueryRequest::first_row(
705 "SELECT value FROM perf_kv WHERE id=:id",
706 fields_to_params(&query_params),
707 CachePolicy::Bypass,
708 );
709 let global_req = QueryRequest::first_row(
710 "SELECT value FROM perf_kv WHERE id=:id",
711 fields_to_params(&query_params),
712 CachePolicy::UseGlobal,
713 );
714 PerfQuery {
715 cache_key,
716 query_params,
717 bypass_req,
718 global_req,
719 }
720 })
721 .collect()
722 }
723
724 #[derive(Debug, Clone, Copy)]
725 struct PerfCounters {
726 result_hits: u64,
727 result_misses: u64,
728 local_hits: u64,
729 local_misses: u64,
730 metadata_hits: u64,
731 metadata_misses: u64,
732 }
733
734 #[derive(Debug, Clone)]
735 struct PerfResult {
736 name: &'static str,
737 elapsed: Duration,
738 ops: usize,
739 counters: PerfCounters,
740 }
741
742 impl PerfResult {
743 fn qps(&self) -> f64 {
744 let secs = self.elapsed.as_secs_f64();
745 if secs == 0.0 {
746 self.ops as f64
747 } else {
748 self.ops as f64 / secs
749 }
750 }
751 }
752
753 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
754 PerfCounters {
755 result_hits: after
756 .result_cache_hits
757 .saturating_sub(before.result_cache_hits),
758 result_misses: after
759 .result_cache_misses
760 .saturating_sub(before.result_cache_misses),
761 local_hits: after
762 .local_cache_hits
763 .saturating_sub(before.local_cache_hits),
764 local_misses: after
765 .local_cache_misses
766 .saturating_sub(before.local_cache_misses),
767 metadata_hits: after
768 .metadata_cache_hits
769 .saturating_sub(before.metadata_cache_hits),
770 metadata_misses: after
771 .metadata_cache_misses
772 .saturating_sub(before.metadata_cache_misses),
773 }
774 }
775
776 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
777 seed_perf_provider(rows);
778 let before = runtime_snapshot();
779 let started = Instant::now();
780 for item in workload {
781 let row = runtime()
782 .execute(&item.bypass_req)
783 .expect("execute bypass request")
784 .into_row();
785 black_box(row);
786 }
787 let elapsed = started.elapsed();
788 let after = runtime_snapshot();
789 PerfResult {
790 name: "bypass",
791 elapsed,
792 ops: workload.len(),
793 counters: snapshot_delta(&before, &after),
794 }
795 }
796
797 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
798 seed_perf_provider(rows);
799 let before = runtime_snapshot();
800 let started = Instant::now();
801 for item in workload {
802 let row = runtime()
803 .execute(&item.global_req)
804 .expect("execute global-cache request")
805 .into_row();
806 black_box(row);
807 }
808 let elapsed = started.elapsed();
809 let after = runtime_snapshot();
810 PerfResult {
811 name: "global_cache",
812 elapsed,
813 ops: workload.len(),
814 counters: snapshot_delta(&before, &after),
815 }
816 }
817
818 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
819 seed_perf_provider(rows);
820 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
821 let before = runtime_snapshot();
822 let started = Instant::now();
823 for item in workload {
824 let row = cache_query_fields(
825 "SELECT value FROM perf_kv WHERE id=:id",
826 &item.cache_key,
827 &item.query_params,
828 &mut cache,
829 );
830 black_box(row);
831 }
832 let elapsed = started.elapsed();
833 let after = runtime_snapshot();
834 PerfResult {
835 name: "local_cache",
836 elapsed,
837 ops: workload.len(),
838 counters: snapshot_delta(&before, &after),
839 }
840 }
841
842 fn print_perf_result(result: &PerfResult) {
843 eprintln!(
844 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
845 result.name,
846 result.elapsed.as_millis(),
847 result.qps(),
848 result.counters.result_hits,
849 result.counters.result_misses,
850 result.counters.local_hits,
851 result.counters.local_misses,
852 result.counters.metadata_hits,
853 result.counters.metadata_misses,
854 );
855 }
856
857 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
858 use rand::{Rng, rng};
859 let rnd: u64 = rng().next_u64();
860 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
861 }
862
863 fn write_minimal_knowdb_with_cache(
864 root: &Path,
865 enabled: bool,
866 capacity: usize,
867 ttl_ms: u64,
868 ) -> std::path::PathBuf {
869 let models = root.join("models").join("knowledge");
870 let example_dir = models.join("example");
871 fs::create_dir_all(&example_dir).expect("create knowdb models/example");
872 fs::write(
873 models.join("knowdb.toml"),
874 format!(
875 r#"
876version = 2
877base_dir = "."
878
879[cache]
880enabled = {enabled}
881capacity = {capacity}
882ttl_ms = {ttl_ms}
883
884[csv]
885has_header = false
886
887[[tables]]
888name = "example"
889columns.by_index = [0,1]
890"#
891 ),
892 )
893 .expect("write knowdb.toml");
894 fs::write(
895 example_dir.join("create.sql"),
896 r#"
897CREATE TABLE IF NOT EXISTS {table} (
898 id INTEGER PRIMARY KEY,
899 value TEXT NOT NULL
900);
901"#,
902 )
903 .expect("write create.sql");
904 fs::write(
905 example_dir.join("insert.sql"),
906 "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
907 )
908 .expect("write insert.sql");
909 fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
910 models.join("knowdb.toml")
911 }
912
913 fn write_provider_only_knowdb_with_cache(
914 root: &Path,
915 provider_kind: &str,
916 connection_uri: &str,
917 enabled: bool,
918 capacity: usize,
919 ttl_ms: u64,
920 ) -> std::path::PathBuf {
921 let models = root.join("models").join("knowledge");
922 fs::create_dir_all(&models).expect("create knowdb models");
923 fs::write(
924 models.join("knowdb.toml"),
925 format!(
926 r#"
927version = 2
928base_dir = "."
929
930[cache]
931enabled = {enabled}
932capacity = {capacity}
933ttl_ms = {ttl_ms}
934
935[provider]
936kind = "{provider_kind}"
937connection_uri = "{connection_uri}"
938"#
939 ),
940 )
941 .expect("write provider knowdb.toml");
942 models.join("knowdb.toml")
943 }
944
945 fn restore_default_result_cache_config() {
946 runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
947 }
948
949 #[test]
950 fn provider_can_be_replaced() {
951 let _guard = crate::runtime::runtime_test_guard()
952 .lock()
953 .expect("provider test guard");
954 let db1 = MemDB::instance();
955 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
956 .expect("create table in db1");
957 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
958 .expect("seed db1");
959 init_mem_provider(db1).expect("init provider db1");
960 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
961 assert_eq!(row[0].to_string(), "chars(first)");
962
963 let db2 = MemDB::instance();
964 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
965 .expect("create table in db2");
966 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
967 .expect("seed db2");
968 init_mem_provider(db2).expect("replace provider with db2");
969 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
970 assert_eq!(row[0].to_string(), "chars(second)");
971 }
972
973 #[test]
974 fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
975 let _guard = crate::runtime::runtime_test_guard()
976 .lock()
977 .expect("provider test guard");
978 COLNAME_CACHE.write().expect("metadata cache lock").clear();
979
980 let db = MemDB::instance();
981 db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
982 .expect("create table");
983 db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
984 .expect("seed table");
985
986 let old_scope = MetadataCacheScope {
987 datasource_id: DatasourceId("sqlite:old".to_string()),
988 generation: Generation(1),
989 };
990 let new_scope = MetadataCacheScope {
991 datasource_id: DatasourceId("sqlite:new".to_string()),
992 generation: Generation(2),
993 };
994 let old_provider = MemProvider {
995 db: db.clone(),
996 metadata_scope: old_scope.clone(),
997 };
998
999 install_provider(
1000 ProviderKind::SqliteAuthority,
1001 new_scope.datasource_id.clone(),
1002 |_generation| {
1003 Ok(Arc::new(MemProvider {
1004 db: db.clone(),
1005 metadata_scope: new_scope.clone(),
1006 }))
1007 },
1008 )
1009 .expect("install new provider");
1010
1011 let row = old_provider
1012 .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1013 .expect("old provider query");
1014 assert_eq!(row[0].to_string(), "chars(scope-old)");
1015
1016 let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1017 assert!(cache.contains(&metadata_cache_key_for_scope(
1018 &old_scope,
1019 "SELECT value FROM cache_scope_t WHERE id = 1",
1020 )));
1021 assert!(!cache.contains(&metadata_cache_key_for_scope(
1022 &new_scope,
1023 "SELECT value FROM cache_scope_t WHERE id = 1",
1024 )));
1025 }
1026
1027 #[tokio::test(flavor = "current_thread")]
1028 async fn async_query_uses_runtime_bridge() {
1029 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1030 let db = MemDB::instance();
1031 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1032 .expect("create table");
1033 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1034 .expect("seed table");
1035 init_mem_provider(db).expect("init provider");
1036
1037 let row = query_row_async("SELECT value FROM t WHERE id = 1")
1038 .await
1039 .expect("async query row");
1040 assert_eq!(row[0].to_string(), "chars(async-first)");
1041 }
1042
1043 #[tokio::test(flavor = "current_thread")]
1044 async fn async_cache_query_fields_hits_local_cache() {
1045 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1046 let db = MemDB::instance();
1047 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1048 .expect("create table");
1049 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1050 .expect("seed table");
1051 init_mem_provider(db).expect("init provider");
1052
1053 let key = [DataField::from_digit("id", 1)];
1054 let params = [DataField::from_digit(":id", 1)];
1055 let mut cache = FieldQueryCache::default();
1056
1057 let first = cache_query_fields_async(
1058 "SELECT value FROM t WHERE id=:id",
1059 &key,
1060 ¶ms,
1061 &mut cache,
1062 )
1063 .await;
1064 let second = cache_query_fields_async(
1065 "SELECT value FROM t WHERE id=:id",
1066 &key,
1067 ¶ms,
1068 &mut cache,
1069 )
1070 .await;
1071
1072 assert_eq!(first[0].to_string(), "chars(async-cache)");
1073 assert_eq!(second[0].to_string(), "chars(async-cache)");
1074 }
1075
/// Replaces the installed provider between two identical lookups that share one
/// `FieldQueryCache`; the second lookup must return the replacement provider's row.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sql = "SELECT value FROM t WHERE id=:id";

    // First provider holds `first` for id 1.
    let initial_db = MemDB::instance();
    initial_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    initial_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(initial_db).expect("init provider db1");

    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let mut local = FieldQueryCache::default();

    let before_swap = cache_query_fields(sql, &cache_key, &bind, &mut local);
    assert_eq!(before_swap[0].to_string(), "chars(first)");

    // Second provider holds `second` for the same id; the same cache object is reused.
    let replacement_db = MemDB::instance();
    replacement_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");

    let after_swap = cache_query_fields(sql, &cache_key, &bind, &mut local);
    assert_eq!(after_swap[0].to_string(), "chars(second)");
}
1114
/// Issues two lookups that share the same key and parameters but target different tables;
/// each must return the row of the table named in its own SQL text.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Two tables with the same id but different values.
    let mem = MemDB::instance();
    let setup = [
        (
            "CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t1",
        ),
        (
            "CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t2",
        ),
        (
            "INSERT INTO t1 (id, value) VALUES (1, 'first')",
            "seed t1",
        ),
        (
            "INSERT INTO t2 (id, value) VALUES (1, 'second')",
            "seed t2",
        ),
    ];
    for (statement, failure) in setup {
        mem.execute(statement).expect(failure);
    }
    init_mem_provider(mem).expect("init provider");

    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let mut local = FieldQueryCache::default();

    let from_t1 = cache_query_fields(
        "SELECT value FROM t1 WHERE id=:id",
        &cache_key,
        &bind,
        &mut local,
    );
    assert_eq!(from_t1[0].to_string(), "chars(first)");

    let from_t2 = cache_query_fields(
        "SELECT value FROM t2 WHERE id=:id",
        &cache_key,
        &bind,
        &mut local,
    );
    assert_eq!(from_t2[0].to_string(), "chars(second)");
}
1151
/// After one cached lookup against a mem provider, the runtime snapshot must report the
/// provider kind, a generation, and cache lengths that stay within their capacities.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let mut local = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let sql = "SELECT value FROM t WHERE id=:id";
    let fetched = cache_query_fields(sql, &cache_key, &bind, &mut local);
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let snap = runtime_snapshot();

    // Provider identity and generation.
    assert!(matches!(
        snap.provider_kind,
        Some(ProviderKind::SqliteAuthority)
    ));
    assert!(snap.generation.is_some());

    // Cache occupancy never exceeds capacity; the lookup above left at least one result entry.
    assert!(snap.result_cache_len >= 1);
    assert!(snap.result_cache_len <= snap.result_cache_capacity);
    assert!(snap.metadata_cache_len <= snap.metadata_cache_capacity);

    assert!(snap.reload_successes >= 1);
}
1186
/// Runs the same SQL against two successively installed providers and checks that the
/// metadata cache length grows after each query.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sql = "SELECT value FROM t WHERE id = 1";
    let before = runtime_snapshot().metadata_cache_len;

    // First provider.
    let initial_db = MemDB::instance();
    initial_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    initial_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(initial_db).expect("init provider db1");

    let first_row = query_row(sql).expect("query db1");
    assert_eq!(first_row[0].to_string(), "chars(first)");
    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        before < after_first,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    // Replacement provider: same SQL text, different data.
    let replacement_db = MemDB::instance();
    replacement_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");

    let second_row = query_row(sql).expect("query db2");
    assert_eq!(second_row[0].to_string(), "chars(second)");
    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first < after_second,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
1223
/// A provider install whose factory returns an error must leave the previously installed
/// provider queryable and the current generation unchanged.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let working_db = MemDB::instance();
    working_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    working_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(working_db).expect("init provider db1");
    let before_generation = current_generation();

    // The factory closure always fails, so the install as a whole must report an error.
    let kind = ProviderKind::SqliteAuthority;
    let datasource = datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure");
    let reload_outcome = install_provider(kind, datasource, |_generation| {
        Err(Reason::from_logic()
            .to_err()
            .with_detail("expected reload failure"))
    });
    assert!(reload_outcome.is_err());

    let surviving = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(surviving[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), before_generation);
}
1252
/// Performs the same cached lookup twice and checks that the snapshot's local hit/miss,
/// result-miss and metadata-miss counters all increased.
#[test]
fn runtime_snapshot_records_cache_counters() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let before = runtime_snapshot();
    let mut local = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let sql = "SELECT value FROM t WHERE id=:id";

    // Two identical lookups through the same local cache.
    for _ in 0..2 {
        let fetched = cache_query_fields(sql, &cache_key, &bind, &mut local);
        assert_eq!(fetched[0].to_string(), "chars(first)");
    }

    let after = runtime_snapshot();
    assert!(before.local_cache_hits < after.local_cache_hits);
    assert!(before.local_cache_misses < after.local_cache_misses);
    assert!(before.result_cache_misses < after.result_cache_misses);
    assert!(before.metadata_cache_misses < after.metadata_cache_misses);
}
1290
/// Installs a `TestTelemetry` sink, drives a successful provider install, two cached
/// lookups and one failing install, then checks that every counter on the sink is non-zero.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let recorder = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(recorder.clone());

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let mut local = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let sql = "SELECT value FROM t WHERE id=:id";

    // Two identical lookups through the same local cache.
    for _ in 0..2 {
        let fetched = cache_query_fields(sql, &cache_key, &bind, &mut local);
        assert_eq!(fetched[0].to_string(), "chars(first)");
    }

    // A factory that always fails, to produce a reload-failure event.
    let kind = ProviderKind::SqliteAuthority;
    let datasource = datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure");
    let reload_outcome = install_provider(kind, datasource, |_generation| {
        Err(Reason::from_logic()
            .to_err()
            .with_detail("expected telemetry reload failure"))
    });
    assert!(reload_outcome.is_err());

    // Put the previous sink back before inspecting what the test sink recorded.
    install_runtime_telemetry(previous);
    reset_telemetry();

    let sink: &TestTelemetry = &recorder;
    assert!(sink.reload_success.load(Ordering::Relaxed) > 0);
    assert!(sink.reload_failure.load(Ordering::Relaxed) > 0);
    assert!(sink.local_hits.load(Ordering::Relaxed) > 0);
    assert!(sink.local_misses.load(Ordering::Relaxed) > 0);
    assert!(sink.result_misses.load(Ordering::Relaxed) > 0);
    assert!(sink.metadata_misses.load(Ordering::Relaxed) > 0);
    assert!(sink.query_success.load(Ordering::Relaxed) > 0);
}
1346
/// Manual performance comparison of three runs (bypass, global cache, local cache) over the
/// same workload. Sizes come from the `WP_KDB_PERF_ROWS`, `WP_KDB_PERF_OPS` and
/// `WP_KDB_PERF_HOTSET` settings with the defaults given below.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Every size is forced to at least 1; the hot set can never exceed the row count.
    let row_count = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let op_count = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    let hot_keys = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, row_count);
    let workload = build_perf_workload(op_count, hot_keys);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        row_count, op_count, hot_keys
    );

    let bypass = run_bypass_perf(&workload, row_count);
    let global = run_global_cache_perf(&workload, row_count);
    let local = run_local_cache_perf(&workload, row_count);

    print_perf_result(&bypass);
    print_perf_result(&global);
    print_perf_result(&local);

    let baseline_secs = bypass.elapsed.as_secs_f64();
    let global_speedup = baseline_secs / global.elapsed.as_secs_f64();
    let local_speedup = baseline_secs / local.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        global_speedup, local_speedup,
    );

    // Bypass run: no cache layer may record anything.
    let bypass_counters = &bypass.counters;
    assert_eq!(bypass_counters.result_hits, 0);
    assert_eq!(bypass_counters.result_misses, 0);
    assert_eq!(bypass_counters.local_hits, 0);
    assert_eq!(bypass_counters.local_misses, 0);

    // Global run: only the result cache is exercised.
    let global_counters = &global.counters;
    assert!(global_counters.result_hits > 0);
    assert!(global_counters.result_misses > 0);
    assert_eq!(global_counters.local_hits, 0);
    assert_eq!(global_counters.local_misses, 0);

    // Local run: the local cache sees hits and misses, and result-cache misses are recorded too.
    let local_counters = &local.counters;
    assert!(local_counters.local_hits > 0);
    assert!(local_counters.local_misses > 0);
    assert!(local_counters.result_misses > 0);
}
1389
/// Initialises from a knowdb config written with the result cache enabled, capacity 7 and
/// TTL 5 ms, checks that the runtime snapshot reflects those values, and checks that
/// repeated queries record a cache hit and at least two misses (first query and the query
/// issued after the entry has expired).
///
/// Cleanup (restoring the default result-cache config and removing the temp directory) is
/// performed by a drop guard so it also runs when an assertion fails; otherwise the
/// non-default cache config would stay installed for tests that run afterwards.
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    /// Restores the default result-cache config and removes the temp directory on drop.
    struct Cleanup<'a>(&'a std::path::Path);

    impl Drop for Cleanup<'_> {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            // Best effort: a leftover temp directory must not fail the test.
            let _ = std::fs::remove_dir_all(self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`: locals drop in reverse declaration order, so the cleanup
    // runs while the test guard is still held.
    let _cleanup = Cleanup(&root);

    let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    let snapshot = runtime_snapshot();
    assert!(snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 7);
    assert_eq!(snapshot.result_cache_ttl_ms, 5);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let row = runtime()
        .execute(&req)
        .expect("first result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // NOTE(review): the hit assertion below assumes this second query runs within the
    // 5 ms TTL of the first one — potentially timing-sensitive on a very slow machine.
    let row = runtime()
        .execute(&req)
        .expect("second result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // Sleep past the 5 ms TTL so the next query finds the entry expired.
    std::thread::sleep(Duration::from_millis(12));
    let row = runtime()
        .execute(&req)
        .expect("expired result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let after = runtime_snapshot();

    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);
}
1440
/// Initialises from a knowdb config written with the result cache disabled (capacity 3,
/// TTL 30 000 ms), checks that the runtime snapshot reflects those values, and checks that
/// two `CachePolicy::UseGlobal` queries change neither the result-cache hit counter nor the
/// miss counter.
///
/// Cleanup (restoring the default result-cache config and removing the temp directory) is
/// performed by a drop guard so it also runs when an assertion fails; otherwise the result
/// cache would stay disabled for tests that run afterwards.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    /// Restores the default result-cache config and removes the temp directory on drop.
    struct Cleanup<'a>(&'a std::path::Path);

    impl Drop for Cleanup<'_> {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            // Best effort: a leftover temp directory must not fail the test.
            let _ = std::fs::remove_dir_all(self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`: locals drop in reverse declaration order, so the cleanup
    // runs while the test guard is still held.
    let _cleanup = Cleanup(&root);

    let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    let snapshot = runtime_snapshot();
    assert!(!snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 3);
    assert_eq!(snapshot.result_cache_ttl_ms, 30_000);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let _ = runtime()
        .execute(&req)
        .expect("first bypassed result-cache query");
    let _ = runtime()
        .execute(&req)
        .expect("second bypassed result-cache query");
    let after = runtime_snapshot();

    // With the cache disabled, neither counter may move.
    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);
}
1481
/// With a working mem provider installed, attempts a knowdb init whose config names a
/// `mysql` provider with an unusable URL. The init must fail, the result-cache settings in
/// the snapshot must be unchanged, and the mem provider must still answer queries.
#[test]
fn failed_knowdb_provider_init_does_not_apply_cache_config() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    restore_default_result_cache_config();

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let before = runtime_snapshot();

    // Config whose cache values (disabled, 3, 5) must not be applied when the init fails.
    let root = uniq_cache_cfg_tmp_dir();
    let conf_path = write_provider_only_knowdb_with_cache(
        &root,
        "mysql",
        "not-a-valid-mysql-url",
        false,
        3,
        5,
    );
    let unused_db = root.join("unused.sqlite");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", unused_db.display());

    let init_outcome =
        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
    assert!(init_outcome.is_err());

    let after = runtime_snapshot();
    assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
    assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
    assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);

    let surviving = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(surviving[0].to_string(), "chars(first)");

    let _ = fs::remove_dir_all(&root);
}
1525}