fraiseql_core/cache/adapter/mod.rs
1//! Cached database adapter wrapper.
2//!
3//! Provides transparent caching for `DatabaseAdapter` implementations by wrapping
4//! `execute_where_query()` calls with cache lookup and storage.
5//!
6//! # Security: Cache Isolation via RLS
7//!
8//! Automatic Persisted Query (APQ) caching provides no user-level isolation on its own.
9//! Cache key isolation derives entirely from Row-Level Security: different users MUST
10//! produce different WHERE clauses via their RLS policies. If RLS is disabled or
11//! returns an empty WHERE clause, two users with the same query and variables will
12//! receive the same cached response.
13//!
14//! **Always verify RLS is active when caching is enabled in multi-tenant deployments.**
15//!
16//! # Architecture
17//!
18//! ```text
19//! ┌─────────────────────────┐
20//! │ CachedDatabaseAdapter │
21//! │ │
22//! │ execute_where_query() │
23//! └───────────┬─────────────┘
24//! │
25//! ↓ generate_cache_key()
26//! ┌─────────────────────────┐
27//! │ Cache Hit? │
28//! └───────────┬─────────────┘
29//! │
30//! ┌─────┴─────┐
31//! │ │
32//! HIT MISS
33//! │ │
34//! ↓ ↓ DatabaseAdapter
35//! Return Cached Execute Query
36//! Result + Store in Cache
37//! ```
38//!
39//! # Example
40//!
41//! ```no_run
42//! use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
43//! use fraiseql_core::db::{postgres::PostgresAdapter, DatabaseAdapter};
44//!
45//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
46//! // Create underlying database adapter
47//! let db_adapter = PostgresAdapter::new("postgresql://localhost/db").await?;
48//!
49//! // Wrap with caching
50//! let cache = QueryResultCache::new(CacheConfig::default());
51//! let cached_adapter = CachedDatabaseAdapter::new(
52//! db_adapter,
53//! cache,
54//! "1.0.0".to_string() // schema version
55//! );
56//!
57//! // Use as normal DatabaseAdapter - caching is transparent
58//! let users = cached_adapter
59//! .execute_where_query("v_user", None, Some(10), None, None)
60//! .await?;
61//! # Ok(())
62//! # }
63//! ```
64
65use std::{
66 collections::HashMap,
67 sync::{Arc, Mutex},
68};
69
70use async_trait::async_trait;
71
72use super::{
73 cascade_invalidator::CascadeInvalidator,
74 fact_table_version::{FactTableCacheConfig, FactTableVersionProvider},
75 result::QueryResultCache,
76};
77use crate::{
78 cache::config::RlsEnforcement,
79 db::{
80 DatabaseAdapter, DatabaseType, PoolMetrics, SupportsMutations, WhereClause,
81 types::{JsonbValue, OrderByClause},
82 },
83 error::{FraiseQLError, Result},
84 schema::CompiledSchema,
85};
86
87mod mutation;
88mod query;
89#[cfg(test)]
90mod tests;
91
92pub use query::view_name_to_entity_type;
93
94/// Cached database adapter wrapper.
95///
96/// Wraps any `DatabaseAdapter` implementation with transparent query result caching.
97/// Cache keys include query, variables, WHERE clause, and schema version for security
98/// and correctness.
99///
100/// # Cache Behavior
101///
102/// - **Cache Hit**: Returns cached result in ~0.1ms (50-200x faster than database)
103/// - **Cache Miss**: Executes query via underlying adapter, stores result in cache
104/// - **Invalidation**: Call `invalidate_views()` after mutations to clear affected caches
105///
106/// # Thread Safety
107///
108/// This adapter is `Send + Sync` and can be safely shared across async tasks.
109/// The underlying cache uses `Arc<Mutex<>>` for thread-safe access.
110///
111/// # Example
112///
113/// ```no_run
114/// use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig, InvalidationContext};
115/// use fraiseql_core::db::{postgres::PostgresAdapter, DatabaseAdapter};
116///
117/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
118/// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
119/// let cache = QueryResultCache::new(CacheConfig::default());
120/// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string());
121///
122/// // First query - cache miss (slower)
123/// let users1 = adapter.execute_where_query("v_user", None, None, None, None).await?;
124///
125/// // Second query - cache hit (fast!)
126/// let users2 = adapter.execute_where_query("v_user", None, None, None, None).await?;
127///
128/// // After mutation, invalidate
129/// let invalidation = InvalidationContext::for_mutation(
130/// "createUser",
131/// vec!["v_user".to_string()]
132/// );
133/// adapter.invalidate_views(&invalidation.modified_views)?;
134/// # Ok(())
135/// # }
136/// ```
137pub struct CachedDatabaseAdapter<A: DatabaseAdapter> {
138 /// Underlying database adapter.
139 pub(super) adapter: A,
140
141 /// Query result cache.
142 pub(super) cache: Arc<QueryResultCache>,
143
144 /// Schema version for cache key generation.
145 ///
146 /// When schema version changes (e.g., after deployment), all cache entries
147 /// with old version become invalid automatically.
148 pub(super) schema_version: String,
149
150 /// Per-view TTL overrides in seconds.
151 ///
152 /// Populated from `QueryDefinition::cache_ttl_seconds` at server startup:
153 /// view name → TTL seconds. `None` for a view falls back to the global
154 /// `CacheConfig::ttl_seconds`.
155 pub(super) view_ttl_overrides: HashMap<String, u64>,
156
157 /// Configuration for fact table aggregation caching.
158 pub(super) fact_table_config: FactTableCacheConfig,
159
160 /// Version provider for fact tables (caches version lookups).
161 pub(super) version_provider: Arc<FactTableVersionProvider>,
162
163 /// Optional cascade invalidator for transitive view dependency expansion.
164 ///
165 /// When set, `invalidate_views()` uses BFS to expand the initial view list
166 /// to include all transitively dependent views before clearing cache entries.
167 pub(super) cascade_invalidator: Option<Arc<Mutex<CascadeInvalidator>>>,
168}
169
170impl<A: DatabaseAdapter> CachedDatabaseAdapter<A> {
171 /// Create new cached database adapter.
172 ///
173 /// # Arguments
174 ///
175 /// * `adapter` - Underlying database adapter to wrap
176 /// * `cache` - Query result cache instance
177 /// * `schema_version` - Uniquely identifies the compiled schema. Use `schema.content_hash()`
178 /// (NOT `env!("CARGO_PKG_VERSION")`) so that any schema content change automatically
179 /// invalidates cached entries across deploys.
180 ///
181 /// # Example
182 ///
183 /// ```rust,no_run
184 /// use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
185 /// use fraiseql_core::db::postgres::PostgresAdapter;
186 /// use fraiseql_core::schema::CompiledSchema;
187 ///
188 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
189 /// # let schema = CompiledSchema::default();
190 /// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
191 /// let cache = QueryResultCache::new(CacheConfig::default());
192 /// let adapter = CachedDatabaseAdapter::new(
193 /// db,
194 /// cache,
195 /// schema.content_hash() // Use content hash for automatic invalidation
196 /// );
197 /// # Ok(())
198 /// # }
199 /// ```
200 #[must_use]
201 pub fn new(adapter: A, cache: QueryResultCache, schema_version: String) -> Self {
202 Self {
203 adapter,
204 cache: Arc::new(cache),
205 schema_version,
206 view_ttl_overrides: HashMap::new(),
207 fact_table_config: FactTableCacheConfig::default(),
208 version_provider: Arc::new(FactTableVersionProvider::default()),
209 cascade_invalidator: None,
210 }
211 }
212
213 /// Set per-view TTL overrides.
214 ///
215 /// Maps `sql_source` (view name) → TTL in seconds. Built at server startup
216 /// from compiled `QueryDefinition::cache_ttl_seconds` entries.
217 ///
218 /// # Example
219 ///
220 /// ```rust,no_run
221 /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
222 /// # use fraiseql_core::db::postgres::PostgresAdapter;
223 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
224 /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
225 /// # let cache = QueryResultCache::new(CacheConfig::default());
226 /// let overrides = std::collections::HashMap::from([
227 /// ("v_country".to_string(), 3600_u64), // 1 h for reference data
228 /// ("v_live_price".to_string(), 0_u64), // never cache live data
229 /// ]);
230 /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
231 /// .with_view_ttl_overrides(overrides);
232 /// # Ok(())
233 /// # }
234 /// ```
235 #[must_use]
236 pub fn with_view_ttl_overrides(mut self, overrides: HashMap<String, u64>) -> Self {
237 self.view_ttl_overrides = overrides;
238 self
239 }
240
241 /// Set a cascade invalidator for transitive view dependency expansion.
242 ///
243 /// When set, `invalidate_views()` uses BFS to expand the initial view list
244 /// to include all views that transitively depend on the invalidated views.
245 ///
246 /// # Example
247 ///
248 /// ```rust,no_run
249 /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig, CascadeInvalidator};
250 /// # use fraiseql_core::db::postgres::PostgresAdapter;
251 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
252 /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
253 /// # let cache = QueryResultCache::new(CacheConfig::default());
254 /// let mut cascade = CascadeInvalidator::new();
255 /// cascade.add_dependency("v_user_stats", "v_user")?;
256 /// cascade.add_dependency("v_dashboard", "v_user_stats")?;
257 /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
258 /// .with_cascade_invalidator(cascade);
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[must_use]
263 pub fn with_cascade_invalidator(mut self, invalidator: CascadeInvalidator) -> Self {
264 self.cascade_invalidator = Some(Arc::new(Mutex::new(invalidator)));
265 self
266 }
267
268 /// Populate per-view TTL overrides from a compiled schema.
269 ///
270 /// For each query that has `cache_ttl_seconds` set and a non-null `sql_source`,
271 /// this maps the view name → TTL so the cache adapter uses the per-query TTL
272 /// instead of the global default.
273 ///
274 /// # Example
275 ///
276 /// ```rust,no_run
277 /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
278 /// # use fraiseql_core::db::postgres::PostgresAdapter;
279 /// # use fraiseql_core::schema::CompiledSchema;
280 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
281 /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
282 /// # let cache = QueryResultCache::new(CacheConfig::default());
283 /// # let schema = CompiledSchema::default();
284 /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
285 /// .with_ttl_overrides_from_schema(&schema);
286 /// # Ok(())
287 /// # }
288 /// ```
289 #[must_use]
290 pub fn with_ttl_overrides_from_schema(mut self, schema: &CompiledSchema) -> Self {
291 for query in &schema.queries {
292 if let (Some(view), Some(ttl)) = (&query.sql_source, query.cache_ttl_seconds) {
293 self.view_ttl_overrides.insert(view.clone(), ttl);
294 }
295 }
296 self
297 }
298
299 /// Create new cached database adapter with fact table caching configuration.
300 ///
301 /// # Arguments
302 ///
303 /// * `adapter` - Underlying database adapter to wrap
304 /// * `cache` - Query result cache instance
305 /// * `schema_version` - Current schema version (e.g., git hash, semver)
306 /// * `fact_table_config` - Configuration for fact table aggregation caching
307 ///
308 /// # Example
309 ///
310 /// ```rust,no_run
311 /// use fraiseql_core::cache::{
312 /// CachedDatabaseAdapter, QueryResultCache, CacheConfig,
313 /// FactTableCacheConfig, FactTableVersionStrategy,
314 /// };
315 /// use fraiseql_core::db::postgres::PostgresAdapter;
316 ///
317 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
318 /// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
319 /// let cache = QueryResultCache::new(CacheConfig::default());
320 ///
321 /// // Configure fact table caching strategies
322 /// let mut ft_config = FactTableCacheConfig::default();
323 /// ft_config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
324 /// ft_config.set_strategy("tf_events", FactTableVersionStrategy::time_based(300));
325 ///
326 /// let adapter = CachedDatabaseAdapter::with_fact_table_config(
327 /// db,
328 /// cache,
329 /// "1.0.0".to_string(),
330 /// ft_config,
331 /// );
332 /// # Ok(())
333 /// # }
334 /// ```
335 #[must_use]
336 pub fn with_fact_table_config(
337 adapter: A,
338 cache: QueryResultCache,
339 schema_version: String,
340 fact_table_config: FactTableCacheConfig,
341 ) -> Self {
342 Self {
343 adapter,
344 cache: Arc::new(cache),
345 schema_version,
346 view_ttl_overrides: HashMap::new(),
347 fact_table_config,
348 version_provider: Arc::new(FactTableVersionProvider::default()),
349 cascade_invalidator: None,
350 }
351 }
352
353 /// Get reference to underlying adapter.
354 ///
355 /// Useful for accessing adapter-specific methods not in the `DatabaseAdapter` trait.
356 ///
357 /// # Example
358 ///
359 /// ```rust,no_run
360 /// # use fraiseql_core::cache::CachedDatabaseAdapter;
361 /// # use fraiseql_core::db::postgres::PostgresAdapter;
362 /// # fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) {
363 /// // Access PostgreSQL-specific functionality
364 /// let pg_adapter = adapter.inner();
365 /// # }
366 /// ```
367 #[must_use]
368 pub const fn inner(&self) -> &A {
369 &self.adapter
370 }
371
372 /// Get reference to cache.
373 ///
374 /// Useful for metrics and monitoring.
375 ///
376 /// # Example
377 ///
378 /// ```rust,no_run
379 /// # use fraiseql_core::cache::CachedDatabaseAdapter;
380 /// # use fraiseql_core::db::postgres::PostgresAdapter;
381 /// # async fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) -> Result<(), Box<dyn std::error::Error>> {
382 /// let metrics = adapter.cache().metrics()?;
383 /// println!("Cache hit rate: {:.1}%", metrics.hit_rate() * 100.0);
384 /// # Ok(())
385 /// # }
386 /// ```
387 #[must_use]
388 pub fn cache(&self) -> &QueryResultCache {
389 &self.cache
390 }
391
392 /// Get schema version.
393 ///
394 /// # Example
395 ///
396 /// ```rust,no_run
397 /// # use fraiseql_core::cache::CachedDatabaseAdapter;
398 /// # use fraiseql_core::db::postgres::PostgresAdapter;
399 /// # fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) {
400 /// println!("Schema version: {}", adapter.schema_version());
401 /// # }
402 /// ```
403 #[must_use]
404 pub fn schema_version(&self) -> &str {
405 &self.schema_version
406 }
407
408 /// Get fact table cache configuration.
409 #[must_use]
410 pub const fn fact_table_config(&self) -> &FactTableCacheConfig {
411 &self.fact_table_config
412 }
413
414 /// Get the version provider for fact tables.
415 #[must_use]
416 pub fn version_provider(&self) -> &FactTableVersionProvider {
417 &self.version_provider
418 }
419
420 /// Verify that Row-Level Security is active on the database connection.
421 ///
422 /// Call this during server initialization when both caching and multi-tenancy
423 /// (`schema.is_multi_tenant()`) are enabled. Without RLS, users sharing the same
424 /// query parameters will receive the same cached response regardless of tenant.
425 ///
426 /// # What this checks
427 ///
428 /// Runs `SELECT current_setting('row_security', true) AS rls_setting`. The result
429 /// must be `'on'` or `'force'` for the check to pass. Non-PostgreSQL databases
430 /// (which return an error or unsupported) are treated as "RLS not active".
431 ///
432 /// # Errors
433 ///
434 /// Returns [`FraiseQLError::Configuration`] if RLS appears inactive.
435 pub async fn validate_rls_active(&self) -> Result<()> {
436 let result = self
437 .adapter
438 .execute_raw_query("SELECT current_setting('row_security', true) AS rls_setting")
439 .await;
440
441 let rls_active = match result {
442 Ok(rows) => rows
443 .first()
444 .and_then(|row| row.get("rls_setting"))
445 .and_then(serde_json::Value::as_str)
446 .is_some_and(|s| s == "on" || s == "force"),
447 Err(_) => false, // Non-PostgreSQL or query failure: RLS not active
448 };
449
450 if rls_active {
451 Ok(())
452 } else {
453 Err(FraiseQLError::Configuration {
454 message: "Caching is enabled in a multi-tenant schema but Row-Level Security \
455 does not appear to be active on the database. This would allow \
456 cross-tenant data leakage through the cache. \
457 Either disable caching, enable RLS, or set \
458 `rls_enforcement = \"off\"` in CacheConfig for single-tenant \
459 deployments."
460 .to_string(),
461 })
462 }
463 }
464
465 /// Apply the RLS enforcement policy from `CacheConfig`.
466 ///
467 /// Runs [`validate_rls_active`](Self::validate_rls_active) and handles the result
468 /// according to `enforcement`:
469 /// - [`RlsEnforcement::Error`]: propagates the error (default)
470 /// - [`RlsEnforcement::Warn`]: logs a warning and returns `Ok(())`
471 /// - [`RlsEnforcement::Off`]: skips the check entirely
472 ///
473 /// # Errors
474 ///
475 /// Returns the error from `validate_rls_active` when enforcement is `Error`.
476 pub async fn enforce_rls(&self, enforcement: RlsEnforcement) -> Result<()> {
477 if enforcement == RlsEnforcement::Off {
478 return Ok(());
479 }
480
481 match self.validate_rls_active().await {
482 Ok(()) => Ok(()),
483 Err(e) => match enforcement {
484 RlsEnforcement::Error => Err(e),
485 RlsEnforcement::Warn => {
486 tracing::warn!(
487 "RLS check failed (rls_enforcement = \"warn\"): {}. \
488 Cross-tenant cache leakage is possible.",
489 e
490 );
491 Ok(())
492 },
493 RlsEnforcement::Off => Ok(()), // unreachable but exhaustive
494 },
495 }
496 }
497}
498
499// Reason: DatabaseAdapter is defined with #[async_trait]; all implementations must match
500// its transformed method signatures to satisfy the trait contract
501// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
502#[async_trait]
503impl<A: DatabaseAdapter> DatabaseAdapter for CachedDatabaseAdapter<A> {
504 async fn execute_with_projection(
505 &self,
506 view: &str,
507 projection: Option<&crate::schema::SqlProjectionHint>,
508 where_clause: Option<&WhereClause>,
509 limit: Option<u32>,
510 offset: Option<u32>,
511 _order_by: Option<&[OrderByClause]>,
512 ) -> Result<Vec<JsonbValue>> {
513 self.execute_with_projection_impl(view, projection, where_clause, limit, offset)
514 .await
515 }
516
517 async fn execute_where_query(
518 &self,
519 view: &str,
520 where_clause: Option<&WhereClause>,
521 limit: Option<u32>,
522 offset: Option<u32>,
523 _order_by: Option<&[OrderByClause]>,
524 ) -> Result<Vec<JsonbValue>> {
525 self.execute_where_query_impl(view, where_clause, limit, offset).await
526 }
527
528 fn database_type(&self) -> DatabaseType {
529 self.adapter.database_type()
530 }
531
532 async fn health_check(&self) -> Result<()> {
533 self.adapter.health_check().await
534 }
535
536 fn pool_metrics(&self) -> PoolMetrics {
537 self.adapter.pool_metrics()
538 }
539
540 async fn execute_raw_query(
541 &self,
542 sql: &str,
543 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
544 // Use the aggregation caching method which handles fact table versioning
545 self.execute_aggregation_query(sql).await
546 }
547
548 async fn execute_parameterized_aggregate(
549 &self,
550 sql: &str,
551 params: &[serde_json::Value],
552 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
553 // Parameterized aggregate results are not cacheable by SQL template alone;
554 // delegate directly to the underlying adapter to avoid caching with an
555 // incorrect key (the same SQL template with different params would return
556 // different results).
557 self.adapter.execute_parameterized_aggregate(sql, params).await
558 }
559
560 async fn execute_function_call(
561 &self,
562 function_name: &str,
563 args: &[serde_json::Value],
564 ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
565 // Mutations are never cached — always delegate to the underlying adapter
566 self.adapter.execute_function_call(function_name, args).await
567 }
568
569 async fn invalidate_views(&self, views: &[String]) -> Result<u64> {
570 // Delegate to the inherent (synchronous) method which handles cascade
571 // expansion and cache eviction.
572 CachedDatabaseAdapter::invalidate_views(self, views)
573 }
574
575 async fn invalidate_by_entity(&self, entity_type: &str, entity_id: &str) -> Result<u64> {
576 CachedDatabaseAdapter::invalidate_by_entity(self, entity_type, entity_id)
577 }
578
579 async fn bump_fact_table_versions(&self, tables: &[String]) -> Result<()> {
580 self.bump_fact_table_versions_impl(tables).await
581 }
582}
583
584impl<A: SupportsMutations + Send + Sync> SupportsMutations for CachedDatabaseAdapter<A> {}