mesos 0.2.3

Mesos library using the new HTTP API
Documentation
use std::io;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use std::thread;

use client::SchedulerClient;
use recordio::RecordIOCodec;
use proto::mesos::FrameworkID;
use proto::scheduler::*;
use Scheduler;
use util;

pub fn run_protobuf_scheduler(
       master_url: String,
       user: String,
       name: String,
       framework_timeout: f64,
       scheduler: &mut Scheduler,
       framework_id: Option<String>) {

    let mesos_framework_id = framework_id.map(|framework_id| {
        let mut proto_framework_id = FrameworkID::new();
        proto_framework_id.set_value(framework_id);
        proto_framework_id
    });

    let client = SchedulerClient {
        url: master_url,
        framework_id: Arc::new(Mutex::new(None))
    };
    let client_clone = client.clone();

    let (tx, rx) = channel();
    thread::spawn(move|| {
        let mut codec = RecordIOCodec::new(tx);
        let framework_info = util::framework_info(user, name, framework_timeout);
        let mut res = client_clone.subscribe(framework_info, None).unwrap();
        io::copy(&mut res, &mut codec).unwrap();
    });

    for event in rx {
        match event.get_field_type() {
            Event_Type::SUBSCRIBED => {
                let subscribed = event.get_subscribed();
                let mut framework_id = client.framework_id.lock().unwrap();
                *framework_id = Some(subscribed.get_framework_id().clone());

                let heartbeat_interval_seconds =
                    if !subscribed.has_heartbeat_interval_seconds() {
                        None
                    } else {
                        Some(subscribed.get_heartbeat_interval_seconds())
                    };

                scheduler.subscribed(&client, subscribed.get_framework_id(), heartbeat_interval_seconds)
            },
            Event_Type::OFFERS => {
                let offers = event.get_offers();
                scheduler.offers(
                    &client,
                    offers.get_offers().to_vec(),
                    offers.get_inverse_offers().to_vec()
                )
            },
            Event_Type::RESCIND =>
                scheduler.rescind(&client, event.get_rescind().get_offer_id()),
            Event_Type::UPDATE =>
                scheduler.update(&client, event.get_update().get_status()),
            Event_Type::MESSAGE => {
                let message = event.get_message();
                scheduler.message(&client, message.get_agent_id(), message.get_executor_id(), message.get_data().to_vec())
            },
            Event_Type::FAILURE => {
                let failure = event.get_failure();
                let executor_id =
                    if !failure.has_executor_id() {
                        None
                    } else {
                        Some(failure.get_executor_id())
                    };
                let status =
                    if !failure.has_status() {
                        None
                    } else {
                        Some(failure.get_status())
                    };
                scheduler.failure(&client, failure.get_agent_id(), executor_id, status)
            },
            Event_Type::ERROR => scheduler.error(&client, event.get_error().get_message().to_string()),
            Event_Type::HEARTBEAT => scheduler.heartbeat(&client),
        }
    }
}