use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use postcard::{from_bytes, to_stdvec};
use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
use reifydb_core::{
encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
interface::{
catalog::{
flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId, view::View,
},
change::{Change, ChangeOrigin, Diff},
resolved::ResolvedView,
},
internal,
key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
value::column::columns::Columns,
};
use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
use reifydb_type::{
Result,
error::Error,
value::{blob::Blob, datetime::DateTime, row_number::RowNumber},
};
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use super::{coerce_columns, encode_row_at_index};
use crate::{
Operator,
operator::{
Operators,
stateful::{raw::RawStatefulOperator, single::SingleStateful},
},
transaction::FlowTransaction,
};
/// Operator state mapping row numbers of incoming (source) rows to the row
/// numbers assigned to them inside the ring buffer, and back.
///
/// A pair is only recorded when the source row number differs from the
/// assigned one (see `apply_ringbuffer_insert`). The state is serialized with
/// `postcard` in `save` and read back in `load`.
// NOTE(review): postcard is not self-describing, so reordering or retyping
// these fields presumably breaks already-persisted state - confirm before
// changing the layout.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RingBufferState {
/// Source row number -> assigned ring-buffer row number.
forward: BTreeMap<RowNumber, RowNumber>,
/// Assigned ring-buffer row number -> source row number.
reverse: BTreeMap<RowNumber, RowNumber>,
}
/// Flow operator that applies incoming `Change`s to the rows of a ring buffer
/// and tracks a corresponding change for the view.
pub struct SinkRingBufferViewOperator {
/// Upstream operator handle. Never read in the visible code, hence the
/// `dead_code` allowance.
// NOTE(review): presumably held to keep the upstream operator alive - confirm.
#[allow(dead_code)]
parent: Arc<Operators>,
/// Flow node id returned by `Operator::id` and used as the origin of the
/// change returned from `apply`.
node: FlowNodeId,
/// Resolved view; its definition supplies the column layout rows are coerced to.
view: ResolvedView,
/// Identifies the ring buffer whose metadata and row keys are read and written.
ringbuffer_id: RingBufferId,
/// Capacity used when no metadata has been stored yet and fresh metadata is created.
capacity: u64,
/// Currently only read by an empty `if` in `apply_ringbuffer_insert`; it has
/// no observable effect in the visible code.
propagate_evictions: bool,
/// Shape of the operator's state row (`RowShape::operator_state()`); the
/// serialized `RingBufferState` is stored as a blob in field 0.
state_shape: RowShape,
}
impl SinkRingBufferViewOperator {
pub fn new(
parent: Arc<Operators>,
node: FlowNodeId,
view: ResolvedView,
ringbuffer_id: RingBufferId,
capacity: u64,
propagate_evictions: bool,
) -> Self {
Self {
parent,
node,
view,
ringbuffer_id,
capacity,
propagate_evictions,
state_shape: RowShape::operator_state(),
}
}
fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
match txn.get(&key)? {
Some(row) => Ok(decode_ringbuffer_metadata(&row)),
None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
}
}
fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
let row = encode_ringbuffer_metadata(metadata);
txn.set(&key, row)
}
fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
let state_row = self.load_state(txn)?;
if state_row.is_empty() || !state_row.is_defined(0) {
return Ok(RingBufferState::default());
}
let blob = self.state_shape.get_blob(&state_row, 0);
if blob.is_empty() {
return Ok(RingBufferState::default());
}
from_bytes(blob.as_ref())
.map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
}
fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
let serialized = to_stdvec(state)
.map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
let blob = Blob::from(serialized);
self.update_state(txn, |shape, row| {
shape.set_blob(row, 0, &blob);
Ok(())
})?;
Ok(())
}
}
// Empty impl: nothing is overridden, so the operator relies entirely on the
// items `RawStatefulOperator` provides by default.
impl RawStatefulOperator for SinkRingBufferViewOperator {}
impl SingleStateful for SinkRingBufferViewOperator {
fn layout(&self) -> RowShape {
self.state_shape.clone()
}
}
impl Operator for SinkRingBufferViewOperator {
    /// Flow node id of this operator.
    fn id(&self) -> FlowNodeId {
        self.node
    }

    /// Advertises `CAPABILITY_ALL_STANDARD`.
    fn capabilities(&self) -> u32 {
        CAPABILITY_ALL_STANDARD
    }

    /// Applies every diff of `change` to the ring buffer, in order.
    ///
    /// Metadata and the row-number mapping are read once up front, mutated
    /// while the diffs are processed, and written back only after all diffs
    /// succeeded. The returned change carries no diffs; it reuses the incoming
    /// version and timestamp with this node as origin.
    ///
    /// # Errors
    ///
    /// Stops at the first failing diff and returns its error; in that case
    /// metadata and state are not written back by this method.
    fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
        let view = self.view.def().clone();
        let shape: RowShape = view.columns().into();
        let object_id = ShapeId::ringbuffer(self.ringbuffer_id);

        let mut metadata = self.read_metadata(txn)?;
        let mut state = self.load(txn)?;

        change.diffs.iter().try_for_each(|diff| match diff {
            Diff::Insert {
                post,
            } => self.apply_ringbuffer_insert(txn, &view, &shape, object_id, &mut metadata, &mut state, post),
            Diff::Update {
                pre,
                post,
            } => self.apply_ringbuffer_update(txn, &view, &shape, object_id, &state, pre, post),
            Diff::Remove {
                pre,
            } => self.apply_ringbuffer_remove(txn, &view, &shape, object_id, &mut state, pre),
        })?;

        self.write_metadata(txn, &metadata)?;
        self.save(txn, &state)?;

        let applied = Change::from_flow(self.node, change.version, Vec::new(), change.changed_at);
        Ok(applied)
    }

    /// Not expected to be called on this operator; panics if it is.
    fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
        unreachable!()
    }
}
impl SinkRingBufferViewOperator {
    /// Appends the rows of `post` to the ring buffer.
    ///
    /// Each row is assigned the row number `metadata.tail`. Whenever
    /// `metadata.is_full()` reports a full buffer, the row at `metadata.head`
    /// is removed first and its mapping is dropped from `state`. Rows that
    /// were assigned and evicted within this same batch are never written and
    /// are not passed to the insert interceptors.
    ///
    /// `metadata` and `state` are mutated in place; persisting them is left to
    /// the caller.
    ///
    /// # Errors
    ///
    /// Propagates failures from coercion, row encoding, the transaction and
    /// the view row interceptors.
    #[inline]
    #[allow(clippy::too_many_arguments)]
    fn apply_ringbuffer_insert(
        &self,
        txn: &mut FlowTransaction,
        view: &View,
        shape: &RowShape,
        object_id: ShapeId,
        metadata: &mut RingBufferMetadata,
        state: &mut RingBufferState,
        post: &Arc<Columns>,
    ) -> Result<()> {
        let coerced = coerce_columns(post, view.columns())?;
        let row_count = coerced.row_count();

        let mut assigned_ids: Vec<RowNumber> = Vec::with_capacity(row_count);
        let mut encoded_rows: Vec<EncodedRow> = Vec::with_capacity(row_count);
        let mut evicted_in_batch: HashSet<RowNumber> = HashSet::new();

        for row_idx in 0..row_count {
            if metadata.is_full() {
                // Make room by dropping the oldest stored row.
                let oldest_rn = RowNumber(metadata.head);
                let pre_key = RowKey::encoded(object_id, oldest_rn);
                txn.remove(&pre_key)?;
                metadata.head += 1;
                metadata.count -= 1;
                evicted_in_batch.insert(oldest_rn);

                // Keep both directions of the mapping in sync.
                if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
                    state.forward.remove(&source_rn);
                }

                if self.propagate_evictions {
                    // Intentionally empty: evictions are not propagated here.
                    // NOTE(review): evicted rows also bypass the delete
                    // interceptors and no remove diff is emitted - confirm
                    // whether that is still pending work.
                }
            }

            let source_rn = coerced.row_numbers[row_idx];
            let assigned_rn = RowNumber(metadata.tail);
            let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, assigned_rn)?;

            // Identity assignments are not stored in the mapping.
            if source_rn != assigned_rn {
                state.forward.insert(source_rn, assigned_rn);
                state.reverse.insert(assigned_rn, source_rn);
            }

            assigned_ids.push(assigned_rn);
            encoded_rows.push(encoded);

            if metadata.is_empty() {
                metadata.head = assigned_rn.0;
            }
            metadata.count += 1;
            metadata.tail = assigned_rn.0 + 1;
        }

        // Drop rows that were evicted again before the batch finished. The
        // vectors are consumed, so surviving rows are moved rather than cloned.
        let (final_ids, mut final_rows): (Vec<RowNumber>, Vec<EncodedRow>) = assigned_ids
            .into_iter()
            .zip(encoded_rows)
            .filter(|(assigned_rn, _)| !evicted_in_batch.contains(assigned_rn))
            .unzip();

        ViewRowInterceptor::pre_insert(txn, view, &final_ids, &mut final_rows)?;
        for (assigned_rn, encoded) in final_ids.iter().zip(final_rows.iter()) {
            let key = RowKey::encoded(object_id, *assigned_rn);
            // The rows are still needed for `post_insert`, hence the clone.
            txn.set(&key, encoded.clone())?;
        }
        ViewRowInterceptor::post_insert(txn, view, &final_ids, &final_rows)?;

        emit_view_change(txn, view, Diff::insert(coerced));
        Ok(())
    }

    /// Rewrites stored rows for an update diff.
    ///
    /// Source row numbers of `pre` and `post` are translated through
    /// `state.forward`; a row number without a mapping is used unchanged. For
    /// every row the old key is removed and the new row is written under the
    /// post key.
    ///
    /// # Errors
    ///
    /// Propagates failures from coercion, row encoding, the transaction and
    /// the view row interceptors.
    // NOTE(review): rows are indexed up to `post`'s row count, so `pre` is
    // assumed to have at least as many rows - confirm that update diffs
    // guarantee this.
    #[inline]
    #[allow(clippy::too_many_arguments)]
    fn apply_ringbuffer_update(
        &self,
        txn: &mut FlowTransaction,
        view: &View,
        shape: &RowShape,
        object_id: ShapeId,
        state: &RingBufferState,
        pre: &Arc<Columns>,
        post: &Arc<Columns>,
    ) -> Result<()> {
        let coerced_pre = coerce_columns(pre, view.columns())?;
        let coerced_post = coerce_columns(post, view.columns())?;
        let row_count = coerced_post.row_count();

        let mut post_ids: Vec<RowNumber> = Vec::with_capacity(row_count);
        let mut pre_keys: Vec<EncodedKey> = Vec::with_capacity(row_count);
        let mut post_keys: Vec<EncodedKey> = Vec::with_capacity(row_count);
        let mut pre_encoded_rows: Vec<EncodedRow> = Vec::with_capacity(row_count);
        let mut post_encoded_rows: Vec<EncodedRow> = Vec::with_capacity(row_count);

        for row_idx in 0..row_count {
            let pre_source_rn = coerced_pre.row_numbers[row_idx];
            let post_source_rn = coerced_post.row_numbers[row_idx];

            // NOTE(review): a missing mapping is treated as an identity
            // assignment. Mappings are also dropped on eviction, so an evicted
            // source row falls back to its own row number here - confirm this
            // cannot address a different live row.
            let pre_storage_rn = state.forward.get(&pre_source_rn).copied().unwrap_or(pre_source_rn);
            let post_storage_rn = state.forward.get(&post_source_rn).copied().unwrap_or(post_source_rn);

            let (_, pre_encoded) = encode_row_at_index(&coerced_pre, row_idx, shape, pre_storage_rn)?;
            let (_, post_encoded) = encode_row_at_index(&coerced_post, row_idx, shape, post_storage_rn)?;

            post_ids.push(post_storage_rn);
            pre_keys.push(RowKey::encoded(object_id, pre_storage_rn));
            post_keys.push(RowKey::encoded(object_id, post_storage_rn));
            pre_encoded_rows.push(pre_encoded);
            post_encoded_rows.push(post_encoded);
        }

        ViewRowInterceptor::pre_update(txn, view, &post_ids, &mut post_encoded_rows)?;
        for ((pre_key, post_key), post_encoded) in
            pre_keys.iter().zip(post_keys.iter()).zip(post_encoded_rows.iter())
        {
            txn.remove(pre_key)?;
            // The rows are still needed for `post_update`, hence the clone.
            txn.set(post_key, post_encoded.clone())?;
        }
        ViewRowInterceptor::post_update(txn, view, &post_ids, &post_encoded_rows, &pre_encoded_rows)?;

        emit_view_change(txn, view, Diff::update(coerced_pre, coerced_post));
        Ok(())
    }

    /// Deletes stored rows for a remove diff.
    ///
    /// Each source row number is translated through `state.forward` (falling
    /// back to the source row number itself) and both directions of the
    /// mapping are dropped before the row key is removed.
    ///
    /// # Errors
    ///
    /// Propagates failures from coercion, row encoding, the transaction and
    /// the view row interceptors.
    // NOTE(review): the ring buffer metadata (head / tail / count) is not
    // adjusted by removes - confirm that this is intended.
    #[inline]
    fn apply_ringbuffer_remove(
        &self,
        txn: &mut FlowTransaction,
        view: &View,
        shape: &RowShape,
        object_id: ShapeId,
        state: &mut RingBufferState,
        pre: &Arc<Columns>,
    ) -> Result<()> {
        let coerced = coerce_columns(pre, view.columns())?;
        let row_count = coerced.row_count();

        let mut storage_ids: Vec<RowNumber> = Vec::with_capacity(row_count);
        let mut encoded_rows: Vec<EncodedRow> = Vec::with_capacity(row_count);

        for row_idx in 0..row_count {
            let source_rn = coerced.row_numbers[row_idx];
            let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
            state.reverse.remove(&storage_rn);

            let (_, encoded) = encode_row_at_index(&coerced, row_idx, shape, storage_rn)?;
            storage_ids.push(storage_rn);
            encoded_rows.push(encoded);
        }

        ViewRowInterceptor::pre_delete(txn, view, &storage_ids)?;
        for storage_rn in storage_ids.iter() {
            let key = RowKey::encoded(object_id, *storage_rn);
            txn.remove(&key)?;
        }
        ViewRowInterceptor::post_delete(txn, view, &storage_ids, &encoded_rows)?;

        emit_view_change(txn, view, Diff::remove(coerced));
        Ok(())
    }
}
/// Tracks a single-diff change on `txn` whose origin is the shape of `view`.
///
/// The change is stamped with the transaction's version and the current time
/// of the transaction's clock.
#[inline]
fn emit_view_change(txn: &mut FlowTransaction, view: &View, diff: Diff) {
    let version = txn.version();
    let now_nanos = txn.clock().now_nanos();
    let change = Change {
        origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
        version,
        diffs: smallvec![diff],
        changed_at: DateTime::from_nanos(now_nanos),
    };
    txn.track_flow_change(change);
}