1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use rusoto_core::{default_tls_client, Region};

use std::fs::OpenOptions;
use slog;
use slog::Logger;
use slog_json;

use slog::{Drain, FnValue};
use futures_cpupool::CpuPool;
use tokio_timer::*;
use futures::*;
use futures::future::ok;
use std::time::Duration;
use rusoto_sqs::{Sqs, SqsClient, CreateQueueRequest};
use rusoto_credential::{ProvideAwsCredentials, ChainProvider, ProfileProvider};
use rusoto_sns::SnsClient;

use std::sync::Mutex;
use std::env;
use hyper;

static mut TIMER: Option<Timer> = None;

const NANOS_PER_MILLI: u32 = 1000_000;
const MILLIS_PER_SEC: u64 = 1000;

pub fn set_timer() {
    unsafe {
        TIMER = Some(Timer::default());
    }
}

pub fn get_timer() -> Timer {
    unsafe {
        TIMER.clone().unwrap().clone()
    }
}


pub fn millis(d: Duration) -> u64 {
    // A proper Duration will not overflow, because MIN and MAX are defined
    // such that the range is exactly i64 milliseconds.
    let secs_part = d.as_secs() * MILLIS_PER_SEC;
    let nanos_part = d.subsec_nanos() / NANOS_PER_MILLI;
    secs_part + nanos_part as u64
}

#[cfg_attr(feature = "flame_it", flame)]
pub fn new_sqs_client<P>(sqs_provider: &P) -> SqsClient<P, hyper::Client>
    where P: ProvideAwsCredentials + Clone + Send + 'static
{
    SqsClient::new(
        default_tls_client().unwrap(),
        sqs_provider.clone(),
        Region::UsEast1
    )
}

#[cfg_attr(feature = "flame_it", flame)]
pub fn new_sns_client<P>(sns_provider: &P) -> SnsClient<P, hyper::Client>
    where P: ProvideAwsCredentials + Clone + Send + 'static
{
    SnsClient::new(
        default_tls_client().unwrap(),
        sns_provider.clone(),
        Region::UsEast1
    )
}


#[cfg_attr(feature = "flame_it", flame)]
pub fn create_queue<P>(pool: &CpuPool, provider: &P, queue_name: &str, timer: &'static Timer) -> String
    where P: ProvideAwsCredentials + Clone + Send + 'static,
{
    let create_queue_request = CreateQueueRequest {
        attributes: None,
        queue_name: queue_name.to_owned()
    };

    let _provider = provider.clone();
    let _queue_name = queue_name.to_owned();

    let queue_url = timeout_ms! {
        pool.clone(),
        move || {
            ok(SqsClient::new(
                default_tls_client().unwrap(),
                _provider,
                Region::UsEast1
            ).create_queue(&create_queue_request)
                .unwrap_or_else(|e| panic!("Failed to create queue {} with {}", _queue_name, e)))
        },
        5_500,
        timer
    };

    match queue_url {
        Ok(url) => url.queue_url.expect("Queue url was None"),
        _ => panic!("Timeout while trying to create queue: {}", queue_name)
    }
}

#[cfg_attr(feature = "flame_it", flame)]
pub fn get_profile_provider() -> ChainProvider {
    let profile = match env::var("AWS_PROFILE") {
        Ok(val) => val.to_string(),
        Err(_) => "default".to_string(),
    };

    let mut profile_provider = ProfileProvider::new().unwrap();
    profile_provider.set_profile(profile);
    ChainProvider::with_profile_provider(profile_provider)
}

pub fn init_logger(log_path: &str) -> Logger {
    let file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(log_path)
        .expect(&format!("Failed to create log file {}", log_path));

    slog::Logger::root(
        Mutex::new(slog_json::Json::default(file)).map(slog::Fuse),
        o!("version" => env!("CARGO_PKG_VERSION"),
           "place" =>
              FnValue(move |info| {
                  format!("{}:{} {}",
                          info.file(),
                          info.line(),
                          info.module())
              }))
    )
}