picologger 0.9.4

Logger for HPC
Documentation
use crate::errors::PicoError;
use crate::global::{next_page_id, set_ack_number};
use crate::{LogBuffer, PageManager};
use crossbeam_channel::{Receiver, RecvTimeoutError};
use io_uring::{IoUring, opcode, types};
use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use std::time::{Duration, Instant};

pub struct LogWorker<'a, T> {
    pub receiver: Receiver<usize>,
    pub data_buffer: Arc<LogBuffer<T>>,
    pub pages: PageManager<T>,
    pub last_flush: Instant,
    pub flush_interval: Duration,
    pub poll_interval: Duration,
    pub logfile: &'a File,
    pub ring: IoUring,
    pub pending_writes: usize,
}

impl<'a, T> LogWorker<'a, T> {
    pub fn run(&mut self) {
        loop {
            self.process_completions();

            if self.last_flush.elapsed() >= self.flush_interval {
                self.flush_current_page();
            }

            let time_since_flush = self.last_flush.elapsed();

            let time_until_flush = if time_since_flush > self.flush_interval {
                Duration::ZERO
            } else {
                self.flush_interval - time_since_flush
            };

            let timeout = std::cmp::min(time_until_flush, self.poll_interval);

            match self.receiver.recv_timeout(timeout) {
                Ok(msg) => self.handle_message(msg),
                Err(RecvTimeoutError::Timeout) => {
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    self.flush_remaining();
                    break;
                }
            }
        }
    }

    fn handle_message(&mut self, idx: usize) {
        let log_msg = unsafe { &*self.data_buffer.inner[idx].get() };

        if let Err(PicoError::PageFull { .. }) =
            self.pages.get_active_page().append(log_msg.seq_id, &log_msg.data)
        {
            self.flush_current_page();
            let _ = self.pages.get_active_page().append(log_msg.seq_id, &log_msg.data);
        }
    }

    fn flush_current_page(&mut self) {
        let page_idx = self.pages.active_idx;
        let page = &self.pages.pages[page_idx];
        
        if page.is_empty() {
            self.last_flush = Instant::now();
            return;
        }

        self.pages.pending_status[page_idx] = true;

        let page_id = next_page_id();
        let offset = page_id * (page.get_page_content().len() as u64);
        let buf = page.get_page_content();

        let seq_id = page.get_last_entry();
        let user_data = ((page_idx as u64) << 56) | (seq_id & 0x00FF_FFFF_FFFF_FFFF);

        let write_e = opcode::Write::new(
            types::Fd(self.logfile.as_raw_fd()),
            buf.as_ptr(),
            buf.len() as _,
        )
        .offset(offset)
        .build()
        .user_data(user_data);

        unsafe {
            if self.ring.submission().push(&write_e).is_err() {
                self.ring.submit().expect("Fail to submit to clear SQ");

                self.ring
                    .submission()
                    .push(&write_e)
                    .expect("SQ full even after submit");
            }
        }

        let _ = self.ring.submit(); 
        self.pending_writes += 1;


        let _ = self.pages.advance();
        
        self.wait_if_next_page_pending();
        self.pages.get_active_page().reset();

        self.last_flush = Instant::now();
    }

    fn wait_if_next_page_pending(&mut self) {
        let idx = self.pages.active_idx;
        while self.pages.pending_status[idx] {
            self.ring.submit_and_wait(1).expect("failed to wait");
            self.process_completions();
        }
    }

    fn process_completions(&mut self) {
        let mut cq = self.ring.completion();
        while let Some(cqe) = cq.next() {
            if self.pending_writes > 0 {
                self.pending_writes -= 1;
            }
            if cqe.result() >= 0 {
                let user_data = cqe.user_data();
                let page_idx = (user_data >> 56) as usize;
                let seq_id = user_data & 0x00FF_FFFF_FFFF_FFFF;
                
                if page_idx < self.pages.pending_status.len() {
                    self.pages.pending_status[page_idx] = false;
                }
                
                set_ack_number(seq_id);
            } else {
                eprintln!("Async write failed: {}", cqe.result());
            }
        }
    }

    fn flush_remaining(&mut self) {
        self.flush_current_page();
        while self.pending_writes > 0 {
            self.ring.submit_and_wait(1).expect("failed to wait");
            self.process_completions();
        }
    }
}