use crate::compiler::prelude::*;
use chrono::{DateTime, Utc, serde::ts_milliseconds};
use serde::Deserialize;
use std::collections::BTreeMap;
#[derive(Debug, Deserialize, Clone, Copy)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE", deny_unknown_fields)]
enum AwsCloudWatchLogsSubscriptionMessageType {
ControlMessage,
DataMessage,
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct AwsCloudWatchLogEvent {
id: String,
#[serde(with = "ts_milliseconds")]
timestamp: DateTime<Utc>,
message: String,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct AwsCloudWatchLogsSubscriptionMessage {
owner: String,
message_type: AwsCloudWatchLogsSubscriptionMessageType,
log_group: String,
log_stream: String,
subscription_filters: Vec<String>,
log_events: Vec<AwsCloudWatchLogEvent>,
}
impl AwsCloudWatchLogsSubscriptionMessageType {
#[must_use]
fn as_str(self) -> &'static str {
match self {
AwsCloudWatchLogsSubscriptionMessageType::ControlMessage => "CONTROL_MESSAGE",
AwsCloudWatchLogsSubscriptionMessageType::DataMessage => "DATA_MESSAGE",
}
}
}
fn parse_aws_cloudwatch_log_subscription_message(bytes: Value) -> Resolved {
let bytes = bytes.try_bytes()?;
let message = serde_json::from_slice::<AwsCloudWatchLogsSubscriptionMessage>(&bytes)
.map_err(|e| format!("unable to parse: {e}"))?;
let map = Value::from(BTreeMap::from([
(KeyString::from("owner"), Value::from(message.owner)),
(
KeyString::from("message_type"),
Value::from(message.message_type.as_str()),
),
(KeyString::from("log_group"), Value::from(message.log_group)),
(
KeyString::from("log_stream"),
Value::from(message.log_stream),
),
(
KeyString::from("subscription_filters"),
Value::from(message.subscription_filters),
),
(
KeyString::from("log_events"),
Value::Array(
message
.log_events
.into_iter()
.map(|event| {
Value::from(BTreeMap::from([
(KeyString::from("id"), Value::from(event.id)),
(KeyString::from("timestamp"), Value::from(event.timestamp)),
(KeyString::from("message"), Value::from(event.message)),
]))
})
.collect::<Vec<Value>>(),
),
),
]));
Ok(map)
}
#[derive(Clone, Copy, Debug)]
pub struct ParseAwsCloudWatchLogSubscriptionMessage;
impl Function for ParseAwsCloudWatchLogSubscriptionMessage {
fn identifier(&self) -> &'static str {
"parse_aws_cloudwatch_log_subscription_message"
}
fn usage(&self) -> &'static str {
"Parses AWS CloudWatch Logs events (configured through AWS Cloudwatch subscriptions) from the `aws_kinesis_firehose` source."
}
fn category(&self) -> &'static str {
Category::Parse.as_ref()
}
fn internal_failure_reasons(&self) -> &'static [&'static str] {
&["`value` is not a properly formatted AWS CloudWatch Log subscription message."]
}
fn return_kind(&self) -> u16 {
kind::OBJECT
}
fn examples(&self) -> &'static [Example] {
&[example! {
title: "Parse AWS Cloudwatch Log subscription message",
source: indoc! {r#"
parse_aws_cloudwatch_log_subscription_message!(s'{
"messageType": "DATA_MESSAGE",
"owner": "111111111111",
"logGroup": "test",
"logStream": "test",
"subscriptionFilters": [
"Destination"
],
"logEvents": [
{
"id": "35683658089614582423604394983260738922885519999578275840",
"timestamp": 1600110569039,
"message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
}
]
}')
"#},
result: Ok(indoc! {r#"{
"log_events": [{
"id": "35683658089614582423604394983260738922885519999578275840",
"message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}",
"timestamp": "2020-09-14T19:09:29.039Z"}
],
"log_group": "test",
"log_stream": "test",
"message_type": "DATA_MESSAGE",
"owner": "111111111111",
"subscription_filters": ["Destination"]
}"#}),
}]
}
fn compile(
&self,
_state: &state::TypeState,
_ctx: &mut FunctionCompileContext,
arguments: ArgumentList,
) -> Compiled {
let value = arguments.required("value");
Ok(ParseAwsCloudWatchLogSubscriptionMessageFn { value }.as_expr())
}
fn parameters(&self) -> &'static [Parameter] {
const PARAMETERS: &[Parameter] = &[Parameter::required(
"value",
kind::BYTES,
"The string representation of the message to parse.",
)];
PARAMETERS
}
}
#[derive(Debug, Clone)]
struct ParseAwsCloudWatchLogSubscriptionMessageFn {
value: Box<dyn Expression>,
}
impl FunctionExpression for ParseAwsCloudWatchLogSubscriptionMessageFn {
fn resolve(&self, ctx: &mut Context) -> Resolved {
let bytes = self.value.resolve(ctx)?;
parse_aws_cloudwatch_log_subscription_message(bytes)
}
fn type_def(&self, _: &state::TypeState) -> TypeDef {
TypeDef::object(inner_kind()).fallible()
}
}
fn inner_kind() -> BTreeMap<Field, Kind> {
BTreeMap::from([
(Field::from("owner"), Kind::bytes()),
(Field::from("message_type"), Kind::bytes()),
(Field::from("log_group"), Kind::bytes()),
(Field::from("log_stream"), Kind::bytes()),
(
Field::from("subscription_filters"),
Kind::array({
let mut v = Collection::any();
v.set_unknown(Kind::bytes());
v
}),
),
(
Field::from("log_events"),
Kind::array(Collection::from_unknown(Kind::object(BTreeMap::from([
(Field::from("id"), Kind::bytes()),
(Field::from("timestamp"), Kind::timestamp()),
(Field::from("message"), Kind::bytes()),
])))),
),
])
}
#[cfg(test)]
mod tests {
use chrono::{TimeZone, Utc};
use super::*;
test_function![
parse_aws_cloudwatch_log_subscription_message => ParseAwsCloudWatchLogSubscriptionMessage;
invalid_type {
args: func_args![value: "42"],
want: Err("unable to parse: invalid type: integer `42`, expected struct AwsCloudWatchLogsSubscriptionMessage at line 1 column 2"),
tdef: TypeDef::object(inner_kind()).fallible(),
}
string {
args: func_args![value: r#"
{
"messageType": "DATA_MESSAGE",
"owner": "071959437513",
"logGroup": "/jesse/test",
"logStream": "test",
"subscriptionFilters": [
"Destination"
],
"logEvents": [
{
"id": "35683658089614582423604394983260738922885519999578275840",
"timestamp": 1600110569039,
"message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
},
{
"id": "35683658089659183914001456229543810359430816722590236673",
"timestamp": 1600110569041,
"message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
}
]
}
"#],
want: Ok(Value::from_iter([
(String::from("owner"), Value::from("071959437513")),
(String::from("message_type"), Value::from("DATA_MESSAGE")),
(String::from("log_group"), Value::from("/jesse/test")),
(String::from("log_stream"), Value::from("test")),
(String::from("subscription_filters"), Value::from(vec![Value::from("Destination")])),
(String::from("log_events"), Value::from(vec![
Value::from_iter([
(String::from("id"), Value::from( "35683658089614582423604394983260738922885519999578275840")),
(String::from("timestamp"), Value::from(Utc.timestamp_opt(1_600_110_569, 39_000_000).single().expect("invalid timestamp"))),
(String::from("message"), Value::from("{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}")),
]),
Value::from_iter([
(String::from("id"), Value::from("35683658089659183914001456229543810359430816722590236673")),
(String::from("timestamp"), Value::from(Utc.timestamp_opt(1_600_110_569, 41_000_000).single().expect("invalid timestamp"))),
(String::from("message"), Value::from("{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}")),
])
])),
])),
tdef: TypeDef::object(inner_kind()).fallible(),
}
invalid_value {
args: func_args![value: "{ INVALID }"],
want: Err("unable to parse: key must be a string at line 1 column 3"),
tdef: TypeDef::object(inner_kind()).fallible(),
}
];
}