#![cfg(feature = "sql")]
use std::sync::Arc;
use merutable::engine::{config::EngineConfig, engine::MeruEngine};
use merutable::sql::{ChangeFeedCursor, ChangeOp};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
/// Builds the table schema shared by every test in this file.
///
/// The table is named `cf-phase2c` and has two `Int64` columns: a
/// non-nullable `id` and a nullable `v`. `primary_key` is `[0]`
/// (presumably the column index of `id` — confirm against `TableSchema`).
/// All remaining fields take their `Default` values.
fn test_schema() -> TableSchema {
    // The two columns differ only in name and nullability.
    let int64_column = |name: &'static str, nullable: bool| ColumnDef {
        name: name.into(),
        col_type: ColumnType::Int64,
        nullable,
        ..Default::default()
    };

    TableSchema {
        table_name: "cf-phase2c".into(),
        columns: vec![int64_column("id", false), int64_column("v", true)],
        primary_key: vec![0],
        ..Default::default()
    }
}
/// Opens an engine rooted in `tmp` with an `l0_compaction_trigger` of 4.
///
/// Thin convenience wrapper around `open_engine_with_l0_trigger`.
async fn open_engine(tmp: &tempfile::TempDir) -> Arc<MeruEngine> {
    // Trigger value used by tests that do not care about compaction tuning.
    const DEFAULT_L0_COMPACTION_TRIGGER: usize = 4;
    open_engine_with_l0_trigger(tmp, DEFAULT_L0_COMPACTION_TRIGGER).await
}
/// Opens a `MeruEngine` whose state lives entirely under the temp directory `tmp`.
///
/// The directory path (lossily converted to a string) is used for both
/// `catalog_uri` and `object_store_prefix`; the WAL goes in a `wal`
/// subdirectory. `l0_compaction_trigger` is forwarded to the config unchanged,
/// and every other config field keeps its `Default` value.
///
/// # Panics
///
/// Panics if `MeruEngine::open` returns an error.
async fn open_engine_with_l0_trigger(
    tmp: &tempfile::TempDir,
    l0_compaction_trigger: usize,
) -> Arc<MeruEngine> {
    let root = tmp.path();
    // Catalog and object store share the same directory string.
    let root_uri = root.to_string_lossy().to_string();

    let cfg = EngineConfig {
        schema: test_schema(),
        catalog_uri: root_uri.clone(),
        object_store_prefix: root_uri,
        wal_dir: root.join("wal"),
        l0_compaction_trigger,
        ..Default::default()
    };
    MeruEngine::open(cfg).await.unwrap()
}
/// Builds a two-field `Row` holding `id` followed by `v`, both as present
/// (`Some`) `Int64` values.
fn row(id: i64, v: i64) -> Row {
    let id_field = Some(FieldValue::Int64(id));
    let v_field = Some(FieldValue::Int64(v));
    Row::new(vec![id_field, v_field])
}
/// Checks that the change feed tags a put according to whether the key had
/// already been written.
///
/// Key 1 is put twice and key 2 once. A cursor created with
/// `from_engine(engine, 0)` must then return exactly three records tagged
/// Insert, Update, Insert, in that order.
#[tokio::test]
async fn insert_vs_update_distinguished_by_prior_state() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    // (id, v) pairs in write order; id=1 is deliberately written twice.
    for (id, v) in [(1, 1), (1, 2), (2, 20)] {
        engine
            .put(vec![FieldValue::Int64(id)], row(id, v))
            .await
            .unwrap();
    }

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let batch = cur.next_batch(100).unwrap();
    assert_eq!(batch.len(), 3);

    // Expected operation per record, paired with its failure message.
    let expected = [
        (ChangeOp::Insert, "first put of id=1 is Insert"),
        (ChangeOp::Update, "second put of id=1 is Update"),
        (ChangeOp::Insert, "first put of id=2 is Insert"),
    ];
    for (rec, (op, why)) in batch.iter().zip(expected) {
        assert_eq!(rec.op, op, "{why}");
    }
}
/// Checks that a Delete record carries the full row that was deleted.
///
/// Key 42 is put with `v = 9999` and then deleted, with no flush in between.
/// The Delete record in the feed must have two fields holding `42` and `9999`.
#[tokio::test]
async fn delete_pre_image_reconstructed_from_memtable() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    let key = vec![FieldValue::Int64(42)];
    engine.put(key.clone(), row(42, 9999)).await.unwrap();
    engine.delete(key).await.unwrap();

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let batch = cur.next_batch(10).unwrap();
    let del = batch
        .iter()
        .find(|r| matches!(r.op, ChangeOp::Delete))
        .unwrap();
    let fields = &del.row.fields;
    assert_eq!(fields.len(), 2);

    // Field 0 must be the deleted key.
    let id_field = fields.first().and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(id)) = id_field else {
        panic!("pre-image id: {id_field:?}");
    };
    assert_eq!(*id, 42);

    // Field 1 must be the value written before the delete.
    let v_field = fields.get(1).and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(v)) = v_field else {
        panic!("pre-image v: {v_field:?}");
    };
    assert_eq!(*v, 9999);
}
/// Checks that a Delete record still carries its pre-image when the put was
/// flushed before the delete was issued.
///
/// Key 5 is put with `v = 55`, the engine is flushed, and the key is deleted.
/// The Delete record must have two fields, the second being `55`.
#[tokio::test]
async fn delete_pre_image_reconstructed_across_l0_flush() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    engine
        .put(vec![FieldValue::Int64(5)], row(5, 55))
        .await
        .unwrap();
    // The flush sits between the put and the delete.
    engine.flush().await.unwrap();
    engine.delete(vec![FieldValue::Int64(5)]).await.unwrap();

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let batch = cur.next_batch(10).unwrap();
    let del = batch
        .iter()
        .find(|r| matches!(r.op, ChangeOp::Delete))
        .unwrap();

    let fields = &del.row.fields;
    assert_eq!(
        fields.len(),
        2,
        "pre-image reconstructed across flush boundary"
    );

    let v_field = fields.get(1).and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(v)) = v_field else {
        panic!("pre-image v: {v_field:?}");
    };
    assert_eq!(*v, 55);
}
/// Checks that `skip_update_discrimination(true)` makes the cursor report
/// every put as `Insert`, even when the key was already written.
///
/// Key 1 is put twice. With the flag set, both records must surface in the
/// feed and both must be tagged `Insert`.
#[tokio::test]
async fn skip_update_discrimination_tags_every_put_as_insert() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;
    engine
        .put(vec![FieldValue::Int64(1)], row(1, 1))
        .await
        .unwrap();
    engine
        .put(vec![FieldValue::Int64(1)], row(1, 2))
        .await
        .unwrap();
    let mut cur = ChangeFeedCursor::from_engine(engine, 0).skip_update_discrimination(true);
    let batch = cur.next_batch(10).unwrap();
    // Without this check the loop below passes vacuously on an empty batch,
    // so a feed that dropped both puts would go unnoticed.
    assert_eq!(
        batch.len(),
        2,
        "both puts of id=1 must surface in the change feed"
    );
    for rec in &batch {
        assert_eq!(
            rec.op,
            ChangeOp::Insert,
            "skip_update_discrimination forces Insert for every Put"
        );
    }
}
/// Regression test for issue #33: a Delete record must keep its pre-image
/// when both the put and the delete have been flushed.
///
/// Key 7 is put with `v = 12345`, flushed, deleted, and flushed again. The
/// Delete record must be non-empty and hold `7` and `12345`.
#[tokio::test]
async fn issue_33_delete_pre_image_survives_flush() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    engine
        .put(vec![FieldValue::Int64(7)], row(7, 12345))
        .await
        .unwrap();
    engine.flush().await.unwrap();
    engine.delete(vec![FieldValue::Int64(7)]).await.unwrap();
    // Second flush: the delete itself is flushed before the feed is read.
    engine.flush().await.unwrap();

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let batch = cur.next_batch(100).unwrap();
    let del = batch
        .iter()
        .find(|r| matches!(r.op, ChangeOp::Delete))
        .expect("delete op present in feed");

    let fields = &del.row.fields;
    assert!(
        !fields.is_empty(),
        "#33 regression: DELETE at seq {} returned empty pre-image",
        del.seq
    );

    let id_field = fields.first().and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(id)) = id_field else {
        panic!("pre-image id: {id_field:?}");
    };
    assert_eq!(*id, 7, "pre-image id");

    let v_field = fields.get(1).and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(v)) = v_field else {
        panic!("pre-image v: {v_field:?}");
    };
    assert_eq!(*v, 12345, "pre-image v");
}
/// Regression test for issue #33: when a put and a delete of the same key are
/// applied in a single `MutationBatch`, the Delete record's pre-image must be
/// the row written by that put.
///
/// Key 9 is put with `v = 999` and deleted in the same batch. The second
/// field of the Delete record must be `999`.
#[tokio::test]
async fn issue_33_same_batch_put_then_delete_surfaces_put_as_pre_image() {
    use merutable::engine::write_path;

    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    // Put and delete travel together in one batch.
    let mut batch = write_path::MutationBatch::new();
    batch.put(vec![FieldValue::Int64(9)], row(9, 999));
    batch.delete(vec![FieldValue::Int64(9)]);
    write_path::apply_batch(&engine, batch).await.unwrap();

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let recs = cur.next_batch(100).unwrap();
    let del = recs
        .iter()
        .find(|r| matches!(r.op, ChangeOp::Delete))
        .expect("delete in batch");

    let v_field = del.row.fields.get(1).and_then(|f| f.as_ref());
    let Some(FieldValue::Int64(v)) = v_field else {
        panic!("same-batch delete pre-image v: {v_field:?}");
    };
    assert_eq!(*v, 999);
}
/// Checks that deleting a key that was never written produces a Delete
/// record whose row has no fields.
#[tokio::test]
async fn delete_without_prior_state_returns_empty_pre_image() {
    let tmp = tempfile::tempdir().unwrap();
    let engine = open_engine(&tmp).await;

    // Key 999 is never put in this test.
    let never_written = vec![FieldValue::Int64(999)];
    engine.delete(never_written).await.unwrap();

    let mut cur = ChangeFeedCursor::from_engine(engine, 0);
    let batch = cur.next_batch(10).unwrap();
    let del = batch
        .iter()
        .find(|r| matches!(r.op, ChangeOp::Delete))
        .unwrap();

    let pre_image_len = del.row.fields.len();
    assert_eq!(pre_image_len, 0);
}