1use std::{
80 collections::HashMap,
81 time::{Duration, Instant},
82};
83
84use serde::{Deserialize, Serialize};
85
/// Strategy that decides whether, and how, a fact table contributes a version
/// component to a cache key.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in `snake_case` under the `type` field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FactTableVersionStrategy {
    /// Caching is off. This is the only variant for which
    /// `is_caching_enabled` returns `false`.
    Disabled,

    /// The key component comes from a caller-supplied table version
    /// (`generate_version_key_component` renders it as `tv:<version>`).
    VersionTable,

    /// The key component is a time bucket
    /// (`generate_version_key_component` renders it as `tb:<bucket>`).
    TimeBased {
        /// Width of one time bucket, in seconds.
        ttl_seconds: u64,
    },

    /// The key component is the schema version string
    /// (`generate_version_key_component` renders it as `sv:<schema_version>`).
    SchemaVersion,
}
139
140impl Default for FactTableVersionStrategy {
141 fn default() -> Self {
143 Self::Disabled
144 }
145}
146
147impl FactTableVersionStrategy {
148 #[must_use]
150 pub const fn time_based(ttl_seconds: u64) -> Self {
151 Self::TimeBased { ttl_seconds }
152 }
153
154 #[must_use]
156 pub const fn is_caching_enabled(&self) -> bool {
157 !matches!(self, Self::Disabled)
158 }
159
160 #[must_use]
162 pub const fn ttl_seconds(&self) -> Option<u64> {
163 match self {
164 Self::TimeBased { ttl_seconds } => Some(*ttl_seconds),
165 _ => None,
166 }
167 }
168}
169
/// Cache-versioning configuration: one default strategy plus optional
/// per-table overrides.
///
/// Both fields are marked `#[serde(default)]`, so either may be omitted from
/// the serialized form and will take its `Default` value on deserialization.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FactTableCacheConfig {
    /// Strategy returned by `get_strategy` for tables that have no entry in
    /// `table_strategies`.
    #[serde(default)]
    pub default_strategy: FactTableVersionStrategy,

    /// Per-table overrides, keyed by table name.
    #[serde(default)]
    pub table_strategies: HashMap<String, FactTableVersionStrategy>,
}
183
184impl FactTableCacheConfig {
185 #[must_use]
187 pub fn with_default(strategy: FactTableVersionStrategy) -> Self {
188 Self {
189 default_strategy: strategy,
190 table_strategies: HashMap::new(),
191 }
192 }
193
194 pub fn set_strategy(&mut self, table_name: &str, strategy: FactTableVersionStrategy) {
196 self.table_strategies.insert(table_name.to_string(), strategy);
197 }
198
199 #[must_use]
201 pub fn get_strategy(&self, table_name: &str) -> &FactTableVersionStrategy {
202 self.table_strategies.get(table_name).unwrap_or(&self.default_strategy)
203 }
204
205 #[must_use]
207 pub fn is_caching_enabled(&self, table_name: &str) -> bool {
208 self.get_strategy(table_name).is_caching_enabled()
209 }
210}
211
/// A version number together with the instant at which it was recorded.
#[derive(Debug, Clone)]
pub struct CachedVersion {
    /// The cached version value.
    pub version: i64,
    /// When this entry was created (`CachedVersion::new` sets it to `Instant::now()`).
    pub fetched_at: Instant,
}
220
221impl CachedVersion {
222 #[must_use]
224 pub fn new(version: i64) -> Self {
225 Self {
226 version,
227 fetched_at: Instant::now(),
228 }
229 }
230
231 #[must_use]
236 pub fn is_fresh(&self, max_age: Duration) -> bool {
237 self.fetched_at.elapsed() < max_age
238 }
239}
240
/// In-memory store of recently recorded table versions, guarded by a
/// `std::sync::RwLock`.
#[derive(Debug)]
pub struct FactTableVersionProvider {
    /// Cached versions keyed by table name.
    versions: std::sync::RwLock<HashMap<String, CachedVersion>>,
    /// Maximum age an entry may have and still be returned by
    /// `get_cached_version`.
    version_cache_ttl: Duration,
}
251
252impl Default for FactTableVersionProvider {
253 fn default() -> Self {
254 Self::new(Duration::from_secs(1))
255 }
256}
257
258impl FactTableVersionProvider {
259 #[must_use]
265 pub fn new(version_cache_ttl: Duration) -> Self {
266 Self {
267 versions: std::sync::RwLock::new(HashMap::new()),
268 version_cache_ttl,
269 }
270 }
271
272 #[must_use]
274 pub fn get_cached_version(&self, table_name: &str) -> Option<i64> {
275 let versions = self.versions.read().ok()?;
276 let cached = versions.get(table_name)?;
277 if cached.is_fresh(self.version_cache_ttl) {
278 Some(cached.version)
279 } else {
280 None
281 }
282 }
283
284 pub fn set_cached_version(&self, table_name: &str, version: i64) {
286 if let Ok(mut versions) = self.versions.write() {
287 versions.insert(table_name.to_string(), CachedVersion::new(version));
288 }
289 }
290
291 pub fn clear_cached_version(&self, table_name: &str) {
293 if let Ok(mut versions) = self.versions.write() {
294 versions.remove(table_name);
295 }
296 }
297
298 pub fn clear_all(&self) {
300 if let Ok(mut versions) = self.versions.write() {
301 versions.clear();
302 }
303 }
304}
305
306#[must_use]
315pub fn generate_version_key_component(
316 _table_name: &str,
317 strategy: &FactTableVersionStrategy,
318 table_version: Option<i64>,
319 schema_version: &str,
320) -> Option<String> {
321 match strategy {
322 FactTableVersionStrategy::Disabled => None,
323
324 FactTableVersionStrategy::VersionTable => {
325 table_version.map(|v| format!("tv:{v}"))
327 },
328
329 FactTableVersionStrategy::TimeBased { ttl_seconds } => {
330 let now = std::time::SystemTime::now()
332 .duration_since(std::time::UNIX_EPOCH)
333 .unwrap_or_default()
334 .as_secs();
335 let bucket = now / ttl_seconds;
336 Some(format!("tb:{bucket}"))
337 },
338
339 FactTableVersionStrategy::SchemaVersion => {
340 Some(format!("sv:{schema_version}"))
342 },
343 }
344}
345
/// SQL that reads the `version` of a single fact table from `tf_versions`;
/// the table name is supplied as bind parameter `$1`.
pub const VERSION_TABLE_QUERY: &str = r"
 SELECT version FROM tf_versions WHERE table_name = $1
";
350
/// SQL (PL/pgSQL) that sets up version tracking. It creates:
///
/// - the `tf_versions` table (`table_name`, `version`, `updated_at`),
/// - an index on `tf_versions.updated_at`,
/// - `bump_tf_version(p_table_name)`, which inserts a row at version 1 or
///   increments the existing version and returns the new value,
/// - `tf_auto_version_bump()`, a trigger function that bumps the version of
///   the table the trigger fired on.
///
/// No trigger is installed; the script only contains a commented-out example.
pub const VERSION_TABLE_SCHEMA: &str = r"
-- Fact table version tracking for aggregation cache
CREATE TABLE IF NOT EXISTS tf_versions (
 table_name TEXT PRIMARY KEY,
 version BIGINT NOT NULL DEFAULT 1,
 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for fast lookups
CREATE INDEX IF NOT EXISTS idx_tf_versions_updated_at ON tf_versions (updated_at);

-- Helper function to bump version (call after data loads)
CREATE OR REPLACE FUNCTION bump_tf_version(p_table_name TEXT)
RETURNS BIGINT AS $$
DECLARE
 new_version BIGINT;
BEGIN
 INSERT INTO tf_versions (table_name, version, updated_at)
 VALUES (p_table_name, 1, NOW())
 ON CONFLICT (table_name) DO UPDATE
 SET version = tf_versions.version + 1,
 updated_at = NOW()
 RETURNING version INTO new_version;
 RETURN new_version;
END;
$$ LANGUAGE plpgsql;

-- Optional: Trigger function for automatic version bumping
-- Note: This adds overhead to every INSERT/UPDATE/DELETE
CREATE OR REPLACE FUNCTION tf_auto_version_bump()
RETURNS TRIGGER AS $$
BEGIN
 PERFORM bump_tf_version(TG_TABLE_NAME);
 RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Example: Apply auto-bump trigger to a fact table
-- CREATE TRIGGER tf_sales_version_bump
-- AFTER INSERT OR UPDATE OR DELETE ON tf_sales
-- FOR EACH STATEMENT EXECUTE FUNCTION tf_auto_version_bump();
";
394
// Unit tests for the strategy enum, the per-table config, key-component
// generation, the in-memory version provider, and serde round-trips.
#[cfg(test)]
mod tests {
    use super::*;

    // The default strategy is `Disabled` and reports caching as off.
    #[test]
    fn test_strategy_default_is_disabled() {
        let strategy = FactTableVersionStrategy::default();
        assert_eq!(strategy, FactTableVersionStrategy::Disabled);
        assert!(!strategy.is_caching_enabled());
    }

    // `time_based` enables caching and exposes its TTL.
    #[test]
    fn test_strategy_time_based() {
        let strategy = FactTableVersionStrategy::time_based(300);
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), Some(300));
    }

    // `VersionTable` enables caching and has no TTL.
    #[test]
    fn test_strategy_version_table() {
        let strategy = FactTableVersionStrategy::VersionTable;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    // `SchemaVersion` enables caching and has no TTL.
    #[test]
    fn test_strategy_schema_version() {
        let strategy = FactTableVersionStrategy::SchemaVersion;
        assert!(strategy.is_caching_enabled());
        assert_eq!(strategy.ttl_seconds(), None);
    }

    // A default config resolves every table to `Disabled`.
    #[test]
    fn test_config_default_strategy() {
        let config = FactTableCacheConfig::default();
        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::Disabled);
    }

    // Per-table overrides win; tables without one fall back to the default.
    #[test]
    fn test_config_per_table_strategy() {
        let mut config = FactTableCacheConfig::default();
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy(
            "tf_page_views",
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
        );

        assert_eq!(config.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
        assert_eq!(
            config.get_strategy("tf_page_views"),
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 }
        );
        assert_eq!(config.get_strategy("tf_other"), &FactTableVersionStrategy::Disabled);
    }

    // `with_default` sets the fallback strategy for all tables.
    #[test]
    fn test_config_with_default() {
        let config = FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        assert_eq!(config.get_strategy("tf_any"), &FactTableVersionStrategy::SchemaVersion);
    }

    // `Disabled` never yields a key component, even when a version is supplied.
    #[test]
    fn test_generate_version_key_disabled() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::Disabled,
            Some(42),
            "1.0.0",
        );
        assert!(key.is_none());
    }

    // `VersionTable` renders `tv:<version>` and yields nothing without a version.
    #[test]
    fn test_generate_version_key_version_table() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            Some(42),
            "1.0.0",
        );
        assert_eq!(key, Some("tv:42".to_string()));

        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::VersionTable,
            None,
            "1.0.0",
        );
        assert!(key.is_none());
    }

    // `TimeBased` renders a `tb:`-prefixed bucket; the bucket value depends on
    // the clock, so only the prefix is checked.
    #[test]
    fn test_generate_version_key_time_based() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            None,
            "1.0.0",
        );
        assert!(key.is_some());
        assert!(key.unwrap().starts_with("tb:"));
    }

    // `SchemaVersion` renders `sv:<schema_version>`.
    #[test]
    fn test_generate_version_key_schema_version() {
        let key = generate_version_key_component(
            "tf_sales",
            &FactTableVersionStrategy::SchemaVersion,
            None,
            "1.0.0",
        );
        assert_eq!(key, Some("sv:1.0.0".to_string()));
    }

    // Set / get / clear round-trip for a single table.
    #[test]
    fn test_version_provider_caching() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        assert!(provider.get_cached_version("tf_sales").is_none());

        provider.set_cached_version("tf_sales", 42);
        assert_eq!(provider.get_cached_version("tf_sales"), Some(42));

        provider.clear_cached_version("tf_sales");
        assert!(provider.get_cached_version("tf_sales").is_none());
    }

    // `clear_all` removes every table's entry.
    #[test]
    fn test_version_provider_clear_all() {
        let provider = FactTableVersionProvider::new(Duration::from_secs(10));

        provider.set_cached_version("tf_sales", 1);
        provider.set_cached_version("tf_orders", 2);

        provider.clear_all();

        assert!(provider.get_cached_version("tf_sales").is_none());
        assert!(provider.get_cached_version("tf_orders").is_none());
    }

    // A freshly created entry is fresh for both a short and a long max age.
    #[test]
    fn test_cached_version_freshness() {
        let cached = CachedVersion::new(42);

        assert!(cached.is_fresh(Duration::from_secs(1)));

        assert!(cached.is_fresh(Duration::from_secs(60)));
    }

    // Every strategy variant survives a JSON round-trip unchanged.
    #[test]
    fn test_strategy_serialization() {
        let strategies = vec![
            FactTableVersionStrategy::Disabled,
            FactTableVersionStrategy::VersionTable,
            FactTableVersionStrategy::TimeBased { ttl_seconds: 300 },
            FactTableVersionStrategy::SchemaVersion,
        ];

        for strategy in strategies {
            let json = serde_json::to_string(&strategy).unwrap();
            let deserialized: FactTableVersionStrategy = serde_json::from_str(&json).unwrap();
            assert_eq!(strategy, deserialized);
        }
    }

    // A config with a default and overrides survives a JSON round-trip.
    #[test]
    fn test_config_serialization() {
        let mut config =
            FactTableCacheConfig::with_default(FactTableVersionStrategy::SchemaVersion);
        config.set_strategy("tf_sales", FactTableVersionStrategy::VersionTable);
        config.set_strategy("tf_events", FactTableVersionStrategy::TimeBased { ttl_seconds: 60 });

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: FactTableCacheConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.default_strategy, FactTableVersionStrategy::SchemaVersion);
        assert_eq!(deserialized.get_strategy("tf_sales"), &FactTableVersionStrategy::VersionTable);
    }
}