1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
//! # Scuttlebutt //! //! Scuttlebutt is an interface for extending kubernetes by feeding off a stream of kubernetes //! cluster events #[macro_use] extern crate log; extern crate hyper; extern crate serde_json; #[macro_use] extern crate serde_derive; use hyper::{Client, Error as HttpError, Url}; use std::io::{self, Read}; use std::sync::mpsc::{channel, Receiver}; use std::thread; // Kubernets cluster event #[derive(Serialize, Deserialize, Debug)] pub struct Event { pub object: Object, #[serde(rename = "type")] pub event_type: String, } /// A description of the event #[derive(Serialize, Deserialize, Debug)] pub struct Object { /// APIVersion defines the versioned schema of this representation of an object. /// Servers should convert recognized schemas to the latest internal value, /// and may reject unrecognized values. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#resources #[serde(rename = "apiVersion")] pub api_version: String, /// The number of times this event has occurred. pub count: usize, /// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.) #[serde(rename = "firstTimestamp")] pub first_timestamp: String, /// The time at which the most recent occurrence of this event was recorded. #[serde(rename = "lastTimestamp")] pub last_timestamp: String, /// The object that this event is about. #[serde(rename = "involvedObject")] pub involved_object: ObjectReference, /// Kind is a string value representing the REST resource this object represents. /// Servers may infer this from the endpoint the client submits requests to. /// Cannot be updated. In CamelCase. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds pub kind: String, /// A human-readable description of the status of this operation. pub message: String, /// Standard object’s metadata. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata pub metadata: ObjectMeta, /// This should be a short, machine understandable string that gives the reason for the /// transition into the object’s current status. pub reason: String, /// The component reporting this event. Should be a short machine understandable string. pub source: EventSource, /// Type of this event (Normal, Warning), new types could be added in the future #[serde(rename = "type")] pub object_type: String, } /// ObjectMeta is metadata that all persisted resources must have, which includes all /// objects users must create. #[derive(Serialize, Deserialize, Debug)] pub struct ObjectMeta { /// CreationTimestamp is a timestamp representing the server time when this object was // created. It is not guaranteed to be set in happens-before order across separate operations. // Clients may not set this value. It is represented in RFC3339 form and is in UTC. /// Populated by the system. Read-only. Null for lists. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata #[serde(rename = "creationTimestamp")] pub creation_timestamp: String, /// DeletionTimestamp is RFC 3339 date and time at which this resource will be deleted. /// This field is set by the server when a graceful deletion is requested by the user, // and is not directly settable by a client. The resource will be deleted (no longer visible // from resource lists, and not reachable by name) after the time in this field. Once set, /// this value may not be unset or be set further into the future, although it may be shortened /// or the resource may be deleted prior to this time. For example, a user may request that a /// pod is deleted in 30 seconds. The Kubelet will react by sending a graceful termination /// signal to the containers in the pod. Once the resource is deleted in the API, the Kubelet /// will send a hard termination signal to the container. If not set, graceful deletion of /// the object has not been requested. /// Populated by the system when a graceful deletion is requested. Read-only. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata #[serde(rename = "deletionTimestamp")] pub deletion_timestamp: Option<String>, /// Name must be unique within a namespace. Is required when creating resources, although /// some resources may allow a client to request the generation of an appropriate name /// automatically. Name is primarily intended for creation idempotence and configuration /// definition. Cannot be updated. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names pub name: String, /// Namespace defines the space within each name must be unique. An empty namespace is /// equivalent to the "default" namespace, but "default" is the canonical representation. /// Not all objects are required to be scoped to a namespace - the value of this field for /// those objects will be empty. /// Must be a DNS_LABEL. Cannot be updated. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md pub namespace: String, /// An opaque value that represents the internal version of this object that can be used /// by clients to determine when objects have changed. May be used for optimistic concurrency, /// change detection, and the watch operation on a resource or set of resources. /// Clients must treat these values as opaque and passed unmodified back to the server. /// They may only be valid for a particular resource or set of resources. /// Populated by the system. Read-only. Value must be treated as opaque by clients #[serde(rename = "resourceVersion")] pub resource_version: String, /// SelfLink is a URL representing this object. Populated by the system. Read-only. #[serde(rename = "selfLink")] pub self_link: String, /// UID is the unique in time and space value for this object. It is typically generated by /// the server on successful creation of a resource and is not allowed to change on PUT /// operations. /// Populated by the system. Read-only. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids pub uid: String, } /// EventSource contains information for an event. #[derive(Serialize, Deserialize, Debug)] pub struct EventSource { /// Component from which the event is generated. pub component: String, /// Host name on which the event is generated. pub host: Option<String>, } /// ObjectReference contains enough information to let you inspect or modify the referred object. #[derive(Serialize, Deserialize, Debug)] pub struct ObjectReference { /// API version of the referent. #[serde(rename = "apiVersion")] pub api_version: String, /// Specific resourceVersion to which this reference is made, if any. #[serde(rename = "resourceVersion")] pub resource_version: String, /// UID of the referent. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids pub uid: String, /// If referring to a piece of an object instead of an entire object, /// this string should contain a valid JSON/Go field access statement, /// such as desiredState.manifest.containers[2]. For example, if the object reference /// is to a container within a pod, this would take on a value like: "spec.containers{name}" /// (where "name" refers to the name of the container that triggered the event) or if no /// container name is specified "spec.containers[2]" (container with index 2 in this pod). /// This syntax is chosen only to have some well-defined way of referencing a part of an /// object. #[serde(rename = "fieldPath")] pub field_path: Option<String>, /// Kind of the referent. More info: /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds pub kind: String, /// Name of the referent. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names pub name: String, /// Namespace of the referent. More info: /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md pub namespace: String, } const DEFAULT_HOST: &'static str = "http://localhost:8001"; pub type Result<T> = std::result::Result<T, Error>; /// An enumeratation of potential errors #[derive(Debug)] pub enum Error { Transport(HttpError), } impl From<HttpError> for Error { fn from(error: HttpError) -> Error { Error::Transport(error) } } /// A cluster contains an address /// for interacting with a kubernetes Cluster /// of nodes pub struct Cluster { host: Url, } /// Events provides a means for generating /// a receiver for events pub trait Events { fn events(&mut self) -> Result<Receiver<Event>>; fn generator<Bytes>(&self, bytes: Bytes) -> Result<Receiver<Event>> where Bytes: 'static + Iterator<Item = io::Result<u8>>, Bytes: Send, { let (tx, rx) = channel(); let stream = serde_json::Deserializer::from_iter(bytes).into_iter::<Event>(); thread::spawn( move || for e in stream { match e { Ok(event) => { if let Err(e) = tx.send(event) { debug!("{:#?}", e); break; } } Err(e) => { debug!("{:#?}", e); break; } } }, ); Ok(rx) } } impl Cluster { pub fn new() -> Cluster { Cluster { host: Url::parse(DEFAULT_HOST).unwrap() } } } impl Events for Cluster { fn events(&mut self) -> Result<Receiver<Event>> { let res = try!( Client::new() .get(self.host.join("/api/v1/events?watch=true").unwrap()) .send() ); self.generator(res.bytes()) } } #[cfg(test)] mod tests { use super::*; use std::sync::mpsc::Receiver; #[test] fn events_generator() { impl Events for &'static str { fn events(&mut self) -> Result<Receiver<Event>> { self.generator(self.bytes().into_iter().map(|b| Ok(b))) } } let events = r#"{ "object":{ "apiVersion": "1", "count": 1, "firstTimestamp": "...", "lastTimestamp": "...", "kind":"Event", "message":"test", "involvedObject": { "apiVersion": "1", "resourceVersion": "2", "uid":"2", "kind": "POD", "name": "test_name", "namespace": "test_namespace" }, "metadata": { "creationTimestamp": "...", "deletionTimestamp": "...", "name": "test", "namespace":"default", "resourceVersion": "1", "selfLink": "...", "uid": "1" }, "reason": "started", "source": { "component": "test", "host": "foo.com" }, "type": "Normal" }, "type":"ADDED" }"# .events(); assert!( events .unwrap() .into_iter() .map(|e| e.object.involved_object.namespace) .nth(0) == Some("test_namespace".to_owned()) ) } }