Skip to main content

reifydb_sub_flow/operator/scan/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	encoded::schema::RowSchema,
6	interface::{
7		catalog::{flow::FlowNodeId, ringbuffer::RingBuffer, schema::SchemaId},
8		change::{Change, Diff},
9	},
10	key::row::RowKey,
11	value::column::{Column, columns::Columns, data::ColumnData},
12};
13use reifydb_type::{Result, fragment::Fragment, util::cowvec::CowVec, value::row_number::RowNumber};
14
15use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
16
17pub struct PrimitiveRingBufferOperator {
18	node: FlowNodeId,
19	ringbuffer: RingBuffer,
20}
21
22impl PrimitiveRingBufferOperator {
23	pub fn new(node: FlowNodeId, ringbuffer: RingBuffer) -> Self {
24		Self {
25			node,
26			ringbuffer,
27		}
28	}
29}
30
31impl Operator for PrimitiveRingBufferOperator {
32	fn id(&self) -> FlowNodeId {
33		self.node
34	}
35
36	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
37		let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
38		for diff in change.diffs {
39			decoded_diffs.push(match diff {
40				Diff::Insert {
41					post,
42				} => {
43					let mut decoded = post;
44					decode_dictionary_columns(&mut decoded, txn)?;
45					Diff::Insert {
46						post: decoded,
47					}
48				}
49				Diff::Update {
50					pre,
51					post,
52				} => {
53					let mut decoded_pre = pre;
54					let mut decoded_post = post;
55					decode_dictionary_columns(&mut decoded_pre, txn)?;
56					decode_dictionary_columns(&mut decoded_post, txn)?;
57					Diff::Update {
58						pre: decoded_pre,
59						post: decoded_post,
60					}
61				}
62				Diff::Remove {
63					pre,
64				} => {
65					let mut decoded = pre;
66					decode_dictionary_columns(&mut decoded, txn)?;
67					Diff::Remove {
68						pre: decoded,
69					}
70				}
71			});
72		}
73		Ok(Change::from_flow(self.node, change.version, decoded_diffs))
74	}
75
76	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
77		if rows.is_empty() {
78			return Ok(self.empty_columns());
79		}
80
81		let schema: RowSchema = (&self.ringbuffer.columns).into();
82		let fields = schema.fields();
83
84		let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
85		for field in fields.iter() {
86			columns_vec.push(Column {
87				name: Fragment::internal(&field.name),
88				data: ColumnData::with_capacity(field.constraint.get_type(), rows.len()),
89			});
90		}
91		let mut row_numbers = Vec::with_capacity(rows.len());
92
93		for row_num in rows {
94			let key = RowKey::encoded(SchemaId::ringbuffer(self.ringbuffer.id), *row_num);
95			if let Some(encoded) = txn.get(&key)? {
96				row_numbers.push(*row_num);
97				for (i, _field) in fields.iter().enumerate() {
98					let value = schema.get_value(&encoded, i);
99					columns_vec[i].data.push_value(value);
100				}
101			}
102		}
103
104		if row_numbers.is_empty() {
105			Ok(self.empty_columns())
106		} else {
107			Ok(Columns {
108				row_numbers: CowVec::new(row_numbers),
109				columns: CowVec::new(columns_vec),
110			})
111		}
112	}
113}
114
115impl PrimitiveRingBufferOperator {
116	fn empty_columns(&self) -> Columns {
117		let columns: Vec<Column> = self
118			.ringbuffer
119			.columns
120			.iter()
121			.map(|col| Column {
122				name: Fragment::internal(&col.name),
123				data: ColumnData::with_capacity(col.constraint.get_type(), 0),
124			})
125			.collect();
126		Columns {
127			row_numbers: CowVec::new(Vec::new()),
128			columns: CowVec::new(columns),
129		}
130	}
131}