logscale_log/
logscale_unstructured_logger.rs

1use std::{
2    error::Error,
3    sync::{Arc, Mutex},
4    time::Duration,
5};
6
7use crate::options::{LoggerIngestPolicy, LoggerOptions};
8use log::Log;
9use logscale_client::{
10    client::LogScaleClient,
11    models::ingest::{UnstructuredLogEvent, UnstructuredLogsIngestRequest},
12};
13
14pub struct LogScaleUnstructuredLogger {
15    log_ingester: UnstructuredLogIngester,
16    options: LoggerOptions,
17}
18
19type PendingEvents = Arc<Mutex<Vec<UnstructuredLogEvent>>>;
20
21struct UnstructuredLogIngester {
22    client: LogScaleClient,
23    pending_events: PendingEvents,
24    ingest_policy: LoggerIngestPolicy,
25}
26
27impl UnstructuredLogIngester {
28    pub fn new(client: LogScaleClient, ingest_policy: LoggerIngestPolicy) -> Self {
29        Self {
30            client,
31            pending_events: Arc::from(Mutex::new(Vec::new())),
32            ingest_policy,
33        }
34    }
35
36    pub fn ingest_log_event(&self, log_event: UnstructuredLogEvent) {
37        match self.ingest_policy {
38            LoggerIngestPolicy::Immediately => {
39                let client = self.client.clone();
40                tokio::spawn(async move {
41                    let request_content = [log_event];
42                    let request = UnstructuredLogsIngestRequest::from_log_events(&request_content);
43
44                    let _ = client.ingest_unstructured(&[request]).await;
45                });
46            }
47            LoggerIngestPolicy::Periodically(_) => {
48                if let Ok(mut pending_events) = self.pending_events.lock() {
49                    pending_events.push(log_event);
50                }
51            }
52        }
53    }
54
55    fn start_background_ingest_job(&mut self, duration: Duration) {
56        let client = self.client.clone();
57        let pending_events = Arc::clone(&self.pending_events);
58
59        tokio::spawn(async move {
60            let mut interval = tokio::time::interval(duration);
61
62            interval.tick().await;
63
64            loop {
65                interval.tick().await;
66
67                let mut events: Vec<UnstructuredLogEvent> = Vec::new();
68
69                {
70                    if let Ok(pending_events) = pending_events.lock() {
71                        if pending_events.is_empty() {
72                            continue;
73                        }
74
75                        events = pending_events.iter().cloned().collect();
76                    }
77                }
78
79                let request = UnstructuredLogsIngestRequest::from_log_events(&events);
80                if client.ingest_unstructured(&[request]).await.is_ok() {
81                    if let Ok(mut pending_events) = pending_events.lock() {
82                        pending_events.clear();
83                    }
84                }
85            }
86        });
87    }
88}
89
90impl LogScaleUnstructuredLogger {
91    pub fn init(
92        url: String,
93        ingest_token: String,
94        options: LoggerOptions,
95    ) -> Result<(), Box<dyn Error>> {
96        let mut logscale_logger = LogScaleUnstructuredLogger::create(&url, &ingest_token, options)?;
97
98        if let LoggerIngestPolicy::Periodically(duration) = logscale_logger.options.ingest_policy {
99            logscale_logger
100                .log_ingester
101                .start_background_ingest_job(duration);
102        }
103
104        log::set_boxed_logger(Box::from(logscale_logger))?;
105
106        Ok(())
107    }
108
109    fn create(
110        url: &str,
111        ingest_token: &str,
112        options: LoggerOptions,
113    ) -> Result<Self, Box<dyn Error>> {
114        let client = LogScaleClient::from_url(url, String::from(ingest_token))?;
115
116        let log_ingester = UnstructuredLogIngester::new(client, options.ingest_policy);
117
118        Ok(Self {
119            log_ingester,
120            options,
121        })
122    }
123}
124
125impl Log for LogScaleUnstructuredLogger {
126    fn enabled(&self, _: &log::Metadata) -> bool {
127        true
128    }
129
130    fn log(&self, record: &log::Record) {
131        let log_event: UnstructuredLogEvent = record.args().to_string().into();
132
133        self.log_ingester.ingest_log_event(log_event);
134    }
135
136    fn flush(&self) {}
137}