reifydb_routine/procedure/testing/
handler.rs1use std::sync::LazyLock;
5
6use reifydb_core::{
7 internal_error,
8 testing::CapturedInvocation,
9 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{
13 error::Error,
14 params::Params,
15 value::{Value, r#type::Type},
16};
17
18use crate::routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError};
19
20static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("testing::handlers::invoked"));
21
/// Stateless marker type that implements the `testing::handlers::invoked` routine.
///
/// The type carries no data, so [`TestingHandlersInvoked::new`] and
/// [`Default::default`] produce the same value.
#[derive(Default)]
pub struct TestingHandlersInvoked;

impl TestingHandlersInvoked {
    /// Creates the routine instance.
    pub fn new() -> Self {
        Self
    }
}
35
36impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for TestingHandlersInvoked {
37 fn info(&self) -> &RoutineInfo {
38 &INFO
39 }
40
41 fn return_type(&self, _input_types: &[Type]) -> Type {
42 Type::Any
43 }
44
45 fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
46 let invocations = match ctx.tx {
47 Transaction::Test(t) => &**t.invocations,
48 _ => {
49 return Err(internal_error!(
50 "testing::handlers::invoked() requires a test transaction"
51 )
52 .into());
53 }
54 };
55 let filter_arg = extract_optional_string_param(ctx.params);
56 Ok(build_invocations(invocations, filter_arg.as_deref())?)
57 }
58}
59
60fn extract_optional_string_param(params: &Params) -> Option<String> {
61 match params {
62 Params::Positional(args) if !args.is_empty() => match &args[0] {
63 Value::Utf8(s) => Some(s.clone()),
64 _ => None,
65 },
66 _ => None,
67 }
68}
69
70fn build_invocations(invocations: &[CapturedInvocation], filter_name: Option<&str>) -> Result<Columns, Error> {
71 let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
72 let parts: Vec<&str> = s.splitn(2, "::").collect();
73 if parts.len() == 2 {
74 Some((parts[0], parts[1]))
75 } else {
76 None
77 }
78 });
79
80 let invocations: Vec<_> = invocations
81 .iter()
82 .filter(|inv| {
83 if let Some((ns, name)) = filter {
84 inv.namespace == ns && inv.handler == name
85 } else {
86 true
87 }
88 })
89 .collect();
90
91 if invocations.is_empty() {
92 return Ok(Columns::empty());
93 }
94
95 let mut seq_data = ColumnBuffer::uint8_with_capacity(invocations.len());
96 let mut ns_data = ColumnBuffer::utf8_with_capacity(invocations.len());
97 let mut handler_data = ColumnBuffer::utf8_with_capacity(invocations.len());
98 let mut event_data = ColumnBuffer::utf8_with_capacity(invocations.len());
99 let mut variant_data = ColumnBuffer::utf8_with_capacity(invocations.len());
100 let mut duration_data = ColumnBuffer::uint8_with_capacity(invocations.len());
101 let mut outcome_data = ColumnBuffer::utf8_with_capacity(invocations.len());
102 let mut message_data = ColumnBuffer::utf8_with_capacity(invocations.len());
103
104 for inv in &invocations {
105 seq_data.push(inv.sequence);
106 ns_data.push(inv.namespace.as_str());
107 handler_data.push(inv.handler.as_str());
108 event_data.push(inv.event.as_str());
109 variant_data.push(inv.variant.as_str());
110 duration_data.push(inv.duration_ns);
111 outcome_data.push(inv.outcome.as_str());
112 message_data.push(inv.message.as_str());
113 }
114
115 Ok(Columns::new(vec![
116 ColumnWithName::new("sequence", seq_data),
117 ColumnWithName::new("namespace", ns_data),
118 ColumnWithName::new("handler", handler_data),
119 ColumnWithName::new("event", event_data),
120 ColumnWithName::new("variant", variant_data),
121 ColumnWithName::new("duration", duration_data),
122 ColumnWithName::new("outcome", outcome_data),
123 ColumnWithName::new("message", message_data),
124 ]))
125}