#![cfg(feature = "replica")]
use std::sync::Arc;
use merutable::replica::{InProcessLogSource, Replica};
use merutable::types::{
schema::{ColumnDef, ColumnType, TableSchema},
value::{FieldValue, Row},
};
use merutable::MeruDB;
fn schema() -> TableSchema {
TableSchema {
table_name: "replica-compose-test".into(),
columns: vec![
ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
},
ColumnDef {
name: "v".into(),
col_type: ColumnType::Int64,
nullable: true,
..Default::default()
},
],
primary_key: vec![0],
..Default::default()
}
}
async fn open_primary(tmp: &tempfile::TempDir) -> Arc<MeruDB> {
Arc::new(
MeruDB::open(
merutable::OpenOptions::new(schema())
.wal_dir(tmp.path().join("wal"))
.catalog_uri(tmp.path().to_string_lossy().to_string()),
)
.await
.unwrap(),
)
}
fn row(id: i64, v: i64) -> Row {
Row::new(vec![
Some(FieldValue::Int64(id)),
Some(FieldValue::Int64(v)),
])
}
#[tokio::test]
async fn replica_serves_from_tail_when_present() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 100)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
primary.put(row(2, 200)).await.unwrap();
replica.advance().await.unwrap();
let r1 = replica.get(&[FieldValue::Int64(1)]).await.unwrap();
assert!(r1.is_some(), "base reads resolve unchanged entries");
let r2 = replica.get(&[FieldValue::Int64(2)]).await.unwrap();
assert!(r2.is_some(), "tail surfaces post-open writes without flush");
}
#[tokio::test]
async fn replica_falls_through_to_base_on_tail_miss() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(42, 9999)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
assert_eq!(replica.visible_seq().await, replica.base_seq());
let r = replica.get(&[FieldValue::Int64(42)]).await.unwrap();
assert!(r.is_some(), "tail miss falls through to base");
}
#[tokio::test]
async fn tail_delete_is_authoritative_over_base_row() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 111)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let before = replica.get(&[FieldValue::Int64(1)]).await.unwrap();
assert!(before.is_some());
primary.delete(vec![FieldValue::Int64(1)]).await.unwrap();
replica.advance().await.unwrap();
let after = replica.get(&[FieldValue::Int64(1)]).await.unwrap();
assert!(
after.is_none(),
"tail Delete must shadow the (stale) base row"
);
}
#[tokio::test]
async fn rebase_advances_base_and_resets_tail() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 10)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let base_seq_v1 = replica.base_seq();
primary.put(row(2, 20)).await.unwrap();
primary.flush().await.unwrap();
assert_eq!(replica.base_seq(), base_seq_v1);
let pre_rebase = replica.get(&[FieldValue::Int64(2)]).await.unwrap();
assert!(pre_rebase.is_none(), "key=2 not yet visible before rebase");
replica.rebase().await.unwrap();
assert!(
replica.base_seq() > base_seq_v1,
"rebase advances base_seq: {} -> {}",
base_seq_v1,
replica.base_seq()
);
let post_rebase = replica.get(&[FieldValue::Int64(2)]).await.unwrap();
assert!(post_rebase.is_some(), "key=2 visible after rebase");
assert!(
replica
.get(&[FieldValue::Int64(1)])
.await
.unwrap()
.is_some(),
"pre-existing key stays visible"
);
}
#[tokio::test]
async fn hotswap_returns_warm_state_atomically() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 10)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let old_base_seq = replica.base_seq();
primary.put(row(2, 20)).await.unwrap();
primary.flush().await.unwrap();
primary.put(row(3, 30)).await.unwrap();
let (new_base_seq, visible_seq) = replica.rebase_hotswap().await.unwrap();
assert!(new_base_seq > old_base_seq, "base advanced");
assert!(
visible_seq >= 3,
"tail caught up to unflushed op: {visible_seq}"
);
for i in 1..=3i64 {
let r = replica.get(&[FieldValue::Int64(i)]).await.unwrap();
assert!(r.is_some(), "key={i} visible after hotswap");
}
}
#[tokio::test]
async fn old_state_arc_persists_across_hotswap() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 10)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let (hotswap_ok, _reads_ok): (Result<_, _>, Vec<_>) = tokio::join!(
replica.rebase_hotswap(),
futures::future::join_all(
(1..=5).map(|_| async { replica.get(&[FieldValue::Int64(1)]).await.unwrap() })
),
);
assert!(hotswap_ok.is_ok(), "hotswap does not error under read load");
}
#[tokio::test]
async fn stats_surface_advances_across_advance_and_rebase() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 1)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let s0 = replica.stats().await;
assert!(s0.base_seq > 0);
assert_eq!(
s0.visible_seq, s0.base_seq,
"tail seeded at base_seq on open"
);
assert_eq!(s0.tail_length, 0);
assert_eq!(s0.rebase_count, 0);
assert_eq!(s0.last_rebase_warmup_millis, 0);
primary.put(row(2, 2)).await.unwrap();
primary.put(row(3, 3)).await.unwrap();
replica.advance().await.unwrap();
let s1 = replica.stats().await;
assert!(
s1.visible_seq >= 3,
"visible advanced to >=3: {}",
s1.visible_seq
);
assert_eq!(s1.tail_length, 2, "2 tail ops absorbed");
primary.flush().await.unwrap();
let _ = replica.rebase_hotswap().await.unwrap();
let s2 = replica.stats().await;
assert_eq!(s2.rebase_count, 1);
let _ = s2.last_rebase_warmup_millis;
assert!(s2.base_seq > s0.base_seq, "base advanced through rebase");
assert_eq!(s2.tail_length, 0, "fresh tail after rebase is empty");
}
#[tokio::test]
async fn replica_base_seq_and_visible_seq_track_independently() {
let primary_dir = tempfile::tempdir().unwrap();
let replica_dir = tempfile::tempdir().unwrap();
let primary = open_primary(&primary_dir).await;
primary.put(row(1, 1)).await.unwrap();
primary.flush().await.unwrap();
let base_opts = merutable::OpenOptions::new(schema())
.wal_dir(replica_dir.path().join("wal"))
.catalog_uri(primary_dir.path().to_string_lossy().to_string());
let log = Arc::new(InProcessLogSource::new(primary.clone()));
let replica = Replica::open(base_opts, log).await.unwrap();
let initial_base_seq = replica.base_seq();
for i in 2..=5 {
primary.put(row(i, i)).await.unwrap();
}
replica.advance().await.unwrap();
assert!(replica.visible_seq().await >= 5);
assert_eq!(replica.base_seq(), initial_base_seq);
}