#![cfg(feature = "sql")]
use std::sync::Arc;
use merutable::engine::{config::EngineConfig, engine::MeruEngine};
use merutable::sql::ChangeFeedCursor;
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
/// Schema shared by every test in this file.
///
/// The table is named `cf-phase2b` and has two `Int64` columns: `id`
/// (declared non-nullable) and `v` (declared nullable). The primary key list
/// holds the single entry `0` (presumably the index of `id` — confirm against
/// `TableSchema`). All remaining fields take their `Default` values.
fn test_schema() -> TableSchema {
    // Both columns differ only in name and nullability.
    let int64_column = |name: &'static str, nullable: bool| ColumnDef {
        name: name.into(),
        col_type: ColumnType::Int64,
        nullable,
        ..Default::default()
    };

    let columns = vec![int64_column("id", false), int64_column("v", true)];

    TableSchema {
        table_name: "cf-phase2b".into(),
        columns,
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Opens a `MeruEngine` whose storage lives inside the given temp directory.
///
/// The directory path (lossily converted to a `String`) is used for both
/// `catalog_uri` and `object_store_prefix`; the WAL directory is the `wal`
/// child of that path. Every other config field keeps its default.
///
/// Panics if `MeruEngine::open` returns an error.
async fn open_engine(tmp: &tempfile::TempDir) -> Arc<MeruEngine> {
    let root = tmp.path();
    let root_uri = root.to_string_lossy().to_string();

    let config = EngineConfig {
        schema: test_schema(),
        catalog_uri: root_uri.clone(),
        object_store_prefix: root_uri,
        wal_dir: root.join("wal"),
        ..Default::default()
    };

    MeruEngine::open(config).await.unwrap()
}
/// Builds a two-field `Row`: `id` followed by `v`, each wrapped as
/// `Some(FieldValue::Int64(..))`.
fn row(id: i64, v: i64) -> Row {
    let fields = [id, v].map(|n| Some(FieldValue::Int64(n)));
    Row::new(Vec::from(fields))
}
#[tokio::test]
async fn flushed_ops_remain_visible_via_l0_scan() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
for i in 1..=3i64 {
engine
.put(vec![FieldValue::Int64(i)], row(i, i * 10))
.await
.unwrap();
}
engine.flush().await.unwrap();
let mut cur = ChangeFeedCursor::from_engine(engine.clone(), 0);
let batch = cur.next_batch(100).unwrap();
assert_eq!(
batch.len(),
3,
"ops flushed to L0 must still appear in the change feed"
);
for w in batch.windows(2) {
assert!(w[0].seq < w[1].seq);
}
}
#[tokio::test]
async fn mixed_memtable_and_l0_interleave_in_seq_order() {
let tmp = tempfile::tempdir().unwrap();
let engine = open_engine(&tmp).await;
engine
.put(vec![FieldValue::Int64(1)], row(1, 1))
.await
.unwrap();
engine
.put(vec![FieldValue::Int64(2)], row(2, 2))
.await
.unwrap();
engine.flush().await.unwrap();
engine
.put(vec![FieldValue::Int64(3)], row(3, 3))
.await
.unwrap();
engine
.put(vec![FieldValue::Int64(4)], row(4, 4))
.await
.unwrap();
let mut cur = ChangeFeedCursor::from_engine(engine.clone(), 0);
let batch = cur.next_batch(100).unwrap();
assert_eq!(batch.len(), 4, "L0 + memtable ops all appear");
for w in batch.windows(2) {
assert!(w[0].seq < w[1].seq);
}
}
/// Puts row 1, flushes, records the engine's read sequence as a boundary,
/// then puts row 2 and opens a cursor starting at that boundary.
///
/// Expects the batch to contain exactly one op, whose `seq` is strictly
/// greater than the recorded boundary — the pre-boundary write must not be
/// returned.
#[tokio::test]
async fn since_seq_boundary_respected_across_l0_and_memtable() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    let first_key = vec![FieldValue::Int64(1)];
    engine.put(first_key, row(1, 1)).await.unwrap();
    engine.flush().await.unwrap();

    // Snapshot the sequence after the first write has been flushed.
    let since_seq = engine.read_seq().0;

    let second_key = vec![FieldValue::Int64(2)];
    engine.put(second_key, row(2, 2)).await.unwrap();

    let mut cursor = ChangeFeedCursor::from_engine(engine, since_seq);
    let ops = cursor.next_batch(100).unwrap();

    assert_eq!(ops.len(), 1);
    assert!(ops[0].seq > since_seq);
}