reifydb-catalog 0.5.6

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use std::sync::Arc;

use reifydb_core::{
	encoded::row::{EncodedRow, SHAPE_HEADER_SIZE},
	interface::change::Diff,
	row::Row,
	value::column::{buffer::pool::ColumnBufferPool, columns::Columns},
};
use reifydb_type::value::row_number::RowNumber;
use tracing::warn;

use crate::catalog::Catalog;

/// Pairs an encoded row with the row shape the catalog holds for its fingerprint.
///
/// Returns `None`, after logging a warning, in two cases:
/// - the encoded row is shorter than `SHAPE_HEADER_SIZE`, so no fingerprint can be read;
/// - `Catalog::find_row_shape` has no shape for the row's fingerprint.
fn decode_row(catalog: &Catalog, row_number: RowNumber, row: EncodedRow) -> Option<Row> {
	let encoded_len = row.len();
	if encoded_len < SHAPE_HEADER_SIZE {
		warn!("EncodedRow too short for shape fingerprint ({} < {})", encoded_len, SHAPE_HEADER_SIZE);
		return None;
	}

	let fingerprint = row.fingerprint();
	let Some(shape) = catalog.find_row_shape(fingerprint) else {
		warn!(?fingerprint, "RowShape not found in cache for row decode");
		return None;
	};

	Some(Row {
		number: row_number,
		encoded: row,
		shape,
	})
}

/// Builds an insert [`Diff`] from the encoded post-image of a row.
///
/// The diff carries freshly built [`Columns`] for the decoded row.
/// Returns `None` when `decode_row` cannot decode `post`.
pub fn build_insert_diff(catalog: &Catalog, row_number: RowNumber, post: EncodedRow) -> Option<Diff> {
	decode_row(catalog, row_number, post).map(|decoded| Diff::insert(Columns::from_row(&decoded)))
}

/// Builds an update [`Diff`] from the encoded pre- and post-images of a row.
///
/// `pre` is decoded first; if it fails, `post` is never decoded.
/// Returns `None` when either image cannot be decoded by `decode_row`.
pub fn build_update_diff(catalog: &Catalog, row_number: RowNumber, pre: EncodedRow, post: EncodedRow) -> Option<Diff> {
	let before = decode_row(catalog, row_number, pre)?;
	let after = decode_row(catalog, row_number, post)?;

	let before_columns = Columns::from_row(&before);
	let after_columns = Columns::from_row(&after);
	Some(Diff::update(before_columns, after_columns))
}

/// Builds a remove [`Diff`] from the encoded pre-image of a row.
///
/// The diff carries freshly built [`Columns`] for the decoded row.
/// Returns `None` when `decode_row` cannot decode `pre`.
pub fn build_remove_diff(catalog: &Catalog, row_number: RowNumber, pre: EncodedRow) -> Option<Diff> {
	decode_row(catalog, row_number, pre).map(|decoded| Diff::remove(Columns::from_row(&decoded)))
}

/// Builds an insert [`Diff`] using the caller-supplied `post_buf` instead of new [`Columns`].
///
/// The decoded row is written into `post_buf` through `reset_from_row`, and the diff
/// receives a clone of that `Arc`. `post_buf` is left untouched when decoding fails.
///
/// Returns `None` when `decode_row` cannot decode `post`.
pub fn build_insert_diff_into(
	catalog: &Catalog,
	row_number: RowNumber,
	post: EncodedRow,
	post_buf: &mut Arc<Columns>,
) -> Option<Diff> {
	let decoded = decode_row(catalog, row_number, post)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let columns = Arc::make_mut(post_buf);
	columns.reset_from_row(&decoded);

	Some(Diff::insert_arc(Arc::clone(post_buf)))
}

/// Builds an update [`Diff`] using the caller-supplied `pre_buf` and `post_buf`.
///
/// Both images are decoded before either buffer is touched, so a decode failure
/// leaves `pre_buf` and `post_buf` unmodified. On success each buffer is refilled
/// through `reset_from_row` and the diff receives clones of both `Arc`s.
///
/// Returns `None` when `decode_row` cannot decode `pre` or `post`.
pub fn build_update_diff_into(
	catalog: &Catalog,
	row_number: RowNumber,
	pre: EncodedRow,
	post: EncodedRow,
	pre_buf: &mut Arc<Columns>,
	post_buf: &mut Arc<Columns>,
) -> Option<Diff> {
	let before = decode_row(catalog, row_number, pre)?;
	let after = decode_row(catalog, row_number, post)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let before_columns = Arc::make_mut(pre_buf);
	before_columns.reset_from_row(&before);
	let after_columns = Arc::make_mut(post_buf);
	after_columns.reset_from_row(&after);

	Some(Diff::update_arc(Arc::clone(pre_buf), Arc::clone(post_buf)))
}

/// Builds a remove [`Diff`] using the caller-supplied `pre_buf` instead of new [`Columns`].
///
/// The decoded row is written into `pre_buf` through `reset_from_row`, and the diff
/// receives a clone of that `Arc`. `pre_buf` is left untouched when decoding fails.
///
/// Returns `None` when `decode_row` cannot decode `pre`.
pub fn build_remove_diff_into(
	catalog: &Catalog,
	row_number: RowNumber,
	pre: EncodedRow,
	pre_buf: &mut Arc<Columns>,
) -> Option<Diff> {
	let decoded = decode_row(catalog, row_number, pre)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let columns = Arc::make_mut(pre_buf);
	columns.reset_from_row(&decoded);

	Some(Diff::remove_arc(Arc::clone(pre_buf)))
}

/// Builds an insert [`Diff`] using the caller-supplied `post_buf`, with `pool`
/// handed to `reset_from_row_with_pool`.
///
/// The diff receives a clone of the `post_buf` `Arc`. `post_buf` is left
/// untouched when decoding fails.
///
/// Returns `None` when `decode_row` cannot decode `post`.
pub fn build_insert_diff_into_with_pool(
	catalog: &Catalog,
	row_number: RowNumber,
	post: EncodedRow,
	post_buf: &mut Arc<Columns>,
	pool: &ColumnBufferPool,
) -> Option<Diff> {
	let decoded = decode_row(catalog, row_number, post)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let columns = Arc::make_mut(post_buf);
	columns.reset_from_row_with_pool(&decoded, pool);

	Some(Diff::insert_arc(Arc::clone(post_buf)))
}

/// Builds an update [`Diff`] using the caller-supplied `pre_buf` and `post_buf`,
/// with `pool` handed to `reset_from_row_with_pool` for both buffers.
///
/// Both images are decoded before either buffer is touched, so a decode failure
/// leaves `pre_buf` and `post_buf` unmodified. The diff receives clones of both `Arc`s.
///
/// Returns `None` when `decode_row` cannot decode `pre` or `post`.
pub fn build_update_diff_into_with_pool(
	catalog: &Catalog,
	row_number: RowNumber,
	pre: EncodedRow,
	post: EncodedRow,
	pre_buf: &mut Arc<Columns>,
	post_buf: &mut Arc<Columns>,
	pool: &ColumnBufferPool,
) -> Option<Diff> {
	let before = decode_row(catalog, row_number, pre)?;
	let after = decode_row(catalog, row_number, post)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let before_columns = Arc::make_mut(pre_buf);
	before_columns.reset_from_row_with_pool(&before, pool);
	let after_columns = Arc::make_mut(post_buf);
	after_columns.reset_from_row_with_pool(&after, pool);

	Some(Diff::update_arc(Arc::clone(pre_buf), Arc::clone(post_buf)))
}

/// Builds a remove [`Diff`] using the caller-supplied `pre_buf`, with `pool`
/// handed to `reset_from_row_with_pool`.
///
/// The diff receives a clone of the `pre_buf` `Arc`. `pre_buf` is left
/// untouched when decoding fails.
///
/// Returns `None` when `decode_row` cannot decode `pre`.
pub fn build_remove_diff_into_with_pool(
	catalog: &Catalog,
	row_number: RowNumber,
	pre: EncodedRow,
	pre_buf: &mut Arc<Columns>,
	pool: &ColumnBufferPool,
) -> Option<Diff> {
	let decoded = decode_row(catalog, row_number, pre)?;

	// `Arc::make_mut` yields exclusive access, cloning the inner value if the Arc is still shared.
	let columns = Arc::make_mut(pre_buf);
	columns.reset_from_row_with_pool(&decoded, pool);

	Some(Diff::remove_arc(Arc::clone(pre_buf)))
}