reifydb-catalog 0.4.8

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::{collections::HashMap, sync::Arc};

use reifydb_core::{
	interface::catalog::{flow::FlowId, vtable::VTable},
	value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_metric::{
	MetricId,
	metric::{CombinedStats, MetricReader},
	multi::Tier,
};
use reifydb_store_single::SingleStore;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::fragment::Fragment;

use crate::{
	CatalogStore, Result,
	system::SystemCatalog,
	vtable::{BaseVTable, Batch, VTableContext},
};

/// One output row of flow storage statistics, in the exact column order of the vtable.
///
/// Fields are positional; each entry below notes which output column it feeds.
type StorageStatsRow = (
	u64,  // id (flow id)
	u64,  // namespace_id
	Tier, // tier
	u64,  // current_key_bytes
	u64,  // current_value_bytes
	u64,  // current_total_bytes
	u64,  // current_count
	u64,  // historical_key_bytes
	u64,  // historical_value_bytes
	u64,  // historical_total_bytes
	u64,  // historical_count
	u64,  // total_bytes
	u64,  // cdc_key_bytes
	u64,  // cdc_value_bytes
	u64,  // cdc_total_bytes
	u64,  // cdc_count
);

/// Virtual table exposing storage statistics aggregated per flow and storage tier.
pub struct SystemFlowStorageStats {
	/// Table definition handed out by `vtable()`.
	pub(crate) vtable: Arc<VTable>,
	/// True once the single result batch has been produced; reset by `initialize`.
	exhausted: bool,
	/// Reader over the per-object storage/CDC metrics that get aggregated into rows.
	stats_reader: MetricReader<SingleStore>,
}

impl SystemFlowStorageStats {
	/// Creates the vtable over `stats_reader`, starting in the not-yet-scanned state.
	pub fn new(stats_reader: MetricReader<SingleStore>) -> Self {
		let vtable = SystemCatalog::get_system_flow_storage_stats_table().clone();
		Self {
			vtable,
			exhausted: false,
			stats_reader,
		}
	}
}

/// Maps a storage tier to the lowercase label written into the `tier` column.
fn tier_to_str(tier: Tier) -> &'static str {
	match tier {
		Tier::Cold => "cold",
		Tier::Warm => "warm",
		Tier::Hot => "hot",
	}
}

impl BaseVTable for SystemFlowStorageStats {
	fn initialize(&mut self, _txn: &mut Transaction<'_>, _ctx: VTableContext) -> Result<()> {
		self.exhausted = false;
		Ok(())
	}

	fn next(&mut self, txn: &mut Transaction<'_>) -> Result<Option<Batch>> {
		if self.exhausted {
			return Ok(None);
		}

		// Aggregate node stats by (flow_id, tier)
		let mut aggregated: HashMap<(FlowId, Tier), CombinedStats> = HashMap::new();

		for tier in [Tier::Hot, Tier::Warm, Tier::Cold] {
			let tier_stats = self.stats_reader.scan_tier(tier).unwrap_or_default();
			for (obj_id, stats) in tier_stats {
				if let MetricId::FlowNode(flow_node_id) = obj_id
					&& let Some(node_def) = CatalogStore::find_flow_node(txn, flow_node_id)?
				{
					let key = (node_def.flow, tier);
					let entry = aggregated.entry(key).or_default();
					entry.storage += stats.storage;
					entry.cdc += stats.cdc;
				}
			}
		}

		// Convert aggregated stats to rows
		let mut rows: Vec<StorageStatsRow> = Vec::new();

		for ((flow_id, tier), stats) in aggregated {
			// Look up namespace_id from catalog
			let namespace_id = match CatalogStore::find_flow(txn, flow_id)? {
				Some(flow) => flow.namespace.0,
				None => 0,
			};

			rows.push((
				flow_id.0,
				namespace_id,
				tier,
				stats.storage.current_key_bytes,
				stats.storage.current_value_bytes,
				stats.current_bytes(),
				stats.storage.current_count,
				stats.storage.historical_key_bytes,
				stats.storage.historical_value_bytes,
				stats.historical_bytes(),
				stats.storage.historical_count,
				stats.total_bytes(),
				stats.cdc.key_bytes,
				stats.cdc.value_bytes,
				stats.cdc_total_bytes(),
				stats.cdc.entry_count,
			));
		}

		let capacity = rows.len();
		let mut ids = ColumnData::uint8_with_capacity(capacity);
		let mut namespace_ids = ColumnData::uint8_with_capacity(capacity);
		let mut tiers = ColumnData::utf8_with_capacity(capacity);
		let mut current_key_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut current_value_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut current_total_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut current_counts = ColumnData::uint8_with_capacity(capacity);
		let mut historical_key_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut historical_value_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut historical_total_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut historical_counts = ColumnData::uint8_with_capacity(capacity);
		let mut total_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut cdc_key_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut cdc_value_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut cdc_total_bytes = ColumnData::uint8_with_capacity(capacity);
		let mut cdc_counts = ColumnData::uint8_with_capacity(capacity);

		for row in rows {
			ids.push(row.0);
			namespace_ids.push(row.1);
			tiers.push(tier_to_str(row.2));
			current_key_bytes.push(row.3);
			current_value_bytes.push(row.4);
			current_total_bytes.push(row.5);
			current_counts.push(row.6);
			historical_key_bytes.push(row.7);
			historical_value_bytes.push(row.8);
			historical_total_bytes.push(row.9);
			historical_counts.push(row.10);
			total_bytes.push(row.11);
			cdc_key_bytes.push(row.12);
			cdc_value_bytes.push(row.13);
			cdc_total_bytes.push(row.14);
			cdc_counts.push(row.15);
		}

		let columns = vec![
			Column {
				name: Fragment::internal("id"),
				data: ids,
			},
			Column {
				name: Fragment::internal("namespace_id"),
				data: namespace_ids,
			},
			Column {
				name: Fragment::internal("tier"),
				data: tiers,
			},
			Column {
				name: Fragment::internal("current_key_bytes"),
				data: current_key_bytes,
			},
			Column {
				name: Fragment::internal("current_value_bytes"),
				data: current_value_bytes,
			},
			Column {
				name: Fragment::internal("current_total_bytes"),
				data: current_total_bytes,
			},
			Column {
				name: Fragment::internal("current_count"),
				data: current_counts,
			},
			Column {
				name: Fragment::internal("historical_key_bytes"),
				data: historical_key_bytes,
			},
			Column {
				name: Fragment::internal("historical_value_bytes"),
				data: historical_value_bytes,
			},
			Column {
				name: Fragment::internal("historical_total_bytes"),
				data: historical_total_bytes,
			},
			Column {
				name: Fragment::internal("historical_count"),
				data: historical_counts,
			},
			Column {
				name: Fragment::internal("total_bytes"),
				data: total_bytes,
			},
			Column {
				name: Fragment::internal("cdc_key_bytes"),
				data: cdc_key_bytes,
			},
			Column {
				name: Fragment::internal("cdc_value_bytes"),
				data: cdc_value_bytes,
			},
			Column {
				name: Fragment::internal("cdc_total_bytes"),
				data: cdc_total_bytes,
			},
			Column {
				name: Fragment::internal("cdc_count"),
				data: cdc_counts,
			},
		];

		self.exhausted = true;
		Ok(Some(Batch {
			columns: Columns::new(columns),
		}))
	}

	fn vtable(&self) -> &VTable {
		&self.vtable
	}
}