mycelium_core 0.1.1

Library for Mycelium DDM
Documentation
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::RwLock;

pub(crate) mod history;
mod page;
pub use self::page::{create_page, Page};
use crate::ephemeral::history::History;
use crate::persistence::{load_from_vec, Persistable};
use crate::{NodeId, PageId};
use mycelium_command::node::Node;

use rayon::prelude::*;

/// Primary data structure. Tag, and what it contains. This structure holds the actual data.
pub type Data = RwLock<HashMap<String, Tag>>;

#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub enum TagStatus {
    New,
    Loaded,
    Cleared,
}

///
/// # Primary container
///
/// * Members
///     - history: Lazy container only loads after requests
///     - index: Maps node to page
///     - last_inserted_page: None if new db instance
///         * TODO: pickup pages that are not complete
///     - max_page_size: Configurable from mycelium_config.ron block_count * block_size
///     - status: Default: new
///     - tag: Arbitrarily assigned name for container
///     - working_dir: Directory location for tag. Eventually a location inside an archive.
///
pub struct Tag {
    pub(crate) history: History,
    index: HashMap<NodeId, PageId>,
    last_inserted_page: Option<PageId>,
    max_page_size: usize,
    pages: HashMap<PageId, Page>,
    status: TagStatus,
    tag: String,
    working_dir: std::path::PathBuf,
}

impl Tag {
    /// Page container
    pub(crate) fn new(tag: &str, working_dir: PathBuf, max_page_size: usize) -> Tag {
        let history = crate::ephemeral::history::lazy_head(&working_dir, max_page_size);
        Tag {
            history,
            index: HashMap::new(),
            last_inserted_page: None,
            status: TagStatus::New,
            max_page_size,
            pages: HashMap::new(),
            tag: String::from(tag),
            working_dir,
        }
    }

    pub(crate) fn add_index(&mut self, node_id: NodeId, page_id: PageId) {
        self.index.insert(node_id, page_id);
    }

    pub(crate) fn all_container_nodes(&self) -> Vec<Node> {
        let mut nodes: Vec<Node> = Vec::new();

        for page in self.pages.values() {
            for n in &page.nodes {
                nodes.push(n.clone());
            }
        }

        nodes
    }

    pub(crate) fn all_container_nodes_map(&self) -> HashMap<NodeId, Node> {
        let mut map = HashMap::new();
        for page in self.pages.values() {
            for n in &page.nodes {
                map.insert(n.get_id(), n.clone());
            }
        }

        map
    }

    pub(crate) fn clear(&mut self) -> std::io::Result<()> {
        self.index.clear();
        self.pages.clear();
        self.history.clear().expect("Failed to clear history.");
        self.status = TagStatus::Cleared;

        Ok(())
    }

    /// Use only on pages known to exist. No checks will panic.
    pub(crate) fn get_append_page(&mut self, node_size: usize) -> &mut Page {
        // Check size. If page.size() + node.size() < page.max_size add
        if self.last_inserted_page.is_some() {
            let id = self.last_inserted_page.unwrap();
            let page_ref = &self.pages[&id]; // self.pages.get(&id).unwrap();
            if page_ref.get_size() + node_size < page_ref.get_max_size() {
                let id = page_ref.get_id();
                return self.pages.get_mut(&id).unwrap();
            }
        }

        // get a new page either the other is full or we have not inserted yet
        let new_page = create_page(self.tag.as_str(), self.max_page_size);
        let new_page_id = new_page.get_id();
        self.pages.insert(new_page_id, new_page);
        self.last_inserted_page = Some(new_page_id);
        self.pages.get_mut(&new_page_id).unwrap()
    }

    pub(crate) fn get_node_page_mut(&mut self, id: NodeId) -> &mut Page {
        let page_id = self.index[&id];
        self.pages.get_mut(&page_id).unwrap()
    }

    pub(crate) fn get_all_nodes(&self) -> Vec<Node> {
        self.pages
            .values()
            .par_bridge()
            .flat_map(|page| page.get_all_nodes().clone())
            .collect()
    }

    pub(crate) fn get_node(&self, id: NodeId) -> Option<Node> {
        let target_page = self.index[&id];
        let page = &self.pages[&target_page];

        for i in page.get_nodes() {
            if i.get_id() == id {
                return Some(i.clone());
            }
        }

        None
    }

    #[allow(dead_code)]
    pub(crate) fn get_tag(&self) -> &str {
        self.tag.as_str()
    }

    pub(crate) fn is_loaded(&self) -> bool {
        self.status == TagStatus::Loaded
    }

    /// Load the contents of this container into memory.
    pub(crate) fn load(&self) -> Self {
        let mut container = Tag::new(
            self.tag.as_str(),
            self.working_dir.to_path_buf(),
            self.max_page_size,
        );

        // Shortcut return if item does not yet exist on disk
        // do not create it. Save will trigger creates to file system.
        // if load is called again before save we want to give whatever
        // has been persisted to disk and drop any memory changes.
        if !&self.working_dir.exists() {
            return Tag::new(
                self.tag.as_str(),
                self.working_dir.to_path_buf(),
                self.max_page_size,
            );
        }

        // Shortcut return if item does not yet exist on disk
        // do not create it. Save will trigger creates to file system.
        // if load is called again before save we want to give whatever
        // has been persisted to disk and drop any memory changes.
        if !&self.working_dir.exists() {
            return Tag::new(
                self.tag.as_str(),
                self.working_dir.to_path_buf(),
                self.max_page_size,
            );
        }

        let pages: Vec<PathBuf> = fs::read_dir(&self.working_dir)
            .expect("Error reading container.")
            .par_bridge()
            .filter_map(|file| {
                let file = file.expect("Error reading item from container.");
                let path = file.path();
                if path.extension().is_none() && path.is_file() {
                    Some(path)
                } else {
                    None
                }
            })
            .collect();

        container.pages = load_from_vec(pages).unwrap();
        container.status = TagStatus::Loaded;
        container
    }

    pub(crate) fn replace_node(&mut self, node_id: NodeId, node: Node) -> std::io::Result<()> {
        let page = self.get_node_page_mut(node_id);
        let page_pos = *page.index.get(&node_id).unwrap();
        page.nodes[page_pos] = node;
        Ok(())
    }

    pub(crate) fn save_all(&self) -> std::io::Result<()> {
        crate::persistence::prep_tag_directory(&self.working_dir)?;
        self.pages
            .par_iter()
            .for_each(|x| match x.1.save(&self.working_dir) {
                Ok(_) => (),
                Err(e) => panic!("Failed to save page: {:?}", e),
            });
        self.history.save(&self.history.get_working_directory())?;
        self.history.pages.par_iter().for_each(|x| {
            match x.1.save(&self.history.get_working_directory()) {
                Ok(_) => (),
                Err(e) => panic!("Failure saving page: {:?}", e),
            }
        });

        Ok(())
    }

    /// Replace original node in primary tag container with the new node
    /// Return original node
    pub(crate) fn update_node(
        &mut self,
        node_id: NodeId,
        item: &[u8],
    ) -> Result<Node, Box<dyn std::error::Error>> {
        let original = self.get_node(node_id).expect("Failed to find node.");
        let new = original.create_update_node(item);
        self.replace_node(original.get_id(), new)?;
        Ok(original)
    }

    pub(crate) fn update_node_history(
        &mut self,
        node: Node,
    ) -> Result<(), Box<dyn std::error::Error>> {
        self.history.append_history(node)
    }

    /// Index all items so we know what page contains them
    /// when looking for a specific item.
    pub(crate) fn with_page_item_idex(mut self) -> Self {
        let mut index: HashMap<NodeId, PageId> = HashMap::new();
        for (k, v) in &self.pages {
            for n in v.get_nodes() {
                index.insert(n.get_id(), *k);
            }
        }
        self.index = index;
        self
    }
}