use mesos::{Scheduler, SchedulerClient, SchedulerConf,
ProtobufCallbackRouter, run_protobuf_scheduler};
use mesos::proto::*;
use mesos::util;
struct TestScheduler {
max_id: u64,
}
impl TestScheduler {
fn get_id(&mut self) -> u64 {
self.max_id += 1;
self.max_id
}
}
impl Scheduler for TestScheduler {
fn subscribed(&mut self,
client: &SchedulerClient,
framework_id: &FrameworkID,
heartbeat_interval_seconds: Option<f64>) {
println!("received subscribed");
client.reconcile(vec![]);
}
fn inverse_offers(&mut self,
client: &SchedulerClient,
inverse_offers: Vec<&InverseOffer>) {
println!("received inverse offers");
let offer_ids = inverse_offers.iter()
.map(|o| o.get_id().clone())
.collect();
client.decline(offer_ids, None);
}
fn offers(&mut self, client: &SchedulerClient, offers: Vec<&Offer>) {
let agent_id = offers[0].get_agent_id();
println!("received {} offers from agent {}",
offers.len(),
agent_id.get_value());
let offer_ids: Vec<OfferID> = offers.iter()
.map(|o| o.get_id().clone())
.collect();
let mut offer_cpus: f64 = offers.iter()
.flat_map(|o| o.get_resources())
.filter(|r| r.get_name() == "cpus")
.map(|c| c.get_scalar())
.fold(0f64, |acc, cpu_res| {
acc + cpu_res.get_value()
});
let mut offer_mem = util::get_scalar_resource_sum("mem", offers);
let mut tasks = vec![];
while offer_cpus >= 1f64 && offer_mem >= 128f64 {
let name = &*format!("sleepy-{}", self.get_id());
let task_id = util::task_id(name);
let mut command = CommandInfo::new();
command.set_value("env && sleep 10".to_string());
let mem = util::scalar("mem", "*", 128f64);
let cpus = util::scalar("cpus", "*", 1f64);
let resources = vec![mem, cpus];
let task_info = util::task_info(name,
&task_id,
agent_id,
&command,
resources);
tasks.push(task_info);
offer_cpus -= 1f64;
offer_mem -= 128f64;
}
client.launch(offer_ids, tasks, None);
}
fn rescind(&mut self, client: &SchedulerClient, offer_id: &OfferID) {
println!("received rescind");
}
fn update(&mut self, client: &SchedulerClient, status: &TaskStatus) {
println!("received update {:?} from {}",
status.get_state(),
status.get_task_id().get_value());
}
fn message(&mut self,
client: &SchedulerClient,
agent_id: &AgentID,
executor_id: &ExecutorID,
data: Vec<u8>) {
println!("received message");
}
fn failure(&mut self,
client: &SchedulerClient,
agent_id: Option<&AgentID>,
executor_id: Option<&ExecutorID>,
status: Option<i32>) {
println!("received failure");
}
fn error(&mut self, client: &SchedulerClient, message: String) {
println!("received error");
}
fn heartbeat(&mut self, client: &SchedulerClient) {
println!("received heartbeat");
}
fn disconnected(&mut self) {
println!("disconnected from scheduler");
}
}
#[test]
fn main() {
let mut scheduler = TestScheduler { max_id: 0 };
let conf = SchedulerConf {
master_url: "http://localhost:5050".to_string(),
user: "root".to_string(),
name: "rust http".to_string(),
framework_timeout: 0f64,
implicit_acknowledgements: true,
framework_id: None,
};
let mut router = ProtobufCallbackRouter {
scheduler: &mut scheduler,
conf: conf.clone(),
};
run_protobuf_scheduler(&mut router, conf)
}