1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use failure::err_msg;
use futures::future::Shared;
use futures::sync::oneshot;
use futures::{Future, Async};
use cluster::error::{UploadErr, FetchErr};
use cluster::upload::{Stats, UploadName};
use cluster::download::RawIndex;
/// Future returned for an upload operation.
///
/// Resolves to `UploadOk` on success and fails with `UploadFail` otherwise
/// (see the `Future` implementation for this type).
#[derive(Debug)]
pub struct UploadFuture {
/// Shared receiving end of a oneshot channel that carries the upload result.
/// Because the receiver is wrapped in `Shared`, `poll` only sees the result
/// by reference and clones it out.
pub(crate) inner: Shared<oneshot::Receiver<Result<UploadOk, Arc<UploadErr>>>>,
}
/// Future that resolves to a `RawIndex` or fails with a `FetchErr`.
#[derive(Debug)]
pub struct IndexFuture {
/// Receiving end of the oneshot channel that delivers the fetch result.
pub(crate) inner: oneshot::Receiver<Result<RawIndex, FetchErr>>,
}
/// Future that resolves to the fetched bytes (`Vec<u8>`) or fails with a
/// `FetchErr`.
#[derive(Debug)]
pub struct FileFuture {
/// Receiving end of the oneshot channel that delivers the fetch result.
pub(crate) inner: oneshot::Receiver<Result<Vec<u8>, FetchErr>>,
}
/// Successful outcome of an upload, as yielded by `UploadFuture`.
///
/// Its `Display` implementation prints a one-line summary including the
/// elapsed time between `stats.started` and `finished`.
#[derive(Debug, Clone)]
pub struct UploadOk {
/// Shared statistics of the upload this value reports on.
stats: Arc<Stats>,
/// Instant captured when this value was created by `UploadOk::new`.
finished: Instant,
}
/// Error yielded by `UploadFuture` when the upload does not succeed.
///
/// Displayed as `Upload error: ` followed by the wrapped error.
#[derive(Debug, Fail)]
#[fail(display="Upload error: {}", err)]
pub struct UploadFail {
/// The underlying upload error, reference-counted so it can be cloned out
/// of the shared result.
err: Arc<UploadErr>,
}
/// Drives the shared oneshot receiver and converts its outcome into
/// `UploadOk` / `UploadFail`.
impl Future for UploadFuture {
    type Item = UploadOk;
    type Error = UploadFail;

    /// Polls the underlying shared channel.
    ///
    /// * Not ready yet: returns `Ok(Async::NotReady)`.
    /// * A result arrived: the stored `Ok` value is cloned and returned as
    ///   ready; a stored error is cloned into an `UploadFail`.
    /// * The channel itself failed: reports a fatal "uploader crashed" error.
    fn poll(&mut self) -> Result<Async<UploadOk>, UploadFail> {
        // First deal with the channel state; only a delivered item falls
        // through to the conversion below.
        let delivered = match self.inner.poll() {
            Ok(Async::Ready(item)) => item,
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(_) => {
                let fatal = UploadErr::Fatal(err_msg("uploader crashed"));
                return Err(UploadFail { err: Arc::new(fatal) });
            }
        };

        // The shared item only hands out a reference, so clone what we need.
        match *delivered {
            Ok(ref done) => Ok(Async::Ready(done.clone())),
            Err(ref cause) => Err(UploadFail { err: Arc::clone(cause) }),
        }
    }
}
impl UploadOk {
    /// Builds a success value for the given upload statistics.
    ///
    /// Takes another reference to `stats` and records the current instant as
    /// the moment the upload finished.
    pub(crate) fn new(stats: &Arc<Stats>) -> UploadOk {
        let stats = Arc::clone(stats);
        let finished = Instant::now();
        UploadOk { stats, finished }
    }
}
/// Human-readable one-line summary of a finished upload.
impl fmt::Display for UploadOk {
    /// Writes `Upload to <name>: <stats> in <elapsed>s`.
    ///
    /// The elapsed time is the span from `stats.started` to `finished`, and
    /// its precision depends on magnitude:
    /// * under one second: milliseconds (`0.123s`),
    /// * under ten seconds: tenths of a second (`4.2s`),
    /// * otherwise: whole seconds (`37s`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let stats = &self.stats;
        let elapsed = self.finished.duration_since(stats.started);
        let whole_secs = elapsed.as_secs();

        write!(f, "Upload to {}: ", UploadName(stats))?;
        stats.fmt_downloaded(f)?;

        if whole_secs >= 10 {
            return write!(f, " in {}s", whole_secs);
        }
        if whole_secs >= 1 {
            let tenths = elapsed.subsec_nanos() / 100_000_000;
            return write!(f, " in {}.{:01}s", whole_secs, tenths);
        }
        let millis = elapsed.subsec_nanos() / 1_000_000;
        write!(f, " in 0.{:03}s", millis)
    }
}
/// Drives the oneshot receiver and unwraps the fetch result it carries.
impl Future for IndexFuture {
    type Item = RawIndex;
    type Error = FetchErr;

    /// Polls the underlying channel.
    ///
    /// Yields the received `RawIndex` when the sender delivered `Ok`, passes
    /// a delivered `FetchErr` through unchanged, and reports a fatal
    /// "channel closed unexpectedly" error when the channel itself fails.
    fn poll(&mut self) -> Result<Async<RawIndex>, FetchErr> {
        let state = self.inner.poll().map_err(|_| {
            FetchErr::Fatal(format_err!("channel closed unexpectedly"))
        })?;
        match state {
            Async::NotReady => Ok(Async::NotReady),
            // The payload is itself a `Result`; wrap its success in `Ready`.
            Async::Ready(outcome) => outcome.map(Async::Ready),
        }
    }
}
/// Drives the oneshot receiver and unwraps the fetch result it carries.
impl Future for FileFuture {
    type Item = Vec<u8>;
    type Error = FetchErr;

    /// Polls the underlying channel.
    ///
    /// Yields the received bytes when the sender delivered `Ok`, passes a
    /// delivered `FetchErr` through unchanged, and reports a fatal
    /// "channel closed unexpectedly" error when the channel itself fails.
    fn poll(&mut self) -> Result<Async<Vec<u8>>, FetchErr> {
        let state = self.inner.poll().map_err(|_| {
            FetchErr::Fatal(format_err!("channel closed unexpectedly"))
        })?;
        match state {
            Async::NotReady => Ok(Async::NotReady),
            // The payload is itself a `Result`; wrap its success in `Ready`.
            Async::Ready(outcome) => outcome.map(Async::Ready),
        }
    }
}