1use ::measurement::Measurement;
2use ::serializer::Serializer;
3use ::client::{Precision, Client, Credentials, ClientError, ClientReadResult, ClientWriteResult};
4use ::hurl::{Hurl, Request, Method, Auth};
5use std::collections::HashMap;
6use futures::{Future, stream, Stream};
7
/// Default upper bound on the number of measurements sent in one write request.
const MAX_BATCH: u16 = 5_000;
9
/// Outcome of a write request.
///
/// NOTE(review): nothing in this file constructs or matches on this enum;
/// the write path reports failures through `ClientError` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteStatus {
    /// The write was accepted.
    Success,
    /// The request was received but the write could not be completed.
    CouldNotComplete,
}
14
15pub struct Options {
17 pub max_batch: Option<u16>,
18 pub precision: Option<Precision>,
19
20 pub epoch: Option<Precision>,
21 pub chunk_size: Option<u16>
22}
23
24pub struct HttpClient<'a> {
25 credentials: Credentials<'a>,
26 serializer: Box<Serializer + Send + Sync>,
27 hurl: Box<Hurl + Send + Sync>,
28 hosts: Vec<&'a str>,
29 pub max_batch: u16
30}
31
32impl<'a> HttpClient<'a> {
33 pub fn new(credentials: Credentials<'a>, serializer: Box<Serializer + Send + Sync>, hurl: Box<Hurl + Send + Sync>) -> HttpClient<'a> {
34 HttpClient {
35 credentials: credentials,
36 serializer: serializer,
37 hurl: hurl,
38 hosts: vec![],
39 max_batch: MAX_BATCH
40 }
41 }
42
43 pub fn add_host(&mut self, host: &'a str) {
44 self.hosts.push(host);
45 }
46
47 fn get_host(&self) -> &'a str {
48 match self.hosts.first() {
49 Some(host) => host,
50 None => panic!("Could not get host")
51 }
52 }
53}
54
55impl<'a> Client for HttpClient<'a> {
56 fn query(&self, q: String, epoch: Option<Precision>) -> ClientReadResult {
57 let host = self.get_host();
58
59 let mut query = HashMap::new();
60 query.insert("db", self.credentials.database.to_string());
61 query.insert("q", q);
62
63 if let Some(ref epoch) = epoch {
64 query.insert("epoch", epoch.to_string());
65 }
66
67 let request = Request {
68 url: &*{host.to_string() + "/query"},
69 method: Method::GET,
70 auth: Some(Auth {
71 username: self.credentials.username,
72 password: self.credentials.password
73 }),
74 query: Some(query),
75 body: None
76 };
77
78 Box::new(self.hurl.request(request).then(|res| {
79 match res {
80 Ok(ref resp) if resp.status == 200 => Ok(resp.to_string()),
81 Ok(ref resp) if resp.status == 400 => Err(ClientError::Syntax(resp.to_string())),
82 Ok(ref resp) => Err(ClientError::Unexpected(format!("Unexpected response. Status: {}; Body: \"{}\"", resp.status, resp.to_string()))),
83 Err(reason) => Err(ClientError::Communication(reason))
84 }
85 }))
86 }
87
88 fn write_one(&self, measurement: Measurement, precision: Option<Precision>) -> ClientWriteResult {
89 self.write_many(&[measurement], precision)
90 }
91
92 fn write_many(&self, measurements: &[Measurement], precision: Option<Precision>) -> ClientWriteResult {
93 let host = self.get_host();
94
95 let futures = measurements.chunks(self.max_batch as usize).map(|chunk| {
96 let mut lines = Vec::new();
97
98 for measurement in chunk {
99 lines.push(self.serializer.serialize(measurement));
100 }
101
102 let mut query = HashMap::new();
103 query.insert("db", self.credentials.database.to_string());
104
105 if let Some(ref precision) = precision {
106 query.insert("precision", precision.to_string());
107 }
108
109 let request = Request {
110 url: &*{host.to_string() + "/write"},
111 method: Method::POST,
112 auth: Some(Auth {
113 username: self.credentials.username,
114 password: self.credentials.password
115 }),
116 query: Some(query),
117 body: Some(lines.join("\n"))
118 };
119
120 self.hurl.request(request).then(|res| {
121 match res {
122 Ok(ref resp) if resp.status == 204 => Ok(()),
123 Ok(ref resp) if resp.status == 200 => Err(ClientError::CouldNotComplete(resp.to_string())),
124 Ok(ref resp) if resp.status == 400 => Err(ClientError::Syntax(resp.to_string())),
125 Ok(ref resp) => Err(ClientError::Unexpected(format!("Unexpected response. Status: {}; Body: \"{}\"", resp.status, resp.to_string()))),
126 Err(reason) => Err(ClientError::Communication(reason))
127 }
128 })
129 });
130
131 Box::new(stream::futures_ordered(futures).for_each(|_| Ok(())))
132 }
133}
134
135
136
#[cfg(test)]
mod tests {
    use ::serializer::Serializer;
    use ::client::{Client};
    use super::HttpClient;
    use ::client::{ClientError, Credentials, Precision};
    use ::hurl::{Hurl, Request, Response, HurlResult};
    use ::measurement::Measurement;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use ::futures::{self, Future};

    /// Serializer double: counts calls and always yields the same line.
    struct MockSerializer {
        serialize_count: AtomicUsize,
    }

    impl MockSerializer {
        fn new() -> MockSerializer {
            MockSerializer {
                serialize_count: AtomicUsize::new(0),
            }
        }
    }

    impl Serializer for MockSerializer {
        fn serialize(&self, measurement: &Measurement) -> String {
            println!("serializing: {:?}", measurement);
            self.serialize_count.fetch_add(1, Ordering::SeqCst);
            "serialized".to_string()
        }
    }

    /// Transport double: counts requests and answers each one with whatever
    /// the supplied factory produces.
    struct MockHurl {
        request_count: AtomicUsize,
        result: Box<(Fn() -> HurlResult) + Send + Sync>
    }

    impl MockHurl {
        fn new(result: Box<(Fn() -> HurlResult) + Send + Sync>) -> MockHurl {
            MockHurl {
                request_count: AtomicUsize::new(0),
                result: result
            }
        }
    }

    impl Hurl for MockHurl {
        fn request(&self, req: Request) -> HurlResult {
            println!("sending: {:?}", req);
            self.request_count.fetch_add(1, Ordering::SeqCst);
            let ref f = self.result;
            f()
        }
    }

    /// Builds a client wired to the mocks; `result` produces the response for
    /// every request the client sends. No host is registered yet.
    fn before<'a>(result: Box<(Fn() -> HurlResult) + Send + Sync>) -> HttpClient<'a> {
        let credentials = Credentials {
            username: "gobwas",
            password: "1234",
            database: "test"
        };

        let serializer = MockSerializer::new();
        let hurl = MockHurl::new(result);

        HttpClient::new(credentials, Box::new(serializer), Box::new(hurl))
    }

    #[test]
    fn test_write_one() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 204, body: "Ok".to_string() }))));
        client.add_host("http://localhost:8086");
        // Running on the tokio runtime also checks that the future is `Send + 'static`.
        ::tokio::run(client.write_one(Measurement::new("key"), Some(Precision::Nanoseconds)).map_err(|_| panic!("write_one failed")));
    }

    #[test]
    fn test_write_many() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 204, body: "Ok".to_string() }))));
        client.add_host("http://localhost:8086");
        assert!(client.write_many(&[Measurement::new("key")], Some(Precision::Nanoseconds)).wait().is_ok());
    }

    #[test]
    fn test_write_many_empty_sends_nothing() {
        let requests = Arc::new(AtomicUsize::new(0));
        let seen = requests.clone();
        let mut client = before(Box::new(move || {
            seen.fetch_add(1, Ordering::SeqCst);
            Box::new(futures::future::ok(Response { status: 204, body: "Ok".to_string() }))
        }));
        client.add_host("http://localhost:8086");

        assert!(client.write_many(&[], None).wait().is_ok());
        assert_eq!(requests.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_write_many_splits_into_batches() {
        let requests = Arc::new(AtomicUsize::new(0));
        let seen = requests.clone();
        let mut client = before(Box::new(move || {
            seen.fetch_add(1, Ordering::SeqCst);
            Box::new(futures::future::ok(Response { status: 204, body: "Ok".to_string() }))
        }));
        client.add_host("http://localhost:8086");
        client.max_batch = 2;

        // Five measurements in batches of two: 2 + 2 + 1 => three requests.
        let measurements: Vec<_> = (0..5).map(|_| Measurement::new("key")).collect();
        assert!(client.write_many(&measurements, None).wait().is_ok());
        assert_eq!(requests.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_write_many_reports_syntax_error() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 400, body: "bad line".to_string() }))));
        client.add_host("http://localhost:8086");
        match client.write_many(&[Measurement::new("key")], None).wait() {
            Err(ClientError::Syntax(_)) => (),
            _ => panic!("expected ClientError::Syntax"),
        }
    }

    #[test]
    fn test_write_many_reports_could_not_complete() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 200, body: "partial".to_string() }))));
        client.add_host("http://localhost:8086");
        match client.write_many(&[Measurement::new("key")], None).wait() {
            Err(ClientError::CouldNotComplete(_)) => (),
            _ => panic!("expected ClientError::CouldNotComplete"),
        }
    }

    #[test]
    fn test_write_many_reports_unexpected_status() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 500, body: "boom".to_string() }))));
        client.add_host("http://localhost:8086");
        match client.write_many(&[Measurement::new("key")], None).wait() {
            Err(ClientError::Unexpected(_)) => (),
            _ => panic!("expected ClientError::Unexpected"),
        }
    }

    #[test]
    fn test_query() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 200, body: "{}".to_string() }))));
        client.add_host("http://localhost:8086");
        assert!(client.query("SHOW DATABASES".to_string(), Some(Precision::Nanoseconds)).wait().is_ok());
    }

    #[test]
    fn test_query_reports_syntax_error() {
        let mut client = before(Box::new(|| Box::new(futures::future::ok(Response { status: 400, body: "bad query".to_string() }))));
        client.add_host("http://localhost:8086");
        match client.query("SELEKT".to_string(), None).wait() {
            Err(ClientError::Syntax(_)) => (),
            _ => panic!("expected ClientError::Syntax"),
        }
    }
}
218
219
220