1use anyhow::{Context, Result};
4use prost::Message;
5use std::collections::HashMap;
6use std::fs;
7use std::path::Path;
8
9use crate::core::telemetry::types::*;
10
11#[derive(Clone, PartialEq, Message)]
16pub struct ExportTraceServiceRequest {
17 #[prost(message, repeated, tag = "1")]
18 pub resource_spans: Vec<ResourceSpans>,
19}
20
21#[derive(Clone, PartialEq, Message)]
23pub struct ResourceSpans {
24 #[prost(message, optional, tag = "1")]
25 pub resource: Option<Resource>,
26 #[prost(message, repeated, tag = "2")]
27 pub scope_spans: Vec<ScopeSpans>,
28}
29
30#[derive(Clone, PartialEq, Message)]
32pub struct Resource {
33 #[prost(message, repeated, tag = "1")]
34 pub attributes: Vec<KeyValue>,
35}
36
37#[derive(Clone, PartialEq, Message)]
39pub struct ScopeSpans {
40 #[prost(message, repeated, tag = "2")]
41 pub spans: Vec<ProtobufSpan>,
42}
43
44#[derive(Clone, PartialEq, Message)]
46pub struct ProtobufSpan {
47 #[prost(string, tag = "2")]
48 pub name: String,
49 #[prost(fixed64, tag = "7")]
50 pub start_time_unix_nano: u64,
51 #[prost(fixed64, tag = "8")]
52 pub end_time_unix_nano: u64,
53 #[prost(message, repeated, tag = "9")]
54 pub attributes: Vec<KeyValue>,
55 #[prost(message, optional, tag = "15")]
56 pub status: Option<Status>,
57}
58
59#[derive(Clone, PartialEq, Message)]
61pub struct Status {
62 #[prost(enumeration = "StatusCode", tag = "2")]
63 pub code: i32,
64 #[prost(string, tag = "3")]
65 pub message: String,
66}
67
68#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, prost::Enumeration)]
70#[repr(i32)]
71pub enum StatusCode {
72 Unset = 0,
73 Ok = 1,
74 Error = 2,
75}
76
77#[derive(Clone, PartialEq, Message)]
79pub struct KeyValue {
80 #[prost(string, tag = "1")]
81 pub key: String,
82 #[prost(message, optional, tag = "2")]
83 pub value: Option<AnyValue>,
84}
85
86#[derive(Clone, PartialEq, Message)]
88pub struct AnyValue {
89 #[prost(oneof = "any_value::Value", tags = "1, 2, 3, 4")]
90 pub value: Option<any_value::Value>,
91}
92
93pub mod any_value {
95 #[allow(clippy::enum_variant_names)]
96 #[derive(Clone, PartialEq, prost::Oneof)]
97 pub enum Value {
98 #[prost(string, tag = "1")]
99 StringValue(String),
100 #[prost(int64, tag = "2")]
101 IntValue(i64),
102 #[prost(double, tag = "3")]
103 DoubleValue(f64),
104 #[prost(bool, tag = "4")]
105 BoolValue(bool),
106 }
107}
108
109pub trait OtlpParser {
111 fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry>;
113
114 fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry>;
116}
117
118pub struct JsonParser;
120
121impl JsonParser {
122 pub fn new() -> Self {
123 Self
124 }
125}
126
127impl Default for JsonParser {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl JsonParser {
134 fn extract_spans(&self, otlp: OtlpJson) -> Vec<TelemetrySpan> {
136 let mut spans = Vec::new();
137
138 if let Some(resource_spans) = otlp.resource_spans {
139 for rs in resource_spans {
140 let service_name = rs
142 .resource
143 .as_ref()
144 .and_then(|r| r.attributes.as_ref())
145 .and_then(|attrs| {
146 attrs
147 .iter()
148 .find(|a| a.key == "service.name")
149 .and_then(|a| a.value.as_ref())
150 .and_then(|v| v.string_value.clone())
151 });
152
153 if let Some(scope_spans) = rs.scope_spans {
154 for ss in scope_spans {
155 if let Some(span_list) = ss.spans {
156 for span in span_list {
157 if let Some(telemetry_span) =
158 self.convert_span(span, service_name.clone())
159 {
160 spans.push(telemetry_span);
161 }
162 }
163 }
164 }
165 }
166 }
167 }
168
169 spans
170 }
171
172 fn convert_span(&self, span: Span, service_name: Option<String>) -> Option<TelemetrySpan> {
174 let mut telemetry_span = TelemetrySpan {
175 name: span.name,
176 function_name: None,
177 file_path: None,
178 line_number: None,
179 service_name,
180 start_time_nanos: span
181 .start_time_unix_nano
182 .and_then(|s| s.parse::<u64>().ok())
183 .unwrap_or(0),
184 end_time_nanos: span
185 .end_time_unix_nano
186 .and_then(|s| s.parse::<u64>().ok())
187 .unwrap_or(0),
188 duration_ms: 0.0,
189 attributes: HashMap::new(),
190 };
191
192 if let Some(attributes) = &span.attributes {
194 for attr in attributes {
195 match attr.key.as_str() {
196 "code.function.name" => {
197 telemetry_span.function_name =
198 attr.value.as_ref().and_then(|v| v.string_value.clone());
199 }
200 "code.file.path" | "code.filepath" => {
201 telemetry_span.file_path = attr
202 .value
203 .as_ref()
204 .and_then(|v| v.string_value.as_ref())
205 .map(|p| Path::new(p).to_path_buf());
206 }
207 "code.line.number" | "code.lineno" => {
208 telemetry_span.line_number = attr
209 .value
210 .as_ref()
211 .and_then(|v| v.int_value.as_ref())
212 .and_then(|s| s.parse::<u32>().ok());
213 }
214 _ => {
215 if let Some(value) = &attr.value {
217 if let Some(attr_value) =
218 self.convert_json_attribute_value(value.clone())
219 {
220 telemetry_span
221 .attributes
222 .insert(attr.key.clone(), attr_value);
223 }
224 }
225 }
226 }
227 }
228 }
229
230 telemetry_span.calculate_duration_ms();
232
233 Some(telemetry_span)
234 }
235
236 fn convert_json_attribute_value(
238 &self,
239 value: crate::core::telemetry::types::AnyValue,
240 ) -> Option<AttributeValue> {
241 if let Some(s) = value.string_value {
242 Some(AttributeValue::String(s))
243 } else if let Some(i) = value.int_value {
244 i.parse::<i64>().ok().map(AttributeValue::Int)
245 } else if let Some(d) = value.double_value {
246 Some(AttributeValue::Double(d))
247 } else {
248 value.bool_value.map(AttributeValue::Bool)
249 }
250 }
251}
252
253impl OtlpParser for JsonParser {
254 fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry> {
255 let data = fs::read(path)
256 .with_context(|| format!("Failed to read telemetry file: {}", path.display()))?;
257 self.parse_bytes(&data)
258 }
259
260 fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry> {
261 let otlp: OtlpJson = serde_json::from_slice(data).context("Failed to parse OTLP JSON")?;
262
263 let spans = self.extract_spans(otlp);
264 let code_spans = spans
265 .iter()
266 .filter(|s| s.function_name.is_some() || s.file_path.is_some())
267 .cloned()
268 .collect();
269
270 Ok(ParsedTelemetry { spans, code_spans })
271 }
272}
273
274pub struct ProtobufParser;
276
277impl ProtobufParser {
278 pub fn new() -> Self {
279 Self
280 }
281}
282
283impl Default for ProtobufParser {
284 fn default() -> Self {
285 Self::new()
286 }
287}
288
289impl ProtobufParser {
290 fn extract_spans(&self, otlp: ExportTraceServiceRequest) -> Vec<TelemetrySpan> {
292 let mut spans = Vec::new();
293
294 for resource_span in otlp.resource_spans {
295 let service_name = resource_span.resource.as_ref().and_then(|r| {
297 r.attributes
298 .iter()
299 .find(|attr| attr.key == "service.name")
300 .and_then(|attr| attr.value.as_ref())
301 .and_then(|v| match &v.value {
302 Some(any_value::Value::StringValue(s)) => Some(s.clone()),
303 _ => None,
304 })
305 });
306
307 for scope_span in resource_span.scope_spans {
308 for span in scope_span.spans {
309 if let Some(telemetry_span) =
310 self.convert_protobuf_span(span, service_name.clone())
311 {
312 spans.push(telemetry_span);
313 }
314 }
315 }
316 }
317
318 spans
319 }
320
321 fn convert_protobuf_span(
323 &self,
324 span: ProtobufSpan,
325 service_name: Option<String>,
326 ) -> Option<TelemetrySpan> {
327 let mut telemetry_span = TelemetrySpan {
328 name: span.name,
329 function_name: None,
330 file_path: None,
331 line_number: None,
332 service_name,
333 start_time_nanos: span.start_time_unix_nano,
334 end_time_nanos: span.end_time_unix_nano,
335 duration_ms: 0.0,
336 attributes: HashMap::new(),
337 };
338
339 for attr in span.attributes {
341 match attr.key.as_str() {
342 "code.function.name" => {
343 if let Some(value) = attr.value.and_then(|v| match v.value {
344 Some(any_value::Value::StringValue(s)) => Some(s),
345 _ => None,
346 }) {
347 telemetry_span.function_name = Some(value);
348 }
349 }
350 "code.file.path" | "code.filepath" => {
351 if let Some(value) = attr.value.and_then(|v| match v.value {
352 Some(any_value::Value::StringValue(s)) => Some(s),
353 _ => None,
354 }) {
355 telemetry_span.file_path = Some(Path::new(&value).to_path_buf());
356 }
357 }
358 "code.line.number" | "code.lineno" => {
359 if let Some(value) = attr.value.and_then(|v| match v.value {
360 Some(any_value::Value::IntValue(i)) => Some(i as u32),
361 _ => None,
362 }) {
363 telemetry_span.line_number = Some(value);
364 }
365 }
366 _ => {
367 if let Some(attr_value) = self.convert_protobuf_attribute_value(attr.value) {
369 telemetry_span.attributes.insert(attr.key, attr_value);
370 }
371 }
372 }
373 }
374
375 if let Some(status) = span.status {
377 telemetry_span.attributes.insert(
378 "status.code".to_string(),
379 AttributeValue::Int(status.code as i64),
380 );
381 if !status.message.is_empty() {
382 telemetry_span.attributes.insert(
383 "status.message".to_string(),
384 AttributeValue::String(status.message),
385 );
386 }
387 }
388
389 telemetry_span.calculate_duration_ms();
391
392 Some(telemetry_span)
393 }
394
395 fn convert_protobuf_attribute_value(&self, value: Option<AnyValue>) -> Option<AttributeValue> {
397 value.and_then(|v| match v.value {
398 Some(any_value::Value::StringValue(s)) => Some(AttributeValue::String(s)),
399 Some(any_value::Value::IntValue(i)) => Some(AttributeValue::Int(i)),
400 Some(any_value::Value::DoubleValue(d)) => Some(AttributeValue::Double(d)),
401 Some(any_value::Value::BoolValue(b)) => Some(AttributeValue::Bool(b)),
402 None => None,
403 })
404 }
405}
406
407impl OtlpParser for ProtobufParser {
408 fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry> {
409 let data = fs::read(path)
410 .with_context(|| format!("Failed to read telemetry file: {}", path.display()))?;
411 self.parse_bytes(&data)
412 }
413
414 fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry> {
415 if data.is_empty() {
417 return Ok(ParsedTelemetry {
418 spans: vec![],
419 code_spans: vec![],
420 });
421 }
422
423 let otlp = ExportTraceServiceRequest::decode(data)
425 .context("Failed to decode OTLP protobuf data")?;
426
427 let spans = self.extract_spans(otlp);
428 let code_spans = spans
429 .iter()
430 .filter(|s| s.function_name.is_some() || s.file_path.is_some())
431 .cloned()
432 .collect();
433
434 Ok(ParsedTelemetry { spans, code_spans })
435 }
436}
437
438#[cfg(test)]
439mod tests {
440 use super::*;
441 use std::io::Write;
442 use tempfile::NamedTempFile;
443
444 #[test]
445 fn test_protobuf_parser_basic_deserialization() {
446 let parser = ProtobufParser::new();
449 let empty_data = vec![]; let result = parser.parse_bytes(&empty_data);
453
454 assert!(result.is_ok());
456 let parsed = result.unwrap();
457 assert_eq!(parsed.spans.len(), 0);
458 assert_eq!(parsed.code_spans.len(), 0);
459 }
460
461 #[test]
462 fn test_protobuf_parser_invalid_data() {
463 let parser = ProtobufParser::new();
465 let invalid_data = vec![0xFF, 0xFF, 0xFF, 0xFF]; let result = parser.parse_bytes(&invalid_data);
469
470 match result {
472 Ok(parsed) => {
473 assert_eq!(parsed.spans.len(), 0);
475 assert_eq!(parsed.code_spans.len(), 0);
476 }
477 Err(_) => {
478 }
480 }
481 }
482
483 #[test]
484 fn test_protobuf_parser_file_reading() {
485 let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
487 let test_data = vec![0x08, 0x96, 0x01]; temp_file
489 .write_all(&test_data)
490 .expect("Failed to write test data");
491
492 let parser = ProtobufParser::new();
493
494 let result = parser.parse_file(temp_file.path());
496
497 match result {
499 Ok(_) => {
500 }
502 Err(e) => {
503 println!("Expected error for invalid protobuf data: {e}");
505 }
507 }
508 }
509
510 #[test]
511 fn test_protobuf_parser_vs_json_parser_interface() {
512 let protobuf_parser = ProtobufParser::new();
514 let json_parser = JsonParser::new();
515
516 let empty_data = vec![];
518 let pb_result = protobuf_parser.parse_bytes(&empty_data);
519 let json_result = json_parser.parse_bytes(b"{}");
520
521 assert!(pb_result.is_ok());
523 assert!(json_result.is_ok());
524 }
525
526 use std::path::PathBuf;
527
528 #[test]
529 fn test_parse_otlp_json_with_code_attributes() {
530 let json_data = r#"{
531 "resourceSpans": [{
532 "resource": {
533 "attributes": [{
534 "key": "service.name",
535 "value": { "stringValue": "payment-api" }
536 }]
537 },
538 "scopeSpans": [{
539 "spans": [{
540 "name": "process_payment",
541 "startTimeUnixNano": "1704067200000000000",
542 "endTimeUnixNano": "1704067200050000000",
543 "attributes": [
544 {
545 "key": "code.function.name",
546 "value": { "stringValue": "process_payment" }
547 },
548 {
549 "key": "code.file.path",
550 "value": { "stringValue": "src/api/handlers.rs" }
551 },
552 {
553 "key": "code.line.number",
554 "value": { "intValue": "42" }
555 }
556 ]
557 }]
558 }]
559 }]
560 }"#;
561
562 let parser = JsonParser::new();
563 let result = parser.parse_bytes(json_data.as_bytes()).unwrap();
564
565 assert_eq!(result.spans.len(), 1);
566 assert_eq!(result.code_spans.len(), 1);
567
568 let span = &result.code_spans[0];
569 assert_eq!(span.name, "process_payment");
570 assert_eq!(span.function_name, Some("process_payment".to_string()));
571 assert_eq!(span.file_path, Some(PathBuf::from("src/api/handlers.rs")));
572 assert_eq!(span.line_number, Some(42));
573 assert_eq!(span.service_name, Some("payment-api".to_string()));
574 assert_eq!(span.duration_ms, 50.0);
575 }
576
577 #[test]
578 fn test_parse_otlp_json_without_code_attributes() {
579 let json_data = r#"{
580 "resourceSpans": [{
581 "scopeSpans": [{
582 "spans": [{
583 "name": "database_query",
584 "startTimeUnixNano": "1704067200000000000",
585 "endTimeUnixNano": "1704067200100000000",
586 "attributes": [
587 {
588 "key": "db.statement",
589 "value": { "stringValue": "SELECT * FROM users" }
590 }
591 ]
592 }]
593 }]
594 }]
595 }"#;
596
597 let parser = JsonParser::new();
598 let result = parser.parse_bytes(json_data.as_bytes()).unwrap();
599
600 assert_eq!(result.spans.len(), 1);
601 assert_eq!(result.code_spans.len(), 0); let span = &result.spans[0];
604 assert_eq!(span.name, "database_query");
605 assert!(span.function_name.is_none());
606 assert!(span.file_path.is_none());
607 }
608
609 #[test]
610 fn test_parse_invalid_json() {
611 let invalid_json = r#"{ invalid json }"#;
612
613 let parser = JsonParser::new();
614 let result = parser.parse_bytes(invalid_json.as_bytes());
615
616 assert!(result.is_err());
617 assert!(result
618 .unwrap_err()
619 .to_string()
620 .contains("Failed to parse OTLP JSON"));
621 }
622}