use std::collections::BTreeMap;
use std::hash::Hasher;
use std::io::{BufWriter, ErrorKind, Write};
use std::mem::size_of;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use chrono::prelude::{DateTime, Utc};
use fst::{Map, MapBuilder, Streamer};
use zerocopy::{AsBytes, FromZeroes};
use crate::error::Result;
use crate::io::buf::{IoBufMut, ZeroCopyBuf};
use crate::io::file::{BufCopy, FileExt};
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, CheckedFrame};
use crate::{LIBSQL_MAGIC, LIBSQL_WAL_VERSION};
use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader};
use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader};
/// Handle to a sealed segment.
///
/// The handle only wraps an `Arc` around [`SealedSegmentInner`], which holds the actual state
/// (header, backing file, index, path and read-lock counter).
#[derive(Debug)]
pub struct SealedSegment<F> {
/// Reference-counted segment state.
inner: Arc<SealedSegmentInner<F>>,
}
impl<F> Clone for SealedSegment<F> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// State behind a [`SealedSegment`] handle.
pub struct SealedSegmentInner<F> {
/// Counter consulted by `is_checkpointable`: the segment is reported as checkpointable only
/// while this value is zero.
pub read_locks: Arc<AtomicU64>,
/// Segment header; `open` reads it from offset 0 of the segment file.
header: SegmentHeader,
/// Backing file from which the header, frames and index are read.
file: Arc<F>,
/// fst map keyed by big-endian page number; the value is the offset of the frame holding that
/// page within this segment.
index: Map<Arc<[u8]>>,
/// Path of the segment file; `destroy` removes the file at this path.
path: PathBuf,
}
impl<F> std::fmt::Debug for SealedSegmentInner<F> {
    /// Formats the segment state as a `SealedSegment { .. }` debug struct.
    ///
    /// The backing `file` is deliberately left out, so `F` does not need to implement `Debug`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            read_locks,
            header,
            index,
            path,
            file: _,
        } = self;
        formatter
            .debug_struct("SealedSegment")
            .field("read_locks", read_locks)
            .field("header", header)
            .field("index", index)
            .field("path", path)
            .finish()
    }
}
impl<F> SealedSegment<F> {
    /// Builds a segment around `f` with a zeroed header, an empty index, an empty path and a
    /// fresh read-lock counter.
    ///
    /// Nothing is read from or written to `f`.
    pub fn empty(f: F) -> Self {
        // Converting the default (empty) map to `Arc<[u8]>`-backed storage is expected to
        // always succeed, hence the unwrap.
        let index = Map::default().map_data(Into::into).unwrap();
        let inner = SealedSegmentInner {
            read_locks: Default::default(),
            header: SegmentHeader::new_zeroed(),
            file: Arc::new(f),
            index,
            path: PathBuf::new(),
        };
        Self {
            inner: Arc::new(inner),
        }
    }
}
impl<F> Deref for SealedSegment<F> {
    type Target = SealedSegmentInner<F>;

    /// Gives direct access to the shared [`SealedSegmentInner`] state.
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
impl<F> Segment for SealedSegment<F>
where
F: FileExt,
{
async fn compact(&self, out_file: &impl FileExt, id: uuid::Uuid) -> Result<Vec<u8>> {
let mut hasher = crc32fast::Hasher::new();
let header = CompactedSegmentDataHeader {
frame_count: (self.index().len() as u32).into(),
segment_id: id.as_u128().into(),
start_frame_no: self.header().start_frame_no,
end_frame_no: self.header().last_commited_frame_no,
size_after: self.header.size_after,
version: LIBSQL_WAL_VERSION.into(),
magic: LIBSQL_MAGIC.into(),
page_size: self.header().page_size,
timestamp: self.header.sealed_at_timestamp,
};
hasher.update(header.as_bytes());
let (_, ret) = out_file
.write_all_at_async(ZeroCopyBuf::new_init(header), 0)
.await;
ret?;
let mut pages = self.index().stream();
let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
let mut out_index = fst::MapBuilder::memory();
let mut current_offset = 0;
while let Some((page_no_bytes, offset)) = pages.next() {
let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await;
ret?;
b.get_mut().header_mut().set_size_after(0);
hasher.update(&b.get_ref().as_bytes());
let dest_offset =
size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
let (mut b, ret) = out_file.write_all_at_async(b, dest_offset as u64).await;
ret?;
out_index
.insert(page_no_bytes, current_offset as _)
.unwrap();
current_offset += 1;
b.deinit();
buffer = b;
}
let footer = CompactedSegmentDataFooter {
checksum: hasher.finalize().into(),
};
let footer_offset =
size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
let (_, ret) = out_file
.write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
.await;
ret?;
Ok(out_index.into_inner().unwrap())
}
#[inline]
fn start_frame_no(&self) -> u64 {
self.header.start_frame_no.get()
}
#[inline]
fn last_committed(&self) -> u64 {
self.header.last_committed()
}
fn index(&self) -> &fst::Map<Arc<[u8]>> {
&self.index
}
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result<bool> {
if self.header().start_frame_no.get() > max_frame_no {
return Ok(false);
}
let index = self.index();
if let Some(offset) = index.get(page_no.to_be_bytes()) {
self.read_page_offset(offset as u32, buf)?;
return Ok(true);
}
Ok(false)
}
fn is_checkpointable(&self) -> bool {
let read_locks = self.read_locks.load(Ordering::Relaxed);
tracing::debug!(read_locks);
read_locks == 0
}
fn size_after(&self) -> u32 {
self.header().size_after()
}
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static,
{
assert_eq!(buf.bytes_total(), size_of::<Frame>());
let frame_offset = frame_offset(offset);
let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await;
(buf, ret.map_err(Into::into))
}
fn destroy<IO: crate::io::Io>(&self, io: &IO) -> impl std::future::Future<Output = ()> {
async move {
if let Err(e) = io.remove_file_async(&self.path).await {
tracing::error!("failed to remove segment file {:?}: {e}", self.path);
}
}
}
fn timestamp(&self) -> DateTime<Utc> {
assert_ne!(
self.header().sealed_at_timestamp.get(),
0,
"segment was not sealed properly"
);
DateTime::from_timestamp_millis(self.header().sealed_at_timestamp.get() as _)
.expect("this should be a guaranteed roundtrip with DateTime::timestamp_millis")
}
}
impl<F: FileExt> SealedSegment<F> {
    /// Opens the segment stored in `file`.
    ///
    /// The header is read from offset 0 and validated with `SegmentHeader::check`. Then:
    ///
    /// * if the header reports an empty segment, the file at `path` is deleted and `Ok(None)`
    ///   is returned;
    /// * if the segment is not flagged `SEALED`, it is rebuilt and sealed by [`Self::recover`],
    ///   using `now` as the sealing timestamp;
    /// * otherwise the index is loaded from the location recorded in the header.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the file, validating the header, deleting an empty segment,
    /// decoding the index or recovering the segment fails.
    ///
    /// # Panics
    ///
    /// Panics if an unsealed segment has a non-zero `index_offset`.
    pub fn open(
        file: Arc<F>,
        path: PathBuf,
        read_locks: Arc<AtomicU64>,
        now: DateTime<Utc>,
    ) -> Result<Option<Self>> {
        let mut header: SegmentHeader = SegmentHeader::new_zeroed();
        file.read_exact_at(header.as_bytes_mut(), 0)?;
        header.check()?;
        let index_offset = header.index_offset.get();
        let index_len = header.index_size.get();

        if header.is_empty() {
            std::fs::remove_file(path)?;
            return Ok(None);
        }

        // Non-empty but not sealed: the index has to be rebuilt from the frames.
        // NOTE(review): `read_locks` is not forwarded here, the recovered segment gets a fresh
        // counter — confirm this is intended.
        if !header.flags().contains(SegmentFlags::SEALED) {
            assert_eq!(header.index_offset.get(), 0, "{header:?}");
            return Self::recover(file, path, header, now).map(Some);
        }

        let mut slice = vec![0; index_len as usize];
        file.read_exact_at(&mut slice, index_offset)?;
        let index = Map::new(slice.into())?;
        Ok(Some(Self {
            inner: SealedSegmentInner {
                file,
                path,
                read_locks,
                index,
                header,
            }
            .into(),
        }))
    }

    /// Rebuilds the index of an unsealed segment and seals it.
    ///
    /// Frames are read sequentially until the end of the file or the first frame whose checksum
    /// does not continue the chain started from the header salt. Only pages belonging to a
    /// committed transaction are indexed. The index is then written right after the last valid
    /// frame, followed by its checksum (little-endian), and the header is rewritten with the
    /// index location, the last committed frame number, `size_after`, the sealing timestamp
    /// (`now`) and the `SEALED` flag.
    ///
    /// # Errors
    ///
    /// Returns an error on any I/O failure other than reaching the end of the file while
    /// scanning frames, including failures while writing the index (these are propagated
    /// rather than panicking).
    ///
    /// # Panics
    ///
    /// Panics if `header` describes an empty or already sealed segment, or one that already
    /// records an index.
    fn recover(
        file: Arc<F>,
        path: PathBuf,
        mut header: SegmentHeader,
        now: DateTime<Utc>,
    ) -> Result<Self> {
        assert!(!header.is_empty());
        assert_eq!(header.index_size.get(), 0);
        assert_eq!(header.index_offset.get(), 0);
        assert!(!header.flags().contains(SegmentFlags::SEALED));

        let mut current_checksum = header.salt.get();
        tracing::trace!("recovering unsealed segment at {path:?}");
        // page_no -> frame offset, ordered so it can be streamed straight into the fst builder.
        let mut index = BTreeMap::new();
        let mut frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
        // Pages seen since the last commit frame; they are indexed only once a commit is found.
        let mut current_tx = Vec::new();
        let mut last_committed = 0;
        let mut size_after = 0;
        let mut frame_count = 0;
        let mut max_seen_frame_no = 0;
        for i in 0.. {
            let offset = checked_frame_offset(i as u32);
            match file.read_exact_at(frame.as_bytes_mut(), offset) {
                Ok(_) => {
                    let new_checksum = frame.frame.checksum(current_checksum);
                    if new_checksum != frame.checksum.get() {
                        // Both values come from disk; saturate so that an inconsistent header
                        // cannot turn this log statement into an overflow panic.
                        tracing::warn!(
                            "found invalid checksum in segment, dropping {} frames",
                            header.last_committed().saturating_sub(last_committed)
                        );
                        break;
                    }
                    current_checksum = new_checksum;
                    frame_count += 1;
                    #[cfg(debug_assertions)]
                    {
                        if !header.flags().contains(SegmentFlags::FRAME_UNORDERED) {
                            assert!(frame.frame.header().frame_no() > max_seen_frame_no);
                        }
                    }
                    max_seen_frame_no = max_seen_frame_no.max(frame.frame.header.frame_no());
                    current_tx.push(frame.frame.header().page_no());
                    if frame.frame.header.is_commit() {
                        last_committed = max_seen_frame_no;
                        size_after = frame.frame.header().size_after();
                        // Offset of the first frame of the transaction that just committed.
                        let base_offset = (i + 1) - current_tx.len();
                        for (frame_offset, page_no) in current_tx.drain(..).enumerate() {
                            index.insert(page_no, (base_offset + frame_offset) as u32);
                        }
                    }
                }
                Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e.into()),
            }
        }

        // The index goes right after the last valid frame (committed or not).
        let index_offset = frame_count as u32;
        let index_byte_offset = checked_frame_offset(index_offset);
        let cursor = file.cursor(index_byte_offset);
        let writer = BufCopy::new(cursor);
        let writer = BufWriter::new(writer);
        // The index checksum continues from the checksum of the last valid frame.
        let mut digest = crc32fast::Hasher::new_with_initial(current_checksum);
        let mut writer = Inspect::new(writer, |data: &[u8]| {
            digest.write(data);
        });
        let mut builder = MapBuilder::new(&mut writer)?;
        for (k, v) in index.into_iter() {
            // Keys arrive in ascending order from the `BTreeMap`; a failure here is an error
            // from the underlying writer and is reported to the caller.
            builder.insert(k.to_be_bytes(), v as u64)?;
        }
        builder.finish()?;

        let writer = writer.into_inner();
        let index_size = writer.get_ref().get_ref().count();
        let index_checksum = digest.finalize();
        let (mut cursor, index_bytes) = writer
            .into_inner()
            .map_err(|e| e.into_parts().0)?
            .into_parts();
        cursor.write_all(&index_checksum.to_le_bytes())?;

        header.index_offset = index_byte_offset.into();
        header.index_size = index_size.into();
        header.last_commited_frame_no = last_committed.into();
        header.size_after = size_after.into();
        header.sealed_at_timestamp = (now.timestamp_millis() as u64).into();
        let flags = header.flags();
        header.set_flags(flags | SegmentFlags::SEALED);
        header.recompute_checksum();
        file.write_all_at(header.as_bytes(), 0)?;

        // These bytes were produced by the builder above, so they are expected to decode.
        let index = Map::new(index_bytes.into()).unwrap();
        Ok(SealedSegment {
            inner: SealedSegmentInner {
                read_locks: Default::default(),
                header,
                file,
                index,
                path,
            }
            .into(),
        })
    }

    /// Path of the segment file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Reads the page stored in the frame at frame index `offset` into `buf`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if `buf` cannot be filled entirely.
    pub fn read_page_offset(&self, offset: u32, buf: &mut [u8]) -> std::io::Result<()> {
        let page_offset = page_offset(offset) as usize;
        self.file.read_exact_at(buf, page_offset as _)?;
        Ok(())
    }

    /// Reads the whole frame at frame index `offset` into `frame`.
    ///
    /// # Errors
    ///
    /// Returns an error if the frame cannot be read entirely.
    pub fn read_frame_offset(&self, offset: u32, frame: &mut Frame) -> Result<()> {
        let offset = frame_offset(offset);
        self.file.read_exact_at(frame.as_bytes_mut(), offset as _)?;
        Ok(())
    }
}
impl<F> SealedSegment<F> {
    /// The segment header held by this segment.
    pub fn header(&self) -> &SegmentHeader {
        &self.inner.header
    }

    /// Asynchronously reads the page stored in the frame at frame index `offset` into `buf`.
    ///
    /// The buffer is always handed back, together with the outcome of the read.
    ///
    /// # Panics
    ///
    /// Panics if `buf.bytes_total()` is not exactly 4096.
    pub async fn read_page_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
    where
        B: IoBufMut + Send + 'static,
        F: FileExt,
    {
        assert_eq!(buf.bytes_total(), 4096);
        let position = page_offset(offset) as usize;
        let (buf, outcome) = self.file.read_exact_at_async(buf, position as _).await;
        (buf, outcome.map_err(Into::into))
    }
}