use std::{
collections::HashMap,
time::{Duration, Instant},
};
use serde::{Deserialize, Serialize};
/// How (and whether) cached aggregation results derived from a fact table are
/// versioned.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// snake_case variant name, and struct-variant fields sit alongside it, e.g.
/// `{"type": "disabled"}` or `{"type": "time_based", "ttl_seconds": 300}`.
///
/// The default (see the `Default` impl) is [`FactTableVersionStrategy::Disabled`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum FactTableVersionStrategy {
    /// No versioning: `is_caching_enabled()` reports `false` for this strategy.
    Disabled,
    /// Version keyed on an explicit per-table version number supplied by the
    /// caller (rendered as `tv:{version}`).
    /// NOTE(review): the `tf_versions` table / `VERSION_TABLE_QUERY` in this module
    /// look like the intended source of that number — confirm with callers.
    VersionTable,
    /// Version keyed on the current wall-clock time bucket (rendered as `tb:{bucket}`),
    /// so the key rolls over once per `ttl_seconds` window.
    TimeBased {
        /// Width of each time bucket, in seconds. Should be greater than zero.
        ttl_seconds: u64,
    },
    /// Version keyed on a schema version string supplied by the caller
    /// (rendered as `sv:{schema_version}`).
    SchemaVersion,
}
impl Default for FactTableVersionStrategy {
fn default() -> Self {
Self::Disabled
}
}
impl FactTableVersionStrategy {
    /// Shorthand for `FactTableVersionStrategy::TimeBased { ttl_seconds }`.
    #[must_use]
    pub const fn time_based(ttl_seconds: u64) -> Self {
        Self::TimeBased { ttl_seconds }
    }

    /// `false` only for [`FactTableVersionStrategy::Disabled`]; every other
    /// strategy allows caching.
    #[must_use]
    pub const fn is_caching_enabled(&self) -> bool {
        // Listing every variant (instead of a negated pattern test) forces a
        // conscious decision here whenever a new strategy is added.
        match self {
            Self::Disabled => false,
            Self::VersionTable | Self::TimeBased { .. } | Self::SchemaVersion => true,
        }
    }

    /// The bucket width of a `TimeBased` strategy, or `None` for any other strategy.
    #[must_use]
    pub const fn ttl_seconds(&self) -> Option<u64> {
        if let Self::TimeBased { ttl_seconds } = self {
            Some(*ttl_seconds)
        } else {
            None
        }
    }
}
/// Caching configuration for fact tables: a fallback strategy plus optional
/// per-table overrides.
///
/// A table with an entry in `table_strategies` uses that strategy; every other
/// table uses `default_strategy`. Both fields are `#[serde(default)]`, so either
/// may be omitted when deserializing: a missing `default_strategy` becomes
/// [`FactTableVersionStrategy::Disabled`], and a missing map means no overrides.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FactTableCacheConfig {
    /// Strategy applied to tables that have no entry in `table_strategies`.
    #[serde(default)]
    pub default_strategy: FactTableVersionStrategy,
    /// Per-table overrides, keyed by fact table name.
    #[serde(default)]
    pub table_strategies: HashMap<String, FactTableVersionStrategy>,
}
impl FactTableCacheConfig {
    /// Builds a configuration with `strategy` as the fallback and no per-table
    /// overrides.
    #[must_use]
    pub fn with_default(strategy: FactTableVersionStrategy) -> Self {
        let table_strategies = HashMap::new();
        Self {
            default_strategy: strategy,
            table_strategies,
        }
    }

    /// Registers (or replaces) the override strategy for `table_name`.
    pub fn set_strategy(&mut self, table_name: &str, strategy: FactTableVersionStrategy) {
        let key = String::from(table_name);
        self.table_strategies.insert(key, strategy);
    }

    /// The strategy in effect for `table_name`: its override if one is
    /// registered, otherwise `default_strategy`.
    #[must_use]
    pub fn get_strategy(&self, table_name: &str) -> &FactTableVersionStrategy {
        match self.table_strategies.get(table_name) {
            Some(strategy) => strategy,
            None => &self.default_strategy,
        }
    }

    /// Whether the strategy in effect for `table_name` allows caching.
    #[must_use]
    pub fn is_caching_enabled(&self, table_name: &str) -> bool {
        let effective = self.get_strategy(table_name);
        effective.is_caching_enabled()
    }
}
/// A version number paired with the instant it was recorded.
#[derive(Debug, Clone)]
pub struct CachedVersion {
    /// The recorded version number.
    pub version: i64,
    /// When the entry was recorded; [`CachedVersion::new`] stamps `Instant::now()`.
    pub fetched_at: Instant,
}

impl CachedVersion {
    /// Records `version`, timestamped with the current instant.
    #[must_use]
    pub fn new(version: i64) -> Self {
        let fetched_at = Instant::now();
        Self { version, fetched_at }
    }

    /// `true` while strictly less than `max_age` has elapsed since `fetched_at`.
    ///
    /// Because the comparison is strict, a zero `max_age` is never fresh.
    #[must_use]
    pub fn is_fresh(&self, max_age: Duration) -> bool {
        let age = self.fetched_at.elapsed();
        max_age > age
    }
}
/// In-memory cache of fact-table version numbers, keyed by table name and
/// guarded by a `std::sync::RwLock`.
///
/// Entries older than `version_cache_ttl` are reported as misses by
/// [`FactTableVersionProvider::get_cached_version`]; they are not evicted.
/// If the lock is poisoned, lookups report a miss and mutations are skipped.
#[derive(Debug)]
pub struct FactTableVersionProvider {
    versions: std::sync::RwLock<HashMap<String, CachedVersion>>,
    version_cache_ttl: Duration,
}

impl Default for FactTableVersionProvider {
    /// A provider whose cached versions stay fresh for one second.
    fn default() -> Self {
        let one_second = Duration::from_secs(1);
        Self::new(one_second)
    }
}

impl FactTableVersionProvider {
    /// Creates an empty provider whose entries stay fresh for `version_cache_ttl`.
    #[must_use]
    pub fn new(version_cache_ttl: Duration) -> Self {
        let versions = std::sync::RwLock::new(HashMap::new());
        Self {
            versions,
            version_cache_ttl,
        }
    }

    /// The cached version for `table_name`, or `None` when it is absent,
    /// stale, or the lock is poisoned.
    #[must_use]
    pub fn get_cached_version(&self, table_name: &str) -> Option<i64> {
        let table_versions = self.versions.read().ok()?;
        table_versions
            .get(table_name)
            .filter(|entry| entry.is_fresh(self.version_cache_ttl))
            .map(|entry| entry.version)
    }

    /// Stores `version` for `table_name`, replacing any previous entry and
    /// restarting its freshness window.
    pub fn set_cached_version(&self, table_name: &str, version: i64) {
        self.with_versions_mut(|table_versions| {
            table_versions.insert(table_name.to_owned(), CachedVersion::new(version));
        });
    }

    /// Forgets the cached version for `table_name`, if any.
    pub fn clear_cached_version(&self, table_name: &str) {
        self.with_versions_mut(|table_versions| {
            table_versions.remove(table_name);
        });
    }

    /// Forgets every cached version.
    pub fn clear_all(&self) {
        self.with_versions_mut(|table_versions| table_versions.clear());
    }

    /// Runs `mutate` under the write lock; best-effort, so it does nothing if
    /// the lock is poisoned.
    fn with_versions_mut(&self, mutate: impl FnOnce(&mut HashMap<String, CachedVersion>)) {
        if let Ok(mut table_versions) = self.versions.write() {
            mutate(&mut *table_versions);
        }
    }
}
/// Builds the version fragment to embed in a cache key for a fact table.
///
/// Returns `Some` with a prefixed fragment:
/// - `VersionTable` → `"tv:{table_version}"`
/// - `TimeBased` → `"tb:{bucket}"`, where `bucket` is the current Unix time in
///   whole seconds integer-divided by `ttl_seconds`, so the fragment changes once
///   per TTL window
/// - `SchemaVersion` → `"sv:{schema_version}"`
///
/// Returns `None` when no fragment can be produced:
/// - the strategy is `Disabled`;
/// - the strategy is `VersionTable` and `table_version` is `None`;
/// - the strategy is `TimeBased` with `ttl_seconds == 0`. There is no meaningful
///   bucket width in that case, and this avoids a division-by-zero panic on a
///   value that can arrive from deserialized configuration.
///
/// `_table_name` is currently unused.
#[must_use]
pub fn generate_version_key_component(
    _table_name: &str,
    strategy: &FactTableVersionStrategy,
    table_version: Option<i64>,
    schema_version: &str,
) -> Option<String> {
    match strategy {
        FactTableVersionStrategy::Disabled => None,
        FactTableVersionStrategy::VersionTable => {
            table_version.map(|v| format!("tv:{v}"))
        },
        FactTableVersionStrategy::TimeBased { ttl_seconds } => {
            // A system clock set before the Unix epoch falls back to 0 instead of failing.
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            // `checked_div` yields `None` for a zero TTL rather than panicking.
            let bucket = now.checked_div(*ttl_seconds)?;
            Some(format!("tb:{bucket}"))
        },
        FactTableVersionStrategy::SchemaVersion => {
            Some(format!("sv:{schema_version}"))
        },
    }
}
/// SQL that reads the stored version of a single fact table from `tf_versions`.
///
/// Expects one positional parameter, `$1` (PostgreSQL-style placeholder), bound to
/// the table name. It selects the `version` column and returns no row if the
/// table has never been bumped.
pub const VERSION_TABLE_QUERY: &str = r"
SELECT version FROM tf_versions WHERE table_name = $1
";
/// PostgreSQL setup script for fact-table version tracking.
///
/// Re-runnable, because every statement uses `IF NOT EXISTS` or `CREATE OR REPLACE`.
/// It creates:
/// - the `tf_versions` table (one row per table name, with `version` and `updated_at`);
/// - an index on `tf_versions (updated_at)`;
/// - `bump_tf_version(p_table_name)`, which upserts the row (starting at 1, otherwise
///   incrementing `version`) and returns the new version;
/// - `tf_auto_version_bump()`, a plpgsql trigger function that calls
///   `bump_tf_version(TG_TABLE_NAME)`.
///
/// No trigger is attached. The commented-out `CREATE TRIGGER` shows how to wire
/// `tf_auto_version_bump` to a fact table as a statement-level trigger.
///
/// The script contains several statements in one string, so it must be executed
/// through an API that accepts multi-statement text.
///
/// NOTE(review): `TG_TABLE_NAME` is the unqualified table name, so same-named tables
/// in different schemas would share one `tf_versions` row — confirm that is acceptable.
pub const VERSION_TABLE_SCHEMA: &str = r"
-- Fact table version tracking for aggregation cache
CREATE TABLE IF NOT EXISTS tf_versions (
table_name TEXT PRIMARY KEY,
version BIGINT NOT NULL DEFAULT 1,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Index for fast lookups
CREATE INDEX IF NOT EXISTS idx_tf_versions_updated_at ON tf_versions (updated_at);
-- Helper function to bump version (call after data loads)
CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
RETURNS BIGINT AS $$
DECLARE
new_version BIGINT;
BEGIN
INSERT INTO tf_versions (table_name, version, updated_at)
VALUES (p_table_name, 1, NOW())
ON CONFLICT (table_name) DO UPDATE
SET version = tf_versions.version + 1,
updated_at = NOW()
RETURNING version INTO new_version;
RETURN new_version;
END;
$$ LANGUAGE plpgsql;
-- Optional: Trigger function for automatic version bumping
-- Note: This adds overhead to every INSERT/UPDATE/DELETE
CREATE OR REPLACE FUNCTION tf_auto_version_bump()
RETURNS TRIGGER AS $$
BEGIN
PERFORM bump_tf_version(TG_TABLE_NAME);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Example: Apply auto-bump trigger to a fact table
-- CREATE TRIGGER tf_sales_version_bump
-- AFTER INSERT OR UPDATE OR DELETE ON tf_sales
-- FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
";
#[cfg(test)]
mod tests {
    // `unwrap()` is fine in tests: panicking is exactly how a test should fail.
    #![allow(clippy::unwrap_used)]
    use super::*;

    #[test]
    fn test_strategy_default_is_disabled() {
        let strategy = FactTableVersionStrategy::default();
        assert_eq!(strategy, FactTableVersionStrategy::Disabled);
        assert!(!strategy.is_caching_enabled());
    }

    #[test]
    fn test_strategy_time_based() {
        let strategy = FactTableVersionStrategy::time_based(300);
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), Some(300));
    }

    #[test]
    fn test_strategy_version_table() {
        let strategy = FactTableVersionStrategy::VersionTable;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_strategy_schema_version() {
        let strategy = FactTableVersionStrategy::SchemaVersion;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    #[test]
    fn test_config_default_strategy() {
        let config = FactTableCacheConfig::default();
        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::Disabled);
    }

    #[test]
    fn test_config_per_table_strategy() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy(
            "tf_page_views",
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
        );
        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            config.get_strategy("tf_page_views"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }
        );
        assert_eq!(config.get_strategy("tf_other"), &FactTableVersionStrategy::Disabled);
    }

    #[test]
    fn test_config_with_default() {
        let config = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        assert_eq!(config.get_strategy("tf_any"), &FactTableVersionStrategy::SchemaVersion);
    }

    #[test]
    fn test_config_is_caching_enabled() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        assert!(config.is_caching_enabled("tf_sales"));
        // Tables without an override fall back to the (disabled) default.
        assert!(!config.is_caching_enabled("tf_other"));
    }

    #[test]
    fn test_generate_version_key_disabled() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::Disabled,
            Some(42),
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_version_table() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            Some(42),
            "1.0.0",
        );
        assert_eq!(key, Some("tv:42".to_string()));
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            None,
            "1.0.0",
        );
        assert!(key.is_none());
    }

    #[test]
    fn test_generate_version_key_time_based() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            None,
            "1.0.0",
        );
        assert!(key.is_some());
        assert!(key.unwrap().starts_with("tb:"));
    }

    #[test]
    fn test_generate_version_key_schema_version() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::SchemaVersion,
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("sv:1.0.0".to_string()));
    }

    #[test]
    fn test_version_provider_caching() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));
        assert!(provider.get_cached_version("tf_sales").is_none());
        provider.set_cached_version("tf_sales", 42);
        assert_eq!(provider.get_cached_version("tf_sales"), Some(42));
        provider.clear_cached_version("tf_sales");
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    #[test]
    fn test_version_provider_overwrite() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));
        provider.set_cached_version("tf_sales", 1);
        provider.set_cached_version("tf_sales", 2);
        assert_eq!(provider.get_cached_version("tf_sales"), Some(2));
    }

    #[test]
    fn test_version_provider_expired_entry_is_not_returned() {
        // A zero TTL means no entry is ever fresh, which exercises the expiry
        // path deterministically without sleeping.
        let provider = FactTableVersionProvider::new(Duration::ZERO);
        provider.set_cached_version("tf_sales", 42);
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    #[test]
    fn test_version_provider_clear_all() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));
        provider.set_cached_version("tf_sales", 1);
        provider.set_cached_version("tf_orders", 2);
        provider.clear_all();
        assert!(provider.get_cached_version("tf_sales").is_none());
        assert!(provider.get_cached_version("tf_orders").is_none());
    }

    #[test]
    fn test_cached_version_freshness() {
        let cached = CachedVersion::new(42);
        assert_eq!(cached.version, 42);
        assert!(cached.is_fresh(Duration::from_secs(1)));
        assert!(cached.is_fresh(Duration::from_secs(60)));
        // Freshness uses a strict comparison, so a zero max age is always stale.
        assert!(!cached.is_fresh(Duration::ZERO));
    }

    #[test]
    fn test_strategy_serialization() {
        let strategies = vec![
            FactTableVersionStrategy::Disabled,
            FactTableVersionStrategy::VersionTable,
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            FactTableVersionStrategy::SchemaVersion,
        ];
        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let deserialized: FactTableVersionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(strategy, deserialized);
        }
    }

    #[test]
    fn test_strategy_wire_format() {
        // Pins the internally-tagged, snake_case representation that config files rely on.
        let cases = vec![
            (FactTableVersionStrategy::Disabled, r#"{"type":"disabled"}"#),
            (FactTableVersionStrategy::VersionTable, r#"{"type":"version_table"}"#),
            (
                FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
                r#"{"type":"time_based","ttl_seconds":300}"#,
            ),
            (FactTableVersionStrategy::SchemaVersion, r#"{"type":"schema_version"}"#),
        ];
        for (strategy, expected) in cases {
            assert_eq!(serde_json::to_string(&strategy).unwrap(), expected);
        }
    }

    #[test]
    fn test_config_serialization() {
        let mut config =
            FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 60 });
        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: FactTableCacheConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.default_strategy, FactTableVersionStrategy::SchemaVersion);
        assert_eq!(deserialized.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            deserialized.get_strategy("tf_events"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 60 }
        );
        assert_eq!(
            deserialized.get_strategy("tf_other"),
            &FactTableVersionStrategy::SchemaVersion
        );
    }

    #[test]
    fn test_config_deserialize_empty_object_uses_defaults() {
        let config: FactTableCacheConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(config.default_strategy, FactTableVersionStrategy::Disabled);
        assert!(config.table_strategies.is_empty());
    }
}