rustkernel_core/observability/
tracing.rs1use serde::{Deserialize, Serialize};
25use std::time::{Duration, Instant};
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct TracingConfig {
30 pub enabled: bool,
32 pub otlp_endpoint: Option<String>,
34 pub sampling_rate: f64,
36 pub service_name: String,
38 pub include_events: bool,
40 pub max_attributes: u32,
42 pub batch_size: usize,
44 pub export_timeout: Duration,
46}
47
48impl Default for TracingConfig {
49 fn default() -> Self {
50 Self {
51 enabled: true,
52 otlp_endpoint: None,
53 sampling_rate: 1.0, service_name: "rustkernels".to_string(),
55 include_events: true,
56 max_attributes: 128,
57 batch_size: 512,
58 export_timeout: Duration::from_secs(30),
59 }
60 }
61}
62
63impl TracingConfig {
64 pub fn otlp(endpoint: impl Into<String>) -> Self {
66 Self {
67 otlp_endpoint: Some(endpoint.into()),
68 ..Default::default()
69 }
70 }
71
72 pub fn with_sampling(mut self, rate: f64) -> Self {
74 self.sampling_rate = rate.clamp(0.0, 1.0);
75 self
76 }
77
78 #[cfg(feature = "otlp")]
80 pub async fn init(&self) -> crate::error::Result<()> {
81 use opentelemetry_otlp::WithExportConfig;
82 use opentelemetry_sdk::trace::SdkTracerProvider;
83
84 if !self.enabled {
85 return Ok(());
86 }
87
88 if let Some(ref endpoint) = self.otlp_endpoint {
89 let exporter = opentelemetry_otlp::SpanExporter::builder()
90 .with_tonic()
91 .with_endpoint(endpoint)
92 .with_timeout(self.export_timeout)
93 .build()
94 .map_err(|e| crate::error::KernelError::ConfigError(e.to_string()))?;
95
96 let provider = SdkTracerProvider::builder()
97 .with_batch_exporter(exporter)
98 .build();
99
100 opentelemetry::global::set_tracer_provider(provider);
101 }
102
103 Ok(())
104 }
105
106 #[cfg(not(feature = "otlp"))]
108 pub async fn init(&self) -> crate::error::Result<()> {
109 Ok(())
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct SpanContext {
116 pub trace_id: String,
118 pub span_id: String,
120 pub trace_flags: u8,
122 pub trace_state: Option<String>,
124}
125
126impl SpanContext {
127 pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
129 Self {
130 trace_id: trace_id.into(),
131 span_id: span_id.into(),
132 trace_flags: 0x01, trace_state: None,
134 }
135 }
136
137 pub fn generate_trace_id() -> String {
139 format!("{:032x}", rand::random::<u128>())
140 }
141
142 pub fn generate_span_id() -> String {
144 format!("{:016x}", rand::random::<u64>())
145 }
146
147 pub fn new_root() -> Self {
149 Self::new(Self::generate_trace_id(), Self::generate_span_id())
150 }
151
152 pub fn new_child(&self) -> Self {
154 Self {
155 trace_id: self.trace_id.clone(),
156 span_id: Self::generate_span_id(),
157 trace_flags: self.trace_flags,
158 trace_state: self.trace_state.clone(),
159 }
160 }
161
162 pub fn to_traceparent(&self) -> String {
164 format!(
165 "00-{}-{}-{:02x}",
166 self.trace_id, self.span_id, self.trace_flags
167 )
168 }
169
170 pub fn from_traceparent(header: &str) -> Option<Self> {
172 let parts: Vec<&str> = header.split('-').collect();
173 if parts.len() != 4 {
174 return None;
175 }
176
177 Some(Self {
178 trace_id: parts[1].to_string(),
179 span_id: parts[2].to_string(),
180 trace_flags: u8::from_str_radix(parts[3], 16).ok()?,
181 trace_state: None,
182 })
183 }
184}
185
186pub struct KernelSpan {
188 pub kernel_id: String,
190 pub operation: String,
192 pub context: SpanContext,
194 pub start: Instant,
196 pub attributes: std::collections::HashMap<String, String>,
198 pub events: Vec<SpanEvent>,
200}
201
202impl KernelSpan {
203 pub fn start(kernel_id: impl Into<String>, operation: impl Into<String>) -> Self {
205 Self {
206 kernel_id: kernel_id.into(),
207 operation: operation.into(),
208 context: SpanContext::new_root(),
209 start: Instant::now(),
210 attributes: std::collections::HashMap::new(),
211 events: Vec::new(),
212 }
213 }
214
215 pub fn start_child(
217 parent: &SpanContext,
218 kernel_id: impl Into<String>,
219 operation: impl Into<String>,
220 ) -> Self {
221 Self {
222 kernel_id: kernel_id.into(),
223 operation: operation.into(),
224 context: parent.new_child(),
225 start: Instant::now(),
226 attributes: std::collections::HashMap::new(),
227 events: Vec::new(),
228 }
229 }
230
231 pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<String>) {
233 self.attributes.insert(key.into(), value.into());
234 }
235
236 pub fn add_event(&mut self, name: impl Into<String>) {
238 self.events.push(SpanEvent {
239 name: name.into(),
240 timestamp: Instant::now(),
241 attributes: std::collections::HashMap::new(),
242 });
243 }
244
245 pub fn add_event_with_attributes(
247 &mut self,
248 name: impl Into<String>,
249 attributes: std::collections::HashMap<String, String>,
250 ) {
251 self.events.push(SpanEvent {
252 name: name.into(),
253 timestamp: Instant::now(),
254 attributes,
255 });
256 }
257
258 pub fn record_error(&mut self, error: &dyn std::error::Error) {
260 self.set_attribute("error", "true");
261 self.set_attribute("error.message", error.to_string());
262 self.add_event("exception");
263 }
264
265 pub fn end(self) -> Duration {
267 let duration = self.start.elapsed();
268
269 #[cfg(feature = "otlp")]
270 {
271 use tracing::info_span;
272 let span = info_span!(
274 "kernel_execution",
275 kernel_id = %self.kernel_id,
276 operation = %self.operation,
277 trace_id = %self.context.trace_id,
278 span_id = %self.context.span_id,
279 duration_us = duration.as_micros() as u64,
280 );
281 span.in_scope(|| {
282 for (key, value) in &self.attributes {
283 tracing::info!(key = %key, value = %value, "span attribute");
284 }
285 });
286 }
287
288 duration
289 }
290
291 pub fn elapsed(&self) -> Duration {
293 self.start.elapsed()
294 }
295}
296
/// A named, timestamped occurrence recorded within a [`KernelSpan`].
#[derive(Clone, Debug)]
pub struct SpanEvent {
    /// Event name (e.g. `"exception"` when recorded by `record_error`).
    pub name: String,
    /// Monotonic instant at which the event was recorded.
    pub timestamp: Instant,
    /// Event-specific key/value attributes; empty for events added via `add_event`.
    pub attributes: std::collections::HashMap<String, String>,
}
307
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_span_context() {
        let ctx = SpanContext::new_root();
        assert_eq!(ctx.trace_id.len(), 32);
        assert_eq!(ctx.span_id.len(), 16);

        let child = ctx.new_child();
        assert_eq!(child.trace_id, ctx.trace_id);
        assert_ne!(child.span_id, ctx.span_id);
        // Children inherit flags and trace state from their parent.
        assert_eq!(child.trace_flags, ctx.trace_flags);
        assert_eq!(child.trace_state, ctx.trace_state);
    }

    #[test]
    fn test_traceparent() {
        let ctx = SpanContext::new("0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331");
        let header = ctx.to_traceparent();
        assert_eq!(
            header,
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
        );

        let parsed = SpanContext::from_traceparent(&header).unwrap();
        assert_eq!(parsed.trace_id, ctx.trace_id);
        assert_eq!(parsed.span_id, ctx.span_id);
        assert_eq!(parsed.trace_flags, ctx.trace_flags);
        assert!(parsed.trace_state.is_none());
    }

    #[test]
    fn test_traceparent_rejects_malformed() {
        // Wrong number of fields.
        assert!(SpanContext::from_traceparent("").is_none());
        assert!(SpanContext::from_traceparent("00-abc-def").is_none());
        // Flags that are not hexadecimal.
        assert!(SpanContext::from_traceparent(
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-zz"
        )
        .is_none());
    }

    #[test]
    fn test_kernel_span() {
        let mut span = KernelSpan::start("graph/pagerank", "execute");
        span.set_attribute("input_size", "1000");
        span.add_event("started");
        assert_eq!(
            span.attributes.get("input_size").map(String::as_str),
            Some("1000")
        );
        assert_eq!(span.events.len(), 1);
        assert_eq!(span.events[0].name, "started");

        std::thread::sleep(std::time::Duration::from_millis(10));

        let duration = span.end();
        assert!(duration >= std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_kernel_span_child_shares_trace() {
        let parent = SpanContext::new_root();
        let span = KernelSpan::start_child(&parent, "graph/pagerank", "execute");
        assert_eq!(span.context.trace_id, parent.trace_id);
        assert_ne!(span.context.span_id, parent.span_id);
    }

    #[test]
    fn test_record_error() {
        let mut span = KernelSpan::start("graph/pagerank", "execute");
        let err = "x".parse::<i32>().unwrap_err();
        span.record_error(&err);

        assert_eq!(span.attributes.get("error").map(String::as_str), Some("true"));
        assert_eq!(
            span.attributes.get("error.message").cloned(),
            Some(err.to_string())
        );
        assert_eq!(
            span.events.last().map(|e| e.name.as_str()),
            Some("exception")
        );
    }

    #[test]
    fn test_tracing_config() {
        let config = TracingConfig::otlp("http://jaeger:4317").with_sampling(0.5);

        assert_eq!(config.otlp_endpoint, Some("http://jaeger:4317".to_string()));
        assert_eq!(config.sampling_rate, 0.5);

        // Out-of-range rates are clamped into [0.0, 1.0].
        assert_eq!(TracingConfig::default().with_sampling(1.5).sampling_rate, 1.0);
        assert_eq!(TracingConfig::default().with_sampling(-0.25).sampling_rate, 0.0);
    }
}