1pub mod helpers;
110pub mod pubsub;
111
112use async_trait::async_trait;
113use chrono::{DateTime, Utc};
114use crossbeam::channel::{bounded, Receiver, Sender};
115use serde_json::json;
116use std::collections::HashMap;
117use std::time::{Duration, SystemTime};
118use uuid::Uuid;
119
/// Errors produced by the observability client and by queued work items.
#[derive(Debug)]
pub enum ObservabilityError {
    /// Credential activation, verification or token retrieval failed.
    AuthenticationError(String),
    /// An external command or remote API call failed.
    ApiError(String),
    /// Local setup (credentials path, project id, tooling) failed.
    SetupError(String),
    /// Sentinel used to ask the background worker to stop.
    Shutdown,
}

impl std::fmt::Display for ObservabilityError {
    /// Renders `"<category>: <detail>"`, or a fixed text for `Shutdown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::Shutdown => return f.write_str("Shutdown requested"),
            Self::AuthenticationError(detail) => ("Authentication error", detail),
            Self::ApiError(detail) => ("API error", detail),
            Self::SetupError(detail) => ("Setup error", detail),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for ObservabilityError {}
143
144#[async_trait]
146pub trait Handle: Send {
147 async fn handle(
148 self: Box<Self>,
149 client: &ObservabilityClient,
150 ) -> Result<(), ObservabilityError>;
151}
152
153#[derive(Debug, Clone)]
155pub struct LogEntry {
156 pub severity: String,
157 pub message: String,
158 pub service_name: Option<String>,
159 pub log_name: Option<String>,
160 pub json_payload: Option<serde_json::Value>,
161 pub labels: Option<HashMap<String, String>>,
162 pub insert_id: Option<String>,
163}
164impl LogEntry {
165 pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
166 Self {
167 severity: severity.into(),
168 message: message.into(),
169 service_name: None,
170 log_name: None,
171 json_payload: None,
172 labels: None,
173 insert_id: None,
174 }
175 }
176
177 pub fn new_json(severity: impl Into<String>, json_payload: serde_json::Value) -> Self {
181 Self {
182 severity: severity.into(),
183 message: String::new(),
184 service_name: None,
185 log_name: None,
186 json_payload: Some(json_payload),
187 labels: None,
188 insert_id: None,
189 }
190 }
191
192 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
193 self.service_name = Some(service_name.into());
194 self
195 }
196 pub fn with_log_name(mut self, log_name: impl Into<String>) -> Self {
197 self.log_name = Some(log_name.into());
198 self
199 }
200
201 pub fn with_json_payload(mut self, json_payload: serde_json::Value) -> Self {
203 self.json_payload = Some(json_payload);
204 self
205 }
206
207 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
209 self.labels = Some(labels);
210 self
211 }
212
213 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
215 let labels = self.labels.get_or_insert_with(HashMap::new);
216 labels.insert(key.into(), value.into());
217 self
218 }
219
220 pub fn with_insert_id(mut self, insert_id: impl Into<String>) -> Self {
222 self.insert_id = Some(insert_id.into());
223 self
224 }
225}
226#[async_trait]
227impl Handle for LogEntry {
228 async fn handle(
229 self: Box<Self>,
230 client: &ObservabilityClient,
231 ) -> Result<(), ObservabilityError> {
232 client.send_log_impl(*self).await
233 }
234}
235
/// A single metric data point to be written by the client.
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric type identifier, sent as the metric's `type`.
    pub metric_type: String,
    /// Numeric value of the point.
    pub value: f64,
    /// Value type name; lower-cased and suffixed with `Value` to form the
    /// key under which `value` is sent.
    pub value_type: String,
    /// Metric kind supplied by the caller.
    pub metric_kind: String,
    /// Optional metric labels.
    pub labels: Option<HashMap<String, String>>,
}

impl MetricData {
    /// Creates a data point without labels.
    pub fn new(
        metric_type: impl Into<String>,
        value: f64,
        value_type: impl Into<String>,
        metric_kind: impl Into<String>,
    ) -> Self {
        let metric_type = metric_type.into();
        let value_type = value_type.into();
        let metric_kind = metric_kind.into();
        Self {
            metric_type,
            value,
            value_type,
            metric_kind,
            labels: None,
        }
    }

    /// Replaces the label map.
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self {
            labels: Some(labels),
            ..self
        }
    }
}
265#[async_trait]
266impl Handle for MetricData {
267 async fn handle(
268 self: Box<Self>,
269 client: &ObservabilityClient,
270 ) -> Result<(), ObservabilityError> {
271 client.send_metric_impl(*self).await
272 }
273}
274
275#[derive(Debug, Clone)]
277pub struct TraceSpan {
278 pub trace_id: String,
279 pub span_id: String,
280 pub display_name: String,
281 pub start_time: SystemTime,
282 pub duration: Duration,
283 pub parent_span_id: Option<String>,
284 pub attributes: HashMap<String, String>,
285 pub status: Option<TraceStatus>,
286}
287
288#[derive(Debug, Clone)]
289pub struct TraceStatus {
290 pub code: i32, pub message: Option<String>,
292}
293
294impl TraceSpan {
295 pub fn new(
296 trace_id: impl Into<String>,
297 span_id: impl Into<String>,
298 display_name: impl Into<String>,
299 start_time: SystemTime,
300 duration: Duration,
301 ) -> Self {
302 Self {
303 trace_id: trace_id.into(),
304 span_id: span_id.into(),
305 display_name: display_name.into(),
306 start_time,
307 duration,
308 parent_span_id: None,
309 attributes: HashMap::new(),
310 status: None,
311 }
312 }
313 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
314 self.parent_span_id = Some(parent_span_id.into());
315 self
316 }
317 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
318 self.attributes.insert(key.into(), value.into());
319 self
320 }
321 pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
322 self.status = Some(TraceStatus {
323 code: 2, message: Some(message.into()),
325 });
326 self
327 }
328 pub fn child(
329 &self,
330 name: impl Into<String>,
331 start_time: SystemTime,
332 duration: Duration,
333 ) -> Self {
334 Self {
335 trace_id: self.trace_id.clone(), span_id: ObservabilityClient::generate_span_id(), parent_span_id: Some(self.span_id.clone()), display_name: name.into(),
339 start_time,
340 duration,
341 attributes: HashMap::new(),
342 status: None,
343 }
344 }
345}
346#[async_trait]
347impl Handle for TraceSpan {
348 async fn handle(
349 self: Box<Self>,
350 client: &ObservabilityClient,
351 ) -> Result<(), ObservabilityError> {
352 client.send_trace_span_impl(*self).await
353 }
354}
355
356#[derive(Debug, Clone, Copy)]
358pub struct SIGTERM;
359#[async_trait]
360impl Handle for SIGTERM {
361 async fn handle(
362 self: Box<Self>,
363 _client: &ObservabilityClient,
364 ) -> Result<(), ObservabilityError> {
365 Err(ObservabilityError::Shutdown)
366 }
367}
368
369#[derive(Clone)]
371pub struct ObservabilityClient {
372 project_id: String,
373 service_account_path: String,
374 service_name: Option<String>,
375 tx: Sender<Box<dyn Handle>>,
376}
377
378impl ObservabilityClient {
379 pub async fn new(
380 project_id: Option<String>,
381 service_name: Option<String>,
382 ) -> Result<Self, ObservabilityError> {
383 let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
384
385 let service_account_path = helpers::gcp_config::credentials_path_from_env()
386 .map_err(|e| ObservabilityError::SetupError(e))?;
387
388 let mut project_id = project_id.unwrap_or_default();
389
390 let mut client = Self {
391 project_id: project_id.clone(),
392 service_account_path,
393 service_name,
394 tx,
395 };
396
397 client.ensure_gcloud_installed().await?;
399
400 if project_id.trim().is_empty() {
401 project_id = helpers::gcp_config::resolve_project_id(None)
402 .await
403 .map_err(|e| ObservabilityError::SetupError(e))?;
404 client.project_id = project_id;
405 }
406
407 client.setup_authentication().await?;
408 client.verify_authentication().await?;
409
410 let client_clone = client.clone();
412 let handle = tokio::runtime::Handle::current();
413 std::thread::spawn(move || {
414 while let Ok(msg) = rx.recv() {
415 let result = handle.block_on(async { msg.handle(&client_clone).await });
416 match result {
417 Ok(()) => {}
418 Err(ObservabilityError::Shutdown) => {
419 break;
420 }
421 Err(_e) => {
422 }
424 }
425 }
426 });
427
428 Ok(client)
429 }
430
431 pub fn send_log(
434 &self,
435 entry: LogEntry,
436 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
437 self.tx.send(Box::new(entry))
438 }
439
440 pub fn send_metric(
441 &self,
442 data: MetricData,
443 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
444 self.tx.send(Box::new(data))
445 }
446
447 pub fn send_trace(
448 &self,
449 span: TraceSpan,
450 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
451 self.tx.send(Box::new(span))
452 }
453
454 pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
455 self.tx.send(Box::new(SIGTERM))
456 }
457
458 async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
461 let output = tokio::process::Command::new("gcloud")
462 .arg("version")
463 .output()
464 .await;
465 match output {
466 Ok(output) if output.status.success() => Ok(()),
467 _ => self.install_gcloud().await,
468 }
469 }
470
471 async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
472 let install_command = if cfg!(target_os = "macos") {
473 "curl https://sdk.cloud.google.com | bash"
474 } else {
475 "curl https://sdk.cloud.google.com | bash"
476 };
477 let output = tokio::process::Command::new("sh")
478 .arg("-c")
479 .arg(install_command)
480 .output()
481 .await
482 .map_err(|e| {
483 ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
484 })?;
485 if !output.status.success() {
486 return Err(ObservabilityError::SetupError(
487 "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
488 ));
489 }
490 Ok(())
491 }
492
493 async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
494 let output = tokio::process::Command::new("gcloud")
495 .args([
496 "auth",
497 "activate-service-account",
498 "--key-file",
499 &self.service_account_path,
500 ])
501 .output()
502 .await
503 .map_err(|e| {
504 ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
505 })?;
506 if !output.status.success() {
507 let error_msg = String::from_utf8_lossy(&output.stderr);
508 return Err(ObservabilityError::AuthenticationError(format!(
509 "Failed to authenticate with service account: {}",
510 error_msg
511 )));
512 }
513 let project_output = tokio::process::Command::new("gcloud")
514 .args(["config", "set", "project", &self.project_id])
515 .output()
516 .await
517 .map_err(|e| {
518 ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
519 })?;
520 if !project_output.status.success() {
521 let error_msg = String::from_utf8_lossy(&project_output.stderr);
522 return Err(ObservabilityError::AuthenticationError(format!(
523 "Failed to set project: {}",
524 error_msg
525 )));
526 }
527 Ok(())
528 }
529
530 async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
531 let output = tokio::process::Command::new("gcloud")
532 .args(["auth", "list", "--format=json"])
533 .output()
534 .await
535 .map_err(|e| {
536 ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
537 })?;
538 if !output.status.success() {
539 return Err(ObservabilityError::AuthenticationError(
540 "Authentication verification failed".to_string(),
541 ));
542 }
543 Ok(())
544 }
545
546 pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
552 self.get_identity_token_with_retry(None).await
553 }
554
555 pub async fn get_identity_token_for_audience(
559 &self,
560 audience: impl AsRef<str>,
561 ) -> Result<String, ObservabilityError> {
562 let audience = audience.as_ref().trim();
563 if audience.is_empty() {
564 return Err(ObservabilityError::SetupError(
565 "Audience must not be empty".to_string(),
566 ));
567 }
568
569 self.get_identity_token_with_retry(Some(audience.to_string()))
570 .await
571 }
572
573 async fn get_identity_token_with_retry(
574 &self,
575 audience: Option<String>,
576 ) -> Result<String, ObservabilityError> {
577 match self.get_identity_token_internal(audience.clone()).await {
578 Ok(token) => Ok(token),
579 Err(e) => {
580 if e.to_string().contains("not logged in")
581 || e.to_string().contains("authentication")
582 || e.to_string().contains("expired")
583 {
584 self.refresh_authentication().await?;
585 self.get_identity_token_internal(audience).await
586 } else {
587 Err(e)
588 }
589 }
590 }
591 }
592
593 async fn get_identity_token_internal(
594 &self,
595 audience: Option<String>,
596 ) -> Result<String, ObservabilityError> {
597 let mut command = tokio::process::Command::new("gcloud");
598 command.args(["auth", "print-identity-token"]);
599
600 if let Some(audience) = audience {
601 command.arg(format!("--audiences={}", audience));
602 }
603
604 let output = command.output().await.map_err(|e| {
605 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
606 })?;
607
608 if !output.status.success() {
609 let error_msg = String::from_utf8_lossy(&output.stderr);
610 return Err(ObservabilityError::AuthenticationError(format!(
611 "Failed to get identity token: {}",
612 error_msg
613 )));
614 }
615
616 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
617 }
618
619 async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
620 match self.get_access_token().await {
621 Ok(token) => Ok(token),
622 Err(e) => {
623 if e.to_string().contains("not logged in")
624 || e.to_string().contains("authentication")
625 || e.to_string().contains("expired")
626 {
627 self.refresh_authentication().await?;
628 self.get_access_token().await
629 } else {
630 Err(e)
631 }
632 }
633 }
634 }
635
636 async fn get_access_token(&self) -> Result<String, ObservabilityError> {
637 let output = tokio::process::Command::new("gcloud")
638 .args(["auth", "print-access-token"])
639 .output()
640 .await
641 .map_err(|e| {
642 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
643 })?;
644 if !output.status.success() {
645 let error_msg = String::from_utf8_lossy(&output.stderr);
646 return Err(ObservabilityError::AuthenticationError(format!(
647 "Failed to get access token: {}",
648 error_msg
649 )));
650 }
651 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
652 }
653
654 async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
655 let output = tokio::process::Command::new("gcloud")
656 .args([
657 "auth",
658 "activate-service-account",
659 "--key-file",
660 &self.service_account_path,
661 ])
662 .output()
663 .await
664 .map_err(|e| {
665 ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
666 })?;
667 if !output.status.success() {
668 let error_msg = String::from_utf8_lossy(&output.stderr);
669 return Err(ObservabilityError::AuthenticationError(format!(
670 "Failed to refresh authentication: {}",
671 error_msg
672 )));
673 }
674 Ok(())
675 }
676
677 async fn execute_api_request(
678 &self,
679 api_url: &str,
680 payload: &str,
681 operation_name: &str,
682 ) -> Result<(), ObservabilityError> {
683 let mut retries = 0;
684 const MAX_RETRIES: u32 = 2;
685
686 loop {
687 let access_token = self.get_access_token_with_retry().await?;
688 let output = tokio::process::Command::new("curl")
689 .args([
690 "-X",
691 "POST",
692 api_url,
693 "-H",
694 "Content-Type: application/json",
695 "-H",
696 &format!("Authorization: Bearer {}", access_token),
697 "-d",
698 payload,
699 "-s",
700 "-w",
701 "%{http_code}",
702 ])
703 .output()
704 .await
705 .map_err(|e| {
706 ObservabilityError::ApiError(format!(
707 "Failed to execute {} request: {}",
708 operation_name, e
709 ))
710 })?;
711
712 let response_body = String::from_utf8_lossy(&output.stdout);
713 let status_code = response_body
714 .chars()
715 .rev()
716 .take(3)
717 .collect::<String>()
718 .chars()
719 .rev()
720 .collect::<String>();
721
722 if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
723 return Ok(());
724 }
725
726 let error_msg = String::from_utf8_lossy(&output.stderr);
727 if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
728 retries += 1;
729 self.refresh_authentication().await?;
730 continue;
731 }
732
733 return Err(ObservabilityError::ApiError(format!(
734 "{} API call failed with status {}: {} - Response: {}",
735 operation_name, status_code, error_msg, response_body
736 )));
737 }
738 }
739
740 async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
743 let now = SystemTime::now();
744 let timestamp =
745 DateTime::<Utc>::from(now).to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
746
747 let resolved_service_name = log_entry.service_name.or(self.service_name.clone());
749
750 let log_name = log_entry
753 .log_name
754 .or_else(|| resolved_service_name.clone())
755 .unwrap_or_else(|| "default".to_string());
756
757 let log_name_encoded = urlencoding::encode(&log_name);
759
760 let mut labels = log_entry.labels.unwrap_or_default();
762 if let Some(service) = resolved_service_name {
763 labels
765 .entry("service_name".to_string())
766 .or_insert_with(|| service.clone());
767 labels.entry("service".to_string()).or_insert(service);
768 }
769
770 let insert_id = log_entry
771 .insert_id
772 .unwrap_or_else(|| Uuid::new_v4().to_string());
773
774 let mut entry = json!({
775 "logName": format!("projects/{}/logs/{}", self.project_id, log_name_encoded),
776 "resource": {
777 "type": "global",
778 "labels": { "project_id": self.project_id }
779 },
780 "timestamp": timestamp,
781 "severity": log_entry.severity,
782 "labels": labels,
783 "insertId": insert_id,
784 });
785
786 if let Some(json_payload) = log_entry.json_payload {
788 entry["jsonPayload"] = json_payload;
789 } else {
790 entry["textPayload"] = json!(log_entry.message);
791 }
792
793 let log_entry_json = json!({ "entries": [entry] });
794 let api_url = "https://logging.googleapis.com/v2/entries:write";
795 self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
796 .await?;
797 Ok(())
798 }
799
800 async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
801 let timestamp = SystemTime::now();
802 let timestamp_str = DateTime::<Utc>::from(timestamp)
803 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
804 .to_string();
805
806 let time_series = json!({
807 "timeSeries": [{
808 "metric": {
809 "type": metric_data.metric_type,
810 "labels": metric_data.labels.unwrap_or_default()
811 },
812 "resource": { "type": "global", "labels": {} },
813 "points": [{
814 "interval": { "endTime": timestamp_str },
815 "value": {
816 &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
817 }
818 }]
819 }]
820 });
821 let api_url = &format!(
822 "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
823 self.project_id
824 );
825 self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
826 .await?;
827 Ok(())
828 }
829
830 async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
831 let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
832 let end_time = trace_span.start_time + trace_span.duration;
833 let end_timestamp = DateTime::<Utc>::from(end_time);
834
835 let mut attributes_json = json!({});
836 if !trace_span.attributes.is_empty() {
837 let mut attribute_map = serde_json::Map::new();
838 for (k, v) in trace_span.attributes {
839 attribute_map.insert(k, json!({ "string_value": { "value": v } }));
840 }
841 attributes_json = json!({ "attributeMap": attribute_map });
842 }
843
844 let mut span = json!({
845 "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
846 "spanId": trace_span.span_id,
847 "displayName": { "value": trace_span.display_name },
848 "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
849 "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
850 "attributes": attributes_json
851 });
852
853 if let Some(parent_id) = &trace_span.parent_span_id {
854 span["parentSpanId"] = json!(parent_id);
855 }
856
857 if let Some(status) = &trace_span.status {
858 span["status"] = json!({
859 "code": status.code,
860 "message": status.message
861 });
862 }
863
864 let spans_payload = json!({ "spans": [span] });
865 let api_url = &format!(
866 "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
867 self.project_id
868 );
869 self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
870 .await?;
871 Ok(())
872 }
873
874 pub fn generate_trace_id() -> String {
876 format!("{:032x}", Uuid::new_v4().as_u128())
877 }
878 pub fn generate_span_id() -> String {
879 format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
880 }
881}