1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::hash_map::DefaultHasher,
6 hash::{Hash, Hasher},
7};
8
9use async_trait::async_trait;
10use rusqlite::ToSql;
11use rusqlite::{Connection, OpenFlags};
12use wp_error::KnowledgeResult;
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25 CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26 QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30 telemetry, telemetry_enabled,
31};
32
/// Provider that answers queries from a [`MemDB`] handle.
///
/// Each query is forwarded to the `*_with_scope` methods of `db` together
/// with `metadata_scope`.
struct MemProvider {
    /// Database handle the queries are executed against.
    db: MemDB,
    /// Datasource id + generation pair handed to every scoped query.
    metadata_scope: MetadataCacheScope,
}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41 ThreadClonedMDB::query_with_scope(self, sql)
42 }
43
44 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45 ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46 }
47
48 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49 ThreadClonedMDB::query_row_with_scope(self, sql)
50 }
51
52 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53 ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54 }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60 self.db.query_with_scope(&self.metadata_scope, sql)
61 }
62
63 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64 self.db
65 .query_fields_with_scope(&self.metadata_scope, sql, params)
66 }
67
68 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69 self.db.query_row_with_scope(&self.metadata_scope, sql)
70 }
71
72 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73 self.db
74 .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75 }
76}
77
/// Exposes [`PostgresProvider`] through the runtime's provider interface.
///
/// Every method forwards to the same-named function reached through the
/// `PostgresProvider::` path.
// NOTE(review): the fully-qualified paths presumably select the provider's
// own methods rather than these trait methods (which would recurse) — confirm
// before switching to method-call syntax.
#[async_trait]
impl ProviderExecutor for PostgresProvider {
    /// Forwards to `PostgresProvider::query`.
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query(self, sql)
    }

    /// Forwards to `PostgresProvider::query_fields`.
    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields(self, sql, params)
    }

    /// Forwards to `PostgresProvider::query_row`.
    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row(self, sql)
    }

    /// Forwards to `PostgresProvider::query_named_fields`.
    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields(self, sql, params)
    }

    /// Awaits `PostgresProvider::query_async`.
    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_async(self, sql).await
    }

    /// Awaits `PostgresProvider::query_fields_async`.
    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields_async(self, sql, params).await
    }

    /// Awaits `PostgresProvider::query_row_async`.
    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row_async(self, sql).await
    }

    /// Awaits `PostgresProvider::query_named_fields_async`.
    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields_async(self, sql, params).await
    }
}
120
/// Exposes [`MySqlProvider`] through the runtime's provider interface.
///
/// Every method forwards to the same-named function reached through the
/// `MySqlProvider::` path.
// NOTE(review): the fully-qualified paths presumably select the provider's
// own methods rather than these trait methods (which would recurse) — confirm
// before switching to method-call syntax.
#[async_trait]
impl ProviderExecutor for MySqlProvider {
    /// Forwards to `MySqlProvider::query`.
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query(self, sql)
    }

    /// Forwards to `MySqlProvider::query_fields`.
    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields(self, sql, params)
    }

    /// Forwards to `MySqlProvider::query_row`.
    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row(self, sql)
    }

    /// Forwards to `MySqlProvider::query_named_fields`.
    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields(self, sql, params)
    }

    /// Awaits `MySqlProvider::query_async`.
    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_async(self, sql).await
    }

    /// Awaits `MySqlProvider::query_fields_async`.
    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields_async(self, sql, params).await
    }

    /// Awaits `MySqlProvider::query_row_async`.
    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row_async(self, sql).await
    }

    /// Awaits `MySqlProvider::query_named_fields_async`.
    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields_async(self, sql, params).await
    }
}
163
164fn install_provider<F>(
165 kind: ProviderKind,
166 datasource_id: DatasourceId,
167 build: F,
168) -> KnowledgeResult<()>
169where
170 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172 runtime().install_provider(kind, datasource_id, build)?;
173 Ok(())
174}
175
/// Builds the [`DatasourceId`] for `kind` from `seed` via
/// `DatasourceId::from_seed`.
fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
    DatasourceId::from_seed(kind, seed)
}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183 Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184 authority_uri,
185 datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186 generation.0,
187 )))
188 })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192 install_provider(
193 ProviderKind::SqliteAuthority,
194 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195 |generation| {
196 Ok(Arc::new(MemProvider {
197 db: memdb,
198 metadata_scope: MetadataCacheScope {
199 datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200 generation,
201 },
202 }))
203 },
204 )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208 init_postgres_provider_with_config(
209 PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210 )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214 let connection_uri = config.connection_uri().to_string();
215 let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216 install_provider(
217 ProviderKind::Postgres,
218 datasource_id.clone(),
219 |generation| {
220 let provider = PostgresProvider::connect(
221 &config,
222 MetadataCacheScope {
223 datasource_id: datasource_id.clone(),
224 generation,
225 },
226 )?;
227 Ok(Arc::new(provider))
228 },
229 )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233 init_mysql_provider_with_config(
234 MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235 )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239 let connection_uri = config.connection_uri().to_string();
240 let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241 install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242 let provider = MySqlProvider::connect(
243 &config,
244 MetadataCacheScope {
245 datasource_id: datasource_id.clone(),
246 generation,
247 },
248 )?;
249 Ok(Arc::new(provider))
250 })
251}
252
253pub fn current_generation() -> Option<u64> {
254 runtime()
255 .current_generation()
256 .map(|generation| generation.0)
257}
258
/// Returns the value produced by the global runtime's `snapshot()`.
pub fn runtime_snapshot() -> RuntimeSnapshot {
    runtime().snapshot()
}
262
/// Installs `telemetry_impl` via `install_telemetry` and returns whatever that
/// call returns.
// NOTE(review): the returned handle is presumably the previously installed
// telemetry implementation — confirm against `install_telemetry`.
pub fn install_runtime_telemetry(
    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
) -> Arc<dyn KnowledgeTelemetry> {
    install_telemetry(telemetry_impl)
}
268
269pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
270 runtime()
271 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
272 .map(QueryResponse::into_rows)
273}
274
275pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
276 runtime()
277 .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
278 .await
279 .map(QueryResponse::into_rows)
280}
281
282pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
283 runtime()
284 .execute(&QueryRequest::first_row(
285 sql,
286 Vec::new(),
287 CachePolicy::Bypass,
288 ))
289 .map(QueryResponse::into_row)
290}
291
292pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
293 runtime()
294 .execute_async(&QueryRequest::first_row(
295 sql,
296 Vec::new(),
297 CachePolicy::Bypass,
298 ))
299 .await
300 .map(QueryResponse::into_row)
301}
302
/// Executes `sql` with `params` through the runtime's
/// `execute_first_row_fields`, using `CachePolicy::Bypass`.
pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
}
306
/// Async counterpart of [`query_fields`]: awaits the runtime's
/// `execute_first_row_fields_async` with `CachePolicy::Bypass`.
pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime()
        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
        .await
}
312
/// Alias of [`query_fields`]; it forwards `sql` and `params` unchanged.
pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields(sql, params)
}
316
/// Alias of [`query_fields_async`]; it forwards `sql` and `params` unchanged.
pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields_async(sql, params).await
}
320
321pub fn query_named<'a>(
322 sql: &str,
323 params: &'a [(&'a str, &'a dyn ToSql)],
324) -> KnowledgeResult<RowData> {
325 let fields = named_params_to_fields(params)?;
326 query_fields(sql, &fields)
327}
328
329pub fn cache_query_fields<const N: usize>(
330 sql: &str,
331 c_params: &[DataField; N],
332 query_params: &[DataField],
333 cache: &mut impl CacheAble<DataField, RowData, N>,
334) -> RowData {
335 cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
336}
337
338pub fn cache_query_fields_with_scope<const N: usize>(
339 sql: &str,
340 local_cache_scope: u64,
341 c_params: &[DataField; N],
342 query_params: &[DataField],
343 cache: &mut impl CacheAble<DataField, RowData, N>,
344) -> RowData {
345 if let Some(generation) = current_generation() {
346 cache.prepare_generation(generation);
347 }
348 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
349 runtime().record_local_cache_hit();
350 if telemetry_enabled() {
351 telemetry().on_cache(&CacheTelemetryEvent {
352 layer: CacheLayer::Local,
353 outcome: CacheOutcome::Hit,
354 provider_kind: runtime().current_provider_kind(),
355 });
356 }
357 return hit.clone();
358 }
359 runtime().record_local_cache_miss();
360 if telemetry_enabled() {
361 telemetry().on_cache(&CacheTelemetryEvent {
362 layer: CacheLayer::Local,
363 outcome: CacheOutcome::Miss,
364 provider_kind: runtime().current_provider_kind(),
365 });
366 }
367
368 match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
369 Ok(row) => {
370 cache.save_scoped(local_cache_scope, c_params, row.clone());
371 row
372 }
373 Err(err) => {
374 warn_kdb!("[kdb] query error: {}", err);
375 Vec::new()
376 }
377 }
378}
379
380pub async fn cache_query_fields_async<const N: usize>(
381 sql: &str,
382 c_params: &[DataField; N],
383 query_params: &[DataField],
384 cache: &mut impl CacheAble<DataField, RowData, N>,
385) -> RowData {
386 cache_query_fields_async_with_scope(
387 sql,
388 stable_hash(sql),
389 c_params,
390 || query_params.to_vec(),
391 cache,
392 )
393 .await
394}
395
396pub async fn cache_query_fields_async_with<const N: usize, F>(
397 sql: &str,
398 c_params: &[DataField; N],
399 build_query_params: F,
400 cache: &mut impl CacheAble<DataField, RowData, N>,
401) -> RowData
402where
403 F: FnOnce() -> Vec<DataField>,
404{
405 cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
406 .await
407}
408
409pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
410 sql: &str,
411 local_cache_scope: u64,
412 c_params: &[DataField; N],
413 build_query_params: F,
414 cache: &mut impl CacheAble<DataField, RowData, N>,
415) -> RowData
416where
417 F: FnOnce() -> Vec<DataField>,
418{
419 if let Some(generation) = current_generation() {
420 cache.prepare_generation(generation);
421 }
422 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
423 runtime().record_local_cache_hit();
424 if telemetry_enabled() {
425 telemetry().on_cache(&CacheTelemetryEvent {
426 layer: CacheLayer::Local,
427 outcome: CacheOutcome::Hit,
428 provider_kind: runtime().current_provider_kind(),
429 });
430 }
431 return hit.clone();
432 }
433 runtime().record_local_cache_miss();
434 if telemetry_enabled() {
435 telemetry().on_cache(&CacheTelemetryEvent {
436 layer: CacheLayer::Local,
437 outcome: CacheOutcome::Miss,
438 provider_kind: runtime().current_provider_kind(),
439 });
440 }
441
442 let query_params = build_query_params();
443 match runtime()
444 .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
445 .await
446 {
447 Ok(row) => {
448 cache.save_scoped(local_cache_scope, c_params, row.clone());
449 row
450 }
451 Err(err) => {
452 warn_kdb!("[kdb] query error: {}", err);
453 Vec::new()
454 }
455 }
456}
457
458pub fn cache_query<const N: usize>(
459 sql: &str,
460 c_params: &[DataField; N],
461 named_params: &[(&str, &dyn ToSql)],
462 cache: &mut impl CacheAble<DataField, RowData, N>,
463) -> RowData {
464 let query_fields = match named_params_to_fields(named_params) {
465 Ok(fields) => fields,
466 Err(err) => {
467 warn_kdb!("[kdb] query param conversion error: {}", err);
468 return Vec::new();
469 }
470 };
471
472 cache_query_fields(sql, c_params, &query_fields, cache)
473}
474
475pub async fn cache_query_async<const N: usize>(
476 sql: &str,
477 c_params: &[DataField; N],
478 named_params: &[(&str, &dyn ToSql)],
479 cache: &mut impl CacheAble<DataField, RowData, N>,
480) -> RowData {
481 let query_fields = match named_params_to_fields(named_params) {
482 Ok(fields) => fields,
483 Err(err) => {
484 warn_kdb!("[kdb] query param conversion error: {}", err);
485 return Vec::new();
486 }
487 };
488
489 cache_query_fields_async(sql, c_params, &query_fields, cache).await
490}
491
492fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
493 if let Ok(conn) = Connection::open_with_flags(
494 authority_uri,
495 OpenFlags::SQLITE_OPEN_READ_WRITE
496 | OpenFlags::SQLITE_OPEN_CREATE
497 | OpenFlags::SQLITE_OPEN_URI,
498 ) {
499 let _ = conn.execute_batch(
500 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
501 );
502 }
503 Ok(())
504}
505
506pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
507 ensure_wal(authority_uri)?;
508 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
509 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
510 init_mem_provider(mem)
511}
512
513pub fn init_thread_cloned_from_knowdb(
514 root: &Path,
515 knowdb_conf: &Path,
516 authority_uri: &str,
517 dict: &orion_variate::EnvDict,
518) -> KnowledgeResult<()> {
519 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
520 if let Some(provider) = conf.provider {
521 match provider.kind {
522 ProviderKind::Postgres => {
523 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
524 init_postgres_provider_with_config(
525 PostgresProviderConfig::new(provider.connection_uri)
526 .with_pool_size(provider.pool_size)
527 .with_min_connections(provider.min_connections)
528 .with_acquire_timeout_ms(provider.acquire_timeout_ms)
529 .with_idle_timeout_ms(provider.idle_timeout_ms)
530 .with_max_lifetime_ms(provider.max_lifetime_ms),
531 )?;
532 runtime().configure_result_cache(
533 conf.cache.enabled,
534 conf.cache.capacity,
535 Duration::from_millis(conf.cache.ttl_ms.max(1)),
536 );
537 return Ok(());
538 }
539 ProviderKind::Mysql => {
540 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
541 init_mysql_provider_with_config(
542 MySqlProviderConfig::new(provider.connection_uri)
543 .with_pool_size(provider.pool_size)
544 .with_min_connections(provider.min_connections)
545 .with_acquire_timeout_ms(provider.acquire_timeout_ms)
546 .with_idle_timeout_ms(provider.idle_timeout_ms)
547 .with_max_lifetime_ms(provider.max_lifetime_ms),
548 )?;
549 runtime().configure_result_cache(
550 conf.cache.enabled,
551 conf.cache.capacity,
552 Duration::from_millis(conf.cache.ttl_ms.max(1)),
553 );
554 return Ok(());
555 }
556 ProviderKind::SqliteAuthority => {}
557 }
558 }
559
560 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
561 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
562 let path_part = rest.split('?').next().unwrap_or(rest);
563 format!("file:{}?mode=ro&uri=true", path_part)
564 } else {
565 authority_uri.to_string()
566 };
567
568 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
569 init_thread_cloned_from_authority(&ro_uri)?;
570 runtime().configure_result_cache(
571 conf.cache.enabled,
572 conf.cache.capacity,
573 Duration::from_millis(conf.cache.ttl_ms.max(1)),
574 );
575 Ok(())
576}
577
/// Hashes `value` with a freshly constructed [`DefaultHasher`].
///
/// The hasher starts from the same initial state on every call, so equal
/// strings hash to the same value within one build of the program.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::default();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use crate::cache::FieldQueryCache;
588 use crate::mem::memdb::MemDB;
589 use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
590 use crate::runtime::fields_to_params;
591 use crate::telemetry::{
592 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
593 ReloadTelemetryEvent, reset_telemetry,
594 };
595 use orion_error::{ToStructError, UvsFrom};
596 use orion_variate::EnvDict;
597 use std::fs;
598 use std::hint::black_box;
599 use std::path::PathBuf;
600 use std::sync::atomic::{AtomicU64, Ordering};
601 use std::time::{Duration, Instant};
602 use wp_error::KnowledgeReason;
603
    /// Telemetry sink for tests that counts the events it receives, one
    /// atomic counter per event category.
    #[derive(Default)]
    struct TestTelemetry {
        // Reload events, split by outcome.
        reload_success: AtomicU64,
        reload_failure: AtomicU64,
        // Cache events, split by layer (local / result / metadata) and outcome.
        local_hits: AtomicU64,
        local_misses: AtomicU64,
        result_hits: AtomicU64,
        result_misses: AtomicU64,
        metadata_hits: AtomicU64,
        metadata_misses: AtomicU64,
        // Query events, split by the event's `success` flag.
        query_success: AtomicU64,
        query_failure: AtomicU64,
    }
617
618 impl KnowledgeTelemetry for TestTelemetry {
619 fn on_cache(&self, event: &CacheTelemetryEvent) {
620 let counter = match (event.layer, event.outcome) {
621 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
622 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
623 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
624 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
625 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
626 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
627 };
628 counter.fetch_add(1, Ordering::Relaxed);
629 }
630
631 fn on_reload(&self, event: &ReloadTelemetryEvent) {
632 let counter = match event.outcome {
633 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
634 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
635 };
636 counter.fetch_add(1, Ordering::Relaxed);
637 }
638
639 fn on_query(&self, event: &QueryTelemetryEvent) {
640 let counter = if event.success {
641 &self.query_success
642 } else {
643 &self.query_failure
644 };
645 counter.fetch_add(1, Ordering::Relaxed);
646 }
647 }
648
649 fn perf_env_usize(key: &str, default: usize) -> usize {
650 std::env::var(key)
651 .ok()
652 .and_then(|value| value.parse::<usize>().ok())
653 .unwrap_or(default)
654 }
655
656 fn seed_perf_provider(rows: usize) {
657 let db = MemDB::instance();
658 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
659 .expect("create perf_kv");
660 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
661 for id in 1..=rows {
662 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
663 db.execute(sql.as_str()).expect("insert perf_kv row");
664 }
665 db.execute("COMMIT").expect("commit perf_kv load");
666 init_mem_provider(db).expect("init perf provider");
667 }
668
669 #[tokio::test(flavor = "current_thread")]
670 async fn query_async_works_with_mem_provider() {
671 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
672 let db = MemDB::instance();
673 db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
674 .expect("create async_kv");
675 db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
676 .expect("insert async_kv row");
677 init_mem_provider(db).expect("init mem provider");
678
679 let row = query_fields_async(
680 "SELECT value FROM async_kv WHERE id=:id",
681 &[DataField::from_digit(":id", 1)],
682 )
683 .await
684 .expect("query async row");
685 assert_eq!(row.len(), 1);
686 assert_eq!(row[0].to_string(), "chars(hello)");
687 }
688
    /// One pre-built lookup of the perf workload, carrying everything each
    /// perf scenario needs so that nothing is constructed inside the timed
    /// loops.
    #[derive(Clone)]
    struct PerfQuery {
        /// Key used for the local cache (`id` field).
        cache_key: [DataField; 1],
        /// Bound SQL parameter (`:id` field).
        query_params: [DataField; 1],
        /// First-row request built with `CachePolicy::Bypass`.
        bypass_req: QueryRequest,
        /// First-row request built with `CachePolicy::UseGlobal`.
        global_req: QueryRequest,
    }
696
697 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
698 (0..ops)
699 .map(|idx| {
700 let id = ((idx * 17) % hotset + 1) as i64;
701 let cache_key = [DataField::from_digit("id", id)];
702 let query_params = [DataField::from_digit(":id", id)];
703 let bypass_req = QueryRequest::first_row(
704 "SELECT value FROM perf_kv WHERE id=:id",
705 fields_to_params(&query_params),
706 CachePolicy::Bypass,
707 );
708 let global_req = QueryRequest::first_row(
709 "SELECT value FROM perf_kv WHERE id=:id",
710 fields_to_params(&query_params),
711 CachePolicy::UseGlobal,
712 );
713 PerfQuery {
714 cache_key,
715 query_params,
716 bypass_req,
717 global_req,
718 }
719 })
720 .collect()
721 }
722
723 #[derive(Debug, Clone, Copy)]
724 struct PerfCounters {
725 result_hits: u64,
726 result_misses: u64,
727 local_hits: u64,
728 local_misses: u64,
729 metadata_hits: u64,
730 metadata_misses: u64,
731 }
732
733 #[derive(Debug, Clone)]
734 struct PerfResult {
735 name: &'static str,
736 elapsed: Duration,
737 ops: usize,
738 counters: PerfCounters,
739 }
740
741 impl PerfResult {
742 fn qps(&self) -> f64 {
743 let secs = self.elapsed.as_secs_f64();
744 if secs == 0.0 {
745 self.ops as f64
746 } else {
747 self.ops as f64 / secs
748 }
749 }
750 }
751
752 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
753 PerfCounters {
754 result_hits: after
755 .result_cache_hits
756 .saturating_sub(before.result_cache_hits),
757 result_misses: after
758 .result_cache_misses
759 .saturating_sub(before.result_cache_misses),
760 local_hits: after
761 .local_cache_hits
762 .saturating_sub(before.local_cache_hits),
763 local_misses: after
764 .local_cache_misses
765 .saturating_sub(before.local_cache_misses),
766 metadata_hits: after
767 .metadata_cache_hits
768 .saturating_sub(before.metadata_cache_hits),
769 metadata_misses: after
770 .metadata_cache_misses
771 .saturating_sub(before.metadata_cache_misses),
772 }
773 }
774
775 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
776 seed_perf_provider(rows);
777 let before = runtime_snapshot();
778 let started = Instant::now();
779 for item in workload {
780 let row = runtime()
781 .execute(&item.bypass_req)
782 .expect("execute bypass request")
783 .into_row();
784 black_box(row);
785 }
786 let elapsed = started.elapsed();
787 let after = runtime_snapshot();
788 PerfResult {
789 name: "bypass",
790 elapsed,
791 ops: workload.len(),
792 counters: snapshot_delta(&before, &after),
793 }
794 }
795
796 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
797 seed_perf_provider(rows);
798 let before = runtime_snapshot();
799 let started = Instant::now();
800 for item in workload {
801 let row = runtime()
802 .execute(&item.global_req)
803 .expect("execute global-cache request")
804 .into_row();
805 black_box(row);
806 }
807 let elapsed = started.elapsed();
808 let after = runtime_snapshot();
809 PerfResult {
810 name: "global_cache",
811 elapsed,
812 ops: workload.len(),
813 counters: snapshot_delta(&before, &after),
814 }
815 }
816
817 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
818 seed_perf_provider(rows);
819 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
820 let before = runtime_snapshot();
821 let started = Instant::now();
822 for item in workload {
823 let row = cache_query_fields(
824 "SELECT value FROM perf_kv WHERE id=:id",
825 &item.cache_key,
826 &item.query_params,
827 &mut cache,
828 );
829 black_box(row);
830 }
831 let elapsed = started.elapsed();
832 let after = runtime_snapshot();
833 PerfResult {
834 name: "local_cache",
835 elapsed,
836 ops: workload.len(),
837 counters: snapshot_delta(&before, &after),
838 }
839 }
840
841 fn print_perf_result(result: &PerfResult) {
842 eprintln!(
843 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
844 result.name,
845 result.elapsed.as_millis(),
846 result.qps(),
847 result.counters.result_hits,
848 result.counters.result_misses,
849 result.counters.local_hits,
850 result.counters.local_misses,
851 result.counters.metadata_hits,
852 result.counters.metadata_misses,
853 );
854 }
855
856 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
857 use rand::{Rng, rng};
858 let rnd: u64 = rng().next_u64();
859 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
860 }
861
    /// Writes a minimal knowdb layout under `root/models/knowledge` and
    /// returns the path of the generated `knowdb.toml`.
    ///
    /// The layout has one table, `example`, with `create.sql`, `insert.sql`
    /// and a one-row `data.csv`. The `[cache]` section of `knowdb.toml` is
    /// filled from `enabled`, `capacity` and `ttl_ms`.
    fn write_minimal_knowdb_with_cache(
        root: &Path,
        enabled: bool,
        capacity: usize,
        ttl_ms: u64,
    ) -> std::path::PathBuf {
        let models = root.join("models").join("knowledge");
        let example_dir = models.join("example");
        fs::create_dir_all(&example_dir).expect("create knowdb models/example");
        // Main configuration; the cache settings are interpolated by name.
        fs::write(
            models.join("knowdb.toml"),
            format!(
                r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[csv]
has_header = false

[[tables]]
name = "example"
columns.by_index = [0,1]
"#
            ),
        )
        .expect("write knowdb.toml");
        // Plain (non-`format!`) strings: `{table}` is written to disk literally.
        fs::write(
            example_dir.join("create.sql"),
            r#"
CREATE TABLE IF NOT EXISTS {table} (
 id INTEGER PRIMARY KEY,
 value TEXT NOT NULL
);
"#,
        )
        .expect("write create.sql");
        fs::write(
            example_dir.join("insert.sql"),
            "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
        )
        .expect("write insert.sql");
        fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
        models.join("knowdb.toml")
    }
911
    /// Writes a `knowdb.toml` under `root/models/knowledge` that contains only
    /// a `[cache]` and a `[provider]` section (no tables) and returns its
    /// path.
    ///
    /// `provider_kind` and `connection_uri` are inserted verbatim between
    /// double quotes, so they must not contain characters that would break a
    /// TOML basic string.
    fn write_provider_only_knowdb_with_cache(
        root: &Path,
        provider_kind: &str,
        connection_uri: &str,
        enabled: bool,
        capacity: usize,
        ttl_ms: u64,
    ) -> std::path::PathBuf {
        let models = root.join("models").join("knowledge");
        fs::create_dir_all(&models).expect("create knowdb models");
        fs::write(
            models.join("knowdb.toml"),
            format!(
                r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[provider]
kind = "{provider_kind}"
connection_uri = "{connection_uri}"
"#
            ),
        )
        .expect("write provider knowdb.toml");
        models.join("knowdb.toml")
    }
943
    /// Reconfigures the runtime's result cache to enabled, capacity 1024 and a
    /// 30 000 ms TTL.
    // NOTE(review): these are presumably the runtime's built-in defaults —
    // confirm against the runtime module.
    fn restore_default_result_cache_config() {
        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
    }
947
948 #[test]
949 fn provider_can_be_replaced() {
950 let _guard = crate::runtime::runtime_test_guard()
951 .lock()
952 .expect("provider test guard");
953 let db1 = MemDB::instance();
954 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
955 .expect("create table in db1");
956 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
957 .expect("seed db1");
958 init_mem_provider(db1).expect("init provider db1");
959 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
960 assert_eq!(row[0].to_string(), "chars(first)");
961
962 let db2 = MemDB::instance();
963 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
964 .expect("create table in db2");
965 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
966 .expect("seed db2");
967 init_mem_provider(db2).expect("replace provider with db2");
968 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
969 assert_eq!(row[0].to_string(), "chars(second)");
970 }
971
972 #[test]
973 fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
974 let _guard = crate::runtime::runtime_test_guard()
975 .lock()
976 .expect("provider test guard");
977 COLNAME_CACHE.write().expect("metadata cache lock").clear();
978
979 let db = MemDB::instance();
980 db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
981 .expect("create table");
982 db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
983 .expect("seed table");
984
985 let old_scope = MetadataCacheScope {
986 datasource_id: DatasourceId("sqlite:old".to_string()),
987 generation: Generation(1),
988 };
989 let new_scope = MetadataCacheScope {
990 datasource_id: DatasourceId("sqlite:new".to_string()),
991 generation: Generation(2),
992 };
993 let old_provider = MemProvider {
994 db: db.clone(),
995 metadata_scope: old_scope.clone(),
996 };
997
998 install_provider(
999 ProviderKind::SqliteAuthority,
1000 new_scope.datasource_id.clone(),
1001 |_generation| {
1002 Ok(Arc::new(MemProvider {
1003 db: db.clone(),
1004 metadata_scope: new_scope.clone(),
1005 }))
1006 },
1007 )
1008 .expect("install new provider");
1009
1010 let row = old_provider
1011 .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1012 .expect("old provider query");
1013 assert_eq!(row[0].to_string(), "chars(scope-old)");
1014
1015 let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1016 assert!(cache.contains(&metadata_cache_key_for_scope(
1017 &old_scope,
1018 "SELECT value FROM cache_scope_t WHERE id = 1",
1019 )));
1020 assert!(!cache.contains(&metadata_cache_key_for_scope(
1021 &new_scope,
1022 "SELECT value FROM cache_scope_t WHERE id = 1",
1023 )));
1024 }
1025
1026 #[tokio::test(flavor = "current_thread")]
1027 async fn async_query_uses_runtime_bridge() {
1028 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1029 let db = MemDB::instance();
1030 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1031 .expect("create table");
1032 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1033 .expect("seed table");
1034 init_mem_provider(db).expect("init provider");
1035
1036 let row = query_row_async("SELECT value FROM t WHERE id = 1")
1037 .await
1038 .expect("async query row");
1039 assert_eq!(row[0].to_string(), "chars(async-first)");
1040 }
1041
1042 #[tokio::test(flavor = "current_thread")]
1043 async fn async_cache_query_fields_hits_local_cache() {
1044 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1045 let db = MemDB::instance();
1046 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1047 .expect("create table");
1048 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1049 .expect("seed table");
1050 init_mem_provider(db).expect("init provider");
1051
1052 let key = [DataField::from_digit("id", 1)];
1053 let params = [DataField::from_digit(":id", 1)];
1054 let mut cache = FieldQueryCache::default();
1055
1056 let first = cache_query_fields_async(
1057 "SELECT value FROM t WHERE id=:id",
1058 &key,
1059 ¶ms,
1060 &mut cache,
1061 )
1062 .await;
1063 let second = cache_query_fields_async(
1064 "SELECT value FROM t WHERE id=:id",
1065 &key,
1066 ¶ms,
1067 &mut cache,
1068 )
1069 .await;
1070
1071 assert_eq!(first[0].to_string(), "chars(async-cache)");
1072 assert_eq!(second[0].to_string(), "chars(async-cache)");
1073 }
1074
/// A caller-owned `FieldQueryCache` must not keep serving rows from a
/// provider that has been replaced: after a second `init_mem_provider` call
/// with a different database, the same SQL/key lookup has to return the new
/// database's value.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let db1 = MemDB::instance();
    db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(db1).expect("init provider db1");

    let sql = "SELECT value FROM t WHERE id=:id";
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let mut cache = FieldQueryCache::default();

    // First lookup populates `cache` with the row from db1.
    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(first)");

    // Same schema and key, different value: a stale cache hit would still
    // report 'first'.
    let db2 = MemDB::instance();
    db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(db2).expect("replace provider with db2");

    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(second)");
}
1113
/// Two queries that share the same cache key and parameters but differ in
/// SQL text must not collide in a `FieldQueryCache`: each has to return the
/// value from its own table.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let db = MemDB::instance();
    db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t1");
    db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t2");
    db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
        .expect("seed t1");
    db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
        .expect("seed t2");
    init_mem_provider(db).expect("init provider");

    // Identical key and params for both lookups; only the SQL differs.
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let mut cache = FieldQueryCache::default();

    let row = cache_query_fields(
        "SELECT value FROM t1 WHERE id=:id",
        &key,
        &params,
        &mut cache,
    );
    assert_eq!(row[0].to_string(), "chars(first)");

    // A cache keyed only on `key` would wrongly return 'first' here.
    let row = cache_query_fields(
        "SELECT value FROM t2 WHERE id=:id",
        &key,
        &params,
        &mut cache,
    );
    assert_eq!(row[0].to_string(), "chars(second)");
}
1150
/// After installing a mem provider and running one cached lookup,
/// `runtime_snapshot()` must report the provider kind, a generation, a
/// non-empty result cache that fits its capacity, a metadata cache within
/// its capacity, and at least one successful reload.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    // One lookup so the snapshot has at least one result-cache entry to count.
    let mut cache = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let row = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &key,
        &params,
        &mut cache,
    );
    assert_eq!(row[0].to_string(), "chars(first)");

    let snapshot = runtime_snapshot();
    assert!(matches!(
        snapshot.provider_kind,
        Some(ProviderKind::SqliteAuthority)
    ));
    assert!(snapshot.generation.is_some());
    assert!(snapshot.result_cache_len >= 1);
    assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
    assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
    assert!(snapshot.reload_successes >= 1);
}
1185
/// Runs the same SQL against two successively installed providers and checks
/// that the metadata cache length reported by `runtime_snapshot()` grows
/// after each query, i.e. the second provider gets its own entry instead of
/// reusing the first one's.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sql = "SELECT value FROM t WHERE id = 1";
    // Baseline taken before any provider from this test is installed.
    let before = runtime_snapshot().metadata_cache_len;

    // First provider.
    let first_db = MemDB::instance();
    for (statement, what) in [
        (
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)",
            "create table in db1",
        ),
        (
            "INSERT INTO t (id, value) VALUES (1, 'first')",
            "seed db1",
        ),
    ] {
        first_db.execute(statement).expect(what);
    }
    init_mem_provider(first_db).expect("init provider db1");
    let first_row = query_row(sql).expect("query db1");
    assert_eq!(first_row[0].to_string(), "chars(first)");
    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first > before,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    // Second provider with the same schema but a different value.
    let second_db = MemDB::instance();
    for (statement, what) in [
        (
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)",
            "create table in db2",
        ),
        (
            "INSERT INTO t (id, value) VALUES (1, 'second')",
            "seed db2",
        ),
    ] {
        second_db.execute(statement).expect(what);
    }
    init_mem_provider(second_db).expect("replace provider with db2");
    let second_row = query_row(sql).expect("query db2");
    assert_eq!(second_row[0].to_string(), "chars(second)");
    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_second > after_first,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
1222
/// When the builder closure passed to `install_provider` returns an error,
/// the previously installed provider must keep answering queries and
/// `current_generation()` must be unchanged.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Known-good provider that should survive the failed reload.
    let healthy_db = MemDB::instance();
    for (statement, what) in [
        (
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)",
            "create table in db1",
        ),
        (
            "INSERT INTO t (id, value) VALUES (1, 'first')",
            "seed db1",
        ),
    ] {
        healthy_db.execute(statement).expect(what);
    }
    init_mem_provider(healthy_db).expect("init provider db1");
    let before_generation = current_generation();

    // The builder fails unconditionally, so the install must be rejected.
    let failing_id = datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure");
    let outcome = install_provider(ProviderKind::SqliteAuthority, failing_id, |_generation| {
        Err(KnowledgeReason::from_logic()
            .to_err()
            .with_detail("expected reload failure"))
    });
    assert!(outcome.is_err());

    let surviving = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(surviving[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), before_generation);
}
1251
/// Runs the same cached lookup twice and checks that the counters exposed by
/// `runtime_snapshot()` advanced: local-cache hits and misses, result-cache
/// misses and metadata-cache misses must all be larger than before.
#[test]
fn runtime_snapshot_records_cache_counters() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    // Counters are compared as deltas, so the baseline is taken after the
    // provider is installed and before the first lookup.
    let before = runtime_snapshot();
    let sql = "SELECT value FROM t WHERE id=:id";
    let mut cache = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];

    // The first call starts from an empty local cache.
    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(first)");
    // The second identical call is the one expected to produce a local hit.
    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(first)");

    let after = runtime_snapshot();
    assert!(after.local_cache_hits > before.local_cache_hits);
    assert!(after.local_cache_misses > before.local_cache_misses);
    assert!(after.result_cache_misses > before.result_cache_misses);
    assert!(after.metadata_cache_misses > before.metadata_cache_misses);
}
1289
/// Installs a `TestTelemetry` sink, then drives a provider install, two
/// identical cached lookups and a deliberately failing reload, and finally
/// checks that the sink counted at least one event of each kind: reload
/// success/failure, local hit/miss, result miss, metadata miss and query
/// success.
///
/// NOTE(review): the previous telemetry is restored only on the success
/// path; a panic in an earlier assertion leaves the test sink installed.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let telemetry_impl = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(telemetry_impl.clone());

    // Installing the provider is the reload-success event.
    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let sql = "SELECT value FROM t WHERE id=:id";
    let mut cache = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];

    // Two identical lookups through one local cache: the first starts from
    // an empty cache, the second repeats it.
    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(first)");
    let row = cache_query_fields(sql, &key, &params, &mut cache);
    assert_eq!(row[0].to_string(), "chars(first)");

    // A builder that always fails supplies the reload-failure event.
    let reload_err = install_provider(
        ProviderKind::SqliteAuthority,
        datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected telemetry reload failure"))
        },
    );
    assert!(reload_err.is_err());

    // Put the previous telemetry back before asserting on the counters.
    install_runtime_telemetry(previous);
    reset_telemetry();

    assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
}
1345
/// Manual benchmark (ignored by default) that runs one workload through
/// three paths (no cache, global cache, local cache), prints each result and
/// the speed-up relative to the uncached run, then sanity-checks which cache
/// counters each path touched.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Sizes come from `perf_env_usize` (presumably environment overrides with
    // the given defaults); each is forced to at least 1 and the hot set is
    // capped at the row count.
    let row_count = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let op_count = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    let hot_keys = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, row_count);
    let plan = build_perf_workload(op_count, hot_keys);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        row_count, op_count, hot_keys
    );

    let uncached = run_bypass_perf(&plan, row_count);
    let shared = run_global_cache_perf(&plan, row_count);
    let per_caller = run_local_cache_perf(&plan, row_count);

    print_perf_result(&uncached);
    print_perf_result(&shared);
    print_perf_result(&per_caller);

    // Speed-ups are expressed relative to the uncached run.
    let baseline_secs = uncached.elapsed.as_secs_f64();
    let shared_speedup = baseline_secs / shared.elapsed.as_secs_f64();
    let per_caller_speedup = baseline_secs / per_caller.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        shared_speedup, per_caller_speedup,
    );

    // The bypass run must not touch any cache counter.
    let counted = &uncached.counters;
    assert_eq!(counted.result_hits, 0);
    assert_eq!(counted.result_misses, 0);
    assert_eq!(counted.local_hits, 0);
    assert_eq!(counted.local_misses, 0);

    // The global-cache run uses only the result cache.
    let counted = &shared.counters;
    assert!(counted.result_hits > 0);
    assert!(counted.result_misses > 0);
    assert_eq!(counted.local_hits, 0);
    assert_eq!(counted.local_misses, 0);

    // The local-cache run hits and misses locally, and also records
    // result-cache misses.
    let counted = &per_caller.counters;
    assert!(counted.local_hits > 0);
    assert!(counted.local_misses > 0);
    assert!(counted.result_misses > 0);
}
1388
/// Initialises the runtime from a generated knowdb config with the result
/// cache enabled (capacity 7, TTL 5 ms) and checks that the snapshot
/// reflects those settings, that a repeated query registers a result-cache
/// hit, and that a query issued after sleeping past the TTL registers
/// another miss.
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Scratch directory holding the config and the authority database.
    let tmp_root = uniq_cache_cfg_tmp_dir();
    let conf_path = write_minimal_knowdb_with_cache(&tmp_root, true, 7, 5);
    let authority_db = tmp_root.join(".run").join("authority.sqlite");
    let authority_dir = authority_db.parent().expect("authority parent");
    fs::create_dir_all(authority_dir).expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", authority_db.display());

    init_thread_cloned_from_knowdb(&tmp_root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    let configured = runtime_snapshot();
    assert!(configured.result_cache_enabled);
    assert_eq!(configured.result_cache_capacity, 7);
    assert_eq!(configured.result_cache_ttl_ms, 5);

    let request = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    // Executes the shared request and renders the first column of the row.
    let fetch_value = |label: &str| {
        let row = runtime().execute(&request).expect(label).into_row();
        row[0].to_string()
    };

    let before = runtime_snapshot();
    assert_eq!(fetch_value("first result-cache query"), "chars(alpha)");
    assert_eq!(fetch_value("second result-cache query"), "chars(alpha)");
    // Sleep longer than the 5 ms TTL so the cached entry has expired.
    std::thread::sleep(Duration::from_millis(12));
    assert_eq!(fetch_value("expired result-cache query"), "chars(alpha)");
    let after = runtime_snapshot();

    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);

    // Undo the global cache settings and remove the scratch directory.
    restore_default_result_cache_config();
    let _ = fs::remove_dir_all(&tmp_root);
}
1439
/// Initialises the runtime from a generated knowdb config with the result
/// cache disabled and checks that the snapshot reports it as disabled (while
/// keeping the configured capacity and TTL) and that `UseGlobal` queries
/// change neither the result-cache hit nor miss counter.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Scratch directory holding the config and the authority database.
    let tmp_root = uniq_cache_cfg_tmp_dir();
    let conf_path = write_minimal_knowdb_with_cache(&tmp_root, false, 3, 30_000);
    let authority_db = tmp_root.join(".run").join("authority.sqlite");
    let authority_dir = authority_db.parent().expect("authority parent");
    fs::create_dir_all(authority_dir).expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", authority_db.display());

    init_thread_cloned_from_knowdb(&tmp_root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    let configured = runtime_snapshot();
    assert!(!configured.result_cache_enabled);
    assert_eq!(configured.result_cache_capacity, 3);
    assert_eq!(configured.result_cache_ttl_ms, 30_000);

    let request = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );

    // Only the counters matter here, so the query results are discarded.
    let before = runtime_snapshot();
    for label in [
        "first bypassed result-cache query",
        "second bypassed result-cache query",
    ] {
        let _ = runtime().execute(&request).expect(label);
    }
    let after = runtime_snapshot();

    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);

    // Undo the global cache settings and remove the scratch directory.
    restore_default_result_cache_config();
    let _ = fs::remove_dir_all(&tmp_root);
}
1480
/// When `init_thread_cloned_from_knowdb` fails (here: a MySQL provider with
/// an invalid URL), the result-cache settings carried by that config must
/// not be applied and the previously installed provider must keep serving
/// queries.
#[test]
fn failed_knowdb_provider_init_does_not_apply_cache_config() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    // Start from the default cache settings so the comparison is meaningful.
    restore_default_result_cache_config();

    // Working provider that should survive the failed initialisation.
    let healthy_db = MemDB::instance();
    for (statement, what) in [
        (
            "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)",
            "create table",
        ),
        (
            "INSERT INTO t (id, value) VALUES (1, 'first')",
            "seed table",
        ),
    ] {
        healthy_db.execute(statement).expect(what);
    }
    init_mem_provider(healthy_db).expect("init provider");

    let before = runtime_snapshot();

    // A config whose provider cannot be created, carrying non-default cache
    // settings (disabled, capacity 3, TTL 5 ms).
    let tmp_root = uniq_cache_cfg_tmp_dir();
    let conf_path = write_provider_only_knowdb_with_cache(
        &tmp_root,
        "mysql",
        "not-a-valid-mysql-url",
        false,
        3,
        5,
    );
    let unused_db = tmp_root.join("unused.sqlite");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", unused_db.display());

    let outcome =
        init_thread_cloned_from_knowdb(&tmp_root, &conf_path, &authority_uri, &EnvDict::default());
    assert!(outcome.is_err());

    // None of the rejected config's cache settings may have leaked through.
    let after = runtime_snapshot();
    assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
    assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
    assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);

    let surviving = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(surviving[0].to_string(), "chars(first)");

    let _ = fs::remove_dir_all(&tmp_root);
}
1524}