// raft-log 0.3.0 — Raft log implementation (crate documentation follows)
//! Manages the creation, opening, and management of log chunks.
//!
//! A chunk is a segment of the Write-Ahead Log (WAL) that contains a sequence
//! of records. Chunks are used to:
//! - Break down large logs into manageable pieces
//! - Enable efficient record lookup and iteration
//! - Support log truncation and cleanup
//!
//! Each chunk maintains its position in the global log using absolute offsets,
//! which allows for consistent addressing regardless of chunk boundaries.

pub(crate) mod chunk_id;
pub(crate) mod closed_chunk;
pub(crate) mod open_chunk;
mod record_iterator;

use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::marker::PhantomData;
use std::os::unix::fs::FileExt;
use std::sync::Arc;

use codeq::Decode;
use codeq::OffsetSize;
use codeq::error_context_ext::ErrorContextExt;
use log::error;
use log::warn;
use record_iterator::RecordIterator;

use crate::Config;
use crate::Types;
use crate::WALRecord;
use crate::chunk::chunk_id::ChunkId;
use crate::num::format_pad9_u64;
use crate::types::Segment;

/// Represents a chunk of the Write-Ahead Log containing a sequence of records.
///
/// A chunk maintains:
/// - A file handle for persistent storage
/// - Global offsets for all records it contains
/// - Metadata about its position in the complete log
///
/// The type parameter `T` selects the application-defined record types; it is
/// held only through `PhantomData` (the `impl` blocks require `T: Types`).
#[derive(Debug, Clone)]
pub struct Chunk<T> {
    /// File handle for the chunk's persistent storage.
    ///
    /// Shared via `Arc` so concurrent readers can use positional reads
    /// (`read_at`) without duplicating the handle.
    pub(crate) f: Arc<File>,

    /// The global offsets of each record in the file.
    ///
    /// Contains N+1 offsets where N is the number of records:
    /// - First offset is the chunk's starting position
    /// - Last offset is the end of the last record
    /// - Offsets are absolute positions in the complete log, not relative to
    ///   chunk start
    pub(crate) global_offsets: Vec<u64>,

    /// Records the original file size if the chunk was truncated due to an
    /// incomplete write.
    ///
    /// `None` if the chunk was opened without truncation.
    /// This field is primarily used for testing and debugging purposes.
    #[allow(dead_code)]
    pub(crate) truncated: Option<u64>,

    /// Zero-sized marker tying this chunk to `T` without storing a value.
    pub(crate) _p: PhantomData<T>,
}

impl<T> Chunk<T> {
    /// Returns the number of records stored in this chunk.
    ///
    /// `global_offsets` always holds one more boundary than there are
    /// records, so the count is `len - 1`.
    pub(crate) fn records_count(&self) -> usize {
        let boundary_count = self.global_offsets.len();
        boundary_count - 1
    }

    /// Returns this chunk's globally unique identifier.
    ///
    /// The id is simply the chunk's global start offset.
    pub(crate) fn chunk_id(&self) -> ChunkId {
        let start = self.global_offsets[0];
        ChunkId(start)
    }

    /// Returns the segment representing the last record in this chunk.
    ///
    /// The last record spans the final two entries of `global_offsets`.
    pub(crate) fn last_segment(&self) -> Segment {
        let n = self.global_offsets.len();
        let start = self.global_offsets[n - 2];
        let end = self.global_offsets[n - 1];
        Segment::new(start, end - start)
    }

    /// Returns the total size of this chunk in bytes.
    pub(crate) fn chunk_size(&self) -> u64 {
        self.end_offset()
    }

    /// Returns the size of this chunk in bytes, calculated as the difference
    /// between its end and start offsets.
    #[allow(dead_code)]
    pub(crate) fn end_offset(&self) -> u64 {
        let first = self.global_offsets[0];
        let last = self.global_offsets[self.global_offsets.len() - 1];
        last - first
    }

    /// Returns the global offset where this chunk begins.
    pub(crate) fn global_start(&self) -> u64 {
        self.global_offsets[0]
    }

    /// Returns the global offset where this chunk ends.
    #[allow(dead_code)]
    pub(crate) fn global_end(&self) -> u64 {
        let last_index = self.global_offsets.len() - 1;
        self.global_offsets[last_index]
    }

    /// Appends the size of a new record to the global offsets list.
    ///
    /// Extends the offsets vector by one boundary located `size` bytes
    /// past the current end.
    pub(crate) fn append_record_size(&mut self, size: u64) {
        let current_end = self.global_offsets[self.global_offsets.len() - 1];
        self.global_offsets.push(current_end + size);
    }

    /// Opens the on-disk file backing `chunk_id` for reading and writing.
    pub(crate) fn open_chunk_file(
        config: &Config,
        chunk_id: ChunkId,
    ) -> Result<File, io::Error> {
        let path = config.chunk_path(chunk_id);
        OpenOptions::new()
            .write(true)
            .read(true)
            .open(path)
            .context(|| format!("open {}", chunk_id))
    }
}

impl<T> Chunk<T>
where T: Types
{
    /// Opens a chunk and loads its records.
    ///
    /// This function performs the following steps:
    /// 1. Opens the chunk file
    /// 2. Loads the records from the file
    /// 3. Verifies the integrity of the records
    ///
    /// If a record fails to read, recovery is attempted via
    /// [`Self::handle_record_error`]; when recovery permits, the file is
    /// truncated to the last known-good record boundary and synced.
    ///
    /// Returns the chunk together with all successfully decoded records.
    pub(crate) fn open(
        config: Arc<Config>,
        chunk_id: ChunkId,
    ) -> Result<(Self, Vec<WALRecord<T>>), io::Error> {
        let f = Self::open_chunk_file(&config, chunk_id)?;
        let arc_f = Arc::new(f);
        // Captured before any truncation so the original size can be
        // reported via `truncated`.
        let file_size = arc_f.metadata()?.len();
        let it = Self::load_records_iter(&config, arc_f.clone(), chunk_id)?;

        // First entry is the chunk's own global start position; every
        // subsequent entry is the global end offset of one record.
        let mut record_offsets = vec![chunk_id.offset()];
        let mut records = Vec::new();
        let mut truncate = false;

        for res in it {
            match res {
                Ok((seg, record)) => {
                    // `seg` offsets are chunk-relative; convert to absolute
                    // offsets in the complete log.
                    record_offsets.push(chunk_id.offset() + seg.end().0);
                    records.push(record);
                }
                Err(io_err) => {
                    // End of the last fully decoded record: the point up to
                    // which the file contents are known to be good.
                    let global_offset = record_offsets.last().copied().unwrap();
                    truncate = Self::handle_record_error(
                        io_err,
                        arc_f.clone(),
                        global_offset,
                        chunk_id,
                        &config,
                    )?;
                    break;
                }
            };
        }

        // Cut off the damaged tail (when recovery decided to truncate) and
        // remember the original size for testing/debugging.
        let truncated = if truncate {
            // `set_len` takes a file-local length, hence the subtraction of
            // the chunk's global start offset.
            arc_f
                .set_len(*record_offsets.last().unwrap() - chunk_id.offset())?;
            arc_f.sync_all()?;
            Some(file_size)
        } else {
            None
        };

        let chunk = Self {
            f: arc_f,
            global_offsets: record_offsets,
            truncated,
            _p: Default::default(),
        };

        Ok((chunk, records))
    }

    /// Handles a record read error and determines if truncation should occur.
    ///
    /// Returns `Ok(true)` if the file should be truncated at the error
    /// position. Returns `Err` if the error is unrecoverable.
    ///
    /// Two recoverable situations exist (both gated on the
    /// `truncate_incomplete_record` config flag):
    /// - `UnexpectedEof`: a record was only partially written.
    /// - The remainder of the file is all zero bytes (see below).
    fn handle_record_error(
        io_err: io::Error,
        file: Arc<File>,
        global_offset: u64,
        chunk_id: ChunkId,
        config: &Config,
    ) -> Result<bool, io::Error> {
        let at = format!(
            "at offset {} in chunk {}",
            format_pad9_u64(global_offset),
            chunk_id
        );
        error!(
            "Error reading record {at}: {}, error kind: {:?}; trying to recover...",
            io_err,
            io_err.kind()
        );

        let can_truncate = config.truncate_incomplete_record();

        // UnexpectedEof: incomplete record, can truncate if enabled
        if io_err.kind() == io::ErrorKind::UnexpectedEof {
            if can_truncate {
                warn!("UnexpectedEof {at}; truncating");
                return Ok(true);
            }
            error!("UnexpectedEof {at}; truncate disabled");
            return Err(io_err);
        }

        // Other errors: check for trailing zeros (can happen with EXT4
        // data=writeback mode where data and metadata are written in arbitrary
        // order)
        let all_zero = Self::verify_trailing_zeros(
            file,
            // Convert the absolute offset back to a file-local one.
            global_offset - chunk_id.offset(),
            chunk_id,
        )?;

        if all_zero && can_truncate {
            warn!("Trailing zeros {at}; truncating");
            return Ok(true);
        }

        // Unrecoverable: either truncation is disabled or the tail contains
        // non-zero garbage (genuine corruption).
        if all_zero {
            error!("Trailing zeros {at}; truncate disabled");
        } else {
            error!("Damaged record({}) {at}", io_err);
        }
        Err(io_err)
    }

    /// Checks if a file contains only zero bytes from a specified offset to the
    /// end.
    ///
    /// This function is used to detect and validate partially written or
    /// corrupted data. It reads the file in chunks and verifies that all
    /// bytes after the given offset are zeros. This is particularly useful
    /// for detecting incomplete or interrupted writes where the remaining
    /// space may have been zero-filled.
    ///
    /// Returns `Ok(false)` on the first non-zero byte, `Ok(true)` if the
    /// whole tail (possibly empty) is zero, and `Err` on I/O failure or if
    /// `start_offset` lies beyond the end of the file.
    fn verify_trailing_zeros(
        file: Arc<File>,
        mut start_offset: u64,
        chunk_id: ChunkId,
    ) -> Result<bool, io::Error> {
        let file_size = file.metadata()?.len();

        if start_offset > file_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Start offset {} exceeds file size {}",
                    start_offset, file_size
                ),
            ));
        }

        // Empty tail: trivially all-zero.
        if file_size == start_offset {
            return Ok(true);
        }

        const WARN_THRESHOLD: u64 = 64 * 1024; // 64KB
        if file_size - start_offset > WARN_THRESHOLD {
            warn!(
                "Large maybe damaged section detected: {} bytes to the end; in chunk {}",
                file_size - start_offset,
                chunk_id
            );
        }

        const READ_CHUNK_SIZE: usize = 1024; // 1KB
        let mut buffer = vec![0u8; READ_CHUNK_SIZE];

        loop {
            // Positional read: does not move the file cursor, so this is safe
            // alongside other readers of the shared handle.
            let n = file.read_at(&mut buffer, start_offset)?;
            if n == 0 {
                break;
            }

            // Only the first `n` bytes of the buffer are valid for this read.
            for (i, byt) in buffer.iter().enumerate().take(n) {
                if *byt != 0 {
                    error!(
                        "Non-zero byte detected at offset {} in chunk {}",
                        start_offset + i as u64,
                        chunk_id
                    );
                    return Ok(false);
                }
            }

            start_offset += n as u64;
        }
        Ok(true)
    }

    /// Reads every record in the chunk, returning each read attempt's result.
    ///
    /// Unlike [`Self::open`], no recovery or truncation is performed; errors
    /// are collected in place. Primarily useful for inspection and debugging.
    #[allow(clippy::type_complexity)]
    pub(crate) fn dump(
        config: &Config,
        chunk_id: ChunkId,
    ) -> Result<Vec<Result<(Segment, WALRecord<T>), io::Error>>, io::Error>
    {
        let f = Self::open_chunk_file(config, chunk_id)?;
        let it = Self::load_records_iter(config, Arc::new(f), chunk_id)?;

        Ok(it.collect::<Vec<_>>())
    }

    /// Returns an iterator of `start, end, record` or error.
    ///
    /// This method requires a newly opened file whose position is at the
    /// beginning, because the returned iterator reads sequentially from
    /// the current file position via `BufReader`.
    pub(crate) fn load_records_iter(
        config: &Config,
        f: Arc<File>,
        chunk_id: ChunkId,
    ) -> Result<
        impl Iterator<Item = Result<(Segment, WALRecord<T>), io::Error>> + '_,
        io::Error,
    > {
        // The total file size lets the iterator detect records that claim to
        // extend past the end of the file.
        let file_size = f
            .metadata()
            .context(|| format!("get file size of {chunk_id}"))?
            .len();

        let br = io::BufReader::with_capacity(config.read_buffer_size(), f);
        Ok(RecordIterator::new(br, file_size, chunk_id))
    }

    /// Read a record from the chunk at the specified segment.
    ///
    /// Uses `pread` (positional read) to atomically read from a specific offset
    /// without changing the file position. This avoids race conditions when
    /// multiple threads read from the same chunk concurrently.
    ///
    /// The segment's offset is global; it is converted to a file-local
    /// position before reading. Returns a decode error (with context) if the
    /// bytes do not form a valid record.
    pub(crate) fn read_record(
        &self,
        segment: Segment,
    ) -> Result<WALRecord<T>, io::Error> {
        let offset = segment.offset().0 - self.global_start();
        let size = *segment.size() as usize;

        let mut buf = vec![0u8; size];
        self.f.read_exact_at(&mut buf, offset)?;

        WALRecord::<T>::decode(&buf[..]).context(|| {
            format!("decode Record {:?} in {}", segment, self.chunk_id())
        })
    }
}
}