1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use futures::prelude::*;
use hyper::{self, header};
use v2::*;
pub type StreamTags = Box<futures::Stream<Item = String, Error = Error>>;
#[derive(Debug, Default, Deserialize, Serialize)]
struct TagsChunk {
name: String,
tags: Vec<String>,
}
impl Client {
pub fn get_tags(&self, name: &str, paginate: Option<u32>) -> StreamTags {
let dclient = self.clone();
let base_url = format!("{}/v2/{}/tags/list", self.base_url, name);
let fres = futures::stream::unfold(Some(String::new()), move |link| {
let last = match link {
None => return None,
Some(ref s) if s == "" => None,
s => s,
};
let full_url = match (paginate, last) {
(Some(p), None) => format!("{}?n={}", base_url, p),
(None, Some(l)) => format!("{}?next_page={}", base_url, l),
(Some(p), Some(l)) => format!("{}?n={}&next_page={}", base_url, p, l),
_ => base_url.to_string(),
};
let client = dclient.clone();
let url = hyper::Uri::from_str(&full_url);
let freq = futures::future::result(url)
.from_err()
.and_then(move |url| {
trace!("GET {:?}", &url);
let req = client.new_request(hyper::Method::GET, url);
futures::future::result(req)
.and_then(move |req| client.hclient.request(req).from_err())
}).and_then(|resp| {
let status = resp.status();
match status {
hyper::StatusCode::OK => Ok(resp),
_ => Err(format!("get_tags: wrong HTTP status '{}'", status).into()),
}
}).and_then(|resp| {
let ct_hdr = resp.headers().get(header::CONTENT_TYPE).cloned();
let ok = match ct_hdr {
None => false,
Some(ref ct) => ct.to_str()?.starts_with("application/json"),
};
if !ok {
return Err(format!("get_tags: wrong content type '{:?}'", ct_hdr).into());
}
Ok(resp)
}).and_then(|resp| {
let hdr = resp.headers().get(header::LINK).cloned();
trace!("next_page {:?}", hdr);
resp.into_body()
.concat2()
.map_err(|e| {
format!("get_tags: failed to fetch the whole body: {}", e).into()
}).and_then(move |body| Ok((body, parse_link(hdr))))
}).and_then(|(body, hdr)| -> Result<(TagsChunk, Option<String>)> {
serde_json::from_slice(&body.into_bytes())
.map_err(|e| e.into())
.map(|tags_chunk| (tags_chunk, hdr))
}).map(|(tags_chunk, last)| {
(futures::stream::iter_ok(tags_chunk.tags.into_iter()), last)
});
Some(freq)
}).flatten();
Box::new(fres)
}
}
fn parse_link(hdr: Option<header::HeaderValue>) -> Option<String> {
let hval = match hdr {
Some(v) => v,
None => return None,
};
let sval = match hval.to_str() {
Ok(v) => v.to_owned(),
_ => return None,
};
let uri = sval.trim_right_matches(">; rel=\"next\"");
let query: Vec<&str> = uri.splitn(2, "next_page=").collect();
let params = match query.get(1) {
Some(v) if *v != "" => v,
_ => return None,
};
let last: Vec<&str> = params.splitn(2, '&').collect();
match last.get(0).cloned() {
Some(v) if v != "" => Some(v.to_string()),
_ => None,
}
}