Skip to main content

fraiseql_core/cache/
fact_table_cache.rs

1//! Fact table aggregation caching methods for `CachedDatabaseAdapter`.
2//!
3//! Provides transparent caching for aggregation queries on fact tables
4//! (`tf_*` prefix), using version-table, time-based, or schema-version
5//! strategies to determine cache validity.
6
7use super::{
8    adapter::CachedDatabaseAdapter,
9    fact_table_version::{FactTableVersionStrategy, generate_version_key_component},
10};
11use crate::{
12    db::{DatabaseAdapter, types::JsonbValue},
13    error::Result,
14};
15
16impl<A: DatabaseAdapter> CachedDatabaseAdapter<A> {
17    /// Extract fact table name from SQL query.
18    ///
19    /// Looks for `FROM tf_<name>` pattern in the SQL.
20    pub(super) fn extract_fact_table_from_sql(sql: &str) -> Option<String> {
21        // Look for FROM tf_xxx pattern (case insensitive)
22        let sql_lower = sql.to_lowercase();
23        let from_idx = sql_lower.find("from ")?;
24        let after_from = &sql_lower[from_idx + 5..];
25
26        // Skip whitespace
27        let trimmed = after_from.trim_start();
28
29        // Check if it starts with tf_
30        if !trimmed.starts_with("tf_") {
31            return None;
32        }
33
34        // Extract table name (until whitespace, comma, or end)
35        let end_idx = trimmed
36            .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
37            .unwrap_or(trimmed.len());
38
39        Some(trimmed[..end_idx].to_string())
40    }
41
42    /// Generate cache key for aggregation query.
43    ///
44    /// Includes SQL, schema version, and version component based on strategy.
45    pub(super) fn generate_aggregation_cache_key(
46        sql: &str,
47        schema_version: &str,
48        version_component: Option<&str>,
49    ) -> u64 {
50        use std::hash::{BuildHasher, Hasher};
51        let mut hasher =
52            ahash::RandomState::with_seeds(0x5172_7f6a, 0x8a4e_3c2b, 0xd6f1_48c5, 0x3e9a_7d14)
53                .build_hasher();
54        hasher.write_u8(b'A'); // "aggregation" domain separator
55        hasher.write(sql.as_bytes());
56        hasher.write(schema_version.as_bytes());
57        if let Some(vc) = version_component {
58            hasher.write_u8(1);
59            hasher.write(vc.as_bytes());
60        } else {
61            hasher.write_u8(0);
62        }
63        hasher.finish()
64    }
65
66    /// Fetch version from `tf_versions` table.
67    ///
68    /// Returns cached version if fresh, otherwise queries database.
69    pub(super) async fn fetch_table_version(&self, table_name: &str) -> Option<i64> {
70        // Check cached version first
71        if let Some(version) = self.version_provider.get_cached_version(table_name) {
72            return Some(version);
73        }
74
75        // Query tf_versions table
76        let sql = format!(
77            "SELECT version FROM tf_versions WHERE table_name = '{}'",
78            table_name.replace('\'', "''") // Escape single quotes
79        );
80
81        match self.adapter.execute_raw_query(&sql).await {
82            Ok(rows) if !rows.is_empty() => {
83                if let Some(serde_json::Value::Number(n)) = rows[0].get("version") {
84                    if let Some(v) = n.as_i64() {
85                        self.version_provider.set_cached_version(table_name, v);
86                        return Some(v);
87                    }
88                }
89                None
90            },
91            _ => None,
92        }
93    }
94
95    /// Execute aggregation query with caching based on fact table versioning strategy.
96    ///
97    /// This method provides transparent caching for aggregation queries on fact tables.
98    /// The caching behavior depends on the configured strategy for the fact table.
99    ///
100    /// # Arguments
101    ///
102    /// * `sql` - The aggregation SQL query
103    ///
104    /// # Returns
105    ///
106    /// Query results (from cache or database)
107    ///
108    /// # Example
109    ///
110    /// ```rust,no_run
111    /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
112    /// # use fraiseql_core::db::postgres::PostgresAdapter;
113    ///
114    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
115    /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
116    /// # let cache = QueryResultCache::new(CacheConfig::default());
117    /// # let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string());
118    /// // This query will be cached according to tf_sales strategy
119    /// let results = adapter.execute_aggregation_query(
120    ///     "SELECT SUM(revenue) FROM tf_sales WHERE year = 2024"
121    /// ).await?;
122    /// # Ok(())
123    /// # }
124    /// ```
125    ///
126    /// # Errors
127    ///
128    /// Returns `FraiseQLError` if the underlying database query fails.
129    pub async fn execute_aggregation_query(
130        &self,
131        sql: &str,
132    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
133        // Extract fact table from SQL
134        let Some(table_name) = Self::extract_fact_table_from_sql(sql) else {
135            // Not a fact table query - execute without caching
136            return self.adapter.execute_raw_query(sql).await;
137        };
138
139        // Get strategy for this table
140        let strategy = self.fact_table_config.get_strategy(&table_name);
141
142        // Check if caching is enabled
143        if !strategy.is_caching_enabled() {
144            return self.adapter.execute_raw_query(sql).await;
145        }
146
147        // Get version component based on strategy
148        let table_version = if matches!(strategy, FactTableVersionStrategy::VersionTable) {
149            self.fetch_table_version(&table_name).await
150        } else {
151            None
152        };
153
154        let version_component = generate_version_key_component(
155            &table_name,
156            strategy,
157            table_version,
158            &self.schema_version,
159        );
160
161        // If version table strategy but no version found, skip caching
162        let Some(version_component) = version_component else {
163            // VersionTable strategy but no version in tf_versions - skip cache
164            return self.adapter.execute_raw_query(sql).await;
165        };
166
167        // Generate cache key
168        let cache_key = Self::generate_aggregation_cache_key(
169            sql,
170            &self.schema_version,
171            Some(&version_component),
172        );
173
174        // Try cache first
175        if let Some(cached_result) = self.cache.get(cache_key)? {
176            // Cache hit - convert JsonbValue back to HashMap
177            let results: Vec<std::collections::HashMap<String, serde_json::Value>> = cached_result
178                .iter()
179                .filter_map(|jv| serde_json::from_value(jv.as_value().clone()).ok())
180                .collect();
181            return Ok(results);
182        }
183
184        // Cache miss - execute query
185        let result = self.adapter.execute_raw_query(sql).await?;
186
187        // Store in cache (convert HashMap to JsonbValue)
188        let cached_values: Vec<JsonbValue> = result
189            .iter()
190            .filter_map(|row| serde_json::to_value(row).ok().map(JsonbValue::new))
191            .collect();
192
193        self.cache.put(
194            cache_key,
195            cached_values,
196            vec![table_name], // Track which fact table this query reads
197            None,             // Fact-table queries use the global TTL
198            None,             // No entity-type index for raw queries
199        )?;
200
201        Ok(result)
202    }
203}