Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::hash_map::DefaultHasher,
6    hash::{Hash, Hasher},
7};
8
9use async_trait::async_trait;
10use rusqlite::ToSql;
11use rusqlite::{Connection, OpenFlags};
12use wp_error::KnowledgeResult;
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25    CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26    QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30    telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34    db: MemDB,
35    metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41        ThreadClonedMDB::query_with_scope(self, sql)
42    }
43
44    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45        ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46    }
47
48    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49        ThreadClonedMDB::query_row_with_scope(self, sql)
50    }
51
52    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53        ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54    }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60        self.db.query_with_scope(&self.metadata_scope, sql)
61    }
62
63    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64        self.db
65            .query_fields_with_scope(&self.metadata_scope, sql, params)
66    }
67
68    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69        self.db.query_row_with_scope(&self.metadata_scope, sql)
70    }
71
72    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73        self.db
74            .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75    }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81        PostgresProvider::query(self, sql)
82    }
83
84    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85        PostgresProvider::query_fields(self, sql, params)
86    }
87
88    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89        PostgresProvider::query_row(self, sql)
90    }
91
92    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93        PostgresProvider::query_named_fields(self, sql, params)
94    }
95
96    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97        PostgresProvider::query_async(self, sql).await
98    }
99
100    async fn query_fields_async(
101        &self,
102        sql: &str,
103        params: &[DataField],
104    ) -> KnowledgeResult<Vec<RowData>> {
105        PostgresProvider::query_fields_async(self, sql, params).await
106    }
107
108    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109        PostgresProvider::query_row_async(self, sql).await
110    }
111
112    async fn query_named_fields_async(
113        &self,
114        sql: &str,
115        params: &[DataField],
116    ) -> KnowledgeResult<RowData> {
117        PostgresProvider::query_named_fields_async(self, sql, params).await
118    }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124        MySqlProvider::query(self, sql)
125    }
126
127    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128        MySqlProvider::query_fields(self, sql, params)
129    }
130
131    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132        MySqlProvider::query_row(self, sql)
133    }
134
135    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136        MySqlProvider::query_named_fields(self, sql, params)
137    }
138
139    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140        MySqlProvider::query_async(self, sql).await
141    }
142
143    async fn query_fields_async(
144        &self,
145        sql: &str,
146        params: &[DataField],
147    ) -> KnowledgeResult<Vec<RowData>> {
148        MySqlProvider::query_fields_async(self, sql, params).await
149    }
150
151    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152        MySqlProvider::query_row_async(self, sql).await
153    }
154
155    async fn query_named_fields_async(
156        &self,
157        sql: &str,
158        params: &[DataField],
159    ) -> KnowledgeResult<RowData> {
160        MySqlProvider::query_named_fields_async(self, sql, params).await
161    }
162}
163
164fn install_provider<F>(
165    kind: ProviderKind,
166    datasource_id: DatasourceId,
167    build: F,
168) -> KnowledgeResult<()>
169where
170    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172    runtime().install_provider(kind, datasource_id, build)?;
173    Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177    DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183        Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184            authority_uri,
185            datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186            generation.0,
187        )))
188    })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192    install_provider(
193        ProviderKind::SqliteAuthority,
194        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195        |generation| {
196            Ok(Arc::new(MemProvider {
197                db: memdb,
198                metadata_scope: MetadataCacheScope {
199                    datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200                    generation,
201                },
202            }))
203        },
204    )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208    let datasource_id = datasource_id_for(ProviderKind::Postgres, connection_uri);
209    install_provider(
210        ProviderKind::Postgres,
211        datasource_id.clone(),
212        |generation| {
213            let config = PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size);
214            let provider = PostgresProvider::connect(
215                &config,
216                MetadataCacheScope {
217                    datasource_id: datasource_id.clone(),
218                    generation,
219                },
220            )?;
221            Ok(Arc::new(provider))
222        },
223    )
224}
225
226pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
227    let datasource_id = datasource_id_for(ProviderKind::Mysql, connection_uri);
228    install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
229        let config = MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size);
230        let provider = MySqlProvider::connect(
231            &config,
232            MetadataCacheScope {
233                datasource_id: datasource_id.clone(),
234                generation,
235            },
236        )?;
237        Ok(Arc::new(provider))
238    })
239}
240
241pub fn current_generation() -> Option<u64> {
242    runtime()
243        .current_generation()
244        .map(|generation| generation.0)
245}
246
247pub fn runtime_snapshot() -> RuntimeSnapshot {
248    runtime().snapshot()
249}
250
251pub fn install_runtime_telemetry(
252    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
253) -> Arc<dyn KnowledgeTelemetry> {
254    install_telemetry(telemetry_impl)
255}
256
257pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
258    runtime()
259        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
260        .map(QueryResponse::into_rows)
261}
262
263pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
264    runtime()
265        .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
266        .await
267        .map(QueryResponse::into_rows)
268}
269
270pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
271    runtime()
272        .execute(&QueryRequest::first_row(
273            sql,
274            Vec::new(),
275            CachePolicy::Bypass,
276        ))
277        .map(QueryResponse::into_row)
278}
279
280pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
281    runtime()
282        .execute_async(&QueryRequest::first_row(
283            sql,
284            Vec::new(),
285            CachePolicy::Bypass,
286        ))
287        .await
288        .map(QueryResponse::into_row)
289}
290
291pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
292    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
293}
294
295pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
296    runtime()
297        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
298        .await
299}
300
301pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
302    query_fields(sql, params)
303}
304
305pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
306    query_fields_async(sql, params).await
307}
308
309pub fn query_named<'a>(
310    sql: &str,
311    params: &'a [(&'a str, &'a dyn ToSql)],
312) -> KnowledgeResult<RowData> {
313    let fields = named_params_to_fields(params)?;
314    query_fields(sql, &fields)
315}
316
317pub fn cache_query_fields<const N: usize>(
318    sql: &str,
319    c_params: &[DataField; N],
320    query_params: &[DataField],
321    cache: &mut impl CacheAble<DataField, RowData, N>,
322) -> RowData {
323    cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
324}
325
326pub fn cache_query_fields_with_scope<const N: usize>(
327    sql: &str,
328    local_cache_scope: u64,
329    c_params: &[DataField; N],
330    query_params: &[DataField],
331    cache: &mut impl CacheAble<DataField, RowData, N>,
332) -> RowData {
333    if let Some(generation) = current_generation() {
334        cache.prepare_generation(generation);
335    }
336    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
337        runtime().record_local_cache_hit();
338        if telemetry_enabled() {
339            telemetry().on_cache(&CacheTelemetryEvent {
340                layer: CacheLayer::Local,
341                outcome: CacheOutcome::Hit,
342                provider_kind: runtime().current_provider_kind(),
343            });
344        }
345        return hit.clone();
346    }
347    runtime().record_local_cache_miss();
348    if telemetry_enabled() {
349        telemetry().on_cache(&CacheTelemetryEvent {
350            layer: CacheLayer::Local,
351            outcome: CacheOutcome::Miss,
352            provider_kind: runtime().current_provider_kind(),
353        });
354    }
355
356    match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
357        Ok(row) => {
358            cache.save_scoped(local_cache_scope, c_params, row.clone());
359            row
360        }
361        Err(err) => {
362            warn_kdb!("[kdb] query error: {}", err);
363            Vec::new()
364        }
365    }
366}
367
368pub async fn cache_query_fields_async<const N: usize>(
369    sql: &str,
370    c_params: &[DataField; N],
371    query_params: &[DataField],
372    cache: &mut impl CacheAble<DataField, RowData, N>,
373) -> RowData {
374    cache_query_fields_async_with_scope(
375        sql,
376        stable_hash(sql),
377        c_params,
378        || query_params.to_vec(),
379        cache,
380    )
381    .await
382}
383
384pub async fn cache_query_fields_async_with<const N: usize, F>(
385    sql: &str,
386    c_params: &[DataField; N],
387    build_query_params: F,
388    cache: &mut impl CacheAble<DataField, RowData, N>,
389) -> RowData
390where
391    F: FnOnce() -> Vec<DataField>,
392{
393    cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
394        .await
395}
396
397pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
398    sql: &str,
399    local_cache_scope: u64,
400    c_params: &[DataField; N],
401    build_query_params: F,
402    cache: &mut impl CacheAble<DataField, RowData, N>,
403) -> RowData
404where
405    F: FnOnce() -> Vec<DataField>,
406{
407    if let Some(generation) = current_generation() {
408        cache.prepare_generation(generation);
409    }
410    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
411        runtime().record_local_cache_hit();
412        if telemetry_enabled() {
413            telemetry().on_cache(&CacheTelemetryEvent {
414                layer: CacheLayer::Local,
415                outcome: CacheOutcome::Hit,
416                provider_kind: runtime().current_provider_kind(),
417            });
418        }
419        return hit.clone();
420    }
421    runtime().record_local_cache_miss();
422    if telemetry_enabled() {
423        telemetry().on_cache(&CacheTelemetryEvent {
424            layer: CacheLayer::Local,
425            outcome: CacheOutcome::Miss,
426            provider_kind: runtime().current_provider_kind(),
427        });
428    }
429
430    let query_params = build_query_params();
431    match runtime()
432        .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
433        .await
434    {
435        Ok(row) => {
436            cache.save_scoped(local_cache_scope, c_params, row.clone());
437            row
438        }
439        Err(err) => {
440            warn_kdb!("[kdb] query error: {}", err);
441            Vec::new()
442        }
443    }
444}
445
446pub fn cache_query<const N: usize>(
447    sql: &str,
448    c_params: &[DataField; N],
449    named_params: &[(&str, &dyn ToSql)],
450    cache: &mut impl CacheAble<DataField, RowData, N>,
451) -> RowData {
452    let query_fields = match named_params_to_fields(named_params) {
453        Ok(fields) => fields,
454        Err(err) => {
455            warn_kdb!("[kdb] query param conversion error: {}", err);
456            return Vec::new();
457        }
458    };
459
460    cache_query_fields(sql, c_params, &query_fields, cache)
461}
462
463pub async fn cache_query_async<const N: usize>(
464    sql: &str,
465    c_params: &[DataField; N],
466    named_params: &[(&str, &dyn ToSql)],
467    cache: &mut impl CacheAble<DataField, RowData, N>,
468) -> RowData {
469    let query_fields = match named_params_to_fields(named_params) {
470        Ok(fields) => fields,
471        Err(err) => {
472            warn_kdb!("[kdb] query param conversion error: {}", err);
473            return Vec::new();
474        }
475    };
476
477    cache_query_fields_async(sql, c_params, &query_fields, cache).await
478}
479
480fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
481    if let Ok(conn) = Connection::open_with_flags(
482        authority_uri,
483        OpenFlags::SQLITE_OPEN_READ_WRITE
484            | OpenFlags::SQLITE_OPEN_CREATE
485            | OpenFlags::SQLITE_OPEN_URI,
486    ) {
487        let _ = conn.execute_batch(
488            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
489        );
490    }
491    Ok(())
492}
493
494pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
495    ensure_wal(authority_uri)?;
496    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
497    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
498    init_mem_provider(mem)
499}
500
501pub fn init_thread_cloned_from_knowdb(
502    root: &Path,
503    knowdb_conf: &Path,
504    authority_uri: &str,
505    dict: &orion_variate::EnvDict,
506) -> KnowledgeResult<()> {
507    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
508    if let Some(provider) = conf.provider {
509        match provider.kind {
510            ProviderKind::Postgres => {
511                info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
512                init_postgres_provider(provider.connection_uri.as_str(), provider.pool_size)?;
513                runtime().configure_result_cache(
514                    conf.cache.enabled,
515                    conf.cache.capacity,
516                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
517                );
518                return Ok(());
519            }
520            ProviderKind::Mysql => {
521                info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
522                init_mysql_provider(provider.connection_uri.as_str(), provider.pool_size)?;
523                runtime().configure_result_cache(
524                    conf.cache.enabled,
525                    conf.cache.capacity,
526                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
527                );
528                return Ok(());
529            }
530            ProviderKind::SqliteAuthority => {}
531        }
532    }
533
534    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
535    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
536        let path_part = rest.split('?').next().unwrap_or(rest);
537        format!("file:{}?mode=ro&uri=true", path_part)
538    } else {
539        authority_uri.to_string()
540    };
541
542    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
543    init_thread_cloned_from_authority(&ro_uri)?;
544    runtime().configure_result_cache(
545        conf.cache.enabled,
546        conf.cache.capacity,
547        Duration::from_millis(conf.cache.ttl_ms.max(1)),
548    );
549    Ok(())
550}
551
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// The result is the same for equal inputs within one build of the program;
/// the standard library does not promise the algorithm stays the same across
/// Rust releases, so the value should not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
557
558#[cfg(test)]
559mod tests {
560    use super::*;
561    use crate::cache::FieldQueryCache;
562    use crate::mem::memdb::MemDB;
563    use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
564    use crate::runtime::fields_to_params;
565    use crate::telemetry::{
566        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
567        ReloadTelemetryEvent, reset_telemetry,
568    };
569    use orion_error::{ToStructError, UvsFrom};
570    use orion_variate::EnvDict;
571    use std::fs;
572    use std::hint::black_box;
573    use std::path::PathBuf;
574    use std::sync::atomic::{AtomicU64, Ordering};
575    use std::time::{Duration, Instant};
576    use wp_error::KnowledgeReason;
577
578    #[derive(Default)]
579    struct TestTelemetry {
580        reload_success: AtomicU64,
581        reload_failure: AtomicU64,
582        local_hits: AtomicU64,
583        local_misses: AtomicU64,
584        result_hits: AtomicU64,
585        result_misses: AtomicU64,
586        metadata_hits: AtomicU64,
587        metadata_misses: AtomicU64,
588        query_success: AtomicU64,
589        query_failure: AtomicU64,
590    }
591
592    impl KnowledgeTelemetry for TestTelemetry {
593        fn on_cache(&self, event: &CacheTelemetryEvent) {
594            let counter = match (event.layer, event.outcome) {
595                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
596                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
597                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
598                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
599                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
600                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
601            };
602            counter.fetch_add(1, Ordering::Relaxed);
603        }
604
605        fn on_reload(&self, event: &ReloadTelemetryEvent) {
606            let counter = match event.outcome {
607                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
608                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
609            };
610            counter.fetch_add(1, Ordering::Relaxed);
611        }
612
613        fn on_query(&self, event: &QueryTelemetryEvent) {
614            let counter = if event.success {
615                &self.query_success
616            } else {
617                &self.query_failure
618            };
619            counter.fetch_add(1, Ordering::Relaxed);
620        }
621    }
622
623    fn perf_env_usize(key: &str, default: usize) -> usize {
624        std::env::var(key)
625            .ok()
626            .and_then(|value| value.parse::<usize>().ok())
627            .unwrap_or(default)
628    }
629
630    fn seed_perf_provider(rows: usize) {
631        let db = MemDB::instance();
632        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
633            .expect("create perf_kv");
634        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
635        for id in 1..=rows {
636            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
637            db.execute(sql.as_str()).expect("insert perf_kv row");
638        }
639        db.execute("COMMIT").expect("commit perf_kv load");
640        init_mem_provider(db).expect("init perf provider");
641    }
642
643    #[tokio::test(flavor = "current_thread")]
644    async fn query_async_works_with_mem_provider() {
645        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
646        let db = MemDB::instance();
647        db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
648            .expect("create async_kv");
649        db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
650            .expect("insert async_kv row");
651        init_mem_provider(db).expect("init mem provider");
652
653        let row = query_fields_async(
654            "SELECT value FROM async_kv WHERE id=:id",
655            &[DataField::from_digit(":id", 1)],
656        )
657        .await
658        .expect("query async row");
659        assert_eq!(row.len(), 1);
660        assert_eq!(row[0].to_string(), "chars(hello)");
661    }
662
663    #[derive(Clone)]
664    struct PerfQuery {
665        cache_key: [DataField; 1],
666        query_params: [DataField; 1],
667        bypass_req: QueryRequest,
668        global_req: QueryRequest,
669    }
670
671    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
672        (0..ops)
673            .map(|idx| {
674                let id = ((idx * 17) % hotset + 1) as i64;
675                let cache_key = [DataField::from_digit("id", id)];
676                let query_params = [DataField::from_digit(":id", id)];
677                let bypass_req = QueryRequest::first_row(
678                    "SELECT value FROM perf_kv WHERE id=:id",
679                    fields_to_params(&query_params),
680                    CachePolicy::Bypass,
681                );
682                let global_req = QueryRequest::first_row(
683                    "SELECT value FROM perf_kv WHERE id=:id",
684                    fields_to_params(&query_params),
685                    CachePolicy::UseGlobal,
686                );
687                PerfQuery {
688                    cache_key,
689                    query_params,
690                    bypass_req,
691                    global_req,
692                }
693            })
694            .collect()
695    }
696
697    #[derive(Debug, Clone, Copy)]
698    struct PerfCounters {
699        result_hits: u64,
700        result_misses: u64,
701        local_hits: u64,
702        local_misses: u64,
703        metadata_hits: u64,
704        metadata_misses: u64,
705    }
706
707    #[derive(Debug, Clone)]
708    struct PerfResult {
709        name: &'static str,
710        elapsed: Duration,
711        ops: usize,
712        counters: PerfCounters,
713    }
714
715    impl PerfResult {
716        fn qps(&self) -> f64 {
717            let secs = self.elapsed.as_secs_f64();
718            if secs == 0.0 {
719                self.ops as f64
720            } else {
721                self.ops as f64 / secs
722            }
723        }
724    }
725
726    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
727        PerfCounters {
728            result_hits: after
729                .result_cache_hits
730                .saturating_sub(before.result_cache_hits),
731            result_misses: after
732                .result_cache_misses
733                .saturating_sub(before.result_cache_misses),
734            local_hits: after
735                .local_cache_hits
736                .saturating_sub(before.local_cache_hits),
737            local_misses: after
738                .local_cache_misses
739                .saturating_sub(before.local_cache_misses),
740            metadata_hits: after
741                .metadata_cache_hits
742                .saturating_sub(before.metadata_cache_hits),
743            metadata_misses: after
744                .metadata_cache_misses
745                .saturating_sub(before.metadata_cache_misses),
746        }
747    }
748
749    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
750        seed_perf_provider(rows);
751        let before = runtime_snapshot();
752        let started = Instant::now();
753        for item in workload {
754            let row = runtime()
755                .execute(&item.bypass_req)
756                .expect("execute bypass request")
757                .into_row();
758            black_box(row);
759        }
760        let elapsed = started.elapsed();
761        let after = runtime_snapshot();
762        PerfResult {
763            name: "bypass",
764            elapsed,
765            ops: workload.len(),
766            counters: snapshot_delta(&before, &after),
767        }
768    }
769
770    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
771        seed_perf_provider(rows);
772        let before = runtime_snapshot();
773        let started = Instant::now();
774        for item in workload {
775            let row = runtime()
776                .execute(&item.global_req)
777                .expect("execute global-cache request")
778                .into_row();
779            black_box(row);
780        }
781        let elapsed = started.elapsed();
782        let after = runtime_snapshot();
783        PerfResult {
784            name: "global_cache",
785            elapsed,
786            ops: workload.len(),
787            counters: snapshot_delta(&before, &after),
788        }
789    }
790
791    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
792        seed_perf_provider(rows);
793        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
794        let before = runtime_snapshot();
795        let started = Instant::now();
796        for item in workload {
797            let row = cache_query_fields(
798                "SELECT value FROM perf_kv WHERE id=:id",
799                &item.cache_key,
800                &item.query_params,
801                &mut cache,
802            );
803            black_box(row);
804        }
805        let elapsed = started.elapsed();
806        let after = runtime_snapshot();
807        PerfResult {
808            name: "local_cache",
809            elapsed,
810            ops: workload.len(),
811            counters: snapshot_delta(&before, &after),
812        }
813    }
814
815    fn print_perf_result(result: &PerfResult) {
816        eprintln!(
817            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
818            result.name,
819            result.elapsed.as_millis(),
820            result.qps(),
821            result.counters.result_hits,
822            result.counters.result_misses,
823            result.counters.local_hits,
824            result.counters.local_misses,
825            result.counters.metadata_hits,
826            result.counters.metadata_misses,
827        );
828    }
829
830    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
831        use rand::{Rng, rng};
832        let rnd: u64 = rng().next_u64();
833        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
834    }
835
836    fn write_minimal_knowdb_with_cache(
837        root: &Path,
838        enabled: bool,
839        capacity: usize,
840        ttl_ms: u64,
841    ) -> std::path::PathBuf {
842        let models = root.join("models").join("knowledge");
843        let example_dir = models.join("example");
844        fs::create_dir_all(&example_dir).expect("create knowdb models/example");
845        fs::write(
846            models.join("knowdb.toml"),
847            format!(
848                r#"
849version = 2
850base_dir = "."
851
852[cache]
853enabled = {enabled}
854capacity = {capacity}
855ttl_ms = {ttl_ms}
856
857[csv]
858has_header = false
859
860[[tables]]
861name = "example"
862columns.by_index = [0,1]
863"#
864            ),
865        )
866        .expect("write knowdb.toml");
867        fs::write(
868            example_dir.join("create.sql"),
869            r#"
870CREATE TABLE IF NOT EXISTS {table} (
871  id INTEGER PRIMARY KEY,
872  value TEXT NOT NULL
873);
874"#,
875        )
876        .expect("write create.sql");
877        fs::write(
878            example_dir.join("insert.sql"),
879            "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
880        )
881        .expect("write insert.sql");
882        fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
883        models.join("knowdb.toml")
884    }
885
886    fn write_provider_only_knowdb_with_cache(
887        root: &Path,
888        provider_kind: &str,
889        connection_uri: &str,
890        enabled: bool,
891        capacity: usize,
892        ttl_ms: u64,
893    ) -> std::path::PathBuf {
894        let models = root.join("models").join("knowledge");
895        fs::create_dir_all(&models).expect("create knowdb models");
896        fs::write(
897            models.join("knowdb.toml"),
898            format!(
899                r#"
900version = 2
901base_dir = "."
902
903[cache]
904enabled = {enabled}
905capacity = {capacity}
906ttl_ms = {ttl_ms}
907
908[provider]
909kind = "{provider_kind}"
910connection_uri = "{connection_uri}"
911"#
912            ),
913        )
914        .expect("write provider knowdb.toml");
915        models.join("knowdb.toml")
916    }
917
918    fn restore_default_result_cache_config() {
919        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
920    }
921
922    #[test]
923    fn provider_can_be_replaced() {
924        let _guard = crate::runtime::runtime_test_guard()
925            .lock()
926            .expect("provider test guard");
927        let db1 = MemDB::instance();
928        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
929            .expect("create table in db1");
930        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
931            .expect("seed db1");
932        init_mem_provider(db1).expect("init provider db1");
933        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
934        assert_eq!(row[0].to_string(), "chars(first)");
935
936        let db2 = MemDB::instance();
937        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
938            .expect("create table in db2");
939        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
940            .expect("seed db2");
941        init_mem_provider(db2).expect("replace provider with db2");
942        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
943        assert_eq!(row[0].to_string(), "chars(second)");
944    }
945
946    #[test]
947    fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
948        let _guard = crate::runtime::runtime_test_guard()
949            .lock()
950            .expect("provider test guard");
951        COLNAME_CACHE.write().expect("metadata cache lock").clear();
952
953        let db = MemDB::instance();
954        db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
955            .expect("create table");
956        db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
957            .expect("seed table");
958
959        let old_scope = MetadataCacheScope {
960            datasource_id: DatasourceId("sqlite:old".to_string()),
961            generation: Generation(1),
962        };
963        let new_scope = MetadataCacheScope {
964            datasource_id: DatasourceId("sqlite:new".to_string()),
965            generation: Generation(2),
966        };
967        let old_provider = MemProvider {
968            db: db.clone(),
969            metadata_scope: old_scope.clone(),
970        };
971
972        install_provider(
973            ProviderKind::SqliteAuthority,
974            new_scope.datasource_id.clone(),
975            |_generation| {
976                Ok(Arc::new(MemProvider {
977                    db: db.clone(),
978                    metadata_scope: new_scope.clone(),
979                }))
980            },
981        )
982        .expect("install new provider");
983
984        let row = old_provider
985            .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
986            .expect("old provider query");
987        assert_eq!(row[0].to_string(), "chars(scope-old)");
988
989        let cache = COLNAME_CACHE.read().expect("metadata cache lock");
990        assert!(cache.contains(&metadata_cache_key_for_scope(
991            &old_scope,
992            "SELECT value FROM cache_scope_t WHERE id = 1",
993        )));
994        assert!(!cache.contains(&metadata_cache_key_for_scope(
995            &new_scope,
996            "SELECT value FROM cache_scope_t WHERE id = 1",
997        )));
998    }
999
1000    #[tokio::test(flavor = "current_thread")]
1001    async fn async_query_uses_runtime_bridge() {
1002        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1003        let db = MemDB::instance();
1004        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1005            .expect("create table");
1006        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1007            .expect("seed table");
1008        init_mem_provider(db).expect("init provider");
1009
1010        let row = query_row_async("SELECT value FROM t WHERE id = 1")
1011            .await
1012            .expect("async query row");
1013        assert_eq!(row[0].to_string(), "chars(async-first)");
1014    }
1015
1016    #[tokio::test(flavor = "current_thread")]
1017    async fn async_cache_query_fields_hits_local_cache() {
1018        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1019        let db = MemDB::instance();
1020        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1021            .expect("create table");
1022        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1023            .expect("seed table");
1024        init_mem_provider(db).expect("init provider");
1025
1026        let key = [DataField::from_digit("id", 1)];
1027        let params = [DataField::from_digit(":id", 1)];
1028        let mut cache = FieldQueryCache::default();
1029
1030        let first = cache_query_fields_async(
1031            "SELECT value FROM t WHERE id=:id",
1032            &key,
1033            &params,
1034            &mut cache,
1035        )
1036        .await;
1037        let second = cache_query_fields_async(
1038            "SELECT value FROM t WHERE id=:id",
1039            &key,
1040            &params,
1041            &mut cache,
1042        )
1043        .await;
1044
1045        assert_eq!(first[0].to_string(), "chars(async-cache)");
1046        assert_eq!(second[0].to_string(), "chars(async-cache)");
1047    }
1048
1049    #[test]
1050    fn local_cache_is_cleared_when_generation_changes() {
1051        let _guard = crate::runtime::runtime_test_guard()
1052            .lock()
1053            .expect("provider test guard");
1054        let db1 = MemDB::instance();
1055        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1056            .expect("create table in db1");
1057        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1058            .expect("seed db1");
1059        init_mem_provider(db1).expect("init provider db1");
1060
1061        let key = [DataField::from_digit("id", 1)];
1062        let params = [DataField::from_digit(":id", 1)];
1063        let mut cache = FieldQueryCache::default();
1064        let row = cache_query_fields(
1065            "SELECT value FROM t WHERE id=:id",
1066            &key,
1067            &params,
1068            &mut cache,
1069        );
1070        assert_eq!(row[0].to_string(), "chars(first)");
1071
1072        let db2 = MemDB::instance();
1073        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1074            .expect("create table in db2");
1075        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1076            .expect("seed db2");
1077        init_mem_provider(db2).expect("replace provider with db2");
1078
1079        let row = cache_query_fields(
1080            "SELECT value FROM t WHERE id=:id",
1081            &key,
1082            &params,
1083            &mut cache,
1084        );
1085        assert_eq!(row[0].to_string(), "chars(second)");
1086    }
1087
1088    #[test]
1089    fn local_cache_is_scoped_by_sql_text() {
1090        let _guard = crate::runtime::runtime_test_guard()
1091            .lock()
1092            .expect("provider test guard");
1093        let db = MemDB::instance();
1094        db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
1095            .expect("create t1");
1096        db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
1097            .expect("create t2");
1098        db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
1099            .expect("seed t1");
1100        db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
1101            .expect("seed t2");
1102        init_mem_provider(db).expect("init provider");
1103
1104        let key = [DataField::from_digit("id", 1)];
1105        let params = [DataField::from_digit(":id", 1)];
1106        let mut cache = FieldQueryCache::default();
1107
1108        let row = cache_query_fields(
1109            "SELECT value FROM t1 WHERE id=:id",
1110            &key,
1111            &params,
1112            &mut cache,
1113        );
1114        assert_eq!(row[0].to_string(), "chars(first)");
1115
1116        let row = cache_query_fields(
1117            "SELECT value FROM t2 WHERE id=:id",
1118            &key,
1119            &params,
1120            &mut cache,
1121        );
1122        assert_eq!(row[0].to_string(), "chars(second)");
1123    }
1124
1125    #[test]
1126    fn runtime_snapshot_tracks_generation_and_cache_size() {
1127        let _guard = crate::runtime::runtime_test_guard()
1128            .lock()
1129            .expect("provider test guard");
1130        let db = MemDB::instance();
1131        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1132            .expect("create table");
1133        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1134            .expect("seed table");
1135        init_mem_provider(db).expect("init provider");
1136
1137        let mut cache = FieldQueryCache::default();
1138        let key = [DataField::from_digit("id", 1)];
1139        let params = [DataField::from_digit(":id", 1)];
1140        let row = cache_query_fields(
1141            "SELECT value FROM t WHERE id=:id",
1142            &key,
1143            &params,
1144            &mut cache,
1145        );
1146        assert_eq!(row[0].to_string(), "chars(first)");
1147
1148        let snapshot = runtime_snapshot();
1149        assert!(matches!(
1150            snapshot.provider_kind,
1151            Some(ProviderKind::SqliteAuthority)
1152        ));
1153        assert!(snapshot.generation.is_some());
1154        assert!(snapshot.result_cache_len >= 1);
1155        assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
1156        assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
1157        assert!(snapshot.reload_successes >= 1);
1158    }
1159
1160    #[test]
1161    fn metadata_cache_is_scoped_by_generation() {
1162        let _guard = crate::runtime::runtime_test_guard()
1163            .lock()
1164            .expect("provider test guard");
1165        let sql = "SELECT value FROM t WHERE id = 1";
1166        let before = runtime_snapshot().metadata_cache_len;
1167
1168        let db1 = MemDB::instance();
1169        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1170            .expect("create table in db1");
1171        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1172            .expect("seed db1");
1173        init_mem_provider(db1).expect("init provider db1");
1174        let row = query_row(sql).expect("query db1");
1175        assert_eq!(row[0].to_string(), "chars(first)");
1176        let after_first = runtime_snapshot().metadata_cache_len;
1177        assert!(
1178            after_first > before,
1179            "metadata cache did not record first generation entry: before={before} after_first={after_first}"
1180        );
1181
1182        let db2 = MemDB::instance();
1183        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1184            .expect("create table in db2");
1185        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1186            .expect("seed db2");
1187        init_mem_provider(db2).expect("replace provider with db2");
1188        let row = query_row(sql).expect("query db2");
1189        assert_eq!(row[0].to_string(), "chars(second)");
1190        let after_second = runtime_snapshot().metadata_cache_len;
1191        assert!(
1192            after_second > after_first,
1193            "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
1194        );
1195    }
1196
1197    #[test]
1198    fn failed_provider_reload_keeps_previous_provider() {
1199        let _guard = crate::runtime::runtime_test_guard()
1200            .lock()
1201            .expect("provider test guard");
1202        let db1 = MemDB::instance();
1203        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1204            .expect("create table in db1");
1205        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1206            .expect("seed db1");
1207        init_mem_provider(db1).expect("init provider db1");
1208        let before_generation = current_generation();
1209
1210        let reload_err = install_provider(
1211            ProviderKind::SqliteAuthority,
1212            datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
1213            |_generation| {
1214                Err(KnowledgeReason::from_logic()
1215                    .to_err()
1216                    .with_detail("expected reload failure"))
1217            },
1218        );
1219        assert!(reload_err.is_err());
1220
1221        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1222        assert_eq!(row[0].to_string(), "chars(first)");
1223        assert_eq!(current_generation(), before_generation);
1224    }
1225
1226    #[test]
1227    fn runtime_snapshot_records_cache_counters() {
1228        let _guard = crate::runtime::runtime_test_guard()
1229            .lock()
1230            .expect("provider test guard");
1231        let db = MemDB::instance();
1232        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1233            .expect("create table");
1234        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1235            .expect("seed table");
1236        init_mem_provider(db).expect("init provider");
1237
1238        let before = runtime_snapshot();
1239        let mut cache = FieldQueryCache::default();
1240        let key = [DataField::from_digit("id", 1)];
1241        let params = [DataField::from_digit(":id", 1)];
1242        let row = cache_query_fields(
1243            "SELECT value FROM t WHERE id=:id",
1244            &key,
1245            &params,
1246            &mut cache,
1247        );
1248        assert_eq!(row[0].to_string(), "chars(first)");
1249        let row = cache_query_fields(
1250            "SELECT value FROM t WHERE id=:id",
1251            &key,
1252            &params,
1253            &mut cache,
1254        );
1255        assert_eq!(row[0].to_string(), "chars(first)");
1256
1257        let after = runtime_snapshot();
1258        assert!(after.local_cache_hits > before.local_cache_hits);
1259        assert!(after.local_cache_misses > before.local_cache_misses);
1260        assert!(after.result_cache_misses > before.result_cache_misses);
1261        assert!(after.metadata_cache_misses > before.metadata_cache_misses);
1262    }
1263
1264    #[test]
1265    fn telemetry_receives_reload_cache_and_query_events() {
1266        let _guard = crate::runtime::runtime_test_guard()
1267            .lock()
1268            .expect("provider test guard");
1269        let telemetry_impl = Arc::new(TestTelemetry::default());
1270        let previous = install_runtime_telemetry(telemetry_impl.clone());
1271
1272        let db = MemDB::instance();
1273        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1274            .expect("create table");
1275        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1276            .expect("seed table");
1277        init_mem_provider(db).expect("init provider");
1278
1279        let mut cache = FieldQueryCache::default();
1280        let key = [DataField::from_digit("id", 1)];
1281        let params = [DataField::from_digit(":id", 1)];
1282        let row = cache_query_fields(
1283            "SELECT value FROM t WHERE id=:id",
1284            &key,
1285            &params,
1286            &mut cache,
1287        );
1288        assert_eq!(row[0].to_string(), "chars(first)");
1289        let row = cache_query_fields(
1290            "SELECT value FROM t WHERE id=:id",
1291            &key,
1292            &params,
1293            &mut cache,
1294        );
1295        assert_eq!(row[0].to_string(), "chars(first)");
1296
1297        let reload_err = install_provider(
1298            ProviderKind::SqliteAuthority,
1299            datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
1300            |_generation| {
1301                Err(KnowledgeReason::from_logic()
1302                    .to_err()
1303                    .with_detail("expected telemetry reload failure"))
1304            },
1305        );
1306        assert!(reload_err.is_err());
1307
1308        install_runtime_telemetry(previous);
1309        reset_telemetry();
1310
1311        assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
1312        assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
1313        assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
1314        assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
1315        assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
1316        assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
1317        assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
1318    }
1319
1320    #[test]
1321    #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
1322    fn cache_perf_reports_cache_vs_no_cache() {
1323        let _guard = crate::runtime::runtime_test_guard()
1324            .lock()
1325            .expect("provider test guard");
1326        let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
1327        let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
1328        let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
1329        let workload = build_perf_workload(ops, hotset);
1330
1331        eprintln!(
1332            "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
1333            rows, ops, hotset
1334        );
1335
1336        let bypass = run_bypass_perf(&workload, rows);
1337        let global = run_global_cache_perf(&workload, rows);
1338        let local = run_local_cache_perf(&workload, rows);
1339
1340        print_perf_result(&bypass);
1341        print_perf_result(&global);
1342        print_perf_result(&local);
1343
1344        eprintln!(
1345            "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1346            bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1347            bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1348        );
1349
1350        assert_eq!(bypass.counters.result_hits, 0);
1351        assert_eq!(bypass.counters.result_misses, 0);
1352        assert_eq!(bypass.counters.local_hits, 0);
1353        assert_eq!(bypass.counters.local_misses, 0);
1354        assert!(global.counters.result_hits > 0);
1355        assert!(global.counters.result_misses > 0);
1356        assert_eq!(global.counters.local_hits, 0);
1357        assert_eq!(global.counters.local_misses, 0);
1358        assert!(local.counters.local_hits > 0);
1359        assert!(local.counters.local_misses > 0);
1360        assert!(local.counters.result_misses > 0);
1361    }
1362
1363    #[test]
1364    fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1365        let _guard = crate::runtime::runtime_test_guard()
1366            .lock()
1367            .expect("provider test guard");
1368        let root = uniq_cache_cfg_tmp_dir();
1369        let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1370        let auth_file = root.join(".run").join("authority.sqlite");
1371        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1372            .expect("create authority parent");
1373        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1374
1375        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1376            .expect("init knowdb with cache config");
1377
1378        let snapshot = runtime_snapshot();
1379        assert!(snapshot.result_cache_enabled);
1380        assert_eq!(snapshot.result_cache_capacity, 7);
1381        assert_eq!(snapshot.result_cache_ttl_ms, 5);
1382
1383        let req = QueryRequest::first_row(
1384            "SELECT value FROM example WHERE id=:id",
1385            fields_to_params(&[DataField::from_digit(":id", 1)]),
1386            CachePolicy::UseGlobal,
1387        );
1388        let before = runtime_snapshot();
1389        let row = runtime()
1390            .execute(&req)
1391            .expect("first result-cache query")
1392            .into_row();
1393        assert_eq!(row[0].to_string(), "chars(alpha)");
1394        let row = runtime()
1395            .execute(&req)
1396            .expect("second result-cache query")
1397            .into_row();
1398        assert_eq!(row[0].to_string(), "chars(alpha)");
1399        std::thread::sleep(Duration::from_millis(12));
1400        let row = runtime()
1401            .execute(&req)
1402            .expect("expired result-cache query")
1403            .into_row();
1404        assert_eq!(row[0].to_string(), "chars(alpha)");
1405        let after = runtime_snapshot();
1406
1407        assert!(after.result_cache_hits > before.result_cache_hits);
1408        assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1409
1410        restore_default_result_cache_config();
1411        let _ = fs::remove_dir_all(&root);
1412    }
1413
1414    #[test]
1415    fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1416        let _guard = crate::runtime::runtime_test_guard()
1417            .lock()
1418            .expect("provider test guard");
1419        let root = uniq_cache_cfg_tmp_dir();
1420        let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1421        let auth_file = root.join(".run").join("authority.sqlite");
1422        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1423            .expect("create authority parent");
1424        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1425
1426        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1427            .expect("init knowdb with cache disabled");
1428
1429        let snapshot = runtime_snapshot();
1430        assert!(!snapshot.result_cache_enabled);
1431        assert_eq!(snapshot.result_cache_capacity, 3);
1432        assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1433
1434        let req = QueryRequest::first_row(
1435            "SELECT value FROM example WHERE id=:id",
1436            fields_to_params(&[DataField::from_digit(":id", 1)]),
1437            CachePolicy::UseGlobal,
1438        );
1439        let before = runtime_snapshot();
1440        let _ = runtime()
1441            .execute(&req)
1442            .expect("first bypassed result-cache query");
1443        let _ = runtime()
1444            .execute(&req)
1445            .expect("second bypassed result-cache query");
1446        let after = runtime_snapshot();
1447
1448        assert_eq!(after.result_cache_hits, before.result_cache_hits);
1449        assert_eq!(after.result_cache_misses, before.result_cache_misses);
1450
1451        restore_default_result_cache_config();
1452        let _ = fs::remove_dir_all(&root);
1453    }
1454
1455    #[test]
1456    fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1457        let _guard = crate::runtime::runtime_test_guard()
1458            .lock()
1459            .expect("provider test guard");
1460        restore_default_result_cache_config();
1461
1462        let db = MemDB::instance();
1463        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1464            .expect("create table");
1465        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1466            .expect("seed table");
1467        init_mem_provider(db).expect("init provider");
1468
1469        let before = runtime_snapshot();
1470        let root = uniq_cache_cfg_tmp_dir();
1471        let conf_path = write_provider_only_knowdb_with_cache(
1472            &root,
1473            "mysql",
1474            "not-a-valid-mysql-url",
1475            false,
1476            3,
1477            5,
1478        );
1479        let authority_uri = format!(
1480            "file:{}?mode=rwc&uri=true",
1481            root.join("unused.sqlite").display()
1482        );
1483
1484        let err =
1485            init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1486        assert!(err.is_err());
1487
1488        let after = runtime_snapshot();
1489        assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1490        assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1491        assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1492
1493        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1494        assert_eq!(row[0].to_string(), "chars(first)");
1495
1496        let _ = fs::remove_dir_all(&root);
1497    }
1498}