logscale_log/
logscale_structured_logger.rs

1use std::{
2    collections::HashMap,
3    error::Error,
4    sync::{Arc, Mutex},
5    time::{Duration, SystemTime, UNIX_EPOCH},
6};
7
8use logscale_client::{
9    client::LogScaleClient,
10    models::ingest::{StructuredLogEvent, StructuredLogsIngestRequest},
11};
12use structured_logger::{Builder, Writer};
13
14use crate::options::{LoggerIngestPolicy, LoggerOptions};
15
/// Buffer of log events waiting to be sent, shared between the code that
/// queues events and the background task that ingests them.
type PendingEvents = Arc<Mutex<Vec<StructuredLogEvent>>>;
17
/// Forwards structured log events to LogScale according to an ingest policy.
struct StructuredLogIngester {
    /// Client used to issue the `ingest_structured` requests.
    client: LogScaleClient,
    /// Events queued under the `Periodically` policy until the background job sends them.
    pending_events: PendingEvents,
    /// Decides whether an event is sent right away or buffered.
    ingest_policy: LoggerIngestPolicy,
}
23
24impl StructuredLogIngester {
25    pub fn new(client: LogScaleClient, ingest_policy: LoggerIngestPolicy) -> Self {
26        Self {
27            client,
28            pending_events: Arc::from(Mutex::new(Vec::new())),
29            ingest_policy,
30        }
31    }
32
33    pub fn ingest_log_event(&self, log_event: StructuredLogEvent) {
34        match self.ingest_policy {
35            LoggerIngestPolicy::Immediately => {
36                let client = self.client.clone();
37
38                tokio::spawn(async move {
39                    let _ = client
40                        .ingest_structured(&[StructuredLogsIngestRequest {
41                            tags: HashMap::new(),
42                            events: &[log_event],
43                        }])
44                        .await;
45                });
46            }
47            LoggerIngestPolicy::Periodically(_) => {
48                if let Ok(mut pending_events) = self.pending_events.lock() {
49                    pending_events.push(log_event);
50                }
51            }
52        }
53    }
54
55    fn start_background_ingest_job(&mut self, duration: Duration) {
56        let client = self.client.clone();
57        let pending_events = Arc::clone(&self.pending_events);
58
59        tokio::spawn(async move {
60            let mut interval = tokio::time::interval(duration);
61
62            interval.tick().await;
63
64            loop {
65                interval.tick().await;
66
67                let mut events: Vec<StructuredLogEvent> = Vec::new();
68
69                {
70                    if let Ok(pending_events) = pending_events.lock() {
71                        if pending_events.is_empty() {
72                            continue;
73                        }
74
75                        events = pending_events.iter().cloned().collect();
76                    }
77                }
78
79                let request = StructuredLogsIngestRequest {
80                    events: &events,
81                    tags: HashMap::new(),
82                };
83                if client.ingest_structured(&[request]).await.is_ok() {
84                    if let Ok(mut pending_events) = pending_events.lock() {
85                        pending_events.clear();
86                    }
87                }
88            }
89        });
90    }
91}
92
/// `structured_logger` writer that forwards every log record to LogScale.
pub struct LogScaleStructuredLogger {
    /// Sends or buffers the events produced by `write_log`.
    log_ingester: StructuredLogIngester,
    /// Options the logger was created with, including the ingest policy.
    options: LoggerOptions,
}
97
98impl LogScaleStructuredLogger {
99    pub fn init(
100        url: String,
101        ingest_token: String,
102        options: LoggerOptions,
103    ) -> Result<(), Box<dyn Error>> {
104        let mut logscale_logger = LogScaleStructuredLogger::create(&url, &ingest_token, options)?;
105
106        if let LoggerIngestPolicy::Periodically(duration) = logscale_logger.options.ingest_policy {
107            logscale_logger
108                .log_ingester
109                .start_background_ingest_job(duration);
110        }
111
112        Builder::new()
113            .with_default_writer(Box::from(logscale_logger))
114            .init();
115
116        Ok(())
117    }
118
119    fn create(
120        url: &str,
121        ingest_token: &str,
122        options: LoggerOptions,
123    ) -> Result<Self, Box<dyn Error>> {
124        let client = LogScaleClient::from_url(url, String::from(ingest_token))?;
125
126        let log_ingester = StructuredLogIngester::new(client, options.ingest_policy);
127
128        Ok(Self {
129            log_ingester,
130            options,
131        })
132    }
133}
134
135impl Writer for LogScaleStructuredLogger {
136    fn write_log(
137        &self,
138        value: &std::collections::BTreeMap<log::kv::Key, log::kv::Value>,
139    ) -> Result<(), std::io::Error> {
140        let attributes = serde_json::to_value(value)?;
141
142        let now_unix_timestamp = SystemTime::now()
143            .duration_since(UNIX_EPOCH)
144            .unwrap()
145            .as_millis();
146
147        let log_event = StructuredLogEvent::new(now_unix_timestamp, attributes);
148
149        self.log_ingester.ingest_log_event(log_event);
150
151        Ok(())
152    }
153}