use std::fmt;
use uuid::Uuid;
#[cfg(feature = "tracing")]
use opentelemetry::{KeyValue, global, trace::TracerProvider};
#[cfg(feature = "tracing")]
use opentelemetry_otlp::WithExportConfig;
#[cfg(feature = "tracing")]
use tracing_opentelemetry::OpenTelemetryLayer;
#[cfg(feature = "tracing")]
use crate::Result;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TraceId(String);
impl TraceId {
pub fn new() -> Self {
Self(format!("trace-{}", Uuid::new_v4()))
}
pub fn from_string(id: impl Into<String>) -> Self {
Self(id.into())
}
pub fn into_string(self) -> String {
self.0
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for TraceId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for TraceId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<String> for TraceId {
fn from(id: String) -> Self {
Self(id)
}
}
impl From<&str> for TraceId {
fn from(id: &str) -> Self {
Self(id.to_string())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CorrelationId(String);
impl CorrelationId {
pub fn new() -> Self {
Self(format!("corr-{}", Uuid::new_v4()))
}
pub fn from_string(id: impl Into<String>) -> Self {
Self(id.into())
}
pub fn into_string(self) -> String {
self.0
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for CorrelationId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for CorrelationId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<String> for CorrelationId {
fn from(id: String) -> Self {
Self(id)
}
}
impl From<&str> for CorrelationId {
fn from(id: &str) -> Self {
Self(id.to_string())
}
}
#[cfg(feature = "tracing")]
#[derive(Debug, Clone)]
pub struct TracingConfig {
pub service_name: String,
pub service_version: Option<String>,
pub environment: Option<String>,
pub otlp_endpoint: Option<String>,
pub resource_attributes: Vec<(String, String)>,
pub console_exporter: bool,
}
#[cfg(feature = "tracing")]
impl TracingConfig {
pub fn new() -> Self {
Self {
service_name: "hammerwork".to_string(),
service_version: None,
environment: None,
otlp_endpoint: None,
resource_attributes: Vec::new(),
console_exporter: false,
}
}
pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
self.service_name = name.into();
self
}
pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
self.service_version = Some(version.into());
self
}
pub fn with_environment(mut self, environment: impl Into<String>) -> Self {
self.environment = Some(environment.into());
self
}
pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.otlp_endpoint = Some(endpoint.into());
self
}
pub fn with_resource_attribute(
mut self,
key: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.resource_attributes.push((key.into(), value.into()));
self
}
pub fn with_console_exporter(mut self, enabled: bool) -> Self {
self.console_exporter = enabled;
self
}
}
#[cfg(feature = "tracing")]
impl Default for TracingConfig {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "tracing")]
pub async fn init_tracing(config: TracingConfig) -> Result<()> {
use opentelemetry_sdk::Resource;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
let mut resource = Resource::new(vec![KeyValue::new(
"service.name",
config.service_name.clone(),
)]);
if let Some(version) = &config.service_version {
resource = resource.merge(&Resource::new(vec![KeyValue::new(
"service.version",
version.clone(),
)]));
}
if let Some(environment) = &config.environment {
resource = resource.merge(&Resource::new(vec![KeyValue::new(
"deployment.environment",
environment.clone(),
)]));
}
for (key, value) in &config.resource_attributes {
resource = resource.merge(&Resource::new(vec![KeyValue::new(
key.clone(),
value.clone(),
)]));
}
let tracer_provider = if let Some(endpoint) = &config.otlp_endpoint {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint.clone())
.build_span_exporter()
.map_err(|e| crate::HammerworkError::Tracing {
message: format!("Failed to build OTLP span exporter: {}", e),
})?;
let span_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder(
exporter,
opentelemetry_sdk::runtime::Tokio,
)
.build();
opentelemetry_sdk::trace::TracerProvider::builder()
.with_config(
opentelemetry_sdk::trace::config()
.with_resource(resource)
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn),
)
.with_span_processor(span_processor)
.build()
} else {
opentelemetry_sdk::trace::TracerProvider::builder()
.with_config(
opentelemetry_sdk::trace::config()
.with_resource(resource)
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOff),
)
.build()
};
global::set_tracer_provider(tracer_provider.clone());
let telemetry_layer = OpenTelemetryLayer::new(tracer_provider.tracer("hammerwork"));
let subscriber = tracing_subscriber::registry().with(telemetry_layer);
if config.console_exporter {
subscriber
.with(tracing_subscriber::fmt::layer())
.try_init()
.map_err(|e| crate::HammerworkError::Tracing {
message: format!("Failed to initialize tracing subscriber: {}", e),
})?;
} else {
subscriber
.try_init()
.map_err(|e| crate::HammerworkError::Tracing {
message: format!("Failed to initialize tracing subscriber: {}", e),
})?;
}
Ok(())
}
#[cfg(feature = "tracing")]
pub async fn shutdown_tracing() {
global::shutdown_tracer_provider();
}
#[cfg(feature = "tracing")]
pub fn create_job_span(job: &crate::Job, operation_name: &'static str) -> tracing::Span {
use tracing_opentelemetry::OpenTelemetrySpanExt;
let span = tracing::info_span!(
"job.process",
operation_name = operation_name,
job.id = %job.id,
job.queue_name = %job.queue_name,
job.priority = ?job.priority,
job.status = ?job.status,
job.attempts = job.attempts,
job.trace_id = job.trace_id.as_deref().unwrap_or(""),
job.correlation_id = job.correlation_id.as_deref().unwrap_or(""),
job.parent_span_id = job.parent_span_id.as_deref().unwrap_or(""),
otel.kind = "consumer",
otel.name = operation_name,
);
if let Some(trace_id_str) = &job.trace_id {
if let Some(_span_context_str) = &job.span_context {
span.set_attribute("trace.id", trace_id_str.clone());
}
}
if let Some(correlation_id) = &job.correlation_id {
span.set_attribute("correlation.id", correlation_id.clone());
}
if let Some(parent_span_id) = &job.parent_span_id {
span.set_attribute("parent.span.id", parent_span_id.clone());
}
span.set_attribute("job.scheduled_at", job.scheduled_at.to_rfc3339());
if let Some(cron_schedule) = &job.cron_schedule {
span.set_attribute("job.cron_schedule", cron_schedule.clone());
}
if job.is_recurring() {
span.set_attribute("job.recurring", true);
}
span
}
#[cfg(feature = "tracing")]
pub fn set_job_trace_context(job: &mut crate::Job, span: &tracing::Span) {
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
let otel_context = span.context();
let otel_span = otel_context.span();
let span_context = otel_span.span_context();
if span_context.is_valid() {
let trace_id = span_context.trace_id().to_string();
job.trace_id = Some(trace_id);
let span_id = span_context.span_id().to_string();
job.parent_span_id = Some(span_id);
job.span_context = Some(format!(
"trace_id={};span_id={};trace_flags={:?}",
span_context.trace_id(),
span_context.span_id(),
span_context.trace_flags()
));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_trace_id_new() {
let trace_id = TraceId::new();
assert!(!trace_id.to_string().is_empty());
assert!(trace_id.to_string().starts_with("trace-"));
}
#[test]
fn test_trace_id_from_string() {
let trace_id = TraceId::from_string("custom-trace-123");
assert_eq!(trace_id.to_string(), "custom-trace-123");
assert_eq!(trace_id.as_str(), "custom-trace-123");
}
#[test]
fn test_trace_id_display() {
let trace_id = TraceId::from_string("test-trace");
assert_eq!(format!("{}", trace_id), "test-trace");
}
#[test]
fn test_trace_id_from_str() {
let trace_id: TraceId = "test-trace".into();
assert_eq!(trace_id.to_string(), "test-trace");
}
#[test]
fn test_correlation_id_new() {
let correlation_id = CorrelationId::new();
assert!(!correlation_id.to_string().is_empty());
assert!(correlation_id.to_string().starts_with("corr-"));
}
#[test]
fn test_correlation_id_from_string() {
let correlation_id = CorrelationId::from_string("order-12345");
assert_eq!(correlation_id.to_string(), "order-12345");
assert_eq!(correlation_id.as_str(), "order-12345");
}
#[test]
fn test_correlation_id_display() {
let correlation_id = CorrelationId::from_string("test-correlation");
assert_eq!(format!("{}", correlation_id), "test-correlation");
}
#[test]
fn test_correlation_id_from_str() {
let correlation_id: CorrelationId = "test-correlation".into();
assert_eq!(correlation_id.to_string(), "test-correlation");
}
#[cfg(feature = "tracing")]
#[test]
fn test_tracing_config_builder() {
let config = TracingConfig::new()
.with_service_name("test-service")
.with_service_version("1.0.0")
.with_environment("test")
.with_otlp_endpoint("http://localhost:4317")
.with_resource_attribute("custom.key", "custom.value")
.with_console_exporter(true);
assert_eq!(config.service_name, "test-service");
assert_eq!(config.service_version, Some("1.0.0".to_string()));
assert_eq!(config.environment, Some("test".to_string()));
assert_eq!(
config.otlp_endpoint,
Some("http://localhost:4317".to_string())
);
assert_eq!(config.resource_attributes.len(), 1);
assert_eq!(
config.resource_attributes[0],
("custom.key".to_string(), "custom.value".to_string())
);
assert!(config.console_exporter);
}
#[cfg(feature = "tracing")]
#[test]
fn test_tracing_config_default() {
let config = TracingConfig::default();
assert_eq!(config.service_name, "hammerwork");
assert_eq!(config.service_version, None);
assert_eq!(config.environment, None);
assert_eq!(config.otlp_endpoint, None);
assert_eq!(config.resource_attributes.len(), 0);
assert!(!config.console_exporter);
}
#[cfg(feature = "tracing")]
#[test]
fn test_create_job_span() {
use crate::{Job, JobPriority};
use serde_json::json;
let job = Job::new("test_queue".to_string(), json!({"test": "data"}))
.with_trace_id("trace-123")
.with_correlation_id("corr-456")
.with_parent_span_id("parent-789")
.with_priority(JobPriority::High);
let span = create_job_span(&job, "job.test");
assert_eq!(
span.metadata().expect("span should have metadata").name(),
"job.process"
);
}
#[cfg(feature = "tracing")]
#[test]
fn test_create_job_span_with_minimal_job() {
use crate::Job;
use serde_json::json;
let job = Job::new("minimal_queue".to_string(), json!({"data": "test"}));
let span = create_job_span(&job, "job.minimal");
assert_eq!(
span.metadata().expect("span should have metadata").name(),
"job.process"
);
}
#[cfg(feature = "tracing")]
#[test]
fn test_create_job_span_with_cron_job() {
use crate::Job;
use serde_json::json;
let mut job = Job::new("cron_queue".to_string(), json!({"data": "test"}))
.with_trace_id("trace-cron-123");
job.cron_schedule = Some("0 0 * * *".to_string());
let span = create_job_span(&job, "job.cron");
assert_eq!(
span.metadata().expect("span should have metadata").name(),
"job.process"
);
}
#[test]
fn test_trace_id_uniqueness() {
let trace_id1 = TraceId::new();
let trace_id2 = TraceId::new();
assert_ne!(trace_id1.to_string(), trace_id2.to_string());
}
#[test]
fn test_correlation_id_uniqueness() {
let corr_id1 = CorrelationId::new();
let corr_id2 = CorrelationId::new();
assert_ne!(corr_id1.to_string(), corr_id2.to_string());
}
#[test]
fn test_trace_id_equality() {
let id1 = TraceId::from_string("test-id-123");
let id2 = TraceId::from_string("test-id-123");
let id3 = TraceId::from_string("different-id");
assert_eq!(id1, id2);
assert_ne!(id1, id3);
}
#[test]
fn test_correlation_id_equality() {
let id1 = CorrelationId::from_string("order-12345");
let id2 = CorrelationId::from_string("order-12345");
let id3 = CorrelationId::from_string("order-67890");
assert_eq!(id1, id2);
assert_ne!(id1, id3);
}
}