1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
//! # Scuttlebutt
//!
//! Scuttlebutt is an interface for extending kubernetes by feeding off a stream of kubernetes
//! cluster events

#[macro_use]
extern crate log;
extern crate hyper;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;

use hyper::{Client, Error as HttpError, Url};
use std::io::{self, Read};
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Kubernets cluster event
#[derive(Serialize, Deserialize, Debug)]
pub struct Event {
    pub object: Object,
    #[serde(rename = "type")]
    pub event_type: String,
}

/// A description of the event
#[derive(Serialize, Deserialize, Debug)]
pub struct Object {
    /// APIVersion defines the versioned schema of this representation of an object.
    /// Servers should convert recognized schemas to the latest internal value,
    /// and may reject unrecognized values. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#resources
    #[serde(rename = "apiVersion")]
    pub api_version: String,
    /// The number of times this event has occurred.
    pub count: usize,
    /// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
    #[serde(rename = "firstTimestamp")]
    pub first_timestamp: String,
    /// The time at which the most recent occurrence of this event was recorded.
    #[serde(rename = "lastTimestamp")]
    pub last_timestamp: String,
    /// The object that this event is about.
    #[serde(rename = "involvedObject")]
    pub involved_object: ObjectReference,
    /// Kind is a string value representing the REST resource this object represents.
    /// Servers may infer this from the endpoint the client submits requests to.
    /// Cannot be updated. In CamelCase. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
    pub kind: String,
    /// A human-readable description of the status of this operation.
    pub message: String,
    /// Standard object’s metadata. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
    pub metadata: ObjectMeta,
    /// This should be a short, machine understandable string that gives the reason for the
    /// transition into the object’s current status.
    pub reason: String,
    /// The component reporting this event. Should be a short machine understandable string.
    pub source: EventSource,
    /// Type of this event (Normal, Warning), new types could be added in the future
    #[serde(rename = "type")]
    pub object_type: String,
}

/// ObjectMeta is metadata that all persisted resources must have, which includes all
/// objects users must create.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectMeta {
    /// CreationTimestamp is a timestamp representing the server time when this object was
    // created. It is not guaranteed to be set in happens-before order across separate operations.
    // Clients may not set this value. It is represented in RFC3339 form and is in UTC.
    /// Populated by the system. Read-only. Null for lists. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
    #[serde(rename = "creationTimestamp")]
    pub creation_timestamp: String,
    /// DeletionTimestamp is RFC 3339 date and time at which this resource will be deleted.
    /// This field is set by the server when a graceful deletion is requested by the user,
    // and is not directly settable by a client. The resource will be deleted (no longer visible
    // from resource lists, and not reachable by name) after the time in this field. Once set,
    /// this value may not be unset or be set further into the future, although it may be shortened
    /// or the resource may be deleted prior to this time. For example, a user may request that a
    /// pod is deleted in 30 seconds. The Kubelet will react by sending a graceful termination
    /// signal to the containers in the pod. Once the resource is deleted in the API, the Kubelet
    /// will send a hard termination signal to the container. If not set, graceful deletion of
    /// the object has not been requested.
    /// Populated by the system when a graceful deletion is requested. Read-only. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#metadata
    #[serde(rename = "deletionTimestamp")]
    pub deletion_timestamp: Option<String>,
    /// Name must be unique within a namespace. Is required when creating resources, although
    /// some resources may allow a client to request the generation of an appropriate name
    /// automatically. Name is primarily intended for creation idempotence and configuration
    /// definition. Cannot be updated. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
    pub name: String,
    /// Namespace defines the space within each name must be unique. An empty namespace is
    /// equivalent to the "default" namespace, but "default" is the canonical representation.
    /// Not all objects are required to be scoped to a namespace - the value of this field for
    /// those objects will be empty.
    /// Must be a DNS_LABEL. Cannot be updated. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
    pub namespace: String,
    /// An opaque value that represents the internal version of this object that can be used
    /// by clients to determine when objects have changed. May be used for optimistic concurrency,
    /// change detection, and the watch operation on a resource or set of resources.
    /// Clients must treat these values as opaque and passed unmodified back to the server.
    /// They may only be valid for a particular resource or set of resources.
    /// Populated by the system. Read-only. Value must be treated as opaque by clients
    #[serde(rename = "resourceVersion")]
    pub resource_version: String,
    /// SelfLink is a URL representing this object. Populated by the system. Read-only.
    #[serde(rename = "selfLink")]
    pub self_link: String,
    /// UID is the unique in time and space value for this object. It is typically generated by
    /// the server on successful creation of a resource and is not allowed to change on PUT
    /// operations.
    /// Populated by the system. Read-only. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
    pub uid: String,
}

/// EventSource contains information for an event.
#[derive(Serialize, Deserialize, Debug)]
pub struct EventSource {
    /// Component from which the event is generated.
    pub component: String,
    /// Host name on which the event is generated.
    pub host: Option<String>,
}

/// ObjectReference contains enough information to let you inspect or modify the referred object.
#[derive(Serialize, Deserialize, Debug)]
pub struct ObjectReference {
    /// API version of the referent.
    #[serde(rename = "apiVersion")]
    pub api_version: String,
    /// Specific resourceVersion to which this reference is made, if any.
    #[serde(rename = "resourceVersion")]
    pub resource_version: String,
    /// UID of the referent. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#uids
    pub uid: String,
    /// If referring to a piece of an object instead of an entire object,
    /// this string should contain a valid JSON/Go field access statement,
    /// such as desiredState.manifest.containers[2]. For example, if the object reference
    /// is to a container within a pod, this would take on a value like: "spec.containers{name}"
    /// (where "name" refers to the name of the container that triggered the event) or if no
    /// container name is specified "spec.containers[2]" (container with index 2 in this pod).
    /// This syntax is chosen only to have some well-defined way of referencing a part of an
    /// object.
    #[serde(rename = "fieldPath")]
    pub field_path: Option<String>,
    /// Kind of the referent. More info:
    /// http://releases.k8s.io/release-1.3/docs/devel/api-conventions.md#types-kinds
    pub kind: String,
    /// Name of the referent. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/identifiers.md#names
    pub name: String,
    /// Namespace of the referent. More info:
    /// http://releases.k8s.io/release-1.3/docs/user-guide/namespaces.md
    pub namespace: String,
}

const DEFAULT_HOST: &'static str = "http://localhost:8001";

pub type Result<T> = std::result::Result<T, Error>;

/// An enumeratation of potential errors
#[derive(Debug)]
pub enum Error {
    Transport(HttpError),
}

impl From<HttpError> for Error {
    fn from(error: HttpError) -> Error {
        Error::Transport(error)
    }
}

/// A cluster contains an address
/// for interacting with a kubernetes Cluster
/// of nodes
pub struct Cluster {
    host: Url,
}

/// Events provides a means for generating
/// a receiver for events
pub trait Events {
    fn events(&mut self) -> Result<Receiver<Event>>;

    fn generator<Bytes>(&self, bytes: Bytes) -> Result<Receiver<Event>>
    where
        Bytes: 'static + Iterator<Item = io::Result<u8>>,
        Bytes: Send,
    {
        let (tx, rx) = channel();
        let stream = serde_json::Deserializer::from_iter(bytes).into_iter::<Event>();
        thread::spawn(
            move || for e in stream {
                match e {
                    Ok(event) => {
                        if let Err(e) = tx.send(event) {
                            debug!("{:#?}", e);
                            break;
                        }
                    }
                    Err(e) => {
                        debug!("{:#?}", e);
                        break;
                    }
                }
            },
        );
        Ok(rx)
    }
}

impl Cluster {
    pub fn new() -> Cluster {
        Cluster { host: Url::parse(DEFAULT_HOST).unwrap() }
    }
}

impl Events for Cluster {
    fn events(&mut self) -> Result<Receiver<Event>> {
        let res = try!(
            Client::new()
                .get(self.host.join("/api/v1/events?watch=true").unwrap())
                .send()
        );
        self.generator(res.bytes())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc::Receiver;
    #[test]
    fn events_generator() {
        impl Events for &'static str {
            fn events(&mut self) -> Result<Receiver<Event>> {
                self.generator(self.bytes().into_iter().map(|b| Ok(b)))
            }
        }
        let events = r#"{
            "object":{
                "apiVersion": "1",
                "count": 1,
                "firstTimestamp": "...",
                "lastTimestamp": "...",
                "kind":"Event",
                "message":"test",
                "involvedObject": {
                    "apiVersion": "1",
                    "resourceVersion": "2",
                    "uid":"2",
                    "kind": "POD",
                    "name": "test_name",
                    "namespace": "test_namespace"
                },
                "metadata": {
                    "creationTimestamp": "...",
                    "deletionTimestamp": "...",
                    "name": "test",
                    "namespace":"default",
                    "resourceVersion": "1",
                    "selfLink": "...",
                    "uid": "1"
                },
                "reason": "started",
                "source": {
                    "component": "test",
                    "host": "foo.com"
                },
                "type": "Normal"
            },
            "type":"ADDED"
        }"#
                .events();
        assert!(
            events
                .unwrap()
                .into_iter()
                .map(|e| e.object.involved_object.namespace)
                .nth(0) == Some("test_namespace".to_owned())
        )
    }
}