Skip to main content

fraiseql_core/cache/adapter/
mod.rs

1//! Cached database adapter wrapper.
2//!
3//! Provides transparent caching for `DatabaseAdapter` implementations by wrapping
4//! `execute_where_query()` calls with cache lookup and storage.
5//!
6//! # Security: Cache Isolation via RLS
7//!
8//! Automatic Persisted Query (APQ) caching provides no user-level isolation on its own.
9//! Cache key isolation derives entirely from Row-Level Security: different users MUST
10//! produce different WHERE clauses via their RLS policies. If RLS is disabled or
11//! returns an empty WHERE clause, two users with the same query and variables will
12//! receive the same cached response.
13//!
14//! **Always verify RLS is active when caching is enabled in multi-tenant deployments.**
15//!
16//! # Architecture
17//!
18//! ```text
19//! ┌─────────────────────────┐
20//! │ CachedDatabaseAdapter   │
21//! │                         │
22//! │  execute_where_query()  │
23//! └───────────┬─────────────┘
24//!             │
25//!             ↓ generate_cache_key()
26//! ┌─────────────────────────┐
27//! │ Cache Hit?              │
28//! └───────────┬─────────────┘
29//!             │
30//!       ┌─────┴─────┐
31//!       │           │
32//!      HIT         MISS
33//!       │           │
34//!       ↓           ↓ DatabaseAdapter
35//! Return Cached   Execute Query
36//! Result          + Store in Cache
37//! ```
38//!
39//! # Example
40//!
41//! ```no_run
42//! use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
43//! use fraiseql_core::db::{postgres::PostgresAdapter, DatabaseAdapter};
44//!
45//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
46//! // Create underlying database adapter
47//! let db_adapter = PostgresAdapter::new("postgresql://localhost/db").await?;
48//!
49//! // Wrap with caching
50//! let cache = QueryResultCache::new(CacheConfig::default());
51//! let cached_adapter = CachedDatabaseAdapter::new(
52//!     db_adapter,
53//!     cache,
54//!     "1.0.0".to_string()  // schema version
55//! );
56//!
57//! // Use as normal DatabaseAdapter - caching is transparent
58//! let users = cached_adapter
59//!     .execute_where_query("v_user", None, Some(10), None, None)
60//!     .await?;
61//! # Ok(())
62//! # }
63//! ```
64
65use std::{
66    collections::HashMap,
67    sync::{Arc, Mutex},
68};
69
70use async_trait::async_trait;
71
72use super::{
73    cascade_invalidator::CascadeInvalidator,
74    fact_table_version::{FactTableCacheConfig, FactTableVersionProvider},
75    result::QueryResultCache,
76};
77use crate::{
78    cache::config::RlsEnforcement,
79    db::{
80        DatabaseAdapter, DatabaseType, PoolMetrics, SupportsMutations, WhereClause,
81        types::{JsonbValue, OrderByClause},
82    },
83    error::{FraiseQLError, Result},
84    schema::CompiledSchema,
85};
86
87mod mutation;
88mod query;
89#[cfg(test)]
90mod tests;
91
92pub use query::view_name_to_entity_type;
93
94/// Cached database adapter wrapper.
95///
96/// Wraps any `DatabaseAdapter` implementation with transparent query result caching.
97/// Cache keys include query, variables, WHERE clause, and schema version for security
98/// and correctness.
99///
100/// # Cache Behavior
101///
102/// - **Cache Hit**: Returns cached result in ~0.1ms (50-200x faster than database)
103/// - **Cache Miss**: Executes query via underlying adapter, stores result in cache
104/// - **Invalidation**: Call `invalidate_views()` after mutations to clear affected caches
105///
106/// # Thread Safety
107///
108/// This adapter is `Send + Sync` and can be safely shared across async tasks.
109/// The underlying cache uses `Arc<Mutex<>>` for thread-safe access.
110///
111/// # Example
112///
113/// ```no_run
114/// use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig, InvalidationContext};
115/// use fraiseql_core::db::{postgres::PostgresAdapter, DatabaseAdapter};
116///
117/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
118/// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
119/// let cache = QueryResultCache::new(CacheConfig::default());
120/// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string());
121///
122/// // First query - cache miss (slower)
123/// let users1 = adapter.execute_where_query("v_user", None, None, None, None).await?;
124///
125/// // Second query - cache hit (fast!)
126/// let users2 = adapter.execute_where_query("v_user", None, None, None, None).await?;
127///
128/// // After mutation, invalidate
129/// let invalidation = InvalidationContext::for_mutation(
130///     "createUser",
131///     vec!["v_user".to_string()]
132/// );
133/// adapter.invalidate_views(&invalidation.modified_views)?;
134/// # Ok(())
135/// # }
136/// ```
137pub struct CachedDatabaseAdapter<A: DatabaseAdapter> {
138    /// Underlying database adapter.
139    pub(super) adapter: A,
140
141    /// Query result cache.
142    pub(super) cache: Arc<QueryResultCache>,
143
144    /// Schema version for cache key generation.
145    ///
146    /// When schema version changes (e.g., after deployment), all cache entries
147    /// with old version become invalid automatically.
148    pub(super) schema_version: String,
149
150    /// Per-view TTL overrides in seconds.
151    ///
152    /// Populated from `QueryDefinition::cache_ttl_seconds` at server startup:
153    /// view name → TTL seconds.  `None` for a view falls back to the global
154    /// `CacheConfig::ttl_seconds`.
155    pub(super) view_ttl_overrides: HashMap<String, u64>,
156
157    /// Configuration for fact table aggregation caching.
158    pub(super) fact_table_config: FactTableCacheConfig,
159
160    /// Version provider for fact tables (caches version lookups).
161    pub(super) version_provider: Arc<FactTableVersionProvider>,
162
163    /// Optional cascade invalidator for transitive view dependency expansion.
164    ///
165    /// When set, `invalidate_views()` uses BFS to expand the initial view list
166    /// to include all transitively dependent views before clearing cache entries.
167    pub(super) cascade_invalidator: Option<Arc<Mutex<CascadeInvalidator>>>,
168}
169
170impl<A: DatabaseAdapter> CachedDatabaseAdapter<A> {
171    /// Create new cached database adapter.
172    ///
173    /// # Arguments
174    ///
175    /// * `adapter` - Underlying database adapter to wrap
176    /// * `cache` - Query result cache instance
177    /// * `schema_version` - Uniquely identifies the compiled schema. Use `schema.content_hash()`
178    ///   (NOT `env!("CARGO_PKG_VERSION")`) so that any schema content change automatically
179    ///   invalidates cached entries across deploys.
180    ///
181    /// # Example
182    ///
183    /// ```rust,no_run
184    /// use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
185    /// use fraiseql_core::db::postgres::PostgresAdapter;
186    /// use fraiseql_core::schema::CompiledSchema;
187    ///
188    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
189    /// # let schema = CompiledSchema::default();
190    /// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
191    /// let cache = QueryResultCache::new(CacheConfig::default());
192    /// let adapter = CachedDatabaseAdapter::new(
193    ///     db,
194    ///     cache,
195    ///     schema.content_hash()  // Use content hash for automatic invalidation
196    /// );
197    /// # Ok(())
198    /// # }
199    /// ```
200    #[must_use]
201    pub fn new(adapter: A, cache: QueryResultCache, schema_version: String) -> Self {
202        Self {
203            adapter,
204            cache: Arc::new(cache),
205            schema_version,
206            view_ttl_overrides: HashMap::new(),
207            fact_table_config: FactTableCacheConfig::default(),
208            version_provider: Arc::new(FactTableVersionProvider::default()),
209            cascade_invalidator: None,
210        }
211    }
212
213    /// Set per-view TTL overrides.
214    ///
215    /// Maps `sql_source` (view name) → TTL in seconds.  Built at server startup
216    /// from compiled `QueryDefinition::cache_ttl_seconds` entries.
217    ///
218    /// # Example
219    ///
220    /// ```rust,no_run
221    /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
222    /// # use fraiseql_core::db::postgres::PostgresAdapter;
223    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
224    /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
225    /// # let cache = QueryResultCache::new(CacheConfig::default());
226    /// let overrides = std::collections::HashMap::from([
227    ///     ("v_country".to_string(), 3600_u64),   // 1 h for reference data
228    ///     ("v_live_price".to_string(), 0_u64),   // never cache live data
229    /// ]);
230    /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
231    ///     .with_view_ttl_overrides(overrides);
232    /// # Ok(())
233    /// # }
234    /// ```
235    #[must_use]
236    pub fn with_view_ttl_overrides(mut self, overrides: HashMap<String, u64>) -> Self {
237        self.view_ttl_overrides = overrides;
238        self
239    }
240
241    /// Set a cascade invalidator for transitive view dependency expansion.
242    ///
243    /// When set, `invalidate_views()` uses BFS to expand the initial view list
244    /// to include all views that transitively depend on the invalidated views.
245    ///
246    /// # Example
247    ///
248    /// ```rust,no_run
249    /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig, CascadeInvalidator};
250    /// # use fraiseql_core::db::postgres::PostgresAdapter;
251    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
252    /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
253    /// # let cache = QueryResultCache::new(CacheConfig::default());
254    /// let mut cascade = CascadeInvalidator::new();
255    /// cascade.add_dependency("v_user_stats", "v_user")?;
256    /// cascade.add_dependency("v_dashboard", "v_user_stats")?;
257    /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
258    ///     .with_cascade_invalidator(cascade);
259    /// # Ok(())
260    /// # }
261    /// ```
262    #[must_use]
263    pub fn with_cascade_invalidator(mut self, invalidator: CascadeInvalidator) -> Self {
264        self.cascade_invalidator = Some(Arc::new(Mutex::new(invalidator)));
265        self
266    }
267
268    /// Populate per-view TTL overrides from a compiled schema.
269    ///
270    /// For each query that has `cache_ttl_seconds` set and a non-null `sql_source`,
271    /// this maps the view name → TTL so the cache adapter uses the per-query TTL
272    /// instead of the global default.
273    ///
274    /// # Example
275    ///
276    /// ```rust,no_run
277    /// # use fraiseql_core::cache::{CachedDatabaseAdapter, QueryResultCache, CacheConfig};
278    /// # use fraiseql_core::db::postgres::PostgresAdapter;
279    /// # use fraiseql_core::schema::CompiledSchema;
280    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
281    /// # let db = PostgresAdapter::new("postgresql://localhost/db").await?;
282    /// # let cache = QueryResultCache::new(CacheConfig::default());
283    /// # let schema = CompiledSchema::default();
284    /// let adapter = CachedDatabaseAdapter::new(db, cache, "1.0.0".to_string())
285    ///     .with_ttl_overrides_from_schema(&schema);
286    /// # Ok(())
287    /// # }
288    /// ```
289    #[must_use]
290    pub fn with_ttl_overrides_from_schema(mut self, schema: &CompiledSchema) -> Self {
291        for query in &schema.queries {
292            if let (Some(view), Some(ttl)) = (&query.sql_source, query.cache_ttl_seconds) {
293                self.view_ttl_overrides.insert(view.clone(), ttl);
294            }
295        }
296        self
297    }
298
299    /// Create new cached database adapter with fact table caching configuration.
300    ///
301    /// # Arguments
302    ///
303    /// * `adapter` - Underlying database adapter to wrap
304    /// * `cache` - Query result cache instance
305    /// * `schema_version` - Current schema version (e.g., git hash, semver)
306    /// * `fact_table_config` - Configuration for fact table aggregation caching
307    ///
308    /// # Example
309    ///
310    /// ```rust,no_run
311    /// use fraiseql_core::cache::{
312    ///     CachedDatabaseAdapter, QueryResultCache, CacheConfig,
313    ///     FactTableCacheConfig, FactTableVersionStrategy,
314    /// };
315    /// use fraiseql_core::db::postgres::PostgresAdapter;
316    ///
317    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
318    /// let db = PostgresAdapter::new("postgresql://localhost/db").await?;
319    /// let cache = QueryResultCache::new(CacheConfig::default());
320    ///
321    /// // Configure fact table caching strategies
322    /// let mut ft_config = FactTableCacheConfig::default();
323    /// ft_config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
324    /// ft_config.set_strategy("tf_events", FactTableVersionStrategy::time_based(300));
325    ///
326    /// let adapter = CachedDatabaseAdapter::with_fact_table_config(
327    ///     db,
328    ///     cache,
329    ///     "1.0.0".to_string(),
330    ///     ft_config,
331    /// );
332    /// # Ok(())
333    /// # }
334    /// ```
335    #[must_use]
336    pub fn with_fact_table_config(
337        adapter: A,
338        cache: QueryResultCache,
339        schema_version: String,
340        fact_table_config: FactTableCacheConfig,
341    ) -> Self {
342        Self {
343            adapter,
344            cache: Arc::new(cache),
345            schema_version,
346            view_ttl_overrides: HashMap::new(),
347            fact_table_config,
348            version_provider: Arc::new(FactTableVersionProvider::default()),
349            cascade_invalidator: None,
350        }
351    }
352
353    /// Get reference to underlying adapter.
354    ///
355    /// Useful for accessing adapter-specific methods not in the `DatabaseAdapter` trait.
356    ///
357    /// # Example
358    ///
359    /// ```rust,no_run
360    /// # use fraiseql_core::cache::CachedDatabaseAdapter;
361    /// # use fraiseql_core::db::postgres::PostgresAdapter;
362    /// # fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) {
363    /// // Access PostgreSQL-specific functionality
364    /// let pg_adapter = adapter.inner();
365    /// # }
366    /// ```
367    #[must_use]
368    pub const fn inner(&self) -> &A {
369        &self.adapter
370    }
371
372    /// Get reference to cache.
373    ///
374    /// Useful for metrics and monitoring.
375    ///
376    /// # Example
377    ///
378    /// ```rust,no_run
379    /// # use fraiseql_core::cache::CachedDatabaseAdapter;
380    /// # use fraiseql_core::db::postgres::PostgresAdapter;
381    /// # async fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) -> Result<(), Box<dyn std::error::Error>> {
382    /// let metrics = adapter.cache().metrics()?;
383    /// println!("Cache hit rate: {:.1}%", metrics.hit_rate() * 100.0);
384    /// # Ok(())
385    /// # }
386    /// ```
387    #[must_use]
388    pub fn cache(&self) -> &QueryResultCache {
389        &self.cache
390    }
391
392    /// Get schema version.
393    ///
394    /// # Example
395    ///
396    /// ```rust,no_run
397    /// # use fraiseql_core::cache::CachedDatabaseAdapter;
398    /// # use fraiseql_core::db::postgres::PostgresAdapter;
399    /// # fn example(adapter: CachedDatabaseAdapter<PostgresAdapter>) {
400    /// println!("Schema version: {}", adapter.schema_version());
401    /// # }
402    /// ```
403    #[must_use]
404    pub fn schema_version(&self) -> &str {
405        &self.schema_version
406    }
407
408    /// Get fact table cache configuration.
409    #[must_use]
410    pub const fn fact_table_config(&self) -> &FactTableCacheConfig {
411        &self.fact_table_config
412    }
413
414    /// Get the version provider for fact tables.
415    #[must_use]
416    pub fn version_provider(&self) -> &FactTableVersionProvider {
417        &self.version_provider
418    }
419
420    /// Verify that Row-Level Security is active on the database connection.
421    ///
422    /// Call this during server initialization when both caching and multi-tenancy
423    /// (`schema.is_multi_tenant()`) are enabled. Without RLS, users sharing the same
424    /// query parameters will receive the same cached response regardless of tenant.
425    ///
426    /// # What this checks
427    ///
428    /// Runs `SELECT current_setting('row_security', true) AS rls_setting`. The result
429    /// must be `'on'` or `'force'` for the check to pass. Non-PostgreSQL databases
430    /// (which return an error or unsupported) are treated as "RLS not active".
431    ///
432    /// # Errors
433    ///
434    /// Returns [`FraiseQLError::Configuration`] if RLS appears inactive.
435    pub async fn validate_rls_active(&self) -> Result<()> {
436        let result = self
437            .adapter
438            .execute_raw_query("SELECT current_setting('row_security', true) AS rls_setting")
439            .await;
440
441        let rls_active = match result {
442            Ok(rows) => rows
443                .first()
444                .and_then(|row| row.get("rls_setting"))
445                .and_then(serde_json::Value::as_str)
446                .is_some_and(|s| s == "on" || s == "force"),
447            Err(_) => false, // Non-PostgreSQL or query failure: RLS not active
448        };
449
450        if rls_active {
451            Ok(())
452        } else {
453            Err(FraiseQLError::Configuration {
454                message: "Caching is enabled in a multi-tenant schema but Row-Level Security \
455                          does not appear to be active on the database. This would allow \
456                          cross-tenant data leakage through the cache. \
457                          Either disable caching, enable RLS, or set \
458                          `rls_enforcement = \"off\"` in CacheConfig for single-tenant \
459                          deployments."
460                    .to_string(),
461            })
462        }
463    }
464
465    /// Apply the RLS enforcement policy from `CacheConfig`.
466    ///
467    /// Runs [`validate_rls_active`](Self::validate_rls_active) and handles the result
468    /// according to `enforcement`:
469    /// - [`RlsEnforcement::Error`]: propagates the error (default)
470    /// - [`RlsEnforcement::Warn`]: logs a warning and returns `Ok(())`
471    /// - [`RlsEnforcement::Off`]: skips the check entirely
472    ///
473    /// # Errors
474    ///
475    /// Returns the error from `validate_rls_active` when enforcement is `Error`.
476    pub async fn enforce_rls(&self, enforcement: RlsEnforcement) -> Result<()> {
477        if enforcement == RlsEnforcement::Off {
478            return Ok(());
479        }
480
481        match self.validate_rls_active().await {
482            Ok(()) => Ok(()),
483            Err(e) => match enforcement {
484                RlsEnforcement::Error => Err(e),
485                RlsEnforcement::Warn => {
486                    tracing::warn!(
487                        "RLS check failed (rls_enforcement = \"warn\"): {}. \
488                         Cross-tenant cache leakage is possible.",
489                        e
490                    );
491                    Ok(())
492                },
493                RlsEnforcement::Off => Ok(()), // unreachable but exhaustive
494            },
495        }
496    }
497}
498
499// Reason: DatabaseAdapter is defined with #[async_trait]; all implementations must match
500// its transformed method signatures to satisfy the trait contract
501// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
502#[async_trait]
503impl<A: DatabaseAdapter> DatabaseAdapter for CachedDatabaseAdapter<A> {
504    async fn execute_with_projection(
505        &self,
506        view: &str,
507        projection: Option<&crate::schema::SqlProjectionHint>,
508        where_clause: Option<&WhereClause>,
509        limit: Option<u32>,
510        offset: Option<u32>,
511        _order_by: Option<&[OrderByClause]>,
512    ) -> Result<Vec<JsonbValue>> {
513        self.execute_with_projection_impl(view, projection, where_clause, limit, offset)
514            .await
515    }
516
517    async fn execute_where_query(
518        &self,
519        view: &str,
520        where_clause: Option<&WhereClause>,
521        limit: Option<u32>,
522        offset: Option<u32>,
523        _order_by: Option<&[OrderByClause]>,
524    ) -> Result<Vec<JsonbValue>> {
525        self.execute_where_query_impl(view, where_clause, limit, offset).await
526    }
527
528    fn database_type(&self) -> DatabaseType {
529        self.adapter.database_type()
530    }
531
532    async fn health_check(&self) -> Result<()> {
533        self.adapter.health_check().await
534    }
535
536    fn pool_metrics(&self) -> PoolMetrics {
537        self.adapter.pool_metrics()
538    }
539
540    async fn execute_raw_query(
541        &self,
542        sql: &str,
543    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
544        // Use the aggregation caching method which handles fact table versioning
545        self.execute_aggregation_query(sql).await
546    }
547
548    async fn execute_parameterized_aggregate(
549        &self,
550        sql: &str,
551        params: &[serde_json::Value],
552    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
553        // Parameterized aggregate results are not cacheable by SQL template alone;
554        // delegate directly to the underlying adapter to avoid caching with an
555        // incorrect key (the same SQL template with different params would return
556        // different results).
557        self.adapter.execute_parameterized_aggregate(sql, params).await
558    }
559
560    async fn execute_function_call(
561        &self,
562        function_name: &str,
563        args: &[serde_json::Value],
564    ) -> Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
565        // Mutations are never cached — always delegate to the underlying adapter
566        self.adapter.execute_function_call(function_name, args).await
567    }
568
569    async fn invalidate_views(&self, views: &[String]) -> Result<u64> {
570        // Delegate to the inherent (synchronous) method which handles cascade
571        // expansion and cache eviction.
572        CachedDatabaseAdapter::invalidate_views(self, views)
573    }
574
575    async fn invalidate_by_entity(&self, entity_type: &str, entity_id: &str) -> Result<u64> {
576        CachedDatabaseAdapter::invalidate_by_entity(self, entity_type, entity_id)
577    }
578
579    async fn bump_fact_table_versions(&self, tables: &[String]) -> Result<()> {
580        self.bump_fact_table_versions_impl(tables).await
581    }
582}
583
584impl<A: SupportsMutations + Send + Sync> SupportsMutations for CachedDatabaseAdapter<A> {}