1use std::collections::HashMap;
2use std::collections::hash_map::DefaultHasher;
3use std::hash::{Hash, Hasher};
4use std::num::NonZeroUsize;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, OnceLock, RwLock};
7use std::time::{Duration, Instant};
8
9use crate::error::{KnowReason, KnowledgeResult};
10use async_trait::async_trait;
11use lru::LruCache;
12use orion_error::conversion::ToStructError;
13use tokio::task;
14use wp_log::{debug_kdb, warn_kdb};
15use wp_model_core::model::{DataField, DataType, Value};
16
17use crate::loader::ProviderKind;
18use crate::mem::RowData;
19use crate::telemetry::{
20 CacheLayer, CacheOutcome, CacheTelemetryEvent, QueryTelemetryEvent, ReloadOutcome,
21 ReloadTelemetryEvent, telemetry, telemetry_enabled,
22};
23
24#[derive(Debug, Clone, PartialEq, Eq, Hash)]
25pub struct DatasourceId(pub String);
26
27impl DatasourceId {
28 pub fn from_seed(kind: ProviderKind, seed: &str) -> Self {
29 let mut hasher = DefaultHasher::new();
30 seed.hash(&mut hasher);
31 let kind_str = match kind {
32 ProviderKind::SqliteAuthority => "sqlite",
33 ProviderKind::Postgres => "postgres",
34 ProviderKind::Mysql => "mysql",
35 ProviderKind::Redis => "redis",
36 };
37 Self(format!("{kind_str}:{:016x}", hasher.finish()))
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
42pub struct Generation(pub u64);
43
44#[derive(Debug, Clone)]
45pub enum QueryMode {
46 Many,
47 FirstRow,
48}
49
50#[derive(Debug, Clone, Copy)]
51pub enum CachePolicy {
52 Bypass,
53 UseGlobal,
54 UseCallScope,
55}
56
57#[derive(Debug, Clone)]
58pub enum QueryValue {
59 Null,
60 Bool(bool),
61 Int(i64),
62 Float(f64),
63 Text(String),
64}
65
66#[derive(Debug, Clone)]
67pub struct QueryParam {
68 pub name: String,
69 pub value: QueryValue,
70}
71
72#[derive(Debug, Clone)]
73pub struct QueryRequest {
74 pub sql: String,
75 pub params: Vec<QueryParam>,
76 pub mode: QueryMode,
77 pub cache_policy: CachePolicy,
78}
79
80impl QueryRequest {
81 pub fn many(
82 sql: impl Into<String>,
83 params: Vec<QueryParam>,
84 cache_policy: CachePolicy,
85 ) -> Self {
86 Self {
87 sql: sql.into(),
88 params,
89 mode: QueryMode::Many,
90 cache_policy,
91 }
92 }
93
94 pub fn first_row(
95 sql: impl Into<String>,
96 params: Vec<QueryParam>,
97 cache_policy: CachePolicy,
98 ) -> Self {
99 Self {
100 sql: sql.into(),
101 params,
102 mode: QueryMode::FirstRow,
103 cache_policy,
104 }
105 }
106}
107
108#[derive(Debug, Clone)]
109pub enum QueryResponse {
110 Rows(Vec<RowData>),
111 Row(RowData),
112}
113
114impl QueryResponse {
115 pub fn into_rows(self) -> Vec<RowData> {
116 match self {
117 QueryResponse::Rows(rows) => rows,
118 QueryResponse::Row(row) => vec![row],
119 }
120 }
121
122 pub fn into_row(self) -> RowData {
123 match self {
124 QueryResponse::Rows(rows) => rows.into_iter().next().unwrap_or_default(),
125 QueryResponse::Row(row) => row,
126 }
127 }
128}
129
130#[async_trait]
131pub trait ProviderExecutor: Send + Sync {
132 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>>;
133 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>>;
134 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData>;
135 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData>;
136
137 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
138 self.query(sql)
139 }
140
141 async fn query_fields_async(
142 &self,
143 sql: &str,
144 params: &[DataField],
145 ) -> KnowledgeResult<Vec<RowData>> {
146 self.query_fields(sql, params)
147 }
148
149 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
150 self.query_row(sql)
151 }
152
153 async fn query_named_fields_async(
154 &self,
155 sql: &str,
156 params: &[DataField],
157 ) -> KnowledgeResult<RowData> {
158 self.query_named_fields(sql, params)
159 }
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
163pub enum QueryModeTag {
164 Many,
165 FirstRow,
166}
167
168#[derive(Debug, Clone, PartialEq, Eq, Hash)]
169pub struct ResultCacheKey {
170 pub datasource_id: DatasourceId,
171 pub generation: Generation,
172 pub query_hash: u64,
173 pub params_hash: u64,
174 pub mode: QueryModeTag,
175}
176
177pub struct ProviderHandle {
178 pub provider: Arc<dyn ProviderExecutor>,
179 pub datasource_id: DatasourceId,
180 pub generation: Generation,
181 pub kind: ProviderKind,
182}
183
184#[derive(Debug, Clone)]
185pub struct RuntimeSnapshot {
186 pub provider_kind: Option<ProviderKind>,
187 pub datasource_id: Option<DatasourceId>,
188 pub generation: Option<Generation>,
189 pub result_cache_enabled: bool,
190 pub result_cache_len: usize,
191 pub result_cache_capacity: usize,
192 pub result_cache_ttl_ms: u64,
193 pub metadata_cache_len: usize,
194 pub metadata_cache_capacity: usize,
195 pub result_cache_hits: u64,
196 pub result_cache_misses: u64,
197 pub metadata_cache_hits: u64,
198 pub metadata_cache_misses: u64,
199 pub local_cache_hits: u64,
200 pub local_cache_misses: u64,
201 pub reload_successes: u64,
202 pub reload_failures: u64,
203}
204
205#[derive(Debug, Clone)]
206pub struct MetadataCacheScope {
207 pub datasource_id: DatasourceId,
208 pub generation: Generation,
209}
210
211#[derive(Debug, Clone, Copy)]
212pub struct ResultCacheConfig {
213 pub enabled: bool,
214 pub capacity: usize,
215 pub ttl: Duration,
216}
217
218impl Default for ResultCacheConfig {
219 fn default() -> Self {
220 Self {
221 enabled: true,
222 capacity: 1024,
223 ttl: Duration::from_millis(30_000),
224 }
225 }
226}
227
228#[derive(Debug, Clone)]
229struct CachedQueryResponse {
230 response: Arc<QueryResponse>,
231 cached_at: Instant,
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
239pub(crate) enum RedisCmdTag {
240 BfExists,
241 HGet,
242 Get,
243 SetExists,
244}
245
246#[derive(Debug, Clone, PartialEq, Eq, Hash)]
247pub(crate) struct RedisCacheKey {
248 pub generation: u64,
249 pub cmd_tag: RedisCmdTag,
250 pub key_hash: u64,
251 pub args_hash: u64,
252}
253
254#[derive(Debug, Clone)]
255pub(crate) enum CachedRedisValue {
256 Bool(bool),
257 OptString(Option<String>),
258}
259
260pub struct KnowledgeRuntime {
261 provider: RwLock<Option<Arc<ProviderHandle>>>,
262 next_generation: AtomicU64,
263 provider_epoch: AtomicU64,
264 current_generation_value: AtomicU64,
265 result_cache_config: RwLock<ResultCacheConfig>,
266 result_cache_enabled: AtomicBool,
267 result_cache_ttl_ms: AtomicU64,
268 result_cache: RwLock<LruCache<ResultCacheKey, CachedQueryResponse>>,
269 result_cache_hits: AtomicU64,
270 result_cache_misses: AtomicU64,
271 metadata_cache_hits: AtomicU64,
272 metadata_cache_misses: AtomicU64,
273 local_cache_hits: AtomicU64,
274 local_cache_misses: AtomicU64,
275 reload_successes: AtomicU64,
276 reload_failures: AtomicU64,
277 redis_cache: RwLock<LruCache<RedisCacheKey, CachedRedisValue>>,
278 redis_cache_hits: AtomicU64,
279 redis_cache_misses: AtomicU64,
280 redis_global_enabled: AtomicBool,
281 redis_enabled_map: RwLock<HashMap<String, bool>>,
282}
283
284impl KnowledgeRuntime {
285 pub fn new(result_cache_capacity: usize) -> Self {
286 let config = ResultCacheConfig {
287 capacity: result_cache_capacity.max(1),
288 ..ResultCacheConfig::default()
289 };
290 let capacity = NonZeroUsize::new(config.capacity).expect("non-zero capacity");
291 Self {
292 provider: RwLock::new(None),
293 next_generation: AtomicU64::new(0),
294 provider_epoch: AtomicU64::new(0),
295 current_generation_value: AtomicU64::new(0),
296 result_cache_config: RwLock::new(config),
297 result_cache_enabled: AtomicBool::new(config.enabled),
298 result_cache_ttl_ms: AtomicU64::new(config.ttl.as_millis() as u64),
299 result_cache: RwLock::new(LruCache::new(capacity)),
300 result_cache_hits: AtomicU64::new(0),
301 result_cache_misses: AtomicU64::new(0),
302 metadata_cache_hits: AtomicU64::new(0),
303 metadata_cache_misses: AtomicU64::new(0),
304 local_cache_hits: AtomicU64::new(0),
305 local_cache_misses: AtomicU64::new(0),
306 reload_successes: AtomicU64::new(0),
307 reload_failures: AtomicU64::new(0),
308 redis_cache: RwLock::new(LruCache::new(capacity)),
309 redis_cache_hits: AtomicU64::new(0),
310 redis_cache_misses: AtomicU64::new(0),
311 redis_global_enabled: AtomicBool::new(true),
312 redis_enabled_map: RwLock::new(HashMap::new()),
313 }
314 }
315
316 pub fn install_provider<F>(
317 &self,
318 kind: ProviderKind,
319 datasource_id: DatasourceId,
320 build: F,
321 ) -> KnowledgeResult<Generation>
322 where
323 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
324 {
325 let generation = Generation(self.next_generation.fetch_add(1, Ordering::SeqCst) + 1);
326 let previous = self
327 .provider
328 .read()
329 .ok()
330 .and_then(|guard| guard.as_ref().cloned());
331 debug_kdb!(
332 "[kdb] reload provider start kind={kind:?} datasource_id={} target_generation={} previous_generation={}",
333 datasource_id.0,
334 generation.0,
335 previous
336 .as_ref()
337 .map(|handle| handle.generation.0.to_string())
338 .unwrap_or_else(|| "none".to_string())
339 );
340 let provider = match build(generation) {
341 Ok(provider) => provider,
342 Err(err) => {
343 self.reload_failures.fetch_add(1, Ordering::Relaxed);
344 warn_kdb!(
345 "[kdb] reload provider failed kind={kind:?} datasource_id={} target_generation={} err={}",
346 datasource_id.0,
347 generation.0,
348 err
349 );
350 if telemetry_enabled() {
351 telemetry().on_reload(&ReloadTelemetryEvent {
352 outcome: ReloadOutcome::Failure,
353 provider_kind: kind.clone(),
354 });
355 }
356 return Err(err);
357 }
358 };
359 debug_kdb!(
360 "[kdb] install provider kind={kind:?} datasource_id={} generation={}",
361 datasource_id.0,
362 generation.0
363 );
364 let kind_for_handle = kind.clone();
365 let datasource_id_for_handle = datasource_id.clone();
366 let handle = Arc::new(ProviderHandle {
367 provider,
368 datasource_id: datasource_id_for_handle,
369 generation,
370 kind: kind_for_handle,
371 });
372 self.provider_epoch.fetch_add(1, Ordering::AcqRel);
373 {
374 let mut guard = self
375 .provider
376 .write()
377 .expect("runtime provider lock poisoned");
378 *guard = Some(handle);
379 }
380 self.current_generation_value
381 .store(generation.0, Ordering::Release);
382 self.provider_epoch.fetch_add(1, Ordering::Release);
383 self.reload_successes.fetch_add(1, Ordering::Relaxed);
384 if telemetry_enabled() {
385 telemetry().on_reload(&ReloadTelemetryEvent {
386 outcome: ReloadOutcome::Success,
387 provider_kind: kind.clone(),
388 });
389 }
390 debug_kdb!(
391 "[kdb] reload provider success kind={kind:?} datasource_id={} generation={}",
392 datasource_id.0,
393 generation.0
394 );
395 Ok(generation)
396 }
397
398 pub fn configure_result_cache(&self, enabled: bool, capacity: usize, ttl: Duration) {
399 let new_config = ResultCacheConfig {
400 enabled,
401 capacity: capacity.max(1),
402 ttl: ttl.max(Duration::from_millis(1)),
403 };
404 let mut should_reset_cache = false;
405 {
406 let mut guard = self
407 .result_cache_config
408 .write()
409 .expect("runtime result cache config lock poisoned");
410 if guard.capacity != new_config.capacity || (!new_config.enabled && guard.enabled) {
411 should_reset_cache = true;
412 }
413 *guard = new_config;
414 }
415 self.result_cache_enabled
416 .store(new_config.enabled, Ordering::Relaxed);
417 self.result_cache_ttl_ms.store(
418 new_config.ttl.as_millis().min(u128::from(u64::MAX)) as u64,
419 Ordering::Relaxed,
420 );
421
422 if should_reset_cache {
423 let mut cache = self
424 .result_cache
425 .write()
426 .expect("runtime result cache lock poisoned");
427 *cache = LruCache::new(
428 NonZeroUsize::new(new_config.capacity).expect("non-zero result cache capacity"),
429 );
430 }
431 }
432
433 pub fn configure_redis_cache(
434 &self,
435 global_enabled: bool,
436 capacity: usize,
437 key_enabled_map: HashMap<String, bool>,
438 ) {
439 let new_capacity =
440 NonZeroUsize::new(capacity.max(1)).expect("non-zero redis cache capacity");
441 if let Ok(mut cache) = self.redis_cache.write() {
442 *cache = LruCache::new(new_capacity);
443 }
444 *self
445 .redis_enabled_map
446 .write()
447 .expect("redis enabled map lock poisoned") = key_enabled_map;
448 self.redis_global_enabled
449 .store(global_enabled, Ordering::Relaxed);
450 }
451
452 pub fn current_generation(&self) -> Option<Generation> {
453 let epoch_before = self.provider_epoch.load(Ordering::Acquire);
454 if epoch_before % 2 == 1 {
455 return self.current_generation_from_provider();
456 }
457 let generation = self.current_generation_value.load(Ordering::Acquire);
458 let epoch_after = self.provider_epoch.load(Ordering::Acquire);
459 if epoch_before != epoch_after {
460 return self.current_generation_from_provider();
461 }
462 match generation {
463 0 => None,
464 generation => Some(Generation(generation)),
465 }
466 }
467
468 pub fn snapshot(&self) -> RuntimeSnapshot {
469 let provider = self
470 .provider
471 .read()
472 .ok()
473 .and_then(|guard| guard.as_ref().cloned());
474 let result_cache_config = self
475 .result_cache_config
476 .read()
477 .map(|guard| *guard)
478 .unwrap_or_default();
479 let (result_cache_len, result_cache_capacity) = self
480 .result_cache
481 .read()
482 .map(|cache| (cache.len(), cache.cap().get()))
483 .unwrap_or((0, 0));
484 let (metadata_cache_len, metadata_cache_capacity) =
485 crate::mem::query_util::column_metadata_cache_snapshot();
486 RuntimeSnapshot {
487 provider_kind: provider.as_ref().map(|handle| handle.kind.clone()),
488 datasource_id: provider.as_ref().map(|handle| handle.datasource_id.clone()),
489 generation: provider.as_ref().map(|handle| handle.generation),
490 result_cache_enabled: result_cache_config.enabled,
491 result_cache_len,
492 result_cache_capacity,
493 result_cache_ttl_ms: result_cache_config.ttl.as_millis() as u64,
494 metadata_cache_len,
495 metadata_cache_capacity,
496 result_cache_hits: self.result_cache_hits.load(Ordering::Relaxed),
497 result_cache_misses: self.result_cache_misses.load(Ordering::Relaxed),
498 metadata_cache_hits: self.metadata_cache_hits.load(Ordering::Relaxed),
499 metadata_cache_misses: self.metadata_cache_misses.load(Ordering::Relaxed),
500 local_cache_hits: self.local_cache_hits.load(Ordering::Relaxed),
501 local_cache_misses: self.local_cache_misses.load(Ordering::Relaxed),
502 reload_successes: self.reload_successes.load(Ordering::Relaxed),
503 reload_failures: self.reload_failures.load(Ordering::Relaxed),
504 }
505 }
506
507 pub fn current_metadata_scope(&self) -> MetadataCacheScope {
508 self.provider
509 .read()
510 .ok()
511 .and_then(|guard| guard.as_ref().cloned())
512 .map(|handle| MetadataCacheScope {
513 datasource_id: handle.datasource_id.clone(),
514 generation: handle.generation,
515 })
516 .unwrap_or_else(|| MetadataCacheScope {
517 datasource_id: DatasourceId("sqlite:standalone".to_string()),
518 generation: Generation(0),
519 })
520 }
521
522 pub fn current_provider_kind(&self) -> Option<ProviderKind> {
523 self.provider
524 .read()
525 .ok()
526 .and_then(|guard| guard.as_ref().map(|handle| handle.kind.clone()))
527 }
528
529 pub fn record_result_cache_hit(&self) {
530 self.result_cache_hits.fetch_add(1, Ordering::Relaxed);
531 }
532
533 pub fn record_result_cache_miss(&self) {
534 self.result_cache_misses.fetch_add(1, Ordering::Relaxed);
535 }
536
537 pub fn record_metadata_cache_hit(&self) {
538 self.metadata_cache_hits.fetch_add(1, Ordering::Relaxed);
539 }
540
541 pub fn record_metadata_cache_miss(&self) {
542 self.metadata_cache_misses.fetch_add(1, Ordering::Relaxed);
543 }
544
545 pub fn record_local_cache_hit(&self) {
546 self.local_cache_hits.fetch_add(1, Ordering::Relaxed);
547 }
548
549 pub fn record_local_cache_miss(&self) {
550 self.local_cache_misses.fetch_add(1, Ordering::Relaxed);
551 }
552
553 pub fn execute(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
554 let handle = self.current_handle()?;
555 self.execute_with_handle(&handle, req)
556 }
557
558 fn execute_with_handle(
559 &self,
560 handle: &Arc<ProviderHandle>,
561 req: &QueryRequest,
562 ) -> KnowledgeResult<QueryResponse> {
563 let use_global_cache =
564 matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
565 if use_global_cache && let Some(hit) = self.fetch_result_cache(handle, req) {
566 self.record_result_cache_hit();
567 if telemetry_enabled() {
568 telemetry().on_cache(&CacheTelemetryEvent {
569 layer: CacheLayer::Result,
570 outcome: CacheOutcome::Hit,
571 provider_kind: Some(handle.kind.clone()),
572 });
573 }
574 debug_kdb!(
575 "[kdb] global result cache hit kind={:?} generation={}",
576 handle.kind,
577 handle.generation.0
578 );
579 return Ok(hit);
580 }
581 if use_global_cache {
582 self.record_result_cache_miss();
583 if telemetry_enabled() {
584 telemetry().on_cache(&CacheTelemetryEvent {
585 layer: CacheLayer::Result,
586 outcome: CacheOutcome::Miss,
587 provider_kind: Some(handle.kind.clone()),
588 });
589 }
590 debug_kdb!(
591 "[kdb] global result cache miss kind={:?} generation={}",
592 handle.kind,
593 handle.generation.0
594 );
595 }
596
597 let params = params_to_fields(&req.params);
598 let mode_tag = query_mode_tag(&req.mode);
599 let started = Instant::now();
600 debug_kdb!(
601 "[kdb] execute query kind={:?} generation={} mode={:?} cache_policy={:?}",
602 handle.kind,
603 handle.generation.0,
604 req.mode,
605 req.cache_policy
606 );
607 let response = match match req.mode {
608 QueryMode::Many => {
609 if params.is_empty() {
610 handle.provider.query(&req.sql).map(QueryResponse::Rows)
611 } else {
612 handle
613 .provider
614 .query_fields(&req.sql, ¶ms)
615 .map(QueryResponse::Rows)
616 }
617 }
618 QueryMode::FirstRow => {
619 if params.is_empty() {
620 handle.provider.query_row(&req.sql).map(QueryResponse::Row)
621 } else {
622 handle
623 .provider
624 .query_named_fields(&req.sql, ¶ms)
625 .map(QueryResponse::Row)
626 }
627 }
628 } {
629 Ok(response) => {
630 if telemetry_enabled() {
631 telemetry().on_query(&QueryTelemetryEvent {
632 provider_kind: handle.kind.clone(),
633 mode: mode_tag,
634 success: true,
635 elapsed: started.elapsed(),
636 });
637 }
638 response
639 }
640 Err(err) => {
641 if telemetry_enabled() {
642 telemetry().on_query(&QueryTelemetryEvent {
643 provider_kind: handle.kind.clone(),
644 mode: mode_tag,
645 success: false,
646 elapsed: started.elapsed(),
647 });
648 }
649 return Err(err);
650 }
651 };
652
653 if use_global_cache {
654 self.save_result_cache(handle, req, response.clone());
655 debug_kdb!(
656 "[kdb] global result cache store kind={:?} generation={}",
657 handle.kind,
658 handle.generation.0
659 );
660 }
661
662 Ok(response)
663 }
664
665 pub fn execute_first_row_fields(
666 &self,
667 sql: &str,
668 params: &[DataField],
669 cache_policy: CachePolicy,
670 ) -> KnowledgeResult<RowData> {
671 let handle = self.current_handle()?;
672 self.execute_first_row_fields_with_handle(&handle, sql, params, cache_policy)
673 }
674
675 fn execute_first_row_fields_with_handle(
676 &self,
677 handle: &Arc<ProviderHandle>,
678 sql: &str,
679 params: &[DataField],
680 cache_policy: CachePolicy,
681 ) -> KnowledgeResult<RowData> {
682 let use_global_cache =
683 matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
684 if use_global_cache
685 && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
686 handle,
687 sql,
688 params,
689 QueryModeTag::FirstRow,
690 ))
691 {
692 self.record_result_cache_hit();
693 if telemetry_enabled() {
694 telemetry().on_cache(&CacheTelemetryEvent {
695 layer: CacheLayer::Result,
696 outcome: CacheOutcome::Hit,
697 provider_kind: Some(handle.kind.clone()),
698 });
699 }
700 return Ok(hit.into_row());
701 }
702 if use_global_cache {
703 self.record_result_cache_miss();
704 if telemetry_enabled() {
705 telemetry().on_cache(&CacheTelemetryEvent {
706 layer: CacheLayer::Result,
707 outcome: CacheOutcome::Miss,
708 provider_kind: Some(handle.kind.clone()),
709 });
710 }
711 }
712
713 let started = Instant::now();
714 let row = if params.is_empty() {
715 handle.provider.query_row(sql)
716 } else {
717 handle.provider.query_named_fields(sql, params)
718 };
719 let row = match row {
720 Ok(row) => {
721 if telemetry_enabled() {
722 telemetry().on_query(&QueryTelemetryEvent {
723 provider_kind: handle.kind.clone(),
724 mode: QueryModeTag::FirstRow,
725 success: true,
726 elapsed: started.elapsed(),
727 });
728 }
729 row
730 }
731 Err(err) => {
732 if telemetry_enabled() {
733 telemetry().on_query(&QueryTelemetryEvent {
734 provider_kind: handle.kind.clone(),
735 mode: QueryModeTag::FirstRow,
736 success: false,
737 elapsed: started.elapsed(),
738 });
739 }
740 return Err(err);
741 }
742 };
743
744 if use_global_cache {
745 self.save_result_cache_by_key(
746 result_cache_key_fields(handle, sql, params, QueryModeTag::FirstRow),
747 QueryResponse::Row(row.clone()),
748 );
749 }
750
751 Ok(row)
752 }
753
754 pub async fn execute_async(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
755 let handle = self.current_handle()?;
756 if matches!(handle.kind, ProviderKind::SqliteAuthority) {
757 let handle = handle.clone();
758 let req = req.clone();
759 return task::spawn_blocking(move || runtime().execute_with_handle(&handle, &req))
760 .await
761 .map_err(|err| {
762 KnowReason::from_logic()
763 .to_err()
764 .with_detail(format!("knowledge async sqlite query join failed: {err}"))
765 })?;
766 }
767 let use_global_cache =
768 matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
769 if use_global_cache && let Some(hit) = self.fetch_result_cache(&handle, req) {
770 self.record_result_cache_hit();
771 if telemetry_enabled() {
772 telemetry().on_cache(&CacheTelemetryEvent {
773 layer: CacheLayer::Result,
774 outcome: CacheOutcome::Hit,
775 provider_kind: Some(handle.kind.clone()),
776 });
777 }
778 return Ok(hit);
779 }
780 if use_global_cache {
781 self.record_result_cache_miss();
782 if telemetry_enabled() {
783 telemetry().on_cache(&CacheTelemetryEvent {
784 layer: CacheLayer::Result,
785 outcome: CacheOutcome::Miss,
786 provider_kind: Some(handle.kind.clone()),
787 });
788 }
789 }
790
791 let params = params_to_fields(&req.params);
792 let mode_tag = query_mode_tag(&req.mode);
793 let started = Instant::now();
794 let response = match req.mode {
795 QueryMode::Many => {
796 if params.is_empty() {
797 handle
798 .provider
799 .query_async(&req.sql)
800 .await
801 .map(QueryResponse::Rows)
802 } else {
803 handle
804 .provider
805 .query_fields_async(&req.sql, ¶ms)
806 .await
807 .map(QueryResponse::Rows)
808 }
809 }
810 QueryMode::FirstRow => {
811 if params.is_empty() {
812 handle
813 .provider
814 .query_row_async(&req.sql)
815 .await
816 .map(QueryResponse::Row)
817 } else {
818 handle
819 .provider
820 .query_named_fields_async(&req.sql, ¶ms)
821 .await
822 .map(QueryResponse::Row)
823 }
824 }
825 };
826 let response = match response {
827 Ok(response) => {
828 if telemetry_enabled() {
829 telemetry().on_query(&QueryTelemetryEvent {
830 provider_kind: handle.kind.clone(),
831 mode: mode_tag,
832 success: true,
833 elapsed: started.elapsed(),
834 });
835 }
836 response
837 }
838 Err(err) => {
839 if telemetry_enabled() {
840 telemetry().on_query(&QueryTelemetryEvent {
841 provider_kind: handle.kind.clone(),
842 mode: mode_tag,
843 success: false,
844 elapsed: started.elapsed(),
845 });
846 }
847 return Err(err);
848 }
849 };
850
851 if use_global_cache {
852 self.save_result_cache(&handle, req, response.clone());
853 }
854
855 Ok(response)
856 }
857
858 pub async fn execute_first_row_fields_async(
859 &self,
860 sql: &str,
861 params: &[DataField],
862 cache_policy: CachePolicy,
863 ) -> KnowledgeResult<RowData> {
864 let handle = self.current_handle()?;
865 if matches!(handle.kind, ProviderKind::SqliteAuthority) {
866 let handle = handle.clone();
867 let sql = sql.to_string();
868 let params = params.to_vec();
869 return task::spawn_blocking(move || {
870 runtime().execute_first_row_fields_with_handle(&handle, &sql, ¶ms, cache_policy)
871 })
872 .await
873 .map_err(|err| {
874 KnowReason::from_logic().to_err().with_detail(format!(
875 "knowledge async sqlite first-row query join failed: {err}"
876 ))
877 })?;
878 }
879 let use_global_cache =
880 matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
881 if use_global_cache
882 && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
883 &handle,
884 sql,
885 params,
886 QueryModeTag::FirstRow,
887 ))
888 {
889 self.record_result_cache_hit();
890 if telemetry_enabled() {
891 telemetry().on_cache(&CacheTelemetryEvent {
892 layer: CacheLayer::Result,
893 outcome: CacheOutcome::Hit,
894 provider_kind: Some(handle.kind.clone()),
895 });
896 }
897 return Ok(hit.into_row());
898 }
899 if use_global_cache {
900 self.record_result_cache_miss();
901 if telemetry_enabled() {
902 telemetry().on_cache(&CacheTelemetryEvent {
903 layer: CacheLayer::Result,
904 outcome: CacheOutcome::Miss,
905 provider_kind: Some(handle.kind.clone()),
906 });
907 }
908 }
909
910 let started = Instant::now();
911 let row = if params.is_empty() {
912 handle.provider.query_row_async(sql).await
913 } else {
914 handle.provider.query_named_fields_async(sql, params).await
915 };
916 let row = match row {
917 Ok(row) => {
918 if telemetry_enabled() {
919 telemetry().on_query(&QueryTelemetryEvent {
920 provider_kind: handle.kind.clone(),
921 mode: QueryModeTag::FirstRow,
922 success: true,
923 elapsed: started.elapsed(),
924 });
925 }
926 row
927 }
928 Err(err) => {
929 if telemetry_enabled() {
930 telemetry().on_query(&QueryTelemetryEvent {
931 provider_kind: handle.kind.clone(),
932 mode: QueryModeTag::FirstRow,
933 success: false,
934 elapsed: started.elapsed(),
935 });
936 }
937 return Err(err);
938 }
939 };
940
941 if use_global_cache {
942 self.save_result_cache_by_key(
943 result_cache_key_fields(&handle, sql, params, QueryModeTag::FirstRow),
944 QueryResponse::Row(row.clone()),
945 );
946 }
947
948 Ok(row)
949 }
950
951 fn current_handle(&self) -> KnowledgeResult<Arc<ProviderHandle>> {
952 self.provider
953 .read()
954 .expect("runtime provider lock poisoned")
955 .clone()
956 .ok_or_else(|| {
957 KnowReason::from_logic()
958 .to_err()
959 .with_detail("knowledge provider not initialized")
960 })
961 }
962
963 fn current_generation_from_provider(&self) -> Option<Generation> {
964 self.provider
965 .read()
966 .ok()
967 .and_then(|guard| guard.as_ref().map(|handle| handle.generation))
968 }
969
970 fn fetch_result_cache(
971 &self,
972 handle: &ProviderHandle,
973 req: &QueryRequest,
974 ) -> Option<QueryResponse> {
975 self.fetch_result_cache_by_key(result_cache_key(handle, req))
976 }
977
978 fn fetch_result_cache_by_key(&self, key: ResultCacheKey) -> Option<QueryResponse> {
979 if !self.result_cache_enabled() {
980 return None;
981 }
982 let cached = self
983 .result_cache
984 .read()
985 .ok()
986 .and_then(|cache| cache.peek(&key).cloned())?;
987 if cached.cached_at.elapsed() > self.result_cache_ttl() {
988 if let Ok(mut cache) = self.result_cache.write() {
989 let _ = cache.pop(&key);
990 }
991 return None;
992 }
993 Some((*cached.response).clone())
994 }
995
996 fn save_result_cache(
997 &self,
998 handle: &ProviderHandle,
999 req: &QueryRequest,
1000 response: QueryResponse,
1001 ) {
1002 self.save_result_cache_by_key(result_cache_key(handle, req), response);
1003 }
1004
1005 fn save_result_cache_by_key(&self, key: ResultCacheKey, response: QueryResponse) {
1006 if let Ok(mut cache) = self.result_cache.write() {
1007 cache.put(
1008 key,
1009 CachedQueryResponse {
1010 response: Arc::new(response),
1011 cached_at: Instant::now(),
1012 },
1013 );
1014 }
1015 }
1016
1017 fn redis_cache_enabled(&self) -> bool {
1022 self.result_cache_enabled.load(Ordering::Acquire)
1023 }
1024
1025 #[allow(dead_code)]
1026 fn redis_cache_ttl(&self) -> Duration {
1027 Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Acquire))
1028 }
1029
1030 fn fetch_redis_cache(&self, key: &RedisCacheKey) -> Option<CachedRedisValue> {
1031 if !self.redis_cache_enabled() {
1032 return None;
1033 }
1034 let cached = self
1035 .redis_cache
1036 .read()
1037 .ok()
1038 .and_then(|cache| cache.peek(key).cloned())?;
1039 Some(cached)
1040 }
1041
1042 fn save_redis_cache(&self, key: RedisCacheKey, value: CachedRedisValue) {
1043 if !self.redis_cache_enabled() {
1044 return;
1045 }
1046 if let Ok(mut cache) = self.redis_cache.write() {
1047 cache.put(key, value);
1048 }
1049 }
1050
1051 pub(crate) fn redis_cache_get(
1052 &self,
1053 ck: &RedisCacheKey,
1054 redis_key: &str,
1055 ) -> Option<CachedRedisValue> {
1056 let enabled = self
1058 .redis_enabled_map
1059 .read()
1060 .ok()
1061 .and_then(|map| map.get(redis_key).copied())
1062 .unwrap_or(self.redis_global_enabled.load(Ordering::Relaxed));
1063 if !enabled {
1064 return None;
1065 }
1066 let value = self.fetch_redis_cache(ck)?;
1067 self.redis_cache_hits.fetch_add(1, Ordering::Relaxed);
1068 Some(value)
1069 }
1070
1071 pub(crate) fn redis_cache_put(
1072 &self,
1073 ck: RedisCacheKey,
1074 redis_key: &str,
1075 value: CachedRedisValue,
1076 ) {
1077 let enabled = self
1079 .redis_enabled_map
1080 .read()
1081 .ok()
1082 .and_then(|map| map.get(redis_key).copied())
1083 .unwrap_or(self.redis_global_enabled.load(Ordering::Relaxed));
1084 if !enabled {
1085 return;
1086 }
1087 self.redis_cache_misses.fetch_add(1, Ordering::Relaxed);
1088 self.save_redis_cache(ck, value);
1089 }
1090
1091 #[allow(dead_code)]
1092 fn clear_redis_cache(&self) {
1093 if let Ok(mut cache) = self.redis_cache.write() {
1094 cache.clear();
1095 }
1096 }
1097
1098 #[inline]
1099 fn result_cache_enabled(&self) -> bool {
1100 self.result_cache_enabled.load(Ordering::Relaxed)
1101 }
1102
1103 #[inline]
1104 fn result_cache_ttl(&self) -> Duration {
1105 Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Relaxed))
1106 }
1107}
1108
1109pub fn runtime() -> &'static KnowledgeRuntime {
1110 static RUNTIME: OnceLock<KnowledgeRuntime> = OnceLock::new();
1111 RUNTIME.get_or_init(|| KnowledgeRuntime::new(1024))
1112}
1113
1114#[cfg(test)]
1115pub(crate) struct RuntimeTestGuard(tokio::sync::Mutex<()>);
1116
1117#[cfg(test)]
1118impl RuntimeTestGuard {
1119 pub(crate) fn lock(&self) -> Result<tokio::sync::MutexGuard<'_, ()>, std::convert::Infallible> {
1120 Ok(self.0.blocking_lock())
1121 }
1122
1123 pub(crate) async fn lock_async(&self) -> tokio::sync::MutexGuard<'_, ()> {
1124 self.0.lock().await
1125 }
1126}
1127
1128#[cfg(test)]
1129pub(crate) fn runtime_test_guard() -> &'static RuntimeTestGuard {
1130 static GUARD: OnceLock<RuntimeTestGuard> = OnceLock::new();
1131 GUARD.get_or_init(|| RuntimeTestGuard(tokio::sync::Mutex::new(())))
1132}
1133
1134fn result_cache_key(handle: &ProviderHandle, req: &QueryRequest) -> ResultCacheKey {
1135 ResultCacheKey {
1136 datasource_id: handle.datasource_id.clone(),
1137 generation: handle.generation,
1138 query_hash: stable_hash(&req.sql),
1139 params_hash: stable_params_hash(&req.params),
1140 mode: match req.mode {
1141 QueryMode::Many => QueryModeTag::Many,
1142 QueryMode::FirstRow => QueryModeTag::FirstRow,
1143 },
1144 }
1145}
1146
1147fn result_cache_key_fields(
1148 handle: &ProviderHandle,
1149 sql: &str,
1150 params: &[DataField],
1151 mode: QueryModeTag,
1152) -> ResultCacheKey {
1153 ResultCacheKey {
1154 datasource_id: handle.datasource_id.clone(),
1155 generation: handle.generation,
1156 query_hash: stable_hash(sql),
1157 params_hash: stable_field_params_hash(params),
1158 mode,
1159 }
1160}
1161
1162fn query_mode_tag(mode: &QueryMode) -> QueryModeTag {
1163 match mode {
1164 QueryMode::Many => QueryModeTag::Many,
1165 QueryMode::FirstRow => QueryModeTag::FirstRow,
1166 }
1167}
1168
1169fn stable_hash(value: &str) -> u64 {
1170 let mut hasher = DefaultHasher::new();
1171 value.hash(&mut hasher);
1172 hasher.finish()
1173}
1174
1175fn stable_params_hash(params: &[QueryParam]) -> u64 {
1176 let mut hasher = DefaultHasher::new();
1177 for param in params {
1178 param.name.hash(&mut hasher);
1179 match ¶m.value {
1180 QueryValue::Null => 0u8.hash(&mut hasher),
1181 QueryValue::Bool(value) => {
1182 1u8.hash(&mut hasher);
1183 value.hash(&mut hasher);
1184 }
1185 QueryValue::Int(value) => {
1186 2u8.hash(&mut hasher);
1187 value.hash(&mut hasher);
1188 }
1189 QueryValue::Float(value) => {
1190 3u8.hash(&mut hasher);
1191 value.to_bits().hash(&mut hasher);
1192 }
1193 QueryValue::Text(value) => {
1194 4u8.hash(&mut hasher);
1195 value.hash(&mut hasher);
1196 }
1197 }
1198 }
1199 hasher.finish()
1200}
1201
1202fn stable_field_params_hash(params: &[DataField]) -> u64 {
1203 let mut hasher = DefaultHasher::new();
1204 for field in params {
1205 field.get_name().hash(&mut hasher);
1206 match field.get_value() {
1207 Value::Null | Value::Ignore(_) => 0u8.hash(&mut hasher),
1208 Value::Bool(value) => {
1209 1u8.hash(&mut hasher);
1210 value.hash(&mut hasher);
1211 }
1212 Value::Digit(value) => {
1213 2u8.hash(&mut hasher);
1214 value.hash(&mut hasher);
1215 }
1216 Value::Float(value) => {
1217 3u8.hash(&mut hasher);
1218 value.to_bits().hash(&mut hasher);
1219 }
1220 Value::Chars(value) => {
1221 4u8.hash(&mut hasher);
1222 value.hash(&mut hasher);
1223 }
1224 Value::Symbol(value) => {
1225 5u8.hash(&mut hasher);
1226 value.hash(&mut hasher);
1227 }
1228 Value::Time(value) => {
1229 6u8.hash(&mut hasher);
1230 value.hash(&mut hasher);
1231 }
1232 Value::Hex(value) => {
1233 7u8.hash(&mut hasher);
1234 value.to_string().hash(&mut hasher);
1235 }
1236 Value::IpNet(value) => {
1237 8u8.hash(&mut hasher);
1238 value.to_string().hash(&mut hasher);
1239 }
1240 Value::IpAddr(value) => {
1241 9u8.hash(&mut hasher);
1242 value.hash(&mut hasher);
1243 }
1244 Value::Obj(value) => {
1245 10u8.hash(&mut hasher);
1246 format!("{:?}", value).hash(&mut hasher);
1247 }
1248 Value::Array(value) => {
1249 11u8.hash(&mut hasher);
1250 format!("{:?}", value).hash(&mut hasher);
1251 }
1252 Value::Domain(value) => {
1253 12u8.hash(&mut hasher);
1254 value.0.hash(&mut hasher);
1255 }
1256 Value::Url(value) => {
1257 13u8.hash(&mut hasher);
1258 value.0.hash(&mut hasher);
1259 }
1260 Value::Email(value) => {
1261 14u8.hash(&mut hasher);
1262 value.0.hash(&mut hasher);
1263 }
1264 Value::IdCard(value) => {
1265 15u8.hash(&mut hasher);
1266 value.0.hash(&mut hasher);
1267 }
1268 Value::MobilePhone(value) => {
1269 16u8.hash(&mut hasher);
1270 value.0.hash(&mut hasher);
1271 }
1272 }
1273 }
1274 hasher.finish()
1275}
1276
1277pub fn fields_to_params(params: &[DataField]) -> Vec<QueryParam> {
1278 params
1279 .iter()
1280 .map(|field| {
1281 let value = match field.get_value() {
1282 Value::Null | Value::Ignore(_) => QueryValue::Null,
1283 Value::Bool(value) => QueryValue::Bool(*value),
1284 Value::Digit(value) => QueryValue::Int(*value),
1285 Value::Float(value) => QueryValue::Float(*value),
1286 Value::Chars(value) => QueryValue::Text(value.to_string()),
1287 Value::Symbol(value) => QueryValue::Text(value.to_string()),
1288 Value::Time(value) => QueryValue::Text(value.to_string()),
1289 Value::Hex(value) => QueryValue::Text(value.to_string()),
1290 Value::IpNet(value) => QueryValue::Text(value.to_string()),
1291 Value::IpAddr(value) => QueryValue::Text(value.to_string()),
1292 Value::Obj(value) => QueryValue::Text(format!("{:?}", value)),
1293 Value::Array(value) => QueryValue::Text(format!("{:?}", value)),
1294 Value::Domain(value) => QueryValue::Text(value.0.to_string()),
1295 Value::Url(value) => QueryValue::Text(value.0.to_string()),
1296 Value::Email(value) => QueryValue::Text(value.0.to_string()),
1297 Value::IdCard(value) => QueryValue::Text(value.0.to_string()),
1298 Value::MobilePhone(value) => QueryValue::Text(value.0.to_string()),
1299 };
1300 QueryParam {
1301 name: field.get_name().to_string(),
1302 value,
1303 }
1304 })
1305 .collect()
1306}
1307
1308pub fn params_to_fields(params: &[QueryParam]) -> Vec<DataField> {
1309 params
1310 .iter()
1311 .map(|param| match ¶m.value {
1312 QueryValue::Null => {
1313 DataField::new(DataType::default(), param.name.clone(), Value::Null)
1314 }
1315 QueryValue::Bool(value) => {
1316 DataField::new(DataType::default(), param.name.clone(), Value::Bool(*value))
1317 }
1318 QueryValue::Int(value) => DataField::from_digit(param.name.clone(), *value),
1319 QueryValue::Float(value) => DataField::from_float(param.name.clone(), *value),
1320 QueryValue::Text(value) => DataField::from_chars(param.name.clone(), value.clone()),
1321 })
1322 .collect()
1323}
1324
1325#[cfg(test)]
1326mod tests {
1327 use super::*;
1328 use async_trait::async_trait;
1329 use std::sync::Arc;
1330 use wp_model_core::model::Value;
1331
1332 struct TestProvider {
1333 value: &'static str,
1334 }
1335
1336 #[async_trait]
1337 impl ProviderExecutor for TestProvider {
1338 fn query(&self, _sql: &str) -> KnowledgeResult<Vec<RowData>> {
1339 Ok(vec![vec![DataField::from_chars("value", self.value)]])
1340 }
1341
1342 fn query_fields(&self, _sql: &str, _params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
1343 self.query("")
1344 }
1345
1346 fn query_row(&self, _sql: &str) -> KnowledgeResult<RowData> {
1347 Ok(vec![DataField::from_chars("value", self.value)])
1348 }
1349
1350 fn query_named_fields(
1351 &self,
1352 _sql: &str,
1353 _params: &[DataField],
1354 ) -> KnowledgeResult<RowData> {
1355 self.query_row("")
1356 }
1357 }
1358
1359 #[test]
1360 fn query_param_hash_is_stable() {
1361 let params = vec![
1362 QueryParam {
1363 name: ":id".to_string(),
1364 value: QueryValue::Int(7),
1365 },
1366 QueryParam {
1367 name: ":name".to_string(),
1368 value: QueryValue::Text("abc".to_string()),
1369 },
1370 ];
1371 assert_eq!(stable_params_hash(¶ms), stable_params_hash(¶ms));
1372 }
1373
1374 #[test]
1375 fn fields_to_params_preserves_raw_chars_value() {
1376 let fields = [DataField::from_chars(
1377 ":name".to_string(),
1378 "令狐冲".to_string(),
1379 )];
1380 let params = fields_to_params(&fields);
1381 assert_eq!(params.len(), 1);
1382 match ¶ms[0].value {
1383 QueryValue::Text(value) => assert_eq!(value, "令狐冲"),
1384 other => panic!("unexpected param value: {other:?}"),
1385 }
1386 let roundtrip = params_to_fields(¶ms);
1387 assert!(matches!(roundtrip[0].get_value(), Value::Chars(_)));
1388 }
1389
1390 #[tokio::test(flavor = "current_thread")]
1391 async fn sqlite_async_bridge_keeps_captured_handle_after_reload() {
1392 let _guard = runtime_test_guard().lock_async().await;
1393 runtime()
1394 .install_provider(
1395 ProviderKind::SqliteAuthority,
1396 DatasourceId("sqlite:old".to_string()),
1397 |_generation| Ok(Arc::new(TestProvider { value: "old" })),
1398 )
1399 .expect("install old provider");
1400 let old_handle = runtime().current_handle().expect("current old handle");
1401
1402 runtime()
1403 .install_provider(
1404 ProviderKind::SqliteAuthority,
1405 DatasourceId("sqlite:new".to_string()),
1406 |_generation| Ok(Arc::new(TestProvider { value: "new" })),
1407 )
1408 .expect("install new provider");
1409
1410 let req = QueryRequest::first_row("SELECT value", Vec::new(), CachePolicy::Bypass);
1411 let row = task::spawn_blocking(move || runtime().execute_with_handle(&old_handle, &req))
1412 .await
1413 .expect("join sqlite bridge")
1414 .expect("execute old handle")
1415 .into_row();
1416 assert_eq!(row[0].to_string(), "chars(old)");
1417 }
1418
1419 fn redis_ck(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
1424 let mut hasher = DefaultHasher::new();
1425 key.hash(&mut hasher);
1426 let key_hash = hasher.finish();
1427 let mut hasher = DefaultHasher::new();
1428 for arg in args {
1429 arg.hash(&mut hasher);
1430 }
1431 let args_hash = hasher.finish();
1432 RedisCacheKey {
1433 generation,
1434 cmd_tag: cmd,
1435 key_hash,
1436 args_hash,
1437 }
1438 }
1439
1440 #[test]
1441 fn redis_cache_hit_and_miss() {
1442 let rt = KnowledgeRuntime::new(64);
1443 rt.configure_redis_cache(true, 64, HashMap::new());
1444
1445 let ck = redis_ck(RedisCmdTag::Get, 1, "user:1", &[]);
1446 assert!(rt.redis_cache_get(&ck, "user:1").is_none());
1448 rt.redis_cache_put(ck.clone(), "user:1", CachedRedisValue::Bool(true));
1450 let val = rt.redis_cache_get(&ck, "user:1").expect("should hit cache");
1452 assert!(matches!(val, CachedRedisValue::Bool(true)));
1453 }
1454
1455 #[test]
1456 fn redis_cache_disabled_key_is_not_read() {
1457 let rt = KnowledgeRuntime::new(64);
1458 let mut key_map = HashMap::new();
1459 key_map.insert("volatile".to_string(), false);
1460 rt.configure_redis_cache(true, 64, key_map);
1461
1462 let ck = redis_ck(RedisCmdTag::Get, 1, "volatile", &[]);
1463 rt.redis_cache_put(ck.clone(), "volatile", CachedRedisValue::Bool(true));
1465 assert!(rt.redis_cache_get(&ck, "volatile").is_none());
1467 }
1468
1469 #[test]
1470 fn redis_cache_disabled_key_is_not_stored() {
1471 let rt = KnowledgeRuntime::new(64);
1472 let mut key_map = HashMap::new();
1473 key_map.insert("volatile".to_string(), false);
1474 rt.configure_redis_cache(true, 64, key_map);
1475
1476 let ck = redis_ck(RedisCmdTag::Get, 1, "volatile", &[]);
1477 rt.redis_cache_put(ck.clone(), "volatile", CachedRedisValue::Bool(true));
1479 let mut key_map = HashMap::new();
1481 key_map.insert("volatile".to_string(), true);
1482 rt.configure_redis_cache(true, 64, key_map);
1483 assert!(rt.redis_cache_get(&ck, "volatile").is_none());
1484 }
1485
1486 #[test]
1487 fn redis_cache_per_key_override_works_independently() {
1488 let rt = KnowledgeRuntime::new(64);
1489 let mut key_map = HashMap::new();
1490 key_map.insert("disabled_key".to_string(), false);
1491 rt.configure_redis_cache(true, 64, key_map);
1492
1493 let ck_disabled = redis_ck(RedisCmdTag::HGet, 1, "disabled_key", &["f"]);
1494 let ck_enabled = redis_ck(RedisCmdTag::HGet, 1, "enabled_key", &["f"]);
1495
1496 rt.redis_cache_put(
1498 ck_disabled.clone(),
1499 "disabled_key",
1500 CachedRedisValue::OptString(Some("x".to_string())),
1501 );
1502 rt.redis_cache_put(
1503 ck_enabled.clone(),
1504 "enabled_key",
1505 CachedRedisValue::OptString(Some("y".to_string())),
1506 );
1507
1508 assert!(rt.redis_cache_get(&ck_disabled, "disabled_key").is_none());
1510 let val = rt
1512 .redis_cache_get(&ck_enabled, "enabled_key")
1513 .expect("should hit");
1514 assert!(matches!(val, CachedRedisValue::OptString(Some(ref s)) if s == "y"));
1515 }
1516
1517 #[test]
1518 fn redis_cache_global_disabled_blocks_all() {
1519 let rt = KnowledgeRuntime::new(64);
1520 rt.configure_redis_cache(false, 64, HashMap::new());
1521
1522 let ck = redis_ck(RedisCmdTag::BfExists, 1, "any_key", &["item"]);
1523 rt.redis_cache_put(ck.clone(), "any_key", CachedRedisValue::Bool(true));
1524 assert!(rt.redis_cache_get(&ck, "any_key").is_none());
1526 }
1527
1528 #[test]
1529 fn redis_cache_generation_isolation() {
1530 let rt = KnowledgeRuntime::new(64);
1531 rt.configure_redis_cache(true, 64, HashMap::new());
1532
1533 let ck_gen1 = redis_ck(RedisCmdTag::BfExists, 1, "key", &["item"]);
1534 let ck_gen2 = redis_ck(RedisCmdTag::BfExists, 2, "key", &["item"]);
1535
1536 rt.redis_cache_put(ck_gen1.clone(), "key", CachedRedisValue::Bool(false));
1538
1539 assert!(rt.redis_cache_get(&ck_gen2, "key").is_none());
1541 assert!(rt.redis_cache_get(&ck_gen1, "key").is_some());
1543 }
1544}