context_creator/core/telemetry/
otlp_parser.rs

1//! OTLP format parsers for JSON and protobuf
2
3use anyhow::{Context, Result};
4use prost::Message;
5use std::collections::HashMap;
6use std::fs;
7use std::path::Path;
8
9use crate::core::telemetry::types::*;
10
11// OpenTelemetry protobuf message definitions
12// Based on: https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
13
/// OTLP `ExportTraceServiceRequest` message: the top-level payload of an OTLP
/// trace export.
///
/// This is the root message that [`ProtobufParser`] decodes raw bytes into.
/// Only `resource_spans` (field 1) is modeled.
#[derive(Clone, PartialEq, Message)]
pub struct ExportTraceServiceRequest {
    /// Spans grouped by the resource (e.g. service) that produced them.
    #[prost(message, repeated, tag = "1")]
    pub resource_spans: Vec<ResourceSpans>,
}
20
/// OTLP `ResourceSpans` message: all spans emitted by one resource.
///
/// The resource's attributes carry metadata such as `service.name`, which the
/// protobuf parser copies onto every span in this group.
#[derive(Clone, PartialEq, Message)]
pub struct ResourceSpans {
    /// Resource description; `None` when the producer omitted it.
    #[prost(message, optional, tag = "1")]
    pub resource: Option<Resource>,
    /// Spans grouped by instrumentation scope.
    #[prost(message, repeated, tag = "2")]
    pub scope_spans: Vec<ScopeSpans>,
}
29
/// OTLP `Resource` message: attributes describing the entity that produced
/// the telemetry.
#[derive(Clone, PartialEq, Message)]
pub struct Resource {
    /// Resource attributes; the parser looks up `service.name` here.
    #[prost(message, repeated, tag = "1")]
    pub attributes: Vec<KeyValue>,
}
36
/// OTLP `ScopeSpans` message: spans produced by one instrumentation scope.
///
/// Only `spans` (field 2) is modeled. The scope descriptor (field 1) is not
/// represented here, so prost skips it as an unknown field while decoding.
#[derive(Clone, PartialEq, Message)]
pub struct ScopeSpans {
    /// The spans recorded under this scope.
    #[prost(message, repeated, tag = "2")]
    pub spans: Vec<ProtobufSpan>,
}
43
44/// OTLP Span message
45#[derive(Clone, PartialEq, Message)]
46pub struct ProtobufSpan {
47    #[prost(string, tag = "2")]
48    pub name: String,
49    #[prost(fixed64, tag = "7")]
50    pub start_time_unix_nano: u64,
51    #[prost(fixed64, tag = "8")]
52    pub end_time_unix_nano: u64,
53    #[prost(message, repeated, tag = "9")]
54    pub attributes: Vec<KeyValue>,
55    #[prost(message, optional, tag = "15")]
56    pub status: Option<Status>,
57}
58
59/// OTLP Status message
60#[derive(Clone, PartialEq, Message)]
61pub struct Status {
62    #[prost(enumeration = "StatusCode", tag = "2")]
63    pub code: i32,
64    #[prost(string, tag = "3")]
65    pub message: String,
66}
67
/// OTLP `Status.StatusCode` enumeration.
///
/// Discriminants are the wire values stored in [`Status::code`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, prost::Enumeration)]
#[repr(i32)]
pub enum StatusCode {
    /// Default: no status was set.
    Unset = 0,
    /// The operation was marked as successful.
    Ok = 1,
    /// The operation was marked as failed.
    Error = 2,
}
76
/// OTLP `KeyValue` message: a single attribute key with its typed value.
#[derive(Clone, PartialEq, Message)]
pub struct KeyValue {
    /// Attribute name, e.g. `code.function.name` or `service.name`.
    #[prost(string, tag = "1")]
    pub key: String,
    /// Attribute value; `None` when the value field was absent.
    #[prost(message, optional, tag = "2")]
    pub value: Option<AnyValue>,
}
85
86/// OTLP AnyValue message
87#[derive(Clone, PartialEq, Message)]
88pub struct AnyValue {
89    #[prost(oneof = "any_value::Value", tags = "1, 2, 3, 4")]
90    pub value: Option<any_value::Value>,
91}
92
93/// AnyValue value oneof
94pub mod any_value {
95    #[allow(clippy::enum_variant_names)]
96    #[derive(Clone, PartialEq, prost::Oneof)]
97    pub enum Value {
98        #[prost(string, tag = "1")]
99        StringValue(String),
100        #[prost(int64, tag = "2")]
101        IntValue(i64),
102        #[prost(double, tag = "3")]
103        DoubleValue(f64),
104        #[prost(bool, tag = "4")]
105        BoolValue(bool),
106    }
107}
108
/// Common interface for OTLP trace parsers; implemented once per wire format
/// (JSON and protobuf).
pub trait OtlpParser {
    /// Read the file at `path` and parse its contents as OTLP trace data.
    ///
    /// # Errors
    /// The implementations in this module fail if the file cannot be read or
    /// its contents cannot be parsed.
    fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry>;

    /// Parse an in-memory OTLP trace payload.
    ///
    /// # Errors
    /// The implementations in this module fail when `data` is not valid for
    /// the parser's format.
    fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry>;
}
117
/// Parser for OTLP trace data in the JSON encoding.
///
/// The parser holds no state; build one with [`JsonParser::new`] or
/// [`Default::default`].
#[derive(Default)]
pub struct JsonParser;

impl JsonParser {
    /// Creates a JSON parser.
    pub fn new() -> Self {
        Self::default()
    }
}
132
133impl JsonParser {
134    /// Extract telemetry spans from parsed JSON
135    fn extract_spans(&self, otlp: OtlpJson) -> Vec<TelemetrySpan> {
136        let mut spans = Vec::new();
137
138        if let Some(resource_spans) = otlp.resource_spans {
139            for rs in resource_spans {
140                // Extract service name from resource attributes
141                let service_name = rs
142                    .resource
143                    .as_ref()
144                    .and_then(|r| r.attributes.as_ref())
145                    .and_then(|attrs| {
146                        attrs
147                            .iter()
148                            .find(|a| a.key == "service.name")
149                            .and_then(|a| a.value.as_ref())
150                            .and_then(|v| v.string_value.clone())
151                    });
152
153                if let Some(scope_spans) = rs.scope_spans {
154                    for ss in scope_spans {
155                        if let Some(span_list) = ss.spans {
156                            for span in span_list {
157                                if let Some(telemetry_span) =
158                                    self.convert_span(span, service_name.clone())
159                                {
160                                    spans.push(telemetry_span);
161                                }
162                            }
163                        }
164                    }
165                }
166            }
167        }
168
169        spans
170    }
171
172    /// Convert OTLP span to our internal representation
173    fn convert_span(&self, span: Span, service_name: Option<String>) -> Option<TelemetrySpan> {
174        let mut telemetry_span = TelemetrySpan {
175            name: span.name,
176            function_name: None,
177            file_path: None,
178            line_number: None,
179            service_name,
180            start_time_nanos: span
181                .start_time_unix_nano
182                .and_then(|s| s.parse::<u64>().ok())
183                .unwrap_or(0),
184            end_time_nanos: span
185                .end_time_unix_nano
186                .and_then(|s| s.parse::<u64>().ok())
187                .unwrap_or(0),
188            duration_ms: 0.0,
189            attributes: HashMap::new(),
190        };
191
192        // Extract code-related attributes
193        if let Some(attributes) = &span.attributes {
194            for attr in attributes {
195                match attr.key.as_str() {
196                    "code.function.name" => {
197                        telemetry_span.function_name =
198                            attr.value.as_ref().and_then(|v| v.string_value.clone());
199                    }
200                    "code.file.path" | "code.filepath" => {
201                        telemetry_span.file_path = attr
202                            .value
203                            .as_ref()
204                            .and_then(|v| v.string_value.as_ref())
205                            .map(|p| Path::new(p).to_path_buf());
206                    }
207                    "code.line.number" | "code.lineno" => {
208                        telemetry_span.line_number = attr
209                            .value
210                            .as_ref()
211                            .and_then(|v| v.int_value.as_ref())
212                            .and_then(|s| s.parse::<u32>().ok());
213                    }
214                    _ => {
215                        // Store other attributes
216                        if let Some(value) = &attr.value {
217                            if let Some(attr_value) =
218                                self.convert_json_attribute_value(value.clone())
219                            {
220                                telemetry_span
221                                    .attributes
222                                    .insert(attr.key.clone(), attr_value);
223                            }
224                        }
225                    }
226                }
227            }
228        }
229
230        // Calculate duration
231        telemetry_span.calculate_duration_ms();
232
233        Some(telemetry_span)
234    }
235
236    /// Convert OTLP JSON attribute value to our internal representation
237    fn convert_json_attribute_value(
238        &self,
239        value: crate::core::telemetry::types::AnyValue,
240    ) -> Option<AttributeValue> {
241        if let Some(s) = value.string_value {
242            Some(AttributeValue::String(s))
243        } else if let Some(i) = value.int_value {
244            i.parse::<i64>().ok().map(AttributeValue::Int)
245        } else if let Some(d) = value.double_value {
246            Some(AttributeValue::Double(d))
247        } else {
248            value.bool_value.map(AttributeValue::Bool)
249        }
250    }
251}
252
253impl OtlpParser for JsonParser {
254    fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry> {
255        let data = fs::read(path)
256            .with_context(|| format!("Failed to read telemetry file: {}", path.display()))?;
257        self.parse_bytes(&data)
258    }
259
260    fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry> {
261        let otlp: OtlpJson = serde_json::from_slice(data).context("Failed to parse OTLP JSON")?;
262
263        let spans = self.extract_spans(otlp);
264        let code_spans = spans
265            .iter()
266            .filter(|s| s.function_name.is_some() || s.file_path.is_some())
267            .cloned()
268            .collect();
269
270        Ok(ParsedTelemetry { spans, code_spans })
271    }
272}
273
/// Parser for OTLP trace data in the binary protobuf encoding
/// (`ExportTraceServiceRequest` payloads).
///
/// The parser holds no state; build one with [`ProtobufParser::new`] or
/// [`Default::default`].
#[derive(Default)]
pub struct ProtobufParser;

impl ProtobufParser {
    /// Creates a protobuf parser.
    pub fn new() -> Self {
        Self::default()
    }
}
288
289impl ProtobufParser {
290    /// Extract telemetry spans from parsed protobuf
291    fn extract_spans(&self, otlp: ExportTraceServiceRequest) -> Vec<TelemetrySpan> {
292        let mut spans = Vec::new();
293
294        for resource_span in otlp.resource_spans {
295            // Extract service name from resource attributes
296            let service_name = resource_span.resource.as_ref().and_then(|r| {
297                r.attributes
298                    .iter()
299                    .find(|attr| attr.key == "service.name")
300                    .and_then(|attr| attr.value.as_ref())
301                    .and_then(|v| match &v.value {
302                        Some(any_value::Value::StringValue(s)) => Some(s.clone()),
303                        _ => None,
304                    })
305            });
306
307            for scope_span in resource_span.scope_spans {
308                for span in scope_span.spans {
309                    if let Some(telemetry_span) =
310                        self.convert_protobuf_span(span, service_name.clone())
311                    {
312                        spans.push(telemetry_span);
313                    }
314                }
315            }
316        }
317
318        spans
319    }
320
321    /// Convert OTLP protobuf span to our internal representation
322    fn convert_protobuf_span(
323        &self,
324        span: ProtobufSpan,
325        service_name: Option<String>,
326    ) -> Option<TelemetrySpan> {
327        let mut telemetry_span = TelemetrySpan {
328            name: span.name,
329            function_name: None,
330            file_path: None,
331            line_number: None,
332            service_name,
333            start_time_nanos: span.start_time_unix_nano,
334            end_time_nanos: span.end_time_unix_nano,
335            duration_ms: 0.0,
336            attributes: HashMap::new(),
337        };
338
339        // Extract code-related attributes
340        for attr in span.attributes {
341            match attr.key.as_str() {
342                "code.function.name" => {
343                    if let Some(value) = attr.value.and_then(|v| match v.value {
344                        Some(any_value::Value::StringValue(s)) => Some(s),
345                        _ => None,
346                    }) {
347                        telemetry_span.function_name = Some(value);
348                    }
349                }
350                "code.file.path" | "code.filepath" => {
351                    if let Some(value) = attr.value.and_then(|v| match v.value {
352                        Some(any_value::Value::StringValue(s)) => Some(s),
353                        _ => None,
354                    }) {
355                        telemetry_span.file_path = Some(Path::new(&value).to_path_buf());
356                    }
357                }
358                "code.line.number" | "code.lineno" => {
359                    if let Some(value) = attr.value.and_then(|v| match v.value {
360                        Some(any_value::Value::IntValue(i)) => Some(i as u32),
361                        _ => None,
362                    }) {
363                        telemetry_span.line_number = Some(value);
364                    }
365                }
366                _ => {
367                    // Store other attributes
368                    if let Some(attr_value) = self.convert_protobuf_attribute_value(attr.value) {
369                        telemetry_span.attributes.insert(attr.key, attr_value);
370                    }
371                }
372            }
373        }
374
375        // Add status information as attributes
376        if let Some(status) = span.status {
377            telemetry_span.attributes.insert(
378                "status.code".to_string(),
379                AttributeValue::Int(status.code as i64),
380            );
381            if !status.message.is_empty() {
382                telemetry_span.attributes.insert(
383                    "status.message".to_string(),
384                    AttributeValue::String(status.message),
385                );
386            }
387        }
388
389        // Calculate duration
390        telemetry_span.calculate_duration_ms();
391
392        Some(telemetry_span)
393    }
394
395    /// Convert OTLP protobuf attribute value to our internal representation
396    fn convert_protobuf_attribute_value(&self, value: Option<AnyValue>) -> Option<AttributeValue> {
397        value.and_then(|v| match v.value {
398            Some(any_value::Value::StringValue(s)) => Some(AttributeValue::String(s)),
399            Some(any_value::Value::IntValue(i)) => Some(AttributeValue::Int(i)),
400            Some(any_value::Value::DoubleValue(d)) => Some(AttributeValue::Double(d)),
401            Some(any_value::Value::BoolValue(b)) => Some(AttributeValue::Bool(b)),
402            None => None,
403        })
404    }
405}
406
407impl OtlpParser for ProtobufParser {
408    fn parse_file(&self, path: &Path) -> Result<ParsedTelemetry> {
409        let data = fs::read(path)
410            .with_context(|| format!("Failed to read telemetry file: {}", path.display()))?;
411        self.parse_bytes(&data)
412    }
413
414    fn parse_bytes(&self, data: &[u8]) -> Result<ParsedTelemetry> {
415        // Handle empty data gracefully
416        if data.is_empty() {
417            return Ok(ParsedTelemetry {
418                spans: vec![],
419                code_spans: vec![],
420            });
421        }
422
423        // Try to decode as ExportTraceServiceRequest (the most common OTLP format)
424        let otlp = ExportTraceServiceRequest::decode(data)
425            .context("Failed to decode OTLP protobuf data")?;
426
427        let spans = self.extract_spans(otlp);
428        let code_spans = spans
429            .iter()
430            .filter(|s| s.function_name.is_some() || s.file_path.is_some())
431            .cloned()
432            .collect();
433
434        Ok(ParsedTelemetry { spans, code_spans })
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::NamedTempFile;

    /// Builds a string-valued protobuf attribute.
    fn string_attr(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.to_string(),
            value: Some(AnyValue {
                value: Some(any_value::Value::StringValue(value.to_string())),
            }),
        }
    }

    /// Builds an int-valued protobuf attribute.
    fn int_attr(key: &str, value: i64) -> KeyValue {
        KeyValue {
            key: key.to_string(),
            value: Some(AnyValue {
                value: Some(any_value::Value::IntValue(value)),
            }),
        }
    }

    /// A request with one code-annotated span and an error status.
    fn sample_request() -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                resource: Some(Resource {
                    attributes: vec![string_attr("service.name", "payment-api")],
                }),
                scope_spans: vec![ScopeSpans {
                    spans: vec![ProtobufSpan {
                        name: "process_payment".to_string(),
                        start_time_unix_nano: 1_704_067_200_000_000_000,
                        end_time_unix_nano: 1_704_067_200_050_000_000,
                        attributes: vec![
                            string_attr("code.function.name", "process_payment"),
                            string_attr("code.file.path", "src/api/handlers.rs"),
                            int_attr("code.line.number", 42),
                            string_attr("http.method", "POST"),
                        ],
                        status: Some(Status {
                            code: StatusCode::Error as i32,
                            message: "card declined".to_string(),
                        }),
                    }],
                }],
            }],
        }
    }

    /// Encodes `sample_request()` into protobuf bytes.
    fn encoded_sample_request() -> Vec<u8> {
        let mut encoded = Vec::new();
        sample_request()
            .encode(&mut encoded)
            .expect("encoding into a Vec cannot run out of space");
        encoded
    }

    #[test]
    fn test_protobuf_parser_basic_deserialization() {
        // Given: an empty protobuf payload
        let parser = ProtobufParser::new();
        let empty_data: Vec<u8> = vec![];

        // When: parsing it
        let result = parser.parse_bytes(&empty_data);

        // Then: empty input means "no spans", not an error
        let parsed = result.expect("empty protobuf input should parse");
        assert_eq!(parsed.spans.len(), 0);
        assert_eq!(parsed.code_spans.len(), 0);
    }

    #[test]
    fn test_protobuf_parser_invalid_data() {
        let parser = ProtobufParser::new();

        // An unterminated varint key is not a valid protobuf encoding.
        let invalid_data = vec![0xFF, 0xFF, 0xFF, 0xFF];
        let result = parser.parse_bytes(&invalid_data);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Failed to decode OTLP protobuf data"));

        // A well-formed varint on field 1, where OTLP expects a nested message.
        assert!(parser.parse_bytes(&[0x08, 0x96, 0x01]).is_err());
    }

    #[test]
    fn test_protobuf_parser_file_reading() {
        // Given: a temporary file holding an encoded OTLP request
        let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
        temp_file
            .write_all(&encoded_sample_request())
            .expect("Failed to write test data");

        let parser = ProtobufParser::new();

        // When: parsing the file
        let parsed = parser
            .parse_file(temp_file.path())
            .expect("valid protobuf file should parse");

        // Then: the file contents were read and decoded
        assert_eq!(parsed.spans.len(), 1);
        assert_eq!(parsed.spans[0].name, "process_payment");
    }

    #[test]
    fn test_parse_file_missing_path_reports_context() {
        // Given: a path whose file has been deleted
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let missing = temp_file.path().to_path_buf();
        drop(temp_file);

        // When / Then: the read error carries the file-level context
        let result = ProtobufParser::new().parse_file(&missing);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Failed to read telemetry file"));
    }

    #[test]
    fn test_protobuf_parser_extracts_code_attributes() {
        let parser = ProtobufParser::new();
        let parsed = parser.parse_bytes(&encoded_sample_request()).unwrap();

        assert_eq!(parsed.spans.len(), 1);
        assert_eq!(parsed.code_spans.len(), 1);

        let span = &parsed.code_spans[0];
        assert_eq!(span.name, "process_payment");
        assert_eq!(span.function_name, Some("process_payment".to_string()));
        assert_eq!(span.file_path, Some(PathBuf::from("src/api/handlers.rs")));
        assert_eq!(span.line_number, Some(42));
        assert_eq!(span.service_name, Some("payment-api".to_string()));
        assert_eq!(span.duration_ms, 50.0);
        assert!(matches!(
            span.attributes.get("http.method"),
            Some(AttributeValue::String(method)) if method == "POST"
        ));
        assert!(matches!(
            span.attributes.get("status.code"),
            Some(AttributeValue::Int(2))
        ));
        assert!(matches!(
            span.attributes.get("status.message"),
            Some(AttributeValue::String(message)) if message == "card declined"
        ));
    }

    #[test]
    fn test_protobuf_parser_vs_json_parser_interface() {
        // Given: Both parsers
        let protobuf_parser = ProtobufParser::new();
        let json_parser = JsonParser::new();

        // When: Using same interface
        let empty_data: Vec<u8> = vec![];
        let pb_result = protobuf_parser.parse_bytes(&empty_data);
        let json_result = json_parser.parse_bytes(b"{}");

        // Then: Both should implement same interface
        assert!(pb_result.is_ok());
        assert!(json_result.is_ok());
    }

    #[test]
    fn test_parse_otlp_json_with_code_attributes() {
        let json_data = r#"{
            "resourceSpans": [{
                "resource": {
                    "attributes": [{
                        "key": "service.name",
                        "value": { "stringValue": "payment-api" }
                    }]
                },
                "scopeSpans": [{
                    "spans": [{
                        "name": "process_payment",
                        "startTimeUnixNano": "1704067200000000000",
                        "endTimeUnixNano": "1704067200050000000",
                        "attributes": [
                            {
                                "key": "code.function.name",
                                "value": { "stringValue": "process_payment" }
                            },
                            {
                                "key": "code.file.path",
                                "value": { "stringValue": "src/api/handlers.rs" }
                            },
                            {
                                "key": "code.line.number",
                                "value": { "intValue": "42" }
                            }
                        ]
                    }]
                }]
            }]
        }"#;

        let parser = JsonParser::new();
        let result = parser.parse_bytes(json_data.as_bytes()).unwrap();

        assert_eq!(result.spans.len(), 1);
        assert_eq!(result.code_spans.len(), 1);

        let span = &result.code_spans[0];
        assert_eq!(span.name, "process_payment");
        assert_eq!(span.function_name, Some("process_payment".to_string()));
        assert_eq!(span.file_path, Some(PathBuf::from("src/api/handlers.rs")));
        assert_eq!(span.line_number, Some(42));
        assert_eq!(span.service_name, Some("payment-api".to_string()));
        assert_eq!(span.duration_ms, 50.0);
    }

    #[test]
    fn test_parse_otlp_json_without_code_attributes() {
        let json_data = r#"{
            "resourceSpans": [{
                "scopeSpans": [{
                    "spans": [{
                        "name": "database_query",
                        "startTimeUnixNano": "1704067200000000000",
                        "endTimeUnixNano": "1704067200100000000",
                        "attributes": [
                            {
                                "key": "db.statement",
                                "value": { "stringValue": "SELECT * FROM users" }
                            }
                        ]
                    }]
                }]
            }]
        }"#;

        let parser = JsonParser::new();
        let result = parser.parse_bytes(json_data.as_bytes()).unwrap();

        assert_eq!(result.spans.len(), 1);
        assert_eq!(result.code_spans.len(), 0); // No code attributes

        let span = &result.spans[0];
        assert_eq!(span.name, "database_query");
        assert!(span.function_name.is_none());
        assert!(span.file_path.is_none());
    }

    #[test]
    fn test_parse_invalid_json() {
        let invalid_json = r#"{ invalid json }"#;

        let parser = JsonParser::new();
        let result = parser.parse_bytes(invalid_json.as_bytes());

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Failed to parse OTLP JSON"));
    }
}