//! A `log` backend that forwards each log record to an AWS CloudWatch Logs stream.

extern crate chrono;
extern crate log;
extern crate rusoto_core;
extern crate rusoto_logs;

use chrono::Utc;
use log::{Level, Log, Metadata, Record, SetLoggerError};
use rusoto_core::Region;
use rusoto_logs::{
    CloudWatchLogs, CloudWatchLogsClient, DescribeLogStreamsRequest, InputLogEvent,
    PutLogEventsRequest,
};
use std::cell::Cell;
use std::sync::Mutex;

/// `log::Log` implementation that uploads each record to one CloudWatch Logs
/// stream through a `CloudWatchLogsClient`.
struct CloudWatchLogger {
    /// Threshold compared against each record's level in `enabled`.
    level: Level,
    /// Client used to issue the `put_log_events` calls.
    client: CloudWatchLogsClient,
    /// Log group name sent with every `PutLogEventsRequest`.
    log_group: String,
    /// Log stream name sent with every `PutLogEventsRequest`.
    log_stream: String,
    /// Sequence token to send with the next upload; it is replaced by the
    /// `next_sequence_token` of each successful response. The mutex guards the
    /// token so the logger can be shared between threads.
    sequence_token: Mutex<Cell<Option<String>>>,
}

/// Sends every enabled record to CloudWatch Logs as its own single-event
/// `put_log_events` request.
impl Log for CloudWatchLogger {
    /// Returns `true` when the record's level compares less than or equal to
    /// the level this logger was configured with.
    fn enabled(&self, metadata: &Metadata) -> bool {
        metadata.level() <= self.level
    }

    /// Formats `record` as `"<level, left-aligned to 5 columns> <message>"`,
    /// stamps it with the current UTC time and uploads it synchronously.
    ///
    /// On success the stored sequence token is replaced by the one returned in
    /// the response. On failure the error is printed to stdout and the token
    /// that was in use is put back, so the next record does not go out without
    /// a token.
    fn log(&self, record: &Record) {
        if !self.enabled(record.metadata()) {
            return;
        }

        let input_log_event = InputLogEvent {
            message: format!("{:<5} {}", record.level().to_string(), record.args()),
            timestamp: Utc::now().timestamp_millis(), // milliseconds since the Unix epoch
        };

        // The guard stays alive for the whole request so concurrent callers
        // cannot reuse the same token. A poisoned mutex still holds a usable
        // token, so recover the guard instead of panicking inside the logger.
        let seq = self
            .sequence_token
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let token = seq.take();

        let put_log_events_request = PutLogEventsRequest {
            // Only one event is sent per call; if this ever batches more than
            // one, they must be sorted by ascending timestamp.
            log_events: vec![input_log_event],
            log_group_name: self.log_group.clone(),
            log_stream_name: self.log_stream.clone(),
            sequence_token: token.clone(),
        };

        match self.client.put_log_events(put_log_events_request).sync() {
            Ok(r) => seq.set(r.next_sequence_token),
            Err(e) => {
                // Keep the token we had: dropping it would make every later
                // upload go out with no sequence token at all.
                seq.set(token);
                println!("{:#?}", e);
            }
        }
    }

    /// No-op: `log` uploads each record synchronously, so nothing is buffered.
    fn flush(&self) {}
}

pub fn init_with_level(
    level: Level,
    region: Region,
    log_group: String,
    log_stream: String,
) -> Result<(), SetLoggerError> {
    let client = CloudWatchLogsClient::new(region);

    let mut desc_streams_req: DescribeLogStreamsRequest = Default::default();
    desc_streams_req.log_group_name = log_group.clone();
    let streams_resp = client.describe_log_streams(desc_streams_req).sync();
    let log_streams = streams_resp.unwrap().log_streams.unwrap();
    let stream = &log_streams
        .iter()
        .find(|s| s.log_stream_name == Some(log_stream.clone()))
        .unwrap();
    let sequence_token = stream.upload_sequence_token.clone();

    let logger = CloudWatchLogger {
        level,
        client,
        log_group,
        log_stream,
        sequence_token: Mutex::new(Cell::new(sequence_token)),
    };
    log::set_boxed_logger(Box::new(logger))?;
    log::set_max_level(level.to_level_filter());

    Ok(())
}

/// Installs the CloudWatch logger with the level fixed to `Level::Trace`.
///
/// Equivalent to calling `init_with_level` with `Level::Trace` and the same
/// `region`, `log_group` and `log_stream`; its result is returned unchanged.
pub fn init(region: Region, log_group: String, log_stream: String) -> Result<(), SetLoggerError> {
    let level = Level::Trace;
    init_with_level(level, region, log_group, log_stream)
}