use std::sync::Arc;
use crate::engine::engine::MeruEngine;
use crate::types::{
sequence::{OpType, SeqNum},
value::Row,
MeruError, Result,
};
pub mod arrow;
pub mod datafusion_provider;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChangeOp {
Insert,
Update,
Delete,
}
impl ChangeOp {
pub fn as_sql_str(self) -> &'static str {
match self {
ChangeOp::Insert => "INSERT",
ChangeOp::Update => "UPDATE",
ChangeOp::Delete => "DELETE",
}
}
}
#[derive(Clone, Debug)]
pub struct ChangeRecord {
pub seq: u64,
pub op: ChangeOp,
pub row: Row,
pub pk_bytes: Vec<u8>,
}
pub struct ChangeFeedCursor {
inner: CursorInner,
skip_update_discrimination: bool,
}
enum CursorInner {
Engine {
engine: Arc<MeruEngine>,
since_seq: u64,
buffer: Vec<ChangeRecord>,
},
BelowRetention {
requested: u64,
low_water: u64,
},
}
impl ChangeFeedCursor {
pub fn from_engine(engine: Arc<MeruEngine>, since_seq: u64) -> Self {
Self {
inner: CursorInner::Engine {
engine,
since_seq,
buffer: Vec::new(),
},
skip_update_discrimination: false,
}
}
pub fn new_below_retention(requested: u64, low_water: u64) -> Self {
Self {
inner: CursorInner::BelowRetention {
requested,
low_water,
},
skip_update_discrimination: false,
}
}
pub fn skip_update_discrimination(mut self, skip: bool) -> Self {
self.skip_update_discrimination = skip;
self
}
pub fn next_batch(&mut self, max_rows: usize) -> Result<Vec<ChangeRecord>> {
match &mut self.inner {
CursorInner::BelowRetention {
requested,
low_water,
} => Err(MeruError::ChangeFeedBelowRetention {
requested: *requested,
low_water: *low_water,
}),
CursorInner::Engine {
engine,
since_seq,
buffer,
} => {
if buffer.is_empty() {
let read_seq = engine.read_seq();
if SeqNum(*since_seq) >= read_seq {
return Ok(Vec::new());
}
let raw = engine.scan_tail_changes_with_pre_image(*since_seq, read_seq)?;
buffer.reserve(raw.len());
for tuple in raw {
let op = match tuple.op_type {
OpType::Put => {
if self.skip_update_discrimination {
ChangeOp::Insert
} else {
let had_prior = if tuple.seq == 0 {
false
} else {
engine
.point_lookup_by_user_key_at_seq(
&tuple.pk_bytes,
SeqNum(tuple.seq - 1),
)?
.is_some()
};
if had_prior {
ChangeOp::Update
} else {
ChangeOp::Insert
}
}
}
OpType::Delete => ChangeOp::Delete,
};
buffer.push(ChangeRecord {
seq: tuple.seq,
op,
row: tuple.row,
pk_bytes: tuple.pk_bytes,
});
}
}
let drain_n = buffer.len().min(max_rows);
if drain_n == 0 {
return Ok(Vec::new());
}
let remainder = buffer.split_off(drain_n);
let out = std::mem::replace(buffer, remainder);
if let Some(last) = out.last() {
*since_seq = last.seq;
}
Ok(out)
}
}
}
pub fn since_seq(&self) -> u64 {
match &self.inner {
CursorInner::Engine { since_seq, .. } => *since_seq,
CursorInner::BelowRetention { requested, .. } => *requested,
}
}
pub fn buffered_len(&self) -> usize {
match &self.inner {
CursorInner::Engine { buffer, .. } => buffer.len(),
CursorInner::BelowRetention { .. } => 0,
}
}
}