1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::hash_map::DefaultHasher,
6 hash::{Hash, Hasher},
7};
8
9use async_trait::async_trait;
10use rusqlite::ToSql;
11use rusqlite::{Connection, OpenFlags};
12use wp_error::KnowledgeResult;
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25 CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26 QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30 telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34 db: MemDB,
35 metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41 ThreadClonedMDB::query_with_scope(self, sql)
42 }
43
44 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45 ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46 }
47
48 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49 ThreadClonedMDB::query_row_with_scope(self, sql)
50 }
51
52 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53 ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54 }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60 self.db.query_with_scope(&self.metadata_scope, sql)
61 }
62
63 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64 self.db
65 .query_fields_with_scope(&self.metadata_scope, sql, params)
66 }
67
68 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69 self.db.query_row_with_scope(&self.metadata_scope, sql)
70 }
71
72 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73 self.db
74 .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75 }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81 PostgresProvider::query(self, sql)
82 }
83
84 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85 PostgresProvider::query_fields(self, sql, params)
86 }
87
88 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89 PostgresProvider::query_row(self, sql)
90 }
91
92 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93 PostgresProvider::query_named_fields(self, sql, params)
94 }
95
96 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97 PostgresProvider::query_async(self, sql).await
98 }
99
100 async fn query_fields_async(
101 &self,
102 sql: &str,
103 params: &[DataField],
104 ) -> KnowledgeResult<Vec<RowData>> {
105 PostgresProvider::query_fields_async(self, sql, params).await
106 }
107
108 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109 PostgresProvider::query_row_async(self, sql).await
110 }
111
112 async fn query_named_fields_async(
113 &self,
114 sql: &str,
115 params: &[DataField],
116 ) -> KnowledgeResult<RowData> {
117 PostgresProvider::query_named_fields_async(self, sql, params).await
118 }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124 MySqlProvider::query(self, sql)
125 }
126
127 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128 MySqlProvider::query_fields(self, sql, params)
129 }
130
131 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132 MySqlProvider::query_row(self, sql)
133 }
134
135 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136 MySqlProvider::query_named_fields(self, sql, params)
137 }
138
139 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140 MySqlProvider::query_async(self, sql).await
141 }
142
143 async fn query_fields_async(
144 &self,
145 sql: &str,
146 params: &[DataField],
147 ) -> KnowledgeResult<Vec<RowData>> {
148 MySqlProvider::query_fields_async(self, sql, params).await
149 }
150
151 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152 MySqlProvider::query_row_async(self, sql).await
153 }
154
155 async fn query_named_fields_async(
156 &self,
157 sql: &str,
158 params: &[DataField],
159 ) -> KnowledgeResult<RowData> {
160 MySqlProvider::query_named_fields_async(self, sql, params).await
161 }
162}
163
164fn install_provider<F>(
165 kind: ProviderKind,
166 datasource_id: DatasourceId,
167 build: F,
168) -> KnowledgeResult<()>
169where
170 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172 runtime().install_provider(kind, datasource_id, build)?;
173 Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177 DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183 Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184 authority_uri,
185 datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186 generation.0,
187 )))
188 })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192 install_provider(
193 ProviderKind::SqliteAuthority,
194 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195 |generation| {
196 Ok(Arc::new(MemProvider {
197 db: memdb,
198 metadata_scope: MetadataCacheScope {
199 datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200 generation,
201 },
202 }))
203 },
204 )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208 let datasource_id = datasource_id_for(ProviderKind::Postgres, connection_uri);
209 install_provider(
210 ProviderKind::Postgres,
211 datasource_id.clone(),
212 |generation| {
213 let config = PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size);
214 let provider = PostgresProvider::connect(
215 &config,
216 MetadataCacheScope {
217 datasource_id: datasource_id.clone(),
218 generation,
219 },
220 )?;
221 Ok(Arc::new(provider))
222 },
223 )
224}
225
226pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
227 let datasource_id = datasource_id_for(ProviderKind::Mysql, connection_uri);
228 install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
229 let config = MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size);
230 let provider = MySqlProvider::connect(
231 &config,
232 MetadataCacheScope {
233 datasource_id: datasource_id.clone(),
234 generation,
235 },
236 )?;
237 Ok(Arc::new(provider))
238 })
239}
240
241pub fn current_generation() -> Option<u64> {
242 runtime()
243 .current_generation()
244 .map(|generation| generation.0)
245}
246
247pub fn runtime_snapshot() -> RuntimeSnapshot {
248 runtime().snapshot()
249}
250
251pub fn install_runtime_telemetry(
252 telemetry_impl: Arc<dyn KnowledgeTelemetry>,
253) -> Arc<dyn KnowledgeTelemetry> {
254 install_telemetry(telemetry_impl)
255}
256
257pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
258 runtime()
259 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
260 .map(QueryResponse::into_rows)
261}
262
263pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
264 runtime()
265 .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
266 .await
267 .map(QueryResponse::into_rows)
268}
269
270pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
271 runtime()
272 .execute(&QueryRequest::first_row(
273 sql,
274 Vec::new(),
275 CachePolicy::Bypass,
276 ))
277 .map(QueryResponse::into_row)
278}
279
280pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
281 runtime()
282 .execute_async(&QueryRequest::first_row(
283 sql,
284 Vec::new(),
285 CachePolicy::Bypass,
286 ))
287 .await
288 .map(QueryResponse::into_row)
289}
290
291pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
292 runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
293}
294
295pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
296 runtime()
297 .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
298 .await
299}
300
301pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
302 query_fields(sql, params)
303}
304
305pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
306 query_fields_async(sql, params).await
307}
308
309pub fn query_named<'a>(
310 sql: &str,
311 params: &'a [(&'a str, &'a dyn ToSql)],
312) -> KnowledgeResult<RowData> {
313 let fields = named_params_to_fields(params)?;
314 query_fields(sql, &fields)
315}
316
317pub fn cache_query_fields<const N: usize>(
318 sql: &str,
319 c_params: &[DataField; N],
320 query_params: &[DataField],
321 cache: &mut impl CacheAble<DataField, RowData, N>,
322) -> RowData {
323 cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
324}
325
326pub fn cache_query_fields_with_scope<const N: usize>(
327 sql: &str,
328 local_cache_scope: u64,
329 c_params: &[DataField; N],
330 query_params: &[DataField],
331 cache: &mut impl CacheAble<DataField, RowData, N>,
332) -> RowData {
333 if let Some(generation) = current_generation() {
334 cache.prepare_generation(generation);
335 }
336 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
337 runtime().record_local_cache_hit();
338 if telemetry_enabled() {
339 telemetry().on_cache(&CacheTelemetryEvent {
340 layer: CacheLayer::Local,
341 outcome: CacheOutcome::Hit,
342 provider_kind: runtime().current_provider_kind(),
343 });
344 }
345 return hit.clone();
346 }
347 runtime().record_local_cache_miss();
348 if telemetry_enabled() {
349 telemetry().on_cache(&CacheTelemetryEvent {
350 layer: CacheLayer::Local,
351 outcome: CacheOutcome::Miss,
352 provider_kind: runtime().current_provider_kind(),
353 });
354 }
355
356 match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
357 Ok(row) => {
358 cache.save_scoped(local_cache_scope, c_params, row.clone());
359 row
360 }
361 Err(err) => {
362 warn_kdb!("[kdb] query error: {}", err);
363 Vec::new()
364 }
365 }
366}
367
368pub async fn cache_query_fields_async<const N: usize>(
369 sql: &str,
370 c_params: &[DataField; N],
371 query_params: &[DataField],
372 cache: &mut impl CacheAble<DataField, RowData, N>,
373) -> RowData {
374 cache_query_fields_async_with_scope(
375 sql,
376 stable_hash(sql),
377 c_params,
378 || query_params.to_vec(),
379 cache,
380 )
381 .await
382}
383
384pub async fn cache_query_fields_async_with<const N: usize, F>(
385 sql: &str,
386 c_params: &[DataField; N],
387 build_query_params: F,
388 cache: &mut impl CacheAble<DataField, RowData, N>,
389) -> RowData
390where
391 F: FnOnce() -> Vec<DataField>,
392{
393 cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
394 .await
395}
396
397pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
398 sql: &str,
399 local_cache_scope: u64,
400 c_params: &[DataField; N],
401 build_query_params: F,
402 cache: &mut impl CacheAble<DataField, RowData, N>,
403) -> RowData
404where
405 F: FnOnce() -> Vec<DataField>,
406{
407 if let Some(generation) = current_generation() {
408 cache.prepare_generation(generation);
409 }
410 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
411 runtime().record_local_cache_hit();
412 if telemetry_enabled() {
413 telemetry().on_cache(&CacheTelemetryEvent {
414 layer: CacheLayer::Local,
415 outcome: CacheOutcome::Hit,
416 provider_kind: runtime().current_provider_kind(),
417 });
418 }
419 return hit.clone();
420 }
421 runtime().record_local_cache_miss();
422 if telemetry_enabled() {
423 telemetry().on_cache(&CacheTelemetryEvent {
424 layer: CacheLayer::Local,
425 outcome: CacheOutcome::Miss,
426 provider_kind: runtime().current_provider_kind(),
427 });
428 }
429
430 let query_params = build_query_params();
431 match runtime()
432 .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
433 .await
434 {
435 Ok(row) => {
436 cache.save_scoped(local_cache_scope, c_params, row.clone());
437 row
438 }
439 Err(err) => {
440 warn_kdb!("[kdb] query error: {}", err);
441 Vec::new()
442 }
443 }
444}
445
446pub fn cache_query<const N: usize>(
447 sql: &str,
448 c_params: &[DataField; N],
449 named_params: &[(&str, &dyn ToSql)],
450 cache: &mut impl CacheAble<DataField, RowData, N>,
451) -> RowData {
452 let query_fields = match named_params_to_fields(named_params) {
453 Ok(fields) => fields,
454 Err(err) => {
455 warn_kdb!("[kdb] query param conversion error: {}", err);
456 return Vec::new();
457 }
458 };
459
460 cache_query_fields(sql, c_params, &query_fields, cache)
461}
462
463pub async fn cache_query_async<const N: usize>(
464 sql: &str,
465 c_params: &[DataField; N],
466 named_params: &[(&str, &dyn ToSql)],
467 cache: &mut impl CacheAble<DataField, RowData, N>,
468) -> RowData {
469 let query_fields = match named_params_to_fields(named_params) {
470 Ok(fields) => fields,
471 Err(err) => {
472 warn_kdb!("[kdb] query param conversion error: {}", err);
473 return Vec::new();
474 }
475 };
476
477 cache_query_fields_async(sql, c_params, &query_fields, cache).await
478}
479
480fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
481 if let Ok(conn) = Connection::open_with_flags(
482 authority_uri,
483 OpenFlags::SQLITE_OPEN_READ_WRITE
484 | OpenFlags::SQLITE_OPEN_CREATE
485 | OpenFlags::SQLITE_OPEN_URI,
486 ) {
487 let _ = conn.execute_batch(
488 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
489 );
490 }
491 Ok(())
492}
493
494pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
495 ensure_wal(authority_uri)?;
496 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
497 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
498 init_mem_provider(mem)
499}
500
501pub fn init_thread_cloned_from_knowdb(
502 root: &Path,
503 knowdb_conf: &Path,
504 authority_uri: &str,
505 dict: &orion_variate::EnvDict,
506) -> KnowledgeResult<()> {
507 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
508 if let Some(provider) = conf.provider {
509 match provider.kind {
510 ProviderKind::Postgres => {
511 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
512 init_postgres_provider(provider.connection_uri.as_str(), provider.pool_size)?;
513 runtime().configure_result_cache(
514 conf.cache.enabled,
515 conf.cache.capacity,
516 Duration::from_millis(conf.cache.ttl_ms.max(1)),
517 );
518 return Ok(());
519 }
520 ProviderKind::Mysql => {
521 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
522 init_mysql_provider(provider.connection_uri.as_str(), provider.pool_size)?;
523 runtime().configure_result_cache(
524 conf.cache.enabled,
525 conf.cache.capacity,
526 Duration::from_millis(conf.cache.ttl_ms.max(1)),
527 );
528 return Ok(());
529 }
530 ProviderKind::SqliteAuthority => {}
531 }
532 }
533
534 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
535 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
536 let path_part = rest.split('?').next().unwrap_or(rest);
537 format!("file:{}?mode=ro&uri=true", path_part)
538 } else {
539 authority_uri.to_string()
540 };
541
542 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
543 init_thread_cloned_from_authority(&ro_uri)?;
544 runtime().configure_result_cache(
545 conf.cache.enabled,
546 conf.cache.capacity,
547 Duration::from_millis(conf.cache.ttl_ms.max(1)),
548 );
549 Ok(())
550}
551
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// The result is what `Hash::hash` for `str` feeds into that hasher; callers
/// in this file use it as the local cache scope derived from SQL text.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
557
558#[cfg(test)]
559mod tests {
560 use super::*;
561 use crate::cache::FieldQueryCache;
562 use crate::mem::memdb::MemDB;
563 use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
564 use crate::runtime::fields_to_params;
565 use crate::telemetry::{
566 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
567 ReloadTelemetryEvent, reset_telemetry,
568 };
569 use orion_error::{ToStructError, UvsFrom};
570 use orion_variate::EnvDict;
571 use std::fs;
572 use std::hint::black_box;
573 use std::path::PathBuf;
574 use std::sync::atomic::{AtomicU64, Ordering};
575 use std::time::{Duration, Instant};
576 use wp_error::KnowledgeReason;
577
578 #[derive(Default)]
579 struct TestTelemetry {
580 reload_success: AtomicU64,
581 reload_failure: AtomicU64,
582 local_hits: AtomicU64,
583 local_misses: AtomicU64,
584 result_hits: AtomicU64,
585 result_misses: AtomicU64,
586 metadata_hits: AtomicU64,
587 metadata_misses: AtomicU64,
588 query_success: AtomicU64,
589 query_failure: AtomicU64,
590 }
591
592 impl KnowledgeTelemetry for TestTelemetry {
593 fn on_cache(&self, event: &CacheTelemetryEvent) {
594 let counter = match (event.layer, event.outcome) {
595 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
596 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
597 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
598 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
599 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
600 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
601 };
602 counter.fetch_add(1, Ordering::Relaxed);
603 }
604
605 fn on_reload(&self, event: &ReloadTelemetryEvent) {
606 let counter = match event.outcome {
607 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
608 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
609 };
610 counter.fetch_add(1, Ordering::Relaxed);
611 }
612
613 fn on_query(&self, event: &QueryTelemetryEvent) {
614 let counter = if event.success {
615 &self.query_success
616 } else {
617 &self.query_failure
618 };
619 counter.fetch_add(1, Ordering::Relaxed);
620 }
621 }
622
623 fn perf_env_usize(key: &str, default: usize) -> usize {
624 std::env::var(key)
625 .ok()
626 .and_then(|value| value.parse::<usize>().ok())
627 .unwrap_or(default)
628 }
629
630 fn seed_perf_provider(rows: usize) {
631 let db = MemDB::instance();
632 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
633 .expect("create perf_kv");
634 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
635 for id in 1..=rows {
636 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
637 db.execute(sql.as_str()).expect("insert perf_kv row");
638 }
639 db.execute("COMMIT").expect("commit perf_kv load");
640 init_mem_provider(db).expect("init perf provider");
641 }
642
643 #[tokio::test(flavor = "current_thread")]
644 async fn query_async_works_with_mem_provider() {
645 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
646 let db = MemDB::instance();
647 db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
648 .expect("create async_kv");
649 db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
650 .expect("insert async_kv row");
651 init_mem_provider(db).expect("init mem provider");
652
653 let row = query_fields_async(
654 "SELECT value FROM async_kv WHERE id=:id",
655 &[DataField::from_digit(":id", 1)],
656 )
657 .await
658 .expect("query async row");
659 assert_eq!(row.len(), 1);
660 assert_eq!(row[0].to_string(), "chars(hello)");
661 }
662
663 #[derive(Clone)]
664 struct PerfQuery {
665 cache_key: [DataField; 1],
666 query_params: [DataField; 1],
667 bypass_req: QueryRequest,
668 global_req: QueryRequest,
669 }
670
671 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
672 (0..ops)
673 .map(|idx| {
674 let id = ((idx * 17) % hotset + 1) as i64;
675 let cache_key = [DataField::from_digit("id", id)];
676 let query_params = [DataField::from_digit(":id", id)];
677 let bypass_req = QueryRequest::first_row(
678 "SELECT value FROM perf_kv WHERE id=:id",
679 fields_to_params(&query_params),
680 CachePolicy::Bypass,
681 );
682 let global_req = QueryRequest::first_row(
683 "SELECT value FROM perf_kv WHERE id=:id",
684 fields_to_params(&query_params),
685 CachePolicy::UseGlobal,
686 );
687 PerfQuery {
688 cache_key,
689 query_params,
690 bypass_req,
691 global_req,
692 }
693 })
694 .collect()
695 }
696
697 #[derive(Debug, Clone, Copy)]
698 struct PerfCounters {
699 result_hits: u64,
700 result_misses: u64,
701 local_hits: u64,
702 local_misses: u64,
703 metadata_hits: u64,
704 metadata_misses: u64,
705 }
706
707 #[derive(Debug, Clone)]
708 struct PerfResult {
709 name: &'static str,
710 elapsed: Duration,
711 ops: usize,
712 counters: PerfCounters,
713 }
714
715 impl PerfResult {
716 fn qps(&self) -> f64 {
717 let secs = self.elapsed.as_secs_f64();
718 if secs == 0.0 {
719 self.ops as f64
720 } else {
721 self.ops as f64 / secs
722 }
723 }
724 }
725
726 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
727 PerfCounters {
728 result_hits: after
729 .result_cache_hits
730 .saturating_sub(before.result_cache_hits),
731 result_misses: after
732 .result_cache_misses
733 .saturating_sub(before.result_cache_misses),
734 local_hits: after
735 .local_cache_hits
736 .saturating_sub(before.local_cache_hits),
737 local_misses: after
738 .local_cache_misses
739 .saturating_sub(before.local_cache_misses),
740 metadata_hits: after
741 .metadata_cache_hits
742 .saturating_sub(before.metadata_cache_hits),
743 metadata_misses: after
744 .metadata_cache_misses
745 .saturating_sub(before.metadata_cache_misses),
746 }
747 }
748
749 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
750 seed_perf_provider(rows);
751 let before = runtime_snapshot();
752 let started = Instant::now();
753 for item in workload {
754 let row = runtime()
755 .execute(&item.bypass_req)
756 .expect("execute bypass request")
757 .into_row();
758 black_box(row);
759 }
760 let elapsed = started.elapsed();
761 let after = runtime_snapshot();
762 PerfResult {
763 name: "bypass",
764 elapsed,
765 ops: workload.len(),
766 counters: snapshot_delta(&before, &after),
767 }
768 }
769
770 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
771 seed_perf_provider(rows);
772 let before = runtime_snapshot();
773 let started = Instant::now();
774 for item in workload {
775 let row = runtime()
776 .execute(&item.global_req)
777 .expect("execute global-cache request")
778 .into_row();
779 black_box(row);
780 }
781 let elapsed = started.elapsed();
782 let after = runtime_snapshot();
783 PerfResult {
784 name: "global_cache",
785 elapsed,
786 ops: workload.len(),
787 counters: snapshot_delta(&before, &after),
788 }
789 }
790
791 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
792 seed_perf_provider(rows);
793 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
794 let before = runtime_snapshot();
795 let started = Instant::now();
796 for item in workload {
797 let row = cache_query_fields(
798 "SELECT value FROM perf_kv WHERE id=:id",
799 &item.cache_key,
800 &item.query_params,
801 &mut cache,
802 );
803 black_box(row);
804 }
805 let elapsed = started.elapsed();
806 let after = runtime_snapshot();
807 PerfResult {
808 name: "local_cache",
809 elapsed,
810 ops: workload.len(),
811 counters: snapshot_delta(&before, &after),
812 }
813 }
814
815 fn print_perf_result(result: &PerfResult) {
816 eprintln!(
817 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
818 result.name,
819 result.elapsed.as_millis(),
820 result.qps(),
821 result.counters.result_hits,
822 result.counters.result_misses,
823 result.counters.local_hits,
824 result.counters.local_misses,
825 result.counters.metadata_hits,
826 result.counters.metadata_misses,
827 );
828 }
829
830 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
831 use rand::{Rng, rng};
832 let rnd: u64 = rng().next_u64();
833 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
834 }
835
836 fn write_minimal_knowdb_with_cache(
837 root: &Path,
838 enabled: bool,
839 capacity: usize,
840 ttl_ms: u64,
841 ) -> std::path::PathBuf {
842 let models = root.join("models").join("knowledge");
843 let example_dir = models.join("example");
844 fs::create_dir_all(&example_dir).expect("create knowdb models/example");
845 fs::write(
846 models.join("knowdb.toml"),
847 format!(
848 r#"
849version = 2
850base_dir = "."
851
852[cache]
853enabled = {enabled}
854capacity = {capacity}
855ttl_ms = {ttl_ms}
856
857[csv]
858has_header = false
859
860[[tables]]
861name = "example"
862columns.by_index = [0,1]
863"#
864 ),
865 )
866 .expect("write knowdb.toml");
867 fs::write(
868 example_dir.join("create.sql"),
869 r#"
870CREATE TABLE IF NOT EXISTS {table} (
871 id INTEGER PRIMARY KEY,
872 value TEXT NOT NULL
873);
874"#,
875 )
876 .expect("write create.sql");
877 fs::write(
878 example_dir.join("insert.sql"),
879 "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
880 )
881 .expect("write insert.sql");
882 fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
883 models.join("knowdb.toml")
884 }
885
886 fn write_provider_only_knowdb_with_cache(
887 root: &Path,
888 provider_kind: &str,
889 connection_uri: &str,
890 enabled: bool,
891 capacity: usize,
892 ttl_ms: u64,
893 ) -> std::path::PathBuf {
894 let models = root.join("models").join("knowledge");
895 fs::create_dir_all(&models).expect("create knowdb models");
896 fs::write(
897 models.join("knowdb.toml"),
898 format!(
899 r#"
900version = 2
901base_dir = "."
902
903[cache]
904enabled = {enabled}
905capacity = {capacity}
906ttl_ms = {ttl_ms}
907
908[provider]
909kind = "{provider_kind}"
910connection_uri = "{connection_uri}"
911"#
912 ),
913 )
914 .expect("write provider knowdb.toml");
915 models.join("knowdb.toml")
916 }
917
918 fn restore_default_result_cache_config() {
919 runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
920 }
921
922 #[test]
923 fn provider_can_be_replaced() {
924 let _guard = crate::runtime::runtime_test_guard()
925 .lock()
926 .expect("provider test guard");
927 let db1 = MemDB::instance();
928 db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
929 .expect("create table in db1");
930 db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
931 .expect("seed db1");
932 init_mem_provider(db1).expect("init provider db1");
933 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
934 assert_eq!(row[0].to_string(), "chars(first)");
935
936 let db2 = MemDB::instance();
937 db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
938 .expect("create table in db2");
939 db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
940 .expect("seed db2");
941 init_mem_provider(db2).expect("replace provider with db2");
942 let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
943 assert_eq!(row[0].to_string(), "chars(second)");
944 }
945
946 #[test]
947 fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
948 let _guard = crate::runtime::runtime_test_guard()
949 .lock()
950 .expect("provider test guard");
951 COLNAME_CACHE.write().expect("metadata cache lock").clear();
952
953 let db = MemDB::instance();
954 db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
955 .expect("create table");
956 db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
957 .expect("seed table");
958
959 let old_scope = MetadataCacheScope {
960 datasource_id: DatasourceId("sqlite:old".to_string()),
961 generation: Generation(1),
962 };
963 let new_scope = MetadataCacheScope {
964 datasource_id: DatasourceId("sqlite:new".to_string()),
965 generation: Generation(2),
966 };
967 let old_provider = MemProvider {
968 db: db.clone(),
969 metadata_scope: old_scope.clone(),
970 };
971
972 install_provider(
973 ProviderKind::SqliteAuthority,
974 new_scope.datasource_id.clone(),
975 |_generation| {
976 Ok(Arc::new(MemProvider {
977 db: db.clone(),
978 metadata_scope: new_scope.clone(),
979 }))
980 },
981 )
982 .expect("install new provider");
983
984 let row = old_provider
985 .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
986 .expect("old provider query");
987 assert_eq!(row[0].to_string(), "chars(scope-old)");
988
989 let cache = COLNAME_CACHE.read().expect("metadata cache lock");
990 assert!(cache.contains(&metadata_cache_key_for_scope(
991 &old_scope,
992 "SELECT value FROM cache_scope_t WHERE id = 1",
993 )));
994 assert!(!cache.contains(&metadata_cache_key_for_scope(
995 &new_scope,
996 "SELECT value FROM cache_scope_t WHERE id = 1",
997 )));
998 }
999
1000 #[tokio::test(flavor = "current_thread")]
1001 async fn async_query_uses_runtime_bridge() {
1002 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1003 let db = MemDB::instance();
1004 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1005 .expect("create table");
1006 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1007 .expect("seed table");
1008 init_mem_provider(db).expect("init provider");
1009
1010 let row = query_row_async("SELECT value FROM t WHERE id = 1")
1011 .await
1012 .expect("async query row");
1013 assert_eq!(row[0].to_string(), "chars(async-first)");
1014 }
1015
1016 #[tokio::test(flavor = "current_thread")]
1017 async fn async_cache_query_fields_hits_local_cache() {
1018 let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1019 let db = MemDB::instance();
1020 db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1021 .expect("create table");
1022 db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1023 .expect("seed table");
1024 init_mem_provider(db).expect("init provider");
1025
1026 let key = [DataField::from_digit("id", 1)];
1027 let params = [DataField::from_digit(":id", 1)];
1028 let mut cache = FieldQueryCache::default();
1029
1030 let first = cache_query_fields_async(
1031 "SELECT value FROM t WHERE id=:id",
1032 &key,
1033 ¶ms,
1034 &mut cache,
1035 )
1036 .await;
1037 let second = cache_query_fields_async(
1038 "SELECT value FROM t WHERE id=:id",
1039 &key,
1040 ¶ms,
1041 &mut cache,
1042 )
1043 .await;
1044
1045 assert_eq!(first[0].to_string(), "chars(async-cache)");
1046 assert_eq!(second[0].to_string(), "chars(async-cache)");
1047 }
1048
/// Checks that a caller-held `FieldQueryCache` does not keep serving a row it
/// cached under one provider after `init_mem_provider` installs a replacement.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    const LOOKUP: &str = "SELECT value FROM t WHERE id=:id";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // First provider: id 1 maps to 'first'.
    let original_db = MemDB::instance();
    original_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    original_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(original_db).expect("init provider db1");

    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    let seen = cache_query_fields(LOOKUP, &cache_key, &bind, &mut local_cache);
    assert_eq!(seen[0].to_string(), "chars(first)");

    // Second provider: same schema and id, different value.
    let replacement_db = MemDB::instance();
    replacement_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");

    // The very same cache object must now surface the replacement's value.
    let seen = cache_query_fields(LOOKUP, &cache_key, &bind, &mut local_cache);
    assert_eq!(seen[0].to_string(), "chars(second)");
}
1087
/// Uses one `FieldQueryCache` with an identical key and parameters against two
/// different SQL statements and checks that each statement yields the row of
/// its own table.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // (statement, message used if the statement fails), executed in order.
    let setup = [
        (
            "CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t1",
        ),
        (
            "CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t2",
        ),
        (
            "INSERT INTO t1 (id, value) VALUES (1, 'first')",
            "seed t1",
        ),
        (
            "INSERT INTO t2 (id, value) VALUES (1, 'second')",
            "seed t2",
        ),
    ];
    let mem = MemDB::instance();
    for (statement, failure) in setup {
        mem.execute(statement).expect(failure);
    }
    init_mem_provider(mem).expect("init provider");

    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    // Only the SQL text differs between the two lookups.
    let expectations = [
        ("SELECT value FROM t1 WHERE id=:id", "chars(first)"),
        ("SELECT value FROM t2 WHERE id=:id", "chars(second)"),
    ];
    for (sql, expected) in expectations {
        let row = cache_query_fields(sql, &cache_key, &bind, &mut local_cache);
        assert_eq!(row[0].to_string(), expected);
    }
}
1124
/// After one cached lookup against a freshly installed in-memory provider,
/// checks the fields reported by `runtime_snapshot()`: provider kind,
/// generation presence, cache length/capacity relations and reload count.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];
    let fetched = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &cache_key,
        &bind,
        &mut local_cache,
    );
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let snap = runtime_snapshot();

    // Provider identity and generation bookkeeping.
    assert!(matches!(
        snap.provider_kind,
        Some(ProviderKind::SqliteAuthority)
    ));
    assert!(snap.generation.is_some());

    // Cache occupancy must be non-empty and never exceed capacity.
    assert!(1 <= snap.result_cache_len);
    assert!(snap.result_cache_len <= snap.result_cache_capacity);
    assert!(snap.metadata_cache_len <= snap.metadata_cache_capacity);

    // At least the install above counts as a successful reload.
    assert!(1 <= snap.reload_successes);
}
1159
/// Runs the same SQL text against two successively installed providers and
/// checks that `metadata_cache_len` grows after each one, i.e. the second
/// provider's query adds an entry and does not reuse the first one's.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let sql = "SELECT value FROM t WHERE id = 1";
    // Baseline taken before either provider is installed.
    let before = runtime_snapshot().metadata_cache_len;

    let first_db = MemDB::instance();
    first_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    first_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first_db).expect("init provider db1");

    let fetched = query_row(sql).expect("query db1");
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        before < after_first,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    let second_db = MemDB::instance();
    second_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    second_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second_db).expect("replace provider with db2");

    let fetched = query_row(sql).expect("query db2");
    assert_eq!(fetched[0].to_string(), "chars(second)");

    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first < after_second,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
1196
/// Installs a working in-memory provider, then attempts an `install_provider`
/// whose builder closure always returns an error, and checks that queries and
/// `current_generation()` still reflect the first provider.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(mem).expect("init provider db1");

    let generation_before_reload = current_generation();

    // The builder fails unconditionally, so this install must be rejected.
    let failing_source = datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure");
    let reload_outcome = install_provider(
        ProviderKind::SqliteAuthority,
        failing_source,
        |_generation| {
            let failure = KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected reload failure");
            Err(failure)
        },
    );
    assert!(reload_outcome.is_err());

    // The earlier provider keeps answering and the generation is untouched.
    let fetched = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(fetched[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), generation_before_reload);
}
1225
/// Performs the same cached lookup twice and checks that the counters exposed
/// by `runtime_snapshot()` (local hits, local misses, result-cache misses and
/// metadata-cache misses) all increased relative to a snapshot taken earlier.
#[test]
fn runtime_snapshot_records_cache_counters() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let baseline = runtime_snapshot();

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind = [DataField::from_digit(":id", 1)];

    // Two identical lookups through the same local cache.
    for _ in 0..2 {
        let fetched = cache_query_fields(
            "SELECT value FROM t WHERE id=:id",
            &cache_key,
            &bind,
            &mut local_cache,
        );
        assert_eq!(fetched[0].to_string(), "chars(first)");
    }

    let current = runtime_snapshot();
    assert!(baseline.local_cache_hits < current.local_cache_hits);
    assert!(baseline.local_cache_misses < current.local_cache_misses);
    assert!(baseline.result_cache_misses < current.result_cache_misses);
    assert!(baseline.metadata_cache_misses < current.metadata_cache_misses);
}
1263
/// Installs a `TestTelemetry` sink, drives a successful provider install, two
/// cached lookups and one deliberately failing `install_provider`, then checks
/// that the sink counted reload successes/failures, local hits/misses,
/// result/metadata misses and query successes.
///
/// The previously installed telemetry is put back by a drop guard, so a
/// failing assertion in the middle of the test no longer leaves the test sink
/// installed for tests that run afterwards.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    /// Runs the wrapped closure once when dropped, including during unwinding.
    struct RestoreOnDrop<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for RestoreOnDrop<F> {
        fn drop(&mut self) {
            if let Some(restore) = self.0.take() {
                restore();
            }
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let telemetry_impl = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(telemetry_impl.clone());
    // Declared after `_guard`, so it is dropped (and telemetry restored)
    // before the test guard is released.
    let restore_telemetry = RestoreOnDrop(Some(move || {
        install_runtime_telemetry(previous);
        reset_telemetry();
    }));

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let mut cache = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let row = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &key,
        &params,
        &mut cache,
    );
    assert_eq!(row[0].to_string(), "chars(first)");
    let row = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &key,
        &params,
        &mut cache,
    );
    assert_eq!(row[0].to_string(), "chars(first)");

    // The builder always fails, producing a reload-failure event.
    let reload_err = install_provider(
        ProviderKind::SqliteAuthority,
        datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected telemetry reload failure"))
        },
    );
    assert!(reload_err.is_err());

    // Restore the previous telemetry before inspecting the counters, as the
    // original ordering did.
    drop(restore_telemetry);

    assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
    assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
}
1319
/// Manual, ignored-by-default comparison of three runs over the same workload
/// (`run_bypass_perf`, `run_global_cache_perf`, `run_local_cache_perf`). It
/// prints each result and the speedups over the bypass run, then sanity-checks
/// which cache counters each run did or did not touch.
///
/// Sizes come from `perf_env_usize` — presumably an environment variable name
/// plus a fallback value; confirm against that helper.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let row_count = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let op_count = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    // `row_count` is at least 1, so the clamp bounds are always well-formed.
    let hot_keys = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, row_count);
    let workload = build_perf_workload(op_count, hot_keys);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        row_count, op_count, hot_keys
    );

    let bypass = run_bypass_perf(&workload, row_count);
    let global = run_global_cache_perf(&workload, row_count);
    let local = run_local_cache_perf(&workload, row_count);

    print_perf_result(&bypass);
    print_perf_result(&global);
    print_perf_result(&local);

    let bypass_secs = bypass.elapsed.as_secs_f64();
    let global_speedup = bypass_secs / global.elapsed.as_secs_f64();
    let local_speedup = bypass_secs / local.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        global_speedup, local_speedup,
    );

    // Bypass run: no cache layer is touched at all.
    assert_eq!(bypass.counters.result_hits, 0);
    assert_eq!(bypass.counters.result_misses, 0);
    assert_eq!(bypass.counters.local_hits, 0);
    assert_eq!(bypass.counters.local_misses, 0);

    // Global-cache run: result cache is exercised, local cache is not.
    assert!(global.counters.result_hits > 0);
    assert!(global.counters.result_misses > 0);
    assert_eq!(global.counters.local_hits, 0);
    assert_eq!(global.counters.local_misses, 0);

    // Local-cache run: local cache is exercised and result-cache misses occur.
    assert!(local.counters.local_hits > 0);
    assert!(local.counters.local_misses > 0);
    assert!(local.counters.result_misses > 0);
}
1362
/// Initialises the runtime from a knowdb config written with the result cache
/// enabled (capacity 7, TTL 5 ms) and checks that the snapshot reports those
/// settings, that a repeated query is counted as a hit, and that a query after
/// a 12 ms sleep is counted as a further miss.
///
/// Cleanup (restoring the default result-cache config and deleting the temp
/// directory) is done by a drop guard, so it also runs when an assertion
/// fails and the non-default cache settings do not leak into later tests.
///
/// NOTE(review): the hit assertion relies on the second query running within
/// the 5 ms TTL of the first; on a heavily loaded machine this may be flaky.
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    /// Invokes the wrapped closure on drop, including during unwinding.
    struct Cleanup<F: FnMut()>(F);

    impl<F: FnMut()> Drop for Cleanup<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard` and `root`: it is dropped before both, so the
    // cleanup happens while the test guard is still held.
    let _cleanup = Cleanup(|| {
        restore_default_result_cache_config();
        let _ = fs::remove_dir_all(&root);
    });

    let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    let snapshot = runtime_snapshot();
    assert!(snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 7);
    assert_eq!(snapshot.result_cache_ttl_ms, 5);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let row = runtime()
        .execute(&req)
        .expect("first result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let row = runtime()
        .execute(&req)
        .expect("second result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // Sleep past the 5 ms TTL so the next query finds the entry expired.
    std::thread::sleep(Duration::from_millis(12));
    let row = runtime()
        .execute(&req)
        .expect("expired result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let after = runtime_snapshot();

    // One hit (second query) and at least two misses (first and expired).
    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);
}
1413
/// Initialises the runtime from a knowdb config written with the result cache
/// disabled (capacity 3, TTL 30 000 ms) and checks that the snapshot reports
/// those settings and that two `CachePolicy::UseGlobal` queries change neither
/// the result-cache hit counter nor the miss counter.
///
/// Cleanup (restoring the default result-cache config and deleting the temp
/// directory) is done by a drop guard, so it also runs when an assertion
/// fails and the disabled-cache setting does not leak into later tests.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    /// Invokes the wrapped closure on drop, including during unwinding.
    struct Cleanup<F: FnMut()>(F);

    impl<F: FnMut()> Drop for Cleanup<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard` and `root`: it is dropped before both, so the
    // cleanup happens while the test guard is still held.
    let _cleanup = Cleanup(|| {
        restore_default_result_cache_config();
        let _ = fs::remove_dir_all(&root);
    });

    let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    let snapshot = runtime_snapshot();
    assert!(!snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 3);
    assert_eq!(snapshot.result_cache_ttl_ms, 30_000);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let _ = runtime()
        .execute(&req)
        .expect("first bypassed result-cache query");
    let _ = runtime()
        .execute(&req)
        .expect("second bypassed result-cache query");
    let after = runtime_snapshot();

    // With the cache disabled, neither counter may move.
    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);
}
1454
/// With a working in-memory provider installed, attempts a knowdb init whose
/// config names a `mysql` provider with an invalid URL, and checks that the
/// init fails, the result-cache settings in the snapshot are unchanged, and
/// the in-memory provider still answers queries.
#[test]
fn failed_knowdb_provider_init_does_not_apply_cache_config() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    // Start from the default cache settings regardless of earlier tests.
    restore_default_result_cache_config();

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let before = runtime_snapshot();

    let root = uniq_cache_cfg_tmp_dir();
    // Cache settings in this config differ from what the asserts below expect
    // to remain in effect.
    let conf_path =
        write_provider_only_knowdb_with_cache(&root, "mysql", "not-a-valid-mysql-url", false, 3, 5);
    let unused_db = root.join("unused.sqlite");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", unused_db.display());

    let init_outcome =
        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
    assert!(init_outcome.is_err());

    let after = runtime_snapshot();
    assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
    assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
    assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);

    let fetched = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(fetched[0].to_string(), "chars(first)");

    // Best-effort removal of the temporary config directory.
    let _ = fs::remove_dir_all(&root);
}
1498}