Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::hash_map::DefaultHasher,
6    hash::{Hash, Hasher},
7};
8
9use crate::error::KnowledgeResult;
10use async_trait::async_trait;
11use rusqlite::ToSql;
12use rusqlite::{Connection, OpenFlags};
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25    CachePolicy, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor, QueryRequest,
26    QueryResponse, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30    telemetry, telemetry_enabled,
31};
32
33struct MemProvider {
34    db: MemDB,
35    metadata_scope: MetadataCacheScope,
36}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41        ThreadClonedMDB::query_with_scope(self, sql)
42    }
43
44    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45        ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46    }
47
48    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49        ThreadClonedMDB::query_row_with_scope(self, sql)
50    }
51
52    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53        ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54    }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60        self.db.query_with_scope(&self.metadata_scope, sql)
61    }
62
63    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64        self.db
65            .query_fields_with_scope(&self.metadata_scope, sql, params)
66    }
67
68    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69        self.db.query_row_with_scope(&self.metadata_scope, sql)
70    }
71
72    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73        self.db
74            .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75    }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81        PostgresProvider::query(self, sql)
82    }
83
84    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85        PostgresProvider::query_fields(self, sql, params)
86    }
87
88    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89        PostgresProvider::query_row(self, sql)
90    }
91
92    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93        PostgresProvider::query_named_fields(self, sql, params)
94    }
95
96    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97        PostgresProvider::query_async(self, sql).await
98    }
99
100    async fn query_fields_async(
101        &self,
102        sql: &str,
103        params: &[DataField],
104    ) -> KnowledgeResult<Vec<RowData>> {
105        PostgresProvider::query_fields_async(self, sql, params).await
106    }
107
108    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109        PostgresProvider::query_row_async(self, sql).await
110    }
111
112    async fn query_named_fields_async(
113        &self,
114        sql: &str,
115        params: &[DataField],
116    ) -> KnowledgeResult<RowData> {
117        PostgresProvider::query_named_fields_async(self, sql, params).await
118    }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124        MySqlProvider::query(self, sql)
125    }
126
127    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128        MySqlProvider::query_fields(self, sql, params)
129    }
130
131    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132        MySqlProvider::query_row(self, sql)
133    }
134
135    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136        MySqlProvider::query_named_fields(self, sql, params)
137    }
138
139    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140        MySqlProvider::query_async(self, sql).await
141    }
142
143    async fn query_fields_async(
144        &self,
145        sql: &str,
146        params: &[DataField],
147    ) -> KnowledgeResult<Vec<RowData>> {
148        MySqlProvider::query_fields_async(self, sql, params).await
149    }
150
151    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152        MySqlProvider::query_row_async(self, sql).await
153    }
154
155    async fn query_named_fields_async(
156        &self,
157        sql: &str,
158        params: &[DataField],
159    ) -> KnowledgeResult<RowData> {
160        MySqlProvider::query_named_fields_async(self, sql, params).await
161    }
162}
163
164fn install_provider<F>(
165    kind: ProviderKind,
166    datasource_id: DatasourceId,
167    build: F,
168) -> KnowledgeResult<()>
169where
170    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172    runtime().install_provider(kind, datasource_id, build)?;
173    Ok(())
174}
175
176fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
177    DatasourceId::from_seed(kind, seed)
178}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183        Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184            authority_uri,
185            datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186            generation.0,
187        )))
188    })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192    install_provider(
193        ProviderKind::SqliteAuthority,
194        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195        |generation| {
196            Ok(Arc::new(MemProvider {
197                db: memdb,
198                metadata_scope: MetadataCacheScope {
199                    datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200                    generation,
201                },
202            }))
203        },
204    )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208    init_postgres_provider_with_config(
209        PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210    )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214    let connection_uri = config.connection_uri().to_string();
215    let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216    install_provider(
217        ProviderKind::Postgres,
218        datasource_id.clone(),
219        |generation| {
220            let provider = PostgresProvider::connect(
221                &config,
222                MetadataCacheScope {
223                    datasource_id: datasource_id.clone(),
224                    generation,
225                },
226            )?;
227            Ok(Arc::new(provider))
228        },
229    )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233    init_mysql_provider_with_config(
234        MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235    )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239    let connection_uri = config.connection_uri().to_string();
240    let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241    install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242        let provider = MySqlProvider::connect(
243            &config,
244            MetadataCacheScope {
245                datasource_id: datasource_id.clone(),
246                generation,
247            },
248        )?;
249        Ok(Arc::new(provider))
250    })
251}
252
253pub fn current_generation() -> Option<u64> {
254    runtime()
255        .current_generation()
256        .map(|generation| generation.0)
257}
258
259pub fn runtime_snapshot() -> RuntimeSnapshot {
260    runtime().snapshot()
261}
262
263pub fn install_runtime_telemetry(
264    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
265) -> Arc<dyn KnowledgeTelemetry> {
266    install_telemetry(telemetry_impl)
267}
268
269pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
270    runtime()
271        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
272        .map(QueryResponse::into_rows)
273}
274
275pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
276    runtime()
277        .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
278        .await
279        .map(QueryResponse::into_rows)
280}
281
282pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
283    runtime()
284        .execute(&QueryRequest::first_row(
285            sql,
286            Vec::new(),
287            CachePolicy::Bypass,
288        ))
289        .map(QueryResponse::into_row)
290}
291
292pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
293    runtime()
294        .execute_async(&QueryRequest::first_row(
295            sql,
296            Vec::new(),
297            CachePolicy::Bypass,
298        ))
299        .await
300        .map(QueryResponse::into_row)
301}
302
303pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
304    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
305}
306
307pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
308    runtime()
309        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
310        .await
311}
312
313pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
314    query_fields(sql, params)
315}
316
317pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
318    query_fields_async(sql, params).await
319}
320
321pub fn query_named<'a>(
322    sql: &str,
323    params: &'a [(&'a str, &'a dyn ToSql)],
324) -> KnowledgeResult<RowData> {
325    let fields = named_params_to_fields(params)?;
326    query_fields(sql, &fields)
327}
328
329pub fn cache_query_fields<const N: usize>(
330    sql: &str,
331    c_params: &[DataField; N],
332    query_params: &[DataField],
333    cache: &mut impl CacheAble<DataField, RowData, N>,
334) -> RowData {
335    cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
336}
337
338pub fn cache_query_fields_with_scope<const N: usize>(
339    sql: &str,
340    local_cache_scope: u64,
341    c_params: &[DataField; N],
342    query_params: &[DataField],
343    cache: &mut impl CacheAble<DataField, RowData, N>,
344) -> RowData {
345    if let Some(generation) = current_generation() {
346        cache.prepare_generation(generation);
347    }
348    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
349        runtime().record_local_cache_hit();
350        if telemetry_enabled() {
351            telemetry().on_cache(&CacheTelemetryEvent {
352                layer: CacheLayer::Local,
353                outcome: CacheOutcome::Hit,
354                provider_kind: runtime().current_provider_kind(),
355            });
356        }
357        return hit.clone();
358    }
359    runtime().record_local_cache_miss();
360    if telemetry_enabled() {
361        telemetry().on_cache(&CacheTelemetryEvent {
362            layer: CacheLayer::Local,
363            outcome: CacheOutcome::Miss,
364            provider_kind: runtime().current_provider_kind(),
365        });
366    }
367
368    match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
369        Ok(row) => {
370            cache.save_scoped(local_cache_scope, c_params, row.clone());
371            row
372        }
373        Err(err) => {
374            warn_kdb!("[kdb] query error: {}", err);
375            Vec::new()
376        }
377    }
378}
379
380pub async fn cache_query_fields_async<const N: usize>(
381    sql: &str,
382    c_params: &[DataField; N],
383    query_params: &[DataField],
384    cache: &mut impl CacheAble<DataField, RowData, N>,
385) -> RowData {
386    cache_query_fields_async_with_scope(
387        sql,
388        stable_hash(sql),
389        c_params,
390        || query_params.to_vec(),
391        cache,
392    )
393    .await
394}
395
396pub async fn cache_query_fields_async_with<const N: usize, F>(
397    sql: &str,
398    c_params: &[DataField; N],
399    build_query_params: F,
400    cache: &mut impl CacheAble<DataField, RowData, N>,
401) -> RowData
402where
403    F: FnOnce() -> Vec<DataField>,
404{
405    cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
406        .await
407}
408
409pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
410    sql: &str,
411    local_cache_scope: u64,
412    c_params: &[DataField; N],
413    build_query_params: F,
414    cache: &mut impl CacheAble<DataField, RowData, N>,
415) -> RowData
416where
417    F: FnOnce() -> Vec<DataField>,
418{
419    if let Some(generation) = current_generation() {
420        cache.prepare_generation(generation);
421    }
422    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
423        runtime().record_local_cache_hit();
424        if telemetry_enabled() {
425            telemetry().on_cache(&CacheTelemetryEvent {
426                layer: CacheLayer::Local,
427                outcome: CacheOutcome::Hit,
428                provider_kind: runtime().current_provider_kind(),
429            });
430        }
431        return hit.clone();
432    }
433    runtime().record_local_cache_miss();
434    if telemetry_enabled() {
435        telemetry().on_cache(&CacheTelemetryEvent {
436            layer: CacheLayer::Local,
437            outcome: CacheOutcome::Miss,
438            provider_kind: runtime().current_provider_kind(),
439        });
440    }
441
442    let query_params = build_query_params();
443    match runtime()
444        .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
445        .await
446    {
447        Ok(row) => {
448            cache.save_scoped(local_cache_scope, c_params, row.clone());
449            row
450        }
451        Err(err) => {
452            warn_kdb!("[kdb] query error: {}", err);
453            Vec::new()
454        }
455    }
456}
457
458pub fn cache_query<const N: usize>(
459    sql: &str,
460    c_params: &[DataField; N],
461    named_params: &[(&str, &dyn ToSql)],
462    cache: &mut impl CacheAble<DataField, RowData, N>,
463) -> RowData {
464    let query_fields = match named_params_to_fields(named_params) {
465        Ok(fields) => fields,
466        Err(err) => {
467            warn_kdb!("[kdb] query param conversion error: {}", err);
468            return Vec::new();
469        }
470    };
471
472    cache_query_fields(sql, c_params, &query_fields, cache)
473}
474
475pub async fn cache_query_async<const N: usize>(
476    sql: &str,
477    c_params: &[DataField; N],
478    named_params: &[(&str, &dyn ToSql)],
479    cache: &mut impl CacheAble<DataField, RowData, N>,
480) -> RowData {
481    let query_fields = match named_params_to_fields(named_params) {
482        Ok(fields) => fields,
483        Err(err) => {
484            warn_kdb!("[kdb] query param conversion error: {}", err);
485            return Vec::new();
486        }
487    };
488
489    cache_query_fields_async(sql, c_params, &query_fields, cache).await
490}
491
492fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
493    if let Ok(conn) = Connection::open_with_flags(
494        authority_uri,
495        OpenFlags::SQLITE_OPEN_READ_WRITE
496            | OpenFlags::SQLITE_OPEN_CREATE
497            | OpenFlags::SQLITE_OPEN_URI,
498    ) {
499        let _ = conn.execute_batch(
500            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
501        );
502    }
503    Ok(())
504}
505
506pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
507    ensure_wal(authority_uri)?;
508    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
509    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
510    init_mem_provider(mem)
511}
512
513pub fn init_thread_cloned_from_knowdb(
514    root: &Path,
515    knowdb_conf: &Path,
516    authority_uri: &str,
517    dict: &orion_variate::EnvDict,
518) -> KnowledgeResult<()> {
519    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
520    if let Some(provider) = conf.provider {
521        match provider.kind {
522            ProviderKind::Postgres => {
523                info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
524                init_postgres_provider_with_config(
525                    PostgresProviderConfig::new(provider.connection_uri)
526                        .with_pool_size(provider.pool_size)
527                        .with_min_connections(provider.min_connections)
528                        .with_acquire_timeout_ms(provider.acquire_timeout_ms)
529                        .with_idle_timeout_ms(provider.idle_timeout_ms)
530                        .with_max_lifetime_ms(provider.max_lifetime_ms),
531                )?;
532                runtime().configure_result_cache(
533                    conf.cache.enabled,
534                    conf.cache.capacity,
535                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
536                );
537                return Ok(());
538            }
539            ProviderKind::Mysql => {
540                info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
541                init_mysql_provider_with_config(
542                    MySqlProviderConfig::new(provider.connection_uri)
543                        .with_pool_size(provider.pool_size)
544                        .with_min_connections(provider.min_connections)
545                        .with_acquire_timeout_ms(provider.acquire_timeout_ms)
546                        .with_idle_timeout_ms(provider.idle_timeout_ms)
547                        .with_max_lifetime_ms(provider.max_lifetime_ms),
548                )?;
549                runtime().configure_result_cache(
550                    conf.cache.enabled,
551                    conf.cache.capacity,
552                    Duration::from_millis(conf.cache.ttl_ms.max(1)),
553                );
554                return Ok(());
555            }
556            ProviderKind::SqliteAuthority => {}
557        }
558    }
559
560    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
561    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
562        let path_part = rest.split('?').next().unwrap_or(rest);
563        format!("file:{}?mode=ro&uri=true", path_part)
564    } else {
565        authority_uri.to_string()
566    };
567
568    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
569    init_thread_cloned_from_authority(&ro_uri)?;
570    runtime().configure_result_cache(
571        conf.cache.enabled,
572        conf.cache.capacity,
573        Duration::from_millis(conf.cache.ttl_ms.max(1)),
574    );
575    Ok(())
576}
577
578fn stable_hash(value: &str) -> u64 {
579    let mut hasher = DefaultHasher::new();
580    value.hash(&mut hasher);
581    hasher.finish()
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use crate::cache::FieldQueryCache;
588    use crate::error::Reason;
589    use crate::mem::memdb::MemDB;
590    use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
591    use crate::runtime::fields_to_params;
592    use crate::telemetry::{
593        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
594        ReloadTelemetryEvent, reset_telemetry,
595    };
596    use orion_error::UvsFrom;
597    use orion_error::conversion::ToStructError;
598    use orion_variate::EnvDict;
599    use std::fs;
600    use std::hint::black_box;
601    use std::path::PathBuf;
602    use std::sync::atomic::{AtomicU64, Ordering};
603    use std::time::{Duration, Instant};
604
605    #[derive(Default)]
606    struct TestTelemetry {
607        reload_success: AtomicU64,
608        reload_failure: AtomicU64,
609        local_hits: AtomicU64,
610        local_misses: AtomicU64,
611        result_hits: AtomicU64,
612        result_misses: AtomicU64,
613        metadata_hits: AtomicU64,
614        metadata_misses: AtomicU64,
615        query_success: AtomicU64,
616        query_failure: AtomicU64,
617    }
618
619    impl KnowledgeTelemetry for TestTelemetry {
620        fn on_cache(&self, event: &CacheTelemetryEvent) {
621            let counter = match (event.layer, event.outcome) {
622                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
623                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
624                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
625                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
626                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
627                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
628            };
629            counter.fetch_add(1, Ordering::Relaxed);
630        }
631
632        fn on_reload(&self, event: &ReloadTelemetryEvent) {
633            let counter = match event.outcome {
634                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
635                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
636            };
637            counter.fetch_add(1, Ordering::Relaxed);
638        }
639
640        fn on_query(&self, event: &QueryTelemetryEvent) {
641            let counter = if event.success {
642                &self.query_success
643            } else {
644                &self.query_failure
645            };
646            counter.fetch_add(1, Ordering::Relaxed);
647        }
648    }
649
650    fn perf_env_usize(key: &str, default: usize) -> usize {
651        std::env::var(key)
652            .ok()
653            .and_then(|value| value.parse::<usize>().ok())
654            .unwrap_or(default)
655    }
656
657    fn seed_perf_provider(rows: usize) {
658        let db = MemDB::instance();
659        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
660            .expect("create perf_kv");
661        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
662        for id in 1..=rows {
663            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
664            db.execute(sql.as_str()).expect("insert perf_kv row");
665        }
666        db.execute("COMMIT").expect("commit perf_kv load");
667        init_mem_provider(db).expect("init perf provider");
668    }
669
670    #[tokio::test(flavor = "current_thread")]
671    async fn query_async_works_with_mem_provider() {
672        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
673        let db = MemDB::instance();
674        db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
675            .expect("create async_kv");
676        db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
677            .expect("insert async_kv row");
678        init_mem_provider(db).expect("init mem provider");
679
680        let row = query_fields_async(
681            "SELECT value FROM async_kv WHERE id=:id",
682            &[DataField::from_digit(":id", 1)],
683        )
684        .await
685        .expect("query async row");
686        assert_eq!(row.len(), 1);
687        assert_eq!(row[0].to_string(), "chars(hello)");
688    }
689
690    #[derive(Clone)]
691    struct PerfQuery {
692        cache_key: [DataField; 1],
693        query_params: [DataField; 1],
694        bypass_req: QueryRequest,
695        global_req: QueryRequest,
696    }
697
698    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
699        (0..ops)
700            .map(|idx| {
701                let id = ((idx * 17) % hotset + 1) as i64;
702                let cache_key = [DataField::from_digit("id", id)];
703                let query_params = [DataField::from_digit(":id", id)];
704                let bypass_req = QueryRequest::first_row(
705                    "SELECT value FROM perf_kv WHERE id=:id",
706                    fields_to_params(&query_params),
707                    CachePolicy::Bypass,
708                );
709                let global_req = QueryRequest::first_row(
710                    "SELECT value FROM perf_kv WHERE id=:id",
711                    fields_to_params(&query_params),
712                    CachePolicy::UseGlobal,
713                );
714                PerfQuery {
715                    cache_key,
716                    query_params,
717                    bypass_req,
718                    global_req,
719                }
720            })
721            .collect()
722    }
723
724    #[derive(Debug, Clone, Copy)]
725    struct PerfCounters {
726        result_hits: u64,
727        result_misses: u64,
728        local_hits: u64,
729        local_misses: u64,
730        metadata_hits: u64,
731        metadata_misses: u64,
732    }
733
734    #[derive(Debug, Clone)]
735    struct PerfResult {
736        name: &'static str,
737        elapsed: Duration,
738        ops: usize,
739        counters: PerfCounters,
740    }
741
742    impl PerfResult {
743        fn qps(&self) -> f64 {
744            let secs = self.elapsed.as_secs_f64();
745            if secs == 0.0 {
746                self.ops as f64
747            } else {
748                self.ops as f64 / secs
749            }
750        }
751    }
752
753    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
754        PerfCounters {
755            result_hits: after
756                .result_cache_hits
757                .saturating_sub(before.result_cache_hits),
758            result_misses: after
759                .result_cache_misses
760                .saturating_sub(before.result_cache_misses),
761            local_hits: after
762                .local_cache_hits
763                .saturating_sub(before.local_cache_hits),
764            local_misses: after
765                .local_cache_misses
766                .saturating_sub(before.local_cache_misses),
767            metadata_hits: after
768                .metadata_cache_hits
769                .saturating_sub(before.metadata_cache_hits),
770            metadata_misses: after
771                .metadata_cache_misses
772                .saturating_sub(before.metadata_cache_misses),
773        }
774    }
775
776    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
777        seed_perf_provider(rows);
778        let before = runtime_snapshot();
779        let started = Instant::now();
780        for item in workload {
781            let row = runtime()
782                .execute(&item.bypass_req)
783                .expect("execute bypass request")
784                .into_row();
785            black_box(row);
786        }
787        let elapsed = started.elapsed();
788        let after = runtime_snapshot();
789        PerfResult {
790            name: "bypass",
791            elapsed,
792            ops: workload.len(),
793            counters: snapshot_delta(&before, &after),
794        }
795    }
796
797    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
798        seed_perf_provider(rows);
799        let before = runtime_snapshot();
800        let started = Instant::now();
801        for item in workload {
802            let row = runtime()
803                .execute(&item.global_req)
804                .expect("execute global-cache request")
805                .into_row();
806            black_box(row);
807        }
808        let elapsed = started.elapsed();
809        let after = runtime_snapshot();
810        PerfResult {
811            name: "global_cache",
812            elapsed,
813            ops: workload.len(),
814            counters: snapshot_delta(&before, &after),
815        }
816    }
817
818    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
819        seed_perf_provider(rows);
820        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
821        let before = runtime_snapshot();
822        let started = Instant::now();
823        for item in workload {
824            let row = cache_query_fields(
825                "SELECT value FROM perf_kv WHERE id=:id",
826                &item.cache_key,
827                &item.query_params,
828                &mut cache,
829            );
830            black_box(row);
831        }
832        let elapsed = started.elapsed();
833        let after = runtime_snapshot();
834        PerfResult {
835            name: "local_cache",
836            elapsed,
837            ops: workload.len(),
838            counters: snapshot_delta(&before, &after),
839        }
840    }
841
842    fn print_perf_result(result: &PerfResult) {
843        eprintln!(
844            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
845            result.name,
846            result.elapsed.as_millis(),
847            result.qps(),
848            result.counters.result_hits,
849            result.counters.result_misses,
850            result.counters.local_hits,
851            result.counters.local_misses,
852            result.counters.metadata_hits,
853            result.counters.metadata_misses,
854        );
855    }
856
857    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
858        use rand::{Rng, rng};
859        let rnd: u64 = rng().next_u64();
860        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
861    }
862
863    fn write_minimal_knowdb_with_cache(
864        root: &Path,
865        enabled: bool,
866        capacity: usize,
867        ttl_ms: u64,
868    ) -> std::path::PathBuf {
869        let models = root.join("models").join("knowledge");
870        let example_dir = models.join("example");
871        fs::create_dir_all(&example_dir).expect("create knowdb models/example");
872        fs::write(
873            models.join("knowdb.toml"),
874            format!(
875                r#"
876version = 2
877base_dir = "."
878
879[cache]
880enabled = {enabled}
881capacity = {capacity}
882ttl_ms = {ttl_ms}
883
884[csv]
885has_header = false
886
887[[tables]]
888name = "example"
889columns.by_index = [0,1]
890"#
891            ),
892        )
893        .expect("write knowdb.toml");
894        fs::write(
895            example_dir.join("create.sql"),
896            r#"
897CREATE TABLE IF NOT EXISTS {table} (
898  id INTEGER PRIMARY KEY,
899  value TEXT NOT NULL
900);
901"#,
902        )
903        .expect("write create.sql");
904        fs::write(
905            example_dir.join("insert.sql"),
906            "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
907        )
908        .expect("write insert.sql");
909        fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
910        models.join("knowdb.toml")
911    }
912
913    fn write_provider_only_knowdb_with_cache(
914        root: &Path,
915        provider_kind: &str,
916        connection_uri: &str,
917        enabled: bool,
918        capacity: usize,
919        ttl_ms: u64,
920    ) -> std::path::PathBuf {
921        let models = root.join("models").join("knowledge");
922        fs::create_dir_all(&models).expect("create knowdb models");
923        fs::write(
924            models.join("knowdb.toml"),
925            format!(
926                r#"
927version = 2
928base_dir = "."
929
930[cache]
931enabled = {enabled}
932capacity = {capacity}
933ttl_ms = {ttl_ms}
934
935[provider]
936kind = "{provider_kind}"
937connection_uri = "{connection_uri}"
938"#
939            ),
940        )
941        .expect("write provider knowdb.toml");
942        models.join("knowdb.toml")
943    }
944
945    fn restore_default_result_cache_config() {
946        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
947    }
948
949    #[test]
950    fn provider_can_be_replaced() {
951        let _guard = crate::runtime::runtime_test_guard()
952            .lock()
953            .expect("provider test guard");
954        let db1 = MemDB::instance();
955        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
956            .expect("create table in db1");
957        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
958            .expect("seed db1");
959        init_mem_provider(db1).expect("init provider db1");
960        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
961        assert_eq!(row[0].to_string(), "chars(first)");
962
963        let db2 = MemDB::instance();
964        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
965            .expect("create table in db2");
966        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
967            .expect("seed db2");
968        init_mem_provider(db2).expect("replace provider with db2");
969        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
970        assert_eq!(row[0].to_string(), "chars(second)");
971    }
972
973    #[test]
974    fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
975        let _guard = crate::runtime::runtime_test_guard()
976            .lock()
977            .expect("provider test guard");
978        COLNAME_CACHE.write().expect("metadata cache lock").clear();
979
980        let db = MemDB::instance();
981        db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
982            .expect("create table");
983        db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
984            .expect("seed table");
985
986        let old_scope = MetadataCacheScope {
987            datasource_id: DatasourceId("sqlite:old".to_string()),
988            generation: Generation(1),
989        };
990        let new_scope = MetadataCacheScope {
991            datasource_id: DatasourceId("sqlite:new".to_string()),
992            generation: Generation(2),
993        };
994        let old_provider = MemProvider {
995            db: db.clone(),
996            metadata_scope: old_scope.clone(),
997        };
998
999        install_provider(
1000            ProviderKind::SqliteAuthority,
1001            new_scope.datasource_id.clone(),
1002            |_generation| {
1003                Ok(Arc::new(MemProvider {
1004                    db: db.clone(),
1005                    metadata_scope: new_scope.clone(),
1006                }))
1007            },
1008        )
1009        .expect("install new provider");
1010
1011        let row = old_provider
1012            .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1013            .expect("old provider query");
1014        assert_eq!(row[0].to_string(), "chars(scope-old)");
1015
1016        let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1017        assert!(cache.contains(&metadata_cache_key_for_scope(
1018            &old_scope,
1019            "SELECT value FROM cache_scope_t WHERE id = 1",
1020        )));
1021        assert!(!cache.contains(&metadata_cache_key_for_scope(
1022            &new_scope,
1023            "SELECT value FROM cache_scope_t WHERE id = 1",
1024        )));
1025    }
1026
1027    #[tokio::test(flavor = "current_thread")]
1028    async fn async_query_uses_runtime_bridge() {
1029        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1030        let db = MemDB::instance();
1031        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1032            .expect("create table");
1033        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1034            .expect("seed table");
1035        init_mem_provider(db).expect("init provider");
1036
1037        let row = query_row_async("SELECT value FROM t WHERE id = 1")
1038            .await
1039            .expect("async query row");
1040        assert_eq!(row[0].to_string(), "chars(async-first)");
1041    }
1042
1043    #[tokio::test(flavor = "current_thread")]
1044    async fn async_cache_query_fields_hits_local_cache() {
1045        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1046        let db = MemDB::instance();
1047        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1048            .expect("create table");
1049        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1050            .expect("seed table");
1051        init_mem_provider(db).expect("init provider");
1052
1053        let key = [DataField::from_digit("id", 1)];
1054        let params = [DataField::from_digit(":id", 1)];
1055        let mut cache = FieldQueryCache::default();
1056
1057        let first = cache_query_fields_async(
1058            "SELECT value FROM t WHERE id=:id",
1059            &key,
1060            &params,
1061            &mut cache,
1062        )
1063        .await;
1064        let second = cache_query_fields_async(
1065            "SELECT value FROM t WHERE id=:id",
1066            &key,
1067            &params,
1068            &mut cache,
1069        )
1070        .await;
1071
1072        assert_eq!(first[0].to_string(), "chars(async-cache)");
1073        assert_eq!(second[0].to_string(), "chars(async-cache)");
1074    }
1075
1076    #[test]
1077    fn local_cache_is_cleared_when_generation_changes() {
1078        let _guard = crate::runtime::runtime_test_guard()
1079            .lock()
1080            .expect("provider test guard");
1081        let db1 = MemDB::instance();
1082        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1083            .expect("create table in db1");
1084        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1085            .expect("seed db1");
1086        init_mem_provider(db1).expect("init provider db1");
1087
1088        let key = [DataField::from_digit("id", 1)];
1089        let params = [DataField::from_digit(":id", 1)];
1090        let mut cache = FieldQueryCache::default();
1091        let row = cache_query_fields(
1092            "SELECT value FROM t WHERE id=:id",
1093            &key,
1094            &params,
1095            &mut cache,
1096        );
1097        assert_eq!(row[0].to_string(), "chars(first)");
1098
1099        let db2 = MemDB::instance();
1100        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1101            .expect("create table in db2");
1102        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1103            .expect("seed db2");
1104        init_mem_provider(db2).expect("replace provider with db2");
1105
1106        let row = cache_query_fields(
1107            "SELECT value FROM t WHERE id=:id",
1108            &key,
1109            &params,
1110            &mut cache,
1111        );
1112        assert_eq!(row[0].to_string(), "chars(second)");
1113    }
1114
1115    #[test]
1116    fn local_cache_is_scoped_by_sql_text() {
1117        let _guard = crate::runtime::runtime_test_guard()
1118            .lock()
1119            .expect("provider test guard");
1120        let db = MemDB::instance();
1121        db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
1122            .expect("create t1");
1123        db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
1124            .expect("create t2");
1125        db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
1126            .expect("seed t1");
1127        db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
1128            .expect("seed t2");
1129        init_mem_provider(db).expect("init provider");
1130
1131        let key = [DataField::from_digit("id", 1)];
1132        let params = [DataField::from_digit(":id", 1)];
1133        let mut cache = FieldQueryCache::default();
1134
1135        let row = cache_query_fields(
1136            "SELECT value FROM t1 WHERE id=:id",
1137            &key,
1138            &params,
1139            &mut cache,
1140        );
1141        assert_eq!(row[0].to_string(), "chars(first)");
1142
1143        let row = cache_query_fields(
1144            "SELECT value FROM t2 WHERE id=:id",
1145            &key,
1146            &params,
1147            &mut cache,
1148        );
1149        assert_eq!(row[0].to_string(), "chars(second)");
1150    }
1151
1152    #[test]
1153    fn runtime_snapshot_tracks_generation_and_cache_size() {
1154        let _guard = crate::runtime::runtime_test_guard()
1155            .lock()
1156            .expect("provider test guard");
1157        let db = MemDB::instance();
1158        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1159            .expect("create table");
1160        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1161            .expect("seed table");
1162        init_mem_provider(db).expect("init provider");
1163
1164        let mut cache = FieldQueryCache::default();
1165        let key = [DataField::from_digit("id", 1)];
1166        let params = [DataField::from_digit(":id", 1)];
1167        let row = cache_query_fields(
1168            "SELECT value FROM t WHERE id=:id",
1169            &key,
1170            &params,
1171            &mut cache,
1172        );
1173        assert_eq!(row[0].to_string(), "chars(first)");
1174
1175        let snapshot = runtime_snapshot();
1176        assert!(matches!(
1177            snapshot.provider_kind,
1178            Some(ProviderKind::SqliteAuthority)
1179        ));
1180        assert!(snapshot.generation.is_some());
1181        assert!(snapshot.result_cache_len >= 1);
1182        assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
1183        assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
1184        assert!(snapshot.reload_successes >= 1);
1185    }
1186
1187    #[test]
1188    fn metadata_cache_is_scoped_by_generation() {
1189        let _guard = crate::runtime::runtime_test_guard()
1190            .lock()
1191            .expect("provider test guard");
1192        let sql = "SELECT value FROM t WHERE id = 1";
1193        let before = runtime_snapshot().metadata_cache_len;
1194
1195        let db1 = MemDB::instance();
1196        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1197            .expect("create table in db1");
1198        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1199            .expect("seed db1");
1200        init_mem_provider(db1).expect("init provider db1");
1201        let row = query_row(sql).expect("query db1");
1202        assert_eq!(row[0].to_string(), "chars(first)");
1203        let after_first = runtime_snapshot().metadata_cache_len;
1204        assert!(
1205            after_first > before,
1206            "metadata cache did not record first generation entry: before={before} after_first={after_first}"
1207        );
1208
1209        let db2 = MemDB::instance();
1210        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1211            .expect("create table in db2");
1212        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1213            .expect("seed db2");
1214        init_mem_provider(db2).expect("replace provider with db2");
1215        let row = query_row(sql).expect("query db2");
1216        assert_eq!(row[0].to_string(), "chars(second)");
1217        let after_second = runtime_snapshot().metadata_cache_len;
1218        assert!(
1219            after_second > after_first,
1220            "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
1221        );
1222    }
1223
1224    #[test]
1225    fn failed_provider_reload_keeps_previous_provider() {
1226        let _guard = crate::runtime::runtime_test_guard()
1227            .lock()
1228            .expect("provider test guard");
1229        let db1 = MemDB::instance();
1230        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1231            .expect("create table in db1");
1232        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1233            .expect("seed db1");
1234        init_mem_provider(db1).expect("init provider db1");
1235        let before_generation = current_generation();
1236
1237        let reload_err = install_provider(
1238            ProviderKind::SqliteAuthority,
1239            datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
1240            |_generation| {
1241                Err(Reason::from_logic()
1242                    .to_err()
1243                    .with_detail("expected reload failure"))
1244            },
1245        );
1246        assert!(reload_err.is_err());
1247
1248        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1249        assert_eq!(row[0].to_string(), "chars(first)");
1250        assert_eq!(current_generation(), before_generation);
1251    }
1252
1253    #[test]
1254    fn runtime_snapshot_records_cache_counters() {
1255        let _guard = crate::runtime::runtime_test_guard()
1256            .lock()
1257            .expect("provider test guard");
1258        let db = MemDB::instance();
1259        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1260            .expect("create table");
1261        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1262            .expect("seed table");
1263        init_mem_provider(db).expect("init provider");
1264
1265        let before = runtime_snapshot();
1266        let mut cache = FieldQueryCache::default();
1267        let key = [DataField::from_digit("id", 1)];
1268        let params = [DataField::from_digit(":id", 1)];
1269        let row = cache_query_fields(
1270            "SELECT value FROM t WHERE id=:id",
1271            &key,
1272            &params,
1273            &mut cache,
1274        );
1275        assert_eq!(row[0].to_string(), "chars(first)");
1276        let row = cache_query_fields(
1277            "SELECT value FROM t WHERE id=:id",
1278            &key,
1279            &params,
1280            &mut cache,
1281        );
1282        assert_eq!(row[0].to_string(), "chars(first)");
1283
1284        let after = runtime_snapshot();
1285        assert!(after.local_cache_hits > before.local_cache_hits);
1286        assert!(after.local_cache_misses > before.local_cache_misses);
1287        assert!(after.result_cache_misses > before.result_cache_misses);
1288        assert!(after.metadata_cache_misses > before.metadata_cache_misses);
1289    }
1290
1291    #[test]
1292    fn telemetry_receives_reload_cache_and_query_events() {
1293        let _guard = crate::runtime::runtime_test_guard()
1294            .lock()
1295            .expect("provider test guard");
1296        let telemetry_impl = Arc::new(TestTelemetry::default());
1297        let previous = install_runtime_telemetry(telemetry_impl.clone());
1298
1299        let db = MemDB::instance();
1300        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1301            .expect("create table");
1302        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1303            .expect("seed table");
1304        init_mem_provider(db).expect("init provider");
1305
1306        let mut cache = FieldQueryCache::default();
1307        let key = [DataField::from_digit("id", 1)];
1308        let params = [DataField::from_digit(":id", 1)];
1309        let row = cache_query_fields(
1310            "SELECT value FROM t WHERE id=:id",
1311            &key,
1312            &params,
1313            &mut cache,
1314        );
1315        assert_eq!(row[0].to_string(), "chars(first)");
1316        let row = cache_query_fields(
1317            "SELECT value FROM t WHERE id=:id",
1318            &key,
1319            &params,
1320            &mut cache,
1321        );
1322        assert_eq!(row[0].to_string(), "chars(first)");
1323
1324        let reload_err = install_provider(
1325            ProviderKind::SqliteAuthority,
1326            datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
1327            |_generation| {
1328                Err(Reason::from_logic()
1329                    .to_err()
1330                    .with_detail("expected telemetry reload failure"))
1331            },
1332        );
1333        assert!(reload_err.is_err());
1334
1335        install_runtime_telemetry(previous);
1336        reset_telemetry();
1337
1338        assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
1339        assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
1340        assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
1341        assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
1342        assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
1343        assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
1344        assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
1345    }
1346
1347    #[test]
1348    #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
1349    fn cache_perf_reports_cache_vs_no_cache() {
1350        let _guard = crate::runtime::runtime_test_guard()
1351            .lock()
1352            .expect("provider test guard");
1353        let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
1354        let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
1355        let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
1356        let workload = build_perf_workload(ops, hotset);
1357
1358        eprintln!(
1359            "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
1360            rows, ops, hotset
1361        );
1362
1363        let bypass = run_bypass_perf(&workload, rows);
1364        let global = run_global_cache_perf(&workload, rows);
1365        let local = run_local_cache_perf(&workload, rows);
1366
1367        print_perf_result(&bypass);
1368        print_perf_result(&global);
1369        print_perf_result(&local);
1370
1371        eprintln!(
1372            "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1373            bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1374            bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1375        );
1376
1377        assert_eq!(bypass.counters.result_hits, 0);
1378        assert_eq!(bypass.counters.result_misses, 0);
1379        assert_eq!(bypass.counters.local_hits, 0);
1380        assert_eq!(bypass.counters.local_misses, 0);
1381        assert!(global.counters.result_hits > 0);
1382        assert!(global.counters.result_misses > 0);
1383        assert_eq!(global.counters.local_hits, 0);
1384        assert_eq!(global.counters.local_misses, 0);
1385        assert!(local.counters.local_hits > 0);
1386        assert!(local.counters.local_misses > 0);
1387        assert!(local.counters.result_misses > 0);
1388    }
1389
1390    #[test]
1391    fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1392        let _guard = crate::runtime::runtime_test_guard()
1393            .lock()
1394            .expect("provider test guard");
1395        let root = uniq_cache_cfg_tmp_dir();
1396        let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1397        let auth_file = root.join(".run").join("authority.sqlite");
1398        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1399            .expect("create authority parent");
1400        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1401
1402        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1403            .expect("init knowdb with cache config");
1404
1405        let snapshot = runtime_snapshot();
1406        assert!(snapshot.result_cache_enabled);
1407        assert_eq!(snapshot.result_cache_capacity, 7);
1408        assert_eq!(snapshot.result_cache_ttl_ms, 5);
1409
1410        let req = QueryRequest::first_row(
1411            "SELECT value FROM example WHERE id=:id",
1412            fields_to_params(&[DataField::from_digit(":id", 1)]),
1413            CachePolicy::UseGlobal,
1414        );
1415        let before = runtime_snapshot();
1416        let row = runtime()
1417            .execute(&req)
1418            .expect("first result-cache query")
1419            .into_row();
1420        assert_eq!(row[0].to_string(), "chars(alpha)");
1421        let row = runtime()
1422            .execute(&req)
1423            .expect("second result-cache query")
1424            .into_row();
1425        assert_eq!(row[0].to_string(), "chars(alpha)");
1426        std::thread::sleep(Duration::from_millis(12));
1427        let row = runtime()
1428            .execute(&req)
1429            .expect("expired result-cache query")
1430            .into_row();
1431        assert_eq!(row[0].to_string(), "chars(alpha)");
1432        let after = runtime_snapshot();
1433
1434        assert!(after.result_cache_hits > before.result_cache_hits);
1435        assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1436
1437        restore_default_result_cache_config();
1438        let _ = fs::remove_dir_all(&root);
1439    }
1440
1441    #[test]
1442    fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1443        let _guard = crate::runtime::runtime_test_guard()
1444            .lock()
1445            .expect("provider test guard");
1446        let root = uniq_cache_cfg_tmp_dir();
1447        let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1448        let auth_file = root.join(".run").join("authority.sqlite");
1449        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1450            .expect("create authority parent");
1451        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1452
1453        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1454            .expect("init knowdb with cache disabled");
1455
1456        let snapshot = runtime_snapshot();
1457        assert!(!snapshot.result_cache_enabled);
1458        assert_eq!(snapshot.result_cache_capacity, 3);
1459        assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1460
1461        let req = QueryRequest::first_row(
1462            "SELECT value FROM example WHERE id=:id",
1463            fields_to_params(&[DataField::from_digit(":id", 1)]),
1464            CachePolicy::UseGlobal,
1465        );
1466        let before = runtime_snapshot();
1467        let _ = runtime()
1468            .execute(&req)
1469            .expect("first bypassed result-cache query");
1470        let _ = runtime()
1471            .execute(&req)
1472            .expect("second bypassed result-cache query");
1473        let after = runtime_snapshot();
1474
1475        assert_eq!(after.result_cache_hits, before.result_cache_hits);
1476        assert_eq!(after.result_cache_misses, before.result_cache_misses);
1477
1478        restore_default_result_cache_config();
1479        let _ = fs::remove_dir_all(&root);
1480    }
1481
1482    #[test]
1483    fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1484        let _guard = crate::runtime::runtime_test_guard()
1485            .lock()
1486            .expect("provider test guard");
1487        restore_default_result_cache_config();
1488
1489        let db = MemDB::instance();
1490        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1491            .expect("create table");
1492        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1493            .expect("seed table");
1494        init_mem_provider(db).expect("init provider");
1495
1496        let before = runtime_snapshot();
1497        let root = uniq_cache_cfg_tmp_dir();
1498        let conf_path = write_provider_only_knowdb_with_cache(
1499            &root,
1500            "mysql",
1501            "not-a-valid-mysql-url",
1502            false,
1503            3,
1504            5,
1505        );
1506        let authority_uri = format!(
1507            "file:{}?mode=rwc&uri=true",
1508            root.join("unused.sqlite").display()
1509        );
1510
1511        let err =
1512            init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1513        assert!(err.is_err());
1514
1515        let after = runtime_snapshot();
1516        assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1517        assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1518        assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1519
1520        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1521        assert_eq!(row[0].to_string(), "chars(first)");
1522
1523        let _ = fs::remove_dir_all(&root);
1524    }
1525}