#![deny(missing_docs)]
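//! `tsunami` provides an interface for running short-lived jobs and experiments on EC2 spot
//! block instances. Most of the interaction with this library happens through
//! [`TsunamiBuilder`](struct.TsunamiBuilder.html).
//!
//! A minimal usage sketch (illustrative only: it assumes this crate is built as `tsunami`, and
//! the set name, instance type, and AMI id below are placeholders to substitute with your own):
//!
//! ```no_run
//! # extern crate failure;
//! # extern crate tsunami;
//! use tsunami::{MachineSetup, TsunamiBuilder};
//!
//! # fn main() -> Result<(), failure::Error> {
//! let mut b = TsunamiBuilder::default();
//! b.use_term_logger();
//! b.add_set(
//!     "server",
//!     1,
//!     MachineSetup::new("t3.small", "ami-e18aa89b", |_ssh| {
//!         // run provisioning commands over the ssh session here
//!         Ok(())
//!     }),
//! );
//! b.run(|machines| {
//!     let server = &machines["server"][0];
//!     println!("server is at {}", server.public_dns);
//!     Ok(())
//! })
//! # }
//! ```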
#[macro_use]
extern crate failure;
extern crate rand;
extern crate rayon;
extern crate rusoto_core;
extern crate rusoto_ec2;
#[macro_use]
extern crate scopeguard;
#[macro_use]
extern crate slog;
extern crate slog_term;
extern crate ssh2;
extern crate tempfile;
use failure::{Error, ResultExt};
use rayon::prelude::*;
use rusoto_core::request::HttpClient;
use rusoto_core::Region;
use rusoto_core::{DefaultCredentialsProvider, ProvideAwsCredentials};
use std::collections::HashMap;
use std::io::Write;
use std::{thread, time};
mod ssh;
pub use ssh::Session;
/// A handle to an instance currently running as part of a tsunami.
pub struct Machine {
    /// An established SSH session to this host (set once the machine's setup has run).
    pub ssh: Option<ssh::Session>,
    /// The EC2 instance type this machine is running on (e.g., `t3.small`).
    pub instance_type: String,
    /// The private IP address of this host on its internal network.
    pub private_ip: String,
    /// The publicly accessible DNS name of this host.
    pub public_dns: String,
    /// The publicly accessible IP address of this host.
    pub public_ip: String,
}
/// A template for a particular machine setup in a tsunami.
pub struct MachineSetup {
    instance_type: String,
    ami: String,
    username: String,
    setup: Box<dyn Fn(&mut ssh::Session) -> Result<(), Error> + Sync>,
}
impl MachineSetup {
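    /// Define a new template for a machine setup.
    ///
    /// The given AMI will be booted on an EC2 instance of the given type, and `setup` is then
    /// invoked with an ssh session to the new machine. A minimal sketch (the instance type and
    /// AMI id are placeholders, and the closure does no real provisioning):
    ///
    /// ```no_run
    /// # extern crate tsunami;
    /// use tsunami::MachineSetup;
    ///
    /// let setup = MachineSetup::new("t3.small", "ami-e18aa89b", |_ssh| {
    ///     // run provisioning commands over the ssh session here
    ///     Ok(())
    /// });
    /// ```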
pub fn new<F>(instance_type: &str, ami: &str, setup: F) -> Self
where
F: Fn(&mut ssh::Session) -> Result<(), Error> + 'static + Sync,
{
MachineSetup {
instance_type: instance_type.to_string(),
ami: ami.to_string(),
username: String::from("ec2-user"),
setup: Box::new(setup),
}
}
    /// Set the username to ssh into the machine as.
    ///
    /// Defaults to `ec2-user`.
    pub fn as_user(mut self, username: &str) -> Self {
self.username = username.to_string();
self
}
}
/// Use this to prepare and launch a tsunami: add machine sets with
/// [`add_set`](#method.add_set), then call [`run`](#method.run) to spawn them and run your job.
#[must_use]
pub struct TsunamiBuilder {
descriptors: HashMap<String, (MachineSetup, u32)>,
log: slog::Logger,
    max_duration: i64, // in minutes, as expected by block_duration_minutes
region: Region,
availability_zone: Option<String>,
cluster: bool,
max_wait: Option<time::Duration>,
}
impl Default for TsunamiBuilder {
fn default() -> Self {
TsunamiBuilder {
descriptors: Default::default(),
log: slog::Logger::root(slog::Discard, o!()),
max_duration: 60,
region: Region::UsEast1,
availability_zone: None,
cluster: true,
max_wait: None,
}
}
}
impl TsunamiBuilder {
    /// Add a new named machine setup template, and set how many instances of that type should
    /// be spawned as part of the tsunami.
    pub fn add_set(&mut self, name: &str, number: u32, setup: MachineSetup) {
self.descriptors.insert(name.to_string(), (setup, number));
}
    /// Set the EC2 region to spawn the machines in. Defaults to `us-east-1`.
    pub fn set_region(&mut self, region: Region) {
self.region = region;
}
    /// Set the availability zone to spawn the machines in.
    pub fn set_availability_zone(&mut self, zone: &str) {
self.availability_zone = Some(zone.to_string());
}
    /// By default, all machines are spawned in a single cluster placement group for fast
    /// inter-machine communication. Call this to spawn them without one.
    pub fn no_clustering(&mut self) {
self.cluster = false;
}
    /// Limit how long to wait for spot requests to be satisfied before giving up.
    pub fn wait_limit(&mut self, t: time::Duration) {
self.max_wait = Some(t);
}
    /// Set the maximum lifetime of the machines, in hours.
    ///
    /// EC2 terminates spot block instances automatically once this duration elapses; it
    /// defaults to one hour.
    pub fn set_max_duration(&mut self, hours: u8) {
        self.max_duration = i64::from(hours) * 60;
    }
    /// Set the logger used for progress and diagnostic output.
    pub fn set_logger(&mut self, log: slog::Logger) {
self.log = log;
}
    /// Log progress and diagnostics to the terminal.
    pub fn use_term_logger(&mut self) {
use slog::Drain;
use std::sync::Mutex;
let decorator = slog_term::TermDecorator::new().build();
let drain = Mutex::new(slog_term::FullFormat::new(decorator).build()).fuse();
self.log = slog::Logger::root(drain, o!());
}
    /// Spin up the tsunami using AWS credentials from the default provider chain, and then call
    /// `f` with a map from machine set name to the machines spawned in that set.
    ///
    /// All spawned instances are terminated when `f` returns, whether it succeeded or not.
    pub fn run<F, R>(self, f: F) -> Result<R, Error>
where
F: FnOnce(HashMap<String, Vec<Machine>>) -> Result<R, Error>,
{
self.run_as(DefaultCredentialsProvider::new()?, f)
}
    /// Like [`run`](#method.run), but authenticates with AWS using the given credentials
    /// provider.
    pub fn run_as<P, F, R>(self, provider: P, f: F) -> Result<R, Error>
where
P: ProvideAwsCredentials + Send + Sync + 'static,
<P as ProvideAwsCredentials>::Future: Send,
F: FnOnce(HashMap<String, Vec<Machine>>) -> Result<R, Error>,
{
use rusoto_ec2::Ec2;
let log = &self.log;
let mut rng = rand::thread_rng();
debug!(log, "connecting to ec2");
let ec2 = rusoto_ec2::Ec2Client::new_with(HttpClient::new()?, provider, self.region);
info!(log, "spinning up tsunami");
use rand::Rng;
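        // Pick a randomized name for the temporary security group so that multiple concurrent
        // tsunamis do not collide.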
let mut group_name = String::from("tsunami_security_");
group_name.extend(rng.sample_iter(&rand::distributions::Alphanumeric).take(10));
trace!(log, "creating security group"; "name" => &group_name);
let mut req = rusoto_ec2::CreateSecurityGroupRequest::default();
req.group_name = group_name;
req.description = "temporary access group for tsunami VMs".to_string();
let res = ec2
.create_security_group(req)
.sync()
.context("failed to create security group for new machines")?;
let group_id = res
.group_id
.expect("aws created security group with no group id");
trace!(log, "created security group"; "id" => &group_id);
let mut req = rusoto_ec2::AuthorizeSecurityGroupIngressRequest::default();
req.group_id = Some(group_id.clone());
req.ip_protocol = Some("tcp".to_string());
req.from_port = Some(22);
req.to_port = Some(22);
req.cidr_ip = Some("0.0.0.0/0".to_string());
trace!(log, "adding ssh access to security group");
let _ = ec2
.authorize_security_group_ingress(req.clone())
.sync()
.context("failed to fill in security group for new machines")?;
req.from_port = Some(0);
req.to_port = Some(65535);
req.cidr_ip = Some("172.31.0.0/16".to_string());
trace!(log, "adding internal VM access to security group");
let _ = ec2
.authorize_security_group_ingress(req)
.sync()
.context("failed to fill in security group for new machines")?;
trace!(log, "creating keypair");
let mut req = rusoto_ec2::CreateKeyPairRequest::default();
let mut key_name = String::from("tsunami_key_");
key_name.extend(rng.sample_iter(&rand::distributions::Alphanumeric).take(10));
req.key_name = key_name.clone();
let res = ec2
.create_key_pair(req)
.sync()
.context("failed to generate new key pair")?;
trace!(log, "created keypair"; "fingerprint" => res.key_fingerprint);
let private_key = res
.key_material
.expect("aws did not generate key material for new key");
let mut private_key_file = tempfile::NamedTempFile::new()
.context("failed to create temporary file for keypair")?;
private_key_file
.write_all(private_key.as_bytes())
.context("could not write private key to file")?;
trace!(log, "wrote keypair to file"; "filename" => private_key_file.path().display());
let mut setup_fns = HashMap::new();
let mut usernames = HashMap::new();
let expected_num: u32 = self.descriptors.values().map(|&(_, n)| n).sum();
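        // A "cluster" placement group packs the instances close together, which gives
        // low-latency, high-throughput networking between them.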
let placement = if self.cluster {
trace!(log, "creating placement group");
let mut req = rusoto_ec2::CreatePlacementGroupRequest::default();
let mut placement_name = String::from("tsunami_placement_");
placement_name.extend(rng.sample_iter(&rand::distributions::Alphanumeric).take(10));
req.group_name = Some(placement_name.clone());
req.strategy = Some(String::from("cluster"));
ec2.create_placement_group(req)
.sync()
.context("failed to create new placement group")?;
trace!(log, "created placement group");
let mut placement = rusoto_ec2::SpotPlacement::default();
placement.availability_zone = self.availability_zone;
placement.group_name = Some(placement_name);
Some(placement)
} else {
None
};
let mut id_to_name = HashMap::new();
let mut spot_req_ids = Vec::new();
debug!(log, "issuing spot requests");
for (name, (setup, number)) in self.descriptors {
let mut launch = rusoto_ec2::RequestSpotLaunchSpecification::default();
launch.image_id = Some(setup.ami);
launch.instance_type = Some(setup.instance_type);
launch.placement = placement.clone();
setup_fns.insert(name.clone(), setup.setup);
usernames.insert(name.clone(), setup.username);
launch.security_group_ids = Some(vec![group_id.clone()]);
launch.key_name = Some(key_name.clone());
let req = rusoto_ec2::RequestSpotInstancesRequest {
instance_count: Some(i64::from(number)),
block_duration_minutes: Some(self.max_duration),
launch_specification: Some(launch),
type_: Some("one-time".into()),
..Default::default()
};
trace!(log, "issuing spot request for {}", name; "#" => number);
let res = ec2
.request_spot_instances(req)
.sync()
.context(format!("failed to request spot instances for {}", name))?;
let res = res
.spot_instance_requests
.expect("request_spot_instances should always return spot instance requests");
spot_req_ids.extend(
res.into_iter()
.filter_map(|sir| sir.spot_instance_request_id)
.map(|sir| {
trace!(log, "activated spot request"; "id" => &sir);
id_to_name.insert(sir.clone(), name.clone());
sir
}),
);
}
let start = time::Instant::now();
let mut error = None;
let mut req = rusoto_ec2::DescribeSpotInstanceRequestsRequest::default();
req.spot_instance_request_ids = Some(spot_req_ids);
let instances: Vec<_>;
debug!(log, "waiting for instances to spawn");
loop {
trace!(log, "checking spot request status");
let res = ec2.describe_spot_instance_requests(req.clone()).sync();
            if let Err(e) = res {
                let msg = format!("{}", e);
                if msg.contains("The spot instance request ID") && msg.contains("does not exist") {
                    // Spot requests are created eventually-consistently, so give AWS a moment
                    // before asking about them again.
                    trace!(log, "spot instance requests not yet ready");
                    thread::sleep(time::Duration::from_millis(500));
                    continue;
                } else {
                    return Err(e)
                        .context("failed to describe spot instances")
                        .map_err(|e| e.into());
                }
            }
let res = res.expect("Err checked above");
let any_pending = res
.spot_instance_requests
.as_ref()
.expect("describe always returns at least one spot instance")
.iter()
.map(|sir| {
(
sir,
sir.state
.as_ref()
.expect("spot request did not have state specified"),
)
})
.any(|(sir, state)| {
if state == "open" || (state == "active" && sir.instance_id.is_none()) {
true
} else {
trace!(log, "spot request ready"; "state" => state, "id" => &sir.spot_instance_request_id);
false
}
});
if !any_pending {
instances = res
.spot_instance_requests
.unwrap()
.into_iter()
.filter_map(|sir| {
let name = id_to_name
.remove(
&sir.spot_instance_request_id
.expect("spot request must have spot request id"),
)
.expect("every spot request id is made for some machine set");
if sir.state.as_ref().unwrap() == "active" {
let instance_id = sir.instance_id.unwrap();
trace!(log, "spot request satisfied"; "set" => &name, "iid" => &instance_id);
id_to_name.insert(instance_id.clone(), name);
Some(instance_id)
} else {
error!(log, "spot request failed: {:?}", &sir.status; "set" => &name, "state" => &sir.state.unwrap());
None
}
})
.collect();
break;
} else {
thread::sleep(time::Duration::from_secs(1));
}
if let Some(wait_limit) = self.max_wait {
if start.elapsed() > wait_limit {
warn!(log, "wait time exceeded -- cancelling run");
let mut cancel = rusoto_ec2::CancelSpotInstanceRequestsRequest::default();
cancel.spot_instance_request_ids = req
.spot_instance_request_ids
.clone()
.expect("we set this to Some above");
ec2.cancel_spot_instance_requests(cancel)
.sync()
.context("failed to cancel spot instances")
.map_err(|e| {
warn!(log, "failed to cancel spot instance request: {:?}", e);
e
})?;
trace!(
log,
"spot instances cancelled -- gathering remaining instances"
);
thread::sleep(time::Duration::from_secs(1));
instances = ec2
.describe_spot_instance_requests(req)
.sync()?
.spot_instance_requests
.map(|reqs| {
reqs.into_iter()
.filter_map(|mut sir| {
sir.instance_id
.take()
.map(|instance_id| {
trace!(log, "spot request cancelled"; "iid" => &instance_id);
instance_id
})
.or_else(|| {
error!(log, "spot request failed: {:?}", &sir.status);
None
})
})
.collect()
})
.unwrap_or_default();
error = Some("wait limit reached");
break;
}
}
}
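        // From here on, make sure the spawned instances are terminated no matter how this
        // function exits; scopeguard's defer! runs the block below when the scope ends.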
let mut term_instances = instances.clone();
defer! {{
use std::mem;
if !term_instances.is_empty() {
debug!(log, "terminating instances");
let mut termination_req = rusoto_ec2::TerminateInstancesRequest::default();
termination_req.instance_ids = mem::replace(&mut term_instances, Vec::new());
while let Err(e) = ec2.terminate_instances(termination_req.clone()).sync() {
let msg = format!("{}", e);
if msg.contains("Pooled stream disconnected") || msg.contains("broken pipe") {
trace!(log, "retrying instance termination");
continue;
} else {
warn!(log, "failed to terminate tsunami instances: {:?}", e);
break;
}
}
}
}};
if let Some(e) = error {
bail!(e);
}
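        // All spot requests were satisfied; now wait for EC2 to report addressing information
        // for every spawned instance.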
let mut machines = HashMap::new();
let mut desc_req = rusoto_ec2::DescribeInstancesRequest::default();
let mut all_ready = instances.is_empty();
desc_req.instance_ids = Some(instances);
while !all_ready {
all_ready = true;
machines.clear();
for reservation in ec2
.describe_instances(desc_req.clone())
.sync()
.context("failed to cancel spot instances")?
.reservations
.unwrap_or_else(Vec::new)
{
for instance in reservation.instances.unwrap_or_else(Vec::new) {
match instance {
rusoto_ec2::Instance {
instance_id: Some(instance_id),
instance_type: Some(instance_type),
private_ip_address: Some(private_ip),
public_dns_name: Some(public_dns),
public_ip_address: Some(public_ip),
..
} => {
let machine = Machine {
ssh: None,
instance_type,
private_ip,
public_ip,
public_dns,
};
let name = id_to_name[&instance_id].clone();
trace!(log, "instance ready"; "set" => &name, "ip" => &machine.public_ip);
machines.entry(name).or_insert_with(Vec::new).push(machine);
}
_ => {
all_ready = false;
}
}
}
}
            if !all_ready {
                // Give the instances a moment to boot before polling DescribeInstances again.
                thread::sleep(time::Duration::from_millis(500));
            }
        }
let mut res = None;
let mut errors = Vec::new();
let running: u32 = machines.values().map(|ms| ms.len() as u32).sum();
if running == expected_num {
info!(log, "all machines instantiated; running setup");
let usernames = &usernames;
let private_key_file = &private_key_file;
            // Duration subtraction panics on underflow, so clamp the remaining wait time to
            // zero if the limit has already passed.
            let wait_for = self
                .max_wait
                .map(|wl| wl.checked_sub(start.elapsed()).unwrap_or_default());
errors.par_extend(machines.par_iter_mut().flat_map(|(name, machines)| {
let f = &setup_fns[name];
machines
.par_iter_mut()
.map(move |machine| -> Result<_, Error> {
use std::net::{IpAddr, SocketAddr};
let mut sess = ssh::Session::connect(
&log,
&usernames[name],
SocketAddr::new(
machine
.public_ip
.parse::<IpAddr>()
.context("machine ip is not an ip address")?,
22,
),
private_key_file.path(),
wait_for,
).context(format!(
"failed to ssh to {} machine {}",
name, machine.public_dns
))
.map_err(|e| {
error!(log, "failed to ssh to {}:{}", &name, &machine.public_ip);
e
})?;
debug!(log, "setting up {} instance", name; "ip" => &machine.public_ip);
f(&mut sess)
.context(format!("setup procedure for {} machine failed", name))
.map_err(|e| {
error!(log, "setup for {} machine failed", name);
e
})?;
info!(log, "finished setting up {} instance", name; "ip" => &machine.public_ip);
machine.ssh = Some(sess);
Ok(())
})
.filter_map(Result::err)
}));
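            // Only hand control to the caller's main routine if every machine was set up
            // successfully.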
if errors.is_empty() {
let start = time::Instant::now();
info!(log, "quiet before the storm");
res = Some(
f(machines)
.context("tsunami main routine failed")
.map_err(|e| {
crit!(log, "main tsunami routine failed");
e
})?,
);
info!(log, "the power of the tsunami was unleashed"; "duration" => start.elapsed().as_secs());
}
} else {
crit!(
log,
"only {} out of {} machines were started; aborting",
running,
expected_num
);
}
debug!(log, "all done");
errors
.into_iter()
.next()
            .map(Err)
.unwrap_or_else(|| {
Ok(res.expect("if there are no errors, then we ran the user's main function"))
})
}
}