1use std::collections::hash_map::DefaultHasher;
2use std::hash::{Hash, Hasher};
3use std::num::NonZeroUsize;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock, RwLock};
6use std::time::{Duration, Instant};
7
8use crate::error::{KnowReason, KnowledgeResult};
9use async_trait::async_trait;
10use lru::LruCache;
11use orion_error::conversion::ToStructError;
12use tokio::task;
13use wp_log::{debug_kdb, warn_kdb};
14use wp_model_core::model::{DataField, DataType, Value};
15
16use crate::loader::ProviderKind;
17use crate::mem::RowData;
18use crate::telemetry::{
19 CacheLayer, CacheOutcome, CacheTelemetryEvent, QueryTelemetryEvent, ReloadOutcome,
20 ReloadTelemetryEvent, telemetry, telemetry_enabled,
21};
22
/// Identifier of a datasource, stored as a plain string.
///
/// [`DatasourceId::from_seed`] produces values shaped like
/// `"<kind>:<16 hex digits>"`; the wrapped string is public, so other
/// shapes can be constructed directly.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DatasourceId(pub String);
25
26impl DatasourceId {
27 pub fn from_seed(kind: ProviderKind, seed: &str) -> Self {
28 let mut hasher = DefaultHasher::new();
29 seed.hash(&mut hasher);
30 let kind_str = match kind {
31 ProviderKind::SqliteAuthority => "sqlite",
32 ProviderKind::Postgres => "postgres",
33 ProviderKind::Mysql => "mysql",
34 ProviderKind::Redis => "redis",
35 };
36 Self(format!("{kind_str}:{:016x}", hasher.finish()))
37 }
38}
39
/// Generation number attached to an installed provider.
///
/// Cache keys embed the generation, so entries written under one generation
/// do not match lookups made under another.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Generation(pub u64);
42
/// Shape of the result a query should produce.
#[derive(Clone, Debug)]
pub enum QueryMode {
    /// Return every row (yields `QueryResponse::Rows`).
    Many,
    /// Return a single row (yields `QueryResponse::Row`).
    FirstRow,
}
48
/// Caching behaviour requested for a single query.
///
/// Within this file only `UseGlobal` is inspected: it enables the runtime's
/// global result cache (when that cache is switched on). The other variants
/// skip the global cache here.
#[derive(Clone, Copy, Debug)]
pub enum CachePolicy {
    /// Do not consult the global result cache.
    Bypass,
    /// Consult and populate the global result cache.
    UseGlobal,
    /// Call-scoped caching; not handled by the global cache paths in this file.
    UseCallScope,
}
55
/// Value bound to a named query parameter.
#[derive(Clone, Debug)]
pub enum QueryValue {
    /// No value.
    Null,
    /// Boolean value.
    Bool(bool),
    /// Signed 64-bit integer value.
    Int(i64),
    /// 64-bit floating point value.
    Float(f64),
    /// Owned text value.
    Text(String),
}
64
65#[derive(Debug, Clone)]
66pub struct QueryParam {
67 pub name: String,
68 pub value: QueryValue,
69}
70
71#[derive(Debug, Clone)]
72pub struct QueryRequest {
73 pub sql: String,
74 pub params: Vec<QueryParam>,
75 pub mode: QueryMode,
76 pub cache_policy: CachePolicy,
77}
78
79impl QueryRequest {
80 pub fn many(
81 sql: impl Into<String>,
82 params: Vec<QueryParam>,
83 cache_policy: CachePolicy,
84 ) -> Self {
85 Self {
86 sql: sql.into(),
87 params,
88 mode: QueryMode::Many,
89 cache_policy,
90 }
91 }
92
93 pub fn first_row(
94 sql: impl Into<String>,
95 params: Vec<QueryParam>,
96 cache_policy: CachePolicy,
97 ) -> Self {
98 Self {
99 sql: sql.into(),
100 params,
101 mode: QueryMode::FirstRow,
102 cache_policy,
103 }
104 }
105}
106
107#[derive(Debug, Clone)]
108pub enum QueryResponse {
109 Rows(Vec<RowData>),
110 Row(RowData),
111}
112
113impl QueryResponse {
114 pub fn into_rows(self) -> Vec<RowData> {
115 match self {
116 QueryResponse::Rows(rows) => rows,
117 QueryResponse::Row(row) => vec![row],
118 }
119 }
120
121 pub fn into_row(self) -> RowData {
122 match self {
123 QueryResponse::Rows(rows) => rows.into_iter().next().unwrap_or_default(),
124 QueryResponse::Row(row) => row,
125 }
126 }
127}
128
129#[async_trait]
130pub trait ProviderExecutor: Send + Sync {
131 fn query(&self, sql: &str) -> KnowledgeResult<Vec<RowData>>;
132 fn query_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<Vec<RowData>>;
133 fn query_row(&self, sql: &str) -> KnowledgeResult<RowData>;
134 fn query_named_fields(&self, sql: &str, params: &[DataField]) -> KnowledgeResult<RowData>;
135
136 async fn query_async(&self, sql: &str) -> KnowledgeResult<Vec<RowData>> {
137 self.query(sql)
138 }
139
140 async fn query_fields_async(
141 &self,
142 sql: &str,
143 params: &[DataField],
144 ) -> KnowledgeResult<Vec<RowData>> {
145 self.query_fields(sql, params)
146 }
147
148 async fn query_row_async(&self, sql: &str) -> KnowledgeResult<RowData> {
149 self.query_row(sql)
150 }
151
152 async fn query_named_fields_async(
153 &self,
154 sql: &str,
155 params: &[DataField],
156 ) -> KnowledgeResult<RowData> {
157 self.query_named_fields(sql, params)
158 }
159}
160
/// Copyable, hashable counterpart of `QueryMode`, used in cache keys and
/// telemetry events.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum QueryModeTag {
    /// Corresponds to `QueryMode::Many`.
    Many,
    /// Corresponds to `QueryMode::FirstRow`.
    FirstRow,
}
166
167#[derive(Debug, Clone, PartialEq, Eq, Hash)]
168pub struct ResultCacheKey {
169 pub datasource_id: DatasourceId,
170 pub generation: Generation,
171 pub query_hash: u64,
172 pub params_hash: u64,
173 pub mode: QueryModeTag,
174}
175
176pub struct ProviderHandle {
177 pub provider: Arc<dyn ProviderExecutor>,
178 pub datasource_id: DatasourceId,
179 pub generation: Generation,
180 pub kind: ProviderKind,
181}
182
183#[derive(Debug, Clone)]
184pub struct RuntimeSnapshot {
185 pub provider_kind: Option<ProviderKind>,
186 pub datasource_id: Option<DatasourceId>,
187 pub generation: Option<Generation>,
188 pub result_cache_enabled: bool,
189 pub result_cache_len: usize,
190 pub result_cache_capacity: usize,
191 pub result_cache_ttl_ms: u64,
192 pub metadata_cache_len: usize,
193 pub metadata_cache_capacity: usize,
194 pub result_cache_hits: u64,
195 pub result_cache_misses: u64,
196 pub metadata_cache_hits: u64,
197 pub metadata_cache_misses: u64,
198 pub local_cache_hits: u64,
199 pub local_cache_misses: u64,
200 pub reload_successes: u64,
201 pub reload_failures: u64,
202}
203
204#[derive(Debug, Clone)]
205pub struct MetadataCacheScope {
206 pub datasource_id: DatasourceId,
207 pub generation: Generation,
208}
209
/// Settings of the global result cache.
#[derive(Clone, Copy, Debug)]
pub struct ResultCacheConfig {
    /// Whether the cache is consulted at all.
    pub enabled: bool,
    /// Maximum number of cached entries.
    pub capacity: usize,
    /// How long an entry stays valid after it was stored.
    pub ttl: Duration,
}
216
217impl Default for ResultCacheConfig {
218 fn default() -> Self {
219 Self {
220 enabled: true,
221 capacity: 1024,
222 ttl: Duration::from_millis(30_000),
223 }
224 }
225}
226
227#[derive(Debug, Clone)]
228struct CachedQueryResponse {
229 response: Arc<QueryResponse>,
230 cached_at: Instant,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
238pub(crate) enum RedisCmdTag {
239 BfExists,
240 HGet,
241 Get,
242 SetExists,
243}
244
245#[derive(Debug, Clone, PartialEq, Eq, Hash)]
246pub(crate) struct RedisCacheKey {
247 pub generation: u64,
248 pub cmd_tag: RedisCmdTag,
249 pub key_hash: u64,
250 pub args_hash: u64,
251}
252
/// Value stored in the Redis value cache.
#[derive(Clone, Debug)]
pub(crate) enum CachedRedisValue {
    /// Boolean result.
    Bool(bool),
    /// Optional string result.
    OptString(Option<String>),
}
258
259#[derive(Debug, Clone)]
260struct CachedRedisEntry {
261 value: CachedRedisValue,
262 cached_at: Instant,
263 ttl_ms: u64, }
265
266pub struct KnowledgeRuntime {
267 provider: RwLock<Option<Arc<ProviderHandle>>>,
268 next_generation: AtomicU64,
269 provider_epoch: AtomicU64,
270 current_generation_value: AtomicU64,
271 result_cache_config: RwLock<ResultCacheConfig>,
272 result_cache_enabled: AtomicBool,
273 result_cache_ttl_ms: AtomicU64,
274 result_cache: RwLock<LruCache<ResultCacheKey, CachedQueryResponse>>,
275 result_cache_hits: AtomicU64,
276 result_cache_misses: AtomicU64,
277 metadata_cache_hits: AtomicU64,
278 metadata_cache_misses: AtomicU64,
279 local_cache_hits: AtomicU64,
280 local_cache_misses: AtomicU64,
281 reload_successes: AtomicU64,
282 reload_failures: AtomicU64,
283 redis_cache: RwLock<LruCache<RedisCacheKey, CachedRedisEntry>>,
284 redis_cache_hits: AtomicU64,
285 redis_cache_misses: AtomicU64,
286 redis_global_enabled: AtomicBool,
287}
288
289impl KnowledgeRuntime {
290 pub fn new(result_cache_capacity: usize) -> Self {
291 let config = ResultCacheConfig {
292 capacity: result_cache_capacity.max(1),
293 ..ResultCacheConfig::default()
294 };
295 let capacity = NonZeroUsize::new(config.capacity).expect("non-zero capacity");
296 Self {
297 provider: RwLock::new(None),
298 next_generation: AtomicU64::new(0),
299 provider_epoch: AtomicU64::new(0),
300 current_generation_value: AtomicU64::new(0),
301 result_cache_config: RwLock::new(config),
302 result_cache_enabled: AtomicBool::new(config.enabled),
303 result_cache_ttl_ms: AtomicU64::new(config.ttl.as_millis() as u64),
304 result_cache: RwLock::new(LruCache::new(capacity)),
305 result_cache_hits: AtomicU64::new(0),
306 result_cache_misses: AtomicU64::new(0),
307 metadata_cache_hits: AtomicU64::new(0),
308 metadata_cache_misses: AtomicU64::new(0),
309 local_cache_hits: AtomicU64::new(0),
310 local_cache_misses: AtomicU64::new(0),
311 reload_successes: AtomicU64::new(0),
312 reload_failures: AtomicU64::new(0),
313 redis_cache: RwLock::new(LruCache::new(capacity)),
314 redis_cache_hits: AtomicU64::new(0),
315 redis_cache_misses: AtomicU64::new(0),
316 redis_global_enabled: AtomicBool::new(true),
317 }
318 }
319
320 pub fn install_provider<F>(
321 &self,
322 kind: ProviderKind,
323 datasource_id: DatasourceId,
324 build: F,
325 ) -> KnowledgeResult<Generation>
326 where
327 F: FnOnce(Generation) -> KnowledgeResult<Arc<dyn ProviderExecutor>>,
328 {
329 let generation = Generation(self.next_generation.fetch_add(1, Ordering::SeqCst) + 1);
330 let previous = self
331 .provider
332 .read()
333 .ok()
334 .and_then(|guard| guard.as_ref().cloned());
335 debug_kdb!(
336 "[kdb] reload provider start kind={kind:?} datasource_id={} target_generation={} previous_generation={}",
337 datasource_id.0,
338 generation.0,
339 previous
340 .as_ref()
341 .map(|handle| handle.generation.0.to_string())
342 .unwrap_or_else(|| "none".to_string())
343 );
344 let provider = match build(generation) {
345 Ok(provider) => provider,
346 Err(err) => {
347 self.reload_failures.fetch_add(1, Ordering::Relaxed);
348 warn_kdb!(
349 "[kdb] reload provider failed kind={kind:?} datasource_id={} target_generation={} err={}",
350 datasource_id.0,
351 generation.0,
352 err
353 );
354 if telemetry_enabled() {
355 telemetry().on_reload(&ReloadTelemetryEvent {
356 outcome: ReloadOutcome::Failure,
357 provider_kind: kind.clone(),
358 });
359 }
360 return Err(err);
361 }
362 };
363 debug_kdb!(
364 "[kdb] install provider kind={kind:?} datasource_id={} generation={}",
365 datasource_id.0,
366 generation.0
367 );
368 let kind_for_handle = kind.clone();
369 let datasource_id_for_handle = datasource_id.clone();
370 let handle = Arc::new(ProviderHandle {
371 provider,
372 datasource_id: datasource_id_for_handle,
373 generation,
374 kind: kind_for_handle,
375 });
376 self.provider_epoch.fetch_add(1, Ordering::AcqRel);
377 {
378 let mut guard = self
379 .provider
380 .write()
381 .expect("runtime provider lock poisoned");
382 *guard = Some(handle);
383 }
384 self.current_generation_value
385 .store(generation.0, Ordering::Release);
386 self.provider_epoch.fetch_add(1, Ordering::Release);
387 self.reload_successes.fetch_add(1, Ordering::Relaxed);
388 if telemetry_enabled() {
389 telemetry().on_reload(&ReloadTelemetryEvent {
390 outcome: ReloadOutcome::Success,
391 provider_kind: kind.clone(),
392 });
393 }
394 debug_kdb!(
395 "[kdb] reload provider success kind={kind:?} datasource_id={} generation={}",
396 datasource_id.0,
397 generation.0
398 );
399 Ok(generation)
400 }
401
402 pub fn configure_result_cache(&self, enabled: bool, capacity: usize, ttl: Duration) {
403 let new_config = ResultCacheConfig {
404 enabled,
405 capacity: capacity.max(1),
406 ttl: ttl.max(Duration::from_millis(1)),
407 };
408 let mut should_reset_cache = false;
409 {
410 let mut guard = self
411 .result_cache_config
412 .write()
413 .expect("runtime result cache config lock poisoned");
414 if guard.capacity != new_config.capacity || (!new_config.enabled && guard.enabled) {
415 should_reset_cache = true;
416 }
417 *guard = new_config;
418 }
419 self.result_cache_enabled
420 .store(new_config.enabled, Ordering::Relaxed);
421 self.result_cache_ttl_ms.store(
422 new_config.ttl.as_millis().min(u128::from(u64::MAX)) as u64,
423 Ordering::Relaxed,
424 );
425
426 if should_reset_cache {
427 let mut cache = self
428 .result_cache
429 .write()
430 .expect("runtime result cache lock poisoned");
431 *cache = LruCache::new(
432 NonZeroUsize::new(new_config.capacity).expect("non-zero result cache capacity"),
433 );
434 }
435 }
436
437 pub fn configure_redis_cache(&self, global_enabled: bool, capacity: usize) {
438 let new_capacity =
439 NonZeroUsize::new(capacity.max(1)).expect("non-zero redis cache capacity");
440 if let Ok(mut cache) = self.redis_cache.write() {
441 *cache = LruCache::new(new_capacity);
442 }
443 self.redis_global_enabled
444 .store(global_enabled, Ordering::Relaxed);
445 }
446
447 pub fn current_generation(&self) -> Option<Generation> {
448 let epoch_before = self.provider_epoch.load(Ordering::Acquire);
449 if epoch_before % 2 == 1 {
450 return self.current_generation_from_provider();
451 }
452 let generation = self.current_generation_value.load(Ordering::Acquire);
453 let epoch_after = self.provider_epoch.load(Ordering::Acquire);
454 if epoch_before != epoch_after {
455 return self.current_generation_from_provider();
456 }
457 match generation {
458 0 => None,
459 generation => Some(Generation(generation)),
460 }
461 }
462
463 pub fn snapshot(&self) -> RuntimeSnapshot {
464 let provider = self
465 .provider
466 .read()
467 .ok()
468 .and_then(|guard| guard.as_ref().cloned());
469 let result_cache_config = self
470 .result_cache_config
471 .read()
472 .map(|guard| *guard)
473 .unwrap_or_default();
474 let (result_cache_len, result_cache_capacity) = self
475 .result_cache
476 .read()
477 .map(|cache| (cache.len(), cache.cap().get()))
478 .unwrap_or((0, 0));
479 let (metadata_cache_len, metadata_cache_capacity) =
480 crate::mem::query_util::column_metadata_cache_snapshot();
481 RuntimeSnapshot {
482 provider_kind: provider.as_ref().map(|handle| handle.kind.clone()),
483 datasource_id: provider.as_ref().map(|handle| handle.datasource_id.clone()),
484 generation: provider.as_ref().map(|handle| handle.generation),
485 result_cache_enabled: result_cache_config.enabled,
486 result_cache_len,
487 result_cache_capacity,
488 result_cache_ttl_ms: result_cache_config.ttl.as_millis() as u64,
489 metadata_cache_len,
490 metadata_cache_capacity,
491 result_cache_hits: self.result_cache_hits.load(Ordering::Relaxed),
492 result_cache_misses: self.result_cache_misses.load(Ordering::Relaxed),
493 metadata_cache_hits: self.metadata_cache_hits.load(Ordering::Relaxed),
494 metadata_cache_misses: self.metadata_cache_misses.load(Ordering::Relaxed),
495 local_cache_hits: self.local_cache_hits.load(Ordering::Relaxed),
496 local_cache_misses: self.local_cache_misses.load(Ordering::Relaxed),
497 reload_successes: self.reload_successes.load(Ordering::Relaxed),
498 reload_failures: self.reload_failures.load(Ordering::Relaxed),
499 }
500 }
501
502 pub fn current_metadata_scope(&self) -> MetadataCacheScope {
503 self.provider
504 .read()
505 .ok()
506 .and_then(|guard| guard.as_ref().cloned())
507 .map(|handle| MetadataCacheScope {
508 datasource_id: handle.datasource_id.clone(),
509 generation: handle.generation,
510 })
511 .unwrap_or_else(|| MetadataCacheScope {
512 datasource_id: DatasourceId("sqlite:standalone".to_string()),
513 generation: Generation(0),
514 })
515 }
516
517 pub fn current_provider_kind(&self) -> Option<ProviderKind> {
518 self.provider
519 .read()
520 .ok()
521 .and_then(|guard| guard.as_ref().map(|handle| handle.kind.clone()))
522 }
523
524 pub fn record_result_cache_hit(&self) {
525 self.result_cache_hits.fetch_add(1, Ordering::Relaxed);
526 }
527
528 pub fn record_result_cache_miss(&self) {
529 self.result_cache_misses.fetch_add(1, Ordering::Relaxed);
530 }
531
532 pub fn record_metadata_cache_hit(&self) {
533 self.metadata_cache_hits.fetch_add(1, Ordering::Relaxed);
534 }
535
536 pub fn record_metadata_cache_miss(&self) {
537 self.metadata_cache_misses.fetch_add(1, Ordering::Relaxed);
538 }
539
540 pub fn record_local_cache_hit(&self) {
541 self.local_cache_hits.fetch_add(1, Ordering::Relaxed);
542 }
543
544 pub fn record_local_cache_miss(&self) {
545 self.local_cache_misses.fetch_add(1, Ordering::Relaxed);
546 }
547
548 pub fn execute(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
549 let handle = self.current_handle()?;
550 self.execute_with_handle(&handle, req)
551 }
552
553 fn execute_with_handle(
554 &self,
555 handle: &Arc<ProviderHandle>,
556 req: &QueryRequest,
557 ) -> KnowledgeResult<QueryResponse> {
558 let use_global_cache =
559 matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
560 if use_global_cache && let Some(hit) = self.fetch_result_cache(handle, req) {
561 self.record_result_cache_hit();
562 if telemetry_enabled() {
563 telemetry().on_cache(&CacheTelemetryEvent {
564 layer: CacheLayer::Result,
565 outcome: CacheOutcome::Hit,
566 provider_kind: Some(handle.kind.clone()),
567 });
568 }
569 debug_kdb!(
570 "[kdb] global result cache hit kind={:?} generation={}",
571 handle.kind,
572 handle.generation.0
573 );
574 return Ok(hit);
575 }
576 if use_global_cache {
577 self.record_result_cache_miss();
578 if telemetry_enabled() {
579 telemetry().on_cache(&CacheTelemetryEvent {
580 layer: CacheLayer::Result,
581 outcome: CacheOutcome::Miss,
582 provider_kind: Some(handle.kind.clone()),
583 });
584 }
585 debug_kdb!(
586 "[kdb] global result cache miss kind={:?} generation={}",
587 handle.kind,
588 handle.generation.0
589 );
590 }
591
592 let params = params_to_fields(&req.params);
593 let mode_tag = query_mode_tag(&req.mode);
594 let started = Instant::now();
595 debug_kdb!(
596 "[kdb] execute query kind={:?} generation={} mode={:?} cache_policy={:?}",
597 handle.kind,
598 handle.generation.0,
599 req.mode,
600 req.cache_policy
601 );
602 let response = match match req.mode {
603 QueryMode::Many => {
604 if params.is_empty() {
605 handle.provider.query(&req.sql).map(QueryResponse::Rows)
606 } else {
607 handle
608 .provider
609 .query_fields(&req.sql, ¶ms)
610 .map(QueryResponse::Rows)
611 }
612 }
613 QueryMode::FirstRow => {
614 if params.is_empty() {
615 handle.provider.query_row(&req.sql).map(QueryResponse::Row)
616 } else {
617 handle
618 .provider
619 .query_named_fields(&req.sql, ¶ms)
620 .map(QueryResponse::Row)
621 }
622 }
623 } {
624 Ok(response) => {
625 if telemetry_enabled() {
626 telemetry().on_query(&QueryTelemetryEvent {
627 provider_kind: handle.kind.clone(),
628 mode: mode_tag,
629 success: true,
630 elapsed: started.elapsed(),
631 });
632 }
633 response
634 }
635 Err(err) => {
636 if telemetry_enabled() {
637 telemetry().on_query(&QueryTelemetryEvent {
638 provider_kind: handle.kind.clone(),
639 mode: mode_tag,
640 success: false,
641 elapsed: started.elapsed(),
642 });
643 }
644 return Err(err);
645 }
646 };
647
648 if use_global_cache {
649 self.save_result_cache(handle, req, response.clone());
650 debug_kdb!(
651 "[kdb] global result cache store kind={:?} generation={}",
652 handle.kind,
653 handle.generation.0
654 );
655 }
656
657 Ok(response)
658 }
659
660 pub fn execute_first_row_fields(
661 &self,
662 sql: &str,
663 params: &[DataField],
664 cache_policy: CachePolicy,
665 ) -> KnowledgeResult<RowData> {
666 let handle = self.current_handle()?;
667 self.execute_first_row_fields_with_handle(&handle, sql, params, cache_policy)
668 }
669
670 fn execute_first_row_fields_with_handle(
671 &self,
672 handle: &Arc<ProviderHandle>,
673 sql: &str,
674 params: &[DataField],
675 cache_policy: CachePolicy,
676 ) -> KnowledgeResult<RowData> {
677 let use_global_cache =
678 matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
679 if use_global_cache
680 && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
681 handle,
682 sql,
683 params,
684 QueryModeTag::FirstRow,
685 ))
686 {
687 self.record_result_cache_hit();
688 if telemetry_enabled() {
689 telemetry().on_cache(&CacheTelemetryEvent {
690 layer: CacheLayer::Result,
691 outcome: CacheOutcome::Hit,
692 provider_kind: Some(handle.kind.clone()),
693 });
694 }
695 return Ok(hit.into_row());
696 }
697 if use_global_cache {
698 self.record_result_cache_miss();
699 if telemetry_enabled() {
700 telemetry().on_cache(&CacheTelemetryEvent {
701 layer: CacheLayer::Result,
702 outcome: CacheOutcome::Miss,
703 provider_kind: Some(handle.kind.clone()),
704 });
705 }
706 }
707
708 let started = Instant::now();
709 let row = if params.is_empty() {
710 handle.provider.query_row(sql)
711 } else {
712 handle.provider.query_named_fields(sql, params)
713 };
714 let row = match row {
715 Ok(row) => {
716 if telemetry_enabled() {
717 telemetry().on_query(&QueryTelemetryEvent {
718 provider_kind: handle.kind.clone(),
719 mode: QueryModeTag::FirstRow,
720 success: true,
721 elapsed: started.elapsed(),
722 });
723 }
724 row
725 }
726 Err(err) => {
727 if telemetry_enabled() {
728 telemetry().on_query(&QueryTelemetryEvent {
729 provider_kind: handle.kind.clone(),
730 mode: QueryModeTag::FirstRow,
731 success: false,
732 elapsed: started.elapsed(),
733 });
734 }
735 return Err(err);
736 }
737 };
738
739 if use_global_cache {
740 self.save_result_cache_by_key(
741 result_cache_key_fields(handle, sql, params, QueryModeTag::FirstRow),
742 QueryResponse::Row(row.clone()),
743 );
744 }
745
746 Ok(row)
747 }
748
749 pub async fn execute_async(&self, req: &QueryRequest) -> KnowledgeResult<QueryResponse> {
750 let handle = self.current_handle()?;
751 if matches!(handle.kind, ProviderKind::SqliteAuthority) {
752 let handle = handle.clone();
753 let req = req.clone();
754 return task::spawn_blocking(move || runtime().execute_with_handle(&handle, &req))
755 .await
756 .map_err(|err| {
757 KnowReason::from_logic()
758 .to_err()
759 .with_detail(format!("knowledge async sqlite query join failed: {err}"))
760 })?;
761 }
762 let use_global_cache =
763 matches!(req.cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
764 if use_global_cache && let Some(hit) = self.fetch_result_cache(&handle, req) {
765 self.record_result_cache_hit();
766 if telemetry_enabled() {
767 telemetry().on_cache(&CacheTelemetryEvent {
768 layer: CacheLayer::Result,
769 outcome: CacheOutcome::Hit,
770 provider_kind: Some(handle.kind.clone()),
771 });
772 }
773 return Ok(hit);
774 }
775 if use_global_cache {
776 self.record_result_cache_miss();
777 if telemetry_enabled() {
778 telemetry().on_cache(&CacheTelemetryEvent {
779 layer: CacheLayer::Result,
780 outcome: CacheOutcome::Miss,
781 provider_kind: Some(handle.kind.clone()),
782 });
783 }
784 }
785
786 let params = params_to_fields(&req.params);
787 let mode_tag = query_mode_tag(&req.mode);
788 let started = Instant::now();
789 let response = match req.mode {
790 QueryMode::Many => {
791 if params.is_empty() {
792 handle
793 .provider
794 .query_async(&req.sql)
795 .await
796 .map(QueryResponse::Rows)
797 } else {
798 handle
799 .provider
800 .query_fields_async(&req.sql, ¶ms)
801 .await
802 .map(QueryResponse::Rows)
803 }
804 }
805 QueryMode::FirstRow => {
806 if params.is_empty() {
807 handle
808 .provider
809 .query_row_async(&req.sql)
810 .await
811 .map(QueryResponse::Row)
812 } else {
813 handle
814 .provider
815 .query_named_fields_async(&req.sql, ¶ms)
816 .await
817 .map(QueryResponse::Row)
818 }
819 }
820 };
821 let response = match response {
822 Ok(response) => {
823 if telemetry_enabled() {
824 telemetry().on_query(&QueryTelemetryEvent {
825 provider_kind: handle.kind.clone(),
826 mode: mode_tag,
827 success: true,
828 elapsed: started.elapsed(),
829 });
830 }
831 response
832 }
833 Err(err) => {
834 if telemetry_enabled() {
835 telemetry().on_query(&QueryTelemetryEvent {
836 provider_kind: handle.kind.clone(),
837 mode: mode_tag,
838 success: false,
839 elapsed: started.elapsed(),
840 });
841 }
842 return Err(err);
843 }
844 };
845
846 if use_global_cache {
847 self.save_result_cache(&handle, req, response.clone());
848 }
849
850 Ok(response)
851 }
852
853 pub async fn execute_first_row_fields_async(
854 &self,
855 sql: &str,
856 params: &[DataField],
857 cache_policy: CachePolicy,
858 ) -> KnowledgeResult<RowData> {
859 let handle = self.current_handle()?;
860 if matches!(handle.kind, ProviderKind::SqliteAuthority) {
861 let handle = handle.clone();
862 let sql = sql.to_string();
863 let params = params.to_vec();
864 return task::spawn_blocking(move || {
865 runtime().execute_first_row_fields_with_handle(&handle, &sql, ¶ms, cache_policy)
866 })
867 .await
868 .map_err(|err| {
869 KnowReason::from_logic().to_err().with_detail(format!(
870 "knowledge async sqlite first-row query join failed: {err}"
871 ))
872 })?;
873 }
874 let use_global_cache =
875 matches!(cache_policy, CachePolicy::UseGlobal) && self.result_cache_enabled();
876 if use_global_cache
877 && let Some(hit) = self.fetch_result_cache_by_key(result_cache_key_fields(
878 &handle,
879 sql,
880 params,
881 QueryModeTag::FirstRow,
882 ))
883 {
884 self.record_result_cache_hit();
885 if telemetry_enabled() {
886 telemetry().on_cache(&CacheTelemetryEvent {
887 layer: CacheLayer::Result,
888 outcome: CacheOutcome::Hit,
889 provider_kind: Some(handle.kind.clone()),
890 });
891 }
892 return Ok(hit.into_row());
893 }
894 if use_global_cache {
895 self.record_result_cache_miss();
896 if telemetry_enabled() {
897 telemetry().on_cache(&CacheTelemetryEvent {
898 layer: CacheLayer::Result,
899 outcome: CacheOutcome::Miss,
900 provider_kind: Some(handle.kind.clone()),
901 });
902 }
903 }
904
905 let started = Instant::now();
906 let row = if params.is_empty() {
907 handle.provider.query_row_async(sql).await
908 } else {
909 handle.provider.query_named_fields_async(sql, params).await
910 };
911 let row = match row {
912 Ok(row) => {
913 if telemetry_enabled() {
914 telemetry().on_query(&QueryTelemetryEvent {
915 provider_kind: handle.kind.clone(),
916 mode: QueryModeTag::FirstRow,
917 success: true,
918 elapsed: started.elapsed(),
919 });
920 }
921 row
922 }
923 Err(err) => {
924 if telemetry_enabled() {
925 telemetry().on_query(&QueryTelemetryEvent {
926 provider_kind: handle.kind.clone(),
927 mode: QueryModeTag::FirstRow,
928 success: false,
929 elapsed: started.elapsed(),
930 });
931 }
932 return Err(err);
933 }
934 };
935
936 if use_global_cache {
937 self.save_result_cache_by_key(
938 result_cache_key_fields(&handle, sql, params, QueryModeTag::FirstRow),
939 QueryResponse::Row(row.clone()),
940 );
941 }
942
943 Ok(row)
944 }
945
946 fn current_handle(&self) -> KnowledgeResult<Arc<ProviderHandle>> {
947 self.provider
948 .read()
949 .expect("runtime provider lock poisoned")
950 .clone()
951 .ok_or_else(|| {
952 KnowReason::from_logic()
953 .to_err()
954 .with_detail("knowledge provider not initialized")
955 })
956 }
957
958 fn current_generation_from_provider(&self) -> Option<Generation> {
959 self.provider
960 .read()
961 .ok()
962 .and_then(|guard| guard.as_ref().map(|handle| handle.generation))
963 }
964
965 fn fetch_result_cache(
966 &self,
967 handle: &ProviderHandle,
968 req: &QueryRequest,
969 ) -> Option<QueryResponse> {
970 self.fetch_result_cache_by_key(result_cache_key(handle, req))
971 }
972
973 fn fetch_result_cache_by_key(&self, key: ResultCacheKey) -> Option<QueryResponse> {
974 if !self.result_cache_enabled() {
975 return None;
976 }
977 let cached = self
978 .result_cache
979 .read()
980 .ok()
981 .and_then(|cache| cache.peek(&key).cloned())?;
982 if cached.cached_at.elapsed() > self.result_cache_ttl() {
983 if let Ok(mut cache) = self.result_cache.write() {
984 let _ = cache.pop(&key);
985 }
986 return None;
987 }
988 Some((*cached.response).clone())
989 }
990
991 fn save_result_cache(
992 &self,
993 handle: &ProviderHandle,
994 req: &QueryRequest,
995 response: QueryResponse,
996 ) {
997 self.save_result_cache_by_key(result_cache_key(handle, req), response);
998 }
999
1000 fn save_result_cache_by_key(&self, key: ResultCacheKey, response: QueryResponse) {
1001 if let Ok(mut cache) = self.result_cache.write() {
1002 cache.put(
1003 key,
1004 CachedQueryResponse {
1005 response: Arc::new(response),
1006 cached_at: Instant::now(),
1007 },
1008 );
1009 }
1010 }
1011
1012 fn redis_cache_enabled(&self) -> bool {
1017 self.result_cache_enabled.load(Ordering::Acquire)
1018 }
1019
1020 #[allow(dead_code)]
1021 fn redis_cache_ttl(&self) -> Duration {
1022 Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Acquire))
1023 }
1024
1025 fn fetch_redis_cache(&self, key: &RedisCacheKey) -> Option<CachedRedisEntry> {
1026 if !self.redis_cache_enabled() {
1027 return None;
1028 }
1029 let entry = self
1030 .redis_cache
1031 .read()
1032 .ok()
1033 .and_then(|cache| cache.peek(key).cloned())?;
1034 if entry.ttl_ms > 0 && entry.cached_at.elapsed() > Duration::from_millis(entry.ttl_ms) {
1036 if let Ok(mut cache) = self.redis_cache.write() {
1037 let _ = cache.pop(key);
1038 }
1039 return None;
1040 }
1041 Some(entry)
1042 }
1043
1044 fn save_redis_cache(&self, key: RedisCacheKey, entry: CachedRedisEntry) {
1045 if !self.redis_cache_enabled() {
1046 return;
1047 }
1048 if let Ok(mut cache) = self.redis_cache.write() {
1049 cache.put(key, entry);
1050 }
1051 }
1052
1053 pub(crate) fn redis_cache_get(&self, ck: &RedisCacheKey) -> Option<CachedRedisValue> {
1054 if !self.redis_global_enabled.load(Ordering::Relaxed) {
1055 return None;
1056 }
1057 let entry = self.fetch_redis_cache(ck)?;
1058 self.redis_cache_hits.fetch_add(1, Ordering::Relaxed);
1059 Some(entry.value)
1060 }
1061
1062 pub(crate) fn redis_cache_put(&self, ck: RedisCacheKey, value: CachedRedisValue) {
1063 self.redis_cache_put_with_ttl(ck, value, 0);
1064 }
1065
1066 pub(crate) fn redis_cache_put_with_ttl(
1067 &self,
1068 ck: RedisCacheKey,
1069 value: CachedRedisValue,
1070 ttl_ms: u64,
1071 ) {
1072 if !self.redis_global_enabled.load(Ordering::Relaxed) {
1073 return;
1074 }
1075 self.redis_cache_misses.fetch_add(1, Ordering::Relaxed);
1076 self.save_redis_cache(
1077 ck,
1078 CachedRedisEntry {
1079 value,
1080 cached_at: Instant::now(),
1081 ttl_ms,
1082 },
1083 );
1084 }
1085
1086 #[allow(dead_code)]
1087 fn clear_redis_cache(&self) {
1088 if let Ok(mut cache) = self.redis_cache.write() {
1089 cache.clear();
1090 }
1091 }
1092
1093 #[inline]
1094 fn result_cache_enabled(&self) -> bool {
1095 self.result_cache_enabled.load(Ordering::Relaxed)
1096 }
1097
1098 #[inline]
1099 fn result_cache_ttl(&self) -> Duration {
1100 Duration::from_millis(self.result_cache_ttl_ms.load(Ordering::Relaxed))
1101 }
1102}
1103
1104pub fn runtime() -> &'static KnowledgeRuntime {
1105 static RUNTIME: OnceLock<KnowledgeRuntime> = OnceLock::new();
1106 RUNTIME.get_or_init(|| KnowledgeRuntime::new(1024))
1107}
1108
/// Test-only mutex wrapper, handed out by `runtime_test_guard`.
// NOTE(review): presumably used to serialise tests that share the global
// runtime — confirm against the test modules.
#[cfg(test)]
pub(crate) struct RuntimeTestGuard(tokio::sync::Mutex<()>);
1111
#[cfg(test)]
impl RuntimeTestGuard {
    /// Acquires the guard from synchronous code via `blocking_lock`.
    ///
    /// The result is always `Ok`; the error type is `Infallible`.
    pub(crate) fn lock(&self) -> Result<tokio::sync::MutexGuard<'_, ()>, std::convert::Infallible> {
        let held = self.0.blocking_lock();
        Ok(held)
    }

    /// Acquires the guard from asynchronous code.
    pub(crate) async fn lock_async(&self) -> tokio::sync::MutexGuard<'_, ()> {
        let held = self.0.lock().await;
        held
    }
}
1122
/// Returns the process-wide test guard, creating it on first use.
#[cfg(test)]
pub(crate) fn runtime_test_guard() -> &'static RuntimeTestGuard {
    static SHARED: OnceLock<RuntimeTestGuard> = OnceLock::new();
    let build = || RuntimeTestGuard(tokio::sync::Mutex::new(()));
    SHARED.get_or_init(build)
}
1128
1129fn result_cache_key(handle: &ProviderHandle, req: &QueryRequest) -> ResultCacheKey {
1130 ResultCacheKey {
1131 datasource_id: handle.datasource_id.clone(),
1132 generation: handle.generation,
1133 query_hash: stable_hash(&req.sql),
1134 params_hash: stable_params_hash(&req.params),
1135 mode: match req.mode {
1136 QueryMode::Many => QueryModeTag::Many,
1137 QueryMode::FirstRow => QueryModeTag::FirstRow,
1138 },
1139 }
1140}
1141
1142fn result_cache_key_fields(
1143 handle: &ProviderHandle,
1144 sql: &str,
1145 params: &[DataField],
1146 mode: QueryModeTag,
1147) -> ResultCacheKey {
1148 ResultCacheKey {
1149 datasource_id: handle.datasource_id.clone(),
1150 generation: handle.generation,
1151 query_hash: stable_hash(sql),
1152 params_hash: stable_field_params_hash(params),
1153 mode,
1154 }
1155}
1156
1157fn query_mode_tag(mode: &QueryMode) -> QueryModeTag {
1158 match mode {
1159 QueryMode::Many => QueryModeTag::Many,
1160 QueryMode::FirstRow => QueryModeTag::FirstRow,
1161 }
1162}
1163
/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// The same input yields the same digest for a given build; the standard
/// library does not promise the algorithm stays the same across releases.
fn stable_hash(value: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(value, &mut state);
    Hasher::finish(&state)
}
1169
1170fn stable_params_hash(params: &[QueryParam]) -> u64 {
1171 let mut hasher = DefaultHasher::new();
1172 for param in params {
1173 param.name.hash(&mut hasher);
1174 match ¶m.value {
1175 QueryValue::Null => 0u8.hash(&mut hasher),
1176 QueryValue::Bool(value) => {
1177 1u8.hash(&mut hasher);
1178 value.hash(&mut hasher);
1179 }
1180 QueryValue::Int(value) => {
1181 2u8.hash(&mut hasher);
1182 value.hash(&mut hasher);
1183 }
1184 QueryValue::Float(value) => {
1185 3u8.hash(&mut hasher);
1186 value.to_bits().hash(&mut hasher);
1187 }
1188 QueryValue::Text(value) => {
1189 4u8.hash(&mut hasher);
1190 value.hash(&mut hasher);
1191 }
1192 }
1193 }
1194 hasher.finish()
1195}
1196
1197fn stable_field_params_hash(params: &[DataField]) -> u64 {
1198 let mut hasher = DefaultHasher::new();
1199 for field in params {
1200 field.get_name().hash(&mut hasher);
1201 match field.get_value() {
1202 Value::Null | Value::Ignore(_) => 0u8.hash(&mut hasher),
1203 Value::Bool(value) => {
1204 1u8.hash(&mut hasher);
1205 value.hash(&mut hasher);
1206 }
1207 Value::Digit(value) => {
1208 2u8.hash(&mut hasher);
1209 value.hash(&mut hasher);
1210 }
1211 Value::Float(value) => {
1212 3u8.hash(&mut hasher);
1213 value.to_bits().hash(&mut hasher);
1214 }
1215 Value::Chars(value) => {
1216 4u8.hash(&mut hasher);
1217 value.hash(&mut hasher);
1218 }
1219 Value::Symbol(value) => {
1220 5u8.hash(&mut hasher);
1221 value.hash(&mut hasher);
1222 }
1223 Value::Time(value) => {
1224 6u8.hash(&mut hasher);
1225 value.hash(&mut hasher);
1226 }
1227 Value::Hex(value) => {
1228 7u8.hash(&mut hasher);
1229 value.to_string().hash(&mut hasher);
1230 }
1231 Value::IpNet(value) => {
1232 8u8.hash(&mut hasher);
1233 value.to_string().hash(&mut hasher);
1234 }
1235 Value::IpAddr(value) => {
1236 9u8.hash(&mut hasher);
1237 value.hash(&mut hasher);
1238 }
1239 Value::Obj(value) => {
1240 10u8.hash(&mut hasher);
1241 format!("{:?}", value).hash(&mut hasher);
1242 }
1243 Value::Array(value) => {
1244 11u8.hash(&mut hasher);
1245 format!("{:?}", value).hash(&mut hasher);
1246 }
1247 Value::Domain(value) => {
1248 12u8.hash(&mut hasher);
1249 value.0.hash(&mut hasher);
1250 }
1251 Value::Url(value) => {
1252 13u8.hash(&mut hasher);
1253 value.0.hash(&mut hasher);
1254 }
1255 Value::Email(value) => {
1256 14u8.hash(&mut hasher);
1257 value.0.hash(&mut hasher);
1258 }
1259 Value::IdCard(value) => {
1260 15u8.hash(&mut hasher);
1261 value.0.hash(&mut hasher);
1262 }
1263 Value::MobilePhone(value) => {
1264 16u8.hash(&mut hasher);
1265 value.0.hash(&mut hasher);
1266 }
1267 }
1268 }
1269 hasher.finish()
1270}
1271
1272pub fn fields_to_params(params: &[DataField]) -> Vec<QueryParam> {
1273 params
1274 .iter()
1275 .map(|field| {
1276 let value = match field.get_value() {
1277 Value::Null | Value::Ignore(_) => QueryValue::Null,
1278 Value::Bool(value) => QueryValue::Bool(*value),
1279 Value::Digit(value) => QueryValue::Int(*value),
1280 Value::Float(value) => QueryValue::Float(*value),
1281 Value::Chars(value) => QueryValue::Text(value.to_string()),
1282 Value::Symbol(value) => QueryValue::Text(value.to_string()),
1283 Value::Time(value) => QueryValue::Text(value.to_string()),
1284 Value::Hex(value) => QueryValue::Text(value.to_string()),
1285 Value::IpNet(value) => QueryValue::Text(value.to_string()),
1286 Value::IpAddr(value) => QueryValue::Text(value.to_string()),
1287 Value::Obj(value) => QueryValue::Text(format!("{:?}", value)),
1288 Value::Array(value) => QueryValue::Text(format!("{:?}", value)),
1289 Value::Domain(value) => QueryValue::Text(value.0.to_string()),
1290 Value::Url(value) => QueryValue::Text(value.0.to_string()),
1291 Value::Email(value) => QueryValue::Text(value.0.to_string()),
1292 Value::IdCard(value) => QueryValue::Text(value.0.to_string()),
1293 Value::MobilePhone(value) => QueryValue::Text(value.0.to_string()),
1294 };
1295 QueryParam {
1296 name: field.get_name().to_string(),
1297 value,
1298 }
1299 })
1300 .collect()
1301}
1302
1303pub fn params_to_fields(params: &[QueryParam]) -> Vec<DataField> {
1304 params
1305 .iter()
1306 .map(|param| match ¶m.value {
1307 QueryValue::Null => {
1308 DataField::new(DataType::default(), param.name.clone(), Value::Null)
1309 }
1310 QueryValue::Bool(value) => {
1311 DataField::new(DataType::default(), param.name.clone(), Value::Bool(*value))
1312 }
1313 QueryValue::Int(value) => DataField::from_digit(param.name.clone(), *value),
1314 QueryValue::Float(value) => DataField::from_float(param.name.clone(), *value),
1315 QueryValue::Text(value) => DataField::from_chars(param.name.clone(), value.clone()),
1316 })
1317 .collect()
1318}
1319
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use std::sync::Arc;
    use wp_model_core::model::Value;

    /// Minimal provider that answers every query with a single `value` chars field.
    struct TestProvider {
        value: &'static str,
    }

    #[async_trait]
    impl ProviderExecutor for TestProvider {
        fn query(&self, _sql: &str) -> KnowledgeResult<Vec<RowData>> {
            Ok(vec![vec![DataField::from_chars("value", self.value)]])
        }

        fn query_fields(&self, _sql: &str, _params: &[DataField]) -> KnowledgeResult<Vec<RowData>> {
            self.query("")
        }

        fn query_row(&self, _sql: &str) -> KnowledgeResult<RowData> {
            Ok(vec![DataField::from_chars("value", self.value)])
        }

        fn query_named_fields(
            &self,
            _sql: &str,
            _params: &[DataField],
        ) -> KnowledgeResult<RowData> {
            self.query_row("")
        }
    }

    #[test]
    fn query_param_hash_is_stable() {
        let params = vec![
            QueryParam {
                name: ":id".to_string(),
                value: QueryValue::Int(7),
            },
            QueryParam {
                name: ":name".to_string(),
                value: QueryValue::Text("abc".to_string()),
            },
        ];
        assert_eq!(stable_params_hash(&params), stable_params_hash(&params));

        // A constant hash would satisfy the check above; make sure the value matters too.
        let mut changed = params.clone();
        changed[0].value = QueryValue::Int(8);
        assert_ne!(stable_params_hash(&params), stable_params_hash(&changed));
    }

    #[test]
    fn fields_to_params_preserves_raw_chars_value() {
        let fields = [DataField::from_chars(
            ":name".to_string(),
            "令狐冲".to_string(),
        )];
        let params = fields_to_params(&fields);
        assert_eq!(params.len(), 1);
        match &params[0].value {
            QueryValue::Text(value) => assert_eq!(value, "令狐冲"),
            other => panic!("unexpected param value: {other:?}"),
        }
        let roundtrip = params_to_fields(&params);
        assert!(matches!(roundtrip[0].get_value(), Value::Chars(_)));
    }

    #[tokio::test(flavor = "current_thread")]
    async fn sqlite_async_bridge_keeps_captured_handle_after_reload() {
        // Hold the shared guard while providers are swapped on the global runtime.
        let _guard = runtime_test_guard().lock_async().await;
        runtime()
            .install_provider(
                ProviderKind::SqliteAuthority,
                DatasourceId("sqlite:old".to_string()),
                |_generation| Ok(Arc::new(TestProvider { value: "old" })),
            )
            .expect("install old provider");
        let old_handle = runtime().current_handle().expect("current old handle");

        runtime()
            .install_provider(
                ProviderKind::SqliteAuthority,
                DatasourceId("sqlite:new".to_string()),
                |_generation| Ok(Arc::new(TestProvider { value: "new" })),
            )
            .expect("install new provider");

        // The handle captured before the second install must still answer with "old".
        let req = QueryRequest::first_row("SELECT value", Vec::new(), CachePolicy::Bypass);
        let row = task::spawn_blocking(move || runtime().execute_with_handle(&old_handle, &req))
            .await
            .expect("join sqlite bridge")
            .expect("execute old handle")
            .into_row();
        assert_eq!(row[0].to_string(), "chars(old)");
    }

    /// Builds a `RedisCacheKey` for tests: `key` and `args` are hashed separately with
    /// `DefaultHasher`, and `cmd` / `generation` are stored as given.
    fn redis_ck(cmd: RedisCmdTag, generation: u64, key: &str, args: &[&str]) -> RedisCacheKey {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let key_hash = hasher.finish();
        let mut hasher = DefaultHasher::new();
        for arg in args {
            arg.hash(&mut hasher);
        }
        let args_hash = hasher.finish();
        RedisCacheKey {
            generation,
            cmd_tag: cmd,
            key_hash,
            args_hash,
        }
    }

    #[test]
    fn redis_cache_hit_and_miss() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64);

        let ck = redis_ck(RedisCmdTag::Get, 1, "user:1", &[]);
        assert!(rt.redis_cache_get(&ck).is_none());
        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
        let val = rt.redis_cache_get(&ck).expect("should hit cache");
        assert!(matches!(val, CachedRedisValue::Bool(true)));
    }

    #[test]
    fn redis_cache_global_enabled_access() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64);

        let ck = redis_ck(RedisCmdTag::Get, 1, "k", &[]);
        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
        assert!(rt.redis_cache_get(&ck).is_some());
    }

    #[test]
    fn redis_cache_global_disabled_blocks_all() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(false, 64);

        let ck = redis_ck(RedisCmdTag::BfExists, 1, "any_key", &["item"]);
        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
        assert!(rt.redis_cache_get(&ck).is_none());
    }

    #[test]
    fn redis_cache_ttl_expiry() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64);

        let ck = redis_ck(RedisCmdTag::BfExists, 1, "k", &["item"]);
        // NOTE(review): the TTL unit is not visible in this module; the 5 ms sleep below
        // is expected to outlast a TTL of `1`.
        rt.redis_cache_put_with_ttl(ck.clone(), CachedRedisValue::Bool(true), 1);
        assert!(rt.redis_cache_get(&ck).is_some());
        std::thread::sleep(std::time::Duration::from_millis(5));
        assert!(rt.redis_cache_get(&ck).is_none());
    }

    #[test]
    fn redis_cache_no_ttl_never_expires() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64);

        let ck = redis_ck(RedisCmdTag::Get, 1, "k", &[]);
        rt.redis_cache_put(ck.clone(), CachedRedisValue::Bool(true));
        assert!(rt.redis_cache_get(&ck).is_some());
        std::thread::sleep(std::time::Duration::from_millis(5));
        assert!(rt.redis_cache_get(&ck).is_some());
    }

    #[test]
    fn redis_cache_generation_isolation() {
        let rt = KnowledgeRuntime::new(64);
        rt.configure_redis_cache(true, 64);

        let ck_gen1 = redis_ck(RedisCmdTag::BfExists, 1, "key", &["item"]);
        let ck_gen2 = redis_ck(RedisCmdTag::BfExists, 2, "key", &["item"]);

        rt.redis_cache_put(ck_gen1.clone(), CachedRedisValue::Bool(false));

        // Same key and args under a different generation must not hit.
        assert!(rt.redis_cache_get(&ck_gen2).is_none());
        assert!(rt.redis_cache_get(&ck_gen1).is_some());
    }
}