Skip to main content

fraiseql_core/cache/
fact_table_version.rs

1//! Fact table versioning for aggregation query caching.
2//!
3//! This module provides multiple strategies for caching aggregation queries on fact tables.
4//! Users can choose the strategy that best fits their data pipeline and freshness requirements.
5//!
6//! # Strategies
7//!
8//! | Strategy | Best For | Trade-off |
9//! |----------|----------|-----------|
10//! | `Disabled` | Real-time accuracy | No caching benefit |
11//! | `VersionTable` | ETL/batch loads | Requires version bump discipline |
12//! | `TimeBased` | Dashboards with acceptable lag | May serve stale data |
13//! | `SchemaVersion` | Append-only/immutable facts | Only invalidates on deploy |
14//!
15//! # Version Table Schema
16//!
17//! When using `VersionTable` strategy, create the following table:
18//!
19//! ```sql
20//! CREATE TABLE IF NOT EXISTS tf_versions (
21//!     table_name TEXT PRIMARY KEY,
22//!     version BIGINT NOT NULL DEFAULT 1,
23//!     updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
24//! );
25//!
26//! -- Helper function to bump version
27//! CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
28//! RETURNS BIGINT AS $$
29//! DECLARE
30//!     new_version BIGINT;
31//! BEGIN
32//!     INSERT INTO tf_versions (table_name, version, updated_at)
33//!     VALUES (p_table_name, 1, NOW())
34//!     ON CONFLICT (table_name) DO UPDATE
35//!     SET version = tf_versions.version + 1,
36//!         updated_at = NOW()
37//!     RETURNING version INTO new_version;
38//!     RETURN new_version;
39//! END;
40//! $$ LANGUAGE plpgsql;
41//!
42//! -- Optional: Auto-bump trigger (adds write overhead)
43//! CREATE OR REPLACE FUNCTION tf_auto_version_bump()
44//! RETURNS TRIGGER AS $$
45//! BEGIN
46//!     PERFORM bump_tf_version(TG_TABLE_NAME);
47//!     RETURN NULL;
48//! END;
49//! $$ LANGUAGE plpgsql;
50//!
51//! -- Apply to specific fact tables:
52//! CREATE TRIGGER tf_sales_version_bump
53//! AFTER INSERT OR UPDATE OR DELETE ON tf_sales
54//! FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
55//! ```
56//!
57//! # Example Configuration
58//!
59//! ```rust
60//! use fraiseql_core::cache::fact_table_version::{
61//!     FactTableVersionStrategy, FactTableCacheConfig,
62//! };
63//! use std::collections::HashMap;
64//!
65//! let mut config = FactTableCacheConfig::default();
66//!
67//! // ETL-loaded sales data - use version table
68//! config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
69//!
70//! // Real-time page views - cache for 5 minutes
71//! config.set_strategy("tf_page_views", FactTableVersionStrategy::TimeBased {
72//!     ttl_seconds: 300,
73//! });
74//!
//! // Historical exchange rates - never change
76//! config.set_strategy("tf_historical_rates", FactTableVersionStrategy::SchemaVersion);
77//! ```
78
79use std::{
80    collections::HashMap,
81    time::{Duration, Instant},
82};
83
84use serde::{Deserialize, Serialize};
85
/// Versioning strategy for fact table aggregation caching.
///
/// Different strategies offer different trade-offs between cache hit rate,
/// data freshness, and operational complexity.
///
/// # Serialized form
///
/// Values are internally tagged: the variant name is written in `snake_case`
/// under a `type` key, and `TimeBased` additionally carries its field, e.g.
/// `{"type": "time_based", "ttl_seconds": 300}`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FactTableVersionStrategy {
    /// No caching for aggregations (always query database).
    ///
    /// Use when: Real-time accuracy is required.
    /// Trade-off: No caching benefit, every query hits database.
    Disabled,

    /// Read version from `tf_versions` table.
    ///
    /// Cache key includes the version number, so when version bumps,
    /// old cache entries are automatically ignored.
    ///
    /// Use when: Data is loaded via ETL/batch processes that can bump versions.
    /// Trade-off: Requires discipline to bump version after data changes.
    ///
    /// # Version Bumping
    ///
    /// After loading data, call:
    /// ```sql
    /// SELECT bump_tf_version('tf_sales');
    /// ```
    ///
    /// Or use triggers for automatic bumping (adds write overhead).
    VersionTable,

    /// Time-based TTL caching.
    ///
    /// Cache entries stop being used once their time window ends, regardless
    /// of whether the underlying data has changed.
    ///
    /// The key component is the current Unix time divided by `ttl_seconds`,
    /// so windows are aligned to multiples of the TTL rather than to the
    /// moment an entry was stored: an entry created late in a window is
    /// reused for less than the full TTL.
    ///
    /// Use when: Some staleness is acceptable (e.g., dashboards).
    /// Trade-off: May serve stale data within TTL window.
    TimeBased {
        /// Cache TTL in seconds.
        ///
        /// Expected to be greater than zero, because the key generator
        /// divides the current time by this value. Use `Disabled` to turn
        /// caching off instead of a zero TTL.
        ttl_seconds: u64,
    },

    /// Use schema version only (invalidate on deployment).
    ///
    /// Cache is only invalidated when the schema version changes,
    /// which typically happens during deployments.
    ///
    /// Use when: Fact table data is immutable or append-only and
    /// queries always filter to recent data.
    /// Trade-off: Stale data until next deployment.
    SchemaVersion,
}
139
140impl Default for FactTableVersionStrategy {
141    /// Default strategy is `Disabled` for safety (no stale data).
142    fn default() -> Self {
143        Self::Disabled
144    }
145}
146
147impl FactTableVersionStrategy {
148    /// Create a time-based strategy with the given TTL.
149    #[must_use]
150    pub const fn time_based(ttl_seconds: u64) -> Self {
151        Self::TimeBased { ttl_seconds }
152    }
153
154    /// Check if caching is enabled for this strategy.
155    #[must_use]
156    pub const fn is_caching_enabled(&self) -> bool {
157        !matches!(self, Self::Disabled)
158    }
159
160    /// Get TTL for time-based strategy, if applicable.
161    #[must_use]
162    pub const fn ttl_seconds(&self) -> Option<u64> {
163        match self {
164            Self::TimeBased { ttl_seconds } => Some(*ttl_seconds),
165            _ => None,
166        }
167    }
168}
169
/// Configuration for fact table aggregation caching.
///
/// Maps fact table names to their versioning strategies.
///
/// Both fields are marked `#[serde(default)]`, so either may be omitted when
/// deserializing: a missing `default_strategy` becomes
/// `FactTableVersionStrategy::default()` and a missing `table_strategies`
/// becomes an empty map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FactTableCacheConfig {
    /// Default strategy for tables not explicitly configured.
    #[serde(default)]
    pub default_strategy: FactTableVersionStrategy,

    /// Per-table strategy overrides, keyed by fact table name.
    ///
    /// An entry here takes precedence over `default_strategy` for that table.
    #[serde(default)]
    pub table_strategies: HashMap<String, FactTableVersionStrategy>,
}
183
184impl FactTableCacheConfig {
185    /// Create a new config with the given default strategy.
186    #[must_use]
187    pub fn with_default(strategy: FactTableVersionStrategy) -> Self {
188        Self {
189            default_strategy: strategy,
190            table_strategies: HashMap::new(),
191        }
192    }
193
194    /// Set strategy for a specific table.
195    pub fn set_strategy(&mut self, table_name: &str, strategy: FactTableVersionStrategy) {
196        self.table_strategies.insert(table_name.to_string(), strategy);
197    }
198
199    /// Get strategy for a table (falls back to default).
200    #[must_use]
201    pub fn get_strategy(&self, table_name: &str) -> &FactTableVersionStrategy {
202        self.table_strategies.get(table_name).unwrap_or(&self.default_strategy)
203    }
204
205    /// Check if caching is enabled for a table.
206    #[must_use]
207    pub fn is_caching_enabled(&self, table_name: &str) -> bool {
208        self.get_strategy(table_name).is_caching_enabled()
209    }
210}
211
/// A fact table version number together with the moment it was obtained.
#[derive(Debug, Clone)]
pub struct CachedVersion {
    /// The version number.
    pub version:    i64,
    /// Monotonic timestamp taken when the version was fetched.
    pub fetched_at: Instant,
}

impl CachedVersion {
    /// Wrap `version`, stamping it with the current time.
    #[must_use]
    pub fn new(version: i64) -> Self {
        let fetched_at = Instant::now();
        Self {
            version,
            fetched_at,
        }
    }

    /// Returns `true` while less than `max_age` has passed since the version
    /// was fetched.
    ///
    /// Versions are kept only briefly (one second by default) so that the
    /// `tf_versions` table is not read on every query. The comparison is
    /// strict, so a `max_age` of zero is never fresh.
    #[must_use]
    pub fn is_fresh(&self, max_age: Duration) -> bool {
        let age = self.fetched_at.elapsed();
        age < max_age
    }
}
240
241/// Version provider for fact tables.
242///
243/// Fetches and caches version numbers from the `tf_versions` table.
244#[derive(Debug)]
245pub struct FactTableVersionProvider {
246    /// Cached versions (table_name -> version).
247    versions:          std::sync::RwLock<HashMap<String, CachedVersion>>,
248    /// How long to cache version lookups.
249    version_cache_ttl: Duration,
250}
251
252impl Default for FactTableVersionProvider {
253    fn default() -> Self {
254        Self::new(Duration::from_secs(1))
255    }
256}
257
258impl FactTableVersionProvider {
259    /// Create a new version provider.
260    ///
261    /// # Arguments
262    ///
263    /// * `version_cache_ttl` - How long to cache version lookups (default 1 second)
264    #[must_use]
265    pub fn new(version_cache_ttl: Duration) -> Self {
266        Self {
267            versions: std::sync::RwLock::new(HashMap::new()),
268            version_cache_ttl,
269        }
270    }
271
272    /// Get cached version if still fresh, otherwise return None.
273    #[must_use]
274    pub fn get_cached_version(&self, table_name: &str) -> Option<i64> {
275        let versions = self.versions.read().ok()?;
276        let cached = versions.get(table_name)?;
277        if cached.is_fresh(self.version_cache_ttl) {
278            Some(cached.version)
279        } else {
280            None
281        }
282    }
283
284    /// Update cached version for a table.
285    pub fn set_cached_version(&self, table_name: &str, version: i64) {
286        if let Ok(mut versions) = self.versions.write() {
287            versions.insert(table_name.to_string(), CachedVersion::new(version));
288        }
289    }
290
291    /// Clear cached version for a table.
292    pub fn clear_cached_version(&self, table_name: &str) {
293        if let Ok(mut versions) = self.versions.write() {
294            versions.remove(table_name);
295        }
296    }
297
298    /// Clear all cached versions.
299    pub fn clear_all(&self) {
300        if let Ok(mut versions) = self.versions.write() {
301            versions.clear();
302        }
303    }
304}
305
306/// Generate cache key component for a fact table based on its versioning strategy.
307///
308/// This function returns the version component to include in cache keys.
309///
310/// # Returns
311///
312/// - `Some(version_string)` - Version to include in cache key
313/// - `None` - Caching disabled, should not cache
314#[must_use]
315pub fn generate_version_key_component(
316    _table_name: &str,
317    strategy: &FactTableVersionStrategy,
318    table_version: Option<i64>,
319    schema_version: &str,
320) -> Option<String> {
321    match strategy {
322        FactTableVersionStrategy::Disabled => None,
323
324        FactTableVersionStrategy::VersionTable => {
325            // Require a version from tf_versions table
326            table_version.map(|v| format!("tv:{v}"))
327        },
328
329        FactTableVersionStrategy::TimeBased { ttl_seconds } => {
330            // Use time bucket as version (floor to TTL boundary)
331            let now = std::time::SystemTime::now()
332                .duration_since(std::time::UNIX_EPOCH)
333                .unwrap_or_default()
334                .as_secs();
335            let bucket = now / ttl_seconds;
336            Some(format!("tb:{bucket}"))
337        },
338
339        FactTableVersionStrategy::SchemaVersion => {
340            // Use schema version only
341            Some(format!("sv:{schema_version}"))
342        },
343    }
344}
345
/// SQL to query version from tf_versions table.
///
/// Takes a single bind parameter, `$1`: the fact table name. It selects the
/// `version` column of the row whose `table_name` matches; when the table has
/// never been registered the query returns no rows.
pub const VERSION_TABLE_QUERY: &str = r"
    SELECT version FROM tf_versions WHERE table_name = $1
";
350
/// SQL to create the tf_versions table and helper functions.
///
/// The script uses `IF NOT EXISTS` / `CREATE OR REPLACE` throughout, and
/// creates:
///
/// - the `tf_versions` table (`table_name` primary key, `version`, `updated_at`);
/// - an index on `updated_at`. Note that `VERSION_TABLE_QUERY` filters on
///   `table_name`, i.e. the primary key, so it does not depend on this index;
/// - `bump_tf_version(p_table_name)`, which inserts the table at version 1 or
///   increments its existing version, and returns the resulting version;
/// - `tf_auto_version_bump()`, a trigger function that bumps the version of
///   the table the trigger fired on.
///
/// No trigger is installed by the script itself; the `CREATE TRIGGER`
/// statement at the end is a commented-out example.
pub const VERSION_TABLE_SCHEMA: &str = r"
-- Fact table version tracking for aggregation cache
CREATE TABLE IF NOT EXISTS tf_versions (
    table_name TEXT PRIMARY KEY,
    version BIGINT NOT NULL DEFAULT 1,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for fast lookups
CREATE INDEX IF NOT EXISTS idx_tf_versions_updated_at ON tf_versions (updated_at);

-- Helper function to bump version (call after data loads)
CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
RETURNS BIGINT AS $$
DECLARE
    new_version BIGINT;
BEGIN
    INSERT INTO tf_versions (table_name, version, updated_at)
    VALUES (p_table_name, 1, NOW())
    ON CONFLICT (table_name) DO UPDATE
    SET version = tf_versions.version + 1,
        updated_at = NOW()
    RETURNING version INTO new_version;
    RETURN new_version;
END;
$$ LANGUAGE plpgsql;

-- Optional: Trigger function for automatic version bumping
-- Note: This adds overhead to every INSERT/UPDATE/DELETE
CREATE OR REPLACE FUNCTION tf_auto_version_bump()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM bump_tf_version(TG_TABLE_NAME);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Example: Apply auto-bump trigger to a fact table
-- CREATE TRIGGER tf_sales_version_bump
-- AFTER INSERT OR UPDATE OR DELETE ON tf_sales
-- FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
";
394
/// Unit tests for strategy helpers, config resolution, version caching,
/// cache-key generation, and the serde representation of the config types.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_strategy_default_is_disabled() {
        let strategy = FactTableVersionStrategy::default();
        assert_eq!(strategy, FactTableVersionStrategy::Disabled);
        assert!(!strategy.is_caching_enabled());
    }

    #[test]
    fn test_strategy_time_based() {
        let strategy = FactTableVersionStrategy::time_based(300);
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), Some(300));
    }

    #[test]
    fn test_strategy_version_table() {
        let strategy = FactTableVersionStrategy::VersionTable;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_strategy_schema_version() {
        let strategy = FactTableVersionStrategy::SchemaVersion;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_config_default_strategy() {
        let config = FactTableCacheConfig::default();
        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::Disabled);
    }

    #[test]
    fn test_config_per_table_strategy() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy(
            "tf_page_views",
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
        );

        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            config.get_strategy("tf_page_views"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }
        );
        // Unconfigured table uses default
        assert_eq!(config.get_strategy("tf_other"), &FactTableVersionStrategy::Disabled);
    }

    #[test]
    fn test_config_with_default() {
        let config = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        assert_eq!(config.get_strategy("tf_any"), &FactTableVersionStrategy::SchemaVersion);
    }

    #[test]
    fn test_config_is_caching_enabled_follows_resolved_strategy() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);

        assert!(config.is_caching_enabled("tf_sales"));
        // Falls back to the (disabled) default
        assert!(!config.is_caching_enabled("tf_other"));
    }

    #[test]
    fn test_generate_version_key_disabled() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::Disabled,
            Some(42),
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_version_table() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            Some(42),
            "1.0.0",
        );
        assert_eq!(key, Some("tv:42".to_string()));

        // No version available - should return None
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            None,
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_time_based() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            None,
            "1.0.0",
        );
        assert!(key.is_some());
        assert!(key.unwrap().starts_with("tb:"));
    }

    #[test]
    fn test_generate_version_key_time_based_bucket_is_numeric() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            None,
            "1.0.0",
        )
        .unwrap();
        let bucket = key.strip_prefix("tb:").unwrap();
        assert!(bucket.parse::<u64>().is_ok());

        // A TTL larger than the current Unix time always lands in bucket 0
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased {
                ttl_seconds: u64::MAX,
            },
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("tb:0".to_string()));
    }

    #[test]
    fn test_generate_version_key_schema_version() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::SchemaVersion,
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("sv:1.0.0".to_string()));
    }

    #[test]
    fn test_version_provider_caching() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        // Initially no cached version
        assert!(provider.get_cached_version("tf_sales").is_none());

        // Set version
        provider.set_cached_version("tf_sales", 42);
        assert_eq!(provider.get_cached_version("tf_sales"), Some(42));

        // Clear version
        provider.clear_cached_version("tf_sales");
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    #[test]
    fn test_version_provider_clear_all() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        provider.set_cached_version("tf_sales", 1);
        provider.set_cached_version("tf_orders", 2);

        provider.clear_all();

        assert!(provider.get_cached_version("tf_sales").is_none());
        assert!(provider.get_cached_version("tf_orders").is_none());
    }

    #[test]
    fn test_version_provider_expired_entry_is_not_returned() {
        // With a zero TTL every entry is stale as soon as it is stored
        let provider = FactTableVersionProvider::new(Duration::ZERO);

        provider.set_cached_version("tf_sales", 42);
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    #[test]
    fn test_cached_version_freshness() {
        let cached = CachedVersion::new(42);

        // Should be fresh immediately
        assert!(cached.is_fresh(Duration::from_secs(1)));

        // Should be fresh for a longer duration
        assert!(cached.is_fresh(Duration::from_secs(60)));

        // The comparison is strict, so a zero max age is never fresh
        assert!(!cached.is_fresh(Duration::ZERO));
    }

    #[test]
    fn test_strategy_serialization() {
        let strategies = vec![
            FactTableVersionStrategy::Disabled,
            FactTableVersionStrategy::VersionTable,
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            FactTableVersionStrategy::SchemaVersion,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let deserialized: FactTableVersionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(strategy, deserialized);
        }
    }

    #[test]
    fn test_strategy_wire_format() {
        // Pin the internally tagged, snake_case representation used in config files
        let json =
            serde_json::to_value(FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }).unwrap();
        assert_eq!(json, serde_json::json!({ "type": "time_based", "ttl_seconds": 300 }));

        let parsed: FactTableVersionStrategy =
            serde_json::from_str(r#"{"type":"version_table"}"#).unwrap();
        assert_eq!(parsed, FactTableVersionStrategy::VersionTable);
    }

    #[test]
    fn test_config_serialization() {
        let mut config =
            FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 60 });

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: FactTableCacheConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.default_strategy, FactTableVersionStrategy::SchemaVersion);
        assert_eq!(deserialized.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            deserialized.get_strategy("tf_events"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 60 }
        );
    }

    #[test]
    fn test_config_deserializes_with_missing_fields() {
        // Both fields are `#[serde(default)]`, so an empty object is valid
        let config: FactTableCacheConfig = serde_json::from_str("{}").unwrap();

        assert_eq!(config.default_strategy, FactTableVersionStrategy::Disabled);
        assert!(config.table_strategies.is_empty());
    }
}