logscale_log/
logscale_structured_logger.rs1use std::{
2 collections::HashMap,
3 error::Error,
4 sync::{Arc, Mutex},
5 time::{Duration, SystemTime, UNIX_EPOCH},
6};
7
8use logscale_client::{
9 client::LogScaleClient,
10 models::ingest::{StructuredLogEvent, StructuredLogsIngestRequest},
11};
12use structured_logger::{Builder, Writer};
13
14use crate::options::{LoggerIngestPolicy, LoggerOptions};
15
16type PendingEvents = Arc<Mutex<Vec<StructuredLogEvent>>>;
17
18struct StructuredLogIngester {
19 client: LogScaleClient,
20 pending_events: PendingEvents,
21 ingest_policy: LoggerIngestPolicy,
22}
23
24impl StructuredLogIngester {
25 pub fn new(client: LogScaleClient, ingest_policy: LoggerIngestPolicy) -> Self {
26 Self {
27 client,
28 pending_events: Arc::from(Mutex::new(Vec::new())),
29 ingest_policy,
30 }
31 }
32
33 pub fn ingest_log_event(&self, log_event: StructuredLogEvent) {
34 match self.ingest_policy {
35 LoggerIngestPolicy::Immediately => {
36 let client = self.client.clone();
37
38 tokio::spawn(async move {
39 let _ = client
40 .ingest_structured(&[StructuredLogsIngestRequest {
41 tags: HashMap::new(),
42 events: &[log_event],
43 }])
44 .await;
45 });
46 }
47 LoggerIngestPolicy::Periodically(_) => {
48 if let Ok(mut pending_events) = self.pending_events.lock() {
49 pending_events.push(log_event);
50 }
51 }
52 }
53 }
54
55 fn start_background_ingest_job(&mut self, duration: Duration) {
56 let client = self.client.clone();
57 let pending_events = Arc::clone(&self.pending_events);
58
59 tokio::spawn(async move {
60 let mut interval = tokio::time::interval(duration);
61
62 interval.tick().await;
63
64 loop {
65 interval.tick().await;
66
67 let mut events: Vec<StructuredLogEvent> = Vec::new();
68
69 {
70 if let Ok(pending_events) = pending_events.lock() {
71 if pending_events.is_empty() {
72 continue;
73 }
74
75 events = pending_events.iter().cloned().collect();
76 }
77 }
78
79 let request = StructuredLogsIngestRequest {
80 events: &events,
81 tags: HashMap::new(),
82 };
83 if client.ingest_structured(&[request]).await.is_ok() {
84 if let Ok(mut pending_events) = pending_events.lock() {
85 pending_events.clear();
86 }
87 }
88 }
89 });
90 }
91}
92
93pub struct LogScaleStructuredLogger {
94 log_ingester: StructuredLogIngester,
95 options: LoggerOptions,
96}
97
98impl LogScaleStructuredLogger {
99 pub fn init(
100 url: String,
101 ingest_token: String,
102 options: LoggerOptions,
103 ) -> Result<(), Box<dyn Error>> {
104 let mut logscale_logger = LogScaleStructuredLogger::create(&url, &ingest_token, options)?;
105
106 if let LoggerIngestPolicy::Periodically(duration) = logscale_logger.options.ingest_policy {
107 logscale_logger
108 .log_ingester
109 .start_background_ingest_job(duration);
110 }
111
112 Builder::new()
113 .with_default_writer(Box::from(logscale_logger))
114 .init();
115
116 Ok(())
117 }
118
119 fn create(
120 url: &str,
121 ingest_token: &str,
122 options: LoggerOptions,
123 ) -> Result<Self, Box<dyn Error>> {
124 let client = LogScaleClient::from_url(url, String::from(ingest_token))?;
125
126 let log_ingester = StructuredLogIngester::new(client, options.ingest_policy);
127
128 Ok(Self {
129 log_ingester,
130 options,
131 })
132 }
133}
134
135impl Writer for LogScaleStructuredLogger {
136 fn write_log(
137 &self,
138 value: &std::collections::BTreeMap<log::kv::Key, log::kv::Value>,
139 ) -> Result<(), std::io::Error> {
140 let attributes = serde_json::to_value(value)?;
141
142 let now_unix_timestamp = SystemTime::now()
143 .duration_since(UNIX_EPOCH)
144 .unwrap()
145 .as_millis();
146
147 let log_event = StructuredLogEvent::new(now_unix_timestamp, attributes);
148
149 self.log_ingester.ingest_log_event(log_event);
150
151 Ok(())
152 }
153}