stoolap/executor/
semantic_cache.rs

1// Copyright 2025 Stoolap Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Semantic Query Caching with Predicate Subsumption
16//!
17//! This module implements intelligent query result caching that goes beyond simple
18//! string matching. It detects when a new query's results can be computed by filtering
19//! cached results from a previous query, without re-executing against storage.
20//!
21//! # Key Insight
22//!
23//! If Query A's predicate P_A is LESS RESTRICTIVE than Query B's predicate P_B,
24//! then B's results are a SUBSET of A's results:
25//!
26//! ```text
27//! Query A: SELECT * FROM orders WHERE amount > 100  (cached: 500 rows)
28//! Query B: SELECT * FROM orders WHERE amount > 150  (new query)
29//!
30//! Since amount > 150 is STRICTER than amount > 100,
31//! B's results ⊆ A's results
32//!
33//! Instead of scanning storage: Filter A's cached 500 rows → B's results
34//! ```
35//!
36//! # Supported Subsumption Patterns
37//!
38//! 1. **Numeric Range Tightening:**
39//!    - `col > 100` (cached) → `col > 150` (new): Filter cached
40//!    - `col < 500` (cached) → `col < 300` (new): Filter cached
41//!    - `col BETWEEN 100 AND 500` (cached) → `col BETWEEN 200 AND 400` (new): Filter cached
42//!
43//! 2. **Equality Subset:**
44//!    - `col IN (1,2,3,4,5)` (cached) → `col IN (2,3)` (new): Filter cached
45//!
46//! 3. **AND Conjunction Strengthening:**
47//!    - `A` (cached) → `A AND B` (new): Filter cached
48//!    - `A AND B` (cached) → `A AND B AND C` (new): Filter cached
49//!
50//! # Not Supported (Triggers Re-execution)
51//!
52//! - OR predicates (may expand result set)
53//! - Different tables
54//! - Different column sets
55//! - Non-comparable predicates (LIKE, function calls)
56//!
57//! # Transaction Isolation Considerations
58//!
59//! **Important:** The semantic cache is currently global and does not account for
60//! MVCC transaction isolation. This means:
61//!
62//! - Cache entries are shared across all transactions
63//! - A transaction might see cached results from another transaction's read
64//! - Cache invalidation on DML ensures committed changes are reflected
65//!
66//! This is safe for:
67//! - Single-connection usage
68//! - Read-only workloads
69//! - Scenarios where eventual consistency is acceptable
70//!
71//! For strict serializable isolation with concurrent writes, consider:
72//! - Disabling the cache during critical transactions
73//! - Using explicit cache invalidation between operations
74//!
75//! Future enhancement: Per-transaction cache scoping with timestamp-based invalidation
76
77use rustc_hash::{FxHashMap, FxHasher};
78use std::borrow::Cow;
79use std::hash::{Hash, Hasher};
80use std::sync::atomic::{AtomicU64, Ordering};
81use std::sync::RwLock;
82use std::time::{Duration, Instant};
83
84use crate::common::CompactArc;
85use crate::core::{Result, Row};
86
87/// Convert to lowercase without allocation if already lowercase.
88/// Returns Cow::Borrowed for already-lowercase strings (zero allocation).
89#[inline]
90fn to_lowercase_cow(s: &str) -> Cow<'_, str> {
91    if s.bytes().all(|b| !b.is_ascii_uppercase()) {
92        Cow::Borrowed(s)
93    } else {
94        Cow::Owned(s.to_lowercase())
95    }
96}
97use crate::functions::FunctionRegistry;
98use crate::parser::ast::{Expression, InfixOperator};
99
100use super::expression::ExpressionEval;
101use super::utils::{expressions_equivalent, extract_and_conditions, extract_column_name};
102
103/// Maximum number of cached query results per table+column combination.
104///
105/// This limits memory usage by bounding how many distinct query patterns
106/// can be cached for each table. When this limit is reached, the least
107/// recently used (LRU) entries are evicted.
108///
109/// Default: 64 entries
110pub const DEFAULT_SEMANTIC_CACHE_SIZE: usize = 64;
111
112/// Time-to-live for cached results in seconds.
113///
114/// Cached query results are automatically evicted after this duration
115/// to prevent serving stale data. This provides a safety net beyond
116/// explicit invalidation on DML operations.
117///
118/// Default: 300 seconds (5 minutes)
119pub const DEFAULT_CACHE_TTL_SECS: u64 = 300;
120
121/// Maximum number of rows to cache per query result.
122///
123/// Query results exceeding this threshold are not cached to prevent
124/// memory bloat. This is particularly important for queries that may
125/// return large result sets.
126///
127/// Default: 100,000 rows
128pub const DEFAULT_MAX_CACHED_ROWS: usize = 100_000;
129
130/// Global maximum total rows across all cache entries.
131///
132/// This prevents unbounded memory growth when many tables/column patterns
133/// are cached. When exceeded, entries are evicted using LRU across all
134/// tables until under the limit.
135///
136/// Default: 1,000,000 rows (approximately 100-500MB depending on row size)
137pub const DEFAULT_MAX_GLOBAL_CACHED_ROWS: usize = 1_000_000;
138
139/// Fingerprint for a cacheable query pattern
140#[derive(Debug, Clone, PartialEq, Eq, Hash)]
141pub struct QueryFingerprint {
142    /// Table name (lowercase)
143    pub table_name: String,
144    /// Selected columns (sorted for comparison)
145    pub columns: Vec<String>,
146    /// Hash of the predicate structure (not values)
147    pub predicate_structure_hash: u64,
148}
149
150impl QueryFingerprint {
151    /// Create a fingerprint for a simple table scan
152    pub fn new(table_name: &str, columns: Vec<String>) -> Self {
153        Self {
154            table_name: table_name.to_lowercase(),
155            columns,
156            predicate_structure_hash: 0,
157        }
158    }
159
160    /// Create a fingerprint with predicate structure
161    pub fn with_predicate(table_name: &str, columns: Vec<String>, predicate: &Expression) -> Self {
162        Self {
163            table_name: table_name.to_lowercase(),
164            columns,
165            predicate_structure_hash: hash_predicate_structure(predicate),
166        }
167    }
168}
169
170/// Cached query result with metadata
171#[derive(Debug, Clone)]
172pub struct CachedResult {
173    /// The fingerprint identifying this query pattern
174    pub fingerprint: QueryFingerprint,
175    /// Column names in order
176    pub column_names: Vec<String>,
177    /// Cached rows (Arc for zero-copy sharing on cache hits)
178    pub rows: CompactArc<Vec<Row>>,
179    /// The original WHERE predicate (for subsumption checking)
180    pub predicate: Option<Expression>,
181    /// When this entry was cached
182    pub cached_at: Instant,
183    /// Last access time (for LRU eviction)
184    pub last_accessed: Instant,
185    /// Access count
186    pub access_count: u64,
187}
188
189impl CachedResult {
190    /// Create a new cached result
191    pub fn new(
192        fingerprint: QueryFingerprint,
193        column_names: Vec<String>,
194        rows: Vec<Row>,
195        predicate: Option<Expression>,
196    ) -> Self {
197        let now = Instant::now();
198        Self {
199            fingerprint,
200            column_names,
201            rows: CompactArc::new(rows), // Wrap in CompactArc for zero-copy sharing
202            predicate,
203            cached_at: now,
204            last_accessed: now,
205            access_count: 1,
206        }
207    }
208
209    /// Create a new cached result with pre-wrapped Arc (avoids clone)
210    ///
211    /// Use this when the caller already has an CompactArc<Vec<Row>> to avoid
212    /// an extra allocation and copy.
213    pub fn new_with_arc(
214        fingerprint: QueryFingerprint,
215        column_names: Vec<String>,
216        rows: CompactArc<Vec<Row>>,
217        predicate: Option<Expression>,
218    ) -> Self {
219        let now = Instant::now();
220        Self {
221            fingerprint,
222            column_names,
223            rows, // Use Arc directly - no additional wrapping
224            predicate,
225            cached_at: now,
226            last_accessed: now,
227            access_count: 1,
228        }
229    }
230
231    /// Check if this cached result has expired
232    pub fn is_expired(&self, ttl: Duration) -> bool {
233        self.cached_at.elapsed() > ttl
234    }
235
236    /// Record an access to this cached result
237    pub fn record_access(&mut self) {
238        self.last_accessed = Instant::now();
239        self.access_count += 1;
240    }
241}
242
243/// Subsumption relationship between predicates
244#[derive(Debug, Clone)]
245pub enum SubsumptionResult {
246    /// New predicate is stricter - can filter cached results
247    Subsumed {
248        /// The additional filter to apply to cached rows
249        filter: Box<Expression>,
250    },
251    /// Predicates are identical - use cached results directly
252    Identical,
253    /// Cannot determine subsumption - must re-execute
254    NoSubsumption,
255}
256
257/// Semantic Query Cache
258///
259/// Intelligently caches query results and detects when new queries
260/// can be answered by filtering cached results.
261pub struct SemanticCache {
262    /// Cached results: table_name -> (column_key -> Vec<CachedResult>)
263    /// Nested structure enables O(1) table invalidation
264    cache: RwLock<FxHashMap<String, FxHashMap<String, Vec<CachedResult>>>>,
265    /// Maximum cache size per table+column combination
266    max_size: usize,
267    /// Cache TTL
268    ttl: Duration,
269    /// Maximum rows to cache per query
270    max_rows: usize,
271    /// Maximum total rows across all cache entries (prevents unbounded growth)
272    max_global_rows: usize,
273    /// Current total row count across all entries (for global limit enforcement)
274    global_row_count: AtomicU64,
275    /// Statistics (lock-free atomics)
276    stats: SemanticCacheStats,
277}
278
279/// Statistics for the semantic cache (lock-free with atomics)
280#[derive(Debug, Default)]
281pub struct SemanticCacheStats {
282    /// Total cache hits (exact or subsumption)
283    pub hits: AtomicU64,
284    /// Exact match hits
285    pub exact_hits: AtomicU64,
286    /// Subsumption hits (filtered from cached)
287    pub subsumption_hits: AtomicU64,
288    /// Cache misses
289    pub misses: AtomicU64,
290    /// Entries evicted due to TTL
291    pub ttl_evictions: AtomicU64,
292    /// Entries evicted due to size limit
293    pub size_evictions: AtomicU64,
294    /// Lock acquisition failures (poisoned lock from panics)
295    /// If this is non-zero, a previous operation panicked while holding the lock
296    pub lock_failures: AtomicU64,
297}
298
299/// Snapshot of cache statistics (plain values for reading)
300#[derive(Debug, Clone, Default)]
301pub struct SemanticCacheStatsSnapshot {
302    /// Total cache hits (exact or subsumption)
303    pub hits: u64,
304    /// Exact match hits
305    pub exact_hits: u64,
306    /// Subsumption hits (filtered from cached)
307    pub subsumption_hits: u64,
308    /// Cache misses
309    pub misses: u64,
310    /// Entries evicted due to TTL
311    pub ttl_evictions: u64,
312    /// Entries evicted due to size limit
313    pub size_evictions: u64,
314    /// Lock acquisition failures (indicates previous panic)
315    pub lock_failures: u64,
316}
317
318/// Result of a cache lookup
319#[derive(Debug)]
320pub enum CacheLookupResult {
321    /// Exact match found (Arc for zero-copy sharing)
322    ExactHit(CompactArc<Vec<Row>>),
323    /// Subsumption match found - apply filter to get results
324    SubsumptionHit {
325        /// Rows to filter (Arc for zero-copy sharing)
326        rows: CompactArc<Vec<Row>>,
327        /// Filter predicate to apply
328        filter: Box<Expression>,
329        /// Column names for evaluation context
330        columns: Vec<String>,
331    },
332    /// No match found
333    Miss,
334}
335
336impl SemanticCache {
337    /// Create a new semantic cache with default settings
338    pub fn new() -> Self {
339        Self::with_config(
340            DEFAULT_SEMANTIC_CACHE_SIZE,
341            Duration::from_secs(DEFAULT_CACHE_TTL_SECS),
342            DEFAULT_MAX_CACHED_ROWS,
343            DEFAULT_MAX_GLOBAL_CACHED_ROWS,
344        )
345    }
346
347    /// Create a semantic cache with custom configuration
348    pub fn with_config(
349        max_size: usize,
350        ttl: Duration,
351        max_rows: usize,
352        max_global_rows: usize,
353    ) -> Self {
354        Self {
355            cache: RwLock::new(FxHashMap::default()),
356            max_size,
357            ttl,
358            max_rows,
359            max_global_rows,
360            global_row_count: AtomicU64::new(0),
361            stats: SemanticCacheStats::default(),
362        }
363    }
364
365    /// Look up a query in the cache
366    ///
367    /// Returns:
368    /// - `ExactHit` if an identical query is cached
369    /// - `SubsumptionHit` if a broader query is cached and can be filtered
370    /// - `Miss` if no usable cache entry exists
371    pub fn lookup(
372        &self,
373        table_name: &str,
374        columns: &[String],
375        predicate: Option<&Expression>,
376    ) -> CacheLookupResult {
377        let (table_key, column_key) = Self::cache_keys(table_name, columns);
378
379        // First pass: read-only search
380        let hit_info = {
381            let cache = match self.cache.read() {
382                Ok(c) => c,
383                Err(_) => {
384                    self.stats.lock_failures.fetch_add(1, Ordering::Relaxed);
385                    return CacheLookupResult::Miss;
386                }
387            };
388
389            // Navigate nested structure: table -> columns -> entries
390            let table_cache = match cache.get(&table_key) {
391                Some(tc) => tc,
392                None => {
393                    drop(cache);
394                    self.record_miss();
395                    return CacheLookupResult::Miss;
396                }
397            };
398
399            let entries = match table_cache.get(&column_key) {
400                Some(e) => e,
401                None => {
402                    drop(cache);
403                    self.record_miss();
404                    return CacheLookupResult::Miss;
405                }
406            };
407
408            // Try to find a usable cached result
409            // Store (index, predicate_hash) for TOCTOU-safe access update
410            let mut found = None;
411            for (idx, entry) in entries.iter().enumerate() {
412                // Skip expired entries
413                if entry.is_expired(self.ttl) {
414                    continue;
415                }
416
417                // Check if columns match
418                if entry.column_names != columns {
419                    continue;
420                }
421
422                // Check predicate relationship
423                match check_subsumption(entry.predicate.as_ref(), predicate) {
424                    SubsumptionResult::Identical => {
425                        let rows = entry.rows.clone();
426                        let hash = entry.fingerprint.predicate_structure_hash;
427                        found = Some((idx, hash, CacheLookupResult::ExactHit(rows)));
428                        break;
429                    }
430                    SubsumptionResult::Subsumed { filter } => {
431                        let rows = entry.rows.clone();
432                        let columns = entry.column_names.clone();
433                        let hash = entry.fingerprint.predicate_structure_hash;
434                        found = Some((
435                            idx,
436                            hash,
437                            CacheLookupResult::SubsumptionHit {
438                                rows,
439                                filter,
440                                columns,
441                            },
442                        ));
443                        break;
444                    }
445                    SubsumptionResult::NoSubsumption => {
446                        // Try next entry
447                        continue;
448                    }
449                }
450            }
451            found
452        }; // Read lock released here
453
454        match hit_info {
455            Some((idx, expected_hash, result)) => {
456                // Update access time with write lock
457                // TOCTOU safety: verify the entry's fingerprint hash matches
458                // If entry was evicted/replaced, skip update (benign miss)
459                if let Ok(mut cache) = self.cache.write() {
460                    if let Some(table_cache) = cache.get_mut(&table_key) {
461                        if let Some(entries) = table_cache.get_mut(&column_key) {
462                            if let Some(entry) = entries.get_mut(idx) {
463                                // Only update if it's still the same entry
464                                if entry.fingerprint.predicate_structure_hash == expected_hash {
465                                    entry.record_access();
466                                }
467                            }
468                        }
469                    }
470                }
471                // Record hit stats
472                match &result {
473                    CacheLookupResult::ExactHit(_) => self.record_exact_hit(),
474                    CacheLookupResult::SubsumptionHit { .. } => self.record_subsumption_hit(),
475                    CacheLookupResult::Miss => {}
476                }
477                result
478            }
479            None => {
480                self.record_miss();
481                CacheLookupResult::Miss
482            }
483        }
484    }
485
486    /// Insert a query result into the cache
487    pub fn insert(
488        &self,
489        table_name: &str,
490        columns: Vec<String>,
491        rows: Vec<Row>,
492        predicate: Option<Expression>,
493    ) {
494        let new_row_count = rows.len();
495
496        // Don't cache if too many rows in this single result
497        if new_row_count > self.max_rows {
498            return;
499        }
500
501        let (table_key, column_key) = Self::cache_keys(table_name, &columns);
502        let fingerprint = match &predicate {
503            Some(p) => QueryFingerprint::with_predicate(table_name, columns.clone(), p),
504            None => QueryFingerprint::new(table_name, columns.clone()),
505        };
506
507        let entry = CachedResult::new(fingerprint, columns, rows, predicate);
508
509        let mut cache = match self.cache.write() {
510            Ok(c) => c,
511            Err(_) => {
512                self.stats.lock_failures.fetch_add(1, Ordering::Relaxed);
513                return;
514            }
515        };
516
517        // Check global limit and evict across other tables first if needed
518        let current_global = self.global_row_count.load(Ordering::Relaxed) as usize;
519        if current_global + new_row_count > self.max_global_rows {
520            let rows_to_free =
521                (current_global + new_row_count).saturating_sub(self.max_global_rows);
522            self.evict_global_lru(&mut cache, rows_to_free, &table_key);
523        }
524
525        // Navigate to nested entries: table -> columns -> entries
526        let table_cache = cache.entry(table_key).or_default();
527        let entries = table_cache.entry(column_key).or_default();
528
529        // Evict expired entries and track row count reduction
530        let mut rows_freed: usize = 0;
531        let before_len = entries.len();
532        entries.retain(|e| {
533            if e.is_expired(self.ttl) {
534                rows_freed += e.rows.len();
535                false
536            } else {
537                true
538            }
539        });
540        let evicted = before_len - entries.len();
541        if evicted > 0 {
542            self.stats
543                .ttl_evictions
544                .fetch_add(evicted as u64, Ordering::Relaxed);
545        }
546
547        // Evict if at per-table capacity (LRU)
548        while entries.len() >= self.max_size {
549            // Find least recently used
550            if let Some((idx, _)) = entries
551                .iter()
552                .enumerate()
553                .min_by_key(|(_, e)| (e.last_accessed, e.access_count))
554            {
555                rows_freed += entries[idx].rows.len();
556                entries.remove(idx);
557                self.stats.size_evictions.fetch_add(1, Ordering::Relaxed);
558            } else {
559                break;
560            }
561        }
562
563        // Update global row count (subtract freed, add new)
564        if rows_freed > 0 {
565            self.global_row_count
566                .fetch_sub(rows_freed as u64, Ordering::Relaxed);
567        }
568
569        // Add the new entry and update global count
570        self.global_row_count
571            .fetch_add(new_row_count as u64, Ordering::Relaxed);
572        entries.push(entry);
573    }
574
575    /// Insert a query result into the cache using a pre-wrapped Arc
576    ///
577    /// This variant allows sharing the Arc between the cache and the caller,
578    /// avoiding a full Vec clone when both need access to the same data.
579    pub fn insert_arc(
580        &self,
581        table_name: &str,
582        columns: Vec<String>,
583        rows: CompactArc<Vec<Row>>,
584        predicate: Option<Expression>,
585    ) {
586        let new_row_count = rows.len();
587
588        // Don't cache if too many rows in this single result
589        if new_row_count > self.max_rows {
590            return;
591        }
592
593        let (table_key, column_key) = Self::cache_keys(table_name, &columns);
594        let fingerprint = match &predicate {
595            Some(p) => QueryFingerprint::with_predicate(table_name, columns.clone(), p),
596            None => QueryFingerprint::new(table_name, columns.clone()),
597        };
598
599        let entry = CachedResult::new_with_arc(fingerprint, columns, rows, predicate);
600
601        let mut cache = match self.cache.write() {
602            Ok(c) => c,
603            Err(_) => {
604                self.stats.lock_failures.fetch_add(1, Ordering::Relaxed);
605                return;
606            }
607        };
608
609        // Check global limit and evict across other tables first if needed
610        let current_global = self.global_row_count.load(Ordering::Relaxed) as usize;
611        if current_global + new_row_count > self.max_global_rows {
612            let rows_to_free =
613                (current_global + new_row_count).saturating_sub(self.max_global_rows);
614            self.evict_global_lru(&mut cache, rows_to_free, &table_key);
615        }
616
617        // Navigate to nested entries: table -> columns -> entries
618        let table_cache = cache.entry(table_key).or_default();
619        let entries = table_cache.entry(column_key).or_default();
620
621        // Evict expired entries and track row count reduction
622        let mut rows_freed: usize = 0;
623        let before_len = entries.len();
624        entries.retain(|e| {
625            if e.is_expired(self.ttl) {
626                rows_freed += e.rows.len();
627                false
628            } else {
629                true
630            }
631        });
632        let evicted = before_len - entries.len();
633        if evicted > 0 {
634            self.stats
635                .ttl_evictions
636                .fetch_add(evicted as u64, Ordering::Relaxed);
637        }
638
639        // Evict if at per-table capacity (LRU)
640        while entries.len() >= self.max_size {
641            if let Some((idx, _)) = entries
642                .iter()
643                .enumerate()
644                .min_by_key(|(_, e)| (e.last_accessed, e.access_count))
645            {
646                rows_freed += entries[idx].rows.len();
647                entries.remove(idx);
648                self.stats.size_evictions.fetch_add(1, Ordering::Relaxed);
649            } else {
650                break;
651            }
652        }
653
654        // Update global row count (subtract freed, add new)
655        if rows_freed > 0 {
656            self.global_row_count
657                .fetch_sub(rows_freed as u64, Ordering::Relaxed);
658        }
659
660        // Add the new entry and update global count
661        self.global_row_count
662            .fetch_add(new_row_count as u64, Ordering::Relaxed);
663        entries.push(entry);
664    }
665
666    /// Evict entries across all tables using global LRU until rows_to_free rows are freed
667    fn evict_global_lru(
668        &self,
669        cache: &mut FxHashMap<String, FxHashMap<String, Vec<CachedResult>>>,
670        mut rows_to_free: usize,
671        skip_table: &str,
672    ) {
673        while rows_to_free > 0 {
674            // Find the globally oldest entry across all tables
675            let mut oldest: Option<(String, String, usize, Instant, u64, usize)> = None;
676
677            for (table_key, table_cache) in cache.iter() {
678                if table_key == skip_table {
679                    continue; // Don't evict from the table we're inserting into
680                }
681                for (col_key, entries) in table_cache.iter() {
682                    for (idx, entry) in entries.iter().enumerate() {
683                        let dominated = match &oldest {
684                            None => true,
685                            Some((_, _, _, last_acc, acc_count, _)) => {
686                                (entry.last_accessed, entry.access_count) < (*last_acc, *acc_count)
687                            }
688                        };
689                        if dominated {
690                            oldest = Some((
691                                table_key.clone(),
692                                col_key.clone(),
693                                idx,
694                                entry.last_accessed,
695                                entry.access_count,
696                                entry.rows.len(),
697                            ));
698                        }
699                    }
700                }
701            }
702
703            match oldest {
704                Some((table_key, col_key, idx, _, _, row_count)) => {
705                    if let Some(table_cache) = cache.get_mut(&table_key) {
706                        if let Some(entries) = table_cache.get_mut(&col_key) {
707                            entries.remove(idx);
708                            self.global_row_count
709                                .fetch_sub(row_count as u64, Ordering::Relaxed);
710                            self.stats.size_evictions.fetch_add(1, Ordering::Relaxed);
711                            rows_to_free = rows_to_free.saturating_sub(row_count);
712
713                            // Clean up empty structures
714                            if entries.is_empty() {
715                                table_cache.remove(&col_key);
716                            }
717                        }
718                        if table_cache.is_empty() {
719                            cache.remove(&table_key);
720                        }
721                    }
722                }
723                None => break, // No more entries to evict
724            }
725        }
726    }
727
728    /// Invalidate all cache entries for a table (O(1) operation)
729    pub fn invalidate_table(&self, table_name: &str) {
730        let table_key = to_lowercase_cow(table_name);
731        match self.cache.write() {
732            Ok(mut cache) => {
733                // Count rows being removed for global tracking
734                if let Some(table_cache) = cache.get(table_key.as_ref()) {
735                    let rows_removed: usize = table_cache
736                        .values()
737                        .flat_map(|entries| entries.iter())
738                        .map(|e| e.rows.len())
739                        .sum();
740                    if rows_removed > 0 {
741                        self.global_row_count
742                            .fetch_sub(rows_removed as u64, Ordering::Relaxed);
743                    }
744                }
745                // O(1) removal: just remove the entire table entry
746                cache.remove(table_key.as_ref());
747            }
748            Err(_) => {
749                self.stats.lock_failures.fetch_add(1, Ordering::Relaxed);
750            }
751        }
752    }
753
754    /// Clear the entire cache and reset statistics
755    pub fn clear(&self) {
756        match self.cache.write() {
757            Ok(mut cache) => cache.clear(),
758            Err(_) => {
759                self.stats.lock_failures.fetch_add(1, Ordering::Relaxed);
760            }
761        }
762        // Reset global row count
763        self.global_row_count.store(0, Ordering::Relaxed);
764        // Reset all atomic stats counters
765        self.stats.hits.store(0, Ordering::Relaxed);
766        self.stats.exact_hits.store(0, Ordering::Relaxed);
767        self.stats.subsumption_hits.store(0, Ordering::Relaxed);
768        self.stats.misses.store(0, Ordering::Relaxed);
769        self.stats.ttl_evictions.store(0, Ordering::Relaxed);
770        self.stats.size_evictions.store(0, Ordering::Relaxed);
771        // Note: lock_failures is NOT reset - it indicates historical panics
772    }
773
774    /// Get cache statistics as a snapshot
775    pub fn stats(&self) -> SemanticCacheStatsSnapshot {
776        SemanticCacheStatsSnapshot {
777            hits: self.stats.hits.load(Ordering::Relaxed),
778            exact_hits: self.stats.exact_hits.load(Ordering::Relaxed),
779            subsumption_hits: self.stats.subsumption_hits.load(Ordering::Relaxed),
780            misses: self.stats.misses.load(Ordering::Relaxed),
781            ttl_evictions: self.stats.ttl_evictions.load(Ordering::Relaxed),
782            size_evictions: self.stats.size_evictions.load(Ordering::Relaxed),
783            lock_failures: self.stats.lock_failures.load(Ordering::Relaxed),
784        }
785    }
786
787    /// Get the number of cached entries
788    pub fn size(&self) -> usize {
789        self.cache
790            .read()
791            .map(|c| {
792                // Sum entries across all tables and column combinations
793                c.values()
794                    .map(|table_cache| table_cache.values().map(|v| v.len()).sum::<usize>())
795                    .sum()
796            })
797            .unwrap_or(0)
798    }
799
800    /// Filter cached rows using a predicate
801    ///
802    /// This is used when a subsumption match is found to filter
803    /// the broader cached result down to the stricter query's result.
804    ///
805    /// CRITICAL: This function now returns Result to properly propagate compilation errors.
806    /// Previously, compilation failures silently returned unfiltered data which was incorrect.
807    pub fn filter_rows(
808        rows: Vec<Row>,
809        filter: &Expression,
810        columns: &[String],
811        _function_registry: &FunctionRegistry,
812    ) -> Result<Vec<Row>> {
813        let columns_vec: Vec<String> = columns.to_vec();
814        // CRITICAL: Propagate compilation errors instead of returning unfiltered data
815        let mut eval = ExpressionEval::compile(filter, &columns_vec)?;
816
817        Ok(rows.into_iter().filter(|row| eval.eval_bool(row)).collect())
818    }
819
820    // Private helpers
821
822    /// Returns (table_key, column_key) for the nested cache structure
823    fn cache_keys(table_name: &str, columns: &[String]) -> (String, String) {
824        // Use null byte as delimiter for column key since it's invalid in SQL identifiers
825        // This prevents collision between columns like ["a,b", "c"] and ["a", "b,c"]
826        let table_key = table_name.to_lowercase();
827        let mut sorted_cols = columns.to_vec();
828        sorted_cols.sort();
829        let column_key = sorted_cols.join("\0");
830        (table_key, column_key)
831    }
832
833    fn record_exact_hit(&self) {
834        self.stats.hits.fetch_add(1, Ordering::Relaxed);
835        self.stats.exact_hits.fetch_add(1, Ordering::Relaxed);
836    }
837
838    fn record_subsumption_hit(&self) {
839        self.stats.hits.fetch_add(1, Ordering::Relaxed);
840        self.stats.subsumption_hits.fetch_add(1, Ordering::Relaxed);
841    }
842
843    fn record_miss(&self) {
844        self.stats.misses.fetch_add(1, Ordering::Relaxed);
845    }
846}
847
848impl Default for SemanticCache {
849    fn default() -> Self {
850        Self::new()
851    }
852}
853
854/// Hash the structure of a predicate (ignoring literal values)
855///
856/// This allows matching predicates like:
857/// - `col > 100` and `col > 200` (same structure, different values)
858///
859/// Uses FxHasher which is 2-5x faster than SipHash for small keys.
860fn hash_predicate_structure(expr: &Expression) -> u64 {
861    let mut hasher = FxHasher::default();
862    hash_expr_structure(expr, &mut hasher);
863    hasher.finish()
864}
865
866fn hash_expr_structure(expr: &Expression, hasher: &mut FxHasher) {
867    match expr {
868        Expression::Identifier(ident) => {
869            0u8.hash(hasher);
870            ident.value_lower.hash(hasher);
871        }
872        Expression::QualifiedIdentifier(qi) => {
873            1u8.hash(hasher);
874            qi.qualifier.value_lower.hash(hasher);
875            qi.name.value_lower.hash(hasher);
876        }
877        Expression::IntegerLiteral(_) => {
878            2u8.hash(hasher);
879        }
880        Expression::FloatLiteral(_) => {
881            3u8.hash(hasher);
882        }
883        Expression::StringLiteral(_) => {
884            4u8.hash(hasher);
885        }
886        Expression::BooleanLiteral(_) => {
887            5u8.hash(hasher);
888        }
889        Expression::NullLiteral(_) => {
890            6u8.hash(hasher);
891        }
892        Expression::Infix(infix) => {
893            7u8.hash(hasher);
894            // Use discriminant for hashing - avoids expensive Debug format allocation
895            std::mem::discriminant(&infix.op_type).hash(hasher);
896            hash_expr_structure(&infix.left, hasher);
897            hash_expr_structure(&infix.right, hasher);
898        }
899        Expression::Prefix(prefix) => {
900            8u8.hash(hasher);
901            prefix.operator.hash(hasher);
902            hash_expr_structure(&prefix.right, hasher);
903        }
904        Expression::Between(between) => {
905            9u8.hash(hasher);
906            hash_expr_structure(&between.expr, hasher);
907        }
908        Expression::In(in_expr) => {
909            10u8.hash(hasher);
910            hash_expr_structure(&in_expr.left, hasher);
911        }
912        Expression::FunctionCall(func) => {
913            11u8.hash(hasher);
914            func.function.to_lowercase().hash(hasher);
915            func.arguments.len().hash(hasher);
916        }
917        Expression::Case(_) => {
918            12u8.hash(hasher);
919        }
920        Expression::List(list) => {
921            13u8.hash(hasher);
922            list.elements.len().hash(hasher);
923        }
924        _ => {
925            // Default case for other expressions
926            255u8.hash(hasher);
927        }
928    }
929}
930
931/// Check if new_predicate is subsumed by cached_predicate
932///
933/// Returns:
934/// - `Identical` if predicates are semantically equivalent
935/// - `Subsumed { filter }` if new_predicate is stricter than cached_predicate
936/// - `NoSubsumption` if we can't determine a subsumption relationship
937pub fn check_subsumption(
938    cached_predicate: Option<&Expression>,
939    new_predicate: Option<&Expression>,
940) -> SubsumptionResult {
941    match (cached_predicate, new_predicate) {
942        // Both None - identical (full table scan)
943        (None, None) => SubsumptionResult::Identical,
944
945        // Cached has predicate, new has none - new is broader (cannot use cache)
946        (Some(_), None) => SubsumptionResult::NoSubsumption,
947
948        // Cached has no predicate (full scan), new has predicate
949        // New is stricter - can filter cached full scan
950        (None, Some(new_pred)) => SubsumptionResult::Subsumed {
951            filter: Box::new(new_pred.clone()),
952        },
953
954        // Both have predicates - analyze relationship
955        (Some(cached), Some(new)) => check_predicate_subsumption(cached, new),
956    }
957}
958
959/// Analyze predicate subsumption for two predicates
960fn check_predicate_subsumption(cached: &Expression, new: &Expression) -> SubsumptionResult {
961    // First check if predicates are structurally identical
962    if expressions_equivalent(cached, new) {
963        return SubsumptionResult::Identical;
964    }
965
966    // Check for range tightening: cached is broader range
967    if let Some(result) = check_range_subsumption(cached, new) {
968        return result;
969    }
970
971    // Check for AND strengthening: new adds more conditions
972    if let Some(result) = check_and_subsumption(cached, new) {
973        return result;
974    }
975
976    // Check for IN list subsumption: new has smaller IN list
977    if let Some(result) = check_in_subsumption(cached, new) {
978        return result;
979    }
980
981    SubsumptionResult::NoSubsumption
982}
983
984/// Check for numeric range subsumption
985///
986/// Examples:
987/// - cached: `col > 100`, new: `col > 150` → Subsumed (new is stricter)
988/// - cached: `col < 500`, new: `col < 300` → Subsumed (new is stricter)
989fn check_range_subsumption(cached: &Expression, new: &Expression) -> Option<SubsumptionResult> {
990    // Both must be infix comparisons
991    let (cached_infix, new_infix) = match (cached, new) {
992        (Expression::Infix(c), Expression::Infix(n)) => (c, n),
993        _ => return None,
994    };
995
996    // Must be comparing same column to a literal
997    let cached_col = extract_column_name(&cached_infix.left)?;
998    let new_col = extract_column_name(&new_infix.left)?;
999
1000    // OPTIMIZATION: Use case-insensitive comparison instead of double to_lowercase()
1001    if !cached_col.eq_ignore_ascii_case(&new_col) {
1002        return None;
1003    }
1004
1005    let cached_val = extract_numeric_value(&cached_infix.right)?;
1006    let new_val = extract_numeric_value(&new_infix.right)?;
1007
1008    // Check operator relationship
1009    match (&cached_infix.op_type, &new_infix.op_type) {
1010        // Greater than: new > cached means new is stricter
1011        (InfixOperator::GreaterThan, InfixOperator::GreaterThan)
1012        | (InfixOperator::GreaterThan, InfixOperator::GreaterEqual) => {
1013            if new_val > cached_val {
1014                Some(SubsumptionResult::Subsumed {
1015                    filter: Box::new(new.clone()),
1016                })
1017            } else if (new_val - cached_val).abs() < f64::EPSILON {
1018                Some(SubsumptionResult::Identical)
1019            } else {
1020                None
1021            }
1022        }
1023
1024        (InfixOperator::GreaterEqual, InfixOperator::GreaterThan)
1025        | (InfixOperator::GreaterEqual, InfixOperator::GreaterEqual) => {
1026            if new_val >= cached_val {
1027                if (new_val - cached_val).abs() < f64::EPSILON
1028                    && matches!(cached_infix.op_type, InfixOperator::GreaterEqual)
1029                    && matches!(new_infix.op_type, InfixOperator::GreaterEqual)
1030                {
1031                    Some(SubsumptionResult::Identical)
1032                } else {
1033                    Some(SubsumptionResult::Subsumed {
1034                        filter: Box::new(new.clone()),
1035                    })
1036                }
1037            } else {
1038                None
1039            }
1040        }
1041
1042        // Less than: new < cached means new is stricter
1043        (InfixOperator::LessThan, InfixOperator::LessThan)
1044        | (InfixOperator::LessThan, InfixOperator::LessEqual) => {
1045            if new_val < cached_val {
1046                Some(SubsumptionResult::Subsumed {
1047                    filter: Box::new(new.clone()),
1048                })
1049            } else if (new_val - cached_val).abs() < f64::EPSILON {
1050                Some(SubsumptionResult::Identical)
1051            } else {
1052                None
1053            }
1054        }
1055
1056        (InfixOperator::LessEqual, InfixOperator::LessThan)
1057        | (InfixOperator::LessEqual, InfixOperator::LessEqual) => {
1058            if new_val <= cached_val {
1059                if (new_val - cached_val).abs() < f64::EPSILON
1060                    && matches!(cached_infix.op_type, InfixOperator::LessEqual)
1061                    && matches!(new_infix.op_type, InfixOperator::LessEqual)
1062                {
1063                    Some(SubsumptionResult::Identical)
1064                } else {
1065                    Some(SubsumptionResult::Subsumed {
1066                        filter: Box::new(new.clone()),
1067                    })
1068                }
1069            } else {
1070                None
1071            }
1072        }
1073
1074        // Equality: values must match for identity
1075        (InfixOperator::Equal, InfixOperator::Equal) => {
1076            if (new_val - cached_val).abs() < f64::EPSILON {
1077                Some(SubsumptionResult::Identical)
1078            } else {
1079                None
1080            }
1081        }
1082
1083        _ => None,
1084    }
1085}
1086
1087/// Check for AND conjunction subsumption
1088///
1089/// If new = cached AND extra_condition, then new is subsumed
1090fn check_and_subsumption(cached: &Expression, new: &Expression) -> Option<SubsumptionResult> {
1091    // New must be an AND
1092    let new_infix = match new {
1093        Expression::Infix(infix) if matches!(infix.op_type, InfixOperator::And) => infix,
1094        _ => return None,
1095    };
1096
1097    // Check if cached is equivalent to one side of the AND
1098    if expressions_equivalent(cached, &new_infix.left)
1099        || expressions_equivalent(cached, &new_infix.right)
1100    {
1101        // New is cached AND something_else → new is stricter
1102        return Some(SubsumptionResult::Subsumed {
1103            filter: Box::new(new.clone()),
1104        });
1105    }
1106
1107    // Check if cached is also an AND and new extends it
1108    if let Expression::Infix(cached_infix) = cached {
1109        if matches!(cached_infix.op_type, InfixOperator::And) {
1110            // Extract conditions from both
1111            let cached_conditions = extract_and_conditions(cached);
1112            let new_conditions = extract_and_conditions(new);
1113
1114            // New must contain all of cached's conditions
1115            let all_cached_present = cached_conditions.iter().all(|cc| {
1116                new_conditions
1117                    .iter()
1118                    .any(|nc| expressions_equivalent(cc, nc))
1119            });
1120
1121            if all_cached_present && new_conditions.len() > cached_conditions.len() {
1122                return Some(SubsumptionResult::Subsumed {
1123                    filter: Box::new(new.clone()),
1124                });
1125            }
1126        }
1127    }
1128
1129    None
1130}
1131
1132/// Check for IN list subsumption
1133///
1134/// If new IN list is subset of cached IN list, new is subsumed
1135fn check_in_subsumption(cached: &Expression, new: &Expression) -> Option<SubsumptionResult> {
1136    let (cached_in, new_in) = match (cached, new) {
1137        (Expression::In(c), Expression::In(n)) => (c, n),
1138        _ => return None,
1139    };
1140
1141    // Must be same column
1142    if !expressions_equivalent(&cached_in.left, &new_in.left) {
1143        return None;
1144    }
1145
1146    // Extract values from both IN lists
1147    let cached_values = extract_in_values(&cached_in.right)?;
1148    let new_values = extract_in_values(&new_in.right)?;
1149
1150    // Check if new is subset of cached
1151    let is_subset = new_values
1152        .iter()
1153        .all(|nv| cached_values.iter().any(|cv| values_equal(cv, nv)));
1154
1155    if is_subset {
1156        if new_values.len() == cached_values.len() {
1157            Some(SubsumptionResult::Identical)
1158        } else {
1159            Some(SubsumptionResult::Subsumed {
1160                filter: Box::new(new.clone()),
1161            })
1162        }
1163    } else {
1164        None
1165    }
1166}
1167
1168/// Numeric value extracted from a literal
1169#[derive(Debug, Clone, Copy)]
1170enum NumericValue {
1171    Integer(i64),
1172    Float(f64),
1173}
1174
1175impl NumericValue {
1176    fn as_f64(&self) -> f64 {
1177        match self {
1178            NumericValue::Integer(i) => *i as f64,
1179            NumericValue::Float(f) => *f,
1180        }
1181    }
1182}
1183
1184impl PartialEq for NumericValue {
1185    fn eq(&self, other: &Self) -> bool {
1186        // For database predicate comparison, exact integer comparison is preferred,
1187        // but for floats we use a reasonable tolerance (1e-10) rather than f64::EPSILON
1188        // which is too strict (~2.2e-16) and can cause false negatives.
1189        match (self, other) {
1190            (NumericValue::Integer(a), NumericValue::Integer(b)) => a == b,
1191            _ => (self.as_f64() - other.as_f64()).abs() < 1e-10,
1192        }
1193    }
1194}
1195
1196/// Extract values from an IN expression's right side
1197fn extract_in_values(expr: &Expression) -> Option<Vec<NumericValue>> {
1198    match expr {
1199        Expression::List(list) => Some(
1200            list.elements
1201                .iter()
1202                .filter_map(|e| match e {
1203                    Expression::IntegerLiteral(lit) => Some(NumericValue::Integer(lit.value)),
1204                    Expression::FloatLiteral(lit) => Some(NumericValue::Float(lit.value)),
1205                    _ => None,
1206                })
1207                .collect(),
1208        ),
1209        Expression::ExpressionList(list) => Some(
1210            list.expressions
1211                .iter()
1212                .filter_map(|e| match e {
1213                    Expression::IntegerLiteral(lit) => Some(NumericValue::Integer(lit.value)),
1214                    Expression::FloatLiteral(lit) => Some(NumericValue::Float(lit.value)),
1215                    _ => None,
1216                })
1217                .collect(),
1218        ),
1219        _ => None,
1220    }
1221}
1222
1223/// Compare two NumericValues for equality
1224fn values_equal(a: &NumericValue, b: &NumericValue) -> bool {
1225    a == b
1226}
1227
1228/// Extract numeric value from a literal expression
1229fn extract_numeric_value(expr: &Expression) -> Option<f64> {
1230    match expr {
1231        Expression::IntegerLiteral(lit) => Some(lit.value as f64),
1232        Expression::FloatLiteral(lit) => Some(lit.value),
1233        _ => None,
1234    }
1235}
1236
1237#[cfg(test)]
1238mod tests {
1239    use super::*;
1240    use crate::core::Value;
1241    use crate::parser::ast::{Identifier, InExpression, InfixExpression, ListExpression};
1242    use crate::parser::token::{Position, Token, TokenType};
1243
1244    fn make_token() -> Token {
1245        Token {
1246            token_type: TokenType::Integer,
1247            literal: "".into(),
1248            position: Position::new(0, 1, 1),
1249        }
1250    }
1251
1252    fn make_identifier(name: &str) -> Expression {
1253        Expression::Identifier(Identifier::new(make_token(), name.to_string()))
1254    }
1255
1256    fn make_int_literal(val: i64) -> Expression {
1257        Expression::IntegerLiteral(crate::parser::ast::IntegerLiteral {
1258            token: make_token(),
1259            value: val,
1260        })
1261    }
1262
1263    fn make_gt(col: &str, val: i64) -> Expression {
1264        Expression::Infix(InfixExpression::new(
1265            make_token(),
1266            Box::new(make_identifier(col)),
1267            ">".to_string(),
1268            Box::new(make_int_literal(val)),
1269        ))
1270    }
1271
1272    fn make_lt(col: &str, val: i64) -> Expression {
1273        Expression::Infix(InfixExpression::new(
1274            make_token(),
1275            Box::new(make_identifier(col)),
1276            "<".to_string(),
1277            Box::new(make_int_literal(val)),
1278        ))
1279    }
1280
1281    fn make_and(left: Expression, right: Expression) -> Expression {
1282        Expression::Infix(InfixExpression::new(
1283            make_token(),
1284            Box::new(left),
1285            "AND".to_string(),
1286            Box::new(right),
1287        ))
1288    }
1289
1290    fn make_in(col: &str, values: Vec<i64>) -> Expression {
1291        Expression::In(InExpression {
1292            token: make_token(),
1293            left: Box::new(make_identifier(col)),
1294            right: Box::new(Expression::List(Box::new(ListExpression {
1295                token: make_token(),
1296                elements: values.into_iter().map(make_int_literal).collect(),
1297            }))),
1298            not: false,
1299        })
1300    }
1301
1302    #[test]
1303    fn test_identical_predicates() {
1304        let pred1 = make_gt("amount", 100);
1305        let pred2 = make_gt("amount", 100);
1306
1307        match check_subsumption(Some(&pred1), Some(&pred2)) {
1308            SubsumptionResult::Identical => {}
1309            other => panic!("Expected Identical, got {:?}", other),
1310        }
1311    }
1312
1313    #[test]
1314    fn test_range_subsumption_greater_than() {
1315        // cached: amount > 100, new: amount > 150
1316        let cached = make_gt("amount", 100);
1317        let new = make_gt("amount", 150);
1318
1319        match check_subsumption(Some(&cached), Some(&new)) {
1320            SubsumptionResult::Subsumed { .. } => {}
1321            other => panic!("Expected Subsumed, got {:?}", other),
1322        }
1323
1324        // Reverse should not work: cached: amount > 150, new: amount > 100
1325        match check_subsumption(Some(&new), Some(&cached)) {
1326            SubsumptionResult::NoSubsumption => {}
1327            other => panic!("Expected NoSubsumption, got {:?}", other),
1328        }
1329    }
1330
1331    #[test]
1332    fn test_range_subsumption_less_than() {
1333        // cached: amount < 500, new: amount < 300
1334        let cached = make_lt("amount", 500);
1335        let new = make_lt("amount", 300);
1336
1337        match check_subsumption(Some(&cached), Some(&new)) {
1338            SubsumptionResult::Subsumed { .. } => {}
1339            other => panic!("Expected Subsumed, got {:?}", other),
1340        }
1341    }
1342
1343    #[test]
1344    fn test_and_subsumption() {
1345        // cached: amount > 100, new: amount > 100 AND status > 0
1346        let cached = make_gt("amount", 100);
1347        let status_check = make_gt("status", 0);
1348        let new = make_and(make_gt("amount", 100), status_check);
1349
1350        match check_subsumption(Some(&cached), Some(&new)) {
1351            SubsumptionResult::Subsumed { .. } => {}
1352            other => panic!("Expected Subsumed, got {:?}", other),
1353        }
1354    }
1355
1356    #[test]
1357    fn test_in_subsumption() {
1358        // cached: id IN (1,2,3,4,5), new: id IN (2,3)
1359        let cached = make_in("id", vec![1, 2, 3, 4, 5]);
1360        let new = make_in("id", vec![2, 3]);
1361
1362        match check_subsumption(Some(&cached), Some(&new)) {
1363            SubsumptionResult::Subsumed { .. } => {}
1364            other => panic!("Expected Subsumed, got {:?}", other),
1365        }
1366    }
1367
1368    #[test]
1369    fn test_no_predicate_to_predicate() {
1370        // cached: full scan, new: amount > 100
1371        let new = make_gt("amount", 100);
1372
1373        match check_subsumption(None, Some(&new)) {
1374            SubsumptionResult::Subsumed { .. } => {}
1375            other => panic!("Expected Subsumed, got {:?}", other),
1376        }
1377    }
1378
1379    #[test]
1380    fn test_cache_basic() {
1381        let cache = SemanticCache::new();
1382
1383        // Insert a result
1384        let rows = vec![
1385            Row::from_values(vec![Value::Integer(1), Value::Integer(200)]),
1386            Row::from_values(vec![Value::Integer(2), Value::Integer(300)]),
1387            Row::from_values(vec![Value::Integer(3), Value::Integer(400)]),
1388        ];
1389
1390        cache.insert(
1391            "orders",
1392            vec!["id".to_string(), "amount".to_string()],
1393            rows.clone(),
1394            Some(make_gt("amount", 100)),
1395        );
1396
1397        assert_eq!(cache.size(), 1);
1398
1399        // Lookup with identical predicate
1400        match cache.lookup(
1401            "orders",
1402            &["id".to_string(), "amount".to_string()],
1403            Some(&make_gt("amount", 100)),
1404        ) {
1405            CacheLookupResult::ExactHit(cached_rows) => {
1406                assert_eq!(cached_rows.len(), 3);
1407            }
1408            other => panic!("Expected ExactHit, got {:?}", other),
1409        }
1410
1411        let stats = cache.stats();
1412        assert_eq!(stats.exact_hits, 1);
1413    }
1414
1415    #[test]
1416    fn test_cache_subsumption_lookup() {
1417        let cache = SemanticCache::new();
1418
1419        // Cache result for amount > 100
1420        let rows = vec![
1421            Row::from_values(vec![Value::Integer(1), Value::Integer(150)]),
1422            Row::from_values(vec![Value::Integer(2), Value::Integer(200)]),
1423            Row::from_values(vec![Value::Integer(3), Value::Integer(300)]),
1424        ];
1425
1426        cache.insert(
1427            "orders",
1428            vec!["id".to_string(), "amount".to_string()],
1429            rows,
1430            Some(make_gt("amount", 100)),
1431        );
1432
1433        // Lookup with stricter predicate: amount > 180
1434        match cache.lookup(
1435            "orders",
1436            &["id".to_string(), "amount".to_string()],
1437            Some(&make_gt("amount", 180)),
1438        ) {
1439            CacheLookupResult::SubsumptionHit { rows, .. } => {
1440                assert_eq!(rows.len(), 3); // All cached rows returned
1441            }
1442            other => panic!("Expected SubsumptionHit, got {:?}", other),
1443        }
1444
1445        let stats = cache.stats();
1446        assert_eq!(stats.subsumption_hits, 1);
1447    }
1448
1449    #[test]
1450    fn test_cache_invalidation() {
1451        let cache = SemanticCache::new();
1452
1453        cache.insert(
1454            "orders",
1455            vec!["id".to_string()],
1456            vec![Row::from_values(vec![Value::Integer(1)])],
1457            None,
1458        );
1459
1460        assert_eq!(cache.size(), 1);
1461
1462        cache.invalidate_table("orders");
1463        assert_eq!(cache.size(), 0);
1464    }
1465}