Skip to main content

wp_knowledge/
runtime.rs

1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::num::NonZeroUsize;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::{Duration, Instant};
7
8use crate::error::{KnowReason, KnowledgeResult};
9use async_trait::async_trait;
10use lru::LruCache;
11use orion_error::conversion::ToStructError;
12use tokio::task;
13use wp_log::{debug_kdb, warn_kdb};
14use wp_model_core::model::{DataField, DataType, Value};
15
16use crate::loader::ProviderKind;
17use crate::mem::RowData;
18use crate::telemetry::{
19    CacheLayer, CacheOutcome, CacheTelemetryEvent, QueryTelemetryEvent, ReloadOutcome,
20    ReloadTelemetryEvent, telemetry, telemetry_enabled,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct DatasourceId(pub String);
25
26impl DatasourceId {
27    pub fn from_seed(kind: ProviderKind, seed: &str) -> Self {
28        let mut hasher = DefaultHasher::new();
29        seed.hash(&mut hasher);
30        let kind_str = match kind {
31            ProviderKind::SqliteAuthority => "sqlite",
32            ProviderKind::Postgres => "postgres",
33            ProviderKind::Mysql => "mysql",
34            ProviderKind::Redis => "redis",
35        };
36        Self(format!("{kind_str}:{:016x}", hasher.finish()))
37    }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41pub struct Generation(pub u64);
42
43#[derive(Debug, Clone)]
44pub enum QueryMode {
45    Many,
46    FirstRow,
47}
48
49#[derive(Debug, Clone, Copy)]
50pub enum CachePolicy {
51    Bypass,
52    UseGlobal,
53    UseCallScope,
54}
55
56#[derive(Debug, Clone)]
57pub enum QueryValue {
58    Null,
59    Bool(bool),
60    Int(i64),
61    Float(f64),
62    Text(String),
63}
64
65#[derive(Debug, Clone)]
66pub struct QueryParam {
67    pub name: String,
68    pub value: QueryValue,
69}
70
71#[derive(Debug, Clone)]
72pub struct QueryRequest {
73    pub sql: String,
74    pub params: Vec<QueryParam>,
75    pub mode: QueryMode,
76    pub cache_policy: CachePolicy,
77}
78
79impl QueryRequest {
80    pub fn many(
81        sql: impl Into<String>,
82        params: Vec<QueryParam>,
83        cache_policy: CachePolicy,
84    ) -> Self {
85        Self {
86            sql: sql.into(),
87            params,
88            mode: QueryMode::Many,
89            cache_policy,
90        }
91    }
92
93    pub fn first_row(
94        sql: impl Into<String>,
95        params: Vec<QueryParam>,
96        cache_policy: CachePolicy,
97    ) -> Self {
98        Self {
99            sql: sql.into(),
100            params,
101            mode: QueryMode::FirstRow,
102            cache_policy,
103        }
104    }
105}
106
107#[derive(Debug, Clone)]
108pub enum QueryResponse {
109    Rows(Vec<RowData>),
110    Row(RowData),
111}
112
113impl QueryResponse {
114    pub fn into_rows(self) -> Vec<RowData> {
115        match self {
116            QueryResponse::Rows(rows) => rows,
117            QueryResponse::Row(row) => vec![row],
118        }
119    }
120
121    pub fn into_row(self) -> RowData {
122        match self {
123            QueryResponse::Rows(rows) => rows.into_iter().next().unwrap_or_default(),
124            QueryResponse::Row(row) => row,
125        }
126    }
127}
128
129#[async_trait]
130pub trait ProviderExecutor: Send + Sync {
131    fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>>;
132    fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>>;
133    fn query_row(&self, sql: &str) -> KnowledgeResult<RowData>;
134    fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData>;
135
136    async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
137        self.query(sql)
138    }
139
140    async fn query_fields_async(
141        &self,
142        sql: &str,
143        params: &[DataField],
144    ) -> KnowledgeResult<Vec<RowData>> {
145        self.query_fields(sql, params)
146    }
147
148    async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
149        self.query_row(sql)
150    }
151
152    async fn query_named_fields_async(
153        &self,
154        sql: &str,
155        params: &[DataField],
156    ) -> KnowledgeResult<RowData> {
157        self.query_named_fields(sql, params)
158    }
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
162pub enum QueryModeTag {
163    Many,
164    FirstRow,
165}
166
167#[derive(Debug, Clone, PartialEq, Eq, Hash)]
168pub struct ResultCacheKey {
169    pub datasource_id: DatasourceId,
170    pub generation: Generation,
171    pub query_hash: u64,
172    pub params_hash: u64,
173    pub mode: QueryModeTag,
174}
175
176pub struct ProviderHandle {
177    pub provider: Arc<dyn ProviderExecutor>,
178    pub datasource_id: DatasourceId,
179    pub generation: Generation,
180    pub kind: ProviderKind,
181}
182
183#[derive(Debug, Clone)]
184pub struct RuntimeSnapshot {
185    pub provider_kind: Option<ProviderKind>,
186    pub datasource_id: Option<DatasourceId>,
187    pub generation: Option<Generation>,
188    pub result_cache_enabled: bool,
189    pub result_cache_len: usize,
190    pub result_cache_capacity: usize,
191    pub result_cache_ttl_ms: u64,
192    pub metadata_cache_len: usize,
193    pub metadata_cache_capacity: usize,
194    pub result_cache_hits: u64,
195    pub result_cache_misses: u64,
196    pub metadata_cache_hits: u64,
197    pub metadata_cache_misses: u64,
198    pub local_cache_hits: u64,
199    pub local_cache_misses: u64,
200    pub reload_successes: u64,
201    pub reload_failures: u64,
202}
203
204#[derive(Debug, Clone)]
205pub struct MetadataCacheScope {
206    pub datasource_id: DatasourceId,
207    pub generation: Generation,
208}
209
210#[derive(Debug, Clone, Copy)]
211pub struct ResultCacheConfig {
212    pub enabled: bool,
213    pub capacity: usize,
214    pub ttl: Duration,
215}
216
217impl Default for ResultCacheConfig {
218    fn default() -> Self {
219        Self {
220            enabled: true,
221            capacity: 1024,
222            ttl: Duration::from_millis(30_000),
223        }
224    }
225}
226
227#[derive(Debug, Clone)]
228struct CachedQueryResponse {
229    response: Arc<QueryResponse>,
230    cached_at: Instant,
231}
232
233// ---------------------------------------------------------------------------
234// Redis result cache types
235// ---------------------------------------------------------------------------
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
238pub(crate) enum RedisCmdTag {
239    BfExists,
240    HGet,
241    Get,
242    SetExists,
243}
244
245#[derive(Debug, Clone, PartialEq, Eq, Hash)]
246pub(crate) struct RedisCacheKey {
247    pub generation: u64,
248    pub cmd_tag: RedisCmdTag,
249    pub key_hash: u64,
250    pub args_hash: u64,
251}
252
253#[derive(Debug, Clone)]
254pub(crate) enum CachedRedisValue {
255    Bool(bool),
256    OptString(Option<String>),
257}
258
259#[derive(Debug, Clone)]
260struct CachedRedisEntry {
261    value: CachedRedisValue,
262    cached_at: Instant,
263    ttl_ms: u64, // 0 = no TTL (generation-only)
264}
265
266pub struct KnowledgeRuntime {
267    provider: RwLock<Option<Arc<ProviderHandle>>>,
268    next_generation: AtomicU64,
269    provider_epoch: AtomicU64,
270    current_generation_value: AtomicU64,
271    result_cache_config: RwLock<ResultCacheConfig>,
272    result_cache_enabled: AtomicBool,
273    result_cache_ttl_ms: AtomicU64,
274    result_cache: RwLock<LruCache<ResultCacheKey, CachedQueryResponse>>,
275    result_cache_hits: AtomicU64,
276    result_cache_misses: AtomicU64,
277    metadata_cache_hits: AtomicU64,
278    metadata_cache_misses: AtomicU64,
279    local_cache_hits: AtomicU64,
280    local_cache_misses: AtomicU64,
281    reload_successes: AtomicU64,
282    reload_failures: AtomicU64,
283    redis_cache: RwLock<LruCache<RedisCacheKey, CachedRedisEntry>>,
284    redis_cache_hits: AtomicU64,
285    redis_cache_misses: AtomicU64,
286    redis_global_enabled: AtomicBool,
287}
288
289impl KnowledgeRuntime {
290    pub fn new(result_cache_capacity: usize) -> Self {
291        let config = ResultCacheConfig {
292            capacity: result_cache_capacity.max(1),
293            ..ResultCacheConfig::default()
294        };
295        let capacity = NonZeroUsize::new(config.capacity).expect("non-zero capacity");
296        Self {
297            provider: RwLock::new(None),
298            next_generation: AtomicU64::new(0),
299            provider_epoch: AtomicU64::new(0),
300            current_generation_value: AtomicU64::new(0),
301            result_cache_config: RwLock::new(config),
302            result_cache_enabled: AtomicBool::new(config.enabled),
303            result_cache_ttl_ms: AtomicU64::new(config.ttl.as_millis() as u64),
304            result_cache: RwLock::new(LruCache::new(capacity)),
305            result_cache_hits: AtomicU64::new(0),
306            result_cache_misses: AtomicU64::new(0),
307            metadata_cache_hits: AtomicU64::new(0),
308            metadata_cache_misses: AtomicU64::new(0),
309            local_cache_hits: AtomicU64::new(0),
310            local_cache_misses: AtomicU64::new(0),
311            reload_successes: AtomicU64::new(0),
312            reload_failures: AtomicU64::new(0),
313            redis_cache: RwLock::new(LruCache::new(capacity)),
314            redis_cache_hits: AtomicU64::new(0),
315            redis_cache_misses: AtomicU64::new(0),
316            redis_global_enabled: AtomicBool::new(true),
317        }
318    }
319
320    pub fn install_provider<F>(
321        &self,
322        kind: ProviderKind,
323        datasource_id: DatasourceId,
324        build: F,
325    ) -> KnowledgeResult<Generation>
326    where
327        F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
328    {
329        let generation = Generation(self.next_generation.fetch_add(1, Ordering::SeqCst) + 1);
330        let previous = self
331            .provider
332            .read()
333            .ok()
334            .and_then(|guard| guard.as_ref().cloned());
335        debug_kdb!(
336            "[kdb] reload provider start kind={kind:?} datasource_id={} target_generation={} previous_generation={}",
337            datasource_id.0,
338            generation.0,
339            previous
340                .as_ref()
341                .map(|handle| handle.generation.0.to_string())
342                .unwrap_or_else(|| "none".to_string())
343        );
344        let provider = match build(generation) {
345            Ok(provider) => provider,
346            Err(err) => {
347                self.reload_failures.fetch_add(1, Ordering::Relaxed);
348                warn_kdb!(
349                    "[kdb] reload provider failed kind={kind:?} datasource_id={} target_generation={} err={}",
350                    datasource_id.0,
351                    generation.0,
352                    err
353                );
354                if telemetry_enabled() {
355                    telemetry().on_reload(&ReloadTelemetryEvent {
356                        outcome: ReloadOutcome::Failure,
357                        provider_kind: kind.clone(),
358                    });
359                }
360                return Err(err);
361            }
362        };
363        debug_kdb!(
364            "[kdb] install provider kind={kind:?} datasource_id={} generation={}",
365            datasource_id.0,
366            generation.0
367        );
368        let kind_for_handle = kind.clone();
369        let datasource_id_for_handle = datasource_id.clone();
370        let handle = Arc::new(ProviderHandle {
371            provider,
372            datasource_id: datasource_id_for_handle,
373            generation,
374            kind: kind_for_handle,
375        });
376        self.provider_epoch.fetch_add(1, Ordering::AcqRel);
377        {
378            let mut guard = self
379                .provider
380                .write()
381                .expect("runtime provider lock poisoned");
382            *guard = Some(handle);
383        }
384        self.current_generation_value
385            .store(generation.0, Ordering::Release);
386        self.provider_epoch.fetch_add(1, Ordering::Release);
387        self.reload_successes.fetch_add(1, Ordering::Relaxed);
388        if telemetry_enabled() {
389            telemetry().on_reload(&ReloadTelemetryEvent {
390                outcome: ReloadOutcome::Success,
391                provider_kind: kind.clone(),
392            });
393        }
394        debug_kdb!(
395            "[kdb] reload provider success kind={kind:?} datasource_id={} generation={}",
396            datasource_id.0,
397            generation.0
398        );
399        Ok(generation)
400    }
401
402    pub fn configure_result_cache(&self, enabled: bool, capacity: usize, ttl: Duration) {
403        let new_config = ResultCacheConfig {
404            enabled,
405            capacity: capacity.max(1),
406            ttl: ttl.max(Duration::from_millis(1)),
407        };
408        let mut should_reset_cache = false;
409        {
410            let mut guard = self
411                .result_cache_config
412                .write()
413                .expect("runtime result cache config lock poisoned");
414            if guard.capacity != new_config.capacity || (!new_config.enabled && guard.enabled) {
415                should_reset_cache = true;
416            }
417            *guard = new_config;
418        }
419        self.result_cache_enabled
420            .store(new_config.enabled, Ordering::Relaxed);
421        self.result_cache_ttl_ms.store(
422            new_config.ttl.as_millis().min(u128::from(u64::MAX)) as u64,
423            Ordering::Relaxed,
424        );
425
426        if should_reset_cache {
427            let mut cache = self
428                .result_cache
429                .write()
430                .expect("runtime result cache lock poisoned");
431            *cache = LruCache::new(
432                NonZeroUsize::new(new_config.capacity).expect("non-zero result cache capacity"),
433            );
434        }
435    }
436
437    pub fn configure_redis_cache(&self, global_enabled: bool, capacity: usize) {
438        let new_capacity =
439            NonZeroUsize::new(capacity.max(1)).expect("non-zero redis cache capacity");
440        if let Ok(mut cache) = self.redis_cache.write() {
441            *cache = LruCache::new(new_capacity);
442        }
443        self.redis_global_enabled
444            .store(global_enabled, Ordering::Relaxed);
445    }
446
447    pub fn current_generation(&self) -> Option<Generation> {
448        let epoch_before = self.provider_epoch.load(Ordering::Acquire);
449        if epoch_before % 2 == 1 {
450            return self.current_generation_from_provider();
451        }
452        let generation = self.current_generation_value.load(Ordering::Acquire);
453        let epoch_after = self.provider_epoch.load(Ordering::Acquire);
454        if epoch_before != epoch_after {
455            return self.current_generation_from_provider();
456        }
457        match generation {
458            0 => None,
459            generation => Some(Generation(generation)),
460        }
461    }
462
463    pub fn snapshot(&self) -> RuntimeSnapshot {
464        let provider = self
465            .provider
466            .read()
467            .ok()
468            .and_then(|guard| guard.as_ref().cloned());
469        let result_cache_config = self
470            .result_cache_config
471            .read()
472            .map(|guard| *guard)
473            .unwrap_or_default();
474        let (result_cache_len, result_cache_capacity) = self
475            .result_cache
476            .read()
477            .map(|cache| (cache.len(), cache.cap().get()))
478            .unwrap_or((0, 0));
479        let (metadata_cache_len, metadata_cache_capacity) =
480            crate::mem::query_util::column_metadata_cache_snapshot();
481        RuntimeSnapshot {
482            provider_kind: provider.as_ref().map(|handle| handle.kind.clone()),
483            datasource_id: provider.as_ref().map(|handle| handle.datasource_id.clone()),
484            generation: provider.as_ref().map(|handle| handle.generation),
485            result_cache_enabled: result_cache_config.enabled,
486            result_cache_len,
487            result_cache_capacity,
488            result_cache_ttl_ms: result_cache_config.ttl.as_millis() as u64,
489            metadata_cache_len,
490            metadata_cache_capacity,
491            result_cache_hits: self.result_cache_hits.load(Ordering::Relaxed),
492            result_cache_misses: self.result_cache_misses.load(Ordering::Relaxed),
493            metadata_cache_hits: self.metadata_cache_hits.load(Ordering::Relaxed),
494            metadata_cache_misses: self.metadata_cache_misses.load(Ordering::Relaxed),
495            local_cache_hits: self.local_cache_hits.load(Ordering::Relaxed),
496            local_cache_misses: self.local_cache_misses.load(Ordering::Relaxed),
497            reload_successes: self.reload_successes.load(Ordering::Relaxed),
498            reload_failures: self.reload_failures.load(Ordering::Relaxed),
499        }
500    }
501
502    pub fn current_metadata_scope(&self) -> MetadataCacheScope {
503        self.provider
504            .read()
505            .ok()
506            .and_then(|guard| guard.as_ref().cloned())
507            .map(|handle| MetadataCacheScope {
508                datasource_id: handle.datasource_id.clone(),
509                generation: handle.generation,
510            })
511            .unwrap_or_else(|| MetadataCacheScope {
512                datasource_id: DatasourceId("sqlite:standalone".to_string()),
513                generation: Generation(0),
514            })
515    }
516
517    pub fn current_provider_kind(&self) -> Option<ProviderKind> {
518        self.provider
519            .read()
520            .ok()
521            .and_then(|guard| guard.as_ref().map(|handle| handle.kind.clone()))
522    }
523
524    pub fn record_result_cache_hit(&self) {
525        self.result_cache_hits.fetch_add(1, Ordering::Relaxed);
526    }
527
528    pub fn record_result_cache_miss(&self) {
529        self.result_cache_misses.fetch_add(1, Ordering::Relaxed);
530    }
531
532    pub fn record_metadata_cache_hit(&self) {
533        self.metadata_cache_hits.fetch_add(1, Ordering::Relaxed);
534    }
535
536    pub fn record_metadata_cache_miss(&self) {
537        self.metadata_cache_misses.fetch_add(1, Ordering::Relaxed);
538    }
539
540    pub fn record_local_cache_hit(&self) {
541        self.local_cache_hits.fetch_add(1, Ordering::Relaxed);
542    }
543
544    pub fn record_local_cache_miss(&self) {
545        self.local_cache_misses.fetch_add(1, Ordering::Relaxed);
546    }
547
548    pub fn execute(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
549        let handle = self.current_handle()?;
550        self.execute_with_handle(&handle, req)
551    }
552
553    fn execute_with_handle(
554        &self,
555        handle: &Arc<ProviderHandle>,
556        req: &QueryRequest,
557    ) -> KnowledgeResult<QueryResponse> {
558        let use_global_cache =
559            matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
560        if use_global_cache && let Some(hit) = self.fetch_result_cache(handle, req) {
561            self.record_result_cache_hit();
562            if telemetry_enabled() {
563                telemetry().on_cache(&CacheTelemetryEvent {
564                    layer: CacheLayer::Result,
565                    outcome: CacheOutcome::Hit,
566                    provider_kind: Some(handle.kind.clone()),
567                });
568            }
569            debug_kdb!(
570                "[kdb] global result cache hit kind={:?} generation={}",
571                handle.kind,
572                handle.generation.0
573            );
574            return Ok(hit);
575        }
576        if use_global_cache {
577            self.record_result_cache_miss();
578            if telemetry_enabled() {
579                telemetry().on_cache(&CacheTelemetryEvent {
580                    layer: CacheLayer::Result,
581                    outcome: CacheOutcome::Miss,
582                    provider_kind: Some(handle.kind.clone()),
583                });
584            }
585            debug_kdb!(
586                "[kdb] global result cache miss kind={:?} generation={}",
587                handle.kind,
588                handle.generation.0
589            );
590        }
591
592        let params = params_to_fields(&req.params);
593        let mode_tag = query_mode_tag(&req.mode);
594        let started = Instant::now();
595        debug_kdb!(
596            "[kdb] execute query kind={:?} generation={} mode={:?} cache_policy={:?}",
597            handle.kind,
598            handle.generation.0,
599            req.mode,
600            req.cache_policy
601        );
602        let response = match match req.mode {
603            QueryMode::Many => {
604                if params.is_empty() {
605                    handle.provider.query(&req.sql).map(QueryResponse::Rows)
606                } else {
607                    handle
608                        .provider
609                        .query_fields(&req.sql, &params)
610                        .map(QueryResponse::Rows)
611                }
612            }
613            QueryMode::FirstRow => {
614                if params.is_empty() {
615                    handle.provider.query_row(&req.sql).map(QueryResponse::Row)
616                } else {
617                    handle
618                        .provider
619                        .query_named_fields(&req.sql, &params)
620                        .map(QueryResponse::Row)
621                }
622            }
623        } {
624            Ok(response) => {
625                if telemetry_enabled() {
626                    telemetry().on_query(&QueryTelemetryEvent {
627                        provider_kind: handle.kind.clone(),
628                        mode: mode_tag,
629                        success: true,
630                        elapsed: started.elapsed(),
631                    });
632                }
633                response
634            }
635            Err(err) => {
636                if telemetry_enabled() {
637                    telemetry().on_query(&QueryTelemetryEvent {
638                        provider_kind: handle.kind.clone(),
639                        mode: mode_tag,
640                        success: false,
641                        elapsed: started.elapsed(),
642                    });
643                }
644                return Err(err);
645            }
646        };
647
648        if use_global_cache {
649            self.save_result_cache(handle, req, response.clone());
650            debug_kdb!(
651                "[kdb] global result cache store kind={:?} generation={}",
652                handle.kind,
653                handle.generation.0
654            );
655        }
656
657        Ok(response)
658    }
659
660    pub fn execute_first_row_fields(
661        &self,
662        sql: &str,
663        params: &[DataField],
664        cache_policy: CachePolicy,
665    ) -> KnowledgeResult<RowData> {
666        let handle = self.current_handle()?;
667        self.execute_first_row_fields_with_handle(&handle, sql, params, cache_policy)
668    }
669
670    fn execute_first_row_fields_with_handle(
671        &self,
672        handle: &Arc<ProviderHandle>,
673        sql: &str,
674        params: &[DataField],
675        cache_policy: CachePolicy,
676    ) -> KnowledgeResult<RowData> {
677        let use_global_cache =
678            matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
679        if use_global_cache
680            && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
681                handle,
682                sql,
683                params,
684                QueryModeTag::FirstRow,
685            ))
686        {
687            self.record_result_cache_hit();
688            if telemetry_enabled() {
689                telemetry().on_cache(&CacheTelemetryEvent {
690                    layer: CacheLayer::Result,
691                    outcome: CacheOutcome::Hit,
692                    provider_kind: Some(handle.kind.clone()),
693                });
694            }
695            return Ok(hit.into_row());
696        }
697        if use_global_cache {
698            self.record_result_cache_miss();
699            if telemetry_enabled() {
700                telemetry().on_cache(&CacheTelemetryEvent {
701                    layer: CacheLayer::Result,
702                    outcome: CacheOutcome::Miss,
703                    provider_kind: Some(handle.kind.clone()),
704                });
705            }
706        }
707
708        let started = Instant::now();
709        let row = if params.is_empty() {
710            handle.provider.query_row(sql)
711        } else {
712            handle.provider.query_named_fields(sql, params)
713        };
714        let row = match row {
715            Ok(row) => {
716                if telemetry_enabled() {
717                    telemetry().on_query(&QueryTelemetryEvent {
718                        provider_kind: handle.kind.clone(),
719                        mode: QueryModeTag::FirstRow,
720                        success: true,
721                        elapsed: started.elapsed(),
722                    });
723                }
724                row
725            }
726            Err(err) => {
727                if telemetry_enabled() {
728                    telemetry().on_query(&QueryTelemetryEvent {
729                        provider_kind: handle.kind.clone(),
730                        mode: QueryModeTag::FirstRow,
731                        success: false,
732                        elapsed: started.elapsed(),
733                    });
734                }
735                return Err(err);
736            }
737        };
738
739        if use_global_cache {
740            self.save_result_cache_by_key(
741                result_cache_key_fields(handle, sql, params, QueryModeTag::FirstRow),
742                QueryResponse::Row(row.clone()),
743            );
744        }
745
746        Ok(row)
747    }
748
749    pub async fn execute_async(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
750        let handle = self.current_handle()?;
751        if matches!(handle.kind, ProviderKind::SqliteAuthority) {
752            let handle = handle.clone();
753            let req = req.clone();
754            return task::spawn_blocking(move || runtime().execute_with_handle(&handle, &req))
755                .await
756                .map_err(|err| {
757                    KnowReason::from_logic()
758                        .to_err()
759                        .with_detail(format!("knowledge async sqlite query join failed: {err}"))
760                })?;
761        }
762        let use_global_cache =
763            matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
764        if use_global_cache && let Some(hit) = self.fetch_result_cache(&handle, req) {
765            self.record_result_cache_hit();
766            if telemetry_enabled() {
767                telemetry().on_cache(&CacheTelemetryEvent {
768                    layer: CacheLayer::Result,
769                    outcome: CacheOutcome::Hit,
770                    provider_kind: Some(handle.kind.clone()),
771                });
772            }
773            return Ok(hit);
774        }
775        if use_global_cache {
776            self.record_result_cache_miss();
777            if telemetry_enabled() {
778                telemetry().on_cache(&CacheTelemetryEvent {
779                    layer: CacheLayer::Result,
780                    outcome: CacheOutcome::Miss,
781                    provider_kind: Some(handle.kind.clone()),
782                });
783            }
784        }
785
786        let params = params_to_fields(&req.params);
787        let mode_tag = query_mode_tag(&req.mode);
788        let started = Instant::now();
789        let response = match req.mode {
790            QueryMode::Many => {
791                if params.is_empty() {
792                    handle
793                        .provider
794                        .query_async(&req.sql)
795                        .await
796                        .map(QueryResponse::Rows)
797                } else {
798                    handle
799                        .provider
800                        .query_fields_async(&req.sql, &params)
801                        .await
802                        .map(QueryResponse::Rows)
803                }
804            }
805            QueryMode::FirstRow => {
806                if params.is_empty() {
807                    handle
808                        .provider
809                        .query_row_async(&req.sql)
810                        .await
811                        .map(QueryResponse::Row)
812                } else {
813                    handle
814                        .provider
815                        .query_named_fields_async(&req.sql, &params)
816                        .await
817                        .map(QueryResponse::Row)
818                }
819            }
820        };
821        let response = match response {
822            Ok(response) => {
823                if telemetry_enabled() {
824                    telemetry().on_query(&QueryTelemetryEvent {
825                        provider_kind: handle.kind.clone(),
826                        mode: mode_tag,
827                        success: true,
828                        elapsed: started.elapsed(),
829                    });
830                }
831                response
832            }
833            Err(err) => {
834                if telemetry_enabled() {
835                    telemetry().on_query(&QueryTelemetryEvent {
836                        provider_kind: handle.kind.clone(),
837                        mode: mode_tag,
838                        success: false,
839                        elapsed: started.elapsed(),
840                    });
841                }
842                return Err(err);
843            }
844        };
845
846        if use_global_cache {
847            self.save_result_cache(&handle, req, response.clone());
848        }
849
850        Ok(response)
851    }
852
853    pub async fn execute_first_row_fields_async(
854        &self,
855        sql: &str,
856        params: &[DataField],
857        cache_policy: CachePolicy,
858    ) -> KnowledgeResult<RowData> {
859        let handle = self.current_handle()?;
860        if matches!(handle.kind, ProviderKind::SqliteAuthority) {
861            let handle = handle.clone();
862            let sql = sql.to_string();
863            let params = params.to_vec();
864            return task::spawn_blocking(move || {
865                runtime().execute_first_row_fields_with_handle(&handle, &sql, &params, cache_policy)
866            })
867            .await
868            .map_err(|err| {
869                KnowReason::from_logic().to_err().with_detail(format!(
870                    "knowledge async sqlite first-row query join failed: {err}"
871                ))
872            })?;
873        }
874        let use_global_cache =
875            matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
876        if use_global_cache
877            && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
878                &handle,
879                sql,
880                params,
881                QueryModeTag::FirstRow,
882            ))
883        {
884            self.record_result_cache_hit();
885            if telemetry_enabled() {
886                telemetry().on_cache(&CacheTelemetryEvent {
887                    layer: CacheLayer::Result,
888                    outcome: CacheOutcome::Hit,
889                    provider_kind: Some(handle.kind.clone()),
890                });
891            }
892            return Ok(hit.into_row());
893        }
894        if use_global_cache {
895            self.record_result_cache_miss();
896            if telemetry_enabled() {
897                telemetry().on_cache(&CacheTelemetryEvent {
898                    layer: CacheLayer::Result,
899                    outcome: CacheOutcome::Miss,
900                    provider_kind: Some(handle.kind.clone()),
901                });
902            }
903        }
904
905        let started = Instant::now();
906        let row = if params.is_empty() {
907            handle.provider.query_row_async(sql).await
908        } else {
909            handle.provider.query_named_fields_async(sql, params).await
910        };
911        let row = match row {
912            Ok(row) => {
913                if telemetry_enabled() {
914                    telemetry().on_query(&QueryTelemetryEvent {
915                        provider_kind: handle.kind.clone(),
916                        mode: QueryModeTag::FirstRow,
917                        success: true,
918                        elapsed: started.elapsed(),
919                    });
920                }
921                row
922            }
923            Err(err) => {
924                if telemetry_enabled() {
925                    telemetry().on_query(&QueryTelemetryEvent {
926                        provider_kind: handle.kind.clone(),
927                        mode: QueryModeTag::FirstRow,
928                        success: false,
929                        elapsed: started.elapsed(),
930                    });
931                }
932                return Err(err);
933            }
934        };
935
936        if use_global_cache {
937            self.save_result_cache_by_key(
938                result_cache_key_fields(&handle, sql, params, QueryModeTag::FirstRow),
939                QueryResponse::Row(row.clone()),
940            );
941        }
942
943        Ok(row)
944    }
945
946    fn current_handle(&self) -> KnowledgeResult<Arc<ProviderHandle>> {
947        self.provider
948            .read()
949            .expect("runtime provider lock poisoned")
950            .clone()
951            .ok_or_else(|| {
952                KnowReason::from_logic()
953                    .to_err()
954                    .with_detail("knowledge provider not initialized")
955            })
956    }
957
958    fn current_generation_from_provider(&self) -> Option<Generation> {
959        self.provider
960            .read()
961            .ok()
962            .and_then(|guard| guard.as_ref().map(|handle| handle.generation))
963    }
964
965    fn fetch_result_cache(
966        &self,
967        handle: &ProviderHandle,
968        req: &QueryRequest,
969    ) -> Option<QueryResponse> {
970        self.fetch_result_cache_by_key(result_cache_key(handle, req))
971    }
972
973    fn fetch_result_cache_by_key(&self, key: ResultCacheKey) -> Option<QueryResponse> {
974        if !self.result_cache_enabled() {
975            return None;
976        }
977        let cached = self
978            .result_cache
979            .read()
980            .ok()
981            .and_then(|cache| cache.peek(&key).cloned())?;
982        if cached.cached_at.elapsed() > self.result_cache_ttl() {
983            if let Ok(mut cache) = self.result_cache.write() {
984                let _ = cache.pop(&key);
985            }
986            return None;
987        }
988        Some((*cached.response).clone())
989    }
990
991    fn save_result_cache(
992        &self,
993        handle: &ProviderHandle,
994        req: &QueryRequest,
995        response: QueryResponse,
996    ) {
997        self.save_result_cache_by_key(result_cache_key(handle, req), response);
998    }
999
1000    fn save_result_cache_by_key(&self, key: ResultCacheKey, response: QueryResponse) {
1001        if let Ok(mut cache) = self.result_cache.write() {
1002            cache.put(
1003                key,
1004                CachedQueryResponse {
1005                    response: Arc::new(response),
1006                    cached_at: Instant::now(),
1007                },
1008            );
1009        }
1010    }
1011
1012    // -----------------------------------------------------------------------
1013    // Redis result cache
1014    // -----------------------------------------------------------------------
1015
1016    fn redis_cache_enabled(&self) -> bool {
1017        self.result_cache_enabled.load(Ordering::Acquire)
1018    }
1019
1020    #[allow(dead_code)]
1021    fn redis_cache_ttl(&self) -> Duration {
1022        Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Acquire))
1023    }
1024
1025    fn fetch_redis_cache(&self, key: &RedisCacheKey) -> Option<CachedRedisEntry> {
1026        if !self.redis_cache_enabled() {
1027            return None;
1028        }
1029        let entry = self
1030            .redis_cache
1031            .read()
1032            .ok()
1033            .and_then(|cache| cache.peek(key).cloned())?;
1034        // Check TTL expiry
1035        if entry.ttl_ms > 0 && entry.cached_at.elapsed() > Duration::from_millis(entry.ttl_ms) {
1036            if let Ok(mut cache) = self.redis_cache.write() {
1037                let _ = cache.pop(key);
1038            }
1039            return None;
1040        }
1041        Some(entry)
1042    }
1043
1044    fn save_redis_cache(&self, key: RedisCacheKey, entry: CachedRedisEntry) {
1045        if !self.redis_cache_enabled() {
1046            return;
1047        }
1048        if let Ok(mut cache) = self.redis_cache.write() {
1049            cache.put(key, entry);
1050        }
1051    }
1052
1053    pub(crate) fn redis_cache_get(&self, ck: &RedisCacheKey) -> Option<CachedRedisValue> {
1054        if !self.redis_global_enabled.load(Ordering::Relaxed) {
1055            return None;
1056        }
1057        let entry = self.fetch_redis_cache(ck)?;
1058        self.redis_cache_hits.fetch_add(1, Ordering::Relaxed);
1059        Some(entry.value)
1060    }
1061
1062    pub(crate) fn redis_cache_put(&self, ck: RedisCacheKey, value: CachedRedisValue) {
1063        self.redis_cache_put_with_ttl(ck, value, 0);
1064    }
1065
1066    pub(crate) fn redis_cache_put_with_ttl(
1067        &self,
1068        ck: RedisCacheKey,
1069        value: CachedRedisValue,
1070        ttl_ms: u64,
1071    ) {
1072        if !self.redis_global_enabled.load(Ordering::Relaxed) {
1073            return;
1074        }
1075        self.redis_cache_misses.fetch_add(1, Ordering::Relaxed);
1076        self.save_redis_cache(
1077            ck,
1078            CachedRedisEntry {
1079                value,
1080                cached_at: Instant::now(),
1081                ttl_ms,
1082            },
1083        );
1084    }
1085
1086    #[allow(dead_code)]
1087    fn clear_redis_cache(&self) {
1088        if let Ok(mut cache) = self.redis_cache.write() {
1089            cache.clear();
1090        }
1091    }
1092
1093    #[inline]
1094    fn result_cache_enabled(&self) -> bool {
1095        self.result_cache_enabled.load(Ordering::Relaxed)
1096    }
1097
1098    #[inline]
1099    fn result_cache_ttl(&self) -> Duration {
1100        Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Relaxed))
1101    }
1102}
1103
1104pub fn runtime() -> &'static KnowledgeRuntime {
1105    static RUNTIME: OnceLock<KnowledgeRuntime> = OnceLock::new();
1106    RUNTIME.get_or_init(|| KnowledgeRuntime::new(1024))
1107}
1108
1109#[cfg(test)]
1110pub(crate) struct RuntimeTestGuard(tokio::sync::Mutex<()>);
1111
1112#[cfg(test)]
1113impl RuntimeTestGuard {
1114    pub(crate) fn lock(&self) -> Result<tokio::sync::MutexGuard<'_, ()>, std::convert::Infallible> {
1115        Ok(self.0.blocking_lock())
1116    }
1117
1118    pub(crate) async fn lock_async(&self) -> tokio::sync::MutexGuard<'_, ()> {
1119        self.0.lock().await
1120    }
1121}
1122
1123#[cfg(test)]
1124pub(crate) fn runtime_test_guard() -> &'static RuntimeTestGuard {
1125    static GUARD: OnceLock<RuntimeTestGuard> = OnceLock::new();
1126    GUARD.get_or_init(|| RuntimeTestGuard(tokio::sync::Mutex::new(())))
1127}
1128
1129fn result_cache_key(handle: &ProviderHandle, req: &QueryRequest) -> ResultCacheKey {
1130    ResultCacheKey {
1131        datasource_id: handle.datasource_id.clone(),
1132        generation: handle.generation,
1133        query_hash: stable_hash(&req.sql),
1134        params_hash: stable_params_hash(&req.params),
1135        mode: match req.mode {
1136            QueryMode::Many => QueryModeTag::Many,
1137            QueryMode::FirstRow => QueryModeTag::FirstRow,
1138        },
1139    }
1140}
1141
1142fn result_cache_key_fields(
1143    handle: &ProviderHandle,
1144    sql: &str,
1145    params: &[DataField],
1146    mode: QueryModeTag,
1147) -> ResultCacheKey {
1148    ResultCacheKey {
1149        datasource_id: handle.datasource_id.clone(),
1150        generation: handle.generation,
1151        query_hash: stable_hash(sql),
1152        params_hash: stable_field_params_hash(params),
1153        mode,
1154    }
1155}
1156
1157fn query_mode_tag(mode: &QueryMode) -> QueryModeTag {
1158    match mode {
1159        QueryMode::Many => QueryModeTag::Many,
1160        QueryMode::FirstRow => QueryModeTag::FirstRow,
1161    }
1162}
1163
1164fn stable_hash(value: &str) -> u64 {
1165    let mut hasher = DefaultHasher::new();
1166    value.hash(&mut hasher);
1167    hasher.finish()
1168}
1169
1170fn stable_params_hash(params: &[QueryParam]) -> u64 {
1171    let mut hasher = DefaultHasher::new();
1172    for param in params {
1173        param.name.hash(&mut hasher);
1174        match &param.value {
1175            QueryValue::Null => 0u8.hash(&mut hasher),
1176            QueryValue::Bool(value) => {
1177                1u8.hash(&mut hasher);
1178                value.hash(&mut hasher);
1179            }
1180            QueryValue::Int(value) => {
1181                2u8.hash(&mut hasher);
1182                value.hash(&mut hasher);
1183            }
1184            QueryValue::Float(value) => {
1185                3u8.hash(&mut hasher);
1186                value.to_bits().hash(&mut hasher);
1187            }
1188            QueryValue::Text(value) => {
1189                4u8.hash(&mut hasher);
1190                value.hash(&mut hasher);
1191            }
1192        }
1193    }
1194    hasher.finish()
1195}
1196
1197fn stable_field_params_hash(params: &[DataField]) -> u64 {
1198    let mut hasher = DefaultHasher::new();
1199    for field in params {
1200        field.get_name().hash(&mut hasher);
1201        match field.get_value() {
1202            Value::Null | Value::Ignore(_) => 0u8.hash(&mut hasher),
1203            Value::Bool(value) => {
1204                1u8.hash(&mut hasher);
1205                value.hash(&mut hasher);
1206            }
1207            Value::Digit(value) => {
1208                2u8.hash(&mut hasher);
1209                value.hash(&mut hasher);
1210            }
1211            Value::Float(value) => {
1212                3u8.hash(&mut hasher);
1213                value.to_bits().hash(&mut hasher);
1214            }
1215            Value::Chars(value) => {
1216                4u8.hash(&mut hasher);
1217                value.hash(&mut hasher);
1218            }
1219            Value::Symbol(value) => {
1220                5u8.hash(&mut hasher);
1221                value.hash(&mut hasher);
1222            }
1223            Value::Time(value) => {
1224                6u8.hash(&mut hasher);
1225                value.hash(&mut hasher);
1226            }
1227            Value::Hex(value) => {
1228                7u8.hash(&mut hasher);
1229                value.to_string().hash(&mut hasher);
1230            }
1231            Value::IpNet(value) => {
1232                8u8.hash(&mut hasher);
1233                value.to_string().hash(&mut hasher);
1234            }
1235            Value::IpAddr(value) => {
1236                9u8.hash(&mut hasher);
1237                value.hash(&mut hasher);
1238            }
1239            Value::Obj(value) => {
1240                10u8.hash(&mut hasher);
1241                format!("{:?}", value).hash(&mut hasher);
1242            }
1243            Value::Array(value) => {
1244                11u8.hash(&mut hasher);
1245                format!("{:?}", value).hash(&mut hasher);
1246            }
1247            Value::Domain(value) => {
1248                12u8.hash(&mut hasher);
1249                value.0.hash(&mut hasher);
1250            }
1251            Value::Url(value) => {
1252                13u8.hash(&mut hasher);
1253                value.0.hash(&mut hasher);
1254            }
1255            Value::Email(value) => {
1256                14u8.hash(&mut hasher);
1257                value.0.hash(&mut hasher);
1258            }
1259            Value::IdCard(value) => {
1260                15u8.hash(&mut hasher);
1261                value.0.hash(&mut hasher);
1262            }
1263            Value::MobilePhone(value) => {
1264                16u8.hash(&mut hasher);
1265                value.0.hash(&mut hasher);
1266            }
1267        }
1268    }
1269    hasher.finish()
1270}
1271
1272pub fn fields_to_params(params: &[DataField]) -> Vec<QueryParam> {
1273    params
1274        .iter()
1275        .map(|field| {
1276            let value = match field.get_value() {
1277                Value::Null | Value::Ignore(_) => QueryValue::Null,
1278                Value::Bool(value) => QueryValue::Bool(*value),
1279                Value::Digit(value) => QueryValue::Int(*value),
1280                Value::Float(value) => QueryValue::Float(*value),
1281                Value::Chars(value) => QueryValue::Text(value.to_string()),
1282                Value::Symbol(value) => QueryValue::Text(value.to_string()),
1283                Value::Time(value) => QueryValue::Text(value.to_string()),
1284                Value::Hex(value) => QueryValue::Text(value.to_string()),
1285                Value::IpNet(value) => QueryValue::Text(value.to_string()),
1286                Value::IpAddr(value) => QueryValue::Text(value.to_string()),
1287                Value::Obj(value) => QueryValue::Text(format!("{:?}", value)),
1288                Value::Array(value) => QueryValue::Text(format!("{:?}", value)),
1289                Value::Domain(value) => QueryValue::Text(value.0.to_string()),
1290                Value::Url(value) => QueryValue::Text(value.0.to_string()),
1291                Value::Email(value) => QueryValue::Text(value.0.to_string()),
1292                Value::IdCard(value) => QueryValue::Text(value.0.to_string()),
1293                Value::MobilePhone(value) => QueryValue::Text(value.0.to_string()),
1294            };
1295            QueryParam {
1296                name: field.get_name().to_string(),
1297                value,
1298            }
1299        })
1300        .collect()
1301}
1302
1303pub fn params_to_fields(params: &[QueryParam]) -> Vec<DataField> {
1304    params
1305        .iter()
1306        .map(|param| match &param.value {
1307            QueryValue::Null => {
1308                DataField::new(DataType::default(), param.name.clone(), Value::Null)
1309            }
1310            QueryValue::Bool(value) => {
1311                DataField::new(DataType::default(), param.name.clone(), Value::Bool(*value))
1312            }
1313            QueryValue::Int(value) => DataField::from_digit(param.name.clone(), *value),
1314            QueryValue::Float(value) => DataField::from_float(param.name.clone(), *value),
1315            QueryValue::Text(value) => DataField::from_chars(param.name.clone(), value.clone()),
1316        })
1317        .collect()
1318}
1319
1320#[cfg(test)]
1321mod tests {
1322    use super::*;
1323    use async_trait::async_trait;
1324    use std::sync::Arc;
1325    use wp_model_core::model::Value;
1326
1327    struct TestProvider {
1328        value: &'static str,
1329    }
1330
1331    #[async_trait]
1332    impl ProviderExecutor for TestProvider {
1333        fn query(&self, _sql: &str) -> KnowledgeResult<Vec<RowData>> {
1334            Ok(vec![vec![DataField::from_chars("value", self.value)]])
1335        }
1336
1337        fn query_fields(&self, _sql: &str, _params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
1338            self.query("")
1339        }
1340
1341        fn query_row(&self, _sql: &str) -> KnowledgeResult<RowData> {
1342            Ok(vec![DataField::from_chars("value", self.value)])
1343        }
1344
1345        fn query_named_fields(
1346            &self,
1347            _sql: &str,
1348            _params: &[DataField],
1349        ) -> KnowledgeResult<RowData> {
1350            self.query_row("")
1351        }
1352    }
1353
1354    #[test]
1355    fn query_param_hash_is_stable() {
1356        let params = vec![
1357            QueryParam {
1358                name: ":id".to_string(),
1359                value: QueryValue::Int(7),
1360            },
1361            QueryParam {
1362                name: ":name".to_string(),
1363                value: QueryValue::Text("abc".to_string()),
1364            },
1365        ];
1366        assert_eq!(stable_params_hash(&params), stable_params_hash(&params));
1367    }
1368
1369    #[test]
1370    fn fields_to_params_preserves_raw_chars_value() {
1371        let fields = [DataField::from_chars(
1372            ":name".to_string(),
1373            "令狐冲".to_string(),
1374        )];
1375        let params = fields_to_params(&fields);
1376        assert_eq!(params.len(), 1);
1377        match &params[0].value {
1378            QueryValue::Text(value) => assert_eq!(value, "令狐冲"),
1379            other => panic!("unexpected param value: {other:?}"),
1380        }
1381        let roundtrip = params_to_fields(&params);
1382        assert!(matches!(roundtrip[0].get_value(), Value::Chars(_)));
1383    }
1384
1385    #[tokio::test(flavor = "current_thread")]
1386    async fn sqlite_async_bridge_keeps_captured_handle_after_reload() {
1387        let _guard = runtime_test_guard().lock_async().await;
1388        runtime()
1389            .install_provider(
1390                ProviderKind::SqliteAuthority,
1391                DatasourceId("sqlite:old".to_string()),
1392                |_generation| Ok(Arc::new(TestProvider { value: "old" })),
1393            )
1394            .expect("install old provider");
1395        let old_handle = runtime().current_handle().expect("current old handle");
1396
1397        runtime()
1398            .install_provider(
1399                ProviderKind::SqliteAuthority,
1400                DatasourceId("sqlite:new".to_string()),
1401                |_generation| Ok(Arc::new(TestProvider { value: "new" })),
1402            )
1403            .expect("install new provider");
1404
1405        let req = QueryRequest::first_row("SELECT value", Vec::new(), CachePolicy::Bypass);
1406        let row = task::spawn_blocking(move || runtime().execute_with_handle(&old_handle, &req))
1407            .await
1408            .expect("join sqlite bridge")
1409            .expect("execute old handle")
1410            .into_row();
1411        assert_eq!(row[0].to_string(), "chars(old)");
1412    }
1413
1414    // -----------------------------------------------------------------------
1415    // Redis cache unit tests
1416    // -----------------------------------------------------------------------
1417
1418    fn redis_ck(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
1419        let mut hasher = DefaultHasher::new();
1420        key.hash(&mut hasher);
1421        let key_hash = hasher.finish();
1422        let mut hasher = DefaultHasher::new();
1423        for arg in args {
1424            arg.hash(&mut hasher);
1425        }
1426        let args_hash = hasher.finish();
1427        RedisCacheKey {
1428            generation,
1429            cmd_tag: cmd,
1430            key_hash,
1431            args_hash,
1432        }
1433    }
1434
1435    #[test]
1436    fn redis_cache_hit_and_miss() {
1437        let rt = KnowledgeRuntime::new(64);
1438        rt.configure_redis_cache(true, 64);
1439
1440        let ck = redis_ck(RedisCmdTag::Get, 1, "user:1", &[]);
1441        // First access — miss
1442        assert!(rt.redis_cache_get(&ck).is_none());
1443        // Store
1444        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
1445        // Second access — hit
1446        let val = rt.redis_cache_get(&ck).expect("should hit cache");
1447        assert!(matches!(val, CachedRedisValue::Bool(true)));
1448    }
1449
1450    #[test]
1451    fn redis_cache_global_enabled_access() {
1452        let rt = KnowledgeRuntime::new(64);
1453        rt.configure_redis_cache(true, 64);
1454
1455        let ck = redis_ck(RedisCmdTag::Get, 1, "k", &[]);
1456        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
1457        assert!(rt.redis_cache_get(&ck).is_some());
1458    }
1459
1460    #[test]
1461    fn redis_cache_global_disabled_blocks_all() {
1462        let rt = KnowledgeRuntime::new(64);
1463        rt.configure_redis_cache(false, 64);
1464
1465        let ck = redis_ck(RedisCmdTag::BfExists, 1, "any_key", &["item"]);
1466        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
1467        // Global disabled — no reads
1468        assert!(rt.redis_cache_get(&ck).is_none());
1469    }
1470
1471    #[test]
1472    fn redis_cache_ttl_expiry() {
1473        let rt = KnowledgeRuntime::new(64);
1474        rt.configure_redis_cache(true, 64);
1475
1476        let ck = redis_ck(RedisCmdTag::BfExists, 1, "k", &["item"]);
1477        // Store with 1ms TTL
1478        rt.redis_cache_put_with_ttl(
1479            ck.clone(),
1480            CachedRedisValue::Bool(true),
1481            1, // 1ms TTL
1482        );
1483        // Immediately valid
1484        assert!(rt.redis_cache_get(&ck).is_some());
1485        // Wait for TTL to expire
1486        std::thread::sleep(std::time::Duration::from_millis(5));
1487        // Should be expired
1488        assert!(rt.redis_cache_get(&ck).is_none());
1489    }
1490
1491    #[test]
1492    fn redis_cache_no_ttl_never_expires() {
1493        let rt = KnowledgeRuntime::new(64);
1494        rt.configure_redis_cache(true, 64);
1495
1496        let ck = redis_ck(RedisCmdTag::Get, 1, "k", &[]);
1497        // Store with ttl = 0 (generation-only)
1498        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
1499        // Should be valid
1500        assert!(rt.redis_cache_get(&ck).is_some());
1501        std::thread::sleep(std::time::Duration::from_millis(5));
1502        // Still valid — 0 means no TTL
1503        assert!(rt.redis_cache_get(&ck).is_some());
1504    }
1505
1506    #[test]
1507    fn redis_cache_generation_isolation() {
1508        let rt = KnowledgeRuntime::new(64);
1509        rt.configure_redis_cache(true, 64);
1510
1511        let ck_gen1 = redis_ck(RedisCmdTag::BfExists, 1, "key", &["item"]);
1512        let ck_gen2 = redis_ck(RedisCmdTag::BfExists, 2, "key", &["item"]);
1513
1514        // Store with generation 1
1515        rt.redis_cache_put(ck_gen1.clone(), CachedRedisValue::Bool(false));
1516
1517        // Same key but generation 2 — miss
1518        assert!(rt.redis_cache_get(&ck_gen2).is_none());
1519        // Generation 1 — hit
1520        assert!(rt.redis_cache_get(&ck_gen1).is_some());
1521    }
1522}