Skip to main content

gcp_rust_tools/
lib.rs

1//! # GCP Observability for Rust
2//!
3//! A lightweight, high-performance Google Cloud Platform observability library for Rust applications.
4//! This crate provides easy-to-use APIs for Cloud Logging, Cloud Monitoring, and Cloud Trace
5//! using the gcloud CLI for authentication and the Google Cloud REST APIs for data submission.
6//!
7//! ## Features
8//!
9//! - **Cloud Logging**: Send structured logs to Google Cloud Logging
10//! - **Cloud Monitoring**: Create custom metrics in Google Cloud Monitoring
11//! - **Cloud Trace**: Create distributed traces in Google Cloud Trace
12//! - **Background Processing**: Fire-and-forget API with background thread processing
13//! - **Automatic Token Refresh**: Handles gcloud token expiration and re-authentication
14//! - **Error Resilience**: Automatic retry logic for authentication failures
15//! - **Builder Pattern**: Fluent API for constructing observability data
16//!
17//! ## Architecture
18//!
19//! The library uses a channel-based architecture with a single background worker thread:
20//!
21//! - **Main Thread**: Your application code sends observability data to a channel
22//! - **Worker Thread**: A dedicated `std::thread` processes queued items using async operations
23//! - **No Rate Limiting**: The single-threaded model naturally prevents overwhelming the APIs
24//! - **Silent Failures**: Background operations fail silently to avoid disrupting your application
25//!
26//! ## Quick Start
27//!
28//! ```rust,no_run
29//! use gcp_rust_tools::{ObservabilityClient, LogEntry, MetricData, TraceSpan};
30//! use std::collections::HashMap;
31//! use std::time::{SystemTime, Duration};
32//!
33//! #[tokio::main]
34//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
35//!     // Initialize the client (performs authentication)
36//!     // Credentials are resolved internally from GOOGLE_APPLICATION_CREDENTIALS.
37//!     // Project id is resolved from (in order): provided value, GOOGLE_CLOUD_PROJECT,
38//!     // or `gcloud config get-value project`.
39//!     let client = ObservabilityClient::new(
40//!         Some("your-project-id".to_string()),
41//!         None,
42//!     ).await?;
43//!
44//!     // Fire-and-forget logging (returns immediately, processes in background)
45//!     client.send_log(LogEntry::new("INFO", "Application started"))?;
46//!     
47//!     // With service name
48//!     client.send_log(
49//!         LogEntry::new("ERROR", "Database connection failed")
50//!             .with_service_name("api-server")
51//!     )?;
52//!
53//!     // Send metrics with labels
54//!     let mut labels = HashMap::new();
55//!     labels.insert("environment".to_string(), "production".to_string());
56//!     
57//!     client.send_metric(
58//!         MetricData::new(
59//!             "custom.googleapis.com/requests_total",
60//!             42.0,
61//!             "INT64",
62//!             "GAUGE"
63//!         ).with_labels(labels)
64//!     )?;
65//!
66//!     // Create distributed traces
67//!     let trace_id = ObservabilityClient::generate_trace_id();
68//!     let span_id = ObservabilityClient::generate_span_id();
69//!     
70//!     client.send_trace(
71//!         TraceSpan::new(
72//!             trace_id,
73//!             span_id,
74//!             "HTTP Request",
75//!             SystemTime::now(),
76//!             Duration::from_millis(150)
77//!         )
78//!     )?;
79//!
80//!     Ok(())
81//! }
82//! ```
83//!
84//! ## Error Handling
85//!
86//! The library provides robust error handling:
87//!
88//! - **Authentication Errors**: Automatically detected and recovered via token refresh
89//! - **API Errors**: Detailed error messages with HTTP status codes
90//! - **Background Failures**: Silently handled to avoid disrupting your application
91//! - **Setup Errors**: Returned immediately during client initialization
92//!
93//! ## Token Management
94//!
95//! The library automatically handles gcloud token expiration:
96//!
97//! 1. Detects expired tokens (401/403 responses)
98//! 2. Re-authenticates using your service account
99//! 3. Retries the failed operation with a fresh token
100//! 4. All happens transparently without manual intervention
101//!
102//! ## Performance Considerations
103//!
104//! - **Non-blocking**: Fire-and-forget methods return immediately
105//! - **Single Worker**: One background thread prevents API rate limit issues
106//! - **Bounded Channel**: 1027-item buffer prevents memory overflow
107//! - **Minimal Overhead**: No rate limiting logic or complex synchronization
108
109pub mod helpers;
110pub mod pubsub;
111
112use async_trait::async_trait;
113use chrono::{DateTime, Utc};
114use crossbeam::channel::{bounded, Receiver, Sender};
115use serde_json::json;
116use std::collections::HashMap;
117use std::time::{Duration, SystemTime};
118use uuid::Uuid;
119
120/// Errors for observability operations
121#[derive(Debug)]
122pub enum ObservabilityError {
123    AuthenticationError(String),
124    ApiError(String),
125    SetupError(String),
126    /// Special error: used by SIGTERM to request shutdown of worker loop
127    Shutdown,
128}
129
130impl std::fmt::Display for ObservabilityError {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            ObservabilityError::AuthenticationError(msg) => {
134                write!(f, "Authentication error: {}", msg)
135            }
136            ObservabilityError::ApiError(msg) => write!(f, "API error: {}", msg),
137            ObservabilityError::SetupError(msg) => write!(f, "Setup error: {}", msg),
138            ObservabilityError::Shutdown => write!(f, "Shutdown requested"),
139        }
140    }
141}
142impl std::error::Error for ObservabilityError {}
143
144/// Each message type implements `Handle` to execute itself using the client.
145#[async_trait]
146pub trait Handle: Send {
147    async fn handle(
148        self: Box<Self>,
149        client: &ObservabilityClient,
150    ) -> Result<(), ObservabilityError>;
151}
152
153/// Log entry data for Cloud Logging
154#[derive(Debug, Clone)]
155pub struct LogEntry {
156    pub severity: String,
157    pub message: String,
158    pub service_name: Option<String>,
159    pub log_name: Option<String>,
160    pub json_payload: Option<serde_json::Value>,
161    pub labels: Option<HashMap<String, String>>,
162    pub insert_id: Option<String>,
163}
164impl LogEntry {
165    pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
166        Self {
167            severity: severity.into(),
168            message: message.into(),
169            service_name: None,
170            log_name: None,
171            json_payload: None,
172            labels: None,
173            insert_id: None,
174        }
175    }
176
177    /// Create a structured log entry using Cloud Logging `jsonPayload`.
178    ///
179    /// When `json_payload` is set, the `message` field is not used for the payload.
180    pub fn new_json(severity: impl Into<String>, json_payload: serde_json::Value) -> Self {
181        Self {
182            severity: severity.into(),
183            message: String::new(),
184            service_name: None,
185            log_name: None,
186            json_payload: Some(json_payload),
187            labels: None,
188            insert_id: None,
189        }
190    }
191
192    pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
193        self.service_name = Some(service_name.into());
194        self
195    }
196    pub fn with_log_name(mut self, log_name: impl Into<String>) -> Self {
197        self.log_name = Some(log_name.into());
198        self
199    }
200
201    /// Set the Cloud Logging `jsonPayload`.
202    pub fn with_json_payload(mut self, json_payload: serde_json::Value) -> Self {
203        self.json_payload = Some(json_payload);
204        self
205    }
206
207    /// Replace all labels with the provided map.
208    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
209        self.labels = Some(labels);
210        self
211    }
212
213    /// Add a single label (merging with existing labels).
214    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
215        let labels = self.labels.get_or_insert_with(HashMap::new);
216        labels.insert(key.into(), value.into());
217        self
218    }
219
220    /// Set a custom insertId for deduplication.
221    pub fn with_insert_id(mut self, insert_id: impl Into<String>) -> Self {
222        self.insert_id = Some(insert_id.into());
223        self
224    }
225}
226#[async_trait]
227impl Handle for LogEntry {
228    async fn handle(
229        self: Box<Self>,
230        client: &ObservabilityClient,
231    ) -> Result<(), ObservabilityError> {
232        client.send_log_impl(*self).await
233    }
234}
235
236/// Metric data for Cloud Monitoring
237#[derive(Debug, Clone)]
238pub struct MetricData {
239    pub metric_type: String,
240    pub value: f64,
241    pub value_type: String,
242    pub metric_kind: String,
243    pub labels: Option<HashMap<String, String>>,
244}
245impl MetricData {
246    pub fn new(
247        metric_type: impl Into<String>,
248        value: f64,
249        value_type: impl Into<String>,
250        metric_kind: impl Into<String>,
251    ) -> Self {
252        Self {
253            metric_type: metric_type.into(),
254            value,
255            value_type: value_type.into(),
256            metric_kind: metric_kind.into(),
257            labels: None,
258        }
259    }
260    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
261        self.labels = Some(labels);
262        self
263    }
264}
265#[async_trait]
266impl Handle for MetricData {
267    async fn handle(
268        self: Box<Self>,
269        client: &ObservabilityClient,
270    ) -> Result<(), ObservabilityError> {
271        client.send_metric_impl(*self).await
272    }
273}
274
275/// Trace span data for Cloud Trace
276#[derive(Debug, Clone)]
277pub struct TraceSpan {
278    pub trace_id: String,
279    pub span_id: String,
280    pub display_name: String,
281    pub start_time: SystemTime,
282    pub duration: Duration,
283    pub parent_span_id: Option<String>,
284    pub attributes: HashMap<String, String>,
285    pub status: Option<TraceStatus>,
286}
287
288#[derive(Debug, Clone)]
289pub struct TraceStatus {
290    pub code: i32, // 0=OK, 1=CANCELLED, 2=UNKNOWN, 3=INVALID_ARGUMENT... (using gRPC codes)
291    pub message: Option<String>,
292}
293
294impl TraceSpan {
295    pub fn new(
296        trace_id: impl Into<String>,
297        span_id: impl Into<String>,
298        display_name: impl Into<String>,
299        start_time: SystemTime,
300        duration: Duration,
301    ) -> Self {
302        Self {
303            trace_id: trace_id.into(),
304            span_id: span_id.into(),
305            display_name: display_name.into(),
306            start_time,
307            duration,
308            parent_span_id: None,
309            attributes: HashMap::new(),
310            status: None,
311        }
312    }
313    pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
314        self.parent_span_id = Some(parent_span_id.into());
315        self
316    }
317    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
318        self.attributes.insert(key.into(), value.into());
319        self
320    }
321    pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
322        self.status = Some(TraceStatus {
323            code: 2, // UNKNOWN (generic error)
324            message: Some(message.into()),
325        });
326        self
327    }
328    pub fn child(
329        &self,
330        name: impl Into<String>,
331        start_time: SystemTime,
332        duration: Duration,
333    ) -> Self {
334        Self {
335            trace_id: self.trace_id.clone(),                  // Same trace ID
336            span_id: ObservabilityClient::generate_span_id(), // New span ID
337            parent_span_id: Some(self.span_id.clone()),       // Parent is the current span
338            display_name: name.into(),
339            start_time,
340            duration,
341            attributes: HashMap::new(),
342            status: None,
343        }
344    }
345}
346#[async_trait]
347impl Handle for TraceSpan {
348    async fn handle(
349        self: Box<Self>,
350        client: &ObservabilityClient,
351    ) -> Result<(), ObservabilityError> {
352        client.send_trace_span_impl(*self).await
353    }
354}
355
356/// SIGTERM command—used to stop the worker loop
357#[derive(Debug, Clone, Copy)]
358pub struct SIGTERM;
359#[async_trait]
360impl Handle for SIGTERM {
361    async fn handle(
362        self: Box<Self>,
363        _client: &ObservabilityClient,
364    ) -> Result<(), ObservabilityError> {
365        Err(ObservabilityError::Shutdown)
366    }
367}
368
369/// Main client
370#[derive(Clone)]
371pub struct ObservabilityClient {
372    project_id: String,
373    service_account_path: String,
374    service_name: Option<String>,
375    tx: Sender<Box<dyn Handle>>,
376}
377
378impl ObservabilityClient {
379    pub async fn new(
380        project_id: Option<String>,
381        service_name: Option<String>,
382    ) -> Result<Self, ObservabilityError> {
383        let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
384
385        let service_account_path = helpers::gcp_config::credentials_path_from_env()
386            .map_err(|e| ObservabilityError::SetupError(e))?;
387
388        let mut project_id = project_id.unwrap_or_default();
389
390        let mut client = Self {
391            project_id: project_id.clone(),
392            service_account_path,
393            service_name,
394            tx,
395        };
396
397        // Setup auth (left as-is from your original design)
398        client.ensure_gcloud_installed().await?;
399
400        if project_id.trim().is_empty() {
401            project_id = helpers::gcp_config::resolve_project_id(None)
402                .await
403                .map_err(|e| ObservabilityError::SetupError(e))?;
404            client.project_id = project_id;
405        }
406
407        client.setup_authentication().await?;
408        client.verify_authentication().await?;
409
410        // Worker thread that blocks on a Tokio runtime to run async handlers
411        let client_clone = client.clone();
412        let handle = tokio::runtime::Handle::current();
413        std::thread::spawn(move || {
414            while let Ok(msg) = rx.recv() {
415                let result = handle.block_on(async { msg.handle(&client_clone).await });
416                match result {
417                    Ok(()) => {}
418                    Err(ObservabilityError::Shutdown) => {
419                        break;
420                    }
421                    Err(_e) => {
422                        // Silently handle errors in background processing
423                    }
424                }
425            }
426        });
427
428        Ok(client)
429    }
430
431    /// Public convenience API — callers never box manually
432
433    pub fn send_log(
434        &self,
435        entry: LogEntry,
436    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
437        self.tx.send(Box::new(entry))
438    }
439
440    pub fn send_metric(
441        &self,
442        data: MetricData,
443    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
444        self.tx.send(Box::new(data))
445    }
446
447    pub fn send_trace(
448        &self,
449        span: TraceSpan,
450    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
451        self.tx.send(Box::new(span))
452    }
453
454    pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
455        self.tx.send(Box::new(SIGTERM))
456    }
457
458    /// ---------- Internal helpers below (mostly as you had them) ----------
459
460    async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
461        let output = tokio::process::Command::new("gcloud")
462            .arg("version")
463            .output()
464            .await;
465        match output {
466            Ok(output) if output.status.success() => Ok(()),
467            _ => self.install_gcloud().await,
468        }
469    }
470
471    async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
472        let install_command = if cfg!(target_os = "macos") {
473            "curl https://sdk.cloud.google.com | bash"
474        } else {
475            "curl https://sdk.cloud.google.com | bash"
476        };
477        let output = tokio::process::Command::new("sh")
478            .arg("-c")
479            .arg(install_command)
480            .output()
481            .await
482            .map_err(|e| {
483                ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
484            })?;
485        if !output.status.success() {
486            return Err(ObservabilityError::SetupError(
487                "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
488            ));
489        }
490        Ok(())
491    }
492
493    async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
494        let output = tokio::process::Command::new("gcloud")
495            .args([
496                "auth",
497                "activate-service-account",
498                "--key-file",
499                &self.service_account_path,
500            ])
501            .output()
502            .await
503            .map_err(|e| {
504                ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
505            })?;
506        if !output.status.success() {
507            let error_msg = String::from_utf8_lossy(&output.stderr);
508            return Err(ObservabilityError::AuthenticationError(format!(
509                "Failed to authenticate with service account: {}",
510                error_msg
511            )));
512        }
513        let project_output = tokio::process::Command::new("gcloud")
514            .args(["config", "set", "project", &self.project_id])
515            .output()
516            .await
517            .map_err(|e| {
518                ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
519            })?;
520        if !project_output.status.success() {
521            let error_msg = String::from_utf8_lossy(&project_output.stderr);
522            return Err(ObservabilityError::AuthenticationError(format!(
523                "Failed to set project: {}",
524                error_msg
525            )));
526        }
527        Ok(())
528    }
529
530    async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
531        let output = tokio::process::Command::new("gcloud")
532            .args(["auth", "list", "--format=json"])
533            .output()
534            .await
535            .map_err(|e| {
536                ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
537            })?;
538        if !output.status.success() {
539            return Err(ObservabilityError::AuthenticationError(
540                "Authentication verification failed".to_string(),
541            ));
542        }
543        Ok(())
544    }
545
546    /// Generates a generic identity token for the active account.
547    ///
548    /// This is useful for local/dev workflows. For Cloud Run service-to-service calls,
549    /// prefer [`Self::get_identity_token_for_audience`] so the `aud` claim is scoped
550    /// to the receiving service URL (or configured custom audience).
551    pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
552        self.get_identity_token_with_retry(None).await
553    }
554
555    /// Generates an identity token whose `aud` claim is bound to `audience`.
556    ///
557    /// Cloud Run private invocation requires this audience-bound token format.
558    pub async fn get_identity_token_for_audience(
559        &self,
560        audience: impl AsRef<str>,
561    ) -> Result<String, ObservabilityError> {
562        let audience = audience.as_ref().trim();
563        if audience.is_empty() {
564            return Err(ObservabilityError::SetupError(
565                "Audience must not be empty".to_string(),
566            ));
567        }
568
569        self.get_identity_token_with_retry(Some(audience.to_string()))
570            .await
571    }
572
573    async fn get_identity_token_with_retry(
574        &self,
575        audience: Option<String>,
576    ) -> Result<String, ObservabilityError> {
577        match self.get_identity_token_internal(audience.clone()).await {
578            Ok(token) => Ok(token),
579            Err(e) => {
580                if e.to_string().contains("not logged in")
581                    || e.to_string().contains("authentication")
582                    || e.to_string().contains("expired")
583                {
584                    self.refresh_authentication().await?;
585                    self.get_identity_token_internal(audience).await
586                } else {
587                    Err(e)
588                }
589            }
590        }
591    }
592
593    async fn get_identity_token_internal(
594        &self,
595        audience: Option<String>,
596    ) -> Result<String, ObservabilityError> {
597        let mut command = tokio::process::Command::new("gcloud");
598        command.args(["auth", "print-identity-token"]);
599
600        if let Some(audience) = audience {
601            command.arg(format!("--audiences={}", audience));
602        }
603
604        let output = command.output().await.map_err(|e| {
605            ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
606        })?;
607
608        if !output.status.success() {
609            let error_msg = String::from_utf8_lossy(&output.stderr);
610            return Err(ObservabilityError::AuthenticationError(format!(
611                "Failed to get identity token: {}",
612                error_msg
613            )));
614        }
615
616        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
617    }
618
619    async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
620        match self.get_access_token().await {
621            Ok(token) => Ok(token),
622            Err(e) => {
623                if e.to_string().contains("not logged in")
624                    || e.to_string().contains("authentication")
625                    || e.to_string().contains("expired")
626                {
627                    self.refresh_authentication().await?;
628                    self.get_access_token().await
629                } else {
630                    Err(e)
631                }
632            }
633        }
634    }
635
636    async fn get_access_token(&self) -> Result<String, ObservabilityError> {
637        let output = tokio::process::Command::new("gcloud")
638            .args(["auth", "print-access-token"])
639            .output()
640            .await
641            .map_err(|e| {
642                ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
643            })?;
644        if !output.status.success() {
645            let error_msg = String::from_utf8_lossy(&output.stderr);
646            return Err(ObservabilityError::AuthenticationError(format!(
647                "Failed to get access token: {}",
648                error_msg
649            )));
650        }
651        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
652    }
653
654    async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
655        let output = tokio::process::Command::new("gcloud")
656            .args([
657                "auth",
658                "activate-service-account",
659                "--key-file",
660                &self.service_account_path,
661            ])
662            .output()
663            .await
664            .map_err(|e| {
665                ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
666            })?;
667        if !output.status.success() {
668            let error_msg = String::from_utf8_lossy(&output.stderr);
669            return Err(ObservabilityError::AuthenticationError(format!(
670                "Failed to refresh authentication: {}",
671                error_msg
672            )));
673        }
674        Ok(())
675    }
676
677    async fn execute_api_request(
678        &self,
679        api_url: &str,
680        payload: &str,
681        operation_name: &str,
682    ) -> Result<(), ObservabilityError> {
683        let mut retries = 0;
684        const MAX_RETRIES: u32 = 2;
685
686        loop {
687            let access_token = self.get_access_token_with_retry().await?;
688            let output = tokio::process::Command::new("curl")
689                .args([
690                    "-X",
691                    "POST",
692                    api_url,
693                    "-H",
694                    "Content-Type: application/json",
695                    "-H",
696                    &format!("Authorization: Bearer {}", access_token),
697                    "-d",
698                    payload,
699                    "-s",
700                    "-w",
701                    "%{http_code}",
702                ])
703                .output()
704                .await
705                .map_err(|e| {
706                    ObservabilityError::ApiError(format!(
707                        "Failed to execute {} request: {}",
708                        operation_name, e
709                    ))
710                })?;
711
712            let response_body = String::from_utf8_lossy(&output.stdout);
713            let status_code = response_body
714                .chars()
715                .rev()
716                .take(3)
717                .collect::<String>()
718                .chars()
719                .rev()
720                .collect::<String>();
721
722            if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
723                return Ok(());
724            }
725
726            let error_msg = String::from_utf8_lossy(&output.stderr);
727            if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
728                retries += 1;
729                self.refresh_authentication().await?;
730                continue;
731            }
732
733            return Err(ObservabilityError::ApiError(format!(
734                "{} API call failed with status {}: {} - Response: {}",
735                operation_name, status_code, error_msg, response_body
736            )));
737        }
738    }
739
740    // ---------- The three concrete senders ----------
741
742    async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
743        let now = SystemTime::now();
744        let timestamp =
745            DateTime::<Utc>::from(now).to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
746
747        // Use the entry's service name, fallback to client's default.
748        let resolved_service_name = log_entry.service_name.or(self.service_name.clone());
749
750        // Default log name: service name (so logName becomes projects/{project}/logs/{service}).
751        // If a custom log name is provided, it wins.
752        let log_name = log_entry
753            .log_name
754            .or_else(|| resolved_service_name.clone())
755            .unwrap_or_else(|| "default".to_string());
756
757        // Cloud Logging expects the log ID portion to be URL-encoded.
758        let log_name_encoded = urlencoding::encode(&log_name);
759
760        // Merge labels: caller-provided labels + service labels.
761        let mut labels = log_entry.labels.unwrap_or_default();
762        if let Some(service) = resolved_service_name {
763            // Keep the previous label for compatibility, plus a more conventional key.
764            labels
765                .entry("service_name".to_string())
766                .or_insert_with(|| service.clone());
767            labels.entry("service".to_string()).or_insert(service);
768        }
769
770        let insert_id = log_entry
771            .insert_id
772            .unwrap_or_else(|| Uuid::new_v4().to_string());
773
774        let mut entry = json!({
775            "logName": format!("projects/{}/logs/{}", self.project_id, log_name_encoded),
776            "resource": {
777                "type": "global",
778                "labels": { "project_id": self.project_id }
779            },
780            "timestamp": timestamp,
781            "severity": log_entry.severity,
782            "labels": labels,
783            "insertId": insert_id,
784        });
785
786        // Payload: prefer structured jsonPayload if provided.
787        if let Some(json_payload) = log_entry.json_payload {
788            entry["jsonPayload"] = json_payload;
789        } else {
790            entry["textPayload"] = json!(log_entry.message);
791        }
792
793        let log_entry_json = json!({ "entries": [entry] });
794        let api_url = "https://logging.googleapis.com/v2/entries:write";
795        self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
796            .await?;
797        Ok(())
798    }
799
800    async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
801        let timestamp = SystemTime::now();
802        let timestamp_str = DateTime::<Utc>::from(timestamp)
803            .format("%Y-%m-%dT%H:%M:%S%.3fZ")
804            .to_string();
805
806        let time_series = json!({
807            "timeSeries": [{
808                "metric": {
809                    "type": metric_data.metric_type,
810                    "labels": metric_data.labels.unwrap_or_default()
811                },
812                "resource": { "type": "global", "labels": {} },
813                "points": [{
814                    "interval": { "endTime": timestamp_str },
815                    "value": {
816                        &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
817                    }
818                }]
819            }]
820        });
821        let api_url = &format!(
822            "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
823            self.project_id
824        );
825        self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
826            .await?;
827        Ok(())
828    }
829
830    async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
831        let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
832        let end_time = trace_span.start_time + trace_span.duration;
833        let end_timestamp = DateTime::<Utc>::from(end_time);
834
835        let mut attributes_json = json!({});
836        if !trace_span.attributes.is_empty() {
837            let mut attribute_map = serde_json::Map::new();
838            for (k, v) in trace_span.attributes {
839                attribute_map.insert(k, json!({ "string_value": { "value": v } }));
840            }
841            attributes_json = json!({ "attributeMap": attribute_map });
842        }
843
844        let mut span = json!({
845            "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
846            "spanId": trace_span.span_id,
847            "displayName": { "value": trace_span.display_name },
848            "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
849            "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
850            "attributes": attributes_json
851        });
852
853        if let Some(parent_id) = &trace_span.parent_span_id {
854            span["parentSpanId"] = json!(parent_id);
855        }
856
857        if let Some(status) = &trace_span.status {
858            span["status"] = json!({
859                "code": status.code,
860                "message": status.message
861            });
862        }
863
864        let spans_payload = json!({ "spans": [span] });
865        let api_url = &format!(
866            "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
867            self.project_id
868        );
869        self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
870            .await?;
871        Ok(())
872    }
873
874    /// Convenience IDs
875    pub fn generate_trace_id() -> String {
876        format!("{:032x}", Uuid::new_v4().as_u128())
877    }
878    pub fn generate_span_id() -> String {
879        format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
880    }
881}