// serverless_otlp_forwarder_core/telemetry.rs

1use anyhow::{Context, Result};
2use base64::{engine::general_purpose, Engine};
3use flate2::{read::GzDecoder, write::GzEncoder, Compression};
4use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
5use otlp_stdout_span_exporter::ExporterOutput;
6use prost::Message;
7use serde_json::Value;
8use std::io::{Read, Write};
9use tracing;
/// Core structure representing telemetry data to be forwarded.
///
/// Records built by [`TelemetryData::from_log_record`] and
/// [`TelemetryData::from_raw_span`] hold an uncompressed binary protobuf payload
/// (`content_type` is `application/x-protobuf`, `content_encoding` is `None`);
/// [`TelemetryData::compress`] gzips the payload and sets `content_encoding` to `gzip`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TelemetryData {
    /// Source of the telemetry data (e.g., service name or log group)
    pub source: String,
    /// Target endpoint for the telemetry data
    pub endpoint: String,
    /// The payload bytes, encoded as described by `content_type` and `content_encoding`
    pub payload: Vec<u8>,
    /// Content type of the payload (e.g., `application/x-protobuf`)
    pub content_type: String,
    /// Optional content encoding (e.g., `gzip`); `None` means the payload is uncompressed
    pub content_encoding: Option<String>,
}
24
25impl Default for TelemetryData {
26    fn default() -> Self {
27        Self {
28            source: "unknown".to_string(),
29            // Default endpoint to localhost for the collector extension model
30            endpoint: "http://localhost:4318/v1/traces".to_string(),
31            payload: Vec::new(),
32            content_type: "application/x-protobuf".to_string(),
33            content_encoding: None, // No compression by default
34        }
35    }
36}
37
38impl TelemetryData {
39    /// Converts payload data to binary protobuf format (uncompressed)
40    ///
41    /// This method ensures that all telemetry data is in a consistent format
42    /// before it reaches the span compactor, which simplifies compaction logic.
43    ///
44    /// # Arguments
45    ///
46    /// * `payload` - The raw payload bytes
47    /// * `content_type` - The content type of the payload
48    /// * `content_encoding` - The optional content encoding of the payload
49    ///
50    /// # Returns
51    ///
52    /// The binary protobuf payload
53    fn convert_to_protobuf(
54        payload: Vec<u8>,
55        content_type: &str,
56        content_encoding: Option<&str>,
57    ) -> Result<Vec<u8>> {
58        tracing::debug!(
59            content_type = %content_type,
60            content_encoding = %content_encoding.unwrap_or("none"),
61            "Converting payload to protobuf"
62        );
63
64        // First, decompress if needed
65        let decompressed = if content_encoding == Some("gzip") {
66            tracing::debug!("Decompressing gzipped payload");
67            let mut decoder = GzDecoder::new(&payload[..]);
68            let mut decompressed = Vec::new();
69            decoder
70                .read_to_end(&mut decompressed)
71                .context("Failed to decompress payload")?;
72            decompressed
73        } else {
74            payload
75        };
76
77        // Then convert to protobuf based on content type
78        match content_type {
79            "application/x-protobuf" => {
80                // Already protobuf, no conversion needed
81                tracing::debug!("Payload already in protobuf format");
82                Ok(decompressed)
83            }
84            "application/json" => {
85                // Convert JSON to protobuf
86                tracing::debug!("Converting JSON to protobuf");
87                Self::convert_json_to_protobuf(&decompressed)
88            }
89            _ => {
90                // Unknown format, log warning and return as-is
91                tracing::warn!(
92                    content_type = %content_type,
93                    "Unknown content type; keeping payload as is"
94                );
95                Ok(decompressed)
96            }
97        }
98    }
99
100    /// Converts JSON to protobuf using the OTLP schema
101    ///
102    /// Since the JSON schema matches the OTLP protobuf schema, we can directly
103    /// deserialize the JSON into the protobuf structure and then serialize it back
104    /// to binary protobuf format.
105    fn convert_json_to_protobuf(json_bytes: &[u8]) -> Result<Vec<u8>> {
106        // Parse the JSON into an ExportTraceServiceRequest
107        let request: ExportTraceServiceRequest = serde_json::from_slice(json_bytes)
108            .context("Failed to parse JSON as ExportTraceServiceRequest")?;
109
110        // Serialize to protobuf binary format
111        let protobuf_bytes = request.encode_to_vec();
112
113        tracing::debug!(
114            payload_size_bytes = protobuf_bytes.len() as u64,
115            "Converted JSON to protobuf"
116        );
117
118        Ok(protobuf_bytes)
119    }
120
121    /// Applies gzip compression to the payload
122    ///
123    /// This should only be called on the final compacted payload
124    /// to avoid unnecessary compression/decompression cycles.
125    pub fn compress(&mut self, compression_level: u32) -> Result<()> {
126        // Only compress if not already compressed
127        if self.content_encoding != Some("gzip".to_string()) {
128            tracing::debug!(compression_level, "Compressing payload");
129
130            let original_size = self.payload.len();
131            let mut encoder = GzEncoder::new(Vec::new(), Compression::new(compression_level));
132            encoder
133                .write_all(&self.payload)
134                .context("Failed to compress payload")?;
135
136            self.payload = encoder.finish().context("Failed to finish compression")?;
137
138            self.content_encoding = Some("gzip".to_string());
139
140            tracing::debug!(
141                original_size_bytes = original_size as u64,
142                compressed_size_bytes = self.payload.len() as u64,
143                "Compressed payload"
144            );
145        }
146
147        Ok(())
148    }
149
150    /// Creates a TelemetryData instance from a LogRecord
151    pub fn from_log_record(record: ExporterOutput) -> Result<Self> {
152        // Decode base64 payload
153        let raw_payload = if record.base64 {
154            general_purpose::STANDARD
155                .decode(&record.payload)
156                .context("Failed to decode base64 payload")?
157        } else {
158            record.payload.as_bytes().to_vec()
159        };
160
161        // Convert to uncompressed protobuf format
162        let protobuf_payload = Self::convert_to_protobuf(
163            raw_payload,
164            &record.content_type,
165            Some(&record.content_encoding),
166        )?;
167
168        Ok(Self {
169            source: record.source.clone(),
170            endpoint: record.endpoint.to_string(),
171            payload: protobuf_payload,
172            content_type: "application/x-protobuf".to_string(),
173            content_encoding: None, // Decompressed at this stage
174        })
175    }
176
177    /// Creates a TelemetryData instance from a raw span (as serialized JSON)
178    pub fn from_raw_span(span: Value, log_group: &str) -> Result<Self> {
179        // Serialize the span data
180        let json_string =
181            serde_json::to_string(&span).context("Failed to serialize span data to JSON string")?;
182
183        let raw_payload = json_string.as_bytes().to_vec();
184
185        // Convert to protobuf format (uncompressed)
186        let protobuf_payload = Self::convert_to_protobuf(raw_payload, "application/json", None)?;
187
188        Ok(Self {
189            source: log_group.to_string(),
190            // Default endpoint to localhost for the collector extension model
191            endpoint: "http://localhost:4318/v1/traces".to_string(),
192            payload: protobuf_payload,
193            content_type: "application/x-protobuf".to_string(),
194            content_encoding: None, // No compression at this stage
195        })
196    }
197}
198
#[cfg(test)]
mod tests {
    // The parent module's imports (base64, flate2, prost `Message`, `Read`/`Write`,
    // `ExporterOutput`, ...) are reachable through this glob, so only test-specific
    // names are imported explicitly.
    use super::*;
    use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds a request holding one empty `ResourceSpans`, so its protobuf encoding is
    /// non-empty and payload round-trips can actually be compared.
    fn sample_request() -> ExportTraceServiceRequest {
        ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans::default()],
        }
    }

    /// Gzip-compresses `bytes` at the default compression level.
    fn gzip(bytes: &[u8]) -> Vec<u8> {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(bytes).unwrap();
        encoder.finish().unwrap()
    }

    /// Decompresses a gzip stream, panicking on malformed input.
    fn gunzip(bytes: &[u8]) -> Vec<u8> {
        let mut decoder = GzDecoder::new(bytes);
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).unwrap();
        decompressed
    }

    /// Gzipped, base64-encoded protobuf encoding of `sample_request()`.
    fn create_test_payload() -> String {
        general_purpose::STANDARD.encode(gzip(&sample_request().encode_to_vec()))
    }

    /// Builds an `ExporterOutput` with fixed metadata and the given payload fields.
    fn log_record(
        payload: String,
        content_type: &str,
        content_encoding: &str,
        base64: bool,
    ) -> ExporterOutput {
        ExporterOutput {
            version: "test".to_string(),
            source: "test-service".to_string(),
            endpoint: "http://example.com".to_string(),
            method: "POST".to_string(),
            payload,
            headers: Some(HashMap::new()),
            content_type: content_type.to_string(),
            content_encoding: content_encoding.to_string(),
            base64,
            level: Some("info".to_string()),
        }
    }

    #[test]
    fn test_from_log_record() {
        let record = log_record(create_test_payload(), "application/x-protobuf", "gzip", true);

        let telemetry = TelemetryData::from_log_record(record).unwrap();
        assert_eq!(telemetry.source, "test-service");
        assert_eq!(telemetry.endpoint, "http://example.com");
        assert_eq!(telemetry.content_type, "application/x-protobuf");
        // Since we're decompressing at from_log_record level, it should be None
        assert_eq!(telemetry.content_encoding, None);
        // The payload must be exactly the original, uncompressed protobuf bytes.
        assert_eq!(telemetry.payload, sample_request().encode_to_vec());
    }

    #[test]
    fn test_from_log_record_plain_json() {
        let record = log_record(
            r#"{"resourceSpans":[]}"#.to_string(),
            "application/json",
            "",
            false,
        );

        let telemetry = TelemetryData::from_log_record(record).unwrap();
        assert_eq!(telemetry.content_type, "application/x-protobuf");
        assert_eq!(telemetry.content_encoding, None);
        let request = ExportTraceServiceRequest::decode(telemetry.payload.as_slice()).unwrap();
        assert!(request.resource_spans.is_empty());
    }

    #[test]
    fn test_from_log_record_rejects_invalid_base64() {
        let record = log_record("not base64!".to_string(), "application/x-protobuf", "gzip", true);
        assert!(TelemetryData::from_log_record(record).is_err());
    }

    #[test]
    fn test_from_log_record_rejects_corrupt_gzip() {
        let payload = general_purpose::STANDARD.encode(b"definitely not gzip");
        let record = log_record(payload, "application/x-protobuf", "gzip", true);
        assert!(TelemetryData::from_log_record(record).is_err());
    }

    #[test]
    fn test_from_raw_span() {
        // Create a valid OTLP JSON structure
        let span = json!({
            "resourceSpans": []
        });

        let telemetry = TelemetryData::from_raw_span(span, "aws/spans").unwrap();
        assert_eq!(telemetry.source, "aws/spans");
        assert_eq!(telemetry.endpoint, "http://localhost:4318/v1/traces");
        assert_eq!(telemetry.content_type, "application/x-protobuf");
        assert_eq!(telemetry.content_encoding, None); // No compression at this stage
        let request = ExportTraceServiceRequest::decode(telemetry.payload.as_slice()).unwrap();
        assert!(request.resource_spans.is_empty());
    }

    #[test]
    fn test_from_raw_span_rejects_non_otlp_json() {
        let span = json!({ "resourceSpans": "not-a-list" });
        assert!(TelemetryData::from_raw_span(span, "aws/spans").is_err());
    }

    #[test]
    fn test_compress() {
        // Create a telemetry object with uncompressed data
        let mut telemetry = TelemetryData {
            source: "test".to_string(),
            endpoint: "http://example.com".to_string(),
            payload: vec![1, 2, 3, 4, 5],
            content_type: "application/x-protobuf".to_string(),
            content_encoding: None,
        };

        telemetry.compress(6).unwrap();

        // Verify it's now compressed and the data is intact
        assert_eq!(telemetry.content_encoding, Some("gzip".to_string()));
        assert_eq!(gunzip(&telemetry.payload), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_compress_is_idempotent() {
        let mut telemetry = TelemetryData {
            payload: vec![7; 64],
            ..TelemetryData::default()
        };

        telemetry.compress(6).unwrap();
        let once = telemetry.payload.clone();
        telemetry.compress(6).unwrap();

        // A second call must not wrap the payload in another gzip layer.
        assert_eq!(telemetry.payload, once);
        assert_eq!(gunzip(&telemetry.payload), vec![7; 64]);
    }

    #[test]
    fn test_convert_to_protobuf_already_protobuf() {
        // Test that protobuf data is not modified
        let original_payload = vec![1, 2, 3, 4];
        let converted = TelemetryData::convert_to_protobuf(
            original_payload.clone(),
            "application/x-protobuf",
            None,
        )
        .unwrap();

        assert_eq!(converted, original_payload);
    }

    #[test]
    fn test_convert_to_protobuf_unknown_content_type_passes_through() {
        let original_payload = vec![9, 8, 7];
        let converted =
            TelemetryData::convert_to_protobuf(original_payload.clone(), "text/plain", None)
                .unwrap();

        assert_eq!(converted, original_payload);
    }

    #[test]
    fn test_convert_to_protobuf_from_json() {
        // Create a minimal valid OTLP JSON payload
        let json_bytes = serde_json::to_vec(&json!({ "resourceSpans": [] })).unwrap();

        let converted =
            TelemetryData::convert_to_protobuf(json_bytes, "application/json", None).unwrap();

        // Verify we can decode it as an ExportTraceServiceRequest
        let request = ExportTraceServiceRequest::decode(converted.as_slice()).unwrap();
        assert_eq!(request.resource_spans.len(), 0);
    }

    #[test]
    fn test_convert_to_protobuf_rejects_invalid_json() {
        let result =
            TelemetryData::convert_to_protobuf(b"not json".to_vec(), "application/json", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_convert_to_protobuf_from_gzipped_json() {
        // Create a minimal valid OTLP JSON payload and compress it
        let json_bytes = serde_json::to_vec(&json!({ "resourceSpans": [] })).unwrap();
        let compressed = gzip(&json_bytes);

        let converted =
            TelemetryData::convert_to_protobuf(compressed, "application/json", Some("gzip"))
                .unwrap();

        // Verify we can decode it as an ExportTraceServiceRequest
        let request = ExportTraceServiceRequest::decode(converted.as_slice()).unwrap();
        assert_eq!(request.resource_spans.len(), 0);
    }
}