Skip to main content

wp_knowledge/
facade.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::time::Duration;
4use std::{
5    collections::HashMap,
6    collections::hash_map::DefaultHasher,
7    hash::{Hash, Hasher},
8};
9
10use crate::error::KnowledgeResult;
11use async_trait::async_trait;
12use rusqlite::ToSql;
13use rusqlite::{Connection, OpenFlags};
14use wp_log::{info_ctrl, warn_kdb};
15use wp_model_core::model::DataField;
16
17use crate::cache::CacheAble;
18use crate::loader::{ProviderKind, SqlProviderKind, parse_knowdb_conf};
19use crate::mem::RowData;
20use crate::mem::memdb::MemDB;
21use crate::mem::thread_clone::ThreadClonedMDB;
22use crate::mysql::{MySqlProvider, MySqlProviderConfig};
23use crate::param::named_params_to_fields;
24use crate::postgres::{PostgresProvider, PostgresProviderConfig};
25use crate::runtime::{
26    CachePolicy, CachedRedisValue, DatasourceId, Generation, MetadataCacheScope, ProviderExecutor,
27    QueryRequest, QueryResponse, RedisCacheKey, RedisCmdTag, RuntimeSnapshot, runtime,
28};
29use crate::telemetry::{
30    CacheLayer, CacheOutcome, CacheTelemetryEvent, KnowledgeTelemetry, install_telemetry,
31    telemetry, telemetry_enabled,
32};
33
/// Pairs a [`MemDB`] handle with the metadata cache scope that is passed along
/// with every query issued through it.
struct MemProvider {
    /// Database handle the queries are executed against.
    db: MemDB,
    /// Scope forwarded to the `*_with_scope` query methods of `db`.
    metadata_scope: MetadataCacheScope,
}
38
39#[async_trait]
40impl ProviderExecutor for ThreadClonedMDB {
41    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
42        ThreadClonedMDB::query_with_scope(self, sql)
43    }
44
45    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
46        ThreadClonedMDB::query_fields_with_scope(self, sql, params)
47    }
48
49    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
50        ThreadClonedMDB::query_row_with_scope(self, sql)
51    }
52
53    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
54        ThreadClonedMDB::query_named_fields_with_scope(self, sql, params)
55    }
56}
57
58#[async_trait]
59impl ProviderExecutor for MemProvider {
60    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
61        self.db.query_with_scope(&self.metadata_scope, sql)
62    }
63
64    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
65        self.db
66            .query_fields_with_scope(&self.metadata_scope, sql, params)
67    }
68
69    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
70        self.db.query_row_with_scope(&self.metadata_scope, sql)
71    }
72
73    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
74        self.db
75            .query_named_fields_with_scope(&self.metadata_scope, sql, params)
76    }
77}
78
/// Exposes [`PostgresProvider`] through the [`ProviderExecutor`] interface.
///
/// Every method delegates to the `PostgresProvider` function of the same name,
/// for both the synchronous and the `_async` variants.
// NOTE(review): the calls are type-qualified (`PostgresProvider::query(self, ..)`)
// rather than written as `self.query(..)`; presumably this is to make it obvious
// that the provider's own function is the target and not this trait method —
// confirm before rewriting them as method calls.
#[async_trait]
impl ProviderExecutor for PostgresProvider {
    /// Delegates to `PostgresProvider::query`.
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query(self, sql)
    }

    /// Delegates to `PostgresProvider::query_fields`.
    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields(self, sql, params)
    }

    /// Delegates to `PostgresProvider::query_row`.
    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row(self, sql)
    }

    /// Delegates to `PostgresProvider::query_named_fields`.
    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields(self, sql, params)
    }

    /// Delegates to `PostgresProvider::query_async` and awaits it.
    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_async(self, sql).await
    }

    /// Delegates to `PostgresProvider::query_fields_async` and awaits it.
    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        PostgresProvider::query_fields_async(self, sql, params).await
    }

    /// Delegates to `PostgresProvider::query_row_async` and awaits it.
    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        PostgresProvider::query_row_async(self, sql).await
    }

    /// Delegates to `PostgresProvider::query_named_fields_async` and awaits it.
    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        PostgresProvider::query_named_fields_async(self, sql, params).await
    }
}
121
/// Exposes [`MySqlProvider`] through the [`ProviderExecutor`] interface.
///
/// Every method delegates to the `MySqlProvider` function of the same name,
/// for both the synchronous and the `_async` variants.
// NOTE(review): the calls are type-qualified (`MySqlProvider::query(self, ..)`)
// rather than written as `self.query(..)`; presumably this is to make it obvious
// that the provider's own function is the target and not this trait method —
// confirm before rewriting them as method calls.
#[async_trait]
impl ProviderExecutor for MySqlProvider {
    /// Delegates to `MySqlProvider::query`.
    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query(self, sql)
    }

    /// Delegates to `MySqlProvider::query_fields`.
    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields(self, sql, params)
    }

    /// Delegates to `MySqlProvider::query_row`.
    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row(self, sql)
    }

    /// Delegates to `MySqlProvider::query_named_fields`.
    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields(self, sql, params)
    }

    /// Delegates to `MySqlProvider::query_async` and awaits it.
    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_async(self, sql).await
    }

    /// Delegates to `MySqlProvider::query_fields_async` and awaits it.
    async fn query_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<Vec<RowData>> {
        MySqlProvider::query_fields_async(self, sql, params).await
    }

    /// Delegates to `MySqlProvider::query_row_async` and awaits it.
    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
        MySqlProvider::query_row_async(self, sql).await
    }

    /// Delegates to `MySqlProvider::query_named_fields_async` and awaits it.
    async fn query_named_fields_async(
        &self,
        sql: &str,
        params: &[DataField],
    ) -> KnowledgeResult<RowData> {
        MySqlProvider::query_named_fields_async(self, sql, params).await
    }
}
164
165fn install_provider<F>(
166    kind: ProviderKind,
167    datasource_id: DatasourceId,
168    build: F,
169) -> KnowledgeResult<()>
170where
171    F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
172{
173    runtime().install_provider(kind, datasource_id, build)?;
174    Ok(())
175}
176
/// Builds the [`DatasourceId`] for a provider `kind` and a `seed` string by
/// calling `DatasourceId::from_seed`.
///
/// Callers in this file pass a connection/authority URI (or the literal
/// `"memdb"`) as the seed.
fn datasource_id_for(kind: ProviderKind, seed: &str) -> DatasourceId {
    DatasourceId::from_seed(kind, seed)
}
180
181pub fn init_thread_cloned_from_authority(authority_uri: &str) -> KnowledgeResult<()> {
182    let datasource_id = datasource_id_for(ProviderKind::SqliteAuthority, authority_uri);
183    install_provider(ProviderKind::SqliteAuthority, datasource_id, |generation| {
184        Ok(Arc::new(ThreadClonedMDB::from_authority_with_scope(
185            authority_uri,
186            datasource_id_for(ProviderKind::SqliteAuthority, authority_uri),
187            generation.0,
188        )))
189    })
190}
191
192pub fn init_mem_provider(memdb: MemDB) -> KnowledgeResult<()> {
193    install_provider(
194        ProviderKind::SqliteAuthority,
195        datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
196        |generation| {
197            Ok(Arc::new(MemProvider {
198                db: memdb,
199                metadata_scope: MetadataCacheScope {
200                    datasource_id: datasource_id_for(ProviderKind::SqliteAuthority, "memdb"),
201                    generation,
202                },
203            }))
204        },
205    )
206}
207
208pub fn init_postgres_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
209    init_postgres_provider_with_config(
210        PostgresProviderConfig::new(connection_uri).with_pool_size(pool_size),
211    )
212}
213
214pub fn init_postgres_provider_with_config(config: PostgresProviderConfig) -> KnowledgeResult<()> {
215    let connection_uri = config.connection_uri().to_string();
216    let datasource_id = datasource_id_for(ProviderKind::Postgres, &connection_uri);
217    install_provider(
218        ProviderKind::Postgres,
219        datasource_id.clone(),
220        |generation| {
221            let provider = PostgresProvider::connect(
222                &config,
223                MetadataCacheScope {
224                    datasource_id: datasource_id.clone(),
225                    generation,
226                },
227            )?;
228            Ok(Arc::new(provider))
229        },
230    )
231}
232
233pub fn init_mysql_provider(connection_uri: &str, pool_size: Option<u32>) -> KnowledgeResult<()> {
234    init_mysql_provider_with_config(
235        MySqlProviderConfig::new(connection_uri).with_pool_size(pool_size),
236    )
237}
238
239pub fn init_mysql_provider_with_config(config: MySqlProviderConfig) -> KnowledgeResult<()> {
240    let connection_uri = config.connection_uri().to_string();
241    let datasource_id = datasource_id_for(ProviderKind::Mysql, &connection_uri);
242    install_provider(ProviderKind::Mysql, datasource_id.clone(), |generation| {
243        let provider = MySqlProvider::connect(
244            &config,
245            MetadataCacheScope {
246                datasource_id: datasource_id.clone(),
247                generation,
248            },
249        )?;
250        Ok(Arc::new(provider))
251    })
252}
253
254// ---------------------------------------------------------------------------
255// Redis provider API
256// ---------------------------------------------------------------------------
257
258/// Check whether an item exists in a Bloom filter.
259///
260/// Returns `true` if the item *may* exist, `false` if it definitely does not.
261/// Requires the RedisBloom module.
262pub fn redis_bf_exists(key: &str, item: &str) -> KnowledgeResult<bool> {
263    if let Some(generation) = current_generation() {
264        let ck = redis_cache_key(RedisCmdTag::BfExists, generation, key, &[item]);
265        if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
266            return Ok(v);
267        }
268        let result = crate::redis::bf_exists("knowdb", key, item)?;
269        runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
270        return Ok(result);
271    }
272    crate::redis::bf_exists("knowdb", key, item)
273}
274
275/// Get the value of a hash field.
276///
277/// Returns `None` when the key or field does not exist.
278pub fn redis_hget(key: &str, field: &str) -> KnowledgeResult<Option<String>> {
279    if let Some(generation) = current_generation() {
280        let ck = redis_cache_key(RedisCmdTag::HGet, generation, key, &[field]);
281        if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
282            return Ok(v.clone());
283        }
284        let result = crate::redis::hget("knowdb", key, field)?;
285        runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
286        return Ok(result);
287    }
288    crate::redis::hget("knowdb", key, field)
289}
290
291/// Get the value of a key.
292///
293/// Returns `None` when the key does not exist.
294pub fn redis_get(key: &str) -> KnowledgeResult<Option<String>> {
295    if let Some(generation) = current_generation() {
296        let ck = redis_cache_key(RedisCmdTag::Get, generation, key, &[]);
297        if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
298            return Ok(v.clone());
299        }
300        let result = crate::redis::get("knowdb", key)?;
301        runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
302        return Ok(result);
303    }
304    crate::redis::get("knowdb", key)
305}
306
307/// Check whether a member exists in a set.
308pub fn redis_set_exists(key: &str, member: &str) -> KnowledgeResult<bool> {
309    if let Some(generation) = current_generation() {
310        let ck = redis_cache_key(RedisCmdTag::SetExists, generation, key, &[member]);
311        if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
312            return Ok(v);
313        }
314        let result = crate::redis::set_exists("knowdb", key, member)?;
315        runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
316        return Ok(result);
317    }
318    crate::redis::set_exists("knowdb", key, member)
319}
320
321/// Add items to a Bloom filter.
322///
323/// Returns a `Vec<bool>` indicating whether each item was *new* to the filter.
324/// Requires the RedisBloom module.
325pub fn redis_bf_add(key: &str, items: &[&str]) -> KnowledgeResult<Vec<bool>> {
326    let owned: Vec<String> = items.iter().map(|s| s.to_string()).collect();
327    crate::redis::bf_madd("knowdb", key, &owned)
328}
329
/// Create a new Bloom filter.
///
/// `error_rate`: desired false positive rate (e.g. `0.01` for 1%).
/// `capacity`: expected number of items to add.
/// Requires the RedisBloom module.
///
/// Forwards directly to `crate::redis::bf_reserve` on the `"knowdb"` Redis
/// handle and returns its result.
pub fn redis_bf_create(key: &str, error_rate: f64, capacity: i64) -> KnowledgeResult<()> {
    crate::redis::bf_reserve("knowdb", key, error_rate, capacity)
}
338
339// ---------------------------------------------------------------------------
340// External call API — named queries from [fun.<name>]
341// ---------------------------------------------------------------------------
342
/// Register `[fun]` definitions from knowdb.toml. Called once during init.
///
/// Thin wrapper that hands `map` (function name to its spec) over to
/// `crate::fun::register_fun_map`.
pub(crate) fn register_fun_map(map: HashMap<String, crate::loader::FunSpec>) {
    crate::fun::register_fun_map(map);
}
347
348/// Execute a [fun.<name>] named query where returns = "bool".
349pub fn external_exists(service: &str, arg: &str) -> KnowledgeResult<bool> {
350    let spec = crate::fun::resolve_spec(service, true)?;
351    let redis_key = spec.key.as_deref().unwrap_or(arg);
352    if spec.enabled()
353        && let Some(generation) = current_generation()
354    {
355        let ck = redis_cache_key(RedisCmdTag::BfExists, generation, redis_key, &[arg]);
356        if let Some(CachedRedisValue::Bool(v)) = runtime().redis_cache_get(&ck) {
357            return Ok(v);
358        }
359        let result = crate::fun::external_exists(service, arg)?;
360        if let Some(ttl) = spec.ttl_ms {
361            runtime().redis_cache_put_with_ttl(ck, CachedRedisValue::Bool(result), ttl);
362        } else {
363            runtime().redis_cache_put(ck, CachedRedisValue::Bool(result));
364        }
365        return Ok(result);
366    }
367    crate::fun::external_exists(service, arg)
368}
369
370/// Execute a [fun.<name>] named query where returns = "value".
371pub fn external_value(service: &str, arg: &str) -> KnowledgeResult<Option<String>> {
372    let spec = crate::fun::resolve_spec(service, false)?;
373    let redis_key = spec.key.as_deref().unwrap_or(arg);
374    if spec.enabled()
375        && let Some(generation) = current_generation()
376    {
377        let ck = redis_cache_key(RedisCmdTag::Get, generation, redis_key, &[arg]);
378        if let Some(CachedRedisValue::OptString(ref v)) = runtime().redis_cache_get(&ck) {
379            return Ok(v.clone());
380        }
381        let result = crate::fun::external_value(service, arg)?;
382        if let Some(ttl) = spec.ttl_ms {
383            runtime().redis_cache_put_with_ttl(
384                ck,
385                CachedRedisValue::OptString(result.clone()),
386                ttl,
387            );
388        } else {
389            runtime().redis_cache_put(ck, CachedRedisValue::OptString(result.clone()));
390        }
391        return Ok(result);
392    }
393    crate::fun::external_value(service, arg)
394}
395
/// Pings the `"knowdb"` Redis handle via `crate::redis::ping_blocking` and
/// returns its result.
// NOTE(review): `dead_code` is allowed here, presumably because no caller
// exists in every build configuration — confirm.
#[allow(dead_code)]
pub(crate) fn redis_ping() -> KnowledgeResult<bool> {
    crate::redis::ping_blocking("knowdb")
}
400
/// Closes the `"knowdb"` Redis handle via `crate::redis::close` and returns
/// its result.
// NOTE(review): `dead_code` is allowed here, presumably because no caller
// exists in every build configuration — confirm.
#[allow(dead_code)]
pub(crate) fn redis_close() -> KnowledgeResult<()> {
    crate::redis::close(Some("knowdb"))
}
405
406fn redis_cache_key(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
407    use std::collections::hash_map::DefaultHasher;
408    use std::hash::{Hash, Hasher};
409    let mut hasher = DefaultHasher::new();
410    key.hash(&mut hasher);
411    let key_hash = hasher.finish();
412    let mut hasher = DefaultHasher::new();
413    for arg in args {
414        arg.hash(&mut hasher);
415    }
416    let args_hash = hasher.finish();
417    RedisCacheKey {
418        generation,
419        cmd_tag: cmd,
420        key_hash,
421        args_hash,
422    }
423}
424
425// ---------------------------------------------------------------------------
426
427pub fn current_generation() -> Option<u64> {
428    runtime()
429        .current_generation()
430        .map(|generation| generation.0)
431}
432
/// Returns the [`RuntimeSnapshot`] produced by the shared runtime's
/// `snapshot()` call.
pub fn runtime_snapshot() -> RuntimeSnapshot {
    runtime().snapshot()
}
436
/// Installs `telemetry_impl` through `install_telemetry` and returns whatever
/// that call returns.
// NOTE(review): the returned handle is presumably the previously installed
// telemetry implementation — confirm against `install_telemetry`.
pub fn install_runtime_telemetry(
    telemetry_impl: Arc<dyn KnowledgeTelemetry>,
) -> Arc<dyn KnowledgeTelemetry> {
    install_telemetry(telemetry_impl)
}
442
443pub fn query(sql: &str) -> KnowledgeResult<Vec<RowData>> {
444    runtime()
445        .execute(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
446        .map(QueryResponse::into_rows)
447}
448
449pub async fn query_async(sql: &str) -> KnowledgeResult<Vec<RowData>> {
450    runtime()
451        .execute_async(&QueryRequest::many(sql, Vec::new(), CachePolicy::Bypass))
452        .await
453        .map(QueryResponse::into_rows)
454}
455
456pub fn query_row(sql: &str) -> KnowledgeResult<RowData> {
457    runtime()
458        .execute(&QueryRequest::first_row(
459            sql,
460            Vec::new(),
461            CachePolicy::Bypass,
462        ))
463        .map(QueryResponse::into_row)
464}
465
466pub async fn query_row_async(sql: &str) -> KnowledgeResult<RowData> {
467    runtime()
468        .execute_async(&QueryRequest::first_row(
469            sql,
470            Vec::new(),
471            CachePolicy::Bypass,
472        ))
473        .await
474        .map(QueryResponse::into_row)
475}
476
/// Runs `sql` with the given field parameters and returns a single row.
///
/// Delegates to the runtime's `execute_first_row_fields` with
/// `CachePolicy::Bypass`.
pub fn query_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime().execute_first_row_fields(sql, params, CachePolicy::Bypass)
}
480
/// Async counterpart of [`query_fields`].
///
/// Delegates to the runtime's `execute_first_row_fields_async` with
/// `CachePolicy::Bypass` and awaits the result.
pub async fn query_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    runtime()
        .execute_first_row_fields_async(sql, params, CachePolicy::Bypass)
        .await
}
486
/// Alias for [`query_fields`]: forwards `sql` and `params` unchanged.
pub fn query_named_fields(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields(sql, params)
}
490
/// Alias for [`query_fields_async`]: forwards `sql` and `params` unchanged and
/// awaits the result.
pub async fn query_named_fields_async(sql: &str, params: &[DataField]) -> KnowledgeResult<RowData> {
    query_fields_async(sql, params).await
}
494
495pub fn query_named<'a>(
496    sql: &str,
497    params: &'a [(&'a str, &'a dyn ToSql)],
498) -> KnowledgeResult<RowData> {
499    let fields = named_params_to_fields(params)?;
500    query_fields(sql, &fields)
501}
502
503pub fn cache_query_fields<const N: usize>(
504    sql: &str,
505    c_params: &[DataField; N],
506    query_params: &[DataField],
507    cache: &mut impl CacheAble<DataField, RowData, N>,
508) -> RowData {
509    cache_query_fields_with_scope(sql, stable_hash(sql), c_params, query_params, cache)
510}
511
512pub fn cache_query_fields_with_scope<const N: usize>(
513    sql: &str,
514    local_cache_scope: u64,
515    c_params: &[DataField; N],
516    query_params: &[DataField],
517    cache: &mut impl CacheAble<DataField, RowData, N>,
518) -> RowData {
519    if let Some(generation) = current_generation() {
520        cache.prepare_generation(generation);
521    }
522    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
523        runtime().record_local_cache_hit();
524        if telemetry_enabled() {
525            telemetry().on_cache(&CacheTelemetryEvent {
526                layer: CacheLayer::Local,
527                outcome: CacheOutcome::Hit,
528                provider_kind: runtime().current_provider_kind(),
529            });
530        }
531        return hit.clone();
532    }
533    runtime().record_local_cache_miss();
534    if telemetry_enabled() {
535        telemetry().on_cache(&CacheTelemetryEvent {
536            layer: CacheLayer::Local,
537            outcome: CacheOutcome::Miss,
538            provider_kind: runtime().current_provider_kind(),
539        });
540    }
541
542    match runtime().execute_first_row_fields(sql, query_params, CachePolicy::UseGlobal) {
543        Ok(row) => {
544            cache.save_scoped(local_cache_scope, c_params, row.clone());
545            row
546        }
547        Err(err) => {
548            warn_kdb!("[kdb] query error: {}", err);
549            Vec::new()
550        }
551    }
552}
553
554pub async fn cache_query_fields_async<const N: usize>(
555    sql: &str,
556    c_params: &[DataField; N],
557    query_params: &[DataField],
558    cache: &mut impl CacheAble<DataField, RowData, N>,
559) -> RowData {
560    cache_query_fields_async_with_scope(
561        sql,
562        stable_hash(sql),
563        c_params,
564        || query_params.to_vec(),
565        cache,
566    )
567    .await
568}
569
570pub async fn cache_query_fields_async_with<const N: usize, F>(
571    sql: &str,
572    c_params: &[DataField; N],
573    build_query_params: F,
574    cache: &mut impl CacheAble<DataField, RowData, N>,
575) -> RowData
576where
577    F: FnOnce() -> Vec<DataField>,
578{
579    cache_query_fields_async_with_scope(sql, stable_hash(sql), c_params, build_query_params, cache)
580        .await
581}
582
583pub async fn cache_query_fields_async_with_scope<const N: usize, F>(
584    sql: &str,
585    local_cache_scope: u64,
586    c_params: &[DataField; N],
587    build_query_params: F,
588    cache: &mut impl CacheAble<DataField, RowData, N>,
589) -> RowData
590where
591    F: FnOnce() -> Vec<DataField>,
592{
593    if let Some(generation) = current_generation() {
594        cache.prepare_generation(generation);
595    }
596    if let Some(hit) = cache.fetch_scoped(local_cache_scope, c_params) {
597        runtime().record_local_cache_hit();
598        if telemetry_enabled() {
599            telemetry().on_cache(&CacheTelemetryEvent {
600                layer: CacheLayer::Local,
601                outcome: CacheOutcome::Hit,
602                provider_kind: runtime().current_provider_kind(),
603            });
604        }
605        return hit.clone();
606    }
607    runtime().record_local_cache_miss();
608    if telemetry_enabled() {
609        telemetry().on_cache(&CacheTelemetryEvent {
610            layer: CacheLayer::Local,
611            outcome: CacheOutcome::Miss,
612            provider_kind: runtime().current_provider_kind(),
613        });
614    }
615
616    let query_params = build_query_params();
617    match runtime()
618        .execute_first_row_fields_async(sql, &query_params, CachePolicy::UseGlobal)
619        .await
620    {
621        Ok(row) => {
622            cache.save_scoped(local_cache_scope, c_params, row.clone());
623            row
624        }
625        Err(err) => {
626            warn_kdb!("[kdb] query error: {}", err);
627            Vec::new()
628        }
629    }
630}
631
632pub fn cache_query<const N: usize>(
633    sql: &str,
634    c_params: &[DataField; N],
635    named_params: &[(&str, &dyn ToSql)],
636    cache: &mut impl CacheAble<DataField, RowData, N>,
637) -> RowData {
638    let query_fields = match named_params_to_fields(named_params) {
639        Ok(fields) => fields,
640        Err(err) => {
641            warn_kdb!("[kdb] query param conversion error: {}", err);
642            return Vec::new();
643        }
644    };
645
646    cache_query_fields(sql, c_params, &query_fields, cache)
647}
648
649pub async fn cache_query_async<const N: usize>(
650    sql: &str,
651    c_params: &[DataField; N],
652    named_params: &[(&str, &dyn ToSql)],
653    cache: &mut impl CacheAble<DataField, RowData, N>,
654) -> RowData {
655    let query_fields = match named_params_to_fields(named_params) {
656        Ok(fields) => fields,
657        Err(err) => {
658            warn_kdb!("[kdb] query param conversion error: {}", err);
659            return Vec::new();
660        }
661    };
662
663    cache_query_fields_async(sql, c_params, &query_fields, cache).await
664}
665
666fn ensure_wal(authority_uri: &str) -> KnowledgeResult<()> {
667    if let Ok(conn) = Connection::open_with_flags(
668        authority_uri,
669        OpenFlags::SQLITE_OPEN_READ_WRITE
670            | OpenFlags::SQLITE_OPEN_CREATE
671            | OpenFlags::SQLITE_OPEN_URI,
672    ) {
673        let _ = conn.execute_batch(
674            "PRAGMA journal_mode=WAL;\nPRAGMA synchronous=NORMAL;\nPRAGMA temp_store=MEMORY;",
675        );
676    }
677    Ok(())
678}
679
680pub fn init_wal_pool_from_authority(authority_uri: &str, pool_size: u32) -> KnowledgeResult<()> {
681    ensure_wal(authority_uri)?;
682    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_URI;
683    let mem = MemDB::new_file(authority_uri, pool_size, flags)?;
684    init_mem_provider(mem)
685}
686
687pub fn init_thread_cloned_from_knowdb(
688    root: &Path,
689    knowdb_conf: &Path,
690    authority_uri: &str,
691    dict: &orion_variate::EnvDict,
692) -> KnowledgeResult<()> {
693    let (conf, conf_abs, _) = parse_knowdb_conf(root, knowdb_conf, dict)?;
694    if let Some(provider_cfg) = conf.provider() {
695        // New-style [provider.sqldb]
696        if let Some(sqldb) = provider_cfg.sqldb {
697            match sqldb.kind {
698                SqlProviderKind::Postgres => {
699                    info_ctrl!("init postgres knowdb provider({}) ", conf_abs.display(),);
700                    init_postgres_provider_with_config(
701                        PostgresProviderConfig::new(sqldb.connection_uri)
702                            .with_pool_size(sqldb.pool_size)
703                            .with_min_connections(sqldb.min_connections)
704                            .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
705                            .with_idle_timeout_ms(sqldb.idle_timeout_ms)
706                            .with_max_lifetime_ms(sqldb.max_lifetime_ms),
707                    )?;
708                    runtime().configure_result_cache(
709                        conf.cache.enabled,
710                        conf.cache.capacity,
711                        Duration::from_millis(conf.cache.ttl_ms.max(1)),
712                    );
713                    return Ok(());
714                }
715                SqlProviderKind::Mysql => {
716                    info_ctrl!("init mysql knowdb provider({}) ", conf_abs.display(),);
717                    init_mysql_provider_with_config(
718                        MySqlProviderConfig::new(sqldb.connection_uri)
719                            .with_pool_size(sqldb.pool_size)
720                            .with_min_connections(sqldb.min_connections)
721                            .with_acquire_timeout_ms(sqldb.acquire_timeout_ms)
722                            .with_idle_timeout_ms(sqldb.idle_timeout_ms)
723                            .with_max_lifetime_ms(sqldb.max_lifetime_ms),
724                    )?;
725                    runtime().configure_result_cache(
726                        conf.cache.enabled,
727                        conf.cache.capacity,
728                        Duration::from_millis(conf.cache.ttl_ms.max(1)),
729                    );
730                    return Ok(());
731                }
732            }
733        }
734        // New-style [provider.redis]
735        if let Some(redis_cfg) = provider_cfg.redis {
736            info_ctrl!("init redis knowdb provider({}) ", conf_abs.display(),);
737            crate::redis::init_with_opts(
738                "knowdb",
739                &redis_cfg.connection_uri,
740                redis_cfg.pool_size,
741                redis_cfg.command_timeout_ms,
742            )?;
743            runtime().configure_redis_cache(conf.cache.enabled, conf.cache.capacity);
744            register_fun_map(conf.fun.clone());
745            return Ok(());
746        }
747    }
748
749    crate::loader::build_authority_from_knowdb(root, knowdb_conf, authority_uri, dict)?;
750    let ro_uri = if let Some(rest) = authority_uri.strip_prefix("file:") {
751        let path_part = rest.split('?').next().unwrap_or(rest);
752        format!("file:{}?mode=ro&uri=true", path_part)
753    } else {
754        authority_uri.to_string()
755    };
756
757    info_ctrl!("init authority knowdb success({}) ", knowdb_conf.display(),);
758    init_thread_cloned_from_authority(&ro_uri)?;
759    runtime().configure_result_cache(
760        conf.cache.enabled,
761        conf.cache.capacity,
762        Duration::from_millis(conf.cache.ttl_ms.max(1)),
763    );
764    Ok(())
765}
766
/// Hashes `value` with a freshly created [`DefaultHasher`] and returns the
/// 64-bit digest. Used in this file to derive the local cache scope from a
/// SQL string.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
772
773#[cfg(test)]
774mod tests {
775    use super::*;
776    use crate::cache::FieldQueryCache;
777    use crate::error::KnowReason;
778    use crate::mem::memdb::MemDB;
779    use crate::mem::query_util::{COLNAME_CACHE, metadata_cache_key_for_scope};
780    use crate::runtime::fields_to_params;
781    use crate::telemetry::{
782        CacheLayer, CacheTelemetryEvent, KnowledgeTelemetry, QueryTelemetryEvent,
783        ReloadTelemetryEvent, reset_telemetry,
784    };
785    use orion_error::conversion::ToStructError;
786    use orion_variate::EnvDict;
787    use std::fs;
788    use std::hint::black_box;
789    use std::path::PathBuf;
790    use std::sync::atomic::{AtomicU64, Ordering};
791    use std::time::{Duration, Instant};
792
793    #[derive(Default)]
794    struct TestTelemetry {
795        reload_success: AtomicU64,
796        reload_failure: AtomicU64,
797        local_hits: AtomicU64,
798        local_misses: AtomicU64,
799        result_hits: AtomicU64,
800        result_misses: AtomicU64,
801        metadata_hits: AtomicU64,
802        metadata_misses: AtomicU64,
803        query_success: AtomicU64,
804        query_failure: AtomicU64,
805    }
806
807    impl KnowledgeTelemetry for TestTelemetry {
808        fn on_cache(&self, event: &CacheTelemetryEvent) {
809            let counter = match (event.layer, event.outcome) {
810                (CacheLayer::Local, CacheOutcome::Hit) => &self.local_hits,
811                (CacheLayer::Local, CacheOutcome::Miss) => &self.local_misses,
812                (CacheLayer::Result, CacheOutcome::Hit) => &self.result_hits,
813                (CacheLayer::Result, CacheOutcome::Miss) => &self.result_misses,
814                (CacheLayer::Metadata, CacheOutcome::Hit) => &self.metadata_hits,
815                (CacheLayer::Metadata, CacheOutcome::Miss) => &self.metadata_misses,
816            };
817            counter.fetch_add(1, Ordering::Relaxed);
818        }
819
820        fn on_reload(&self, event: &ReloadTelemetryEvent) {
821            let counter = match event.outcome {
822                crate::telemetry::ReloadOutcome::Success => &self.reload_success,
823                crate::telemetry::ReloadOutcome::Failure => &self.reload_failure,
824            };
825            counter.fetch_add(1, Ordering::Relaxed);
826        }
827
828        fn on_query(&self, event: &QueryTelemetryEvent) {
829            let counter = if event.success {
830                &self.query_success
831            } else {
832                &self.query_failure
833            };
834            counter.fetch_add(1, Ordering::Relaxed);
835        }
836    }
837
838    fn perf_env_usize(key: &str, default: usize) -> usize {
839        std::env::var(key)
840            .ok()
841            .and_then(|value| value.parse::<usize>().ok())
842            .unwrap_or(default)
843    }
844
845    fn seed_perf_provider(rows: usize) {
846        let db = MemDB::instance();
847        db.execute("CREATE TABLE perf_kv (id INTEGER PRIMARY KEY, value TEXT)")
848            .expect("create perf_kv");
849        db.execute("BEGIN IMMEDIATE").expect("begin perf_kv load");
850        for id in 1..=rows {
851            let sql = format!("INSERT INTO perf_kv (id, value) VALUES ({id}, 'value_{id}')");
852            db.execute(sql.as_str()).expect("insert perf_kv row");
853        }
854        db.execute("COMMIT").expect("commit perf_kv load");
855        init_mem_provider(db).expect("init perf provider");
856    }
857
858    #[tokio::test(flavor = "current_thread")]
859    async fn query_async_works_with_mem_provider() {
860        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
861        let db = MemDB::instance();
862        db.execute("CREATE TABLE async_kv (id INTEGER PRIMARY KEY, value TEXT)")
863            .expect("create async_kv");
864        db.execute("INSERT INTO async_kv (id, value) VALUES (1, 'hello')")
865            .expect("insert async_kv row");
866        init_mem_provider(db).expect("init mem provider");
867
868        let row = query_fields_async(
869            "SELECT value FROM async_kv WHERE id=:id",
870            &[DataField::from_digit(":id", 1)],
871        )
872        .await
873        .expect("query async row");
874        assert_eq!(row.len(), 1);
875        assert_eq!(row[0].to_string(), "chars(hello)");
876    }
877
878    #[derive(Clone)]
879    struct PerfQuery {
880        cache_key: [DataField; 1],
881        query_params: [DataField; 1],
882        bypass_req: QueryRequest,
883        global_req: QueryRequest,
884    }
885
886    fn build_perf_workload(ops: usize, hotset: usize) -> Vec<PerfQuery> {
887        (0..ops)
888            .map(|idx| {
889                let id = ((idx * 17) % hotset + 1) as i64;
890                let cache_key = [DataField::from_digit("id", id)];
891                let query_params = [DataField::from_digit(":id", id)];
892                let bypass_req = QueryRequest::first_row(
893                    "SELECT value FROM perf_kv WHERE id=:id",
894                    fields_to_params(&query_params),
895                    CachePolicy::Bypass,
896                );
897                let global_req = QueryRequest::first_row(
898                    "SELECT value FROM perf_kv WHERE id=:id",
899                    fields_to_params(&query_params),
900                    CachePolicy::UseGlobal,
901                );
902                PerfQuery {
903                    cache_key,
904                    query_params,
905                    bypass_req,
906                    global_req,
907                }
908            })
909            .collect()
910    }
911
/// Cache hit/miss counts for one perf scenario.
#[derive(Debug, Clone, Copy)]
struct PerfCounters {
    result_hits: u64,
    result_misses: u64,
    local_hits: u64,
    local_misses: u64,
    metadata_hits: u64,
    metadata_misses: u64,
}

/// Timing and counter summary for one perf scenario.
#[derive(Debug, Clone)]
struct PerfResult {
    /// Scenario label used in the printed report.
    name: &'static str,
    /// Wall-clock time spent executing the workload.
    elapsed: Duration,
    /// Number of operations executed.
    ops: usize,
    /// Cache counters recorded for the scenario.
    counters: PerfCounters,
}

impl PerfResult {
    /// Operations per second; when `elapsed` is exactly zero the raw
    /// operation count is returned instead of dividing by zero.
    fn qps(&self) -> f64 {
        let ops = self.ops as f64;
        let secs = self.elapsed.as_secs_f64();
        if secs != 0.0 { ops / secs } else { ops }
    }
}
940
941    fn snapshot_delta(before: &RuntimeSnapshot, after: &RuntimeSnapshot) -> PerfCounters {
942        PerfCounters {
943            result_hits: after
944                .result_cache_hits
945                .saturating_sub(before.result_cache_hits),
946            result_misses: after
947                .result_cache_misses
948                .saturating_sub(before.result_cache_misses),
949            local_hits: after
950                .local_cache_hits
951                .saturating_sub(before.local_cache_hits),
952            local_misses: after
953                .local_cache_misses
954                .saturating_sub(before.local_cache_misses),
955            metadata_hits: after
956                .metadata_cache_hits
957                .saturating_sub(before.metadata_cache_hits),
958            metadata_misses: after
959                .metadata_cache_misses
960                .saturating_sub(before.metadata_cache_misses),
961        }
962    }
963
964    fn run_bypass_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
965        seed_perf_provider(rows);
966        let before = runtime_snapshot();
967        let started = Instant::now();
968        for item in workload {
969            let row = runtime()
970                .execute(&item.bypass_req)
971                .expect("execute bypass request")
972                .into_row();
973            black_box(row);
974        }
975        let elapsed = started.elapsed();
976        let after = runtime_snapshot();
977        PerfResult {
978            name: "bypass",
979            elapsed,
980            ops: workload.len(),
981            counters: snapshot_delta(&before, &after),
982        }
983    }
984
985    fn run_global_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
986        seed_perf_provider(rows);
987        let before = runtime_snapshot();
988        let started = Instant::now();
989        for item in workload {
990            let row = runtime()
991                .execute(&item.global_req)
992                .expect("execute global-cache request")
993                .into_row();
994            black_box(row);
995        }
996        let elapsed = started.elapsed();
997        let after = runtime_snapshot();
998        PerfResult {
999            name: "global_cache",
1000            elapsed,
1001            ops: workload.len(),
1002            counters: snapshot_delta(&before, &after),
1003        }
1004    }
1005
1006    fn run_local_cache_perf(workload: &[PerfQuery], rows: usize) -> PerfResult {
1007        seed_perf_provider(rows);
1008        let mut cache = FieldQueryCache::with_capacity(workload.len().max(1));
1009        let before = runtime_snapshot();
1010        let started = Instant::now();
1011        for item in workload {
1012            let row = cache_query_fields(
1013                "SELECT value FROM perf_kv WHERE id=:id",
1014                &item.cache_key,
1015                &item.query_params,
1016                &mut cache,
1017            );
1018            black_box(row);
1019        }
1020        let elapsed = started.elapsed();
1021        let after = runtime_snapshot();
1022        PerfResult {
1023            name: "local_cache",
1024            elapsed,
1025            ops: workload.len(),
1026            counters: snapshot_delta(&before, &after),
1027        }
1028    }
1029
1030    fn print_perf_result(result: &PerfResult) {
1031        eprintln!(
1032            "[wp-knowledge][cache-perf] scenario={} elapsed_ms={} qps={:.0} result_hit={} result_miss={} local_hit={} local_miss={} metadata_hit={} metadata_miss={}",
1033            result.name,
1034            result.elapsed.as_millis(),
1035            result.qps(),
1036            result.counters.result_hits,
1037            result.counters.result_misses,
1038            result.counters.local_hits,
1039            result.counters.local_misses,
1040            result.counters.metadata_hits,
1041            result.counters.metadata_misses,
1042        );
1043    }
1044
1045    fn uniq_cache_cfg_tmp_dir() -> PathBuf {
1046        use rand::{Rng, rng};
1047        let rnd: u64 = rng().next_u64();
1048        std::env::temp_dir().join(format!("wpk_cache_cfg_{}", rnd))
1049    }
1050
/// Writes a minimal knowdb layout under `root/models/knowledge`: a
/// `knowdb.toml` carrying the given `[cache]` settings plus an `example`
/// table directory with `create.sql`, `insert.sql` and a one-row `data.csv`.
///
/// Returns the path of the written `knowdb.toml`. Panics (via `expect`) on
/// any filesystem error.
fn write_minimal_knowdb_with_cache(
    root: &Path,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    let example_dir = models.join("example");
    fs::create_dir_all(&example_dir).expect("create knowdb models/example");

    let conf_path = models.join("knowdb.toml");
    let knowdb_toml = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[csv]
has_header = false

[[tables]]
name = "example"
columns.by_index = [0,1]
"#
    );
    fs::write(&conf_path, knowdb_toml).expect("write knowdb.toml");

    // `{table}` is written verbatim; this is a plain literal, not a format string.
    let create_sql = r#"
CREATE TABLE IF NOT EXISTS {table} (
  id INTEGER PRIMARY KEY,
  value TEXT NOT NULL
);
"#;
    fs::write(example_dir.join("create.sql"), create_sql).expect("write create.sql");

    let insert_sql = "INSERT INTO {table} (id, value) VALUES (?1, ?2);\n";
    fs::write(example_dir.join("insert.sql"), insert_sql).expect("write insert.sql");

    fs::write(example_dir.join("data.csv"), "1,alpha\n").expect("write data.csv");

    conf_path
}
1100
/// Writes `root/models/knowledge/knowdb.toml` containing only the given
/// `[cache]` settings and a `[provider.sqldb]` section (no tables).
///
/// Returns the path of the written `knowdb.toml`. Panics (via `expect`) on
/// any filesystem error.
fn write_provider_only_knowdb_with_cache(
    root: &Path,
    provider_kind: &str,
    connection_uri: &str,
    enabled: bool,
    capacity: usize,
    ttl_ms: u64,
) -> std::path::PathBuf {
    let models = root.join("models").join("knowledge");
    fs::create_dir_all(&models).expect("create knowdb models");

    let conf_path = models.join("knowdb.toml");
    let knowdb_toml = format!(
        r#"
version = 2
base_dir = "."

[cache]
enabled = {enabled}
capacity = {capacity}
ttl_ms = {ttl_ms}

[provider.sqldb]
kind = "{provider_kind}"
connection_uri = "{connection_uri}"
"#
    );
    fs::write(&conf_path, knowdb_toml).expect("write provider knowdb.toml");

    conf_path
}
1132
1133    fn restore_default_result_cache_config() {
1134        runtime().configure_result_cache(true, 1024, Duration::from_millis(30_000));
1135    }
1136
/// Installing a second mem provider makes later queries read from it.
#[test]
fn provider_can_be_replaced() {
    const CREATE_T: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";
    const SELECT_ONE: &str = "SELECT value FROM t WHERE id = 1";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let first_db = MemDB::instance();
    first_db.execute(CREATE_T).expect("create table in db1");
    first_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first_db).expect("init provider db1");
    let fetched = query_row(SELECT_ONE).expect("query db1");
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let second_db = MemDB::instance();
    second_db.execute(CREATE_T).expect("create table in db2");
    second_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second_db).expect("replace provider with db2");
    let fetched = query_row(SELECT_ONE).expect("query db2");
    assert_eq!(fetched[0].to_string(), "chars(second)");
}
1160
/// A `MemProvider` built with an older scope keeps writing metadata-cache
/// entries under its own scope even after a provider with a newer scope has
/// been installed.
#[test]
fn sqlite_metadata_cache_uses_provider_scope_after_reload() {
    const LOOKUP_SQL: &str = "SELECT value FROM cache_scope_t WHERE id = 1";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    COLNAME_CACHE.write().expect("metadata cache lock").clear();

    let db = MemDB::instance();
    db.execute("CREATE TABLE cache_scope_t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    db.execute("INSERT INTO cache_scope_t (id, value) VALUES (1, 'scope-old')")
        .expect("seed table");

    let old_scope = MetadataCacheScope {
        datasource_id: DatasourceId("sqlite:old".to_string()),
        generation: Generation(1),
    };
    let new_scope = MetadataCacheScope {
        datasource_id: DatasourceId("sqlite:new".to_string()),
        generation: Generation(2),
    };
    let stale_provider = MemProvider {
        db: db.clone(),
        metadata_scope: old_scope.clone(),
    };

    let installed = install_provider(
        ProviderKind::SqliteAuthority,
        new_scope.datasource_id.clone(),
        |_generation| {
            Ok(Arc::new(MemProvider {
                db: db.clone(),
                metadata_scope: new_scope.clone(),
            }))
        },
    );
    installed.expect("install new provider");

    // Query through the provider that still carries the old scope.
    let fetched = stale_provider
        .query_row(LOOKUP_SQL)
        .expect("old provider query");
    assert_eq!(fetched[0].to_string(), "chars(scope-old)");

    let cache = COLNAME_CACHE.read().expect("metadata cache lock");
    let old_key = metadata_cache_key_for_scope(&old_scope, LOOKUP_SQL);
    assert!(cache.contains(&old_key));
    let new_key = metadata_cache_key_for_scope(&new_scope, LOOKUP_SQL);
    assert!(!cache.contains(&new_key));
}
1214
1215    #[tokio::test(flavor = "current_thread")]
1216    async fn async_query_uses_runtime_bridge() {
1217        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1218        let db = MemDB::instance();
1219        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1220            .expect("create table");
1221        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-first')")
1222            .expect("seed table");
1223        init_mem_provider(db).expect("init provider");
1224
1225        let row = query_row_async("SELECT value FROM t WHERE id = 1")
1226            .await
1227            .expect("async query row");
1228        assert_eq!(row[0].to_string(), "chars(async-first)");
1229    }
1230
1231    #[tokio::test(flavor = "current_thread")]
1232    async fn async_cache_query_fields_hits_local_cache() {
1233        let _guard = crate::runtime::runtime_test_guard().lock_async().await;
1234        let db = MemDB::instance();
1235        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1236            .expect("create table");
1237        db.execute("INSERT INTO t (id, value) VALUES (1, 'async-cache')")
1238            .expect("seed table");
1239        init_mem_provider(db).expect("init provider");
1240
1241        let key = [DataField::from_digit("id", 1)];
1242        let params = [DataField::from_digit(":id", 1)];
1243        let mut cache = FieldQueryCache::default();
1244
1245        let first = cache_query_fields_async(
1246            "SELECT value FROM t WHERE id=:id",
1247            &key,
1248            &params,
1249            &mut cache,
1250        )
1251        .await;
1252        let second = cache_query_fields_async(
1253            "SELECT value FROM t WHERE id=:id",
1254            &key,
1255            &params,
1256            &mut cache,
1257        )
1258        .await;
1259
1260        assert_eq!(first[0].to_string(), "chars(async-cache)");
1261        assert_eq!(second[0].to_string(), "chars(async-cache)");
1262    }
1263
/// After the provider is replaced, the same `FieldQueryCache` and key yield
/// the value from the new provider rather than the previously cached one.
#[test]
fn local_cache_is_cleared_when_generation_changes() {
    const CREATE_T: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";
    const LOOKUP_SQL: &str = "SELECT value FROM t WHERE id=:id";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let first_db = MemDB::instance();
    first_db.execute(CREATE_T).expect("create table in db1");
    first_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first_db).expect("init provider db1");

    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let mut local = FieldQueryCache::default();

    let fetched = cache_query_fields(LOOKUP_SQL, &key, &params, &mut local);
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let second_db = MemDB::instance();
    second_db.execute(CREATE_T).expect("create table in db2");
    second_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second_db).expect("replace provider with db2");

    // Same cache, same key: the answer must now come from the second database.
    let fetched = cache_query_fields(LOOKUP_SQL, &key, &params, &mut local);
    assert_eq!(fetched[0].to_string(), "chars(second)");
}
1302
/// With an identical key and parameters, two different SQL texts must not
/// share a local-cache entry.
#[test]
fn local_cache_is_scoped_by_sql_text() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    let setup = [
        (
            "CREATE TABLE t1 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t1",
        ),
        (
            "CREATE TABLE t2 (id INTEGER PRIMARY KEY, value TEXT)",
            "create t2",
        ),
        (
            "INSERT INTO t1 (id, value) VALUES (1, 'first')",
            "seed t1",
        ),
        (
            "INSERT INTO t2 (id, value) VALUES (1, 'second')",
            "seed t2",
        ),
    ];
    for (stmt, failure) in setup {
        mem.execute(stmt).expect(failure);
    }
    init_mem_provider(mem).expect("init provider");

    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let mut local = FieldQueryCache::default();

    let cases = [
        ("SELECT value FROM t1 WHERE id=:id", "chars(first)"),
        ("SELECT value FROM t2 WHERE id=:id", "chars(second)"),
    ];
    for (sql, expected) in cases {
        let fetched = cache_query_fields(sql, &key, &params, &mut local);
        assert_eq!(fetched[0].to_string(), expected);
    }
}
1339
/// After one cached query, the runtime snapshot reports the provider kind, a
/// generation, and consistent result/metadata cache sizes.
#[test]
fn runtime_snapshot_tracks_generation_and_cache_size() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let mut local = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];
    let fetched = cache_query_fields(
        "SELECT value FROM t WHERE id=:id",
        &key,
        &params,
        &mut local,
    );
    assert_eq!(fetched[0].to_string(), "chars(first)");

    let snap = runtime_snapshot();
    assert!(matches!(
        snap.provider_kind,
        Some(ProviderKind::SqliteAuthority)
    ));
    assert!(snap.generation.is_some());
    assert!(snap.result_cache_len >= 1);
    assert!(snap.result_cache_capacity >= snap.result_cache_len);
    assert!(snap.metadata_cache_capacity >= snap.metadata_cache_len);
    assert!(snap.reload_successes >= 1);
}
1374
/// Running the same SQL against two successively installed providers must
/// grow the metadata cache each time.
#[test]
fn metadata_cache_is_scoped_by_generation() {
    const CREATE_T: &str = "CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)";

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let sql = "SELECT value FROM t WHERE id = 1";
    let before = runtime_snapshot().metadata_cache_len;

    let first_db = MemDB::instance();
    first_db.execute(CREATE_T).expect("create table in db1");
    first_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(first_db).expect("init provider db1");
    let fetched = query_row(sql).expect("query db1");
    assert_eq!(fetched[0].to_string(), "chars(first)");
    let after_first = runtime_snapshot().metadata_cache_len;
    assert!(
        after_first > before,
        "metadata cache did not record first generation entry: before={before} after_first={after_first}"
    );

    let second_db = MemDB::instance();
    second_db.execute(CREATE_T).expect("create table in db2");
    second_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'second')")
        .expect("seed db2");
    init_mem_provider(second_db).expect("replace provider with db2");
    let fetched = query_row(sql).expect("query db2");
    assert_eq!(fetched[0].to_string(), "chars(second)");
    let after_second = runtime_snapshot().metadata_cache_len;
    assert!(
        after_second > after_first,
        "metadata cache did not keep a distinct generation entry: after_first={after_first} after_second={after_second}"
    );
}
1411
/// When the provider factory passed to `install_provider` fails, the
/// previously installed provider keeps serving queries and the generation
/// does not change.
#[test]
fn failed_provider_reload_keeps_previous_provider() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let working_db = MemDB::instance();
    working_db
        .execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table in db1");
    working_db
        .execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed db1");
    init_mem_provider(working_db).expect("init provider db1");
    let generation_before = current_generation();

    let failed_reload = install_provider(
        ProviderKind::SqliteAuthority,
        datasource_id_for(ProviderKind::SqliteAuthority, "reload-failure"),
        |_generation| {
            Err(KnowReason::from_logic()
                .to_err()
                .with_detail("expected reload failure"))
        },
    );
    assert!(failed_reload.is_err());

    let fetched = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
    assert_eq!(fetched[0].to_string(), "chars(first)");
    assert_eq!(current_generation(), generation_before);
}
1440
/// Issuing the same cached query twice must raise the local hit, local miss,
/// result miss and metadata miss counters in the runtime snapshot.
#[test]
fn runtime_snapshot_records_cache_counters() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let baseline = runtime_snapshot();
    let mut local = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];

    // Same query, same key, same cache, issued twice.
    for _ in 0..2 {
        let fetched = cache_query_fields(
            "SELECT value FROM t WHERE id=:id",
            &key,
            &params,
            &mut local,
        );
        assert_eq!(fetched[0].to_string(), "chars(first)");
    }

    let finished = runtime_snapshot();
    assert!(finished.local_cache_hits > baseline.local_cache_hits);
    assert!(finished.local_cache_misses > baseline.local_cache_misses);
    assert!(finished.result_cache_misses > baseline.result_cache_misses);
    assert!(finished.metadata_cache_misses > baseline.metadata_cache_misses);
}
1478
/// An installed `TestTelemetry` observes reload success/failure, local cache
/// hit/miss, result and metadata cache misses, and successful queries.
#[test]
fn telemetry_receives_reload_cache_and_query_events() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let recorder = Arc::new(TestTelemetry::default());
    let previous = install_runtime_telemetry(recorder.clone());

    let mem = MemDB::instance();
    mem.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
        .expect("create table");
    mem.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
        .expect("seed table");
    init_mem_provider(mem).expect("init provider");

    let mut local = FieldQueryCache::default();
    let key = [DataField::from_digit("id", 1)];
    let params = [DataField::from_digit(":id", 1)];

    // Same query, same key, same cache, issued twice.
    for _ in 0..2 {
        let fetched = cache_query_fields(
            "SELECT value FROM t WHERE id=:id",
            &key,
            &params,
            &mut local,
        );
        assert_eq!(fetched[0].to_string(), "chars(first)");
    }

    // Force a reload failure so the failure counter is exercised as well.
    let failed_reload = install_provider(
        ProviderKind::SqliteAuthority,
        datasource_id_for(ProviderKind::SqliteAuthority, "telemetry-failure"),
        |_generation| {
            Err(KnowReason::from_logic()
                .to_err()
                .with_detail("expected telemetry reload failure"))
        },
    );
    assert!(failed_reload.is_err());

    // Put the previous telemetry back before asserting on the recorder.
    install_runtime_telemetry(previous);
    reset_telemetry();

    let relaxed = Ordering::Relaxed;
    assert!(recorder.reload_success.load(relaxed) >= 1);
    assert!(recorder.reload_failure.load(relaxed) >= 1);
    assert!(recorder.local_hits.load(relaxed) >= 1);
    assert!(recorder.local_misses.load(relaxed) >= 1);
    assert!(recorder.result_misses.load(relaxed) >= 1);
    assert!(recorder.metadata_misses.load(relaxed) >= 1);
    assert!(recorder.query_success.load(relaxed) >= 1);
}
1534
/// Manual perf comparison of the bypass, global-cache and local-cache paths.
///
/// Sizes come from `WP_KDB_PERF_ROWS`, `WP_KDB_PERF_OPS` and
/// `WP_KDB_PERF_HOTSET`; results are printed to stderr and the counter deltas
/// of each scenario are checked.
#[test]
#[ignore = "manual perf comparison; run with cargo test cache_perf_reports_cache_vs_no_cache -- --ignored --nocapture"]
fn cache_perf_reports_cache_vs_no_cache() {
    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");

    let rows = perf_env_usize("WP_KDB_PERF_ROWS", 10_000).max(1);
    let ops = perf_env_usize("WP_KDB_PERF_OPS", 120_000).max(1);
    // The hot set can never exceed the number of seeded rows.
    let hotset = perf_env_usize("WP_KDB_PERF_HOTSET", 128).clamp(1, rows);
    let workload = build_perf_workload(ops, hotset);

    eprintln!(
        "[wp-knowledge][cache-perf] rows={} ops={} hotset={} sql=SELECT value FROM perf_kv WHERE id=:id",
        rows, ops, hotset
    );

    let bypass = run_bypass_perf(&workload, rows);
    let global = run_global_cache_perf(&workload, rows);
    let local = run_local_cache_perf(&workload, rows);

    for scenario in [&bypass, &global, &local] {
        print_perf_result(scenario);
    }

    let bypass_secs = bypass.elapsed.as_secs_f64();
    eprintln!(
        "[wp-knowledge][cache-perf] speedup global_vs_bypass={:.2}x local_vs_bypass={:.2}x",
        bypass_secs / global.elapsed.as_secs_f64(),
        bypass_secs / local.elapsed.as_secs_f64(),
    );

    // Bypass must not touch the result cache or the local cache.
    assert_eq!(bypass.counters.result_hits, 0);
    assert_eq!(bypass.counters.result_misses, 0);
    assert_eq!(bypass.counters.local_hits, 0);
    assert_eq!(bypass.counters.local_misses, 0);

    // The global path uses only the result cache.
    assert!(global.counters.result_hits > 0);
    assert!(global.counters.result_misses > 0);
    assert_eq!(global.counters.local_hits, 0);
    assert_eq!(global.counters.local_misses, 0);

    // The local path records local hits and misses, plus result-cache misses.
    assert!(local.counters.local_hits > 0);
    assert!(local.counters.local_misses > 0);
    assert!(local.counters.result_misses > 0);
}
1577
/// `init_thread_cloned_from_knowdb` applies the `[cache]` section of
/// `knowdb.toml` (enabled, capacity 7, TTL 5 ms) to the runtime result cache,
/// and the configured TTL actually expires entries.
#[test]
fn init_thread_cloned_from_knowdb_applies_result_cache_config() {
    /// Restores the default result-cache settings and removes the scratch
    /// directory when dropped, so a failed assertion cannot leave either
    /// behind.
    struct Cleanup(std::path::PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped (and cleans up) before the
    // test lock is released.
    let _cleanup = Cleanup(root.clone());
    let conf_path = write_minimal_knowdb_with_cache(&root, true, 7, 5);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache config");

    let snapshot = runtime_snapshot();
    assert!(snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 7);
    assert_eq!(snapshot.result_cache_ttl_ms, 5);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let row = runtime()
        .execute(&req)
        .expect("first result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let row = runtime()
        .execute(&req)
        .expect("second result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    // Sleep past the 5 ms TTL so the next lookup misses again.
    std::thread::sleep(Duration::from_millis(12));
    let row = runtime()
        .execute(&req)
        .expect("expired result-cache query")
        .into_row();
    assert_eq!(row[0].to_string(), "chars(alpha)");
    let after = runtime_snapshot();

    assert!(after.result_cache_hits > before.result_cache_hits);
    assert!(after.result_cache_misses >= before.result_cache_misses + 2);
}
1628
/// With `enabled = false` in the `[cache]` section, requests that ask for
/// `CachePolicy::UseGlobal` record neither result-cache hits nor misses.
#[test]
fn disabled_result_cache_from_knowdb_config_forces_bypass() {
    /// Restores the default result-cache settings and removes the scratch
    /// directory when dropped, so a failed assertion cannot leave the global
    /// result cache disabled or leak the directory.
    struct Cleanup(std::path::PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            restore_default_result_cache_config();
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    let _guard = crate::runtime::runtime_test_guard()
        .lock()
        .expect("provider test guard");
    let root = uniq_cache_cfg_tmp_dir();
    // Declared after `_guard`, so it is dropped (and cleans up) before the
    // test lock is released.
    let _cleanup = Cleanup(root.clone());
    let conf_path = write_minimal_knowdb_with_cache(&root, false, 3, 30_000);
    let auth_file = root.join(".run").join("authority.sqlite");
    fs::create_dir_all(auth_file.parent().expect("authority parent"))
        .expect("create authority parent");
    let authority_uri = format!("file:{}?mode=rwc&uri=true", auth_file.display());

    init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default())
        .expect("init knowdb with cache disabled");

    let snapshot = runtime_snapshot();
    assert!(!snapshot.result_cache_enabled);
    assert_eq!(snapshot.result_cache_capacity, 3);
    assert_eq!(snapshot.result_cache_ttl_ms, 30_000);

    let req = QueryRequest::first_row(
        "SELECT value FROM example WHERE id=:id",
        fields_to_params(&[DataField::from_digit(":id", 1)]),
        CachePolicy::UseGlobal,
    );
    let before = runtime_snapshot();
    let _ = runtime()
        .execute(&req)
        .expect("first bypassed result-cache query");
    let _ = runtime()
        .execute(&req)
        .expect("second bypassed result-cache query");
    let after = runtime_snapshot();

    assert_eq!(after.result_cache_hits, before.result_cache_hits);
    assert_eq!(after.result_cache_misses, before.result_cache_misses);
}
1669
1670    #[test]
1671    fn failed_knowdb_provider_init_does_not_apply_cache_config() {
1672        let _guard = crate::runtime::runtime_test_guard()
1673            .lock()
1674            .expect("provider test guard");
1675        restore_default_result_cache_config();
1676
1677        let db = MemDB::instance();
1678        db.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, value TEXT)")
1679            .expect("create table");
1680        db.execute("INSERT INTO t (id, value) VALUES (1, 'first')")
1681            .expect("seed table");
1682        init_mem_provider(db).expect("init provider");
1683
1684        let before = runtime_snapshot();
1685        let root = uniq_cache_cfg_tmp_dir();
1686        let conf_path = write_provider_only_knowdb_with_cache(
1687            &root,
1688            "mysql",
1689            "not-a-valid-mysql-url",
1690            false,
1691            3,
1692            5,
1693        );
1694        let authority_uri = format!(
1695            "file:{}?mode=rwc&uri=true",
1696            root.join("unused.sqlite").display()
1697        );
1698
1699        let err =
1700            init_thread_cloned_from_knowdb(&root, &conf_path, &authority_uri, &EnvDict::default());
1701        assert!(err.is_err());
1702
1703        let after = runtime_snapshot();
1704        assert_eq!(after.result_cache_enabled, before.result_cache_enabled);
1705        assert_eq!(after.result_cache_capacity, before.result_cache_capacity);
1706        assert_eq!(after.result_cache_ttl_ms, before.result_cache_ttl_ms);
1707
1708        let row = query_row("SELECT value FROM t WHERE id = 1").expect("query previous provider");
1709        assert_eq!(row[0].to_string(), "chars(first)");
1710
1711        let _ = fs::remove_dir_all(&root);
1712    }
1713}