logscale_log/
logscale_unstructured_logger.rs1use std::{
2 error::Error,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use crate::options::{LoggerIngestPolicy, LoggerOptions};
8use log::Log;
9use logscale_client::{
10 client::LogScaleClient,
11 models::ingest::{UnstructuredLogEvent, UnstructuredLogsIngestRequest},
12};
13
14pub struct LogScaleUnstructuredLogger {
15 log_ingester: UnstructuredLogIngester,
16 options: LoggerOptions,
17}
18
19type PendingEvents = Arc<Mutex<Vec<UnstructuredLogEvent>>>;
20
21struct UnstructuredLogIngester {
22 client: LogScaleClient,
23 pending_events: PendingEvents,
24 ingest_policy: LoggerIngestPolicy,
25}
26
27impl UnstructuredLogIngester {
28 pub fn new(client: LogScaleClient, ingest_policy: LoggerIngestPolicy) -> Self {
29 Self {
30 client,
31 pending_events: Arc::from(Mutex::new(Vec::new())),
32 ingest_policy,
33 }
34 }
35
36 pub fn ingest_log_event(&self, log_event: UnstructuredLogEvent) {
37 match self.ingest_policy {
38 LoggerIngestPolicy::Immediately => {
39 let client = self.client.clone();
40 tokio::spawn(async move {
41 let request_content = [log_event];
42 let request = UnstructuredLogsIngestRequest::from_log_events(&request_content);
43
44 let _ = client.ingest_unstructured(&[request]).await;
45 });
46 }
47 LoggerIngestPolicy::Periodically(_) => {
48 if let Ok(mut pending_events) = self.pending_events.lock() {
49 pending_events.push(log_event);
50 }
51 }
52 }
53 }
54
55 fn start_background_ingest_job(&mut self, duration: Duration) {
56 let client = self.client.clone();
57 let pending_events = Arc::clone(&self.pending_events);
58
59 tokio::spawn(async move {
60 let mut interval = tokio::time::interval(duration);
61
62 interval.tick().await;
63
64 loop {
65 interval.tick().await;
66
67 let mut events: Vec<UnstructuredLogEvent> = Vec::new();
68
69 {
70 if let Ok(pending_events) = pending_events.lock() {
71 if pending_events.is_empty() {
72 continue;
73 }
74
75 events = pending_events.iter().cloned().collect();
76 }
77 }
78
79 let request = UnstructuredLogsIngestRequest::from_log_events(&events);
80 if client.ingest_unstructured(&[request]).await.is_ok() {
81 if let Ok(mut pending_events) = pending_events.lock() {
82 pending_events.clear();
83 }
84 }
85 }
86 });
87 }
88}
89
90impl LogScaleUnstructuredLogger {
91 pub fn init(
92 url: String,
93 ingest_token: String,
94 options: LoggerOptions,
95 ) -> Result<(), Box<dyn Error>> {
96 let mut logscale_logger = LogScaleUnstructuredLogger::create(&url, &ingest_token, options)?;
97
98 if let LoggerIngestPolicy::Periodically(duration) = logscale_logger.options.ingest_policy {
99 logscale_logger
100 .log_ingester
101 .start_background_ingest_job(duration);
102 }
103
104 log::set_boxed_logger(Box::from(logscale_logger))?;
105
106 Ok(())
107 }
108
109 fn create(
110 url: &str,
111 ingest_token: &str,
112 options: LoggerOptions,
113 ) -> Result<Self, Box<dyn Error>> {
114 let client = LogScaleClient::from_url(url, String::from(ingest_token))?;
115
116 let log_ingester = UnstructuredLogIngester::new(client, options.ingest_policy);
117
118 Ok(Self {
119 log_ingester,
120 options,
121 })
122 }
123}
124
125impl Log for LogScaleUnstructuredLogger {
126 fn enabled(&self, _: &log::Metadata) -> bool {
127 true
128 }
129
130 fn log(&self, record: &log::Record) {
131 let log_event: UnstructuredLogEvent = record.args().to_string().into();
132
133 self.log_ingester.ingest_log_event(log_event);
134 }
135
136 fn flush(&self) {}
137}