reifydb_routine/procedure/testing/
event.rs1use std::sync::LazyLock;
5
6use reifydb_core::{
7 internal_error,
8 testing::CapturedEvent,
9 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_transaction::transaction::Transaction;
12use reifydb_type::{
13 error::Error,
14 params::Params,
15 value::{Value, r#type::Type},
16};
17
18use crate::routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError};
19
20static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("testing::events::dispatched"));
21
/// Stateless routine type behind `testing::events::dispatched`.
///
/// The struct carries no fields; all behaviour lives in its `Routine`
/// implementation.
pub struct TestingEventsDispatched;

impl TestingEventsDispatched {
    /// Creates the (field-less) routine value.
    pub fn new() -> Self {
        TestingEventsDispatched
    }
}

impl Default for TestingEventsDispatched {
    /// Same as [`TestingEventsDispatched::new`].
    fn default() -> Self {
        TestingEventsDispatched::new()
    }
}
35
36impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for TestingEventsDispatched {
37 fn info(&self) -> &RoutineInfo {
38 &INFO
39 }
40
41 fn return_type(&self, _input_types: &[Type]) -> Type {
42 Type::Any
43 }
44
45 fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
46 let events = match ctx.tx {
47 Transaction::Test(t) => &**t.events,
48 _ => {
49 return Err(internal_error!(
50 "testing::events::dispatched() requires a test transaction"
51 )
52 .into());
53 }
54 };
55 let filter_arg = extract_optional_string_param(ctx.params);
56 Ok(build_dispatched_events(events, filter_arg.as_deref())?)
57 }
58}
59
60fn extract_optional_string_param(params: &Params) -> Option<String> {
61 match params {
62 Params::Positional(args) if !args.is_empty() => match &args[0] {
63 Value::Utf8(s) => Some(s.clone()),
64 _ => None,
65 },
66 _ => None,
67 }
68}
69
70fn build_dispatched_events(events: &[CapturedEvent], filter_name: Option<&str>) -> Result<Columns, Error> {
71 let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
72 let parts: Vec<&str> = s.splitn(2, "::").collect();
73 if parts.len() == 2 {
74 Some((parts[0], parts[1]))
75 } else {
76 None
77 }
78 });
79
80 let events: Vec<_> = events
81 .iter()
82 .filter(|e| {
83 if let Some((ns, name)) = filter {
84 e.namespace == ns && e.event == name
85 } else {
86 true
87 }
88 })
89 .collect();
90
91 if events.is_empty() {
92 return Ok(Columns::empty());
93 }
94
95 let mut seq_data = ColumnBuffer::uint8_with_capacity(events.len());
96 let mut ns_data = ColumnBuffer::utf8_with_capacity(events.len());
97 let mut event_data = ColumnBuffer::utf8_with_capacity(events.len());
98 let mut variant_data = ColumnBuffer::utf8_with_capacity(events.len());
99 let mut depth_data = ColumnBuffer::uint1_with_capacity(events.len());
100
101 let mut field_names: Vec<String> = Vec::new();
102 for event in &events {
103 for col in event.columns.iter() {
104 let name = col.name().text().to_string();
105 if !field_names.contains(&name) {
106 field_names.push(name);
107 }
108 }
109 }
110
111 let mut field_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(events.len()); field_names.len()];
112
113 for event in &events {
114 seq_data.push(event.sequence);
115 ns_data.push(event.namespace.as_str());
116 event_data.push(event.event.as_str());
117 variant_data.push(event.variant.as_str());
118 depth_data.push(event.depth);
119
120 for (i, field_name) in field_names.iter().enumerate() {
121 let val = event
122 .columns
123 .column(field_name)
124 .map(|col| col.data().get_value(0))
125 .unwrap_or(Value::none());
126 field_columns[i].push(val);
127 }
128 }
129
130 let mut columns = vec![
131 ColumnWithName::new("sequence", seq_data),
132 ColumnWithName::new("namespace", ns_data),
133 ColumnWithName::new("event", event_data),
134 ColumnWithName::new("variant", variant_data),
135 ColumnWithName::new("depth", depth_data),
136 ];
137
138 for (i, name) in field_names.iter().enumerate() {
139 let mut data = column_for_values(&field_columns[i]);
140 for val in &field_columns[i] {
141 data.push_value(val.clone());
142 }
143 columns.push(ColumnWithName::new(name.as_str(), data));
144 }
145
146 Ok(Columns::new(columns))
147}
148
149fn column_for_values(values: &[Value]) -> ColumnBuffer {
150 let first_type = values.iter().find_map(|v| {
151 if matches!(v, Value::None { .. }) {
152 None
153 } else {
154 Some(v.get_type())
155 }
156 });
157 match first_type {
158 Some(ty) => ColumnBuffer::with_capacity(ty, values.len()),
159 None => ColumnBuffer::none_typed(Type::Boolean, 0),
160 }
161}