//! libpijul 0.4.3
//!
//! A patch-based distributed version control system, easy to use and fast.
//! Documentation
#[macro_use]
extern crate log;
extern crate chrono;
#[macro_use]
extern crate bitflags;

extern crate sanakirja;
extern crate byteorder;
extern crate flate2;
extern crate ring;
extern crate libc;
extern crate rand;
extern crate rustc_serialize;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate bincode;
extern crate memmap;

use std::path::Path;
use std::collections::{HashMap, HashSet};
use rustc_serialize::base64::{URL_SAFE, ToBase64};
use std::io::{Seek, SeekFrom, Write, Read};
use std::fs::{OpenOptions, File};

pub use sanakirja::Transaction;

pub mod error;
use self::error::*;

pub trait RepositoryEnv<'env, R>: Sized {
    fn open<P: AsRef<Path>>(&self, path: P) -> Result<Self, Error>;
    fn mut_txn_begin(&'env self) -> Result<R, Error>;
}


#[macro_use]
mod backend;
pub mod fs_representation;
pub mod file_operations;

pub mod patch;
pub mod conflict;
pub mod graph;
mod optimal_diff;
mod record;
mod apply;
mod output;
mod unrecord;

pub use backend::{
    DEFAULT_BRANCH, Repository, MutTxn, LineId, PatchId, FOLDER_EDGE, PARENT_EDGE, DELETED_EDGE,
    Hash, HashRef,
    Key, Edge,
    Txn, Branch, Inode,
    ROOT_INODE, ROOT_KEY,
    SmallString,
    ApplyTimestamp
};
pub use record::InodeUpdate;
pub use patch::Patch;

/// Number of trailing bytes of a branch's changes file that
/// `add_to_changes_file` reads back in order to find the last entry
/// already written.
const LAST_CHANGES_BLOCK_LEN: u64 = 1_000;

impl<'env, T: rand::Rng> backend::MutTxn<'env, T> {

    pub fn add_to_changes_file<P: AsRef<Path>>(&mut self, branch: &Branch, path: P) -> Result<(), Error> {

        let changes_file = fs_representation::branch_changes_file(path.as_ref(), branch.name.as_str());
        let mut file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .append(true)
                .open(&changes_file)?;

        // First compute the last revision.
        let len = file.metadata()?.len();
        debug!("len = {:?}", len);
        if len == 0 {

            // If we just created the file, copy the repository id as the first line.
            let id_file = fs_representation::id_file(path.as_ref());
            let mut f = File::open(&id_file)?;
            let mut s = String::new();
            f.read_to_string(&mut s)?;
            file.write_all(s.trim().as_bytes())?;
            file.write_all(b"\n")?;

        } else if len > LAST_CHANGES_BLOCK_LEN {

            debug!("seeking");
            let pos = file.seek(SeekFrom::Start(len - LAST_CHANGES_BLOCK_LEN))?;
            debug!("pos = {:?}", pos);

        }
        let mut f = String::new();
        file.read_to_string(&mut f)?;
        debug!("f: {:?}", f);
        let last_written_patch =
            if let Some(last_line) = f.lines().last() {
                let mut split = last_line.split(':');
                match (split.next(), split.next()) {
                    (Some(_), Some(s0)) => Some(s0.parse().map_err(|_| Error::ChangesFile)?),
                    _ => None
                }
            } else {
                None
            };
        debug!("last_written_patch: {:?}", last_written_patch);
        for (s, hash) in self.iter_applied(&branch, last_written_patch) {
            debug!("must_write: s = {:?}", s);
            let must_write = if let Some(last) = last_written_patch {
                s > last
            } else {
                true
            };

            if must_write {
                let hash_ext = self.get_external(hash).unwrap();
                writeln!(file, "{}:{}", hash_ext.to_base64(URL_SAFE), s)?
            }
        }
        Ok(())
    }

    pub fn remove_from_changes_file<P: AsRef<Path>>(&mut self, branch: &Branch, path: P, ord: u64) -> Result<(), Error> {
        debug!("remove_from_changes_file, ord = {:?}", ord);
        use memmap::{Mmap, Protection};
        use std::cmp::Ordering;
        let changes_file = fs_representation::branch_changes_file(path.as_ref(), branch.name.as_str());
        let mut file_mmap = Mmap::open_path(&changes_file, Protection::ReadWrite)?;
        let changes: &mut [u8] = unsafe { file_mmap.as_mut_slice() };
        debug!("lines: {:?}", std::str::from_utf8(&changes));
        let i = {
            let lines:Vec<&[u8]> = changes.split(|c| *c == b'\n').collect();
            if let Ok(a) = lines.binary_search_by(|l| {
                let mut s = l.split(|c| *c == b':');
                match (s.next(), s.next()) {
                    (Some(_), Some(num)) =>
                        std::str::from_utf8(num).unwrap()
                        .parse::<u64>().unwrap().cmp(&ord),
                    _ => Ordering::Less
                }
            }) {

                // The patch was found, at position a.
                lines[a].as_ptr() as usize - changes.as_ptr() as usize

            } else {
                // The patch was not found.
                return Ok(())
            }
        };
        debug!("remove_from_changes_file: i= {:?}, changes: {:?}", i, &changes[i..]);
        // Now, we need to overwrite the patch starting at position i.
        let colon = (&changes[i..]).iter().position(|c| *c == b':').unwrap();
        for x in changes[i..(i+colon)].iter_mut() {
            *x = b' '
        }
        Ok(())
    }

    /// Collects every patch of `branch`, as yielded by
    /// `iter_patches(branch, None)`, into a set of
    /// (external hash, apply timestamp) pairs.
    pub fn branch_patches(&mut self, branch: &Branch) -> HashSet<(backend::Hash, ApplyTimestamp)> {
        let mut known = HashSet::new();
        for (patch, time) in self.iter_patches(branch, None) {
            // Store an owned copy of the external hash of the patch.
            known.insert((self.external_hash(patch).to_owned(), time));
        }
        known
    }

    pub fn fork(&mut self, branch: &Branch, new_name:&str) -> Result<Branch, Error> {
        if branch.name.as_str() == new_name {
            Err(Error::BranchNameAlreadyExists)
        } else {
            Ok(Branch {
                db: self.txn.fork(&mut self.rng, &branch.db)?,
                patches: self.txn.fork(&mut self.rng, &branch.patches)?,
                revpatches: self.txn.fork(&mut self.rng, &branch.revpatches)?,
                name: SmallString::from_str(new_name),
                apply_counter: 0
            })
        }
    }
}

impl<'env, T: rand::Rng> backend::MutTxn<'env, T> {
    /// Registers `path` through `add_inode`, passing `None` as its
    /// first argument and `is_dir` to tell whether the path is a
    /// directory. Errors from `add_inode` are returned unchanged.
    pub fn add_file<P: AsRef<Path>>(&mut self, path: P, is_dir: bool) -> Result<(), Error> {
        let path = path.as_ref();
        self.add_inode(None, path, is_dir)
    }

    /// Tells whether `key` is alive in `branch`: it is alive when it
    /// is the root key, or when `has_edge` reports an edge with flag
    /// `PARENT_EDGE` or `PARENT_EDGE | FOLDER_EDGE` for it.
    ///
    /// NOTE(review): the final `false` passed to `has_edge` is kept
    /// from the original call; its meaning is defined by `has_edge`
    /// and is not visible here.
    fn is_alive(&self, branch: &Branch, key: &Key<PatchId>) -> bool {
        // The root is always alive.
        if *key == ROOT_KEY {
            return true
        }
        // Plain parent edge first, then the folder variant.
        if self.has_edge(branch, key, PARENT_EDGE, false) {
            return true
        }
        self.has_edge(branch, key, PARENT_EDGE | FOLDER_EDGE, false)
    }
}

fn make_remote<'a, I:Iterator<Item = &'a Hash>>(target: &Path, remote: I) -> Result<(HashMap<Hash, Patch>, usize), Error> {
    use fs_representation::*;
    use std::io::BufReader;
    use std::fs::File;
    let mut patches = HashMap::new();
    let mut patches_dir = patches_dir(target).to_path_buf();;
    let mut size_increase = 0;

    for h in remote {

        patches_dir.push(&patch_file_name(h.as_ref()));

        debug!("opening {:?}", patches_dir);
        let file = try!(File::open(&patches_dir));
        let mut file = BufReader::new(file);
        let (h, _, patch) = Patch::from_reader_compressed(&mut file)?;

        size_increase += patch.size_upper_bound();
        patches.insert(h.clone(), patch);

        patches_dir.pop();

    }
    Ok((patches, size_increase))
}

/// Apply a number of patches, guessing the new repository size.  If
/// this fails, the repository size is guaranteed to have been
/// increased by at least some pages, and it is safe to call this
/// function again.
///
/// Also, this function takes a file lock on the repository.
pub fn apply_resize<'a, I:Iterator<Item = &'a Hash>>(target: &Path, branch_name: &str, remote: I) -> Result<(), Error> {
    use fs_representation::*;
    let (patches, size_increase) = make_remote(target, remote)?;
    info!("applying patches with size_increase {:?}", size_increase);
    let pristine_dir = pristine_dir(target).to_path_buf();;
    let repo = try!(Repository::open(pristine_dir, Some(size_increase as u64)));
    let mut txn = try!(repo.mut_txn_begin(rand::thread_rng()));
    try!(txn.apply_patches(branch_name, target, &patches));
    try!(txn.commit());
    Ok(())
}

/// Apply a number of patches, guessing the new repository size.  If
/// this fails, the repository size is guaranteed to have been
/// increased by at least some pages, and it is safe to call this
/// function again.
///
/// Also, this function takes a file lock on the repository.
pub fn apply_resize_no_output<'a, I:Iterator<Item = &'a Hash>>(target: &Path, branch_name: &str, remote: I) -> Result<(), Error> {
    use fs_representation::*;
    let (patches, size_increase) = make_remote(target, remote)?;
    let pristine_dir = pristine_dir(target).to_path_buf();;
    let repo = try!(Repository::open(pristine_dir, Some(size_increase as u64)));
    let mut txn = try!(repo.mut_txn_begin(rand::thread_rng()));
    let mut branch = txn.open_branch(branch_name)?;
    let mut new_patches_count = 0;
    for (p, patch) in patches.iter() {
        debug!("apply_patches: {:?}", p);
        txn.apply_patches_rec(&mut branch, &patches,
                              p, patch, &mut new_patches_count)?
    }
    txn.commit_branch(branch)?;
    txn.commit()?;
    Ok(())
}

pub fn unrecord_no_resize(repo_dir: &Path, repo_root: &Path, branch_name: &str, selected: &mut Vec<(Hash, Patch)>, increase: u64) -> Result<(), Error> {
    let repo = try!(Repository::open(repo_dir, Some(increase)));

    let mut txn = try!(repo.mut_txn_begin(rand::thread_rng()));
    let mut branch = txn.open_branch(branch_name)?;
    while let Some((hash, patch)) = selected.pop() {
        let internal = txn.get_internal(hash.as_ref()).unwrap().to_owned();
        debug!("Unrecording {:?}", hash);
        try!(txn.unrecord(&mut branch, repo_root, &internal, &patch));
        debug!("Done unrecording {:?}", hash);
    }

    try!(txn.commit_branch(branch));
    try!(txn.commit());
    Ok(())
}