trine-kv 0.2.0

Embedded LSM MVCC key-value database.
Documentation
use std::{
    path::{Path, PathBuf},
    sync::{Arc, mpsc},
};

use crate::{
    error::{Error, Result},
    options::DurabilityMode,
    storage::StorageReadBuffer,
};

use super::{PlatformIoBackendMatrix, PlatformIoTask};

#[cfg(target_os = "linux")]
mod linux_backend;
#[cfg(all(unix, not(target_os = "linux")))]
mod unix_backend;
#[cfg(not(any(target_os = "linux", windows, unix)))]
mod unsupported_backend;
#[cfg(windows)]
mod windows_backend;

pub(super) fn matrix() -> PlatformIoBackendMatrix {
    #[cfg(target_os = "linux")]
    {
        linux_backend::matrix()
    }
    #[cfg(windows)]
    {
        windows_backend::matrix()
    }
    #[cfg(all(unix, not(target_os = "linux")))]
    {
        unix_backend::matrix()
    }
    #[cfg(not(any(target_os = "linux", windows, unix)))]
    {
        unsupported_backend::matrix()
    }
}

pub(super) fn run_worker(receiver: mpsc::Receiver<PlatformIoTask>) {
    let runtime = match compio::runtime::Runtime::new() {
        Ok(runtime) => runtime,
        Err(error) => {
            let message = format!("platform I/O runtime failed to start: {error}");
            for task in receiver {
                task.complete_start_error(&message);
            }
            return;
        }
    };

    for task in receiver {
        runtime.block_on(task.run());
    }
}

pub(super) async fn len(path: PathBuf) -> Result<u64> {
    let file = compio::fs::File::open(path).await.map_err(Error::Io)?;
    let metadata = file.metadata().await.map_err(Error::Io)?;
    Ok(metadata.len())
}

pub(super) async fn read_exact_at_owned(
    path: PathBuf,
    offset: usize,
    len: usize,
) -> Result<StorageReadBuffer> {
    use compio::io::AsyncReadAtExt;

    let file = compio::fs::File::open(path).await.map_err(Error::Io)?;
    let buffer = vec![0; len];
    let compio::buf::BufResult(result, buffer) =
        file.read_exact_at(buffer, platform_offset(offset)?).await;
    result.map_err(Error::Io)?;
    Ok(StorageReadBuffer::from_vec(offset, buffer))
}

pub(super) async fn read_optional(path: PathBuf) -> Result<Option<Arc<[u8]>>> {
    match compio::fs::read(path).await {
        Ok(bytes) => Ok(Some(Arc::from(bytes))),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(error) => Err(Error::Io(error)),
    }
}

pub(super) async fn write_temp_rename(
    path: PathBuf,
    tmp_path: PathBuf,
    bytes: Arc<[u8]>,
    durability: DurabilityMode,
    create_parent: bool,
    sync_parent_on_sync_all: bool,
) -> Result<()> {
    use compio::io::AsyncWriteAtExt;

    if create_parent {
        if let Some(parent) = tmp_path.parent() {
            compio::fs::create_dir_all(parent)
                .await
                .map_err(Error::Io)?;
        }
    }

    let mut file = compio::fs::File::create(&tmp_path)
        .await
        .map_err(Error::Io)?;
    let compio::buf::BufResult(result, _buffer) = file.write_all_at(bytes.to_vec(), 0).await;
    result.map_err(Error::Io)?;
    persist_published_file(&file, durability).await?;
    file.close().await.map_err(Error::Io)?;
    compio::fs::rename(&tmp_path, &path)
        .await
        .map_err(Error::Io)?;
    if sync_parent_on_sync_all && durability == DurabilityMode::SyncAll {
        sync_parent_directory(&path).await?;
    }
    Ok(())
}

pub(super) async fn open_append(path: PathBuf) -> Result<()> {
    if let Some(parent) = path.parent() {
        compio::fs::create_dir_all(parent)
            .await
            .map_err(Error::Io)?;
    }

    let mut options = compio::fs::OpenOptions::new();
    options.write(true).create(true);
    let file = options.open(path).await.map_err(Error::Io)?;
    file.close().await.map_err(Error::Io)
}

pub(super) async fn append(
    path: PathBuf,
    bytes: Arc<[u8]>,
    durability: DurabilityMode,
) -> Result<()> {
    use compio::io::AsyncWriteAtExt;

    let mut options = compio::fs::OpenOptions::new();
    options.write(true).create(true);
    let mut file = options.open(&path).await.map_err(Error::Io)?;
    let offset = match compio::fs::metadata(&path).await {
        Ok(metadata) => metadata.len(),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => 0,
        Err(error) => return Err(Error::Io(error)),
    };
    let compio::buf::BufResult(result, _buffer) = file.write_all_at(bytes.to_vec(), offset).await;
    result.map_err(Error::Io)?;
    persist_wal_file(&file, durability).await?;
    file.close().await.map_err(Error::Io)
}

pub(super) async fn persist_path(path: PathBuf, durability: DurabilityMode) -> Result<()> {
    let mut options = compio::fs::OpenOptions::new();
    options.write(true);
    let file = options.open(path).await.map_err(Error::Io)?;
    persist_wal_file(&file, durability).await?;
    file.close().await.map_err(Error::Io)
}

pub(super) async fn delete_path(path: PathBuf) -> Result<()> {
    match compio::fs::remove_file(path).await {
        Ok(()) => Ok(()),
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(error) => Err(Error::Io(error)),
    }
}

pub(super) async fn create_dir_all(path: PathBuf) -> Result<()> {
    compio::fs::create_dir_all(path).await.map_err(Error::Io)
}

pub(super) async fn list_file_paths(path: PathBuf) -> Result<Vec<PathBuf>> {
    compio::runtime::spawn_blocking(move || list_file_paths_blocking(&path))
        .await
        .unwrap_or_else(|_| {
            Err(Error::runtime_busy(
                "platform directory listing fallback panicked",
            ))
        })
}

fn list_file_paths_blocking(path: &Path) -> Result<Vec<PathBuf>> {
    let mut paths = Vec::new();
    for entry in std::fs::read_dir(path)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            paths.push(entry.path());
        }
    }
    paths.sort_unstable();
    Ok(paths)
}

pub(super) async fn acquire_writer_lease(path: PathBuf, owner: Arc<[u8]>) -> Result<()> {
    use compio::io::AsyncWriteAtExt;

    if let Some(parent) = path.parent() {
        compio::fs::create_dir_all(parent)
            .await
            .map_err(Error::Io)?;
    }

    let mut options = compio::fs::OpenOptions::new();
    options.write(true).create_new(true);
    let mut file = match options.open(&path).await {
        Ok(file) => file,
        Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
            return Err(Error::Corruption {
                message: format!("database lock is already held: {}", path.display()),
            });
        }
        Err(error) => return Err(Error::Io(error)),
    };

    let compio::buf::BufResult(result, _buffer) = file.write_all_at(owner.to_vec(), 0).await;
    if let Err(error) = result {
        let _ = compio::fs::remove_file(&path).await;
        return Err(Error::Io(error));
    }
    if let Err(error) = file.sync_all().await {
        let _ = compio::fs::remove_file(&path).await;
        return Err(Error::Io(error));
    }
    if let Err(error) = file.close().await {
        let _ = compio::fs::remove_file(&path).await;
        return Err(Error::Io(error));
    }
    Ok(())
}

async fn persist_wal_file(file: &compio::fs::File, durability: DurabilityMode) -> Result<()> {
    match durability {
        DurabilityMode::Buffered | DurabilityMode::Flush => Ok(()),
        DurabilityMode::SyncData => file.sync_data().await.map_err(Error::Io),
        DurabilityMode::SyncAll => file.sync_all().await.map_err(Error::Io),
    }
}

async fn persist_published_file(file: &compio::fs::File, durability: DurabilityMode) -> Result<()> {
    match durability {
        DurabilityMode::Buffered => Ok(()),
        DurabilityMode::Flush | DurabilityMode::SyncData => {
            file.sync_data().await.map_err(Error::Io)
        }
        DurabilityMode::SyncAll => file.sync_all().await.map_err(Error::Io),
    }
}

async fn sync_parent_directory(path: &Path) -> Result<()> {
    let Some(parent) = path
        .parent()
        .filter(|parent| !parent.as_os_str().is_empty())
    else {
        return Ok(());
    };

    sync_directory(parent.to_path_buf()).await
}

#[cfg(unix)]
pub(super) async fn sync_directory(path: PathBuf) -> Result<()> {
    let file = compio::fs::File::open(path).await.map_err(Error::Io)?;
    file.sync_all().await.map_err(Error::Io)?;
    file.close().await.map_err(Error::Io)
}

#[cfg(windows)]
pub(super) async fn sync_directory(path: PathBuf) -> Result<()> {
    const FILE_FLAG_BACKUP_SEMANTICS: u32 = 0x0200_0000;
    const FILE_SHARE_READ: u32 = 0x0000_0001;
    const FILE_SHARE_WRITE: u32 = 0x0000_0002;
    const FILE_SHARE_DELETE: u32 = 0x0000_0004;

    let mut options = compio::fs::OpenOptions::new();
    options
        .read(true)
        .share_mode(FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE)
        .custom_flags(FILE_FLAG_BACKUP_SEMANTICS);
    let file = options.open(path).await.map_err(Error::Io)?;
    file.sync_all().await.map_err(Error::Io)?;
    file.close().await.map_err(Error::Io)
}

#[cfg(not(any(unix, windows)))]
pub(super) async fn sync_directory(_path: PathBuf) -> Result<()> {
    Ok(())
}

fn platform_offset(offset: usize) -> Result<u64> {
    u64::try_from(offset).map_err(|_| Error::invalid_options("platform I/O offset overflow"))
}