use std::sync::Arc;
use fusio::Read;
#[cfg(all(test, feature = "tokio"))]
use fusio::{DynFs, fs::FsCas, path::Path as FusioPath};
use tracing::instrument;
use crate::{
manifest::WalSegmentRef,
observability::{log_info, log_warn},
wal::{
WalConfig, WalError, WalResult,
frame::{FRAME_HEADER_SIZE, FrameHeader, WalEvent, decode_frame},
storage::WalStorage,
},
};
/// Reads WAL segments from storage and decodes their frames into [`WalEvent`]s.
pub struct Replayer {
/// WAL configuration (directory, segment backend, optional state store, recovery mode).
cfg: WalConfig,
/// Storage handle built from `cfg.segment_backend`, rooted at `cfg.dir`.
storage: WalStorage,
}
impl Replayer {
    /// Creates a replayer for the WAL described by `cfg`.
    ///
    /// Segment I/O goes through a [`WalStorage`] built from `cfg.segment_backend`
    /// and rooted at `cfg.dir`.
    pub fn new(cfg: WalConfig) -> Self {
        let storage = WalStorage::new(Arc::clone(&cfg.segment_backend), cfg.dir.clone());
        Self { cfg, storage }
    }

    /// Test-only shorthand for [`Self::scan_with_floor`] with no floor.
    #[cfg(all(test, feature = "tokio"))]
    pub(crate) async fn scan(&self) -> WalResult<Vec<WalEvent>> {
        self.scan_with_floor(None).await
    }

    /// Decodes WAL events from the segments on storage, visiting segments in the
    /// order returned by `WalStorage::list_segments_with_hint`.
    ///
    /// When `floor` is `Some`:
    /// - segments whose sequence number is below `floor.seq()` are skipped;
    /// - inside the segment whose sequence equals `floor.seq()`, frames whose
    ///   sequence is below `floor.first_frame()` are skipped.
    ///
    /// Replay stops at the first truncated frame (a header or payload shorter
    /// than advertised). The events decoded up to that point are returned, and
    /// no later frames or segments are read.
    ///
    /// # Errors
    ///
    /// - `WalError::Unimplemented` if the configured recovery mode is
    ///   `AbsoluteConsistency` or `SkipCorrupted`.
    /// - `WalError::Storage` if a segment cannot be opened or read.
    /// - Errors from loading WAL state, listing segments, decoding a frame
    ///   header (other than truncation), or decoding a frame payload are
    ///   propagated unchanged.
    #[instrument(
        name = "wal::replay",
        skip(self, floor),
        fields(floor_seq = floor.map(|f| f.seq()))
    )]
    pub(crate) async fn scan_with_floor(
        &self,
        floor: Option<&WalSegmentRef>,
    ) -> WalResult<Vec<WalEvent>> {
        use crate::wal::WalRecoveryMode;

        match self.cfg.recovery {
            // Both supported modes share the stop-at-truncated-tail behavior below.
            WalRecoveryMode::PointInTime | WalRecoveryMode::TolerateCorruptedTail => {}
            WalRecoveryMode::AbsoluteConsistency => {
                return Err(WalError::Unimplemented(
                    "wal recovery mode AbsoluteConsistency is not implemented",
                ));
            }
            WalRecoveryMode::SkipCorrupted => {
                return Err(WalError::Unimplemented(
                    "wal recovery mode SkipCorrupted is not implemented",
                ));
            }
        }

        // When a state handle is available, its recorded last segment sequence
        // is forwarded to segment listing as a hint.
        let state_hint = self
            .storage
            .load_state_handle(self.cfg.state_store.as_ref())
            .await?
            .and_then(|handle| handle.state().last_segment_seq);
        let segments = self.storage.list_segments_with_hint(state_hint).await?;
        if segments.is_empty() {
            return Ok(Vec::new());
        }

        let floor_seq = floor.map(|f| f.seq());
        let floor_first_frame = floor.map(|f| f.first_frame());
        log_info!(
            component = "wal",
            event = "replay_started",
            segment_count = segments.len(),
            floor_seq = ?floor_seq,
            floor_first_frame = ?floor_first_frame,
        );

        let mut events = Vec::new();
        // Labelled so that a truncated tail ends the whole replay while still
        // reaching the `replay_completed` log after the loop.
        'segments: for segment in segments {
            // Segments older than the floor segment are skipped entirely.
            if let Some(seq) = floor_seq
                && segment.seq < seq
            {
                continue;
            }
            // Frame-level filtering applies only inside the floor segment itself.
            let segment_floor_first_frame = match floor_seq {
                Some(seq) if segment.seq == seq => floor_first_frame,
                _ => None,
            };

            let path = segment.path;
            let path_display = path.to_string();
            let mut file = self
                .storage
                .fs()
                .open_options(&path, WalStorage::read_options())
                .await
                .map_err(|err| {
                    WalError::Storage(format!(
                        "failed to open wal segment {}: {}",
                        path_display, err
                    ))
                })?;
            let (read_res, data) = file.read_to_end_at(Vec::new(), 0).await;
            read_res.map_err(|err| {
                WalError::Storage(format!(
                    "failed to read wal segment {}: {}",
                    path_display, err
                ))
            })?;

            let mut offset: usize = 0;
            while offset < data.len() {
                let header = match FrameHeader::decode_from(&data[offset..]) {
                    Ok((header, _)) => header,
                    // A frame cut short at the end of the data ends replay.
                    Err(WalError::Corrupt(reason))
                        if reason == "frame header truncated"
                            || reason == "frame payload truncated" =>
                    {
                        log_warn!(
                            component = "wal",
                            event = "replay_truncated_tail",
                            segment_path = %path_display,
                            offset = offset,
                            reason = reason,
                        );
                        break 'segments;
                    }
                    Err(err) => return Err(err),
                };

                let payload_start = offset + FRAME_HEADER_SIZE;
                // The header may advertise more payload than the segment holds.
                // Treat that exactly like a decoder-reported truncation, and
                // check the addition so a bogus length cannot overflow.
                let payload_end = match payload_start.checked_add(header.len as usize) {
                    Some(end) if end <= data.len() => end,
                    _ => {
                        log_warn!(
                            component = "wal",
                            event = "replay_truncated_tail",
                            segment_path = %path_display,
                            offset = offset,
                            reason = "frame payload truncated",
                        );
                        break 'segments;
                    }
                };
                let payload = &data[payload_start..payload_end];

                // Frames below the floor's first frame were already applied; skip them.
                if let Some(floor_frame) = segment_floor_first_frame
                    && header.seq < floor_frame
                {
                    offset = payload_end;
                    continue;
                }

                events.push(decode_frame(header.frame_type, payload)?);
                offset = payload_end;
            }
        }

        log_info!(
            component = "wal",
            event = "replay_completed",
            events_recovered = events.len(),
        );
        Ok(events)
    }

    /// Returns the configuration this replayer was created with.
    pub fn config(&self) -> &WalConfig {
        &self.cfg
    }
}
// Tests for `Replayer`. They cover:
// - round-trips through the in-memory and Tokio disk backends;
// - floor filtering;
// - truncated-tail handling;
// - rejection of unimplemented recovery modes;
// - emission of the `wal::replay` tracing span.
#[cfg(all(test, feature = "tokio"))]
mod tests {
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use fusio::{
Write,
impls::{disk::TokioFs, mem::fs::InMemoryFs},
};
use tempfile::tempdir;
use super::*;
use crate::{
manifest::WalSegmentRef,
mvcc::Timestamp,
schema::SchemaBuilder,
wal::{
WalRecoveryMode,
frame::{INITIAL_FRAME_SEQ, encode_autocommit_frames},
state::FsWalStateStore,
storage::WalStorage,
wal_segment_file_id,
},
};
/// One autocommit transaction (append + commit frames) is written to segment 1
/// of an in-memory WAL. It should replay as exactly those two events, in order.
#[tokio::test(flavor = "current_thread")]
async fn replayer_returns_logged_events() {
let backend = Arc::new(InMemoryFs::new());
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let fs_cas: Arc<dyn FsCas> = backend.clone();
let wal_root = FusioPath::parse("wal-test").expect("wal path");
let storage = WalStorage::new(Arc::clone(&fs_dyn), wal_root.clone());
let schema = default_test_schema();
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["a"])) as _,
Arc::new(Int32Array::from(vec![1])) as _,
],
)
.expect("batch");
let frames =
encode_autocommit_frames(batch.clone(), 7, Timestamp::new(42)).expect("encode");
// Serialize frames with consecutive sequence numbers starting at INITIAL_FRAME_SEQ.
let mut bytes = Vec::new();
for (seq, frame) in (INITIAL_FRAME_SEQ..).zip(frames.into_iter()) {
bytes.extend_from_slice(&frame.into_bytes(seq));
}
let storage_clone = storage.clone();
let wal_root_for_write = wal_root.clone();
let bytes_for_write = bytes;
storage_clone
.ensure_dir(&wal_root_for_write)
.await
.expect("ensure dir");
let mut segment = storage_clone.open_segment(1).await.expect("open segment");
let (write_res, _buf) = segment.file_mut().write_all(bytes_for_write).await;
write_res.expect("write wal");
segment.file_mut().flush().await.expect("flush");
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: Some(Arc::new(FsWalStateStore::new(fs_cas))),
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let events = replayer.scan().await.expect("scan");
assert_eq!(events.len(), 2);
match &events[0] {
WalEvent::DynAppend {
provisional_id,
payload,
} => {
assert_eq!(*provisional_id, 7);
assert_eq!(payload.commit_ts_hint, Some(Timestamp::new(42)));
let commit_array = payload
.commit_ts_column
.as_any()
.downcast_ref::<UInt64Array>()
.expect("u64 column");
assert_eq!(commit_array.len(), 1);
assert_eq!(commit_array.value(0), 42);
assert_eq!(payload.batch.num_rows(), 1);
}
other => panic!("unexpected event: {other:?}"),
}
match events[1] {
WalEvent::TxnCommit {
provisional_id,
commit_ts,
} => {
assert_eq!(provisional_id, 7);
assert_eq!(commit_ts, Timestamp::new(42));
}
ref other => panic!("unexpected event: {other:?}"),
}
}
/// The WAL has two segments (0 and 1) and the floor is set to segment 1.
/// Only segment 1's events should be replayed.
#[tokio::test(flavor = "current_thread")]
async fn replayer_skips_events_below_floor() {
let backend = Arc::new(InMemoryFs::new());
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let wal_root = FusioPath::parse("wal-floor").expect("wal path");
let storage = WalStorage::new(Arc::clone(&fs_dyn), wal_root.clone());
let mut next_seq = INITIAL_FRAME_SEQ;
let batch_a = sample_batch_with_label("a", 1);
write_autocommit_segment(&storage, 0, batch_a, 10, Timestamp::new(100), &mut next_seq)
.await;
let batch_b = sample_batch_with_label("b", 2);
let (floor_first_seq, floor_last_seq) =
write_autocommit_segment(&storage, 1, batch_b, 11, Timestamp::new(200), &mut next_seq)
.await;
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: None,
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let floor = WalSegmentRef::new(1, wal_segment_file_id(1), floor_first_seq, floor_last_seq);
let events = replayer.scan_with_floor(Some(&floor)).await.expect("scan");
assert_eq!(events.len(), 2, "only second segment survives the floor");
match events[0] {
WalEvent::DynAppend { provisional_id, .. } => {
assert_eq!(provisional_id, 11);
}
ref other => panic!("unexpected first event: {other:?}"),
}
match events[1] {
WalEvent::TxnCommit { provisional_id, .. } => {
assert_eq!(provisional_id, 11);
}
ref other => panic!("unexpected second event: {other:?}"),
}
}
/// Segment 5 holds a complete append frame followed by a commit frame missing
/// its last 3 bytes. Replay should succeed and return only the append event.
#[tokio::test(flavor = "current_thread")]
async fn replayer_stops_after_truncated_tail() {
let backend = Arc::new(InMemoryFs::new());
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let fs_cas: Arc<dyn FsCas> = backend.clone();
let wal_root = FusioPath::parse("wal-truncated").expect("wal path");
let storage = WalStorage::new(Arc::clone(&fs_dyn), wal_root.clone());
let storage_clone = storage.clone();
let wal_root_for_dir = wal_root.clone();
storage_clone
.ensure_dir(&wal_root_for_dir)
.await
.expect("ensure dir");
let schema = default_test_schema();
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["a"])) as _,
Arc::new(Int32Array::from(vec![1])) as _,
],
)
.expect("batch");
let frames =
encode_autocommit_frames(batch.clone(), 9, Timestamp::new(42)).expect("encode");
let mut seq = INITIAL_FRAME_SEQ;
let append_bytes = frames[0].clone().into_bytes(seq);
seq += 1;
let mut commit_bytes = frames[1].clone().into_bytes(seq);
// Drop the last 3 bytes so the commit frame is incomplete.
commit_bytes.truncate(commit_bytes.len() - 3);
let storage_clone = storage.clone();
let mut segment = storage_clone.open_segment(5).await.expect("open segment");
let (res, _buf) = segment.file_mut().write_all(append_bytes).await;
res.expect("write append");
let (res_commit, _buf) = segment.file_mut().write_all(commit_bytes).await;
res_commit.expect("write truncated commit");
segment.file_mut().flush().await.expect("flush");
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: Some(Arc::new(FsWalStateStore::new(fs_cas))),
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let events = replayer.scan().await.expect("scan succeeds");
assert_eq!(events.len(), 1, "commit frame should be ignored");
match &events[0] {
WalEvent::DynAppend {
provisional_id,
payload,
} => {
assert_eq!(*provisional_id, 9);
assert_eq!(payload.commit_ts_hint, Some(Timestamp::new(42)));
let commit_array = payload
.commit_ts_column
.as_any()
.downcast_ref::<UInt64Array>()
.expect("u64 column");
assert_eq!(commit_array.len(), 1);
assert_eq!(commit_array.value(0), 42);
assert_eq!(payload.batch.num_rows(), 1);
}
other => panic!("unexpected event: {other:?}"),
}
}
/// Same round-trip as `replayer_returns_logged_events`, but against segment 42
/// in a temporary directory accessed through `TokioFs`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replayer_reads_tokiofs_segments() {
let dir = tempdir().expect("tempdir");
let wal_dir = dir.path().join("wal");
let backend = Arc::new(TokioFs);
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let wal_root = FusioPath::from_filesystem_path(&wal_dir).expect("wal path");
let storage = WalStorage::new(Arc::clone(&fs_dyn), wal_root.clone());
storage
.ensure_dir(storage.root())
.await
.expect("ensure dir");
let schema = default_test_schema();
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["tokio"])) as _,
Arc::new(Int32Array::from(vec![9])) as _,
],
)
.expect("batch");
let frames =
encode_autocommit_frames(batch.clone(), 11, Timestamp::new(7)).expect("encode");
let mut bytes = Vec::new();
for (seq, frame) in (INITIAL_FRAME_SEQ..).zip(frames.into_iter()) {
bytes.extend_from_slice(&frame.into_bytes(seq));
}
let mut segment = storage.open_segment(42).await.expect("segment");
let (write_res, _buf) = segment.file_mut().write_all(bytes).await;
write_res.expect("write");
segment.file_mut().flush().await.expect("flush");
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: None,
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let events = replayer.scan().await.expect("scan");
assert_eq!(events.len(), 2);
match events[0] {
WalEvent::DynAppend { ref payload, .. } => {
assert_eq!(payload.batch.num_rows(), 1);
}
ref other => panic!("unexpected event: {other:?}"),
}
match events[1] {
WalEvent::TxnCommit { .. } => {}
ref other => panic!("unexpected event: {other:?}"),
}
}
/// On-disk variant of the floor test. Segments 10 and 11 are written with the
/// floor at segment 11, so only transaction 22 should be replayed.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replayer_skips_events_below_floor_on_disk() {
let dir = tempdir().expect("tempdir");
let wal_dir = dir.path().join("wal-floor-disk");
let backend = Arc::new(TokioFs);
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let wal_root = FusioPath::from_filesystem_path(&wal_dir).expect("wal path");
let storage = WalStorage::new(Arc::clone(&fs_dyn), wal_root.clone());
storage
.ensure_dir(storage.root())
.await
.expect("ensure dir");
let mut next_seq = INITIAL_FRAME_SEQ;
let batch_a = sample_batch_with_label("disk-a", 7);
write_autocommit_segment(
&storage,
10,
batch_a,
21,
Timestamp::new(300),
&mut next_seq,
)
.await;
let batch_b = sample_batch_with_label("disk-b", 8);
let (floor_first_seq, floor_last_seq) = write_autocommit_segment(
&storage,
11,
batch_b,
22,
Timestamp::new(400),
&mut next_seq,
)
.await;
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: None,
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let floor =
WalSegmentRef::new(11, wal_segment_file_id(11), floor_first_seq, floor_last_seq);
let events = replayer
.scan_with_floor(Some(&floor))
.await
.expect("scan succeeds");
assert_eq!(events.len(), 2);
match events[0] {
WalEvent::DynAppend { provisional_id, .. } => assert_eq!(provisional_id, 22),
ref other => panic!("unexpected first event: {other:?}"),
}
match events[1] {
WalEvent::TxnCommit { provisional_id, .. } => assert_eq!(provisional_id, 22),
ref other => panic!("unexpected second event: {other:?}"),
}
}
/// With `AbsoluteConsistency` recovery configured, `scan` must fail with the
/// matching `WalError::Unimplemented` message.
#[tokio::test(flavor = "current_thread")]
async fn replayer_rejects_unimplemented_recovery_mode() {
let backend = Arc::new(InMemoryFs::new());
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let fs_cas: Arc<dyn FsCas> = backend.clone();
let wal_root = FusioPath::parse("wal-unimplemented").expect("wal path");
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: Some(Arc::new(FsWalStateStore::new(fs_cas))),
recovery: WalRecoveryMode::AbsoluteConsistency,
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let err = replayer.scan().await.expect_err("mode unimplemented");
assert!(matches!(
err,
WalError::Unimplemented("wal recovery mode AbsoluteConsistency is not implemented")
));
}
/// Builds a one-row batch (`id = label`, `v = value`) using `default_test_schema`.
fn sample_batch_with_label(label: &str, value: i32) -> RecordBatch {
let schema = default_test_schema();
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec![label])) as _,
Arc::new(Int32Array::from(vec![value])) as _,
],
)
.expect("batch")
}
/// Encodes one autocommit transaction and writes its frames to segment `segment_seq`.
///
/// Frames are numbered consecutively starting at `*next_seq`, and `*next_seq`
/// is advanced past the written frames. Returns `(first_seq, last_seq)`, the
/// frame sequence numbers written to this segment.
async fn write_autocommit_segment(
storage: &WalStorage,
segment_seq: u64,
batch: RecordBatch,
provisional_id: u64,
commit_ts: Timestamp,
next_seq: &mut u64,
) -> (u64, u64) {
let frames = encode_autocommit_frames(batch, provisional_id, commit_ts).expect("encode");
let first_seq = *next_seq;
let mut segment = storage
.open_segment(segment_seq)
.await
.expect("open segment");
for frame in frames {
let bytes = frame.into_bytes(*next_seq);
*next_seq = next_seq.saturating_add(1);
let (res, _buf) = segment.file_mut().write_all(bytes).await;
res.expect("write frame");
}
segment.file_mut().flush().await.expect("flush segment");
let last_seq = next_seq.saturating_sub(1);
(first_seq, last_seq)
}
/// Two-column test schema: `id: Utf8` (primary key) and `v: Int32`, both non-nullable.
fn default_test_schema() -> SchemaRef {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("v", DataType::Int32, false),
]));
SchemaBuilder::from_schema(Arc::clone(&schema))
.primary_key("id")
.build()
.expect("schema builder should succeed")
.schema
}
/// Installs a capturing `fmt` subscriber and checks that the `wal::replay` span
/// declared by `#[instrument]` on `scan_with_floor` shows up in the output.
#[test]
fn replay_emits_tracing_span() {
use std::sync::Mutex;
use tracing_subscriber::{fmt::MakeWriter, layer::SubscriberExt};
// Writer that appends every formatted log line into a shared in-memory buffer.
#[derive(Clone, Default)]
struct TestWriter(Arc<Mutex<Vec<u8>>>);
impl std::io::Write for TestWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0
.lock()
.expect("test log buffer lock should not be poisoned")
.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl<'a> MakeWriter<'a> for TestWriter {
type Writer = Self;
fn make_writer(&'a self) -> Self::Writer {
self.clone()
}
}
let buffer = Arc::new(Mutex::new(Vec::new()));
let writer = TestWriter(Arc::clone(&buffer));
// FmtSpan::FULL emits span lifecycle events, so the span name lands in the buffer.
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_writer(writer)
.with_ansi(false)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL),
);
tracing::subscriber::with_default(subscriber, || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("runtime");
rt.block_on(async {
let backend = Arc::new(InMemoryFs::new());
let fs_dyn: Arc<dyn DynFs> = backend.clone();
let wal_root = FusioPath::parse("wal-span-test").expect("wal path");
let cfg = WalConfig {
dir: wal_root,
segment_backend: fs_dyn,
state_store: None,
..WalConfig::default()
};
let replayer = Replayer::new(cfg);
let _events = replayer.scan_with_floor(None).await.expect("scan");
});
});
let output = String::from_utf8(
buffer
.lock()
.expect("test log buffer lock should not be poisoned")
.clone(),
)
.expect("utf8");
assert!(
output.contains("wal::replay"),
"Expected span 'wal::replay' in output, got: {}",
output
);
}
}