kubewatch/
lib.rs

1//! # Kubernetes Event Watcher
2//!
3//! This library serves as a base component for Kubernetes event watching. It allows user to
4//! specify which resource should be monitored. Deserialization of events is done via
5//! [Serde](https://serde.rs/), thanks to it it is possible to use both dynamic on-the-fly
6//! deserialization and also beforehand generated Deserializer for specific structure.
7//!
8//! ## Example
9//! ```rust,no_run
10//! extern crate kubewatch;
11//! extern crate serde_json;
12//!
13//! use kubewatch::Events;
14//!
15//! fn main() {
16//!     let cluster = kubewatch::Cluster::new("http://localhost:8080").unwrap();
17//!     let events = cluster.events::<serde_json::Value>("pods").unwrap();
18//!     for event in events.into_iter() {
19//!         println!("{:#?}", event);
20//!     }
21//! }
22//! ```
23//! Check for more in `examples/`.
24
25extern crate hyper;
26extern crate serde_json;
27extern crate serde;
28
29#[cfg(test)]
30#[macro_use]
31extern crate matches;
32#[cfg(test)]
33#[macro_use]
34extern crate serde_derive;
35
36use hyper::client::Client;
37use hyper::client::response::Response;
38use serde_json::Deserializer;
39use serde::Deserialize;
40use std::io::{self, Read};
41use std::sync::mpsc::{channel, Receiver};
42use std::thread;
43
44/// Covers all errors returned by `kubewatch`.
45#[derive(Debug)]
46pub enum Error {
47    /// Failed to parse given URL, check inner `ParseError` for more info.
48    InvalidUrl(hyper::error::ParseError),
49    /// HTTP request failed (does not apply to non-2xx status), check inner `Error` for more info.
50    HttpRequestFailed(hyper::error::Error),
51    /// Failed while deserializating an event from JSON to Rust.
52    DeserializationFailed(serde_json::Error),
53}
54
55/// Represents connection to Kubernetes API server.
56#[derive(Debug)]
57pub struct Cluster {
58    host: hyper::Url,
59}
60
61impl Cluster {
62    /// Initialize `Cluster` with host address and port (e.g. http://127.0.0.1:8080).
63    ///
64    /// ```
65    /// let cluster = kubewatch::Cluster::new("http://127.0.0.1:8080").unwrap();
66    /// ```
67    pub fn new(host: &str) -> Result<Cluster, Error> {
68        let url = try!(hyper::Url::parse(host).map_err(Error::InvalidUrl));
69        Ok(Cluster { host: url })
70    }
71
72    /// Run HTTP GET request on given path (will be joined to `Cluster` URL).
73    fn get(&self, path: &str) -> Result<Response, Error> {
74        let url = try!(self.host.join(path).map_err(Error::InvalidUrl));
75        Client::new().get(url).send().map_err(Error::HttpRequestFailed)
76    }
77}
78
79/// This trait is used to deserialize input stream and return respective Rust structs.
80pub trait Events {
81    /// Read monitor of events with given `name` and return them as given `Event` structure.
82    fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
83        where Event: Deserialize + Send + 'static;
84
85    /// Helper which reads a byte iterator, deserializes it and return respective structures.
86    fn generator<Event, Iter>(&self, iter: Iter) -> Receiver<Result<Event, Error>>
87        where Event: Deserialize + Send + 'static,
88              Iter: Iterator<Item = io::Result<u8>> + Send + 'static
89    {
90        let (tx, rx) = channel();
91        let stream = Deserializer::from_iter(iter).into_iter::<Event>();
92        thread::spawn(move || for event in stream {
93            if let Err(_) = tx.send(event.map_err(Error::DeserializationFailed)) {
94                break;
95            }
96        });
97        rx
98    }
99}
100
101/// Read event monitor from Kubernetes API server.
102impl Events for Cluster {
103    fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
104        where Event: Deserialize + Send + 'static
105    {
106        let path = format!("{}?watch=true", name);
107        let bytes = try!(self.get(&path)).bytes();
108        Ok(self.generator(bytes))
109    }
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115
116    #[test]
117    fn cluster() {
118        let cluster = Cluster::new("http://rust-lang.org");
119        assert!(cluster.is_ok());
120    }
121
122    #[test]
123    fn cluster_invalid_url() {
124        let cluster = Cluster::new("123.456.789.000");
125        assert!(matches!(cluster, Err(Error::InvalidUrl(_))));
126    }
127
128    #[test]
129    fn cluster_get() {
130        let cluster = Cluster::new("http://duckduckgo.com").unwrap();
131        let response = cluster.get("/rust");
132        assert!(response.is_ok());
133    }
134
135    #[test]
136    fn cluster_get_invalid_url() {
137        let cluster = Cluster::new("http://does.not").unwrap();
138        let response = cluster.get("/exist");
139        assert!(matches!(response, Err(Error::HttpRequestFailed(_))));
140    }
141
142    impl Events for &'static str {
143        #[allow(unused_variables)] 
144        fn events<Event>(&self, name: &str) -> Result<Receiver<Result<Event, Error>>, Error>
145            where Event: Deserialize + Send + 'static
146        {
147            Ok(self.generator(self.bytes().into_iter().map(|b| Ok(b))))
148        }
149    }
150
151    #[derive(Deserialize, PartialEq, Eq, Debug)]
152    struct Point {
153        x: i32,
154        y: i32,
155    }
156
157    #[test]
158    fn events_generator() {
159        let mut events = r#"{"x": 1, "y": 2}{"x": 3, "y": 4}"#
160            .events::<Point>("points")
161            .unwrap()
162            .into_iter();
163        assert_eq!(events.next().unwrap().unwrap(), Point { x: 1, y: 2 });
164        assert_eq!(events.next().unwrap().unwrap(), Point { x: 3, y: 4 });
165    }
166}