#![allow(dead_code)]
use std::future::Future;
use std::hash::Hasher as _;
use std::io;
use std::mem::offset_of;
use std::mem::size_of;
use std::num::NonZeroU64;
use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use zerocopy::byteorder::little_endian::{U128, U16, U32, U64};
use zerocopy::AsBytes;
use crate::error::{Error, Result};
use crate::io::buf::IoBufMut;
use crate::io::FileExt;
use crate::io::Io;
use crate::LIBSQL_MAGIC;
use crate::LIBSQL_PAGE_SIZE;
pub(crate) mod compacted;
pub mod current;
pub mod list;
pub mod sealed;
bitflags::bitflags! {
pub struct SegmentFlags: u32 {
const FRAME_UNORDERED = 1 << 0;
const SEALED = 1 << 1;
}
}
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes, Clone, Copy)]
pub struct SegmentHeader {
pub magic: U64,
pub version: U16,
pub start_frame_no: U64,
pub last_commited_frame_no: U64,
pub frame_count: U64,
pub size_after: U32,
pub index_offset: U64,
pub index_size: U64,
pub flags: U32,
pub salt: U32,
pub page_size: U16,
pub log_id: U128,
pub sealed_at_timestamp: U64,
pub header_cheksum: U32,
}
impl SegmentHeader {
fn checksum(&self) -> u32 {
let field_bytes: &[u8] = &self.as_bytes()[..offset_of!(SegmentHeader, header_cheksum)];
let checksum = crc32fast::hash(field_bytes);
checksum
}
fn check(&self) -> Result<()> {
if self.page_size.get() != LIBSQL_PAGE_SIZE {
return Err(Error::InvalidPageSize);
}
if self.magic.get() != LIBSQL_MAGIC {
return Err(Error::InvalidHeaderChecksum);
}
if self.version.get() != 1 {
return Err(Error::InvalidHeaderVersion);
}
let computed = self.checksum();
if computed == self.header_cheksum.get() {
return Ok(());
} else {
return Err(Error::InvalidHeaderChecksum);
}
}
pub fn flags(&self) -> SegmentFlags {
SegmentFlags::from_bits(self.flags.get()).unwrap()
}
fn set_flags(&mut self, flags: SegmentFlags) {
self.flags = flags.bits().into();
}
fn recompute_checksum(&mut self) {
let checksum = self.checksum();
self.header_cheksum = checksum.into();
}
pub fn last_commited_frame_no(&self) -> u64 {
self.last_commited_frame_no.get()
}
pub fn size_after(&self) -> u32 {
self.size_after.get()
}
fn is_empty(&self) -> bool {
self.frame_count() == 0
}
pub fn frame_count(&self) -> usize {
self.frame_count.get() as usize
}
pub fn last_committed(&self) -> u64 {
if self.is_empty() {
self.start_frame_no.get() - 1
} else {
self.last_commited_frame_no.get()
}
}
pub(crate) fn next_frame_no(&self) -> NonZeroU64 {
if self.is_empty() {
assert!(self.start_frame_no.get() > 0);
NonZeroU64::new(self.start_frame_no.get()).unwrap()
} else {
NonZeroU64::new(self.last_commited_frame_no.get() + 1).unwrap()
}
}
}
pub trait Segment: Send + Sync + 'static {
fn compact(
&self,
out_file: &impl FileExt,
id: uuid::Uuid,
) -> impl Future<Output = Result<Vec<u8>>> + Send;
fn start_frame_no(&self) -> u64;
fn last_committed(&self) -> u64;
fn index(&self) -> &fst::Map<Arc<[u8]>>;
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool>;
fn is_checkpointable(&self) -> bool;
fn size_after(&self) -> u32;
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static;
fn timestamp(&self) -> DateTime<Utc>;
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
}
impl<T: Segment> Segment for Arc<T> {
fn compact(
&self,
out_file: &impl FileExt,
id: uuid::Uuid,
) -> impl Future<Output = Result<Vec<u8>>> + Send {
self.as_ref().compact(out_file, id)
}
fn start_frame_no(&self) -> u64 {
self.as_ref().start_frame_no()
}
fn last_committed(&self) -> u64 {
self.as_ref().last_committed()
}
fn index(&self) -> &fst::Map<Arc<[u8]>> {
self.as_ref().index()
}
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
self.as_ref().read_page(page_no, max_frame_no, buf)
}
fn is_checkpointable(&self) -> bool {
self.as_ref().is_checkpointable()
}
fn size_after(&self) -> u32 {
self.as_ref().size_after()
}
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static,
{
self.as_ref().read_frame_offset_async(offset, buf).await
}
fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
self.as_ref().destroy(io)
}
fn timestamp(&self) -> DateTime<Utc> {
self.as_ref().timestamp()
}
}
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct FrameHeader {
pub page_no: U32,
pub size_after: U32,
pub frame_no: U64,
}
impl FrameHeader {
pub fn page_no(&self) -> u32 {
self.page_no.get()
}
pub fn size_after(&self) -> u32 {
self.size_after.get()
}
pub fn frame_no(&self) -> u64 {
self.frame_no.get()
}
pub fn set_frame_no(&mut self, frame_no: u64) {
self.frame_no = frame_no.into();
}
pub fn set_page_no(&mut self, page_no: u32) {
self.page_no = page_no.into();
}
pub fn set_size_after(&mut self, size_after: u32) {
self.size_after = size_after.into();
}
pub fn is_commit(&self) -> bool {
self.size_after() != 0
}
}
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct CheckedFrame {
checksum: U32,
frame: Frame,
}
impl CheckedFrame {
pub(crate) const fn offset_of_frame() -> usize {
offset_of!(Self, frame)
}
}
#[repr(C)]
#[derive(Debug, zerocopy::AsBytes, zerocopy::FromBytes, zerocopy::FromZeroes)]
pub struct Frame {
header: FrameHeader,
data: [u8; LIBSQL_PAGE_SIZE as usize],
}
impl Frame {
pub(crate) fn checksum(&self, previous_checksum: u32) -> u32 {
let mut digest = crc32fast::Hasher::new_with_initial(previous_checksum);
digest.write(self.as_bytes());
digest.finalize()
}
pub fn data(&self) -> &[u8] {
&self.data
}
pub fn header(&self) -> &FrameHeader {
&self.header
}
pub fn header_mut(&mut self) -> &mut FrameHeader {
&mut self.header
}
pub(crate) fn size_after(&self) -> Option<u32> {
let size_after = self.header().size_after.get();
(size_after != 0).then_some(size_after)
}
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data
}
}
#[inline]
fn checked_frame_offset(offset: u32) -> u64 {
(size_of::<SegmentHeader>() + (offset as usize) * size_of::<CheckedFrame>()) as u64
}
#[inline]
fn frame_offset(offset: u32) -> u64 {
checked_frame_offset(offset) + CheckedFrame::offset_of_frame() as u64
}
#[inline]
fn page_offset(offset: u32) -> u64 {
frame_offset(offset) + size_of::<FrameHeader>() as u64
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn offsets() {
assert_eq!(checked_frame_offset(0) as usize, size_of::<SegmentHeader>());
assert_eq!(
frame_offset(0) as usize,
size_of::<SegmentHeader>() + CheckedFrame::offset_of_frame()
);
assert_eq!(
page_offset(0) as usize,
size_of::<SegmentHeader>() + CheckedFrame::offset_of_frame() + size_of::<FrameHeader>()
);
}
}