1pub mod helpers;
135pub mod pubsub;
136
137use async_trait::async_trait;
138use chrono::{DateTime, Utc};
139use crossbeam::channel::{bounded, Receiver, Sender};
140use serde_json::json;
141use std::collections::HashMap;
142use std::time::{Duration, SystemTime, UNIX_EPOCH};
143use uuid::Uuid;
144
/// Errors produced by the observability client and its background worker.
///
/// The `String` payloads hold a human-readable description of the failure.
/// `Clone`, `PartialEq` and `Eq` are derived so errors can be stored, compared
/// in tests, and passed around without being consumed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObservabilityError {
    /// A `gcloud` authentication or token step failed.
    AuthenticationError(String),
    /// An API request (or the command used to issue it) failed.
    ApiError(String),
    /// Preparing the local environment (installing `gcloud`) failed.
    SetupError(String),
    /// Sentinel used to ask the background worker to stop.
    Shutdown,
}

impl std::fmt::Display for ObservabilityError {
    /// Writes a short category prefix followed by the stored message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AuthenticationError(msg) => write!(f, "Authentication error: {}", msg),
            Self::ApiError(msg) => write!(f, "API error: {}", msg),
            Self::SetupError(msg) => write!(f, "Setup error: {}", msg),
            Self::Shutdown => f.write_str("Shutdown requested"),
        }
    }
}

impl std::error::Error for ObservabilityError {}
168
169#[async_trait]
171pub trait Handle: Send {
172 async fn handle(
173 self: Box<Self>,
174 client: &ObservabilityClient,
175 ) -> Result<(), ObservabilityError>;
176}
177
/// A log record to be delivered by the client.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Severity label of the record.
    pub severity: String,
    /// Text of the record.
    pub message: String,
    /// Optional service name attached to this record.
    pub service_name: Option<String>,
    /// Optional name of the log this record is written to.
    pub log_name: Option<String>,
}

impl LogEntry {
    /// Creates an entry with the given severity and message and no optional fields set.
    pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
        let severity = severity.into();
        let message = message.into();
        Self {
            severity,
            message,
            service_name: None,
            log_name: None,
        }
    }

    /// Returns the entry with `service_name` set.
    pub fn with_service_name(self, service_name: impl Into<String>) -> Self {
        Self {
            service_name: Some(service_name.into()),
            ..self
        }
    }

    /// Returns the entry with `log_name` set.
    pub fn with_log_name(self, log_name: impl Into<String>) -> Self {
        Self {
            log_name: Some(log_name.into()),
            ..self
        }
    }
}
204#[async_trait]
205impl Handle for LogEntry {
206 async fn handle(
207 self: Box<Self>,
208 client: &ObservabilityClient,
209 ) -> Result<(), ObservabilityError> {
210 client.send_log_impl(*self).await
211 }
212}
213
/// A single metric data point to be delivered by the client.
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric type identifier.
    pub metric_type: String,
    /// Numeric value of the point.
    pub value: f64,
    /// Name of the value type for this point.
    pub value_type: String,
    /// Name of the metric kind for this point.
    pub metric_kind: String,
    /// Optional metric labels.
    pub labels: Option<HashMap<String, String>>,
}

impl MetricData {
    /// Creates a data point without labels.
    pub fn new(
        metric_type: impl Into<String>,
        value: f64,
        value_type: impl Into<String>,
        metric_kind: impl Into<String>,
    ) -> Self {
        let metric_type = metric_type.into();
        let value_type = value_type.into();
        let metric_kind = metric_kind.into();
        Self {
            metric_type,
            value,
            value_type,
            metric_kind,
            labels: None,
        }
    }

    /// Returns the data point with `labels` set, replacing any previous labels.
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self {
            labels: Some(labels),
            ..self
        }
    }
}
243#[async_trait]
244impl Handle for MetricData {
245 async fn handle(
246 self: Box<Self>,
247 client: &ObservabilityClient,
248 ) -> Result<(), ObservabilityError> {
249 client.send_metric_impl(*self).await
250 }
251}
252
/// A finished trace span to be delivered by the client.
#[derive(Debug, Clone)]
pub struct TraceSpan {
    /// Identifier of the trace this span belongs to.
    pub trace_id: String,
    /// Identifier of this span.
    pub span_id: String,
    /// Human-readable span name.
    pub display_name: String,
    /// When the span started.
    pub start_time: SystemTime,
    /// How long the span lasted.
    pub duration: Duration,
    /// Identifier of the parent span, if any.
    pub parent_span_id: Option<String>,
    /// String key/value attributes attached to the span.
    pub attributes: HashMap<String, String>,
    /// Optional status of the span.
    pub status: Option<TraceStatus>,
}

/// Status attached to a [`TraceSpan`].
#[derive(Debug, Clone)]
pub struct TraceStatus {
    /// Numeric status code.
    pub code: i32,
    /// Optional status message.
    pub message: Option<String>,
}
271
272impl TraceSpan {
273 pub fn new(
274 trace_id: impl Into<String>,
275 span_id: impl Into<String>,
276 display_name: impl Into<String>,
277 start_time: SystemTime,
278 duration: Duration,
279 ) -> Self {
280 Self {
281 trace_id: trace_id.into(),
282 span_id: span_id.into(),
283 display_name: display_name.into(),
284 start_time,
285 duration,
286 parent_span_id: None,
287 attributes: HashMap::new(),
288 status: None,
289 }
290 }
291 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
292 self.parent_span_id = Some(parent_span_id.into());
293 self
294 }
295 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
296 self.attributes.insert(key.into(), value.into());
297 self
298 }
299 pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
300 self.status = Some(TraceStatus {
301 code: 2, message: Some(message.into()),
303 });
304 self
305 }
306 pub fn child(&self, name: impl Into<String>, start_time: SystemTime, duration: Duration) -> Self {
307 Self {
308 trace_id: self.trace_id.clone(), span_id: ObservabilityClient::generate_span_id(), parent_span_id: Some(self.span_id.clone()), display_name: name.into(),
312 start_time,
313 duration,
314 attributes: HashMap::new(),
315 status: None,
316 }
317 }
318}
319#[async_trait]
320impl Handle for TraceSpan {
321 async fn handle(
322 self: Box<Self>,
323 client: &ObservabilityClient,
324 ) -> Result<(), ObservabilityError> {
325 client.send_trace_span_impl(*self).await
326 }
327}
328
329#[derive(Debug, Clone, Copy)]
331pub struct SIGTERM;
332#[async_trait]
333impl Handle for SIGTERM {
334 async fn handle(
335 self: Box<Self>,
336 _client: &ObservabilityClient,
337 ) -> Result<(), ObservabilityError> {
338 Err(ObservabilityError::Shutdown)
339 }
340}
341
342#[derive(Clone)]
344pub struct ObservabilityClient {
345 project_id: String,
346 service_account_path: String,
347 service_name: Option<String>,
348 tx: Sender<Box<dyn Handle>>,
349}
350
351impl ObservabilityClient {
352 pub async fn new(
353 project_id: String,
354 service_account_path: String,
355 service_name: Option<String>,
356 ) -> Result<Self, ObservabilityError> {
357 let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
358
359 let client = Self {
360 project_id,
361 service_account_path,
362 service_name,
363 tx,
364 };
365
366 client.ensure_gcloud_installed().await?;
368 client.setup_authentication().await?;
369 client.verify_authentication().await?;
370
371 let client_clone = client.clone();
373 let handle = tokio::runtime::Handle::current();
374 std::thread::spawn(move || {
375 while let Ok(msg) = rx.recv() {
376 let result = handle.block_on(async { msg.handle(&client_clone).await });
377 match result {
378 Ok(()) => {}
379 Err(ObservabilityError::Shutdown) => {
380 break;
381 }
382 Err(_e) => {
383 }
385 }
386 }
387 });
388
389 Ok(client)
390 }
391
392 pub fn send_log(
395 &self,
396 entry: LogEntry,
397 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
398 self.tx.send(Box::new(entry))
399 }
400
401 pub fn send_metric(
402 &self,
403 data: MetricData,
404 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
405 self.tx.send(Box::new(data))
406 }
407
408 pub fn send_trace(
409 &self,
410 span: TraceSpan,
411 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
412 self.tx.send(Box::new(span))
413 }
414
415 pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
416 self.tx.send(Box::new(SIGTERM))
417 }
418
419 async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
422 let output = tokio::process::Command::new("gcloud")
423 .arg("version")
424 .output()
425 .await;
426 match output {
427 Ok(output) if output.status.success() => Ok(()),
428 _ => self.install_gcloud().await,
429 }
430 }
431
432 async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
433 let install_command = if cfg!(target_os = "macos") {
434 "curl https://sdk.cloud.google.com | bash"
435 } else {
436 "curl https://sdk.cloud.google.com | bash"
437 };
438 let output = tokio::process::Command::new("sh")
439 .arg("-c")
440 .arg(install_command)
441 .output()
442 .await
443 .map_err(|e| {
444 ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
445 })?;
446 if !output.status.success() {
447 return Err(ObservabilityError::SetupError(
448 "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
449 ));
450 }
451 Ok(())
452 }
453
454 async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
455 let output = tokio::process::Command::new("gcloud")
456 .args([
457 "auth",
458 "activate-service-account",
459 "--key-file",
460 &self.service_account_path,
461 ])
462 .output()
463 .await
464 .map_err(|e| {
465 ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
466 })?;
467 if !output.status.success() {
468 let error_msg = String::from_utf8_lossy(&output.stderr);
469 return Err(ObservabilityError::AuthenticationError(format!(
470 "Failed to authenticate with service account: {}",
471 error_msg
472 )));
473 }
474 let project_output = tokio::process::Command::new("gcloud")
475 .args(["config", "set", "project", &self.project_id])
476 .output()
477 .await
478 .map_err(|e| {
479 ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
480 })?;
481 if !project_output.status.success() {
482 let error_msg = String::from_utf8_lossy(&project_output.stderr);
483 return Err(ObservabilityError::AuthenticationError(format!(
484 "Failed to set project: {}",
485 error_msg
486 )));
487 }
488 Ok(())
489 }
490
491 async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
492 let output = tokio::process::Command::new("gcloud")
493 .args(["auth", "list", "--format=json"])
494 .output()
495 .await
496 .map_err(|e| {
497 ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
498 })?;
499 if !output.status.success() {
500 return Err(ObservabilityError::AuthenticationError(
501 "Authentication verification failed".to_string(),
502 ));
503 }
504 Ok(())
505 }
506
507 pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
508 match self.get_identity_token_internal().await {
509 Ok(token) => Ok(token),
510 Err(e) => {
511 if e.to_string().contains("not logged in")
512 || e.to_string().contains("authentication")
513 || e.to_string().contains("expired")
514 {
515 self.refresh_authentication().await?;
516 self.get_identity_token_internal().await
517 } else {
518 Err(e)
519 }
520 }
521 }
522 }
523
524 async fn get_identity_token_internal(&self) -> Result<String, ObservabilityError> {
525 let output = tokio::process::Command::new("gcloud")
526 .args(["auth", "print-identity-token"])
527 .output()
528 .await
529 .map_err(|e| {
530 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
531 })?;
532 if !output.status.success() {
533 let error_msg = String::from_utf8_lossy(&output.stderr);
534 return Err(ObservabilityError::AuthenticationError(format!(
535 "Failed to get identity token: {}",
536 error_msg
537 )));
538 }
539 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
540 }
541
542 async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
543 match self.get_access_token().await {
544 Ok(token) => Ok(token),
545 Err(e) => {
546 if e.to_string().contains("not logged in")
547 || e.to_string().contains("authentication")
548 || e.to_string().contains("expired")
549 {
550 self.refresh_authentication().await?;
551 self.get_access_token().await
552 } else {
553 Err(e)
554 }
555 }
556 }
557 }
558
559 async fn get_access_token(&self) -> Result<String, ObservabilityError> {
560 let output = tokio::process::Command::new("gcloud")
561 .args(["auth", "print-access-token"])
562 .output()
563 .await
564 .map_err(|e| {
565 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
566 })?;
567 if !output.status.success() {
568 let error_msg = String::from_utf8_lossy(&output.stderr);
569 return Err(ObservabilityError::AuthenticationError(format!(
570 "Failed to get access token: {}",
571 error_msg
572 )));
573 }
574 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
575 }
576
577 async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
578 let output = tokio::process::Command::new("gcloud")
579 .args([
580 "auth",
581 "activate-service-account",
582 "--key-file",
583 &self.service_account_path,
584 ])
585 .output()
586 .await
587 .map_err(|e| {
588 ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
589 })?;
590 if !output.status.success() {
591 let error_msg = String::from_utf8_lossy(&output.stderr);
592 return Err(ObservabilityError::AuthenticationError(format!(
593 "Failed to refresh authentication: {}",
594 error_msg
595 )));
596 }
597 Ok(())
598 }
599
600 async fn execute_api_request(
601 &self,
602 api_url: &str,
603 payload: &str,
604 operation_name: &str,
605 ) -> Result<(), ObservabilityError> {
606 let mut retries = 0;
607 const MAX_RETRIES: u32 = 2;
608
609 loop {
610 let access_token = self.get_access_token_with_retry().await?;
611 let output = tokio::process::Command::new("curl")
612 .args([
613 "-X",
614 "POST",
615 api_url,
616 "-H",
617 "Content-Type: application/json",
618 "-H",
619 &format!("Authorization: Bearer {}", access_token),
620 "-d",
621 payload,
622 "-s",
623 "-w",
624 "%{http_code}",
625 ])
626 .output()
627 .await
628 .map_err(|e| {
629 ObservabilityError::ApiError(format!(
630 "Failed to execute {} request: {}",
631 operation_name, e
632 ))
633 })?;
634
635 let response_body = String::from_utf8_lossy(&output.stdout);
636 let status_code = response_body
637 .chars()
638 .rev()
639 .take(3)
640 .collect::<String>()
641 .chars()
642 .rev()
643 .collect::<String>();
644
645 if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
646 return Ok(());
647 }
648
649 let error_msg = String::from_utf8_lossy(&output.stderr);
650 if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
651 retries += 1;
652 self.refresh_authentication().await?;
653 continue;
654 }
655
656 return Err(ObservabilityError::ApiError(format!(
657 "{} API call failed with status {}: {} - Response: {}",
658 operation_name, status_code, error_msg, response_body
659 )));
660 }
661 }
662
663 async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
666 let timestamp = SystemTime::now()
667 .duration_since(UNIX_EPOCH)
668 .unwrap()
669 .as_secs();
670 let mut labels = HashMap::new();
671
672 if let Some(service) = log_entry.service_name.or(self.service_name.clone()) {
674 labels.insert("service_name".to_string(), service);
675 }
676
677 let log_name = log_entry
678 .log_name
679 .clone()
680 .unwrap_or_else(|| "gcp-observability-rs".to_string());
681
682 let log_entry_json = json!({
683 "entries": [{
684 "logName": format!("projects/{}/logs/{}", self.project_id, log_name),
685 "resource": { "type": "global" },
686 "timestamp": DateTime::<Utc>::from(UNIX_EPOCH + std::time::Duration::from_secs(timestamp))
687 .format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
688 "severity": log_entry.severity,
689 "textPayload": log_entry.message,
690 "labels": labels
691 }]
692 });
693 let api_url = "https://logging.googleapis.com/v2/entries:write";
694 self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
695 .await?;
696 Ok(())
697 }
698
699 async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
700 let timestamp = SystemTime::now();
701 let timestamp_str = DateTime::<Utc>::from(timestamp)
702 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
703 .to_string();
704
705 let time_series = json!({
706 "timeSeries": [{
707 "metric": {
708 "type": metric_data.metric_type,
709 "labels": metric_data.labels.unwrap_or_default()
710 },
711 "resource": { "type": "global", "labels": {} },
712 "points": [{
713 "interval": { "endTime": timestamp_str },
714 "value": {
715 &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
716 }
717 }]
718 }]
719 });
720 let api_url = &format!(
721 "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
722 self.project_id
723 );
724 self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
725 .await?;
726 Ok(())
727 }
728
729 async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
730 let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
731 let end_time = trace_span.start_time + trace_span.duration;
732 let end_timestamp = DateTime::<Utc>::from(end_time);
733
734 let mut attributes_json = json!({});
735 if !trace_span.attributes.is_empty() {
736 let mut attribute_map = serde_json::Map::new();
737 for (k, v) in trace_span.attributes {
738 attribute_map.insert(k, json!({ "string_value": { "value": v } }));
739 }
740 attributes_json = json!({ "attributeMap": attribute_map });
741 }
742
743 let mut span = json!({
744 "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
745 "spanId": trace_span.span_id,
746 "displayName": { "value": trace_span.display_name },
747 "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
748 "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
749 "attributes": attributes_json
750 });
751
752 if let Some(parent_id) = &trace_span.parent_span_id {
753 span["parentSpanId"] = json!(parent_id);
754 }
755
756 if let Some(status) = &trace_span.status {
757 span["status"] = json!({
758 "code": status.code,
759 "message": status.message
760 });
761 }
762
763 let spans_payload = json!({ "spans": [span] });
764 let api_url = &format!(
765 "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
766 self.project_id
767 );
768 self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
769 .await?;
770 Ok(())
771 }
772
773 pub fn generate_trace_id() -> String {
775 format!("{:032x}", Uuid::new_v4().as_u128())
776 }
777 pub fn generate_span_id() -> String {
778 format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
779 }
780}
781