Skip to main content

fraiseql_core/cache/
fact_table_version.rs

1//! Fact table versioning for aggregation query caching.
2//!
3//! This module provides multiple strategies for caching aggregation queries on fact tables.
4//! Users can choose the strategy that best fits their data pipeline and freshness requirements.
5//!
6//! # Strategies
7//!
8//! | Strategy | Best For | Trade-off |
9//! |----------|----------|-----------|
10//! | `Disabled` | Real-time accuracy | No caching benefit |
11//! | `VersionTable` | ETL/batch loads | Requires version bump discipline |
12//! | `TimeBased` | Dashboards with acceptable lag | May serve stale data |
13//! | `SchemaVersion` | Append-only/immutable facts | Only invalidates on deploy |
14//!
15//! # Version Table Schema
16//!
17//! When using `VersionTable` strategy, create the following table:
18//!
19//! ```sql
20//! CREATE TABLE IF NOT EXISTS tf_versions (
21//!     table_name TEXT PRIMARY KEY,
22//!     version BIGINT NOT NULL DEFAULT 1,
23//!     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
24//! );
25//!
26//! -- Helper function to bump version
27//! CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
28//! RETURNS BIGINT AS $$
29//! DECLARE
30//!     new_version BIGINT;
31//! BEGIN
32//!     INSERT INTO tf_versions (table_name, version, updated_at)
33//!     VALUES (p_table_name, 1, NOW())
34//!     ON CONFLICT (table_name) DO UPDATE
35//!     SET version = tf_versions.version + 1,
36//!         updated_at = NOW()
37//!     RETURNING version INTO new_version;
38//!     RETURN new_version;
39//! END;
40//! $$ LANGUAGE plpgsql;
41//!
42//! -- Optional: Auto-bump trigger (adds write overhead)
43//! CREATE OR REPLACE FUNCTION tf_auto_version_bump()
44//! RETURNS TRIGGER AS $$
45//! BEGIN
46//!     PERFORM bump_tf_version(TG_TABLE_NAME);
47//!     RETURN NULL;
48//! END;
49//! $$ LANGUAGE plpgsql;
50//!
51//! -- Apply to specific fact tables:
52//! CREATE TRIGGER tf_sales_version_bump
53//! AFTER INSERT OR UPDATE OR DELETE ON tf_sales
54//! FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
55//! ```
56//!
57//! # Example Configuration
58//!
59//! ```rust
60//! use fraiseql_core::cache::fact_table_version::{
61//!     FactTableVersionStrategy, FactTableCacheConfig,
62//! };
63//! use std::collections::HashMap;
64//!
65//! let mut config = FactTableCacheConfig::default();
66//!
67//! // ETL-loaded sales data - use version table
68//! config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
69//!
70//! // Real-time page views - cache for 5 minutes
71//! config.set_strategy("tf_page_views", FactTableVersionStrategy::TimeBased {
72//!     ttl_seconds: 300,
73//! });
74//!
75//! // Historical exchange rates - never changes
76//! config.set_strategy("tf_historical_rates", FactTableVersionStrategy::SchemaVersion);
77//! ```
78
79use std::{
80    collections::HashMap,
81    time::{Duration, Instant},
82};
83
84use serde::{Deserialize, Serialize};
85
86/// Versioning strategy for fact table aggregation caching.
87///
88/// Different strategies offer different trade-offs between cache hit rate,
89/// data freshness, and operational complexity.
90#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(tag = "type", rename_all = "snake_case")]
92#[non_exhaustive]
93pub enum FactTableVersionStrategy {
94    /// No caching for aggregations (always query database).
95    ///
96    /// Use when: Real-time accuracy is required.
97    /// Trade-off: No caching benefit, every query hits database.
98    Disabled,
99
100    /// Read version from `tf_versions` table.
101    ///
102    /// Cache key includes the version number, so when version bumps,
103    /// old cache entries are automatically ignored.
104    ///
105    /// Use when: Data is loaded via ETL/batch processes that can bump versions.
106    /// Trade-off: Requires discipline to bump version after data changes.
107    ///
108    /// # Version Bumping
109    ///
110    /// After loading data, call:
111    /// ```sql
112    /// SELECT bump_tf_version('tf_sales');
113    /// ```
114    ///
115    /// Or use triggers for automatic bumping (adds write overhead).
116    VersionTable,
117
118    /// Time-based TTL caching.
119    ///
120    /// Cache entries expire after the specified duration regardless of
121    /// whether the underlying data has changed.
122    ///
123    /// Use when: Some staleness is acceptable (e.g., dashboards).
124    /// Trade-off: May serve stale data within TTL window.
125    TimeBased {
126        /// Cache TTL in seconds.
127        ttl_seconds: u64,
128    },
129
130    /// Use schema version only (invalidate on deployment).
131    ///
132    /// Cache is only invalidated when the schema version changes,
133    /// which typically happens during deployments.
134    ///
135    /// Use when: Fact table data is immutable or append-only and
136    /// queries always filter to recent data.
137    /// Trade-off: Stale data until next deployment.
138    SchemaVersion,
139}
140
141impl Default for FactTableVersionStrategy {
142    /// Default strategy is `Disabled` for safety (no stale data).
143    fn default() -> Self {
144        Self::Disabled
145    }
146}
147
148impl FactTableVersionStrategy {
149    /// Create a time-based strategy with the given TTL.
150    #[must_use]
151    pub const fn time_based(ttl_seconds: u64) -> Self {
152        Self::TimeBased { ttl_seconds }
153    }
154
155    /// Check if caching is enabled for this strategy.
156    #[must_use]
157    pub const fn is_caching_enabled(&self) -> bool {
158        !matches!(self, Self::Disabled)
159    }
160
161    /// Get TTL for time-based strategy, if applicable.
162    #[must_use]
163    pub const fn ttl_seconds(&self) -> Option<u64> {
164        match self {
165            Self::TimeBased { ttl_seconds } => Some(*ttl_seconds),
166            _ => None,
167        }
168    }
169}
170
171/// Configuration for fact table aggregation caching.
172///
173/// Maps fact table names to their versioning strategies.
174#[derive(Debug, Clone, Default, Serialize, Deserialize)]
175pub struct FactTableCacheConfig {
176    /// Default strategy for tables not explicitly configured.
177    #[serde(default)]
178    pub default_strategy: FactTableVersionStrategy,
179
180    /// Per-table strategy overrides.
181    #[serde(default)]
182    pub table_strategies: HashMap<String, FactTableVersionStrategy>,
183}
184
185impl FactTableCacheConfig {
186    /// Create a new config with the given default strategy.
187    #[must_use]
188    pub fn with_default(strategy: FactTableVersionStrategy) -> Self {
189        Self {
190            default_strategy: strategy,
191            table_strategies: HashMap::new(),
192        }
193    }
194
195    /// Set strategy for a specific table.
196    pub fn set_strategy(&mut self, table_name: &str, strategy: FactTableVersionStrategy) {
197        self.table_strategies.insert(table_name.to_string(), strategy);
198    }
199
200    /// Get strategy for a table (falls back to default).
201    #[must_use]
202    pub fn get_strategy(&self, table_name: &str) -> &FactTableVersionStrategy {
203        self.table_strategies.get(table_name).unwrap_or(&self.default_strategy)
204    }
205
206    /// Check if caching is enabled for a table.
207    #[must_use]
208    pub fn is_caching_enabled(&self, table_name: &str) -> bool {
209        self.get_strategy(table_name).is_caching_enabled()
210    }
211}
212
/// A fact table version number together with the moment it was recorded.
#[derive(Debug, Clone)]
pub struct CachedVersion {
    /// The version number.
    pub version:    i64,
    /// Monotonic timestamp taken when this value was created.
    pub fetched_at: Instant,
}

impl CachedVersion {
    /// Wrap `version`, stamping it with the current instant.
    #[must_use]
    pub fn new(version: i64) -> Self {
        let fetched_at = Instant::now();
        Self {
            version,
            fetched_at,
        }
    }

    /// Returns `true` while less than `max_age` has elapsed since this value
    /// was created. A zero `max_age` is therefore never fresh.
    ///
    /// The age limit is supplied by the caller; this type has no default.
    #[must_use]
    pub fn is_fresh(&self, max_age: Duration) -> bool {
        let age = self.fetched_at.elapsed();
        age < max_age
    }
}
241
242/// Version provider for fact tables.
243///
244/// Fetches and caches version numbers from the `tf_versions` table.
245#[derive(Debug)]
246pub struct FactTableVersionProvider {
247    /// Cached versions (`table_name` -> version).
248    versions:          std::sync::RwLock<HashMap<String, CachedVersion>>,
249    /// How long to cache version lookups.
250    version_cache_ttl: Duration,
251}
252
253impl Default for FactTableVersionProvider {
254    fn default() -> Self {
255        Self::new(Duration::from_secs(1))
256    }
257}
258
259impl FactTableVersionProvider {
260    /// Create a new version provider.
261    ///
262    /// # Arguments
263    ///
264    /// * `version_cache_ttl` - How long to cache version lookups (default 1 second)
265    #[must_use]
266    pub fn new(version_cache_ttl: Duration) -> Self {
267        Self {
268            versions: std::sync::RwLock::new(HashMap::new()),
269            version_cache_ttl,
270        }
271    }
272
273    /// Get cached version if still fresh, otherwise return None.
274    #[must_use]
275    pub fn get_cached_version(&self, table_name: &str) -> Option<i64> {
276        let versions = self.versions.read().ok()?;
277        let cached = versions.get(table_name)?;
278        if cached.is_fresh(self.version_cache_ttl) {
279            Some(cached.version)
280        } else {
281            None
282        }
283    }
284
285    /// Update cached version for a table.
286    pub fn set_cached_version(&self, table_name: &str, version: i64) {
287        if let Ok(mut versions) = self.versions.write() {
288            versions.insert(table_name.to_string(), CachedVersion::new(version));
289        }
290    }
291
292    /// Clear cached version for a table.
293    pub fn clear_cached_version(&self, table_name: &str) {
294        if let Ok(mut versions) = self.versions.write() {
295            versions.remove(table_name);
296        }
297    }
298
299    /// Clear all cached versions.
300    pub fn clear_all(&self) {
301        if let Ok(mut versions) = self.versions.write() {
302            versions.clear();
303        }
304    }
305}
306
307/// Generate cache key component for a fact table based on its versioning strategy.
308///
309/// This function returns the version component to include in cache keys.
310///
311/// # Returns
312///
313/// - `Some(version_string)` - Version to include in cache key
314/// - `None` - Caching disabled, should not cache
315#[must_use]
316pub fn generate_version_key_component(
317    _table_name: &str,
318    strategy: &FactTableVersionStrategy,
319    table_version: Option<i64>,
320    schema_version: &str,
321) -> Option<String> {
322    match strategy {
323        FactTableVersionStrategy::Disabled => None,
324
325        FactTableVersionStrategy::VersionTable => {
326            // Require a version from tf_versions table
327            table_version.map(|v| format!("tv:{v}"))
328        },
329
330        FactTableVersionStrategy::TimeBased { ttl_seconds } => {
331            // Use time bucket as version (floor to TTL boundary)
332            let now = std::time::SystemTime::now()
333                .duration_since(std::time::UNIX_EPOCH)
334                .unwrap_or_default()
335                .as_secs();
336            let bucket = now / ttl_seconds;
337            Some(format!("tb:{bucket}"))
338        },
339
340        FactTableVersionStrategy::SchemaVersion => {
341            // Use schema version only
342            Some(format!("sv:{schema_version}"))
343        },
344    }
345}
346
/// SQL to query version from `tf_versions` table.
///
/// Positional parameter `$1` is the fact table name. Because `table_name` is the
/// table's primary key, at most one row is returned. No row is returned for a
/// table that has no entry in `tf_versions` yet; the `bump_tf_version` function
/// in [`VERSION_TABLE_SCHEMA`] inserts the entry on its first call.
pub const VERSION_TABLE_QUERY: &str = r"
    SELECT version FROM tf_versions WHERE table_name = $1
";
351
/// SQL to create the `tf_versions` table and helper functions.
///
/// Written for PostgreSQL (`TIMESTAMPTZ`, `ON CONFLICT`, PL/pgSQL). Every
/// executable statement uses `IF NOT EXISTS` or `CREATE OR REPLACE`, so the
/// script can be re-run against a database that already has these objects.
///
/// Contents:
/// - `tf_versions`: one row per fact table, keyed by `table_name`.
/// - An index on `updated_at`. Lookups by table name ([`VERSION_TABLE_QUERY`])
///   are served by the `table_name` primary key, not by this index.
/// - `bump_tf_version(p_table_name)`: inserts version 1 for a table with no
///   row yet, otherwise increments the stored version; returns the new version.
/// - `tf_auto_version_bump()`: a trigger function that bumps the version for
///   `TG_TABLE_NAME`. The trigger itself is only shown as a commented-out
///   example and must be created per fact table.
///   NOTE(review): `TG_TABLE_NAME` carries no schema, so same-named tables in
///   different schemas would share one version row — confirm this is acceptable.
pub const VERSION_TABLE_SCHEMA: &str = r"
-- Fact table version tracking for aggregation cache
CREATE TABLE IF NOT EXISTS tf_versions (
    table_name TEXT PRIMARY KEY,
    version BIGINT NOT NULL DEFAULT 1,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for fast lookups
CREATE INDEX IF NOT EXISTS idx_tf_versions_updated_at ON tf_versions (updated_at);

-- Helper function to bump version (call after data loads)
CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
RETURNS BIGINT AS $$
DECLARE
    new_version BIGINT;
BEGIN
    INSERT INTO tf_versions (table_name, version, updated_at)
    VALUES (p_table_name, 1, NOW())
    ON CONFLICT (table_name) DO UPDATE
    SET version = tf_versions.version + 1,
        updated_at = NOW()
    RETURNING version INTO new_version;
    RETURN new_version;
END;
$$ LANGUAGE plpgsql;

-- Optional: Trigger function for automatic version bumping
-- Note: This adds overhead to every INSERT/UPDATE/DELETE
CREATE OR REPLACE FUNCTION tf_auto_version_bump()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM bump_tf_version(TG_TABLE_NAME);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Example: Apply auto-bump trigger to a fact table
-- CREATE TRIGGER tf_sales_version_bump
-- AFTER INSERT OR UPDATE OR DELETE ON tf_sales
-- FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
";
395
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)] // Reason: test code, panics are acceptable

    use super::*;

    #[test]
    fn test_strategy_default_is_disabled() {
        let strategy = FactTableVersionStrategy::default();
        assert_eq!(strategy, FactTableVersionStrategy::Disabled);
        assert!(!strategy.is_caching_enabled());
    }

    #[test]
    fn test_strategy_time_based() {
        let strategy = FactTableVersionStrategy::time_based(300);
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), Some(300));
    }

    #[test]
    fn test_strategy_version_table() {
        let strategy = FactTableVersionStrategy::VersionTable;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_strategy_schema_version() {
        let strategy = FactTableVersionStrategy::SchemaVersion;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_config_default_strategy() {
        let config = FactTableCacheConfig::default();
        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::Disabled);
    }

    #[test]
    fn test_config_per_table_strategy() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy(
            "tf_page_views",
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
        );

        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            config.get_strategy("tf_page_views"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }
        );
        // Unconfigured table uses default
        assert_eq!(config.get_strategy("tf_other"), &FactTableVersionStrategy::Disabled);

        // Caching flag follows the resolved strategy
        assert!(config.is_caching_enabled("tf_sales"));
        assert!(config.is_caching_enabled("tf_page_views"));
        assert!(!config.is_caching_enabled("tf_other"));
    }

    #[test]
    fn test_config_with_default() {
        let config = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        assert_eq!(config.get_strategy("tf_any"), &FactTableVersionStrategy::SchemaVersion);
    }

    #[test]
    fn test_generate_version_key_disabled() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::Disabled,
            Some(42),
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_version_table() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            Some(42),
            "1.0.0",
        );
        assert_eq!(key, Some("tv:42".to_string()));

        // No version available - should return None
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            None,
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_time_based() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            None,
            "1.0.0",
        );
        assert!(key.is_some());
        assert!(key.unwrap().starts_with("tb:"));
    }

    #[test]
    fn test_generate_version_key_time_based_huge_ttl_is_single_bucket() {
        // Current Unix time is far below u64::MAX, so the bucket is always 0.
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: u64::MAX },
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("tb:0".to_string()));
    }

    #[test]
    fn test_generate_version_key_schema_version() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::SchemaVersion,
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("sv:1.0.0".to_string()));
    }

    #[test]
    fn test_version_provider_caching() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        // Initially no cached version
        assert!(provider.get_cached_version("tf_sales").is_none());

        // Set version
        provider.set_cached_version("tf_sales", 42);
        assert_eq!(provider.get_cached_version("tf_sales"), Some(42));

        // Clear version
        provider.clear_cached_version("tf_sales");
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    #[test]
    fn test_version_provider_clear_all() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        provider.set_cached_version("tf_sales", 1);
        provider.set_cached_version("tf_orders", 2);

        provider.clear_all();

        assert!(provider.get_cached_version("tf_sales").is_none());
        assert!(provider.get_cached_version("tf_orders").is_none());
    }

    #[test]
    fn test_cached_version_freshness() {
        let cached = CachedVersion::new(42);

        // Should be fresh immediately
        assert!(cached.is_fresh(Duration::from_secs(1)));

        // Should be fresh for a longer duration
        assert!(cached.is_fresh(Duration::from_secs(60)));

        // A zero max age is never fresh (elapsed is never < 0)
        assert!(!cached.is_fresh(Duration::ZERO));
    }

    #[test]
    fn test_strategy_serialization() {
        let strategies = vec![
            FactTableVersionStrategy::Disabled,
            FactTableVersionStrategy::VersionTable,
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            FactTableVersionStrategy::SchemaVersion,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let deserialized: FactTableVersionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(strategy, deserialized);
        }
    }

    #[test]
    fn test_strategy_wire_format() {
        // Pin the config format: internally tagged with a snake_case `type` field.
        assert_eq!(
            serde_json::to_string(&FactTableVersionStrategy::Disabled).unwrap(),
            r#"{"type":"disabled"}"#
        );
        assert_eq!(
            serde_json::to_string(&FactTableVersionStrategy::TimeBased { ttl_seconds: 300 })
                .unwrap(),
            r#"{"type":"time_based","ttl_seconds":300}"#
        );
        let parsed: FactTableVersionStrategy =
            serde_json::from_str(r#"{"type":"version_table"}"#).unwrap();
        assert_eq!(parsed, FactTableVersionStrategy::VersionTable);
        let parsed: FactTableVersionStrategy =
            serde_json::from_str(r#"{"type":"schema_version"}"#).unwrap();
        assert_eq!(parsed, FactTableVersionStrategy::SchemaVersion);
    }

    #[test]
    fn test_config_serialization() {
        let mut config =
            FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 60 });

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: FactTableCacheConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.default_strategy, FactTableVersionStrategy::SchemaVersion);
        assert_eq!(deserialized.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            deserialized.get_strategy("tf_events"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 60 }
        );
        assert_eq!(
            deserialized.get_strategy("tf_unconfigured"),
            &FactTableVersionStrategy::SchemaVersion
        );
    }

    #[test]
    fn test_config_deserialize_empty_object_uses_defaults() {
        let config: FactTableCacheConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(config.default_strategy, FactTableVersionStrategy::Disabled);
        assert!(config.table_strategies.is_empty());
    }
}