mesos 0.2.3

Mesos library using the new HTTP API
Documentation
use std::io;
use std::sync::{Arc, Mutex};

use hyper;
use hyper::Client;
use hyper::client::response::Response;
use hyper::header::{Connection, ContentType, Headers, Accept, QualityItem, Quality, qitem};
use protobuf::{self, Message};

use proto::scheduler::{Call, Call_Type, Call_Subscribe, Call_Accept, Call_Decline, 
                       Call_Kill, Call_Shutdown, Call_Acknowledge, Call_Reconcile,
                       Call_Reconcile_Task, Call_Message, Call_Request};
use proto::mesos::{AgentID, ExecutorID, Filters, FrameworkInfo, FrameworkID, OfferID,
                   Operation, Request, TaskID};
use util;

#[derive(Clone)]
pub struct SchedulerClient {
    pub url: String,
    pub framework_id: Arc<Mutex<Option<FrameworkID>>>,
}

impl SchedulerClient {
    pub fn subscribe(&self, framework_info: FrameworkInfo, force: Option<bool>) -> hyper::Result<Response> {
        let mut subscribe = Call_Subscribe::new();
        subscribe.set_framework_info(framework_info);
        
        let mut call = Call::new();
        call.set_field_type(Call_Type::SUBSCRIBE);
        call.set_subscribe(subscribe);
        {
            let framework_id_clone = self.framework_id.clone();
            let framework_id = framework_id_clone.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn teardown(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::TEARDOWN);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn accept(
        &self,
        offer_ids: Vec<OfferID>,
        operations: Vec<Operation>,
        filters: Filters
    ) -> hyper::Result<Response> {

        let mut accept = Call_Accept::new();
        accept.set_offer_ids(protobuf::RepeatedField::from_vec(offer_ids));
        accept.set_operations(protobuf::RepeatedField::from_vec(operations));
        accept.set_filters(filters);

        let mut call = Call::new();
        call.set_field_type(Call_Type::ACCEPT);
        call.set_accept(accept);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn decline(&self, offer_ids: Vec<OfferID>, filters: Filters) -> hyper::Result<Response> {
        let mut decline = Call_Decline::new();
        decline.set_offer_ids(protobuf::RepeatedField::from_vec(offer_ids));
        decline.set_filters(filters);

        let mut call = Call::new();
        call.set_field_type(Call_Type::DECLINE);
        call.set_decline(decline);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn revive(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::REVIVE);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn kill(&self, task_id: TaskID, agent_id: AgentID) -> hyper::Result<Response> {
        let mut kill = Call_Kill::new();
        kill.set_task_id(task_id);
        kill.set_agent_id(agent_id);

        let mut call = Call::new();
        call.set_field_type(Call_Type::KILL);
        call.set_kill(kill);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn shutdown(&self, executor_id: ExecutorID, agent_id: AgentID) -> hyper::Result<Response> {
        let mut shutdown = Call_Shutdown::new();
        shutdown.set_executor_id(executor_id);
        shutdown.set_agent_id(agent_id);

        let mut call = Call::new();
        call.set_field_type(Call_Type::SHUTDOWN);
        call.set_shutdown(shutdown);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn acknowledge(&self, agent_id: AgentID, task_id: TaskID, uuid: Vec<u8>) -> hyper::Result<Response> {
        let mut acknowledge = Call_Acknowledge::new();
        acknowledge.set_agent_id(agent_id);
        acknowledge.set_task_id(task_id);
        acknowledge.set_uuid(uuid);

        let mut call = Call::new();
        call.set_field_type(Call_Type::ACKNOWLEDGE);
        call.set_acknowledge(acknowledge);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn reconcile_task(&self, task_id: TaskID, agent_id: AgentID) -> hyper::Result<Response> {
        let mut reconcile = Call_Reconcile_Task::new();
        reconcile.set_task_id(task_id);
        reconcile.set_agent_id(agent_id);

        self.reconcile(vec![reconcile])
    }

    pub fn reconcile(&self, tasks: Vec<Call_Reconcile_Task>) -> hyper::Result<Response> {
        let mut reconcile = Call_Reconcile::new();
        reconcile.set_tasks(protobuf::RepeatedField::from_vec(tasks));

        let mut call = Call::new();
        call.set_field_type(Call_Type::RECONCILE);
        call.set_reconcile(reconcile);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }


    pub fn message(&self, agent_id: AgentID, executor_id: ExecutorID, data: Vec<u8>) -> hyper::Result<Response> {
        let mut message = Call_Message::new();
        message.set_agent_id(agent_id);
        message.set_executor_id(executor_id);
        message.set_data(data);

        let mut call = Call::new();
        call.set_field_type(Call_Type::MESSAGE);
        call.set_message(message);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn request(&self, requests: Vec<Request>) -> hyper::Result<Response> {
        let mut request = Call_Request::new();
        request.set_requests(protobuf::RepeatedField::from_vec(requests));

        let mut call = Call::new();
        call.set_field_type(Call_Type::REQUEST);
        call.set_request(request);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    pub fn suppress(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::SUPPRESS);
        {
            let framework_id = self.framework_id.lock().unwrap();
            if framework_id.is_some() {
                let framework_id_clone = framework_id.clone();
                call.set_framework_id(framework_id_clone.unwrap());
            }
        }

        self.post(&*call.write_to_bytes().unwrap())
    }

    fn post(&self, data: &[u8]) -> hyper::Result<Response> {
        let client = Client::new();

        client.post(&*self.url)
            .headers(util::protobuf_headers())
            .body(data)
            .send()
    }
}