reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{common::CommitVersion, interface::catalog::shape::ShapeId, row::RowTtl};

use crate::materialized::{MaterializedCatalog, MultiVersionRowTtl};

impl MaterializedCatalog {
	/// Find a TTL config for a shape at a specific version
	pub fn find_row_ttl_at(&self, shape: ShapeId, version: CommitVersion) -> Option<RowTtl> {
		self.row_ttls.get(&shape).and_then(|entry| {
			let multi = entry.value();
			multi.get(version)
		})
	}

	/// Find a TTL config for a shape (returns latest version)
	pub fn find_row_ttl(&self, shape: ShapeId) -> Option<RowTtl> {
		self.row_ttls.get(&shape).and_then(|entry| {
			let multi = entry.value();
			multi.get_latest()
		})
	}

	/// Set a TTL config for a shape at a specific version
	pub fn set_row_ttl(&self, shape: ShapeId, version: CommitVersion, config: Option<RowTtl>) {
		let multi = self.row_ttls.get_or_insert_with(shape, MultiVersionRowTtl::new);

		if let Some(new_config) = config {
			multi.value().insert(version, new_config);
		} else {
			multi.value().remove(version);
		}
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::catalog::id::TableId,
		row::{RowTtlAnchor, RowTtlCleanupMode},
	};

	use super::*;

	#[test]
	fn test_set_and_find_row_ttl() {
		let catalog = MaterializedCatalog::new();
		let shape = ShapeId::Table(TableId(1));
		let config = RowTtl {
			duration_nanos: 300_000_000_000,
			anchor: RowTtlAnchor::Created,
			cleanup_mode: RowTtlCleanupMode::Drop,
		};

		catalog.set_row_ttl(shape, CommitVersion(1), Some(config.clone()));

		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(1)), Some(config.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(5)), Some(config));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(0)), None);
	}

	#[test]
	fn test_row_ttl_update() {
		let catalog = MaterializedCatalog::new();
		let shape = ShapeId::Table(TableId(42));

		let config_v1 = RowTtl {
			duration_nanos: 300_000_000_000,
			anchor: RowTtlAnchor::Created,
			cleanup_mode: RowTtlCleanupMode::Drop,
		};
		let config_v2 = RowTtl {
			duration_nanos: 600_000_000_000,
			anchor: RowTtlAnchor::Updated,
			cleanup_mode: RowTtlCleanupMode::Delete,
		};

		catalog.set_row_ttl(shape, CommitVersion(1), Some(config_v1.clone()));
		catalog.set_row_ttl(shape, CommitVersion(2), Some(config_v2.clone()));

		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(1)), Some(config_v1));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(2)), Some(config_v2.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(10)), Some(config_v2));
	}

	#[test]
	fn test_row_ttl_deletion() {
		let catalog = MaterializedCatalog::new();
		let shape = ShapeId::Table(TableId(99));
		let config = RowTtl {
			duration_nanos: 300_000_000_000,
			anchor: RowTtlAnchor::Created,
			cleanup_mode: RowTtlCleanupMode::Drop,
		};

		catalog.set_row_ttl(shape, CommitVersion(1), Some(config.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(1)), Some(config.clone()));

		catalog.set_row_ttl(shape, CommitVersion(2), None);
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(2)), None);
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(1)), Some(config));
	}

	#[test]
	fn test_row_ttl_versioning() {
		let catalog = MaterializedCatalog::new();
		let shape = ShapeId::Table(TableId(100));

		let config_v1 = RowTtl {
			duration_nanos: 60_000_000_000,
			anchor: RowTtlAnchor::Created,
			cleanup_mode: RowTtlCleanupMode::Drop,
		};
		let config_v2 = RowTtl {
			duration_nanos: 300_000_000_000,
			anchor: RowTtlAnchor::Updated,
			cleanup_mode: RowTtlCleanupMode::Delete,
		};
		let config_v3 = RowTtl {
			duration_nanos: 86_400_000_000_000,
			anchor: RowTtlAnchor::Created,
			cleanup_mode: RowTtlCleanupMode::Drop,
		};

		catalog.set_row_ttl(shape, CommitVersion(10), Some(config_v1.clone()));
		catalog.set_row_ttl(shape, CommitVersion(20), Some(config_v2.clone()));
		catalog.set_row_ttl(shape, CommitVersion(30), Some(config_v3.clone()));

		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(5)), None);
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(10)), Some(config_v1.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(15)), Some(config_v1));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(20)), Some(config_v2.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(25)), Some(config_v2));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(30)), Some(config_v3.clone()));
		assert_eq!(catalog.find_row_ttl_at(shape, CommitVersion(100)), Some(config_v3));
	}
}