1#![warn(missing_docs)]
36
37use std::io::{Read, Write};
38use std::net::TcpStream;
39use std::sync::Mutex;
40use std::time::Duration;
41
42use zerodds_foundation::observability::{Component, Event, Level};
43use zerodds_foundation::tracing::{Histogram, Span, SpanKind, SpanStatus};
44
45pub const DEFAULT_OTLP_HOST: &str = "127.0.0.1";
47pub const DEFAULT_OTLP_PORT: u16 = 4318;
49
50#[derive(Clone, Debug)]
52pub struct OtlpConfig {
53 pub host: String,
55 pub port: u16,
57 pub service_name: String,
59 pub service_version: String,
61 pub timeout: Duration,
63}
64
65impl Default for OtlpConfig {
66 fn default() -> Self {
67 Self {
68 host: DEFAULT_OTLP_HOST.into(),
69 port: DEFAULT_OTLP_PORT,
70 service_name: "zerodds".into(),
71 service_version: env!("CARGO_PKG_VERSION").into(),
72 timeout: Duration::from_secs(5),
73 }
74 }
75}
76
77pub struct OtlpExporter {
80 cfg: OtlpConfig,
81 buf: Mutex<ExporterBuffers>,
82}
83
84#[derive(Default)]
85struct ExporterBuffers {
86 spans: Vec<Span>,
87 histograms: Vec<Histogram>,
88 events: Vec<Event>,
89}
90
91#[derive(Debug)]
93pub enum ExportError {
94 Io(std::io::Error),
96 HttpStatus {
98 code: u16,
100 body_snippet: String,
102 },
103 Poisoned,
105}
106
107impl std::fmt::Display for ExportError {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 match self {
110 Self::Io(e) => write!(f, "io: {e}"),
111 Self::HttpStatus { code, body_snippet } => {
112 write!(f, "http {code}: {body_snippet}")
113 }
114 Self::Poisoned => write!(f, "exporter mutex poisoned"),
115 }
116 }
117}
118
119impl std::error::Error for ExportError {}
120
121impl OtlpExporter {
122 #[must_use]
124 pub fn new(cfg: OtlpConfig) -> Self {
125 Self {
126 cfg,
127 buf: Mutex::new(ExporterBuffers::default()),
128 }
129 }
130
131 pub fn add_span(&self, span: Span) {
133 if let Ok(mut b) = self.buf.lock() {
134 b.spans.push(span);
135 }
136 }
137
138 pub fn add_histogram(&self, h: Histogram) {
140 if let Ok(mut b) = self.buf.lock() {
141 b.histograms.push(h);
142 }
143 }
144
145 pub fn add_event(&self, e: Event) {
147 if let Ok(mut b) = self.buf.lock() {
148 b.events.push(e);
149 }
150 }
151
152 pub fn flush(&self) -> Result<(), ExportError> {
158 let (spans, histograms, events) = {
159 let mut b = self.buf.lock().map_err(|_| ExportError::Poisoned)?;
160 (
161 std::mem::take(&mut b.spans),
162 std::mem::take(&mut b.histograms),
163 std::mem::take(&mut b.events),
164 )
165 };
166 if !spans.is_empty() {
167 let body = build_traces_json(&self.cfg, &spans);
168 self.post("/v1/traces", &body)?;
169 }
170 if !histograms.is_empty() {
171 let body = build_metrics_json(&self.cfg, &histograms);
172 self.post("/v1/metrics", &body)?;
173 }
174 if !events.is_empty() {
175 let body = build_logs_json(&self.cfg, &events);
176 self.post("/v1/logs", &body)?;
177 }
178 Ok(())
179 }
180
181 fn post(&self, path: &str, body: &str) -> Result<(), ExportError> {
183 let addr = format!("{}:{}", self.cfg.host, self.cfg.port);
184 let mut stream = TcpStream::connect(&addr).map_err(ExportError::Io)?;
185 stream.set_write_timeout(Some(self.cfg.timeout)).ok();
186 stream.set_read_timeout(Some(self.cfg.timeout)).ok();
187 let req = format!(
188 "POST {} HTTP/1.1\r\n\
189 Host: {}\r\n\
190 User-Agent: zerodds-otlp/0.1\r\n\
191 Content-Type: application/json\r\n\
192 Content-Length: {}\r\n\
193 Connection: close\r\n\r\n{}",
194 path,
195 self.cfg.host,
196 body.len(),
197 body
198 );
199 stream.write_all(req.as_bytes()).map_err(ExportError::Io)?;
200 let mut resp = Vec::new();
201 let _ = stream.read_to_end(&mut resp);
202 let resp_str = String::from_utf8_lossy(&resp);
203 let (code, body_start) = parse_http_status(&resp_str);
205 if !(200..300).contains(&code) {
206 let snippet: String = resp_str[body_start.min(resp_str.len())..]
207 .chars()
208 .take(256)
209 .collect();
210 return Err(ExportError::HttpStatus {
211 code,
212 body_snippet: snippet,
213 });
214 }
215 Ok(())
216 }
217}
218
219fn parse_http_status(resp: &str) -> (u16, usize) {
220 if let Some(line_end) = resp.find('\n') {
221 let first_line = &resp[..line_end];
222 let parts: Vec<&str> = first_line.split_whitespace().collect();
223 if parts.len() >= 2 {
224 let code: u16 = parts[1].parse().unwrap_or(0);
225 let body_start = resp.find("\r\n\r\n").map(|i| i + 4).unwrap_or(resp.len());
226 return (code, body_start);
227 }
228 }
229 (0, resp.len())
230}
231
232fn build_traces_json(cfg: &OtlpConfig, spans: &[Span]) -> String {
237 let mut out = String::with_capacity(512 + spans.len() * 256);
239 out.push_str(r#"{"resourceSpans":[{"resource":"#);
240 push_resource(&mut out, cfg);
241 out.push_str(r#","scopeSpans":[{"scope":{"name":"zerodds","version":""#);
242 push_str_escaped(&mut out, &cfg.service_version);
243 out.push_str(r#""},"spans":["#);
244 for (i, s) in spans.iter().enumerate() {
245 if i > 0 {
246 out.push(',');
247 }
248 push_span(&mut out, s);
249 }
250 out.push_str("]}]}]}");
251 out
252}
253
254fn build_metrics_json(cfg: &OtlpConfig, hists: &[Histogram]) -> String {
255 let mut out = String::with_capacity(512 + hists.len() * 256);
256 out.push_str(r#"{"resourceMetrics":[{"resource":"#);
257 push_resource(&mut out, cfg);
258 out.push_str(r#","scopeMetrics":[{"scope":{"name":"zerodds","version":""#);
259 push_str_escaped(&mut out, &cfg.service_version);
260 out.push_str(r#""},"metrics":["#);
261 for (i, h) in hists.iter().enumerate() {
262 if i > 0 {
263 out.push(',');
264 }
265 push_histogram(&mut out, h);
266 }
267 out.push_str("]}]}]}");
268 out
269}
270
271fn build_logs_json(cfg: &OtlpConfig, events: &[Event]) -> String {
272 let mut out = String::with_capacity(512 + events.len() * 200);
273 out.push_str(r#"{"resourceLogs":[{"resource":"#);
274 push_resource(&mut out, cfg);
275 out.push_str(r#","scopeLogs":[{"scope":{"name":"zerodds","version":""#);
276 push_str_escaped(&mut out, &cfg.service_version);
277 out.push_str(r#""},"logRecords":["#);
278 for (i, e) in events.iter().enumerate() {
279 if i > 0 {
280 out.push(',');
281 }
282 push_event(&mut out, e);
283 }
284 out.push_str("]}]}]}");
285 out
286}
287
288fn push_resource(out: &mut String, cfg: &OtlpConfig) {
289 out.push_str(r#"{"attributes":[{"key":"service.name","value":{"stringValue":""#);
290 push_str_escaped(out, &cfg.service_name);
291 out.push_str(r#""}},{"key":"service.version","value":{"stringValue":""#);
292 push_str_escaped(out, &cfg.service_version);
293 out.push_str(r#""}}]}"#);
294}
295
296fn push_span(out: &mut String, s: &Span) {
297 out.push_str(r#"{"traceId":""#);
298 out.push_str(&s.context.trace_id.to_hex());
299 out.push_str(r#"","spanId":""#);
300 out.push_str(&s.context.span_id.to_hex());
301 out.push_str(r#"","name":""#);
302 push_str_escaped(out, &s.name);
303 out.push_str(r#"","kind":"#);
304 out.push_str(match s.kind {
305 SpanKind::Internal => "1",
306 SpanKind::Server => "2",
307 SpanKind::Client => "3",
308 });
309 out.push_str(r#","startTimeUnixNano":""#);
310 push_u64(out, s.start_unix_ns);
311 out.push_str(r#"","endTimeUnixNano":""#);
312 push_u64(out, s.end_unix_ns);
313 out.push('"');
314 if let Some(p) = s.context.parent_span_id {
315 out.push_str(r#","parentSpanId":""#);
316 out.push_str(&p.to_hex());
317 out.push('"');
318 }
319 if let Some(d) = &s.status_description {
320 out.push_str(r#","status":{"code":"#);
321 out.push_str(match s.status {
322 SpanStatus::Unset => "0",
323 SpanStatus::Ok => "1",
324 SpanStatus::Error => "2",
325 });
326 out.push_str(r#","message":""#);
327 push_str_escaped(out, d);
328 out.push_str(r#""}"#);
329 } else {
330 out.push_str(r#","status":{"code":"#);
331 out.push_str(match s.status {
332 SpanStatus::Unset => "0",
333 SpanStatus::Ok => "1",
334 SpanStatus::Error => "2",
335 });
336 out.push('}');
337 }
338 if !s.attributes.is_empty() {
339 out.push_str(r#","attributes":["#);
340 for (i, a) in s.attributes.iter().enumerate() {
341 if i > 0 {
342 out.push(',');
343 }
344 out.push_str(r#"{"key":""#);
345 push_str_escaped(out, a.key);
346 out.push_str(r#"","value":{"stringValue":""#);
347 push_str_escaped(out, &a.value);
348 out.push_str(r#""}}"#);
349 }
350 out.push(']');
351 }
352 out.push('}');
353}
354
355fn push_histogram(out: &mut String, h: &Histogram) {
356 out.push_str(r#"{"name":""#);
357 push_str_escaped(out, &h.name);
358 out.push_str(r#"","unit":"ns","histogram":{"aggregationTemporality":2,"dataPoints":[{"#);
359 out.push_str(r#""startTimeUnixNano":"0","timeUnixNano":"0","count":""#);
360 push_u64(out, h.count);
361 out.push_str(r#"","sum":"#);
362 push_u64(out, h.sum_ns);
363 out.push_str(r#","min":"#);
364 push_u64(out, if h.count == 0 { 0 } else { h.min_ns });
365 out.push_str(r#","max":"#);
366 push_u64(out, h.max_ns);
367 out.push_str(r#","explicitBounds":["#);
368 let bounds = Histogram::bucket_bounds();
369 for (i, b) in bounds.iter().take(bounds.len() - 1).enumerate() {
372 if i > 0 {
373 out.push(',');
374 }
375 push_u64(out, *b);
376 }
377 out.push_str(r#"],"bucketCounts":["#);
378 for (i, c) in h.buckets.iter().enumerate() {
379 if i > 0 {
380 out.push(',');
381 }
382 out.push('"');
383 push_u64(out, *c);
384 out.push('"');
385 }
386 out.push_str("]}]}}");
387}
388
389fn push_event(out: &mut String, e: &Event) {
390 out.push_str(r#"{"timeUnixNano":"0","severityNumber":"#);
391 out.push_str(match e.level {
392 Level::Info => "9",
393 Level::Warn => "13",
394 Level::Error => "17",
395 });
396 out.push_str(r#","severityText":""#);
397 out.push_str(match e.level {
398 Level::Info => "INFO",
399 Level::Warn => "WARN",
400 Level::Error => "ERROR",
401 });
402 out.push_str(r#"","body":{"stringValue":""#);
403 push_str_escaped(out, e.name);
404 out.push_str(r#""},"attributes":[{"key":"component","value":{"stringValue":""#);
405 out.push_str(component_str(e.component));
406 out.push_str(r#""}}"#);
407 for a in &e.attrs {
408 out.push_str(r#",{"key":""#);
409 push_str_escaped(out, a.key);
410 out.push_str(r#"","value":{"stringValue":""#);
411 push_str_escaped(out, &a.value);
412 out.push_str(r#""}}"#);
413 }
414 out.push_str("]}");
415}
416
417fn component_str(c: Component) -> &'static str {
418 c.as_str()
419}
420
421fn push_u64(out: &mut String, v: u64) {
422 use std::fmt::Write as _;
423 let _ = write!(out, "{v}");
424}
425
426fn push_str_escaped(out: &mut String, s: &str) {
427 for c in s.chars() {
428 match c {
429 '"' => out.push_str(r#"\""#),
430 '\\' => out.push_str(r"\\"),
431 '\n' => out.push_str(r"\n"),
432 '\r' => out.push_str(r"\r"),
433 '\t' => out.push_str(r"\t"),
434 c if (c as u32) < 0x20 => {
435 use std::fmt::Write as _;
436 let _ = write!(out, "\\u{:04x}", c as u32);
437 }
438 c => out.push(c),
439 }
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use zerodds_foundation::observability::Event;
447 use zerodds_foundation::tracing::{Span, SpanContext, SpanId, TraceId};
448
449 #[test]
450 fn config_default_points_to_localhost() {
451 let c = OtlpConfig::default();
452 assert_eq!(c.host, "127.0.0.1");
453 assert_eq!(c.port, 4318);
454 assert_eq!(c.service_name, "zerodds");
455 }
456
457 #[test]
458 fn traces_json_roundtrip_shape() {
459 let span = Span {
460 context: SpanContext::new_root(TraceId([1u8; 16]), SpanId([2u8; 8])),
461 name: "dcps.write".into(),
462 kind: SpanKind::Client,
463 start_unix_ns: 1_700_000_000_000_000_000,
464 end_unix_ns: 1_700_000_000_001_500_000,
465 status: SpanStatus::Ok,
466 status_description: None,
467 attributes: Vec::new(),
468 };
469 let cfg = OtlpConfig::default();
470 let body = build_traces_json(&cfg, &[span]);
471 assert!(body.contains(r#""traceId":"01010101010101010101010101010101""#));
472 assert!(body.contains(r#""spanId":"0202020202020202""#));
473 assert!(body.contains(r#""name":"dcps.write""#));
474 assert!(body.contains(r#""kind":3"#)); assert!(body.contains(r#""service.name""#));
476 }
477
478 #[test]
479 fn metrics_json_roundtrip_shape() {
480 let mut h = Histogram::new("dds.write.latency");
481 h.record_ns(500);
482 h.record_ns(50_000);
483 let cfg = OtlpConfig::default();
484 let body = build_metrics_json(&cfg, &[h]);
485 assert!(body.contains(r#""name":"dds.write.latency""#));
486 assert!(body.contains(r#""unit":"ns""#));
487 assert!(body.contains(r#""count":"2""#));
488 assert!(body.contains(r#""explicitBounds":[1,10,100,1000,10000,100000,"#));
489 }
490
491 #[test]
492 fn logs_json_roundtrip_shape() {
493 let e = Event::new(Level::Info, Component::Dcps, "writer.created").with_attr("topic", "/x");
494 let cfg = OtlpConfig::default();
495 let body = build_logs_json(&cfg, &[e]);
496 assert!(body.contains(r#""severityNumber":9"#));
497 assert!(body.contains(r#""body":{"stringValue":"writer.created"}"#));
498 assert!(body.contains(r#""key":"topic""#));
499 }
500
501 #[test]
502 fn json_escape_handles_quotes_and_newlines() {
503 let mut s = String::new();
504 push_str_escaped(&mut s, "a\"b\nc\\d");
505 assert_eq!(s, r#"a\"b\nc\\d"#);
506 }
507
508 #[test]
509 fn parse_http_status_extracts_code() {
510 let r = "HTTP/1.1 200 OK\r\nServer: x\r\n\r\n{}";
511 let (code, body_start) = parse_http_status(r);
512 assert_eq!(code, 200);
513 assert_eq!(&r[body_start..], "{}");
514 }
515
516 #[test]
517 fn parse_http_status_handles_500() {
518 let r = "HTTP/1.1 500 Internal Server Error\r\n\r\nboom";
519 let (code, _) = parse_http_status(r);
520 assert_eq!(code, 500);
521 }
522
523 #[test]
524 fn flush_drains_buffers_even_with_no_collector() {
525 let cfg = OtlpConfig {
526 host: "127.0.0.1".into(),
527 port: 1, timeout: Duration::from_millis(50),
529 ..OtlpConfig::default()
530 };
531 let exp = OtlpExporter::new(cfg);
532 exp.add_span(Span {
533 context: SpanContext::new_root(TraceId([1u8; 16]), SpanId([2u8; 8])),
534 name: "x".into(),
535 kind: SpanKind::Internal,
536 start_unix_ns: 0,
537 end_unix_ns: 1,
538 status: SpanStatus::Unset,
539 status_description: None,
540 attributes: Vec::new(),
541 });
542 let r = exp.flush();
543 assert!(r.is_err());
545 let r2 = exp.flush();
547 assert!(r2.is_ok(), "second flush with empty buffers should be ok");
548 }
549}