1use crate::error::{Mecha10Error, Result};
43use serde::de::DeserializeOwned;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use std::sync::Arc;
47use tracing::{debug, info, span, Level, Span};
48use tracing_subscriber::layer::SubscriberExt;
49use tracing_subscriber::util::SubscriberInitExt;
50
/// Trace identity that travels with a message.
///
/// The type is serde-serializable so it can be embedded in message envelopes
/// or metadata maps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceContext {
    /// Identifier shared by every span belonging to the same trace.
    pub trace_id: String,
    /// Identifier of the span this context describes.
    pub span_id: String,
    /// Identifier of the span this one was derived from, if any.
    pub parent_span_id: Option<String>,
    /// Whether the trace is flagged as sampled.
    pub sampled: bool,
    /// Free-form key/value pairs carried along with the trace. Falls back to
    /// an empty map when the field is absent from deserialized input.
    #[serde(default)]
    pub baggage: HashMap<String, String>,
}
72
73impl TraceContext {
74 pub fn new() -> Self {
76 Self {
77 trace_id: generate_trace_id(),
78 span_id: generate_span_id(),
79 parent_span_id: None,
80 sampled: true,
81 baggage: HashMap::new(),
82 }
83 }
84
85 pub fn child(&self) -> Self {
87 Self {
88 trace_id: self.trace_id.clone(),
89 span_id: generate_span_id(),
90 parent_span_id: Some(self.span_id.clone()),
91 sampled: self.sampled,
92 baggage: self.baggage.clone(),
93 }
94 }
95
96 pub fn with_baggage(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
98 self.baggage.insert(key.into(), value.into());
99 self
100 }
101
102 pub fn to_w3c_traceparent(&self) -> String {
104 let flags = if self.sampled { "01" } else { "00" };
105 format!("00-{}-{}-{}", self.trace_id, self.span_id, flags)
106 }
107
108 pub fn from_w3c_traceparent(traceparent: &str) -> Option<Self> {
110 let parts: Vec<&str> = traceparent.split('-').collect();
111 if parts.len() != 4 || parts[0] != "00" {
112 return None;
113 }
114
115 Some(Self {
116 trace_id: parts[1].to_string(),
117 span_id: parts[2].to_string(),
118 parent_span_id: None,
119 sampled: parts[3] == "01",
120 baggage: HashMap::new(),
121 })
122 }
123}
124
125impl Default for TraceContext {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
/// Envelope that pairs a payload with the trace context it was sent under.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracedMessage<T> {
    /// The wrapped message.
    pub payload: T,
    /// Trace context attached to this message.
    pub trace_context: TraceContext,
    /// Creation timestamp; the constructors in this file fill it from
    /// `crate::prelude::now_micros()` (presumably microseconds — confirm
    /// against that helper).
    pub timestamp_us: u64,
    /// Identifier of the node that created the message.
    pub source_node: String,
}
147
148impl<T> TracedMessage<T> {
149 pub fn new(payload: T, source_node: impl Into<String>) -> Self {
151 Self {
152 payload,
153 trace_context: TraceContext::new(),
154 timestamp_us: crate::prelude::now_micros(),
155 source_node: source_node.into(),
156 }
157 }
158
159 pub fn with_context(payload: T, trace_context: TraceContext, source_node: impl Into<String>) -> Self {
161 Self {
162 payload,
163 trace_context,
164 timestamp_us: crate::prelude::now_micros(),
165 source_node: source_node.into(),
166 }
167 }
168
169 pub fn create_span(&self, operation: &str) -> Span {
171 span!(
172 Level::INFO,
173 "message",
174 operation = operation,
175 trace_id = %self.trace_context.trace_id,
176 span_id = %self.trace_context.span_id,
177 source = %self.source_node,
178 )
179 }
180}
181
/// Settings used to initialise tracing for a service.
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Name of the service, used in log output.
    pub service_name: String,
    /// Optional Jaeger endpoint.
    pub jaeger_endpoint: Option<String>,
    /// Optional Zipkin endpoint.
    pub zipkin_endpoint: Option<String>,
    /// Fraction of traces to sample; the builder and the sampler clamp it to
    /// `0.0..=1.0`.
    pub sampling_rate: f64,
    /// Whether a console `fmt` layer is installed on initialisation.
    pub console_output: bool,
}
200
201impl Default for TracingConfig {
202 fn default() -> Self {
203 Self {
204 service_name: "mecha10".to_string(),
205 jaeger_endpoint: None,
206 zipkin_endpoint: None,
207 sampling_rate: 1.0,
208 console_output: true,
209 }
210 }
211}
212
213impl TracingConfig {
214 pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
216 self.service_name = name.into();
217 self
218 }
219
220 pub fn with_jaeger(mut self, endpoint: impl Into<String>) -> Self {
222 self.jaeger_endpoint = Some(endpoint.into());
223 self
224 }
225
226 pub fn with_zipkin(mut self, endpoint: impl Into<String>) -> Self {
228 self.zipkin_endpoint = Some(endpoint.into());
229 self
230 }
231
232 pub fn with_sampling_rate(mut self, rate: f64) -> Self {
234 self.sampling_rate = rate.clamp(0.0, 1.0);
235 self
236 }
237
238 pub fn without_console(mut self) -> Self {
240 self.console_output = false;
241 self
242 }
243
244 pub fn init(self) -> Result<TracingHandle> {
248 info!("Initializing distributed tracing: {}", self.service_name);
249
250 let fmt_layer = if self.console_output {
252 Some(tracing_subscriber::fmt::layer())
253 } else {
254 None
255 };
256
257 tracing_subscriber::registry()
259 .with(fmt_layer)
260 .try_init()
261 .map_err(|e| Mecha10Error::Runtime(format!("Failed to initialize tracing: {}", e)))?;
262
263 info!(
264 "Tracing initialized - Service: {}, Jaeger: {:?}, Sampling: {}",
265 self.service_name, self.jaeger_endpoint, self.sampling_rate
266 );
267
268 let sampling_rate = self.sampling_rate;
269 Ok(TracingHandle {
270 config: self,
271 sampler: Arc::new(ProbabilitySampler::new(sampling_rate)),
272 })
273 }
274}
275
/// Handle produced by `TracingConfig::init`.
pub struct TracingHandle {
    /// Configuration the handle was created from.
    config: TracingConfig,
    /// Sampler built from the configured sampling rate.
    sampler: Arc<ProbabilitySampler>,
}
285
286impl TracingHandle {
287 pub fn should_sample(&self) -> bool {
289 self.sampler.should_sample()
290 }
291
292 pub fn start_span(&self, name: &str) -> Span {
294 let trace_ctx = TraceContext::new();
295 span!(
296 Level::INFO,
297 "operation",
298 name = name,
299 trace_id = %trace_ctx.trace_id,
300 span_id = %trace_ctx.span_id,
301 )
302 }
303
304 pub fn service_name(&self) -> &str {
306 &self.config.service_name
307 }
308}
309
/// Samples with a fixed probability.
struct ProbabilitySampler {
    /// Probability of sampling, clamped to `0.0..=1.0` by `new`.
    rate: f64,
}

impl ProbabilitySampler {
    /// Creates a sampler; `rate` is clamped to `0.0..=1.0`.
    fn new(rate: f64) -> Self {
        let rate = rate.clamp(0.0, 1.0);
        Self { rate }
    }

    /// Returns `true` when the current draw falls under the configured rate.
    ///
    /// A rate of 1.0 or more always samples and a rate of 0.0 or less never
    /// does. In between, a 64-bit hash of the current time under a fresh
    /// `RandomState` is scaled into `[0, 1]` and compared to the rate.
    fn should_sample(&self) -> bool {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hash, Hasher};

        if self.rate >= 1.0 {
            return true;
        }
        if self.rate <= 0.0 {
            return false;
        }

        let mut state = RandomState::new().build_hasher();
        std::time::SystemTime::now().hash(&mut state);
        let draw = state.finish() as f64 / u64::MAX as f64;
        draw < self.rate
    }
}
342
343fn generate_trace_id() -> String {
349 use std::time::SystemTime;
350 let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
351 format!("{:016x}{:016x}", now.as_nanos(), rand_u64())
352}
353
354fn generate_span_id() -> String {
356 format!("{:016x}", rand_u64())
357}
358
/// Produces a pseudo-random `u64` using only the standard library.
///
/// The value is the hash of the current system time under a freshly created
/// `RandomState`.
fn rand_u64() -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::BuildHasher;
    use std::time::SystemTime;

    RandomState::new().hash_one(SystemTime::now())
}
368
/// Extension methods for publishing and receiving messages together with a
/// `TraceContext`, and for moving a context in and out of string metadata.
pub trait TracingExt {
    /// Publishes `message` on `topic` with trace information attached.
    ///
    /// The returned future resolves to `Ok(())` on success.
    fn publish_traced<T: crate::messages::Message + Serialize + Clone + 'static>(
        &self,
        topic: &str,
        message: &T,
    ) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Publishes `message` on `topic` using the caller-supplied
    /// `trace_context`.
    fn publish_with_context<T: crate::messages::Message + Serialize + Clone + 'static>(
        &self,
        topic: &str,
        message: &T,
        trace_context: TraceContext,
    ) -> impl std::future::Future<Output = Result<()>> + Send;

    /// Subscribes to `topic`, yielding a receiver of `TracedMessage<T>`.
    fn subscribe_traced<T: crate::messages::Message + DeserializeOwned + Send + 'static>(
        &self,
        topic: &str,
    ) -> impl std::future::Future<Output = Result<crate::context::Receiver<TracedMessage<T>>>> + Send;

    /// Returns the trace context currently in effect, if one is known.
    fn current_trace_context(&self) -> Option<TraceContext>;

    /// Reads a trace context out of a metadata map, if one is present.
    fn extract_trace_context(&self, metadata: &HashMap<String, String>) -> Option<TraceContext>;

    /// Writes `ctx` into a metadata map.
    fn inject_trace_context(&self, metadata: &mut HashMap<String, String>, ctx: &TraceContext);
}
410
411impl TracingExt for crate::context::Context {
412 async fn publish_traced<T: crate::messages::Message + Serialize + Clone + 'static>(
413 &self,
414 topic: &str,
415 message: &T,
416 ) -> Result<()> {
417 let trace_context = self.current_trace_context().unwrap_or_default();
419 self.publish_with_context(topic, message, trace_context).await
420 }
421
422 async fn publish_with_context<T: crate::messages::Message + Serialize + Clone + 'static>(
423 &self,
424 topic: &str,
425 message: &T,
426 trace_context: TraceContext,
427 ) -> Result<()> {
428 let traced_msg = TracedMessage::with_context(
430 message.clone(),
431 trace_context.child(), self.node_id(),
433 );
434
435 self.publish_raw(topic, &traced_msg).await
437 }
438
439 async fn subscribe_traced<T: crate::messages::Message + DeserializeOwned + Send + 'static>(
440 &self,
441 topic: &str,
442 ) -> Result<crate::context::Receiver<TracedMessage<T>>> {
443 self.subscribe_raw::<TracedMessage<T>>(topic).await
445 }
446
447 fn current_trace_context(&self) -> Option<TraceContext> {
448 use tracing::field::Visit;
450
451 struct TraceIdVisitor {
452 trace_id: Option<String>,
453 span_id: Option<String>,
454 }
455
456 impl Visit for TraceIdVisitor {
457 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
458 match field.name() {
459 "trace_id" => self.trace_id = Some(value.to_string()),
460 "span_id" => self.span_id = Some(value.to_string()),
461 _ => {}
462 }
463 }
464
465 fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {}
466 }
467
468 let _visitor = TraceIdVisitor {
470 trace_id: None,
471 span_id: None,
472 };
473
474 let span = Span::current();
476 span.record("trace_id", &tracing::field::Empty);
477 None
481 }
482
483 fn extract_trace_context(&self, metadata: &HashMap<String, String>) -> Option<TraceContext> {
484 if let Some(traceparent) = metadata.get("traceparent") {
486 return TraceContext::from_w3c_traceparent(traceparent);
487 }
488
489 if let Some(trace_ctx_json) = metadata.get("trace_context") {
491 return serde_json::from_str(trace_ctx_json).ok();
492 }
493
494 None
495 }
496
497 fn inject_trace_context(&self, metadata: &mut HashMap<String, String>, ctx: &TraceContext) {
498 metadata.insert("traceparent".to_string(), ctx.to_w3c_traceparent());
500
501 if let Ok(json) = serde_json::to_string(ctx) {
503 metadata.insert("trace_context".to_string(), json);
504 }
505
506 for (key, value) in &ctx.baggage {
508 metadata.insert(format!("baggage_{}", key), value.clone());
509 }
510 }
511}
512
/// Builder for a span together with its trace context.
pub struct SpanBuilder {
    /// Name recorded on the span.
    name: String,
    /// Context to use; when unset, a default context is used at start.
    trace_context: Option<TraceContext>,
    /// Key/value attributes accumulated through `with_attribute`.
    attributes: HashMap<String, String>,
}
523
524impl SpanBuilder {
525 pub fn new(name: impl Into<String>) -> Self {
527 Self {
528 name: name.into(),
529 trace_context: None,
530 attributes: HashMap::new(),
531 }
532 }
533
534 pub fn with_context(mut self, ctx: TraceContext) -> Self {
536 self.trace_context = Some(ctx);
537 self
538 }
539
540 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
542 self.attributes.insert(key.into(), value.into());
543 self
544 }
545
546 pub fn start(self) -> (Span, TraceContext) {
548 let trace_ctx = self.trace_context.unwrap_or_default();
549
550 let span = span!(
551 Level::INFO,
552 "span",
553 name = %self.name,
554 trace_id = %trace_ctx.trace_id,
555 span_id = %trace_ctx.span_id,
556 );
557
558 (span, trace_ctx)
559 }
560}
561
/// Sink for finished span data. Implementors must be `Send + Sync`.
pub trait TraceExporter: Send + Sync {
    /// Exports one span.
    fn export_span(&self, span_data: &SpanData);

    /// Flushes the exporter.
    fn flush(&self);
}
574
/// Data describing one span, as handed to a `TraceExporter`.
#[derive(Debug, Clone)]
pub struct SpanData {
    /// Trace the span belongs to.
    pub trace_id: String,
    /// Identifier of the span.
    pub span_id: String,
    /// Identifier of the parent span, if any.
    pub parent_span_id: Option<String>,
    /// Name of the span.
    pub name: String,
    /// Start timestamp (unit and epoch are not established in this file —
    /// TODO confirm).
    pub start_time: u64,
    /// End timestamp, in the same unit as `start_time`.
    pub end_time: u64,
    /// Key/value attributes attached to the span.
    pub attributes: HashMap<String, String>,
}
586
/// Exporter targeting a Jaeger endpoint.
pub struct JaegerExporter {
    /// Jaeger endpoint given at construction.
    #[allow(dead_code)]
    endpoint: String,
    /// Service name given at construction.
    #[allow(dead_code)]
    service_name: String,
}
594
595impl JaegerExporter {
596 pub fn new(endpoint: impl Into<String>, service_name: impl Into<String>) -> Self {
598 Self {
599 endpoint: endpoint.into(),
600 service_name: service_name.into(),
601 }
602 }
603}
604
605impl TraceExporter for JaegerExporter {
606 fn export_span(&self, span_data: &SpanData) {
607 debug!(
609 "Exporting span to Jaeger {}: {} (trace: {})",
610 self.endpoint, span_data.name, span_data.trace_id
611 );
612 }
613
614 fn flush(&self) {
615 debug!("Flushing Jaeger exporter");
616 }
617}
618
/// Exporter targeting a Zipkin endpoint.
pub struct ZipkinExporter {
    /// Zipkin endpoint given at construction.
    #[allow(dead_code)]
    endpoint: String,
    /// Service name given at construction.
    #[allow(dead_code)]
    service_name: String,
}
626
627impl ZipkinExporter {
628 pub fn new(endpoint: impl Into<String>, service_name: impl Into<String>) -> Self {
630 Self {
631 endpoint: endpoint.into(),
632 service_name: service_name.into(),
633 }
634 }
635}
636
637impl TraceExporter for ZipkinExporter {
638 fn export_span(&self, span_data: &SpanData) {
639 debug!(
640 "Exporting span to Zipkin {}: {} (trace: {})",
641 self.endpoint, span_data.name, span_data.trace_id
642 );
643 }
644
645 fn flush(&self) {
646 debug!("Flushing Zipkin exporter");
647 }
648}