slog_google/logger.rs
1#[cfg(feature = "shipper")]
2use crate::shipper;
3
4use crate::error::Error;
5
6use google_logging2::api::{LogEntry, MonitoredResource, WriteLogEntriesRequest};
7
8use slog::{self, Drain, Key, Level, Never, OwnedKVList, Record, KV};
9use std::collections::HashMap;
10use std::fmt;
11use std::fmt::Write;
12
13use serde_json::json;
14
15use std::sync::mpsc::sync_channel;
16
17use chrono::Utc;
18
/// Builder for the [`Logger`].
///
/// Collects the log name, the monitored-resource description and the label
/// configuration before the [`Logger`] is assembled.
#[derive(Debug, Default)]
pub struct Builder {
    /// Value sent as `log_name` with every request the logger produces.
    log_name: String,
    /// Label key under which the log level is stored; `None` leaves it out.
    log_level_label: Option<String>,
    /// `type` of the monitored resource attached to every entry.
    resource_type: String,
    /// Labels every entry starts from.
    default_labels: HashMap<String, String>,
    /// `labels` of the monitored resource, if any were configured.
    resource_labels: Option<HashMap<String, String>>,
}
28
29/// Main struct for the Google Logger drain
30pub struct Logger {
31 log_name: String,
32 log_level_label: Option<String>,
33 default_labels: HashMap<String, String>,
34 resource: MonitoredResource,
35 sync_tx: std::sync::mpsc::SyncSender<WriteLogEntriesRequest>,
36}
37
38impl Builder {
39 /// Creates a Builder object.
40 ///
41 /// # Parameters
42 /// - `log_name`: The `logName` string to be used in the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry)
43 /// - `resource_type`: The required `type` field set in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). For example: `k8s_container`.
44 ///
45 /// # Example
46 ///
47 /// ```
48 /// use slog_google::logger::Builder;
49 /// let (drain, _) = Builder::new(
50 /// "projects/my-gcp-project/logs/my-log-id",
51 /// "k8s_container",
52 /// )
53 /// .build();
54 /// ```
55 ///
56 #[must_use = "The builder must be used"]
57 pub fn new(log_name: &str, resource_type: &str) -> Self {
58 Self {
59 log_name: log_name.to_string(),
60 resource_type: resource_type.to_string(),
61 ..Default::default()
62 }
63 }
64
65 /// Sets resource labels to be applied.
66 ///
67 /// These labels will populate the `labels` field in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry).
68 ///
69 /// # Example
70 ///
71 /// ```
72 /// use serde_json::json;
73 /// let resource_labels = json!(
74 /// {
75 /// "pod_name": "dummy-value",
76 /// "location": "europe-west1-b",
77 /// "pod_name": std::env::var("HOSTNAME").unwrap_or_default(),
78 /// "container_name": "my-app",
79 /// "project_id": "my-gcp-project",
80 /// "cluster_name": "my-gke-cluster",
81 /// "namespace_name": "my-gke-namespace"
82 /// });
83 ///
84 /// use slog_google::logger::Builder;
85 /// let (drain, _) = Builder::new(
86 /// "projects/my-gcp-project/logs/my-log-id",
87 /// "k8s_container",
88 /// )
89 /// .with_resource_labels(resource_labels)
90 /// .unwrap()
91 /// .build();
92 /// ```
93 ///
94 /// # Errors
95 ///
96 /// Will return `Err` if `labels` does not parse as JSON.
97 #[must_use = "The builder must be used"]
98 pub fn with_resource_labels(
99 self,
100 labels: serde_json::Value,
101 ) -> Result<Self, Box<dyn std::error::Error>> {
102 Ok(Self {
103 resource_labels: Some(
104 serde_json::from_value(labels).map_err(Error::ResourceLabelsError)?,
105 ),
106 ..self
107 })
108 }
109
110 /// Sets default labels to be applied in the labels field.
111 ///
112 /// These will populate the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). These labels are added in addition to any labels set in the logger statement.
113 ///
114 /// # Example
115 ///
116 /// ```
117 /// use serde_json::json;
118 /// let default_labels = json!(
119 /// {
120 /// "application": "my-application",
121 /// "team": "my-team",
122 /// "version": "my-app-version",
123 /// "role": "my-app-role",
124 /// "environment": "production",
125 /// "platform": "gcp",
126 /// });
127 /// ```
128 ///
129 /// # Errors
130 ///
131 /// Will return `Err` if `labels` does not parse as JSON.
132 #[must_use = "The builder must be used"]
133 pub fn with_default_labels(self, labels: serde_json::Value) -> Result<Self, Error> {
134 Ok(Self {
135 default_labels: serde_json::from_value(labels).map_err(Error::DefaultLabelsError)?,
136 ..self
137 })
138 }
139
140 /// Sets the label name to store the log level
141 ///
142 /// If set, the log level value is added under this label the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry)
143 ///
144 /// If not set, the log level is not propagated, but you will still have the [severity](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity), which is always there.
145 #[must_use = "The builder must be used"]
146 pub fn with_log_level_label(self, log_level_label: &str) -> Self {
147 Self {
148 log_level_label: Some(log_level_label.into()),
149 ..self
150 }
151 }
152
153 /// This returns a tuple with a [`Logger`](struct@Logger), which can be passed to the slog root logger [as usual](https://docs.rs/slog/latest/slog/#where-to-start), and a [`std::sync::mpsc::Receiver`] channel.
154 /// The `Logger` sends the [`WriteLogEntries`](https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.LoggingServiceV2.WriteLogEntries) it creates to this channel.
155 ///
156 /// For instance you could output these to the console, if you have an external agent that reads the process' output and ships it to Google Logging.
157 ///
158 #[must_use = "The logger and receiver must be used to handle logging correctly"]
159 #[allow(dead_code)]
160 pub fn build(self) -> (Logger, std::sync::mpsc::Receiver<WriteLogEntriesRequest>) {
161 let (sync_tx, sync_rx) = sync_channel::<WriteLogEntriesRequest>(100);
162 (
163 Logger {
164 log_name: self.log_name,
165 log_level_label: self.log_level_label,
166 default_labels: self.default_labels,
167 resource: MonitoredResource {
168 type_: Some(self.resource_type),
169 labels: self.resource_labels,
170 },
171 sync_tx,
172 },
173 sync_rx,
174 )
175 }
176
177 /// In an async context this 'shipper' sends the log entries directly to the [Google Logging API](https://cloud.google.com/logging/docs/reference/v2/rest).
178 ///
179 /// # Example
180 ///
181 /// ```
182 /// use tokio::runtime::Runtime;
183 /// use serde_json::json;
184 ///
185 /// let mut rt = Runtime::new().unwrap();
186 /// rt.spawn(async {
187 /// let resource_labels = json!(
188 /// {
189 /// "pod_name": "dummy-value",
190 /// "location": "europe-west1-b",
191 /// "pod_name": std::env::var("HOSTNAME").unwrap_or_default(),
192 /// "container_name": "my-app",
193 /// "project_id": "my-gcp-project",
194 /// "cluster_name": "my-gke-cluster",
195 /// "namespace_name": "my-gke-namespace"
196 /// });
197 ///
198 /// use slog_google::logger::Builder;
199 /// let (drain, mut shipper) = Builder::new(
200 /// "projects/my-gcp-project/logs/my-log-id",
201 /// "k8s_container",
202 /// )
203 /// .with_resource_labels(resource_labels)
204 /// .unwrap()
205 /// .build_with_async_shipper();
206 ///
207 /// // Forward messages from the sync channel to the async channel where the
208 /// // shipper sends it to Google Cloud Logging
209 /// let bridge = shipper.yield_bridge();
210 /// tokio::task::spawn_blocking(move || {
211 /// bridge.run_sync_to_async_bridge();
212 /// });
213 ///
214 /// tokio::spawn(async move {
215 /// shipper.run_log_shipper().await;
216 /// });
217 /// });
218 ///
219 /// ```
220 #[cfg(feature = "shipper")]
221 #[must_use = "The logger and shipper must be used to handle logging correctly"]
222 pub fn build_with_async_shipper(self) -> (Logger, shipper::Shipper) {
223 let (sync_tx, sync_rx) = sync_channel::<WriteLogEntriesRequest>(100);
224 (
225 Logger {
226 log_name: self.log_name,
227 log_level_label: self.log_level_label,
228 default_labels: self.default_labels,
229 resource: MonitoredResource {
230 type_: Some(self.resource_type),
231 labels: self.resource_labels,
232 },
233 sync_tx,
234 },
235 shipper::Shipper::new(sync_rx),
236 )
237 }
238}
239
240impl Logger {
241 // Determine a sensible severity based on the log level
242 fn get_severity(log_level: Level) -> String {
243 // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
244 match log_level {
245 Level::Critical => "CRITICAL".into(),
246 Level::Error => "ERROR".into(),
247 Level::Warning => "WARNING".into(),
248 Level::Info => "INFO".into(),
249 Level::Debug | Level::Trace => "DEBUG".into(),
250 }
251 }
252
253 fn construct_log_entry(
254 &self,
255 message: &str,
256 log_level: Level,
257 serializer: Serializer,
258 ) -> LogEntry {
259 let mut labels = self.default_labels.clone();
260
261 if !serializer.map.is_empty() {
262 labels.extend(serializer.map);
263 }
264
265 // We add the log level to the labels if requested
266 if let Some(label) = &self.log_level_label {
267 labels.insert(label.clone(), log_level.as_str().to_string());
268 }
269
270 let resource = Some(self.resource.clone());
271
272 // TODO: support both text_payload and json_payload
273 let json_payload = HashMap::from([("message".to_string(), json!(message))]);
274
275 // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
276 LogEntry {
277 json_payload: Some(json_payload),
278 labels: Some(labels),
279 severity: Some(Self::get_severity(log_level)),
280 timestamp: Some(Utc::now()),
281 resource,
282 ..Default::default()
283 }
284 }
285}
286
/// Collects serialized key-value pairs as strings.
#[derive(Debug)]
struct Serializer {
    /// Rendered value for each key.
    map: HashMap<String, String>,
}

impl Serializer {
    /// Returns a serializer holding an empty map.
    fn new() -> Self {
        let map = HashMap::new();
        Serializer { map }
    }
}
299
300impl slog::Serializer for Serializer {
301 fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
302 let mut value = String::new();
303 write!(value, "{val}")?;
304 self.map.insert(key.into(), value);
305 Ok(())
306 }
307}
308
309impl Drain for Logger {
310 type Ok = ();
311 type Err = Never; // TODO: Handle errors
312
313 fn log(&self, record: &Record<'_>, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
314 let mut serializer = Serializer::new();
315
316 let kv = record.kv();
317 let _ = kv.serialize(record, &mut serializer);
318
319 let _ = values.serialize(record, &mut serializer);
320
321 let log_entry = self.construct_log_entry(
322 format!("{}", record.msg()).as_str(),
323 record.level(),
324 serializer,
325 );
326
327 let body = WriteLogEntriesRequest {
328 log_name: Some(self.log_name.clone()),
329 entries: Some(vec![log_entry]),
330 ..Default::default()
331 };
332
333 let _ = self.sync_tx.send(body);
334
335 Ok(())
336 }
337}