extern crate hyper;
extern crate serde_json;
extern crate serde;
#[cfg(test)]
#[macro_use]
extern crate matches;
#[cfg(test)]
#[macro_use]
extern crate serde_derive;
use hyper::client::Client;
use hyper::client::response::Response;
use serde_json::Deserializer;
use serde::Deserialize;
use std::io::{self, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
#[derive(Debug)]
pub enum Error {
InvalidUrl(hyper::error::ParseError),
HttpRequestFailed(hyper::error::Error),
DeserializationFailed(serde_json::Error),
}
#[derive(Debug)]
pub struct Cluster {
host: hyper::Url,
}
impl Cluster {
pub fn new(host: &str) -> Result<Cluster, Error> {
let url = try!(hyper::Url::parse(host).map_err(Error::InvalidUrl));
Ok(Cluster { host: url })
}
fn get(&self, path: &str) -> Result<Response, Error> {
let url = try!(self.host.join(path).map_err(Error::InvalidUrl));
Client::new().get(url).send().map_err(Error::HttpRequestFailed)
}
}
pub trait Events {
fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
where Event: Deserialize + Send + 'static;
fn generator<Event, Iter>(&self, iter: Iter) -> Receiver<Result<Event, Error>>
where Event: Deserialize + Send + 'static,
Iter: Iterator<Item = io::Result<u8>> + Send + 'static
{
let (tx, rx) = channel();
let stream = Deserializer::from_iter(iter).into_iter::<Event>();
thread::spawn(move || for event in stream {
if let Err(_) = tx.send(event.map_err(Error::DeserializationFailed)) {
break;
}
});
rx
}
}
impl Events for Cluster {
fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
where Event: Deserialize + Send + 'static
{
let path = format!("{}?watch=true", name);
let bytes = try!(self.get(&path)).bytes();
Ok(self.generator(bytes))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cluster() {
let cluster = Cluster::new("http://rust-lang.org");
assert!(cluster.is_ok());
}
#[test]
fn cluster_invalid_url() {
let cluster = Cluster::new("123.456.789.000");
assert!(matches!(cluster, Err(Error::InvalidUrl(_))));
}
#[test]
fn cluster_get() {
let cluster = Cluster::new("http://duckduckgo.com").unwrap();
let response = cluster.get("/rust");
assert!(response.is_ok());
}
#[test]
fn cluster_get_invalid_url() {
let cluster = Cluster::new("http://does.not").unwrap();
let response = cluster.get("/exist");
assert!(matches!(response, Err(Error::HttpRequestFailed(_))));
}
impl Events for &'static str {
#[allow(unused_variables)]
fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
where Event: Deserialize + Send + 'static
{
Ok(self.generator(self.bytes().into_iter().map(|b| Ok(b))))
}
}
#[derive(Deserialize, PartialEq, Eq, Debug)]
struct Point {
x: i32,
y: i32,
}
#[test]
fn events_generator() {
let mut events = r#"{"x": 1, "y": 2}{"x": 3, "y": 4}"#
.events::<Point>("points")
.unwrap()
.into_iter();
assert_eq!(events.next().unwrap().unwrap(), Point { x: 1, y: 2 });
assert_eq!(events.next().unwrap().unwrap(), Point { x: 3, y: 4 });
}
}