mod c_api;
pub mod clonefile;
mod cpathbuf;
mod error;
mod exclusive_file;
pub mod flock;
mod handle;
mod item;
mod owned_cell;
pub mod pathconf;
pub mod punchfile;
pub mod seekhole;
pub mod testing;
#[cfg(test)]
mod tests;
pub mod walk;
use std::cmp::{max, min};
use std::collections::{hash_map, HashMap, HashSet};
use std::ffi::{OsStr, OsString};
use std::fmt::{Debug, Display, Formatter};
use std::fs::{read_dir, remove_dir, remove_file, File, OpenOptions};
use std::io::SeekFrom::{End, Start};
use std::io::{ErrorKind, Read, Seek, Write};
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut};
use std::os::fd::{AsRawFd, RawFd};
use std::os::unix::ffi::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::time::Duration;
use std::{fs, io};
use anyhow::Result;
use anyhow::{bail, Context};
use chrono::NaiveDateTime;
use clonefile::clonefile;
use cpathbuf::CPathBuf;
pub use error::Error;
use exclusive_file::ExclusiveFile;
pub use handle::Handle;
use log::{debug, warn};
use memmap2::Mmap;
use nix::fcntl::FlockArg::{LockExclusiveNonblock, LockSharedNonblock};
use num::Integer;
use positioned_io::ReadAt;
use rand::Rng;
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::Error::QueryReturnedNoRows;
use rusqlite::{params, CachedStatement, Connection, Statement};
use tempfile::TempDir;
#[cfg(test)]
pub use test_log::test;
pub use walk::Entry as WalkEntry;
use ErrorKind::InvalidInput;
use crate::clonefile::fclonefile;
use crate::flock::*;
use crate::item::Item;
use crate::punchfile::punchfile;
use crate::seekhole::seek_hole_whence;
use crate::walk::walk_dir;
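/// Result type for the crate's public API, using [`Error`] for failures.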
pub type PubResult<T> = Result<T, Error>;
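/// A reflink clone of a values file, held in a snapshot temporary directory.
/// The mmap is created lazily the first time the clone is viewed.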
#[derive(Debug)]
struct FileClone {
    file: File,
    #[allow(unused)]
    tempdir: Arc<TempDir>,
    mmap: Option<Mmap>,
    len: u64,
}
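/// Cache of live file clones keyed by values-file ID.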
type FileCloneCache = HashMap<FileId, Arc<Mutex<FileClone>>>;
impl FileClone {
    fn get_mmap(&mut self) -> io::Result<&Mmap> {
        let mmap_opt = &mut self.mmap;
        if let Some(mmap) = mmap_opt {
            return Ok(mmap);
        }
        let mmap = unsafe { Mmap::map(self.file.as_raw_fd()) }?;
        Ok(mmap_opt.insert(mmap))
    }
}
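/// A staged write: the key and the location of its already-written value.
/// Published to the manifest when the batch commits.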
struct PendingWrite {
    key: Vec<u8>,
    value_file_offset: u64,
    value_length: u64,
    value_file_id: FileId,
}
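/// Schema for the manifest database.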
const MANIFEST_SCHEMA_SQL: &str = include_str!("../manifest.sql");
fn init_manifest_schema(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
    conn.execute_batch(MANIFEST_SCHEMA_SQL)
}
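/// Begins a value write within a batch. Created via [`BatchWriter::new_value`].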
pub struct BeginWriteValue<'writer, 'handle> {
    batch: &'writer mut BatchWriter<'handle>,
}
impl BeginWriteValue<'_, '_> {
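    /// Clones an entire file in as the value, avoiding a data copy on
    /// filesystems that support reflinks. Retries until an unused destination
    /// file name is found. `_flags` is currently unused.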
    pub fn clone_fd(self, fd: RawFd, _flags: u32) -> Result<ValueWriter> {
        let dst_path = loop {
            let dst_path = random_file_name_in_dir(&self.batch.handle.dir);
            match fclonefile(fd, &dst_path, 0) {
                Err(err) if err.kind() == ErrorKind::AlreadyExists => continue,
                Err(err) => return Err(err.into()),
                Ok(()) => break dst_path,
            }
        };
        let exclusive_file = ExclusiveFile::open(dst_path)?;
        Ok(ValueWriter {
            exclusive_file,
            value_file_offset: 0,
        })
    }
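    /// Starts an incremental value write at the current end of an exclusive
    /// values file.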
    pub fn begin(self) -> PubResult<ValueWriter> {
        let mut exclusive_file = self.batch.get_exclusive_file()?;
        Ok(ValueWriter {
            value_file_offset: exclusive_file.next_write_offset()?,
            exclusive_file,
        })
    }
}
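/// Writes a single value into an exclusive values file, remembering the offset
/// at which the value starts.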
#[derive(Debug)]
pub struct ValueWriter {
    exclusive_file: ExclusiveFile,
    value_file_offset: u64,
}
impl ValueWriter {
    pub fn get_file(&mut self) -> Result<&mut File> {
        Ok(&mut self.exclusive_file.inner)
    }
    pub fn copy_from(&mut self, mut value: impl Read) -> Result<u64> {
        let value_file_offset = self.exclusive_file.next_write_offset()?;
        let value_length = match std::io::copy(&mut value, &mut self.exclusive_file.inner) {
            Ok(ok) => ok,
            Err(err) => {
                // Rewind so the partially copied bytes are overwritten by the
                // next write to this exclusive file.
                self.exclusive_file
                    .inner
                    .seek(Start(value_file_offset))
                    .expect("should rewind failed copy");
                return Err(err.into());
            }
        };
        Ok(value_length)
    }
    pub fn value_length(&mut self) -> io::Result<u64> {
        Ok(self.exclusive_file.next_write_offset()? - self.value_file_offset)
    }
}
impl Write for ValueWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.exclusive_file.inner.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        // Intentionally a no-op: durability is handled when the batch commits
        // and the exclusive file is returned to the handle.
        Ok(())
    }
}
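/// Stages writes against a [`Handle`]. Value data is written to exclusive
/// files immediately; keys only become visible when the batch commits.
///
/// Illustrative sketch only (the `Handle` constructor and writer accessor are
/// assumed, not taken from this file):
///
/// ```ignore
/// let handle = Handle::new(dir)?;          // assumed constructor
/// let mut writer = handle.new_writer()?;   // assumed accessor
/// let mut value = writer.new_value().begin()?;
/// value.write_all(b"hello")?;
/// writer.stage_write(b"key".to_vec(), value)?;
/// writer.commit()?;
/// ```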
pub struct BatchWriter<'a> {
    handle: &'a Handle,
    exclusive_files: Vec<ExclusiveFile>,
    pending_writes: Vec<PendingWrite>,
}
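/// Timestamps are stored in the manifest as milliseconds since the Unix epoch.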
pub type TimestampInner = NaiveDateTime;
#[derive(Debug, PartialEq, Copy, Clone, PartialOrd)]
pub struct Timestamp(TimestampInner);
impl FromSql for Timestamp {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        let int_time = value.as_i64()?;
        Ok(Self(
            TimestampInner::from_timestamp_millis(int_time)
                .ok_or(FromSqlError::OutOfRange(int_time))?,
        ))
    }
}
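/// Resolution of `last_used` timestamps as stored in the manifest.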
pub const LAST_USED_RESOLUTION: Duration = Duration::from_millis(1);
impl Deref for Timestamp {
    type Target = TimestampInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
pub struct WriteCommitResult {
    count: usize,
}
impl WriteCommitResult {
    pub fn count(&self) -> usize {
        self.count
    }
}
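/// Column order must match [`Value::from_row`].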
const VALUE_COLUMN_NAMES: &[&str] = &["file_id", "file_offset", "value_length", "last_used"];
fn value_columns_sql() -> &'static str {
    static ONCE: OnceLock<String> = OnceLock::new();
    ONCE.get_or_init(|| VALUE_COLUMN_NAMES.join(", ")).as_str()
}
impl<'handle> BatchWriter<'handle> {
    fn get_exclusive_file(&mut self) -> Result<ExclusiveFile> {
        if let Some(ef) = self.exclusive_files.pop() {
            debug!("reusing exclusive file from writer");
            return Ok(ef);
        }
        self.handle.get_exclusive_file()
    }
    pub fn stage_write(&mut self, key: Vec<u8>, mut value: ValueWriter) -> anyhow::Result<()> {
        self.pending_writes.push(PendingWrite {
            key,
            value_file_offset: value.value_file_offset,
            value_length: value.value_length()?,
            value_file_id: value.exclusive_file.id.clone(),
        });
        debug!(
            "pushing exclusive file {} into writer",
            value.exclusive_file.id
        );
        self.exclusive_files.push(value.exclusive_file);
        Ok(())
    }
    pub fn new_value<'writer>(&'writer mut self) -> BeginWriteValue<'writer, 'handle> {
        BeginWriteValue { batch: self }
    }
    pub fn commit(self) -> Result<WriteCommitResult> {
        self.commit_inner(|| {})
    }
    fn commit_inner(mut self, before_write: impl Fn()) -> Result<WriteCommitResult> {
        let mut transaction: OwnedTx = self.handle.start_immediate_transaction()?;
        let mut write_commit_res = WriteCommitResult { count: 0 };
        for pw in self.pending_writes.drain(..) {
            before_write();
            transaction.delete_key(&pw.key)?;
            transaction.insert_key(pw)?;
            write_commit_res.count += 1;
        }
        let work = transaction
            .commit(write_commit_res)
            .context("commit transaction")?;
        self.flush_exclusive_files();
        work.complete()
    }
    fn flush_exclusive_files(&mut self) {
        let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
        for mut ef in self.exclusive_files.drain(..) {
            ef.committed().unwrap();
            debug!("returning exclusive file {} to handle", ef.id);
            assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
        }
    }
}
impl Drop for BatchWriter<'_> {
    fn drop(&mut self) {
        let mut handle_exclusive_files = self.handle.exclusive_files.lock().unwrap();
        for ef in self.exclusive_files.drain(..) {
            assert!(handle_exclusive_files.insert(ef.id.clone(), ef).is_none());
        }
    }
}
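/// The location and metadata of a stored value, as recorded in the manifest.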
#[derive(Debug, Clone)]
pub struct Value {
    file_id: FileId,
    file_offset: u64,
    length: u64,
    last_used: Timestamp,
}
impl Value {
    fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
        let file_id: FileId = row.get(0)?;
        Ok(Value {
            file_id,
            file_offset: row.get(1)?,
            length: row.get(2)?,
            last_used: row.get(3)?,
        })
    }
    pub fn file_offset(&self) -> u64 {
        self.file_offset
    }
    pub fn length(&self) -> u64 {
        self.length
    }
    pub fn last_used(&self) -> Timestamp {
        self.last_used
    }
}
impl AsRef<Value> for Value {
    fn as_ref(&self) -> &Value {
        self
    }
}
impl AsMut<Snapshot> for Snapshot {
    fn as_mut(&mut self) -> &mut Snapshot {
        self
    }
}
impl AsRef<Snapshot> for Snapshot {
    fn as_ref(&self) -> &Self {
        self
    }
}
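/// A consistent view of a set of values, backed by private clones of their
/// values files. Reads from a snapshot are unaffected by later writes and
/// hole punching.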
#[derive(Debug)]
pub struct Snapshot {
    file_clones: HashMap<FileId, Arc<Mutex<FileClone>>>,
}
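/// A value pinned to the file clone that backs it in a [`Snapshot`].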
#[derive(Debug)]
pub struct SnapshotValue<V> {
    value: V,
    cloned_file: Arc<Mutex<FileClone>>,
}
impl<V> Deref for SnapshotValue<V> {
    type Target = V;
    fn deref(&self) -> &Self::Target {
        &self.value
    }
}
impl Snapshot {
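    /// Pins `value` to its backing file clone. Panics if the value's file was
    /// not captured by this snapshot.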
    pub fn value<V>(&self, value: V) -> SnapshotValue<V>
    where
        V: AsRef<Value>,
    {
        SnapshotValue {
            cloned_file: Arc::clone(self.file_clones.get(&value.as_ref().file_id).unwrap()),
            value,
        }
    }
}
impl<V> ReadAt for SnapshotValue<V>
where
    V: AsRef<Value>,
{
    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
        self.view(|view| view.read_at(pos, buf))?
    }
}
impl<V> SnapshotValue<V>
where
    V: AsRef<Value>,
{
    fn file_clone(&self) -> &Arc<Mutex<FileClone>> {
        &self.cloned_file
    }
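    /// Passes the value's bytes to `f`, memory-mapping the file clone on
    /// first use.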
    pub fn view<R>(&self, f: impl FnOnce(&[u8]) -> R) -> io::Result<R> {
        let value = self.value.as_ref();
        let file_clone = self.file_clone();
        let start = to_usize_io(value.file_offset)?;
        let usize_length = to_usize_io(value.length)?;
        let end = usize::checked_add(start, usize_length).ok_or_else(make_to_usize_io_error)?;
        let mut mutex_guard = file_clone.lock().unwrap();
        let mmap = mutex_guard.get_mmap()?;
        Ok(f(&mmap[start..end]))
    }
    pub fn read(&self, mut buf: &mut [u8]) -> Result<usize> {
        let value = self.value.as_ref();
        buf = buf
            .split_at_mut(min(buf.len() as u64, value.length) as usize)
            .0;
        let mut file_clone = self.file_clone().lock().unwrap();
        let file = &mut file_clone.file;
        file.seek(Start(value.file_offset))?;
        file.read(buf).map_err(Into::into)
    }
    pub fn new_reader(&self) -> impl Read + '_ {
        positioned_io::Cursor::new(self)
    }
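    /// Keeps the snapshot's temporary directory alive for the life of the
    /// process by leaking a strong reference to it.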
    pub fn leak_snapshot_dir(&self) {
        std::mem::forget(Arc::clone(&self.file_clone().lock().unwrap().tempdir))
    }
}
mod ownedtx;
use ownedtx::OwnedTx;
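/// Iterator source for the values stored in a single values file.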
pub struct FileValues<'a, S> {
    stmt: S,
    file_id: &'a FileIdFancy,
}
impl<'a, S> FileValues<'a, S>
where
    S: Deref<Target = Statement<'a>> + DerefMut + 'a,
{
    pub fn begin(
        &mut self,
    ) -> rusqlite::Result<impl Iterator<Item = rusqlite::Result<Value>> + '_> {
        self.stmt.query_map([self.file_id], Value::from_row)
    }
}
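/// Maps a values-file ID to the largest end offset a reader needs from it.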
type ReaderFiles = HashMap<FileId, u64>;
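/// A read transaction that records which file regions it touches, then clones
/// them on [`Reader::begin`] so the resulting [`Snapshot`] can be read without
/// blocking writers.
///
/// Illustrative sketch only (the `Handle` reader accessor is assumed):
///
/// ```ignore
/// let mut reader = handle.read()?; // assumed accessor
/// let value = reader.add(b"key")?.expect("key should exist");
/// let snapshot = reader.begin()?;
/// snapshot.value(&value).view(|bytes| assert!(!bytes.is_empty()))?;
/// ```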
pub struct Reader<'handle> {
    owned_tx: OwnedTx<'handle>,
    handle: &'handle Handle,
    files: ReaderFiles,
}
impl<'a> Reader<'a> {
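    /// Looks up `key`, touching its `last_used` timestamp and recording the
    /// file region its value occupies. Returns `Ok(None)` if the key is
    /// absent.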
    pub fn add(&mut self, key: &[u8]) -> rusqlite::Result<Option<Value>> {
        let res = self.owned_tx.touch_for_read(key);
        match res {
            Ok(value) => {
                let Value {
                    file_offset,
                    length,
                    ..
                } = value;
                let file = self.files.entry(value.file_id.clone());
                let value_end = file_offset + length;
                use hash_map::Entry::*;
                match file {
                    Occupied(mut entry) => {
                        let value = entry.get_mut();
                        *value = max(*value, value_end);
                    }
                    Vacant(entry) => {
                        entry.insert(value_end);
                    }
                };
                Ok(Some(value))
            }
            Err(QueryReturnedNoRows) => Ok(None),
            Err(err) => Err(err),
        }
    }
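    /// Ends the read transaction, cloning every file region recorded by
    /// [`Reader::add`] into a [`Snapshot`].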
    pub fn begin(self) -> Result<Snapshot> {
        let file_clones = Self::clone_files(self.handle, self.files)?;
        self.owned_tx
            .commit(())
            .context("committing transaction")?
            .complete()?;
        Ok(Snapshot { file_clones })
    }
    fn clone_files(handle: &Handle, files: ReaderFiles) -> Result<FileCloneCache> {
        let mut tempdir = None;
        let mut file_clones: FileCloneCache = Default::default();
        let mut handle_clone_guard = handle.clones.lock().unwrap();
        let handle_clones = handle_clone_guard.deref_mut();
        for (file_id, min_len) in files {
            file_clones.insert(
                file_id.clone(),
                Self::get_file_clone(file_id, &mut tempdir, handle_clones, &handle.dir, min_len)
                    .context("getting file clone")?,
            );
        }
        Ok(file_clones)
    }
    pub fn list_items(&self, prefix: &[u8]) -> PubResult<Vec<Item>> {
        self.owned_tx.read().list_items(prefix)
    }
    fn get_file_clone(
        file_id: FileId,
        tempdir: &mut Option<Arc<TempDir>>,
        cache: &mut FileCloneCache,
        src_dir: &Path,
        min_len: u64,
    ) -> Result<Arc<Mutex<FileClone>>> {
        if let Some(ret) = cache.get(&file_id) {
            let file_clone_guard = ret.lock().unwrap();
            if file_clone_guard.len >= min_len {
                return Ok(ret.clone());
            }
        }
        let tempdir: &Arc<TempDir> = match tempdir {
            Some(tempdir) => tempdir,
            None => {
                let mut builder = tempfile::Builder::new();
                builder.prefix(SNAPSHOT_DIR_NAME_PREFIX);
                let new = Arc::new(builder.tempdir_in(src_dir)?);
                *tempdir = Some(new);
                tempdir.as_ref().unwrap()
            }
        };
        let src_path = file_path(src_dir, &file_id);
        // Disabled: taking a shared lock on the source file before cloning it.
        if false {
            let mut src_file = OpenOptions::new().read(true).open(&src_path)?;
            assert!(try_lock_file(&mut src_file, flock::LockSharedNonblock)?);
        }
        let tempdir_path = tempdir.path();
        clonefile(&src_path, &file_path(tempdir_path, &file_id))?;
        let mut file = open_file_id(OpenOptions::new().read(true), tempdir_path, &file_id)
            .context("opening value file")?;
        let locked = try_lock_file(&mut file, LockSharedNonblock)?;
        assert!(locked);
        let len = file.seek(End(0))?;
        let file_clone = Arc::new(Mutex::new(FileClone {
            file,
            tempdir: tempdir.clone(),
            mmap: None,
            len,
        }));
        cache.insert(file_id, file_clone.clone());
        Ok(file_clone)
    }
}
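/// Rounds `value` down to a multiple of `multiple`, e.g.
/// `floored_multiple(7, 4) == 4`.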
#[allow(dead_code)]
fn floored_multiple<T>(value: T, multiple: T) -> T
where
    T: Integer + Copy,
{
    multiple * (value / multiple)
}
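/// Rounds `value` up to a multiple of `multiple`, e.g.
/// `ceil_multiple(7, 4) == 8`.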
pub fn ceil_multiple<T>(value: T, multiple: T) -> T
where
    T: Integer + Copy,
{
    (value + multiple - T::one()) / multiple * multiple
}
fn open_file_id(options: &OpenOptions, dir: &Path, file_id: &FileId) -> io::Result<File> {
    options.open(file_path(dir, file_id))
}
fn file_path(dir: &Path, file_id: impl AsRef<FileId>) -> PathBuf {
    dir.join(file_id.as_ref())
}
fn random_file_name_in_dir(dir: &Path) -> PathBuf {
    let base = random_file_name();
    dir.join(base)
}
const FILE_NAME_RAND_LENGTH: usize = 8;
const VALUES_FILE_NAME_PREFIX: &str = "values-";
const SNAPSHOT_DIR_NAME_PREFIX: &str = "snapshot-";
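/// Returns a values-file name: the fixed prefix followed by random
/// alphanumeric characters.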
fn random_file_name() -> OsString {
    let mut begin = VALUES_FILE_NAME_PREFIX.as_bytes().to_vec();
    begin.extend(
        rand::thread_rng()
            .sample_iter(rand::distributions::Alphanumeric)
            .take(FILE_NAME_RAND_LENGTH),
    );
    OsString::from_vec(begin)
}
pub const MANIFEST_DB_FILE_NAME: &str = "manifest.db";
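/// Whether `file_name` could name a values file. Excludes the manifest
/// database and its sidecar files (e.g. `manifest.db-wal`).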
fn valid_file_name(file_name: &str) -> bool {
    if file_name.starts_with(MANIFEST_DB_FILE_NAME) {
        return false;
    }
    file_name.starts_with(VALUES_FILE_NAME_PREFIX)
}
mod file_id;
pub mod tx;
use file_id::{FileId, FileIdFancy};
pub use crate::tx::Transaction;
use crate::tx::{PostCommitWork, ReadTransactionOwned};
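/// Controls how aggressively [`punch_value`] reclaims space: whether to extend
/// the punched range backwards (`greedy_start`) and forwards (`greedy_end`) to
/// block boundaries, verify the hole afterwards, and truncate or remove files
/// whose remaining contents are entirely dead.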
struct PunchValueConstraints {
    greedy_start: bool,
    check_hole: bool,
    greedy_end: bool,
    allow_truncate: bool,
    allow_remove: bool,
}
impl Default for PunchValueConstraints {
    fn default() -> Self {
        Self {
            greedy_start: true,
            check_hole: true,
            greedy_end: true,
            allow_truncate: true,
            allow_remove: true,
        }
    }
}
struct PunchValueOptions<'a> {
    dir: &'a Path,
    file_id: &'a FileId,
    offset: u64,
    length: u64,
    tx: &'a ReadTransactionOwned<'a>,
    block_size: u64,
    constraints: PunchValueConstraints,
}
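/// Punches a hole over a dead value's extent so the filesystem can release the
/// blocks. The range is aligned to block boundaries and, depending on the
/// constraints, extended into adjacent dead space; a file whose remaining
/// contents are entirely dead may instead be truncated or removed.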
fn punch_value(opts: PunchValueOptions) -> Result<()> {
    let PunchValueOptions {
        dir,
        file_id,
        offset,
        length,
        tx,
        block_size,
        constraints:
            PunchValueConstraints {
                greedy_start,
                check_hole: check_holes,
                allow_truncate,
                allow_remove,
                greedy_end,
            },
    } = opts;
    // Flag for behaviour that would account for file clones holding locks;
    // currently always disabled.
    let cloning_lock_aware = false;
    let mut offset = offset as i64;
    let mut length = length as i64;
    let block_size = block_size as i64;
    let file_path = file_path(dir, file_id);
    let mut file = match OpenOptions::new().write(true).open(&file_path) {
        Err(err) if err.kind() == ErrorKind::NotFound && allow_remove => return Ok(()),
        Err(err) => return Err(err).context("opening value file"),
        Ok(ok) => ok,
    };
    if offset % block_size != 0 || greedy_start {
        let last_end_offset = tx.query_last_end_offset(file_id, offset as u64)?;
        let new_offset = ceil_multiple(last_end_offset, block_size as u64) as i64;
        length += offset - new_offset;
        offset = new_offset;
    }
    assert_eq!(offset % block_size, 0);
    if greedy_end {
        let next_offset = tx.next_value_offset(file_id, (offset + length).try_into().unwrap())?;
        let end_offset = match next_offset {
            None => {
                let locked_file = try_lock_file(&mut file, flock::LockExclusiveNonblock)
                    .context("locking value file")?;
                let file_end = file.seek(End(0))? as i64;
                if locked_file {
                    if offset == 0 && allow_remove {
                        remove_file(file_path).context("removing value file")?;
                        return Ok(());
                    } else if allow_truncate {
                        file.set_len(offset as u64)?;
                        return Ok(());
                    }
                    file_end
                } else if cloning_lock_aware {
                    floored_multiple(file_end, block_size)
                } else {
                    floored_multiple(offset + length, block_size)
                }
            }
            Some(next_offset) => floored_multiple(next_offset as i64, block_size),
        };
        let new_length = end_offset - offset;
        length = new_length;
    } else {
        let end_offset = floored_multiple(offset + length, block_size);
        length = end_offset - offset;
    }
    debug!(target: "punching", "punching {} {} for {}", file_id, offset, length);
    if length == 0 {
        return Ok(());
    }
    assert!(length > 0);
    assert_eq!(offset % block_size, 0);
    punchfile(file.as_raw_fd(), offset, length).with_context(|| format!("length {}", length))?;
    if check_holes {
        if let Err(err) = check_hole(&mut file, offset as u64, length as u64) {
            warn!("checking hole: {}", err);
        }
    }
    Ok(())
}
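/// Verifies that `offset..offset + length` is now a hole by seeking for data
/// from `offset`: the punch succeeded if the next data starts at or after the
/// end of the range, or the file has no further data.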
pub fn check_hole(file: &mut File, offset: u64, length: u64) -> Result<()> {
    match seek_hole_whence(file.as_raw_fd(), offset as i64, seekhole::Data)? {
        Some(seek_offset) if seek_offset >= offset + length => Ok(()),
        None => Ok(()),
        otherwise => {
            bail!("punched hole didn't appear: {:?}", otherwise)
        }
    }
}
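/// Removes snapshot directories and snapshot value files that no reader holds
/// a lock on.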
fn delete_unused_snapshots(dir: &Path) -> Result<()> {
    use walk::EntryType::*;
    for entry in walk_dir(dir).context("walking dir")? {
        match entry.entry_type {
            SnapshotDir => {
                let res = remove_dir(&entry.path);
                debug!("removing snapshot dir {:?}: {:?}", &entry.path, res);
            }
            SnapshotValue => {
                match std::fs::File::open(&entry.path) {
                    Err(err) if err.kind() == ErrorKind::NotFound => {}
                    Err(err) => {
                        return Err(err)
                            .with_context(|| format!("opening snapshot value {:?}", &entry.path))
                    }
                    Ok(mut file) => {
                        if try_lock_file(&mut file, LockExclusiveNonblock)
                            .context("locking snapshot value")?
                        {
                            let res = remove_file(&entry.path);
                            debug!("removing snapshot value file {:?}: {:?}", &entry.path, res);
                            let _ = remove_dir(
                                entry
                                    .path
                                    .parent()
                                    .expect("snapshot values must have a parent dir"),
                            );
                        } else {
                            debug!("not deleting {:?}, still in use", &entry.path);
                        }
                    }
                };
            }
            _ => {}
        }
    }
    Ok(())
}
fn to_usize_io<F>(from: F) -> io::Result<usize>
where
    usize: TryFrom<F, Error = TryFromIntError>,
{
    convert_int_io(from)
}
fn convert_int_io<F, T>(from: F) -> io::Result<T>
where
    T: TryFrom<F, Error = TryFromIntError>,
{
    from.try_into()
        .map_err(|_: TryFromIntError| make_to_usize_io_error())
}
fn make_to_usize_io_error() -> io::Error {
    io::Error::new(TO_USIZE_IO_ERROR_KIND, TO_USIZE_IO_ERR_PAYLOAD)
}
const TO_USIZE_IO_ERROR_KIND: ErrorKind = InvalidInput;
const TO_USIZE_IO_ERR_PAYLOAD: &str = "can't convert to usize";
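/// Increments a byte slice as a big-endian integer in place, e.g.
/// `[0x00, 0xff]` becomes `[0x01, 0x00]`. Returns `false` if the value wrapped
/// around to zero (all bytes were `0xff`).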
fn inc_big_endian_array(arr: &mut [u8]) -> bool {
    for e in arr.iter_mut().rev() {
        if *e == u8::MAX {
            *e = 0
        } else {
            *e += 1;
            return true;
        }
    }
    false
}