use crate::sql::ChangeOp;
use crate::types::{value::Row, MeruError, Result};
use async_trait::async_trait;
use futures::stream::BoxStream;
/// A single change-log operation as replayed to a replica.
#[derive(Clone, Debug)]
pub struct OpRecord {
    /// Monotonic sequence number assigned by the primary's log.
    pub seq: u64,
    /// Kind of mutation (insert / update / delete).
    pub op: ChangeOp,
    /// Row payload carried by the op (for deletes, contents depend on the
    /// producer — TODO confirm whether delete rows are meaningful).
    pub row: Row,
    /// Encoded primary-key bytes; used as the tail index key.
    pub pk_bytes: Vec<u8>,
}
/// Describes a hole in the change log: the caller asked for `requested` but
/// the source can no longer serve it (e.g. retention truncated the log).
#[derive(Clone, Debug)]
pub struct LogGap {
    /// Sequence number the consumer asked to resume from.
    pub requested: u64,
    /// Oldest sequence the source can still serve, if known.
    pub earliest_available: Option<u64>,
    /// Human-readable explanation of why the gap exists.
    pub reason: String,
}
impl std::fmt::Display for LogGap {
    /// Renders the gap as a single diagnostic line:
    /// `log gap: requested=…, earliest_available=…, reason=…`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "log gap: requested={}", self.requested)?;
        write!(f, ", earliest_available={:?}", self.earliest_available)?;
        write!(f, ", reason={}", self.reason)
    }
}

// Marker impl so a LogGap can travel inside error chains.
impl std::error::Error for LogGap {}
/// Abstraction over a change-log producer a replica can tail.
#[async_trait]
pub trait LogSource: Send + Sync + 'static {
    /// Streams ops with seq strictly greater than `since`.
    /// Implementations may return `MeruError::ChangeFeedBelowRetention` when
    /// `since` has fallen below the log's retention window.
    async fn stream(&self, since: u64) -> Result<BoxStream<'static, Result<OpRecord>>>;

    /// Latest sequence number the source currently knows about.
    async fn latest_seq(&self) -> Result<u64>;
}
/// Phase 1 placeholder for a remote change-feed source: it only knows the
/// primary's retention low-water mark and cannot actually stream yet
/// (see the `LogSource` impl below).
pub struct ChangeFeedLogSource {
    /// Oldest sequence the primary's change feed still retains.
    pub primary_low_water: u64,
}
impl ChangeFeedLogSource {
pub fn new(primary_low_water: u64) -> Self {
Self { primary_low_water }
}
}
#[async_trait]
impl LogSource for ChangeFeedLogSource {
    /// Phase 1 stub: every request is reported as below retention, carrying
    /// the caller's `since` and the configured low-water mark, so consumers
    /// exercise their gap-recovery path (see `Replica::advance_or_recover`).
    async fn stream(&self, since: u64) -> Result<BoxStream<'static, Result<OpRecord>>> {
        Err(MeruError::ChangeFeedBelowRetention {
            requested: since,
            low_water: self.primary_low_water,
        })
    }

    /// Phase 1 stub: always errors; real implementation is blocked on the
    /// Phase 2 Flight SQL endpoint (#29).
    async fn latest_seq(&self) -> Result<u64> {
        Err(MeruError::InvalidArgument(
            "ChangeFeedLogSource::latest_seq: Phase 2 pending (requires #29 Phase 2 \
             Flight SQL endpoint)"
                .into(),
        ))
    }
}
/// Log source backed by a `MeruDB` living in the same process; reads the
/// change feed directly through the engine.
pub struct InProcessLogSource {
    // Shared handle to the primary database.
    db: std::sync::Arc<crate::MeruDB>,
    // Max ops pulled per cursor poll in `stream` (defaults to 4096).
    batch_size: usize,
}
impl InProcessLogSource {
pub fn new(db: std::sync::Arc<crate::MeruDB>) -> Self {
Self {
db,
batch_size: 4096,
}
}
pub fn with_batch_size(mut self, n: usize) -> Self {
self.batch_size = n.max(1);
self
}
}
#[async_trait]
impl LogSource for InProcessLogSource {
    /// Materializes every op in `(since, upper]` into a Vec and returns it as
    /// an already-complete stream. `upper` is sampled once up front so ops
    /// committed while the cursor drains are deferred to the next call.
    async fn stream(&self, since: u64) -> Result<BoxStream<'static, Result<OpRecord>>> {
        use futures::stream::StreamExt;
        let engine = self.db.engine_for_replica();
        let upper = engine.read_seq().0;
        let records: Vec<Result<OpRecord>> = if since >= upper {
            // Caller is already caught up; nothing to replay.
            Vec::new()
        } else {
            let mut cursor = crate::sql::ChangeFeedCursor::from_engine(engine, since);
            let mut out: Vec<Result<OpRecord>> = Vec::new();
            loop {
                let batch = cursor.next_batch(self.batch_size)?;
                if batch.is_empty() {
                    break;
                }
                let mut reached_upper = false;
                for r in batch {
                    // Anything past the snapshot point belongs to the NEXT
                    // stream() call; stop consuming this batch.
                    if r.seq > upper {
                        reached_upper = true;
                        break;
                    }
                    out.push(Ok(OpRecord {
                        seq: r.seq,
                        op: r.op,
                        row: r.row,
                        pk_bytes: r.pk_bytes,
                    }));
                }
                // Stop once we hit the snapshot bound, either inside a batch
                // or because the cursor itself has advanced to/past it.
                if reached_upper || cursor.since_seq() >= upper {
                    break;
                }
            }
            out
        };
        Ok(futures::stream::iter(records).boxed())
    }

    /// Latest committed sequence of the wrapped database.
    async fn latest_seq(&self) -> Result<u64> {
        Ok(self.db.read_seq().0)
    }
}
/// In-memory overlay of ops applied on top of a base snapshot: the newest op
/// per primary key, plus progress counters.
pub struct ReplicaTail {
    // Newest op per encoded primary key (Delete entries act as tombstones).
    index: std::collections::HashMap<Vec<u8>, TailEntry>,
    // Highest sequence whose effects are visible through this tail.
    visible_seq: u64,
    // Total ops accepted by `apply` (NOT the live entry count — overwritten
    // keys still count).
    ops_applied: u64,
}
/// The newest op recorded for one primary key in the tail index.
#[derive(Clone, Debug)]
struct TailEntry {
    // Sequence of the op; used to discard stale re-deliveries.
    seq: u64,
    // Kind of mutation; Delete makes this entry a tombstone.
    op: ChangeOp,
    // Row value as of `seq`.
    row: crate::types::value::Row,
}
impl Default for ReplicaTail {
fn default() -> Self {
Self::new()
}
}
impl ReplicaTail {
pub fn new() -> Self {
Self {
index: std::collections::HashMap::new(),
visible_seq: 0,
ops_applied: 0,
}
}
pub fn visible_seq(&self) -> u64 {
self.visible_seq
}
pub fn seed_visible_seq(&mut self, seq: u64) {
if seq > self.visible_seq {
self.visible_seq = seq;
}
}
pub fn ops_applied(&self) -> u64 {
self.ops_applied
}
pub fn get(&self, pk_bytes: &[u8]) -> Option<&crate::types::value::Row> {
let entry = self.index.get(pk_bytes)?;
match entry.op {
ChangeOp::Insert | ChangeOp::Update => Some(&entry.row),
ChangeOp::Delete => None,
}
}
pub fn apply(&mut self, pk_bytes: Vec<u8>, op_record: OpRecord) {
if let Some(existing) = self.index.get(&pk_bytes) {
if op_record.seq <= existing.seq {
return;
}
}
let seq = op_record.seq;
self.index.insert(
pk_bytes,
TailEntry {
seq,
op: op_record.op,
row: op_record.row,
},
);
if seq > self.visible_seq {
self.visible_seq = seq;
}
self.ops_applied += 1;
}
pub async fn advance<S>(&mut self, source: &S) -> Result<()>
where
S: LogSource + ?Sized,
{
use futures::stream::StreamExt;
let since = self.visible_seq;
let mut stream = source.stream(since).await?;
while let Some(rec) = stream.next().await {
let rec = rec?;
let pk = rec.pk_bytes.clone();
self.apply(pk, rec);
}
Ok(())
}
}
/// One immutable-base + mutable-tail generation of a replica; swapped
/// wholesale by `Replica::rebase_hotswap`.
pub struct ReplicaState {
    // Read-only base snapshot.
    base: std::sync::Arc<crate::MeruDB>,
    // Overlay of ops applied since the base's seq.
    tail: tokio::sync::RwLock<ReplicaTail>,
}
/// Point-in-time progress counters for a replica (see `Replica::stats`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReplicaStats {
    /// Highest seq visible through the tail overlay.
    pub visible_seq: u64,
    /// Seq of the underlying base snapshot.
    pub base_seq: u64,
    /// Number of ops applied to the current tail — NOTE: this is the applied
    /// counter, not the count of live tail entries.
    pub tail_length: usize,
    /// How many hot-swap rebases have completed.
    pub rebase_count: u64,
    /// Wall-clock duration of the most recent hot-swap rebase, in millis.
    pub last_rebase_warmup_millis: u64,
}
/// A read-only replica: a base snapshot plus a tailed change-log overlay,
/// with lock-free state swapping (`arc_swap`) so rebases never block readers.
pub struct Replica {
    // Current base+tail generation; swapped atomically on hot-swap rebase.
    state: arc_swap::ArcSwap<ReplicaState>,
    // Where new ops come from.
    log_source: std::sync::Arc<dyn LogSource>,
    // Table schema, needed to encode primary keys for tail lookups.
    schema: std::sync::Arc<crate::types::schema::TableSchema>,
    // Completed hot-swap rebases.
    rebase_count: std::sync::atomic::AtomicU64,
    // Duration of the last hot-swap rebase, millis.
    last_rebase_warmup_millis: std::sync::atomic::AtomicU64,
    // Hot-swap rebase timeout in millis; 0 means "no timeout".
    rebase_timeout_millis: std::sync::atomic::AtomicU64,
    // Saved open options so rebase_hotswap can reopen a fresh base.
    base_opts: crate::OpenOptions,
}
impl Replica {
    /// Opens a read-only replica over `base_options` and attaches
    /// `log_source` for tailing. A nonzero base seq seeds the tail's
    /// `visible_seq` so tailing starts past what the base already contains.
    pub async fn open(
        mut base_options: crate::OpenOptions,
        log_source: std::sync::Arc<dyn LogSource>,
    ) -> Result<Self> {
        // Replicas never write through the base store.
        base_options = base_options.read_only(true);
        let schema = std::sync::Arc::new(base_options.schema.clone());
        // Keep a copy so rebase_hotswap can reopen the base later.
        let base_opts = base_options.clone();
        let base = std::sync::Arc::new(crate::MeruDB::open(base_options).await?);
        let mut initial_tail = ReplicaTail::new();
        let base_seq = base.read_seq().0;
        if base_seq > 0 {
            initial_tail.seed_visible_seq(base_seq);
        }
        let state = std::sync::Arc::new(ReplicaState {
            base,
            tail: tokio::sync::RwLock::new(initial_tail),
        });
        Ok(Self {
            state: arc_swap::ArcSwap::new(state),
            log_source,
            schema,
            base_opts,
            rebase_count: std::sync::atomic::AtomicU64::new(0),
            last_rebase_warmup_millis: std::sync::atomic::AtomicU64::new(0),
            // Default hot-swap timeout: 60 s.
            rebase_timeout_millis: std::sync::atomic::AtomicU64::new(60_000),
        })
    }

    /// Sets the `rebase_hotswap` timeout. `None` disables it (stored as 0).
    pub fn set_rebase_timeout(&self, timeout: Option<std::time::Duration>) {
        // 0 millis is the sentinel for "no timeout".
        let millis = timeout.map(|d| d.as_millis() as u64).unwrap_or(0);
        self.rebase_timeout_millis
            .store(millis, std::sync::atomic::Ordering::Relaxed);
    }

    /// Current rebase timeout, or `None` when disabled.
    pub fn rebase_timeout(&self) -> Option<std::time::Duration> {
        let m = self
            .rebase_timeout_millis
            .load(std::sync::atomic::Ordering::Relaxed);
        if m == 0 {
            None
        } else {
            Some(std::time::Duration::from_millis(m))
        }
    }

    /// Snapshot of the replica's progress counters.
    pub async fn stats(&self) -> ReplicaStats {
        let state = self.state.load_full();
        let base_seq = state.base.read_seq().0;
        let (visible_seq, tail_length) = {
            let tail = state.tail.read().await;
            // NOTE(review): tail_length reports ops_applied(), not the number
            // of live tail entries (index.len()); overwritten keys still
            // count. Confirm this is the intended metric.
            (tail.visible_seq(), tail.ops_applied() as usize)
        };
        ReplicaStats {
            visible_seq,
            base_seq,
            tail_length,
            rebase_count: self.rebase_count.load(std::sync::atomic::Ordering::Relaxed),
            last_rebase_warmup_millis: self
                .last_rebase_warmup_millis
                .load(std::sync::atomic::Ordering::Relaxed),
        }
    }

    /// Drains the log source into the in-memory tail under the write lock.
    pub async fn advance(&self) -> Result<()> {
        let state = self.state.load_full();
        let mut tail = state.tail.write().await;
        tail.advance(self.log_source.as_ref()).await
    }

    /// Point lookup. The tail overlay wins over the base snapshot: a key
    /// present in the tail as a Delete (tombstone) returns `None` without
    /// consulting the base.
    pub async fn get(
        &self,
        pk_values: &[crate::types::value::FieldValue],
    ) -> Result<Option<crate::types::value::Row>> {
        let encoded = crate::types::key::InternalKey::encode_user_key(pk_values, &self.schema)?;
        let state = self.state.load_full();
        {
            let tail = state.tail.read().await;
            // contains_key distinguishes "key tracked by the tail" (possibly
            // a tombstone, where get() is None) from "key absent" (fall
            // through to the base).
            if tail.index.contains_key(&encoded) {
                return Ok(tail.get(&encoded).cloned());
            }
        }
        state.base.get(pk_values)
    }

    /// Highest sequence currently visible through the tail overlay.
    pub async fn visible_seq(&self) -> u64 {
        let state = self.state.load_full();
        let seq = state.tail.read().await.visible_seq();
        seq
    }

    /// Sequence of the underlying base snapshot (ignores the tail).
    pub fn base_seq(&self) -> u64 {
        self.state.load_full().base.read_seq().0
    }

    /// In-place rebase: refreshes the existing base, then rebuilds and
    /// re-advances the tail while holding the tail write lock (tail readers
    /// block for the duration).
    pub async fn rebase(&self) -> Result<()> {
        let state = self.state.load_full();
        state.base.refresh().await?;
        let new_base_seq = state.base.read_seq().0;
        {
            let mut tail = state.tail.write().await;
            *tail = ReplicaTail::new();
            if new_base_seq > 0 {
                tail.seed_visible_seq(new_base_seq);
            }
            tail.advance(self.log_source.as_ref()).await?;
        }
        Ok(())
    }

    /// Hot-swap rebase: builds a brand-new base + tail off to the side, then
    /// atomically swaps it in via ArcSwap so the old state keeps serving
    /// reads until the new one is fully warmed. On timeout the old state is
    /// left untouched. Returns `(new_base_seq, new_visible_seq)`.
    pub async fn rebase_hotswap(&self) -> Result<(u64, u64)> {
        let start = std::time::Instant::now();
        let timeout_millis = self
            .rebase_timeout_millis
            .load(std::sync::atomic::Ordering::Relaxed);
        let base_opts = self.base_opts.clone();
        let log_source = self.log_source.clone();
        let work = async move {
            let new_base = std::sync::Arc::new(crate::MeruDB::open(base_opts).await?);
            let new_base_seq = new_base.read_seq().0;
            let mut fresh_tail = ReplicaTail::new();
            if new_base_seq > 0 {
                fresh_tail.seed_visible_seq(new_base_seq);
            }
            // A retention gap here is swallowed: the swap proceeds with
            // whatever the freshly opened base already contains (presumably
            // the new base covers the gap — see advance_or_recover, which
            // reaches this path exactly when the log reported such a gap).
            match fresh_tail.advance(log_source.as_ref()).await {
                Ok(()) => {}
                Err(MeruError::ChangeFeedBelowRetention { .. }) => {}
                Err(other) => return Err(other),
            }
            let new_visible_seq = fresh_tail.visible_seq();
            let new_state = std::sync::Arc::new(ReplicaState {
                base: new_base,
                tail: tokio::sync::RwLock::new(fresh_tail),
            });
            Ok::<_, MeruError>((new_state, new_base_seq, new_visible_seq))
        };
        // timeout_millis == 0 means "no timeout" (see set_rebase_timeout).
        let (new_state, new_base_seq, new_visible_seq) = if timeout_millis > 0 {
            match tokio::time::timeout(std::time::Duration::from_millis(timeout_millis), work).await
            {
                Ok(inner) => inner?,
                Err(_) => {
                    return Err(MeruError::InvalidArgument(format!(
                        "rebase_hotswap timed out after {timeout_millis}ms — \
                         primary's write rate likely exceeds replica drain \
                         throughput; raise `Replica::set_rebase_timeout` or \
                         reduce primary load. The replica's old state is still \
                         serving reads (this call left it untouched)."
                    )))
                }
            }
        } else {
            work.await?
        };
        // Publish the new generation; concurrent readers holding the old Arc
        // finish against the old state.
        self.state.store(new_state);
        let warmup_millis = start.elapsed().as_millis() as u64;
        self.last_rebase_warmup_millis
            .store(warmup_millis, std::sync::atomic::Ordering::Relaxed);
        self.rebase_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        Ok((new_base_seq, new_visible_seq))
    }

    /// Normal path: advance the tail; if the log has been truncated below our
    /// cursor (`ChangeFeedBelowRetention`), fall back to a hot-swap rebase.
    pub async fn advance_or_recover(&self) -> Result<AdvanceOutcome> {
        match self.advance().await {
            Ok(()) => Ok(AdvanceOutcome::Advanced),
            Err(MeruError::ChangeFeedBelowRetention { .. }) => {
                let (new_base_seq, _) = self.rebase_hotswap().await?;
                Ok(AdvanceOutcome::Recovered { new_base_seq })
            }
            Err(other) => Err(other),
        }
    }

    /// Explicit gap recovery: hot-swap rebase, reporting the new base seq.
    pub async fn recover_from_log_gap(&self) -> Result<u64> {
        let (new_base_seq, _) = self.rebase_hotswap().await?;
        Ok(new_base_seq)
    }
}
/// Result of `Replica::advance_or_recover`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AdvanceOutcome {
    /// The tail advanced normally.
    Advanced,
    /// A retention gap forced a hot-swap rebase onto a new base snapshot.
    Recovered { new_base_seq: u64 },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The phase-1 source rejects every stream request with the retention
    /// error, echoing the requested seq and the configured low-water mark.
    #[tokio::test]
    async fn phase1_stream_returns_below_retention() {
        let src = ChangeFeedLogSource::new(1000);
        let err = src.stream(500).await.err().unwrap();
        if let MeruError::ChangeFeedBelowRetention {
            requested,
            low_water,
        } = &err
        {
            assert_eq!(*requested, 500);
            assert_eq!(*low_water, 1000);
        } else {
            panic!("unexpected error shape: {err:?}");
        }
    }

    /// latest_seq's error message should point the reader at the Phase 2 work.
    #[tokio::test]
    async fn phase1_latest_seq_errors_with_pointer() {
        let src = ChangeFeedLogSource::new(0);
        let err = src.latest_seq().await.err().unwrap();
        let rendered = format!("{err:?}");
        assert!(rendered.contains("Phase 2"));
    }

    /// Display output must carry the requested seq, the earliest available
    /// seq, and the reason text.
    #[test]
    fn log_gap_display_is_informative() {
        let gap = LogGap {
            requested: 42,
            earliest_available: Some(100),
            reason: "change-feed retention".into(),
        };
        let rendered = gap.to_string();
        for needle in ["42", "100", "retention"] {
            assert!(rendered.contains(needle));
        }
    }
}