use std::path::Path;
use std::usize;
mod addr;
mod config;
mod set;
mod upload;
mod download;
mod future;
mod error;
pub use cluster::config::Config;
pub use cluster::upload::{Stats, ProgressOneLiner};
pub use cluster::download::{RawIndex, MutableIndex, MaterializedIndex};
pub use cluster::download::{IndexParseError, IndexUpdateError};
pub use cluster::future::{UploadFuture, UploadOk, UploadFail};
pub use cluster::error::{UploadErr, ErrorKind};
use std::sync::Arc;
use abstract_ns::{Name, Resolve, HostResolve};
use futures::sync::mpsc::{UnboundedSender};
use futures::future::{Future, Shared};
use futures::sync::oneshot;
use {VPath};
use id::ImageId;
use index::GetIndex;
use blocks::GetBlock;
use cluster::set::{Message, NewUpload};
use signature::SignedUpload;
pub use cluster::future::{IndexFuture, FileFuture};
/// Handle used to submit upload and fetch requests to a connection set.
///
/// Every request method on this type builds a `Message` and sends it over
/// `chan`; the receiving side is whatever `set::ConnectionSet::spawn`
/// started when the handle was created.
#[derive(Debug, Clone)]
pub struct Connection {
/// The addresses the connection was created with; also handed to
/// `upload::Stats::new` for every upload started through this handle.
cluster_name: Vec<Name>,
/// Shared configuration, cloned from the `Arc` passed to `Connection::new`.
config: Arc<Config>,
/// Sending half of the channel returned by `set::ConnectionSet::spawn`.
chan: UnboundedSender<Message>,
}
/// Handle for a single upload that has been submitted via a `Connection`.
///
/// Gives access to the upload's statistics and to a future that yields the
/// upload's outcome.
#[derive(Debug, Clone)]
pub struct Upload {
/// Statistics object for this upload; another clone of the same `Arc` is
/// sent along with the upload request.
stats: Arc<upload::Stats>,
/// Shared receiving half of the oneshot channel whose sender is passed
/// along with the upload request as `resolve`. Being `Shared`, it can be
/// cloned so several `UploadFuture`s observe the same result.
future: Shared<oneshot::Receiver<Result<UploadOk, Arc<UploadErr>>>>,
}
impl Connection {
    /// Creates a connection handle and spawns the connection set behind it.
    ///
    /// A clone of `initial_address`, together with `resolver`,
    /// `index_source`, `block_source` and `config`, is passed to
    /// `set::ConnectionSet::spawn`. The sender it returns is stored in the
    /// handle, `initial_address` itself is kept as the cluster name, and the
    /// `config` `Arc` is cloned into the handle.
    pub fn new<R, I, B>(
        initial_address: Vec<Name>,
        resolver: R,
        index_source: I,
        block_source: B,
        config: &Arc<Config>,
    ) -> Connection
    where
        I: GetIndex + Clone + Send + 'static,
        B: GetBlock + Clone + Send + 'static,
        R: Resolve + HostResolve + Clone + Send + 'static,
    {
        let chan = set::ConnectionSet::spawn(
            initial_address.clone(),
            resolver,
            index_source,
            block_source,
            config,
        );
        Connection {
            cluster_name: initial_address,
            config: Arc::clone(config),
            chan,
        }
    }

    /// Starts an upload with `replace = false`, `weak = false` and no
    /// expected old image.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent to the connection set.
    pub fn append(&self, upload: SignedUpload) -> Upload {
        let (replace, weak) = (false, false);
        self._upload(replace, weak, upload, None)
    }

    /// Starts an upload with `replace = false`, `weak = true` and no
    /// expected old image.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent to the connection set.
    pub fn append_weak(&self, upload: SignedUpload) -> Upload {
        let (replace, weak) = (false, true);
        self._upload(replace, weak, upload, None)
    }

    /// Starts an upload with `replace = true`, `weak = false` and no
    /// expected old image.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent to the connection set.
    pub fn replace(&self, upload: SignedUpload) -> Upload {
        let (replace, weak) = (true, false);
        self._upload(replace, weak, upload, None)
    }

    /// Starts an upload with `replace = true`, `weak = false` and
    /// `old_image` supplied as the expected old image.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent to the connection set.
    pub fn replace_if_matches(&self, upload: SignedUpload, old_image: ImageId)
        -> Upload
    {
        let (replace, weak) = (true, false);
        self._upload(replace, weak, upload, Some(old_image))
    }

    /// Shared implementation of all the upload entry points.
    ///
    /// Creates the statistics object and a oneshot channel, sends a
    /// `Message::NewUpload` carrying the flags, the upload, a clone of the
    /// statistics `Arc` and the oneshot sender, then returns an `Upload`
    /// holding the statistics and the shared oneshot receiver.
    ///
    /// # Panics
    ///
    /// Panics with "connection set is not closed" if sending on the
    /// internal channel fails.
    fn _upload(&self, replace: bool, weak: bool, upload: SignedUpload,
        old_image: Option<ImageId>)
        -> Upload
    {
        let (resolve, outcome) = oneshot::channel();
        let stats = Arc::new(upload::Stats::new(
            &self.cluster_name, &upload.path, weak));
        let request = NewUpload {
            replace,
            upload,
            weak,
            old_image,
            stats: stats.clone(),
            resolve,
        };
        self.chan
            .unbounded_send(Message::NewUpload(request))
            .expect("connection set is not closed");
        Upload {
            stats,
            future: outcome.shared(),
        }
    }

    /// Requests the index for `vpath` and returns a future for the reply.
    ///
    /// A clone of `vpath` and the sending half of a fresh oneshot channel
    /// are sent as `Message::FetchIndex`; the receiving half is wrapped in
    /// the returned `IndexFuture`.
    ///
    /// # Panics
    ///
    /// Panics with "connection set is not closed" if sending on the
    /// internal channel fails.
    pub fn fetch_index(&self, vpath: &VPath) -> IndexFuture {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Message::FetchIndex(vpath.clone(), reply_tx);
        self.chan
            .unbounded_send(request)
            .expect("connection set is not closed");
        IndexFuture { inner: reply_rx }
    }

    /// Requests the file at `path`, as described by the index `idx`, and
    /// returns a future for the reply.
    ///
    /// The file's size and hashes are looked up via `idx.get_file`, checked,
    /// and sent together with `idx.get_location()` and the path as
    /// `Message::FetchFile`.
    ///
    /// # Panics
    ///
    /// * "file must exist" if `idx.get_file` has no entry for `path`;
    /// * "file must fit memory" if the size is not below `usize::MAX`;
    /// * "valid number of hashes" if the number of hashes differs from the
    ///   size divided by the block size, rounded up;
    /// * "connection set is not closed" if sending on the internal channel
    ///   fails.
    pub fn fetch_file<I, P>(&self, idx: &I, path: P)
        -> FileFuture
        where P: AsRef<Path>,
              I: MaterializedIndex,
    {
        let (tx, reply_rx) = oneshot::channel();
        let path = path.as_ref().to_path_buf();
        let (_, size, hashes) = idx.get_file(&path).expect("file must exist");
        assert!(size < usize::MAX as u64, "file must fit memory");

        // Ceiling division: number of blocks needed to cover `size` bytes.
        let block_size = hashes.block_size();
        let expected_blocks = (size + block_size - 1) / block_size;
        assert_eq!(
            expected_blocks,
            hashes.len() as u64,
            "valid number of hashes");

        let request = Message::FetchFile {
            location: idx.get_location(),
            // Cast cannot truncate: `size` was checked against usize::MAX.
            size: size as usize,
            hashes,
            path,
            tx,
        };
        self.chan
            .unbounded_send(request)
            .expect("connection set is not closed");
        FileFuture { inner: reply_rx }
    }
}
impl Upload {
    /// Returns a future for the outcome of this upload.
    ///
    /// Each call clones the shared receiver, so the method may be called
    /// any number of times.
    pub fn future(&self) -> UploadFuture {
        let inner = self.future.clone();
        UploadFuture { inner }
    }

    /// Borrows the statistics object associated with this upload.
    pub fn stats(&self) -> &Stats {
        // `&Arc<Stats>` deref-coerces to `&Stats` at the return site.
        &self.stats
    }
}