Skip to main content

// influent/client/http.rs

1use ::measurement::Measurement;
2use ::serializer::Serializer;
3use ::client::{Precision, Client, Credentials, ClientError, ClientReadResult, ClientWriteResult};
4use ::hurl::{Hurl, Request, Method, Auth};
5use std::collections::HashMap;
6use futures::{Future, stream, Stream};
7
/// Default upper bound on the number of measurements sent in one write
/// request; `HttpClient::new` copies it into `HttpClient::max_batch`.
const MAX_BATCH: u16 = 5000;
9
/// Outcome of a write operation.
///
/// The enum is not referenced by the client code visible in this file.
/// NOTE(review): the variants presumably mirror the "written" vs. "accepted
/// but could not complete" outcomes of a write request; confirm against callers.
///
/// The derives let callers print, copy and compare statuses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteStatus {
    /// The write succeeded.
    Success,
    /// The write could not be completed.
    CouldNotComplete,
}
14
15// fixme
16pub struct Options {
17    pub max_batch: Option<u16>,
18    pub precision: Option<Precision>,
19
20    pub epoch: Option<Precision>,
21    pub chunk_size: Option<u16>
22}
23
/// HTTP implementation of the `Client` trait.
///
/// Queries are sent to `<host>/query` and writes to `<host>/write`, where
/// `<host>` is the first host registered through `add_host`.
pub struct HttpClient<'a> {
    /// Database name (sent as the `db` request parameter) plus the
    /// username/password attached to every request as its auth.
    credentials: Credentials<'a>,
    /// Turns each measurement into one line of the write request body.
    serializer: Box<Serializer + Send + Sync>,
    /// Performs the actual HTTP requests.
    hurl: Box<Hurl + Send + Sync>,
    /// Registered host URLs, in insertion order; only the first one is
    /// used by the code in this file.
    hosts: Vec<&'a str>,
    /// Upper bound on the number of measurements serialized into a single
    /// write request.
    pub max_batch: u16
}
31
32impl<'a> HttpClient<'a> {
33    pub fn new(credentials: Credentials<'a>, serializer: Box<Serializer + Send + Sync>, hurl: Box<Hurl + Send + Sync>) -> HttpClient<'a> {
34        HttpClient {
35            credentials: credentials,
36            serializer: serializer,
37            hurl: hurl,
38            hosts: vec![],
39            max_batch: MAX_BATCH
40        }
41    }
42
43    pub fn add_host(&mut self, host: &'a str) {
44        self.hosts.push(host);
45    }
46
47    fn get_host(&self) -> &'a str {
48        match self.hosts.first() {
49            Some(host) => host,
50            None => panic!("Could not get host")
51        }
52    }
53}
54
55impl<'a> Client for HttpClient<'a> {
56    fn query(&self, q: String, epoch: Option<Precision>) -> ClientReadResult {
57        let host = self.get_host();
58
59        let mut query = HashMap::new();
60        query.insert("db", self.credentials.database.to_string());
61        query.insert("q", q);
62
63        if let Some(ref epoch) = epoch {
64            query.insert("epoch", epoch.to_string());
65        }
66
67        let request = Request {
68            url: &*{host.to_string() + "/query"},
69            method: Method::GET,
70            auth: Some(Auth {
71                username: self.credentials.username,
72                password: self.credentials.password
73            }),
74            query: Some(query),
75            body: None
76        };
77
78        Box::new(self.hurl.request(request).then(|res| {
79            match res {
80                Ok(ref resp) if resp.status == 200 => Ok(resp.to_string()),
81                Ok(ref resp) if resp.status == 400 => Err(ClientError::Syntax(resp.to_string())),
82                Ok(ref resp) => Err(ClientError::Unexpected(format!("Unexpected response. Status: {}; Body: \"{}\"", resp.status, resp.to_string()))),
83                Err(reason) => Err(ClientError::Communication(reason))
84            }
85        }))
86    }
87
88    fn write_one(&self, measurement: Measurement, precision: Option<Precision>) -> ClientWriteResult {
89        self.write_many(&[measurement], precision)
90    }
91
92    fn write_many(&self, measurements: &[Measurement], precision: Option<Precision>) -> ClientWriteResult {
93        let host = self.get_host();
94
95        let futures = measurements.chunks(self.max_batch as usize).map(|chunk| {
96            let mut lines = Vec::new();
97
98            for measurement in chunk {
99                lines.push(self.serializer.serialize(measurement));
100            }
101
102            let mut query = HashMap::new();
103            query.insert("db", self.credentials.database.to_string());
104
105            if let Some(ref precision) = precision {
106                query.insert("precision", precision.to_string());
107            }
108
109            let request = Request {
110                url: &*{host.to_string() + "/write"},
111                method: Method::POST,
112                auth: Some(Auth {
113                    username: self.credentials.username,
114                    password: self.credentials.password
115                }),
116                query: Some(query),
117                body: Some(lines.join("\n"))
118            };
119
120            self.hurl.request(request).then(|res| {
121                match res {
122                    Ok(ref resp) if resp.status == 204 => Ok(()),
123                    Ok(ref resp) if resp.status == 200 => Err(ClientError::CouldNotComplete(resp.to_string())),
124                    Ok(ref resp) if resp.status == 400 => Err(ClientError::Syntax(resp.to_string())),
125                    Ok(ref resp) => Err(ClientError::Unexpected(format!("Unexpected response. Status: {}; Body: \"{}\"", resp.status, resp.to_string()))),
126                    Err(reason) => Err(ClientError::Communication(reason))
127                }
128            })
129        });
130
131        Box::new(stream::futures_ordered(futures).for_each(|_| Ok(())))
132    }
133}
134
135
136
#[cfg(test)]
mod tests {
    use ::serializer::Serializer;
    use ::client::{Client};
    use super::HttpClient;
    use ::client::{Credentials, Precision};
    use ::hurl::{Hurl, Request, Response, HurlResult};
    use ::measurement::Measurement;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use ::futures::{self, Future};

    /// Serializer double: logs each measurement, counts the calls and always
    /// returns the same fixed line. The counter is recorded but not asserted
    /// on by the tests below.
    struct MockSerializer {
        serialize_count: AtomicUsize,
    }

    impl MockSerializer {
        fn new() -> MockSerializer {
            let serialize_count = AtomicUsize::new(0);
            MockSerializer { serialize_count }
        }
    }

    impl Serializer for MockSerializer {
        fn serialize(&self, measurement: &Measurement) -> String {
            println!("serializing: {:?}", measurement);
            self.serialize_count.fetch_add(1, Ordering::SeqCst);
            String::from("serialized")
        }
    }

    /// Transport double: logs each request, counts the calls and answers with
    /// whatever the injected factory produces. The counter is recorded but
    /// not asserted on by the tests below.
    struct MockHurl {
        request_count: AtomicUsize,
        result: Box<(Fn() -> HurlResult) + Send + Sync>
    }

    impl MockHurl {
        fn new(result: Box<(Fn() -> HurlResult) + Send + Sync>) -> MockHurl {
            let request_count = AtomicUsize::new(0);
            MockHurl { request_count, result }
        }
    }

    impl Hurl for MockHurl {
        fn request(&self, req: Request) -> HurlResult {
            println!("sending: {:?}", req);
            self.request_count.fetch_add(1, Ordering::SeqCst);
            (self.result)()
        }
    }

    /// Builds a client wired to the mocks; `result` produces the response for
    /// every request the client sends.
    fn before<'a>(result: Box<(Fn() -> HurlResult) + Send + Sync>) -> HttpClient<'a> {
        let credentials = Credentials {
            username: "gobwas",
            password: "1234",
            database: "test"
        };

        HttpClient::new(
            credentials,
            Box::new(MockSerializer::new()),
            Box::new(MockHurl::new(result))
        )
    }

    /// Canned successful write response (status 204).
    fn no_content() -> HurlResult {
        Box::new(futures::future::ok(Response { status: 204, body: "Ok".to_string() }))
    }

    /// Client with one host registered whose transport always answers 204.
    fn local_client<'a>() -> HttpClient<'a> {
        let mut client = before(Box::new(no_content));
        client.add_host("http://localhost:8086");
        client
    }

    #[test]
    fn test_write_one() {
        let client = local_client();
        let write = client.write_one(Measurement::new("key"), Some(Precision::Nanoseconds));
        ::tokio::run(write.map_err(|e| panic!(e)));
    }

    #[test]
    fn test_write_many() {
        let client = local_client();
        let outcome = client.write_many(&[Measurement::new("key")], Some(Precision::Nanoseconds)).wait();
        assert!(outcome.is_ok());
    }
}
218
219
220