use crate::Options;
use crate::types::traits::IAsSlice;
use crate::io::{self, GatherIO, IoVec};
use std::fmt::Debug;
use std::path::PathBuf;
/// Pair of a length and a sequence number, both `u32`.
#[derive(Copy, Clone)]
pub struct LenSeq {
    /// Length component.
    pub len: u32,
    /// Sequence-number component.
    pub seq: u32,
}

impl LenSeq {
    /// Builds a `LenSeq` from its two components; usable in `const` contexts.
    pub const fn new(len: u32, seq: u32) -> Self {
        LenSeq { len, seq }
    }
}
/// Packed record holding an offset, a length, a sequence number and a CRC.
///
/// `repr(C, packed(1))` fixes the field order and removes all padding, so the
/// struct has alignment 1. Read fields by value (`{ r.off }`); taking a
/// reference to a packed field is rejected by the compiler.
///
/// NOTE(review): the exact meaning of each field is inferred from its name —
/// confirm against the code that produces these records.
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
pub struct Reloc {
    /// Offset component (platform-sized).
    pub(crate) off: usize,
    /// Length component.
    pub(crate) len: u32,
    /// Sequence-number component.
    pub(crate) seq: u32,
    /// Checksum component.
    pub(crate) crc: u32,
}
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
pub struct AddrPair {
pub(crate) key: u64,
pub(crate) val: Reloc,
}
impl AddrPair {
pub const LEN: usize = size_of::<Self>();
pub fn new(key: u64, off: usize, len: u32, seq: u32, crc: u32) -> Self {
Self {
key,
val: Reloc { off, len, seq, crc },
}
}
}
// Empty impl: `AddrPair` takes every `IAsSlice` item from the trait's defaults.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `IAsSlice`.
impl IAsSlice for AddrPair {}
/// Packed pair of a page id and a page address.
///
/// With `repr(C, packed(1))` the two `u64` fields occupy exactly 16 bytes at
/// alignment 1. Read fields by value (`{ e.page_id }`), never by reference.
#[derive(Clone, Copy, Debug)]
#[repr(C, packed(1))]
pub struct MapEntry {
    /// Identifier half of the entry.
    pub page_id: u64,
    /// Address half of the entry.
    pub page_addr: u64,
}
// Empty impl: `MapEntry` takes every `IAsSlice` item from the trait's defaults.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `IAsSlice`.
impl IAsSlice for MapEntry {}
/// Packed pair of `u64` bounds, rendered by `Debug` as `[lo, hi]`.
#[derive(Copy, Clone)]
#[repr(C, packed(1))]
pub struct Interval {
    /// Lower bound.
    pub lo: u64,
    /// Upper bound.
    pub hi: u64,
}

impl Debug for Interval {
    /// Writes the interval as `[lo, hi]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Copy the fields out first: the formatting machinery takes references,
        // and references to fields of a packed struct are not allowed.
        let (lo, hi) = (self.lo, self.hi);
        write!(f, "[{}, {}]", lo, hi)
    }
}

impl Interval {
    /// Size of one `Interval` in bytes.
    pub const LEN: usize = std::mem::size_of::<Self>();

    /// Builds an interval from its two bounds; usable in `const` contexts.
    pub const fn new(lo: u64, hi: u64) -> Self {
        Interval { lo, hi }
    }
}
// Empty impl: `Interval` takes every `IAsSlice` item from the trait's defaults.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `IAsSlice`.
impl IAsSlice for Interval {}
/// File writer that collects byte buffers as `IoVec` entries and hands the
/// whole batch to the file's `writev` when flushed.
pub struct GatherWriter {
/// Path the file was opened from; kept for log messages.
path: PathBuf,
/// Underlying file handle.
file: io::File,
/// Buffers queued since the last flush.
queue: Vec<IoVec>,
/// Sum of the lengths of the buffers currently in `queue`.
queued_len: usize,
/// Queue length at which `queue()` flushes before adding another buffer.
max_iovcnt: usize,
}
// SAFETY: NOTE(review): the justification is not visible in this file.
// Presumably `IoVec` holds a raw pointer, which makes the struct `!Send` by
// default; this impl is then sound only if every queued buffer stays valid
// while the writer is moved to another thread — confirm against `IoVec`.
unsafe impl Send for GatherWriter {}
impl GatherWriter {
    /// Requested batch sizes at or above this value are replaced by
    /// [`Self::DEFAULT_IOVCNT`].
    pub(crate) const MAX_IOVCNT: usize = 1024;
    /// Batch size used when the requested one is out of range.
    pub(crate) const DEFAULT_IOVCNT: usize = 64;

    /// Opens `path` with the write, append and create flags set, plus the
    /// truncate flag when `trunc` is `true`.
    ///
    /// # Panics
    /// Logs the error and panics if the file cannot be opened.
    fn open(path: &PathBuf, trunc: bool) -> io::File {
        io::File::options()
            .write(true)
            .append(true)
            .trunc(trunc)
            .create(true)
            .open(path)
            .inspect_err(|x| {
                log::error!("can't open {path:?}, {x}");
            })
            .unwrap()
    }

    /// Shared constructor behind [`Self::trunc`] and [`Self::append`].
    ///
    /// A `max_iovcnt` of [`Self::MAX_IOVCNT`] or more falls back to
    /// [`Self::DEFAULT_IOVCNT`].
    fn create(path: &PathBuf, max_iovcnt: usize, trunc: bool) -> Self {
        // Resolve the effective limit *before* allocating, so an out-of-range
        // request can neither over-allocate the queue nor panic with a
        // capacity overflow after the file has already been opened/truncated.
        let max_iovcnt = if max_iovcnt >= Self::MAX_IOVCNT {
            Self::DEFAULT_IOVCNT
        } else {
            max_iovcnt
        };
        Self {
            path: path.clone(),
            file: Self::open(path, trunc),
            queue: Vec::with_capacity(max_iovcnt),
            queued_len: 0,
            max_iovcnt,
        }
    }

    /// Opens `path` with truncation requested and an empty queue.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn trunc(path: &PathBuf, max_iovcnt: usize) -> Self {
        Self::create(path, max_iovcnt, true)
    }

    /// Opens `path` without truncation and with an empty queue.
    ///
    /// # Panics
    /// Panics if the file cannot be opened.
    pub fn append(path: &PathBuf, max_iovcnt: usize) -> Self {
        Self::create(path, max_iovcnt, false)
    }

    /// Queues `data` for the next [`Self::flush`], flushing first when the
    /// queue has already reached its configured limit.
    ///
    /// NOTE(review): `IoVec` carries no lifetime, so it presumably stores a raw
    /// pointer to `data` rather than a copy. If so, callers must keep `data`
    /// alive and unchanged until the next flush — confirm against `IoVec`.
    ///
    /// # Panics
    /// Panics if the implicit flush fails.
    pub fn queue(&mut self, data: &[u8]) {
        if self.queue.len() >= self.max_iovcnt {
            self.flush();
        }
        self.queue.push(data.into());
        self.queued_len += data.len();
    }

    /// Returns `true` when no buffers are waiting to be flushed.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the size reported by the underlying file.
    ///
    /// Buffers that are still queued have not been handed to the file yet.
    ///
    /// # Panics
    /// Logs the error and panics if the size cannot be obtained.
    pub fn pos(&self) -> u64 {
        self.file
            .size()
            .inspect_err(|x| {
                log::error!("can't get metadata of {:?}, {}", self.path, x);
            })
            .unwrap()
    }

    /// Writes `data` straight to the file; the queue is neither used nor
    /// flushed by this call.
    ///
    /// # Panics
    /// Logs the error and panics if the write fails.
    pub fn write(&mut self, data: &[u8]) {
        self.file
            .write(data)
            .inspect_err(|e| {
                log::error!("can't write: {:?}", e);
            })
            .expect("can't fail");
    }

    /// Hands every queued buffer to the file's `writev` and empties the queue.
    ///
    /// Does nothing when the queue is empty: a zero-entry `writev` has nothing
    /// to write, and POSIX allows it to fail with `EINVAL`, which would turn a
    /// plain drop of an idle writer into a panic.
    ///
    /// # Panics
    /// Logs the error and panics if the vectored write fails.
    pub fn flush(&mut self) {
        if self.queue.is_empty() {
            return;
        }
        let iov = self.queue.as_mut_slice();
        self.file
            .writev(iov, self.queued_len)
            .inspect_err(|x| log::error!("can't write iov, {x}"))
            .unwrap();
        self.queued_len = 0;
        self.queue.clear();
    }

    /// Calls `sync` on the underlying file. Queued buffers are not flushed by
    /// this call.
    ///
    /// # Panics
    /// Logs the error and panics if the sync fails.
    pub fn sync(&mut self) {
        self.file
            .sync()
            .inspect_err(|x| {
                log::error!("can't sync {:?}, {}", self.path, x);
            })
            .expect("can't fail");
    }

    /// Calls `sync_data` on the underlying file. Queued buffers are not
    /// flushed by this call.
    ///
    /// # Panics
    /// Logs the error and panics if the sync fails.
    pub fn sync_data(&mut self) {
        self.file
            .sync_data()
            .inspect_err(|x| {
                log::error!("can't sync data {:?}, {}", self.path, x);
            })
            .expect("can't fail");
    }
}
impl Drop for GatherWriter {
fn drop(&mut self) {
self.flush();
}
}
/// A location expressed as a file id plus an offset.
///
/// `Default` yields both fields as zero.
#[derive(Clone, Copy, Debug, Default)]
#[repr(C)]
pub struct Position {
    /// Identifier of the file.
    pub file_id: u64,
    /// Offset component of the position.
    pub offset: u64,
}
/// Fixed-size array of [`Position`]s with `Options::MAX_CONCURRENT_WRITE` entries.
pub type GroupPositions = [Position; Options::MAX_CONCURRENT_WRITE as usize];

/// Builds a [`GroupPositions`] with every entry set to [`Position::MIN`].
pub const fn init_group_pos() -> GroupPositions {
    const SLOTS: usize = Options::MAX_CONCURRENT_WRITE as usize;
    [Position::MIN; SLOTS]
}
impl Position {
pub const MIN: Self = Position::new(u64::MIN, u64::MIN);
pub const fn new(id: u64, off: u64) -> Self {
Self {
file_id: id,
offset: off,
}
}
}
impl Ord for Position {
    /// Orders by `file_id` first and falls back to `offset` on a tie.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.file_id
            .cmp(&other.file_id)
            .then_with(|| self.offset.cmp(&other.offset))
    }
}

impl PartialOrd for Position {
    /// Total order, so this always returns `Some` of [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for Position {
    /// Two positions are equal exactly when [`Ord::cmp`] reports `Equal`.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for Position {}