//! picologger 0.9.0
//!
//! Logger for HPC (high-performance computing) workloads.
//!
//! Documentation
mod util;
use crate::{
    global::next_seq_id,
    page::{Page, EntryHeader},
    util::{get_blksize, get_file_handler},
    worker::LogWorker,
};
use crossbeam_channel::Sender;
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::{io::Error, path::PathBuf};
use std::io::Read;
use std::ptr;

mod errors;
mod global;
mod page;
mod worker;

/// One log record: a sequence id paired with the caller-supplied payload.
///
/// `#[repr(C)]` pins the in-memory layout to declaration order: `seq_id` first, then
/// `data`, with C padding rules in between.
#[repr(C)]
#[derive(Default, Clone)]
pub struct LogMessage<T> {
    /// Sequence number of this record.
    pub seq_id: u64,
    /// The logged payload.
    pub data: T,
}

/// Fixed-length, round-robin collection of [`Page`]s with one "active" page at a time.
struct PageManager<T> {
    /// The pages, all created by `Page::init` with the same `page_size`.
    pages: Vec<Page<T>>,
    /// Index into `pages` of the page returned by `get_active_page`.
    active_idx: usize,
    /// One flag per page, initialised to `false`; not read or written elsewhere in this impl.
    pending_status: Vec<bool>,
}

impl<T> PageManager<T> {
    /// Builds `count` pages, each via `Page::init(page_size)`, and makes page 0 active.
    pub fn new(page_size: usize, count: usize) -> Self {
        let pages: Vec<Page<T>> = (0..count).map(|_| Page::init(page_size)).collect();
        Self {
            pages,
            active_idx: 0,
            pending_status: vec![false; count],
        }
    }

    /// Mutable access to the currently active page.
    ///
    /// Panics if the manager was built with `count == 0`.
    pub fn get_active_page(&mut self) -> &mut Page<T> {
        let current = self.active_idx;
        &mut self.pages[current]
    }

    /// Moves to the next page, wrapping after the last one, and returns the index
    /// that was active before the move.
    ///
    /// Panics (remainder by zero) if the manager holds no pages.
    pub fn advance(&mut self) -> usize {
        let following = (self.active_idx + 1) % self.pages.len();
        std::mem::replace(&mut self.active_idx, following)
    }
}

/// Ring of message slots shared (via `Arc`) between `Logger::log`, which writes slots,
/// and the background worker, which receives a clone of the `Arc` as its `data_buffer`.
struct LogBuffer<T> {
    /// One slot per ring position. `UnsafeCell` lets the producer write through the
    /// shared `Arc` without taking a lock.
    inner: Vec<UnsafeCell<LogMessage<T>>>,
}

// SAFETY: `UnsafeCell` makes this type `!Sync` by default. Sharing it is sound only while
// no slot is written by one thread as another thread reads that same slot; this type does
// not enforce that on its own.
// NOTE(review): `Logger::log` reuses slot `seq_id % len`, so confirm the worker never reads
// a slot the producer can be rewriting after a wrap-around.
unsafe impl<T> Sync for LogBuffer<T> where T: Send + Sync {}

// SAFETY: the buffer owns its `T` values; moving it across threads is fine when `T: Send`
// (the `Sync` bound mirrors the impl above).
unsafe impl<T> Send for LogBuffer<T> where T: Send + Sync {}

/// Logger that hands records to a background writer thread.
///
/// Configure it with `with_write_config` and then call `start` to log, or configure it with
/// `with_read_config` and call `read` to load a log file back.
pub struct Logger<T> {
    /// Shared slot ring; `Some` once `start` has succeeded.
    data_buffer: Option<Arc<LogBuffer<T>>>,
    /// Sends slot indices to the worker; `Some` once `start` has succeeded.
    sender: Option<Sender<usize>>,
    /// Handle of the worker thread, joined on drop.
    worker_handle: Option<thread::JoinHandle<()>>,
    /// Number of slots in the ring and bound of the index channel.
    capacity: usize,
    /// Log file path, used for both writing and reading.
    logpath: Option<String>,
    /// Worker flush interval, in nanoseconds.
    flush_interval: Option<u64>,
    /// Worker poll interval, in nanoseconds.
    poll_interval: Option<u64>,
}

impl<T: Send + Sync + Default + Copy + 'static> Logger<T> {
    /// Creates an unconfigured logger.
    ///
    /// Call `with_write_config` and then `start` to log, or `with_read_config` and then
    /// `read` to load a log file back.
    pub fn new() -> Self {
        Self {
            data_buffer: None,
            sender: None,
            worker_handle: None,
            capacity: 0,
            logpath: None,
            flush_interval: None,
            poll_interval: None,
        }
    }

    /// Sets everything `start` needs to run the background writer.
    ///
    /// * `logpath` – file the worker writes to.
    /// * `capacity` – number of in-memory message slots and the bound of the index
    ///   channel. It must be non-zero or `start` fails.
    /// * `flush_interval`, `poll_interval` – nanoseconds. `start` converts them with
    ///   `Duration::from_nanos` before handing them to the worker.
    pub fn with_write_config(
        mut self,
        logpath: String,
        capacity: usize,
        flush_interval: u64,
        poll_interval: u64,
    ) -> Self {
        self.logpath = Some(logpath);
        self.capacity = capacity;
        self.flush_interval = Some(flush_interval);
        self.poll_interval = Some(poll_interval);
        self
    }

    /// Allocates the message ring and spawns the background writer thread.
    ///
    /// The worker thread creates an io_uring instance and then runs `LogWorker::run`.
    /// `start` waits until the ring has been created, so a setup failure is returned here
    /// instead of silently killing the worker.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` if the write configuration is missing, or if `capacity` is zero
    ///   (zero would make the slot index in `log` a remainder by zero).
    /// * Any error returned by `get_file_handler` for the configured path.
    /// * The io_uring setup error. `Other` if the worker exits before reporting.
    pub fn start(&mut self) -> Result<(), Error> {
        // Number of pages in the worker's page ring.
        const PAGE_COUNT: usize = 256;
        // Queue depth requested from io_uring.
        const RING_ENTRIES: u32 = 256;

        let (logpath, flush_interval, poll_interval) =
            match (&self.logpath, self.flush_interval, self.poll_interval) {
                (Some(logpath), Some(flush), Some(poll)) => (logpath, flush, poll),
                _ => {
                    return Err(Error::new(
                        std::io::ErrorKind::InvalidInput,
                        "Config missing",
                    ))
                }
            };
        if self.capacity == 0 {
            return Err(Error::new(
                std::io::ErrorKind::InvalidInput,
                "capacity must be greater than zero",
            ));
        }

        let capacity = self.capacity;
        let data_buffer = Arc::new(LogBuffer {
            inner: (0..capacity)
                .map(|_| UnsafeCell::new(LogMessage::default()))
                .collect(),
        });
        let (sender, receiver) = crossbeam_channel::bounded::<usize>(capacity);

        let path = PathBuf::from(logpath);
        let blk_size = get_blksize(&path) as usize;
        let page_manager = PageManager::new(blk_size, PAGE_COUNT);
        let file = get_file_handler(&path)?;

        let worker_buffer = Arc::clone(&data_buffer);
        let flush_interval = Duration::from_nanos(flush_interval);
        let poll_interval = Duration::from_nanos(poll_interval);
        // The worker reports whether io_uring setup succeeded before it starts running.
        let (init_tx, init_rx) = std::sync::mpsc::sync_channel::<Result<(), Error>>(1);

        let handle = thread::spawn(move || {
            let ring = match io_uring::IoUring::new(RING_ENTRIES) {
                Ok(ring) => ring,
                Err(err) => {
                    let _ = init_tx.send(Err(err));
                    return;
                }
            };
            let _ = init_tx.send(Ok(()));
            let mut worker = LogWorker {
                receiver,
                pages: page_manager,
                data_buffer: worker_buffer,
                last_flush: Instant::now(),
                flush_interval,
                poll_interval,
                logfile: &file,
                ring,
                pending_writes: 0,
            };
            worker.run();
        });

        match init_rx.recv() {
            Ok(Ok(())) => {}
            Ok(Err(err)) => {
                let _ = handle.join();
                return Err(err);
            }
            Err(_) => {
                // The reporting sender was dropped without a verdict: the worker died early.
                let _ = handle.join();
                return Err(Error::new(
                    std::io::ErrorKind::Other,
                    "log worker exited during startup",
                ));
            }
        }

        self.data_buffer = Some(data_buffer);
        self.sender = Some(sender);
        self.worker_handle = Some(handle);
        Ok(())
    }

    /// Configures the logger only for `read`, by setting the log file path.
    pub fn with_read_config(mut self, logpath: String) -> Self {
        self.logpath = Some(logpath);
        self
    }

    /// Reads every entry back from the configured log file.
    ///
    /// The file is consumed in pages whose size is the value `get_blksize` returns for
    /// the path. Within a page, each entry is an `EntryHeader` followed by `header.len`
    /// payload bytes, with the whole entry padded to a multiple of 8 bytes. A header with
    /// `len == 0`, or an entry that would run past the page, ends that page. The first
    /// `size_of::<T>()` payload bytes of each entry are decoded as a `T`.
    ///
    /// # Errors
    ///
    /// * `NotFound` if no log path is configured.
    /// * Any error from opening or reading the file.
    /// * `InvalidData` if an entry's payload is shorter than `size_of::<T>()`.
    ///
    /// NOTE(review): decoding copies raw bytes into `T`. This is only sound for types in
    /// which every bit pattern is valid (plain integers/floats and `repr(C)` aggregates
    /// of them). The `T` bounds on this impl do not guarantee that.
    pub fn read(&self) -> Result<Vec<T>, Error> {
        let logpath = self.logpath.as_ref().ok_or_else(|| {
            Error::new(std::io::ErrorKind::NotFound, "Log path not configured")
        })?;
        let mut file = std::fs::File::open(logpath)?;
        let path = PathBuf::from(logpath);
        let blk_size = get_blksize(&path) as usize;

        let mut entries = Vec::new();
        let mut page = vec![0u8; blk_size];
        loop {
            let filled = Self::fill_page(&mut file, &mut page)?;
            if filled == 0 {
                break;
            }
            Self::decode_page(&page[..filled], &mut entries)?;
        }
        Ok(entries)
    }

    /// Reads from `reader` until `buf` is full or end-of-file is reached.
    ///
    /// A single `Read::read` may return fewer bytes than requested, which would shift all
    /// later pages off their boundaries. This loop keeps reading (and retries on
    /// `Interrupted`) so each non-final page is complete. Returns the number of bytes
    /// placed in `buf`; 0 means end-of-file.
    fn fill_page<R: Read>(reader: &mut R, buf: &mut [u8]) -> Result<usize, Error> {
        let mut filled = 0;
        while filled < buf.len() {
            match reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            }
        }
        Ok(filled)
    }

    /// Decodes the entries of one page into `out`. See `read` for the entry format.
    fn decode_page(page: &[u8], out: &mut Vec<T>) -> Result<(), Error> {
        let header_size = std::mem::size_of::<EntryHeader>();
        let data_size = std::mem::size_of::<T>();

        let mut cursor = 0;
        while cursor < page.len() {
            if page.len() - cursor < header_size {
                break;
            }
            // SAFETY: the check above guarantees `header_size` readable bytes at `cursor`;
            // `read_unaligned` places no alignment requirement on the pointer.
            let header = unsafe {
                ptr::read_unaligned(page.as_ptr().add(cursor) as *const EntryHeader)
            };
            if header.len == 0 {
                // Zero-length header: nothing more in this page.
                break;
            }

            let msg_size = header.len as usize;
            // Compare against the remaining bytes instead of adding to `cursor`, so a
            // corrupt, huge `len` cannot overflow.
            let remaining = page.len() - cursor - header_size;
            if msg_size > remaining {
                break;
            }
            if msg_size < data_size {
                return Err(Error::new(
                    std::io::ErrorKind::InvalidData,
                    "log entry is shorter than the record type",
                ));
            }

            // SAFETY: `data_size <= msg_size <= remaining`, so all bytes of the `T` lie
            // inside `page`; `read_unaligned` tolerates any alignment.
            let data = unsafe {
                ptr::read_unaligned(page.as_ptr().add(cursor + header_size) as *const T)
            };
            out.push(data);

            // Entries are padded to the next multiple of 8 bytes.
            let total_size = header_size + msg_size;
            cursor += (total_size + 7) & !7;
        }
        Ok(())
    }

    /// Stores `data` in the ring and hands its slot index to the worker.
    ///
    /// Returns the record's sequence id. Returns `None` if the logger has not been
    /// started, or if the worker has gone away (the index channel is disconnected).
    /// It blocks while the index channel (bounded to `capacity`) is full.
    pub fn log(&mut self, data: T) -> Option<u64> {
        let (sender, data_buffer) = match (&self.sender, &self.data_buffer) {
            (Some(sender), Some(data_buffer)) => (sender, data_buffer),
            _ => return None,
        };

        let seq_id = next_seq_id();
        // Index by the real buffer length: `capacity` can be reconfigured after `start`.
        // The length is non-zero because `start` rejects a zero capacity.
        let index = (seq_id as usize) % data_buffer.inner.len();

        // SAFETY: `index < inner.len()` by the remainder above. `&mut self` makes this
        // call the only producer for this buffer (the sole other `Arc` clone goes to the
        // worker).
        // NOTE(review): nothing here stops a rewrite of a slot the worker is still reading
        // once the producer wraps around; confirm the worker's handling (e.g. via the ack
        // counter).
        unsafe {
            let slot = data_buffer.inner[index].get();
            (*slot).seq_id = seq_id;
            (*slot).data = data;
        }

        sender.send(index).ok()?;
        Some(seq_id)
    }

    /// Returns `global::get_ack_number()`.
    ///
    /// The value comes from the crate-level `global` module and takes no `self`, so it is
    /// not tied to a particular `Logger`. Presumably it is the sequence id of the most
    /// recently flushed entry — confirm in `global`.
    pub fn get_last_flushed_entry() -> u64 {
        global::get_ack_number()
    }
}

impl<T> Drop for Logger<T> {
    /// Stops the background worker: disconnects the index channel, then joins the thread.
    ///
    /// The sender must go first. The join waits for the worker to return, which presumably
    /// happens once its receiver observes the disconnect — confirm in `LogWorker::run`.
    /// A panic in the worker is ignored here.
    fn drop(&mut self) {
        drop(self.sender.take());

        if let Some(worker) = self.worker_handle.take() {
            let _ = worker.join();
        }
    }
}