Skip to main content

wp_knowledge/
runtime.rs

1use std::collections::HashMap;
2use std::collections::hash_map::DefaultHasher;
3use std::hash::{Hash, Hasher};
4use std::num::NonZeroUsize;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, OnceLock, RwLock};
7use std::time::{Duration, Instant};
8
9use crate::error::{KnowReason, KnowledgeResult};
10use async_trait::async_trait;
11use lru::LruCache;
12use orion_error::conversion::ToStructError;
13use tokio::task;
14use wp_log::{debug_kdb, warn_kdb};
15use wp_model_core::model::{DataField, DataType, Value};
16
17use crate::loader::ProviderKind;
18use crate::mem::RowData;
19use crate::telemetry::{
20    CacheLayer, CacheOutcome, CacheTelemetryEvent, QueryTelemetryEvent, ReloadOutcome,
21    ReloadTelemetryEvent, telemetry, telemetry_enabled,
22};
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub struct DatasourceId(pub String);
26
27impl DatasourceId {
28    pub fn from_seed(kind: ProviderKind, seed: &str) -> Self {
29        let mut hasher = DefaultHasher::new();
30        seed.hash(&mut hasher);
31        let kind_str = match kind {
32            ProviderKind::SqliteAuthority => "sqlite",
33            ProviderKind::Postgres => "postgres",
34            ProviderKind::Mysql => "mysql",
35            ProviderKind::Redis => "redis",
36        };
37        Self(format!("{kind_str}:{:016x}", hasher.finish()))
38    }
39}
40
/// Generation number attached to an installed provider.
///
/// `KnowledgeRuntime::install_provider` hands out values starting at 1;
/// the value 0 is used in this file to mean "no provider installed".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Generation(pub u64);
43
/// Shape of the result a query should produce.
///
/// The enum is fieldless, so it derives `Copy`, `PartialEq` and `Eq` like the
/// other small tag enums in this file; existing `.clone()` callers keep working.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryMode {
    /// Return every matching row.
    Many,
    /// Return a single row.
    FirstRow,
}
49
/// Per-request choice of which result cache, if any, may serve a query.
#[derive(Debug, Clone, Copy)]
pub enum CachePolicy {
    /// Do not use the runtime's global result cache.
    Bypass,
    /// Consult and populate the runtime's global result cache (only when that
    /// cache is enabled).
    UseGlobal,
    /// Not treated as `UseGlobal` by the runtime's execute paths.
    /// NOTE(review): presumably served by a caller-scoped cache handled
    /// elsewhere; confirm against the call sites.
    UseCallScope,
}
56
/// Scalar value that can be bound to a query parameter.
#[derive(Debug, Clone)]
pub enum QueryValue {
    /// Absent value.
    Null,
    /// Boolean value.
    Bool(bool),
    /// Signed 64-bit integer value.
    Int(i64),
    /// 64-bit floating point value.
    Float(f64),
    /// Owned text value.
    Text(String),
}
65
66#[derive(Debug, Clone)]
67pub struct QueryParam {
68    pub name: String,
69    pub value: QueryValue,
70}
71
72#[derive(Debug, Clone)]
73pub struct QueryRequest {
74    pub sql: String,
75    pub params: Vec<QueryParam>,
76    pub mode: QueryMode,
77    pub cache_policy: CachePolicy,
78}
79
80impl QueryRequest {
81    pub fn many(
82        sql: impl Into<String>,
83        params: Vec<QueryParam>,
84        cache_policy: CachePolicy,
85    ) -> Self {
86        Self {
87            sql: sql.into(),
88            params,
89            mode: QueryMode::Many,
90            cache_policy,
91        }
92    }
93
94    pub fn first_row(
95        sql: impl Into<String>,
96        params: Vec<QueryParam>,
97        cache_policy: CachePolicy,
98    ) -> Self {
99        Self {
100            sql: sql.into(),
101            params,
102            mode: QueryMode::FirstRow,
103            cache_policy,
104        }
105    }
106}
107
108#[derive(Debug, Clone)]
109pub enum QueryResponse {
110    Rows(Vec<RowData>),
111    Row(RowData),
112}
113
114impl QueryResponse {
115    pub fn into_rows(self) -> Vec<RowData> {
116        match self {
117            QueryResponse::Rows(rows) => rows,
118            QueryResponse::Row(row) => vec![row],
119        }
120    }
121
122    pub fn into_row(self) -> RowData {
123        match self {
124            QueryResponse::Rows(rows) => rows.into_iter().next().unwrap_or_default(),
125            QueryResponse::Row(row) => row,
126        }
127    }
128}
129
130#[async_trait]
131pub trait ProviderExecutor: Send + Sync {
132    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>>;
133    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>>;
134    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData>;
135    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData>;
136
137    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
138        self.query(sql)
139    }
140
141    async fn query_fields_async(
142        &self,
143        sql: &str,
144        params: &[DataField],
145    ) -> KnowledgeResult<Vec<RowData>> {
146        self.query_fields(sql, params)
147    }
148
149    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
150        self.query_row(sql)
151    }
152
153    async fn query_named_fields_async(
154        &self,
155        sql: &str,
156        params: &[DataField],
157    ) -> KnowledgeResult<RowData> {
158        self.query_named_fields(sql, params)
159    }
160}
161
/// Hashable, copyable tag for the query mode, used in cache keys and query
/// telemetry events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryModeTag {
    /// Multi-row query.
    Many,
    /// Single-row query.
    FirstRow,
}
167
168#[derive(Debug, Clone, PartialEq, Eq, Hash)]
169pub struct ResultCacheKey {
170    pub datasource_id: DatasourceId,
171    pub generation: Generation,
172    pub query_hash: u64,
173    pub params_hash: u64,
174    pub mode: QueryModeTag,
175}
176
177pub struct ProviderHandle {
178    pub provider: Arc<dyn ProviderExecutor>,
179    pub datasource_id: DatasourceId,
180    pub generation: Generation,
181    pub kind: ProviderKind,
182}
183
184#[derive(Debug, Clone)]
185pub struct RuntimeSnapshot {
186    pub provider_kind: Option<ProviderKind>,
187    pub datasource_id: Option<DatasourceId>,
188    pub generation: Option<Generation>,
189    pub result_cache_enabled: bool,
190    pub result_cache_len: usize,
191    pub result_cache_capacity: usize,
192    pub result_cache_ttl_ms: u64,
193    pub metadata_cache_len: usize,
194    pub metadata_cache_capacity: usize,
195    pub result_cache_hits: u64,
196    pub result_cache_misses: u64,
197    pub metadata_cache_hits: u64,
198    pub metadata_cache_misses: u64,
199    pub local_cache_hits: u64,
200    pub local_cache_misses: u64,
201    pub reload_successes: u64,
202    pub reload_failures: u64,
203}
204
205#[derive(Debug, Clone)]
206pub struct MetadataCacheScope {
207    pub datasource_id: DatasourceId,
208    pub generation: Generation,
209}
210
/// Settings for the runtime's global result cache.
#[derive(Debug, Clone, Copy)]
pub struct ResultCacheConfig {
    /// Whether the cache may be consulted at all.
    pub enabled: bool,
    /// Maximum number of cached responses.
    pub capacity: usize,
    /// Configured time-to-live for a cached response.
    pub ttl: Duration,
}

impl Default for ResultCacheConfig {
    /// Enabled cache with room for 1024 entries and a 30 second TTL.
    fn default() -> Self {
        let ttl = Duration::from_secs(30);
        Self {
            enabled: true,
            capacity: 1024,
            ttl,
        }
    }
}
227
228#[derive(Debug, Clone)]
229struct CachedQueryResponse {
230    response: Arc<QueryResponse>,
231    cached_at: Instant,
232}
233
234// ---------------------------------------------------------------------------
235// Redis result cache types
236// ---------------------------------------------------------------------------
237
/// Tag distinguishing the kind of Redis command a cached value came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum RedisCmdTag {
    /// Tag for a `BfExists` lookup.
    BfExists,
    /// Tag for an `HGet` lookup.
    HGet,
    /// Tag for a `Get` lookup.
    Get,
    /// Tag for a `SetExists` lookup.
    SetExists,
}
245
246#[derive(Debug, Clone, PartialEq, Eq, Hash)]
247pub(crate) struct RedisCacheKey {
248    pub generation: u64,
249    pub cmd_tag: RedisCmdTag,
250    pub key_hash: u64,
251    pub args_hash: u64,
252}
253
/// Value stored in the Redis result cache.
#[derive(Debug, Clone)]
pub(crate) enum CachedRedisValue {
    /// Boolean answer.
    Bool(bool),
    /// Optional string answer; `None` records an absent value.
    OptString(Option<String>),
}
259
260pub struct KnowledgeRuntime {
261    provider: RwLock<Option<Arc<ProviderHandle>>>,
262    next_generation: AtomicU64,
263    provider_epoch: AtomicU64,
264    current_generation_value: AtomicU64,
265    result_cache_config: RwLock<ResultCacheConfig>,
266    result_cache_enabled: AtomicBool,
267    result_cache_ttl_ms: AtomicU64,
268    result_cache: RwLock<LruCache<ResultCacheKey, CachedQueryResponse>>,
269    result_cache_hits: AtomicU64,
270    result_cache_misses: AtomicU64,
271    metadata_cache_hits: AtomicU64,
272    metadata_cache_misses: AtomicU64,
273    local_cache_hits: AtomicU64,
274    local_cache_misses: AtomicU64,
275    reload_successes: AtomicU64,
276    reload_failures: AtomicU64,
277    redis_cache: RwLock<LruCache<RedisCacheKey, CachedRedisValue>>,
278    redis_cache_hits: AtomicU64,
279    redis_cache_misses: AtomicU64,
280    redis_global_enabled: AtomicBool,
281    redis_enabled_map: RwLock<HashMap<String, bool>>,
282}
283
284impl KnowledgeRuntime {
285    pub fn new(result_cache_capacity: usize) -> Self {
286        let config = ResultCacheConfig {
287            capacity: result_cache_capacity.max(1),
288            ..ResultCacheConfig::default()
289        };
290        let capacity = NonZeroUsize::new(config.capacity).expect("non-zero capacity");
291        Self {
292            provider: RwLock::new(None),
293            next_generation: AtomicU64::new(0),
294            provider_epoch: AtomicU64::new(0),
295            current_generation_value: AtomicU64::new(0),
296            result_cache_config: RwLock::new(config),
297            result_cache_enabled: AtomicBool::new(config.enabled),
298            result_cache_ttl_ms: AtomicU64::new(config.ttl.as_millis() as u64),
299            result_cache: RwLock::new(LruCache::new(capacity)),
300            result_cache_hits: AtomicU64::new(0),
301            result_cache_misses: AtomicU64::new(0),
302            metadata_cache_hits: AtomicU64::new(0),
303            metadata_cache_misses: AtomicU64::new(0),
304            local_cache_hits: AtomicU64::new(0),
305            local_cache_misses: AtomicU64::new(0),
306            reload_successes: AtomicU64::new(0),
307            reload_failures: AtomicU64::new(0),
308            redis_cache: RwLock::new(LruCache::new(capacity)),
309            redis_cache_hits: AtomicU64::new(0),
310            redis_cache_misses: AtomicU64::new(0),
311            redis_global_enabled: AtomicBool::new(true),
312            redis_enabled_map: RwLock::new(HashMap::new()),
313        }
314    }
315
316    pub fn install_provider<F>(
317        &self,
318        kind: ProviderKind,
319        datasource_id: DatasourceId,
320        build: F,
321    ) -> KnowledgeResult<Generation>
322    where
323        F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
324    {
325        let generation = Generation(self.next_generation.fetch_add(1, Ordering::SeqCst) + 1);
326        let previous = self
327            .provider
328            .read()
329            .ok()
330            .and_then(|guard| guard.as_ref().cloned());
331        debug_kdb!(
332            "[kdb] reload provider start kind={kind:?} datasource_id={} target_generation={} previous_generation={}",
333            datasource_id.0,
334            generation.0,
335            previous
336                .as_ref()
337                .map(|handle| handle.generation.0.to_string())
338                .unwrap_or_else(|| "none".to_string())
339        );
340        let provider = match build(generation) {
341            Ok(provider) => provider,
342            Err(err) => {
343                self.reload_failures.fetch_add(1, Ordering::Relaxed);
344                warn_kdb!(
345                    "[kdb] reload provider failed kind={kind:?} datasource_id={} target_generation={} err={}",
346                    datasource_id.0,
347                    generation.0,
348                    err
349                );
350                if telemetry_enabled() {
351                    telemetry().on_reload(&ReloadTelemetryEvent {
352                        outcome: ReloadOutcome::Failure,
353                        provider_kind: kind.clone(),
354                    });
355                }
356                return Err(err);
357            }
358        };
359        debug_kdb!(
360            "[kdb] install provider kind={kind:?} datasource_id={} generation={}",
361            datasource_id.0,
362            generation.0
363        );
364        let kind_for_handle = kind.clone();
365        let datasource_id_for_handle = datasource_id.clone();
366        let handle = Arc::new(ProviderHandle {
367            provider,
368            datasource_id: datasource_id_for_handle,
369            generation,
370            kind: kind_for_handle,
371        });
372        self.provider_epoch.fetch_add(1, Ordering::AcqRel);
373        {
374            let mut guard = self
375                .provider
376                .write()
377                .expect("runtime provider lock poisoned");
378            *guard = Some(handle);
379        }
380        self.current_generation_value
381            .store(generation.0, Ordering::Release);
382        self.provider_epoch.fetch_add(1, Ordering::Release);
383        self.reload_successes.fetch_add(1, Ordering::Relaxed);
384        if telemetry_enabled() {
385            telemetry().on_reload(&ReloadTelemetryEvent {
386                outcome: ReloadOutcome::Success,
387                provider_kind: kind.clone(),
388            });
389        }
390        debug_kdb!(
391            "[kdb] reload provider success kind={kind:?} datasource_id={} generation={}",
392            datasource_id.0,
393            generation.0
394        );
395        Ok(generation)
396    }
397
398    pub fn configure_result_cache(&self, enabled: bool, capacity: usize, ttl: Duration) {
399        let new_config = ResultCacheConfig {
400            enabled,
401            capacity: capacity.max(1),
402            ttl: ttl.max(Duration::from_millis(1)),
403        };
404        let mut should_reset_cache = false;
405        {
406            let mut guard = self
407                .result_cache_config
408                .write()
409                .expect("runtime result cache config lock poisoned");
410            if guard.capacity != new_config.capacity || (!new_config.enabled && guard.enabled) {
411                should_reset_cache = true;
412            }
413            *guard = new_config;
414        }
415        self.result_cache_enabled
416            .store(new_config.enabled, Ordering::Relaxed);
417        self.result_cache_ttl_ms.store(
418            new_config.ttl.as_millis().min(u128::from(u64::MAX)) as u64,
419            Ordering::Relaxed,
420        );
421
422        if should_reset_cache {
423            let mut cache = self
424                .result_cache
425                .write()
426                .expect("runtime result cache lock poisoned");
427            *cache = LruCache::new(
428                NonZeroUsize::new(new_config.capacity).expect("non-zero result cache capacity"),
429            );
430        }
431    }
432
433    pub fn configure_redis_cache(
434        &self,
435        global_enabled: bool,
436        capacity: usize,
437        key_enabled_map: HashMap<String, bool>,
438    ) {
439        let new_capacity =
440            NonZeroUsize::new(capacity.max(1)).expect("non-zero redis cache capacity");
441        if let Ok(mut cache) = self.redis_cache.write() {
442            *cache = LruCache::new(new_capacity);
443        }
444        *self
445            .redis_enabled_map
446            .write()
447            .expect("redis enabled map lock poisoned") = key_enabled_map;
448        self.redis_global_enabled
449            .store(global_enabled, Ordering::Relaxed);
450    }
451
452    pub fn current_generation(&self) -> Option<Generation> {
453        let epoch_before = self.provider_epoch.load(Ordering::Acquire);
454        if epoch_before % 2 == 1 {
455            return self.current_generation_from_provider();
456        }
457        let generation = self.current_generation_value.load(Ordering::Acquire);
458        let epoch_after = self.provider_epoch.load(Ordering::Acquire);
459        if epoch_before != epoch_after {
460            return self.current_generation_from_provider();
461        }
462        match generation {
463            0 => None,
464            generation => Some(Generation(generation)),
465        }
466    }
467
468    pub fn snapshot(&self) -> RuntimeSnapshot {
469        let provider = self
470            .provider
471            .read()
472            .ok()
473            .and_then(|guard| guard.as_ref().cloned());
474        let result_cache_config = self
475            .result_cache_config
476            .read()
477            .map(|guard| *guard)
478            .unwrap_or_default();
479        let (result_cache_len, result_cache_capacity) = self
480            .result_cache
481            .read()
482            .map(|cache| (cache.len(), cache.cap().get()))
483            .unwrap_or((0, 0));
484        let (metadata_cache_len, metadata_cache_capacity) =
485            crate::mem::query_util::column_metadata_cache_snapshot();
486        RuntimeSnapshot {
487            provider_kind: provider.as_ref().map(|handle| handle.kind.clone()),
488            datasource_id: provider.as_ref().map(|handle| handle.datasource_id.clone()),
489            generation: provider.as_ref().map(|handle| handle.generation),
490            result_cache_enabled: result_cache_config.enabled,
491            result_cache_len,
492            result_cache_capacity,
493            result_cache_ttl_ms: result_cache_config.ttl.as_millis() as u64,
494            metadata_cache_len,
495            metadata_cache_capacity,
496            result_cache_hits: self.result_cache_hits.load(Ordering::Relaxed),
497            result_cache_misses: self.result_cache_misses.load(Ordering::Relaxed),
498            metadata_cache_hits: self.metadata_cache_hits.load(Ordering::Relaxed),
499            metadata_cache_misses: self.metadata_cache_misses.load(Ordering::Relaxed),
500            local_cache_hits: self.local_cache_hits.load(Ordering::Relaxed),
501            local_cache_misses: self.local_cache_misses.load(Ordering::Relaxed),
502            reload_successes: self.reload_successes.load(Ordering::Relaxed),
503            reload_failures: self.reload_failures.load(Ordering::Relaxed),
504        }
505    }
506
507    pub fn current_metadata_scope(&self) -> MetadataCacheScope {
508        self.provider
509            .read()
510            .ok()
511            .and_then(|guard| guard.as_ref().cloned())
512            .map(|handle| MetadataCacheScope {
513                datasource_id: handle.datasource_id.clone(),
514                generation: handle.generation,
515            })
516            .unwrap_or_else(|| MetadataCacheScope {
517                datasource_id: DatasourceId("sqlite:standalone".to_string()),
518                generation: Generation(0),
519            })
520    }
521
522    pub fn current_provider_kind(&self) -> Option<ProviderKind> {
523        self.provider
524            .read()
525            .ok()
526            .and_then(|guard| guard.as_ref().map(|handle| handle.kind.clone()))
527    }
528
529    pub fn record_result_cache_hit(&self) {
530        self.result_cache_hits.fetch_add(1, Ordering::Relaxed);
531    }
532
533    pub fn record_result_cache_miss(&self) {
534        self.result_cache_misses.fetch_add(1, Ordering::Relaxed);
535    }
536
537    pub fn record_metadata_cache_hit(&self) {
538        self.metadata_cache_hits.fetch_add(1, Ordering::Relaxed);
539    }
540
541    pub fn record_metadata_cache_miss(&self) {
542        self.metadata_cache_misses.fetch_add(1, Ordering::Relaxed);
543    }
544
545    pub fn record_local_cache_hit(&self) {
546        self.local_cache_hits.fetch_add(1, Ordering::Relaxed);
547    }
548
549    pub fn record_local_cache_miss(&self) {
550        self.local_cache_misses.fetch_add(1, Ordering::Relaxed);
551    }
552
553    pub fn execute(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
554        let handle = self.current_handle()?;
555        self.execute_with_handle(&handle, req)
556    }
557
558    fn execute_with_handle(
559        &self,
560        handle: &Arc<ProviderHandle>,
561        req: &QueryRequest,
562    ) -> KnowledgeResult<QueryResponse> {
563        let use_global_cache =
564            matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
565        if use_global_cache && let Some(hit) = self.fetch_result_cache(handle, req) {
566            self.record_result_cache_hit();
567            if telemetry_enabled() {
568                telemetry().on_cache(&CacheTelemetryEvent {
569                    layer: CacheLayer::Result,
570                    outcome: CacheOutcome::Hit,
571                    provider_kind: Some(handle.kind.clone()),
572                });
573            }
574            debug_kdb!(
575                "[kdb] global result cache hit kind={:?} generation={}",
576                handle.kind,
577                handle.generation.0
578            );
579            return Ok(hit);
580        }
581        if use_global_cache {
582            self.record_result_cache_miss();
583            if telemetry_enabled() {
584                telemetry().on_cache(&CacheTelemetryEvent {
585                    layer: CacheLayer::Result,
586                    outcome: CacheOutcome::Miss,
587                    provider_kind: Some(handle.kind.clone()),
588                });
589            }
590            debug_kdb!(
591                "[kdb] global result cache miss kind={:?} generation={}",
592                handle.kind,
593                handle.generation.0
594            );
595        }
596
597        let params = params_to_fields(&req.params);
598        let mode_tag = query_mode_tag(&req.mode);
599        let started = Instant::now();
600        debug_kdb!(
601            "[kdb] execute query kind={:?} generation={} mode={:?} cache_policy={:?}",
602            handle.kind,
603            handle.generation.0,
604            req.mode,
605            req.cache_policy
606        );
607        let response = match match req.mode {
608            QueryMode::Many => {
609                if params.is_empty() {
610                    handle.provider.query(&req.sql).map(QueryResponse::Rows)
611                } else {
612                    handle
613                        .provider
614                        .query_fields(&req.sql, &params)
615                        .map(QueryResponse::Rows)
616                }
617            }
618            QueryMode::FirstRow => {
619                if params.is_empty() {
620                    handle.provider.query_row(&req.sql).map(QueryResponse::Row)
621                } else {
622                    handle
623                        .provider
624                        .query_named_fields(&req.sql, &params)
625                        .map(QueryResponse::Row)
626                }
627            }
628        } {
629            Ok(response) => {
630                if telemetry_enabled() {
631                    telemetry().on_query(&QueryTelemetryEvent {
632                        provider_kind: handle.kind.clone(),
633                        mode: mode_tag,
634                        success: true,
635                        elapsed: started.elapsed(),
636                    });
637                }
638                response
639            }
640            Err(err) => {
641                if telemetry_enabled() {
642                    telemetry().on_query(&QueryTelemetryEvent {
643                        provider_kind: handle.kind.clone(),
644                        mode: mode_tag,
645                        success: false,
646                        elapsed: started.elapsed(),
647                    });
648                }
649                return Err(err);
650            }
651        };
652
653        if use_global_cache {
654            self.save_result_cache(handle, req, response.clone());
655            debug_kdb!(
656                "[kdb] global result cache store kind={:?} generation={}",
657                handle.kind,
658                handle.generation.0
659            );
660        }
661
662        Ok(response)
663    }
664
665    pub fn execute_first_row_fields(
666        &self,
667        sql: &str,
668        params: &[DataField],
669        cache_policy: CachePolicy,
670    ) -> KnowledgeResult<RowData> {
671        let handle = self.current_handle()?;
672        self.execute_first_row_fields_with_handle(&handle, sql, params, cache_policy)
673    }
674
675    fn execute_first_row_fields_with_handle(
676        &self,
677        handle: &Arc<ProviderHandle>,
678        sql: &str,
679        params: &[DataField],
680        cache_policy: CachePolicy,
681    ) -> KnowledgeResult<RowData> {
682        let use_global_cache =
683            matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
684        if use_global_cache
685            && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
686                handle,
687                sql,
688                params,
689                QueryModeTag::FirstRow,
690            ))
691        {
692            self.record_result_cache_hit();
693            if telemetry_enabled() {
694                telemetry().on_cache(&CacheTelemetryEvent {
695                    layer: CacheLayer::Result,
696                    outcome: CacheOutcome::Hit,
697                    provider_kind: Some(handle.kind.clone()),
698                });
699            }
700            return Ok(hit.into_row());
701        }
702        if use_global_cache {
703            self.record_result_cache_miss();
704            if telemetry_enabled() {
705                telemetry().on_cache(&CacheTelemetryEvent {
706                    layer: CacheLayer::Result,
707                    outcome: CacheOutcome::Miss,
708                    provider_kind: Some(handle.kind.clone()),
709                });
710            }
711        }
712
713        let started = Instant::now();
714        let row = if params.is_empty() {
715            handle.provider.query_row(sql)
716        } else {
717            handle.provider.query_named_fields(sql, params)
718        };
719        let row = match row {
720            Ok(row) => {
721                if telemetry_enabled() {
722                    telemetry().on_query(&QueryTelemetryEvent {
723                        provider_kind: handle.kind.clone(),
724                        mode: QueryModeTag::FirstRow,
725                        success: true,
726                        elapsed: started.elapsed(),
727                    });
728                }
729                row
730            }
731            Err(err) => {
732                if telemetry_enabled() {
733                    telemetry().on_query(&QueryTelemetryEvent {
734                        provider_kind: handle.kind.clone(),
735                        mode: QueryModeTag::FirstRow,
736                        success: false,
737                        elapsed: started.elapsed(),
738                    });
739                }
740                return Err(err);
741            }
742        };
743
744        if use_global_cache {
745            self.save_result_cache_by_key(
746                result_cache_key_fields(handle, sql, params, QueryModeTag::FirstRow),
747                QueryResponse::Row(row.clone()),
748            );
749        }
750
751        Ok(row)
752    }
753
754    pub async fn execute_async(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
755        let handle = self.current_handle()?;
756        if matches!(handle.kind, ProviderKind::SqliteAuthority) {
757            let handle = handle.clone();
758            let req = req.clone();
759            return task::spawn_blocking(move || runtime().execute_with_handle(&handle, &req))
760                .await
761                .map_err(|err| {
762                    KnowReason::from_logic()
763                        .to_err()
764                        .with_detail(format!("knowledge async sqlite query join failed: {err}"))
765                })?;
766        }
767        let use_global_cache =
768            matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
769        if use_global_cache && let Some(hit) = self.fetch_result_cache(&handle, req) {
770            self.record_result_cache_hit();
771            if telemetry_enabled() {
772                telemetry().on_cache(&CacheTelemetryEvent {
773                    layer: CacheLayer::Result,
774                    outcome: CacheOutcome::Hit,
775                    provider_kind: Some(handle.kind.clone()),
776                });
777            }
778            return Ok(hit);
779        }
780        if use_global_cache {
781            self.record_result_cache_miss();
782            if telemetry_enabled() {
783                telemetry().on_cache(&CacheTelemetryEvent {
784                    layer: CacheLayer::Result,
785                    outcome: CacheOutcome::Miss,
786                    provider_kind: Some(handle.kind.clone()),
787                });
788            }
789        }
790
791        let params = params_to_fields(&req.params);
792        let mode_tag = query_mode_tag(&req.mode);
793        let started = Instant::now();
794        let response = match req.mode {
795            QueryMode::Many => {
796                if params.is_empty() {
797                    handle
798                        .provider
799                        .query_async(&req.sql)
800                        .await
801                        .map(QueryResponse::Rows)
802                } else {
803                    handle
804                        .provider
805                        .query_fields_async(&req.sql, &params)
806                        .await
807                        .map(QueryResponse::Rows)
808                }
809            }
810            QueryMode::FirstRow => {
811                if params.is_empty() {
812                    handle
813                        .provider
814                        .query_row_async(&req.sql)
815                        .await
816                        .map(QueryResponse::Row)
817                } else {
818                    handle
819                        .provider
820                        .query_named_fields_async(&req.sql, &params)
821                        .await
822                        .map(QueryResponse::Row)
823                }
824            }
825        };
826        let response = match response {
827            Ok(response) => {
828                if telemetry_enabled() {
829                    telemetry().on_query(&QueryTelemetryEvent {
830                        provider_kind: handle.kind.clone(),
831                        mode: mode_tag,
832                        success: true,
833                        elapsed: started.elapsed(),
834                    });
835                }
836                response
837            }
838            Err(err) => {
839                if telemetry_enabled() {
840                    telemetry().on_query(&QueryTelemetryEvent {
841                        provider_kind: handle.kind.clone(),
842                        mode: mode_tag,
843                        success: false,
844                        elapsed: started.elapsed(),
845                    });
846                }
847                return Err(err);
848            }
849        };
850
851        if use_global_cache {
852            self.save_result_cache(&handle, req, response.clone());
853        }
854
855        Ok(response)
856    }
857
858    pub async fn execute_first_row_fields_async(
859        &self,
860        sql: &str,
861        params: &[DataField],
862        cache_policy: CachePolicy,
863    ) -> KnowledgeResult<RowData> {
864        let handle = self.current_handle()?;
865        if matches!(handle.kind, ProviderKind::SqliteAuthority) {
866            let handle = handle.clone();
867            let sql = sql.to_string();
868            let params = params.to_vec();
869            return task::spawn_blocking(move || {
870                runtime().execute_first_row_fields_with_handle(&handle, &sql, &params, cache_policy)
871            })
872            .await
873            .map_err(|err| {
874                KnowReason::from_logic().to_err().with_detail(format!(
875                    "knowledge async sqlite first-row query join failed: {err}"
876                ))
877            })?;
878        }
879        let use_global_cache =
880            matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
881        if use_global_cache
882            && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
883                &handle,
884                sql,
885                params,
886                QueryModeTag::FirstRow,
887            ))
888        {
889            self.record_result_cache_hit();
890            if telemetry_enabled() {
891                telemetry().on_cache(&CacheTelemetryEvent {
892                    layer: CacheLayer::Result,
893                    outcome: CacheOutcome::Hit,
894                    provider_kind: Some(handle.kind.clone()),
895                });
896            }
897            return Ok(hit.into_row());
898        }
899        if use_global_cache {
900            self.record_result_cache_miss();
901            if telemetry_enabled() {
902                telemetry().on_cache(&CacheTelemetryEvent {
903                    layer: CacheLayer::Result,
904                    outcome: CacheOutcome::Miss,
905                    provider_kind: Some(handle.kind.clone()),
906                });
907            }
908        }
909
910        let started = Instant::now();
911        let row = if params.is_empty() {
912            handle.provider.query_row_async(sql).await
913        } else {
914            handle.provider.query_named_fields_async(sql, params).await
915        };
916        let row = match row {
917            Ok(row) => {
918                if telemetry_enabled() {
919                    telemetry().on_query(&QueryTelemetryEvent {
920                        provider_kind: handle.kind.clone(),
921                        mode: QueryModeTag::FirstRow,
922                        success: true,
923                        elapsed: started.elapsed(),
924                    });
925                }
926                row
927            }
928            Err(err) => {
929                if telemetry_enabled() {
930                    telemetry().on_query(&QueryTelemetryEvent {
931                        provider_kind: handle.kind.clone(),
932                        mode: QueryModeTag::FirstRow,
933                        success: false,
934                        elapsed: started.elapsed(),
935                    });
936                }
937                return Err(err);
938            }
939        };
940
941        if use_global_cache {
942            self.save_result_cache_by_key(
943                result_cache_key_fields(&handle, sql, params, QueryModeTag::FirstRow),
944                QueryResponse::Row(row.clone()),
945            );
946        }
947
948        Ok(row)
949    }
950
951    fn current_handle(&self) -> KnowledgeResult<Arc<ProviderHandle>> {
952        self.provider
953            .read()
954            .expect("runtime provider lock poisoned")
955            .clone()
956            .ok_or_else(|| {
957                KnowReason::from_logic()
958                    .to_err()
959                    .with_detail("knowledge provider not initialized")
960            })
961    }
962
963    fn current_generation_from_provider(&self) -> Option<Generation> {
964        self.provider
965            .read()
966            .ok()
967            .and_then(|guard| guard.as_ref().map(|handle| handle.generation))
968    }
969
970    fn fetch_result_cache(
971        &self,
972        handle: &ProviderHandle,
973        req: &QueryRequest,
974    ) -> Option<QueryResponse> {
975        self.fetch_result_cache_by_key(result_cache_key(handle, req))
976    }
977
978    fn fetch_result_cache_by_key(&self, key: ResultCacheKey) -> Option<QueryResponse> {
979        if !self.result_cache_enabled() {
980            return None;
981        }
982        let cached = self
983            .result_cache
984            .read()
985            .ok()
986            .and_then(|cache| cache.peek(&key).cloned())?;
987        if cached.cached_at.elapsed() > self.result_cache_ttl() {
988            if let Ok(mut cache) = self.result_cache.write() {
989                let _ = cache.pop(&key);
990            }
991            return None;
992        }
993        Some((*cached.response).clone())
994    }
995
996    fn save_result_cache(
997        &self,
998        handle: &ProviderHandle,
999        req: &QueryRequest,
1000        response: QueryResponse,
1001    ) {
1002        self.save_result_cache_by_key(result_cache_key(handle, req), response);
1003    }
1004
1005    fn save_result_cache_by_key(&self, key: ResultCacheKey, response: QueryResponse) {
1006        if let Ok(mut cache) = self.result_cache.write() {
1007            cache.put(
1008                key,
1009                CachedQueryResponse {
1010                    response: Arc::new(response),
1011                    cached_at: Instant::now(),
1012                },
1013            );
1014        }
1015    }
1016
1017    // -----------------------------------------------------------------------
1018    // Redis result cache
1019    // -----------------------------------------------------------------------
1020
1021    fn redis_cache_enabled(&self) -> bool {
1022        self.result_cache_enabled.load(Ordering::Acquire)
1023    }
1024
1025    #[allow(dead_code)]
1026    fn redis_cache_ttl(&self) -> Duration {
1027        Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Acquire))
1028    }
1029
1030    fn fetch_redis_cache(&self, key: &RedisCacheKey) -> Option<CachedRedisValue> {
1031        if !self.redis_cache_enabled() {
1032            return None;
1033        }
1034        let cached = self
1035            .redis_cache
1036            .read()
1037            .ok()
1038            .and_then(|cache| cache.peek(key).cloned())?;
1039        Some(cached)
1040    }
1041
1042    fn save_redis_cache(&self, key: RedisCacheKey, value: CachedRedisValue) {
1043        if !self.redis_cache_enabled() {
1044            return;
1045        }
1046        if let Ok(mut cache) = self.redis_cache.write() {
1047            cache.put(key, value);
1048        }
1049    }
1050
1051    pub(crate) fn redis_cache_get(
1052        &self,
1053        ck: &RedisCacheKey,
1054        redis_key: &str,
1055    ) -> Option<CachedRedisValue> {
1056        // Check per-key enabled flag
1057        let enabled = self
1058            .redis_enabled_map
1059            .read()
1060            .ok()
1061            .and_then(|map| map.get(redis_key).copied())
1062            .unwrap_or(self.redis_global_enabled.load(Ordering::Relaxed));
1063        if !enabled {
1064            return None;
1065        }
1066        let value = self.fetch_redis_cache(ck)?;
1067        self.redis_cache_hits.fetch_add(1, Ordering::Relaxed);
1068        Some(value)
1069    }
1070
1071    pub(crate) fn redis_cache_put(
1072        &self,
1073        ck: RedisCacheKey,
1074        redis_key: &str,
1075        value: CachedRedisValue,
1076    ) {
1077        // Don't store entries for disabled keys
1078        let enabled = self
1079            .redis_enabled_map
1080            .read()
1081            .ok()
1082            .and_then(|map| map.get(redis_key).copied())
1083            .unwrap_or(self.redis_global_enabled.load(Ordering::Relaxed));
1084        if !enabled {
1085            return;
1086        }
1087        self.redis_cache_misses.fetch_add(1, Ordering::Relaxed);
1088        self.save_redis_cache(ck, value);
1089    }
1090
1091    #[allow(dead_code)]
1092    fn clear_redis_cache(&self) {
1093        if let Ok(mut cache) = self.redis_cache.write() {
1094            cache.clear();
1095        }
1096    }
1097
1098    #[inline]
1099    fn result_cache_enabled(&self) -> bool {
1100        self.result_cache_enabled.load(Ordering::Relaxed)
1101    }
1102
1103    #[inline]
1104    fn result_cache_ttl(&self) -> Duration {
1105        Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Relaxed))
1106    }
1107}
1108
1109pub fn runtime() -> &'static KnowledgeRuntime {
1110    static RUNTIME: OnceLock<KnowledgeRuntime> = OnceLock::new();
1111    RUNTIME.get_or_init(|| KnowledgeRuntime::new(1024))
1112}
1113
/// Test-only mutex wrapper with a synchronous and an asynchronous lock method.
#[cfg(test)]
pub(crate) struct RuntimeTestGuard(tokio::sync::Mutex<()>);

#[cfg(test)]
impl RuntimeTestGuard {
    /// Acquires the guard from synchronous test code.
    ///
    /// Always returns `Ok`; the error type is `Infallible`. Uses
    /// `blocking_lock`, which per the tokio documentation must not be called
    /// from within an asynchronous execution context.
    pub(crate) fn lock(&self) -> Result<tokio::sync::MutexGuard<'_, ()>, std::convert::Infallible> {
        let Self(mutex) = self;
        Ok(mutex.blocking_lock())
    }

    /// Acquires the guard from asynchronous test code.
    pub(crate) async fn lock_async(&self) -> tokio::sync::MutexGuard<'_, ()> {
        let Self(mutex) = self;
        mutex.lock().await
    }
}
1127
/// Returns the process-wide [`RuntimeTestGuard`], creating it on first use.
#[cfg(test)]
pub(crate) fn runtime_test_guard() -> &'static RuntimeTestGuard {
    static GUARD: OnceLock<RuntimeTestGuard> = OnceLock::new();

    fn build() -> RuntimeTestGuard {
        RuntimeTestGuard(tokio::sync::Mutex::new(()))
    }

    GUARD.get_or_init(build)
}
1133
1134fn result_cache_key(handle: &ProviderHandle, req: &QueryRequest) -> ResultCacheKey {
1135    ResultCacheKey {
1136        datasource_id: handle.datasource_id.clone(),
1137        generation: handle.generation,
1138        query_hash: stable_hash(&req.sql),
1139        params_hash: stable_params_hash(&req.params),
1140        mode: match req.mode {
1141            QueryMode::Many => QueryModeTag::Many,
1142            QueryMode::FirstRow => QueryModeTag::FirstRow,
1143        },
1144    }
1145}
1146
1147fn result_cache_key_fields(
1148    handle: &ProviderHandle,
1149    sql: &str,
1150    params: &[DataField],
1151    mode: QueryModeTag,
1152) -> ResultCacheKey {
1153    ResultCacheKey {
1154        datasource_id: handle.datasource_id.clone(),
1155        generation: handle.generation,
1156        query_hash: stable_hash(sql),
1157        params_hash: stable_field_params_hash(params),
1158        mode,
1159    }
1160}
1161
1162fn query_mode_tag(mode: &QueryMode) -> QueryModeTag {
1163    match mode {
1164        QueryMode::Many => QueryModeTag::Many,
1165        QueryMode::FirstRow => QueryModeTag::FirstRow,
1166    }
1167}
1168
/// Hashes `value` with a freshly constructed `DefaultHasher`.
///
/// The same input gives the same output within one build of the program.
/// The standard library does not guarantee the algorithm across Rust
/// releases, so the result should not be persisted.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    state.finish()
}
1174
1175fn stable_params_hash(params: &[QueryParam]) -> u64 {
1176    let mut hasher = DefaultHasher::new();
1177    for param in params {
1178        param.name.hash(&mut hasher);
1179        match &param.value {
1180            QueryValue::Null => 0u8.hash(&mut hasher),
1181            QueryValue::Bool(value) => {
1182                1u8.hash(&mut hasher);
1183                value.hash(&mut hasher);
1184            }
1185            QueryValue::Int(value) => {
1186                2u8.hash(&mut hasher);
1187                value.hash(&mut hasher);
1188            }
1189            QueryValue::Float(value) => {
1190                3u8.hash(&mut hasher);
1191                value.to_bits().hash(&mut hasher);
1192            }
1193            QueryValue::Text(value) => {
1194                4u8.hash(&mut hasher);
1195                value.hash(&mut hasher);
1196            }
1197        }
1198    }
1199    hasher.finish()
1200}
1201
1202fn stable_field_params_hash(params: &[DataField]) -> u64 {
1203    let mut hasher = DefaultHasher::new();
1204    for field in params {
1205        field.get_name().hash(&mut hasher);
1206        match field.get_value() {
1207            Value::Null | Value::Ignore(_) => 0u8.hash(&mut hasher),
1208            Value::Bool(value) => {
1209                1u8.hash(&mut hasher);
1210                value.hash(&mut hasher);
1211            }
1212            Value::Digit(value) => {
1213                2u8.hash(&mut hasher);
1214                value.hash(&mut hasher);
1215            }
1216            Value::Float(value) => {
1217                3u8.hash(&mut hasher);
1218                value.to_bits().hash(&mut hasher);
1219            }
1220            Value::Chars(value) => {
1221                4u8.hash(&mut hasher);
1222                value.hash(&mut hasher);
1223            }
1224            Value::Symbol(value) => {
1225                5u8.hash(&mut hasher);
1226                value.hash(&mut hasher);
1227            }
1228            Value::Time(value) => {
1229                6u8.hash(&mut hasher);
1230                value.hash(&mut hasher);
1231            }
1232            Value::Hex(value) => {
1233                7u8.hash(&mut hasher);
1234                value.to_string().hash(&mut hasher);
1235            }
1236            Value::IpNet(value) => {
1237                8u8.hash(&mut hasher);
1238                value.to_string().hash(&mut hasher);
1239            }
1240            Value::IpAddr(value) => {
1241                9u8.hash(&mut hasher);
1242                value.hash(&mut hasher);
1243            }
1244            Value::Obj(value) => {
1245                10u8.hash(&mut hasher);
1246                format!("{:?}", value).hash(&mut hasher);
1247            }
1248            Value::Array(value) => {
1249                11u8.hash(&mut hasher);
1250                format!("{:?}", value).hash(&mut hasher);
1251            }
1252            Value::Domain(value) => {
1253                12u8.hash(&mut hasher);
1254                value.0.hash(&mut hasher);
1255            }
1256            Value::Url(value) => {
1257                13u8.hash(&mut hasher);
1258                value.0.hash(&mut hasher);
1259            }
1260            Value::Email(value) => {
1261                14u8.hash(&mut hasher);
1262                value.0.hash(&mut hasher);
1263            }
1264            Value::IdCard(value) => {
1265                15u8.hash(&mut hasher);
1266                value.0.hash(&mut hasher);
1267            }
1268            Value::MobilePhone(value) => {
1269                16u8.hash(&mut hasher);
1270                value.0.hash(&mut hasher);
1271            }
1272        }
1273    }
1274    hasher.finish()
1275}
1276
1277pub fn fields_to_params(params: &[DataField]) -> Vec<QueryParam> {
1278    params
1279        .iter()
1280        .map(|field| {
1281            let value = match field.get_value() {
1282                Value::Null | Value::Ignore(_) => QueryValue::Null,
1283                Value::Bool(value) => QueryValue::Bool(*value),
1284                Value::Digit(value) => QueryValue::Int(*value),
1285                Value::Float(value) => QueryValue::Float(*value),
1286                Value::Chars(value) => QueryValue::Text(value.to_string()),
1287                Value::Symbol(value) => QueryValue::Text(value.to_string()),
1288                Value::Time(value) => QueryValue::Text(value.to_string()),
1289                Value::Hex(value) => QueryValue::Text(value.to_string()),
1290                Value::IpNet(value) => QueryValue::Text(value.to_string()),
1291                Value::IpAddr(value) => QueryValue::Text(value.to_string()),
1292                Value::Obj(value) => QueryValue::Text(format!("{:?}", value)),
1293                Value::Array(value) => QueryValue::Text(format!("{:?}", value)),
1294                Value::Domain(value) => QueryValue::Text(value.0.to_string()),
1295                Value::Url(value) => QueryValue::Text(value.0.to_string()),
1296                Value::Email(value) => QueryValue::Text(value.0.to_string()),
1297                Value::IdCard(value) => QueryValue::Text(value.0.to_string()),
1298                Value::MobilePhone(value) => QueryValue::Text(value.0.to_string()),
1299            };
1300            QueryParam {
1301                name: field.get_name().to_string(),
1302                value,
1303            }
1304        })
1305        .collect()
1306}
1307
1308pub fn params_to_fields(params: &[QueryParam]) -> Vec<DataField> {
1309    params
1310        .iter()
1311        .map(|param| match &param.value {
1312            QueryValue::Null => {
1313                DataField::new(DataType::default(), param.name.clone(), Value::Null)
1314            }
1315            QueryValue::Bool(value) => {
1316                DataField::new(DataType::default(), param.name.clone(), Value::Bool(*value))
1317            }
1318            QueryValue::Int(value) => DataField::from_digit(param.name.clone(), *value),
1319            QueryValue::Float(value) => DataField::from_float(param.name.clone(), *value),
1320            QueryValue::Text(value) => DataField::from_chars(param.name.clone(), value.clone()),
1321        })
1322        .collect()
1323}
1324
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use wp_model_core::model::Value;

    /// Minimal provider that answers every query with a single `value` field.
    struct TestProvider {
        value: &'static str,
    }

    #[async_trait]
    impl ProviderExecutor for TestProvider {
        fn query(&self, _sql: &str) -> KnowledgeResult<Vec<RowData>> {
            Ok(vec![vec![DataField::from_chars("value", self.value)]])
        }

        fn query_fields(&self, _sql: &str, _params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
            self.query("")
        }

        fn query_row(&self, _sql: &str) -> KnowledgeResult<RowData> {
            Ok(vec![DataField::from_chars("value", self.value)])
        }

        fn query_named_fields(
            &self,
            _sql: &str,
            _params: &[DataField],
        ) -> KnowledgeResult<RowData> {
            self.query_row("")
        }
    }

    #[test]
    fn query_param_hash_is_stable() {
        let params = vec![
            QueryParam {
                name: ":id".to_string(),
                value: QueryValue::Int(7),
            },
            QueryParam {
                name: ":name".to_string(),
                value: QueryValue::Text("abc".to_string()),
            },
        ];
        assert_eq!(stable_params_hash(&params), stable_params_hash(&params));
    }

    #[test]
    fn fields_to_params_preserves_raw_chars_value() {
        let fields = [DataField::from_chars(
            ":name".to_string(),
            "令狐冲".to_string(),
        )];
        let params = fields_to_params(&fields);
        assert_eq!(params.len(), 1);
        match &params[0].value {
            QueryValue::Text(value) => assert_eq!(value, "令狐冲"),
            other => panic!("unexpected param value: {other:?}"),
        }
        let roundtrip = params_to_fields(&params);
        assert!(matches!(roundtrip[0].get_value(), Value::Chars(_)));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_async_bridge_keeps_captured_handle_after_reload() {
        let _guard = runtime_test_guard().lock_async().await;
        runtime()
            .install_provider(
                ProviderKind::SqliteAuthority,
                DatasourceId("sqlite:old".to_string()),
                |_generation| Ok(Arc::new(TestProvider { value: "old" })),
            )
            .expect("install old provider");
        let old_handle = runtime().current_handle().expect("current old handle");

        runtime()
            .install_provider(
                ProviderKind::SqliteAuthority,
                DatasourceId("sqlite:new".to_string()),
                |_generation| Ok(Arc::new(TestProvider { value: "new" })),
            )
            .expect("install new provider");

        // The handle captured before the reload must still answer with "old".
        let req = QueryRequest::first_row("SELECT value", Vec::new(), CachePolicy::Bypass);
        let row = task::spawn_blocking(move || runtime().execute_with_handle(&old_handle, &req))
            .await
            .expect("join sqlite bridge")
            .expect("execute old handle")
            .into_row();
        assert_eq!(row[0].to_string(), "chars(old)");
    }

    // -----------------------------------------------------------------------
    // Redis cache unit tests
    // -----------------------------------------------------------------------

    /// Builds a `RedisCacheKey` by hashing `key` and `args` with `DefaultHasher`.
    fn redis_ck(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let key_hash = hasher.finish();
        let mut hasher = DefaultHasher::new();
        for arg in args {
            arg.hash(&mut hasher);
        }
        let args_hash = hasher.finish();
        RedisCacheKey {
            generation,
            cmd_tag: cmd,
            key_hash,
            args_hash,
        }
    }

    #[test]
    fn redis_cache_hit_and_miss() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64, HashMap::new());

        let ck = redis_ck(RedisCmdTag::Get, 1, "user:1", &[]);
        // First access — miss
        assert!(rt.redis_cache_get(&ck, "user:1").is_none());
        // Store
        rt.redis_cache_put(ck.clone(), "user:1", CachedRedisValue::Bool(true));
        // Second access — hit
        let val = rt.redis_cache_get(&ck, "user:1").expect("should hit cache");
        assert!(matches!(val, CachedRedisValue::Bool(true)));
    }

    #[test]
    fn redis_cache_disabled_key_is_not_read() {
        let rt = KnowledgeRuntime::new(64);
        let mut key_map = HashMap::new();
        key_map.insert("volatile".to_string(), false);
        rt.configure_redis_cache(true, 64, key_map);

        let ck = redis_ck(RedisCmdTag::Get, 1, "volatile", &[]);
        // Seed the store directly: `redis_cache_put` refuses disabled keys, so
        // going through it would leave the cache empty and make the read check
        // below pass vacuously.
        rt.save_redis_cache(ck.clone(), CachedRedisValue::Bool(true));
        assert!(
            rt.fetch_redis_cache(&ck).is_some(),
            "entry should be present in the raw store"
        );
        // The per-key override must hide the entry from the gated read path.
        assert!(rt.redis_cache_get(&ck, "volatile").is_none());
    }

    #[test]
    fn redis_cache_disabled_key_is_not_stored() {
        let rt = KnowledgeRuntime::new(64);
        let mut key_map = HashMap::new();
        key_map.insert("volatile".to_string(), false);
        rt.configure_redis_cache(true, 64, key_map);

        let ck = redis_ck(RedisCmdTag::Get, 1, "volatile", &[]);
        // put for disabled key should be a no-op
        rt.redis_cache_put(ck.clone(), "volatile", CachedRedisValue::Bool(true));
        // Even after enabling the key, nothing was stored
        let mut key_map = HashMap::new();
        key_map.insert("volatile".to_string(), true);
        rt.configure_redis_cache(true, 64, key_map);
        assert!(rt.redis_cache_get(&ck, "volatile").is_none());
    }

    #[test]
    fn redis_cache_per_key_override_works_independently() {
        let rt = KnowledgeRuntime::new(64);
        let mut key_map = HashMap::new();
        key_map.insert("disabled_key".to_string(), false);
        rt.configure_redis_cache(true, 64, key_map);

        let ck_disabled = redis_ck(RedisCmdTag::HGet, 1, "disabled_key", &["f"]);
        let ck_enabled = redis_ck(RedisCmdTag::HGet, 1, "enabled_key", &["f"]);

        // Store both
        rt.redis_cache_put(
            ck_disabled.clone(),
            "disabled_key",
            CachedRedisValue::OptString(Some("x".to_string())),
        );
        rt.redis_cache_put(
            ck_enabled.clone(),
            "enabled_key",
            CachedRedisValue::OptString(Some("y".to_string())),
        );

        // Disabled key returns None
        assert!(rt.redis_cache_get(&ck_disabled, "disabled_key").is_none());
        // Enabled key returns cached value
        let val = rt
            .redis_cache_get(&ck_enabled, "enabled_key")
            .expect("should hit");
        assert!(matches!(val, CachedRedisValue::OptString(Some(ref s)) if s == "y"));
    }

    #[test]
    fn redis_cache_global_disabled_blocks_all() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(false, 64, HashMap::new());

        let ck = redis_ck(RedisCmdTag::BfExists, 1, "any_key", &["item"]);
        // With the global switch off this put is itself a no-op, so the
        // assertion below covers the combined put + get path.
        rt.redis_cache_put(ck.clone(), "any_key", CachedRedisValue::Bool(true));
        // Global disabled — no reads
        assert!(rt.redis_cache_get(&ck, "any_key").is_none());
    }

    #[test]
    fn redis_cache_generation_isolation() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64, HashMap::new());

        let ck_gen1 = redis_ck(RedisCmdTag::BfExists, 1, "key", &["item"]);
        let ck_gen2 = redis_ck(RedisCmdTag::BfExists, 2, "key", &["item"]);

        // Store with generation 1
        rt.redis_cache_put(ck_gen1.clone(), "key", CachedRedisValue::Bool(false));

        // Same key but generation 2 — miss
        assert!(rt.redis_cache_get(&ck_gen2, "key").is_none());
        // Generation 1 — hit
        assert!(rt.redis_cache_get(&ck_gen1, "key").is_some());
    }
}