use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Context, Result};
use crate::format::{
decode_next, encode_record, DecodeError, DecodeOutcome, RecordType, HEADER_LEN, MAX_KEY_LEN,
MAX_PAYLOAD_LEN,
};
use crate::lock::DirLock;
use crate::segment::{
active_segment_id, list_segment_ids, next_segment_id, segment_path, segment_size,
};
/// Position of one encoded record inside the log.
///
/// Returned by the append methods of the log: `segment` is the id of the
/// segment the record was written to, `offset` is the byte position at which
/// the record starts inside that segment, and `len` is the number of encoded
/// bytes the record occupies.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct RecordRef {
    /// Id of the segment file holding the record.
    pub segment: u32,
    /// Byte offset of the first byte of the record within the segment.
    pub offset: u64,
    /// Encoded length of the record in bytes.
    pub len: u32,
}
/// A fully decoded record together with the place it was read from.
///
/// The scanners in this file fill `segment`, `offset` and `len` with the id
/// of the segment being read, the byte offset at which decoding started, and
/// the number of bytes the decoder reported as consumed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Record {
    /// Kind of record, as reported by the decoder.
    pub record_type: RecordType,
    /// Transaction id stored in the record.
    pub txid: u64,
    /// Key bytes (empty for records appended through `RecordLog::append`).
    pub key: Vec<u8>,
    /// Payload bytes.
    pub payload: Vec<u8>,
    /// Id of the segment the record was read from.
    pub segment: u32,
    /// Byte offset of the record within its segment.
    pub offset: u64,
    /// Encoded size of the record in bytes.
    pub len: u32,
}
/// Summary counters produced by replaying the log.
///
/// All counters start at zero (`Default`).
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RecoveryReport {
    /// Number of segment files that were opened for scanning.
    pub files_scanned: u32,
    /// Number of records that decoded successfully.
    pub records_replayed: u64,
    /// Number of times an undecodable tail was tolerated on the last segment.
    pub tail_truncated: u32,
    /// Total bytes skipped at the tail of the last segment.
    pub tail_bytes_discarded: u64,
    /// Count of mid-stream errors. The scanners in this file always report 0.
    pub mid_stream_errors: u32,
    /// Count of unsupported-version records. The scanners in this file always
    /// report 0.
    pub unsupported_versions: u32,
    /// Largest transaction id observed, or 0 when no record was decoded.
    pub last_txid_seen: u64,
}
/// Writer/reader handle for a directory of log segments.
///
/// Field order is significant: Rust drops fields in declaration order, so the
/// directory lock is released before the active segment file is closed.
#[derive(Debug)]
pub struct RecordLog {
    /// Directory that holds the segment files.
    dir: PathBuf,
    /// Directory lock acquired in `open`; held for the lifetime of the handle.
    _lock: DirLock,
    /// Id of the segment currently receiving appends.
    active_id: u32,
    /// Handle on the active segment, opened for read + append.
    active_file: File,
    /// Byte size of the active segment as tracked by this handle; used as the
    /// offset reported for the next appended record.
    active_size: u64,
    /// Transaction id that the next appended record will carry.
    next_txid: u64,
    /// Report from the most recent full scan (`open` or `scan`).
    last_report: Option<RecoveryReport>,
    /// `Some(reason)` once a write-path failure made the handle unusable.
    poisoned: Option<&'static str>,
}
impl RecordLog {
    /// Opens the record log stored in `dir`.
    ///
    /// The directory is created when missing and an empty segment with id 1 is
    /// created when no segment exists. The directory lock is taken, every
    /// segment is replayed with `scan_all`, and the active segment is opened
    /// for appending.
    ///
    /// If the replay stopped before the physical end of the active segment
    /// (torn or corrupt tail), the file is truncated to the last good record
    /// boundary before it is opened in append mode. Without this, appended
    /// records would land *after* the garbage: the offsets handed out in
    /// `RecordRef` would be wrong and later scans would stop at the garbage
    /// and never reach the new records.
    ///
    /// If the log already contains the maximum transaction id, the handle is
    /// returned poisoned (`"txid overflow"`): it can still be scanned, but
    /// appends are refused instead of silently reusing transaction ids.
    ///
    /// # Errors
    /// Returns an error when the directory, lock, segment listing, replay,
    /// tail truncation or opening of the active segment fails.
    pub fn open(dir: &Path) -> Result<Self> {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("datawal: create_dir_all {}", dir.display()))?;
        let lock = DirLock::acquire(dir)?;
        let mut ids = list_segment_ids(dir)?;
        if ids.is_empty() {
            let p = segment_path(dir, 1);
            File::create(&p)
                .with_context(|| format!("datawal: create initial segment {}", p.display()))?;
            safeatomic_rs::fsync_dir(dir)
                .with_context(|| format!("datawal: fsync_dir {}", dir.display()))?;
            ids.push(1);
        }
        // A missing active segment depends on filesystem state, so report it
        // as an error rather than panicking.
        let active_id = active_segment_id(dir)?.ok_or_else(|| {
            anyhow!(
                "datawal: no active segment found in {} after ensuring one exists",
                dir.display()
            )
        })?;
        let report = scan_all(dir, &ids)?;

        // Never wrap the transaction counter back to 1: that would hand out
        // ids that are already present in the log.
        let (next_txid, poisoned) = match report.last_txid_seen.checked_add(1) {
            Some(v) => (v, None),
            None => (report.last_txid_seen, Some("txid overflow")),
        };

        let active_path = segment_path(dir, active_id);
        let active_size_on_disk = segment_size(dir, active_id)?;
        let active_size = match report.last_segment_logical_size_for(active_id) {
            Some(logical) => {
                if active_size_on_disk > logical {
                    // Use a plain write handle for the truncation and sync it
                    // before the append handle is opened.
                    let f = OpenOptions::new()
                        .write(true)
                        .open(&active_path)
                        .with_context(|| {
                            format!(
                                "datawal: open active segment {} for tail truncation",
                                active_path.display()
                            )
                        })?;
                    f.set_len(logical).with_context(|| {
                        format!(
                            "datawal: truncate active segment {} to {} bytes",
                            active_path.display(),
                            logical
                        )
                    })?;
                    f.sync_all().with_context(|| {
                        format!(
                            "datawal: sync_all after truncating segment {}",
                            active_path.display()
                        )
                    })?;
                }
                logical
            }
            // The replay did not end on the active segment; appends land at
            // the physical end of the file, so track that position.
            None => active_size_on_disk,
        };

        let active_file = OpenOptions::new()
            .read(true)
            .append(true)
            .create(false)
            .open(&active_path)
            .with_context(|| {
                format!("datawal: open active segment {}", active_path.display())
            })?;
        Ok(Self {
            dir: dir.to_path_buf(),
            _lock: lock,
            active_id,
            active_file,
            active_size,
            next_txid,
            last_report: Some(report.into_public()),
            poisoned,
        })
    }

    /// Returns `true` once a write-path failure has poisoned this handle.
    pub fn is_poisoned(&self) -> bool {
        self.poisoned.is_some()
    }

    /// Builds the error returned by every write-path call on a poisoned handle.
    fn poison_error(reason: &'static str) -> anyhow::Error {
        anyhow!(
            "datawal: writer poisoned: {}; drop handle and reopen",
            reason
        )
    }

    /// Fails with the poison error when the handle is poisoned.
    fn check_poisoned(&self) -> Result<()> {
        match self.poisoned {
            Some(reason) => Err(Self::poison_error(reason)),
            None => Ok(()),
        }
    }

    /// Test hook: marks the handle as poisoned with `reason`.
    #[doc(hidden)]
    pub fn __set_poisoned_for_test(&mut self, reason: &'static str) {
        self.poisoned = Some(reason);
    }

    /// Directory this log lives in.
    pub fn dir(&self) -> &Path {
        &self.dir
    }

    /// Id of the segment currently receiving appends.
    pub fn active_segment(&self) -> u32 {
        self.active_id
    }

    /// Returns a copy of the report from the most recent `open` or `scan`,
    /// or an all-zero report when none is stored.
    pub fn recovery_report(&self) -> Result<RecoveryReport> {
        Ok(self.last_report.clone().unwrap_or_default())
    }

    /// Appends a `RecordType::Raw` record with an empty key.
    pub fn append(&mut self, payload: &[u8]) -> Result<RecordRef> {
        self.append_record(RecordType::Raw, b"", payload)
    }

    /// Encodes a record with the next transaction id and writes it to the
    /// active segment.
    ///
    /// Returns the segment id, the starting offset and the encoded length of
    /// the record. The write is not followed by an fsync; call `fsync` for
    /// that.
    ///
    /// # Errors
    /// Fails when the handle is poisoned, when encoding fails, or when the
    /// encoded record does not fit a `u32` length (nothing is written in
    /// those cases). A failed write, a segment-size overflow or a transaction
    /// id overflow poisons the handle.
    pub fn append_record(
        &mut self,
        record_type: RecordType,
        key: &[u8],
        payload: &[u8],
    ) -> Result<RecordRef> {
        self.check_poisoned()?;
        let txid = self.next_txid;
        let bytes = encode_record(record_type, txid, key, payload)?;
        // Reject oversize records up front instead of truncating the length
        // with an `as` cast; nothing has been written yet, so no poisoning.
        if bytes.len() as u64 > u64::from(u32::MAX) {
            bail!(
                "datawal: encoded record of {} bytes exceeds the u32 length limit",
                bytes.len()
            );
        }
        let len = bytes.len() as u32;
        let offset = self.active_size;
        if let Err(e) = self.active_file.write_all(&bytes) {
            // A failed write may have left a partial record on disk.
            self.poisoned = Some("append_record write_all failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: write_all to segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }
        self.active_size = match self.active_size.checked_add(u64::from(len)) {
            Some(v) => v,
            None => {
                self.poisoned = Some("active segment size overflow");
                return Err(anyhow!("datawal: active segment size overflow"));
            }
        };
        self.next_txid = match txid.checked_add(1) {
            Some(v) => v,
            None => {
                self.poisoned = Some("txid overflow");
                return Err(anyhow!("datawal: txid overflow at {}", txid));
            }
        };
        Ok(RecordRef {
            segment: self.active_id,
            offset,
            len,
        })
    }

    /// Replays every segment from disk and returns all decoded records.
    ///
    /// Also refreshes the stored recovery report and re-derives the next
    /// transaction id from the largest id seen. If that id is already the
    /// maximum, the handle is poisoned instead of wrapping the counter.
    pub fn scan(&mut self) -> Result<Vec<Record>> {
        let ids = list_segment_ids(&self.dir)?;
        let mut internal = scan_all(&self.dir, &ids)?;
        // Move the records out instead of deep-cloning the whole scan result
        // just to derive the report from its counters.
        let records = std::mem::take(&mut internal.records);
        match internal.last_txid_seen.checked_add(1) {
            Some(v) => self.next_txid = v,
            None => {
                if self.poisoned.is_none() {
                    self.poisoned = Some("txid overflow");
                }
            }
        }
        self.last_report = Some(internal.into_public());
        Ok(records)
    }

    /// Returns an iterator over the records of the segments listed right now.
    /// Segment contents are read by the iterator as it advances.
    pub fn scan_iter(&self) -> Result<RecordIter<'_>> {
        let ids = list_segment_ids(&self.dir)?;
        Ok(RecordIter::new(&self.dir, ids))
    }

    /// Syncs the active segment file and then the directory.
    ///
    /// # Errors
    /// Fails when the handle is poisoned. A failure of either sync poisons the
    /// handle.
    pub fn fsync(&mut self) -> Result<()> {
        self.check_poisoned()?;
        if let Err(e) = self.active_file.sync_all() {
            self.poisoned = Some("fsync sync_all failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: sync_all on segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }
        if let Err(e) = safeatomic_rs::fsync_dir(&self.dir) {
            self.poisoned = Some("fsync fsync_dir failed");
            return Err(e.context(format!("datawal: fsync_dir {}", self.dir.display())));
        }
        Ok(())
    }

    /// Seals the active segment and switches appends to a fresh one.
    ///
    /// The current segment is synced, a new empty segment with the next id is
    /// created, the directory is synced, and the new segment becomes the
    /// active one with a tracked size of 0.
    ///
    /// # Errors
    /// Fails when the handle is poisoned or when listing segments / computing
    /// the next id fails (these two do not poison). Every other failure
    /// poisons the handle.
    pub fn rotate(&mut self) -> Result<()> {
        self.check_poisoned()?;
        if let Err(e) = self.active_file.sync_all() {
            self.poisoned = Some("rotate sync_all on previous segment failed");
            return Err(anyhow::Error::new(e).context(format!(
                "datawal: sync_all on rotate, segment {}",
                segment_path(&self.dir, self.active_id).display()
            )));
        }
        let ids = list_segment_ids(&self.dir)?;
        let new_id = next_segment_id(&ids)?;
        if new_id <= self.active_id {
            self.poisoned = Some("rotate computed non-increasing segment id");
            bail!(
                "datawal: rotate computed non-increasing segment id (current={}, computed={})",
                self.active_id,
                new_id
            );
        }
        let new_path = segment_path(&self.dir, new_id);
        if let Err(e) = File::create(&new_path) {
            self.poisoned = Some("rotate create new segment failed");
            return Err(anyhow::Error::new(e)
                .context(format!("datawal: create segment {}", new_path.display())));
        }
        if let Err(e) = safeatomic_rs::fsync_dir(&self.dir) {
            self.poisoned = Some("rotate fsync_dir after segment create failed");
            return Err(e.context(format!("datawal: fsync_dir {}", self.dir.display())));
        }
        let new_file = match OpenOptions::new().read(true).append(true).open(&new_path) {
            Ok(f) => f,
            Err(e) => {
                self.poisoned = Some("rotate open new active segment failed");
                return Err(anyhow::Error::new(e).context(format!(
                    "datawal: open new active segment {}",
                    new_path.display()
                )));
            }
        };
        // Only switch over once every step above has succeeded.
        self.active_file = new_file;
        self.active_id = new_id;
        self.active_size = 0;
        Ok(())
    }

    /// Consumes the handle. No explicit flush or fsync is performed here; the
    /// fields are simply dropped.
    pub fn close(self) -> Result<()> {
        Ok(())
    }
}
/// Accumulator shared by the scanners: the decoded records plus the counters
/// that end up in the public `RecoveryReport`.
#[derive(Clone, Debug)]
struct ScanInternal {
    /// Records decoded so far, in scan order.
    records: Vec<Record>,
    /// Number of successfully decoded records.
    records_replayed: u64,
    /// Number of segment files opened.
    files_scanned: u32,
    /// Largest transaction id observed (0 when nothing was decoded).
    last_txid_seen: u64,
    /// Number of tolerated undecodable tails.
    tail_truncated: u32,
    /// Bytes skipped at the tail of the last segment.
    tail_bytes_discarded: u64,
    /// `(segment id, offset)` at which scanning of the most recently finished
    /// segment stopped.
    last_segment_logical_end: Option<(u32, u64)>,
}
impl ScanInternal {
fn last_segment_logical_size_for(&self, segment: u32) -> Option<u64> {
self.last_segment_logical_end
.filter(|(id, _)| *id == segment)
.map(|(_, end)| end)
}
fn into_public(self) -> RecoveryReport {
RecoveryReport {
files_scanned: self.files_scanned,
records_replayed: self.records_replayed,
tail_truncated: self.tail_truncated,
tail_bytes_discarded: self.tail_bytes_discarded,
mid_stream_errors: 0,
unsupported_versions: 0,
last_txid_seen: self.last_txid_seen,
}
}
}
/// Reads segment `id` fully into memory and decodes it record by record,
/// appending the results to `out`.
///
/// An undecodable tail (truncated record or CRC mismatch) is tolerated only
/// when `is_last_segment` is set: the remaining bytes are counted as
/// discarded and scanning stops. The same condition in any other segment, and
/// any structural decode error, aborts with an error. On success
/// `out.last_segment_logical_end` holds the offset at which scanning stopped.
fn scan_segment(dir: &Path, id: u32, is_last_segment: bool, out: &mut ScanInternal) -> Result<()> {
    let path = segment_path(dir, id);
    let mut segment_file =
        File::open(&path).with_context(|| format!("datawal: open segment {}", path.display()))?;
    let mut buf = Vec::new();
    segment_file
        .read_to_end(&mut buf)
        .with_context(|| format!("datawal: read_to_end {}", path.display()))?;

    let file_len = buf.len() as u64;
    let mut offset: u64 = 0;
    while offset != file_len {
        let outcome = match decode_next(&buf, offset) {
            Ok(outcome) => outcome,
            Err(err) => {
                let err: DecodeError = err;
                bail!(
                    "datawal: structural decode error at offset {} of segment {} ({}): {}",
                    offset,
                    id,
                    path.display(),
                    err
                );
            }
        };
        match outcome {
            DecodeOutcome::Ok {
                record_type,
                txid,
                key,
                payload,
                bytes_consumed,
            } => {
                out.records.push(Record {
                    record_type,
                    txid,
                    key,
                    payload,
                    segment: id,
                    offset,
                    len: bytes_consumed,
                });
                out.records_replayed += 1;
                out.last_txid_seen = out.last_txid_seen.max(txid);
                offset += u64::from(bytes_consumed);
            }
            DecodeOutcome::Truncated { .. } => {
                if !is_last_segment {
                    bail!(
                        "datawal: truncated record at offset {} of non-tail segment {} ({}); refusing to silently drop data",
                        offset,
                        id,
                        path.display()
                    );
                }
                out.tail_truncated += 1;
                out.tail_bytes_discarded += file_len - offset;
                break;
            }
            DecodeOutcome::CrcMismatch { .. } => {
                if !is_last_segment {
                    bail!(
                        "datawal: CRC mismatch at offset {} of non-tail segment {} ({})",
                        offset,
                        id,
                        path.display()
                    );
                }
                out.tail_truncated += 1;
                out.tail_bytes_discarded += file_len - offset;
                break;
            }
        }
    }
    // Every successful exit (clean end of file or tolerated tail) records the
    // same thing: the offset up to which the segment decoded cleanly.
    out.last_segment_logical_end = Some((id, offset));
    Ok(())
}
/// Scans the segments named by `ids` in order and returns the accumulated
/// result. Only the final entry of `ids` is scanned as the tail segment; an
/// empty `ids` yields an all-zero result.
fn scan_all(dir: &Path, ids: &[u32]) -> Result<ScanInternal> {
    let mut acc = ScanInternal {
        records: Vec::new(),
        records_replayed: 0,
        files_scanned: 0,
        last_txid_seen: 0,
        tail_truncated: 0,
        tail_bytes_discarded: 0,
        last_segment_logical_end: None,
    };
    if let Some((tail_id, earlier_ids)) = ids.split_last() {
        for segment_id in earlier_ids {
            acc.files_scanned += 1;
            scan_segment(dir, *segment_id, false, &mut acc)?;
        }
        acc.files_scanned += 1;
        scan_segment(dir, *tail_id, true, &mut acc)?;
    }
    Ok(acc)
}
/// Lazy, record-at-a-time reader over a fixed list of segments.
///
/// One segment at a time is held in memory. The `'log` lifetime ties the
/// iterator to a borrow of the `RecordLog` that created it.
pub struct RecordIter<'log> {
    /// Directory holding the segment files.
    dir: PathBuf,
    /// Segment ids to visit, in order.
    ids: Vec<u32>,
    /// Index into `ids` of the segment being read.
    cur_idx: usize,
    /// Bytes of the segment currently loaded.
    cur_buf: Vec<u8>,
    /// Decode position inside `cur_buf`.
    cur_offset: u64,
    /// Id of the segment currently loaded.
    cur_id: u32,
    /// Whether `cur_buf` holds the segment at `cur_idx`.
    cur_loaded: bool,
    /// Counters gathered so far (its `records` vector stays empty).
    report: ScanInternal,
    /// Set once the iterator is exhausted or has yielded an error.
    done: bool,
    /// Marker carrying the borrow of the originating log.
    _log: std::marker::PhantomData<&'log RecordLog>,
}
impl<'log> RecordIter<'log> {
    /// Creates an iterator over the segments `ids` inside `dir`. Nothing is
    /// read until the iterator is advanced.
    pub(crate) fn new(dir: &Path, ids: Vec<u32>) -> Self {
        let report = ScanInternal {
            records: Vec::new(),
            records_replayed: 0,
            files_scanned: 0,
            last_txid_seen: 0,
            tail_truncated: 0,
            tail_bytes_discarded: 0,
            last_segment_logical_end: None,
        };
        Self {
            dir: dir.to_path_buf(),
            ids,
            cur_idx: 0,
            cur_buf: Vec::new(),
            cur_offset: 0,
            cur_id: 0,
            cur_loaded: false,
            report,
            done: false,
            _log: std::marker::PhantomData,
        }
    }

    /// Snapshot of the counters gathered so far. It only reflects the part of
    /// the log that has already been iterated.
    pub fn recovery_report(&self) -> RecoveryReport {
        let snapshot = self.report.clone();
        snapshot.into_public()
    }

    /// Loads the segment at `cur_idx` into `cur_buf` unless it is already
    /// loaded, resetting the decode position and counting the file as scanned.
    fn ensure_loaded(&mut self) -> Result<()> {
        if !self.cur_loaded {
            let id = self.ids[self.cur_idx];
            let path = segment_path(&self.dir, id);
            let mut segment_file = File::open(&path)
                .with_context(|| format!("datawal: open segment {}", path.display()))?;
            self.cur_buf.clear();
            segment_file
                .read_to_end(&mut self.cur_buf)
                .with_context(|| format!("datawal: read_to_end {}", path.display()))?;
            self.cur_id = id;
            self.cur_offset = 0;
            self.cur_loaded = true;
            self.report.files_scanned += 1;
        }
        Ok(())
    }

    /// Decodes the next record of the loaded segment.
    ///
    /// Returns `Ok(Some(record))` for a decoded record and `Ok(None)` when the
    /// segment is finished, either at its exact end or at a tolerated
    /// undecodable tail of the last segment. A truncated record or CRC
    /// mismatch in a non-tail segment, and any structural decode error, is
    /// returned as `Err`.
    fn try_next_in_segment(&mut self) -> Result<Option<Record>> {
        let id = self.cur_id;
        let start = self.cur_offset;
        let file_len = self.cur_buf.len() as u64;
        let is_tail = self.cur_idx + 1 == self.ids.len();

        if start == file_len {
            self.report.last_segment_logical_end = Some((id, start));
            return Ok(None);
        }

        let outcome = match decode_next(&self.cur_buf, start) {
            Ok(outcome) => outcome,
            Err(err) => {
                let err: DecodeError = err;
                let path = segment_path(&self.dir, id);
                return Err(anyhow!(
                    "datawal: structural decode error at offset {} of segment {} ({}): {}",
                    start,
                    id,
                    path.display(),
                    err
                ));
            }
        };

        match outcome {
            DecodeOutcome::Ok {
                record_type,
                txid,
                key,
                payload,
                bytes_consumed,
            } => {
                self.cur_offset = start + u64::from(bytes_consumed);
                self.report.last_txid_seen = self.report.last_txid_seen.max(txid);
                Ok(Some(Record {
                    record_type,
                    txid,
                    key,
                    payload,
                    segment: id,
                    offset: start,
                    len: bytes_consumed,
                }))
            }
            // On the last segment both kinds of undecodable tail are handled
            // the same way: count the remaining bytes as discarded and stop.
            DecodeOutcome::Truncated { .. } | DecodeOutcome::CrcMismatch { .. } if is_tail => {
                self.report.tail_truncated += 1;
                self.report.tail_bytes_discarded += file_len - start;
                self.report.last_segment_logical_end = Some((id, start));
                Ok(None)
            }
            DecodeOutcome::Truncated { .. } => {
                let path = segment_path(&self.dir, id);
                Err(anyhow!(
                    "datawal: truncated record at offset {} of non-tail segment {} ({}); refusing to silently drop data",
                    start,
                    id,
                    path.display()
                ))
            }
            DecodeOutcome::CrcMismatch { .. } => {
                let path = segment_path(&self.dir, id);
                Err(anyhow!(
                    "datawal: CRC mismatch at offset {} of non-tail segment {} ({})",
                    start,
                    id,
                    path.display()
                ))
            }
        }
    }
}
impl Iterator for RecordIter<'_> {
    type Item = Result<Record>;

    /// Yields the next record, moving on to the following segment whenever
    /// the current one is finished. After the last segment, or after the
    /// first error has been yielded, the iterator only returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        while !self.done {
            if self.cur_idx >= self.ids.len() {
                self.done = true;
                break;
            }
            let step = self
                .ensure_loaded()
                .and_then(|()| self.try_next_in_segment());
            match step {
                Ok(Some(record)) => {
                    self.report.records_replayed += 1;
                    return Some(Ok(record));
                }
                Ok(None) => {
                    // Current segment finished: advance and release its bytes.
                    self.cur_idx += 1;
                    self.cur_loaded = false;
                    self.cur_buf.clear();
                }
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
        None
    }
}
// Evaluates the three imported format constants inside a const block and
// discards the values.
// NOTE(review): presumably this exists so the `HEADER_LEN`, `MAX_KEY_LEN` and
// `MAX_PAYLOAD_LEN` imports are not reported as unused in this file — confirm.
#[allow(dead_code)]
const _ASSERT_HEADER: () = {
let _ = HEADER_LEN;
let _ = MAX_KEY_LEN;
let _ = MAX_PAYLOAD_LEN;
};