Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::hash_map::DefaultHasher,
6    hash::{Hash, Hasher},
7};
8
9use async_trait::async_trait;
10use rusqlite::ToSql;
11use rusqlite::{Connection, OpenFlags};
12use wp_error::KnowledgeResult;
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25    CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26    QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30    telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34    db: MemDB,
35    metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41        ThreadClonedMDB::query_with_scope(self, sql)
42    }
43
44    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45        ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46    }
47
48    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49        ThreadClonedMDB::query_row_with_scope(self, sql)
50    }
51
52    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53        ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54    }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60        self.db.query_with_scope(&self.metadata_scope, sql)
61    }
62
63    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64        self.db
65            .query_fields_with_scope(&self.metadata_scope, sql, params)
66    }
67
68    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69        self.db.query_row_with_scope(&self.metadata_scope, sql)
70    }
71
72    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73        self.db
74            .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75    }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81        PostgresProvider::query(self, sql)
82    }
83
84    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85        PostgresProvider::query_fields(self, sql, params)
86    }
87
88    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89        PostgresProvider::query_row(self, sql)
90    }
91
92    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93        PostgresProvider::query_named_fields(self, sql, params)
94    }
95
96    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97        PostgresProvider::query_async(self, sql).await
98    }
99
100    async fn query_fields_async(
101        &self,
102        sql: &str,
103        params: &[DataField],
104    ) -> KnowledgeResult<Vec<RowData>> {
105        PostgresProvider::query_fields_async(self, sql, params).await
106    }
107
108    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109        PostgresProvider::query_row_async(self, sql).await
110    }
111
112    async fn query_named_fields_async(
113        &self,
114        sql: &str,
115        params: &[DataField],
116    ) -> KnowledgeResult<RowData> {
117        PostgresProvider::query_named_fields_async(self, sql, params).await
118    }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124        MySqlProvider::query(self, sql)
125    }
126
127    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128        MySqlProvider::query_fields(self, sql, params)
129    }
130
131    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132        MySqlProvider::query_row(self, sql)
133    }
134
135    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136        MySqlProvider::query_named_fields(self, sql, params)
137    }
138
139    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140        MySqlProvider::query_async(self, sql).await
141    }
142
143    async fn query_fields_async(
144        &self,
145        sql: &str,
146        params: &[DataField],
147    ) -> KnowledgeResult<Vec<RowData>> {
148        MySqlProvider::query_fields_async(self, sql, params).await
149    }
150
151    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152        MySqlProvider::query_row_async(self, sql).await
153    }
154
155    async fn query_named_fields_async(
156        &self,
157        sql: &str,
158        params: &[DataField],
159    ) -> KnowledgeResult<RowData> {
160        MySqlProvider::query_named_fields_async(self, sql, params).await
161    }
162}
163
164fn install_provider<F>(
165    kind: ProviderKind,
166    datasource_id: DatasourceId,
167    build: F,
168) -> KnowledgeResult<()>
169where
170    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172    runtime().install_provider(kind, datasource_id, build)?;
173    Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177    DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183        Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184            authority_uri,
185            datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186            generation.0,
187        )))
188    })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192    install_provider(
193        ProviderKind::SqliteAuthority,
194        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195        |generation| {
196            Ok(Arc::new(MemProvider {
197                db: memdb,
198                metadata_scope: MetadataCacheScope {
199                    datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200                    generation,
201                },
202            }))
203        },
204    )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208    init_postgres_provider_with_config(
209        PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210    )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214    let connection_uri = config.connection_uri().to_string();
215    let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216    install_provider(
217        ProviderKind::Postgres,
218        datasource_id.clone(),
219        |generation| {
220            let provider = PostgresProvider::connect(
221                &config,
222                MetadataCacheScope {
223                    datasource_id: datasource_id.clone(),
224                    generation,
225                },
226            )?;
227            Ok(Arc::new(provider))
228        },
229    )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233    init_mysql_provider_with_config(
234        MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235    )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239    let connection_uri = config.connection_uri().to_string();
240    let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241    install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242        let provider = MySqlProvider::connect(
243            &config,
244            MetadataCacheScope {
245                datasource_id: datasource_id.clone(),
246                generation,
247            },
248        )?;
249        Ok(Arc::new(provider))
250    })
251}
252
253pub fn current_generation() -> Option<u64> {
254    runtime()
255        .current_generation()
256        .map(|generation| generation.0)
257}
258
259pub fn runtime_snapshot() -> RuntimeSnapshot {
260    runtime().snapshot()
261}
262
263pub fn install_runtime_telemetry(
264    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
265) -> Arc<dyn KnowledgeTelemetry> {
266    install_telemetry(telemetry_impl)
267}
268
269pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
270    runtime()
271        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
272        .map(QueryResponse::into_rows)
273}
274
275pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
276    runtime()
277        .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
278        .await
279        .map(QueryResponse::into_rows)
280}
281
282pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
283    runtime()
284        .execute(&QueryRequest::first_row(
285            sql,
286            Vec::new(),
287            CachePolicy::Bypass,
288        ))
289        .map(QueryResponse::into_row)
290}
291
292pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
293    runtime()
294        .execute_async(&QueryRequest::first_row(
295            sql,
296            Vec::new(),
297            CachePolicy::Bypass,
298        ))
299        .await
300        .map(QueryResponse::into_row)
301}
302
303pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
304    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
305}
306
307pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
308    runtime()
309        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
310        .await
311}
312
313pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
314    query_fields(sql, params)
315}
316
317pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
318    query_fields_async(sql, params).await
319}
320
321pub fn query_named<'a>(
322    sql: &str,
323    params: &'a [(&'a str, &'a dyn ToSql)],
324) -> KnowledgeResult<RowData> {
325    let fields = named_params_to_fields(params)?;
326    query_fields(sql, &fields)
327}
328
329pub fn cache_query_fields<const N: usize>(
330    sql: &str,
331    c_params: &[DataField; N],
332    query_params: &[DataField],
333    cache: &mut impl CacheAble<DataField, RowData, N>,
334) -> RowData {
335    cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
336}
337
338pub fn cache_query_fields_with_scope<const N: usize>(
339    sql: &str,
340    local_cache_scope: u64,
341    c_params: &[DataField; N],
342    query_params: &[DataField],
343    cache: &mut impl CacheAble<DataField, RowData, N>,
344) -> RowData {
345    if let Some(generation) = current_generation() {
346        cache.prepare_generation(generation);
347    }
348    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
349        runtime().record_local_cache_hit();
350        if telemetry_enabled() {
351            telemetry().on_cache(&CacheTelemetryEvent {
352                layer: CacheLayer::Local,
353                outcome: CacheOutcome::Hit,
354                provider_kind: runtime().current_provider_kind(),
355            });
356        }
357        return hit.clone();
358    }
359    runtime().record_local_cache_miss();
360    if telemetry_enabled() {
361        telemetry().on_cache(&CacheTelemetryEvent {
362            layer: CacheLayer::Local,
363            outcome: CacheOutcome::Miss,
364            provider_kind: runtime().current_provider_kind(),
365        });
366    }
367
368    match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
369        Ok(row) => {
370            cache.save_scoped(local_cache_scope, c_params, row.clone());
371            row
372        }
373        Err(err) => {
374            warn_kdb!("[kdb] query error: {}", err);
375            Vec::new()
376        }
377    }
378}
379
380pub async fn cache_query_fields_async<const N: usize>(
381    sql: &str,
382    c_params: &[DataField; N],
383    query_params: &[DataField],
384    cache: &mut impl CacheAble<DataField, RowData, N>,
385) -> RowData {
386    cache_query_fields_async_with_scope(
387        sql,
388        stable_hash(sql),
389        c_params,
390        || query_params.to_vec(),
391        cache,
392    )
393    .await
394}
395
396pub async fn cache_query_fields_async_with<const N: usize, F>(
397    sql: &str,
398    c_params: &[DataField; N],
399    build_query_params: F,
400    cache: &mut impl CacheAble<DataField, RowData, N>,
401) -> RowData
402where
403    F: FnOnce() -> Vec<DataField>,
404{
405    cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
406        .await
407}
408
409pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
410    sql: &str,
411    local_cache_scope: u64,
412    c_params: &[DataField; N],
413    build_query_params: F,
414    cache: &mut impl CacheAble<DataField, RowData, N>,
415) -> RowData
416where
417    F: FnOnce() -> Vec<DataField>,
418{
419    if let Some(generation) = current_generation() {
420        cache.prepare_generation(generation);
421    }
422    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
423        runtime().record_local_cache_hit();
424        if telemetry_enabled() {
425            telemetry().on_cache(&CacheTelemetryEvent {
426                layer: CacheLayer::Local,
427                outcome: CacheOutcome::Hit,
428                provider_kind: runtime().current_provider_kind(),
429            });
430        }
431        return hit.clone();
432    }
433    runtime().record_local_cache_miss();
434    if telemetry_enabled() {
435        telemetry().on_cache(&CacheTelemetryEvent {
436            layer: CacheLayer::Local,
437            outcome: CacheOutcome::Miss,
438            provider_kind: runtime().current_provider_kind(),
439        });
440    }
441
442    let query_params = build_query_params();
443    match runtime()
444        .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
445        .await
446    {
447        Ok(row) => {
448            cache.save_scoped(local_cache_scope, c_params, row.clone());
449            row
450        }
451        Err(err) => {
452            warn_kdb!("[kdb] query error: {}", err);
453            Vec::new()
454        }
455    }
456}
457
458pub fn cache_query<const N: usize>(
459    sql: &str,
460    c_params: &[DataField; N],
461    named_params: &[(&str, &dyn ToSql)],
462    cache: &mut impl CacheAble<DataField, RowData, N>,
463) -> RowData {
464    let query_fields = match named_params_to_fields(named_params) {
465        Ok(fields) => fields,
466        Err(err) => {
467            warn_kdb!("[kdb] query param conversion error: {}", err);
468            return Vec::new();
469        }
470    };
471
472    cache_query_fields(sql, c_params, &query_fields, cache)
473}
474
475pub async fn cache_query_async<const N: usize>(
476    sql: &str,
477    c_params: &[DataField; N],
478    named_params: &[(&str, &dyn ToSql)],
479    cache: &mut impl CacheAble<DataField, RowData, N>,
480) -> RowData {
481    let query_fields = match named_params_to_fields(named_params) {
482        Ok(fields) => fields,
483        Err(err) => {
484            warn_kdb!("[kdb] query param conversion error: {}", err);
485            return Vec::new();
486        }
487    };
488
489    cache_query_fields_async(sql, c_params, &query_fields, cache).await
490}
491
492fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
493    if let Ok(conn) = Connection::open_with_flags(
494        authority_uri,
495        OpenFlags::SQLITE_OPEN_READ_WRITE
496            | OpenFlags::SQLITE_OPEN_CREATE
497            | OpenFlags::SQLITE_OPEN_URI,
498    ) {
499        let _ = conn.execute_batch(
500            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
501        );
502    }
503    Ok(())
504}
505
506pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
507    ensure_wal(authority_uri)?;
508    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
509    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
510    init_mem_provider(mem)
511}
512
513pub fn init_thread_cloned_from_knowdb(
514    root: &Path,
515    knowdb_conf: &Path,
516    authority_uri: &str,
517    dict: &orion_variate::EnvDict,
518) -> KnowledgeResult<()> {
519    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
520    if let Some(provider) = conf.provider {
521        match provider.kind {
522            ProviderKind::Postgres => {
523                info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
524                init_postgres_provider_with_config(
525                    PostgresProviderConfig::new(provider.connection_uri)
526                        .with_pool_size(provider.pool_size)
527                        .with_min_connections(provider.min_connections)
528                        .with_acquire_timeout_ms(provider.acquire_timeout_ms)
529                        .with_idle_timeout_ms(provider.idle_timeout_ms)
530                        .with_max_lifetime_ms(provider.max_lifetime_ms),
531                )?;
532                runtime().configure_result_cache(
533                    conf.cache.enabled,
534                    conf.cache.capacity,
535                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
536                );
537                return Ok(());
538            }
539            ProviderKind::Mysql => {
540                info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
541                init_mysql_provider_with_config(
542                    MySqlProviderConfig::new(provider.connection_uri)
543                        .with_pool_size(provider.pool_size)
544                        .with_min_connections(provider.min_connections)
545                        .with_acquire_timeout_ms(provider.acquire_timeout_ms)
546                        .with_idle_timeout_ms(provider.idle_timeout_ms)
547                        .with_max_lifetime_ms(provider.max_lifetime_ms),
548                )?;
549                runtime().configure_result_cache(
550                    conf.cache.enabled,
551                    conf.cache.capacity,
552                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
553                );
554                return Ok(());
555            }
556            ProviderKind::SqliteAuthority => {}
557        }
558    }
559
560    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
561    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
562        let path_part = rest.split('?').next().unwrap_or(rest);
563        format!("file:{}?mode=ro&uri=true", path_part)
564    } else {
565        authority_uri.to_string()
566    };
567
568    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
569    init_thread_cloned_from_authority(&ro_uri)?;
570    runtime().configure_result_cache(
571        conf.cache.enabled,
572        conf.cache.capacity,
573        Duration::from_millis(conf.cache.ttl_ms.max(1)),
574    );
575    Ok(())
576}
577
/// Hashes `value` with the standard library's [`DefaultHasher`].
///
/// Equal strings always hash to the same number within one build of the
/// program. The standard library does not promise that the algorithm stays
/// the same across Rust releases.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use crate::cache::FieldQueryCache;
588    use crate::mem::memdb::MemDB;
589    use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
590    use crate::runtime::fields_to_params;
591    use crate::telemetry::{
592        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
593        ReloadTelemetryEvent, reset_telemetry,
594    };
595    use orion_error::{ToStructError, UvsFrom};
596    use orion_variate::EnvDict;
597    use std::fs;
598    use std::hint::black_box;
599    use std::path::PathBuf;
600    use std::sync::atomic::{AtomicU64, Ordering};
601    use std::time::{Duration, Instant};
602    use wp_error::KnowledgeReason;
603
604    #[derive(Default)]
605    struct TestTelemetry {
606        reload_success: AtomicU64,
607        reload_failure: AtomicU64,
608        local_hits: AtomicU64,
609        local_misses: AtomicU64,
610        result_hits: AtomicU64,
611        result_misses: AtomicU64,
612        metadata_hits: AtomicU64,
613        metadata_misses: AtomicU64,
614        query_success: AtomicU64,
615        query_failure: AtomicU64,
616    }
617
618    impl KnowledgeTelemetry for TestTelemetry {
619        fn on_cache(&self, event: &CacheTelemetryEvent) {
620            let counter = match (event.layer, event.outcome) {
621                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
622                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
623                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
624                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
625                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
626                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
627            };
628            counter.fetch_add(1, Ordering::Relaxed);
629        }
630
631        fn on_reload(&self, event: &ReloadTelemetryEvent) {
632            let counter = match event.outcome {
633                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
634                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
635            };
636            counter.fetch_add(1, Ordering::Relaxed);
637        }
638
639        fn on_query(&self, event: &QueryTelemetryEvent) {
640            let counter = if event.success {
641                &self.query_success
642            } else {
643                &self.query_failure
644            };
645            counter.fetch_add(1, Ordering::Relaxed);
646        }
647    }
648
649    fn perf_env_usize(key: &str, default: usize) -> usize {
650        std::env::var(key)
651            .ok()
652            .and_then(|value| value.parse::<usize>().ok())
653            .unwrap_or(default)
654    }
655
656    fn seed_perf_provider(rows: usize) {
657        let db = MemDB::instance();
658        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
659            .expect("create perf_kv");
660        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
661        for id in 1..=rows {
662            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
663            db.execute(sql.as_str()).expect("insert perf_kv row");
664        }
665        db.execute("COMMIT").expect("commit perf_kv load");
666        init_mem_provider(db).expect("init perf provider");
667    }
668
669    #[tokio::test(flavor = "current_thread")]
670    async fn query_async_works_with_mem_provider() {
671        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
672        let db = MemDB::instance();
673        db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
674            .expect("create async_kv");
675        db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
676            .expect("insert async_kv row");
677        init_mem_provider(db).expect("init mem provider");
678
679        let row = query_fields_async(
680            "SELECT value FROM async_kv WHERE id=:id",
681            &[DataField::from_digit(":id", 1)],
682        )
683        .await
684        .expect("query async row");
685        assert_eq!(row.len(), 1);
686        assert_eq!(row[0].to_string(), "chars(hello)");
687    }
688
689    #[derive(Clone)]
690    struct PerfQuery {
691        cache_key: [DataField; 1],
692        query_params: [DataField; 1],
693        bypass_req: QueryRequest,
694        global_req: QueryRequest,
695    }
696
697    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
698        (0..ops)
699            .map(|idx| {
700                let id = ((idx * 17) % hotset + 1) as i64;
701                let cache_key = [DataField::from_digit("id", id)];
702                let query_params = [DataField::from_digit(":id", id)];
703                let bypass_req = QueryRequest::first_row(
704                    "SELECT value FROM perf_kv WHERE id=:id",
705                    fields_to_params(&query_params),
706                    CachePolicy::Bypass,
707                );
708                let global_req = QueryRequest::first_row(
709                    "SELECT value FROM perf_kv WHERE id=:id",
710                    fields_to_params(&query_params),
711                    CachePolicy::UseGlobal,
712                );
713                PerfQuery {
714                    cache_key,
715                    query_params,
716                    bypass_req,
717                    global_req,
718                }
719            })
720            .collect()
721    }
722
723    #[derive(Debug, Clone, Copy)]
724    struct PerfCounters {
725        result_hits: u64,
726        result_misses: u64,
727        local_hits: u64,
728        local_misses: u64,
729        metadata_hits: u64,
730        metadata_misses: u64,
731    }
732
733    #[derive(Debug, Clone)]
734    struct PerfResult {
735        name: &'static str,
736        elapsed: Duration,
737        ops: usize,
738        counters: PerfCounters,
739    }
740
741    impl PerfResult {
742        fn qps(&self) -> f64 {
743            let secs = self.elapsed.as_secs_f64();
744            if secs == 0.0 {
745                self.ops as f64
746            } else {
747                self.ops as f64 / secs
748            }
749        }
750    }
751
752    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
753        PerfCounters {
754            result_hits: after
755                .result_cache_hits
756                .saturating_sub(before.result_cache_hits),
757            result_misses: after
758                .result_cache_misses
759                .saturating_sub(before.result_cache_misses),
760            local_hits: after
761                .local_cache_hits
762                .saturating_sub(before.local_cache_hits),
763            local_misses: after
764                .local_cache_misses
765                .saturating_sub(before.local_cache_misses),
766            metadata_hits: after
767                .metadata_cache_hits
768                .saturating_sub(before.metadata_cache_hits),
769            metadata_misses: after
770                .metadata_cache_misses
771                .saturating_sub(before.metadata_cache_misses),
772        }
773    }
774
775    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
776        seed_perf_provider(rows);
777        let before = runtime_snapshot();
778        let started = Instant::now();
779        for item in workload {
780            let row = runtime()
781                .execute(&item.bypass_req)
782                .expect("execute bypass request")
783                .into_row();
784            black_box(row);
785        }
786        let elapsed = started.elapsed();
787        let after = runtime_snapshot();
788        PerfResult {
789            name: "bypass",
790            elapsed,
791            ops: workload.len(),
792            counters: snapshot_delta(&before, &after),
793        }
794    }
795
796    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
797        seed_perf_provider(rows);
798        let before = runtime_snapshot();
799        let started = Instant::now();
800        for item in workload {
801            let row = runtime()
802                .execute(&item.global_req)
803                .expect("execute global-cache request")
804                .into_row();
805            black_box(row);
806        }
807        let elapsed = started.elapsed();
808        let after = runtime_snapshot();
809        PerfResult {
810            name: "global_cache",
811            elapsed,
812            ops: workload.len(),
813            counters: snapshot_delta(&before, &after),
814        }
815    }
816
817    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
818        seed_perf_provider(rows);
819        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
820        let before = runtime_snapshot();
821        let started = Instant::now();
822        for item in workload {
823            let row = cache_query_fields(
824                "SELECT value FROM perf_kv WHERE id=:id",
825                &item.cache_key,
826                &item.query_params,
827                &mut cache,
828            );
829            black_box(row);
830        }
831        let elapsed = started.elapsed();
832        let after = runtime_snapshot();
833        PerfResult {
834            name: "local_cache",
835            elapsed,
836            ops: workload.len(),
837            counters: snapshot_delta(&before, &after),
838        }
839    }
840
841    fn print_perf_result(result: &PerfResult) {
842        eprintln!(
843            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
844            result.name,
845            result.elapsed.as_millis(),
846            result.qps(),
847            result.counters.result_hits,
848            result.counters.result_misses,
849            result.counters.local_hits,
850            result.counters.local_misses,
851            result.counters.metadata_hits,
852            result.counters.metadata_misses,
853        );
854    }
855
856    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
857        use rand::{Rng, rng};
858        let rnd: u64 = rng().next_u64();
859        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
860    }
861
/// Writes a minimal knowdb fixture under `root/models/knowledge` and returns
/// the path of the generated `knowdb.toml`.
///
/// The fixture declares a single `example` table (with `create.sql`,
/// `insert.sql` and a one-row `data.csv`) and a `[cache]` section filled from
/// `enabled`, `capacity` and `ttl_ms`.
///
/// # Panics
/// Panics if any directory or file cannot be written.
fn write_minimal_knowdb_with_cache(
    root: &Path,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    let example_dir = models.join("example");
    let conf_path = models.join("knowdb.toml");

    fs::create_dir_all(&example_dir).expect("create knowdb models/example");

    let conf_body = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[csv]
has_header = false

[[tables]]
name = "example"
columns.by_index = [0,1]
"#
    );
    fs::write(&conf_path, conf_body).expect("write knowdb.toml");

    // `{table}` is written literally; this string is not passed through `format!`.
    let create_sql = r#"
CREATE TABLE IF NOT EXISTS {table} (
  id INTEGER PRIMARY KEY,
  value TEXT NOT NULL
);
"#;
    fs::write(example_dir.join("create.sql"), create_sql).expect("write create.sql");

    let insert_sql = "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n";
    fs::write(example_dir.join("insert.sql"), insert_sql).expect("write insert.sql");

    fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");

    conf_path
}
911
/// Writes a `knowdb.toml` under `root/models/knowledge` that contains only a
/// `[cache]` section and a `[provider]` section, and returns its path.
///
/// `provider_kind` and `connection_uri` are interpolated verbatim into the
/// `[provider]` table; `enabled`, `capacity` and `ttl_ms` fill `[cache]`.
///
/// # Panics
/// Panics if the directory or the file cannot be written.
fn write_provider_only_knowdb_with_cache(
    root: &Path,
    provider_kind: &str,
    connection_uri: &str,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    let conf_path = models.join("knowdb.toml");

    fs::create_dir_all(&models).expect("create knowdb models");

    let conf_body = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[provider]
kind = "{provider_kind}"
connection_uri = "{connection_uri}"
"#
    );
    fs::write(&conf_path, conf_body).expect("write provider knowdb.toml");

    conf_path
}
943
944    fn restore_default_result_cache_config() {
945        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
946    }
947
/// Installing a second in-memory provider must make subsequent queries read
/// from the replacement database rather than the first one.
#[test]
fn provider_can_be_replaced() {
    const CREATE_TABLE: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";
    const SELECT_VALUE: &str = "SELECT value FROM t WHERE id = 1";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // First provider: row 1 holds 'first'.
    let original_db = MemDB::instance();
    original_db
        .execute(CREATE_TABLE)
        .expect("create table in db1");
    original_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(original_db).expect("init provider db1");
    let from_original = query_row(SELECT_VALUE).expect("query db1");
    assert_eq!(from_original[0].to_string(), "chars(first)");

    // Replacement provider: same schema, row 1 holds 'second'.
    let replacement_db = MemDB::instance();
    replacement_db
        .execute(CREATE_TABLE)
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");
    let from_replacement = query_row(SELECT_VALUE).expect("query db2");
    assert_eq!(from_replacement[0].to_string(), "chars(second)");
}
971
/// A `MemProvider` that was built with an older metadata scope must keep
/// caching under that scope even after a provider with a newer scope has been
/// installed in the runtime.
#[test]
fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
    const LOOKUP_SQL: &str = "SELECT value FROM cache_scope_t WHERE id = 1";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    // Start from an empty metadata cache so the key checks below are unambiguous.
    COLNAME_CACHE.write().expect("metadata cache lock").clear();

    let db = MemDB::instance();
    db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
        .expect("seed table");

    let old_scope = MetadataCacheScope {
        datasource_id: DatasourceId("sqlite:old".to_string()),
        generation: Generation(1),
    };
    let new_scope = MetadataCacheScope {
        datasource_id: DatasourceId("sqlite:new".to_string()),
        generation: Generation(2),
    };

    // Provider handle that still carries the old scope; it is queried directly below.
    let stale_provider = MemProvider {
        db: db.clone(),
        metadata_scope: old_scope.clone(),
    };

    // Install a provider over the same database but with the new scope.
    install_provider(
        ProviderKind::SqliteAuthority,
        new_scope.datasource_id.clone(),
        |_generation| {
            Ok(Arc::new(MemProvider {
                db: db.clone(),
                metadata_scope: new_scope.clone(),
            }))
        },
    )
    .expect("install new provider");

    let fetched = stale_provider
        .query_row(LOOKUP_SQL)
        .expect("old provider query");
    assert_eq!(fetched[0].to_string(), "chars(scope-old)");

    // Only the old scope's key may have been recorded by the query above.
    let metadata_cache = COLNAME_CACHE.read().expect("metadata cache lock");
    let old_scope_key = metadata_cache_key_for_scope(&old_scope, LOOKUP_SQL);
    let new_scope_key = metadata_cache_key_for_scope(&new_scope, LOOKUP_SQL);
    assert!(metadata_cache.contains(&old_scope_key));
    assert!(!metadata_cache.contains(&new_scope_key));
}
1025
1026    #[tokio::test(flavor = "current_thread")]
1027    async fn async_query_uses_runtime_bridge() {
1028        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1029        let db = MemDB::instance();
1030        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1031            .expect("create table");
1032        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1033            .expect("seed table");
1034        init_mem_provider(db).expect("init provider");
1035
1036        let row = query_row_async("SELECT value FROM t WHERE id = 1")
1037            .await
1038            .expect("async query row");
1039        assert_eq!(row[0].to_string(), "chars(async-first)");
1040    }
1041
1042    #[tokio::test(flavor = "current_thread")]
1043    async fn async_cache_query_fields_hits_local_cache() {
1044        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1045        let db = MemDB::instance();
1046        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1047            .expect("create table");
1048        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1049            .expect("seed table");
1050        init_mem_provider(db).expect("init provider");
1051
1052        let key = [DataField::from_digit("id", 1)];
1053        let params = [DataField::from_digit(":id", 1)];
1054        let mut cache = FieldQueryCache::default();
1055
1056        let first = cache_query_fields_async(
1057            "SELECT value FROM t WHERE id=:id",
1058            &key,
1059            &params,
1060            &mut cache,
1061        )
1062        .await;
1063        let second = cache_query_fields_async(
1064            "SELECT value FROM t WHERE id=:id",
1065            &key,
1066            &params,
1067            &mut cache,
1068        )
1069        .await;
1070
1071        assert_eq!(first[0].to_string(), "chars(async-cache)");
1072        assert_eq!(second[0].to_string(), "chars(async-cache)");
1073    }
1074
/// A caller-owned `FieldQueryCache` must not keep serving a value that was
/// cached before the provider was replaced.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    const CREATE_TABLE: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";
    const LOOKUP_SQL: &str = "SELECT value FROM t WHERE id=:id";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let original_db = MemDB::instance();
    original_db
        .execute(CREATE_TABLE)
        .expect("create table in db1");
    original_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(original_db).expect("init provider db1");

    // The same key, params and cache are reused across the provider swap.
    let cache_key = [DataField::from_digit("id", 1)];
    let bind_params = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    let before_swap = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(before_swap[0].to_string(), "chars(first)");

    let replacement_db = MemDB::instance();
    replacement_db
        .execute(CREATE_TABLE)
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");

    let after_swap = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(after_swap[0].to_string(), "chars(second)");
}
1113
/// Two different SQL statements that share the same cache key fields must not
/// collide in a caller-owned `FieldQueryCache`.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    // Two tables with the same id but different values.
    let seeded_db = MemDB::instance();
    seeded_db
        .execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t1");
    seeded_db
        .execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create t2");
    seeded_db
        .execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
        .expect("seed t1");
    seeded_db
        .execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
        .expect("seed t2");
    init_mem_provider(seeded_db).expect("init provider");

    let cache_key = [DataField::from_digit("id", 1)];
    let bind_params = [DataField::from_digit(":id", 1)];
    let mut local_cache = FieldQueryCache::default();

    let from_t1 = cache_query_fields(
        "SELECT value FROM t1 WHERE id=:id",
        &cache_key,
        &bind_params,
        &mut local_cache,
    );
    assert_eq!(from_t1[0].to_string(), "chars(first)");

    // Same key and params, different SQL text: must not reuse the t1 entry.
    let from_t2 = cache_query_fields(
        "SELECT value FROM t2 WHERE id=:id",
        &cache_key,
        &bind_params,
        &mut local_cache,
    );
    assert_eq!(from_t2[0].to_string(), "chars(second)");
}
1150
/// After one cached query the runtime snapshot must report the SQLite
/// provider kind, a generation, a non-empty result cache, lengths that do not
/// exceed their capacities, and at least one successful reload.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let seeded_db = MemDB::instance();
    seeded_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    seeded_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(seeded_db).expect("init provider");

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind_params = [DataField::from_digit(":id", 1)];
    let fetched = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &cache_key,
        &bind_params,
        &mut local_cache,
    );
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let snap = runtime_snapshot();
    let is_sqlite_authority = matches!(snap.provider_kind, Some(ProviderKind::SqliteAuthority));
    assert!(is_sqlite_authority);
    assert!(snap.generation.is_some());
    assert!(snap.result_cache_len >= 1);
    assert!(snap.result_cache_capacity >= snap.result_cache_len);
    assert!(snap.metadata_cache_capacity >= snap.metadata_cache_len);
    assert!(snap.reload_successes >= 1);
}
1185
/// Running the same SQL against two successively installed providers must
/// grow the metadata cache each time, i.e. the second provider gets its own
/// entry instead of reusing the first one.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    const CREATE_TABLE: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";
    const LOOKUP_SQL: &str = "SELECT value FROM t WHERE id = 1";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let before = runtime_snapshot().metadata_cache_len;

    // First provider and first metadata entry.
    let original_db = MemDB::instance();
    original_db
        .execute(CREATE_TABLE)
        .expect("create table in db1");
    original_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(original_db).expect("init provider db1");
    let from_original = query_row(LOOKUP_SQL).expect("query db1");
    assert_eq!(from_original[0].to_string(), "chars(first)");
    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first > before,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    // Replacement provider: the identical SQL must add another entry.
    let replacement_db = MemDB::instance();
    replacement_db
        .execute(CREATE_TABLE)
        .expect("create table in db2");
    replacement_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(replacement_db).expect("replace provider with db2");
    let from_replacement = query_row(LOOKUP_SQL).expect("query db2");
    assert_eq!(from_replacement[0].to_string(), "chars(second)");
    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_second > after_first,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
1222
/// When the provider factory passed to `install_provider` fails, the
/// previously installed provider must keep serving queries and the current
/// generation must stay unchanged.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let seeded_db = MemDB::instance();
    seeded_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    seeded_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(seeded_db).expect("init provider db1");
    let generation_before = current_generation();

    // Attempt a reload whose factory always returns an error.
    let failing_datasource = datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure");
    let reload_outcome = install_provider(
        ProviderKind::SqliteAuthority,
        failing_datasource,
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected reload failure"))
        },
    );
    assert!(reload_outcome.is_err());

    let fetched = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(fetched[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), generation_before);
}
1251
/// Issuing the same cached lookup twice must increase the local-cache hit and
/// miss counters as well as the result-cache and metadata-cache miss counters
/// reported by `runtime_snapshot`.
#[test]
fn runtime_snapshot_records_cache_counters() {
    const LOOKUP_SQL: &str = "SELECT value FROM t WHERE id=:id";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let seeded_db = MemDB::instance();
    seeded_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    seeded_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(seeded_db).expect("init provider");

    // Baseline is taken after the provider is installed, before any query runs.
    let baseline = runtime_snapshot();
    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind_params = [DataField::from_digit(":id", 1)];

    let first = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(first[0].to_string(), "chars(first)");
    let second = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(second[0].to_string(), "chars(first)");

    let current = runtime_snapshot();
    assert!(current.local_cache_hits > baseline.local_cache_hits);
    assert!(current.local_cache_misses > baseline.local_cache_misses);
    assert!(current.result_cache_misses > baseline.result_cache_misses);
    assert!(current.metadata_cache_misses > baseline.metadata_cache_misses);
}
1289
/// With a `TestTelemetry` sink installed, a provider install, two cached
/// lookups and one failing reload must be reflected in the sink's reload,
/// cache and query counters.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    const LOOKUP_SQL: &str = "SELECT value FROM t WHERE id=:id";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sink = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(sink.clone());

    let seeded_db = MemDB::instance();
    seeded_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    seeded_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(seeded_db).expect("init provider");

    let mut local_cache = FieldQueryCache::default();
    let cache_key = [DataField::from_digit("id", 1)];
    let bind_params = [DataField::from_digit(":id", 1)];

    let first = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(first[0].to_string(), "chars(first)");
    let second = cache_query_fields(LOOKUP_SQL, &cache_key, &bind_params, &mut local_cache);
    assert_eq!(second[0].to_string(), "chars(first)");

    // Trigger a reload whose factory always fails.
    let failing_datasource = datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure");
    let reload_outcome = install_provider(
        ProviderKind::SqliteAuthority,
        failing_datasource,
        |_generation| {
            Err(KnowledgeReason::from_logic()
                .to_err()
                .with_detail("expected telemetry reload failure"))
        },
    );
    assert!(reload_outcome.is_err());

    // Put the previous telemetry back before asserting on the collected counters.
    install_runtime_telemetry(previous);
    reset_telemetry();

    let seen = |counter: &std::sync::atomic::AtomicUsize| counter.load(Ordering::Relaxed) >= 1;
    assert!(seen(&sink.reload_success));
    assert!(seen(&sink.reload_failure));
    assert!(seen(&sink.local_hits));
    assert!(seen(&sink.local_misses));
    assert!(seen(&sink.result_misses));
    assert!(seen(&sink.metadata_misses));
    assert!(seen(&sink.query_success));
}
1345
/// Manual benchmark comparing the bypass, global-cache and local-cache
/// scenarios over the same workload.
///
/// Sizes are read through `perf_env_usize` from `WP_KDB_PERF_ROWS`,
/// `WP_KDB_PERF_OPS` and `WP_KDB_PERF_HOTSET`, with 10 000 / 120 000 / 128
/// passed as the second argument. Results and speedups are printed to stderr,
/// then the per-scenario counters are checked.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    // The hot set is kept within 1..=rows.
    let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
    let workload = build_perf_workload(ops, hotset);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        rows, ops, hotset
    );

    let bypass = run_bypass_perf(&workload, rows);
    let global = run_global_cache_perf(&workload, rows);
    let local = run_local_cache_perf(&workload, rows);

    for scenario in [&bypass, &global, &local] {
        print_perf_result(scenario);
    }

    let bypass_secs = bypass.elapsed.as_secs_f64();
    let global_speedup = bypass_secs / global.elapsed.as_secs_f64();
    let local_speedup = bypass_secs / local.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        global_speedup, local_speedup,
    );

    // Bypass: no cache layer may be touched.
    assert_eq!(bypass.counters.result_hits, 0);
    assert_eq!(bypass.counters.result_misses, 0);
    assert_eq!(bypass.counters.local_hits, 0);
    assert_eq!(bypass.counters.local_misses, 0);
    // Global cache: result cache is used, local cache is not.
    assert!(global.counters.result_hits > 0);
    assert!(global.counters.result_misses > 0);
    assert_eq!(global.counters.local_hits, 0);
    assert_eq!(global.counters.local_misses, 0);
    // Local cache: local layer is used, and result-cache misses are also recorded.
    assert!(local.counters.local_hits > 0);
    assert!(local.counters.local_misses > 0);
    assert!(local.counters.result_misses > 0);
}
1388
/// The `[cache]` section of `knowdb.toml` (enabled, capacity 7, TTL 5 ms) must
/// be applied to the runtime result cache by `init_thread_cloned_from_knowdb`.
///
/// After checking the snapshot, the same request is executed three times: twice
/// back to back and once after sleeping 12 ms (longer than the TTL). The hit
/// counter must grow and at least two misses must be recorded.
///
/// Cleanup is done by a drop guard so that a failing assertion neither leaks
/// the temporary directory nor leaves the process-wide result-cache
/// configuration changed.
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    /// On drop: restores the default result-cache configuration and removes
    /// the temporary fixture tree (best effort).
    struct FixtureCleanup<'a>(&'a Path);

    impl Drop for FixtureCleanup<'_> {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            // Best effort: a leftover temp directory must not turn into a failure.
            let _ = fs::remove_dir_all(self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped first and cleanup runs while
    // the test guard is still held.
    let _cleanup = FixtureCleanup(root.as_path());

    let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    let snapshot = runtime_snapshot();
    assert!(snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 7);
    assert_eq!(snapshot.result_cache_ttl_ms, 5);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let row = runtime()
        .execute(&req)
        .expect("first result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let row = runtime()
        .execute(&req)
        .expect("second result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // Wait longer than the configured 5 ms TTL before the third query.
    std::thread::sleep(Duration::from_millis(12));
    let row = runtime()
        .execute(&req)
        .expect("expired result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let after = runtime_snapshot();

    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);
}
1439
/// With `enabled = false` in the `[cache]` section, the runtime must report
/// the result cache as disabled (while still recording capacity 3 and TTL
/// 30 000 ms), and `CachePolicy::UseGlobal` requests must not change the
/// result-cache hit or miss counters.
///
/// Cleanup is done by a drop guard so that a failing assertion neither leaks
/// the temporary directory nor leaves the result cache disabled for the rest
/// of the process.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    /// On drop: restores the default result-cache configuration and removes
    /// the temporary fixture tree (best effort).
    struct FixtureCleanup<'a>(&'a Path);

    impl Drop for FixtureCleanup<'_> {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            // Best effort: a leftover temp directory must not turn into a failure.
            let _ = fs::remove_dir_all(self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped first and cleanup runs while
    // the test guard is still held.
    let _cleanup = FixtureCleanup(root.as_path());

    let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    let snapshot = runtime_snapshot();
    assert!(!snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 3);
    assert_eq!(snapshot.result_cache_ttl_ms, 30_000);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let _ = runtime()
        .execute(&req)
        .expect("first bypassed result-cache query");
    let _ = runtime()
        .execute(&req)
        .expect("second bypassed result-cache query");
    let after = runtime_snapshot();

    // Neither counter may move while the result cache is disabled.
    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);
}
1480
/// When `init_thread_cloned_from_knowdb` fails (here: a `mysql` provider with
/// an invalid connection URI), the `[cache]` settings from that config must
/// not be applied and the previously installed provider must keep serving
/// queries.
///
/// The temporary directory is removed by a drop guard so that it is not left
/// behind when an assertion fails.
#[test]
fn failed_knowdb_provider_init_does_not_apply_cache_config() {
    /// Removes the temporary fixture tree on drop (best effort).
    struct TempTree<'a>(&'a Path);

    impl Drop for TempTree<'_> {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    // Start from the default cache configuration so the comparison below is meaningful.
    restore_default_result_cache_config();

    let db = MemDB::instance();
    db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(db).expect("init provider");

    let before = runtime_snapshot();
    let root = uniq_cache_cfg_tmp_dir();
    let _temp_tree = TempTree(root.as_path());

    // The cache values here (disabled, 3, 5 ms) differ from the defaults
    // restored above, so an unwanted apply would be visible in the snapshot.
    let conf_path = write_provider_only_knowdb_with_cache(
        &root,
        "mysql",
        "not-a-valid-mysql-url",
        false,
        3,
        5,
    );
    let authority_uri = format!(
        "file:{}?mode=rwc&uri=true",
        root.join("unused.sqlite").display()
    );

    let err =
        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
    assert!(err.is_err());

    let after = runtime_snapshot();
    assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
    assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
    assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);

    let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(row[0].to_string(), "chars(first)");
}
1524}