use crate::error::{Err, ErrKind};
use crate::runnable::Runnable;
use getset::Setters;
use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT},
Client,
};
use serde::{Serialize as Ser, Serializer};
use serde_derive::Serialize;
use slog::{error, trace, Logger};
use slog_try::{try_error, try_trace};
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;
/// Echo collector endpoint that a `Payload` posts its events to.
///
/// The concrete URL for each variant is provided by `CollectorUrl::as_str`.
#[derive(Clone, Copy, Debug, PartialEq, Serialize)]
pub enum CollectorUrl {
/// The staging collector; this is the variant returned by `Default`.
Stage,
/// The production collector.
Prod,
}
/// Payloads target the staging collector unless a URL is set explicitly.
impl Default for CollectorUrl {
    /// Returns `CollectorUrl::Stage`.
    fn default() -> CollectorUrl {
        CollectorUrl::Stage
    }
}
impl CollectorUrl {
    /// Returns the full HTTPS URL of the collector's message endpoint for
    /// this environment.
    pub fn as_str(self) -> &'static str {
        const STAGE_URL: &str = "https://echocollector-stage.kroger.com/echo/messages";
        const PROD_URL: &str = "https://echocollector.kroger.com/echo/messages";

        match self {
            CollectorUrl::Prod => PROD_URL,
            CollectorUrl::Stage => STAGE_URL,
        }
    }
}
/// A batch of Echo events together with the settings used to deliver them.
///
/// Delivery is performed by this type's `Runnable` implementation, which
/// serializes `events` to JSON and posts them to `url`.
#[derive(Clone, Debug, Default, Setters)]
pub struct Payload {
/// Collector endpoint the events are posted to.
#[set = "pub"]
url: CollectorUrl,
/// Events serialized together as the JSON request body.
#[set = "pub"]
events: Vec<Event>,
/// Optional logger handed to the `try_error!` / `try_trace!` macros.
/// NOTE(review): presumably nothing is logged when this is `None` — confirm against `slog_try`.
#[set = "pub"]
logger: Option<Logger>,
/// Number of failed sends recorded so far; `run` increments it whenever the
/// collector answers with a non-success status.
#[set = "crate"]
error_count: usize,
/// Bound that `error_count` must stay below for `should_retry` to ask for
/// another attempt.
#[set = "pub"]
retry_count: usize,
}
impl Runnable for Payload {
    type Ok = ();
    type Error = Err;

    /// Serializes the payload's events to a JSON array and POSTs it to the
    /// configured collector URL.
    ///
    /// # Errors
    ///
    /// * Serialization, client-construction and transport failures are
    ///   propagated through `?`.
    /// * A response with a non-success status is logged, counted in
    ///   `error_count`, and reported as `ErrKind::Run`.
    fn run(&mut self) -> Result<Self::Ok, Self::Error> {
        // Serialize straight from the borrowed events; copying the whole
        // vector first buys nothing because `to_string` only needs a reference.
        let json = serde_json::to_string(&self.events)?;

        // NOTE(review): certificate and hostname verification are both
        // disabled, so the connection is open to man-in-the-middle attacks.
        // Kept as-is because removing it may break environments that rely on
        // it; prefer trusting the required CA explicitly.
        let client = Client::builder()
            .danger_accept_invalid_certs(true)
            .danger_accept_invalid_hostnames(true)
            .build()?;

        // `String::len` is already the length in bytes.
        let length = json.len();
        let resp = client
            .post(self.url.as_str())
            .header(USER_AGENT, "curl/7.54.0")
            .header(CONTENT_TYPE, "application/json")
            .header(CONTENT_LENGTH, length)
            .body(json)
            .send()?;

        if resp.status().is_success() {
            Ok(())
        } else {
            try_error!(self.logger, "Echo Event could not be sent");
            self.error_count += 1;
            Err(ErrKind::Run.into())
        }
    }

    /// Decides whether a failed `run` should be attempted again.
    ///
    /// Returns `true` only when the error's source is `ErrKind::Run` and
    /// `error_count` is still below `retry_count`. Every other case (no
    /// source, a source that is not an `ErrKind`, or a different `ErrKind`)
    /// is logged and yields `false`.
    fn should_retry(&self, error: &Self::Error) -> bool {
        try_error!(self.logger, "Error: {}", error);

        let cause = match error.source() {
            Some(cause) => cause,
            None => {
                try_error!(self.logger, "No error source");
                return false;
            }
        };

        let kind = match cause.downcast_ref::<ErrKind>() {
            Some(kind) => kind,
            None => {
                try_error!(self.logger, "Error source isn't `ErrKind`");
                return false;
            }
        };

        if let ErrKind::Run = kind {
            try_error!(self.logger, "`ErrKind::Run` error - Checking for retry");
            // NOTE(review): `run` bumps `error_count` before this check, so a
            // `retry_count` of N allows N attempts in total (N - 1 retries).
            // Confirm that this is the intended meaning of `retry_count`.
            if self.error_count < self.retry_count {
                try_trace!(self.logger, "Retrying failed event");
                true
            } else {
                try_error!(self.logger, "Too many retries, bailing");
                try_error!(self.logger, "Echo Events NOT sent successfully");
                false
            }
        } else {
            try_error!(self.logger, "Source isn't `ErrKind::Run`: {}", kind);
            false
        }
    }

    /// Intentionally a no-op: the outcome of `run` is not stored.
    fn store_result(&mut self, _result: Result<Self::Ok, Self::Error>) {}
}
/// A single Echo message as it appears in the JSON posted to the collector.
///
/// Field names are serialized in camelCase, except `event_type`, which is
/// emitted under the key `type`. Optional fields that are `None` are left out
/// of the JSON entirely. Fields marked `#[set = "pub"]` get a generated public
/// `set_<field>` method; the remaining fields have hand-written setters in the
/// inherent `impl`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Setters)]
#[serde(rename_all = "camelCase")]
pub struct Event {
    /// Routing key of the message (`routingKey`).
    routing_key: String,
    /// Category of the message (`type`).
    #[set = "pub"]
    #[serde(rename = "type")]
    event_type: EventType,
    /// Message text.
    message: String,
    /// Optional correlation identifier (`correlationId`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    correlation_id: Option<Uuid>,
    /// Optional timestamp; the tests in this file supply milliseconds since
    /// the Unix epoch.
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    timestamp: Option<i64>,
    /// Optional free-form key/value details (`messageDetail`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    message_detail: Option<HashMap<String, String>>,
    /// Optional host name.
    #[serde(skip_serializing_if = "Option::is_none")]
    host: Option<String>,
    /// Optional application version (`applicationVersion`).
    #[serde(skip_serializing_if = "Option::is_none")]
    application_version: Option<String>,
    /// Optional data center name (`dataCenter`).
    #[serde(skip_serializing_if = "Option::is_none")]
    data_center: Option<String>,
    /// Optional client host name (`clientHostName`).
    #[serde(skip_serializing_if = "Option::is_none")]
    client_host_name: Option<String>,
    /// Optional destination host name (`destinationHostName`).
    #[serde(skip_serializing_if = "Option::is_none")]
    destination_host_name: Option<String>,
    /// Optional destination path (`destinationPath`).
    #[serde(skip_serializing_if = "Option::is_none")]
    destination_path: Option<String>,
    /// Optional start timestamp (`startTimestamp`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    start_timestamp: Option<u64>,
    /// Optional finish timestamp (`finishTimestamp`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    finish_timestamp: Option<u64>,
    /// Optional duration.
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    duration: Option<u64>,
    /// Optional duration (`durationInMs`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    duration_in_ms: Option<u64>,
    /// Optional response code (`responseCode`).
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    response_code: Option<u16>,
    /// Optional outcome marker.
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    response: Option<Response>,
}
/// Hand-written setters for the string-valued fields.
///
/// Each setter accepts anything convertible into `String` (for example `&str`
/// or `String`) and returns `&mut Self` so calls can be chained.
impl Event {
    /// Sets the routing key.
    pub fn set_routing_key<T>(&mut self, routing_key: T) -> &mut Self
    where
        T: Into<String>,
    {
        self.routing_key = routing_key.into();
        self
    }

    /// Sets the message text.
    pub fn set_message<T>(&mut self, message: T) -> &mut Self
    where
        T: Into<String>,
    {
        self.message = message.into();
        self
    }

    /// Sets or clears (`None`) the host.
    pub fn set_host<T>(&mut self, host: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.host = host.map(Into::into);
        self
    }

    /// Sets or clears (`None`) the application version.
    pub fn set_application_version<T>(&mut self, application_version: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.application_version = application_version.map(Into::into);
        self
    }

    /// Sets or clears (`None`) the data center.
    pub fn set_data_center<T>(&mut self, data_center: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.data_center = data_center.map(Into::into);
        self
    }

    /// Sets or clears (`None`) the client host name.
    pub fn set_client_host_name<T>(&mut self, client_host_name: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.client_host_name = client_host_name.map(Into::into);
        self
    }

    /// Sets or clears (`None`) the destination host name.
    pub fn set_destination_host_name<T>(&mut self, destination_host_name: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.destination_host_name = destination_host_name.map(Into::into);
        self
    }

    /// Sets or clears (`None`) the destination path.
    pub fn set_destination_path<T>(&mut self, destination_path: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.destination_path = destination_path.map(Into::into);
        self
    }
}
/// Category of an `Event`, emitted in JSON under the key `type`.
///
/// Serialization is hand-written and produces an upper-case string per
/// variant; `Info` is the default.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EventType {
/// Serialized as `"ERROR"`.
Error,
/// Serialized as `"INFO"`.
Info,
/// Serialized as `"PERFORMANCE"`.
Performance,
/// Serialized as `"TRACKING"`.
Tracking,
/// Serialized as `"SYSTEM"`.
System,
}
/// Events are informational unless a type is set explicitly.
impl Default for EventType {
    /// Returns `EventType::Info`.
    fn default() -> EventType {
        EventType::Info
    }
}
impl Ser for EventType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
EventType::Error => serializer.serialize_str("ERROR"),
EventType::Info => serializer.serialize_str("INFO"),
EventType::Performance => serializer.serialize_str("PERFORMANCE"),
EventType::Tracking => serializer.serialize_str("TRACKING"),
EventType::System => serializer.serialize_str("SYSTEM"),
}
}
}
/// Outcome marker carried in an `Event`'s optional `response` field.
///
/// Serialization is hand-written and produces a lower-case string per
/// variant; `Success` is the default.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Response {
/// Serialized as `"success"`.
Success,
/// Serialized as `"failure"`.
Failure,
}
/// A response is treated as successful unless set otherwise.
impl Default for Response {
    /// Returns `Response::Success`.
    fn default() -> Response {
        Response::Success
    }
}
/// Serializes a `Response` as its lower-case wire label.
impl Ser for Response {
    /// Writes the variant as a plain string: `success` or `failure`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Pick the label first so the serializer is invoked in one place.
        let label = match *self {
            Response::Failure => "failure",
            Response::Success => "success",
        };
        serializer.serialize_str(label)
    }
}
#[cfg(test)]
mod test {
    //! Serialization tests pinning the exact JSON produced for an `Event`.

    use super::{Event, EventType, Response};
    use crate::error::Result;
    use chrono::{offset::TimeZone, Utc};
    use std::collections::HashMap;
    use uuid::Uuid;

    /// A default event emits only the three mandatory keys.
    #[test]
    fn serialize_default() -> Result<()> {
        let expected = r#"{"routingKey":"","type":"INFO","message":""}"#;
        let json = serde_json::to_string(&Event::default())?;
        assert_eq!(json, expected);
        Ok(())
    }

    /// Setting the message changes only the `message` key.
    #[test]
    fn with_message() -> Result<()> {
        let expected = r#"{"routingKey":"","type":"INFO","message":"testing"}"#;
        let mut event = Event::default();
        let _ = event.set_message("testing");
        let json = serde_json::to_string(&event)?;
        assert_eq!(json, expected);
        Ok(())
    }

    /// Setting the event type changes only the `type` key.
    #[test]
    fn with_type() -> Result<()> {
        let expected = r#"{"routingKey":"","type":"PERFORMANCE","message":""}"#;
        let mut event = Event::default();
        let _ = event.set_event_type(EventType::Performance);
        let json = serde_json::to_string(&event)?;
        assert_eq!(json, expected);
        Ok(())
    }

    /// With every field populated, all keys appear in declaration order.
    #[test]
    fn full() -> Result<()> {
        let expected = r#"{"routingKey":"atlas-dev-promises","type":"SYSTEM","message":"testing","correlationId":"35f3e1d6-d859-4aa0-8c58-2cdfe97a4710","timestamp":196300801666,"messageDetail":{"a":"b"},"host":"host","applicationVersion":"1.2.3","dataCenter":"cdc","clientHostName":"blah","destinationHostName":"blah1","destinationPath":"yoda","startTimestamp":1,"finishTimestamp":2,"duration":3,"durationInMs":4,"responseCode":200,"response":"failure"}"#;

        let correlation_id = Uuid::parse_str("35F3E1D6-D859-4AA0-8C58-2CDFE97A4710")?;
        let millis = Utc
            .ymd(1976, 3, 22)
            .and_hms_milli(0, 0, 1, 666)
            .timestamp_millis();
        let mut detail: HashMap<String, String> = HashMap::new();
        let _ = detail.insert("a".to_string(), "b".to_string());

        let mut event = Event::default();
        let _ = event.set_routing_key("atlas-dev-promises");
        let _ = event.set_event_type(EventType::System);
        let _ = event.set_message("testing");
        let _ = event.set_correlation_id(Some(correlation_id));
        let _ = event.set_timestamp(Some(millis));
        let _ = event.set_message_detail(Some(detail));
        let _ = event.set_host(Some("host"));
        let _ = event.set_application_version(Some("1.2.3"));
        let _ = event.set_data_center(Some("cdc"));
        let _ = event.set_client_host_name(Some("blah"));
        let _ = event.set_destination_host_name(Some("blah1"));
        let _ = event.set_destination_path(Some("yoda"));
        let _ = event.set_start_timestamp(Some(1));
        let _ = event.set_finish_timestamp(Some(2));
        let _ = event.set_duration(Some(3));
        let _ = event.set_duration_in_ms(Some(4));
        let _ = event.set_response_code(Some(200));
        let _ = event.set_response(Some(Response::Failure));

        let json = serde_json::to_string(&event)?;
        assert_eq!(json, expected);
        Ok(())
    }
}