watchrs 0.2.1

Crate that aids in monitoring and setting up alerts for AWS Batch Jobs.
Documentation
#![deny(missing_docs)]

//! # watchrs
//!
//! The `watchrs` crate aids in monitoring and setting up alerts for AWS Batch Jobs.
//!
//! # Examples
//!
//! ## Setting Up Alerts For Batch Job State Changes
//!
//! ```rust, no_run
//!
//! use watchrs::Watcher;
//!
//! let watcher = Watcher::default();
//! watcher
//!     .subscribe("youremail@here.com".to_owned(), None)
//!     .and_then(|(topic_arn, _)| {
//!         watcher
//!             .create_job_watcher_rule(
//!                 "my_batch_job_rule".to_owned(),
//!                 true,
//!                 Some("watch failed jobs".to_owned()),
//!                 Some(vec!["FAILED".to_owned(), "RUNNABLE".to_owned()]),
//!                 Some(vec!["JOB_QUEUE_ARN".to_owned()]),
//!                 Some(vec!["JOB_DEFINITION_NAME".to_owned()])
//!             )
//!             .map(|rule_name| (topic_arn, rule_name))
//!     })
//!     .and_then(|(topic_arn, rule_name)| {
//!         watcher.create_sns_target(rule_name, topic_arn)
//!     })
//!     .expect("failed to create alerting system");
//! ```

use log::{error, info};
use rusoto_core::region::Region;
use rusoto_events::{
    CloudWatchEvents, CloudWatchEventsClient, PutRuleRequest, PutTargetsRequest, Target,
};
use rusoto_sns::{
    CreateTopicInput, DeleteTopicInput, Sns, SnsClient, SubscribeInput, UnsubscribeInput,
};
use std::collections::HashMap;

use chrono::prelude::*;
use serde::Serialize;

/// An enum whose variants describe the point of failure during AWS calls.
/// For now, the value captured by each variant only contains a short description of the error.
///
/// `WatchError` implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` or any error type that wraps it.
#[derive(Hash, Eq, PartialEq, Debug)]
pub enum WatchError {
    /// Indicates a failure while creating or deleting an SNS Topic
    SNSTopic(String),
    /// Indicates a failure while subscribing or unsubscribing from a topic
    SNSSubscription(String),
    /// Indicates a failure when creating or deleting an event rule
    EventRule(String),
    /// Indicates a failure when creating or deleting an event target
    EventTarget(String),
}

impl std::fmt::Display for WatchError {
    /// Formats the error as `<stage> error: <description>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (stage, description) = match self {
            WatchError::SNSTopic(description) => ("SNS topic", description),
            WatchError::SNSSubscription(description) => ("SNS subscription", description),
            WatchError::EventRule(description) => ("event rule", description),
            WatchError::EventTarget(description) => ("event target", description),
        };
        write!(f, "{} error: {}", stage, description)
    }
}

impl std::error::Error for WatchError {}

/// Represents a subset of the `detail` field an AWS Batch Event returns.
/// The struct fields are used to filter a rule expression where Batch jobs are the target.
/// The full structure of a batch job event can be found in the [AWS Documentation](https://docs.aws.amazon.com/batch/latest/userguide/batch_cwe_events.html).
///
/// Every field is optional and is omitted from the serialized JSON when it is `None`.
/// The derived `Default` value has every filter unset.
#[derive(Hash, Eq, PartialEq, Debug, Default, Serialize)]
struct BatchRuleDetails {
    /// Job statuses to match; serialized under the `status` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "status")]
    statuses: Option<Vec<String>>,
    /// Job names to match; serialized under the `jobName` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "jobName")]
    job_names: Option<Vec<String>>,
    /// Job queues to match; serialized under the `jobQueue` key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(rename = "jobQueue")]
    job_queues: Option<Vec<String>>,
}

/// Used to create and operate on AWS resources related to monitoring Batch jobs.
///
/// At the moment the `Watcher` struct only takes in a AWS region to indicate
/// where to operate in.
#[must_use]
pub struct Watcher {
    region: Region,
}

impl Default for Watcher {
    fn default() -> Self {
        Watcher {
            region: Region::default(),
        }
    }
}

impl Watcher {
    /// Creates a topic when `topic_arn` is `None` and subscribes to it using the email provided.
    /// The method skips the topic creation step whenever `topic_arn` is `Some`.
    /// `subscribe` returns a tuple of the form `(topic_arn, subscription_arn)`.
    ///
    /// # Errors
    ///
    /// Returns the `WatchError` produced by the topic creation step or by the
    /// email subscription step, whichever fails first.
    ///
    /// ```rust,no_run
    /// # use watchrs::Watcher;
    /// let watcher = Watcher::default();
    /// let email = "emailtosubscribe@example.com".to_owned();
    /// let (topic_arn, subscription_arn) = watcher.subscribe(email, None).unwrap();
    /// ```
    pub fn subscribe(
        &self,
        email: String,
        topic_arn: Option<String>,
    ) -> Result<(String, String), WatchError> {
        // Only create a topic when the caller did not supply one.
        let arn = match topic_arn {
            Some(existing_arn) => existing_arn,
            None => self.create_topic()?,
        };

        // Subscribe the email and return (topic_arn, subscription_arn).
        self.subscribe_email(arn.clone(), email)
            .map(|subscription_arn| (arn, subscription_arn))
    }

    /// Unsubscribes an email from a topic using the `subscription_arn`. When
    /// `delete_topic` is set, the topic is deleted instead; in that case `topic_arn`
    /// must be `Some`. Keep in mind that deleting a topic will also delete
    /// all of its subscriptions.
    ///
    /// # Errors
    ///
    /// * `WatchError::SNSTopic` when `delete_topic` is `true` but `topic_arn` is `None`,
    ///   or when the delete-topic call fails.
    /// * `WatchError::SNSSubscription` when the unsubscribe call fails.
    ///
    /// ```rust, no_run
    /// # use watchrs::Watcher;
    /// let watcher = Watcher::default();
    /// watcher.unsubscribe("validsubscriptionarn".to_owned(), false, None).unwrap();
    /// ```
    pub fn unsubscribe(
        &self,
        subscription_arn: String,
        delete_topic: bool,
        topic_arn: Option<String>,
    ) -> Result<(), WatchError> {
        match (delete_topic, topic_arn) {
            // Error case: we cannot delete a topic we do not have an arn for.
            (true, None) => Err(WatchError::SNSTopic(
                "topic arn should be `Some` when `delete_topic` is true".to_owned(),
            )),
            // Just delete the topic if the flag is specified. This is okay to do since
            // deleting a topic will also remove all subscriptions.
            (true, Some(topic_arn)) => SnsClient::new(self.region.clone())
                .delete_topic(DeleteTopicInput {
                    topic_arn: topic_arn.clone(),
                })
                .sync()
                .map_err(|err| {
                    error!("error deleting topic {}, err: {}", topic_arn, err);
                    // A failed topic deletion is a topic error, not a subscription error.
                    WatchError::SNSTopic(err.to_string())
                }),
            // Topic is kept: only remove the single subscription.
            (false, _) => SnsClient::new(self.region.clone())
                .unsubscribe(UnsubscribeInput {
                    subscription_arn: subscription_arn.clone(),
                })
                .sync()
                .map_err(|err| {
                    error!(
                        "error unsubscribing from {}, err: {}",
                        subscription_arn, err
                    );
                    WatchError::SNSSubscription(err.to_string())
                }),
        }
    }
    /// Creates a Cloudwatch Event Rule that watches for AWS Batch job state changes.
    ///
    /// You can create a more restrictive filter expression by using the `statuses`,
    /// `job_queues`, or `job_names` arguments. The rule created will contain these values in a
    /// `detail` object (keys `status`, `jobQueue` and `jobName`). When all three are `None`
    /// the `detail` object is left out of the pattern entirely. An example of the `detail`
    /// field of an event looks like below. You can get more info on creating rule expressions for
    /// batch jobs [here](https://docs.aws.amazon.com/batch/latest/userguide/batch_cwe_events.html).
    ///
    /// On success the name of the rule (`rule_name`) is returned.
    ///
    /// # Errors
    ///
    /// Returns `WatchError::EventRule` when the filter details cannot be serialized or
    /// when the put-rule call fails.
    ///
    /// ### Event Detail
    ///
    ///```json
    /// "detail": {
    ///     "jobName": "event-test",
    ///     "jobId": "4c7599ae-0a82-49aa-ba5a-4727fcce14a8",
    ///     "jobQueue": "arn:aws:batch:us-east-1:aws_account_id:job-queue/HighPriority",
    ///     "status": "FAILED"
    /// }
    ///```
    ///
    ///### Example
    ///
    /// ```rust, no_run
    /// # use watchrs::Watcher;
    /// let watcher = Watcher::default();
    /// watcher.create_job_watcher_rule(
    ///     "my_batch_job_rule".to_owned(),
    ///     true,
    ///     Some("watch failed jobs".to_owned()),
    ///     Some(vec!["FAILED".to_owned()]),
    ///     None,
    ///     None).unwrap();
    /// ```
    pub fn create_job_watcher_rule(
        &self,
        rule_name: String,
        enable: bool,
        rule_description: Option<String>,
        statuses: Option<Vec<String>>,
        job_queues: Option<Vec<String>>,
        job_names: Option<Vec<String>>,
    ) -> Result<String, WatchError> {
        let events_client = CloudWatchEventsClient::new(self.region.clone());
        let enable_str = if enable { "ENABLED" } else { "DISABLED" };

        let rule_details = BatchRuleDetails {
            statuses,
            job_queues,
            job_names,
        };

        // Serialize up front so a failure surfaces as an error instead of a later panic.
        let details_json = serde_json::to_string(&rule_details).map_err(|_| {
            WatchError::EventRule("failed to serialize batch rule details".to_owned())
        })?;

        // Fields shared by every pattern; the surrounding braces are added below.
        let base_pattern = r#"
        "detail-type": [
            "Batch Job State Change"
        ],
        "source": [
            "aws.batch"
        ]
        "#;

        // Only add the detail object if it is not empty. AWS does not allow empty fields.
        let event_pattern = if rule_details == BatchRuleDetails::default() {
            format!("{{{}}}", base_pattern)
        } else {
            format!(
                r#"{{
                    {},
                    "detail": {}
                    }}
            "#,
                base_pattern, details_json
            )
        };

        match events_client
            .put_rule(PutRuleRequest {
                name: rule_name.clone(),
                description: rule_description,
                state: Some(enable_str.to_owned()),
                event_pattern: Some(event_pattern),
                role_arn: None,
                ..PutRuleRequest::default()
            })
            .sync()
        {
            Ok(_) => {
                info!("Succesfully put rule: {}", rule_name);
                Ok(rule_name)
            }
            Err(err) => {
                error!("error putting rule: {}", err);
                Err(WatchError::EventRule(err.to_string()))
            }
        }
    }

    /// Registers the SNS topic identified by `topic_arn` as a Cloudwatch Event Target
    /// of the rule named `rule_name`.
    ///
    /// The target id is derived from the current UTC year, month, day and hour.
    /// A transport error, or any entry reported back in `failed_entries`, is
    /// returned as `WatchError::EventTarget`.
    ///
    /// ```rust, no_run
    /// # use watchrs::Watcher;
    /// let watcher = Watcher::default();
    /// watcher.create_sns_target(
    ///     "my_batch_job_rule".to_owned(),
    ///     "watchrs_topic_arn".to_owned()
    /// ).unwrap();
    /// ```
    pub fn create_sns_target(
        &self,
        rule_name: String,
        topic_arn: String,
    ) -> Result<(), WatchError> {
        let client = CloudWatchEventsClient::new(self.region.clone());

        // Hour-granular id built from the current UTC time.
        let timestamp = Utc::now();
        let target_id = format!(
            "watchrs_sns_target_{}_{}_{}_{}",
            timestamp.year(),
            timestamp.month(),
            timestamp.day(),
            timestamp.hour()
        );

        let request = PutTargetsRequest {
            rule: rule_name.clone(),
            targets: vec![Target {
                id: target_id,
                arn: topic_arn.clone(),
                ..Target::default()
            }],
        };

        let response = client.put_targets(request).sync().map_err(|err| {
            error!("error putting targets: {}", err);
            WatchError::EventTarget(err.to_string())
        })?;

        // The call itself can succeed while individual targets are rejected.
        let failed_entries = response.failed_entries.unwrap_or_default();
        if failed_entries.is_empty() {
            info!(
                "Succesfully put target with rule: {}, on {}",
                rule_name, topic_arn
            );
            return Ok(());
        }

        error!("failed to put targets: {:?}", failed_entries);
        Err(WatchError::EventTarget(format!(
            "failed entries: {:?}",
            failed_entries
        )))
    }

    /// Replaces the AWS region this `Watcher` operates in. Every client created by
    /// later method calls targets the new region. Until this is called, a watcher
    /// built with `Watcher::default()` uses `Region::default()`.
    /// More information on supported services and AWS regions
    /// can be found [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).
    pub fn set_region(&mut self, region: Region) {
        self.region = region;
    }

    /// creates a topic
    fn create_topic(&self) -> Result<String, WatchError> {
        let sns_client = SnsClient::new(self.region.clone());

        let now = Utc::now();
        let (year, month, day, hour) = (now.year(), now.month(), now.day(), now.hour());

        // discard everything after @
        // add watchrs + date
        let topic_name = &format!("watchrs_{}_{}_{}_{}", year, month, day, hour).to_owned();
        let mut attributes = HashMap::new();
        let sns_access_policy = format!(
            r#"
        {{
            "Id": "AWSSNSCWEIntegration",
            "Statement": [
                {{
                    "Action": [
                        "SNS:GetTopicAttributes",
                        "SNS:SetTopicAttributes",
                        "SNS:AddPermission",
                        "SNS:RemovePermission",
                        "SNS:DeleteTopic",
                        "SNS:Subscribe",
                        "SNS:ListSubscriptionsByTopic",
                        "SNS:Publish",
                        "SNS:Receive"
                    ],
                    "Effect": "Allow",
                    "Principal": {{
                        "AWS": "*"
                    }},
                    "Resource": "arn:aws:sns:{0}:*:{1}",
                    "Sid": "AWSSNSAccess"
                }},
                {{
                    "Sid": "PublishEventsToSNS",
                    "Effect": "Allow",
                    "Principal": {{
                        "Service": "events.amazonaws.com"
                     }},
                    "Action": "sns:Publish",
                    "Resource": "arn:aws:sns:{0}:*:{1}"
                }}
            ],
            "Version": "2008-10-17"
        }}"#,
            self.region.name(),
            topic_name.clone()
        );

        // TODO: Fix this when the bug in rusoto is fixed. or fix rusoto yourself!
        // Key and Val are reversed
        attributes.insert(sns_access_policy.to_owned(), "Policy".to_owned());

        sns_client
            .create_topic(CreateTopicInput {
                attributes: Some(attributes),
                name: topic_name.to_owned(),
            })
            .sync()
            .map_err(|err| {
                error!("error creating topic: {:?}", err);
                WatchError::SNSTopic(err.to_string())
            })
            .and_then(|resp| {
                if let Some(topic_arn) = resp.topic_arn {
                    info!("Succesfully created topic {}", topic_arn.clone());
                    Ok(topic_arn)
                } else {
                    Err(WatchError::SNSTopic("error creating topic".to_owned()))
                }
            })
    }

    /// Subscribes the given email to a topic
    fn subscribe_email(&self, topic_arn: String, email: String) -> Result<String, WatchError> {
        let sns_client = SnsClient::new(self.region.clone());
        let sub_input = SubscribeInput {
            protocol: "email".to_owned(),
            endpoint: Some(email.clone()),
            topic_arn: topic_arn.clone(),
            ..SubscribeInput::default()
        };
        sns_client
            .subscribe(sub_input)
            .sync()
            .map_err(|err| {
                error!("error creating topic: {}", err);
                WatchError::SNSSubscription(err.to_string())
            })
            .and_then(|resp| {
                if let Some(subscription_arn) = resp.subscription_arn {
                    Ok(subscription_arn)
                } else {
                    Err(WatchError::EventRule(
                        "err retreiving subscription arn".to_owned(),
                    ))
                }
            })
    }
}