use std::{collections::BTreeMap, sync::Arc};
use postcard::{from_bytes, to_stdvec};
use reifydb_catalog::store::ringbuffer::update::{decode_ringbuffer_metadata, encode_ringbuffer_metadata};
use reifydb_core::{
encoded::shape::{RowShape, RowShapeField},
interface::{
catalog::{flow::FlowNodeId, id::RingBufferId, ringbuffer::RingBufferMetadata, shape::ShapeId},
change::{Change, ChangeOrigin, Diff},
resolved::ResolvedView,
},
internal,
key::{ringbuffer::RingBufferMetadataKey, row::RowKey},
value::column::columns::Columns,
};
use reifydb_transaction::interceptor::view_row::ViewRowInterceptor;
use reifydb_type::{
Result,
error::Error,
value::{blob::Blob, datetime::DateTime, row_number::RowNumber, r#type::Type},
};
use serde::{Deserialize, Serialize};
use super::{coerce_columns, encode_row_at_index};
use crate::{
Operator,
operator::{
Operators,
stateful::{raw::RawStatefulOperator, single::SingleStateful},
},
transaction::FlowTransaction,
};
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RingBufferState {
forward: BTreeMap<RowNumber, RowNumber>, reverse: BTreeMap<RowNumber, RowNumber>, }
pub struct SinkRingBufferViewOperator {
#[allow(dead_code)]
parent: Arc<Operators>,
node: FlowNodeId,
view: ResolvedView,
ringbuffer_id: RingBufferId,
capacity: u64,
propagate_evictions: bool,
state_shape: RowShape,
}
impl SinkRingBufferViewOperator {
pub fn new(
parent: Arc<Operators>,
node: FlowNodeId,
view: ResolvedView,
ringbuffer_id: RingBufferId,
capacity: u64,
propagate_evictions: bool,
) -> Self {
Self {
parent,
node,
view,
ringbuffer_id,
capacity,
propagate_evictions,
state_shape: RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]),
}
}
fn read_metadata(&self, txn: &mut FlowTransaction) -> Result<RingBufferMetadata> {
let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
match txn.get(&key)? {
Some(row) => Ok(decode_ringbuffer_metadata(&row)),
None => Ok(RingBufferMetadata::new(self.ringbuffer_id, self.capacity)),
}
}
fn write_metadata(&self, txn: &mut FlowTransaction, metadata: &RingBufferMetadata) -> Result<()> {
let key = RingBufferMetadataKey::encoded(self.ringbuffer_id);
let row = encode_ringbuffer_metadata(metadata);
txn.set(&key, row)
}
fn load(&self, txn: &mut FlowTransaction) -> Result<RingBufferState> {
let state_row = self.load_state(txn)?;
if state_row.is_empty() || !state_row.is_defined(0) {
return Ok(RingBufferState::default());
}
let blob = self.state_shape.get_blob(&state_row, 0);
if blob.is_empty() {
return Ok(RingBufferState::default());
}
from_bytes(blob.as_ref())
.map_err(|e| Error(Box::new(internal!("Failed to deserialize RingBufferState: {}", e))))
}
fn save(&self, txn: &mut FlowTransaction, state: &RingBufferState) -> Result<()> {
let serialized = to_stdvec(state)
.map_err(|e| Error(Box::new(internal!("Failed to serialize RingBufferState: {}", e))))?;
let blob = Blob::from(serialized);
self.update_state(txn, |shape, row| {
shape.set_blob(row, 0, &blob);
Ok(())
})?;
Ok(())
}
}
impl RawStatefulOperator for SinkRingBufferViewOperator {}
impl SingleStateful for SinkRingBufferViewOperator {
fn layout(&self) -> RowShape {
self.state_shape.clone()
}
}
impl Operator for SinkRingBufferViewOperator {
fn id(&self) -> FlowNodeId {
self.node
}
fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
let view = self.view.def().clone();
let shape: RowShape = view.columns().into();
let object_id = ShapeId::ringbuffer(self.ringbuffer_id);
let mut metadata = self.read_metadata(txn)?;
let mut state = self.load(txn)?;
for diff in change.diffs.iter() {
match diff {
Diff::Insert {
post,
} => {
let coerced = coerce_columns(post, view.columns())?;
let row_count = coerced.row_count();
for row_idx in 0..row_count {
if metadata.is_full() {
let oldest_rn = RowNumber(metadata.head);
let pre_key = RowKey::encoded(object_id, oldest_rn);
txn.remove(&pre_key)?;
metadata.head += 1;
metadata.count -= 1;
if let Some(source_rn) = state.reverse.remove(&oldest_rn) {
state.forward.remove(&source_rn);
}
if self.propagate_evictions {
}
}
let source_rn = coerced.row_numbers[row_idx];
let assigned_rn = RowNumber(metadata.tail);
let (_, encoded) =
encode_row_at_index(&coerced, row_idx, &shape, assigned_rn)?;
if source_rn != assigned_rn {
state.forward.insert(source_rn, assigned_rn);
state.reverse.insert(assigned_rn, source_rn);
}
let encoded = ViewRowInterceptor::pre_insert(
txn,
&view,
assigned_rn,
encoded,
)?;
let key = RowKey::encoded(object_id, assigned_rn);
txn.set(&key, encoded.clone())?;
ViewRowInterceptor::post_insert(txn, &view, assigned_rn, &encoded)?;
if metadata.is_empty() {
metadata.head = assigned_rn.0;
}
metadata.count += 1;
metadata.tail = assigned_rn.0 + 1;
}
let version = txn.version();
let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
txn.track_flow_change(Change {
origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
version,
diffs: vec![Diff::Insert {
post: coerced,
}],
changed_at,
});
}
Diff::Update {
pre,
post,
} => {
let coerced_pre = coerce_columns(pre, view.columns())?;
let coerced_post = coerce_columns(post, view.columns())?;
let row_count = coerced_post.row_count();
for row_idx in 0..row_count {
let pre_source_rn = coerced_pre.row_numbers[row_idx];
let post_source_rn = coerced_post.row_numbers[row_idx];
let pre_storage_rn = state
.forward
.get(&pre_source_rn)
.copied()
.unwrap_or(pre_source_rn);
let post_storage_rn = state
.forward
.get(&post_source_rn)
.copied()
.unwrap_or(post_source_rn);
let (_, pre_encoded) = encode_row_at_index(
&coerced_pre,
row_idx,
&shape,
pre_storage_rn,
)?;
let (_, post_encoded) = encode_row_at_index(
&coerced_post,
row_idx,
&shape,
post_storage_rn,
)?;
let post_encoded = ViewRowInterceptor::pre_update(
txn,
&view,
post_storage_rn,
post_encoded,
)?;
let pre_key = RowKey::encoded(object_id, pre_storage_rn);
let post_key = RowKey::encoded(object_id, post_storage_rn);
txn.remove(&pre_key)?;
txn.set(&post_key, post_encoded.clone())?;
ViewRowInterceptor::post_update(
txn,
&view,
post_storage_rn,
&post_encoded,
&pre_encoded,
)?;
}
let version = txn.version();
let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
txn.track_flow_change(Change {
origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
version,
diffs: vec![Diff::Update {
pre: coerced_pre,
post: coerced_post,
}],
changed_at,
});
}
Diff::Remove {
pre,
} => {
let coerced = coerce_columns(pre, view.columns())?;
let row_count = coerced.row_count();
for row_idx in 0..row_count {
let source_rn = coerced.row_numbers[row_idx];
let storage_rn = state.forward.remove(&source_rn).unwrap_or(source_rn);
state.reverse.remove(&storage_rn);
let (_, encoded) =
encode_row_at_index(&coerced, row_idx, &shape, storage_rn)?;
ViewRowInterceptor::pre_delete(txn, &view, storage_rn)?;
let key = RowKey::encoded(object_id, storage_rn);
txn.remove(&key)?;
ViewRowInterceptor::post_delete(txn, &view, storage_rn, &encoded)?;
}
let version = txn.version();
let changed_at = DateTime::from_nanos(txn.clock().now_nanos());
txn.track_flow_change(Change {
origin: ChangeOrigin::Shape(ShapeId::view(view.id())),
version,
diffs: vec![Diff::Remove {
pre: coerced,
}],
changed_at,
});
}
}
}
self.write_metadata(txn, &metadata)?;
self.save(txn, &state)?;
Ok(Change::from_flow(self.node, change.version, Vec::new(), change.changed_at))
}
fn pull(&self, _txn: &mut FlowTransaction, _rows: &[RowNumber]) -> Result<Columns> {
unreachable!()
}
}