1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
//! `tsunami` provides an interface for running one-off jobs on cloud instances.
//!
//! **This crate requires nightly Rust.**
//!
//! Imagine you need to run an experiment that involves four machines of different types on AWS. Or
//! on Azure. And each one needs to be set up in a particular way. Maybe one is a server, two are
//! load generating clients, and one is a monitor of some sort. You want to spin them all up with a
//! custom AMI, in different regions, and then run some benchmarks once they're all up and running.
//!
//! This crate makes that trivial.
//!
//! You say what machines you want, and the library takes care of the rest. It uses the cloud
//! service's API to start the machines as appropriate, and gives you [ssh connections] to each
//! host as it becomes available to run setup. When all the machines are available, you can connect
//! to them all in a single step, and then run your distributed job. When you're done, `tsunami`
//! tears everything down for you. And did I mention it even supports AWS spot instances, so it
//! even saves you money?
//!
//! How does this magic work? Take a look at this example:
//!
//! ```rust,no_run
//! use azure::Region as AzureRegion;
//! use rusoto_core::{credential::DefaultCredentialsProvider, Region as AWSRegion};
//! use tsunami::Tsunami;
//! use tsunami::providers::{aws, azure};
//! #[tokio::main]
//! async fn main() -> Result<(), color_eyre::Report> {
//!     // Initialize AWS
//!     let mut aws = aws::Launcher::default();
//!     // Create an AWS machine descriptor and add it to the AWS Tsunami
//!     aws.spawn(
//!         vec![(
//!             String::from("aws_vm"),
//!             aws::Setup::default()
//!                 .region_with_ubuntu_ami(AWSRegion::UsWest1) // default is UsEast1
//!                 .await
//!                 .unwrap()
//!                 .setup(|vm| {
//!                     // default is a no-op
//!                     Box::pin(async move {
//!                         vm.ssh.command("sudo")
//!                             .arg("apt")
//!                             .arg("update")
//!                             .status()
//!                             .await?;
//!                         vm.ssh.command("bash")
//!                             .arg("-c")
//!                             .arg("\"curl https://sh.rustup.rs -sSf | sh -- -y\"")
//!                             .status()
//!                             .await?;
//!                         Ok(())
//!                     })
//!                 }),
//!         )],
//!         None,
//!     )
//!     .await?;
//!
//!     // Initialize Azure
//!     let mut azure = azure::Launcher::default();
//!     // Create an Azure machine descriptor and add it to the Azure Tsunami
//!     azure
//!         .spawn(
//!             vec![(
//!                 String::from("azure_vm"),
//!                 azure::Setup::default()
//!                     .region(AzureRegion::FranceCentral) // default is EastUs
//!                     .setup(|vm| {
//!                         // default is a no-op
//!                         Box::pin(async move {
//!                             vm.ssh.command("sudo")
//!                                 .arg("apt")
//!                                 .arg("update")
//!                                 .status()
//!                                 .await?;
//!                             vm.ssh.command("bash")
//!                                 .arg("-c")
//!                                 .arg("\"curl https://sh.rustup.rs -sSf | sh -- -y\"")
//!                                 .status()
//!                                 .await?;
//!                             Ok(())
//!                         })
//!                     }),
//!             )],
//!             None,
//!         )
//!         .await?;
//!
//!     // SSH to the VMs and run commands on it
//!     let aws_vms = aws.connect_all().await?;
//!     let azure_vms = azure.connect_all().await?;
//!
//!     let vms = aws_vms.into_iter().chain(azure_vms.into_iter());
//!
//!     // do amazing things with the VMs!
//!     // you have access to things like ip addresses for each host too.
//!
//!     // call terminate_all() to terminate the instances.
//!     aws.terminate_all().await?;
//!     azure.terminate_all().await?;
//!     Ok(())
//! }
//! ```
//!
//! # Where are the logs?
//!
//! This crate uses [`tracing`](https://docs.rs/tracing), which does not log anything by default,
//! since the crate allows you to [plug and
//! play](https://docs.rs/tracing/0.1.14/tracing/#in-executables) which "consumer" you want for
//! your trace points. If you want logging that "just works", you'll want
//! [`tracing_subscriber::fmt`](https://docs.rs/tracing-subscriber/0.2/tracing_subscriber/fmt/index.html),
//! which you can instantiate (after adding it to your Cargo.toml) with:
//!
//! ```rust
//! tracing_subscriber::fmt::init();
//! ```
//!
//! And then run your application with, for example, `RUST_LOG=info` to get logs. If you're using
//! the `log` crate, you can instead just add a dependency on `tracing` with the `log` feature
//! enabled, and things should just "magically" work.
//!
//! If you also want better tracing of errors (which I think you do), take a look at the
//! documentation for [`color-eyre`](https://docs.rs/color_eyre/), which includes an example for
//! how to set up `tracing` with [`tracing-error`](https://docs.rs/tracing-error).
//!
//! # SSH without `openssh`
//!
//! An SSH connection to each [`Machine`](crate::Machine) is automatically established using the
//! [`openssh`](https://docs.rs/openssh/) crate. However, it's possible to SSH into [`Machine`]s
//! without `openssh`. For example, using [`tokio::process::Command`](https://docs.rs/tokio/?search=Command),
//! you can ssh into a `Machine` with something like this:
//!
//! ```rust,no_run
//! tsunami::providers::aws::Setup::default()
//!     .setup(|vm| {
//!         Box::pin(async move {
//!             let remote_command = "date +%Y-%m-%d";
//!             let ssh_command = format!(
//!                 "ssh -o StrictHostKeyChecking=no {}@{} -i {} {}",
//!                 vm.username,
//!                 vm.public_ip,
//!                 vm.private_key.as_ref().expect("private key should be set").as_path().display(),
//!                 remote_command,
//!             );
//!             let out = tokio::process::Command::new("sh")
//!                 .arg("-c")
//!                 .arg(ssh_command)
//!                 .output()
//!                 .await?;
//!             let out = String::from_utf8(out.stdout)?
//!                 .trim()
//!                 .to_string();
//!             println!("{}", out);
//!             Ok(())
//!         })
//!     });
//! ```
//!
//! # Live-coding
//!
//! An earlier version of this crate was written as part of a live-coding stream series intended
//! for users who are already somewhat familiar with Rust, and who want to see something larger and
//! more involved be built. You can find the recordings of past sessions [on
//! YouTube](https://www.youtube.com/playlist?list=PLqbS7AVVErFgY2faCIYjJZv_RluGkTlKt).

#![warn(
    unreachable_pub,
    missing_docs,
    missing_copy_implementations,
    trivial_casts,
    trivial_numeric_casts,
    unused_extern_crates,
    rust_2018_idioms,
    missing_debug_implementations
)]
#![allow(clippy::type_complexity)]

use color_eyre::Report;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use tracing::instrument;

pub mod providers;

/// Addressing information for a machine to which no SSH session has been established yet.
///
/// `connect_ssh` consumes a descriptor and, once the SSH connection succeeds, turns it into a
/// [`Machine`] carrying the same nickname and addresses.
#[derive(Debug)]
struct MachineDescriptor<'tsunami> {
    /// The friendly name for this machine; copied verbatim into the resulting `Machine`.
    pub(crate) nickname: String,
    /// The public DNS name, if there is one. When `None`, `connect_ssh` substitutes the
    /// public IP for the DNS name of the resulting `Machine`.
    pub(crate) public_dns: Option<String>,
    /// The public IP address; this is the address `connect_ssh` connects to.
    pub(crate) public_ip: String,
    /// The private IP address of the machine, if available.
    pub(crate) private_ip: Option<String>,

    // tie the lifetime of the machine to the Tsunami.
    _tsunami: std::marker::PhantomData<&'tsunami ()>,
}
/// A handle to an instance currently running as part of a tsunami.
///
/// Run commands on the machine using the [`openssh::Session`] via the `ssh` field.
///
/// The `'tsunami` lifetime ties the handle to the launcher it was obtained from; see
/// [`Tsunami::connect_all`], which hands out machines that borrow the launcher.
#[non_exhaustive]
#[derive(Debug)]
pub struct Machine<'tsunami> {
    /// The friendly name for this machine.
    ///
    /// Corresponds to the nickname paired with the machine's descriptor in [`Tsunami::spawn`].
    pub nickname: String,
    /// The public DNS name of the machine.
    ///
    /// If the instance doesn't have a DNS name, this field will be
    /// equivalent to `public_ip`.
    pub public_dns: String,
    /// The public IP address of the machine.
    pub public_ip: String,
    /// The private IP address of the machine, if available.
    pub private_ip: Option<String>,

    /// An established SSH session to this host.
    pub ssh: openssh::Session,

    /// Username that can be used to SSH into the host.
    pub username: String,
    /// Path to the private key file that can be used to SSH into the host.
    ///
    /// `None` if no key file was specified when the SSH session was established.
    pub private_key: Option<std::path::PathBuf>,

    // tie the lifetime of the machine to the Tsunami.
    _tsunami: std::marker::PhantomData<&'tsunami ()>,
}

impl<'t> MachineDescriptor<'t> {
    /// Open an SSH session to this machine and upgrade the descriptor into a [`Machine`].
    ///
    /// The session is opened against the descriptor's public IP as `username` on `port`.
    /// `key_path`, when given, is used as the identity file, and `timeout`, when given, bounds
    /// how long the connection attempt may take.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `openssh` if the connection cannot be established.
    #[cfg(any(feature = "aws", feature = "azure", feature = "baremetal"))]
    #[instrument(level = "debug", skip(key_path, timeout))]
    async fn connect_ssh(
        self,
        username: &str,
        key_path: Option<&std::path::Path>,
        timeout: Option<std::time::Duration>,
        port: u16,
    ) -> Result<Machine<'t>, Report> {
        let MachineDescriptor {
            nickname,
            public_dns,
            public_ip,
            private_ip,
            _tsunami,
        } = self;

        let mut builder = openssh::SessionBuilder::default();
        builder.user(username.to_owned());
        builder.port(port);
        if let Some(key) = key_path {
            builder.keyfile(key);
        }
        if let Some(limit) = timeout {
            builder.connect_timeout(limit);
        }

        tracing::trace!("connecting");
        let ssh = builder.connect(&public_ip).await?;
        tracing::trace!("connected");

        // A machine without a DNS name is addressed by its public IP instead.
        let public_dns = match public_dns {
            Some(dns) => dns,
            None => public_ip.clone(),
        };

        Ok(Machine {
            nickname,
            public_dns,
            public_ip,
            private_ip,
            ssh,
            username: username.to_owned(),
            private_key: key_path.map(std::path::Path::to_path_buf),
            _tsunami,
        })
    }
}

/// Use this trait to launch machines into providers.
///
/// Important: You must call `terminate_all` to shut down the instances once you are done.
/// Otherwise, you may incur unexpected charges from the cloud provider.
///
/// This trait is sealed. If you want to implement support for a provider, see
/// [`providers::Launcher`].
pub trait Tsunami: sealed::Sealed {
    /// A type describing a single instance to launch.
    type MachineDescriptor: providers::MachineSetup;

    /// Start up all the hosts.
    ///
    /// The returned future will resolve when the instances are spawned into the provided launcher.
    /// SSH connections to each instance are accessible via
    /// [`connect_all`](providers::Launcher::connect_all).
    ///
    /// The argument `descriptors` is an iterator of machine nickname to descriptor. Duplicate
    /// nicknames will cause an error. To add many and auto-generate nicknames, see the helper
    /// function [`crate::make_multiple`].
    ///
    /// `max_wait` limits how long we should wait for instances to be available before giving up.
    /// Passing `None` implies no limit.
    ///
    /// # Example
    /// ```rust,no_run
    /// #[tokio::main]
    /// async fn main() -> Result<(), color_eyre::Report> {
    ///     use tsunami::Tsunami;
    ///     // make a launcher
    ///     let mut aws: tsunami::providers::aws::Launcher<_> = Default::default();
    ///     // spawn a host into the launcher
    ///     aws.spawn(vec![(String::from("my_tsunami"), Default::default())], None).await?;
    ///     // access the host via the launcher
    ///     let vms = aws.connect_all().await?;
    ///     // we're done! terminate the instance.
    ///     aws.terminate_all().await?;
    ///     Ok(())
    /// }
    /// ```
    fn spawn<'l, I>(
        &'l mut self,
        descriptors: I,
        max_wait: Option<std::time::Duration>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Report>> + Send + 'l>>
    where
        I: IntoIterator<Item = (String, Self::MachineDescriptor)> + Send + 'static,
        I: std::fmt::Debug,
        I::IntoIter: Send;

    /// Return connections to the [`Machine`s](crate::Machine) that `spawn` spawned.
    ///
    /// The returned machines borrow the launcher for `'l`, so they cannot outlive it, and the
    /// launcher cannot be consumed by [`terminate_all`](Tsunami::terminate_all) while any of
    /// them is still in use.
    ///
    /// The map is keyed by a `String` per machine; the examples in this crate look machines up
    /// by the nickname they were spawned with.
    fn connect_all<'l>(
        &'l self,
    ) -> Pin<
        Box<dyn Future<Output = Result<HashMap<String, crate::Machine<'l>>, Report>> + Send + 'l>,
    >;

    /// Shut down all instances.
    ///
    /// This consumes the launcher, so it cannot be used again afterwards.
    fn terminate_all(self) -> Pin<Box<dyn Future<Output = Result<(), Report>> + Send>>;
}

/// Every [`providers::Launcher`] is a [`Tsunami`]: each method simply forwards to the
/// launcher's method of the same name.
impl<L> Tsunami for L
where
    L: providers::Launcher,
{
    type MachineDescriptor = L::MachineDescriptor;

    fn spawn<'l, I>(
        &'l mut self,
        descriptors: I,
        max_wait: Option<std::time::Duration>,
    ) -> Pin<Box<dyn Future<Output = Result<(), Report>> + Send + 'l>>
    where
        I: IntoIterator<Item = (String, Self::MachineDescriptor)> + Send + 'static,
        I: std::fmt::Debug,
        I::IntoIter: Send,
    {
        // Name the `Launcher` method explicitly so the forwarding cannot be mistaken for a
        // recursive call into this trait's method of the same name.
        <L as providers::Launcher>::spawn(self, descriptors, max_wait)
    }

    fn connect_all<'l>(
        &'l self,
    ) -> Pin<
        Box<dyn Future<Output = Result<HashMap<String, crate::Machine<'l>>, Report>> + Send + 'l>,
    > {
        <L as providers::Launcher>::connect_all(self)
    }

    fn terminate_all(self) -> Pin<Box<dyn Future<Output = Result<(), Report>> + Send>> {
        <L as providers::Launcher>::terminate_all(self)
    }
}

/// Private module backing the sealed-trait pattern used by [`Tsunami`](crate::Tsunami).
mod sealed {
    /// Marker supertrait of `Tsunami`. Because this module is private, code outside the crate
    /// cannot name `Sealed` and therefore cannot implement `Tsunami` directly.
    pub trait Sealed {}

    /// Every launcher is sealed, which is what lets the blanket `Tsunami` impl apply to it.
    impl<L> Sealed for L where L: crate::providers::Launcher {}
}

/// Make multiple machine descriptors.
///
/// Produces `n` clones of the descriptor `m`, each paired with a generated nickname of the form
/// `"{nickname_prefix}-{index}"`, where the index runs from `0` through `n - 1`. With `n == 0`
/// the result is empty.
///
/// ```rust,no_run
/// #[tokio::main]
/// async fn main() -> Result<(), color_eyre::Report> {
///     use tsunami::{
///         Tsunami,
///         make_multiple,
///         providers::aws::{self, Setup},
///     };
///     let mut aws: aws::Launcher<_> = Default::default();
///     aws.spawn(make_multiple(3, "my_tsunami", Setup::default()), None).await?;
///
///     let vms = aws.connect_all().await?;
///     let my_first_vm = vms.get("my_tsunami-0").unwrap();
///     let my_last_vm = vms.get("my_tsunami-2").unwrap();
///     Ok(())
/// }
/// ```
pub fn make_multiple<M: Clone>(n: usize, nickname_prefix: &str, m: M) -> Vec<(String, M)> {
    (0..n)
        .map(|index| {
            let descriptor = m.clone();
            (format!("{}-{}", nickname_prefix, index), descriptor)
        })
        .collect()
}