serverless_otlp_forwarder_core/
telemetry.rs1use anyhow::{Context, Result};
2use base64::{engine::general_purpose, Engine};
3use flate2::{read::GzDecoder, write::GzEncoder, Compression};
4use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
5use otlp_stdout_span_exporter::ExporterOutput;
6use prost::Message;
7use serde_json::Value;
8use std::io::{Read, Write};
9use tracing;
/// A unit of OTLP trace telemetry together with the metadata describing how
/// its payload is encoded.
///
/// Values built by [`TelemetryData::from_log_record`] and
/// [`TelemetryData::from_raw_span`] carry an uncompressed payload labelled
/// `application/x-protobuf`; [`TelemetryData::compress`] gzips the payload in
/// place and records that in `content_encoding`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TelemetryData {
    /// Where the telemetry came from: the `source` of an exporter record, or
    /// the log group name for raw spans.
    pub source: String,
    /// Endpoint URL carried with the payload, e.g.
    /// `http://localhost:4318/v1/traces`.
    pub endpoint: String,
    /// Payload bytes, encoded as described by `content_type` and
    /// `content_encoding`.
    pub payload: Vec<u8>,
    /// Media type of `payload`, e.g. `application/x-protobuf`.
    pub content_type: String,
    /// Compression applied to `payload` (`Some("gzip")` after
    /// [`TelemetryData::compress`]); `None` means uncompressed.
    pub content_encoding: Option<String>,
}
24
25impl Default for TelemetryData {
26 fn default() -> Self {
27 Self {
28 source: "unknown".to_string(),
29 endpoint: "http://localhost:4318/v1/traces".to_string(),
31 payload: Vec::new(),
32 content_type: "application/x-protobuf".to_string(),
33 content_encoding: None, }
35 }
36}
37
38impl TelemetryData {
39 fn convert_to_protobuf(
54 payload: Vec<u8>,
55 content_type: &str,
56 content_encoding: Option<&str>,
57 ) -> Result<Vec<u8>> {
58 tracing::debug!(
59 content_type = %content_type,
60 content_encoding = %content_encoding.unwrap_or("none"),
61 "Converting payload to protobuf"
62 );
63
64 let decompressed = if content_encoding == Some("gzip") {
66 tracing::debug!("Decompressing gzipped payload");
67 let mut decoder = GzDecoder::new(&payload[..]);
68 let mut decompressed = Vec::new();
69 decoder
70 .read_to_end(&mut decompressed)
71 .context("Failed to decompress payload")?;
72 decompressed
73 } else {
74 payload
75 };
76
77 match content_type {
79 "application/x-protobuf" => {
80 tracing::debug!("Payload already in protobuf format");
82 Ok(decompressed)
83 }
84 "application/json" => {
85 tracing::debug!("Converting JSON to protobuf");
87 Self::convert_json_to_protobuf(&decompressed)
88 }
89 _ => {
90 tracing::warn!(
92 content_type = %content_type,
93 "Unknown content type; keeping payload as is"
94 );
95 Ok(decompressed)
96 }
97 }
98 }
99
100 fn convert_json_to_protobuf(json_bytes: &[u8]) -> Result<Vec<u8>> {
106 let request: ExportTraceServiceRequest = serde_json::from_slice(json_bytes)
108 .context("Failed to parse JSON as ExportTraceServiceRequest")?;
109
110 let protobuf_bytes = request.encode_to_vec();
112
113 tracing::debug!(
114 payload_size_bytes = protobuf_bytes.len() as u64,
115 "Converted JSON to protobuf"
116 );
117
118 Ok(protobuf_bytes)
119 }
120
121 pub fn compress(&mut self, compression_level: u32) -> Result<()> {
126 if self.content_encoding != Some("gzip".to_string()) {
128 tracing::debug!(compression_level, "Compressing payload");
129
130 let original_size = self.payload.len();
131 let mut encoder = GzEncoder::new(Vec::new(), Compression::new(compression_level));
132 encoder
133 .write_all(&self.payload)
134 .context("Failed to compress payload")?;
135
136 self.payload = encoder.finish().context("Failed to finish compression")?;
137
138 self.content_encoding = Some("gzip".to_string());
139
140 tracing::debug!(
141 original_size_bytes = original_size as u64,
142 compressed_size_bytes = self.payload.len() as u64,
143 "Compressed payload"
144 );
145 }
146
147 Ok(())
148 }
149
150 pub fn from_log_record(record: ExporterOutput) -> Result<Self> {
152 let raw_payload = if record.base64 {
154 general_purpose::STANDARD
155 .decode(&record.payload)
156 .context("Failed to decode base64 payload")?
157 } else {
158 record.payload.as_bytes().to_vec()
159 };
160
161 let protobuf_payload = Self::convert_to_protobuf(
163 raw_payload,
164 &record.content_type,
165 Some(&record.content_encoding),
166 )?;
167
168 Ok(Self {
169 source: record.source.clone(),
170 endpoint: record.endpoint.to_string(),
171 payload: protobuf_payload,
172 content_type: "application/x-protobuf".to_string(),
173 content_encoding: None, })
175 }
176
177 pub fn from_raw_span(span: Value, log_group: &str) -> Result<Self> {
179 let json_string =
181 serde_json::to_string(&span).context("Failed to serialize span data to JSON string")?;
182
183 let raw_payload = json_string.as_bytes().to_vec();
184
185 let protobuf_payload = Self::convert_to_protobuf(raw_payload, "application/json", None)?;
187
188 Ok(Self {
189 source: log_group.to_string(),
190 endpoint: "http://localhost:4318/v1/traces".to_string(),
192 payload: protobuf_payload,
193 content_type: "application/x-protobuf".to_string(),
194 content_encoding: None, })
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use base64::{engine::general_purpose, Engine};
    use flate2::{write::GzEncoder, Compression};
    use otlp_stdout_span_exporter::ExporterOutput;
    use serde_json::json;
    use std::collections::HashMap;
    use std::io::Write;

    /// Gzips `bytes` at the default compression level.
    fn gzip(bytes: &[u8]) -> Vec<u8> {
        let mut gz = GzEncoder::new(Vec::new(), Compression::default());
        gz.write_all(bytes).unwrap();
        gz.finish().unwrap()
    }

    /// JSON bytes of an export request that contains no resource spans.
    fn empty_request_json() -> Vec<u8> {
        serde_json::to_vec(&json!({ "resourceSpans": [] })).unwrap()
    }

    /// Base64 text of a gzipped, protobuf-encoded empty export request.
    fn create_test_payload() -> String {
        let empty_request = ExportTraceServiceRequest {
            resource_spans: vec![],
        };
        let gzipped = gzip(&empty_request.encode_to_vec());
        general_purpose::STANDARD.encode(gzipped)
    }

    #[test]
    fn test_from_log_record() {
        let payload = create_test_payload();
        let record = ExporterOutput {
            version: "test".to_string(),
            source: "test-service".to_string(),
            endpoint: "http://example.com".to_string(),
            method: "POST".to_string(),
            payload,
            headers: Some(HashMap::new()),
            content_type: "application/x-protobuf".to_string(),
            content_encoding: "gzip".to_string(),
            base64: true,
            level: Some("info".to_string()),
        };

        let result = TelemetryData::from_log_record(record).unwrap();

        assert_eq!(result.source, "test-service");
        assert_eq!(result.endpoint, "http://example.com");
        assert_eq!(result.content_type, "application/x-protobuf");
        assert_eq!(result.content_encoding, None);
    }

    #[test]
    fn test_from_raw_span() {
        let raw_span = json!({ "resourceSpans": [] });

        let result = TelemetryData::from_raw_span(raw_span, "aws/spans").unwrap();

        assert_eq!(result.source, "aws/spans");
        assert_eq!(result.content_type, "application/x-protobuf");
        assert_eq!(result.content_encoding, None);
    }

    #[test]
    fn test_compress() {
        let original_bytes: Vec<u8> = vec![1, 2, 3, 4, 5];
        let mut data = TelemetryData {
            source: "test".to_string(),
            endpoint: "http://example.com".to_string(),
            payload: original_bytes.clone(),
            content_type: "application/x-protobuf".to_string(),
            content_encoding: None,
        };

        data.compress(6).unwrap();

        assert_eq!(data.content_encoding, Some("gzip".to_string()));

        // Round-trip through a gzip decoder to prove the payload is intact.
        let mut round_tripped = Vec::new();
        GzDecoder::new(&data.payload[..])
            .read_to_end(&mut round_tripped)
            .unwrap();

        assert_eq!(round_tripped, original_bytes);
    }

    #[test]
    fn test_convert_to_protobuf_already_protobuf() {
        let input = vec![1, 2, 3, 4];

        let output =
            TelemetryData::convert_to_protobuf(input.clone(), "application/x-protobuf", None)
                .unwrap();

        assert_eq!(output, input);
    }

    #[test]
    fn test_convert_to_protobuf_from_json() {
        let output =
            TelemetryData::convert_to_protobuf(empty_request_json(), "application/json", None)
                .unwrap();

        let decoded = ExportTraceServiceRequest::decode(output.as_slice()).unwrap();
        assert_eq!(decoded.resource_spans.len(), 0);
    }

    #[test]
    fn test_convert_to_protobuf_from_gzipped_json() {
        let gzipped_json = gzip(&empty_request_json());

        let output =
            TelemetryData::convert_to_protobuf(gzipped_json, "application/json", Some("gzip"))
                .unwrap();

        let decoded = ExportTraceServiceRequest::decode(output.as_slice()).unwrap();
        assert_eq!(decoded.resource_spans.len(), 0);
    }
}