Skip to main content

reifydb_routine/procedure/testing/
event.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	internal_error,
6	testing::CapturedEvent,
7	value::column::{Column, columns::Columns, data::ColumnData},
8};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{
11	error::Error,
12	params::Params,
13	value::{Value, r#type::Type},
14};
15
16use crate::procedure::{Procedure, context::ProcedureContext, error::ProcedureError};
17
/// Procedure type implementing `testing::events::dispatched()`.
///
/// The type carries no state; all work happens in its `Procedure` impl.
#[derive(Default)]
pub struct TestingEventsDispatched;

impl TestingEventsDispatched {
	/// Creates the (stateless) procedure value.
	pub fn new() -> Self {
		Self
	}
}
31
32impl Procedure for TestingEventsDispatched {
33	fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns, ProcedureError> {
34		let events = match tx {
35			Transaction::Test(t) => &**t.events,
36			_ => {
37				return Err(internal_error!(
38					"testing::events::dispatched() requires a test transaction"
39				)
40				.into());
41			}
42		};
43		let filter_arg = extract_optional_string_param(ctx.params);
44		Ok(build_dispatched_events(events, filter_arg.as_deref())?)
45	}
46}
47
48fn extract_optional_string_param(params: &Params) -> Option<String> {
49	match params {
50		Params::Positional(args) if !args.is_empty() => match &args[0] {
51			Value::Utf8(s) => Some(s.clone()),
52			_ => None,
53		},
54		_ => None,
55	}
56}
57
58fn build_dispatched_events(events: &[CapturedEvent], filter_name: Option<&str>) -> Result<Columns, Error> {
59	let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
60		let parts: Vec<&str> = s.splitn(2, "::").collect();
61		if parts.len() == 2 {
62			Some((parts[0], parts[1]))
63		} else {
64			None
65		}
66	});
67
68	let events: Vec<_> = events
69		.iter()
70		.filter(|e| {
71			if let Some((ns, name)) = filter {
72				e.namespace == ns && e.event == name
73			} else {
74				true
75			}
76		})
77		.collect();
78
79	if events.is_empty() {
80		return Ok(Columns::empty());
81	}
82
83	let mut seq_data = ColumnData::uint8_with_capacity(events.len());
84	let mut ns_data = ColumnData::utf8_with_capacity(events.len());
85	let mut event_data = ColumnData::utf8_with_capacity(events.len());
86	let mut variant_data = ColumnData::utf8_with_capacity(events.len());
87	let mut depth_data = ColumnData::uint1_with_capacity(events.len());
88
89	let mut field_names: Vec<String> = Vec::new();
90	for event in &events {
91		for col in event.columns.iter() {
92			let name = col.name().text().to_string();
93			if !field_names.contains(&name) {
94				field_names.push(name);
95			}
96		}
97	}
98
99	let mut field_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(events.len()); field_names.len()];
100
101	for event in &events {
102		seq_data.push(event.sequence);
103		ns_data.push(event.namespace.as_str());
104		event_data.push(event.event.as_str());
105		variant_data.push(event.variant.as_str());
106		depth_data.push(event.depth);
107
108		for (i, field_name) in field_names.iter().enumerate() {
109			let val = event
110				.columns
111				.column(field_name)
112				.map(|col| col.data().get_value(0))
113				.unwrap_or(Value::none());
114			field_columns[i].push(val);
115		}
116	}
117
118	let mut columns = vec![
119		Column::new("sequence", seq_data),
120		Column::new("namespace", ns_data),
121		Column::new("event", event_data),
122		Column::new("variant", variant_data),
123		Column::new("depth", depth_data),
124	];
125
126	for (i, name) in field_names.iter().enumerate() {
127		let mut data = column_for_values(&field_columns[i]);
128		for val in &field_columns[i] {
129			data.push_value(val.clone());
130		}
131		columns.push(Column::new(name.as_str(), data));
132	}
133
134	Ok(Columns::new(columns))
135}
136
137fn column_for_values(values: &[Value]) -> ColumnData {
138	let first_type = values.iter().find_map(|v| {
139		if matches!(v, Value::None { .. }) {
140			None
141		} else {
142			Some(v.get_type())
143		}
144	});
145	match first_type {
146		Some(ty) => ColumnData::with_capacity(ty, values.len()),
147		None => ColumnData::none_typed(Type::Boolean, 0),
148	}
149}