use bytes::Bytes;
use seerdb::memtable::Memtable;
use seerdb::sstable::SSTable;
use seerdb::wal::{reader::WALReader, Record, SyncPolicy, WAL};
use tempfile::tempdir;
#[test]
fn test_wal_memtable_integration() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
let memtable = Memtable::new(1024 * 1024);
let records = vec![
Record::Put {
key: Bytes::from("key1"),
value: Bytes::from("value1"),
seq: 1,
},
Record::Put {
key: Bytes::from("key2"),
value: Bytes::from("value2"),
seq: 2,
},
Record::Delete {
key: Bytes::from("key3"),
seq: 3,
},
];
for record in &records {
wal.write(record).unwrap();
match record {
Record::Put { key, value, seq } => {
memtable.put(key.clone(), value.clone(), *seq);
}
Record::Delete { key, seq } => {
memtable.delete(key.clone(), *seq);
}
Record::Merge { key, operand, seq } => {
memtable.merge(key.clone(), operand.clone(), *seq);
}
Record::Batch {
base_seq,
operations,
} => {
let mut seq = *base_seq;
for op in operations {
match op {
seerdb::wal::BatchOp::Put { key, value } => {
memtable.put(key.clone(), value.clone(), seq);
seq += 1;
}
seerdb::wal::BatchOp::Delete { key } => {
memtable.delete(key.clone(), seq);
seq += 1;
}
seerdb::wal::BatchOp::Merge { key, operand } => {
memtable.merge(key.clone(), operand.clone(), seq);
seq += 1;
}
}
}
}
}
}
assert_eq!(
memtable.get(b"key1", u64::MAX).map(|(v, _)| v),
Some(Bytes::from("value1"))
);
assert_eq!(
memtable.get(b"key2", u64::MAX).map(|(v, _)| v),
Some(Bytes::from("value2"))
);
assert_eq!(memtable.get(b"key3", u64::MAX), None);
let sstable_path = dir.path().join("flush.sst");
memtable.flush(&sstable_path).unwrap();
let mut sstable = SSTable::open(&sstable_path).unwrap();
assert!(sstable.len() > 0);
}
#[test]
fn test_crash_recovery() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
let sstable_path = dir.path().join("flush.sst");
{
let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
let memtable = Memtable::new(1024 * 1024);
for i in 0..100u64 {
let key = format!("key_{}", i);
let value = format!("value_{}", i);
let record = Record::Put {
key: Bytes::from(key.clone()),
value: Bytes::from(value.clone()),
seq: i + 1,
};
wal.write(&record).unwrap();
memtable.put(Bytes::from(key), Bytes::from(value), i + 1);
}
memtable.flush(&sstable_path).unwrap();
}
let mut reader = WALReader::open(&wal_path).unwrap();
let records = reader.read_all().unwrap();
assert_eq!(records.len(), 100);
let memtable = Memtable::new(1024 * 1024);
for record in records {
match record {
Record::Put { key, value, seq } => {
memtable.put(key, value, seq);
}
Record::Delete { key, seq } => {
memtable.delete(key, seq);
}
Record::Merge { key, operand, seq } => {
memtable.merge(key, operand, seq);
}
Record::Batch {
base_seq,
operations,
} => {
let mut seq = base_seq;
for op in operations {
match op {
seerdb::wal::BatchOp::Put { key, value } => {
memtable.put(key, value, seq);
seq += 1;
}
seerdb::wal::BatchOp::Delete { key } => {
memtable.delete(key, seq);
seq += 1;
}
seerdb::wal::BatchOp::Merge { key, operand } => {
memtable.merge(key, operand, seq);
seq += 1;
}
}
}
}
}
}
for i in 0..100 {
let key = format!("key_{}", i);
let value = format!("value_{}", i);
assert_eq!(
memtable.get(key.as_bytes(), u64::MAX).map(|(v, _)| v),
Some(Bytes::from(value))
);
}
let sstable = SSTable::open(&sstable_path).unwrap();
assert!(sstable.len() > 0);
}
#[test]
fn test_write_flush_recover_cycle() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
{
let mut wal = WAL::create(&wal_path, SyncPolicy::SyncData).unwrap();
let memtable = Memtable::new(100);
for i in 0..10u64 {
let key = format!("key_{}", i);
let value = format!("value_with_long_data_{}", i);
let record = Record::Put {
key: Bytes::from(key.clone()),
value: Bytes::from(value.clone()),
seq: i + 1,
};
wal.write(&record).unwrap();
memtable.put(Bytes::from(key), Bytes::from(value), i + 1);
if memtable.should_flush() {
let sstable_path = dir.path().join(format!("sstable_{}.sst", i));
memtable.flush(&sstable_path).unwrap();
}
}
}
{
let mut reader = WALReader::open(&wal_path).unwrap();
let records = reader.read_all().unwrap();
let memtable = Memtable::new(100);
for record in records {
match record {
Record::Put { key, value, seq } => {
memtable.put(key, value, seq);
}
Record::Delete { key, seq } => {
memtable.delete(key, seq);
}
Record::Merge { key, operand, seq } => {
memtable.merge(key, operand, seq);
}
Record::Batch {
base_seq,
operations,
} => {
let mut seq = base_seq;
for op in operations {
match op {
seerdb::wal::BatchOp::Put { key, value } => {
memtable.put(key, value, seq);
seq += 1;
}
seerdb::wal::BatchOp::Delete { key } => {
memtable.delete(key, seq);
seq += 1;
}
seerdb::wal::BatchOp::Merge { key, operand } => {
memtable.merge(key, operand, seq);
seq += 1;
}
}
}
}
}
}
for i in 0..10 {
let key = format!("key_{}", i);
assert!(memtable.get(key.as_bytes(), u64::MAX).is_some());
}
}
}
#[test]
fn test_delete_in_wal_and_memtable() {
let dir = tempdir().unwrap();
let wal_path = dir.path().join("test.wal");
{
let mut wal = WAL::create(&wal_path, SyncPolicy::SyncAll).unwrap();
let memtable = Memtable::new(1024);
wal.write(&Record::Put {
key: Bytes::from("key1"),
value: Bytes::from("value1"),
seq: 1,
})
.unwrap();
memtable.put(Bytes::from("key1"), Bytes::from("value1"), 1);
wal.write(&Record::Delete {
key: Bytes::from("key1"),
seq: 2,
})
.unwrap();
memtable.delete(Bytes::from("key1"), 2);
assert_eq!(memtable.get(b"key1", u64::MAX), None);
}
{
let mut reader = WALReader::open(&wal_path).unwrap();
let records = reader.read_all().unwrap();
let memtable = Memtable::new(1024);
for record in records {
match record {
Record::Put { key, value, seq } => {
memtable.put(key, value, seq);
}
Record::Delete { key, seq } => {
memtable.delete(key, seq);
}
Record::Merge { key, operand, seq } => {
memtable.merge(key, operand, seq);
}
Record::Batch {
base_seq,
operations,
} => {
let mut seq = base_seq;
for op in operations {
match op {
seerdb::wal::BatchOp::Put { key, value } => {
memtable.put(key, value, seq);
seq += 1;
}
seerdb::wal::BatchOp::Delete { key } => {
memtable.delete(key, seq);
seq += 1;
}
seerdb::wal::BatchOp::Merge { key, operand } => {
memtable.merge(key, operand, seq);
seq += 1;
}
}
}
}
}
}
assert_eq!(memtable.get(b"key1", u64::MAX), None);
}
}