#![cfg(feature = "sql")]
use std::sync::Arc;
use merutable::engine::{config::EngineConfig, engine::MeruEngine};
use merutable::sql::{ChangeFeedCursor, ChangeOp};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
/// Builds the two-column schema shared by every test: an `id` Int64 primary
/// key plus a nullable Int64 payload column `v`.
fn schema() -> TableSchema {
    // Both columns are Int64 and differ only in name/nullability.
    let int64_col = |name: &str, nullable: bool| ColumnDef {
        name: name.into(),
        col_type: ColumnType::Int64,
        nullable,
        ..Default::default()
    };
    TableSchema {
        table_name: "cf-issue36".into(),
        columns: vec![int64_col("id", false), int64_col("v", true)],
        primary_key: vec![0],
        ..Default::default()
    }
}
fn row(id: i64, v: i64) -> Row {
Row::new(vec![
Some(FieldValue::Int64(id)),
Some(FieldValue::Int64(v)),
])
}
/// Opens a fresh engine rooted at `tmp`: catalog and object store share the
/// temp dir root, the WAL lives under `tmp/wal`.
async fn open_engine(tmp: &tempfile::TempDir) -> Arc<MeruEngine> {
    // Catalog URI and object-store prefix both point at the temp dir root.
    let root = tmp.path().to_string_lossy().to_string();
    let config = EngineConfig {
        schema: schema(),
        catalog_uri: root.clone(),
        object_store_prefix: root,
        wal_dir: tmp.path().join("wal"),
        ..Default::default()
    };
    MeruEngine::open(config).await.unwrap()
}
#[tokio::test]
async fn buffer_is_populated_once_and_drained_across_batches() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;
    // Seed 500 rows so the cursor has plenty to buffer across several calls.
    for i in 1..=500i64 {
        engine
            .put(vec![FieldValue::Int64(i)], row(i, i * 10))
            .await
            .unwrap();
    }
    let mut cursor = ChangeFeedCursor::from_engine(engine, 0);
    assert_eq!(cursor.buffered_len(), 0, "fresh cursor has empty buffer");

    // First call scans and buffers; only 100 are handed out.
    let first = cursor.next_batch(100).unwrap();
    assert_eq!(first.len(), 100, "first batch returns 100");
    assert_eq!(
        cursor.buffered_len(),
        400,
        "remaining 400 stay in the buffer — NOT rescanned next call"
    );

    // Second call serves from the buffer and continues the seq sequence.
    let second = cursor.next_batch(100).unwrap();
    assert_eq!(second.len(), 100);
    assert_eq!(cursor.buffered_len(), 300);
    assert_eq!(
        second[0].seq,
        first.last().unwrap().seq + 1,
        "batches are contiguous in seq order"
    );

    // An oversized request drains everything that is left.
    let rest = cursor.next_batch(10_000).unwrap();
    assert_eq!(rest.len(), 300);
    assert_eq!(cursor.buffered_len(), 0);

    // Nothing remains once the buffer is exhausted.
    let empty = cursor.next_batch(100).unwrap();
    assert!(empty.is_empty());
}
#[tokio::test]
async fn large_drain_completes_in_linear_time() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;
    const N: i64 = 10_000;
    for i in 1..=N {
        engine
            .put(vec![FieldValue::Int64(i)], row(i, i))
            .await
            .unwrap();
    }
    let mut cursor = ChangeFeedCursor::from_engine(engine, 0);
    let started = std::time::Instant::now();
    let mut total = 0usize;
    loop {
        let batch = cursor.next_batch(1024).unwrap();
        if batch.is_empty() {
            break;
        }
        total += batch.len();
        // Fail fast if the cursor re-emits records: the rescan regression this
        // test guards against would keep producing non-empty batches, so
        // without this bound the loop hangs until the harness timeout instead
        // of reporting a useful failure.
        assert!(
            total <= N as usize,
            "cursor emitted more than {N} records — records are being rescanned"
        );
    }
    let elapsed = started.elapsed();
    assert_eq!(total, N as usize, "drained every record");
    assert!(
        elapsed < std::time::Duration::from_secs(5),
        "10k-record drain took {elapsed:?} — quadratic regression"
    );
}
#[tokio::test]
async fn buffered_drain_matches_single_shot_drain() {
    // Two engines loaded with identical data: one is drained in tiny batches,
    // the other in a single oversized call; the seq streams must be equal.
    let tmp_a = tempfile::tempdir().unwrap();
    let engine_a = open_engine(&tmp_a).await;
    let tmp_b = tempfile::tempdir().unwrap();
    let engine_b = open_engine(&tmp_b).await;
    for i in 1..=250i64 {
        engine_a
            .put(vec![FieldValue::Int64(i)], row(i, i))
            .await
            .unwrap();
        engine_b
            .put(vec![FieldValue::Int64(i)], row(i, i))
            .await
            .unwrap();
    }

    // Drain A with a deliberately awkward batch size (7 does not divide 250).
    let mut cursor_a = ChangeFeedCursor::from_engine(engine_a, 0);
    let mut got_a: Vec<u64> = Vec::new();
    loop {
        let chunk = cursor_a.next_batch(7).unwrap();
        if chunk.is_empty() {
            break;
        }
        got_a.extend(chunk.iter().map(|r| r.seq));
    }

    // Drain B in one shot.
    let mut cursor_b = ChangeFeedCursor::from_engine(engine_b, 0);
    let got_b: Vec<u64> = cursor_b
        .next_batch(10_000)
        .unwrap()
        .iter()
        .map(|r| r.seq)
        .collect();

    assert_eq!(got_a, got_b, "small-batch drain must match single-shot");
    assert!(got_a.windows(2).all(|w| w[0] < w[1]), "strictly ascending");
    assert_eq!(got_a.len(), 250);
}
#[tokio::test]
async fn cursor_refills_after_drain_for_new_records() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;
    for i in 1..=50i64 {
        engine
            .put(vec![FieldValue::Int64(i)], row(i, i))
            .await
            .unwrap();
    }
    let mut cursor = ChangeFeedCursor::from_engine(engine.clone(), 0);

    // Fully drain the initial 50 records.
    let first = cursor.next_batch(1_000).unwrap();
    assert_eq!(first.len(), 50);
    assert_eq!(cursor.buffered_len(), 0);

    // Write 50 more AFTER the drain; the same cursor must pick them up.
    for i in 51..=100i64 {
        engine
            .put(vec![FieldValue::Int64(i)], row(i, i))
            .await
            .unwrap();
    }
    let second = cursor.next_batch(1_000).unwrap();
    assert_eq!(second.len(), 50);
    // Every refilled record comes strictly after the last one already seen.
    let last_seen = first.last().unwrap().seq;
    assert!(second.iter().all(|r| r.seq > last_seen));
}
#[tokio::test]
async fn buffered_path_honors_skip_update_discrimination() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;
    // Two puts to the same primary key: with discrimination enabled, the
    // second write would ordinarily be classified differently from the first.
    for v in [10i64, 20] {
        engine
            .put(vec![FieldValue::Int64(1)], row(1, v))
            .await
            .unwrap();
    }
    let mut cursor = ChangeFeedCursor::from_engine(engine, 0).skip_update_discrimination(true);
    let batch = cursor.next_batch(100).unwrap();
    assert_eq!(batch.len(), 2);
    // With discrimination skipped, both changes surface as plain inserts.
    for rec in &batch {
        assert_eq!(rec.op, ChangeOp::Insert);
    }
}