mesos 0.2.10

Mesos library using the new HTTP API
use std::io;
use std::sync::{Arc, Mutex};

use hyper;
use hyper::Client;
use hyper::client::response::Response;
use hyper::header::{Accept, Connection, ContentType, Headers, Quality,
                    QualityItem, qitem};
use protobuf::{self, Message};

use proto::scheduler::{Call, Call_Accept, Call_Acknowledge, Call_Decline,
                       Call_Kill, Call_Message, Call_Reconcile,
                       Call_Reconcile_Task, Call_Request, Call_Shutdown,
                       Call_Subscribe, Call_Type};
use proto::mesos::{AgentID, ExecutorID, Filters, FrameworkID, FrameworkInfo,
                   OfferID, Operation, Request, TaskID, TaskInfo};
use util;

#[derive(Clone)]
pub struct SchedulerClient {
    pub url: String,
    pub framework_id: Arc<Mutex<Option<FrameworkID>>>,
}

impl SchedulerClient {
    pub fn new(url: String,
               framework_id: Option<FrameworkID>)
               -> SchedulerClient {
        SchedulerClient {
            url: url + "/api/v1/scheduler",
            framework_id: Arc::new(Mutex::new(framework_id)),
        }
    }

    pub fn get_framework_id(&self) -> Option<FrameworkID> {
        let id = self.framework_id.lock().unwrap().clone();
        id
    }

    pub fn subscribe(&self,
                     mut framework_info: FrameworkInfo,
                     force: Option<bool>)
                     -> hyper::Result<Response> {
        match self.get_framework_id() {
            Some(fwid) => framework_info.set_id(fwid),
            _ => (),
        }

        let mut subscribe = Call_Subscribe::new();
        subscribe.set_framework_info(framework_info);

        let mut call = Call::new();
        call.set_field_type(Call_Type::SUBSCRIBE);
        call.set_subscribe(subscribe);

        self.post(&mut call)
    }

    pub fn teardown(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::TEARDOWN);

        self.post(&mut call)
    }

    pub fn launch(&self,
                  offer_ids: Vec<OfferID>,
                  tasks: Vec<TaskInfo>,
                  filters: Option<Filters>)
                  -> hyper::Result<Response> {
        let operation = util::launch_operation(tasks);
        self.accept(offer_ids, vec![operation], filters)
    }

    pub fn accept(&self,
                  offer_ids: Vec<OfferID>,
                  operations: Vec<Operation>,
                  filters: Option<Filters>)
                  -> hyper::Result<Response> {

        let mut accept = Call_Accept::new();
        accept.set_offer_ids(protobuf::RepeatedField::from_vec(offer_ids));
        accept.set_operations(protobuf::RepeatedField::from_vec(operations));
        if filters.is_some() {
            accept.set_filters(filters.unwrap());
        }

        let mut call = Call::new();
        call.set_field_type(Call_Type::ACCEPT);
        call.set_accept(accept);

        self.post(&mut call)
    }

    pub fn decline(&self,
                   offer_ids: Vec<OfferID>,
                   filters: Option<Filters>)
                   -> hyper::Result<Response> {
        let mut decline = Call_Decline::new();
        decline.set_offer_ids(protobuf::RepeatedField::from_vec(offer_ids));
        if filters.is_some() {
            decline.set_filters(filters.unwrap());
        }

        let mut call = Call::new();
        call.set_field_type(Call_Type::DECLINE);
        call.set_decline(decline);

        self.post(&mut call)
    }

    pub fn revive(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::REVIVE);

        self.post(&mut call)
    }

    pub fn kill(&self,
                task_id: TaskID,
                agent_id: Option<AgentID>)
                -> hyper::Result<Response> {
        let mut kill = Call_Kill::new();
        kill.set_task_id(task_id);
        if agent_id.is_some() {
            kill.set_agent_id(agent_id.unwrap());
        }

        let mut call = Call::new();
        call.set_field_type(Call_Type::KILL);
        call.set_kill(kill);

        self.post(&mut call)
    }

    pub fn shutdown(&self,
                    executor_id: ExecutorID,
                    agent_id: AgentID)
                    -> hyper::Result<Response> {
        let mut shutdown = Call_Shutdown::new();
        shutdown.set_executor_id(executor_id);
        shutdown.set_agent_id(agent_id);

        let mut call = Call::new();
        call.set_field_type(Call_Type::SHUTDOWN);
        call.set_shutdown(shutdown);

        self.post(&mut call)
    }

    pub fn acknowledge(&self,
                       agent_id: AgentID,
                       task_id: TaskID,
                       uuid: Vec<u8>)
                       -> hyper::Result<Response> {
        let mut acknowledge = Call_Acknowledge::new();
        acknowledge.set_agent_id(agent_id);
        acknowledge.set_task_id(task_id);
        acknowledge.set_uuid(uuid);

        let mut call = Call::new();
        call.set_field_type(Call_Type::ACKNOWLEDGE);
        call.set_acknowledge(acknowledge);

        self.post(&mut call)
    }

    pub fn reconcile_task(&self,
                          task_id: TaskID,
                          agent_id: Option<AgentID>)
                          -> hyper::Result<Response> {
        let mut reconcile = Call_Reconcile_Task::new();
        reconcile.set_task_id(task_id);
        if agent_id.is_some() {
            reconcile.set_agent_id(agent_id.unwrap());
        }

        self.reconcile(vec![reconcile])
    }

    pub fn reconcile(&self,
                     tasks: Vec<Call_Reconcile_Task>)
                     -> hyper::Result<Response> {
        let mut reconcile = Call_Reconcile::new();
        reconcile.set_tasks(protobuf::RepeatedField::from_vec(tasks));

        let mut call = Call::new();
        call.set_field_type(Call_Type::RECONCILE);
        call.set_reconcile(reconcile);

        self.post(&mut call)
    }


    pub fn message(&self,
                   agent_id: AgentID,
                   executor_id: ExecutorID,
                   data: Vec<u8>)
                   -> hyper::Result<Response> {
        let mut message = Call_Message::new();
        message.set_agent_id(agent_id);
        message.set_executor_id(executor_id);
        message.set_data(data);

        let mut call = Call::new();
        call.set_field_type(Call_Type::MESSAGE);
        call.set_message(message);

        self.post(&mut call)
    }

    pub fn request(&self, requests: Vec<Request>) -> hyper::Result<Response> {
        let mut request = Call_Request::new();
        request.set_requests(protobuf::RepeatedField::from_vec(requests));

        let mut call = Call::new();
        call.set_field_type(Call_Type::REQUEST);
        call.set_request(request);

        self.post(&mut call)
    }

    pub fn suppress(&self) -> hyper::Result<Response> {
        let mut call = Call::new();
        call.set_field_type(Call_Type::SUPPRESS);

        self.post(&mut call)
    }

    fn post(&self, call: &mut Call) -> hyper::Result<Response> {
        match self.get_framework_id() {
            Some(fwid) => call.set_framework_id(fwid),
            _ => (),
        }

        let client = Client::new();

        let data = &*call.write_to_bytes().unwrap();

        client.post(&*self.url)
              .headers(util::protobuf_headers())
              .body(data)
              .send()
    }
}