reifydb-engine 0.6.0

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (c) 2026 ReifyDB

use reifydb_catalog::change::apply_system_change;
use reifydb_core::{
	delta::Delta,
	interface::{
		catalog::{id::NamespaceId, shape::ShapeId},
		cdc::SystemChange,
	},
};
use reifydb_engine::test_harness::TestEngine;
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction, replica::ReplicaTransaction};
use reifydb_value::value::identity::IdentityId;

#[test]
fn test_row_settings_sync_to_catalog_cache() {
	let engine = TestEngine::new();
	let catalog = engine.catalog();

	// 1. Create a namespace and table with TTL
	engine.admin("CREATE NAMESPACE test");
	engine.admin(r#"
		CREATE TABLE test::users { id: int4 } WITH {
			row: { ttl: { duration: '1h', on: created, mode: drop } }
		};
	"#);

	// 2. Check if TTL is in CatalogCache immediately
	let mut txn = engine.begin_admin(IdentityId::system()).unwrap();
	let ns_id = NamespaceId(16385); // 'test' namespace
	let table = catalog
		.find_table_by_name(&mut Transaction::Admin(&mut txn), ns_id, "users")
		.unwrap()
		.expect("table not found");
	let shape = ShapeId::Table(table.id);

	let ttl = catalog
		.find_row_settings(&mut Transaction::Admin(&mut txn), shape)
		.expect("TTL not found in materialized catalog");
	assert_eq!(ttl.ttl.expect("ttl not set").duration_nanos, 3_600_000_000_000);
}

#[test]
fn test_row_settings_replication_sync() {
	let primary = TestEngine::new();
	let replica = TestEngine::new();
	let replica_catalog = replica.catalog();

	// 1. Start transaction on primary
	let mut txn = primary.begin_admin(IdentityId::system()).unwrap();

	// 2. Create table with TTL
	let r = txn.rql("CREATE NAMESPACE test", Default::default());
	if let Some(e) = r.error {
		panic!("{e:?}");
	}
	let r = txn.rql(
		"CREATE TABLE test::users { id: int4 } WITH { row: { ttl: { duration: '1m', on: created, mode: drop } } }",
		Default::default(),
	);
	if let Some(e) = r.error {
		panic!("{e:?}");
	}

	// 3. Capture changes
	let changes = deltas_to_system_changes(&txn);

	// 4. Commit primary
	let version = txn.commit().unwrap();

	// 5. Apply to replica
	let mut replica_txn = ReplicaTransaction::new(replica.multi_owned(), version).unwrap();
	for change in &changes {
		apply_system_change(&replica_catalog, &mut Transaction::Replica(&mut replica_txn), change).unwrap();
	}
	replica_txn.commit_at_version().unwrap();

	// 6. Verify replica materialized catalog has the TTL
	// Namespace ID should be 16385
	let mut q_txn = replica.begin_admin(IdentityId::system()).unwrap();
	let table = replica_catalog
		.find_table_by_name(&mut Transaction::Admin(&mut q_txn), NamespaceId(16385), "users")
		.unwrap()
		.expect("table not found on replica");
	let shape = ShapeId::Table(table.id);

	let ttl = replica_catalog
		.find_row_settings(&mut Transaction::Admin(&mut q_txn), shape)
		.expect("TTL not found in replica materialized catalog");
	assert_eq!(ttl.ttl.expect("ttl not set").duration_nanos, 60_000_000_000);
}

#[test]
fn test_operator_settings_sync_to_catalog_cache() {
	use reifydb_catalog::store::operator_settings::create::create_operator_settings;
	use reifydb_core::{
		interface::catalog::flow::FlowNodeId,
		row::{OperatorSettings, Ttl, TtlAnchor, TtlCleanupMode},
	};
	use reifydb_store_multi::gc::operator::ListOperatorSettings;

	let engine = TestEngine::new();
	let catalog = engine.catalog();

	// An operator's TTL is persisted by the flow compiler via create_operator_settings.
	// The operator-TTL GC actor only ever sees these via the cache-backed list below, so
	// the write must reach the cache through the post-commit interceptor. Before the fix
	// create_operator_settings only wrote storage without tracking the change, so the
	// interceptor never learned about it and this list stayed empty - silently disabling
	// operator-state GC for every stateful operator.
	let node_id = FlowNodeId(42);
	let settings = OperatorSettings {
		ttl: Some(Ttl {
			duration_nanos: 3_600_000_000_000,
			anchor: TtlAnchor::Created,
			cleanup_mode: TtlCleanupMode::Drop,
		}),
		join: None,
	};

	let mut txn = engine.begin_admin(IdentityId::system()).unwrap();
	create_operator_settings(&mut txn, node_id, &settings).unwrap();
	txn.commit().unwrap();

	let listed = catalog.list_operator_settings();
	assert_eq!(listed.len(), 1, "operator settings did not sync to the catalog cache");
	assert_eq!(listed[0].0, node_id);
	assert_eq!(listed[0].1.ttl.as_ref().expect("ttl not set").duration_nanos, 3_600_000_000_000);
}

fn deltas_to_system_changes(txn: &AdminTransaction) -> Vec<SystemChange> {
	txn.pending_writes()
		.clone()
		.into_iter_insertion_order()
		.filter_map(|(_, pending)| match pending.delta {
			Delta::Set {
				key,
				row,
			} => Some(SystemChange::Insert {
				key,
				post: row,
			}),
			Delta::Unset {
				key,
				row,
			} => Some(SystemChange::Delete {
				key,
				pre: Some(row),
			}),
			Delta::Remove {
				key,
			} => Some(SystemChange::Delete {
				key,
				pre: None,
			}),
			Delta::Drop {
				key: _,
			} => None,
		})
		.collect()
}