1use async_trait::async_trait;
135use chrono::{DateTime, Utc};
136use crossbeam::channel::{bounded, Receiver, Sender};
137use serde_json::json;
138use std::collections::HashMap;
139use std::time::{Duration, SystemTime, UNIX_EPOCH};
140use uuid::Uuid;
141
142#[derive(Debug)]
144pub enum ObservabilityError {
145 AuthenticationError(String),
146 ApiError(String),
147 SetupError(String),
148 Shutdown,
150}
151
152impl std::fmt::Display for ObservabilityError {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 match self {
155 ObservabilityError::AuthenticationError(msg) => {
156 write!(f, "Authentication error: {}", msg)
157 }
158 ObservabilityError::ApiError(msg) => write!(f, "API error: {}", msg),
159 ObservabilityError::SetupError(msg) => write!(f, "Setup error: {}", msg),
160 ObservabilityError::Shutdown => write!(f, "Shutdown requested"),
161 }
162 }
163}
164impl std::error::Error for ObservabilityError {}
165
166#[async_trait]
168pub trait Handle: Send {
169 async fn handle(
170 self: Box<Self>,
171 client: &ObservabilityClient,
172 ) -> Result<(), ObservabilityError>;
173}
174
175#[derive(Debug, Clone)]
177pub struct LogEntry {
178 pub severity: String,
179 pub message: String,
180 pub service_name: Option<String>,
181 pub log_name: Option<String>,
182}
183impl LogEntry {
184 pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
185 Self {
186 severity: severity.into(),
187 message: message.into(),
188 service_name: None,
189 log_name: None,
190 }
191 }
192 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
193 self.service_name = Some(service_name.into());
194 self
195 }
196 pub fn with_log_name(mut self, log_name: impl Into<String>) -> Self {
197 self.log_name = Some(log_name.into());
198 self
199 }
200}
201#[async_trait]
202impl Handle for LogEntry {
203 async fn handle(
204 self: Box<Self>,
205 client: &ObservabilityClient,
206 ) -> Result<(), ObservabilityError> {
207 client.send_log_impl(*self).await
208 }
209}
210
211#[derive(Debug, Clone)]
213pub struct MetricData {
214 pub metric_type: String,
215 pub value: f64,
216 pub value_type: String,
217 pub metric_kind: String,
218 pub labels: Option<HashMap<String, String>>,
219}
220impl MetricData {
221 pub fn new(
222 metric_type: impl Into<String>,
223 value: f64,
224 value_type: impl Into<String>,
225 metric_kind: impl Into<String>,
226 ) -> Self {
227 Self {
228 metric_type: metric_type.into(),
229 value,
230 value_type: value_type.into(),
231 metric_kind: metric_kind.into(),
232 labels: None,
233 }
234 }
235 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
236 self.labels = Some(labels);
237 self
238 }
239}
240#[async_trait]
241impl Handle for MetricData {
242 async fn handle(
243 self: Box<Self>,
244 client: &ObservabilityClient,
245 ) -> Result<(), ObservabilityError> {
246 client.send_metric_impl(*self).await
247 }
248}
249
250#[derive(Debug, Clone)]
252pub struct TraceSpan {
253 pub trace_id: String,
254 pub span_id: String,
255 pub display_name: String,
256 pub start_time: SystemTime,
257 pub duration: Duration,
258 pub parent_span_id: Option<String>,
259 pub attributes: HashMap<String, String>,
260 pub status: Option<TraceStatus>,
261}
262
263#[derive(Debug, Clone)]
264pub struct TraceStatus {
265 pub code: i32, pub message: Option<String>,
267}
268
269impl TraceSpan {
270 pub fn new(
271 trace_id: impl Into<String>,
272 span_id: impl Into<String>,
273 display_name: impl Into<String>,
274 start_time: SystemTime,
275 duration: Duration,
276 ) -> Self {
277 Self {
278 trace_id: trace_id.into(),
279 span_id: span_id.into(),
280 display_name: display_name.into(),
281 start_time,
282 duration,
283 parent_span_id: None,
284 attributes: HashMap::new(),
285 status: None,
286 }
287 }
288 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
289 self.parent_span_id = Some(parent_span_id.into());
290 self
291 }
292 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
293 self.attributes.insert(key.into(), value.into());
294 self
295 }
296 pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
297 self.status = Some(TraceStatus {
298 code: 2, message: Some(message.into()),
300 });
301 self
302 }
303 pub fn child(&self, name: impl Into<String>, start_time: SystemTime, duration: Duration) -> Self {
304 Self {
305 trace_id: self.trace_id.clone(), span_id: ObservabilityClient::generate_span_id(), parent_span_id: Some(self.span_id.clone()), display_name: name.into(),
309 start_time,
310 duration,
311 attributes: HashMap::new(),
312 status: None,
313 }
314 }
315}
316#[async_trait]
317impl Handle for TraceSpan {
318 async fn handle(
319 self: Box<Self>,
320 client: &ObservabilityClient,
321 ) -> Result<(), ObservabilityError> {
322 client.send_trace_span_impl(*self).await
323 }
324}
325
326#[derive(Debug, Clone, Copy)]
328pub struct SIGTERM;
329#[async_trait]
330impl Handle for SIGTERM {
331 async fn handle(
332 self: Box<Self>,
333 _client: &ObservabilityClient,
334 ) -> Result<(), ObservabilityError> {
335 Err(ObservabilityError::Shutdown)
336 }
337}
338
339#[derive(Clone)]
341pub struct ObservabilityClient {
342 project_id: String,
343 service_account_path: String,
344 service_name: Option<String>,
345 tx: Sender<Box<dyn Handle>>,
346}
347
348impl ObservabilityClient {
349 pub async fn new(
350 project_id: String,
351 service_account_path: String,
352 service_name: Option<String>,
353 ) -> Result<Self, ObservabilityError> {
354 let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
355
356 let client = Self {
357 project_id,
358 service_account_path,
359 service_name,
360 tx,
361 };
362
363 client.ensure_gcloud_installed().await?;
365 client.setup_authentication().await?;
366 client.verify_authentication().await?;
367
368 let client_clone = client.clone();
370 let handle = tokio::runtime::Handle::current();
371 std::thread::spawn(move || {
372 while let Ok(msg) = rx.recv() {
373 let result = handle.block_on(async { msg.handle(&client_clone).await });
374 match result {
375 Ok(()) => {}
376 Err(ObservabilityError::Shutdown) => {
377 break;
378 }
379 Err(_e) => {
380 }
382 }
383 }
384 });
385
386 Ok(client)
387 }
388
389 pub fn send_log(
392 &self,
393 entry: LogEntry,
394 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
395 self.tx.send(Box::new(entry))
396 }
397
398 pub fn send_metric(
399 &self,
400 data: MetricData,
401 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
402 self.tx.send(Box::new(data))
403 }
404
405 pub fn send_trace(
406 &self,
407 span: TraceSpan,
408 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
409 self.tx.send(Box::new(span))
410 }
411
412 pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
413 self.tx.send(Box::new(SIGTERM))
414 }
415
416 async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
419 let output = tokio::process::Command::new("gcloud")
420 .arg("version")
421 .output()
422 .await;
423 match output {
424 Ok(output) if output.status.success() => Ok(()),
425 _ => self.install_gcloud().await,
426 }
427 }
428
429 async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
430 let install_command = if cfg!(target_os = "macos") {
431 "curl https://sdk.cloud.google.com | bash"
432 } else {
433 "curl https://sdk.cloud.google.com | bash"
434 };
435 let output = tokio::process::Command::new("sh")
436 .arg("-c")
437 .arg(install_command)
438 .output()
439 .await
440 .map_err(|e| {
441 ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
442 })?;
443 if !output.status.success() {
444 return Err(ObservabilityError::SetupError(
445 "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
446 ));
447 }
448 Ok(())
449 }
450
451 async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
452 let output = tokio::process::Command::new("gcloud")
453 .args([
454 "auth",
455 "activate-service-account",
456 "--key-file",
457 &self.service_account_path,
458 ])
459 .output()
460 .await
461 .map_err(|e| {
462 ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
463 })?;
464 if !output.status.success() {
465 let error_msg = String::from_utf8_lossy(&output.stderr);
466 return Err(ObservabilityError::AuthenticationError(format!(
467 "Failed to authenticate with service account: {}",
468 error_msg
469 )));
470 }
471 let project_output = tokio::process::Command::new("gcloud")
472 .args(["config", "set", "project", &self.project_id])
473 .output()
474 .await
475 .map_err(|e| {
476 ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
477 })?;
478 if !project_output.status.success() {
479 let error_msg = String::from_utf8_lossy(&project_output.stderr);
480 return Err(ObservabilityError::AuthenticationError(format!(
481 "Failed to set project: {}",
482 error_msg
483 )));
484 }
485 Ok(())
486 }
487
488 async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
489 let output = tokio::process::Command::new("gcloud")
490 .args(["auth", "list", "--format=json"])
491 .output()
492 .await
493 .map_err(|e| {
494 ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
495 })?;
496 if !output.status.success() {
497 return Err(ObservabilityError::AuthenticationError(
498 "Authentication verification failed".to_string(),
499 ));
500 }
501 Ok(())
502 }
503
504 pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
505 match self.get_identity_token_internal().await {
506 Ok(token) => Ok(token),
507 Err(e) => {
508 if e.to_string().contains("not logged in")
509 || e.to_string().contains("authentication")
510 || e.to_string().contains("expired")
511 {
512 self.refresh_authentication().await?;
513 self.get_identity_token_internal().await
514 } else {
515 Err(e)
516 }
517 }
518 }
519 }
520
521 async fn get_identity_token_internal(&self) -> Result<String, ObservabilityError> {
522 let output = tokio::process::Command::new("gcloud")
523 .args(["auth", "print-identity-token"])
524 .output()
525 .await
526 .map_err(|e| {
527 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
528 })?;
529 if !output.status.success() {
530 let error_msg = String::from_utf8_lossy(&output.stderr);
531 return Err(ObservabilityError::AuthenticationError(format!(
532 "Failed to get identity token: {}",
533 error_msg
534 )));
535 }
536 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
537 }
538
539 async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
540 match self.get_access_token().await {
541 Ok(token) => Ok(token),
542 Err(e) => {
543 if e.to_string().contains("not logged in")
544 || e.to_string().contains("authentication")
545 || e.to_string().contains("expired")
546 {
547 self.refresh_authentication().await?;
548 self.get_access_token().await
549 } else {
550 Err(e)
551 }
552 }
553 }
554 }
555
556 async fn get_access_token(&self) -> Result<String, ObservabilityError> {
557 let output = tokio::process::Command::new("gcloud")
558 .args(["auth", "print-access-token"])
559 .output()
560 .await
561 .map_err(|e| {
562 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
563 })?;
564 if !output.status.success() {
565 let error_msg = String::from_utf8_lossy(&output.stderr);
566 return Err(ObservabilityError::AuthenticationError(format!(
567 "Failed to get access token: {}",
568 error_msg
569 )));
570 }
571 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
572 }
573
574 async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
575 let output = tokio::process::Command::new("gcloud")
576 .args([
577 "auth",
578 "activate-service-account",
579 "--key-file",
580 &self.service_account_path,
581 ])
582 .output()
583 .await
584 .map_err(|e| {
585 ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
586 })?;
587 if !output.status.success() {
588 let error_msg = String::from_utf8_lossy(&output.stderr);
589 return Err(ObservabilityError::AuthenticationError(format!(
590 "Failed to refresh authentication: {}",
591 error_msg
592 )));
593 }
594 Ok(())
595 }
596
597 async fn execute_api_request(
598 &self,
599 api_url: &str,
600 payload: &str,
601 operation_name: &str,
602 ) -> Result<(), ObservabilityError> {
603 let mut retries = 0;
604 const MAX_RETRIES: u32 = 2;
605
606 loop {
607 let access_token = self.get_access_token_with_retry().await?;
608 let output = tokio::process::Command::new("curl")
609 .args([
610 "-X",
611 "POST",
612 api_url,
613 "-H",
614 "Content-Type: application/json",
615 "-H",
616 &format!("Authorization: Bearer {}", access_token),
617 "-d",
618 payload,
619 "-s",
620 "-w",
621 "%{http_code}",
622 ])
623 .output()
624 .await
625 .map_err(|e| {
626 ObservabilityError::ApiError(format!(
627 "Failed to execute {} request: {}",
628 operation_name, e
629 ))
630 })?;
631
632 let response_body = String::from_utf8_lossy(&output.stdout);
633 let status_code = response_body
634 .chars()
635 .rev()
636 .take(3)
637 .collect::<String>()
638 .chars()
639 .rev()
640 .collect::<String>();
641
642 if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
643 return Ok(());
644 }
645
646 let error_msg = String::from_utf8_lossy(&output.stderr);
647 if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
648 retries += 1;
649 self.refresh_authentication().await?;
650 continue;
651 }
652
653 return Err(ObservabilityError::ApiError(format!(
654 "{} API call failed with status {}: {} - Response: {}",
655 operation_name, status_code, error_msg, response_body
656 )));
657 }
658 }
659
660 async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
663 let timestamp = SystemTime::now()
664 .duration_since(UNIX_EPOCH)
665 .unwrap()
666 .as_secs();
667 let mut labels = HashMap::new();
668
669 if let Some(service) = log_entry.service_name.or(self.service_name.clone()) {
671 labels.insert("service_name".to_string(), service);
672 }
673
674 let log_name = log_entry
675 .log_name
676 .clone()
677 .unwrap_or_else(|| "gcp-observability-rs".to_string());
678
679 let log_entry_json = json!({
680 "entries": [{
681 "logName": format!("projects/{}/logs/{}", self.project_id, log_name),
682 "resource": { "type": "global" },
683 "timestamp": DateTime::<Utc>::from(UNIX_EPOCH + std::time::Duration::from_secs(timestamp))
684 .format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
685 "severity": log_entry.severity,
686 "textPayload": log_entry.message,
687 "labels": labels
688 }]
689 });
690 let api_url = "https://logging.googleapis.com/v2/entries:write";
691 self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
692 .await?;
693 Ok(())
694 }
695
696 async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
697 let timestamp = SystemTime::now();
698 let timestamp_str = DateTime::<Utc>::from(timestamp)
699 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
700 .to_string();
701
702 let time_series = json!({
703 "timeSeries": [{
704 "metric": {
705 "type": metric_data.metric_type,
706 "labels": metric_data.labels.unwrap_or_default()
707 },
708 "resource": { "type": "global", "labels": {} },
709 "points": [{
710 "interval": { "endTime": timestamp_str },
711 "value": {
712 &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
713 }
714 }]
715 }]
716 });
717 let api_url = &format!(
718 "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
719 self.project_id
720 );
721 self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
722 .await?;
723 Ok(())
724 }
725
726 async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
727 let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
728 let end_time = trace_span.start_time + trace_span.duration;
729 let end_timestamp = DateTime::<Utc>::from(end_time);
730
731 let mut attributes_json = json!({});
732 if !trace_span.attributes.is_empty() {
733 let mut attribute_map = serde_json::Map::new();
734 for (k, v) in trace_span.attributes {
735 attribute_map.insert(k, json!({ "string_value": { "value": v } }));
736 }
737 attributes_json = json!({ "attributeMap": attribute_map });
738 }
739
740 let mut span = json!({
741 "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
742 "spanId": trace_span.span_id,
743 "displayName": { "value": trace_span.display_name },
744 "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
745 "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
746 "attributes": attributes_json
747 });
748
749 if let Some(parent_id) = &trace_span.parent_span_id {
750 span["parentSpanId"] = json!(parent_id);
751 }
752
753 if let Some(status) = &trace_span.status {
754 span["status"] = json!({
755 "code": status.code,
756 "message": status.message
757 });
758 }
759
760 let spans_payload = json!({ "spans": [span] });
761 let api_url = &format!(
762 "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
763 self.project_id
764 );
765 self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
766 .await?;
767 Ok(())
768 }
769
770 pub fn generate_trace_id() -> String {
772 format!("{:032x}", Uuid::new_v4().as_u128())
773 }
774 pub fn generate_span_id() -> String {
775 format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
776 }
777}
778