use std::pin::pin;
use futures::future::join_all;
use tempest_core::test_utils::setup_tracing;
use tempest_io::VirtualIo;
use tempest_rt::{Runtime, block_on, yield_now};
use super::*;
/// Drives `wal` forward until `fut` completes, returning the future's output.
///
/// Each round awaits one `wal.tick()` (unwrapping its result), yields to the
/// runtime with `yield_now`, and then polls `fut` exactly once.
///
/// # Panics
///
/// Panics if a tick returns an error, or if `fut` is still pending after
/// `max_ticks` rounds.
async fn tick_until<I: Io, F: Future>(wal: &mut Wal<I>, fut: F, max_ticks: usize) -> F::Output {
    let mut pending = pin!(fut);

    for n in 0..max_ticks {
        trace!(n, "TICK");
        wal.tick().await.unwrap();
        yield_now().await;

        // Wrap the inner poll in `Poll::Ready` so that awaiting the `poll_fn`
        // always returns immediately: this is a single non-blocking poll.
        let polled = poll_fn(|cx| Poll::Ready(pending.as_mut().poll(cx))).await;
        match polled {
            Poll::Ready(output) => return output,
            Poll::Pending => {}
        }
    }

    panic!("timed out: operation exceeded {} ticks", max_ticks);
}
/// A single append must be acknowledged successfully within 32 ticks.
#[test]
fn test_wal_append_acked() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let dir = PathBuf::from("/wal");
        let mut wal = Wal::<VirtualIo>::init(dir, WalConfig::default(), |_| {})
            .await
            .unwrap();

        let ack = wal.append(b"hello").recv();
        tick_until(&mut wal, ack, 32)
            .await
            .expect("channel not closed")
            .unwrap();

        wal.close().await.unwrap();
    });
}
/// Three appends issued back to back must all be acknowledged successfully
/// within 32 ticks.
#[test]
fn test_wal_multi_append() {
    setup_tracing();
    block_on(VirtualIo::default(), async {
        let dir = PathBuf::from("/wal");
        let mut wal = Wal::<VirtualIo>::init(dir, WalConfig::default(), |_| {})
            .await
            .unwrap();

        // Queue every append first, then turn the acks into receive futures.
        let first = wal.append(b"hello");
        let second = wal.append(b"there");
        let third = wal.append(b"world");
        let receivers = [first, second, third].map(|ack| ack.recv());

        let results = tick_until(&mut wal, join_all(receivers), 32).await;
        results
            .into_iter()
            .for_each(|res| res.expect("channel not closed").unwrap());

        wal.close().await.unwrap();
    });
}
/// Appends two records with `rotate_file_size_threshold` set to
/// `WAL_HEADER_SIZE + 1`, then inspects the virtual filesystem and asserts
/// that `/wal/files/1.wal` exists.
#[test]
fn test_wal_rotation() {
    setup_tracing();
    let config = WalConfig {
        rotate_file_size_threshold: WAL_HEADER_SIZE as u64 + 1,
        ..WalConfig::default()
    };
    // A dedicated runtime (rather than the free `block_on`) is used so the
    // I/O backend can be inspected after the async block has finished.
    let mut rt = Runtime::new(VirtualIo::default());
    rt.block_on(async {
        let mut wal = Wal::<VirtualIo>::init(PathBuf::from("/wal"), config, |_| {})
            .await
            .unwrap();
        let ack1 = wal.append(b"hello");
        let ack2 = wal.append(b"world");
        let results = tick_until(&mut wal, join_all([ack1, ack2].map(|a| a.recv())), 32).await;
        for res in results {
            res.expect("channel not closed").unwrap();
        }
        // Keep ticking until the WAL reports idle, so that any work still
        // outstanding after the acks (presumably the rotation itself; confirm
        // against `Wal::is_idle`) completes before `close`. The condition was
        // previously inverted: it ticked only while the WAL was *already*
        // idle, which either skipped the drain or never terminated.
        while !wal.is_idle() {
            wal.tick().await.unwrap();
            yield_now().await;
        }
        wal.close().await.unwrap();
    });
    let io = rt.inspect_io();
    let files = io.list_dir(Path::new("/wal/files")).unwrap();
    assert!(
        files
            .iter()
            .any(|e| e.path == PathBuf::from("/wal/files/1.wal")),
        "expected rotation to produce /wal/files/1.wal, got: {:?}",
        files.iter().map(|e| &e.path).collect::<Vec<_>>()
    );
}
/// Writes three records, closes the WAL, then re-initialises it on the same
/// directory and checks that the callback passed to `init` received the same
/// records in the same order.
#[test]
fn test_wal_recovery() {
    setup_tracing();
    let records: &[&[u8]] = &[b"hello", b"world", b"foo"];
    block_on(VirtualIo::default(), async {
        // Phase 1: write the records and close the WAL.
        {
            let dir = PathBuf::from("/wal");
            let mut wal = Wal::<VirtualIo>::init(dir, WalConfig::default(), |_| {})
                .await
                .unwrap();

            let acks: Vec<_> = records.iter().map(|r| wal.append(r).recv()).collect();
            let outcomes = tick_until(&mut wal, join_all(acks), 32).await;
            for outcome in outcomes {
                outcome.expect("channel not closed").unwrap();
            }

            wal.close().await.unwrap();
        }

        // Phase 2: reopen and collect whatever the init callback is handed.
        {
            let mut recovered: Vec<Vec<u8>> = Vec::new();
            let dir = PathBuf::from("/wal");
            let wal = Wal::<VirtualIo>::init(dir, WalConfig::default(), |rec| {
                recovered.push(rec.to_vec())
            })
            .await
            .unwrap();

            assert_eq!(
                recovered.len(),
                records.len(),
                "recovered record count mismatch"
            );
            recovered
                .iter()
                .zip(records)
                .for_each(|(got, expected)| {
                    assert_eq!(got.as_slice(), *expected, "recovered record data mismatch");
                });

            wal.close().await.unwrap();
        }
    });
}