reifydb_routine/procedure/testing/
event.rs1use reifydb_core::{
5 internal_error,
6 testing::CapturedEvent,
7 value::column::{Column, columns::Columns, data::ColumnData},
8};
9use reifydb_transaction::transaction::Transaction;
10use reifydb_type::{
11 error::Error,
12 params::Params,
13 value::{Value, r#type::Type},
14};
15
16use crate::procedure::{Procedure, context::ProcedureContext, error::ProcedureError};
17
18pub struct TestingEventsDispatched;
19
20impl Default for TestingEventsDispatched {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl TestingEventsDispatched {
27 pub fn new() -> Self {
28 Self
29 }
30}
31
32impl Procedure for TestingEventsDispatched {
33 fn call(&self, ctx: &ProcedureContext, tx: &mut Transaction<'_>) -> Result<Columns, ProcedureError> {
34 let events = match tx {
35 Transaction::Test(t) => &**t.events,
36 _ => {
37 return Err(internal_error!(
38 "testing::events::dispatched() requires a test transaction"
39 )
40 .into());
41 }
42 };
43 let filter_arg = extract_optional_string_param(ctx.params);
44 Ok(build_dispatched_events(events, filter_arg.as_deref())?)
45 }
46}
47
48fn extract_optional_string_param(params: &Params) -> Option<String> {
49 match params {
50 Params::Positional(args) if !args.is_empty() => match &args[0] {
51 Value::Utf8(s) => Some(s.clone()),
52 _ => None,
53 },
54 _ => None,
55 }
56}
57
58fn build_dispatched_events(events: &[CapturedEvent], filter_name: Option<&str>) -> Result<Columns, Error> {
59 let filter: Option<(&str, &str)> = filter_name.and_then(|s| {
60 let parts: Vec<&str> = s.splitn(2, "::").collect();
61 if parts.len() == 2 {
62 Some((parts[0], parts[1]))
63 } else {
64 None
65 }
66 });
67
68 let events: Vec<_> = events
69 .iter()
70 .filter(|e| {
71 if let Some((ns, name)) = filter {
72 e.namespace == ns && e.event == name
73 } else {
74 true
75 }
76 })
77 .collect();
78
79 if events.is_empty() {
80 return Ok(Columns::empty());
81 }
82
83 let mut seq_data = ColumnData::uint8_with_capacity(events.len());
84 let mut ns_data = ColumnData::utf8_with_capacity(events.len());
85 let mut event_data = ColumnData::utf8_with_capacity(events.len());
86 let mut variant_data = ColumnData::utf8_with_capacity(events.len());
87 let mut depth_data = ColumnData::uint1_with_capacity(events.len());
88
89 let mut field_names: Vec<String> = Vec::new();
90 for event in &events {
91 for col in event.columns.iter() {
92 let name = col.name().text().to_string();
93 if !field_names.contains(&name) {
94 field_names.push(name);
95 }
96 }
97 }
98
99 let mut field_columns: Vec<Vec<Value>> = vec![Vec::with_capacity(events.len()); field_names.len()];
100
101 for event in &events {
102 seq_data.push(event.sequence);
103 ns_data.push(event.namespace.as_str());
104 event_data.push(event.event.as_str());
105 variant_data.push(event.variant.as_str());
106 depth_data.push(event.depth);
107
108 for (i, field_name) in field_names.iter().enumerate() {
109 let val = event
110 .columns
111 .column(field_name)
112 .map(|col| col.data().get_value(0))
113 .unwrap_or(Value::none());
114 field_columns[i].push(val);
115 }
116 }
117
118 let mut columns = vec![
119 Column::new("sequence", seq_data),
120 Column::new("namespace", ns_data),
121 Column::new("event", event_data),
122 Column::new("variant", variant_data),
123 Column::new("depth", depth_data),
124 ];
125
126 for (i, name) in field_names.iter().enumerate() {
127 let mut data = column_for_values(&field_columns[i]);
128 for val in &field_columns[i] {
129 data.push_value(val.clone());
130 }
131 columns.push(Column::new(name.as_str(), data));
132 }
133
134 Ok(Columns::new(columns))
135}
136
137fn column_for_values(values: &[Value]) -> ColumnData {
138 let first_type = values.iter().find_map(|v| {
139 if matches!(v, Value::None { .. }) {
140 None
141 } else {
142 Some(v.get_type())
143 }
144 });
145 match first_type {
146 Some(ty) => ColumnData::with_capacity(ty, values.len()),
147 None => ColumnData::none_typed(Type::Boolean, 0),
148 }
149}