reifydb_routine/procedure/testing/
handler.rs1use reifydb_core::{
5 internal_error,
6 testing::CapturedInvocation,
7 value::column::{Column, columns::Columns, data::ColumnData},
8};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{error::Error, params::Params, value::Value};
11
12use crate::procedure::{Procedure, context::ProcedureContext, error::ProcedureError};
13
/// Stateless procedure behind `testing::handlers::invoked()`.
///
/// The type carries no data; all behaviour lives in its `Procedure`
/// implementation.
pub struct TestingHandlersInvoked;

impl TestingHandlersInvoked {
    /// Creates the procedure value.
    pub fn new() -> Self {
        TestingHandlersInvoked
    }
}

impl Default for TestingHandlersInvoked {
    /// Same as [`TestingHandlersInvoked::new`].
    fn default() -> Self {
        TestingHandlersInvoked::new()
    }
}
27
28impl Procedure for TestingHandlersInvoked {
29 fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns, ProcedureError> {
30 let invocations = match tx {
31 Transaction::Test(t) => &**t.invocations,
32 _ => {
33 return Err(internal_error!(
34 "testing::handlers::invoked() requires a test transaction"
35 )
36 .into());
37 }
38 };
39 let filter_arg = extract_optional_string_param(ctx.params);
40 Ok(build_invocations(invocations, filter_arg.as_deref())?)
41 }
42}
43
44fn extract_optional_string_param(params: &Params) -> Option<String> {
45 match params {
46 Params::Positional(args) if !args.is_empty() => match &args[0] {
47 Value::Utf8(s) => Some(s.clone()),
48 _ => None,
49 },
50 _ => None,
51 }
52}
53
54fn build_invocations(invocations: &[CapturedInvocation], filter_name: Option<&str>) -> Result<Columns, Error> {
55 let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
56 let parts: Vec<&str> = s.splitn(2, "::").collect();
57 if parts.len() == 2 {
58 Some((parts[0], parts[1]))
59 } else {
60 None
61 }
62 });
63
64 let invocations: Vec<_> = invocations
65 .iter()
66 .filter(|inv| {
67 if let Some((ns, name)) = filter {
68 inv.namespace == ns && inv.handler == name
69 } else {
70 true
71 }
72 })
73 .collect();
74
75 if invocations.is_empty() {
76 return Ok(Columns::empty());
77 }
78
79 let mut seq_data = ColumnData::uint8_with_capacity(invocations.len());
80 let mut ns_data = ColumnData::utf8_with_capacity(invocations.len());
81 let mut handler_data = ColumnData::utf8_with_capacity(invocations.len());
82 let mut event_data = ColumnData::utf8_with_capacity(invocations.len());
83 let mut variant_data = ColumnData::utf8_with_capacity(invocations.len());
84 let mut duration_data = ColumnData::uint8_with_capacity(invocations.len());
85 let mut outcome_data = ColumnData::utf8_with_capacity(invocations.len());
86 let mut message_data = ColumnData::utf8_with_capacity(invocations.len());
87
88 for inv in &invocations {
89 seq_data.push(inv.sequence);
90 ns_data.push(inv.namespace.as_str());
91 handler_data.push(inv.handler.as_str());
92 event_data.push(inv.event.as_str());
93 variant_data.push(inv.variant.as_str());
94 duration_data.push(inv.duration_ns);
95 outcome_data.push(inv.outcome.as_str());
96 message_data.push(inv.message.as_str());
97 }
98
99 Ok(Columns::new(vec![
100 Column::new("sequence", seq_data),
101 Column::new("namespace", ns_data),
102 Column::new("handler", handler_data),
103 Column::new("event", event_data),
104 Column::new("variant", variant_data),
105 Column::new("duration", duration_data),
106 Column::new("outcome", outcome_data),
107 Column::new("message", message_data),
108 ]))
109}