gcp_rust_tools/
lib.rs

1//! # GCP Observability for Rust
2//!
3//! A lightweight, high-performance Google Cloud Platform observability library for Rust applications.
4//! This crate provides easy-to-use APIs for Cloud Logging, Cloud Monitoring, and Cloud Trace
5//! using the gcloud CLI for authentication and the Google Cloud REST APIs for data submission.
6//!
7//! ## Features
8//!
9//! - **Cloud Logging**: Send structured logs to Google Cloud Logging
10//! - **Cloud Monitoring**: Create custom metrics in Google Cloud Monitoring
11//! - **Cloud Trace**: Create distributed traces in Google Cloud Trace
12//! - **Background Processing**: Fire-and-forget API with background thread processing
13//! - **Automatic Token Refresh**: Handles gcloud token expiration and re-authentication
14//! - **Error Resilience**: Automatic retry logic for authentication failures
15//! - **Builder Pattern**: Fluent API for constructing observability data
16//!
17//! ## Architecture
18//!
19//! The library uses a channel-based architecture with a single background worker thread:
20//!
21//! - **Main Thread**: Your application code sends observability data to a channel
22//! - **Worker Thread**: A dedicated `std::thread` processes queued items using async operations
23//! - **No Rate Limiting**: The single-threaded model naturally prevents overwhelming the APIs
24//! - **Silent Failures**: Background operations fail silently to avoid disrupting your application
25//!
26//! ## Quick Start
27//!
28//! ```rust,no_run
29//! use gcp_rust_tools::{ObservabilityClient, LogEntry, MetricData, TraceSpan};
30//! use std::collections::HashMap;
31//! use std::time::{SystemTime, Duration};
32//!
33//! #[tokio::main]
34//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
35//!     // Initialize the client (performs authentication)
36//!     // Credentials are resolved internally from GOOGLE_APPLICATION_CREDENTIALS.
37//!     // Project id is resolved from (in order): provided value, GOOGLE_CLOUD_PROJECT,
38//!     // or `gcloud config get-value project`.
39//!     let client = ObservabilityClient::new(
40//!         Some("your-project-id".to_string()),
41//!         None,
42//!     ).await?;
43//!
44//!     // Fire-and-forget logging (returns immediately, processes in background)
45//!     client.send_log(LogEntry::new("INFO", "Application started"))?;
46//!     
47//!     // With service name
48//!     client.send_log(
49//!         LogEntry::new("ERROR", "Database connection failed")
50//!             .with_service_name("api-server")
51//!     )?;
52//!
53//!     // Send metrics with labels
54//!     let mut labels = HashMap::new();
55//!     labels.insert("environment".to_string(), "production".to_string());
56//!     
57//!     client.send_metric(
58//!         MetricData::new(
59//!             "custom.googleapis.com/requests_total",
60//!             42.0,
61//!             "INT64",
62//!             "GAUGE"
63//!         ).with_labels(labels)
64//!     )?;
65//!
66//!     // Create distributed traces
67//!     let trace_id = ObservabilityClient::generate_trace_id();
68//!     let span_id = ObservabilityClient::generate_span_id();
69//!     
70//!     client.send_trace(
71//!         TraceSpan::new(
72//!             trace_id,
73//!             span_id,
74//!             "HTTP Request",
75//!             SystemTime::now(),
76//!             Duration::from_millis(150)
77//!         )
78//!     )?;
79//!
80//!     Ok(())
81//! }
82//! ```
83//!
84//! ## Error Handling
85//!
86//! The library provides robust error handling:
87//!
88//! - **Authentication Errors**: Automatically detected and recovered via token refresh
89//! - **API Errors**: Detailed error messages with HTTP status codes
90//! - **Background Failures**: Silently handled to avoid disrupting your application
91//! - **Setup Errors**: Returned immediately during client initialization
92//!
93//! ## Token Management
94//!
95//! The library automatically handles gcloud token expiration:
96//!
97//! 1. Detects expired tokens (401/403 responses)
98//! 2. Re-authenticates using your service account
99//! 3. Retries the failed operation with a fresh token
100//! 4. All happens transparently without manual intervention
101//!
102//! ## Performance Considerations
103//!
//! - **Non-blocking**: Fire-and-forget methods return as soon as the item is queued; they wait only while the bounded queue is full
105//! - **Single Worker**: One background thread prevents API rate limit issues
106//! - **Bounded Channel**: 1027-item buffer prevents memory overflow
107//! - **Minimal Overhead**: No rate limiting logic or complex synchronization
108
109pub mod helpers;
110pub mod pubsub;
111
112use async_trait::async_trait;
113use chrono::{DateTime, Utc};
114use crossbeam::channel::{bounded, Receiver, Sender};
115use serde_json::json;
116use std::collections::HashMap;
117use std::time::{Duration, SystemTime};
118use uuid::Uuid;
119
/// Failure modes reported by the observability client.
#[derive(Debug)]
pub enum ObservabilityError {
    /// Activating credentials or obtaining a token failed; carries a description.
    AuthenticationError(String),
    /// An external command or API request failed; carries a description.
    ApiError(String),
    /// Client initialisation could not be completed; carries a description.
    SetupError(String),
    /// Sentinel returned by the `SIGTERM` handler to make the worker loop stop.
    Shutdown,
}

impl std::fmt::Display for ObservabilityError {
    /// Render the error as `"<category>: <detail>"`, or a fixed text for `Shutdown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::AuthenticationError(detail) => ("Authentication error", detail),
            Self::ApiError(detail) => ("API error", detail),
            Self::SetupError(detail) => ("Setup error", detail),
            Self::Shutdown => return f.write_str("Shutdown requested"),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for ObservabilityError {}
143
144/// Each message type implements `Handle` to execute itself using the client.
145#[async_trait]
146pub trait Handle: Send {
147    async fn handle(
148        self: Box<Self>,
149        client: &ObservabilityClient,
150    ) -> Result<(), ObservabilityError>;
151}
152
153/// Log entry data for Cloud Logging
154#[derive(Debug, Clone)]
155pub struct LogEntry {
156    pub severity: String,
157    pub message: String,
158    pub service_name: Option<String>,
159    pub log_name: Option<String>,
160    pub json_payload: Option<serde_json::Value>,
161    pub labels: Option<HashMap<String, String>>,
162    pub insert_id: Option<String>,
163}
164impl LogEntry {
165    pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
166        Self {
167            severity: severity.into(),
168            message: message.into(),
169            service_name: None,
170            log_name: None,
171            json_payload: None,
172            labels: None,
173            insert_id: None,
174        }
175    }
176
177    /// Create a structured log entry using Cloud Logging `jsonPayload`.
178    ///
179    /// When `json_payload` is set, the `message` field is not used for the payload.
180    pub fn new_json(severity: impl Into<String>, json_payload: serde_json::Value) -> Self {
181        Self {
182            severity: severity.into(),
183            message: String::new(),
184            service_name: None,
185            log_name: None,
186            json_payload: Some(json_payload),
187            labels: None,
188            insert_id: None,
189        }
190    }
191
192    pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
193        self.service_name = Some(service_name.into());
194        self
195    }
196    pub fn with_log_name(mut self, log_name: impl Into<String>) -> Self {
197        self.log_name = Some(log_name.into());
198        self
199    }
200
201    /// Set the Cloud Logging `jsonPayload`.
202    pub fn with_json_payload(mut self, json_payload: serde_json::Value) -> Self {
203        self.json_payload = Some(json_payload);
204        self
205    }
206
207    /// Replace all labels with the provided map.
208    pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
209        self.labels = Some(labels);
210        self
211    }
212
213    /// Add a single label (merging with existing labels).
214    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
215        let labels = self.labels.get_or_insert_with(HashMap::new);
216        labels.insert(key.into(), value.into());
217        self
218    }
219
220    /// Set a custom insertId for deduplication.
221    pub fn with_insert_id(mut self, insert_id: impl Into<String>) -> Self {
222        self.insert_id = Some(insert_id.into());
223        self
224    }
225}
226#[async_trait]
227impl Handle for LogEntry {
228    async fn handle(
229        self: Box<Self>,
230        client: &ObservabilityClient,
231    ) -> Result<(), ObservabilityError> {
232        client.send_log_impl(*self).await
233    }
234}
235
/// A single data point destined for Cloud Monitoring.
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric type, e.g. `custom.googleapis.com/requests_total`.
    pub metric_type: String,
    /// Numeric value of the point.
    pub value: f64,
    /// Value type name such as `"INT64"`.
    pub value_type: String,
    /// Metric kind name such as `"GAUGE"`.
    pub metric_kind: String,
    /// Optional metric labels.
    pub labels: Option<HashMap<String, String>>,
}

impl MetricData {
    /// Build a data point without labels.
    pub fn new(
        metric_type: impl Into<String>,
        value: f64,
        value_type: impl Into<String>,
        metric_kind: impl Into<String>,
    ) -> Self {
        let metric_type = metric_type.into();
        let value_type = value_type.into();
        let metric_kind = metric_kind.into();
        MetricData {
            metric_type,
            value,
            value_type,
            metric_kind,
            labels: None,
        }
    }

    /// Replace the labels with the provided map.
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self {
            labels: Some(labels),
            ..self
        }
    }
}
265#[async_trait]
266impl Handle for MetricData {
267    async fn handle(
268        self: Box<Self>,
269        client: &ObservabilityClient,
270    ) -> Result<(), ObservabilityError> {
271        client.send_metric_impl(*self).await
272    }
273}
274
275/// Trace span data for Cloud Trace
276#[derive(Debug, Clone)]
277pub struct TraceSpan {
278    pub trace_id: String,
279    pub span_id: String,
280    pub display_name: String,
281    pub start_time: SystemTime,
282    pub duration: Duration,
283    pub parent_span_id: Option<String>,
284    pub attributes: HashMap<String, String>,
285    pub status: Option<TraceStatus>,
286}
287
288#[derive(Debug, Clone)]
289pub struct TraceStatus {
290    pub code: i32, // 0=OK, 1=CANCELLED, 2=UNKNOWN, 3=INVALID_ARGUMENT... (using gRPC codes)
291    pub message: Option<String>,
292}
293
294impl TraceSpan {
295    pub fn new(
296        trace_id: impl Into<String>,
297        span_id: impl Into<String>,
298        display_name: impl Into<String>,
299        start_time: SystemTime,
300        duration: Duration,
301    ) -> Self {
302        Self {
303            trace_id: trace_id.into(),
304            span_id: span_id.into(),
305            display_name: display_name.into(),
306            start_time,
307            duration,
308            parent_span_id: None,
309            attributes: HashMap::new(),
310            status: None,
311        }
312    }
313    pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
314        self.parent_span_id = Some(parent_span_id.into());
315        self
316    }
317    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
318        self.attributes.insert(key.into(), value.into());
319        self
320    }
321    pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
322        self.status = Some(TraceStatus {
323            code: 2, // UNKNOWN (generic error)
324            message: Some(message.into()),
325        });
326        self
327    }
328    pub fn child(
329        &self,
330        name: impl Into<String>,
331        start_time: SystemTime,
332        duration: Duration,
333    ) -> Self {
334        Self {
335            trace_id: self.trace_id.clone(),                  // Same trace ID
336            span_id: ObservabilityClient::generate_span_id(), // New span ID
337            parent_span_id: Some(self.span_id.clone()),       // Parent is the current span
338            display_name: name.into(),
339            start_time,
340            duration,
341            attributes: HashMap::new(),
342            status: None,
343        }
344    }
345}
346#[async_trait]
347impl Handle for TraceSpan {
348    async fn handle(
349        self: Box<Self>,
350        client: &ObservabilityClient,
351    ) -> Result<(), ObservabilityError> {
352        client.send_trace_span_impl(*self).await
353    }
354}
355
356/// SIGTERM command—used to stop the worker loop
357#[derive(Debug, Clone, Copy)]
358pub struct SIGTERM;
359#[async_trait]
360impl Handle for SIGTERM {
361    async fn handle(
362        self: Box<Self>,
363        _client: &ObservabilityClient,
364    ) -> Result<(), ObservabilityError> {
365        Err(ObservabilityError::Shutdown)
366    }
367}
368
369/// Main client
370#[derive(Clone)]
371pub struct ObservabilityClient {
372    project_id: String,
373    service_account_path: String,
374    service_name: Option<String>,
375    tx: Sender<Box<dyn Handle>>,
376}
377
378impl ObservabilityClient {
379    pub async fn new(
380        project_id: Option<String>,
381        service_name: Option<String>,
382    ) -> Result<Self, ObservabilityError> {
383        let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
384
385        let service_account_path = helpers::gcp_config::credentials_path_from_env()
386            .map_err(|e| ObservabilityError::SetupError(e))?;
387
388        let mut project_id = project_id.unwrap_or_default();
389
390        let mut client = Self {
391            project_id: project_id.clone(),
392            service_account_path,
393            service_name,
394            tx,
395        };
396
397        // Setup auth (left as-is from your original design)
398        client.ensure_gcloud_installed().await?;
399
400        if project_id.trim().is_empty() {
401            project_id = helpers::gcp_config::resolve_project_id(None)
402                .await
403                .map_err(|e| ObservabilityError::SetupError(e))?;
404            client.project_id = project_id;
405        }
406
407        client.setup_authentication().await?;
408        client.verify_authentication().await?;
409
410        // Worker thread that blocks on a Tokio runtime to run async handlers
411        let client_clone = client.clone();
412        let handle = tokio::runtime::Handle::current();
413        std::thread::spawn(move || {
414            while let Ok(msg) = rx.recv() {
415                let result = handle.block_on(async { msg.handle(&client_clone).await });
416                match result {
417                    Ok(()) => {}
418                    Err(ObservabilityError::Shutdown) => {
419                        break;
420                    }
421                    Err(_e) => {
422                        // Silently handle errors in background processing
423                    }
424                }
425            }
426        });
427
428        Ok(client)
429    }
430
431    /// Public convenience API — callers never box manually
432
433    pub fn send_log(
434        &self,
435        entry: LogEntry,
436    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
437        self.tx.send(Box::new(entry))
438    }
439
440    pub fn send_metric(
441        &self,
442        data: MetricData,
443    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
444        self.tx.send(Box::new(data))
445    }
446
447    pub fn send_trace(
448        &self,
449        span: TraceSpan,
450    ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
451        self.tx.send(Box::new(span))
452    }
453
454    pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
455        self.tx.send(Box::new(SIGTERM))
456    }
457
458    /// ---------- Internal helpers below (mostly as you had them) ----------
459
460    async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
461        let output = tokio::process::Command::new("gcloud")
462            .arg("version")
463            .output()
464            .await;
465        match output {
466            Ok(output) if output.status.success() => Ok(()),
467            _ => self.install_gcloud().await,
468        }
469    }
470
471    async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
472        let install_command = if cfg!(target_os = "macos") {
473            "curl https://sdk.cloud.google.com | bash"
474        } else {
475            "curl https://sdk.cloud.google.com | bash"
476        };
477        let output = tokio::process::Command::new("sh")
478            .arg("-c")
479            .arg(install_command)
480            .output()
481            .await
482            .map_err(|e| {
483                ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
484            })?;
485        if !output.status.success() {
486            return Err(ObservabilityError::SetupError(
487                "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
488            ));
489        }
490        Ok(())
491    }
492
493    async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
494        let output = tokio::process::Command::new("gcloud")
495            .args([
496                "auth",
497                "activate-service-account",
498                "--key-file",
499                &self.service_account_path,
500            ])
501            .output()
502            .await
503            .map_err(|e| {
504                ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
505            })?;
506        if !output.status.success() {
507            let error_msg = String::from_utf8_lossy(&output.stderr);
508            return Err(ObservabilityError::AuthenticationError(format!(
509                "Failed to authenticate with service account: {}",
510                error_msg
511            )));
512        }
513        let project_output = tokio::process::Command::new("gcloud")
514            .args(["config", "set", "project", &self.project_id])
515            .output()
516            .await
517            .map_err(|e| {
518                ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
519            })?;
520        if !project_output.status.success() {
521            let error_msg = String::from_utf8_lossy(&project_output.stderr);
522            return Err(ObservabilityError::AuthenticationError(format!(
523                "Failed to set project: {}",
524                error_msg
525            )));
526        }
527        Ok(())
528    }
529
530    async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
531        let output = tokio::process::Command::new("gcloud")
532            .args(["auth", "list", "--format=json"])
533            .output()
534            .await
535            .map_err(|e| {
536                ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
537            })?;
538        if !output.status.success() {
539            return Err(ObservabilityError::AuthenticationError(
540                "Authentication verification failed".to_string(),
541            ));
542        }
543        Ok(())
544    }
545
546    pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
547        match self.get_identity_token_internal().await {
548            Ok(token) => Ok(token),
549            Err(e) => {
550                if e.to_string().contains("not logged in")
551                    || e.to_string().contains("authentication")
552                    || e.to_string().contains("expired")
553                {
554                    self.refresh_authentication().await?;
555                    self.get_identity_token_internal().await
556                } else {
557                    Err(e)
558                }
559            }
560        }
561    }
562
563    async fn get_identity_token_internal(&self) -> Result<String, ObservabilityError> {
564        let output = tokio::process::Command::new("gcloud")
565            .args(["auth", "print-identity-token"])
566            .output()
567            .await
568            .map_err(|e| {
569                ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
570            })?;
571        if !output.status.success() {
572            let error_msg = String::from_utf8_lossy(&output.stderr);
573            return Err(ObservabilityError::AuthenticationError(format!(
574                "Failed to get identity token: {}",
575                error_msg
576            )));
577        }
578        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
579    }
580
581    async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
582        match self.get_access_token().await {
583            Ok(token) => Ok(token),
584            Err(e) => {
585                if e.to_string().contains("not logged in")
586                    || e.to_string().contains("authentication")
587                    || e.to_string().contains("expired")
588                {
589                    self.refresh_authentication().await?;
590                    self.get_access_token().await
591                } else {
592                    Err(e)
593                }
594            }
595        }
596    }
597
598    async fn get_access_token(&self) -> Result<String, ObservabilityError> {
599        let output = tokio::process::Command::new("gcloud")
600            .args(["auth", "print-access-token"])
601            .output()
602            .await
603            .map_err(|e| {
604                ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
605            })?;
606        if !output.status.success() {
607            let error_msg = String::from_utf8_lossy(&output.stderr);
608            return Err(ObservabilityError::AuthenticationError(format!(
609                "Failed to get access token: {}",
610                error_msg
611            )));
612        }
613        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
614    }
615
616    async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
617        let output = tokio::process::Command::new("gcloud")
618            .args([
619                "auth",
620                "activate-service-account",
621                "--key-file",
622                &self.service_account_path,
623            ])
624            .output()
625            .await
626            .map_err(|e| {
627                ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
628            })?;
629        if !output.status.success() {
630            let error_msg = String::from_utf8_lossy(&output.stderr);
631            return Err(ObservabilityError::AuthenticationError(format!(
632                "Failed to refresh authentication: {}",
633                error_msg
634            )));
635        }
636        Ok(())
637    }
638
639    async fn execute_api_request(
640        &self,
641        api_url: &str,
642        payload: &str,
643        operation_name: &str,
644    ) -> Result<(), ObservabilityError> {
645        let mut retries = 0;
646        const MAX_RETRIES: u32 = 2;
647
648        loop {
649            let access_token = self.get_access_token_with_retry().await?;
650            let output = tokio::process::Command::new("curl")
651                .args([
652                    "-X",
653                    "POST",
654                    api_url,
655                    "-H",
656                    "Content-Type: application/json",
657                    "-H",
658                    &format!("Authorization: Bearer {}", access_token),
659                    "-d",
660                    payload,
661                    "-s",
662                    "-w",
663                    "%{http_code}",
664                ])
665                .output()
666                .await
667                .map_err(|e| {
668                    ObservabilityError::ApiError(format!(
669                        "Failed to execute {} request: {}",
670                        operation_name, e
671                    ))
672                })?;
673
674            let response_body = String::from_utf8_lossy(&output.stdout);
675            let status_code = response_body
676                .chars()
677                .rev()
678                .take(3)
679                .collect::<String>()
680                .chars()
681                .rev()
682                .collect::<String>();
683
684            if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
685                return Ok(());
686            }
687
688            let error_msg = String::from_utf8_lossy(&output.stderr);
689            if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
690                retries += 1;
691                self.refresh_authentication().await?;
692                continue;
693            }
694
695            return Err(ObservabilityError::ApiError(format!(
696                "{} API call failed with status {}: {} - Response: {}",
697                operation_name, status_code, error_msg, response_body
698            )));
699        }
700    }
701
702    // ---------- The three concrete senders ----------
703
704    async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
705        let now = SystemTime::now();
706        let timestamp = DateTime::<Utc>::from(now).to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
707
708        // Use the entry's service name, fallback to client's default.
709        let resolved_service_name = log_entry.service_name.or(self.service_name.clone());
710
711        // Default log name: service name (so logName becomes projects/{project}/logs/{service}).
712        // If a custom log name is provided, it wins.
713        let log_name = log_entry
714            .log_name
715            .or_else(|| resolved_service_name.clone())
716            .unwrap_or_else(|| "default".to_string());
717
718        // Cloud Logging expects the log ID portion to be URL-encoded.
719        let log_name_encoded = urlencoding::encode(&log_name);
720
721        // Merge labels: caller-provided labels + service labels.
722        let mut labels = log_entry.labels.unwrap_or_default();
723        if let Some(service) = resolved_service_name {
724            // Keep the previous label for compatibility, plus a more conventional key.
725            labels.entry("service_name".to_string()).or_insert_with(|| service.clone());
726            labels.entry("service".to_string()).or_insert(service);
727        }
728
729        let insert_id = log_entry.insert_id.unwrap_or_else(|| Uuid::new_v4().to_string());
730
731        let mut entry = json!({
732            "logName": format!("projects/{}/logs/{}", self.project_id, log_name_encoded),
733            "resource": {
734                "type": "global",
735                "labels": { "project_id": self.project_id }
736            },
737            "timestamp": timestamp,
738            "severity": log_entry.severity,
739            "labels": labels,
740            "insertId": insert_id,
741        });
742
743        // Payload: prefer structured jsonPayload if provided.
744        if let Some(json_payload) = log_entry.json_payload {
745            entry["jsonPayload"] = json_payload;
746        } else {
747            entry["textPayload"] = json!(log_entry.message);
748        }
749
750        let log_entry_json = json!({ "entries": [entry] });
751        let api_url = "https://logging.googleapis.com/v2/entries:write";
752        self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
753            .await?;
754        Ok(())
755    }
756
757    async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
758        let timestamp = SystemTime::now();
759        let timestamp_str = DateTime::<Utc>::from(timestamp)
760            .format("%Y-%m-%dT%H:%M:%S%.3fZ")
761            .to_string();
762
763        let time_series = json!({
764            "timeSeries": [{
765                "metric": {
766                    "type": metric_data.metric_type,
767                    "labels": metric_data.labels.unwrap_or_default()
768                },
769                "resource": { "type": "global", "labels": {} },
770                "points": [{
771                    "interval": { "endTime": timestamp_str },
772                    "value": {
773                        &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
774                    }
775                }]
776            }]
777        });
778        let api_url = &format!(
779            "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
780            self.project_id
781        );
782        self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
783            .await?;
784        Ok(())
785    }
786
787    async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
788        let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
789        let end_time = trace_span.start_time + trace_span.duration;
790        let end_timestamp = DateTime::<Utc>::from(end_time);
791
792        let mut attributes_json = json!({});
793        if !trace_span.attributes.is_empty() {
794            let mut attribute_map = serde_json::Map::new();
795            for (k, v) in trace_span.attributes {
796                attribute_map.insert(k, json!({ "string_value": { "value": v } }));
797            }
798            attributes_json = json!({ "attributeMap": attribute_map });
799        }
800
801        let mut span = json!({
802            "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
803            "spanId": trace_span.span_id,
804            "displayName": { "value": trace_span.display_name },
805            "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
806            "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
807            "attributes": attributes_json
808        });
809
810        if let Some(parent_id) = &trace_span.parent_span_id {
811            span["parentSpanId"] = json!(parent_id);
812        }
813
814        if let Some(status) = &trace_span.status {
815            span["status"] = json!({
816                "code": status.code,
817                "message": status.message
818            });
819        }
820
821        let spans_payload = json!({ "spans": [span] });
822        let api_url = &format!(
823            "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
824            self.project_id
825        );
826        self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
827            .await?;
828        Ok(())
829    }
830
831    /// Convenience IDs
832    pub fn generate_trace_id() -> String {
833        format!("{:032x}", Uuid::new_v4().as_u128())
834    }
835    pub fn generate_span_id() -> String {
836        format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
837    }
838}