Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::hash_map::DefaultHasher,
6    hash::{Hash, Hasher},
7};
8
9use rusqlite::ToSql;
10use rusqlite::{Connection, OpenFlags};
11use wp_error::KnowledgeResult;
12use wp_log::{info_ctrl, warn_kdb};
13use wp_model_core::model::DataField;
14
15use crate::cache::CacheAble;
16use crate::loader::{ProviderKind, parse_knowdb_conf};
17use crate::mem::RowData;
18use crate::mem::memdb::MemDB;
19use crate::mem::thread_clone::ThreadClonedMDB;
20use crate::mysql::{MySqlProvider, MySqlProviderConfig};
21use crate::param::named_params_to_fields;
22use crate::postgres::{PostgresProvider, PostgresProviderConfig};
23use crate::runtime::{
24    CachePolicy, DatasourceId, Generation, ProviderExecutor, QueryRequest, QueryResponse,
25    RuntimeSnapshot, fields_to_params, runtime,
26};
27use crate::telemetry::{
28    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry, telemetry,
29};
30
31struct MemProvider(MemDB);
32
33impl ProviderExecutor for ThreadClonedMDB {
34    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
35        crate::DBQuery::query(self, sql)
36    }
37
38    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
39        ThreadClonedMDB::query_fields(self, sql, params)
40    }
41
42    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
43        crate::DBQuery::query_row(self, sql)
44    }
45
46    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
47        ThreadClonedMDB::query_named_fields(self, sql, params)
48    }
49}
50
51impl ProviderExecutor for MemProvider {
52    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
53        crate::DBQuery::query(&self.0, sql)
54    }
55
56    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
57        self.0.query_fields(sql, params)
58    }
59
60    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
61        crate::DBQuery::query_row(&self.0, sql)
62    }
63
64    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
65        self.0.query_named_fields(sql, params)
66    }
67}
68
69impl ProviderExecutor for PostgresProvider {
70    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
71        PostgresProvider::query(self, sql)
72    }
73
74    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
75        PostgresProvider::query_fields(self, sql, params)
76    }
77
78    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
79        PostgresProvider::query_row(self, sql)
80    }
81
82    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
83        PostgresProvider::query_named_fields(self, sql, params)
84    }
85}
86
87impl ProviderExecutor for MySqlProvider {
88    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
89        MySqlProvider::query(self, sql)
90    }
91
92    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
93        MySqlProvider::query_fields(self, sql, params)
94    }
95
96    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
97        MySqlProvider::query_row(self, sql)
98    }
99
100    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
101        MySqlProvider::query_named_fields(self, sql, params)
102    }
103}
104
105fn install_provider<F>(
106    kind: ProviderKind,
107    datasource_id: DatasourceId,
108    build: F,
109) -> KnowledgeResult<()>
110where
111    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
112{
113    runtime().install_provider(kind, datasource_id, build)?;
114    Ok(())
115}
116
117fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
118    DatasourceId::from_seed(kind, seed)
119}
120
121pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
122    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
123    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
124        Ok(Arc::new(ThreadClonedMDB::from_authority_with_generation(
125            authority_uri,
126            generation.0,
127        )))
128    })
129}
130
131pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
132    install_provider(
133        ProviderKind::SqliteAuthority,
134        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
135        |_generation| Ok(Arc::new(MemProvider(memdb))),
136    )
137}
138
139pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
140    let datasource_id = datasource_id_for(ProviderKind::Postgres, connection_uri);
141    install_provider(ProviderKind::Postgres, datasource_id, |_generation| {
142        let config = PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size);
143        let provider = PostgresProvider::connect(&config)?;
144        Ok(Arc::new(provider))
145    })
146}
147
148pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
149    let datasource_id = datasource_id_for(ProviderKind::Mysql, connection_uri);
150    install_provider(ProviderKind::Mysql, datasource_id, |_generation| {
151        let config = MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size);
152        let provider = MySqlProvider::connect(&config)?;
153        Ok(Arc::new(provider))
154    })
155}
156
157pub fn current_generation() -> Option<u64> {
158    runtime()
159        .current_generation()
160        .map(|generation| generation.0)
161}
162
163pub fn runtime_snapshot() -> RuntimeSnapshot {
164    runtime().snapshot()
165}
166
167pub fn install_runtime_telemetry(
168    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
169) -> Arc<dyn KnowledgeTelemetry> {
170    install_telemetry(telemetry_impl)
171}
172
173pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
174    runtime()
175        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
176        .map(QueryResponse::into_rows)
177}
178
179pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
180    runtime()
181        .execute(&QueryRequest::first_row(
182            sql,
183            Vec::new(),
184            CachePolicy::Bypass,
185        ))
186        .map(QueryResponse::into_row)
187}
188
189pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
190    runtime()
191        .execute(&QueryRequest::first_row(
192            sql,
193            fields_to_params(params),
194            CachePolicy::Bypass,
195        ))
196        .map(QueryResponse::into_row)
197}
198
199pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
200    query_fields(sql, params)
201}
202
203pub fn query_named<'a>(
204    sql: &str,
205    params: &'a [(&'a str, &'a dyn ToSql)],
206) -> KnowledgeResult<RowData> {
207    let fields = named_params_to_fields(params)?;
208    query_fields(sql, &fields)
209}
210
211pub fn cache_query_fields<const N: usize>(
212    sql: &str,
213    c_params: &[DataField; N],
214    query_params: &[DataField],
215    cache: &mut impl CacheAble<DataField, RowData, N>,
216) -> RowData {
217    let local_cache_scope = stable_hash(sql);
218    if let Some(generation) = current_generation() {
219        cache.prepare_generation(generation);
220    }
221    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
222        runtime().record_local_cache_hit();
223        telemetry().on_cache(&CacheTelemetryEvent {
224            layer: CacheLayer::Local,
225            outcome: CacheOutcome::Hit,
226            provider_kind: runtime().current_provider_kind(),
227        });
228        return hit.clone();
229    }
230    runtime().record_local_cache_miss();
231    telemetry().on_cache(&CacheTelemetryEvent {
232        layer: CacheLayer::Local,
233        outcome: CacheOutcome::Miss,
234        provider_kind: runtime().current_provider_kind(),
235    });
236
237    let req = QueryRequest::first_row(sql, fields_to_params(query_params), CachePolicy::UseGlobal);
238    match runtime().execute(&req) {
239        Ok(out) => {
240            let row = out.into_row();
241            cache.save_scoped(local_cache_scope, c_params, row.clone());
242            row
243        }
244        Err(err) => {
245            warn_kdb!("[kdb] query error: {}", err);
246            Vec::new()
247        }
248    }
249}
250
251pub fn cache_query<const N: usize>(
252    sql: &str,
253    c_params: &[DataField; N],
254    named_params: &[(&str, &dyn ToSql)],
255    cache: &mut impl CacheAble<DataField, RowData, N>,
256) -> RowData {
257    let query_fields = match named_params_to_fields(named_params) {
258        Ok(fields) => fields,
259        Err(err) => {
260            warn_kdb!("[kdb] query param conversion error: {}", err);
261            return Vec::new();
262        }
263    };
264
265    cache_query_fields(sql, c_params, &query_fields, cache)
266}
267
268fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
269    if let Ok(conn) = Connection::open_with_flags(
270        authority_uri,
271        OpenFlags::SQLITE_OPEN_READ_WRITE
272            | OpenFlags::SQLITE_OPEN_CREATE
273            | OpenFlags::SQLITE_OPEN_URI,
274    ) {
275        let _ = conn.execute_batch(
276            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
277        );
278    }
279    Ok(())
280}
281
282pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
283    ensure_wal(authority_uri)?;
284    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
285    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
286    init_mem_provider(mem)
287}
288
289pub fn init_thread_cloned_from_knowdb(
290    root: &Path,
291    knowdb_conf: &Path,
292    authority_uri: &str,
293    dict: &orion_variate::EnvDict,
294) -> KnowledgeResult<()> {
295    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
296    if let Some(provider) = conf.provider {
297        match provider.kind {
298            ProviderKind::Postgres => {
299                info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
300                init_postgres_provider(provider.connection_uri.as_str(), provider.pool_size)?;
301                runtime().configure_result_cache(
302                    conf.cache.enabled,
303                    conf.cache.capacity,
304                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
305                );
306                return Ok(());
307            }
308            ProviderKind::Mysql => {
309                info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
310                init_mysql_provider(provider.connection_uri.as_str(), provider.pool_size)?;
311                runtime().configure_result_cache(
312                    conf.cache.enabled,
313                    conf.cache.capacity,
314                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
315                );
316                return Ok(());
317            }
318            ProviderKind::SqliteAuthority => {}
319        }
320    }
321
322    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
323    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
324        let path_part = rest.split('?').next().unwrap_or(rest);
325        format!("file:{}?mode=ro&uri=true", path_part)
326    } else {
327        authority_uri.to_string()
328    };
329
330    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
331    init_thread_cloned_from_authority(&ro_uri)?;
332    runtime().configure_result_cache(
333        conf.cache.enabled,
334        conf.cache.capacity,
335        Duration::from_millis(conf.cache.ttl_ms.max(1)),
336    );
337    Ok(())
338}
339
/// Hashes `value` with the standard library's `DefaultHasher` (fixed keys via `new()`).
///
/// The result is deterministic within one build, which is all the local-cache scoping
/// needs. `std` does not promise the algorithm is stable across Rust releases, so the value
/// must not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349    use crate::cache::FieldQueryCache;
350    use crate::mem::memdb::MemDB;
351    use crate::telemetry::{
352        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
353        ReloadTelemetryEvent, reset_telemetry,
354    };
355    use orion_error::{ToStructError, UvsFrom};
356    use orion_variate::EnvDict;
357    use std::fs;
358    use std::hint::black_box;
359    use std::path::PathBuf;
360    use std::sync::atomic::{AtomicU64, Ordering};
361    use std::time::{Duration, Instant};
362    use wp_error::KnowledgeReason;
363
364    #[derive(Default)]
365    struct TestTelemetry {
366        reload_success: AtomicU64,
367        reload_failure: AtomicU64,
368        local_hits: AtomicU64,
369        local_misses: AtomicU64,
370        result_hits: AtomicU64,
371        result_misses: AtomicU64,
372        metadata_hits: AtomicU64,
373        metadata_misses: AtomicU64,
374        query_success: AtomicU64,
375        query_failure: AtomicU64,
376    }
377
378    impl KnowledgeTelemetry for TestTelemetry {
379        fn on_cache(&self, event: &CacheTelemetryEvent) {
380            let counter = match (event.layer, event.outcome) {
381                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
382                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
383                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
384                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
385                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
386                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
387            };
388            counter.fetch_add(1, Ordering::Relaxed);
389        }
390
391        fn on_reload(&self, event: &ReloadTelemetryEvent) {
392            let counter = match event.outcome {
393                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
394                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
395            };
396            counter.fetch_add(1, Ordering::Relaxed);
397        }
398
399        fn on_query(&self, event: &QueryTelemetryEvent) {
400            let counter = if event.success {
401                &self.query_success
402            } else {
403                &self.query_failure
404            };
405            counter.fetch_add(1, Ordering::Relaxed);
406        }
407    }
408
409    fn perf_env_usize(key: &str, default: usize) -> usize {
410        std::env::var(key)
411            .ok()
412            .and_then(|value| value.parse::<usize>().ok())
413            .unwrap_or(default)
414    }
415
416    fn seed_perf_provider(rows: usize) {
417        let db = MemDB::instance();
418        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
419            .expect("create perf_kv");
420        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
421        for id in 1..=rows {
422            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
423            db.execute(sql.as_str()).expect("insert perf_kv row");
424        }
425        db.execute("COMMIT").expect("commit perf_kv load");
426        init_mem_provider(db).expect("init perf provider");
427    }
428
429    #[derive(Clone)]
430    struct PerfQuery {
431        cache_key: [DataField; 1],
432        query_params: [DataField; 1],
433        bypass_req: QueryRequest,
434        global_req: QueryRequest,
435    }
436
437    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
438        (0..ops)
439            .map(|idx| {
440                let id = ((idx * 17) % hotset + 1) as i64;
441                let cache_key = [DataField::from_digit("id", id)];
442                let query_params = [DataField::from_digit(":id", id)];
443                let bypass_req = QueryRequest::first_row(
444                    "SELECT value FROM perf_kv WHERE id=:id",
445                    fields_to_params(&query_params),
446                    CachePolicy::Bypass,
447                );
448                let global_req = QueryRequest::first_row(
449                    "SELECT value FROM perf_kv WHERE id=:id",
450                    fields_to_params(&query_params),
451                    CachePolicy::UseGlobal,
452                );
453                PerfQuery {
454                    cache_key,
455                    query_params,
456                    bypass_req,
457                    global_req,
458                }
459            })
460            .collect()
461    }
462
463    #[derive(Debug, Clone, Copy)]
464    struct PerfCounters {
465        result_hits: u64,
466        result_misses: u64,
467        local_hits: u64,
468        local_misses: u64,
469        metadata_hits: u64,
470        metadata_misses: u64,
471    }
472
473    #[derive(Debug, Clone)]
474    struct PerfResult {
475        name: &'static str,
476        elapsed: Duration,
477        ops: usize,
478        counters: PerfCounters,
479    }
480
481    impl PerfResult {
482        fn qps(&self) -> f64 {
483            let secs = self.elapsed.as_secs_f64();
484            if secs == 0.0 {
485                self.ops as f64
486            } else {
487                self.ops as f64 / secs
488            }
489        }
490    }
491
492    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
493        PerfCounters {
494            result_hits: after
495                .result_cache_hits
496                .saturating_sub(before.result_cache_hits),
497            result_misses: after
498                .result_cache_misses
499                .saturating_sub(before.result_cache_misses),
500            local_hits: after
501                .local_cache_hits
502                .saturating_sub(before.local_cache_hits),
503            local_misses: after
504                .local_cache_misses
505                .saturating_sub(before.local_cache_misses),
506            metadata_hits: after
507                .metadata_cache_hits
508                .saturating_sub(before.metadata_cache_hits),
509            metadata_misses: after
510                .metadata_cache_misses
511                .saturating_sub(before.metadata_cache_misses),
512        }
513    }
514
515    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
516        seed_perf_provider(rows);
517        let before = runtime_snapshot();
518        let started = Instant::now();
519        for item in workload {
520            let row = runtime()
521                .execute(&item.bypass_req)
522                .expect("execute bypass request")
523                .into_row();
524            black_box(row);
525        }
526        let elapsed = started.elapsed();
527        let after = runtime_snapshot();
528        PerfResult {
529            name: "bypass",
530            elapsed,
531            ops: workload.len(),
532            counters: snapshot_delta(&before, &after),
533        }
534    }
535
536    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
537        seed_perf_provider(rows);
538        let before = runtime_snapshot();
539        let started = Instant::now();
540        for item in workload {
541            let row = runtime()
542                .execute(&item.global_req)
543                .expect("execute global-cache request")
544                .into_row();
545            black_box(row);
546        }
547        let elapsed = started.elapsed();
548        let after = runtime_snapshot();
549        PerfResult {
550            name: "global_cache",
551            elapsed,
552            ops: workload.len(),
553            counters: snapshot_delta(&before, &after),
554        }
555    }
556
557    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
558        seed_perf_provider(rows);
559        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
560        let before = runtime_snapshot();
561        let started = Instant::now();
562        for item in workload {
563            let row = cache_query_fields(
564                "SELECT value FROM perf_kv WHERE id=:id",
565                &item.cache_key,
566                &item.query_params,
567                &mut cache,
568            );
569            black_box(row);
570        }
571        let elapsed = started.elapsed();
572        let after = runtime_snapshot();
573        PerfResult {
574            name: "local_cache",
575            elapsed,
576            ops: workload.len(),
577            counters: snapshot_delta(&before, &after),
578        }
579    }
580
581    fn print_perf_result(result: &PerfResult) {
582        eprintln!(
583            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
584            result.name,
585            result.elapsed.as_millis(),
586            result.qps(),
587            result.counters.result_hits,
588            result.counters.result_misses,
589            result.counters.local_hits,
590            result.counters.local_misses,
591            result.counters.metadata_hits,
592            result.counters.metadata_misses,
593        );
594    }
595
596    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
597        use rand::{Rng, rng};
598        let rnd: u64 = rng().next_u64();
599        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
600    }
601
602    fn write_minimal_knowdb_with_cache(
603        root: &Path,
604        enabled: bool,
605        capacity: usize,
606        ttl_ms: u64,
607    ) -> std::path::PathBuf {
608        let models = root.join("models").join("knowledge");
609        let example_dir = models.join("example");
610        fs::create_dir_all(&example_dir).expect("create knowdb models/example");
611        fs::write(
612            models.join("knowdb.toml"),
613            format!(
614                r#"
615version = 2
616base_dir = "."
617
618[cache]
619enabled = {enabled}
620capacity = {capacity}
621ttl_ms = {ttl_ms}
622
623[csv]
624has_header = false
625
626[[tables]]
627name = "example"
628columns.by_index = [0,1]
629"#
630            ),
631        )
632        .expect("write knowdb.toml");
633        fs::write(
634            example_dir.join("create.sql"),
635            r#"
636CREATE TABLE IF NOT EXISTS {table} (
637  id INTEGER PRIMARY KEY,
638  value TEXT NOT NULL
639);
640"#,
641        )
642        .expect("write create.sql");
643        fs::write(
644            example_dir.join("insert.sql"),
645            "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
646        )
647        .expect("write insert.sql");
648        fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
649        models.join("knowdb.toml")
650    }
651
652    fn write_provider_only_knowdb_with_cache(
653        root: &Path,
654        provider_kind: &str,
655        connection_uri: &str,
656        enabled: bool,
657        capacity: usize,
658        ttl_ms: u64,
659    ) -> std::path::PathBuf {
660        let models = root.join("models").join("knowledge");
661        fs::create_dir_all(&models).expect("create knowdb models");
662        fs::write(
663            models.join("knowdb.toml"),
664            format!(
665                r#"
666version = 2
667base_dir = "."
668
669[cache]
670enabled = {enabled}
671capacity = {capacity}
672ttl_ms = {ttl_ms}
673
674[provider]
675kind = "{provider_kind}"
676connection_uri = "{connection_uri}"
677"#
678            ),
679        )
680        .expect("write provider knowdb.toml");
681        models.join("knowdb.toml")
682    }
683
684    fn restore_default_result_cache_config() {
685        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
686    }
687
688    #[test]
689    fn provider_can_be_replaced() {
690        let _guard = crate::runtime::runtime_test_guard()
691            .lock()
692            .expect("provider test guard");
693        let db1 = MemDB::instance();
694        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
695            .expect("create table in db1");
696        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
697            .expect("seed db1");
698        init_mem_provider(db1).expect("init provider db1");
699        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
700        assert_eq!(row[0].to_string(), "chars(first)");
701
702        let db2 = MemDB::instance();
703        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
704            .expect("create table in db2");
705        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
706            .expect("seed db2");
707        init_mem_provider(db2).expect("replace provider with db2");
708        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
709        assert_eq!(row[0].to_string(), "chars(second)");
710    }
711
712    #[test]
713    fn local_cache_is_cleared_when_generation_changes() {
714        let _guard = crate::runtime::runtime_test_guard()
715            .lock()
716            .expect("provider test guard");
717        let db1 = MemDB::instance();
718        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
719            .expect("create table in db1");
720        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
721            .expect("seed db1");
722        init_mem_provider(db1).expect("init provider db1");
723
724        let key = [DataField::from_digit("id", 1)];
725        let params = [DataField::from_digit(":id", 1)];
726        let mut cache = FieldQueryCache::default();
727        let row = cache_query_fields(
728            "SELECT value FROM t WHERE id=:id",
729            &key,
730            &params,
731            &mut cache,
732        );
733        assert_eq!(row[0].to_string(), "chars(first)");
734
735        let db2 = MemDB::instance();
736        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
737            .expect("create table in db2");
738        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
739            .expect("seed db2");
740        init_mem_provider(db2).expect("replace provider with db2");
741
742        let row = cache_query_fields(
743            "SELECT value FROM t WHERE id=:id",
744            &key,
745            &params,
746            &mut cache,
747        );
748        assert_eq!(row[0].to_string(), "chars(second)");
749    }
750
751    #[test]
752    fn local_cache_is_scoped_by_sql_text() {
753        let _guard = crate::runtime::runtime_test_guard()
754            .lock()
755            .expect("provider test guard");
756        let db = MemDB::instance();
757        db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
758            .expect("create t1");
759        db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
760            .expect("create t2");
761        db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
762            .expect("seed t1");
763        db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
764            .expect("seed t2");
765        init_mem_provider(db).expect("init provider");
766
767        let key = [DataField::from_digit("id", 1)];
768        let params = [DataField::from_digit(":id", 1)];
769        let mut cache = FieldQueryCache::default();
770
771        let row = cache_query_fields(
772            "SELECT value FROM t1 WHERE id=:id",
773            &key,
774            &params,
775            &mut cache,
776        );
777        assert_eq!(row[0].to_string(), "chars(first)");
778
779        let row = cache_query_fields(
780            "SELECT value FROM t2 WHERE id=:id",
781            &key,
782            &params,
783            &mut cache,
784        );
785        assert_eq!(row[0].to_string(), "chars(second)");
786    }
787
788    #[test]
789    fn runtime_snapshot_tracks_generation_and_cache_size() {
790        let _guard = crate::runtime::runtime_test_guard()
791            .lock()
792            .expect("provider test guard");
793        let db = MemDB::instance();
794        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
795            .expect("create table");
796        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
797            .expect("seed table");
798        init_mem_provider(db).expect("init provider");
799
800        let mut cache = FieldQueryCache::default();
801        let key = [DataField::from_digit("id", 1)];
802        let params = [DataField::from_digit(":id", 1)];
803        let row = cache_query_fields(
804            "SELECT value FROM t WHERE id=:id",
805            &key,
806            &params,
807            &mut cache,
808        );
809        assert_eq!(row[0].to_string(), "chars(first)");
810
811        let snapshot = runtime_snapshot();
812        assert!(matches!(
813            snapshot.provider_kind,
814            Some(ProviderKind::SqliteAuthority)
815        ));
816        assert!(snapshot.generation.is_some());
817        assert!(snapshot.result_cache_len >= 1);
818        assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
819        assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
820        assert!(snapshot.reload_successes >= 1);
821    }
822
823    #[test]
824    fn metadata_cache_is_scoped_by_generation() {
825        let _guard = crate::runtime::runtime_test_guard()
826            .lock()
827            .expect("provider test guard");
828        let sql = "SELECT value FROM t WHERE id = 1";
829        let before = runtime_snapshot().metadata_cache_len;
830
831        let db1 = MemDB::instance();
832        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
833            .expect("create table in db1");
834        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
835            .expect("seed db1");
836        init_mem_provider(db1).expect("init provider db1");
837        let row = query_row(sql).expect("query db1");
838        assert_eq!(row[0].to_string(), "chars(first)");
839        let after_first = runtime_snapshot().metadata_cache_len;
840        assert!(
841            after_first > before,
842            "metadata cache did not record first generation entry: before={before} after_first={after_first}"
843        );
844
845        let db2 = MemDB::instance();
846        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
847            .expect("create table in db2");
848        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
849            .expect("seed db2");
850        init_mem_provider(db2).expect("replace provider with db2");
851        let row = query_row(sql).expect("query db2");
852        assert_eq!(row[0].to_string(), "chars(second)");
853        let after_second = runtime_snapshot().metadata_cache_len;
854        assert!(
855            after_second > after_first,
856            "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
857        );
858    }
859
860    #[test]
861    fn failed_provider_reload_keeps_previous_provider() {
862        let _guard = crate::runtime::runtime_test_guard()
863            .lock()
864            .expect("provider test guard");
865        let db1 = MemDB::instance();
866        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
867            .expect("create table in db1");
868        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
869            .expect("seed db1");
870        init_mem_provider(db1).expect("init provider db1");
871        let before_generation = current_generation();
872
873        let reload_err = install_provider(
874            ProviderKind::SqliteAuthority,
875            datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
876            |_generation| {
877                Err(KnowledgeReason::from_logic()
878                    .to_err()
879                    .with_detail("expected reload failure"))
880            },
881        );
882        assert!(reload_err.is_err());
883
884        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
885        assert_eq!(row[0].to_string(), "chars(first)");
886        assert_eq!(current_generation(), before_generation);
887    }
888
889    #[test]
890    fn runtime_snapshot_records_cache_counters() {
891        let _guard = crate::runtime::runtime_test_guard()
892            .lock()
893            .expect("provider test guard");
894        let db = MemDB::instance();
895        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
896            .expect("create table");
897        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
898            .expect("seed table");
899        init_mem_provider(db).expect("init provider");
900
901        let before = runtime_snapshot();
902        let mut cache = FieldQueryCache::default();
903        let key = [DataField::from_digit("id", 1)];
904        let params = [DataField::from_digit(":id", 1)];
905        let row = cache_query_fields(
906            "SELECT value FROM t WHERE id=:id",
907            &key,
908            &params,
909            &mut cache,
910        );
911        assert_eq!(row[0].to_string(), "chars(first)");
912        let row = cache_query_fields(
913            "SELECT value FROM t WHERE id=:id",
914            &key,
915            &params,
916            &mut cache,
917        );
918        assert_eq!(row[0].to_string(), "chars(first)");
919
920        let after = runtime_snapshot();
921        assert!(after.local_cache_hits > before.local_cache_hits);
922        assert!(after.local_cache_misses > before.local_cache_misses);
923        assert!(after.result_cache_misses > before.result_cache_misses);
924        assert!(after.metadata_cache_misses > before.metadata_cache_misses);
925    }
926
927    #[test]
928    fn telemetry_receives_reload_cache_and_query_events() {
929        let _guard = crate::runtime::runtime_test_guard()
930            .lock()
931            .expect("provider test guard");
932        let telemetry_impl = Arc::new(TestTelemetry::default());
933        let previous = install_runtime_telemetry(telemetry_impl.clone());
934
935        let db = MemDB::instance();
936        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
937            .expect("create table");
938        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
939            .expect("seed table");
940        init_mem_provider(db).expect("init provider");
941
942        let mut cache = FieldQueryCache::default();
943        let key = [DataField::from_digit("id", 1)];
944        let params = [DataField::from_digit(":id", 1)];
945        let row = cache_query_fields(
946            "SELECT value FROM t WHERE id=:id",
947            &key,
948            &params,
949            &mut cache,
950        );
951        assert_eq!(row[0].to_string(), "chars(first)");
952        let row = cache_query_fields(
953            "SELECT value FROM t WHERE id=:id",
954            &key,
955            &params,
956            &mut cache,
957        );
958        assert_eq!(row[0].to_string(), "chars(first)");
959
960        let reload_err = install_provider(
961            ProviderKind::SqliteAuthority,
962            datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
963            |_generation| {
964                Err(KnowledgeReason::from_logic()
965                    .to_err()
966                    .with_detail("expected telemetry reload failure"))
967            },
968        );
969        assert!(reload_err.is_err());
970
971        install_runtime_telemetry(previous);
972        reset_telemetry();
973
974        assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
975        assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
976        assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
977        assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
978        assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
979        assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
980        assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
981    }
982
983    #[test]
984    #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
985    fn cache_perf_reports_cache_vs_no_cache() {
986        let _guard = crate::runtime::runtime_test_guard()
987            .lock()
988            .expect("provider test guard");
989        let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
990        let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
991        let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
992        let workload = build_perf_workload(ops, hotset);
993
994        eprintln!(
995            "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
996            rows, ops, hotset
997        );
998
999        let bypass = run_bypass_perf(&workload, rows);
1000        let global = run_global_cache_perf(&workload, rows);
1001        let local = run_local_cache_perf(&workload, rows);
1002
1003        print_perf_result(&bypass);
1004        print_perf_result(&global);
1005        print_perf_result(&local);
1006
1007        eprintln!(
1008            "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1009            bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1010            bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1011        );
1012
1013        assert_eq!(bypass.counters.result_hits, 0);
1014        assert_eq!(bypass.counters.result_misses, 0);
1015        assert_eq!(bypass.counters.local_hits, 0);
1016        assert_eq!(bypass.counters.local_misses, 0);
1017        assert!(global.counters.result_hits > 0);
1018        assert!(global.counters.result_misses > 0);
1019        assert_eq!(global.counters.local_hits, 0);
1020        assert_eq!(global.counters.local_misses, 0);
1021        assert!(local.counters.local_hits > 0);
1022        assert!(local.counters.local_misses > 0);
1023        assert!(local.counters.result_misses > 0);
1024    }
1025
1026    #[test]
1027    fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1028        let _guard = crate::runtime::runtime_test_guard()
1029            .lock()
1030            .expect("provider test guard");
1031        let root = uniq_cache_cfg_tmp_dir();
1032        let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1033        let auth_file = root.join(".run").join("authority.sqlite");
1034        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1035            .expect("create authority parent");
1036        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1037
1038        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1039            .expect("init knowdb with cache config");
1040
1041        let snapshot = runtime_snapshot();
1042        assert!(snapshot.result_cache_enabled);
1043        assert_eq!(snapshot.result_cache_capacity, 7);
1044        assert_eq!(snapshot.result_cache_ttl_ms, 5);
1045
1046        let req = QueryRequest::first_row(
1047            "SELECT value FROM example WHERE id=:id",
1048            fields_to_params(&[DataField::from_digit(":id", 1)]),
1049            CachePolicy::UseGlobal,
1050        );
1051        let before = runtime_snapshot();
1052        let row = runtime()
1053            .execute(&req)
1054            .expect("first result-cache query")
1055            .into_row();
1056        assert_eq!(row[0].to_string(), "chars(alpha)");
1057        let row = runtime()
1058            .execute(&req)
1059            .expect("second result-cache query")
1060            .into_row();
1061        assert_eq!(row[0].to_string(), "chars(alpha)");
1062        std::thread::sleep(Duration::from_millis(12));
1063        let row = runtime()
1064            .execute(&req)
1065            .expect("expired result-cache query")
1066            .into_row();
1067        assert_eq!(row[0].to_string(), "chars(alpha)");
1068        let after = runtime_snapshot();
1069
1070        assert!(after.result_cache_hits > before.result_cache_hits);
1071        assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1072
1073        restore_default_result_cache_config();
1074        let _ = fs::remove_dir_all(&root);
1075    }
1076
1077    #[test]
1078    fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1079        let _guard = crate::runtime::runtime_test_guard()
1080            .lock()
1081            .expect("provider test guard");
1082        let root = uniq_cache_cfg_tmp_dir();
1083        let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1084        let auth_file = root.join(".run").join("authority.sqlite");
1085        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1086            .expect("create authority parent");
1087        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1088
1089        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1090            .expect("init knowdb with cache disabled");
1091
1092        let snapshot = runtime_snapshot();
1093        assert!(!snapshot.result_cache_enabled);
1094        assert_eq!(snapshot.result_cache_capacity, 3);
1095        assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1096
1097        let req = QueryRequest::first_row(
1098            "SELECT value FROM example WHERE id=:id",
1099            fields_to_params(&[DataField::from_digit(":id", 1)]),
1100            CachePolicy::UseGlobal,
1101        );
1102        let before = runtime_snapshot();
1103        let _ = runtime()
1104            .execute(&req)
1105            .expect("first bypassed result-cache query");
1106        let _ = runtime()
1107            .execute(&req)
1108            .expect("second bypassed result-cache query");
1109        let after = runtime_snapshot();
1110
1111        assert_eq!(after.result_cache_hits, before.result_cache_hits);
1112        assert_eq!(after.result_cache_misses, before.result_cache_misses);
1113
1114        restore_default_result_cache_config();
1115        let _ = fs::remove_dir_all(&root);
1116    }
1117
1118    #[test]
1119    fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1120        let _guard = crate::runtime::runtime_test_guard()
1121            .lock()
1122            .expect("provider test guard");
1123        restore_default_result_cache_config();
1124
1125        let db = MemDB::instance();
1126        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1127            .expect("create table");
1128        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1129            .expect("seed table");
1130        init_mem_provider(db).expect("init provider");
1131
1132        let before = runtime_snapshot();
1133        let root = uniq_cache_cfg_tmp_dir();
1134        let conf_path = write_provider_only_knowdb_with_cache(
1135            &root,
1136            "mysql",
1137            "not-a-valid-mysql-url",
1138            false,
1139            3,
1140            5,
1141        );
1142        let authority_uri = format!(
1143            "file:{}?mode=rwc&uri=true",
1144            root.join("unused.sqlite").display()
1145        );
1146
1147        let err =
1148            init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1149        assert!(err.is_err());
1150
1151        let after = runtime_snapshot();
1152        assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1153        assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1154        assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1155
1156        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1157        assert_eq!(row[0].to_string(), "chars(first)");
1158
1159        let _ = fs::remove_dir_all(&root);
1160    }
1161}