ciruela 0.6.12

A peer-to-peer synchronization software for servers in datacenters.
Documentation
use std::sync::Arc;
use std::time::{SystemTime, Duration};
use std::fs::read;

use abstract_ns::Name;
use failure::{Error, ResultExt};
use futures::Future;
use futures::future::{err, join_all, Either};
use tk_easyloop::{self, handle};
use ns_env_config;
use ssh_keys::PrivateKey;

use {VPath};
use ciruela::blocks::ThreadedBlockReader;
use ciruela::index::InMemoryIndexes;
use ciruela::cluster::{Config, Connection};
use ciruela::signature::sign_upload;

use sync::network::upload_with_progress;
use put_file::PutFileOptions;


pub fn put(config: Arc<Config>, clusters: Vec<Vec<Name>>,
    keys: Vec<PrivateKey>,
    indexes: &InMemoryIndexes, blocks: &ThreadedBlockReader,
    opts: PutFileOptions)
    -> Result<(), Error>
{
    if clusters.len() == 0 {
        bail!("at least one destination host name is expected");
    }
    let data = read(&opts.source_file)?;
    let res = tk_easyloop::run(|| {
        let ns = ns_env_config::init(&handle()).expect("init dns");
        let conns = clusters.iter().map(|addr| {
            Connection::new(addr.clone(),
                ns.clone(), indexes.clone(), blocks.clone(), &config)
        }).collect::<Vec<_>>();
        let vpath = VPath::from(&opts.dir);
        conns[0].fetch_index(&vpath)
        .then(|res| res.context("can't fetch index").map_err(Error::from))
        .and_then(|idx| {
            let hash = idx.get_hash()
                .context("can't parse index").map_err(Error::from)?;
            let idx = idx.into_mut()
                .context("can't parse index").map_err(Error::from)?;
            return Ok((hash, idx));
        })
        .and_then(move |(old_image, idx)| {
            let mut idx = idx;
            match idx.insert_file(&opts.file, &data[..], false) {
                Ok(()) => {}
                Err(e) => return Either::B(err(e.into())),
            }
            blocks.register_memory_blocks(
                idx.hash_type(), idx.block_size(), data);
            let new_index = idx.to_raw_data();
            let image_id = indexes.register_index(&new_index)
                .expect("index is valid");
            let upload = sign_upload(&vpath,
                &image_id, SystemTime::now(), &keys);
            Either::A(join_all(conns.into_iter().map(move |conn| {
                let up = conn.replace_if_matches(
                    upload.clone(), old_image.clone());
                upload_with_progress(up, Duration::new(30,0))
                    .map_err(Into::into)
            })).map(Either::A))
        })
    })?;
    match res {
        Either::A(results) => {
            for res in results {
                println!("{}", res);
            }
        }
        Either::B(()) => warn!("file is unchanged."),
    }

    Ok(())
}