slog_google/
logger.rs

1#[cfg(feature = "shipper")]
2use crate::shipper;
3
4use crate::error::Error;
5
6use google_logging2::api::{LogEntry, MonitoredResource, WriteLogEntriesRequest};
7
8use slog::{self, Drain, Key, Level, Never, OwnedKVList, Record, KV};
9use std::collections::HashMap;
10use std::fmt;
11use std::fmt::Write;
12
13use serde_json::json;
14
15use std::sync::mpsc::sync_channel;
16
17use chrono::Utc;
18
19/// Builder for the [`Logger`]
20#[derive(Default, Debug)]
21pub struct Builder {
22    log_name: String,
23    log_level_label: Option<String>,
24    resource_type: String,
25    default_labels: HashMap<String, String>,
26    resource_labels: Option<HashMap<String, String>>,
27}
28
29/// Main struct for the Google Logger drain
30pub struct Logger {
31    log_name: String,
32    log_level_label: Option<String>,
33    default_labels: HashMap<String, String>,
34    resource: MonitoredResource,
35    sync_tx: std::sync::mpsc::SyncSender<WriteLogEntriesRequest>,
36}
37
38impl Builder {
39    /// Creates a Builder object.
40    ///
41    /// # Parameters
42    /// - `log_name`: The `logName` string to be used in the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry)
43    /// - `resource_type`: The required `type` field set in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). For example: `k8s_container`.
44    ///
45    /// # Example
46    ///
47    /// ```
48    /// use slog_google::logger::Builder;
49    /// let (drain, _) = Builder::new(
50    ///     "projects/my-gcp-project/logs/my-log-id",
51    ///     "k8s_container",
52    /// )
53    /// .build();
54    /// ```
55    ///
56    #[must_use = "The builder must be used"]
57    pub fn new(log_name: &str, resource_type: &str) -> Self {
58        Self {
59            log_name: log_name.to_string(),
60            resource_type: resource_type.to_string(),
61            ..Default::default()
62        }
63    }
64
65    /// Sets resource labels to be applied.
66    ///
67    /// These labels will populate the `labels` field in the `resource` [MonitoredResource](https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource) object of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry).
68    ///
69    /// # Example
70    ///
71    /// ```
72    /// use serde_json::json;
73    /// let resource_labels = json!(
74    /// {
75    ///     "pod_name": "dummy-value",
76    ///     "location": "europe-west1-b",
77    ///     "pod_name": std::env::var("HOSTNAME").unwrap_or_default(),
78    ///     "container_name": "my-app",
79    ///     "project_id": "my-gcp-project",
80    ///     "cluster_name": "my-gke-cluster",
81    ///     "namespace_name": "my-gke-namespace"
82    /// });
83    ///
84    /// use slog_google::logger::Builder;
85    /// let (drain, _) = Builder::new(
86    ///     "projects/my-gcp-project/logs/my-log-id",
87    ///     "k8s_container",
88    /// )
89    /// .with_resource_labels(resource_labels)
90    /// .unwrap()
91    /// .build();
92    /// ```
93    ///
94    /// # Errors
95    ///
96    /// Will return `Err` if `labels` does not parse as JSON.
97    #[must_use = "The builder must be used"]
98    pub fn with_resource_labels(
99        self,
100        labels: serde_json::Value,
101    ) -> Result<Self, Box<dyn std::error::Error>> {
102        Ok(Self {
103            resource_labels: Some(
104                serde_json::from_value(labels).map_err(Error::ResourceLabelsError)?,
105            ),
106            ..self
107        })
108    }
109
110    /// Sets default labels to be applied in the labels field.
111    ///
112    /// These will populate the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry). These labels are added in addition to any labels set in the logger statement.
113    ///
114    /// # Example
115    ///
116    /// ```
117    /// use serde_json::json;
118    /// let default_labels = json!(
119    /// {
120    ///     "application": "my-application",
121    ///     "team": "my-team",
122    ///     "version": "my-app-version",
123    ///     "role": "my-app-role",
124    ///     "environment": "production",
125    ///     "platform": "gcp",
126    /// });
127    /// ```
128    ///
129    /// # Errors
130    ///
131    /// Will return `Err` if `labels` does not parse as JSON.
132    #[must_use = "The builder must be used"]
133    pub fn with_default_labels(self, labels: serde_json::Value) -> Result<Self, Error> {
134        Ok(Self {
135            default_labels: serde_json::from_value(labels).map_err(Error::DefaultLabelsError)?,
136            ..self
137        })
138    }
139
140    /// Sets the label name to store the log level
141    ///
142    /// If set, the log level value is added under this label the `labels` top level field of the [LogEntry](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry)
143    ///
144    /// If not set, the log level is not propagated, but you will still have the [severity](https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity), which is always there.
145    #[must_use = "The builder must be used"]
146    pub fn with_log_level_label(self, log_level_label: &str) -> Self {
147        Self {
148            log_level_label: Some(log_level_label.into()),
149            ..self
150        }
151    }
152
153    /// This returns a tuple with a [`Logger`](struct@Logger), which can be passed to the slog root logger [as usual](https://docs.rs/slog/latest/slog/#where-to-start), and a [`std::sync::mpsc::Receiver`] channel.
154    /// The `Logger` sends the [`WriteLogEntries`](https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.LoggingServiceV2.WriteLogEntries) it creates to this channel.
155    ///
156    /// For instance you could output these to the console, if you have an external agent that reads the process' output and ships it to Google Logging.
157    ///
158    #[must_use = "The logger and receiver must be used to handle logging correctly"]
159    #[allow(dead_code)]
160    pub fn build(self) -> (Logger, std::sync::mpsc::Receiver<WriteLogEntriesRequest>) {
161        let (sync_tx, sync_rx) = sync_channel::<WriteLogEntriesRequest>(100);
162        (
163            Logger {
164                log_name: self.log_name,
165                log_level_label: self.log_level_label,
166                default_labels: self.default_labels,
167                resource: MonitoredResource {
168                    type_: Some(self.resource_type),
169                    labels: self.resource_labels,
170                },
171                sync_tx,
172            },
173            sync_rx,
174        )
175    }
176
177    /// In an async context this 'shipper' sends the log entries directly to the [Google Logging API](https://cloud.google.com/logging/docs/reference/v2/rest).
178    ///
179    /// # Example
180    ///
181    /// ```
182    /// use tokio::runtime::Runtime;
183    /// use serde_json::json;
184    ///
185    /// let mut rt = Runtime::new().unwrap();
186    /// rt.spawn(async {
187    ///   let resource_labels = json!(
188    ///   {
189    ///       "pod_name": "dummy-value",
190    ///       "location": "europe-west1-b",
191    ///       "pod_name": std::env::var("HOSTNAME").unwrap_or_default(),
192    ///       "container_name": "my-app",
193    ///       "project_id": "my-gcp-project",
194    ///       "cluster_name": "my-gke-cluster",
195    ///       "namespace_name": "my-gke-namespace"
196    ///   });
197    ///
198    ///   use slog_google::logger::Builder;
199    ///   let (drain, mut shipper) = Builder::new(
200    ///       "projects/my-gcp-project/logs/my-log-id",
201    ///       "k8s_container",
202    ///   )
203    ///   .with_resource_labels(resource_labels)
204    ///   .unwrap()
205    ///   .build_with_async_shipper();
206    ///
207    ///   // Forward messages from the sync channel to the async channel where the
208    ///   // shipper sends it to Google Cloud Logging
209    ///   let bridge = shipper.yield_bridge();
210    ///   tokio::task::spawn_blocking(move || {
211    ///       bridge.run_sync_to_async_bridge();
212    ///   });
213    ///
214    ///   tokio::spawn(async move {
215    ///       shipper.run_log_shipper().await;
216    ///   });
217    /// });
218    ///
219    /// ```
220    #[cfg(feature = "shipper")]
221    #[must_use = "The logger and shipper must be used to handle logging correctly"]
222    pub fn build_with_async_shipper(self) -> (Logger, shipper::Shipper) {
223        let (sync_tx, sync_rx) = sync_channel::<WriteLogEntriesRequest>(100);
224        (
225            Logger {
226                log_name: self.log_name,
227                log_level_label: self.log_level_label,
228                default_labels: self.default_labels,
229                resource: MonitoredResource {
230                    type_: Some(self.resource_type),
231                    labels: self.resource_labels,
232                },
233                sync_tx,
234            },
235            shipper::Shipper::new(sync_rx),
236        )
237    }
238}
239
240impl Logger {
241    // Determine a sensible severity based on the log level
242    fn get_severity(log_level: Level) -> String {
243        // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
244        match log_level {
245            Level::Critical => "CRITICAL".into(),
246            Level::Error => "ERROR".into(),
247            Level::Warning => "WARNING".into(),
248            Level::Info => "INFO".into(),
249            Level::Debug | Level::Trace => "DEBUG".into(),
250        }
251    }
252
253    fn construct_log_entry(
254        &self,
255        message: &str,
256        log_level: Level,
257        serializer: Serializer,
258    ) -> LogEntry {
259        let mut labels = self.default_labels.clone();
260
261        if !serializer.map.is_empty() {
262            labels.extend(serializer.map);
263        }
264
265        // We add the log level to the labels if requested
266        if let Some(label) = &self.log_level_label {
267            labels.insert(label.clone(), log_level.as_str().to_string());
268        }
269
270        let resource = Some(self.resource.clone());
271
272        // TODO: support both text_payload and json_payload
273        let json_payload = HashMap::from([("message".to_string(), json!(message))]);
274
275        // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
276        LogEntry {
277            json_payload: Some(json_payload),
278            labels: Some(labels),
279            severity: Some(Self::get_severity(log_level)),
280            timestamp: Some(Utc::now()),
281            resource,
282            ..Default::default()
283        }
284    }
285}
286
287#[derive(Debug)]
288struct Serializer {
289    map: HashMap<String, String>,
290}
291
292impl Serializer {
293    fn new() -> Self {
294        Self {
295            map: HashMap::new(),
296        }
297    }
298}
299
300impl slog::Serializer for Serializer {
301    fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
302        let mut value = String::new();
303        write!(value, "{val}")?;
304        self.map.insert(key.into(), value);
305        Ok(())
306    }
307}
308
309impl Drain for Logger {
310    type Ok = ();
311    type Err = Never; // TODO: Handle errors
312
313    fn log(&self, record: &Record<'_>, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
314        let mut serializer = Serializer::new();
315
316        let kv = record.kv();
317        let _ = kv.serialize(record, &mut serializer);
318
319        let _ = values.serialize(record, &mut serializer);
320
321        let log_entry = self.construct_log_entry(
322            format!("{}", record.msg()).as_str(),
323            record.level(),
324            serializer,
325        );
326
327        let body = WriteLogEntriesRequest {
328            log_name: Some(self.log_name.clone()),
329            entries: Some(vec![log_entry]),
330            ..Default::default()
331        };
332
333        let _ = self.sync_tx.send(body);
334
335        Ok(())
336    }
337}