1pub mod helpers;
110pub mod pubsub;
111
112use async_trait::async_trait;
113use chrono::{DateTime, Utc};
114use crossbeam::channel::{bounded, Receiver, Sender};
115use serde_json::json;
116use std::collections::HashMap;
117use std::time::{Duration, SystemTime, UNIX_EPOCH};
118use uuid::Uuid;
119
/// Errors produced while setting up or using the observability client.
#[derive(Debug)]
pub enum ObservabilityError {
    /// An authentication step failed; carries the detail text.
    AuthenticationError(String),
    /// A request or helper command failed; carries the detail text.
    ApiError(String),
    /// Environment or tooling setup failed; carries the detail text.
    SetupError(String),
    /// Signals that a shutdown was requested rather than a real failure.
    Shutdown,
}

impl std::fmt::Display for ObservabilityError {
    /// Renders `"<category>: <detail>"`, or a fixed sentence for `Shutdown`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::AuthenticationError(detail) => ("Authentication error", detail),
            Self::ApiError(detail) => ("API error", detail),
            Self::SetupError(detail) => ("Setup error", detail),
            // The only variant without a payload gets its own fixed text.
            Self::Shutdown => return f.write_str("Shutdown requested"),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for ObservabilityError {}
143
144#[async_trait]
146pub trait Handle: Send {
147 async fn handle(
148 self: Box<Self>,
149 client: &ObservabilityClient,
150 ) -> Result<(), ObservabilityError>;
151}
152
/// A single log record, built with [`LogEntry::new`] and the `with_*` methods.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Severity label for the record.
    pub severity: String,
    /// Text of the record.
    pub message: String,
    /// Optional service name attached to this particular record.
    pub service_name: Option<String>,
    /// Optional log name for this particular record.
    pub log_name: Option<String>,
}

impl LogEntry {
    /// Creates an entry with the given severity and message and no optional fields set.
    pub fn new(severity: impl Into<String>, message: impl Into<String>) -> Self {
        let severity = severity.into();
        let message = message.into();
        Self {
            severity,
            message,
            service_name: None,
            log_name: None,
        }
    }

    /// Returns the entry with `service_name` set, replacing any previous value.
    pub fn with_service_name(self, service_name: impl Into<String>) -> Self {
        Self {
            service_name: Some(service_name.into()),
            ..self
        }
    }

    /// Returns the entry with `log_name` set, replacing any previous value.
    pub fn with_log_name(self, log_name: impl Into<String>) -> Self {
        Self {
            log_name: Some(log_name.into()),
            ..self
        }
    }
}
179#[async_trait]
180impl Handle for LogEntry {
181 async fn handle(
182 self: Box<Self>,
183 client: &ObservabilityClient,
184 ) -> Result<(), ObservabilityError> {
185 client.send_log_impl(*self).await
186 }
187}
188
/// A single metric sample, built with [`MetricData::new`] and [`MetricData::with_labels`].
#[derive(Debug, Clone)]
pub struct MetricData {
    /// Metric type identifier.
    pub metric_type: String,
    /// Numeric sample value.
    pub value: f64,
    /// Value type label supplied by the caller.
    pub value_type: String,
    /// Metric kind label supplied by the caller.
    pub metric_kind: String,
    /// Optional metric labels.
    pub labels: Option<HashMap<String, String>>,
}

impl MetricData {
    /// Creates a sample with no labels.
    pub fn new(
        metric_type: impl Into<String>,
        value: f64,
        value_type: impl Into<String>,
        metric_kind: impl Into<String>,
    ) -> Self {
        let metric_type = metric_type.into();
        let value_type = value_type.into();
        let metric_kind = metric_kind.into();
        Self {
            metric_type,
            value,
            value_type,
            metric_kind,
            labels: None,
        }
    }

    /// Returns the sample with `labels` set, replacing any previous labels.
    pub fn with_labels(self, labels: HashMap<String, String>) -> Self {
        Self {
            labels: Some(labels),
            ..self
        }
    }
}
218#[async_trait]
219impl Handle for MetricData {
220 async fn handle(
221 self: Box<Self>,
222 client: &ObservabilityClient,
223 ) -> Result<(), ObservabilityError> {
224 client.send_metric_impl(*self).await
225 }
226}
227
228#[derive(Debug, Clone)]
230pub struct TraceSpan {
231 pub trace_id: String,
232 pub span_id: String,
233 pub display_name: String,
234 pub start_time: SystemTime,
235 pub duration: Duration,
236 pub parent_span_id: Option<String>,
237 pub attributes: HashMap<String, String>,
238 pub status: Option<TraceStatus>,
239}
240
241#[derive(Debug, Clone)]
242pub struct TraceStatus {
243 pub code: i32, pub message: Option<String>,
245}
246
247impl TraceSpan {
248 pub fn new(
249 trace_id: impl Into<String>,
250 span_id: impl Into<String>,
251 display_name: impl Into<String>,
252 start_time: SystemTime,
253 duration: Duration,
254 ) -> Self {
255 Self {
256 trace_id: trace_id.into(),
257 span_id: span_id.into(),
258 display_name: display_name.into(),
259 start_time,
260 duration,
261 parent_span_id: None,
262 attributes: HashMap::new(),
263 status: None,
264 }
265 }
266 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
267 self.parent_span_id = Some(parent_span_id.into());
268 self
269 }
270 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
271 self.attributes.insert(key.into(), value.into());
272 self
273 }
274 pub fn with_status_error(mut self, message: impl Into<String>) -> Self {
275 self.status = Some(TraceStatus {
276 code: 2, message: Some(message.into()),
278 });
279 self
280 }
281 pub fn child(
282 &self,
283 name: impl Into<String>,
284 start_time: SystemTime,
285 duration: Duration,
286 ) -> Self {
287 Self {
288 trace_id: self.trace_id.clone(), span_id: ObservabilityClient::generate_span_id(), parent_span_id: Some(self.span_id.clone()), display_name: name.into(),
292 start_time,
293 duration,
294 attributes: HashMap::new(),
295 status: None,
296 }
297 }
298}
299#[async_trait]
300impl Handle for TraceSpan {
301 async fn handle(
302 self: Box<Self>,
303 client: &ObservabilityClient,
304 ) -> Result<(), ObservabilityError> {
305 client.send_trace_span_impl(*self).await
306 }
307}
308
309#[derive(Debug, Clone, Copy)]
311pub struct SIGTERM;
312#[async_trait]
313impl Handle for SIGTERM {
314 async fn handle(
315 self: Box<Self>,
316 _client: &ObservabilityClient,
317 ) -> Result<(), ObservabilityError> {
318 Err(ObservabilityError::Shutdown)
319 }
320}
321
322#[derive(Clone)]
324pub struct ObservabilityClient {
325 project_id: String,
326 service_account_path: String,
327 service_name: Option<String>,
328 tx: Sender<Box<dyn Handle>>,
329}
330
331impl ObservabilityClient {
332 pub async fn new(
333 project_id: Option<String>,
334 service_name: Option<String>,
335 ) -> Result<Self, ObservabilityError> {
336 let (tx, rx): (Sender<Box<dyn Handle>>, Receiver<Box<dyn Handle>>) = bounded(1027);
337
338 let service_account_path = helpers::gcp_config::credentials_path_from_env()
339 .map_err(|e| ObservabilityError::SetupError(e))?;
340
341 let mut project_id = project_id.unwrap_or_default();
342
343 let mut client = Self {
344 project_id: project_id.clone(),
345 service_account_path,
346 service_name,
347 tx,
348 };
349
350 client.ensure_gcloud_installed().await?;
352
353 if project_id.trim().is_empty() {
354 project_id = helpers::gcp_config::resolve_project_id(None)
355 .await
356 .map_err(|e| ObservabilityError::SetupError(e))?;
357 client.project_id = project_id;
358 }
359
360 client.setup_authentication().await?;
361 client.verify_authentication().await?;
362
363 let client_clone = client.clone();
365 let handle = tokio::runtime::Handle::current();
366 std::thread::spawn(move || {
367 while let Ok(msg) = rx.recv() {
368 let result = handle.block_on(async { msg.handle(&client_clone).await });
369 match result {
370 Ok(()) => {}
371 Err(ObservabilityError::Shutdown) => {
372 break;
373 }
374 Err(_e) => {
375 }
377 }
378 }
379 });
380
381 Ok(client)
382 }
383
384 pub fn send_log(
387 &self,
388 entry: LogEntry,
389 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
390 self.tx.send(Box::new(entry))
391 }
392
393 pub fn send_metric(
394 &self,
395 data: MetricData,
396 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
397 self.tx.send(Box::new(data))
398 }
399
400 pub fn send_trace(
401 &self,
402 span: TraceSpan,
403 ) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
404 self.tx.send(Box::new(span))
405 }
406
407 pub fn shutdown(&self) -> Result<(), crossbeam::channel::SendError<Box<dyn Handle>>> {
408 self.tx.send(Box::new(SIGTERM))
409 }
410
411 async fn ensure_gcloud_installed(&self) -> Result<(), ObservabilityError> {
414 let output = tokio::process::Command::new("gcloud")
415 .arg("version")
416 .output()
417 .await;
418 match output {
419 Ok(output) if output.status.success() => Ok(()),
420 _ => self.install_gcloud().await,
421 }
422 }
423
424 async fn install_gcloud(&self) -> Result<(), ObservabilityError> {
425 let install_command = if cfg!(target_os = "macos") {
426 "curl https://sdk.cloud.google.com | bash"
427 } else {
428 "curl https://sdk.cloud.google.com | bash"
429 };
430 let output = tokio::process::Command::new("sh")
431 .arg("-c")
432 .arg(install_command)
433 .output()
434 .await
435 .map_err(|e| {
436 ObservabilityError::SetupError(format!("Failed to install gcloud: {}", e))
437 })?;
438 if !output.status.success() {
439 return Err(ObservabilityError::SetupError(
440 "Failed to install gcloud CLI. Please install manually from https://cloud.google.com/sdk/docs/install".to_string(),
441 ));
442 }
443 Ok(())
444 }
445
446 async fn setup_authentication(&self) -> Result<(), ObservabilityError> {
447 let output = tokio::process::Command::new("gcloud")
448 .args([
449 "auth",
450 "activate-service-account",
451 "--key-file",
452 &self.service_account_path,
453 ])
454 .output()
455 .await
456 .map_err(|e| {
457 ObservabilityError::AuthenticationError(format!("Failed to run gcloud auth: {}", e))
458 })?;
459 if !output.status.success() {
460 let error_msg = String::from_utf8_lossy(&output.stderr);
461 return Err(ObservabilityError::AuthenticationError(format!(
462 "Failed to authenticate with service account: {}",
463 error_msg
464 )));
465 }
466 let project_output = tokio::process::Command::new("gcloud")
467 .args(["config", "set", "project", &self.project_id])
468 .output()
469 .await
470 .map_err(|e| {
471 ObservabilityError::AuthenticationError(format!("Failed to set project: {}", e))
472 })?;
473 if !project_output.status.success() {
474 let error_msg = String::from_utf8_lossy(&project_output.stderr);
475 return Err(ObservabilityError::AuthenticationError(format!(
476 "Failed to set project: {}",
477 error_msg
478 )));
479 }
480 Ok(())
481 }
482
483 async fn verify_authentication(&self) -> Result<(), ObservabilityError> {
484 let output = tokio::process::Command::new("gcloud")
485 .args(["auth", "list", "--format=json"])
486 .output()
487 .await
488 .map_err(|e| {
489 ObservabilityError::AuthenticationError(format!("Failed to verify auth: {}", e))
490 })?;
491 if !output.status.success() {
492 return Err(ObservabilityError::AuthenticationError(
493 "Authentication verification failed".to_string(),
494 ));
495 }
496 Ok(())
497 }
498
499 pub async fn get_identity_token(&self) -> Result<String, ObservabilityError> {
500 match self.get_identity_token_internal().await {
501 Ok(token) => Ok(token),
502 Err(e) => {
503 if e.to_string().contains("not logged in")
504 || e.to_string().contains("authentication")
505 || e.to_string().contains("expired")
506 {
507 self.refresh_authentication().await?;
508 self.get_identity_token_internal().await
509 } else {
510 Err(e)
511 }
512 }
513 }
514 }
515
516 async fn get_identity_token_internal(&self) -> Result<String, ObservabilityError> {
517 let output = tokio::process::Command::new("gcloud")
518 .args(["auth", "print-identity-token"])
519 .output()
520 .await
521 .map_err(|e| {
522 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
523 })?;
524 if !output.status.success() {
525 let error_msg = String::from_utf8_lossy(&output.stderr);
526 return Err(ObservabilityError::AuthenticationError(format!(
527 "Failed to get identity token: {}",
528 error_msg
529 )));
530 }
531 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
532 }
533
534 async fn get_access_token_with_retry(&self) -> Result<String, ObservabilityError> {
535 match self.get_access_token().await {
536 Ok(token) => Ok(token),
537 Err(e) => {
538 if e.to_string().contains("not logged in")
539 || e.to_string().contains("authentication")
540 || e.to_string().contains("expired")
541 {
542 self.refresh_authentication().await?;
543 self.get_access_token().await
544 } else {
545 Err(e)
546 }
547 }
548 }
549 }
550
551 async fn get_access_token(&self) -> Result<String, ObservabilityError> {
552 let output = tokio::process::Command::new("gcloud")
553 .args(["auth", "print-access-token"])
554 .output()
555 .await
556 .map_err(|e| {
557 ObservabilityError::ApiError(format!("Failed to run gcloud command: {}", e))
558 })?;
559 if !output.status.success() {
560 let error_msg = String::from_utf8_lossy(&output.stderr);
561 return Err(ObservabilityError::AuthenticationError(format!(
562 "Failed to get access token: {}",
563 error_msg
564 )));
565 }
566 Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
567 }
568
569 async fn refresh_authentication(&self) -> Result<(), ObservabilityError> {
570 let output = tokio::process::Command::new("gcloud")
571 .args([
572 "auth",
573 "activate-service-account",
574 "--key-file",
575 &self.service_account_path,
576 ])
577 .output()
578 .await
579 .map_err(|e| {
580 ObservabilityError::AuthenticationError(format!("Failed to refresh auth: {}", e))
581 })?;
582 if !output.status.success() {
583 let error_msg = String::from_utf8_lossy(&output.stderr);
584 return Err(ObservabilityError::AuthenticationError(format!(
585 "Failed to refresh authentication: {}",
586 error_msg
587 )));
588 }
589 Ok(())
590 }
591
592 async fn execute_api_request(
593 &self,
594 api_url: &str,
595 payload: &str,
596 operation_name: &str,
597 ) -> Result<(), ObservabilityError> {
598 let mut retries = 0;
599 const MAX_RETRIES: u32 = 2;
600
601 loop {
602 let access_token = self.get_access_token_with_retry().await?;
603 let output = tokio::process::Command::new("curl")
604 .args([
605 "-X",
606 "POST",
607 api_url,
608 "-H",
609 "Content-Type: application/json",
610 "-H",
611 &format!("Authorization: Bearer {}", access_token),
612 "-d",
613 payload,
614 "-s",
615 "-w",
616 "%{http_code}",
617 ])
618 .output()
619 .await
620 .map_err(|e| {
621 ObservabilityError::ApiError(format!(
622 "Failed to execute {} request: {}",
623 operation_name, e
624 ))
625 })?;
626
627 let response_body = String::from_utf8_lossy(&output.stdout);
628 let status_code = response_body
629 .chars()
630 .rev()
631 .take(3)
632 .collect::<String>()
633 .chars()
634 .rev()
635 .collect::<String>();
636
637 if output.status.success() && (status_code.starts_with("20") || status_code == "200") {
638 return Ok(());
639 }
640
641 let error_msg = String::from_utf8_lossy(&output.stderr);
642 if (status_code == "401" || status_code == "403") && retries < MAX_RETRIES {
643 retries += 1;
644 self.refresh_authentication().await?;
645 continue;
646 }
647
648 return Err(ObservabilityError::ApiError(format!(
649 "{} API call failed with status {}: {} - Response: {}",
650 operation_name, status_code, error_msg, response_body
651 )));
652 }
653 }
654
655 async fn send_log_impl(&self, log_entry: LogEntry) -> Result<(), ObservabilityError> {
658 let timestamp = SystemTime::now()
659 .duration_since(UNIX_EPOCH)
660 .unwrap()
661 .as_secs();
662 let mut labels = HashMap::new();
663
664 if let Some(service) = log_entry.service_name.or(self.service_name.clone()) {
666 labels.insert("service_name".to_string(), service);
667 }
668
669 let log_name = log_entry
670 .log_name
671 .clone()
672 .unwrap_or_else(|| "gcp-observability-rs".to_string());
673
674 let log_entry_json = json!({
675 "entries": [{
676 "logName": format!("projects/{}/logs/{}", self.project_id, log_name),
677 "resource": { "type": "global" },
678 "timestamp": DateTime::<Utc>::from(UNIX_EPOCH + std::time::Duration::from_secs(timestamp))
679 .format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
680 "severity": log_entry.severity,
681 "textPayload": log_entry.message,
682 "labels": labels
683 }]
684 });
685 let api_url = "https://logging.googleapis.com/v2/entries:write";
686 self.execute_api_request(api_url, &log_entry_json.to_string(), "Logging")
687 .await?;
688 Ok(())
689 }
690
691 async fn send_metric_impl(&self, metric_data: MetricData) -> Result<(), ObservabilityError> {
692 let timestamp = SystemTime::now();
693 let timestamp_str = DateTime::<Utc>::from(timestamp)
694 .format("%Y-%m-%dT%H:%M:%S%.3fZ")
695 .to_string();
696
697 let time_series = json!({
698 "timeSeries": [{
699 "metric": {
700 "type": metric_data.metric_type,
701 "labels": metric_data.labels.unwrap_or_default()
702 },
703 "resource": { "type": "global", "labels": {} },
704 "points": [{
705 "interval": { "endTime": timestamp_str },
706 "value": {
707 &format!("{}Value", metric_data.value_type.to_lowercase()): metric_data.value
708 }
709 }]
710 }]
711 });
712 let api_url = &format!(
713 "https://monitoring.googleapis.com/v3/projects/{}/timeSeries",
714 self.project_id
715 );
716 self.execute_api_request(api_url, &time_series.to_string(), "Monitoring")
717 .await?;
718 Ok(())
719 }
720
721 async fn send_trace_span_impl(&self, trace_span: TraceSpan) -> Result<(), ObservabilityError> {
722 let start_timestamp = DateTime::<Utc>::from(trace_span.start_time);
723 let end_time = trace_span.start_time + trace_span.duration;
724 let end_timestamp = DateTime::<Utc>::from(end_time);
725
726 let mut attributes_json = json!({});
727 if !trace_span.attributes.is_empty() {
728 let mut attribute_map = serde_json::Map::new();
729 for (k, v) in trace_span.attributes {
730 attribute_map.insert(k, json!({ "string_value": { "value": v } }));
731 }
732 attributes_json = json!({ "attributeMap": attribute_map });
733 }
734
735 let mut span = json!({
736 "name": format!("projects/{}/traces/{}/spans/{}", self.project_id, trace_span.trace_id, trace_span.span_id),
737 "spanId": trace_span.span_id,
738 "displayName": { "value": trace_span.display_name },
739 "startTime": start_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
740 "endTime": end_timestamp.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(),
741 "attributes": attributes_json
742 });
743
744 if let Some(parent_id) = &trace_span.parent_span_id {
745 span["parentSpanId"] = json!(parent_id);
746 }
747
748 if let Some(status) = &trace_span.status {
749 span["status"] = json!({
750 "code": status.code,
751 "message": status.message
752 });
753 }
754
755 let spans_payload = json!({ "spans": [span] });
756 let api_url = &format!(
757 "https://cloudtrace.googleapis.com/v2/projects/{}/traces:batchWrite",
758 self.project_id
759 );
760 self.execute_api_request(api_url, &spans_payload.to_string(), "Tracing")
761 .await?;
762 Ok(())
763 }
764
765 pub fn generate_trace_id() -> String {
767 format!("{:032x}", Uuid::new_v4().as_u128())
768 }
769 pub fn generate_span_id() -> String {
770 format!("{:016x}", Uuid::new_v4().as_u128() & 0xFFFFFFFFFFFFFFFF)
771 }
772}