// ghee 0.6.1
//
// That thin layer of data change management over the filesystem
// Documentation
use anyhow::Result;
use ghee_lang::{Key, Predicate, Value};
use thiserror::Error;

use crate::{paths::PathBufExt, table_info, walk::walk_records, xattr_values, Record};

use std::{collections::BTreeMap, fs::remove_file, path::PathBuf};

/// Errors that can be produced while deleting records from a table.
#[derive(Error, Debug)]
pub enum DelErr {
    /// No table info could be found for the given path; the path is carried for reporting.
    #[error("Table info not found at {0}")]
    TableInfoNotFound(PathBuf),
    /// An I/O operation failed, e.g. removing a record's file from an index.
    #[error("An IO error occurred: {0}")]
    IoError(std::io::Error),
    /// The caller supplied both where-clauses and primary key values, or neither of them;
    /// exactly one of the two is required.
    #[error(
        "Either where clauses or values for the table's primary key should be provided, not both"
    )]
    WhereClausesOrKeyValuesNotBoth,
    /// The number of primary key values supplied does not match the number of subkeys in the
    /// table's primary key.
    #[error(
        "Incorrect number of primary key values provided; expected {expected} but got {provided}"
    )]
    WrongNumberOfPrimaryKeyValues { expected: usize, provided: usize },
}

/// Remove one record from every index of its table.
///
/// `indices` maps each index key to that index's directory; every directory must be an
/// absolute path. For each index, the record's values for the key are appended to the
/// directory as path components and the file at the resulting path is removed.
///
/// When `record` is `None`, the record's values are loaded from `record_path` via
/// `xattr_values`; when it is `Some`, `record_path` is not consulted.
///
/// If `verbose` is set, every removed path is reported on stderr.
///
/// # Errors
///
/// Fails if the values cannot be loaded, if a key cannot be evaluated against the record, or
/// if removing a file fails (reported as [`DelErr::IoError`]). Processing stops at the first
/// failure, so files in earlier indices may already have been removed.
fn unlink_record(
    indices: &BTreeMap<Key, PathBuf>,
    record_path: &PathBuf,
    record: Option<&Record>,
    verbose: bool,
) -> Result<()> {
    debug_assert!(!indices.is_empty());
    debug_assert!(indices.values().all(|index_dir| index_dir.is_absolute()));

    // Storage for the values when we have to load them ourselves; only initialized (and
    // borrowed) in the `None` arm below.
    let loaded: Record;
    let record = match record {
        Some(provided) => provided,
        None => {
            loaded = xattr_values(record_path)?;
            &loaded
        }
    };

    for (key, index_dir) in indices {
        let components = key.value_for_record(record)?;

        // Location of this record inside the current index: <index dir>/<v1>/<v2>/...
        let mut target = index_dir.clone();
        for component in components {
            target.push(component.to_string());
        }

        remove_file(&target).map_err(DelErr::IoError)?;

        if verbose {
            let shown = target.relative_to_curdir_if_possible();
            eprintln!("Removed {}", shown.display());
        }
    }

    Ok(())
}

/** Delete records from the table at path `table_path`.
 *
 * Qualifying records can be specified using either `where_` predicates or values for the
 * table's primary key; otherwise all records are deleted.
 *
 * The records will additionally be removed from all other indices of the table.
 */
pub fn del(
    table_path: &PathBuf,
    where_: &Vec<Predicate>,
    key: &Vec<Value>,
    verbose: bool,
) -> Result<()> {
    if !(where_.is_empty() ^ key.is_empty()) {
        return Err(DelErr::WhereClausesOrKeyValuesNotBoth.into());
    }

    if key.is_empty() {
        debug_assert!(!where_.is_empty());

        walk_records(&table_path, where_, true, false, true, false, &|record| {
            if record.path.is_file() {
                let table_info = record
                    .table_info
                    .ok_or_else(|| DelErr::TableInfoNotFound(record.path.clone()))?;

                unlink_record(
                    table_info.indices_abs(),
                    &record.path,
                    Some(&record.xattr_values),
                    verbose,
                )
            } else {
                Ok(())
            }
        })?;
    } else {
        let info = table_info(&table_path)?.ok_or(DelErr::TableInfoNotFound(table_path.clone()))?;

        if info.key().subkeys.len() != key.len() {
            return Err(DelErr::WrongNumberOfPrimaryKeyValues {
                expected: info.key().subkeys.len(),
                provided: key.len(),
            }
            .into());
        }

        // An individual record is uniquely identified by the provided primary key values
        let record_path = {
            let mut path = table_path.clone();

            for subkey_value in key {
                path.push(subkey_value.to_string());
            }

            path
        };

        debug_assert!(record_path.exists());

        let table_info = table_info(&table_path)?
            .ok_or_else(|| DelErr::TableInfoNotFound(table_path.clone()))?;

        unlink_record(table_info.indices_abs(), &record_path, None, verbose)?;
    }

    Ok(())
}

#[cfg(test)]
mod test {
    use std::cell::Cell;

    use ghee_lang::{parse_predicate, Value};

    use crate::{
        record_count, table_info, test_support::Scenario, walk::walk_records, xattr_values,
    };

    use super::{del, unlink_record};

    /// Shared body of the `test_unlink_record*` tests.
    ///
    /// Builds a scenario, unlinks the record at position `target` of
    /// `[dir1path1, dir1path2, dir2path1, dir2path2]` using the indices of `dir1`, and then
    /// checks the existence of each of those four paths against `expect_exists` (same order).
    fn check_unlink_record(target: usize, expect_exists: [bool; 4]) {
        let s = Scenario::new("ghee-test-del-unlink-record");

        let info = table_info(&s.dir1).unwrap().unwrap();

        let paths = [&s.dir1path1, &s.dir1path2, &s.dir2path1, &s.dir2path2];
        let path = paths[target];

        let record = xattr_values(path).unwrap();

        // Precondition: all four record files are present before unlinking.
        assert!(paths.iter().all(|p| p.exists()));

        unlink_record(info.indices_abs(), path, Some(&record), false).unwrap();

        for (p, expected) in paths.iter().zip(expect_exists) {
            assert_eq!(p.exists(), expected);
        }
    }

    /// Deleting by dir1's single-component primary key removes the record from both tables.
    #[test]
    fn test_del_dir1_default_key() {
        let s = Scenario::new("del-dir1-default-key");

        assert_eq!(record_count(&s.dir1), 2);
        assert_eq!(record_count(&s.dir2), 2);

        // Both tables are expected to know about two indices.
        for dir in [&s.dir1, &s.dir2] {
            assert_eq!(table_info(dir).unwrap().unwrap().indices_abs().len(), 2);
        }

        del(&s.dir1, &vec![], &vec![Value::Number(0.0)], true).unwrap();

        assert_eq!(record_count(&s.dir1), 1);
        assert_eq!(record_count(&s.dir2), 1);
    }

    /// Key-based deletes with the wrong number of key components are rejected.
    #[test]
    fn test_del_dir1_bogus_key() {
        let s = Scenario::new("del-dir1-bogus-key");

        // NOTE(review): with both selectors empty this appears to be rejected for lack of any
        // constraint rather than for "too few key components" — confirm the intended message.
        del(&s.dir1, &vec![], &vec![], false).expect_err("failed due to too few key components");

        let too_many = vec![Value::String("abc".into()), Value::Number(-10.0)];
        del(&s.dir1, &vec![], &too_many, false)
            .expect_err("failed due to too many key components");
    }

    /// Deleting by dir2's two-component primary key removes the record from both tables.
    #[test]
    fn test_del_dir2_default_key() {
        let s = Scenario::new("del-dir2-default-key");

        assert_eq!(record_count(&s.dir1), 2);
        assert_eq!(record_count(&s.dir2), 2);

        let key = vec![Value::Number(1.0), Value::Number(2.0)];
        del(&s.dir2, &vec![], &key, true).unwrap();

        assert_eq!(record_count(&s.dir1), 1);
        assert_eq!(record_count(&s.dir2), 1);
    }

    /// Deleting from dir1 with predicates removes the single matching record from both tables.
    #[test]
    fn test_del_dir1_predicate() {
        let s = Scenario::new("del-dir1-predicate");

        let preds = vec![
            parse_predicate(b"test2=1").unwrap().1,
            parse_predicate(b"test3=2").unwrap().1,
        ];

        // Before deleting, the predicates select exactly one record in each table.
        for dir in [&s.dir1, &s.dir2] {
            let matches = Cell::new(0);

            walk_records(dir, &preds, true, false, true, false, &|_| {
                matches.set(matches.get() + 1);
                Ok(())
            })
            .unwrap();

            assert_eq!(matches.get(), 1);
        }

        del(&s.dir1, &preds, &vec![], true).unwrap();

        assert_eq!(record_count(&s.dir1), 1);
        assert_eq!(record_count(&s.dir2), 1);
    }

    /// Deleting with a single predicate removes one record from both tables.
    #[test]
    fn test_del_dir2_predicate() {
        let s = Scenario::new("del-dir2-predicate");

        // Delete from dir2 using non-default predicate
        // NOTE(review): despite the name and the comment above, the delete below targets
        // `s.dir1` — confirm whether `s.dir2` was intended.
        assert_eq!(record_count(&s.dir1), 2);
        assert_eq!(record_count(&s.dir2), 2);

        let preds = vec![parse_predicate(b"test1=0").unwrap().1];
        del(&s.dir1, &preds, &vec![], true).unwrap();

        assert_eq!(record_count(&s.dir1), 1);
        assert_eq!(record_count(&s.dir2), 1);
    }

    /// A delete with neither predicates nor key values is rejected.
    #[test]
    fn test_del_unconstrained() {
        let s = Scenario::new("del-dir-unconstrained");

        del(&s.dir1, &vec![], &vec![], true)
            .expect_err("del should choke when no predicates or key values are provided");
    }

    /// A delete with both predicates and key values is rejected.
    #[test]
    fn test_del_overconstrained() {
        let s = Scenario::new("del-overconstrained");

        let preds = vec![parse_predicate(b"test1=0").unwrap().1];
        let key = vec![Value::Number(0.0)];

        del(&s.dir1, &preds, &key, true)
            .expect_err("del should choke when both predicates and key values are provided");
    }

    /// Unlinking via `dir1path1` removes `dir1path1` and `dir2path1`.
    #[test]
    fn test_unlink_record1() {
        check_unlink_record(0, [false, true, false, true]);
    }

    /// Unlinking via `dir1path2` removes `dir1path2` and `dir2path2`.
    #[test]
    fn test_unlink_record2() {
        check_unlink_record(1, [true, false, true, false]);
    }

    /// Unlinking via `dir2path1` removes `dir1path1` and `dir2path1`.
    #[test]
    fn test_unlink_record3() {
        check_unlink_record(2, [false, true, false, true]);
    }

    /// Unlinking via `dir2path2` removes `dir1path2` and `dir2path2`.
    #[test]
    fn test_unlink_record4() {
        check_unlink_record(3, [true, false, true, false]);
    }
}