use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::{stream::BoxStream, StreamExt};
use log::{error, trace};
use tokio::{runtime::Handle, select, sync::oneshot};
use tracing::instrument;
use crate::clock::MonotonicClock;
use crate::db_state::SsTableId;
use crate::db_stats::DbStats;
use crate::db_status::ClosedResultWriter;
use crate::dispatcher::{MessageFactory, MessageHandler, MessageHandlerExecutor};
use crate::error::SlateDBError;
use crate::oracle::{DbOracle, Oracle};
use crate::tablestore::TableStore;
use crate::types::RowEntry;
use crate::utils::SafeSender;
use crate::utils::{format_bytes_si, WatchableOnceCell, WatchableOnceCellReader};
use crate::wal_id::WalIdStore;
/// Name under which the WAL flush handler is registered with the task executor in `init`
/// and by which that task is shut down in `close`.
pub(crate) const WAL_BUFFER_TASK_NAME: &str = "wal_writer";
/// Buffers WAL entries in memory and coordinates writing them out as WAL SSTs.
///
/// Entries are appended to a mutable "current" WAL. A flush freezes the current WAL into a
/// queue of immutable WALs, writes each of them through `table_store`, and notifies the
/// watchers of each WAL once it has been written.
pub(crate) struct WalBufferManager {
/// Lock-guarded state: the current WAL, the frozen WALs, the flush channel and bookkeeping.
inner: Arc<parking_lot::RwLock<WalBufferManagerInner>>,
/// Supplies the id assigned to a WAL when it is frozen (`next_wal_id`).
wal_id_incrementor: Arc<dyn WalIdStore + Send + Sync>,
/// Provides the result reader handed to the flush request channel in `init`.
status_manager: crate::db_status::DbStatusManager,
/// Metrics updated by this type: flush requests, flushes and estimated buffered bytes.
db_stats: DbStats,
/// Receives the last tick of each WAL after that WAL has been written.
mono_clock: Arc<MonotonicClock>,
/// Builds and writes WAL SSTs and estimates the encoded size of buffered entries.
table_store: Arc<TableStore>,
/// Estimated encoded size of the current WAL at or above which `maybe_trigger_flush`
/// requests a flush.
max_wal_bytes_size: usize,
/// Interval of the flush handler's ticker; with `None` no ticker is registered.
max_flush_interval: Option<Duration>,
/// Highest flush epoch for which `maybe_trigger_flush` has already sent a request
/// (starts at 0).
last_flush_requested_epoch: AtomicU64,
}
/// Mutable state of a `WalBufferManager`, kept behind its read/write lock.
struct WalBufferManagerInner {
/// WAL that currently receives appended entries.
current_wal: WalBuffer,
/// Frozen WALs paired with their WAL ids; new ones are pushed to the back and released
/// ones are drained from the front.
immutable_wals: VecDeque<(u64, Arc<WalBuffer>)>,
/// Sender for flush requests; `None` until `init` has run.
flush_tx: Option<SafeSender<WalFlushWork>>,
/// Executor the flush handler was registered with; `None` until `init` has run.
task_executor: Option<Arc<MessageHandlerExecutor>>,
/// Most recent sequence number passed to `track_last_applied_seq`, if any.
last_applied_seq: Option<u64>,
/// Starts at 1 and is incremented every time the current WAL is frozen.
flush_epoch: u64,
/// Id of the most recently flushed WAL; immutable WALs with an id at or below it are
/// skipped when collecting WALs to flush.
recent_flushed_wal_id: u64,
/// Advanced with the last sequence number of each flushed WAL and consulted when deciding
/// which immutable WALs can be released.
oracle: Arc<DbOracle>,
}
/// A single in-memory WAL: an ordered run of entries plus a one-shot durability signal.
struct WalBuffer {
/// Entries in append order.
entries: VecDeque<RowEntry>,
/// Write-once cell holding the outcome of making this WAL durable.
durable: WatchableOnceCell<Result<(), SlateDBError>>,
/// `create_ts` of the most recently appended entry that carried one; `i64::MIN` if none did.
last_tick: i64,
/// `seq` of the most recently appended entry; `0` is reported as "no sequence number".
last_seq: u64,
/// Running sum of `estimated_size()` over all appended entries.
entries_size: usize,
}
/// Iterator over a snapshot of a `WalBuffer`'s entries, cloned when the iterator is created.
struct WalBufferIterator {
/// Owned copies of the entries that remain to be yielded.
entries: std::vec::IntoIter<RowEntry>,
}
impl WalBufferManager {
/// Creates a manager with an empty current WAL and no immutable WALs.
///
/// The flush channel and task executor are left unset; `init` must be called before any
/// flush is requested.
pub(crate) fn new(
    wal_id_incrementor: Arc<dyn WalIdStore + Send + Sync>,
    status_manager: crate::db_status::DbStatusManager,
    db_stats: DbStats,
    recent_flushed_wal_id: u64,
    oracle: Arc<DbOracle>,
    table_store: Arc<TableStore>,
    mono_clock: Arc<MonotonicClock>,
    max_wal_bytes_size: usize,
    max_flush_interval: Option<Duration>,
) -> Self {
    let state = WalBufferManagerInner {
        current_wal: WalBuffer::new(),
        immutable_wals: VecDeque::new(),
        flush_tx: None,
        task_executor: None,
        last_applied_seq: None,
        // Epochs start above the initial `last_flush_requested_epoch` (0) so that the first
        // size-triggered request is not treated as a duplicate.
        flush_epoch: 1,
        recent_flushed_wal_id,
        oracle,
    };
    Self {
        inner: Arc::new(parking_lot::RwLock::new(state)),
        wal_id_incrementor,
        status_manager,
        db_stats,
        mono_clock,
        table_store,
        max_wal_bytes_size,
        max_flush_interval,
        last_flush_requested_epoch: AtomicU64::new(0),
    }
}
/// Creates the flush request channel and registers the WAL flush handler with
/// `task_executor` on the current Tokio runtime.
///
/// Returns whatever `add_handler` returns. The sender is stored before registration and the
/// executor is stored afterwards, regardless of the registration outcome.
pub(crate) async fn init(
    self: &Arc<Self>,
    task_executor: Arc<MessageHandlerExecutor>,
) -> Result<(), SlateDBError> {
    let (flush_tx, flush_rx) =
        SafeSender::unbounded_channel(self.status_manager.result_reader());
    // The temporary write guard is released at the end of this statement.
    self.inner.write().flush_tx = Some(flush_tx);

    let handler = WalFlushHandler {
        max_flush_interval: self.max_flush_interval,
        wal_buffer_manager: self.clone(),
    };
    let registration = task_executor.add_handler(
        WAL_BUFFER_TASK_NAME.to_string(),
        Box::new(handler),
        flush_rx,
        &Handle::current(),
    );

    self.inner.write().task_executor = Some(task_executor);
    registration
}
/// Test helper: total number of entries held in the current WAL and all immutable WALs.
#[cfg(test)]
pub(crate) fn buffered_wal_entries_count(&self) -> usize {
    let inner = self.inner.read();
    let mut total = inner.current_wal.len();
    for (_, wal) in inner.immutable_wals.iter() {
        total += wal.len();
    }
    total
}
/// Returns the id of the most recently flushed WAL.
pub(crate) fn recent_flushed_wal_id(&self) -> u64 {
    self.inner.read().recent_flushed_wal_id
}
/// Raises the recorded most-recently-flushed WAL id to `wal_id`; a smaller or equal
/// `wal_id` leaves the recorded value unchanged.
pub(crate) fn advance_recent_flushed_wal_id(&self, wal_id: u64) {
    let mut inner = self.inner.write();
    let advanced = inner.recent_flushed_wal_id.max(wal_id);
    inner.recent_flushed_wal_id = advanced;
}
/// Test helper: `true` when the current WAL has no entries and no immutable WAL is retained.
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
    let inner = self.inner.read();
    let holds_anything = !inner.current_wal.is_empty() || !inner.immutable_wals.is_empty();
    !holds_anything
}
pub(crate) fn estimated_bytes(&self) -> Result<usize, SlateDBError> {
let inner = self.inner.read();
let current_wal_size = self
.table_store
.estimate_encoded_size_wal(inner.current_wal.len(), inner.current_wal.size());
let imm_wal_size = inner
.immutable_wals
.iter()
.map(|(_, wal)| {
self.table_store
.estimate_encoded_size_wal(wal.len(), wal.size())
})
.sum::<usize>();
Ok(current_wal_size + imm_wal_size)
}
/// Appends clones of `entries`, in order, to the current WAL.
///
/// Returns a watcher for the durability cell of the WAL that received the entries.
pub(crate) fn append(
    &self,
    entries: &[RowEntry],
) -> Result<WatchableOnceCellReader<Result<(), SlateDBError>>, SlateDBError> {
    let mut inner = self.inner.write();
    let current = &mut inner.current_wal;
    entries
        .iter()
        .cloned()
        .for_each(|entry| current.append(entry));
    Ok(current.durable_watcher())
}
/// Requests a flush if the current WAL's estimated encoded size has reached
/// `max_wal_bytes_size`, then refreshes the buffered-bytes gauge.
///
/// At most one size-triggered request is sent per flush epoch: the request is only sent by
/// the caller that manages to move `last_flush_requested_epoch` up to the current epoch.
/// Returns a watcher for the durability cell of the current WAL as observed at entry.
pub(crate) fn maybe_trigger_flush(
    &self,
) -> Result<WatchableOnceCellReader<Result<(), SlateDBError>>, SlateDBError> {
    let inner = self.inner.read();
    let current_wal_size = self
        .table_store
        .estimate_encoded_size_wal(inner.current_wal.len(), inner.current_wal.size());
    trace!(
        "checking flush trigger [current_wal_size={}, max_wal_bytes_size={}]",
        format_bytes_si(current_wal_size as u64),
        format_bytes_si(self.max_wal_bytes_size as u64),
    );
    let durable_watcher = inner.current_wal.durable_watcher();
    let flush_epoch = inner.flush_epoch;
    // Release the read lock before the calls below take it again.
    drop(inner);

    if current_wal_size >= self.max_wal_bytes_size {
        let requested_epoch = &self.last_flush_requested_epoch;
        let last = requested_epoch.load(Ordering::Relaxed);
        let claimed = last < flush_epoch
            && requested_epoch
                .compare_exchange(last, flush_epoch, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
        if claimed {
            self.send_flush_request(None)?;
        }
    }

    let estimated_bytes = self.estimated_bytes()?;
    self.db_stats
        .wal_buffer_estimated_bytes
        .set(estimated_bytes as i64);
    Ok(durable_watcher)
}
/// Returns a durability watcher for the front immutable WAL if there is one, otherwise for
/// the current WAL if it has entries, otherwise `None`.
pub(crate) fn watcher_for_oldest_unflushed_wal(
    &self,
) -> Option<WatchableOnceCellReader<Result<(), SlateDBError>>> {
    let inner = self.inner.read();
    match inner.immutable_wals.front() {
        Some((_, oldest)) => Some(oldest.durable_watcher()),
        None if inner.current_wal.is_empty() => None,
        None => Some(inner.current_wal.durable_watcher()),
    }
}
/// Counts a flush request and queues a `WalFlushWork` carrying `result_tx` on the flush
/// channel.
///
/// # Panics
/// Panics if `init` has not installed the flush sender yet.
fn send_flush_request(
    &self,
    result_tx: Option<oneshot::Sender<Result<(), SlateDBError>>>,
) -> Result<(), SlateDBError> {
    self.db_stats.wal_buffer_flush_requests.increment(1);
    // Clone the sender out so the lock is not held while sending.
    let sender = {
        let inner = self.inner.read();
        inner.flush_tx.clone()
    };
    let work = WalFlushWork { result_tx };
    sender
        .expect("flush_tx not initialized, please call init first.")
        .send(work)
}
/// Queues a flush request and waits for the flush task to report its outcome.
///
/// Fails if the request cannot be queued, if the reply channel is closed before an answer
/// arrives, or with the flush's own error.
#[instrument(level = "trace", skip_all, err(level = tracing::Level::DEBUG))]
pub(crate) async fn flush(&self) -> Result<(), SlateDBError> {
    let (reply_tx, reply_rx) = oneshot::channel();
    self.send_flush_request(Some(reply_tx))?;
    select! {
        outcome = reply_rx => {
            // The outer `?` converts a closed reply channel; the inner result is returned as is.
            outcome?
        }
    }
}
/// Snapshots the immutable WALs whose id is greater than the most recently flushed WAL id,
/// preserving queue order.
fn flushing_wals(&self) -> Vec<(u64, Arc<WalBuffer>)> {
    let inner = self.inner.read();
    let flushed_up_to = inner.recent_flushed_wal_id;
    let pending: Vec<(u64, Arc<WalBuffer>)> = inner
        .immutable_wals
        .iter()
        .filter(|(wal_id, _)| *wal_id > flushed_up_to)
        .map(|(wal_id, wal)| (*wal_id, Arc::clone(wal)))
        .collect();
    pending
}
#[instrument(level = "trace", skip_all, err(level = tracing::Level::DEBUG))]
async fn do_flush(&self) -> Result<(), SlateDBError> {
self.freeze_current_wal()?;
let flushing_wals = self.flushing_wals();
if flushing_wals.is_empty() {
return Ok(());
}
for (wal_id, wal) in flushing_wals.iter() {
let result = self.do_flush_one_wal(*wal_id, wal.clone()).await;
if let Err(e) = &result {
error!("failed to flush WAL [wal_id={}]", wal_id);
return Err(e.clone());
}
{
let mut inner = self.inner.write();
inner.recent_flushed_wal_id = *wal_id;
if let Some(seq) = wal.last_seq() {
inner.oracle.advance_durable_seq(seq);
}
}
wal.notify_durable(result.clone());
}
self.maybe_release_immutable_wals();
Ok(())
}
/// Encodes a snapshot of `wal`'s entries into a WAL SST, writes it under
/// `SsTableId::Wal(wal_id)`, and then reports the WAL's last tick to the monotonic clock.
async fn do_flush_one_wal(&self, wal_id: u64, wal: Arc<WalBuffer>) -> Result<(), SlateDBError> {
    self.db_stats.wal_buffer_flushes.increment(1);
    let mut builder = self.table_store.wal_table_builder();
    let mut entries = wal.iter();
    let last_tick = wal.last_tick();
    while let Some(entry) = entries.next() {
        builder.add(entry).await?;
    }
    let encoded = builder.build().await?;
    let sst_id = SsTableId::Wal(wal_id);
    self.table_store.write_sst(&sst_id, encoded, false).await?;
    // Only reached once the write succeeded.
    self.mono_clock.fetch_max_last_durable_tick(last_tick);
    Ok(())
}
/// Moves a non-empty current WAL to the back of the immutable queue under a freshly
/// allocated WAL id, installs an empty current WAL, and bumps the flush epoch.
///
/// Does nothing (and allocates no WAL id) when the current WAL is empty.
fn freeze_current_wal(&self) -> Result<(), SlateDBError> {
    // The read guard is a temporary of the condition and is gone before the write lock
    // is taken below.
    if self.inner.read().current_wal.is_empty() {
        return Ok(());
    }
    let frozen_wal_id = self.wal_id_incrementor.next_wal_id();
    let mut inner = self.inner.write();
    let frozen = std::mem::replace(&mut inner.current_wal, WalBuffer::new());
    inner.flush_epoch += 1;
    inner
        .immutable_wals
        .push_back((frozen_wal_id, Arc::new(frozen)));
    Ok(())
}
/// Records `seq` as the last applied sequence number and then releases any immutable WALs
/// that have become releasable.
pub(crate) fn track_last_applied_seq(&self, seq: u64) {
    // The temporary write guard is dropped at the end of this statement, before the
    // release pass takes the lock again.
    self.inner.write().last_applied_seq = Some(seq);
    self.maybe_release_immutable_wals();
}
/// Drops immutable WALs from the front of the queue while each one's last sequence number
/// is at or below both the last applied sequence number and the oracle's last remotely
/// persisted sequence number.
///
/// Stops at the first WAL that does not qualify (including one with no last sequence
/// number). Does nothing until a last applied sequence number has been recorded.
fn maybe_release_immutable_wals(&self) {
    let mut inner = self.inner.write();
    let last_applied_seq = if let Some(seq) = inner.last_applied_seq {
        seq
    } else {
        return;
    };
    let last_flushed_seq = inner.oracle.last_remote_persisted_seq();
    let releaseable_count = inner
        .immutable_wals
        .iter()
        .take_while(|(_, wal)| {
            matches!(
                wal.last_seq(),
                Some(seq) if seq <= last_applied_seq && seq <= last_flushed_seq
            )
        })
        .count();
    if releaseable_count == 0 {
        return;
    }
    trace!(
        "draining immutable wals [releaseable_count={}]",
        releaseable_count
    );
    inner.immutable_wals.drain(..releaseable_count);
}
/// Shuts down the WAL flush task on the executor stored by `init` and returns the result
/// of that shutdown.
///
/// # Panics
/// Panics if `init` has not stored a task executor.
// NOTE(review): `dead_code` is allowed here presumably because not every build calls this
// method; confirm against the callers.
#[allow(dead_code)]
pub(crate) async fn close(&self) -> Result<(), SlateDBError> {
    // The read guard is a temporary of this statement, so it is not held across the await.
    let executor = self
        .inner
        .read()
        .task_executor
        .clone()
        .expect("task executor should be initialized");
    executor.shutdown_task(WAL_BUFFER_TASK_NAME).await
}
}
impl WalBuffer {
    /// Creates an empty buffer with an unset durability cell, `last_tick == i64::MIN` and
    /// no last sequence number.
    fn new() -> Self {
        Self {
            entries: VecDeque::new(),
            entries_size: 0,
            last_seq: 0,
            last_tick: i64::MIN,
            durable: WatchableOnceCell::new(),
        }
    }

    /// Appends `entry`, updating the last sequence number, the size estimate and, when the
    /// entry carries a `create_ts`, the last tick.
    fn append(&mut self, entry: RowEntry) {
        // Entries without a creation timestamp leave the last tick untouched.
        self.last_tick = entry.create_ts.unwrap_or(self.last_tick);
        self.last_seq = entry.seq;
        self.entries_size += entry.estimated_size();
        self.entries.push_back(entry);
    }

    /// Returns an iterator over a snapshot of the entries currently in the buffer.
    fn iter(&self) -> WalBufferIterator {
        WalBufferIterator::new(self)
    }

    /// Returns a reader on this buffer's durability cell.
    fn durable_watcher(&self) -> WatchableOnceCellReader<Result<(), SlateDBError>> {
        self.durable.reader()
    }

    /// Test helper: waits for the durability cell to be written and returns its value.
    #[cfg(test)]
    async fn await_durable(&self) -> Result<(), SlateDBError> {
        let mut reader = self.durable.reader();
        reader.await_value().await
    }

    /// Writes `result` into the durability cell.
    fn notify_durable(&self, result: Result<(), SlateDBError>) {
        self.durable.write(result);
    }

    /// `true` when no entry has been appended.
    fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Number of entries in the buffer.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Sum of the estimated sizes of all appended entries.
    fn size(&self) -> usize {
        self.entries_size
    }

    /// Sequence number of the most recently appended entry, or `None` while the stored
    /// value is still `0`.
    fn last_seq(&self) -> Option<u64> {
        match self.last_seq {
            0 => None,
            seq => Some(seq),
        }
    }

    /// `create_ts` of the most recently appended entry that carried one, or `i64::MIN`.
    fn last_tick(&self) -> i64 {
        self.last_tick
    }
}
impl WalBufferIterator {
    /// Clones every entry currently in `wal_buffer`; entries appended afterwards are not
    /// seen by this iterator.
    pub(crate) fn new(wal_buffer: &WalBuffer) -> Self {
        let snapshot: Vec<RowEntry> = wal_buffer.entries.iter().cloned().collect();
        Self {
            entries: snapshot.into_iter(),
        }
    }

    /// Yields the next entry of the snapshot, or `None` once it is exhausted.
    pub(crate) fn next(&mut self) -> Option<RowEntry> {
        let remaining = &mut self.entries;
        remaining.next()
    }
}
/// Message sent to the WAL flush handler asking it to flush.
#[derive(Debug)]
struct WalFlushWork {
/// Where to deliver the flush outcome; `None` for requests nobody waits on
/// (size-triggered and ticker-generated requests).
result_tx: Option<oneshot::Sender<Result<(), SlateDBError>>>,
}
/// Message handler that performs WAL flushes on behalf of a `WalBufferManager`.
struct WalFlushHandler {
/// Interval of the periodic flush ticker; `None` registers no ticker.
max_flush_interval: Option<Duration>,
/// Manager whose WALs this handler flushes.
wal_buffer_manager: Arc<WalBufferManager>,
}
#[async_trait]
impl MessageHandler<WalFlushWork> for WalFlushHandler {
fn tickers(&mut self) -> Vec<(Duration, Box<MessageFactory<WalFlushWork>>)> {
if let Some(max_flush_interval) = self.max_flush_interval {
return vec![(
max_flush_interval,
Box::new(|| WalFlushWork { result_tx: None }),
)];
}
vec![]
}
async fn handle(&mut self, message: WalFlushWork) -> Result<(), SlateDBError> {
let WalFlushWork { result_tx } = message;
if let Some(result_tx) = result_tx {
let result = self.wal_buffer_manager.do_flush().await;
result_tx
.send(result.clone())
.expect("failed to send flush result");
result
} else {
self.wal_buffer_manager.do_flush().await
}
}
async fn cleanup(
&mut self,
mut messages: BoxStream<'async_trait, WalFlushWork>,
result: Result<(), SlateDBError>,
) -> Result<(), SlateDBError> {
let error = result.err().unwrap_or(SlateDBError::Closed);
while let Some(WalFlushWork { result_tx }) = messages.next().await {
if let Some(result_tx) = result_tx {
result_tx
.send(Err(error.clone()))
.expect("failed to send flush result");
}
}
self.wal_buffer_manager.freeze_current_wal()?;
let flushing_wals = self.wal_buffer_manager.flushing_wals();
for (_, wal) in flushing_wals.iter() {
wal.notify_durable(Err(error.clone()));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::clock::MonotonicClock;
use crate::db_status::DbStatusManager;
use crate::format::sst::SsTableFormat;
use crate::iter::RowEntryIterator;
use crate::manifest::SsTableView;
use crate::object_stores::ObjectStores;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::types::{RowEntry, ValueDeletable};
use bytes::Bytes;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use slatedb_common::clock::DefaultSystemClock;
use slatedb_common::metrics::{
lookup_metric, DefaultMetricsRecorder, MetricLevel, MetricsRecorderHelper,
};
use slatedb_common::MockSystemClock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Builds a value `RowEntry` from string key/value, a sequence number and an optional
/// creation timestamp; the final constructor argument is always `None`.
fn make_entry(key: &str, value: &str, seq: u64, create_ts: Option<i64>) -> RowEntry {
    let key_bytes = Bytes::from(key.to_owned());
    let value_bytes = Bytes::from(value.to_owned());
    RowEntry::new(
        key_bytes,
        ValueDeletable::Value(value_bytes),
        seq,
        create_ts,
        None,
    )
}
/// A freshly created buffer is empty and reports no sequence number or tick.
#[test]
fn test_new_buffer_initial_state() {
    let fresh = WalBuffer::new();
    assert!(fresh.is_empty());
    assert_eq!(fresh.len(), 0);
    assert_eq!(fresh.size(), 0);
    assert_eq!(fresh.last_seq(), None);
    assert_eq!(fresh.last_tick(), i64::MIN);
}
/// Appending one entry updates length, size, last sequence number and last tick.
#[test]
fn test_append_single_entry() {
    let entry = make_entry("key1", "value1", 42, Some(1000));
    let expected_size = entry.estimated_size();

    let mut buffer = WalBuffer::new();
    buffer.append(entry);

    assert!(!buffer.is_empty());
    assert_eq!(buffer.len(), 1);
    assert_eq!(buffer.size(), expected_size);
    assert_eq!(buffer.last_seq(), Some(42));
    assert_eq!(buffer.last_tick(), 1000);
}
/// Sizes accumulate across appends, and an entry without a timestamp keeps the previous
/// last tick while still advancing the last sequence number.
#[test]
fn test_append_multiple_entries() {
    let entries = vec![
        make_entry("key1", "value1", 10, Some(100)),
        make_entry("key2", "value2", 20, Some(200)),
        make_entry("key3", "value3", 30, Some(300)),
        make_entry("key4", "value4", 40, None),
    ];
    let expected_size: usize = entries.iter().map(|entry| entry.estimated_size()).sum();

    let mut buffer = WalBuffer::new();
    for entry in entries {
        buffer.append(entry);
    }

    assert_eq!(buffer.len(), 4);
    assert_eq!(buffer.size(), expected_size);
    assert_eq!(buffer.last_seq(), Some(40));
    assert_eq!(buffer.last_tick(), 300);
}
/// A success notification is observed by `await_durable`.
#[tokio::test]
async fn test_notify_durable_success() {
    let mut buffer = WalBuffer::new();
    buffer.append(make_entry("key", "value", 1, None));
    buffer.notify_durable(Ok(()));
    let outcome = buffer.await_durable().await;
    assert!(matches!(outcome, Ok(())));
}
/// An error notification is observed by `await_durable`.
#[tokio::test]
async fn test_notify_durable_error() {
    let mut buffer = WalBuffer::new();
    buffer.append(make_entry("key", "value", 1, None));
    buffer.notify_durable(Err(SlateDBError::Closed));
    let outcome = buffer.await_durable().await;
    assert!(matches!(outcome, Err(SlateDBError::Closed)));
}
/// A reader obtained before the notification still receives the notified value.
#[tokio::test]
async fn test_durable_watcher_returns_reader() {
    let mut buffer = WalBuffer::new();
    buffer.append(make_entry("key", "value", 1, None));
    let mut watcher = buffer.durable_watcher();
    buffer.notify_durable(Ok(()));
    let outcome = watcher.await_value().await;
    assert!(matches!(outcome, Ok(())));
}
/// Only the first notification sticks; a later error does not replace an earlier success.
#[tokio::test]
async fn test_notify_durable_only_sets_once() {
    let mut buffer = WalBuffer::new();
    buffer.append(make_entry("key", "value", 1, None));
    buffer.notify_durable(Ok(()));
    buffer.notify_durable(Err(SlateDBError::Closed));
    let outcome = buffer.await_durable().await;
    assert!(matches!(outcome, Ok(())));
}
/// The iterator yields entries in append order and is a snapshot: entries appended after
/// it was created are only visible to iterators created later.
#[test]
fn test_iter() {
    let count_remaining = |mut iter: WalBufferIterator| {
        let mut count = 0;
        while iter.next().is_some() {
            count += 1;
        }
        count
    };

    let mut buffer = WalBuffer::new();
    let mut empty_iter = buffer.iter();
    assert!(empty_iter.next().is_none());

    let entry1 = make_entry("key1", "value1", 1, Some(100));
    let entry2 = make_entry("key2", "value2", 2, Some(200));
    let entry3 = make_entry("key3", "value3", 3, Some(300));
    buffer.append(entry1.clone());
    buffer.append(entry2.clone());
    buffer.append(entry3.clone());

    let mut iter = buffer.iter();
    for expected in [&entry1, &entry2, &entry3] {
        let read = iter.next().unwrap();
        assert_eq!(read.key, expected.key);
        assert_eq!(read.seq, expected.seq);
    }
    assert!(iter.next().is_none());

    // Created before the fourth append, so it must still see three entries.
    let stale_iter = buffer.iter();
    buffer.append(make_entry("key4", "value4", 4, None));
    assert_eq!(count_remaining(stale_iter), 3);
    assert_eq!(count_remaining(buffer.iter()), 4);
}
/// The size estimate of a buffer holding one large entry equals that entry's estimated
/// size and exceeds the value length.
#[test]
fn test_large_entry_size() {
    let key = Bytes::from("k".repeat(10_000));
    let value = ValueDeletable::Value(Bytes::from("v".repeat(100_000)));
    let entry = RowEntry::new(key, value, 1, None, None);
    let expected_size = entry.estimated_size();

    let mut buffer = WalBuffer::new();
    buffer.append(entry);

    assert_eq!(buffer.size(), expected_size);
    assert!(buffer.size() > 100_000);
}
/// Test `WalIdStore` that hands out consecutive ids from an atomic counter.
struct MockWalIdStore {
/// The id that the next `next_wal_id` call will return.
next_id: AtomicU64,
}
impl WalIdStore for MockWalIdStore {
fn next_wal_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::SeqCst)
}
}
/// Builds the test fixture with a 10 ms flush interval.
async fn setup_wal_buffer() -> (
    Arc<WalBufferManager>,
    Arc<TableStore>,
    Arc<MockSystemClock>,
    DbStats,
    Arc<DefaultMetricsRecorder>,
) {
    let default_flush_interval = Duration::from_millis(10);
    setup_wal_buffer_with_flush_interval(default_flush_interval).await
}
/// Builds an initialized `WalBufferManager` backed by an in-memory object store.
///
/// The manager starts with flushed WAL id 0, a 1000-byte flush threshold and the given
/// flush interval; WAL ids are handed out starting at 1. Returns the manager together with
/// the table store, the mock clock, the stats handle and the metrics recorder.
async fn setup_wal_buffer_with_flush_interval(
    flush_interval: Duration,
) -> (
    Arc<WalBufferManager>,
    Arc<TableStore>,
    Arc<MockSystemClock>,
    DbStats,
    Arc<DefaultMetricsRecorder>,
) {
    let wal_id_store: Arc<dyn WalIdStore + Send + Sync> = Arc::new(MockWalIdStore {
        next_id: AtomicU64::new(1),
    });
    let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let table_store = Arc::new(TableStore::new(
        ObjectStores::new(object_store, None),
        SsTableFormat::default(),
        Path::from("/root"),
        None,
    ));
    let test_clock = Arc::new(MockSystemClock::new());
    let mono_clock = Arc::new(MonotonicClock::new(test_clock.clone(), 0));
    let system_clock = Arc::new(DefaultSystemClock::new());
    let status_manager = DbStatusManager::new(0);
    let oracle = Arc::new(DbOracle::new(0, 0, 0, status_manager.clone()));
    let recorder = Arc::new(DefaultMetricsRecorder::new());
    let helper = MetricsRecorderHelper::new(recorder.clone(), MetricLevel::default());
    let db_stats = DbStats::new(&helper);

    let recent_flushed_wal_id = 0;
    let max_wal_bytes_size = 1000;
    let manager = WalBufferManager::new(
        wal_id_store,
        status_manager.clone(),
        db_stats.clone(),
        recent_flushed_wal_id,
        oracle,
        table_store.clone(),
        mono_clock,
        max_wal_bytes_size,
        Some(flush_interval),
    );
    let wal_buffer = Arc::new(manager);

    let task_executor = Arc::new(MessageHandlerExecutor::new(
        Arc::new(status_manager),
        system_clock.clone(),
    ));
    wal_buffer.init(task_executor.clone()).await.unwrap();
    task_executor
        .monitor_on(&Handle::current())
        .expect("failed to monitor executor");

    (wal_buffer, table_store, test_clock, db_stats, recorder)
}
/// Two appended entries end up, in order, in the WAL SST with id 1 after a flush.
#[tokio::test]
async fn test_basic_append_and_flush_operations() {
    let (wal_buffer, table_store, _, _, _) = setup_wal_buffer().await;
    let entry1 = make_entry("key1", "value1", 1, None);
    let entry2 = make_entry("key2", "value2", 2, None);
    for entry in [&entry1, &entry2] {
        wal_buffer.append(std::slice::from_ref(entry)).unwrap();
    }
    wal_buffer.flush().await.unwrap();

    let sst_iter_options = SstIteratorOptions {
        eager_spawn: true,
        ..SstIteratorOptions::default()
    };
    let wal_sst = table_store.open_sst(&SsTableId::Wal(1)).await.unwrap();
    let mut iter = SstIterator::new_owned_initialized(
        ..,
        SsTableView::identity(wal_sst),
        table_store.clone(),
        sst_iter_options,
    )
    .await
    .unwrap()
    .unwrap();

    for expected in [&entry1, &entry2] {
        let read_entry = iter.next().await.unwrap().unwrap();
        assert_eq!(read_entry.key, expected.key);
        assert_eq!(read_entry.value, expected.value);
        assert_eq!(read_entry.seq, expected.seq);
    }
    assert!(iter.next().await.unwrap().is_none());
}
/// With the ticker interval set to `Duration::MAX`, filling the buffer up to the size
/// threshold and calling `maybe_trigger_flush` results in WAL 1 being flushed.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_size_based_flush_triggering() {
    let (wal_buffer, _, _, _, _) = setup_wal_buffer_with_flush_interval(Duration::MAX).await;
    for seq in 1u64.. {
        if wal_buffer.estimated_bytes().unwrap() >= wal_buffer.max_wal_bytes_size {
            break;
        }
        let entry = make_entry(&format!("key{}", seq), &format!("value{}", seq), seq, None);
        wal_buffer.append(&[entry]).unwrap();
    }
    let mut durable = wal_buffer.maybe_trigger_flush().unwrap();
    durable.await_value().await.unwrap();
    assert_eq!(wal_buffer.recent_flushed_wal_id(), 1);
}
#[tokio::test]
async fn test_immutable_wal_reclaim() {
let (wal_buffer, _, _, _, _) = setup_wal_buffer().await;
for i in 0..100 {
let seq = i + 1;
let entry = make_entry(&format!("key{}", i), &format!("value{}", i), seq, None);
wal_buffer.append(&[entry]).unwrap();
wal_buffer.flush().await.unwrap();
}
assert_eq!(wal_buffer.recent_flushed_wal_id(), 100);
assert_eq!(wal_buffer.inner.read().immutable_wals.len(), 100);
wal_buffer.track_last_applied_seq(50);
assert_eq!(wal_buffer.inner.read().immutable_wals.len(), 50);
}
#[tokio::test]
async fn test_immutable_wal_reclaim_with_flush_check() {
let (wal_buffer, _, _, _, _) = setup_wal_buffer().await;
for i in 0..100 {
let seq = i + 1;
let entry = make_entry(&format!("key{}", i), &format!("value{}", i), seq, None);
wal_buffer.append(&[entry]).unwrap();
wal_buffer.flush().await.unwrap();
}
wal_buffer.track_last_applied_seq(50);
assert_eq!(wal_buffer.inner.read().immutable_wals.len(), 50);
assert_eq!(wal_buffer.recent_flushed_wal_id(), 100);
{
let inner = wal_buffer.inner.write();
inner.oracle.set_durable_seq_unsafe(80);
}
wal_buffer.track_last_applied_seq(90);
assert_eq!(wal_buffer.inner.read().immutable_wals.len(), 20);
}
/// Checks that repeated `maybe_trigger_flush` calls do not queue more flush requests than
/// there are flushes actually performed.
///
/// The ticker interval is `Duration::MAX`, and the request metric is read before the
/// explicit `flush()` so that flush's own request is not part of the count being compared.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_maybe_trigger_flush_spams_flush_requests() {
let (wal_buffer, _, _, _db_stats, recorder) =
setup_wal_buffer_with_flush_interval(Duration::MAX).await;
let num_writes: u64 = 100;
// One append followed by one trigger check per write.
for seq in 1..=num_writes {
let entry = make_entry(&format!("key{}", seq), &format!("value{}", seq), seq, None);
wal_buffer.append(&[entry]).unwrap();
wal_buffer.maybe_trigger_flush().unwrap();
}
let size_triggered_requests =
lookup_metric(&recorder, crate::db_stats::WAL_BUFFER_FLUSH_REQUESTS).unwrap();
// Wait for an explicit flush before reading the flush counter.
wal_buffer.flush().await.unwrap();
let actual_flushes = lookup_metric(&recorder, crate::db_stats::WAL_BUFFER_FLUSHES).unwrap();
assert!(
actual_flushes >= 1,
"expected at least one flush but got {}",
actual_flushes,
);
assert!(
size_triggered_requests <= actual_flushes,
"size_triggered_requests ({}) should not exceed actual_flushes ({})",
size_triggered_requests,
actual_flushes,
);
}
}