reifydb-catalog 0.5.6

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{flow::FlowNodeId, shape::ShapeId},
	key::{
		EncodableKey,
		operator_ttl::{OperatorTtlKey, OperatorTtlKeyRange},
		row_ttl::{RowTtlKey, RowTtlKeyRange},
	},
	row::Ttl,
};
use reifydb_transaction::transaction::Transaction;

use super::decode_ttl_config;
use crate::{CatalogStore, Result};

/// One row TTL record as produced by [`CatalogStore::list_row_ttls`]:
/// the shape taken from the decoded TTL key together with the TTL
/// configuration decoded from the stored row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowTtlEntry {
	/// Shape identifier carried by the decoded `RowTtlKey`.
	pub shape: ShapeId,
	/// TTL configuration decoded from the stored payload.
	pub config: Ttl,
}

/// One operator TTL record as produced by
/// [`CatalogStore::list_operator_ttls`]: the flow node taken from the decoded
/// TTL key together with the TTL configuration decoded from the stored row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorTtlEntry {
	/// Flow node identifier carried by the decoded `OperatorTtlKey`.
	pub node: FlowNodeId,
	/// TTL configuration decoded from the stored payload.
	pub config: Ttl,
}

impl CatalogStore {
	/// Collects every row TTL configuration reachable through `rx`.
	///
	/// Performs a scan over `RowTtlKeyRange::full_scan()` (the `1024` passed
	/// to `range` is presumably a batch size; confirm against
	/// `Transaction::range`) and produces one [`RowTtlEntry`] for each record
	/// whose key and payload both decode. Records for which either decode
	/// step yields `None` are skipped without raising an error.
	///
	/// # Errors
	///
	/// Propagates the error from opening the range scan, or from the first
	/// scanned item that is itself an error.
	#[allow(dead_code)]
	pub fn list_row_ttls(rx: &mut Transaction<'_>) -> Result<Vec<RowTtlEntry>> {
		let scan = rx.range(RowTtlKeyRange::full_scan(), 1024)?;
		let mut found = Vec::new();

		for item in scan {
			let record = item?;

			// The key is decoded first; the payload is only looked at once
			// the key is known to be a valid row TTL key.
			let Some(key) = RowTtlKey::decode(&record.key) else {
				continue;
			};
			let Some(config) = decode_ttl_config(&record.row) else {
				continue;
			};

			found.push(RowTtlEntry {
				shape: key.shape,
				config,
			});
		}

		Ok(found)
	}

	/// Collects every operator TTL configuration reachable through `rx`.
	///
	/// Performs a scan over `OperatorTtlKeyRange::full_scan()` (the `1024`
	/// passed to `range` is presumably a batch size; confirm against
	/// `Transaction::range`) and produces one [`OperatorTtlEntry`] for each
	/// record whose key and payload both decode. Records for which either
	/// decode step yields `None` are skipped without raising an error.
	///
	/// # Errors
	///
	/// Propagates the error from opening the range scan, or from the first
	/// scanned item that is itself an error.
	#[allow(dead_code)]
	pub fn list_operator_ttls(rx: &mut Transaction<'_>) -> Result<Vec<OperatorTtlEntry>> {
		let scan = rx.range(OperatorTtlKeyRange::full_scan(), 1024)?;
		let mut found = Vec::new();

		for item in scan {
			let record = item?;

			// The key is decoded first; the payload is only looked at once
			// the key is known to be a valid operator TTL key.
			let Some(key) = OperatorTtlKey::decode(&record.key) else {
				continue;
			};
			let Some(config) = decode_ttl_config(&record.row) else {
				continue;
			};

			found.push(OperatorTtlEntry {
				node: key.node,
				config,
			});
		}

		Ok(found)
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::catalog::id::{RingBufferId, SeriesId, TableId},
		row::{Ttl, TtlAnchor, TtlCleanupMode},
	};
	use reifydb_engine::test_harness::create_test_admin_transaction;
	use reifydb_transaction::transaction::Transaction;

	use super::*;
	use crate::store::ttl::create::create_row_ttl;

	/// Listing row TTLs on a fresh test transaction yields nothing.
	#[test]
	fn test_list_row_ttls_empty() {
		let mut admin = create_test_admin_transaction();
		let mut rx = Transaction::Admin(&mut admin);

		let listed = CatalogStore::list_row_ttls(&mut rx).unwrap();
		assert!(listed.is_empty());
	}

	/// Row TTLs created for a table, a ring buffer and a series are all
	/// returned by the listing with their original configuration.
	#[test]
	fn test_list_row_ttls_multiple() {
		let mut admin = create_test_admin_transaction();

		// One fixture per shape kind, each with a distinct configuration.
		let expected = [
			(
				ShapeId::Table(TableId(1)),
				Ttl {
					duration_nanos: 300_000_000_000,
					anchor: TtlAnchor::Created,
					cleanup_mode: TtlCleanupMode::Drop,
				},
			),
			(
				ShapeId::RingBuffer(RingBufferId(2)),
				Ttl {
					duration_nanos: 600_000_000_000,
					anchor: TtlAnchor::Updated,
					cleanup_mode: TtlCleanupMode::Delete,
				},
			),
			(
				ShapeId::Series(SeriesId(3)),
				Ttl {
					duration_nanos: 86_400_000_000_000,
					anchor: TtlAnchor::Created,
					cleanup_mode: TtlCleanupMode::Drop,
				},
			),
		];

		for (shape, ttl) in &expected {
			create_row_ttl(&mut admin, *shape, ttl).unwrap();
		}

		let listed = CatalogStore::list_row_ttls(&mut Transaction::Admin(&mut admin)).unwrap();
		assert_eq!(listed.len(), expected.len());
		for (shape, ttl) in &expected {
			assert!(listed.iter().any(|e| e.shape == *shape && e.config == *ttl));
		}
	}
}