mod util;
use crate::{
global::next_seq_id,
page::{Page, EntryHeader},
util::{get_blksize, get_file_handler},
worker::LogWorker,
};
use crossbeam_channel::Sender;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::{io::Error, path::PathBuf};
use std::io::Read;
use std::ptr;
mod errors;
mod global;
mod page;
mod worker;
/// One log record as held in the logger's in-memory slot buffer: a sequence
/// id paired with the caller's payload.
///
/// `#[repr(C)]` pins the field order in memory (`seq_id` first, then `data`).
#[repr(C)]
#[derive(Clone, Default)]
pub struct LogMessage<T> {
/// Sequence id of this record; `Logger::log` fills it from `next_seq_id()`.
pub seq_id: u64,
/// The payload supplied by the caller of `Logger::log`.
pub data: T,
}
/// A fixed set of pages that is cycled through in round-robin order.
struct PageManager<T> {
    /// Every page, allocated up front by [`PageManager::new`].
    pages: Vec<Page<T>>,
    /// Index into `pages` of the page returned by `get_active_page`.
    active_idx: usize,
    /// One flag per page, all `false` initially. The methods in this impl
    /// neither read nor update it.
    pending_status: Vec<bool>,
}

impl<T> PageManager<T> {
    /// Builds `count` pages, each created with `Page::init(page_size)`, and
    /// makes the first one active.
    pub fn new(page_size: usize, count: usize) -> Self {
        let pages: Vec<Page<T>> = (0..count).map(|_| Page::init(page_size)).collect();
        Self {
            pending_status: vec![false; count],
            pages,
            active_idx: 0,
        }
    }

    /// Returns a mutable reference to the currently active page.
    ///
    /// # Panics
    ///
    /// Panics if the manager holds no pages.
    pub fn get_active_page(&mut self) -> &mut Page<T> {
        let current = self.active_idx;
        &mut self.pages[current]
    }

    /// Makes the next page active (wrapping around after the last one) and
    /// returns the index of the page that was active before the call.
    ///
    /// # Panics
    ///
    /// Panics if the manager holds no pages (remainder by zero).
    pub fn advance(&mut self) -> usize {
        let next = (self.active_idx + 1) % self.pages.len();
        std::mem::replace(&mut self.active_idx, next)
    }
}
/// Slot array shared (through an `Arc`) between `Logger` and its worker thread.
///
/// Every slot is wrapped in an `UnsafeCell`, which is what lets `Logger::log`
/// write a message into a slot while only holding a shared reference.
struct LogBuffer<T> {
inner: Vec<UnsafeCell<LogMessage<T>>>,
}
// SAFETY / NOTE(review): `UnsafeCell` is `!Sync`, so these impls are a promise
// that accesses to a given slot from different threads never overlap. Nothing
// in this type enforces that; it presumably relies on the slot-index hand-off
// over the channel between `Logger::log` and the worker — confirm against the
// worker implementation.
unsafe impl<T: Send + Sync> Sync for LogBuffer<T> {}
unsafe impl<T: Send + Sync> Send for LogBuffer<T> {}
/// Handle for writing payloads of type `T` to a log file through a background
/// worker thread, or for reading such a file back.
///
/// All runtime state is `None` until `start` succeeds.
pub struct Logger<T> {
/// Slot buffer shared with the worker thread; created by `start`.
data_buffer: Option<Arc<LogBuffer<T>>>,
/// Channel on which `log` sends slot indices to the worker; created by `start`.
sender: Option<Sender<usize>>,
/// Join handle of the worker thread spawned by `start`.
worker_handle: Option<thread::JoinHandle<()>>,
/// Number of slots in `data_buffer`, also used as the channel bound.
capacity: usize,
/// Path of the log file to write to or read from.
logpath: Option<String>,
/// Flush interval in nanoseconds (`start` converts it with `Duration::from_nanos`).
flush_interval: Option<u64>,
/// Poll interval in nanoseconds (`start` converts it with `Duration::from_nanos`).
poll_interval: Option<u64>,
}
impl<T: Send + Sync + Default + Copy + 'static> Logger<T> {
    /// Creates an unconfigured logger: no log path, zero capacity, no worker.
    ///
    /// Follow up with [`Logger::with_write_config`] and [`Logger::start`] to
    /// write, or with [`Logger::with_read_config`] to read an existing log.
    pub fn new() -> Self {
        Self {
            data_buffer: None,
            sender: None,
            worker_handle: None,
            capacity: 0,
            logpath: None,
            flush_interval: None,
            poll_interval: None,
        }
    }

    /// Stores the settings needed by [`Logger::start`].
    ///
    /// * `logpath` - file the worker writes to.
    /// * `capacity` - number of in-memory message slots; also the bound of
    ///   the channel to the worker. Must be non-zero for `start` to succeed.
    /// * `flush_interval`, `poll_interval` - worker intervals in nanoseconds.
    pub fn with_write_config(
        mut self,
        logpath: String,
        capacity: usize,
        flush_interval: u64,
        poll_interval: u64,
    ) -> Self {
        self.logpath = Some(logpath);
        self.capacity = capacity;
        self.flush_interval = Some(flush_interval);
        self.poll_interval = Some(poll_interval);
        self
    }

    /// Allocates the slot buffer, opens the log file and spawns the worker
    /// thread.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` if the write configuration is missing or the
    ///   configured capacity is zero (a zero-slot buffer cannot hold a
    ///   message and would make `log` divide by zero).
    /// * Any error returned by `get_file_handler`.
    ///
    /// If the worker cannot create its io_uring instance, the worker thread
    /// panics; that failure is not reported through this return value.
    pub fn start(&mut self) -> Result<(), Error> {
        let (logpath, flush_interval, poll_interval) =
            match (&self.logpath, self.flush_interval, self.poll_interval) {
                (Some(logpath), Some(flush), Some(poll)) => (logpath, flush, poll),
                _ => {
                    return Err(Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "Config missing",
                    ))
                }
            };

        let capacity = self.capacity;
        if capacity == 0 {
            return Err(Error::new(
                std::io::ErrorKind::InvalidInput,
                "Log capacity must be greater than zero",
            ));
        }

        let slots: Vec<_> = (0..capacity)
            .map(|_| UnsafeCell::new(LogMessage::<T>::default()))
            .collect();
        let data_buffer = Arc::new(LogBuffer { inner: slots });
        let (sender, receiver) = crossbeam_channel::bounded::<usize>(capacity);

        let path = PathBuf::from(logpath);
        let blk_size = get_blksize(&path) as usize;
        let worker_buffer = Arc::clone(&data_buffer);
        let page_manager = PageManager::new(blk_size, 256);
        let file = get_file_handler(&path)?;

        let handle = thread::spawn(move || {
            let ring = io_uring::IoUring::new(256).expect("failed to init io_uring");
            let mut worker = LogWorker {
                receiver,
                pages: page_manager,
                data_buffer: worker_buffer,
                last_flush: Instant::now(),
                flush_interval: Duration::from_nanos(flush_interval),
                poll_interval: Duration::from_nanos(poll_interval),
                // `file` is owned by this closure, so the borrow lives for
                // the whole run of the worker.
                logfile: &file,
                ring,
                pending_writes: 0,
            };
            worker.run();
        });

        self.data_buffer = Some(data_buffer);
        self.sender = Some(sender);
        self.worker_handle = Some(handle);
        Ok(())
    }

    /// Stores the path of an existing log file for [`Logger::read`].
    pub fn with_read_config(mut self, logpath: String) -> Self {
        self.logpath = Some(logpath);
        self
    }

    /// Reads the configured log file and returns every payload found, in
    /// file order.
    ///
    /// The file is consumed in chunks of `get_blksize` bytes and each chunk
    /// is decoded on its own. Within a chunk, entries are an `EntryHeader`
    /// followed by the payload, with each entry padded to a multiple of
    /// 8 bytes. Decoding of a chunk stops at a header whose `len` is zero or
    /// at an entry that does not fit in the bytes that were read.
    ///
    /// NOTE(review): payload bytes are reinterpreted as `T` without
    /// validation; this is only sound if every bit pattern is a valid `T`.
    ///
    /// # Errors
    ///
    /// * `NotFound` if no log path is configured.
    /// * Any I/O error from opening or reading the file.
    pub fn read(&self) -> Result<Vec<T>, Error> {
        let logpath = self.logpath.as_ref().ok_or_else(|| {
            Error::new(std::io::ErrorKind::NotFound, "Log path not configured")
        })?;
        let mut file = std::fs::File::open(logpath)?;
        let path = PathBuf::from(logpath);
        let blk_size = get_blksize(&path) as usize;
        let mut buffer = vec![0u8; blk_size];
        let mut entries = Vec::new();
        loop {
            let filled = Self::fill_page(&mut file, &mut buffer)?;
            if filled == 0 {
                break;
            }
            Self::decode_page(&buffer[..filled], &mut entries);
        }
        Ok(entries)
    }

    /// Reads from `file` until `buffer` is full or end of file is reached and
    /// returns the number of bytes stored.
    ///
    /// A single `read` call may legitimately return fewer bytes than asked
    /// for; looping keeps chunk boundaries aligned with the block size.
    fn fill_page(file: &mut std::fs::File, buffer: &mut [u8]) -> Result<usize, Error> {
        let mut filled = 0;
        while filled < buffer.len() {
            match file.read(&mut buffer[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(filled)
    }

    /// Decodes the entries stored in one chunk and appends their payloads to
    /// `entries`.
    fn decode_page(page: &[u8], entries: &mut Vec<T>) {
        let header_size = std::mem::size_of::<EntryHeader>();
        let payload_size = std::mem::size_of::<T>();
        let mut cursor = 0usize;
        while cursor + header_size <= page.len() {
            // SAFETY: the loop condition guarantees that `header_size` bytes
            // starting at `cursor` lie inside `page`; `read_unaligned` has no
            // alignment requirement.
            let header = unsafe {
                ptr::read_unaligned(page.as_ptr().add(cursor) as *const EntryHeader)
            };
            if header.len == 0 {
                // Zero length marks the unused tail of the chunk.
                break;
            }
            let msg_size = header.len as usize;
            let payload_start = cursor + header_size;
            // Both the recorded length and a full `T` must fit: the read
            // below always copies `size_of::<T>()` bytes, whatever the
            // header claims.
            let required = msg_size.max(payload_size);
            if payload_start.saturating_add(required) > page.len() {
                break;
            }
            // SAFETY: `payload_start + size_of::<T>() <= page.len()` was
            // checked just above, so the whole read stays inside `page`.
            let data = unsafe {
                ptr::read_unaligned(page.as_ptr().add(payload_start) as *const T)
            };
            entries.push(data);
            let total_size = header_size + msg_size;
            cursor += (total_size + 7) & !7;
        }
    }

    /// Stores `data` in the next slot and hands the slot index to the worker.
    ///
    /// Returns the sequence id assigned to the message, or `None` when the
    /// logger has not been started or the worker is no longer receiving
    /// (in which case the message will not be written).
    ///
    /// Blocks while the channel to the worker is full.
    pub fn log(&mut self, data: T) -> Option<u64> {
        let sender = self.sender.as_ref()?;
        let data_buffer = self.data_buffer.as_ref()?;
        // Use the real slot count rather than `self.capacity`, so the index
        // is always in bounds even if the configuration changed after `start`.
        let slot_count = data_buffer.inner.len();
        if slot_count == 0 {
            return None;
        }

        let seq_id = next_seq_id();
        let index = (seq_id as usize) % slot_count;
        // SAFETY: `index < slot_count`, so the slot exists, and `&mut self`
        // rules out a second writer going through this `Logger`.
        // NOTE(review): the slot is written *before* `send` can apply
        // back-pressure, so a slot whose index is still queued (or being read
        // by the worker) can be overwritten once `slot_count` messages are in
        // flight — confirm how the worker guards against this.
        unsafe {
            let slot = data_buffer.inner[index].get();
            (*slot).seq_id = seq_id;
            (*slot).data = data;
        }

        // A send error means the worker dropped its receiver; report that
        // instead of pretending the message was accepted.
        sender.send(index).ok()?;
        Some(seq_id)
    }

    /// Returns the current value of `global::get_ack_number()`.
    ///
    /// NOTE(review): judging by the name this is the sequence id of the last
    /// entry the worker flushed — confirm against the `global` module.
    pub fn get_last_flushed_entry() -> u64 {
        global::get_ack_number()
    }
}
impl<T> Drop for Logger<T> {
    /// Shuts the logger down: closes this side of the channel, then waits for
    /// the worker thread (if one was started) to finish.
    fn drop(&mut self) {
        // The sender must go first: the join below would otherwise wait on a
        // worker that is still waiting for more messages from us.
        drop(self.sender.take());

        if let Some(worker) = self.worker_handle.take() {
            // A panic inside the worker is deliberately ignored here.
            let _ = worker.join();
        }
    }
}