1use std::{
80 collections::HashMap,
81 time::{Duration, Instant},
82};
83
84use serde::{Deserialize, Serialize};
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
91#[serde(tag = "type", rename_all = "snake_case")]
92#[non_exhaustive]
93pub enum FactTableVersionStrategy {
94 Disabled,
99
100 VersionTable,
117
118 TimeBased {
126 ttl_seconds: u64,
128 },
129
130 SchemaVersion,
139}
140
141impl Default for FactTableVersionStrategy {
142 fn default() -> Self {
144 Self::Disabled
145 }
146}
147
148impl FactTableVersionStrategy {
149 #[must_use]
151 pub const fn time_based(ttl_seconds: u64) -> Self {
152 Self::TimeBased { ttl_seconds }
153 }
154
155 #[must_use]
157 pub const fn is_caching_enabled(&self) -> bool {
158 !matches!(self, Self::Disabled)
159 }
160
161 #[must_use]
163 pub const fn ttl_seconds(&self) -> Option<u64> {
164 match self {
165 Self::TimeBased { ttl_seconds } => Some(*ttl_seconds),
166 _ => None,
167 }
168 }
169}
170
171#[derive(Debug, Clone, Default, Serialize, Deserialize)]
175pub struct FactTableCacheConfig {
176 #[serde(default)]
178 pub default_strategy: FactTableVersionStrategy,
179
180 #[serde(default)]
182 pub table_strategies: HashMap<String, FactTableVersionStrategy>,
183}
184
185impl FactTableCacheConfig {
186 #[must_use]
188 pub fn with_default(strategy: FactTableVersionStrategy) -> Self {
189 Self {
190 default_strategy: strategy,
191 table_strategies: HashMap::new(),
192 }
193 }
194
195 pub fn set_strategy(&mut self, table_name: &str, strategy: FactTableVersionStrategy) {
197 self.table_strategies.insert(table_name.to_string(), strategy);
198 }
199
200 #[must_use]
202 pub fn get_strategy(&self, table_name: &str) -> &FactTableVersionStrategy {
203 self.table_strategies.get(table_name).unwrap_or(&self.default_strategy)
204 }
205
206 #[must_use]
208 pub fn is_caching_enabled(&self, table_name: &str) -> bool {
209 self.get_strategy(table_name).is_caching_enabled()
210 }
211}
212
213#[derive(Debug, Clone)]
215pub struct CachedVersion {
216 pub version: i64,
218 pub fetched_at: Instant,
220}
221
222impl CachedVersion {
223 #[must_use]
225 pub fn new(version: i64) -> Self {
226 Self {
227 version,
228 fetched_at: Instant::now(),
229 }
230 }
231
232 #[must_use]
237 pub fn is_fresh(&self, max_age: Duration) -> bool {
238 self.fetched_at.elapsed() < max_age
239 }
240}
241
242#[derive(Debug)]
246pub struct FactTableVersionProvider {
247 versions: std::sync::RwLock<HashMap<String, CachedVersion>>,
249 version_cache_ttl: Duration,
251}
252
253impl Default for FactTableVersionProvider {
254 fn default() -> Self {
255 Self::new(Duration::from_secs(1))
256 }
257}
258
259impl FactTableVersionProvider {
260 #[must_use]
266 pub fn new(version_cache_ttl: Duration) -> Self {
267 Self {
268 versions: std::sync::RwLock::new(HashMap::new()),
269 version_cache_ttl,
270 }
271 }
272
273 #[must_use]
275 pub fn get_cached_version(&self, table_name: &str) -> Option<i64> {
276 let versions = self.versions.read().ok()?;
277 let cached = versions.get(table_name)?;
278 if cached.is_fresh(self.version_cache_ttl) {
279 Some(cached.version)
280 } else {
281 None
282 }
283 }
284
285 pub fn set_cached_version(&self, table_name: &str, version: i64) {
287 if let Ok(mut versions) = self.versions.write() {
288 versions.insert(table_name.to_string(), CachedVersion::new(version));
289 }
290 }
291
292 pub fn clear_cached_version(&self, table_name: &str) {
294 if let Ok(mut versions) = self.versions.write() {
295 versions.remove(table_name);
296 }
297 }
298
299 pub fn clear_all(&self) {
301 if let Ok(mut versions) = self.versions.write() {
302 versions.clear();
303 }
304 }
305}
306
307#[must_use]
316pub fn generate_version_key_component(
317 _table_name: &str,
318 strategy: &FactTableVersionStrategy,
319 table_version: Option<i64>,
320 schema_version: &str,
321) -> Option<String> {
322 match strategy {
323 FactTableVersionStrategy::Disabled => None,
324
325 FactTableVersionStrategy::VersionTable => {
326 table_version.map(|v| format!("tv:{v}"))
328 },
329
330 FactTableVersionStrategy::TimeBased { ttl_seconds } => {
331 let now = std::time::SystemTime::now()
333 .duration_since(std::time::UNIX_EPOCH)
334 .unwrap_or_default()
335 .as_secs();
336 let bucket = now / ttl_seconds;
337 Some(format!("tb:{bucket}"))
338 },
339
340 FactTableVersionStrategy::SchemaVersion => {
341 Some(format!("sv:{schema_version}"))
343 },
344 }
345}
346
347pub const VERSION_TABLE_QUERY: &str = r"
349 SELECT version FROM tf_versions WHERE table_name = $1
350";
351
352pub const VERSION_TABLE_SCHEMA: &str = r"
354-- Fact table version tracking for aggregation cache
355CREATE TABLE IF NOT EXISTS tf_versions (
356 table_name TEXT PRIMARY KEY,
357 version BIGINT NOT NULL DEFAULT 1,
358 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
359);
360
361-- Index for fast lookups
362CREATE INDEX IF NOT EXISTS idx_tf_versions_updated_at ON tf_versions (updated_at);
363
364-- Helper function to bump version (call after data loads)
365CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
366RETURNS BIGINT AS $$
367DECLARE
368 new_version BIGINT;
369BEGIN
370 INSERT INTO tf_versions (table_name, version, updated_at)
371 VALUES (p_table_name, 1, NOW())
372 ON CONFLICT (table_name) DO UPDATE
373 SET version = tf_versions.version + 1,
374 updated_at = NOW()
375 RETURNING version INTO new_version;
376 RETURN new_version;
377END;
378$$ LANGUAGE plpgsql;
379
380-- Optional: Trigger function for automatic version bumping
381-- Note: This adds overhead to every INSERT/UPDATE/DELETE
382CREATE OR REPLACE FUNCTION tf_auto_version_bump()
383RETURNS TRIGGER AS $$
384BEGIN
385 PERFORM bump_tf_version(TG_TABLE_NAME);
386 RETURN NULL;
387END;
388$$ LANGUAGE plpgsql;
389
390-- Example: Apply auto-bump trigger to a fact table
391-- CREATE TRIGGER tf_sales_version_bump
392-- AFTER INSERT OR UPDATE OR DELETE ON tf_sales
393-- FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
394";
395
396#[cfg(test)]
397mod tests {
398 #![allow(clippy::unwrap_used)] use super::*;
401
402 #[test]
403 fn test_strategy_default_is_disabled() {
404 let strategy = FactTableVersionStrategy::default();
405 assert_eq!(strategy, FactTableVersionStrategy::Disabled);
406 assert!(!strategy.is_caching_enabled());
407 }
408
409 #[test]
410 fn test_strategy_time_based() {
411 let strategy = FactTableVersionStrategy::time_based(300);
412 assert!(strategy.is_caching_enabled());
413 assert_eq!(strategy.ttl_seconds(), Some(300));
414 }
415
416 #[test]
417 fn test_strategy_version_table() {
418 let strategy = FactTableVersionStrategy::VersionTable;
419 assert!(strategy.is_caching_enabled());
420 assert_eq!(strategy.ttl_seconds(), None);
421 }
422
423 #[test]
424 fn test_strategy_schema_version() {
425 let strategy = FactTableVersionStrategy::SchemaVersion;
426 assert!(strategy.is_caching_enabled());
427 assert_eq!(strategy.ttl_seconds(), None);
428 }
429
430 #[test]
431 fn test_config_default_strategy() {
432 let config = FactTableCacheConfig::default();
433 assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::Disabled);
434 }
435
436 #[test]
437 fn test_config_per_table_strategy() {
438 let mut config = FactTableCacheConfig::default();
439 config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
440 config.set_strategy(
441 "tf_page_views",
442 FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
443 );
444
445 assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
446 assert_eq!(
447 config.get_strategy("tf_page_views"),
448 &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }
449 );
450 assert_eq!(config.get_strategy("tf_other"), &FactTableVersionStrategy::Disabled);
452 }
453
454 #[test]
455 fn test_config_with_default() {
456 let config = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
457 assert_eq!(config.get_strategy("tf_any"), &FactTableVersionStrategy::SchemaVersion);
458 }
459
460 #[test]
461 fn test_generate_version_key_disabled() {
462 let key = generate_version_key_component(
463 "tf_sales",
464 &FactTableVersionStrategy::Disabled,
465 Some(42),
466 "1.0.0",
467 );
468 assert!(key.is_none());
469 }
470
471 #[test]
472 fn test_generate_version_key_version_table() {
473 let key = generate_version_key_component(
474 "tf_sales",
475 &FactTableVersionStrategy::VersionTable,
476 Some(42),
477 "1.0.0",
478 );
479 assert_eq!(key, Some("tv:42".to_string()));
480
481 let key = generate_version_key_component(
483 "tf_sales",
484 &FactTableVersionStrategy::VersionTable,
485 None,
486 "1.0.0",
487 );
488 assert!(key.is_none());
489 }
490
491 #[test]
492 fn test_generate_version_key_time_based() {
493 let key = generate_version_key_component(
494 "tf_sales",
495 &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
496 None,
497 "1.0.0",
498 );
499 assert!(key.is_some());
500 assert!(key.unwrap().starts_with("tb:"));
501 }
502
503 #[test]
504 fn test_generate_version_key_schema_version() {
505 let key = generate_version_key_component(
506 "tf_sales",
507 &FactTableVersionStrategy::SchemaVersion,
508 None,
509 "1.0.0",
510 );
511 assert_eq!(key, Some("sv:1.0.0".to_string()));
512 }
513
514 #[test]
515 fn test_version_provider_caching() {
516 let provider = FactTableVersionProvider::new(Duration::from_secs(10));
517
518 assert!(provider.get_cached_version("tf_sales").is_none());
520
521 provider.set_cached_version("tf_sales", 42);
523 assert_eq!(provider.get_cached_version("tf_sales"), Some(42));
524
525 provider.clear_cached_version("tf_sales");
527 assert!(provider.get_cached_version("tf_sales").is_none());
528 }
529
530 #[test]
531 fn test_version_provider_clear_all() {
532 let provider = FactTableVersionProvider::new(Duration::from_secs(10));
533
534 provider.set_cached_version("tf_sales", 1);
535 provider.set_cached_version("tf_orders", 2);
536
537 provider.clear_all();
538
539 assert!(provider.get_cached_version("tf_sales").is_none());
540 assert!(provider.get_cached_version("tf_orders").is_none());
541 }
542
543 #[test]
544 fn test_cached_version_freshness() {
545 let cached = CachedVersion::new(42);
546
547 assert!(cached.is_fresh(Duration::from_secs(1)));
549
550 assert!(cached.is_fresh(Duration::from_secs(60)));
552 }
553
554 #[test]
555 fn test_strategy_serialization() {
556 let strategies = vec![
557 FactTableVersionStrategy::Disabled,
558 FactTableVersionStrategy::VersionTable,
559 FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
560 FactTableVersionStrategy::SchemaVersion,
561 ];
562
563 for strategy in strategies {
564 let json = serde_json::to_string(&strategy).unwrap();
565 let deserialized: FactTableVersionStrategy = serde_json::from_str(&json).unwrap();
566 assert_eq!(strategy, deserialized);
567 }
568 }
569
570 #[test]
571 fn test_config_serialization() {
572 let mut config =
573 FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
574 config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
575 config.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 60 });
576
577 let json = serde_json::to_string_pretty(&config).unwrap();
578 let deserialized: FactTableCacheConfig = serde_json::from_str(&json).unwrap();
579
580 assert_eq!(deserialized.default_strategy, FactTableVersionStrategy::SchemaVersion);
581 assert_eq!(deserialized.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
582 }
583}