fraiseql_core/cache/fact_table_cache.rs
1//! Fact table aggregation caching methods for `CachedDatabaseAdapter`.
2//!
3//! Provides transparent caching for aggregation queries on fact tables
4//! (`tf_*` prefix), using version-table, time-based, or schema-version
5//! strategies to determine cache validity.
6
7use super::{
8 adapter::CachedDatabaseAdapter,
9 fact_table_version::{FactTableVersionStrategy, generate_version_key_component},
10};
11use crate::{
12 db::{DatabaseAdapter, types::JsonbValue},
13 error::Result,
14};
15
16impl<A: DatabaseAdapter> CachedDatabaseAdapter<A> {
17 /// Extract fact table name from SQL query.
18 ///
19 /// Looks for `FROM tf_<name>` pattern in the SQL.
20 pub(super) fn extract_fact_table_from_sql(sql: &str) -> Option<String> {
21 // Look for FROM tf_xxx pattern (case insensitive)
22 let sql_lower = sql.to_lowercase();
23 let from_idx = sql_lower.find("from ")?;
24 let after_from = &sql_lower[from_idx + 5..];
25
26 // Skip whitespace
27 let trimmed = after_from.trim_start();
28
29 // Check if it starts with tf_
30 if !trimmed.starts_with("tf_") {
31 return None;
32 }
33
34 // Extract table name (until whitespace, comma, or end)
35 let end_idx = trimmed
36 .find(|c: char| c.is_whitespace() || c == ',' || c == ')')
37 .unwrap_or(trimmed.len());
38
39 Some(trimmed[..end_idx].to_string())
40 }
41
42 /// Generate cache key for aggregation query.
43 ///
44 /// Includes SQL, schema version, and version component based on strategy.
45 pub(super) fn generate_aggregation_cache_key(
46 sql: &str,
47 schema_version: &str,
48 version_component: Option<&str>,
49 ) -> u64 {
50 use std::hash::{BuildHasher, Hasher};
51 let mut hasher =
52 ahash::RandomState::with_seeds(0x5172_7f6a, 0x8a4e_3c2b, 0xd6f1_48c5, 0x3e9a_7d14)
53 .build_hasher();
54 hasher.write_u8(b'A'); // "aggregation" domain separator
55 hasher.write(sql.as_bytes());
56 hasher.write(schema_version.as_bytes());
57 if let Some(vc) = version_component {
58 hasher.write_u8(1);
59 hasher.write(vc.as_bytes());
60 } else {
61 hasher.write_u8(0);
62 }
63 hasher.finish()
64 }
65
66 /// Fetch version from `tf_versions` table.
67 ///
68 /// Returns cached version if fresh, otherwise queries database.
69 pub(super) async fn fetch_table_version(&self, table_name: &str) -> Option<i64> {
70 // Check cached version first
71 if let Some(version) = self.version_provider.get_cached_version(table_name) {
72 return Some(version);
73 }
74
75 // Query tf_versions table
76 let sql = format!(
77 "SELECT version FROM tf_versions WHERE table_name = '{}'",
78 table_name.replace('\'', "''") // Escape single quotes
79 );
80
81 match self.adapter.execute_raw_query(&sql).await {
82 Ok(rows) if !rows.is_empty() => {
83 if let Some(serde_json::Value::Number(n)) = rows[0].get("version") {
84 if let Some(v) = n.as_i64() {
85 self.version_provider.set_cached_version(table_name, v);
86 return Some(v);
87 }
88 }
89 None
90 },
91 _ => None,
92 }
93 }
94
95 /// Execute aggregation query with caching based on fact table versioning strategy.
96 ///
97 /// This method provides transparent caching for aggregation queries on fact tables.
98 /// The caching behavior depends on the configured strategy for the fact table.
99 ///
100 /// # Arguments
101 ///
102 /// * `sql` - The aggregation SQL query
103 ///
104 /// # Returns
105 ///
106 /// Query results (from cache or database)
107 ///
108 /// # Example
109 ///
110 /// ```rust,no_run
111 /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
112 /// # use fraiseql_core::db::postgres::PostgresAdapter;
113 ///
114 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
115 /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
116 /// # let cache = QueryResultCache::new(CacheConfig::default());
117 /// # let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string());
118 /// // This query will be cached according to tf_sales strategy
119 /// let results = adapter.execute_aggregation_query(
120 /// "SELECT SUM(revenue) FROM tf_sales WHERE year = 2024"
121 /// ).await?;
122 /// # Ok(())
123 /// # }
124 /// ```
125 ///
126 /// # Errors
127 ///
128 /// Returns `FraiseQLError` if the underlying database query fails.
129 pub async fn execute_aggregation_query(
130 &self,
131 sql: &str,
132 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
133 // Extract fact table from SQL
134 let Some(table_name) = Self::extract_fact_table_from_sql(sql) else {
135 // Not a fact table query - execute without caching
136 return self.adapter.execute_raw_query(sql).await;
137 };
138
139 // Get strategy for this table
140 let strategy = self.fact_table_config.get_strategy(&table_name);
141
142 // Check if caching is enabled
143 if !strategy.is_caching_enabled() {
144 return self.adapter.execute_raw_query(sql).await;
145 }
146
147 // Get version component based on strategy
148 let table_version = if matches!(strategy, FactTableVersionStrategy::VersionTable) {
149 self.fetch_table_version(&table_name).await
150 } else {
151 None
152 };
153
154 let version_component = generate_version_key_component(
155 &table_name,
156 strategy,
157 table_version,
158 &self.schema_version,
159 );
160
161 // If version table strategy but no version found, skip caching
162 let Some(version_component) = version_component else {
163 // VersionTable strategy but no version in tf_versions - skip cache
164 return self.adapter.execute_raw_query(sql).await;
165 };
166
167 // Generate cache key
168 let cache_key = Self::generate_aggregation_cache_key(
169 sql,
170 &self.schema_version,
171 Some(&version_component),
172 );
173
174 // Try cache first
175 if let Some(cached_result) = self.cache.get(cache_key)? {
176 // Cache hit - convert JsonbValue back to HashMap
177 let results: Vec<std::collections::HashMap<String, serde_json::Value>> = cached_result
178 .iter()
179 .filter_map(|jv| serde_json::from_value(jv.as_value().clone()).ok())
180 .collect();
181 return Ok(results);
182 }
183
184 // Cache miss - execute query
185 let result = self.adapter.execute_raw_query(sql).await?;
186
187 // Store in cache (convert HashMap to JsonbValue)
188 let cached_values: Vec<JsonbValue> = result
189 .iter()
190 .filter_map(|row| serde_json::to_value(row).ok().map(JsonbValue::new))
191 .collect();
192
193 self.cache.put(
194 cache_key,
195 cached_values,
196 vec![table_name], // Track which fact table this query reads
197 None, // Fact-table queries use the global TTL
198 None, // No entity-type index for raw queries
199 )?;
200
201 Ok(result)
202 }
203}