#[macro_use]
extern crate log;
extern crate hyper;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
use hyper::{Client, Error as HttpError, Url};
use std::io::{self, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;
/// A single entry decoded from the JSON event stream.
///
/// Each entry pairs a payload (`object`) with the entry's own `type` string
/// (the test fixture in this file uses `"ADDED"`).
#[derive(Serialize, Deserialize, Debug)]
pub struct Event {
/// The payload carried by this entry.
pub object: Object,
/// The entry's JSON `type` field; renamed because `type` is a Rust keyword.
#[serde(rename = "type")]
pub event_type: String,
}
/// Payload carried in the `object` field of an `Event`.
///
/// Every field here is non-optional, so a JSON document that omits any of
/// them fails to deserialize.
// NOTE(review): field meanings below are inferred from their names; confirm
// against the API that serves `/api/v1/events`.
#[derive(Serialize, Deserialize, Debug)]
pub struct Object {
/// JSON `apiVersion`.
#[serde(rename = "apiVersion")]
pub api_version: String,
/// JSON `count` (presumably how many times the event occurred — confirm).
pub count: usize,
/// JSON `firstTimestamp`, kept as the raw string (not parsed into a time type).
#[serde(rename = "firstTimestamp")]
pub first_timestamp: String,
/// JSON `lastTimestamp`, kept as the raw string (not parsed into a time type).
#[serde(rename = "lastTimestamp")]
pub last_timestamp: String,
/// JSON `involvedObject`: reference to the object this event is about.
#[serde(rename = "involvedObject")]
pub involved_object: ObjectReference,
/// JSON `kind` (the test fixture in this file uses `"Event"`).
pub kind: String,
/// JSON `message`.
pub message: String,
/// JSON `metadata` of the event object itself.
pub metadata: ObjectMeta,
/// JSON `reason`.
pub reason: String,
/// JSON `source`: the component (and optionally host) that reported the event.
pub source: EventSource,
/// JSON `type` of the payload (the test fixture uses `"Normal"`); renamed
/// because `type` is a Rust keyword. Distinct from `Event::event_type`.
#[serde(rename = "type")]
pub object_type: String,
}
/// Metadata block found under the `metadata` key of an `Object`.
///
/// All fields except `deletion_timestamp` are required when deserializing.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectMeta {
/// JSON `creationTimestamp`, kept as the raw string.
#[serde(rename = "creationTimestamp")]
pub creation_timestamp: String,
/// JSON `deletionTimestamp`, kept as the raw string; `None` when the key
/// is absent or null.
#[serde(rename = "deletionTimestamp")]
pub deletion_timestamp: Option<String>,
/// JSON `name`.
pub name: String,
/// JSON `namespace`.
pub namespace: String,
/// JSON `resourceVersion`.
#[serde(rename = "resourceVersion")]
pub resource_version: String,
/// JSON `selfLink`.
#[serde(rename = "selfLink")]
pub self_link: String,
/// JSON `uid`.
pub uid: String,
}
/// Origin of an event, found under the `source` key of an `Object`.
#[derive(Serialize, Deserialize, Debug)]
pub struct EventSource {
/// JSON `component`; required.
pub component: String,
/// JSON `host`; `None` when the key is absent or null.
pub host: Option<String>,
}
/// Reference to another object, found under the `involvedObject` key of an
/// `Object`.
///
/// All fields except `field_path` are required when deserializing.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectReference {
/// JSON `apiVersion` of the referenced object.
#[serde(rename = "apiVersion")]
pub api_version: String,
/// JSON `resourceVersion` of the referenced object.
#[serde(rename = "resourceVersion")]
pub resource_version: String,
/// JSON `uid` of the referenced object.
pub uid: String,
/// JSON `fieldPath`; `None` when the key is absent or null (the test
/// fixture in this file omits it).
#[serde(rename = "fieldPath")]
pub field_path: Option<String>,
/// JSON `kind` of the referenced object (the test fixture uses `"POD"`).
pub kind: String,
/// JSON `name` of the referenced object.
pub name: String,
/// JSON `namespace` of the referenced object.
pub namespace: String,
}
/// Base URL that `Cluster::new` parses into the cluster's `host`.
// NOTE(review): port 8001 presumably targets a local API proxy — confirm.
const DEFAULT_HOST: &'static str = "http://localhost:8001";
/// Result alias used throughout this module; the error type is always `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Errors returned by this module.
#[derive(Debug)]
pub enum Error {
/// An error reported by hyper; produced via `From<HttpError>`, e.g. when
/// sending the request in `Cluster::events` fails.
Transport(HttpError),
}
impl From<HttpError> for Error {
fn from(error: HttpError) -> Error {
Error::Transport(error)
}
}
/// Handle for the API server whose events are watched.
pub struct Cluster {
/// Base URL of the server; request paths are joined onto it.
host: Url,
}
/// A source of `Event`s that are delivered over a channel.
pub trait Events {
    /// Starts producing events and returns the receiving half of the channel
    /// they are delivered on.
    fn events(&mut self) -> Result<Receiver<Event>>;

    /// Decodes `bytes` as a sequence of JSON-encoded `Event`s on a background
    /// thread and forwards each one over a fresh channel.
    ///
    /// The worker thread stops at the first value that fails to decode, or as
    /// soon as the receiver has been dropped; either condition is logged at
    /// debug level. It also stops when `bytes` is exhausted. Once the thread
    /// ends the sender is dropped, so the returned receiver sees the channel
    /// close.
    ///
    /// This default implementation always returns `Ok`.
    fn generator<Bytes>(&self, bytes: Bytes) -> Result<Receiver<Event>>
    where
        Bytes: 'static + Send + Iterator<Item = io::Result<u8>>,
    {
        let (sender, receiver) = channel();
        let decoded = serde_json::Deserializer::from_iter(bytes).into_iter::<Event>();
        thread::spawn(move || {
            for item in decoded {
                // A decode failure ends the stream: nothing after it is read.
                let event = match item {
                    Ok(event) => event,
                    Err(failure) => {
                        debug!("{:#?}", failure);
                        return;
                    }
                };
                // `send` only fails once the receiver is gone, so stop working.
                if let Err(failure) = sender.send(event) {
                    debug!("{:#?}", failure);
                    return;
                }
            }
        });
        Ok(receiver)
    }
}
impl Cluster {
pub fn new() -> Cluster {
Cluster { host: Url::parse(DEFAULT_HOST).unwrap() }
}
}
impl Events for Cluster {
fn events(&mut self) -> Result<Receiver<Event>> {
let res = try!(
Client::new()
.get(self.host.join("/api/v1/events?watch=true").unwrap())
.send()
);
self.generator(res.bytes())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::Receiver;

    /// Runs one JSON document through `Events::generator` and checks that the
    /// first decoded event exposes the expected involved-object namespace.
    #[test]
    fn events_generator() {
        // A string literal stands in for a network response: its bytes are
        // handed to the default `generator` implementation.
        impl Events for &'static str {
            fn events(&mut self) -> Result<Receiver<Event>> {
                self.generator(self.bytes().map(Ok))
            }
        }

        let mut payload = r#"{
"object":{
"apiVersion": "1",
"count": 1,
"firstTimestamp": "...",
"lastTimestamp": "...",
"kind":"Event",
"message":"test",
"involvedObject": {
"apiVersion": "1",
"resourceVersion": "2",
"uid":"2",
"kind": "POD",
"name": "test_name",
"namespace": "test_namespace"
},
"metadata": {
"creationTimestamp": "...",
"deletionTimestamp": "...",
"name": "test",
"namespace":"default",
"resourceVersion": "1",
"selfLink": "...",
"uid": "1"
},
"reason": "started",
"source": {
"component": "test",
"host": "foo.com"
},
"type": "Normal"
},
"type":"ADDED"
}"#;

        let receiver = payload.events().unwrap();
        // Blocks until the worker thread delivers the first event or hangs up.
        let first_namespace = receiver
            .into_iter()
            .next()
            .map(|event| event.object.involved_object.namespace);
        assert_eq!(first_namespace, Some("test_namespace".to_owned()));
    }
}