scuttlebutt/lib.rs
1//! # Scuttlebutt
2//!
3//! Scuttlebutt is an interface for extending kubernetes by feeding off a stream of kubernetes
4//! cluster events
5
6#[macro_use]
7extern crate log;
8extern crate hyper;
9extern crate serde_json;
10#[macro_use]
11extern crate serde_derive;
12
13use hyper::{Client, Error as HttpError, Url};
14use std::io::{self, Read};
15use std::sync::mpsc::{channel, Receiver};
16use std::thread;
17
18// Kubernets cluster event
19#[derive(Serialize, Deserialize, Debug)]
20pub struct Event {
21 pub object: Object,
22 #[serde(rename = "type")]
23 pub event_type: String,
24}
25
26/// A description of the event
27#[derive(Serialize, Deserialize, Debug)]
28pub struct Object {
29 /// APIVersion defines the versioned schema of this representation of an object.
30 /// Servers should convert recognized schemas to the latest internal value,
31 /// and may reject unrecognized values. More info:
32 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#resources
33 #[serde(rename = "apiVersion")]
34 pub api_version: String,
35 /// The number of times this event has occurred.
36 pub count: usize,
37 /// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
38 #[serde(rename = "firstTimestamp")]
39 pub first_timestamp: String,
40 /// The time at which the most recent occurrence of this event was recorded.
41 #[serde(rename = "lastTimestamp")]
42 pub last_timestamp: String,
43 /// The object that this event is about.
44 #[serde(rename = "involvedObject")]
45 pub involved_object: ObjectReference,
46 /// Kind is a string value representing the REST resource this object represents.
47 /// Servers may infer this from the endpoint the client submits requests to.
48 /// Cannot be updated. In CamelCase. More info:
49 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
50 pub kind: String,
51 /// A human-readable description of the status of this operation.
52 pub message: String,
53 /// Standard object’s metadata. More info:
54 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
55 pub metadata: ObjectMeta,
56 /// This should be a short, machine understandable string that gives the reason for the
57 /// transition into the object’s current status.
58 pub reason: String,
59 /// The component reporting this event. Should be a short machine understandable string.
60 pub source: EventSource,
61 /// Type of this event (Normal, Warning), new types could be added in the future
62 #[serde(rename = "type")]
63 pub object_type: String,
64}
65
66/// ObjectMeta is metadata that all persisted resources must have, which includes all
67/// objects users must create.
68#[derive(Serialize, Deserialize, Debug)]
69pub struct ObjectMeta {
70 /// CreationTimestamp is a timestamp representing the server time when this object was
71 // created. It is not guaranteed to be set in happens-before order across separate operations.
72 // Clients may not set this value. It is represented in RFC3339 form and is in UTC.
73 /// Populated by the system. Read-only. Null for lists. More info:
74 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
75 #[serde(rename = "creationTimestamp")]
76 pub creation_timestamp: String,
77 /// DeletionTimestamp is RFC 3339 date and time at which this resource will be deleted.
78 /// This field is set by the server when a graceful deletion is requested by the user,
79 // and is not directly settable by a client. The resource will be deleted (no longer visible
80 // from resource lists, and not reachable by name) after the time in this field. Once set,
81 /// this value may not be unset or be set further into the future, although it may be shortened
82 /// or the resource may be deleted prior to this time. For example, a user may request that a
83 /// pod is deleted in 30 seconds. The Kubelet will react by sending a graceful termination
84 /// signal to the containers in the pod. Once the resource is deleted in the API, the Kubelet
85 /// will send a hard termination signal to the container. If not set, graceful deletion of
86 /// the object has not been requested.
87 /// Populated by the system when a graceful deletion is requested. Read-only. More info:
88 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
89 #[serde(rename = "deletionTimestamp")]
90 pub deletion_timestamp: Option<String>,
91 /// Name must be unique within a namespace. Is required when creating resources, although
92 /// some resources may allow a client to request the generation of an appropriate name
93 /// automatically. Name is primarily intended for creation idempotence and configuration
94 /// definition. Cannot be updated. More info:
95 /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
96 pub name: String,
97 /// Namespace defines the space within each name must be unique. An empty namespace is
98 /// equivalent to the "default" namespace, but "default" is the canonical representation.
99 /// Not all objects are required to be scoped to a namespace - the value of this field for
100 /// those objects will be empty.
101 /// Must be a DNS_LABEL. Cannot be updated. More info:
102 /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
103 pub namespace: String,
104 /// An opaque value that represents the internal version of this object that can be used
105 /// by clients to determine when objects have changed. May be used for optimistic concurrency,
106 /// change detection, and the watch operation on a resource or set of resources.
107 /// Clients must treat these values as opaque and passed unmodified back to the server.
108 /// They may only be valid for a particular resource or set of resources.
109 /// Populated by the system. Read-only. Value must be treated as opaque by clients
110 #[serde(rename = "resourceVersion")]
111 pub resource_version: String,
112 /// SelfLink is a URL representing this object. Populated by the system. Read-only.
113 #[serde(rename = "selfLink")]
114 pub self_link: String,
115 /// UID is the unique in time and space value for this object. It is typically generated by
116 /// the server on successful creation of a resource and is not allowed to change on PUT
117 /// operations.
118 /// Populated by the system. Read-only. More info:
119 /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
120 pub uid: String,
121}
122
123/// EventSource contains information for an event.
124#[derive(Serialize, Deserialize, Debug)]
125pub struct EventSource {
126 /// Component from which the event is generated.
127 pub component: String,
128 /// Host name on which the event is generated.
129 pub host: Option<String>,
130}
131
132/// ObjectReference contains enough information to let you inspect or modify the referred object.
133#[derive(Serialize, Deserialize, Debug)]
134pub struct ObjectReference {
135 /// API version of the referent.
136 #[serde(rename = "apiVersion")]
137 pub api_version: String,
138 /// Specific resourceVersion to which this reference is made, if any.
139 #[serde(rename = "resourceVersion")]
140 pub resource_version: String,
141 /// UID of the referent. More info:
142 /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
143 pub uid: String,
144 /// If referring to a piece of an object instead of an entire object,
145 /// this string should contain a valid JSON/Go field access statement,
146 /// such as desiredState.manifest.containers[2]. For example, if the object reference
147 /// is to a container within a pod, this would take on a value like: "spec.containers{name}"
148 /// (where "name" refers to the name of the container that triggered the event) or if no
149 /// container name is specified "spec.containers[2]" (container with index 2 in this pod).
150 /// This syntax is chosen only to have some well-defined way of referencing a part of an
151 /// object.
152 #[serde(rename = "fieldPath")]
153 pub field_path: Option<String>,
154 /// Kind of the referent. More info:
155 /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
156 pub kind: String,
157 /// Name of the referent. More info:
158 /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
159 pub name: String,
160 /// Namespace of the referent. More info:
161 /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
162 pub namespace: String,
163}
164
/// Base address that `Cluster::new` parses into the cluster's host URL.
// The explicit `'static` is kept so the constant also builds on toolchains
// that predate lifetime elision in constants.
const DEFAULT_HOST: &'static str = "http://localhost:8001";
166
167pub type Result<T> = std::result::Result<T, Error>;
168
169/// An enumeratation of potential errors
170#[derive(Debug)]
171pub enum Error {
172 Transport(HttpError),
173}
174
175impl From<HttpError> for Error {
176 fn from(error: HttpError) -> Error {
177 Error::Transport(error)
178 }
179}
180
181/// A cluster contains an address
182/// for interacting with a kubernetes Cluster
183/// of nodes
184pub struct Cluster {
185 host: Url,
186}
187
188/// Events provides a means for generating
189/// a receiver for events
190pub trait Events {
191 fn events(&mut self) -> Result<Receiver<Event>>;
192
193 fn generator<Bytes>(&self, bytes: Bytes) -> Result<Receiver<Event>>
194 where
195 Bytes: 'static + Iterator<Item = io::Result<u8>>,
196 Bytes: Send,
197 {
198 let (tx, rx) = channel();
199 let stream = serde_json::Deserializer::from_iter(bytes).into_iter::<Event>();
200 thread::spawn(
201 move || for e in stream {
202 match e {
203 Ok(event) => {
204 if let Err(e) = tx.send(event) {
205 debug!("{:#?}", e);
206 break;
207 }
208 }
209 Err(e) => {
210 debug!("{:#?}", e);
211 break;
212 }
213 }
214 },
215 );
216 Ok(rx)
217 }
218}
219
220impl Cluster {
221 pub fn new() -> Cluster {
222 Cluster { host: Url::parse(DEFAULT_HOST).unwrap() }
223 }
224}
225
226impl Events for Cluster {
227 fn events(&mut self) -> Result<Receiver<Event>> {
228 let res = try!(
229 Client::new()
230 .get(self.host.join("/api/v1/events?watch=true").unwrap())
231 .send()
232 );
233 self.generator(res.bytes())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::Receiver;

    /// Lets a string literal act as an event source by feeding its bytes
    /// through the default `generator`.
    impl Events for &'static str {
        fn events(&mut self) -> Result<Receiver<Event>> {
            self.generator(self.bytes().map(Ok))
        }
    }

    /// A single well-formed event must come out of the generator with its
    /// nested fields intact.
    #[test]
    fn events_generator() {
        let mut payload: &'static str = r#"{
            "object":{
                "apiVersion": "1",
                "count": 1,
                "firstTimestamp": "...",
                "lastTimestamp": "...",
                "kind":"Event",
                "message":"test",
                "involvedObject": {
                    "apiVersion": "1",
                    "resourceVersion": "2",
                    "uid":"2",
                    "kind": "POD",
                    "name": "test_name",
                    "namespace": "test_namespace"
                },
                "metadata": {
                    "creationTimestamp": "...",
                    "deletionTimestamp": "...",
                    "name": "test",
                    "namespace":"default",
                    "resourceVersion": "1",
                    "selfLink": "...",
                    "uid": "1"
                },
                "reason": "started",
                "source": {
                    "component": "test",
                    "host": "foo.com"
                },
                "type": "Normal"
            },
            "type":"ADDED"
        }"#;
        let stream = payload.events();
        let first_namespace = stream
            .unwrap()
            .into_iter()
            .map(|e| e.object.involved_object.namespace)
            .next();
        assert!(first_namespace == Some("test_namespace".to_owned()))
    }
}
291}