reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::sync::Arc;

use reifydb_core::{
	interface::catalog::vtable::VTable,
	retention::{CleanupMode, RetentionStrategy},
	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{fragment::Fragment, value::Value};

use crate::{
	CatalogStore, Result,
	system::SystemCatalog,
	vtable::{BaseVTable, Batch, VTableContext},
};

/// Virtual table that exposes operator retention strategy information
///
/// The table is served as a single batch: the first `next` call after
/// `initialize` returns every strategy listed by the catalog store, and
/// subsequent calls return `None` until the table is initialized again.
pub struct SystemOperatorRetentionStrategies {
	/// Shared definition of this virtual table; handed out by `vtable()`.
	pub(crate) vtable: Arc<VTable>,
	/// `true` once `next` has emitted its batch; reset to `false` by `initialize`.
	exhausted: bool,
}

impl Default for SystemOperatorRetentionStrategies {
	fn default() -> Self {
		Self::new()
	}
}

impl SystemOperatorRetentionStrategies {
	/// Builds the virtual table in its not-yet-exhausted state, backed by the
	/// table definition obtained from `SystemCatalog`.
	pub fn new() -> Self {
		let vtable = SystemCatalog::get_system_operator_retention_strategies_table().clone();

		Self {
			vtable,
			exhausted: false,
		}
	}
}

impl BaseVTable for SystemOperatorRetentionStrategies {
	/// Rearms the table so that the following `next` call emits its batch again.
	///
	/// Neither the transaction nor the context is consulted.
	fn initialize(&mut self, _txn: &mut Transaction<'_>, _ctx: VTableContext) -> Result<()> {
		self.exhausted = false;
		Ok(())
	}

	/// Returns all operator retention strategies as one batch, then `None`.
	///
	/// The batch has four columns, in this order: `operator_id`,
	/// `strategy_type`, `cleanup_mode` and `value`. A `KeepForever` strategy
	/// yields `"keep_forever"` with none values for `cleanup_mode` and `value`;
	/// a `KeepVersions` strategy yields `"keep_versions"`, the cleanup mode as
	/// `"delete"` or `"drop"`, and the version count as `value`.
	///
	/// # Errors
	///
	/// Propagates any error returned by
	/// `CatalogStore::list_operator_retention_strategies`; in that case the
	/// table is not marked as exhausted.
	fn next(&mut self, txn: &mut Transaction<'_>) -> Result<Option<Batch>> {
		if self.exhausted {
			return Ok(None);
		}

		let listed = CatalogStore::list_operator_retention_strategies(txn)?;
		let capacity = listed.len();

		// One buffer per output column, each sized for the full listing.
		let mut id_col = ColumnBuffer::uint8_with_capacity(capacity);
		let mut kind_col = ColumnBuffer::utf8_with_capacity(capacity);
		let mut mode_col = ColumnBuffer::utf8_with_capacity(capacity);
		let mut value_col = ColumnBuffer::uint8_with_capacity(capacity);

		for record in listed {
			id_col.push(record.operator.0);

			// Every arm appends exactly one entry to each remaining column so
			// that all four buffers stay the same length.
			match record.strategy {
				RetentionStrategy::KeepVersions {
					count,
					cleanup_mode,
				} => {
					let mode_label = match cleanup_mode {
						CleanupMode::Delete => "delete",
						CleanupMode::Drop => "drop",
					};
					kind_col.push("keep_versions");
					mode_col.push(mode_label);
					value_col.push(count);
				}
				RetentionStrategy::KeepForever => {
					kind_col.push("keep_forever");
					mode_col.push_value(Value::none());
					value_col.push_value(Value::none());
				}
			}
		}

		let named_columns = vec![
			ColumnWithName::new(Fragment::internal("operator_id"), id_col),
			ColumnWithName::new(Fragment::internal("strategy_type"), kind_col),
			ColumnWithName::new(Fragment::internal("cleanup_mode"), mode_col),
			ColumnWithName::new(Fragment::internal("value"), value_col),
		];

		// Mark the table as drained only once the listing has succeeded.
		self.exhausted = true;

		let batch = Batch {
			columns: Columns::new(named_columns),
		};
		Ok(Some(batch))
	}

	/// Borrows the definition of this virtual table.
	fn vtable(&self) -> &VTable {
		&self.vtable
	}
}