tempest-core 0.0.2

Core utilities and primitives for TempestDB
Documentation
use std::{cell::RefCell, io, path::PathBuf, rc::Rc};

use bincode::Options;
use bytes::{BufMut, BytesMut};
use tempest_io::{Io, IoBuf, OpenOptions, Statx};
use tempest_rt::{
    JoinHandle, close_file, list_dir, open_file, read_exact, remove_file, spawn, stat_file,
    sync::{
        mpsc::{BoundedSender, bounded},
        oneshot,
    },
    sync_file, write_exact,
};

use crate::{bincode_options, utils::ByteSize};

mod config;
mod header;

#[cfg(test)]
mod tests;

pub use config::{JournalConfig, JournalError, Replayable};
pub use header::{EditPrefix, JOURNAL_MAGIC_NUM};

use header::{EDIT_PREFIX_SIZE, JOURNAL_HEADER_SIZE, JournalHeader};

/// Request sent from a [`JournalHandle`] to the journal worker task.
struct JournalMessage<T: Replayable> {
    /// The edit the worker should append to the journal.
    edit: T::Edit,
    /// One-shot channel on which the worker reports the outcome of the append.
    tx: oneshot::Sender<Result<(), JournalError>>,
}

/// Client-side handle to a running [`Journal`] worker.
///
/// Edits are submitted through [`JournalHandle::append`]; the current state is
/// read through [`JournalHandle::data`].
pub struct JournalHandle<T: Replayable> {
    /// Sending half of the bounded queue that feeds the worker task.
    tx: BoundedSender<JournalMessage<T>>,
    /// The journaled state, shared (same `Rc`) with the worker that mutates it.
    data: Rc<RefCell<T>>,
}

impl<T: Replayable> JournalHandle<T> {
    pub async fn append(&self, edit: T::Edit) -> Result<(), JournalError> {
        let (tx, rx) = oneshot::channel();
        self.tx
            .clone()
            .send(JournalMessage { edit, tx })
            .await
            .map_err(|_| JournalError::WorkerDied)?;

        rx.recv()
            .await
            .map_err(|_| JournalError::WorkerDied)
            .flatten()
    }

    // TODO: should this be a RwLock guarded value?
    pub fn data(&self) -> std::cell::Ref<'_, T> {
        self.data.borrow()
    }
}

/// Append-only journal of edits to a [`Replayable`] state, stored in numbered
/// files inside `dir`.
#[derive(Debug)]
pub struct Journal<T: Replayable, I: Io> {
    /// Directory that holds the journal files.
    dir: PathBuf,
    /// The in-memory state; shared with every [`JournalHandle`].
    data: Rc<RefCell<T>>,
    /// Tuning knobs; `growth_factor` and `growth_baseline` drive rotation.
    config: JournalConfig,
    /// Descriptor of the journal file currently being written, if one is open.
    fd: Option<I::Fd>,
    /// Number of the journal file currently being written.
    filenum: u64,
    /// Byte offset in the current file at which the next record is written.
    write_offset: u64,
    /// Once `write_offset` reaches this value, an append triggers a rotation.
    rotation_threshold: u64,
}

impl<T: Replayable, I: Io> Journal<T, I> {
    /// Builds the path of journal file number `filenum`:
    /// `<dir>/<filename_prefix>-<filenum>`.
    fn path(&self, filenum: u64) -> PathBuf {
        let prefix = T::filename_prefix();
        let file_name = format!("{prefix}-{filenum}");
        self.dir.join(file_name)
    }

    /// Appends one journal record (edit prefix followed by the serialized
    /// `edit`) to `scratch`.
    ///
    /// Space for the prefix is reserved first and filled in once the body has
    /// been serialized, because the prefix is computed from the body bytes.
    ///
    /// Returns the number of bytes appended to `scratch`.
    ///
    /// # Errors
    ///
    /// Returns the converted serialization error if `edit` cannot be encoded.
    fn write_edit(scratch: &mut BytesMut, edit: &T::Edit) -> Result<u64, JournalError> {
        let record_start = scratch.len();
        let body_start = record_start + EDIT_PREFIX_SIZE;

        // Placeholder prefix; overwritten below once the body is known.
        scratch.put_bytes(0, EDIT_PREFIX_SIZE);
        bincode_options().serialize_into(scratch.writer(), edit)?;

        let prefix = EditPrefix::new(&scratch[body_start..]);
        let encoded_prefix = prefix.encode();
        scratch[record_start..body_start].copy_from_slice(&encoded_prefix);

        let record_len = scratch.len() - record_start;
        Ok(record_len as u64)
    }

    async fn create_file(
        &mut self,
        filenum: u64,
        old_fd: Option<I::Fd>,
    ) -> Result<(), JournalError> {
        let path = self.path(filenum);
        debug!(filenum, ?path, "creating new journal file");

        let fd = open_file::<I>(
            path,
            OpenOptions::new().create(true).write(true).truncate(true),
        )
        .await?;

        let mut scratch = BytesMut::new();
        let header = JournalHeader::new(filenum).encode();
        scratch.put_slice(&header);
        Self::write_edit(&mut scratch, &self.data.borrow().snapshot())?;

        let initial_size = scratch.len() as u64;
        let (result, _) = write_exact::<_, I>(fd, scratch, 0).await;
        result?;
        sync_file::<I>(fd).await?;

        if let Some(old_fd) = old_fd {
            let old_path = self.path(self.filenum);
            close_file::<I>(old_fd).await?;
            if let Err(e) = remove_file::<I>(old_path).await {
                warn!("could not remove old journal file: {}", e);
            }
        }

        let factored = initial_size * self.config.growth_factor;
        let rotation_threshold = factored.max(self.config.growth_baseline);

        self.fd = Some(fd);
        self.filenum = filenum;
        self.write_offset = initial_size;
        self.rotation_threshold = rotation_threshold;
        Ok(())
    }

    /// Replays every record stored after the file header of the currently
    /// open journal file, applying each decoded edit to the in-memory state.
    ///
    /// `file_size` is the total size of the file in bytes; reading stops once
    /// the read position reaches it. On success `write_offset` is set to the
    /// position just past the last record. `rotation_threshold` is not
    /// touched here.
    ///
    /// # Errors
    ///
    /// Fails if a prefix or body cannot be read, if a record's declared
    /// length runs past `file_size`, if the prefix does not validate against
    /// the body, or if the body cannot be deserialized. Edits replayed before
    /// the failure remain applied.
    ///
    /// # Panics
    ///
    /// Panics if no file is open (`self.fd` is `None`).
    async fn replay(&mut self, file_size: u64) -> Result<(), JournalError> {
        let fd = self.fd.expect("replay called before fd is set");
        let mut read_offset = JOURNAL_HEADER_SIZE as u64;

        while read_offset < file_size {
            // read edit prefix
            let mut prefix_buf = BytesMut::zeroed(EDIT_PREFIX_SIZE);
            let (result, slice) =
                read_exact::<_, I>(fd, prefix_buf.slice(0..EDIT_PREFIX_SIZE), read_offset).await;
            prefix_buf = slice.into_inner();
            result?;
            let prefix = EditPrefix::decode_from_slice(&prefix_buf);
            read_offset += EDIT_PREFIX_SIZE as u64;

            // The length comes straight from disk and has not been validated
            // yet; refuse to allocate more than the file can possibly hold.
            let edit_len = prefix.len() as usize;
            let remaining = file_size.saturating_sub(read_offset);
            if edit_len as u64 > remaining {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "journal record extends past end of file: truncated or corrupt",
                )
                .into());
            }

            // read edit body
            let mut edit_buf = BytesMut::zeroed(edit_len);
            let (result, slice) =
                read_exact::<_, I>(fd, edit_buf.slice(0..edit_len), read_offset).await;
            edit_buf = slice.into_inner();
            result?;

            if !prefix.is_valid(&edit_buf) {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "journal record prefix checksum mismatch: potential corruption",
                )
                .into());
            }

            let edit: T::Edit = bincode_options().deserialize(&edit_buf)?;
            self.data.borrow_mut().apply(edit);
            read_offset += edit_len as u64;
        }

        self.write_offset = read_offset;
        Ok(())
    }

    /// Writes `edit` to the end of the current journal file, syncs the file,
    /// and only then applies the edit to the in-memory state.
    ///
    /// After a successful write the file is rotated if `write_offset` has
    /// reached `rotation_threshold`; a rotation failure is returned as the
    /// result of this call even though the edit has already been applied.
    ///
    /// # Panics
    ///
    /// Panics if no file is open (`self.fd` is `None`).
    async fn append(&mut self, edit: T::Edit) -> Result<(), JournalError> {
        let fd = self.fd.expect("append called before init");

        // The buffer starts empty, so the returned length is the whole record.
        let mut record = BytesMut::new();
        let record_len = Self::write_edit(&mut record, &edit)?;

        let (write_result, _) = write_exact::<_, I>(fd, record, self.write_offset).await;
        write_result?;
        sync_file::<I>(fd).await?;

        // The record is on disk; it is now safe to update the in-memory state.
        self.data.borrow_mut().apply(edit);
        self.write_offset += record_len;

        trace!(size=?ByteSize(record_len), "wrote edit");

        if self.write_offset < self.rotation_threshold {
            return Ok(());
        }

        // auto-rotate: threshold reached
        debug!(
            size = ?ByteSize(self.write_offset),
            threshold = ?ByteSize(self.rotation_threshold),
            "journal size exceeds rotation threshold, rotating file",
        );
        self.rotate().await
    }

    /// Starts a new journal file numbered `filenum + 1` and hands the current
    /// descriptor to [`Self::create_file`] to be retired.
    ///
    /// `self.fd` is emptied before the new file is created.
    pub async fn rotate(&mut self) -> Result<(), JournalError> {
        debug!("rotating journal file");
        let next_filenum = self.filenum + 1;
        let retiring_fd = self.fd.take();
        self.create_file(next_filenum, retiring_fd).await
    }

    /// Consumes the journal and closes its open file, if there is one.
    ///
    /// # Errors
    ///
    /// Returns the converted error if closing the descriptor fails.
    pub async fn close(mut self) -> Result<(), JournalError> {
        let Some(fd) = self.fd.take() else {
            return Ok(());
        };
        close_file::<I>(fd).await?;
        Ok(())
    }

    /// Locates the most recent journal file in `dir` and replays it, or
    /// creates file number 0 when no usable journal file exists.
    ///
    /// Every non-directory entry of `dir` is treated as a candidate: its
    /// header is read and decoded, and the candidate with the highest header
    /// file number wins. Candidates that cannot be opened, read or decoded
    /// are skipped.
    ///
    /// NOTE(review): when an existing file is reopened this function does not
    /// set `rotation_threshold`, so it keeps the value the struct was built
    /// with. `Journal::new` builds it with 0, which makes the first append
    /// after a reopen rotate the file. Confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be listed, if the winning file cannot be
    /// opened or stat'ed, if replay fails, or if creating the first file
    /// fails. The descriptor opened for the winning file is closed before an
    /// error is returned, and `self.fd` is left as `None`.
    async fn init(&mut self) -> Result<(), JournalError> {
        let entries = list_dir::<I>(self.dir.clone())?;
        let files: Vec<PathBuf> = entries
            .into_iter()
            .filter(|e| !e.is_dir)
            .map(|e| e.path)
            .collect();

        // scan candidates: open each, read header, find highest filenum
        let mut winner: Option<(u64, PathBuf)> = None;
        for path in files {
            let fd = match open_file::<I>(path.clone(), OpenOptions::new().read(true)).await {
                Ok(fd) => fd,
                Err(err) => {
                    warn!("could not open journal candidate (skipping): {err}");
                    continue;
                }
            };
            let mut scratch = BytesMut::zeroed(JOURNAL_HEADER_SIZE);
            let (result, slice) =
                read_exact::<_, I>(fd, scratch.slice(0..JOURNAL_HEADER_SIZE), 0).await;
            scratch = slice.into_inner();
            // The candidate is reopened read-write later if it wins.
            close_file::<I>(fd).await.ok();
            if result.is_err() {
                continue;
            }
            match JournalHeader::decode_from_slice(&scratch) {
                Ok(header) => {
                    let is_newer = match &winner {
                        Some((best, _)) => header.filenum > *best,
                        None => true,
                    };
                    if is_newer {
                        winner = Some((header.filenum, path));
                    }
                }
                Err(err) => {
                    warn!("could not decode journal candidate (skipping): {err}");
                    continue;
                }
            }
        }

        match winner {
            None => self.create_file(0, None).await?,
            Some((filenum, path)) => {
                let fd = open_file::<I>(path, OpenOptions::new().read(true).write(true)).await?;
                let stat = match stat_file::<I>(fd).await {
                    Ok(stat) => stat,
                    Err(err) => {
                        // Not yet stored in `self.fd`, so close it here.
                        close_file::<I>(fd).await.ok();
                        return Err(err.into());
                    }
                };
                let file_size = stat.stx_size();

                self.fd = Some(fd);
                self.filenum = filenum;
                self.write_offset = JOURNAL_HEADER_SIZE as u64;

                if file_size > JOURNAL_HEADER_SIZE as u64 {
                    if let Err(err) = self.replay(file_size).await {
                        // The journal is unusable; release the descriptor
                        // instead of leaking it.
                        if let Some(fd) = self.fd.take() {
                            close_file::<I>(fd).await.ok();
                        }
                        return Err(err);
                    }
                }
            }
        }

        Ok(())
    }

    /// Opens (or creates) the journal stored in `dir`, rebuilds the state
    /// from it, and spawns the worker task that serializes appends.
    ///
    /// Returns the [`JournalHandle`] used to submit edits and read the state,
    /// and the [`JoinHandle`] of the worker task. Requests are queued on a
    /// bounded channel with capacity 256.
    ///
    /// The worker processes requests one at a time and replies to each on its
    /// one-shot channel. When an append fails, the error is logged, delivered
    /// to the caller that submitted the edit, and the worker stops. The
    /// worker also stops when receiving from the queue fails. In both cases
    /// it closes the journal file before exiting.
    ///
    /// # Errors
    ///
    /// Returns the error from initialization if the journal cannot be opened,
    /// replayed or created.
    pub async fn new(
        dir: PathBuf,
        config: JournalConfig,
    ) -> Result<(JournalHandle<T>, JoinHandle<()>), JournalError> {
        let data = Rc::new(RefCell::new(T::initial()));

        let mut instance = Self {
            dir,
            data: data.clone(),
            config,
            fd: None,
            filenum: 0,
            write_offset: 0,
            rotation_threshold: 0,
        };

        instance.init().await?;

        let (tx, mut rx) = bounded(256);

        let journal_handle = JournalHandle { tx, data };

        let join_handle = spawn(async move {
            while let Ok(msg) = rx.recv().await {
                let result = instance.append(msg.edit).await;
                let failed = match &result {
                    Ok(()) => false,
                    Err(err) => {
                        error!("failed to append edit: {err}");
                        true
                    }
                };

                // Reply before stopping so the caller sees the real error
                // rather than a generic "worker died".
                let _ = msg.tx.send(result);

                if failed {
                    break;
                }
            }

            if let Err(err) = instance.close().await {
                error!("failed to close journal: {err}");
            }
        });

        Ok((journal_handle, join_handle))
    }
}