reifydb_routine/procedure/testing/
changed.rs1use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6 interface::{catalog::shape::ShapeId, change::Diff},
7 internal_error,
8 value::column::{Column, columns::Columns, data::ColumnData},
9};
10use reifydb_transaction::transaction::Transaction;
11use reifydb_type::{
12 error::Error,
13 params::Params,
14 value::{Value, r#type::Type},
15};
16
17use crate::procedure::{Procedure, context::ProcedureContext, error::ProcedureError};
18
19pub struct TestingChanged {
21 pub shape_type: &'static str,
22}
23
24impl TestingChanged {
25 pub fn new(shape_type: &'static str) -> Self {
26 Self {
27 shape_type,
28 }
29 }
30}
31
32impl Procedure for TestingChanged {
33 fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns, ProcedureError> {
34 let t = match tx {
35 Transaction::Test(t) => t,
36 _ => {
37 return Err(internal_error!("testing::*::changed() requires a test transaction").into());
38 }
39 };
40
41 let filter_arg = extract_optional_string_param(ctx.params);
42
43 if self.shape_type == "views" {
46 let _ = t.capture_testing_pre_commit();
47 }
48
49 let entries: Vec<_> =
51 t.accumulator_entries_from().iter().map(|(id, diff)| (*id, diff.clone())).collect();
52
53 let mut mutations: Vec<MutationEntry> = Vec::new();
54
55 for (shape_id, diff) in &entries {
56 let type_matches = matches!(
57 (&shape_id, self.shape_type),
58 (ShapeId::Table(_), "tables")
59 | (ShapeId::View(_), "views") | (ShapeId::RingBuffer(_), "ringbuffers")
60 | (ShapeId::Series(_), "series") | (ShapeId::Dictionary(_), "dictionaries")
61 );
62 if !type_matches {
63 continue;
64 }
65
66 let catalog: &Catalog = ctx.catalog;
67 let name = match resolve_shape_name(
68 catalog,
69 &mut Transaction::Test(Box::new(t.reborrow())),
70 shape_id,
71 ) {
72 Ok(n) => n,
73 Err(_) => continue,
74 };
75
76 if let Some(filter) = filter_arg.as_deref()
77 && name != filter
78 {
79 continue;
80 }
81
82 mutations.push(MutationEntry {
83 target: name,
84 diff: diff.clone(),
85 });
86 }
87
88 mutations.sort_by(|a, b| a.target.cmp(&b.target));
89 Ok(build_output_columns(&mutations)?)
90 }
91}
92
93fn extract_optional_string_param(params: &Params) -> Option<String> {
94 match params {
95 Params::Positional(args) if !args.is_empty() => match &args[0] {
96 Value::Utf8(s) => Some(s.clone()),
97 _ => None,
98 },
99 _ => None,
100 }
101}
102
103struct MutationEntry {
104 target: String,
105 diff: Diff,
106}
107
108fn resolve_shape_name(catalog: &Catalog, txn: &mut Transaction<'_>, id: &ShapeId) -> Result<String, Error> {
109 match id {
110 ShapeId::Table(table_id) => {
111 let table = catalog
112 .find_table(txn, *table_id)?
113 .ok_or_else(|| internal_error!("table not found for id {:?}", table_id))?;
114 let ns = catalog
115 .find_namespace(txn, table.namespace)?
116 .ok_or_else(|| internal_error!("namespace not found"))?;
117 Ok(format!("{}::{}", ns.name(), table.name))
118 }
119 ShapeId::View(view_id) => {
120 let view = catalog
121 .find_view(txn, *view_id)?
122 .ok_or_else(|| internal_error!("view not found for id {:?}", view_id))?;
123 let ns = catalog
124 .find_namespace(txn, view.namespace())?
125 .ok_or_else(|| internal_error!("namespace not found"))?;
126 Ok(format!("{}::{}", ns.name(), view.name()))
127 }
128 ShapeId::RingBuffer(rb_id) => {
129 let rb = catalog
130 .find_ringbuffer(txn, *rb_id)?
131 .ok_or_else(|| internal_error!("ringbuffer not found for id {:?}", rb_id))?;
132 let ns = catalog
133 .find_namespace(txn, rb.namespace)?
134 .ok_or_else(|| internal_error!("namespace not found"))?;
135 Ok(format!("{}::{}", ns.name(), rb.name))
136 }
137 ShapeId::Series(series_id) => {
138 let series = catalog
139 .find_series(txn, *series_id)?
140 .ok_or_else(|| internal_error!("series not found for id {:?}", series_id))?;
141 let ns = catalog
142 .find_namespace(txn, series.namespace)?
143 .ok_or_else(|| internal_error!("namespace not found"))?;
144 Ok(format!("{}::{}", ns.name(), series.name))
145 }
146 ShapeId::Dictionary(dict_id) => {
147 let dict = catalog
148 .find_dictionary(txn, *dict_id)?
149 .ok_or_else(|| internal_error!("dictionary not found for id {:?}", dict_id))?;
150 let ns = catalog
151 .find_namespace(txn, dict.namespace)?
152 .ok_or_else(|| internal_error!("namespace not found"))?;
153 Ok(format!("{}::{}", ns.name(), dict.name))
154 }
155 _ => Err(internal_error!("unsupported primitive type {:?}", id)),
156 }
157}
158
159fn build_output_columns(entries: &[MutationEntry]) -> Result<Columns, Error> {
160 if entries.is_empty() {
161 return Ok(Columns::empty());
162 }
163
164 let mut op_data = ColumnData::utf8_with_capacity(entries.len());
165 let mut target_data = ColumnData::utf8_with_capacity(entries.len());
166
167 let mut field_names: Vec<String> = Vec::new();
168 for entry in entries {
169 match &entry.diff {
170 Diff::Insert {
171 post,
172 }
173 | Diff::Remove {
174 pre: post,
175 } => {
176 for col in post.iter() {
177 let name = col.name().text().to_string();
178 if !field_names.contains(&name) {
179 field_names.push(name);
180 }
181 }
182 }
183 Diff::Update {
184 pre,
185 post,
186 } => {
187 for col in pre.iter() {
188 let name = col.name().text().to_string();
189 if !field_names.contains(&name) {
190 field_names.push(name);
191 }
192 }
193 for col in post.iter() {
194 let name = col.name().text().to_string();
195 if !field_names.contains(&name) {
196 field_names.push(name);
197 }
198 }
199 }
200 }
201 }
202
203 let mut old_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(entries.len()); field_names.len()];
204 let mut new_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(entries.len()); field_names.len()];
205
206 for entry in entries {
207 let (op, old_cols, new_cols) = match &entry.diff {
208 Diff::Insert {
209 post,
210 } => ("insert", &Columns::empty(), post),
211 Diff::Update {
212 pre,
213 post,
214 } => ("update", pre, post),
215 Diff::Remove {
216 pre,
217 } => ("delete", pre, &Columns::empty()),
218 };
219
220 op_data.push(op);
221 target_data.push(entry.target.as_str());
222
223 for (i, field_name) in field_names.iter().enumerate() {
224 let old_val =
225 old_cols.column(field_name).map(|col| col.data().get_value(0)).unwrap_or(Value::none());
226 old_columns[i].push(old_val);
227
228 let new_val =
229 new_cols.column(field_name).map(|col| col.data().get_value(0)).unwrap_or(Value::none());
230 new_columns[i].push(new_val);
231 }
232 }
233
234 let mut columns = vec![Column::new("op", op_data), Column::new("target", target_data)];
235
236 for (i, name) in field_names.iter().enumerate() {
237 let mut old_data = column_for_values(&old_columns[i]);
238 for val in &old_columns[i] {
239 old_data.push_value(val.clone());
240 }
241 columns.push(Column::new(format!("old_{}", name), old_data));
242
243 let mut new_data = column_for_values(&new_columns[i]);
244 for val in &new_columns[i] {
245 new_data.push_value(val.clone());
246 }
247 columns.push(Column::new(format!("new_{}", name), new_data));
248 }
249
250 Ok(Columns::new(columns))
251}
252
253fn column_for_values(values: &[Value]) -> ColumnData {
254 let first_type = values.iter().find_map(|v| {
255 if matches!(v, Value::None { .. }) {
256 None
257 } else {
258 Some(v.get_type())
259 }
260 });
261 match first_type {
262 Some(ty) => ColumnData::with_capacity(ty, values.len()),
263 None => ColumnData::none_typed(Type::Boolean, 0),
264 }
265}