Skip to main content

reifydb_routine/procedure/testing/
handler.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::LazyLock;
5
6use reifydb_core::{
7	internal_error,
8	testing::CapturedInvocation,
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{
13	error::Error,
14	params::Params,
15	value::{Value, r#type::Type},
16};
17
18use crate::routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError};
19
20static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("testing::handlers::invoked"));
21
/// Stateless procedure type backing `testing::handlers::invoked`.
///
/// Its `Routine` implementation reports the handler invocations captured by a
/// test transaction.
pub struct TestingHandlersInvoked;
23
24impl Default for TestingHandlersInvoked {
25	fn default() -> Self {
26		Self::new()
27	}
28}
29
30impl TestingHandlersInvoked {
31	pub fn new() -> Self {
32		Self
33	}
34}
35
36impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for TestingHandlersInvoked {
37	fn info(&self) -> &RoutineInfo {
38		&INFO
39	}
40
41	fn return_type(&self, _input_types: &[Type]) -> Type {
42		Type::Any
43	}
44
45	fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
46		let invocations = match ctx.tx {
47			Transaction::Test(t) => &**t.invocations,
48			_ => {
49				return Err(internal_error!(
50					"testing::handlers::invoked() requires a test transaction"
51				)
52				.into());
53			}
54		};
55		let filter_arg = extract_optional_string_param(ctx.params);
56		Ok(build_invocations(invocations, filter_arg.as_deref())?)
57	}
58}
59
60fn extract_optional_string_param(params: &Params) -> Option<String> {
61	match params {
62		Params::Positional(args) if !args.is_empty() => match &args[0] {
63			Value::Utf8(s) => Some(s.clone()),
64			_ => None,
65		},
66		_ => None,
67	}
68}
69
70fn build_invocations(invocations: &[CapturedInvocation], filter_name: Option<&str>) -> Result<Columns, Error> {
71	let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
72		let parts: Vec<&str> = s.splitn(2, "::").collect();
73		if parts.len() == 2 {
74			Some((parts[0], parts[1]))
75		} else {
76			None
77		}
78	});
79
80	let invocations: Vec<_> = invocations
81		.iter()
82		.filter(|inv| {
83			if let Some((ns, name)) = filter {
84				inv.namespace == ns && inv.handler == name
85			} else {
86				true
87			}
88		})
89		.collect();
90
91	if invocations.is_empty() {
92		return Ok(Columns::empty());
93	}
94
95	let mut seq_data = ColumnBuffer::uint8_with_capacity(invocations.len());
96	let mut ns_data = ColumnBuffer::utf8_with_capacity(invocations.len());
97	let mut handler_data = ColumnBuffer::utf8_with_capacity(invocations.len());
98	let mut event_data = ColumnBuffer::utf8_with_capacity(invocations.len());
99	let mut variant_data = ColumnBuffer::utf8_with_capacity(invocations.len());
100	let mut duration_data = ColumnBuffer::uint8_with_capacity(invocations.len());
101	let mut outcome_data = ColumnBuffer::utf8_with_capacity(invocations.len());
102	let mut message_data = ColumnBuffer::utf8_with_capacity(invocations.len());
103
104	for inv in &invocations {
105		seq_data.push(inv.sequence);
106		ns_data.push(inv.namespace.as_str());
107		handler_data.push(inv.handler.as_str());
108		event_data.push(inv.event.as_str());
109		variant_data.push(inv.variant.as_str());
110		duration_data.push(inv.duration_ns);
111		outcome_data.push(inv.outcome.as_str());
112		message_data.push(inv.message.as_str());
113	}
114
115	Ok(Columns::new(vec![
116		ColumnWithName::new("sequence", seq_data),
117		ColumnWithName::new("namespace", ns_data),
118		ColumnWithName::new("handler", handler_data),
119		ColumnWithName::new("event", event_data),
120		ColumnWithName::new("variant", variant_data),
121		ColumnWithName::new("duration", duration_data),
122		ColumnWithName::new("outcome", outcome_data),
123		ColumnWithName::new("message", message_data),
124	]))
125}