Skip to main content

reifydb_sub_flow/operator/scan/
ringbuffer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::shape::RowShape,
8	interface::{
9		catalog::{flow::FlowNodeId, ringbuffer::RingBuffer, shape::ShapeId},
10		change::{Change, Diff},
11	},
12	key::row::RowKey,
13	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
14};
15use reifydb_type::{
16	Result,
17	fragment::Fragment,
18	value::{datetime::DateTime, row_number::RowNumber},
19};
20
21use crate::{Operator, operator::sink::decode_dictionary_columns, transaction::FlowTransaction};
22
23pub struct PrimitiveRingBufferOperator {
24	node: FlowNodeId,
25	ringbuffer: RingBuffer,
26}
27
28impl PrimitiveRingBufferOperator {
29	pub fn new(node: FlowNodeId, ringbuffer: RingBuffer) -> Self {
30		Self {
31			node,
32			ringbuffer,
33		}
34	}
35}
36
37impl Operator for PrimitiveRingBufferOperator {
38	fn id(&self) -> FlowNodeId {
39		self.node
40	}
41
42	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
43		let mut decoded_diffs = Vec::with_capacity(change.diffs.len());
44		for diff in change.diffs {
45			decoded_diffs.push(match diff {
46				Diff::Insert {
47					post,
48				} => {
49					let mut decoded = post;
50					decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
51					Diff::insert_arc(decoded)
52				}
53				Diff::Update {
54					pre,
55					post,
56				} => {
57					let mut decoded_pre = pre;
58					let mut decoded_post = post;
59					decode_dictionary_columns(Arc::make_mut(&mut decoded_pre), txn)?;
60					decode_dictionary_columns(Arc::make_mut(&mut decoded_post), txn)?;
61					Diff::update_arc(decoded_pre, decoded_post)
62				}
63				Diff::Remove {
64					pre,
65				} => {
66					let mut decoded = pre;
67					decode_dictionary_columns(Arc::make_mut(&mut decoded), txn)?;
68					Diff::remove_arc(decoded)
69				}
70			});
71		}
72		Ok(Change::from_flow(self.node, change.version, decoded_diffs, change.changed_at))
73	}
74
75	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
76		if rows.is_empty() {
77			return Ok(self.empty_columns());
78		}
79
80		let shape: RowShape = (&self.ringbuffer.columns).into();
81		let fields = shape.fields();
82
83		let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
84		for field in fields.iter() {
85			columns_vec.push(ColumnWithName {
86				name: Fragment::internal(&field.name),
87				data: ColumnBuffer::with_capacity(field.constraint.get_type(), rows.len()),
88			});
89		}
90		let mut row_numbers = Vec::with_capacity(rows.len());
91		let mut created_at = Vec::with_capacity(rows.len());
92		let mut updated_at = Vec::with_capacity(rows.len());
93
94		for row_num in rows {
95			let key = RowKey::encoded(ShapeId::ringbuffer(self.ringbuffer.id), *row_num);
96			if let Some(encoded) = txn.get(&key)? {
97				row_numbers.push(*row_num);
98				created_at.push(DateTime::from_nanos(encoded.created_at_nanos()));
99				updated_at.push(DateTime::from_nanos(encoded.updated_at_nanos()));
100				for (i, _field) in fields.iter().enumerate() {
101					let value = shape.get_value(&encoded, i);
102					columns_vec[i].data.push_value(value);
103				}
104			}
105		}
106
107		if row_numbers.is_empty() {
108			Ok(self.empty_columns())
109		} else {
110			Ok(Columns::with_system_columns(columns_vec, row_numbers, created_at, updated_at))
111		}
112	}
113}
114
115impl PrimitiveRingBufferOperator {
116	fn empty_columns(&self) -> Columns {
117		let columns: Vec<ColumnWithName> = self
118			.ringbuffer
119			.columns
120			.iter()
121			.map(|col| ColumnWithName {
122				name: Fragment::internal(&col.name),
123				data: ColumnBuffer::with_capacity(col.constraint.get_type(), 0),
124			})
125			.collect();
126		Columns::new(columns)
127	}
128}