Skip to main content

reifydb_routine/procedure/testing/
changed.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6	interface::{catalog::shape::ShapeId, change::Diff},
7	internal_error,
8	value::column::{Column, columns::Columns, data::ColumnData},
9};
10use reifydb_transaction::transaction::Transaction;
11use reifydb_type::{
12	error::Error,
13	params::Params,
14	value::{Value, r#type::Type},
15};
16
17use crate::procedure::{Procedure, context::ProcedureContext, error::ProcedureError};
18
19/// Identifies the primitive type category for a `testing::*::changed()` procedure.
20pub struct TestingChanged {
21	pub shape_type: &'static str,
22}
23
24impl TestingChanged {
25	pub fn new(shape_type: &'static str) -> Self {
26		Self {
27			shape_type,
28		}
29	}
30}
31
32impl Procedure for TestingChanged {
33	fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns, ProcedureError> {
34		let t = match tx {
35			Transaction::Test(t) => t,
36			_ => {
37				return Err(internal_error!("testing::*::changed() requires a test transaction").into());
38			}
39		};
40
41		let filter_arg = extract_optional_string_param(ctx.params);
42
43		// Materialize view rows from pending source changes so that
44		// changed() sees transactional view mutations.
45		if self.shape_type == "views" {
46			let _ = t.capture_testing_pre_commit();
47		}
48
49		// Read individual mutations from the accumulator
50		let entries: Vec<_> =
51			t.accumulator_entries_from().iter().map(|(id, diff)| (*id, diff.clone())).collect();
52
53		let mut mutations: Vec<MutationEntry> = Vec::new();
54
55		for (shape_id, diff) in &entries {
56			let type_matches = matches!(
57				(&shape_id, self.shape_type),
58				(ShapeId::Table(_), "tables")
59					| (ShapeId::View(_), "views") | (ShapeId::RingBuffer(_), "ringbuffers")
60					| (ShapeId::Series(_), "series") | (ShapeId::Dictionary(_), "dictionaries")
61			);
62			if !type_matches {
63				continue;
64			}
65
66			let catalog: &Catalog = ctx.catalog;
67			let name = match resolve_shape_name(
68				catalog,
69				&mut Transaction::Test(Box::new(t.reborrow())),
70				shape_id,
71			) {
72				Ok(n) => n,
73				Err(_) => continue,
74			};
75
76			if let Some(filter) = filter_arg.as_deref()
77				&& name != filter
78			{
79				continue;
80			}
81
82			mutations.push(MutationEntry {
83				target: name,
84				diff: diff.clone(),
85			});
86		}
87
88		mutations.sort_by(|a, b| a.target.cmp(&b.target));
89		Ok(build_output_columns(&mutations)?)
90	}
91}
92
93fn extract_optional_string_param(params: &Params) -> Option<String> {
94	match params {
95		Params::Positional(args) if !args.is_empty() => match &args[0] {
96			Value::Utf8(s) => Some(s.clone()),
97			_ => None,
98		},
99		_ => None,
100	}
101}
102
103struct MutationEntry {
104	target: String,
105	diff: Diff,
106}
107
108fn resolve_shape_name(catalog: &Catalog, txn: &mut Transaction<'_>, id: &ShapeId) -> Result<String, Error> {
109	match id {
110		ShapeId::Table(table_id) => {
111			let table = catalog
112				.find_table(txn, *table_id)?
113				.ok_or_else(|| internal_error!("table not found for id {:?}", table_id))?;
114			let ns = catalog
115				.find_namespace(txn, table.namespace)?
116				.ok_or_else(|| internal_error!("namespace not found"))?;
117			Ok(format!("{}::{}", ns.name(), table.name))
118		}
119		ShapeId::View(view_id) => {
120			let view = catalog
121				.find_view(txn, *view_id)?
122				.ok_or_else(|| internal_error!("view not found for id {:?}", view_id))?;
123			let ns = catalog
124				.find_namespace(txn, view.namespace())?
125				.ok_or_else(|| internal_error!("namespace not found"))?;
126			Ok(format!("{}::{}", ns.name(), view.name()))
127		}
128		ShapeId::RingBuffer(rb_id) => {
129			let rb = catalog
130				.find_ringbuffer(txn, *rb_id)?
131				.ok_or_else(|| internal_error!("ringbuffer not found for id {:?}", rb_id))?;
132			let ns = catalog
133				.find_namespace(txn, rb.namespace)?
134				.ok_or_else(|| internal_error!("namespace not found"))?;
135			Ok(format!("{}::{}", ns.name(), rb.name))
136		}
137		ShapeId::Series(series_id) => {
138			let series = catalog
139				.find_series(txn, *series_id)?
140				.ok_or_else(|| internal_error!("series not found for id {:?}", series_id))?;
141			let ns = catalog
142				.find_namespace(txn, series.namespace)?
143				.ok_or_else(|| internal_error!("namespace not found"))?;
144			Ok(format!("{}::{}", ns.name(), series.name))
145		}
146		ShapeId::Dictionary(dict_id) => {
147			let dict = catalog
148				.find_dictionary(txn, *dict_id)?
149				.ok_or_else(|| internal_error!("dictionary not found for id {:?}", dict_id))?;
150			let ns = catalog
151				.find_namespace(txn, dict.namespace)?
152				.ok_or_else(|| internal_error!("namespace not found"))?;
153			Ok(format!("{}::{}", ns.name(), dict.name))
154		}
155		_ => Err(internal_error!("unsupported primitive type {:?}", id)),
156	}
157}
158
159fn build_output_columns(entries: &[MutationEntry]) -> Result<Columns, Error> {
160	if entries.is_empty() {
161		return Ok(Columns::empty());
162	}
163
164	let mut op_data = ColumnData::utf8_with_capacity(entries.len());
165	let mut target_data = ColumnData::utf8_with_capacity(entries.len());
166
167	let mut field_names: Vec<String> = Vec::new();
168	for entry in entries {
169		match &entry.diff {
170			Diff::Insert {
171				post,
172			}
173			| Diff::Remove {
174				pre: post,
175			} => {
176				for col in post.iter() {
177					let name = col.name().text().to_string();
178					if !field_names.contains(&name) {
179						field_names.push(name);
180					}
181				}
182			}
183			Diff::Update {
184				pre,
185				post,
186			} => {
187				for col in pre.iter() {
188					let name = col.name().text().to_string();
189					if !field_names.contains(&name) {
190						field_names.push(name);
191					}
192				}
193				for col in post.iter() {
194					let name = col.name().text().to_string();
195					if !field_names.contains(&name) {
196						field_names.push(name);
197					}
198				}
199			}
200		}
201	}
202
203	let mut old_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(entries.len()); field_names.len()];
204	let mut new_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(entries.len()); field_names.len()];
205
206	for entry in entries {
207		let (op, old_cols, new_cols) = match &entry.diff {
208			Diff::Insert {
209				post,
210			} => ("insert", &Columns::empty(), post),
211			Diff::Update {
212				pre,
213				post,
214			} => ("update", pre, post),
215			Diff::Remove {
216				pre,
217			} => ("delete", pre, &Columns::empty()),
218		};
219
220		op_data.push(op);
221		target_data.push(entry.target.as_str());
222
223		for (i, field_name) in field_names.iter().enumerate() {
224			let old_val =
225				old_cols.column(field_name).map(|col| col.data().get_value(0)).unwrap_or(Value::none());
226			old_columns[i].push(old_val);
227
228			let new_val =
229				new_cols.column(field_name).map(|col| col.data().get_value(0)).unwrap_or(Value::none());
230			new_columns[i].push(new_val);
231		}
232	}
233
234	let mut columns = vec![Column::new("op", op_data), Column::new("target", target_data)];
235
236	for (i, name) in field_names.iter().enumerate() {
237		let mut old_data = column_for_values(&old_columns[i]);
238		for val in &old_columns[i] {
239			old_data.push_value(val.clone());
240		}
241		columns.push(Column::new(format!("old_{}", name), old_data));
242
243		let mut new_data = column_for_values(&new_columns[i]);
244		for val in &new_columns[i] {
245			new_data.push_value(val.clone());
246		}
247		columns.push(Column::new(format!("new_{}", name), new_data));
248	}
249
250	Ok(Columns::new(columns))
251}
252
253fn column_for_values(values: &[Value]) -> ColumnData {
254	let first_type = values.iter().find_map(|v| {
255		if matches!(v, Value::None { .. }) {
256			None
257		} else {
258			Some(v.get_type())
259		}
260	});
261	match first_type {
262		Some(ty) => ColumnData::with_capacity(ty, values.len()),
263		None => ColumnData::none_typed(Type::Boolean, 0),
264	}
265}