reifydb_sub_flow/operator/scan/
ringbuffer.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::shape::RowShape,
8 interface::{
9 catalog::{flow::FlowNodeId, ringbuffer::RingBuffer, shape::ShapeId},
10 change::{Change, Diff},
11 },
12 key::row::RowKey,
13 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
14};
15use reifydb_type::{
16 Result,
17 fragment::Fragment,
18 value::{datetime::DateTime, row_number::RowNumber},
19};
20
21use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
22
23pub struct PrimitiveRingBufferOperator {
24 node: FlowNodeId,
25 ringbuffer: RingBuffer,
26}
27
28impl PrimitiveRingBufferOperator {
29 pub fn new(node: FlowNodeId, ringbuffer: RingBuffer) -> Self {
30 Self {
31 node,
32 ringbuffer,
33 }
34 }
35}
36
37impl Operator for PrimitiveRingBufferOperator {
38 fn id(&self) -> FlowNodeId {
39 self.node
40 }
41
42 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
43 let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
44 for diff in change.diffs {
45 decoded_diffs.push(match diff {
46 Diff::Insert {
47 post,
48 } => {
49 let mut decoded = post;
50 decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
51 Diff::insert_arc(decoded)
52 }
53 Diff::Update {
54 pre,
55 post,
56 } => {
57 let mut decoded_pre = pre;
58 let mut decoded_post = post;
59 decode_dictionary_columns(Arc::make_mut(&mut decoded_pre), txn)?;
60 decode_dictionary_columns(Arc::make_mut(&mut decoded_post), txn)?;
61 Diff::update_arc(decoded_pre, decoded_post)
62 }
63 Diff::Remove {
64 pre,
65 } => {
66 let mut decoded = pre;
67 decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
68 Diff::remove_arc(decoded)
69 }
70 });
71 }
72 Ok(Change::from_flow(self.node, change.version, decoded_diffs, change.changed_at))
73 }
74
75 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
76 if rows.is_empty() {
77 return Ok(self.empty_columns());
78 }
79
80 let shape: RowShape = (&self.ringbuffer.columns).into();
81 let fields = shape.fields();
82
83 let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
84 for field in fields.iter() {
85 columns_vec.push(ColumnWithName {
86 name: Fragment::internal(&field.name),
87 data: ColumnBuffer::with_capacity(field.constraint.get_type(), rows.len()),
88 });
89 }
90 let mut row_numbers = Vec::with_capacity(rows.len());
91 let mut created_at = Vec::with_capacity(rows.len());
92 let mut updated_at = Vec::with_capacity(rows.len());
93
94 for row_num in rows {
95 let key = RowKey::encoded(ShapeId::ringbuffer(self.ringbuffer.id), *row_num);
96 if let Some(encoded) = txn.get(&key)? {
97 row_numbers.push(*row_num);
98 created_at.push(DateTime::from_nanos(encoded.created_at_nanos()));
99 updated_at.push(DateTime::from_nanos(encoded.updated_at_nanos()));
100 for (i, _field) in fields.iter().enumerate() {
101 let value = shape.get_value(&encoded, i);
102 columns_vec[i].data.push_value(value);
103 }
104 }
105 }
106
107 if row_numbers.is_empty() {
108 Ok(self.empty_columns())
109 } else {
110 Ok(Columns::with_system_columns(columns_vec, row_numbers, created_at, updated_at))
111 }
112 }
113}
114
115impl PrimitiveRingBufferOperator {
116 fn empty_columns(&self) -> Columns {
117 let columns: Vec<ColumnWithName> = self
118 .ringbuffer
119 .columns
120 .iter()
121 .map(|col| ColumnWithName {
122 name: Fragment::internal(&col.name),
123 data: ColumnBuffer::with_capacity(col.constraint.get_type(), 0),
124 })
125 .collect();
126 Columns::new(columns)
127 }
128}