#![allow(dead_code)]
use std::{
fmt,
future::Future,
marker::PhantomData,
sync::{
Arc, Mutex,
atomic::{AtomicU64, AtomicUsize, Ordering},
},
time::Duration,
};
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use fusio::{
DynFs,
disk::LocalFs,
executor::{Executor, Instant, JoinHandle, Timer},
path::Path,
};
use futures::{
SinkExt,
channel::{mpsc, oneshot},
};
use ulid::Ulid;
use crate::{
id::FileId,
inmem::immutable::memtable::{MVCC_COMMIT_COL, MVCC_TOMBSTONE_COL},
mvcc::Timestamp,
observability::log_info,
wal::metrics::WalMetrics,
};
pub mod frame;
pub mod manifest_ext;
pub mod metrics;
pub mod replay;
pub mod state;
pub mod storage;
pub mod writer;
pub use state::WalSegmentBounds;
/// Policy describing when WAL data is synced.
///
/// NOTE(review): the per-variant descriptions are inferred from the variant
/// names; the code that acts on the policy is not in this part of the file.
#[derive(Debug, Clone)]
pub enum WalSyncPolicy {
    /// Sync after every write (presumed).
    Always,
    /// Sync once this many bytes have accumulated (presumed).
    IntervalBytes(usize),
    /// Sync once this much time has elapsed (presumed).
    IntervalTime(Duration),
    /// Do not sync explicitly (presumed).
    Disabled,
}

impl Default for WalSyncPolicy {
    /// Returns [`WalSyncPolicy::IntervalTime`] with a 50 ms interval.
    fn default() -> Self {
        Self::IntervalTime(Duration::from_millis(50))
    }
}
/// Strategy selected for WAL recovery.
///
/// The default is [`WalRecoveryMode::PointInTime`].
///
/// NOTE(review): what each mode does during replay is decided elsewhere
/// (presumably in the `replay` module); confirm before relying on a
/// variant's name alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WalRecoveryMode {
    /// Default mode.
    #[default]
    PointInTime,
    /// Presumably accepts a damaged tail at the end of the log.
    TolerateCorruptedTail,
    /// Presumably treats any corruption as an error.
    AbsoluteConsistency,
    /// Presumably skips over corrupted records.
    SkipCorrupted,
}
/// Configuration for the write-ahead log.
///
/// `Debug` is implemented by hand because the backend and state store are
/// trait objects.
#[derive(Clone)]
pub struct WalConfig {
    /// Directory handed to the WAL storage layer together with
    /// `segment_backend`.
    pub dir: Path,
    /// Size limit for a segment, in bytes (presumably triggers rotation).
    pub segment_max_bytes: usize,
    /// Optional age limit for a segment (presumably triggers rotation).
    pub segment_max_age: Option<Duration>,
    /// Flush interval used by the writer (exact semantics live in the writer).
    pub flush_interval: Duration,
    /// Sync policy.
    pub sync: WalSyncPolicy,
    /// Recovery mode.
    pub recovery: WalRecoveryMode,
    /// Optional retention budget in bytes (presumably bounds retained segments).
    pub retention_bytes: Option<usize>,
    /// Queue capacity (presumably the writer channel's bound — confirm in writer).
    pub queue_size: usize,
    /// Filesystem used for segment storage.
    pub segment_backend: Arc<dyn DynFs>,
    /// Optional store for persisted WAL state; consulted when the WAL is enabled.
    pub state_store: Option<Arc<dyn state::WalStateStore>>,
    /// Dry-run flag for pruning (presumably reports instead of deleting).
    pub prune_dry_run: bool,
}
impl Default for WalConfig {
    /// Builds the default configuration: directory `"wal"` on the local
    /// filesystem, 64 MiB segments with no age limit, a 10 ms flush interval,
    /// default sync and recovery settings, no retention limit, a queue size of
    /// 65 536, no state store, and pruning dry-run disabled.
    ///
    /// # Panics
    ///
    /// Panics if the static path `"wal"` fails to parse.
    fn default() -> Self {
        Self {
            dir: Path::parse("wal").expect("static wal path"),
            segment_max_bytes: 64 * 1024 * 1024,
            segment_max_age: None,
            flush_interval: Duration::from_millis(10),
            sync: WalSyncPolicy::default(),
            recovery: WalRecoveryMode::default(),
            retention_bytes: None,
            queue_size: 65_536,
            segment_backend: Arc::new(LocalFs {}),
            state_store: None,
            prune_dry_run: false,
        }
    }
}
impl fmt::Debug for WalConfig {
    /// Formats the configuration as a `WalConfig { .. }` struct.
    ///
    /// The backend is shown via its `file_system()` tag and the state store
    /// only as a presence flag, since both are trait objects.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let backend_tag = self.segment_backend.file_system();
        let has_state_store = self.state_store.is_some();

        let mut out = f.debug_struct("WalConfig");
        out.field("dir", &self.dir);
        out.field("segment_max_bytes", &self.segment_max_bytes);
        out.field("segment_max_age", &self.segment_max_age);
        out.field("flush_interval", &self.flush_interval);
        out.field("sync", &self.sync);
        out.field("recovery", &self.recovery);
        out.field("retention_bytes", &self.retention_bytes);
        out.field("queue_size", &self.queue_size);
        out.field("segment_backend", &backend_tag);
        out.field("state_store_present", &has_state_store);
        out.field("prune_dry_run", &self.prune_dry_run);
        out.finish()
    }
}
impl WalConfig {
    /// Returns the configuration with `store` installed as the state store.
    #[must_use]
    pub fn with_state_store(self, store: Arc<dyn state::WalStateStore>) -> Self {
        Self {
            state_store: Some(store),
            ..self
        }
    }

    /// Returns the configuration with any state store removed.
    #[must_use]
    pub fn without_state_store(self) -> Self {
        Self {
            state_store: None,
            ..self
        }
    }

    /// Returns the configuration with the prune dry-run flag set to `dry_run`.
    #[must_use]
    pub fn with_prune_dry_run(self, dry_run: bool) -> Self {
        Self {
            prune_dry_run: dry_run,
            ..self
        }
    }
}
/// Result alias used throughout the WAL module; the error type is [`WalError`].
pub type WalResult<T> = Result<T, WalError>;
/// Errors produced by the write-ahead log.
#[derive(Debug, Clone, thiserror::Error)]
pub enum WalError {
    /// Storage-level failure; also used when the writer task is unreachable.
    #[error("wal storage error: {0}")]
    Storage(String),
    /// Frame or batch encoding/decoding failure.
    #[error("wal frame codec error: {0}")]
    Codec(String),
    /// A tombstone bitmap did not have the expected length.
    #[error("tombstone bitmap length mismatch: expected {expected}, got {actual}")]
    TombstoneLengthMismatch {
        /// Length that was required.
        expected: usize,
        /// Length that was found.
        actual: usize,
    },
    /// A frame was found to be corrupt; the payload names the reason.
    #[error("wal frame is corrupt: {0}")]
    Corrupt(&'static str),
    /// The WAL is not enabled.
    #[error("wal not enabled")]
    Disabled,
    /// Failure related to WAL state.
    #[error("wal state error: {0}")]
    State(String),
    /// The named feature is not implemented.
    #[error("wal feature is not implemented: {0}")]
    Unimplemented(&'static str),
}
/// Acknowledgement sent back by the WAL writer for one submitted command.
#[derive(Debug, Clone)]
pub struct WalAck {
    /// First sequence number covered by the command.
    pub first_seq: u64,
    /// Last sequence number covered by the command.
    pub last_seq: u64,
    /// Bytes flushed, as reported by the writer.
    pub bytes_flushed: usize,
    /// Elapsed time reported by the writer (presumably enqueue-to-ack).
    pub elapsed: Duration,
}
/// Segment bounds reported by the writer in response to a snapshot request.
#[derive(Debug, Clone)]
pub struct WalSnapshot {
    /// Bounds of the sealed segments.
    pub sealed_segments: Vec<WalSegmentBounds>,
    /// Bounds of the active segment, if there is one.
    pub active_segment: Option<WalSegmentBounds>,
}
impl WalSnapshot {
    /// Iterates over every sealed segment, followed by the active segment
    /// when one is present.
    pub fn iter(&self) -> impl Iterator<Item = &WalSegmentBounds> {
        let sealed = self.sealed_segments.iter();
        let active = self.active_segment.as_ref();
        sealed.chain(active)
    }
}
/// Batch payload carried by a transactional append.
#[derive(Debug, Clone)]
pub enum DynBatchPayload {
    /// Row data plus the commit-timestamp column that goes with it.
    Row {
        /// The row batch.
        batch: RecordBatch,
        /// Commit-timestamp column for `batch`.
        commit_ts_column: ArrayRef,
    },
    /// A delete batch.
    Delete {
        /// The batch describing the deletes.
        batch: RecordBatch,
    },
}
impl DynBatchPayload {
pub fn num_rows(&self) -> usize {
match self {
DynBatchPayload::Row { batch, .. } | DynBatchPayload::Delete { batch } => {
batch.num_rows()
}
}
}
}
/// Transactional command submitted to the WAL writer.
///
/// Every variant carries the provisional id of the transaction it belongs to.
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)]
pub(super) enum WalCommand {
    /// Begin a transaction.
    TxnBegin {
        /// Provisional transaction id.
        provisional_id: u64,
    },
    /// Append a payload to a transaction.
    TxnAppend {
        /// Provisional transaction id.
        provisional_id: u64,
        /// The batch payload to append.
        payload: DynBatchPayload,
    },
    /// Commit a transaction at `commit_ts`.
    TxnCommit {
        /// Provisional transaction id.
        provisional_id: u64,
        /// Commit timestamp of the transaction.
        commit_ts: Timestamp,
    },
    /// Abort a transaction.
    TxnAbort {
        /// Provisional transaction id.
        provisional_id: u64,
    },
}
/// Returns a copy of `batch` with `commit_ts` appended as a trailing,
/// non-nullable `UInt64` column named [`MVCC_COMMIT_COL`].
///
/// Schema metadata is carried over unchanged.
///
/// # Errors
///
/// Returns [`WalError::Codec`] when, checked in this order:
/// - `commit_ts` has a different length than `batch` has rows,
/// - `commit_ts` is not a `UInt64Array`,
/// - `commit_ts` contains nulls,
/// - `commit_ts` holds more than one distinct value,
/// - `batch` already has a column named [`MVCC_COMMIT_COL`] or
///   [`MVCC_TOMBSTONE_COL`],
/// - Arrow rejects the assembled batch.
pub(crate) fn append_commit_column(
    batch: &RecordBatch,
    commit_ts: &ArrayRef,
) -> WalResult<RecordBatch> {
    let codec_err = |msg: &str| WalError::Codec(msg.to_string());

    if commit_ts.len() != batch.num_rows() {
        return Err(codec_err("commit_ts column length mismatch record batch"));
    }
    let Some(stamps) = commit_ts.as_any().downcast_ref::<UInt64Array>() else {
        return Err(codec_err("_commit_ts column not u64"));
    };
    if stamps.null_count() > 0 {
        return Err(codec_err("null commit_ts value"));
    }
    // All rows must share one commit timestamp; an empty column passes trivially.
    if !stamps.is_empty() {
        let expected = Some(stamps.value(0));
        if stamps.iter().any(|stamp| stamp != expected) {
            return Err(codec_err("commit_ts column contained varying values"));
        }
    }

    let schema = batch.schema();
    let has_column = |name: &str| {
        schema
            .fields()
            .iter()
            .any(|field| field.name().as_str() == name)
    };
    if has_column(MVCC_COMMIT_COL) {
        return Err(codec_err("record batch already contains _commit_ts column"));
    }
    if has_column(MVCC_TOMBSTONE_COL) {
        return Err(codec_err("record batch already contains _tombstone column"));
    }

    let commit_field = Arc::new(Field::new(MVCC_COMMIT_COL, DataType::UInt64, false));
    let fields: Vec<_> = schema
        .fields()
        .iter()
        .cloned()
        .chain(std::iter::once(commit_field))
        .collect();
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .cloned()
        .chain(std::iter::once(Arc::clone(commit_ts)))
        .collect();

    let extended_schema = Arc::new(Schema::new_with_metadata(
        fields,
        schema.metadata().clone(),
    ));
    RecordBatch::try_new(extended_schema, columns).map_err(|err| WalError::Codec(err.to_string()))
}
/// Splits the [`MVCC_COMMIT_COL`] column off `batch`.
///
/// Returns, in order: the batch without the commit column (schema metadata
/// preserved), the single commit timestamp shared by all rows (`None` when the
/// batch has no rows), and the original commit column.
///
/// # Errors
///
/// Returns [`WalError::Codec`] when the commit column is missing, is not a
/// `UInt64Array`, contains nulls, holds more than one distinct value, or when
/// Arrow rejects the stripped batch.
pub(crate) fn split_commit_column(
    batch: RecordBatch,
) -> WalResult<(RecordBatch, Option<Timestamp>, ArrayRef)> {
    let schema = batch.schema();
    let Some(commit_idx) = schema
        .fields()
        .iter()
        .position(|field| field.name() == MVCC_COMMIT_COL)
    else {
        return Err(WalError::Codec("_commit_ts column missing".to_string()));
    };

    let raw_commit = batch.column(commit_idx);
    let Some(stamps) = raw_commit.as_any().downcast_ref::<UInt64Array>() else {
        return Err(WalError::Codec("_commit_ts column not u64".to_string()));
    };
    if stamps.null_count() > 0 {
        return Err(WalError::Codec("null commit_ts value".to_string()));
    }

    let commit_ts = if stamps.is_empty() {
        None
    } else {
        let first = stamps.value(0);
        let uniform = stamps.iter().all(|stamp| stamp == Some(first));
        if !uniform {
            return Err(WalError::Codec(
                "commit_ts column contained varying values".to_string(),
            ));
        }
        Some(Timestamp::from(first))
    };

    // Drop the commit column from the schema and the data in a single pass.
    let (kept_fields, kept_columns): (Vec<_>, Vec<_>) = schema
        .fields()
        .iter()
        .zip(batch.columns())
        .enumerate()
        .filter(|(idx, _)| *idx != commit_idx)
        .map(|(_, (field, column))| (field.clone(), column.clone()))
        .unzip();

    let stripped_schema = Arc::new(Schema::new_with_metadata(
        kept_fields,
        schema.metadata().clone(),
    ));
    let stripped = RecordBatch::try_new(stripped_schema, kept_columns)
        .map_err(|err| WalError::Codec(err.to_string()))?;
    Ok((stripped, commit_ts, Arc::clone(raw_commit)))
}
/// State shared by every clone of a [`WalHandle`].
struct WalHandleInner<E>
where
    E: Executor + Timer + Clone,
{
    /// Channel to the writer task.
    sender: mpsc::Sender<writer::WriterMsg>,
    /// Shared counter of commands that have been enqueued.
    queue_depth: Arc<AtomicUsize>,
    /// Next value returned by `next_payload_seq`.
    next_payload_seq: AtomicU64,
    /// Join handle of the writer task; taken at most once.
    join: Mutex<Option<E::JoinHandle<WalResult<()>>>>,
    /// Shared WAL metrics.
    metrics: Arc<E::RwLock<WalMetrics>>,
}
impl<E> WalHandleInner<E>
where
    E: Executor + Timer + Clone,
{
    /// Assembles the shared state; `start_seq` seeds the payload-sequence
    /// counter.
    fn new(
        sender: mpsc::Sender<writer::WriterMsg>,
        queue_depth: Arc<AtomicUsize>,
        start_seq: u64,
        join: E::JoinHandle<WalResult<()>>,
        metrics: Arc<E::RwLock<WalMetrics>>,
    ) -> Self {
        let next_payload_seq = AtomicU64::new(start_seq);
        let join = Mutex::new(Some(join));
        Self {
            sender,
            queue_depth,
            next_payload_seq,
            join,
            metrics,
        }
    }

    /// Returns a fresh clone of the writer channel sender.
    fn clone_sender(&self) -> mpsc::Sender<writer::WriterMsg> {
        self.sender.clone()
    }

    /// Closes the writer channel.
    ///
    /// `close_channel` needs `&mut`, so it is invoked on a temporary clone of
    /// the sender.
    fn close_channel(&self) {
        self.sender.clone().close_channel();
    }

    /// Removes and returns the writer's join handle; later calls get `None`.
    ///
    /// # Panics
    ///
    /// Panics if the join mutex is poisoned.
    fn take_join(&self) -> Option<E::JoinHandle<WalResult<()>>> {
        let mut slot = self.join.lock().expect("wal join mutex poisoned");
        slot.take()
    }

    /// Returns the current payload sequence number and advances the counter
    /// by one.
    fn next_payload_seq(&self) -> u64 {
        self.next_payload_seq.fetch_add(1, Ordering::SeqCst)
    }

    /// Returns another reference to the shared metrics.
    fn metrics(&self) -> Arc<E::RwLock<WalMetrics>> {
        Arc::clone(&self.metrics)
    }
}
/// Handle used to submit commands to the WAL writer task.
///
/// Clones share the same underlying state.
pub struct WalHandle<E>
where
    E: Executor + Timer + Clone,
{
    /// Reference-counted shared state.
    inner: Arc<WalHandleInner<E>>,
}
impl<E> Clone for WalHandle<E>
where
    E: Executor + Timer + Clone,
{
    /// Returns a handle backed by the same shared state (a reference-count
    /// bump only).
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
impl<E> fmt::Debug for WalHandle<E>
where
    E: Executor + Timer + Clone,
{
    /// Formats the handle as a field-less `WalHandle`; no internal state is
    /// shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("WalHandle");
        repr.finish()
    }
}
impl<E> WalHandle<E>
where
    E: Executor + Timer + Clone,
{
    /// Builds a handle from the pieces of a spawned writer.
    ///
    /// `start_seq` is the first value returned by
    /// [`WalHandle::next_provisional_id`].
    fn from_parts(
        sender: mpsc::Sender<writer::WriterMsg>,
        queue_depth: Arc<AtomicUsize>,
        join: E::JoinHandle<WalResult<()>>,
        start_seq: u64,
        metrics: Arc<E::RwLock<WalMetrics>>,
    ) -> Self {
        let inner = WalHandleInner::new(sender, queue_depth, start_seq, join, metrics);
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Allocates the next provisional transaction id.
    pub fn next_provisional_id(&self) -> u64 {
        self.inner.next_payload_seq()
    }

    /// Sends `command` to the writer and returns a ticket for its ack.
    ///
    /// The queue-depth counter is incremented before the send and rolled back
    /// if the send fails.
    ///
    /// # Errors
    ///
    /// Returns [`WalError::Storage`] when the writer channel is closed.
    async fn enqueue_command(
        &self,
        command: WalCommand,
        submission_seq: u64,
    ) -> WalResult<WalTicket<E>> {
        let (ack_tx, receiver) = oneshot::channel();
        let enqueued_at = Instant::now();
        self.inner.queue_depth.fetch_add(1, Ordering::SeqCst);

        let mut sender = self.inner.clone_sender();
        let outcome = sender
            .send(writer::WriterMsg::Enqueue {
                _submission_seq: submission_seq,
                command,
                enqueued_at,
                ack_tx,
            })
            .await;
        if outcome.is_err() {
            self.inner.queue_depth.fetch_sub(1, Ordering::SeqCst);
            return Err(WalError::Storage("wal writer task closed".into()));
        }

        Ok(WalTicket {
            seq: submission_seq,
            receiver,
            _exec: PhantomData,
        })
    }

    /// Sends a rotate request to the writer and waits for its reply.
    ///
    /// # Errors
    ///
    /// Returns [`WalError::Storage`] when the writer is gone, otherwise
    /// whatever error the writer replies with.
    pub async fn rotate(&self) -> WalResult<()> {
        let (ack_tx, ack_rx) = oneshot::channel();
        let mut sender = self.inner.clone_sender();
        sender
            .send(writer::WriterMsg::Rotate { ack_tx })
            .await
            .map_err(|_| WalError::Storage("wal writer task closed".into()))?;
        match ack_rx.await {
            Ok(reply) => reply,
            Err(_) => Err(WalError::Storage("wal writer task closed".into())),
        }
    }

    /// Sends a snapshot request to the writer and returns its reply.
    ///
    /// # Errors
    ///
    /// Returns [`WalError::Storage`] when the writer is gone, otherwise
    /// whatever error the writer replies with.
    pub async fn snapshot(&self) -> WalResult<WalSnapshot> {
        let (ack_tx, ack_rx) = oneshot::channel();
        let mut sender = self.inner.clone_sender();
        sender
            .send(writer::WriterMsg::Snapshot { ack_tx })
            .await
            .map_err(|_| WalError::Storage("wal writer task closed".into()))?;
        match ack_rx.await {
            Ok(reply) => reply,
            Err(_) => Err(WalError::Storage("wal writer task closed".into())),
        }
    }

    /// Closes the writer channel and waits for the writer task to finish.
    ///
    /// If the join handle was already taken (for example through another
    /// clone), this returns `Ok(())` right after closing the channel.
    ///
    /// # Errors
    ///
    /// Returns [`WalError::Storage`] carrying the writer task's error message.
    /// A failure to join the task is also reported that way, except on
    /// `wasm32`, where it is ignored.
    pub async fn shutdown(self) -> WalResult<()> {
        self.inner.close_channel();
        let Some(join) = self.inner.take_join() else {
            return Ok(());
        };
        match join.join().await {
            Ok(task_result) => task_result.map_err(|err| WalError::Storage(err.to_string()))?,
            Err(err) => {
                #[cfg(target_arch = "wasm32")]
                {
                    let _ = err;
                }
                #[cfg(not(target_arch = "wasm32"))]
                {
                    return Err(WalError::Storage(err.to_string()));
                }
            }
        }
        Ok(())
    }

    /// Returns a shared reference to the WAL metrics.
    pub fn metrics(&self) -> Arc<E::RwLock<WalMetrics>> {
        self.inner.metrics()
    }

    /// Test-only constructor that forwards to `from_parts`.
    #[cfg(all(test, feature = "tokio"))]
    pub(crate) fn test_from_parts(
        sender: mpsc::Sender<writer::WriterMsg>,
        queue_depth: Arc<AtomicUsize>,
        join: E::JoinHandle<WalResult<()>>,
        start_seq: u64,
        metrics: Arc<E::RwLock<WalMetrics>>,
    ) -> Self {
        Self::from_parts(sender, queue_depth, join, start_seq, metrics)
    }

    /// Enqueues a transaction-begin command for `provisional_id`.
    pub async fn txn_begin(&self, provisional_id: u64) -> WalResult<WalTicket<E>> {
        let command = WalCommand::TxnBegin { provisional_id };
        self.enqueue_command(command, provisional_id).await
    }

    /// Enqueues a row append for `provisional_id`.
    ///
    /// The commit-timestamp column is built by repeating `commit_ts` once per
    /// row of `batch`.
    pub(crate) async fn txn_append(
        &self,
        provisional_id: u64,
        batch: &RecordBatch,
        commit_ts: Timestamp,
    ) -> WalResult<WalTicket<E>> {
        let stamps = vec![commit_ts.get(); batch.num_rows()];
        let commit_ts_column: ArrayRef = Arc::new(UInt64Array::from(stamps));
        let payload = DynBatchPayload::Row {
            batch: batch.clone(),
            commit_ts_column,
        };
        self.txn_append_payload(provisional_id, payload).await
    }

    /// Enqueues a delete append for `provisional_id`.
    pub async fn txn_append_delete(
        &self,
        provisional_id: u64,
        batch: RecordBatch,
    ) -> WalResult<WalTicket<E>> {
        self.txn_append_payload(provisional_id, DynBatchPayload::Delete { batch })
            .await
    }

    /// Enqueues a commit for `provisional_id` at `commit_ts`.
    pub(crate) async fn txn_commit(
        &self,
        provisional_id: u64,
        commit_ts: Timestamp,
    ) -> WalResult<WalTicket<E>> {
        let command = WalCommand::TxnCommit {
            provisional_id,
            commit_ts,
        };
        self.enqueue_command(command, provisional_id).await
    }

    /// Enqueues an abort for `provisional_id`.
    pub async fn txn_abort(&self, provisional_id: u64) -> WalResult<WalTicket<E>> {
        let command = WalCommand::TxnAbort { provisional_id };
        self.enqueue_command(command, provisional_id).await
    }

    /// Wraps `payload` in a `TxnAppend` command and enqueues it.
    async fn txn_append_payload(
        &self,
        provisional_id: u64,
        payload: DynBatchPayload,
    ) -> WalResult<WalTicket<E>> {
        self.enqueue_command(
            WalCommand::TxnAppend {
                provisional_id,
                payload,
            },
            provisional_id,
        )
        .await
    }
}
/// Derives the file id for the WAL segment with sequence number `seq`.
///
/// The 16 id bytes are `seq` widened to 128 bits in big-endian order: the
/// upper eight bytes are zero and the lower eight hold `seq`.
pub(super) fn wal_segment_file_id(seq: u64) -> FileId {
    let id_bytes: [u8; 16] = u128::from(seq).to_be_bytes();
    Ulid::from_bytes(id_bytes)
}
/// Ticket for a command submitted to the WAL writer; resolves to the
/// writer's acknowledgement.
#[must_use = "propagate WAL spans into GC pinning before dropping"]
pub struct WalTicket<E> {
    /// Submission sequence the command was enqueued with.
    pub seq: u64,
    /// Receives the writer's reply.
    receiver: oneshot::Receiver<WalResult<WalAck>>,
    /// Ties the ticket to an executor type without storing one.
    _exec: PhantomData<E>,
}
impl<E> WalTicket<E>
where
    E: Executor + Timer + Clone,
{
    /// Waits for the writer's acknowledgement of this command.
    ///
    /// # Errors
    ///
    /// Returns the writer's own error, or [`WalError::Storage`] when the
    /// writer dropped the ack channel without replying.
    pub async fn durable(self) -> WalResult<WalAck> {
        self.receiver
            .await
            .unwrap_or_else(|_| Err(WalError::Storage("wal writer dropped ack".into())))
    }
}
impl<E> fmt::Debug for WalTicket<E>
where
    E: Executor + Timer + Clone,
{
    /// Formats the ticket showing only its `seq`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("WalTicket");
        repr.field("seq", &self.seq);
        repr.finish()
    }
}
/// Native unit tests; they need the `tokio` feature for the executor.
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use fusio::executor::tokio::TokioExecutor;
    use futures::StreamExt;

    use super::*;

    /// Two-row batch with a single non-nullable `id: Int32` column.
    fn sample_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let data = Arc::new(Int32Array::from(vec![1, 2])) as _;
        RecordBatch::try_new(schema, vec![data]).expect("batch")
    }

    /// `UInt64` column holding `ts` repeated `len` times.
    fn commit_array(ts: u64, len: usize) -> ArrayRef {
        Arc::new(UInt64Array::from(vec![ts; len])) as ArrayRef
    }

    #[test]
    fn append_mvcc_rejects_length_mismatch() {
        let batch = sample_batch();
        let commit = commit_array(1, batch.num_rows() - 1);
        let err = append_commit_column(&batch, &commit).expect_err("length mismatch");
        assert!(
            matches!(err, WalError::Codec(msg) if msg.contains("commit_ts column length mismatch"))
        );
    }

    #[test]
    fn append_mvcc_rejects_existing_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(MVCC_COMMIT_COL, DataType::UInt64, false),
        ]));
        let data = Arc::new(Int32Array::from(vec![1])) as _;
        let commit = Arc::new(UInt64Array::from(vec![1_u64])) as _;
        let batch = RecordBatch::try_new(schema, vec![data, commit]).expect("batch");
        let commit = commit_array(2, batch.num_rows());
        let err = append_commit_column(&batch, &commit).expect_err("existing column");
        assert!(matches!(err, WalError::Codec(msg) if msg.contains("already contains _commit_ts")));

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(MVCC_TOMBSTONE_COL, DataType::Boolean, false),
        ]));
        let data = Arc::new(Int32Array::from(vec![1])) as _;
        let tombstones = Arc::new(BooleanArray::from(vec![false])) as _;
        let batch = RecordBatch::try_new(schema, vec![data, tombstones]).expect("batch");
        let commit = commit_array(3, batch.num_rows());
        let err = append_commit_column(&batch, &commit).expect_err("existing column");
        assert!(matches!(err, WalError::Codec(msg) if msg.contains("already contains _tombstone")));
    }

    #[test]
    fn split_commit_requires_column() {
        let batch = sample_batch();
        let err = split_commit_column(batch).expect_err("missing column");
        assert!(matches!(err, WalError::Codec(msg) if msg.contains("_commit_ts column missing")));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn append_returns_both_acks_in_order() {
        use crate::wal::{frame, writer};

        let executor = Arc::new(TokioExecutor::default());
        let (sender, mut receiver) = mpsc::channel(4);
        let queue_depth = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        // Stand-in writer: acks every message immediately. An append consumes
        // two frame sequence numbers, every other command consumes one.
        let join: tokio::task::JoinHandle<WalResult<()>> = executor.spawn(async move {
            let mut next_seq = frame::INITIAL_FRAME_SEQ;
            while let Some(msg) = receiver.next().await {
                match msg {
                    writer::WriterMsg::Enqueue {
                        command, ack_tx, ..
                    } => {
                        let (first_seq, last_seq, advance) = match command {
                            WalCommand::TxnAppend { .. } => {
                                (next_seq, next_seq.saturating_add(1), 2)
                            }
                            WalCommand::TxnCommit { .. } => (next_seq, next_seq, 1),
                            _ => (next_seq, next_seq, 1),
                        };
                        next_seq = next_seq.saturating_add(advance);
                        let ack = WalAck {
                            first_seq,
                            last_seq,
                            bytes_flushed: 0,
                            elapsed: Duration::from_millis(0),
                        };
                        let _ = ack_tx.send(Ok(ack));
                    }
                    writer::WriterMsg::Rotate { ack_tx } => {
                        let _ = ack_tx.send(Ok(()));
                    }
                    writer::WriterMsg::Snapshot { ack_tx } => {
                        let snapshot = WalSnapshot {
                            sealed_segments: Vec::new(),
                            active_segment: None,
                        };
                        let _ = ack_tx.send(Ok(snapshot));
                    }
                }
            }
            Ok(())
        });

        let metrics = Arc::new(TokioExecutor::rw_lock(WalMetrics::default()));
        let handle: WalHandle<TokioExecutor> = WalHandle::test_from_parts(
            sender,
            queue_depth,
            join,
            frame::INITIAL_FRAME_SEQ,
            metrics,
        );

        let batch = sample_batch();
        let provisional_id = handle.next_provisional_id();
        let commit_ts = Timestamp::new(1);

        let append_ack = handle
            .txn_append(provisional_id, &batch, commit_ts)
            .await
            .expect("append ticket")
            .durable()
            .await
            .expect("append ack");
        let commit_ack = handle
            .txn_commit(provisional_id, commit_ts)
            .await
            .expect("commit ticket")
            .durable()
            .await
            .expect("commit ack");

        assert!(append_ack.first_seq <= append_ack.last_seq);
        assert!(commit_ack.first_seq <= commit_ack.last_seq);
        assert!(append_ack.last_seq < commit_ack.first_seq);
        assert_eq!(append_ack.first_seq, frame::INITIAL_FRAME_SEQ);
        assert_eq!(append_ack.last_seq, frame::INITIAL_FRAME_SEQ + 1);
        assert_eq!(commit_ack.first_seq, frame::INITIAL_FRAME_SEQ + 2);
        assert_eq!(commit_ack.last_seq, frame::INITIAL_FRAME_SEQ + 2);
    }
}

/// Browser-side tests for `wasm32` builds with the `web` feature.
///
/// Kept as a top-level module so it is gated only by its own `cfg`, not by
/// the `tokio` feature that guards the native tests.
#[cfg(all(test, target_arch = "wasm32", feature = "web"))]
mod wasm_tests {
    use std::sync::{Arc, atomic::AtomicUsize};

    use fusio::executor::web::WebExecutor;
    use wasm_bindgen_test::*;

    use super::*;

    wasm_bindgen_test_configure!(run_in_browser);

    #[wasm_bindgen_test]
    async fn shutdown_treats_unjoinable_handles_as_ok() {
        let exec = WebExecutor::new();
        let (tx, _rx) = futures::channel::mpsc::channel(1);
        let queue_depth = Arc::new(AtomicUsize::new(0));
        let join = exec.spawn(async { Ok(()) });
        // NOTE(review): other call sites in this file wrap the lock in `Arc`
        // and call `rw_lock` as an associated function
        // (`Arc::new(E::rw_lock(..))`) — confirm this line type-checks on wasm32.
        let metrics = exec.rw_lock(WalMetrics::default());
        let inner = WalHandleInner::new(
            tx,
            queue_depth,
            frame::INITIAL_FRAME_SEQ,
            join,
            metrics,
        );
        let handle = WalHandle {
            inner: Arc::new(inner),
        };
        handle
            .shutdown()
            .await
            .expect("shutdown should not fail on unjoinable wasm handles");
    }
}
/// Extension trait for enabling, disabling, and reaching the write-ahead log.
pub trait WalExt<FS, E>
where
    FS: crate::manifest::ManifestFs<E>,
    E: Executor + Timer + Clone,
    <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
    /// Enables the WAL with the given configuration and returns a handle to it.
    fn enable_wal(
        &mut self,
        _cfg: WalConfig,
    ) -> impl Future<Output = WalResult<WalHandle<E>>> + '_;

    /// Disables the WAL.
    fn disable_wal(&mut self) -> impl Future<Output = WalResult<()>> + '_;

    /// Returns the current WAL handle, if there is one.
    fn wal(&self) -> Option<&WalHandle<E>>;
}
impl<FS, E> WalExt<FS, E> for crate::db::DbInner<FS, E>
where
    FS: crate::manifest::ManifestFs<E>,
    E: Executor + Timer + Clone,
    <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
    /// Spawns the WAL writer and installs the config and handle on the
    /// database.
    ///
    /// Provisional ids and frame sequence numbers resume one past the last
    /// values found in the existing WAL tail; with no tail metadata both start
    /// at `frame::INITIAL_FRAME_SEQ`.
    ///
    /// # Errors
    ///
    /// Propagates failures from loading the WAL state handle or from reading
    /// the tail metadata.
    async fn enable_wal(&mut self, cfg: WalConfig) -> WalResult<WalHandle<E>> {
        let wal_storage =
            storage::WalStorage::new(Arc::clone(&cfg.segment_backend), cfg.dir.clone());

        // A persisted state handle, when present, hints at the last segment so
        // the tail lookup knows where to start.
        let state_handle = wal_storage
            .load_state_handle(cfg.state_store.as_ref())
            .await?;
        let segment_hint = state_handle
            .as_ref()
            .and_then(|handle| handle.state().last_segment_seq);
        let tail = wal_storage.tail_metadata_with_hint(segment_hint).await?;

        let next_payload_seq = tail
            .as_ref()
            .and_then(|meta| meta.last_provisional_id)
            .map_or(frame::INITIAL_FRAME_SEQ, |last| last.saturating_add(1));
        let initial_frame_seq = tail
            .as_ref()
            .and_then(|meta| meta.last_frame_seq)
            .map_or(frame::INITIAL_FRAME_SEQ, |seq| seq.saturating_add(1));

        let metrics = Arc::new(E::rw_lock(WalMetrics::default()));
        // NOTE(review): the meaning of the positional `0` is defined by
        // `writer::spawn_writer`, which is not visible here.
        let (sender, queue_depth, join) = writer::spawn_writer(
            Arc::clone(self.executor()),
            wal_storage,
            cfg.clone(),
            Arc::clone(&metrics),
            0,
            initial_frame_seq,
        )
        .into_parts();
        let handle = WalHandle::from_parts(
            sender,
            queue_depth,
            join,
            next_payload_seq,
            Arc::clone(&metrics),
        );

        log_info!(
            component = "wal",
            event = "wal_enabled",
            wal_dir = %cfg.dir,
            queue_size = cfg.queue_size,
            segment_max_bytes = cfg.segment_max_bytes,
            sync_policy = ?cfg.sync,
            recovery_mode = ?cfg.recovery,
            retention_bytes = ?cfg.retention_bytes,
        );

        self.set_wal_config(Some(cfg.clone()));
        self.set_wal_handle(Some(handle.clone()));
        Ok(handle)
    }

    /// Delegates to the database's own `disable_wal_async`.
    fn disable_wal(&mut self) -> impl Future<Output = WalResult<()>> + '_ {
        self.disable_wal_async()
    }

    /// Returns the WAL handle currently held by the database, if any.
    fn wal(&self) -> Option<&WalHandle<E>> {
        self.wal_handle()
    }
}