echoloc 0.1.10

Generated by template 'rustyhorde-lib-template' by cargo-generate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
// Copyright © 2019 echoloc developers
//
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

//! Echo Event
//!
//! [Echo Event Type](https://confluence.kroger.com/confluence/display/ECHO/Echo+Event+Format)
//!
//! # Example
//! ```
//! ```

use crate::error::{Err, ErrKind};
use crate::runnable::Runnable;
use getset::Setters;
use reqwest::{
    header::{CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT},
    Client,
};
use serde::{Serialize as Ser, Serializer};
use serde_derive::Serialize;
use slog::{error, trace, Logger};
use slog_try::{try_error, try_trace};
use std::collections::HashMap;
use std::error::Error;
use uuid::Uuid;

/// The Echo messages urls
#[derive(Clone, Copy, Debug, PartialEq, Serialize)]
pub enum CollectorUrl {
    /// The stage url (https://echocollector-stage.kroger.com/echo/messages)
    Stage,
    /// The prod url (https://echocollector.kroger.com/echo/messages)
    Prod,
}

impl Default for CollectorUrl {
    fn default() -> Self {
        CollectorUrl::Stage
    }
}

impl CollectorUrl {
    /// Convert the enum to a str
    pub fn as_str(self) -> &'static str {
        match self {
            CollectorUrl::Stage => "https://echocollector-stage.kroger.com/echo/messages",
            CollectorUrl::Prod => "https://echocollector.kroger.com/echo/messages",
        }
    }
}

/// The payload for sending a batch of Echo `Event`s
#[derive(Clone, Debug, Default, Setters)]
pub struct Payload {
    /// The collector url to use
    #[set = "pub"]
    url: CollectorUrl,
    /// The batch of events to send
    #[set = "pub"]
    events: Vec<Event>,
    /// An optional `slog` logger
    #[set = "pub"]
    logger: Option<Logger>,
    /// An error count for retries, this is not serialized.
    #[set = "crate"]
    error_count: usize,
    /// The retry count if an error occurred sending the batch
    #[set = "pub"]
    retry_count: usize,
}

impl Runnable for Payload {
    type Ok = ();
    type Error = Err;

    fn run(&mut self) -> Result<Self::Ok, Self::Error> {
        let events_clone = self.events.clone();
        let json = serde_json::to_string(&events_clone)?;
        let client = Client::builder()
            .danger_accept_invalid_certs(true)
            .danger_accept_invalid_hostnames(true)
            .build()?;
        let length = json.as_bytes().len();

        let resp = client
            .post(self.url.as_str())
            .header(USER_AGENT, "curl/7.54.0")
            .header(CONTENT_TYPE, "application/json")
            .header(CONTENT_LENGTH, length)
            .body(json)
            .send()?;

        if resp.status().is_success() {
            Ok(())
        } else {
            try_error!(self.logger, "Echo Event could not be sent");
            self.error_count += 1;
            Err(ErrKind::Run.into())
        }
    }

    fn should_retry(&self, error: &Self::Error) -> bool {
        try_error!(self.logger, "Error: {}", error);
        if let Some(cause) = error.source() {
            if let Some(err) = cause.downcast_ref::<ErrKind>() {
                if let ErrKind::Run = err {
                    try_error!(self.logger, "`ErrKind::Run` error - Checking for retry");
                    if self.error_count < self.retry_count {
                        try_trace!(self.logger, "Retrying failed event");
                        true
                    } else {
                        try_error!(self.logger, "Too many retries, bailing");
                        try_error!(self.logger, "Echo Events NOT sent successfully");
                        false
                    }
                } else {
                    try_error!(self.logger, "Source isn't `ErrKind::Run`: {}", err);
                    false
                }
            } else {
                try_error!(self.logger, "Error source isn't `ErrKind`");
                false
            }
        } else {
            try_error!(self.logger, "No error source");
            false
        }
    }

    fn store_result(&mut self, _result: Result<Self::Ok, Self::Error>) {}
}

/// An Echo Event
#[derive(Clone, Debug, Default, PartialEq, Serialize, Setters)]
pub struct Event {
    /// The routing_key is what identifies the message with an application. It will become the ElasticSearch index.
    /// Valid characters are lowercase alpha numeric and '-'.
    /// The key should follow the format <application group>-<application name>-<environment>.
    #[serde(rename = "routingKey")]
    routing_key: String,
    /// Echo Event Type
    #[serde(rename = "type")]
    #[set = "pub"]
    event_type: EventType,
    /// A simple string message.  Most messages should be one line of information.  If you have secondary, deeper information to store, put it in the `message_detail`.
    ///
    /// This field holds the data when the tail appender or default log appender is used.
    message: String,
    /// The correlation id
    #[set = "pub"]
    #[serde(rename = "correlationId", skip_serializing_if = "Option::is_none")]
    correlation_id: Option<Uuid>,
    /// The timestamp of the event.  If unset, it will be set by the EchoClient.
    ///
    /// If producing your own messages, the format of the date should be either of:
    ///
    /// * An ISO-8601 date/time string (e.g. 2017-04-06T17:23:00-04:00)
    /// * A number representing milliseconds since epoch (e.g. 1491514054000)
    ///
    #[set = "pub"]
    #[serde(skip_serializing_if = "Option::is_none")]
    timestamp: Option<i64>,
    /// A place to store custom key/value pairs in the message, typically used when there isn't an appropriate root-level field.
    #[set = "pub"]
    #[serde(rename = "messageDetail", skip_serializing_if = "Option::is_none")]
    message_detail: Option<HashMap<String, String>>,
    /// Hostname where the message originated. If None, it will be set by the EchoClient.
    #[serde(skip_serializing_if = "Option::is_none")]
    host: Option<String>,
    /// Sets the version of the application that is creating this message.
    #[serde(rename = "applicationVersion", skip_serializing_if = "Option::is_none")]
    application_version: Option<String>,
    /// Sets the datacenter that the application is in, based on DCPloy environment settings.
    #[serde(rename = "dataCenter", skip_serializing_if = "Option::is_none")]
    data_center: Option<String>,
    /// The hostname of a client if this message is involving an external system calling into your system.
    #[serde(rename = "clientHostName", skip_serializing_if = "Option::is_none")]
    client_host_name: Option<String>,
    /// The hostname of a destination system if this message is involving your system calling an external system.
    #[serde(
        rename = "destinationHostName",
        skip_serializing_if = "Option::is_none"
    )]
    destination_host_name: Option<String>,
    /// The path being called on a destination system if this message is involving your system calling an external system.
    #[serde(rename = "destinationPath", skip_serializing_if = "Option::is_none")]
    destination_path: Option<String>,
    /// Sets the timestamp of millis since the epoch for the time at which this event started.
    #[serde(rename = "startTimestamp", skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    start_timestamp: Option<u64>,
    /// Sets the timestamp of millis since the epoch for the time at which this event finished.
    #[serde(rename = "finishTimestamp", skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    finish_timestamp: Option<u64>,
    /// Sets the duration (time in milliseconds) that passed during this event.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    duration: Option<u64>,
    /// Sets the duration (time in milliseconds) that passed during this event.
    #[serde(rename = "durationInMs", skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    duration_in_ms: Option<u64>,
    /// The HTTP response code returned by a performance event.
    #[serde(rename = "responseCode", skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    response_code: Option<u16>,
    /// A more generic response used when a HTTP response code doesn't make sense. Typical values might be "success" or "failure".
    #[serde(skip_serializing_if = "Option::is_none")]
    #[set = "pub"]
    response: Option<Response>,
}

impl Event {
    /// Set the routing key field
    pub fn set_routing_key<T>(&mut self, routing_key: T) -> &mut Self
    where
        T: Into<String>,
    {
        self.routing_key = routing_key.into();
        self
    }

    /// Set the message field
    pub fn set_message<T>(&mut self, message: T) -> &mut Self
    where
        T: Into<String>,
    {
        self.message = message.into();
        self
    }

    /// Set the host field
    pub fn set_host<T>(&mut self, host: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.host = match host {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }

    /// Set the application version field
    pub fn set_application_version<T>(&mut self, application_version: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.application_version = match application_version {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }

    /// Set the datacenter field
    pub fn set_data_center<T>(&mut self, data_center: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.data_center = match data_center {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }

    /// Set the client host name
    pub fn set_client_host_name<T>(&mut self, client_host_name: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.client_host_name = match client_host_name {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }

    /// Set the destination host name
    pub fn set_destination_host_name<T>(&mut self, destination_host_name: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.destination_host_name = match destination_host_name {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }

    /// Set the destination path
    pub fn set_destination_path<T>(&mut self, destination_path: Option<T>) -> &mut Self
    where
        T: Into<String>,
    {
        self.destination_path = match destination_path {
            None => None,
            Some(t) => Some(t.into()),
        };
        self
    }
}

/// Echo Event Type
///
/// The following types are currently recognized:
///
/// * ERROR - Any message that should be associated with a non-normal action or situation that the system processed.
/// * INFO - Any message that should be associated with a normal action or situation that the system processed.
/// * PERFORMANCE - Any message that associates speed or time taken with which any action or situation that the system processed.
/// * TRACKING - Any message that tries to correlate two (or more) events or data points that is not associated.
/// * SYSTEM - Internally used for client machine performance data (CPU utilization, JVM heap usage, ect)
///
/// Additional types may be added in the future.
///
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EventType {
    /// ERROR
    Error,
    /// INFO
    Info,
    /// PERFORMANCE
    Performance,
    /// TRACKING
    Tracking,
    /// SYSTEM
    System,
}

impl Default for EventType {
    fn default() -> Self {
        EventType::Info
    }
}

impl Ser for EventType {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match *self {
            EventType::Error => serializer.serialize_str("ERROR"),
            EventType::Info => serializer.serialize_str("INFO"),
            EventType::Performance => serializer.serialize_str("PERFORMANCE"),
            EventType::Tracking => serializer.serialize_str("TRACKING"),
            EventType::System => serializer.serialize_str("SYSTEM"),
        }
    }
}

/// A more generic response used when a HTTP response code doesn't make sense. Typical values might be "success" or "failure".
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Response {
    /// Success
    Success,
    /// Failure
    Failure,
}

impl Default for Response {
    fn default() -> Self {
        Response::Success
    }
}

impl Ser for Response {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match *self {
            Response::Success => serializer.serialize_str("success"),
            Response::Failure => serializer.serialize_str("failure"),
        }
    }
}

#[cfg(test)]
mod test {
    use super::{Event, EventType, Response};
    use crate::error::Result;
    use chrono::{offset::TimeZone, Utc};
    use std::collections::HashMap;
    use uuid::Uuid;

    #[test]
    fn serialize_default() -> Result<()> {
        let echo_event = Event::default();
        let result = serde_json::to_string(&echo_event)?;
        assert_eq!(result, r#"{"routingKey":"","type":"INFO","message":""}"#);
        Ok(())
    }

    #[test]
    fn with_message() -> Result<()> {
        let mut echo_event = Event::default();
        let _ = echo_event.set_message("testing");
        let result = serde_json::to_string(&echo_event)?;
        assert_eq!(
            result,
            r#"{"routingKey":"","type":"INFO","message":"testing"}"#
        );
        Ok(())
    }

    #[test]
    fn with_type() -> Result<()> {
        let mut echo_event = Event::default();
        let _ = echo_event.set_event_type(EventType::Performance);
        let result = serde_json::to_string(&echo_event)?;
        assert_eq!(
            result,
            r#"{"routingKey":"","type":"PERFORMANCE","message":""}"#
        );
        Ok(())
    }

    #[test]
    fn full() -> Result<()> {
        let mut echo_event = Event::default();
        let _ = echo_event.set_routing_key("atlas-dev-promises");
        let _ = echo_event.set_event_type(EventType::System);
        let _ = echo_event.set_message("testing");
        let _ = echo_event.set_correlation_id(Some(Uuid::parse_str(
            "35F3E1D6-D859-4AA0-8C58-2CDFE97A4710",
        )?));
        let _ = echo_event.set_timestamp(Some(
            Utc.ymd(1976, 3, 22)
                .and_hms_milli(0, 0, 1, 666)
                .timestamp_millis(),
        ));
        let mut message_detail = HashMap::new();
        let _ = message_detail.insert("a", "b");
        let _ = echo_event.set_message_detail(Some(
            message_detail
                .iter_mut()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        ));
        let _ = echo_event.set_host(Some("host"));
        let _ = echo_event.set_application_version(Some("1.2.3"));
        let _ = echo_event.set_data_center(Some("cdc"));
        let _ = echo_event.set_client_host_name(Some("blah"));
        let _ = echo_event.set_destination_host_name(Some("blah1"));
        let _ = echo_event.set_destination_path(Some("yoda"));
        let _ = echo_event.set_start_timestamp(Some(1));
        let _ = echo_event.set_finish_timestamp(Some(2));
        let _ = echo_event.set_duration(Some(3));
        let _ = echo_event.set_duration_in_ms(Some(4));
        let _ = echo_event.set_response_code(Some(200));
        let _ = echo_event.set_response(Some(Response::Failure));

        let result = serde_json::to_string(&echo_event)?;
        assert_eq!(
            result,
            r#"{"routingKey":"atlas-dev-promises","type":"SYSTEM","message":"testing","correlationId":"35f3e1d6-d859-4aa0-8c58-2cdfe97a4710","timestamp":196300801666,"messageDetail":{"a":"b"},"host":"host","applicationVersion":"1.2.3","dataCenter":"cdc","clientHostName":"blah","destinationHostName":"blah1","destinationPath":"yoda","startTimestamp":1,"finishTimestamp":2,"duration":3,"durationInMs":4,"responseCode":200,"response":"failure"}"#
        );
        Ok(())
    }
}