extern crate async_mesos;
#[macro_use]
extern crate failure;
extern crate futures;
extern crate hyper;
#[macro_use]
extern crate log;
extern crate mime;
extern crate protobuf;
extern crate simple_logger;
extern crate spectral;
extern crate tokio_core;
extern crate users;
#[cfg(test)]
mod integration {
use failure;
use futures::{future, Future, Stream};
use hyper::Uri;
use async_mesos::client::Client;
use async_mesos::mesos;
use async_mesos::model;
use async_mesos::scheduler;
use simple_logger;
use spectral::prelude::*;
use tokio_core::reactor::Core;
use users::{get_current_uid, get_user_by_uid};
#[test]
fn connect() {
simple_logger::init();
let mut core = Core::new().expect("Could not create Core.");
let handle = core.handle();
let user = get_user_by_uid(get_current_uid()).expect("No system user found.");
let mut framework_info = mesos::FrameworkInfo::new();
framework_info.set_user(String::from(user.name()));
framework_info.set_name(String::from("Example FOO Framework"));
let uri = "http://localhost:5050/api/v1/scheduler"
.parse::<Uri>()
.expect("Could not parse Uri.");
let client = Client::connect(&handle, uri, framework_info);
let work = client
.into_stream()
.map(|(_, events)| events)
.flatten()
.map(|event| event.get_field_type())
.take(1)
.collect();
let result = core.run(work).unwrap();
assert_that(&result).is_equal_to(vec![scheduler::Event_Type::HEARTBEAT]);
}
#[test]
fn task_launch() {
simple_logger::init();
#[derive(Debug)]
pub struct State {
pub client: Client,
pub task_id: Option<mesos::TaskID>,
}
fn build_accept_call(
state: &State,
offer_id: mesos::OfferID,
task_id: mesos::TaskID,
agent_id: mesos::AgentID
) -> Result<scheduler::Call, failure::Error> {
let cpu = model::ScalarResourceBuilder::default()
.name("cpus")
.value(0.1)
.build()?;
let mem = model::ScalarResourceBuilder::default()
.name("mem")
.value(32.0)
.build()?;
let command = model::ShellCommandBuilder::default()
.command("sleep 100000")
.build()?;
let task_info = model::TaskInfoBuilder::default()
.name("sleep_task")
.task_id(task_id)
.agent_id(agent_id)
.resources(vec![cpu, mem])
.command(command)
.build()?;
let operation = model::OfferLaunchOperationBuilder::default()
.task_info(task_info)
.build()?;
let call = state.client.accept(vec![offer_id], vec![operation]);
Ok(call)
}
let mut core = Core::new().expect("Could not create Core.");
let handle = core.handle();
let user = get_user_by_uid(get_current_uid()).expect("No system user found.");
let mut framework_info = mesos::FrameworkInfo::new();
framework_info.set_user(String::from(user.name()));
framework_info.set_name(String::from("Example Rust Framework"));
framework_info.set_role(String::from("*"));
let uri = "http://localhost:5050/api/v1/scheduler"
.parse::<Uri>()
.expect("Could not parse Uri.");
let future_client = Client::connect(&handle, uri, framework_info);
let work = future_client.and_then(|(client, events)| {
let state = State {
client: client,
task_id: None,
};
events.fold(
state,
|mut state, mut event| -> Box<Future<Item = State, Error = failure::Error>> {
match event.get_field_type() {
scheduler::Event_Type::OFFERS => {
info!("Received offer.");
if state.task_id.is_some() {
info!("Ignoring offer because task is already launching.");
return Box::new(future::result(Ok(state)));
}
let mut offer = event.take_offers().take_offers()[0].clone();
let offer_id = offer.take_id();
let agent_id = offer.take_agent_id();
let mut task_id = mesos::TaskID::new();
task_id.set_value(String::from("my_task"));
state.task_id = Some(task_id.clone());
if let Ok(call) = build_accept_call(&state, offer_id, task_id, agent_id) {
let s = state.client.call(&handle, call).map(|()| state);
Box::new(s)
} else {
Box::new(future::err(format_err!("Could not construct offer accept call")))
}
}
scheduler::Event_Type::UPDATE => {
info!("Received task update.");
let status = event.take_update().take_status();
let task_id = status.get_task_id().clone();
let task_state = status.get_state().clone();
debug!(
"Task {} is {:?}: {}",
task_id.get_value(),
task_state,
status.get_message()
);
let ack_call = state.client.acknowledge(status);
let s = state.client.call(&handle, ack_call).map_err(|error| {
error!("Could not make acknowledge request: {}", error);
()
});
handle.spawn(s);
if task_state == mesos::TaskState::TASK_RUNNING {
let call = state.client.teardown();
let s = state.client.call(&handle, call).map(|()| state);
return Box::new(s);
} else {
Box::new(future::result(Ok(state)))
}
}
other => {
debug!("Ignore event {:?}", other);
Box::new(future::result(Ok(state)))
}
}
},
)
});
core.run(work).unwrap();
}
}