Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::hash_map::DefaultHasher,
6    hash::{Hash, Hasher},
7};
8
9use crate::error::KnowledgeResult;
10use async_trait::async_trait;
11use rusqlite::ToSql;
12use rusqlite::{Connection, OpenFlags};
13use wp_log::{info_ctrl, warn_kdb};
14use wp_model_core::model::DataField;
15
16use crate::cache::CacheAble;
17use crate::loader::{ProviderKind, SqlProviderKind, parse_knowdb_conf};
18use crate::mem::RowData;
19use crate::mem::memdb::MemDB;
20use crate::mem::thread_clone::ThreadClonedMDB;
21use crate::mysql::{MySqlProvider, MySqlProviderConfig};
22use crate::param::named_params_to_fields;
23use crate::postgres::{PostgresProvider, PostgresProviderConfig};
24use crate::runtime::{
25    CachePolicy, CachedRedisValue, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor,
26    QueryRequest, QueryResponse, RedisCacheKey, RedisCmdTag, RuntimeSnapshot, runtime,
27};
28use crate::telemetry::{
29    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
30    telemetry, telemetry_enabled,
31};
32
/// Provider that pairs a [`MemDB`] handle with the metadata-cache scope that
/// is passed along with every query it runs.
struct MemProvider {
    /// Database handle the queries are delegated to.
    db: MemDB,
    /// Scope (datasource id + generation) handed to the `*_with_scope` query methods.
    metadata_scope: MetadataCacheScope,
}
37
38#[async_trait]
39impl ProviderExecutor for ThreadClonedMDB {
40    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
41        ThreadClonedMDB::query_with_scope(self, sql)
42    }
43
44    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
45        ThreadClonedMDB::query_fields_with_scope(self, sql, params)
46    }
47
48    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
49        ThreadClonedMDB::query_row_with_scope(self, sql)
50    }
51
52    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
53        ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
54    }
55}
56
57#[async_trait]
58impl ProviderExecutor for MemProvider {
59    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
60        self.db.query_with_scope(&self.metadata_scope, sql)
61    }
62
63    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
64        self.db
65            .query_fields_with_scope(&self.metadata_scope, sql, params)
66    }
67
68    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
69        self.db.query_row_with_scope(&self.metadata_scope, sql)
70    }
71
72    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
73        self.db
74            .query_named_fields_with_scope(&self.metadata_scope, sql, params)
75    }
76}
77
78#[async_trait]
79impl ProviderExecutor for PostgresProvider {
80    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
81        PostgresProvider::query(self, sql)
82    }
83
84    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
85        PostgresProvider::query_fields(self, sql, params)
86    }
87
88    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
89        PostgresProvider::query_row(self, sql)
90    }
91
92    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
93        PostgresProvider::query_named_fields(self, sql, params)
94    }
95
96    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
97        PostgresProvider::query_async(self, sql).await
98    }
99
100    async fn query_fields_async(
101        &self,
102        sql: &str,
103        params: &[DataField],
104    ) -> KnowledgeResult<Vec<RowData>> {
105        PostgresProvider::query_fields_async(self, sql, params).await
106    }
107
108    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
109        PostgresProvider::query_row_async(self, sql).await
110    }
111
112    async fn query_named_fields_async(
113        &self,
114        sql: &str,
115        params: &[DataField],
116    ) -> KnowledgeResult<RowData> {
117        PostgresProvider::query_named_fields_async(self, sql, params).await
118    }
119}
120
121#[async_trait]
122impl ProviderExecutor for MySqlProvider {
123    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
124        MySqlProvider::query(self, sql)
125    }
126
127    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
128        MySqlProvider::query_fields(self, sql, params)
129    }
130
131    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
132        MySqlProvider::query_row(self, sql)
133    }
134
135    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
136        MySqlProvider::query_named_fields(self, sql, params)
137    }
138
139    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
140        MySqlProvider::query_async(self, sql).await
141    }
142
143    async fn query_fields_async(
144        &self,
145        sql: &str,
146        params: &[DataField],
147    ) -> KnowledgeResult<Vec<RowData>> {
148        MySqlProvider::query_fields_async(self, sql, params).await
149    }
150
151    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
152        MySqlProvider::query_row_async(self, sql).await
153    }
154
155    async fn query_named_fields_async(
156        &self,
157        sql: &str,
158        params: &[DataField],
159    ) -> KnowledgeResult<RowData> {
160        MySqlProvider::query_named_fields_async(self, sql, params).await
161    }
162}
163
164fn install_provider<F>(
165    kind: ProviderKind,
166    datasource_id: DatasourceId,
167    build: F,
168) -> KnowledgeResult<()>
169where
170    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
171{
172    runtime().install_provider(kind, datasource_id, build)?;
173    Ok(())
174}
175
/// Derives the [`DatasourceId`] for a provider of the given `kind` from a
/// textual `seed` by calling `DatasourceId::from_seed`.
fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
    DatasourceId::from_seed(kind, seed)
}
179
180pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
181    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
182    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
183        Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
184            authority_uri,
185            datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
186            generation.0,
187        )))
188    })
189}
190
191pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
192    install_provider(
193        ProviderKind::SqliteAuthority,
194        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
195        |generation| {
196            Ok(Arc::new(MemProvider {
197                db: memdb,
198                metadata_scope: MetadataCacheScope {
199                    datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
200                    generation,
201                },
202            }))
203        },
204    )
205}
206
207pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
208    init_postgres_provider_with_config(
209        PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
210    )
211}
212
213pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
214    let connection_uri = config.connection_uri().to_string();
215    let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
216    install_provider(
217        ProviderKind::Postgres,
218        datasource_id.clone(),
219        |generation| {
220            let provider = PostgresProvider::connect(
221                &config,
222                MetadataCacheScope {
223                    datasource_id: datasource_id.clone(),
224                    generation,
225                },
226            )?;
227            Ok(Arc::new(provider))
228        },
229    )
230}
231
232pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
233    init_mysql_provider_with_config(
234        MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
235    )
236}
237
238pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
239    let connection_uri = config.connection_uri().to_string();
240    let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
241    install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
242        let provider = MySqlProvider::connect(
243            &config,
244            MetadataCacheScope {
245                datasource_id: datasource_id.clone(),
246                generation,
247            },
248        )?;
249        Ok(Arc::new(provider))
250    })
251}
252
253// ---------------------------------------------------------------------------
254// Redis provider API
255// ---------------------------------------------------------------------------
256
257/// Check whether an item exists in a Bloom filter.
258///
259/// Returns `true` if the item *may* exist, `false` if it definitely does not.
260/// Requires the RedisBloom module.
261pub fn redis_bf_exists(key: &str, item: &str) -> KnowledgeResult<bool> {
262    if let Some(generation) = current_generation() {
263        let ck = redis_cache_key(RedisCmdTag::BfExists, generation, key, &[item]);
264        if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck, key) {
265            return Ok(v);
266        }
267        let result = crate::redis::bf_exists("knowdb", key, item)?;
268        runtime().redis_cache_put(ck, key, CachedRedisValue::Bool(result));
269        return Ok(result);
270    }
271    crate::redis::bf_exists("knowdb", key, item)
272}
273
274/// Get the value of a hash field.
275///
276/// Returns `None` when the key or field does not exist.
277pub fn redis_hget(key: &str, field: &str) -> KnowledgeResult<Option<String>> {
278    if let Some(generation) = current_generation() {
279        let ck = redis_cache_key(RedisCmdTag::HGet, generation, key, &[field]);
280        if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck, key) {
281            return Ok(v.clone());
282        }
283        let result = crate::redis::hget("knowdb", key, field)?;
284        runtime().redis_cache_put(ck, key, CachedRedisValue::OptString(result.clone()));
285        return Ok(result);
286    }
287    crate::redis::hget("knowdb", key, field)
288}
289
290/// Get the value of a key.
291///
292/// Returns `None` when the key does not exist.
293pub fn redis_get(key: &str) -> KnowledgeResult<Option<String>> {
294    if let Some(generation) = current_generation() {
295        let ck = redis_cache_key(RedisCmdTag::Get, generation, key, &[]);
296        if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck, key) {
297            return Ok(v.clone());
298        }
299        let result = crate::redis::get("knowdb", key)?;
300        runtime().redis_cache_put(ck, key, CachedRedisValue::OptString(result.clone()));
301        return Ok(result);
302    }
303    crate::redis::get("knowdb", key)
304}
305
306/// Check whether a member exists in a set.
307pub fn redis_set_exists(key: &str, member: &str) -> KnowledgeResult<bool> {
308    if let Some(generation) = current_generation() {
309        let ck = redis_cache_key(RedisCmdTag::SetExists, generation, key, &[member]);
310        if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck, key) {
311            return Ok(v);
312        }
313        let result = crate::redis::set_exists("knowdb", key, member)?;
314        runtime().redis_cache_put(ck, key, CachedRedisValue::Bool(result));
315        return Ok(result);
316    }
317    crate::redis::set_exists("knowdb", key, member)
318}
319
320/// Add items to a Bloom filter.
321///
322/// Returns a `Vec<bool>` indicating whether each item was *new* to the filter.
323/// Requires the RedisBloom module.
324pub fn redis_bf_add(key: &str, items: &[&str]) -> KnowledgeResult<Vec<bool>> {
325    let owned: Vec<String> = items.iter().map(|s| s.to_string()).collect();
326    crate::redis::bf_madd("knowdb", key, &owned)
327}
328
/// Create a new Bloom filter.
///
/// `error_rate`: desired false positive rate (e.g. `0.01` for 1%).
/// `capacity`: expected number of items to add.
/// Requires the RedisBloom module.
///
/// Forwards to `crate::redis::bf_reserve` on the `"knowdb"` redis client.
pub fn redis_bf_create(key: &str, error_rate: f64, capacity: i64) -> KnowledgeResult<()> {
    crate::redis::bf_reserve("knowdb", key, error_rate, capacity)
}
337
/// Pings the `"knowdb"` redis client via `crate::redis::ping_blocking` and
/// returns its result unchanged.
#[allow(dead_code)]
pub(crate) fn redis_ping() -> KnowledgeResult<bool> {
    crate::redis::ping_blocking("knowdb")
}
342
/// Closes the `"knowdb"` redis client via `crate::redis::close` and returns
/// its result unchanged.
#[allow(dead_code)]
pub(crate) fn redis_close() -> KnowledgeResult<()> {
    crate::redis::close(Some("knowdb"))
}
347
348fn redis_cache_key(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
349    use std::collections::hash_map::DefaultHasher;
350    use std::hash::{Hash, Hasher};
351    let mut hasher = DefaultHasher::new();
352    key.hash(&mut hasher);
353    let key_hash = hasher.finish();
354    let mut hasher = DefaultHasher::new();
355    for arg in args {
356        arg.hash(&mut hasher);
357    }
358    let args_hash = hasher.finish();
359    RedisCacheKey {
360        generation,
361        cmd_tag: cmd,
362        key_hash,
363        args_hash,
364    }
365}
366
367// ---------------------------------------------------------------------------
368
/// Returns the raw numeric value (field `.0`) of the runtime's current
/// generation, or `None` when the runtime reports no current generation.
pub fn current_generation() -> Option<u64> {
    runtime()
        .current_generation()
        .map(|generation| generation.0)
}
374
/// Returns the [`RuntimeSnapshot`] produced by `runtime().snapshot()`.
pub fn runtime_snapshot() -> RuntimeSnapshot {
    runtime().snapshot()
}
378
/// Installs `telemetry_impl` through `install_telemetry` and returns the
/// handle that call yields.
///
/// NOTE(review): the returned handle is presumably the previously installed
/// telemetry implementation. Confirm against `install_telemetry`.
pub fn install_runtime_telemetry(
    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
) -> Arc<dyn KnowledgeTelemetry> {
    install_telemetry(telemetry_impl)
}
384
385pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
386    runtime()
387        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
388        .map(QueryResponse::into_rows)
389}
390
391pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
392    runtime()
393        .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
394        .await
395        .map(QueryResponse::into_rows)
396}
397
398pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
399    runtime()
400        .execute(&QueryRequest::first_row(
401            sql,
402            Vec::new(),
403            CachePolicy::Bypass,
404        ))
405        .map(QueryResponse::into_row)
406}
407
408pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
409    runtime()
410        .execute_async(&QueryRequest::first_row(
411            sql,
412            Vec::new(),
413            CachePolicy::Bypass,
414        ))
415        .await
416        .map(QueryResponse::into_row)
417}
418
/// Runs `sql` with the given field parameters via
/// `runtime().execute_first_row_fields`, using `CachePolicy::Bypass`, and
/// returns the resulting row.
pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
}
422
/// Async counterpart of [`query_fields`]: awaits
/// `runtime().execute_first_row_fields_async` with `CachePolicy::Bypass` and
/// returns the resulting row.
pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime()
        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
        .await
}
428
/// Alias for [`query_fields`]: forwards `sql` and `params` unchanged.
pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields(sql, params)
}
432
/// Alias for [`query_fields_async`]: forwards `sql` and `params` unchanged
/// and awaits the result.
pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields_async(sql, params).await
}
436
/// Runs `sql` with rusqlite-style named parameters.
///
/// The `(name, value)` pairs are first converted to `DataField`s with
/// `named_params_to_fields`; a conversion error is returned to the caller.
/// The converted fields are then passed to [`query_fields`].
pub fn query_named<'a>(
    sql: &str,
    params: &'a [(&'a str, &'a dyn ToSql)],
) -> KnowledgeResult<RowData> {
    let fields = named_params_to_fields(params)?;
    query_fields(sql, &fields)
}
444
445pub fn cache_query_fields<const N: usize>(
446    sql: &str,
447    c_params: &[DataField; N],
448    query_params: &[DataField],
449    cache: &mut impl CacheAble<DataField, RowData, N>,
450) -> RowData {
451    cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
452}
453
454pub fn cache_query_fields_with_scope<const N: usize>(
455    sql: &str,
456    local_cache_scope: u64,
457    c_params: &[DataField; N],
458    query_params: &[DataField],
459    cache: &mut impl CacheAble<DataField, RowData, N>,
460) -> RowData {
461    if let Some(generation) = current_generation() {
462        cache.prepare_generation(generation);
463    }
464    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
465        runtime().record_local_cache_hit();
466        if telemetry_enabled() {
467            telemetry().on_cache(&CacheTelemetryEvent {
468                layer: CacheLayer::Local,
469                outcome: CacheOutcome::Hit,
470                provider_kind: runtime().current_provider_kind(),
471            });
472        }
473        return hit.clone();
474    }
475    runtime().record_local_cache_miss();
476    if telemetry_enabled() {
477        telemetry().on_cache(&CacheTelemetryEvent {
478            layer: CacheLayer::Local,
479            outcome: CacheOutcome::Miss,
480            provider_kind: runtime().current_provider_kind(),
481        });
482    }
483
484    match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
485        Ok(row) => {
486            cache.save_scoped(local_cache_scope, c_params, row.clone());
487            row
488        }
489        Err(err) => {
490            warn_kdb!("[kdb] query error: {}", err);
491            Vec::new()
492        }
493    }
494}
495
496pub async fn cache_query_fields_async<const N: usize>(
497    sql: &str,
498    c_params: &[DataField; N],
499    query_params: &[DataField],
500    cache: &mut impl CacheAble<DataField, RowData, N>,
501) -> RowData {
502    cache_query_fields_async_with_scope(
503        sql,
504        stable_hash(sql),
505        c_params,
506        || query_params.to_vec(),
507        cache,
508    )
509    .await
510}
511
512pub async fn cache_query_fields_async_with<const N: usize, F>(
513    sql: &str,
514    c_params: &[DataField; N],
515    build_query_params: F,
516    cache: &mut impl CacheAble<DataField, RowData, N>,
517) -> RowData
518where
519    F: FnOnce() -> Vec<DataField>,
520{
521    cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
522        .await
523}
524
525pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
526    sql: &str,
527    local_cache_scope: u64,
528    c_params: &[DataField; N],
529    build_query_params: F,
530    cache: &mut impl CacheAble<DataField, RowData, N>,
531) -> RowData
532where
533    F: FnOnce() -> Vec<DataField>,
534{
535    if let Some(generation) = current_generation() {
536        cache.prepare_generation(generation);
537    }
538    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
539        runtime().record_local_cache_hit();
540        if telemetry_enabled() {
541            telemetry().on_cache(&CacheTelemetryEvent {
542                layer: CacheLayer::Local,
543                outcome: CacheOutcome::Hit,
544                provider_kind: runtime().current_provider_kind(),
545            });
546        }
547        return hit.clone();
548    }
549    runtime().record_local_cache_miss();
550    if telemetry_enabled() {
551        telemetry().on_cache(&CacheTelemetryEvent {
552            layer: CacheLayer::Local,
553            outcome: CacheOutcome::Miss,
554            provider_kind: runtime().current_provider_kind(),
555        });
556    }
557
558    let query_params = build_query_params();
559    match runtime()
560        .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
561        .await
562    {
563        Ok(row) => {
564            cache.save_scoped(local_cache_scope, c_params, row.clone());
565            row
566        }
567        Err(err) => {
568            warn_kdb!("[kdb] query error: {}", err);
569            Vec::new()
570        }
571    }
572}
573
574pub fn cache_query<const N: usize>(
575    sql: &str,
576    c_params: &[DataField; N],
577    named_params: &[(&str, &dyn ToSql)],
578    cache: &mut impl CacheAble<DataField, RowData, N>,
579) -> RowData {
580    let query_fields = match named_params_to_fields(named_params) {
581        Ok(fields) => fields,
582        Err(err) => {
583            warn_kdb!("[kdb] query param conversion error: {}", err);
584            return Vec::new();
585        }
586    };
587
588    cache_query_fields(sql, c_params, &query_fields, cache)
589}
590
591pub async fn cache_query_async<const N: usize>(
592    sql: &str,
593    c_params: &[DataField; N],
594    named_params: &[(&str, &dyn ToSql)],
595    cache: &mut impl CacheAble<DataField, RowData, N>,
596) -> RowData {
597    let query_fields = match named_params_to_fields(named_params) {
598        Ok(fields) => fields,
599        Err(err) => {
600            warn_kdb!("[kdb] query param conversion error: {}", err);
601            return Vec::new();
602        }
603    };
604
605    cache_query_fields_async(sql, c_params, &query_fields, cache).await
606}
607
608fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
609    if let Ok(conn) = Connection::open_with_flags(
610        authority_uri,
611        OpenFlags::SQLITE_OPEN_READ_WRITE
612            | OpenFlags::SQLITE_OPEN_CREATE
613            | OpenFlags::SQLITE_OPEN_URI,
614    ) {
615        let _ = conn.execute_batch(
616            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
617        );
618    }
619    Ok(())
620}
621
622pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
623    ensure_wal(authority_uri)?;
624    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
625    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
626    init_mem_provider(mem)
627}
628
/// Initializes the knowledge provider described by a knowdb configuration file.
///
/// The configuration is parsed with `parse_knowdb_conf`, then exactly one of
/// the following paths is taken:
///
/// 1. `[provider.sqldb]` present: a Postgres or MySQL provider is installed
///    with the configured pool settings, the result cache is configured, and
///    the function returns.
/// 2. `[provider.redis]` present: the `"knowdb"` redis client is initialized,
///    the redis cache is configured, and the function returns.
/// 3. Otherwise: the SQLite authority database is built from the knowdb
///    configuration, a thread-cloned provider is installed on a read-only URI
///    derived from `authority_uri`, and the result cache is configured.
///
/// Any error from parsing, provider initialization or the authority build is
/// propagated to the caller.
///
/// NOTE(review): when both `sqldb` and `redis` are configured, the `sqldb`
/// branch returns first and the redis client is never initialized. Confirm
/// that the two sections are meant to be mutually exclusive.
pub fn init_thread_cloned_from_knowdb(
    root: &Path,
    knowdb_conf: &Path,
    authority_uri: &str,
    dict: &orion_variate::EnvDict,
) -> KnowledgeResult<()> {
    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
    if let Some(provider_cfg) = conf.provider() {
        // New-style [provider.sqldb]
        if let Some(sqldb) = provider_cfg.sqldb {
            match sqldb.kind {
                SqlProviderKind::Postgres => {
                    info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
                    init_postgres_provider_with_config(
                        PostgresProviderConfig::new(sqldb.connection_uri)
                            .with_pool_size(sqldb.pool_size)
                            .with_min_connections(sqldb.min_connections)
                            .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
                            .with_idle_timeout_ms(sqldb.idle_timeout_ms)
                            .with_max_lifetime_ms(sqldb.max_lifetime_ms),
                    )?;
                    // TTL is clamped to at least 1 ms.
                    runtime().configure_result_cache(
                        conf.cache.enabled,
                        conf.cache.capacity,
                        Duration::from_millis(conf.cache.ttl_ms.max(1)),
                    );
                    return Ok(());
                }
                SqlProviderKind::Mysql => {
                    info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
                    init_mysql_provider_with_config(
                        MySqlProviderConfig::new(sqldb.connection_uri)
                            .with_pool_size(sqldb.pool_size)
                            .with_min_connections(sqldb.min_connections)
                            .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
                            .with_idle_timeout_ms(sqldb.idle_timeout_ms)
                            .with_max_lifetime_ms(sqldb.max_lifetime_ms),
                    )?;
                    // TTL is clamped to at least 1 ms.
                    runtime().configure_result_cache(
                        conf.cache.enabled,
                        conf.cache.capacity,
                        Duration::from_millis(conf.cache.ttl_ms.max(1)),
                    );
                    return Ok(());
                }
            }
        }
        // New-style [provider.redis]
        if let Some(redis_cfg) = provider_cfg.redis {
            info_ctrl!("init redis knowdb provider({}) ", conf_abs.display(),);
            crate::redis::init_with_opts(
                "knowdb",
                &redis_cfg.connection_uri,
                redis_cfg.pool_size,
                redis_cfg.command_timeout_ms,
            )?;
            runtime().configure_redis_cache(
                conf.cache.enabled,
                conf.cache.capacity,
                conf.cache.redis_key_map(),
            );
            return Ok(());
        }
    }

    // No external provider configured: build the SQLite authority database
    // and serve queries from it.
    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
    // For `file:` URIs, any existing query string is dropped and replaced with
    // `?mode=ro&uri=true`; other URIs are used unchanged.
    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
        let path_part = rest.split('?').next().unwrap_or(rest);
        format!("file:{}?mode=ro&uri=true", path_part)
    } else {
        authority_uri.to_string()
    };

    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
    init_thread_cloned_from_authority(&ro_uri)?;
    // TTL is clamped to at least 1 ms.
    runtime().configure_result_cache(
        conf.cache.enabled,
        conf.cache.capacity,
        Duration::from_millis(conf.cache.ttl_ms.max(1)),
    );
    Ok(())
}
711
/// Hashes `value` with a freshly constructed `DefaultHasher` and returns the
/// 64-bit digest.
///
/// Equal inputs yield equal digests because every call starts from a new
/// `DefaultHasher::new()`.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721    use crate::cache::FieldQueryCache;
722    use crate::error::KnowReason;
723    use crate::mem::memdb::MemDB;
724    use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
725    use crate::runtime::fields_to_params;
726    use crate::telemetry::{
727        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
728        ReloadTelemetryEvent, reset_telemetry,
729    };
730    use orion_error::conversion::ToStructError;
731    use orion_variate::EnvDict;
732    use std::fs;
733    use std::hint::black_box;
734    use std::path::PathBuf;
735    use std::sync::atomic::{AtomicU64, Ordering};
736    use std::time::{Duration, Instant};
737
    /// Telemetry sink for tests: one atomic counter per kind of event it can
    /// receive. All counters start at zero via `Default`.
    #[derive(Default)]
    struct TestTelemetry {
        // Reload events, split by outcome.
        reload_success: AtomicU64,
        reload_failure: AtomicU64,
        // Cache events, split by layer (local / result / metadata) and outcome.
        local_hits: AtomicU64,
        local_misses: AtomicU64,
        result_hits: AtomicU64,
        result_misses: AtomicU64,
        metadata_hits: AtomicU64,
        metadata_misses: AtomicU64,
        // Query events, split by the event's `success` flag.
        query_success: AtomicU64,
        query_failure: AtomicU64,
    }
751
752    impl KnowledgeTelemetry for TestTelemetry {
753        fn on_cache(&self, event: &CacheTelemetryEvent) {
754            let counter = match (event.layer, event.outcome) {
755                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
756                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
757                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
758                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
759                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
760                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
761            };
762            counter.fetch_add(1, Ordering::Relaxed);
763        }
764
765        fn on_reload(&self, event: &ReloadTelemetryEvent) {
766            let counter = match event.outcome {
767                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
768                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
769            };
770            counter.fetch_add(1, Ordering::Relaxed);
771        }
772
773        fn on_query(&self, event: &QueryTelemetryEvent) {
774            let counter = if event.success {
775                &self.query_success
776            } else {
777                &self.query_failure
778            };
779            counter.fetch_add(1, Ordering::Relaxed);
780        }
781    }
782
783    fn perf_env_usize(key: &str, default: usize) -> usize {
784        std::env::var(key)
785            .ok()
786            .and_then(|value| value.parse::<usize>().ok())
787            .unwrap_or(default)
788    }
789
790    fn seed_perf_provider(rows: usize) {
791        let db = MemDB::instance();
792        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
793            .expect("create perf_kv");
794        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
795        for id in 1..=rows {
796            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
797            db.execute(sql.as_str()).expect("insert perf_kv row");
798        }
799        db.execute("COMMIT").expect("commit perf_kv load");
800        init_mem_provider(db).expect("init perf provider");
801    }
802
    /// The async field query returns the stored value when a mem provider is
    /// installed.
    #[tokio::test(flavor = "current_thread")]
    async fn query_async_works_with_mem_provider() {
        // Hold the runtime test guard for the whole test.
        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
        let db = MemDB::instance();
        db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
            .expect("create async_kv");
        db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
            .expect("insert async_kv row");
        init_mem_provider(db).expect("init mem provider");

        // Look the row up through the facade using the named parameter `:id`.
        let row = query_fields_async(
            "SELECT value FROM async_kv WHERE id=:id",
            &[DataField::from_digit(":id", 1)],
        )
        .await
        .expect("query async row");
        // Exactly one column is selected; it renders as `chars(hello)`.
        assert_eq!(row.len(), 1);
        assert_eq!(row[0].to_string(), "chars(hello)");
    }
822
    /// One pre-built unit of perf workload for a single `perf_kv` id.
    #[derive(Clone)]
    struct PerfQuery {
        /// Key field (named `id`) built for the same id as the query.
        cache_key: [DataField; 1],
        /// Bound parameter (named `:id`) for the SQL statement.
        query_params: [DataField; 1],
        /// First-row request built with `CachePolicy::Bypass`.
        bypass_req: QueryRequest,
        /// First-row request built with `CachePolicy::UseGlobal`.
        global_req: QueryRequest,
    }
830
831    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
832        (0..ops)
833            .map(|idx| {
834                let id = ((idx * 17) % hotset + 1) as i64;
835                let cache_key = [DataField::from_digit("id", id)];
836                let query_params = [DataField::from_digit(":id", id)];
837                let bypass_req = QueryRequest::first_row(
838                    "SELECT value FROM perf_kv WHERE id=:id",
839                    fields_to_params(&query_params),
840                    CachePolicy::Bypass,
841                );
842                let global_req = QueryRequest::first_row(
843                    "SELECT value FROM perf_kv WHERE id=:id",
844                    fields_to_params(&query_params),
845                    CachePolicy::UseGlobal,
846                );
847                PerfQuery {
848                    cache_key,
849                    query_params,
850                    bypass_req,
851                    global_req,
852                }
853            })
854            .collect()
855    }
856
857    #[derive(Debug, Clone, Copy)]
858    struct PerfCounters {
859        result_hits: u64,
860        result_misses: u64,
861        local_hits: u64,
862        local_misses: u64,
863        metadata_hits: u64,
864        metadata_misses: u64,
865    }
866
867    #[derive(Debug, Clone)]
868    struct PerfResult {
869        name: &'static str,
870        elapsed: Duration,
871        ops: usize,
872        counters: PerfCounters,
873    }
874
875    impl PerfResult {
876        fn qps(&self) -> f64 {
877            let secs = self.elapsed.as_secs_f64();
878            if secs == 0.0 {
879                self.ops as f64
880            } else {
881                self.ops as f64 / secs
882            }
883        }
884    }
885
886    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
887        PerfCounters {
888            result_hits: after
889                .result_cache_hits
890                .saturating_sub(before.result_cache_hits),
891            result_misses: after
892                .result_cache_misses
893                .saturating_sub(before.result_cache_misses),
894            local_hits: after
895                .local_cache_hits
896                .saturating_sub(before.local_cache_hits),
897            local_misses: after
898                .local_cache_misses
899                .saturating_sub(before.local_cache_misses),
900            metadata_hits: after
901                .metadata_cache_hits
902                .saturating_sub(before.metadata_cache_hits),
903            metadata_misses: after
904                .metadata_cache_misses
905                .saturating_sub(before.metadata_cache_misses),
906        }
907    }
908
909    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
910        seed_perf_provider(rows);
911        let before = runtime_snapshot();
912        let started = Instant::now();
913        for item in workload {
914            let row = runtime()
915                .execute(&item.bypass_req)
916                .expect("execute bypass request")
917                .into_row();
918            black_box(row);
919        }
920        let elapsed = started.elapsed();
921        let after = runtime_snapshot();
922        PerfResult {
923            name: "bypass",
924            elapsed,
925            ops: workload.len(),
926            counters: snapshot_delta(&before, &after),
927        }
928    }
929
930    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
931        seed_perf_provider(rows);
932        let before = runtime_snapshot();
933        let started = Instant::now();
934        for item in workload {
935            let row = runtime()
936                .execute(&item.global_req)
937                .expect("execute global-cache request")
938                .into_row();
939            black_box(row);
940        }
941        let elapsed = started.elapsed();
942        let after = runtime_snapshot();
943        PerfResult {
944            name: "global_cache",
945            elapsed,
946            ops: workload.len(),
947            counters: snapshot_delta(&before, &after),
948        }
949    }
950
951    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
952        seed_perf_provider(rows);
953        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
954        let before = runtime_snapshot();
955        let started = Instant::now();
956        for item in workload {
957            let row = cache_query_fields(
958                "SELECT value FROM perf_kv WHERE id=:id",
959                &item.cache_key,
960                &item.query_params,
961                &mut cache,
962            );
963            black_box(row);
964        }
965        let elapsed = started.elapsed();
966        let after = runtime_snapshot();
967        PerfResult {
968            name: "local_cache",
969            elapsed,
970            ops: workload.len(),
971            counters: snapshot_delta(&before, &after),
972        }
973    }
974
975    fn print_perf_result(result: &PerfResult) {
976        eprintln!(
977            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
978            result.name,
979            result.elapsed.as_millis(),
980            result.qps(),
981            result.counters.result_hits,
982            result.counters.result_misses,
983            result.counters.local_hits,
984            result.counters.local_misses,
985            result.counters.metadata_hits,
986            result.counters.metadata_misses,
987        );
988    }
989
990    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
991        use rand::{Rng, rng};
992        let rnd: u64 = rng().next_u64();
993        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
994    }
995
996    fn write_minimal_knowdb_with_cache(
997        root: &Path,
998        enabled: bool,
999        capacity: usize,
1000        ttl_ms: u64,
1001    ) -> std::path::PathBuf {
1002        let models = root.join("models").join("knowledge");
1003        let example_dir = models.join("example");
1004        fs::create_dir_all(&example_dir).expect("create knowdb models/example");
1005        fs::write(
1006            models.join("knowdb.toml"),
1007            format!(
1008                r#"
1009version = 2
1010base_dir = "."
1011
1012[cache]
1013enabled = {enabled}
1014capacity = {capacity}
1015ttl_ms = {ttl_ms}
1016
1017[csv]
1018has_header = false
1019
1020[[tables]]
1021name = "example"
1022columns.by_index = [0,1]
1023"#
1024            ),
1025        )
1026        .expect("write knowdb.toml");
1027        fs::write(
1028            example_dir.join("create.sql"),
1029            r#"
1030CREATE TABLE IF NOT EXISTS {table} (
1031  id INTEGER PRIMARY KEY,
1032  value TEXT NOT NULL
1033);
1034"#,
1035        )
1036        .expect("write create.sql");
1037        fs::write(
1038            example_dir.join("insert.sql"),
1039            "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n",
1040        )
1041        .expect("write insert.sql");
1042        fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");
1043        models.join("knowdb.toml")
1044    }
1045
1046    fn write_provider_only_knowdb_with_cache(
1047        root: &Path,
1048        provider_kind: &str,
1049        connection_uri: &str,
1050        enabled: bool,
1051        capacity: usize,
1052        ttl_ms: u64,
1053    ) -> std::path::PathBuf {
1054        let models = root.join("models").join("knowledge");
1055        fs::create_dir_all(&models).expect("create knowdb models");
1056        fs::write(
1057            models.join("knowdb.toml"),
1058            format!(
1059                r#"
1060version = 2
1061base_dir = "."
1062
1063[cache]
1064enabled = {enabled}
1065capacity = {capacity}
1066ttl_ms = {ttl_ms}
1067
1068[provider.sqldb]
1069kind = "{provider_kind}"
1070connection_uri = "{connection_uri}"
1071"#
1072            ),
1073        )
1074        .expect("write provider knowdb.toml");
1075        models.join("knowdb.toml")
1076    }
1077
1078    fn restore_default_result_cache_config() {
1079        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
1080    }
1081
1082    #[test]
1083    fn provider_can_be_replaced() {
1084        let _guard = crate::runtime::runtime_test_guard()
1085            .lock()
1086            .expect("provider test guard");
1087        let db1 = MemDB::instance();
1088        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1089            .expect("create table in db1");
1090        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1091            .expect("seed db1");
1092        init_mem_provider(db1).expect("init provider db1");
1093        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db1");
1094        assert_eq!(row[0].to_string(), "chars(first)");
1095
1096        let db2 = MemDB::instance();
1097        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1098            .expect("create table in db2");
1099        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1100            .expect("seed db2");
1101        init_mem_provider(db2).expect("replace provider with db2");
1102        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query db2");
1103        assert_eq!(row[0].to_string(), "chars(second)");
1104    }
1105
1106    #[test]
1107    fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
1108        let _guard = crate::runtime::runtime_test_guard()
1109            .lock()
1110            .expect("provider test guard");
1111        COLNAME_CACHE.write().expect("metadata cache lock").clear();
1112
1113        let db = MemDB::instance();
1114        db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
1115            .expect("create table");
1116        db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
1117            .expect("seed table");
1118
1119        let old_scope = MetadataCacheScope {
1120            datasource_id: DatasourceId("sqlite:old".to_string()),
1121            generation: Generation(1),
1122        };
1123        let new_scope = MetadataCacheScope {
1124            datasource_id: DatasourceId("sqlite:new".to_string()),
1125            generation: Generation(2),
1126        };
1127        let old_provider = MemProvider {
1128            db: db.clone(),
1129            metadata_scope: old_scope.clone(),
1130        };
1131
1132        install_provider(
1133            ProviderKind::SqliteAuthority,
1134            new_scope.datasource_id.clone(),
1135            |_generation| {
1136                Ok(Arc::new(MemProvider {
1137                    db: db.clone(),
1138                    metadata_scope: new_scope.clone(),
1139                }))
1140            },
1141        )
1142        .expect("install new provider");
1143
1144        let row = old_provider
1145            .query_row("SELECT value FROM cache_scope_t WHERE id = 1")
1146            .expect("old provider query");
1147        assert_eq!(row[0].to_string(), "chars(scope-old)");
1148
1149        let cache = COLNAME_CACHE.read().expect("metadata cache lock");
1150        assert!(cache.contains(&metadata_cache_key_for_scope(
1151            &old_scope,
1152            "SELECT value FROM cache_scope_t WHERE id = 1",
1153        )));
1154        assert!(!cache.contains(&metadata_cache_key_for_scope(
1155            &new_scope,
1156            "SELECT value FROM cache_scope_t WHERE id = 1",
1157        )));
1158    }
1159
1160    #[tokio::test(flavor = "current_thread")]
1161    async fn async_query_uses_runtime_bridge() {
1162        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1163        let db = MemDB::instance();
1164        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1165            .expect("create table");
1166        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1167            .expect("seed table");
1168        init_mem_provider(db).expect("init provider");
1169
1170        let row = query_row_async("SELECT value FROM t WHERE id = 1")
1171            .await
1172            .expect("async query row");
1173        assert_eq!(row[0].to_string(), "chars(async-first)");
1174    }
1175
1176    #[tokio::test(flavor = "current_thread")]
1177    async fn async_cache_query_fields_hits_local_cache() {
1178        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1179        let db = MemDB::instance();
1180        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1181            .expect("create table");
1182        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1183            .expect("seed table");
1184        init_mem_provider(db).expect("init provider");
1185
1186        let key = [DataField::from_digit("id", 1)];
1187        let params = [DataField::from_digit(":id", 1)];
1188        let mut cache = FieldQueryCache::default();
1189
1190        let first = cache_query_fields_async(
1191            "SELECT value FROM t WHERE id=:id",
1192            &key,
1193            &params,
1194            &mut cache,
1195        )
1196        .await;
1197        let second = cache_query_fields_async(
1198            "SELECT value FROM t WHERE id=:id",
1199            &key,
1200            &params,
1201            &mut cache,
1202        )
1203        .await;
1204
1205        assert_eq!(first[0].to_string(), "chars(async-cache)");
1206        assert_eq!(second[0].to_string(), "chars(async-cache)");
1207    }
1208
1209    #[test]
1210    fn local_cache_is_cleared_when_generation_changes() {
1211        let _guard = crate::runtime::runtime_test_guard()
1212            .lock()
1213            .expect("provider test guard");
1214        let db1 = MemDB::instance();
1215        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1216            .expect("create table in db1");
1217        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1218            .expect("seed db1");
1219        init_mem_provider(db1).expect("init provider db1");
1220
1221        let key = [DataField::from_digit("id", 1)];
1222        let params = [DataField::from_digit(":id", 1)];
1223        let mut cache = FieldQueryCache::default();
1224        let row = cache_query_fields(
1225            "SELECT value FROM t WHERE id=:id",
1226            &key,
1227            &params,
1228            &mut cache,
1229        );
1230        assert_eq!(row[0].to_string(), "chars(first)");
1231
1232        let db2 = MemDB::instance();
1233        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1234            .expect("create table in db2");
1235        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1236            .expect("seed db2");
1237        init_mem_provider(db2).expect("replace provider with db2");
1238
1239        let row = cache_query_fields(
1240            "SELECT value FROM t WHERE id=:id",
1241            &key,
1242            &params,
1243            &mut cache,
1244        );
1245        assert_eq!(row[0].to_string(), "chars(second)");
1246    }
1247
1248    #[test]
1249    fn local_cache_is_scoped_by_sql_text() {
1250        let _guard = crate::runtime::runtime_test_guard()
1251            .lock()
1252            .expect("provider test guard");
1253        let db = MemDB::instance();
1254        db.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)")
1255            .expect("create t1");
1256        db.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)")
1257            .expect("create t2");
1258        db.execute("INSERT INTO t1 (id, value) VALUES (1, 'first')")
1259            .expect("seed t1");
1260        db.execute("INSERT INTO t2 (id, value) VALUES (1, 'second')")
1261            .expect("seed t2");
1262        init_mem_provider(db).expect("init provider");
1263
1264        let key = [DataField::from_digit("id", 1)];
1265        let params = [DataField::from_digit(":id", 1)];
1266        let mut cache = FieldQueryCache::default();
1267
1268        let row = cache_query_fields(
1269            "SELECT value FROM t1 WHERE id=:id",
1270            &key,
1271            &params,
1272            &mut cache,
1273        );
1274        assert_eq!(row[0].to_string(), "chars(first)");
1275
1276        let row = cache_query_fields(
1277            "SELECT value FROM t2 WHERE id=:id",
1278            &key,
1279            &params,
1280            &mut cache,
1281        );
1282        assert_eq!(row[0].to_string(), "chars(second)");
1283    }
1284
1285    #[test]
1286    fn runtime_snapshot_tracks_generation_and_cache_size() {
1287        let _guard = crate::runtime::runtime_test_guard()
1288            .lock()
1289            .expect("provider test guard");
1290        let db = MemDB::instance();
1291        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1292            .expect("create table");
1293        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1294            .expect("seed table");
1295        init_mem_provider(db).expect("init provider");
1296
1297        let mut cache = FieldQueryCache::default();
1298        let key = [DataField::from_digit("id", 1)];
1299        let params = [DataField::from_digit(":id", 1)];
1300        let row = cache_query_fields(
1301            "SELECT value FROM t WHERE id=:id",
1302            &key,
1303            &params,
1304            &mut cache,
1305        );
1306        assert_eq!(row[0].to_string(), "chars(first)");
1307
1308        let snapshot = runtime_snapshot();
1309        assert!(matches!(
1310            snapshot.provider_kind,
1311            Some(ProviderKind::SqliteAuthority)
1312        ));
1313        assert!(snapshot.generation.is_some());
1314        assert!(snapshot.result_cache_len >= 1);
1315        assert!(snapshot.result_cache_capacity >= snapshot.result_cache_len);
1316        assert!(snapshot.metadata_cache_capacity >= snapshot.metadata_cache_len);
1317        assert!(snapshot.reload_successes >= 1);
1318    }
1319
1320    #[test]
1321    fn metadata_cache_is_scoped_by_generation() {
1322        let _guard = crate::runtime::runtime_test_guard()
1323            .lock()
1324            .expect("provider test guard");
1325        let sql = "SELECT value FROM t WHERE id = 1";
1326        let before = runtime_snapshot().metadata_cache_len;
1327
1328        let db1 = MemDB::instance();
1329        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1330            .expect("create table in db1");
1331        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1332            .expect("seed db1");
1333        init_mem_provider(db1).expect("init provider db1");
1334        let row = query_row(sql).expect("query db1");
1335        assert_eq!(row[0].to_string(), "chars(first)");
1336        let after_first = runtime_snapshot().metadata_cache_len;
1337        assert!(
1338            after_first > before,
1339            "metadata cache did not record first generation entry: before={before} after_first={after_first}"
1340        );
1341
1342        let db2 = MemDB::instance();
1343        db2.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1344            .expect("create table in db2");
1345        db2.execute("INSERT INTO t (id, value) VALUES (1, 'second')")
1346            .expect("seed db2");
1347        init_mem_provider(db2).expect("replace provider with db2");
1348        let row = query_row(sql).expect("query db2");
1349        assert_eq!(row[0].to_string(), "chars(second)");
1350        let after_second = runtime_snapshot().metadata_cache_len;
1351        assert!(
1352            after_second > after_first,
1353            "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
1354        );
1355    }
1356
1357    #[test]
1358    fn failed_provider_reload_keeps_previous_provider() {
1359        let _guard = crate::runtime::runtime_test_guard()
1360            .lock()
1361            .expect("provider test guard");
1362        let db1 = MemDB::instance();
1363        db1.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1364            .expect("create table in db1");
1365        db1.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1366            .expect("seed db1");
1367        init_mem_provider(db1).expect("init provider db1");
1368        let before_generation = current_generation();
1369
1370        let reload_err = install_provider(
1371            ProviderKind::SqliteAuthority,
1372            datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
1373            |_generation| {
1374                Err(KnowReason::from_logic()
1375                    .to_err()
1376                    .with_detail("expected reload failure"))
1377            },
1378        );
1379        assert!(reload_err.is_err());
1380
1381        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1382        assert_eq!(row[0].to_string(), "chars(first)");
1383        assert_eq!(current_generation(), before_generation);
1384    }
1385
1386    #[test]
1387    fn runtime_snapshot_records_cache_counters() {
1388        let _guard = crate::runtime::runtime_test_guard()
1389            .lock()
1390            .expect("provider test guard");
1391        let db = MemDB::instance();
1392        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1393            .expect("create table");
1394        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1395            .expect("seed table");
1396        init_mem_provider(db).expect("init provider");
1397
1398        let before = runtime_snapshot();
1399        let mut cache = FieldQueryCache::default();
1400        let key = [DataField::from_digit("id", 1)];
1401        let params = [DataField::from_digit(":id", 1)];
1402        let row = cache_query_fields(
1403            "SELECT value FROM t WHERE id=:id",
1404            &key,
1405            &params,
1406            &mut cache,
1407        );
1408        assert_eq!(row[0].to_string(), "chars(first)");
1409        let row = cache_query_fields(
1410            "SELECT value FROM t WHERE id=:id",
1411            &key,
1412            &params,
1413            &mut cache,
1414        );
1415        assert_eq!(row[0].to_string(), "chars(first)");
1416
1417        let after = runtime_snapshot();
1418        assert!(after.local_cache_hits > before.local_cache_hits);
1419        assert!(after.local_cache_misses > before.local_cache_misses);
1420        assert!(after.result_cache_misses > before.result_cache_misses);
1421        assert!(after.metadata_cache_misses > before.metadata_cache_misses);
1422    }
1423
1424    #[test]
1425    fn telemetry_receives_reload_cache_and_query_events() {
1426        let _guard = crate::runtime::runtime_test_guard()
1427            .lock()
1428            .expect("provider test guard");
1429        let telemetry_impl = Arc::new(TestTelemetry::default());
1430        let previous = install_runtime_telemetry(telemetry_impl.clone());
1431
1432        let db = MemDB::instance();
1433        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1434            .expect("create table");
1435        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1436            .expect("seed table");
1437        init_mem_provider(db).expect("init provider");
1438
1439        let mut cache = FieldQueryCache::default();
1440        let key = [DataField::from_digit("id", 1)];
1441        let params = [DataField::from_digit(":id", 1)];
1442        let row = cache_query_fields(
1443            "SELECT value FROM t WHERE id=:id",
1444            &key,
1445            &params,
1446            &mut cache,
1447        );
1448        assert_eq!(row[0].to_string(), "chars(first)");
1449        let row = cache_query_fields(
1450            "SELECT value FROM t WHERE id=:id",
1451            &key,
1452            &params,
1453            &mut cache,
1454        );
1455        assert_eq!(row[0].to_string(), "chars(first)");
1456
1457        let reload_err = install_provider(
1458            ProviderKind::SqliteAuthority,
1459            datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
1460            |_generation| {
1461                Err(KnowReason::from_logic()
1462                    .to_err()
1463                    .with_detail("expected telemetry reload failure"))
1464            },
1465        );
1466        assert!(reload_err.is_err());
1467
1468        install_runtime_telemetry(previous);
1469        reset_telemetry();
1470
1471        assert!(telemetry_impl.reload_success.load(Ordering::Relaxed) >= 1);
1472        assert!(telemetry_impl.reload_failure.load(Ordering::Relaxed) >= 1);
1473        assert!(telemetry_impl.local_hits.load(Ordering::Relaxed) >= 1);
1474        assert!(telemetry_impl.local_misses.load(Ordering::Relaxed) >= 1);
1475        assert!(telemetry_impl.result_misses.load(Ordering::Relaxed) >= 1);
1476        assert!(telemetry_impl.metadata_misses.load(Ordering::Relaxed) >= 1);
1477        assert!(telemetry_impl.query_success.load(Ordering::Relaxed) >= 1);
1478    }
1479
1480    #[test]
1481    #[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
1482    fn cache_perf_reports_cache_vs_no_cache() {
1483        let _guard = crate::runtime::runtime_test_guard()
1484            .lock()
1485            .expect("provider test guard");
1486        let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
1487        let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
1488        let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
1489        let workload = build_perf_workload(ops, hotset);
1490
1491        eprintln!(
1492            "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
1493            rows, ops, hotset
1494        );
1495
1496        let bypass = run_bypass_perf(&workload, rows);
1497        let global = run_global_cache_perf(&workload, rows);
1498        let local = run_local_cache_perf(&workload, rows);
1499
1500        print_perf_result(&bypass);
1501        print_perf_result(&global);
1502        print_perf_result(&local);
1503
1504        eprintln!(
1505            "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
1506            bypass.elapsed.as_secs_f64() / global.elapsed.as_secs_f64(),
1507            bypass.elapsed.as_secs_f64() / local.elapsed.as_secs_f64(),
1508        );
1509
1510        assert_eq!(bypass.counters.result_hits, 0);
1511        assert_eq!(bypass.counters.result_misses, 0);
1512        assert_eq!(bypass.counters.local_hits, 0);
1513        assert_eq!(bypass.counters.local_misses, 0);
1514        assert!(global.counters.result_hits > 0);
1515        assert!(global.counters.result_misses > 0);
1516        assert_eq!(global.counters.local_hits, 0);
1517        assert_eq!(global.counters.local_misses, 0);
1518        assert!(local.counters.local_hits > 0);
1519        assert!(local.counters.local_misses > 0);
1520        assert!(local.counters.result_misses > 0);
1521    }
1522
1523    #[test]
1524    fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
1525        let _guard = crate::runtime::runtime_test_guard()
1526            .lock()
1527            .expect("provider test guard");
1528        let root = uniq_cache_cfg_tmp_dir();
1529        let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
1530        let auth_file = root.join(".run").join("authority.sqlite");
1531        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1532            .expect("create authority parent");
1533        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1534
1535        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1536            .expect("init knowdb with cache config");
1537
1538        let snapshot = runtime_snapshot();
1539        assert!(snapshot.result_cache_enabled);
1540        assert_eq!(snapshot.result_cache_capacity, 7);
1541        assert_eq!(snapshot.result_cache_ttl_ms, 5);
1542
1543        let req = QueryRequest::first_row(
1544            "SELECT value FROM example WHERE id=:id",
1545            fields_to_params(&[DataField::from_digit(":id", 1)]),
1546            CachePolicy::UseGlobal,
1547        );
1548        let before = runtime_snapshot();
1549        let row = runtime()
1550            .execute(&req)
1551            .expect("first result-cache query")
1552            .into_row();
1553        assert_eq!(row[0].to_string(), "chars(alpha)");
1554        let row = runtime()
1555            .execute(&req)
1556            .expect("second result-cache query")
1557            .into_row();
1558        assert_eq!(row[0].to_string(), "chars(alpha)");
1559        std::thread::sleep(Duration::from_millis(12));
1560        let row = runtime()
1561            .execute(&req)
1562            .expect("expired result-cache query")
1563            .into_row();
1564        assert_eq!(row[0].to_string(), "chars(alpha)");
1565        let after = runtime_snapshot();
1566
1567        assert!(after.result_cache_hits > before.result_cache_hits);
1568        assert!(after.result_cache_misses >= before.result_cache_misses + 2);
1569
1570        restore_default_result_cache_config();
1571        let _ = fs::remove_dir_all(&root);
1572    }
1573
1574    #[test]
1575    fn disabled_result_cache_from_knowdb_config_forces_bypass() {
1576        let _guard = crate::runtime::runtime_test_guard()
1577            .lock()
1578            .expect("provider test guard");
1579        let root = uniq_cache_cfg_tmp_dir();
1580        let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
1581        let auth_file = root.join(".run").join("authority.sqlite");
1582        fs::create_dir_all(auth_file.parent().expect("authority parent"))
1583            .expect("create authority parent");
1584        let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());
1585
1586        init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
1587            .expect("init knowdb with cache disabled");
1588
1589        let snapshot = runtime_snapshot();
1590        assert!(!snapshot.result_cache_enabled);
1591        assert_eq!(snapshot.result_cache_capacity, 3);
1592        assert_eq!(snapshot.result_cache_ttl_ms, 30_000);
1593
1594        let req = QueryRequest::first_row(
1595            "SELECT value FROM example WHERE id=:id",
1596            fields_to_params(&[DataField::from_digit(":id", 1)]),
1597            CachePolicy::UseGlobal,
1598        );
1599        let before = runtime_snapshot();
1600        let _ = runtime()
1601            .execute(&req)
1602            .expect("first bypassed result-cache query");
1603        let _ = runtime()
1604            .execute(&req)
1605            .expect("second bypassed result-cache query");
1606        let after = runtime_snapshot();
1607
1608        assert_eq!(after.result_cache_hits, before.result_cache_hits);
1609        assert_eq!(after.result_cache_misses, before.result_cache_misses);
1610
1611        restore_default_result_cache_config();
1612        let _ = fs::remove_dir_all(&root);
1613    }
1614
1615    #[test]
1616    fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1617        let _guard = crate::runtime::runtime_test_guard()
1618            .lock()
1619            .expect("provider test guard");
1620        restore_default_result_cache_config();
1621
1622        let db = MemDB::instance();
1623        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1624            .expect("create table");
1625        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1626            .expect("seed table");
1627        init_mem_provider(db).expect("init provider");
1628
1629        let before = runtime_snapshot();
1630        let root = uniq_cache_cfg_tmp_dir();
1631        let conf_path = write_provider_only_knowdb_with_cache(
1632            &root,
1633            "mysql",
1634            "not-a-valid-mysql-url",
1635            false,
1636            3,
1637            5,
1638        );
1639        let authority_uri = format!(
1640            "file:{}?mode=rwc&uri=true",
1641            root.join("unused.sqlite").display()
1642        );
1643
1644        let err =
1645            init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1646        assert!(err.is_err());
1647
1648        let after = runtime_snapshot();
1649        assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1650        assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1651        assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1652
1653        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1654        assert_eq!(row[0].to_string(), "chars(first)");
1655
1656        let _ = fs::remove_dir_all(&root);
1657    }
1658}