scuttlebutt/
lib.rs

1//! # Scuttlebutt
2//!
3//! Scuttlebutt is an interface for extending kubernetes by feeding off a stream of kubernetes
4//! cluster events
5
6#[macro_use]
7extern crate log;
8extern crate hyper;
9extern crate serde_json;
10#[macro_use]
11extern crate serde_derive;
12
13use hyper::{Client, Error as HttpError, Url};
14use std::io::{self, Read};
15use std::sync::mpsc::{channel, Receiver};
16use std::thread;
17
18// Kubernets cluster event
19#[derive(Serialize, Deserialize, Debug)]
20pub struct Event {
21    pub object: Object,
22    #[serde(rename = "type")]
23    pub event_type: String,
24}
25
26/// A description of the event
27#[derive(Serialize, Deserialize, Debug)]
28pub struct Object {
29    /// APIVersion defines the versioned schema of this representation of an object.
30    /// Servers should convert recognized schemas to the latest internal value,
31    /// and may reject unrecognized values. More info:
32    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#resources
33    #[serde(rename = "apiVersion")]
34    pub api_version: String,
35    /// The number of times this event has occurred.
36    pub count: usize,
37    /// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
38    #[serde(rename = "firstTimestamp")]
39    pub first_timestamp: String,
40    /// The time at which the most recent occurrence of this event was recorded.
41    #[serde(rename = "lastTimestamp")]
42    pub last_timestamp: String,
43    /// The object that this event is about.
44    #[serde(rename = "involvedObject")]
45    pub involved_object: ObjectReference,
46    /// Kind is a string value representing the REST resource this object represents.
47    /// Servers may infer this from the endpoint the client submits requests to.
48    /// Cannot be updated. In CamelCase. More info:
49    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
50    pub kind: String,
51    /// A human-readable description of the status of this operation.
52    pub message: String,
53    /// Standard object’s metadata. More info:
54    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
55    pub metadata: ObjectMeta,
56    /// This should be a short, machine understandable string that gives the reason for the
57    /// transition into the object’s current status.
58    pub reason: String,
59    /// The component reporting this event. Should be a short machine understandable string.
60    pub source: EventSource,
61    /// Type of this event (Normal, Warning), new types could be added in the future
62    #[serde(rename = "type")]
63    pub object_type: String,
64}
65
66/// ObjectMeta is metadata that all persisted resources must have, which includes all
67/// objects users must create.
68#[derive(Serialize, Deserialize, Debug)]
69pub struct ObjectMeta {
70    /// CreationTimestamp is a timestamp representing the server time when this object was
71    // created. It is not guaranteed to be set in happens-before order across separate operations.
72    // Clients may not set this value. It is represented in RFC3339 form and is in UTC.
73    /// Populated by the system. Read-only. Null for lists. More info:
74    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
75    #[serde(rename = "creationTimestamp")]
76    pub creation_timestamp: String,
77    /// DeletionTimestamp is RFC 3339 date and time at which this resource will be deleted.
78    /// This field is set by the server when a graceful deletion is requested by the user,
79    // and is not directly settable by a client. The resource will be deleted (no longer visible
80    // from resource lists, and not reachable by name) after the time in this field. Once set,
81    /// this value may not be unset or be set further into the future, although it may be shortened
82    /// or the resource may be deleted prior to this time. For example, a user may request that a
83    /// pod is deleted in 30 seconds. The Kubelet will react by sending a graceful termination
84    /// signal to the containers in the pod. Once the resource is deleted in the API, the Kubelet
85    /// will send a hard termination signal to the container. If not set, graceful deletion of
86    /// the object has not been requested.
87    /// Populated by the system when a graceful deletion is requested. Read-only. More info:
88    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
89    #[serde(rename = "deletionTimestamp")]
90    pub deletion_timestamp: Option<String>,
91    /// Name must be unique within a namespace. Is required when creating resources, although
92    /// some resources may allow a client to request the generation of an appropriate name
93    /// automatically. Name is primarily intended for creation idempotence and configuration
94    /// definition. Cannot be updated. More info:
95    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
96    pub name: String,
97    /// Namespace defines the space within each name must be unique. An empty namespace is
98    /// equivalent to the "default" namespace, but "default" is the canonical representation.
99    /// Not all objects are required to be scoped to a namespace - the value of this field for
100    /// those objects will be empty.
101    /// Must be a DNS_LABEL. Cannot be updated. More info:
102    /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
103    pub namespace: String,
104    /// An opaque value that represents the internal version of this object that can be used
105    /// by clients to determine when objects have changed. May be used for optimistic concurrency,
106    /// change detection, and the watch operation on a resource or set of resources.
107    /// Clients must treat these values as opaque and passed unmodified back to the server.
108    /// They may only be valid for a particular resource or set of resources.
109    /// Populated by the system. Read-only. Value must be treated as opaque by clients
110    #[serde(rename = "resourceVersion")]
111    pub resource_version: String,
112    /// SelfLink is a URL representing this object. Populated by the system. Read-only.
113    #[serde(rename = "selfLink")]
114    pub self_link: String,
115    /// UID is the unique in time and space value for this object. It is typically generated by
116    /// the server on successful creation of a resource and is not allowed to change on PUT
117    /// operations.
118    /// Populated by the system. Read-only. More info:
119    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
120    pub uid: String,
121}
122
123/// EventSource contains information for an event.
124#[derive(Serialize, Deserialize, Debug)]
125pub struct EventSource {
126    /// Component from which the event is generated.
127    pub component: String,
128    /// Host name on which the event is generated.
129    pub host: Option<String>,
130}
131
132/// ObjectReference contains enough information to let you inspect or modify the referred object.
133#[derive(Serialize, Deserialize, Debug)]
134pub struct ObjectReference {
135    /// API version of the referent.
136    #[serde(rename = "apiVersion")]
137    pub api_version: String,
138    /// Specific resourceVersion to which this reference is made, if any.
139    #[serde(rename = "resourceVersion")]
140    pub resource_version: String,
141    /// UID of the referent. More info:
142    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
143    pub uid: String,
144    /// If referring to a piece of an object instead of an entire object,
145    /// this string should contain a valid JSON/Go field access statement,
146    /// such as desiredState.manifest.containers[2]. For example, if the object reference
147    /// is to a container within a pod, this would take on a value like: "spec.containers{name}"
148    /// (where "name" refers to the name of the container that triggered the event) or if no
149    /// container name is specified "spec.containers[2]" (container with index 2 in this pod).
150    /// This syntax is chosen only to have some well-defined way of referencing a part of an
151    /// object.
152    #[serde(rename = "fieldPath")]
153    pub field_path: Option<String>,
154    /// Kind of the referent. More info:
155    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
156    pub kind: String,
157    /// Name of the referent. More info:
158    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
159    pub name: String,
160    /// Namespace of the referent. More info:
161    /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
162    pub namespace: String,
163}
164
165const DEFAULT_HOST: &'static str = "http://localhost:8001";
166
167pub type Result<T> = std::result::Result<T, Error>;
168
169/// An enumeratation of potential errors
170#[derive(Debug)]
171pub enum Error {
172    Transport(HttpError),
173}
174
175impl From<HttpError> for Error {
176    fn from(error: HttpError) -> Error {
177        Error::Transport(error)
178    }
179}
180
181/// A cluster contains an address
182/// for interacting with a kubernetes Cluster
183/// of nodes
184pub struct Cluster {
185    host: Url,
186}
187
188/// Events provides a means for generating
189/// a receiver for events
190pub trait Events {
191    fn events(&mut self) -> Result<Receiver<Event>>;
192
193    fn generator<Bytes>(&self, bytes: Bytes) -> Result<Receiver<Event>>
194    where
195        Bytes: 'static + Iterator<Item = io::Result<u8>>,
196        Bytes: Send,
197    {
198        let (tx, rx) = channel();
199        let stream = serde_json::Deserializer::from_iter(bytes).into_iter::<Event>();
200        thread::spawn(
201            move || for e in stream {
202                match e {
203                    Ok(event) => {
204                        if let Err(e) = tx.send(event) {
205                            debug!("{:#?}", e);
206                            break;
207                        }
208                    }
209                    Err(e) => {
210                        debug!("{:#?}", e);
211                        break;
212                    }
213                }
214            },
215        );
216        Ok(rx)
217    }
218}
219
220impl Cluster {
221    pub fn new() -> Cluster {
222        Cluster { host: Url::parse(DEFAULT_HOST).unwrap() }
223    }
224}
225
226impl Events for Cluster {
227    fn events(&mut self) -> Result<Receiver<Event>> {
228        let res = try!(
229            Client::new()
230                .get(self.host.join("/api/v1/events?watch=true").unwrap())
231                .send()
232        );
233        self.generator(res.bytes())
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240    use std::sync::mpsc::Receiver;
241    #[test]
242    fn events_generator() {
243        impl Events for &'static str {
244            fn events(&mut self) -> Result<Receiver<Event>> {
245                self.generator(self.bytes().into_iter().map(|b| Ok(b)))
246            }
247        }
248        let events = r#"{
249            "object":{
250                "apiVersion": "1",
251                "count": 1,
252                "firstTimestamp": "...",
253                "lastTimestamp": "...",
254                "kind":"Event",
255                "message":"test",
256                "involvedObject": {
257                    "apiVersion": "1",
258                    "resourceVersion": "2",
259                    "uid":"2",
260                    "kind": "POD",
261                    "name": "test_name",
262                    "namespace": "test_namespace"
263                },
264                "metadata": {
265                    "creationTimestamp": "...",
266                    "deletionTimestamp": "...",
267                    "name": "test",
268                    "namespace":"default",
269                    "resourceVersion": "1",
270                    "selfLink": "...",
271                    "uid": "1"
272                },
273                "reason": "started",
274                "source": {
275                    "component": "test",
276                    "host": "foo.com"
277                },
278                "type": "Normal"
279            },
280            "type":"ADDED"
281        }"#
282                .events();
283        assert!(
284            events
285                .unwrap()
286                .into_iter()
287                .map(|e| e.object.involved_object.namespace)
288                .nth(0) == Some("test_namespace".to_owned())
289        )
290    }
291}