Skip to main content

reifydb_routine/procedure/testing/
event.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::LazyLock;
5
6use reifydb_core::{
7	internal_error,
8	testing::CapturedEvent,
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{
13	error::Error,
14	params::Params,
15	value::{Value, r#type::Type},
16};
17
18use crate::routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError};
19
20static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("testing::events::dispatched"));
21
/// Stateless unit type that implements the `testing::events::dispatched` routine.
///
/// `Default` is derived; for a unit struct it yields the same value as [`TestingEventsDispatched::new`].
#[derive(Default)]
pub struct TestingEventsDispatched;

impl TestingEventsDispatched {
	/// Creates the routine value. The type carries no state, so every instance is equivalent.
	pub fn new() -> Self {
		TestingEventsDispatched
	}
}
35
36impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for TestingEventsDispatched {
37	fn info(&self) -> &RoutineInfo {
38		&INFO
39	}
40
41	fn return_type(&self, _input_types: &[Type]) -> Type {
42		Type::Any
43	}
44
45	fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
46		let events = match ctx.tx {
47			Transaction::Test(t) => &**t.events,
48			_ => {
49				return Err(internal_error!(
50					"testing::events::dispatched() requires a test transaction"
51				)
52				.into());
53			}
54		};
55		let filter_arg = extract_optional_string_param(ctx.params);
56		Ok(build_dispatched_events(events, filter_arg.as_deref())?)
57	}
58}
59
60fn extract_optional_string_param(params: &Params) -> Option<String> {
61	match params {
62		Params::Positional(args) if !args.is_empty() => match &args[0] {
63			Value::Utf8(s) => Some(s.clone()),
64			_ => None,
65		},
66		_ => None,
67	}
68}
69
70fn build_dispatched_events(events: &[CapturedEvent], filter_name: Option<&str>) -> Result<Columns, Error> {
71	let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
72		let parts: Vec<&str> = s.splitn(2, "::").collect();
73		if parts.len() == 2 {
74			Some((parts[0], parts[1]))
75		} else {
76			None
77		}
78	});
79
80	let events: Vec<_> = events
81		.iter()
82		.filter(|e| {
83			if let Some((ns, name)) = filter {
84				e.namespace == ns && e.event == name
85			} else {
86				true
87			}
88		})
89		.collect();
90
91	if events.is_empty() {
92		return Ok(Columns::empty());
93	}
94
95	let mut seq_data = ColumnBuffer::uint8_with_capacity(events.len());
96	let mut ns_data = ColumnBuffer::utf8_with_capacity(events.len());
97	let mut event_data = ColumnBuffer::utf8_with_capacity(events.len());
98	let mut variant_data = ColumnBuffer::utf8_with_capacity(events.len());
99	let mut depth_data = ColumnBuffer::uint1_with_capacity(events.len());
100
101	let mut field_names: Vec<String> = Vec::new();
102	for event in &events {
103		for col in event.columns.iter() {
104			let name = col.name().text().to_string();
105			if !field_names.contains(&name) {
106				field_names.push(name);
107			}
108		}
109	}
110
111	let mut field_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(events.len()); field_names.len()];
112
113	for event in &events {
114		seq_data.push(event.sequence);
115		ns_data.push(event.namespace.as_str());
116		event_data.push(event.event.as_str());
117		variant_data.push(event.variant.as_str());
118		depth_data.push(event.depth);
119
120		for (i, field_name) in field_names.iter().enumerate() {
121			let val = event
122				.columns
123				.column(field_name)
124				.map(|col| col.data().get_value(0))
125				.unwrap_or(Value::none());
126			field_columns[i].push(val);
127		}
128	}
129
130	let mut columns = vec![
131		ColumnWithName::new("sequence", seq_data),
132		ColumnWithName::new("namespace", ns_data),
133		ColumnWithName::new("event", event_data),
134		ColumnWithName::new("variant", variant_data),
135		ColumnWithName::new("depth", depth_data),
136	];
137
138	for (i, name) in field_names.iter().enumerate() {
139		let mut data = column_for_values(&field_columns[i]);
140		for val in &field_columns[i] {
141			data.push_value(val.clone());
142		}
143		columns.push(ColumnWithName::new(name.as_str(), data));
144	}
145
146	Ok(Columns::new(columns))
147}
148
149fn column_for_values(values: &[Value]) -> ColumnBuffer {
150	let first_type = values.iter().find_map(|v| {
151		if matches!(v, Value::None { .. }) {
152			None
153		} else {
154			Some(v.get_type())
155		}
156	});
157	match first_type {
158		Some(ty) => ColumnBuffer::with_capacity(ty, values.len()),
159		None => ColumnBuffer::none_typed(Type::Boolean, 0),
160	}
161}