1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5 collections::hash_map::DefaultHasher,
6 hash::{Hash, Hasher},
7};
8
9use rusqlite::ToSql;
10use rusqlite::{Connection, OpenFlags};
11use wp_error::KnowledgeResult;
12use wp_log::{info_ctrl, warn_kdb};
13use wp_model_core::model::DataField;
14
15use crate::cache::CacheAble;
16use crate::loader::{ProviderKind, parse_knowdb_conf};
17use crate::mem::RowData;
18use crate::mem::memdb::MemDB;
19use crate::mem::thread_clone::ThreadClonedMDB;
20use crate::mysql::{MySqlProvider, MySqlProviderConfig};
21use crate::param::named_params_to_fields;
22use crate::postgres::{PostgresProvider, PostgresProviderConfig};
23use crate::runtime::{
24 CachePolicy, DatasourceId, Generation, ProviderExecutor, QueryRequest, QueryResponse,
25 RuntimeSnapshot, fields_to_params, runtime,
26};
27use crate::telemetry::{
28 CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry, telemetry,
29};
30
/// Newtype around [`MemDB`] that this module implements [`ProviderExecutor`]
/// for, so a `MemDB` can be installed as the runtime provider
/// (see `init_mem_provider`).
struct MemProvider(MemDB);
32
33impl ProviderExecutor for ThreadClonedMDB {
34 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
35 crate::DBQuery::query(self, sql)
36 }
37
38 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
39 ThreadClonedMDB::query_fields(self, sql, params)
40 }
41
42 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
43 crate::DBQuery::query_row(self, sql)
44 }
45
46 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
47 ThreadClonedMDB::query_named_fields(self, sql, params)
48 }
49}
50
51impl ProviderExecutor for MemProvider {
52 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
53 crate::DBQuery::query(&self.0, sql)
54 }
55
56 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
57 self.0.query_fields(sql, params)
58 }
59
60 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
61 crate::DBQuery::query_row(&self.0, sql)
62 }
63
64 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
65 self.0.query_named_fields(sql, params)
66 }
67}
68
69impl ProviderExecutor for PostgresProvider {
70 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
71 PostgresProvider::query(self, sql)
72 }
73
74 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
75 PostgresProvider::query_fields(self, sql, params)
76 }
77
78 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
79 PostgresProvider::query_row(self, sql)
80 }
81
82 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
83 PostgresProvider::query_named_fields(self, sql, params)
84 }
85}
86
87impl ProviderExecutor for MySqlProvider {
88 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
89 MySqlProvider::query(self, sql)
90 }
91
92 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
93 MySqlProvider::query_fields(self, sql, params)
94 }
95
96 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
97 MySqlProvider::query_row(self, sql)
98 }
99
100 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
101 MySqlProvider::query_named_fields(self, sql, params)
102 }
103}
104
105fn install_provider<F>(
106 kind: ProviderKind,
107 datasource_id: DatasourceId,
108 build: F,
109) -> KnowledgeResult<()>
110where
111 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
112{
113 runtime().install_provider(kind, datasource_id, build)?;
114 Ok(())
115}
116
/// Derives the [`DatasourceId`] for a provider of `kind` from `seed`
/// (callers in this file pass a connection/authority URI or a fixed label).
fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
    DatasourceId::from_seed(kind, seed)
}
120
121pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
122 let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
123 install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
124 Ok(Arc::new(ThreadClonedMDB::from_authority_with_generation(
125 authority_uri,
126 generation.0,
127 )))
128 })
129}
130
131pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
132 install_provider(
133 ProviderKind::SqliteAuthority,
134 datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
135 |_generation| Ok(Arc::new(MemProvider(memdb))),
136 )
137}
138
139pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
140 let datasource_id = datasource_id_for(ProviderKind::Postgres, connection_uri);
141 install_provider(ProviderKind::Postgres, datasource_id, |_generation| {
142 let config = PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size);
143 let provider = PostgresProvider::connect(&config)?;
144 Ok(Arc::new(provider))
145 })
146}
147
148pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
149 let datasource_id = datasource_id_for(ProviderKind::Mysql, connection_uri);
150 install_provider(ProviderKind::Mysql, datasource_id, |_generation| {
151 let config = MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size);
152 let provider = MySqlProvider::connect(&config)?;
153 Ok(Arc::new(provider))
154 })
155}
156
157pub fn current_generation() -> Option<u64> {
158 runtime()
159 .current_generation()
160 .map(|generation| generation.0)
161}
162
/// Returns the global runtime's current [`RuntimeSnapshot`].
pub fn runtime_snapshot() -> RuntimeSnapshot {
    runtime().snapshot()
}
166
/// Installs `telemetry_impl` as the telemetry sink and returns whatever
/// `install_telemetry` hands back.
///
/// The tests in this file treat the returned value as the previously
/// installed sink and re-install it afterwards.
pub fn install_runtime_telemetry(
    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
) -> Arc<dyn KnowledgeTelemetry> {
    install_telemetry(telemetry_impl)
}
172
173pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
174 runtime()
175 .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
176 .map(QueryResponse::into_rows)
177}
178
179pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
180 runtime()
181 .execute(&QueryRequest::first_row(
182 sql,
183 Vec::new(),
184 CachePolicy::Bypass,
185 ))
186 .map(QueryResponse::into_row)
187}
188
189pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
190 runtime()
191 .execute(&QueryRequest::first_row(
192 sql,
193 fields_to_params(params),
194 CachePolicy::Bypass,
195 ))
196 .map(QueryResponse::into_row)
197}
198
/// Alias for [`query_fields`]: same arguments, same result.
pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields(sql, params)
}
202
203pub fn query_named<'a>(
204 sql: &str,
205 params: &'a [(&'a str, &'a dyn ToSql)],
206) -> KnowledgeResult<RowData> {
207 let fields = named_params_to_fields(params)?;
208 query_fields(sql, &fields)
209}
210
211pub fn cache_query_fields<const N: usize>(
212 sql: &str,
213 c_params: &[DataField; N],
214 query_params: &[DataField],
215 cache: &mut impl CacheAble<DataField, RowData, N>,
216) -> RowData {
217 let local_cache_scope = stable_hash(sql);
218 if let Some(generation) = current_generation() {
219 cache.prepare_generation(generation);
220 }
221 if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
222 runtime().record_local_cache_hit();
223 telemetry().on_cache(&CacheTelemetryEvent {
224 layer: CacheLayer::Local,
225 outcome: CacheOutcome::Hit,
226 provider_kind: runtime().current_provider_kind(),
227 });
228 return hit.clone();
229 }
230 runtime().record_local_cache_miss();
231 telemetry().on_cache(&CacheTelemetryEvent {
232 layer: CacheLayer::Local,
233 outcome: CacheOutcome::Miss,
234 provider_kind: runtime().current_provider_kind(),
235 });
236
237 let req = QueryRequest::first_row(sql, fields_to_params(query_params), CachePolicy::UseGlobal);
238 match runtime().execute(&req) {
239 Ok(out) => {
240 let row = out.into_row();
241 cache.save_scoped(local_cache_scope, c_params, row.clone());
242 row
243 }
244 Err(err) => {
245 warn_kdb!("[kdb] query error: {}", err);
246 Vec::new()
247 }
248 }
249}
250
251pub fn cache_query<const N: usize>(
252 sql: &str,
253 c_params: &[DataField; N],
254 named_params: &[(&str, &dyn ToSql)],
255 cache: &mut impl CacheAble<DataField, RowData, N>,
256) -> RowData {
257 let query_fields = match named_params_to_fields(named_params) {
258 Ok(fields) => fields,
259 Err(err) => {
260 warn_kdb!("[kdb] query param conversion error: {}", err);
261 return Vec::new();
262 }
263 };
264
265 cache_query_fields(sql, c_params, &query_fields, cache)
266}
267
268fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
269 if let Ok(conn) = Connection::open_with_flags(
270 authority_uri,
271 OpenFlags::SQLITE_OPEN_READ_WRITE
272 | OpenFlags::SQLITE_OPEN_CREATE
273 | OpenFlags::SQLITE_OPEN_URI,
274 ) {
275 let _ = conn.execute_batch(
276 "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
277 );
278 }
279 Ok(())
280}
281
282pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
283 ensure_wal(authority_uri)?;
284 let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
285 let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
286 init_mem_provider(mem)
287}
288
289pub fn init_thread_cloned_from_knowdb(
290 root: &Path,
291 knowdb_conf: &Path,
292 authority_uri: &str,
293 dict: &orion_variate::EnvDict,
294) -> KnowledgeResult<()> {
295 let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
296 if let Some(provider) = conf.provider {
297 match provider.kind {
298 ProviderKind::Postgres => {
299 info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
300 init_postgres_provider(provider.connection_uri.as_str(), provider.pool_size)?;
301 runtime().configure_result_cache(
302 conf.cache.enabled,
303 conf.cache.capacity,
304 Duration::from_millis(conf.cache.ttl_ms.max(1)),
305 );
306 return Ok(());
307 }
308 ProviderKind::Mysql => {
309 info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
310 init_mysql_provider(provider.connection_uri.as_str(), provider.pool_size)?;
311 runtime().configure_result_cache(
312 conf.cache.enabled,
313 conf.cache.capacity,
314 Duration::from_millis(conf.cache.ttl_ms.max(1)),
315 );
316 return Ok(());
317 }
318 ProviderKind::SqliteAuthority => {}
319 }
320 }
321
322 crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
323 let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
324 let path_part = rest.split('?').next().unwrap_or(rest);
325 format!("file:{}?mode=ro&uri=true", path_part)
326 } else {
327 authority_uri.to_string()
328 };
329
330 info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
331 init_thread_cloned_from_authority(&ro_uri)?;
332 runtime().configure_result_cache(
333 conf.cache.enabled,
334 conf.cache.capacity,
335 Duration::from_millis(conf.cache.ttl_ms.max(1)),
336 );
337 Ok(())
338}
339
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// The result is the same for equal input within one build of the program.
/// The standard library does not promise that `DefaultHasher`'s algorithm
/// stays the same across Rust releases, so the value should not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
345
346#[cfg(test)]
347mod tests {
348 use super::*;
349 use crate::cache::FieldQueryCache;
350 use crate::mem::memdb::MemDB;
351 use crate::telemetry::{
352 CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
353 ReloadTelemetryEvent, reset_telemetry,
354 };
355 use orion_error::{ToStructError, UvsFrom};
356 use orion_variate::EnvDict;
357 use std::fs;
358 use std::hint::black_box;
359 use std::path::PathBuf;
360 use std::sync::atomic::{AtomicU64, Ordering};
361 use std::time::{Duration, Instant};
362 use wp_error::KnowledgeReason;
363
/// Telemetry sink for tests: one atomic counter per kind of event, so a test
/// can assert which events were emitted.
#[derive(Default)]
struct TestTelemetry {
    // Reload events, by outcome.
    reload_success: AtomicU64,
    reload_failure: AtomicU64,
    // Cache events, by layer (local / result / metadata) and outcome.
    local_hits: AtomicU64,
    local_misses: AtomicU64,
    result_hits: AtomicU64,
    result_misses: AtomicU64,
    metadata_hits: AtomicU64,
    metadata_misses: AtomicU64,
    // Query events, by the event's `success` flag.
    query_success: AtomicU64,
    query_failure: AtomicU64,
}
377
378 impl KnowledgeTelemetry for TestTelemetry {
379 fn on_cache(&self, event: &CacheTelemetryEvent) {
380 let counter = match (event.layer, event.outcome) {
381 (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
382 (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
383 (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
384 (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
385 (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
386 (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
387 };
388 counter.fetch_add(1, Ordering::Relaxed);
389 }
390
391 fn on_reload(&self, event: &ReloadTelemetryEvent) {
392 let counter = match event.outcome {
393 crate::telemetry::ReloadOutcome::Success => &self.reload_success,
394 crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
395 };
396 counter.fetch_add(1, Ordering::Relaxed);
397 }
398
399 fn on_query(&self, event: &QueryTelemetryEvent) {
400 let counter = if event.success {
401 &self.query_success
402 } else {
403 &self.query_failure
404 };
405 counter.fetch_add(1, Ordering::Relaxed);
406 }
407 }
408
/// Reads environment variable `key` as a `usize`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// does not parse as a `usize`.
fn perf_env_usize(key: &str, default: usize) -> usize {
    match std::env::var(key) {
        Ok(raw) => raw.parse::<usize>().unwrap_or(default),
        Err(_) => default,
    }
}
415
416 fn seed_perf_provider(rows: usize) {
417 let db = MemDB::instance();
418 db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
419 .expect("create perf_kv");
420 db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
421 for id in 1..=rows {
422 let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
423 db.execute(sql.as_str()).expect("insert perf_kv row");
424 }
425 db.execute("COMMIT").expect("commit perf_kv load");
426 init_mem_provider(db).expect("init perf provider");
427 }
428
/// One pre-built lookup of the perf workload, carrying everything each
/// scenario needs so nothing is constructed inside the timed loops.
#[derive(Clone)]
struct PerfQuery {
    /// Key used for the local cache (`run_local_cache_perf`).
    cache_key: [DataField; 1],
    /// Fields bound to the statement (`run_local_cache_perf`).
    query_params: [DataField; 1],
    /// Request built with `CachePolicy::Bypass`.
    bypass_req: QueryRequest,
    /// Request built with `CachePolicy::UseGlobal`.
    global_req: QueryRequest,
}
436
437 fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
438 (0..ops)
439 .map(|idx| {
440 let id = ((idx * 17) % hotset + 1) as i64;
441 let cache_key = [DataField::from_digit("id", id)];
442 let query_params = [DataField::from_digit(":id", id)];
443 let bypass_req = QueryRequest::first_row(
444 "SELECT value FROM perf_kv WHERE id=:id",
445 fields_to_params(&query_params),
446 CachePolicy::Bypass,
447 );
448 let global_req = QueryRequest::first_row(
449 "SELECT value FROM perf_kv WHERE id=:id",
450 fields_to_params(&query_params),
451 CachePolicy::UseGlobal,
452 );
453 PerfQuery {
454 cache_key,
455 query_params,
456 bypass_req,
457 global_req,
458 }
459 })
460 .collect()
461 }
462
/// Cache counter increases observed during one perf scenario
/// (computed by `snapshot_delta` from two runtime snapshots).
#[derive(Debug, Clone, Copy)]
struct PerfCounters {
    result_hits: u64,
    result_misses: u64,
    local_hits: u64,
    local_misses: u64,
    metadata_hits: u64,
    metadata_misses: u64,
}
472
/// Outcome of one perf scenario.
#[derive(Debug, Clone)]
struct PerfResult {
    /// Scenario label printed in the report.
    name: &'static str,
    /// Wall-clock time spent in the timed loop.
    elapsed: Duration,
    /// Number of lookups executed.
    ops: usize,
    /// Cache counter increases over the scenario.
    counters: PerfCounters,
}
480
481 impl PerfResult {
482 fn qps(&self) -> f64 {
483 let secs = self.elapsed.as_secs_f64();
484 if secs == 0.0 {
485 self.ops as f64
486 } else {
487 self.ops as f64 / secs
488 }
489 }
490 }
491
492 fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
493 PerfCounters {
494 result_hits: after
495 .result_cache_hits
496 .saturating_sub(before.result_cache_hits),
497 result_misses: after
498 .result_cache_misses
499 .saturating_sub(before.result_cache_misses),
500 local_hits: after
501 .local_cache_hits
502 .saturating_sub(before.local_cache_hits),
503 local_misses: after
504 .local_cache_misses
505 .saturating_sub(before.local_cache_misses),
506 metadata_hits: after
507 .metadata_cache_hits
508 .saturating_sub(before.metadata_cache_hits),
509 metadata_misses: after
510 .metadata_cache_misses
511 .saturating_sub(before.metadata_cache_misses),
512 }
513 }
514
515 fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
516 seed_perf_provider(rows);
517 let before = runtime_snapshot();
518 let started = Instant::now();
519 for item in workload {
520 let row = runtime()
521 .execute(&item.bypass_req)
522 .expect("execute bypass request")
523 .into_row();
524 black_box(row);
525 }
526 let elapsed = started.elapsed();
527 let after = runtime_snapshot();
528 PerfResult {
529 name: "bypass",
530 elapsed,
531 ops: workload.len(),
532 counters: snapshot_delta(&before, &after),
533 }
534 }
535
536 fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
537 seed_perf_provider(rows);
538 let before = runtime_snapshot();
539 let started = Instant::now();
540 for item in workload {
541 let row = runtime()
542 .execute(&item.global_req)
543 .expect("execute global-cache request")
544 .into_row();
545 black_box(row);
546 }
547 let elapsed = started.elapsed();
548 let after = runtime_snapshot();
549 PerfResult {
550 name: "global_cache",
551 elapsed,
552 ops: workload.len(),
553 counters: snapshot_delta(&before, &after),
554 }
555 }
556
557 fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
558 seed_perf_provider(rows);
559 let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
560 let before = runtime_snapshot();
561 let started = Instant::now();
562 for item in workload {
563 let row = cache_query_fields(
564 "SELECT value FROM perf_kv WHERE id=:id",
565 &item.cache_key,
566 &item.query_params,
567 &mut cache,
568 );
569 black_box(row);
570 }
571 let elapsed = started.elapsed();
572 let after = runtime_snapshot();
573 PerfResult {
574 name: "local_cache",
575 elapsed,
576 ops: workload.len(),
577 counters: snapshot_delta(&before, &after),
578 }
579 }
580
581 fn print_perf_result(result: &PerfResult) {
582 eprintln!(
583 "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
584 result.name,
585 result.elapsed.as_millis(),
586 result.qps(),
587 result.counters.result_hits,
588 result.counters.result_misses,
589 result.counters.local_hits,
590 result.counters.local_misses,
591 result.counters.metadata_hits,
592 result.counters.metadata_misses,
593 );
594 }
595
596 fn uniq_cache_cfg_tmp_dir() -> PathBuf {
597 use rand::{Rng, rng};
598 let rnd: u64 = rng().next_u64();
599 std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
600 }
601
/// Writes a minimal knowdb layout under `root/models/knowledge` and returns
/// the path of its `knowdb.toml`.
///
/// The configuration declares one table, `example`, plus a `[cache]` section
/// filled from `enabled`, `capacity` and `ttl_ms`. The table directory gets
/// `create.sql`, `insert.sql` and a one-row `data.csv` (`1,alpha`).
/// Panics if any directory or file cannot be written.
fn write_minimal_knowdb_with_cache(
    root: &Path,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    let example_dir = models.join("example");
    fs::create_dir_all(&example_dir).expect("create knowdb models/example");

    let conf_path = models.join("knowdb.toml");
    let conf_text = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[csv]
has_header = false

[[tables]]
name = "example"
columns.by_index = [0,1]
"#
    );
    fs::write(&conf_path, conf_text).expect("write knowdb.toml");

    let create_sql = r#"
CREATE TABLE IF NOT EXISTS {table} (
    id INTEGER PRIMARY KEY,
    value TEXT NOT NULL
);
"#;
    fs::write(example_dir.join("create.sql"), create_sql).expect("write create.sql");

    fs::write(
        example_dir.join("insert.sql"),
        "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
    )
    .expect("write insert.sql");

    fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");

    conf_path
}
651
/// Writes a knowdb configuration that has only a `[cache]` and a `[provider]`
/// section (no tables) under `root/models/knowledge`, and returns the path of
/// the written `knowdb.toml`.
///
/// Panics if the directory or the file cannot be written.
fn write_provider_only_knowdb_with_cache(
    root: &Path,
    provider_kind: &str,
    connection_uri: &str,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    fs::create_dir_all(&models).expect("create knowdb models");

    let conf_path = models.join("knowdb.toml");
    let conf_text = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[provider]
kind = "{provider_kind}"
connection_uri = "{connection_uri}"
"#
    );
    fs::write(&conf_path, conf_text).expect("write provider knowdb.toml");

    conf_path
}
683
/// Reconfigures the runtime result cache to: enabled, capacity 1024,
/// TTL 30 000 ms.
// NOTE(review): presumably these are the runtime's defaults, as the name
// suggests — confirm against the runtime module.
fn restore_default_result_cache_config() {
    runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
}
687
/// Installing a second in-memory provider makes later queries hit the new
/// database instead of the old one.
#[test]
fn provider_can_be_replaced() {
    const LOOKUP: &str = "SELECT value FROM t WHERE id = 1";

    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let first = MemDB::instance();
    first
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    first
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first).expect("init provider db1");
    let from_first = query_row(LOOKUP).expect("query db1");
    assert_eq!(from_first[0].to_string(), "chars(first)");

    let second = MemDB::instance();
    second
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    second
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second).expect("replace provider with db2");
    let from_second = query_row(LOOKUP).expect("query db2");
    assert_eq!(from_second[0].to_string(), "chars(second)");
}
711
/// A local cache that already holds a row must not serve it after the
/// provider has been replaced: the same lookup has to return the new data.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    const LOOKUP: &str = "SELECT value FROM t WHERE id=:id";

    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let first = MemDB::instance();
    first
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    first
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first).expect("init provider db1");

    let cache_key = [DataField::from_digit("id", 1)];
    let binds = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    let before_swap = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(before_swap[0].to_string(), "chars(first)");

    let second = MemDB::instance();
    second
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    second
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second).expect("replace provider with db2");

    let after_swap = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(after_swap[0].to_string(), "chars(second)");
}
750
/// Two statements that share the same cache key must not share a local cache
/// entry: each has to return the row of its own table.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let db = MemDB::instance();
    db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t1");
    db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t2");
    db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
        .expect("seed t1");
    db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
        .expect("seed t2");
    init_mem_provider(db).expect("init provider");

    let cache_key = [DataField::from_digit("id", 1)];
    let binds = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    let from_t1 = cache_query_fields(
        "SELECT value FROM t1 WHERE id=:id",
        &cache_key,
        &binds,
        &mut local_cache,
    );
    assert_eq!(from_t1[0].to_string(), "chars(first)");

    let from_t2 = cache_query_fields(
        "SELECT value FROM t2 WHERE id=:id",
        &cache_key,
        &binds,
        &mut local_cache,
    );
    assert_eq!(from_t2[0].to_string(), "chars(second)");
}
787
/// After one cached lookup the runtime snapshot must report the provider
/// kind, a generation, a non-empty result cache within its capacity, and at
/// least one successful reload.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let binds = [DataField::from_digit(":id", 1)];
    let row = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &cache_key,
        &binds,
        &mut local_cache,
    );
    assert_eq!(row[0].to_string(), "chars(first)");

    let snap = runtime_snapshot();
    assert!(matches!(
        snap.provider_kind,
        Some(ProviderKind::SqliteAuthority)
    ));
    assert!(snap.generation.is_some());
    assert!(snap.result_cache_len >= 1);
    assert!(snap.result_cache_capacity >= snap.result_cache_len);
    assert!(snap.metadata_cache_capacity >= snap.metadata_cache_len);
    assert!(snap.reload_successes >= 1);
}
822
/// Running the same SQL against two successive providers must grow the
/// metadata cache each time, i.e. the second generation gets its own entry.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sql = "SELECT value FROM t WHERE id = 1";
    let before = runtime_snapshot().metadata_cache_len;

    // First generation.
    let first = MemDB::instance();
    first
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    first
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first).expect("init provider db1");
    let from_first = query_row(sql).expect("query db1");
    assert_eq!(from_first[0].to_string(), "chars(first)");
    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first > before,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    // Second generation, same SQL text.
    let second = MemDB::instance();
    second
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db2");
    second
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second).expect("replace provider with db2");
    let from_second = query_row(sql).expect("query db2");
    assert_eq!(from_second[0].to_string(), "chars(second)");
    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_second > after_first,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
859
/// When the build callback of a reload fails, the previously installed
/// provider keeps serving queries and the generation does not change.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(db).expect("init provider db1");
    let generation_before = current_generation();

    let failing_datasource = datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure");
    let reload_outcome = install_provider(
        ProviderKind::SqliteAuthority,
        failing_datasource,
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected reload failure"))
        },
    );
    assert!(reload_outcome.is_err());

    let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(row[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), generation_before);
}
888
/// Two identical cached lookups must raise the local hit and miss counters,
/// and the result and metadata miss counters, in the runtime snapshot.
#[test]
fn runtime_snapshot_records_cache_counters() {
    const LOOKUP: &str = "SELECT value FROM t WHERE id=:id";

    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let before = runtime_snapshot();
    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let binds = [DataField::from_digit(":id", 1)];

    // The first lookup is expected to miss locally, the second to hit.
    let first_lookup = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(first_lookup[0].to_string(), "chars(first)");
    let second_lookup = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(second_lookup[0].to_string(), "chars(first)");

    let after = runtime_snapshot();
    assert!(after.local_cache_hits > before.local_cache_hits);
    assert!(after.local_cache_misses > before.local_cache_misses);
    assert!(after.result_cache_misses > before.result_cache_misses);
    assert!(after.metadata_cache_misses > before.metadata_cache_misses);
}
926
/// With a counting telemetry sink installed, a provider install, two cached
/// lookups and one failing reload must produce reload, cache and query
/// events. The previous sink is put back before the counters are checked.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    const LOOKUP: &str = "SELECT value FROM t WHERE id=:id";

    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sink = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(sink.clone());

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let binds = [DataField::from_digit(":id", 1)];
    let first_lookup = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(first_lookup[0].to_string(), "chars(first)");
    let second_lookup = cache_query_fields(LOOKUP, &cache_key, &binds, &mut local_cache);
    assert_eq!(second_lookup[0].to_string(), "chars(first)");

    let failing_datasource =
        datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure");
    let reload_outcome = install_provider(
        ProviderKind::SqliteAuthority,
        failing_datasource,
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected telemetry reload failure"))
        },
    );
    assert!(reload_outcome.is_err());

    install_runtime_telemetry(previous);
    reset_telemetry();

    let seen = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
    assert!(seen(&sink.reload_success) >= 1);
    assert!(seen(&sink.reload_failure) >= 1);
    assert!(seen(&sink.local_hits) >= 1);
    assert!(seen(&sink.local_misses) >= 1);
    assert!(seen(&sink.result_misses) >= 1);
    assert!(seen(&sink.metadata_misses) >= 1);
    assert!(seen(&sink.query_success) >= 1);
}
982
/// Manual benchmark comparing bypass, global-cache and local-cache lookups.
///
/// Sizes come from `WP_KDB_PERF_ROWS`, `WP_KDB_PERF_OPS` and
/// `WP_KDB_PERF_HOTSET` (defaults 10 000 / 120 000 / 128). Results go to
/// stderr; the assertions only check that each scenario touched the cache
/// layers it is supposed to.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _serial = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
    let workload = build_perf_workload(ops, hotset);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        rows, ops, hotset
    );

    let bypass = run_bypass_perf(&workload, rows);
    let global = run_global_cache_perf(&workload, rows);
    let local = run_local_cache_perf(&workload, rows);

    for scenario in [&bypass, &global, &local] {
        print_perf_result(scenario);
    }

    let bypass_secs = bypass.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        bypass_secs / global.elapsed.as_secs_f64(),
        bypass_secs / local.elapsed.as_secs_f64(),
    );

    // Bypass must not touch the result cache or the local cache.
    assert_eq!(bypass.counters.result_hits, 0);
    assert_eq!(bypass.counters.result_misses, 0);
    assert_eq!(bypass.counters.local_hits, 0);
    assert_eq!(bypass.counters.local_misses, 0);
    // Global cache: result-cache traffic only.
    assert!(global.counters.result_hits > 0);
    assert!(global.counters.result_misses > 0);
    assert_eq!(global.counters.local_hits, 0);
    assert_eq!(global.counters.local_misses, 0);
    // Local cache: local traffic, with misses falling through to the result cache.
    assert!(local.counters.local_hits > 0);
    assert!(local.counters.local_misses > 0);
    assert!(local.counters.result_misses > 0);
}
1025
/// Verifies that initializing from a knowdb config applies its result-cache
/// settings (enabled, capacity 7, TTL 5 ms) to the runtime, and that the
/// cache then actually serves hits and expires entries.
///
/// Query sequence: first query (expected miss), immediate repeat (expected
/// hit), then a repeat after sleeping longer than the TTL (expected miss).
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    /// Runs the wrapped closure when dropped, so cleanup also happens when an
    /// assertion below panics and the stack unwinds.
    struct Cleanup<F: FnMut()>(F);

    impl<F: FnMut()> Drop for Cleanup<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped first: the global cache config
    // is restored and the temp dir removed while the test lock is still held.
    let _cleanup = Cleanup(|| {
        restore_default_result_cache_config();
        let _ = fs::remove_dir_all(&root);
    });

    let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    // The snapshot must reflect the values written into the config file.
    let snapshot = runtime_snapshot();
    assert!(snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 7);
    assert_eq!(snapshot.result_cache_ttl_ms, 5);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let row = runtime()
        .execute(&req)
        .expect("first result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let row = runtime()
        .execute(&req)
        .expect("second result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // Sleep past the 5 ms TTL so the cached entry has expired.
    std::thread::sleep(Duration::from_millis(12));
    let row = runtime()
        .execute(&req)
        .expect("expired result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let after = runtime_snapshot();

    // At least one hit (the immediate repeat) and at least two misses (the
    // initial query and the post-expiry query).
    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);
}
1076
/// Verifies that a knowdb config with the result cache disabled is applied to
/// the runtime (capacity 3 and TTL 30 000 ms are still recorded) and that
/// queries issued with `CachePolicy::UseGlobal` then leave the result-cache
/// hit and miss counters unchanged.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    /// Runs the wrapped closure when dropped, so cleanup also happens when an
    /// assertion below panics and the stack unwinds.
    struct Cleanup<F: FnMut()>(F);

    impl<F: FnMut()> Drop for Cleanup<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped first: the default cache
    // config is restored and the temp dir removed while the test lock is
    // still held. Without this, a failed assertion would leave the result
    // cache disabled for whatever runs next in the same process.
    let _cleanup = Cleanup(|| {
        restore_default_result_cache_config();
        let _ = fs::remove_dir_all(&root);
    });

    let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    // The snapshot must reflect the values written into the config file.
    let snapshot = runtime_snapshot();
    assert!(!snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 3);
    assert_eq!(snapshot.result_cache_ttl_ms, 30_000);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let _ = runtime()
        .execute(&req)
        .expect("first bypassed result-cache query");
    let _ = runtime()
        .execute(&req)
        .expect("second bypassed result-cache query");
    let after = runtime_snapshot();

    // Neither query may register as a result-cache hit or miss.
    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);
}
1117
/// Verifies that a knowdb initialization which fails while setting up its
/// provider leaves the runtime untouched: the result-cache settings stay as
/// they were and the previously installed in-memory provider keeps answering
/// queries.
#[test]
fn failed_knowdb_provider_init_does_not_apply_cache_config() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    // Start from the default cache config.
    restore_default_result_cache_config();

    // Install a working in-memory provider with one known row.
    let mem_db = MemDB::instance();
    mem_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem_db).expect("init provider");

    let baseline = runtime_snapshot();

    // A config whose provider URL is invalid and whose cache settings
    // (disabled, capacity 3, TTL 5) must not be applied.
    let root = uniq_cache_cfg_tmp_dir();
    let authority_uri = format!(
        "file:{}?mode=rwc&uri=true",
        root.join("unused.sqlite").display()
    );
    let conf_path = write_provider_only_knowdb_with_cache(
        &root,
        "mysql",
        "not-a-valid-mysql-url",
        false,
        3,
        5,
    );

    let init_result =
        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
    assert!(init_result.is_err());

    // Cache settings are unchanged by the failed init.
    let current = runtime_snapshot();
    assert_eq!(current.result_cache_enabled, baseline.result_cache_enabled);
    assert_eq!(current.result_cache_capacity, baseline.result_cache_capacity);
    assert_eq!(current.result_cache_ttl_ms, baseline.result_cache_ttl_ms);

    // The provider installed before the failed init still serves queries.
    let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(row[0].to_string(), "chars(first)");

    let _ = fs::remove_dir_all(&root);
}
1161}