1extern crate hyper;
26extern crate serde_json;
27extern crate serde;
28
29#[cfg(test)]
30#[macro_use]
31extern crate matches;
32#[cfg(test)]
33#[macro_use]
34extern crate serde_derive;
35
36use hyper::client::Client;
37use hyper::client::response::Response;
38use serde_json::Deserializer;
39use serde::Deserialize;
40use std::io::{self, Read};
41use std::sync::mpsc::{channel, Receiver};
42use std::thread;
43
44#[derive(Debug)]
46pub enum Error {
47 InvalidUrl(hyper::error::ParseError),
49 HttpRequestFailed(hyper::error::Error),
51 DeserializationFailed(serde_json::Error),
53}
54
/// Handle to a remote host, stored as a parsed base URL.
///
/// Built with `Cluster::new`; request paths are joined onto `host` before
/// each request is sent.
#[derive(Debug)]
pub struct Cluster {
    /// Parsed base URL that request paths are resolved against.
    host: hyper::Url,
}
60
61impl Cluster {
62 pub fn new(host: &str) -> Result<Cluster, Error> {
68 let url = try!(hyper::Url::parse(host).map_err(Error::InvalidUrl));
69 Ok(Cluster { host: url })
70 }
71
72 fn get(&self, path: &str) -> Result<Response, Error> {
74 let url = try!(self.host.join(path).map_err(Error::InvalidUrl));
75 Client::new().get(url).send().map_err(Error::HttpRequestFailed)
76 }
77}
78
79pub trait Events {
81 fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
83 where Event: Deserialize + Send + 'static;
84
85 fn generator<Event, Iter>(&self, iter: Iter) -> Receiver<Result<Event, Error>>
87 where Event: Deserialize + Send + 'static,
88 Iter: Iterator<Item = io::Result<u8>> + Send + 'static
89 {
90 let (tx, rx) = channel();
91 let stream = Deserializer::from_iter(iter).into_iter::<Event>();
92 thread::spawn(move || for event in stream {
93 if let Err(_) = tx.send(event.map_err(Error::DeserializationFailed)) {
94 break;
95 }
96 });
97 rx
98 }
99}
100
101impl Events for Cluster {
103 fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
104 where Event: Deserialize + Send + 'static
105 {
106 let path = format!("{}?watch=true", name);
107 let bytes = try!(self.get(&path)).bytes();
108 Ok(self.generator(bytes))
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed absolute URL is accepted.
    #[test]
    fn cluster() {
        assert!(Cluster::new("http://rust-lang.org").is_ok());
    }

    /// A string that is not an absolute URL is rejected with `InvalidUrl`.
    #[test]
    fn cluster_invalid_url() {
        let outcome = Cluster::new("123.456.789.000");
        assert!(matches!(outcome, Err(Error::InvalidUrl(_))));
    }

    /// A GET against a reachable host succeeds (requires network access).
    #[test]
    fn cluster_get() {
        let cluster = Cluster::new("http://duckduckgo.com").unwrap();
        assert!(cluster.get("/rust").is_ok());
    }

    /// A GET against an unresolvable host reports `HttpRequestFailed`.
    #[test]
    fn cluster_get_invalid_url() {
        let cluster = Cluster::new("http://does.not").unwrap();
        let outcome = cluster.get("/exist");
        assert!(matches!(outcome, Err(Error::HttpRequestFailed(_))));
    }

    /// Test double: a string literal acts as an event source whose bytes are
    /// the stream contents. The stream name is ignored.
    impl Events for &'static str {
        fn events<Event>(&self, _name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
            where Event: Deserialize + Send + 'static
        {
            let bytes = self.bytes().map(Ok);
            Ok(self.generator(bytes))
        }
    }

    /// Minimal event payload used to exercise the generator.
    #[derive(Deserialize, PartialEq, Eq, Debug)]
    struct Point {
        x: i32,
        y: i32,
    }

    /// Two back-to-back JSON objects arrive as two events, in order.
    #[test]
    fn events_generator() {
        let receiver = r#"{"x": 1, "y": 2}{"x": 3, "y": 4}"#
            .events::<Point>("points")
            .unwrap();
        assert_eq!(receiver.recv().unwrap().unwrap(), Point { x: 1, y: 2 });
        assert_eq!(receiver.recv().unwrap().unwrap(), Point { x: 3, y: 4 });
    }
}