reifydb_flow_operator_sdk/testing/
builders.rs1use reifydb_core::{
5 CommitVersion, Row,
6 interface::{FlowNodeId, SourceId, TableId},
7 value::encoded::{EncodedValuesLayout, EncodedValuesNamedLayout},
8};
9use reifydb_type::{RowNumber, Type, Value};
10
11use crate::{FlowChange, FlowChangeOrigin, FlowDiff};
12
13pub struct TestRowBuilder {
15 row_number: RowNumber,
16 values: Vec<Value>,
17 layout: Option<EncodedValuesLayout>,
18 named_layout: Option<EncodedValuesNamedLayout>,
19}
20
21impl TestRowBuilder {
22 pub fn new(row_number: impl Into<RowNumber>) -> Self {
24 Self {
25 row_number: row_number.into(),
26 values: Vec::new(),
27 layout: None,
28 named_layout: None,
29 }
30 }
31
32 pub fn with_values(mut self, values: Vec<Value>) -> Self {
34 self.values = values;
35 self
36 }
37
38 pub fn add_value(mut self, value: Value) -> Self {
40 self.values.push(value);
41 self
42 }
43
44 pub fn with_layout(mut self, layout: EncodedValuesLayout) -> Self {
46 self.layout = Some(layout);
47 self.named_layout = None;
48 self
49 }
50
51 pub fn with_named_layout(mut self, layout: EncodedValuesNamedLayout) -> Self {
53 self.named_layout = Some(layout);
54 self.layout = None;
55 self
56 }
57
58 pub fn build(self) -> Row {
60 if let Some(named_layout) = self.named_layout {
61 let mut encoded = named_layout.allocate();
63 named_layout.set_values(&mut encoded, &self.values);
64
65 return Row {
66 number: self.row_number,
67 encoded,
68 layout: named_layout,
69 };
70 }
71
72 let types: Vec<Type> = if let Some(layout) = self.layout {
74 (0..layout.fields.len()).map(|i| layout.fields[i].r#type).collect()
75 } else {
76 self.values.iter().map(|v| v.get_type()).collect()
77 };
78
79 let fields: Vec<(String, Type)> =
81 types.iter().enumerate().map(|(i, t)| (format!("field{}", i), *t)).collect();
82
83 let named_layout = EncodedValuesNamedLayout::new(fields);
84 let mut encoded = named_layout.allocate();
85 named_layout.set_values(&mut encoded, &self.values);
86
87 Row {
88 number: self.row_number,
89 encoded,
90 layout: named_layout,
91 }
92 }
93}
94
95pub struct TestFlowChangeBuilder {
97 origin: FlowChangeOrigin,
98 diffs: Vec<FlowDiff>,
99 version: CommitVersion,
100}
101
102impl TestFlowChangeBuilder {
103 pub fn new() -> Self {
105 Self {
106 origin: FlowChangeOrigin::External(SourceId::Table(TableId(1))),
107 diffs: Vec::new(),
108 version: CommitVersion(1),
109 }
110 }
111
112 pub fn changed_by_source(mut self, source: SourceId) -> Self {
114 self.origin = FlowChangeOrigin::External(source);
115 self
116 }
117
118 pub fn changed_by_node(mut self, node: FlowNodeId) -> Self {
120 self.origin = FlowChangeOrigin::Internal(node);
121 self
122 }
123
124 pub fn with_version(mut self, version: CommitVersion) -> Self {
126 self.version = version;
127 self
128 }
129
130 pub fn insert(mut self, row: Row) -> Self {
132 self.diffs.push(FlowDiff::Insert {
133 post: row,
134 });
135 self
136 }
137
138 pub fn insert_row(self, row_number: impl Into<RowNumber>, values: Vec<Value>) -> Self {
140 let row = TestRowBuilder::new(row_number).with_values(values).build();
141 self.insert(row)
142 }
143
144 pub fn update(mut self, pre: Row, post: Row) -> Self {
146 self.diffs.push(FlowDiff::Update {
147 pre,
148 post,
149 });
150 self
151 }
152
153 pub fn update_row(
155 self,
156 row_number: impl Into<RowNumber>,
157 pre_values: Vec<Value>,
158 post_values: Vec<Value>,
159 ) -> Self {
160 let row_number = row_number.into();
161 let pre = TestRowBuilder::new(row_number).with_values(pre_values).build();
162 let post = TestRowBuilder::new(row_number).with_values(post_values).build();
163 self.update(pre, post)
164 }
165
166 pub fn remove(mut self, row: Row) -> Self {
168 self.diffs.push(FlowDiff::Remove {
169 pre: row,
170 });
171 self
172 }
173
174 pub fn remove_row(self, row_number: impl Into<RowNumber>, values: Vec<Value>) -> Self {
176 let row = TestRowBuilder::new(row_number).with_values(values).build();
177 self.remove(row)
178 }
179
180 pub fn build(self) -> FlowChange {
182 FlowChange {
183 origin: self.origin,
184 diffs: self.diffs,
185 version: self.version,
186 }
187 }
188}
189
190pub struct TestLayoutBuilder {
192 types: Vec<Type>,
193 names: Option<Vec<String>>,
194}
195
196impl TestLayoutBuilder {
197 pub fn new() -> Self {
199 Self {
200 types: Vec::new(),
201 names: None,
202 }
203 }
204
205 pub fn add_type(mut self, ty: Type) -> Self {
207 self.types.push(ty);
208 self
209 }
210
211 pub fn add_field(mut self, name: impl Into<String>, ty: Type) -> Self {
213 if self.names.is_none() {
214 self.names = Some(Vec::new());
215 }
216 self.names.as_mut().unwrap().push(name.into());
217 self.types.push(ty);
218 self
219 }
220
221 pub fn build(self) -> EncodedValuesLayout {
223 EncodedValuesLayout::new(&self.types)
224 }
225
226 pub fn build_named(self) -> EncodedValuesNamedLayout {
228 let names = self.names.unwrap_or_else(|| {
229 (0..self.types.len()).map(|i| format!("field{}", i)).collect()
231 });
232
233 let fields: Vec<(String, Type)> = names.into_iter().zip(self.types.into_iter()).collect();
234
235 EncodedValuesNamedLayout::new(fields)
236 }
237}
238
239pub mod helpers {
241 use super::*;
242
243 pub fn counter_layout() -> EncodedValuesLayout {
245 TestLayoutBuilder::new().add_type(Type::Int8).build()
246 }
247
248 pub fn key_value_layout() -> EncodedValuesLayout {
250 TestLayoutBuilder::new().add_type(Type::Utf8).add_type(Type::Int8).build()
251 }
252
253 pub fn named_key_value_layout() -> EncodedValuesNamedLayout {
255 TestLayoutBuilder::new().add_field("key", Type::Utf8).add_field("value", Type::Int8).build_named()
256 }
257
258 pub fn int_row(row_number: impl Into<RowNumber>, value: i8) -> Row {
260 TestRowBuilder::new(row_number).with_values(vec![Value::Int8(value as i64)]).build()
261 }
262
263 pub fn key_value_row(row_number: impl Into<RowNumber>, key: &str, value: i8) -> Row {
265 TestRowBuilder::new(row_number)
266 .with_values(vec![Value::Utf8(key.into()), Value::Int8(value as i64)])
267 .build()
268 }
269
270 pub fn empty_change() -> FlowChange {
272 TestFlowChangeBuilder::new().build()
273 }
274
275 pub fn insert_change(row: Row) -> FlowChange {
277 TestFlowChangeBuilder::new().insert(row).build()
278 }
279
280 pub fn batch_insert_change(rows: Vec<Row>) -> FlowChange {
282 let mut builder = TestFlowChangeBuilder::new();
283 for row in rows {
284 builder = builder.insert(row);
285 }
286 builder.build()
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::{helpers::*, *};

    /// Values added one by one end up in a row with one field name each.
    #[test]
    fn test_row_builder() {
        let builder = TestRowBuilder::new(42)
            .add_value(Value::Int8(10i64))
            .add_value(Value::Utf8("test".into()));
        let row = builder.build();

        assert_eq!(row.number, RowNumber(42));
        assert_eq!(row.layout.names().len(), 2);
    }

    /// Origin, version and every diff kind are carried into the built change.
    #[test]
    fn test_flow_change_builder() {
        let builder = TestFlowChangeBuilder::new()
            .changed_by_source(SourceId::table(100))
            .with_version(CommitVersion(5))
            .insert_row(1, vec![Value::Int8(42i64)])
            .update_row(2, vec![Value::Int8(10i64)], vec![Value::Int8(20i64)])
            .remove_row(3, vec![Value::Int8(30i64)]);
        let change = builder.build();

        assert_eq!(change.version, CommitVersion(5));
        assert_eq!(change.diffs.len(), 3);

        if let FlowChangeOrigin::External(source) = &change.origin {
            assert_eq!(*source, SourceId::table(100));
        } else {
            panic!("Expected external origin");
        }
    }

    /// Unnamed and named layouts expose the expected fields and names.
    #[test]
    fn test_layout_builder() {
        let unnamed = TestLayoutBuilder::new().add_type(Type::Int8).add_type(Type::Utf8).build();

        assert_eq!(unnamed.fields.len(), 2);

        let named = TestLayoutBuilder::new()
            .add_field("count", Type::Int8)
            .add_field("name", Type::Utf8)
            .build_named();

        assert_eq!(named.fields().fields.len(), 2);

        let names = named.names();
        assert_eq!(names[0].as_str(), "count");
        assert_eq!(names[1].as_str(), "name");
    }

    /// The convenience helpers produce rows and changes of the expected shape.
    #[test]
    fn test_helpers() {
        let row = int_row(1, 42);
        assert_eq!(row.number, RowNumber(1));

        let kv_row = key_value_row(2, "test", 100);
        assert_eq!(kv_row.number, RowNumber(2));

        // `row` is still needed for the batch below, so hand over a copy here.
        let change = insert_change(row.clone());
        assert_eq!(change.diffs.len(), 1);

        let batch = batch_insert_change(vec![row, kv_row]);
        assert_eq!(batch.diffs.len(), 2);
    }
}