#![deny(missing_docs)]
use log::{error, info};
use rusoto_core::region::Region;
use rusoto_events::{
CloudWatchEvents, CloudWatchEventsClient, PutRuleRequest, PutTargetsRequest, Target,
};
use rusoto_sns::{
CreateTopicInput, DeleteTopicInput, Sns, SnsClient, SubscribeInput, UnsubscribeInput,
};
use std::collections::HashMap;
use chrono::prelude::*;
use serde::Serialize;
#[derive(Hash, Eq, PartialEq, Debug)]
pub enum WatchError {
SNSTopic(String),
SNSSubscription(String),
EventRule(String),
EventTarget(String),
}
#[derive(Hash, Eq, PartialEq, Debug, Serialize)]
struct BatchRuleDetails {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "status")]
statuses: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "jobName")]
job_names: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "jobQueue")]
job_queues: Option<Vec<String>>,
}
impl Default for BatchRuleDetails {
fn default() -> Self {
BatchRuleDetails {
statuses: None,
job_names: None,
job_queues: None,
}
}
}
#[must_use]
pub struct Watcher {
region: Region,
}
impl Default for Watcher {
fn default() -> Self {
Watcher {
region: Region::default(),
}
}
}
impl Watcher {
pub fn subscribe(
&self,
email: String,
topic_arn: Option<String>,
) -> Result<(String, String), WatchError> {
let arn = if topic_arn.is_none() {
self.create_topic()?
} else {
topic_arn.expect("missing topic_arn")
};
self.subscribe_email(arn.clone(), email)
.map(|subscription_arn| (arn, subscription_arn))
}
pub fn unsubscribe(
&self,
subscription_arn: String,
delete_topic: bool,
topic_arn: Option<String>,
) -> Result<(), WatchError> {
let sns_client = SnsClient::new(self.region.clone());
if delete_topic && topic_arn.is_none() {
Err(WatchError::SNSTopic(
"topic arn should be `Some` when `deleted_flag` is true".to_owned(),
))
}
else if delete_topic && topic_arn.is_some() {
let topic_arn = topic_arn.expect("missing topic_arn");
sns_client
.delete_topic(DeleteTopicInput {
topic_arn: topic_arn.clone(),
})
.sync()
.map_err(|err| {
error!("error deleting topic {}, err: {}", topic_arn, err);
WatchError::SNSSubscription(err.to_string())
})
} else {
sns_client
.unsubscribe(UnsubscribeInput {
subscription_arn: subscription_arn.clone(),
})
.sync()
.map_err(|err| {
error!(
"error unsubscribing from {}, err: {}",
subscription_arn, err
);
WatchError::SNSSubscription(err.to_string())
})
}
}
pub fn create_job_watcher_rule(
&self,
rule_name: String,
enable: bool,
rule_description: Option<String>,
statuses: Option<Vec<String>>,
job_queues: Option<Vec<String>>,
job_names: Option<Vec<String>>,
) -> Result<String, WatchError> {
let events_client = CloudWatchEventsClient::new(self.region.clone());
let enable_str = if enable { "ENABLED" } else { "DISABLED" };
let rule_details = BatchRuleDetails {
statuses,
job_queues,
job_names,
};
let details_json = serde_json::to_string(&rule_details);
if details_json.is_err() {
return Err(WatchError::EventRule(
"failed to serialize batch rule details".to_owned(),
));
}
let mut event_pattern = r#"
"detail-type": [
"Batch Job State Change"
],
"source": [
"aws.batch"
]
"#
.to_owned();
if BatchRuleDetails::default() != rule_details {
event_pattern = format!(
r#"{{
{},
"detail": {}
}}
"#,
event_pattern,
details_json.expect("err with details json")
);
} else {
event_pattern = format!("{{{}}}", event_pattern);
}
match events_client
.put_rule(PutRuleRequest {
name: rule_name.clone(),
description: rule_description,
state: Some(enable_str.to_owned()),
event_pattern: Some(event_pattern),
role_arn: None,
..PutRuleRequest::default()
})
.sync()
{
Ok(_) => {
info!("Succesfully put rule: {}", rule_name.clone());
Ok(rule_name)
}
Err(err) => {
error!("error putting rule: {}", err);
Err(WatchError::EventRule(err.to_string()))
}
}
}
pub fn create_sns_target(
&self,
rule_name: String,
topic_arn: String,
) -> Result<(), WatchError> {
let events_client = CloudWatchEventsClient::new(self.region.clone());
let now = Utc::now();
let (year, month, day, hour) = (now.year(), now.month(), now.day(), now.hour());
let target_id = format!("watchrs_sns_target_{}_{}_{}_{}", year, month, day, hour);
let sns_target = Target {
id: target_id,
arn: topic_arn.clone(),
..Target::default()
};
events_client
.put_targets(PutTargetsRequest {
rule: rule_name.clone(),
targets: vec![sns_target],
})
.sync()
.map_err(|err| {
error!("error putting targets: {}", err);
WatchError::EventTarget(err.to_string())
})
.and_then(|resp| {
let failed_entries = resp.failed_entries.unwrap_or_default();
if !failed_entries.is_empty() {
error!("failed to put targets: {:?}", failed_entries);
Err(WatchError::EventTarget(format!(
"failed entries: {:?}",
failed_entries
)))
} else {
info!(
"Succesfully put target with rule: {}, on {}",
rule_name, topic_arn
);
Ok(())
}
})
}
pub fn set_region(&mut self, region: Region) {
self.region = region
}
fn create_topic(&self) -> Result<String, WatchError> {
let sns_client = SnsClient::new(self.region.clone());
let now = Utc::now();
let (year, month, day, hour) = (now.year(), now.month(), now.day(), now.hour());
let topic_name = &format!("watchrs_{}_{}_{}_{}", year, month, day, hour).to_owned();
let mut attributes = HashMap::new();
let sns_access_policy = format!(
r#"
{{
"Id": "AWSSNSCWEIntegration",
"Statement": [
{{
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish",
"SNS:Receive"
],
"Effect": "Allow",
"Principal": {{
"AWS": "*"
}},
"Resource": "arn:aws:sns:{0}:*:{1}",
"Sid": "AWSSNSAccess"
}},
{{
"Sid": "PublishEventsToSNS",
"Effect": "Allow",
"Principal": {{
"Service": "events.amazonaws.com"
}},
"Action": "sns:Publish",
"Resource": "arn:aws:sns:{0}:*:{1}"
}}
],
"Version": "2008-10-17"
}}"#,
self.region.name(),
topic_name.clone()
);
attributes.insert(sns_access_policy.to_owned(), "Policy".to_owned());
sns_client
.create_topic(CreateTopicInput {
attributes: Some(attributes),
name: topic_name.to_owned(),
})
.sync()
.map_err(|err| {
error!("error creating topic: {:?}", err);
WatchError::SNSTopic(err.to_string())
})
.and_then(|resp| {
if let Some(topic_arn) = resp.topic_arn {
info!("Succesfully created topic {}", topic_arn.clone());
Ok(topic_arn)
} else {
Err(WatchError::SNSTopic("error creating topic".to_owned()))
}
})
}
fn subscribe_email(&self, topic_arn: String, email: String) -> Result<String, WatchError> {
let sns_client = SnsClient::new(self.region.clone());
let sub_input = SubscribeInput {
protocol: "email".to_owned(),
endpoint: Some(email.clone()),
topic_arn: topic_arn.clone(),
..SubscribeInput::default()
};
sns_client
.subscribe(sub_input)
.sync()
.map_err(|err| {
error!("error creating topic: {}", err);
WatchError::SNSSubscription(err.to_string())
})
.and_then(|resp| {
if let Some(subscription_arn) = resp.subscription_arn {
Ok(subscription_arn)
} else {
Err(WatchError::EventRule(
"err retreiving subscription arn".to_owned(),
))
}
})
}
}