nano_gcp_logging/
lib.rs

1//! A custom logging layer for sending logs to Google Cloud Logging
2//! using the `tracing` and `tracing-subscriber` crates.
3//! This layer captures log events, enriches them with metadata
4//! about the running environment, and sends them to Google Cloud Logging.
5//! (C) 2025 Enzo Lombardi
6use chrono::Local;
7use gcp_auth::AuthenticationManager;
8
9use serde::Serialize;
10use std::time::Duration;
11use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
12use tracing::{Event, Subscriber};
13use tracing_subscriber::layer::Context;
14use tracing_subscriber::registry::LookupSpan;
15use tracing_subscriber::Layer;
16
17/// Metadata for a container, capturing its ID and name
18#[derive(Debug, Serialize)]
19pub struct ContainerMetadata {
20    /// Unique identifier of the container
21    pub id: String,
22    /// Name of the container
23    pub name: String,
24}
25
26/// Metadata for a Google Compute Engine instance
27#[derive(Debug, Serialize)]
28pub struct InstanceMetadata {
29    /// Name of the instance
30    pub name: String,
31    /// Unique identifier of the instance
32    pub id: String,
33    /// Zone where the instance is located
34    pub zone: String,
35    /// Google Cloud project ID
36    pub project_id: String,
37}
38
39/// Comprehensive logging context metadata
40#[derive(Debug, Serialize)]
41pub struct LogContextMetadata {
42    /// Optional container metadata
43    pub container: Option<ContainerMetadata>,
44    /// Instance metadata
45    pub instance: InstanceMetadata,
46}
47
48/// Structured log entry for Google Cloud Logging
49#[derive(Debug, Serialize)]
50struct GcpLogEntry {
51    /// Log message content
52    message: String,
53    /// Severity level of the log entry
54    severity: String,
55}
56
57/// Custom logging layer for sending logs to Google Cloud Logging
58pub struct GcpLoggingLayer {
59    /// Channel for sending log entries
60    channel: UnboundedSender<GcpLogEntry>,
61}
62
63impl GcpLoggingLayer {
64    /// Create a new GcpLoggingLayer with authentication and log metadata
65    ///
66    /// # Arguments
67    /// * `project_id` - The Google Cloud project ID
68    ///
69    /// # Returns
70    /// A Result containing the initialized GcpLoggingLayer or an error
71    pub async fn new(project_id: String) -> Result<Self, Box<dyn std::error::Error>> {
72        // Try to initialize authentication, but allow initialization to succeed
73        // even if authentication is not available (e.g. in tests or local dev).
74        // In such cases we proceed with an empty token and continue sending logs
75        // best-effort (requests will be unauthenticated).
76        let token = match AuthenticationManager::new().await {
77            Ok(auth) => match auth
78                .get_token(&["https://www.googleapis.com/auth/logging.write"])
79                .await
80            {
81                // Convert the acquired Token to a String representation so `token`
82                // has a consistent `String` type across all match arms.
83                Ok(tok) => tok.as_str().to_string(),
84                Err(e) => {
85                    eprintln!(
86                        "Warning: failed to acquire GCP token: {}. Proceeding without auth.",
87                        e
88                    );
89                    String::new()
90                }
91            },
92            Err(e) => {
93                eprintln!("Warning: failed to initialize AuthenticationManager: {}. Proceeding without auth.", e);
94                String::new()
95            }
96        };
97
98        // Attempt to collect metadata, but fall back to sensible defaults on error.
99        // We clone project_id to allow creating a fallback instance that still
100        // contains the provided project id in case metadata lookup fails.
101        let metadata = match collect_log_metadata(project_id.clone()).await {
102            Ok(m) => m,
103            Err(e) => {
104                eprintln!(
105                    "Warning: failed to collect metadata: {}. Using fallback values.",
106                    e
107                );
108                LogContextMetadata {
109                    container: None,
110                    instance: InstanceMetadata {
111                        name: "unknown".into(),
112                        id: "0".into(),
113                        zone: "".into(),
114                        project_id,
115                    },
116                }
117            }
118        };
119
120        let (channel, mut rx) = unbounded_channel::<GcpLogEntry>();
121        let client = reqwest::Client::new();
122
123        // Spawn the background task that drains the channel and sends logs.
124        // If we don't have a token (empty string), skip sending entries to avoid noisy errors.
125        tokio::spawn(async move {
126            // If token is empty we will not attempt HTTP requests; warn once.
127            let mut warned_no_auth = false;
128            let skip_sending = token.is_empty();
129
130            loop {
131                let log_entry = rx.recv().await;
132                if log_entry.is_none() {
133                    tokio::time::sleep(Duration::from_millis(1)).await;
134                    continue;
135                }
136                let log_entry = log_entry.unwrap();
137
138                if skip_sending {
139                    if !warned_no_auth {
140                        eprintln!("Warning: no GCP auth token available; log entries will not be sent. Set up authentication to enable sending.");
141                        warned_no_auth = true;
142                    }
143                    // Drop the entry without attempting to send it.
144                    continue;
145                }
146
147                let entry = serde_json::json!({
148                    "logName": format!("projects/{}/logs/proxie", metadata.instance.project_id),
149                    "resource": {
150                        "type": "gce_instance",
151                        "labels": {
152                            "instance_id": metadata.instance.id,
153                            "zone": metadata.instance.zone,
154                            "project_id": metadata.instance.project_id
155                        }
156                    },
157                    "severity": log_entry.severity,
158                    "jsonPayload": {
159                        "message": log_entry.message,
160                        "container": metadata.container,
161                        "instance": metadata.instance
162                    }
163                });
164                let body = serde_json::json!({ "entries": [entry] });
165
166                // Build the request and conditionally add auth if available.
167                let mut req = client
168                    .post("https://logging.googleapis.com/v2/entries:write")
169                    .json(&body);
170                if !token.is_empty() {
171                    req = req.bearer_auth(token.as_str());
172                }
173
174                let res = req.send().await;
175                if let Err(e) = res {
176                    eprintln!("Failed to send log entry: {}", e);
177                }
178            }
179        });
180
181        Ok(Self { channel })
182    }
183
184    /// Map tracing log level to Google Cloud Logging severity
185    ///
186    /// # Arguments
187    /// * `level` - The tracing log level
188    ///
189    /// # Returns
190    /// A static string representing the corresponding severity level
191    fn map_level_to_severity(level: &tracing::Level) -> &'static str {
192        match *level {
193            tracing::Level::ERROR => "ERROR",
194            tracing::Level::WARN => "WARNING",
195            tracing::Level::INFO => "INFO",
196            tracing::Level::DEBUG => "DEBUG",
197            tracing::Level::TRACE => "DEBUG",
198        }
199    }
200}
201
202impl<S> Layer<S> for GcpLoggingLayer
203where
204    S: Subscriber + for<'a> LookupSpan<'a>,
205{
206    /// Process log events and send them to Google Cloud Logging
207    ///
208    /// # Arguments
209    /// * `event` - The log event to process
210    /// * `_ctx` - The tracing context
211    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
212        let mut message = "**UNDEFINED**".to_string();
213
214        event.record(
215            &mut |field: &tracing::field::Field, value: &dyn std::fmt::Debug| {
216                if field.name() == "message" {
217                    message = format!("{:?}", value);
218                }
219            },
220        );
221        let now = Local::now().format("%Y-%m-%d %H:%M:%S,%3f").to_string();
222
223        let metadata = event.metadata();
224        let severity = Self::map_level_to_severity(metadata.level()).to_string();
225        let message = format!(
226            "[{}] {} [{} {}:{}] [{}]",
227            now,
228            severity,
229            metadata.target(),
230            metadata.file().unwrap_or("unknown_file"),
231            metadata.line().unwrap_or(0),
232            message
233        );
234
235        let log_entry = GcpLogEntry { severity, message };
236
237        let result = self.channel.send(log_entry);
238        if result.is_err() {
239            eprintln!("Error {:?}", result);
240        }
241    }
242}
243
244/// Collect comprehensive log metadata for the current instance
245///
246/// # Arguments
247/// * `project_id` - The Google Cloud project ID
248///
249/// # Returns
250/// A Result containing the LogContextMetadata or an error
251pub async fn collect_log_metadata(
252    project_id: String,
253) -> Result<LogContextMetadata, Box<dyn std::error::Error>> {
254    let client = reqwest::Client::new();
255
256    let container_id = get_container_id();
257    let container_name = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".into());
258
259    let container_metadata = container_id.clone().map(|id| ContainerMetadata {
260        id,
261        name: container_name,
262    });
263
264    let instance_name = get_metadata(&client, "instance/name")
265        .await
266        .unwrap_or_default();
267    let instance_id = get_metadata(&client, "instance/id")
268        .await
269        .unwrap_or_default();
270    let zone_path = get_metadata(&client, "instance/zone")
271        .await
272        .unwrap_or_default();
273    let zone = zone_path.split('/').last().unwrap_or("").to_string();
274
275    Ok(LogContextMetadata {
276        container: container_metadata,
277        instance: InstanceMetadata {
278            name: instance_name,
279            id: instance_id,
280            zone,
281            project_id,
282        },
283    })
284}
285
/// Retrieve the current container ID from `/proc/self/cgroup`.
///
/// The ID is taken to be the text after the last `/` of the first cgroup
/// line that has a non-empty final path segment.
///
/// # Returns
/// `Some(id)` with a non-empty ID, or `None` when the file cannot be read or
/// no line carries a usable segment (for example when the only entry is the
/// cgroup root `0::/`).
fn get_container_id() -> Option<String> {
    let cgroup = std::fs::read_to_string("/proc/self/cgroup").ok()?;
    container_id_from_cgroup(&cgroup)
}

/// Extract the container ID from the contents of a cgroup file.
///
/// Lines without a `/`, and lines whose path ends in `/` (empty final
/// segment), are skipped so that an empty string is never reported as an ID.
fn container_id_from_cgroup(cgroup: &str) -> Option<String> {
    cgroup
        .lines()
        .filter_map(|line| line.rsplit_once('/'))
        .map(|(_, last_segment)| last_segment)
        .find(|last_segment| !last_segment.is_empty())
        .map(str::to_owned)
}
302
303/// Retrieve metadata from Google Cloud metadata service
304///
305/// # Arguments
306/// * `client` - A reqwest client
307/// * `path` - The metadata path to retrieve
308///
309/// # Returns
310/// An optional string containing the metadata value
311async fn get_metadata(client: &reqwest::Client, path: &str) -> Option<String> {
312    let url = format!(
313        "http://metadata.google.internal/computeMetadata/v1/{}",
314        path
315    );
316    client
317        .get(&url)
318        .header("Metadata-Flavor", "Google")
319        .send()
320        .await
321        .ok()?
322        .text()
323        .await
324        .ok()
325}