#![cfg(feature = "sql")]
use std::sync::Arc;
use merutable::engine::{config::EngineConfig, engine::MeruEngine};
use merutable::sql::{ChangeFeedCursor, ChangeOp};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
fn test_schema() -> TableSchema {
TableSchema {
table_name: "cf-test".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "v".into(),
col_type: ColumnType::Int64,
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
async fn open_engine(tmp: &tempfile::TempDir) -> Arc<MeruEngine> {
let cfg = EngineConfig {
schema: test_schema(),
catalog_uri: tmp.path().to_string_lossy().to_string(),
object_store_prefix: tmp.path().to_string_lossy().to_string(),
wal_dir: tmp.path().join("wal"),
..Default::default()
};
MeruEngine::open(cfg).await.unwrap()
}
fn row(id: i64, v: i64) -> Row {
Row::new(vec![
Some(FieldValue::Int64(id)),
Some(FieldValue::Int64(v)),
])
}
#[tokio::test]
async fn memtable_scan_returns_ops_above_since_in_seq_order() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
engine
.put(vec![FieldValue::Int64(1)], row(1, 100))
.await
.unwrap();
engine
.put(vec![FieldValue::Int64(2)], row(2, 200))
.await
.unwrap();
engine
.put(vec![FieldValue::Int64(3)], row(3, 300))
.await
.unwrap();
engine.delete(vec![FieldValue::Int64(2)]).await.unwrap();
let mut cur = ChangeFeedCursor::from_engine(engine.clone(), 0);
let batch = cur.next_batch(100).unwrap();
assert_eq!(batch.len(), 4);
for w in batch.windows(2) {
assert!(w[0].seq < w[1].seq);
}
let ops: Vec<ChangeOp> = batch.iter().map(|r| r.op).collect();
assert_eq!(
ops,
vec![
ChangeOp::Insert,
ChangeOp::Insert,
ChangeOp::Insert,
ChangeOp::Delete
]
);
assert_eq!(cur.since_seq(), batch.last().unwrap().seq);
}
#[tokio::test]
async fn cursor_filters_ops_at_or_below_since_seq() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
engine
.put(vec![FieldValue::Int64(1)], row(1, 1))
.await
.unwrap();
let boundary = engine.read_seq().0;
engine
.put(vec![FieldValue::Int64(2)], row(2, 2))
.await
.unwrap();
engine
.put(vec![FieldValue::Int64(3)], row(3, 3))
.await
.unwrap();
let mut cur = ChangeFeedCursor::from_engine(engine.clone(), boundary);
let batch = cur.next_batch(100).unwrap();
assert_eq!(batch.len(), 2);
assert!(batch.iter().all(|r| r.seq > boundary));
}
#[tokio::test]
async fn max_rows_caps_batch_and_cursor_advances_incrementally() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
for i in 1..=5i64 {
engine
.put(vec![FieldValue::Int64(i)], row(i, i * 10))
.await
.unwrap();
}
let mut cur = ChangeFeedCursor::from_engine(engine.clone(), 0);
let b1 = cur.next_batch(2).unwrap();
assert_eq!(b1.len(), 2);
let after_b1 = cur.since_seq();
let b2 = cur.next_batch(2).unwrap();
assert_eq!(b2.len(), 2);
assert!(
b2[0].seq > after_b1,
"next batch must continue past the previous since_seq"
);
let b3 = cur.next_batch(2).unwrap();
assert_eq!(b3.len(), 1);
let b4 = cur.next_batch(2).unwrap();
assert_eq!(b4.len(), 0);
}
#[tokio::test]
async fn empty_memtable_returns_empty_batch_not_error() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
let mut cur = ChangeFeedCursor::from_engine(engine, 0);
assert!(cur.next_batch(10).unwrap().is_empty());
}
#[tokio::test]
async fn delete_op_carries_pre_image_phase_2c() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
engine
.put(vec![FieldValue::Int64(7)], row(7, 777))
.await
.unwrap();
engine.delete(vec![FieldValue::Int64(7)]).await.unwrap();
let mut cur = ChangeFeedCursor::from_engine(engine, 0);
let batch = cur.next_batch(10).unwrap();
let del = batch
.iter()
.find(|r| matches!(r.op, ChangeOp::Delete))
.unwrap();
assert_eq!(del.row.fields.len(), 2);
match del.row.fields.get(1).and_then(|f| f.as_ref()) {
Some(FieldValue::Int64(n)) => assert_eq!(*n, 777),
other => panic!("unexpected pre-image v field: {other:?}"),
}
}