1pub mod helpers;
110pub mod pubsub;
111
112use async_trait::async_trait;
113use chrono::{DateTime, Utc};
114use crossbeam::channel::{bounded, Receiver, Sender};
115use serde_json::json;
116use std::collections::HashMap;
117use std::time::{Duration, SystemTime};
118use uuid::Uuid;
119
/// Errors reported by the observability client.
#[derive(Debug)]
pub enum ObservabilityError {
    /// A gcloud authentication or token step failed; carries the detail text.
    AuthenticationError(String),
    /// A command or API request failed; carries the detail text.
    ApiError(String),
    /// Preparing the environment (credentials, project, gcloud CLI) failed.
    SetupError(String),
    /// Signals that the background worker should stop processing.
    Shutdown,
}

impl std::fmt::Display for ObservabilityError {
    /// Renders `"<category>: <detail>"`, or a fixed message for `Shutdown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::Shutdown => return f.write_str("Shutdown requested"),
            Self::AuthenticationError(detail) => ("Authentication error", detail),
            Self::ApiError(detail) => ("API error", detail),
            Self::SetupError(detail) => ("Setup error", detail),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for ObservabilityError {}
143
/// A unit of work that can be boxed, sent over the client's channel, and then
/// executed against an [`ObservabilityClient`].
#[async_trait]
pub trait Handle: Send {
    /// Consumes the boxed item and processes it using `client`.
    ///
    /// The background worker treats `Err(ObservabilityError::Shutdown)` as the
    /// signal to stop its loop.
    async fn handle(
        self: Box<Self>,
        client: &ObservabilityClient,
    ) -> Result<(), ObservabilityError>;
}
152
153#[derive(Debug, Clone)]
155pub struct LogEntry {
156 pub severity: String,
157 pub message: String,
158 pub service_name: Option<String>,
159 pub log_name: Option<String>,
160 pub json_payload: Option<serde_json::Value>,
161 pub labels: Option<HashMap<String, String>>,
162 pub insert_id: Option<String>,
163}
164impl LogEntry {
165 pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
166 Self {
167 severity: severity.into(),
168 message: message.into(),
169 service_name: None,
170 log_name: None,
171 json_payload: None,
172 labels: None,
173 insert_id: None,
174 }
175 }
176
177 pub fn new_json(severity: impl Into<String>, json_payload: serde_json::Value) -> Self {
181 Self {
182 severity: severity.into(),
183 message: String::new(),
184 service_name: None,
185 log_name: None,
186 json_payload: Some(json_payload),
187 labels: None,
188 insert_id: None,
189 }
190 }
191
192 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
193 self.service_name = Some(service_name.into());
194 self
195 }
196 pub fn with_log_name(mut self, log_name: impl Into<String>) -> Self {
197 self.log_name = Some(log_name.into());
198 self
199 }
200
201 pub fn with_json_payload(mut self, json_payload: serde_json::Value) -> Self {
203 self.json_payload = Some(json_payload);
204 self
205 }
206
207 pub fn with_labels(mut self, labels: HashMap<String, String>) -> Self {
209 self.labels = Some(labels);
210 self
211 }
212
213 pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
215 let labels = self.labels.get_or_insert_with(HashMap::new);
216 labels.insert(key.into(), value.into());
217 self
218 }
219
220 pub fn with_insert_id(mut self, insert_id: impl Into<String>) -> Self {
222 self.insert_id = Some(insert_id.into());
223 self
224 }
225}
226#[async_trait]
227impl Handle for LogEntry {
228 async fn handle(
229 self: Box<Self>,
230 client: &ObservabilityClient,
231 ) -> Result<(), ObservabilityError> {
232 client.send_log_impl(*self).await
233 }
234}
235
/// A single metric data point to be written by the client.
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric type identifier sent as `metric.type`.
    pub metric_type: String,
    /// The numeric value of the point.
    pub value: f64,
    /// Value type name; lowercased and suffixed with `Value` to form the payload key.
    pub value_type: String,
    /// Metric kind. Stored only: the payload built by `send_metric_impl` does not include it.
    pub metric_kind: String,
    /// Optional metric labels.
    pub labels: Option<HashMap<String, String>>,
}

impl MetricData {
    /// Creates a data point without labels.
    pub fn new(
        metric_type: impl Into<String>,
        value: f64,
        value_type: impl Into<String>,
        metric_kind: impl Into<String>,
    ) -> Self {
        let metric_type = metric_type.into();
        let value_type = value_type.into();
        let metric_kind = metric_kind.into();
        Self {
            metric_type,
            value,
            value_type,
            metric_kind,
            labels: None,
        }
    }

    /// Replaces the label map.
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self {
            labels: Some(labels),
            ..self
        }
    }
}
265#[async_trait]
266impl Handle for MetricData {
267 async fn handle(
268 self: Box<Self>,
269 client: &ObservabilityClient,
270 ) -> Result<(), ObservabilityError> {
271 client.send_metric_impl(*self).await
272 }
273}
274
275#[derive(Debug, Clone)]
277pub struct TraceSpan {
278 pub trace_id: String,
279 pub span_id: String,
280 pub display_name: String,
281 pub start_time: SystemTime,
282 pub duration: Duration,
283 pub parent_span_id: Option<String>,
284 pub attributes: HashMap<String, String>,
285 pub status: Option<TraceStatus>,
286}
287
288#[derive(Debug, Clone)]
289pub struct TraceStatus {
290 pub code: i32, pub message: Option<String>,
292}
293
294impl TraceSpan {
295 pub fn new(
296 trace_id: impl Into<String>,
297 span_id: impl Into<String>,
298 display_name: impl Into<String>,
299 start_time: SystemTime,
300 duration: Duration,
301 ) -> Self {
302 Self {
303 trace_id: trace_id.into(),
304 span_id: span_id.into(),
305 display_name: display_name.into(),
306 start_time,
307 duration,
308 parent_span_id: None,
309 attributes: HashMap::new(),
310 status: None,
311 }
312 }
313 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
314 self.parent_span_id = Some(parent_span_id.into());
315 self
316 }
317 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
318 self.attributes.insert(key.into(), value.into());
319 self
320 }
321 pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
322 self.status = Some(TraceStatus {
323 code: 2, message: Some(message.into()),
325 });
326 self
327 }
328 pub fn child(
329 &self,
330 name: impl Into<String>,
331 start_time: SystemTime,
332 duration: Duration,
333 ) -> Self {
334 Self {
335 trace_id: self.trace_id.clone(), span_id: ObservabilityClient::generate_span_id(), parent_span_id: Some(self.span_id.clone()), display_name: name.into(),
339 start_time,
340 duration,
341 attributes: HashMap::new(),
342 status: None,
343 }
344 }
345}
346#[async_trait]
347impl Handle for TraceSpan {
348 async fn handle(
349 self: Box<Self>,
350 client: &ObservabilityClient,
351 ) -> Result<(), ObservabilityError> {
352 client.send_trace_span_impl(*self).await
353 }
354}
355
/// Queue item that asks the background worker to stop.
///
/// It carries no data; its handler always fails with
/// `ObservabilityError::Shutdown`, which the worker loop treats as "exit".
#[derive(Debug, Clone, Copy)]
pub struct SIGTERM;
#[async_trait]
impl Handle for SIGTERM {
    /// Always returns `Err(ObservabilityError::Shutdown)`; the client is not used.
    async fn handle(
        self: Box<Self>,
        _client: &ObservabilityClient,
    ) -> Result<(), ObservabilityError> {
        Err(ObservabilityError::Shutdown)
    }
}
368
/// Client that queues logs, metrics, and trace spans and delivers them from a
/// background worker thread using the `gcloud` and `curl` command-line tools.
#[derive(Clone)]
pub struct ObservabilityClient {
    /// Project id used in API URLs and payloads and passed to `gcloud config set project`.
    project_id: String,
    /// Path handed to `gcloud auth activate-service-account --key-file`.
    service_account_path: String,
    /// Default service name for log entries that do not set their own.
    service_name: Option<String>,
    /// Sending half of the bounded channel consumed by the worker thread.
    tx: Sender<Box<dyn Handle>>,
}
377
378impl ObservabilityClient {
379 pub async fn new(
380 project_id: Option<String>,
381 service_name: Option<String>,
382 ) -> Result<Self, ObservabilityError> {
383 let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
384
385 let service_account_path = helpers::gcp_config::credentials_path_from_env()
386 .map_err(|e| ObservabilityError::SetupError(e))?;
387
388 let mut project_id = project_id.unwrap_or_default();
389
390 let mut client = Self {
391 project_id: project_id.clone(),
392 service_account_path,
393 service_name,
394 tx,
395 };
396
397 client.ensure_gcloud_installed().await?;
399
400 if project_id.trim().is_empty() {
401 project_id = helpers::gcp_config::resolve_project_id(None)
402 .await
403 .map_err(|e| ObservabilityError::SetupError(e))?;
404 client.project_id = project_id;
405 }
406
407 client.setup_authentication().await?;
408 client.verify_authentication().await?;
409
410 let client_clone = client.clone();
412 let handle = tokio::runtime::Handle::current();
413 std::thread::spawn(move || {
414 while let Ok(msg) = rx.recv() {
415 let result = handle.block_on(async { msg.handle(&client_clone).await });
416 match result {
417 Ok(()) => {}
418 Err(ObservabilityError::Shutdown) => {
419 break;
420 }
421 Err(_e) => {
422 }
424 }
425 }
426 });
427
428 Ok(client)
429 }
430
431 pub fn send_log(
434 &self,
435 entry: LogEntry,
436 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
437 self.tx.send(Box::new(entry))
438 }
439
440 pub fn send_metric(
441 &self,
442 data: MetricData,
443 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
444 self.tx.send(Box::new(data))
445 }
446
447 pub fn send_trace(
448 &self,
449 span: TraceSpan,
450 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
451 self.tx.send(Box::new(span))
452 }
453
454 pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
455 self.tx.send(Box::new(SIGTERM))
456 }
457
458 async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
461 let output = tokio::process::Command::new("gcloud")
462 .arg("version")
463 .output()
464 .await;
465 match output {
466 Ok(output) if output.status.success() => Ok(()),
467 _ => self.install_gcloud().await,
468 }
469 }
470
471 async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
472 let install_command = if cfg!(target_os = "macos") {
473 "curl https://sdk.cloud.google.com | bash"
474 } else {
475 "curl https://sdk.cloud.google.com | bash"
476 };
477 let output = tokio::process::Command::new("sh")
478 .arg("-c")
479 .arg(install_command)
480 .output()
481 .await
482 .map_err(|e| {
483 ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
484 })?;
485 if !output.status.success() {
486 return Err(ObservabilityError::SetupError(
487 "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
488 ));
489 }
490 Ok(())
491 }
492
493 async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
494 let output = tokio::process::Command::new("gcloud")
495 .args([
496 "auth",
497 "activate-service-account",
498 "--key-file",
499 &self.service_account_path,
500 ])
501 .output()
502 .await
503 .map_err(|e| {
504 ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
505 })?;
506 if !output.status.success() {
507 let error_msg = String::from_utf8_lossy(&output.stderr);
508 return Err(ObservabilityError::AuthenticationError(format!(
509 "Failed to authenticate with service account: {}",
510 error_msg
511 )));
512 }
513 let project_output = tokio::process::Command::new("gcloud")
514 .args(["config", "set", "project", &self.project_id])
515 .output()
516 .await
517 .map_err(|e| {
518 ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
519 })?;
520 if !project_output.status.success() {
521 let error_msg = String::from_utf8_lossy(&project_output.stderr);
522 return Err(ObservabilityError::AuthenticationError(format!(
523 "Failed to set project: {}",
524 error_msg
525 )));
526 }
527 Ok(())
528 }
529
530 async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
531 let output = tokio::process::Command::new("gcloud")
532 .args(["auth", "list", "--format=json"])
533 .output()
534 .await
535 .map_err(|e| {
536 ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
537 })?;
538 if !output.status.success() {
539 return Err(ObservabilityError::AuthenticationError(
540 "Authentication verification failed".to_string(),
541 ));
542 }
543 Ok(())
544 }
545
546 pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
547 match self.get_identity_token_internal().await {
548 Ok(token) => Ok(token),
549 Err(e) => {
550 if e.to_string().contains("not logged in")
551 || e.to_string().contains("authentication")
552 || e.to_string().contains("expired")
553 {
554 self.refresh_authentication().await?;
555 self.get_identity_token_internal().await
556 } else {
557 Err(e)
558 }
559 }
560 }
561 }
562
563 async fn get_identity_token_internal(&self) -> Result<String, ObservabilityError> {
564 let output = tokio::process::Command::new("gcloud")
565 .args(["auth", "print-identity-token"])
566 .output()
567 .await
568 .map_err(|e| {
569 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
570 })?;
571 if !output.status.success() {
572 let error_msg = String::from_utf8_lossy(&output.stderr);
573 return Err(ObservabilityError::AuthenticationError(format!(
574 "Failed to get identity token: {}",
575 error_msg
576 )));
577 }
578 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
579 }
580
581 async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
582 match self.get_access_token().await {
583 Ok(token) => Ok(token),
584 Err(e) => {
585 if e.to_string().contains("not logged in")
586 || e.to_string().contains("authentication")
587 || e.to_string().contains("expired")
588 {
589 self.refresh_authentication().await?;
590 self.get_access_token().await
591 } else {
592 Err(e)
593 }
594 }
595 }
596 }
597
598 async fn get_access_token(&self) -> Result<String, ObservabilityError> {
599 let output = tokio::process::Command::new("gcloud")
600 .args(["auth", "print-access-token"])
601 .output()
602 .await
603 .map_err(|e| {
604 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
605 })?;
606 if !output.status.success() {
607 let error_msg = String::from_utf8_lossy(&output.stderr);
608 return Err(ObservabilityError::AuthenticationError(format!(
609 "Failed to get access token: {}",
610 error_msg
611 )));
612 }
613 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
614 }
615
616 async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
617 let output = tokio::process::Command::new("gcloud")
618 .args([
619 "auth",
620 "activate-service-account",
621 "--key-file",
622 &self.service_account_path,
623 ])
624 .output()
625 .await
626 .map_err(|e| {
627 ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
628 })?;
629 if !output.status.success() {
630 let error_msg = String::from_utf8_lossy(&output.stderr);
631 return Err(ObservabilityError::AuthenticationError(format!(
632 "Failed to refresh authentication: {}",
633 error_msg
634 )));
635 }
636 Ok(())
637 }
638
639 async fn execute_api_request(
640 &self,
641 api_url: &str,
642 payload: &str,
643 operation_name: &str,
644 ) -> Result<(), ObservabilityError> {
645 let mut retries = 0;
646 const MAX_RETRIES: u32 = 2;
647
648 loop {
649 let access_token = self.get_access_token_with_retry().await?;
650 let output = tokio::process::Command::new("curl")
651 .args([
652 "-X",
653 "POST",
654 api_url,
655 "-H",
656 "Content-Type: application/json",
657 "-H",
658 &format!("Authorization: Bearer {}", access_token),
659 "-d",
660 payload,
661 "-s",
662 "-w",
663 "%{http_code}",
664 ])
665 .output()
666 .await
667 .map_err(|e| {
668 ObservabilityError::ApiError(format!(
669 "Failed to execute {} request: {}",
670 operation_name, e
671 ))
672 })?;
673
674 let response_body = String::from_utf8_lossy(&output.stdout);
675 let status_code = response_body
676 .chars()
677 .rev()
678 .take(3)
679 .collect::<String>()
680 .chars()
681 .rev()
682 .collect::<String>();
683
684 if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
685 return Ok(());
686 }
687
688 let error_msg = String::from_utf8_lossy(&output.stderr);
689 if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
690 retries += 1;
691 self.refresh_authentication().await?;
692 continue;
693 }
694
695 return Err(ObservabilityError::ApiError(format!(
696 "{} API call failed with status {}: {} - Response: {}",
697 operation_name, status_code, error_msg, response_body
698 )));
699 }
700 }
701
702 async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
705 let now = SystemTime::now();
706 let timestamp = DateTime::<Utc>::from(now).to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
707
708 let resolved_service_name = log_entry.service_name.or(self.service_name.clone());
710
711 let log_name = log_entry
714 .log_name
715 .or_else(|| resolved_service_name.clone())
716 .unwrap_or_else(|| "default".to_string());
717
718 let log_name_encoded = urlencoding::encode(&log_name);
720
721 let mut labels = log_entry.labels.unwrap_or_default();
723 if let Some(service) = resolved_service_name {
724 labels.entry("service_name".to_string()).or_insert_with(|| service.clone());
726 labels.entry("service".to_string()).or_insert(service);
727 }
728
729 let insert_id = log_entry.insert_id.unwrap_or_else(|| Uuid::new_v4().to_string());
730
731 let mut entry = json!({
732 "logName": format!("projects/{}/logs/{}", self.project_id, log_name_encoded),
733 "resource": {
734 "type": "global",
735 "labels": { "project_id": self.project_id }
736 },
737 "timestamp": timestamp,
738 "severity": log_entry.severity,
739 "labels": labels,
740 "insertId": insert_id,
741 });
742
743 if let Some(json_payload) = log_entry.json_payload {
745 entry["jsonPayload"] = json_payload;
746 } else {
747 entry["textPayload"] = json!(log_entry.message);
748 }
749
750 let log_entry_json = json!({ "entries": [entry] });
751 let api_url = "https://logging.googleapis.com/v2/entries:write";
752 self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
753 .await?;
754 Ok(())
755 }
756
757 async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
758 let timestamp = SystemTime::now();
759 let timestamp_str = DateTime::<Utc>::from(timestamp)
760 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
761 .to_string();
762
763 let time_series = json!({
764 "timeSeries": [{
765 "metric": {
766 "type": metric_data.metric_type,
767 "labels": metric_data.labels.unwrap_or_default()
768 },
769 "resource": { "type": "global", "labels": {} },
770 "points": [{
771 "interval": { "endTime": timestamp_str },
772 "value": {
773 &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
774 }
775 }]
776 }]
777 });
778 let api_url = &format!(
779 "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
780 self.project_id
781 );
782 self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
783 .await?;
784 Ok(())
785 }
786
787 async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
788 let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
789 let end_time = trace_span.start_time + trace_span.duration;
790 let end_timestamp = DateTime::<Utc>::from(end_time);
791
792 let mut attributes_json = json!({});
793 if !trace_span.attributes.is_empty() {
794 let mut attribute_map = serde_json::Map::new();
795 for (k, v) in trace_span.attributes {
796 attribute_map.insert(k, json!({ "string_value": { "value": v } }));
797 }
798 attributes_json = json!({ "attributeMap": attribute_map });
799 }
800
801 let mut span = json!({
802 "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
803 "spanId": trace_span.span_id,
804 "displayName": { "value": trace_span.display_name },
805 "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
806 "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
807 "attributes": attributes_json
808 });
809
810 if let Some(parent_id) = &trace_span.parent_span_id {
811 span["parentSpanId"] = json!(parent_id);
812 }
813
814 if let Some(status) = &trace_span.status {
815 span["status"] = json!({
816 "code": status.code,
817 "message": status.message
818 });
819 }
820
821 let spans_payload = json!({ "spans": [span] });
822 let api_url = &format!(
823 "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
824 self.project_id
825 );
826 self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
827 .await?;
828 Ok(())
829 }
830
831 pub fn generate_trace_id() -> String {
833 format!("{:032x}", Uuid::new_v4().as_u128())
834 }
835 pub fn generate_span_id() -> String {
836 format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
837 }
838}