use std::cmp::Ordering;
use std::fmt::{self, Display};
use fail::fail_point;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::ToPrimitive;
use serde_repr::{Deserialize_repr, Serialize_repr};
use strum::EnumIter;
use crate::Result;
/// Identifies which of the two log queues a file belongs to.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so every variant
/// has a fixed one-byte numeric value.
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum LogQueue {
/// The append queue (discriminant `0`).
Append = 0,
/// The rewrite queue (discriminant `1`).
Rewrite = 1,
}
/// Sequence number of a log file; used as the `seq` field of `FileId`.
pub type FileSeq = u64;
/// Identifier of a log file: the queue it belongs to plus its sequence number.
///
/// Equality is derived field-wise. Ordering is not derived; it comes from the
/// hand-written `Ord` / `PartialOrd` implementations for this type.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FileId {
/// Queue the file belongs to.
pub queue: LogQueue,
/// Sequence number of the file.
pub seq: FileSeq,
}
impl FileId {
pub fn new(queue: LogQueue, seq: FileSeq) -> Self {
Self { queue, seq }
}
#[cfg(test)]
pub fn dummy(queue: LogQueue) -> Self {
Self { queue, seq: 0 }
}
}
/// Total order over file identifiers.
///
/// Identifiers in the same queue are ordered by sequence number. Across
/// queues, every `Append` identifier compares greater than every `Rewrite`
/// identifier, regardless of sequence numbers.
impl std::cmp::Ord for FileId {
    fn cmp(&self, other: &Self) -> Ordering {
        // Same queue: the sequence number alone decides.
        if self.queue == other.queue {
            return self.seq.cmp(&other.seq);
        }
        // Different queues: the side holding `Append` is the greater one.
        match self.queue {
            LogQueue::Append => Ordering::Greater,
            LogQueue::Rewrite => Ordering::Less,
        }
    }
}
/// Partial ordering for `FileId`, delegating to the total order from `Ord`.
impl std::cmp::PartialOrd for FileId {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Always `Some`: any two identifiers are comparable through `Ord::cmp`.
Some(self.cmp(other))
}
}
/// Handle describing a block inside a log file: which file, plus an offset
/// and a length.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct FileBlockHandle {
/// File the block belongs to.
pub id: FileId,
/// Presumably the byte offset of the block within the file — TODO confirm
/// against the code that produces handles.
pub offset: u64,
/// Presumably the length of the block in bytes — TODO confirm against the
/// code that produces handles.
pub len: usize,
}
impl FileBlockHandle {
    /// Test-only helper: a handle pointing at `FileId::dummy(queue)` with a
    /// zero offset and a zero length.
    #[cfg(test)]
    pub fn dummy(queue: LogQueue) -> Self {
        let id = FileId::dummy(queue);
        FileBlockHandle {
            id,
            offset: 0,
            len: 0,
        }
    }
}
/// Version tag with a `u64` representation (`#[repr(u64)]`).
///
/// The derives provide numeric conversion (`FromPrimitive` / `ToPrimitive`),
/// serialization by numeric value (`Serialize_repr` / `Deserialize_repr`),
/// iteration over all variants (`EnumIter`), and a `Default` of `V1`.
#[repr(u64)]
#[derive(
Clone,
Copy,
Debug,
Eq,
PartialEq,
FromPrimitive,
ToPrimitive,
Serialize_repr,
Deserialize_repr,
EnumIter,
Default,
)]
pub enum Version {
/// Numeric value `1`; the default variant.
#[default]
V1 = 1,
/// Numeric value `2`.
V2 = 2,
}
impl Version {
    /// Reports whether this version uses log signing: `false` for `V1`,
    /// `true` for `V2`.
    ///
    /// The fail point `pipe_log::version::force_enable_log_signing` is
    /// evaluated first; when it is activated with a return action the
    /// function returns `true` regardless of the version.
    pub fn has_log_signing(&self) -> bool {
        fail_point!("pipe_log::version::force_enable_log_signing", |_| { true });
        // Kept exhaustive on purpose so a new variant forces a decision here.
        match *self {
            Version::V2 => true,
            Version::V1 => false,
        }
    }
}
/// Displays a `Version` as its decimal numeric value (e.g. `V1` prints `1`).
impl Display for Version {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `to_u64` is expected to succeed for every variant, hence the unwrap.
        let numeric = self.to_u64().unwrap();
        write!(f, "{}", numeric)
    }
}
/// Per-file context: the file's identifier together with its `Version`.
pub struct LogFileContext {
/// Identifier of the file this context describes.
pub id: FileId,
/// Version associated with the file; decides whether a signature exists
/// (see `get_signature`).
pub version: Version,
}
impl LogFileContext {
pub fn new(id: FileId, version: Version) -> Self {
Self { id, version }
}
pub fn get_signature(&self) -> Option<u32> {
if self.version.has_log_signing() {
Some(self.id.seq as u32)
} else {
None
}
}
}
/// A source of bytes that is given a `LogFileContext` when asked for its
/// content.
///
/// NOTE(review): presumably this lets an implementation tailor its bytes to
/// the target file (for example to its signature) — confirm with implementors.
pub trait ReactiveBytes {
/// Returns the bytes for the file described by `ctx`.
///
/// Takes `&mut self`, so an implementation may update its own state while
/// producing the slice; the returned slice borrows from `self`.
fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
}
/// Any shared reference to a byte-like value (`T: AsRef<[u8]>`) is a
/// `ReactiveBytes` whose content does not depend on the file context.
impl<T> ReactiveBytes for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
        // The context is ignored; simply expose the referent's bytes.
        <T as AsRef<[u8]>>::as_ref(*self)
    }
}
/// Interface to a set of log files organized into queues.
///
/// Only `file_at` has a body in this trait. The notes on the required methods
/// are inferred from their signatures and should be confirmed against the
/// implementors.
pub trait PipeLog: Sized {
    /// Reads the bytes addressed by `handle`.
    fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

    /// Appends `bytes` to `queue`.
    ///
    /// NOTE(review): the returned handle presumably locates the bytes that
    /// were written — confirm with implementors.
    fn append<T: ReactiveBytes + ?Sized>(
        &self,
        queue: LogQueue,
        bytes: &mut T,
    ) -> Result<FileBlockHandle>;

    /// Syncs `queue`.
    ///
    /// NOTE(review): presumably flushes buffered writes to durable storage —
    /// confirm with implementors.
    fn sync(&self, queue: LogQueue) -> Result<()>;

    /// Returns a pair of file sequence numbers for `queue`; `file_at` treats
    /// them as the inclusive `(first, active)` range of files.
    fn file_span(&self, queue: LogQueue) -> (FileSeq, FileSeq);

    /// Maps a relative `position` to a file sequence number in `queue`.
    ///
    /// `position` is clamped to `[0.0, 1.0]`. With `(first, active)` taken
    /// from `file_span` and `count = active - first + 1`, the result is
    /// `first + (count * position)` with the product truncated toward zero.
    /// `0.0` therefore yields `first`, and `1.0` yields `active + 1`.
    fn file_at(&self, queue: LogQueue, position: f64) -> FileSeq {
        let fraction = position.clamp(0.0, 1.0);
        let (first, active) = self.file_span(queue);
        let count = active - first + 1;
        // Float-to-integer `as` truncates toward zero (and maps NaN to 0).
        let skipped = (count as f64 * fraction) as u64;
        first + skipped
    }

    /// Returns the total size of `queue`.
    ///
    /// NOTE(review): presumably in bytes — confirm with implementors.
    fn total_size(&self, queue: LogQueue) -> usize;

    /// Rotates `queue`.
    ///
    /// NOTE(review): presumably switches writing to a new log file — confirm
    /// with implementors.
    fn rotate(&self, queue: LogQueue) -> Result<()>;

    /// Purges files up to `file_id`.
    ///
    /// NOTE(review): presumably removes files older than `file_id` in its
    /// queue and returns how many were removed — confirm with implementors.
    fn purge_to(&self, file_id: FileId) -> Result<usize>;
}