1use std::{sync::Arc, time::Duration};
5
6use obs_core::{SchemaRegistry, Sink, registry::ScrubbedEnvelope, sink::SinkFut};
7use parking_lot::Mutex;
8use serde::{Deserialize, Serialize};
9
10use crate::{
11 OtlpError,
12 backpressure::RetryQueue,
13 batch::Batch,
14 env_config::{OtlpEndpoint, OtlpResourceAttrs},
15 logs::OtlpLogPayload,
16 metrics::OtlpMetricPayload,
17 traces::{OtlpTracePayload, SpanPairTracker},
18};
19
20const DEFAULT_BATCH_RECORDS: usize = 256;
21const DEFAULT_BATCH_AGE_MS: u64 = 1_000;
22const DEFAULT_RETRY_QUEUE: usize = 16_384;
23
24#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
26pub struct OtlpRetry {
27 pub max_attempts: u32,
29 pub initial_backoff_ms: u64,
31 pub max_backoff_ms: u64,
33}
34
35impl Default for OtlpRetry {
36 fn default() -> Self {
37 Self {
38 max_attempts: 5,
39 initial_backoff_ms: 100,
40 max_backoff_ms: 30_000,
41 }
42 }
43}
44
45pub trait OtlpExporter: Send + Sync + 'static {
48 fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError>;
50 fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError>;
52 fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError>;
54}
55
56#[derive(Debug, Default, Clone, Copy)]
60pub struct StdoutDebugExporter;
61
62impl OtlpExporter for StdoutDebugExporter {
63 fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError> {
64 match serde_json::to_string(payload) {
65 Ok(s) => {
66 println!("{s}");
67 Ok(())
68 }
69 Err(e) => Err(OtlpError::Transport(e.to_string())),
70 }
71 }
72 fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError> {
73 match serde_json::to_string(payload) {
74 Ok(s) => {
75 println!("{s}");
76 Ok(())
77 }
78 Err(e) => Err(OtlpError::Transport(e.to_string())),
79 }
80 }
81 fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError> {
82 match serde_json::to_string(payload) {
83 Ok(s) => {
84 println!("{s}");
85 Ok(())
86 }
87 Err(e) => Err(OtlpError::Transport(e.to_string())),
88 }
89 }
90}
91
92pub struct OtlpLogSink {
96 exporter: Arc<dyn OtlpExporter>,
97 batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
98 retry: Arc<RetryQueue<OtlpLogPayload>>,
99 resource: Arc<OtlpResourceAttrs>,
100 endpoint: Arc<OtlpEndpoint>,
101 retry_policy: OtlpRetry,
102 last_flush: Mutex<()>,
103}
104
105impl std::fmt::Debug for OtlpLogSink {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("OtlpLogSink")
108 .field("endpoint", &self.endpoint.url)
109 .field("retry", &self.retry_policy)
110 .finish_non_exhaustive()
111 }
112}
113
114#[derive(Default)]
116pub struct OtlpLogSinkBuilder {
117 exporter: Option<Arc<dyn OtlpExporter>>,
118 endpoint: Option<OtlpEndpoint>,
119 resource: Option<OtlpResourceAttrs>,
120 retry: Option<OtlpRetry>,
121}
122
123impl std::fmt::Debug for OtlpLogSinkBuilder {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("OtlpLogSinkBuilder").finish_non_exhaustive()
126 }
127}
128
129impl OtlpLogSink {
130 #[must_use]
132 pub fn builder() -> OtlpLogSinkBuilder {
133 OtlpLogSinkBuilder::default()
134 }
135
136 pub fn from_env() -> Result<Self, OtlpError> {
143 Self::builder()
144 .endpoint(crate::env_config::endpoint_from_env())
145 .resource(crate::env_config::resource_from_env())
146 .build()
147 }
148}
149
150impl OtlpLogSinkBuilder {
151 #[must_use]
153 pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
154 self.endpoint = Some(e);
155 self
156 }
157
158 #[must_use]
160 pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
161 self.resource = Some(r);
162 self
163 }
164
165 #[must_use]
167 pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
168 self.exporter = Some(e);
169 self
170 }
171
172 #[must_use]
174 pub fn retry(mut self, r: OtlpRetry) -> Self {
175 self.retry = Some(r);
176 self
177 }
178
179 pub fn build(self) -> Result<OtlpLogSink, OtlpError> {
185 let endpoint = self.endpoint.unwrap_or_default();
186 let resource = self.resource.unwrap_or_default();
187 let exporter = self
188 .exporter
189 .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
190 let retry_policy = self.retry.unwrap_or_default();
191 Ok(OtlpLogSink {
192 exporter,
193 batch: Arc::new(Batch::new(
194 DEFAULT_BATCH_RECORDS,
195 Duration::from_millis(DEFAULT_BATCH_AGE_MS),
196 )),
197 retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
198 resource: Arc::new(resource),
199 endpoint: Arc::new(endpoint),
200 retry_policy,
201 last_flush: Mutex::new(()),
202 })
203 }
204}
205
206impl Sink for OtlpLogSink {
207 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
208 let mut owned = env.envelope().clone();
210 owned.payload = env.payload().to_vec();
211 if let Some(batch) = self.batch.push(owned) {
212 self.dispatch(batch);
213 }
214 }
215
216 fn flush(&self) -> SinkFut<'_> {
217 Box::pin(async move {
218 let leftover = self.batch.drain();
219 if !leftover.is_empty() {
220 self.dispatch(leftover);
221 }
222 })
223 }
224
225 fn shutdown(&self) -> SinkFut<'_> {
226 Box::pin(async move {
227 let leftover = self.batch.drain();
228 if !leftover.is_empty() {
229 self.dispatch(leftover);
230 }
231 while let Some(payload) = self.retry.pop() {
233 let _ = self.exporter.export_logs(&payload);
234 }
235 })
236 }
237}
238
239impl OtlpLogSink {
240 fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
245 let attrs = obs_core::observer().resource_attrs();
246 if attrs.service_name.is_empty()
247 && attrs.service_version.is_empty()
248 && attrs.extra.is_empty()
249 {
250 Arc::clone(&self.resource)
251 } else {
252 Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
253 }
254 }
255
256 fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
257 let resource = self.live_resource();
258 let payload = OtlpLogPayload::from_envelopes(&envelopes, &resource, &self.endpoint);
259 let _g = self.last_flush.lock();
260 match self.exporter.export_logs(&payload) {
261 Ok(()) => {}
262 Err(_) => {
263 let _ = self.retry.push(payload);
264 }
265 }
266 }
267
268 #[must_use]
270 pub fn retry_depth(&self) -> usize {
271 self.retry.depth()
272 }
273}
274
275pub struct OtlpMetricSink {
279 exporter: Arc<dyn OtlpExporter>,
280 batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
281 retry: Arc<RetryQueue<OtlpMetricPayload>>,
282 resource: Arc<OtlpResourceAttrs>,
283 endpoint: Arc<OtlpEndpoint>,
284 retry_policy: OtlpRetry,
285 registry: Arc<SchemaRegistry>,
288}
289
290impl std::fmt::Debug for OtlpMetricSink {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 f.debug_struct("OtlpMetricSink")
293 .field("endpoint", &self.endpoint.url)
294 .field("retry", &self.retry_policy)
295 .finish_non_exhaustive()
296 }
297}
298
299#[derive(Default)]
301pub struct OtlpMetricSinkBuilder {
302 exporter: Option<Arc<dyn OtlpExporter>>,
303 endpoint: Option<OtlpEndpoint>,
304 resource: Option<OtlpResourceAttrs>,
305 retry: Option<OtlpRetry>,
306 registry: Option<Arc<SchemaRegistry>>,
307}
308
309impl std::fmt::Debug for OtlpMetricSinkBuilder {
310 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311 f.debug_struct("OtlpMetricSinkBuilder")
312 .finish_non_exhaustive()
313 }
314}
315
316impl OtlpMetricSink {
317 #[must_use]
319 pub fn builder() -> OtlpMetricSinkBuilder {
320 OtlpMetricSinkBuilder::default()
321 }
322
323 pub fn from_env() -> Result<Self, OtlpError> {
329 Self::builder()
330 .endpoint(crate::env_config::endpoint_from_env())
331 .resource(crate::env_config::resource_from_env())
332 .build()
333 }
334}
335
336impl OtlpMetricSinkBuilder {
337 #[must_use]
339 pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
340 self.endpoint = Some(e);
341 self
342 }
343
344 #[must_use]
346 pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
347 self.resource = Some(r);
348 self
349 }
350
351 #[must_use]
353 pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
354 self.exporter = Some(e);
355 self
356 }
357
358 #[must_use]
360 pub fn retry(mut self, r: OtlpRetry) -> Self {
361 self.retry = Some(r);
362 self
363 }
364
365 #[must_use]
370 pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
371 self.registry = Some(registry);
372 self
373 }
374
375 pub fn build(self) -> Result<OtlpMetricSink, OtlpError> {
381 let endpoint = self.endpoint.unwrap_or_default();
382 let resource = self.resource.unwrap_or_default();
383 let exporter = self
384 .exporter
385 .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
386 let retry_policy = self.retry.unwrap_or_default();
387 let registry = self
388 .registry
389 .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
390 Ok(OtlpMetricSink {
391 exporter,
392 batch: Arc::new(Batch::new(
393 DEFAULT_BATCH_RECORDS,
394 Duration::from_millis(DEFAULT_BATCH_AGE_MS),
395 )),
396 retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
397 resource: Arc::new(resource),
398 endpoint: Arc::new(endpoint),
399 retry_policy,
400 registry,
401 })
402 }
403}
404
405impl Sink for OtlpMetricSink {
406 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
407 let mut owned = env.envelope().clone();
409 owned.payload = env.payload().to_vec();
410 if let Some(batch) = self.batch.push(owned) {
411 self.dispatch(batch);
412 }
413 }
414
415 fn flush(&self) -> SinkFut<'_> {
416 Box::pin(async move {
417 let leftover = self.batch.drain();
418 if !leftover.is_empty() {
419 self.dispatch(leftover);
420 }
421 })
422 }
423
424 fn shutdown(&self) -> SinkFut<'_> {
425 Box::pin(async move {
426 let leftover = self.batch.drain();
427 if !leftover.is_empty() {
428 self.dispatch(leftover);
429 }
430 while let Some(payload) = self.retry.pop() {
431 let _ = self.exporter.export_metrics(&payload);
432 }
433 })
434 }
435}
436
437impl OtlpMetricSink {
438 fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
439 let attrs = obs_core::observer().resource_attrs();
440 if attrs.service_name.is_empty()
441 && attrs.service_version.is_empty()
442 && attrs.extra.is_empty()
443 {
444 Arc::clone(&self.resource)
445 } else {
446 Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
447 }
448 }
449
450 fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
451 let resource = self.live_resource();
452 let payload = OtlpMetricPayload::from_envelopes(
453 &envelopes,
454 &resource,
455 &self.endpoint,
456 &self.registry,
457 );
458 match self.exporter.export_metrics(&payload) {
459 Ok(()) => {}
460 Err(_) => {
461 let _ = self.retry.push(payload);
462 }
463 }
464 }
465}
466
467pub struct OtlpTraceSink {
471 exporter: Arc<dyn OtlpExporter>,
472 batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
473 retry: Arc<RetryQueue<OtlpTracePayload>>,
474 resource: Arc<OtlpResourceAttrs>,
475 endpoint: Arc<OtlpEndpoint>,
476 retry_policy: OtlpRetry,
477 pair_tracker: Arc<SpanPairTracker>,
478 registry: Arc<SchemaRegistry>,
481}
482
483impl std::fmt::Debug for OtlpTraceSink {
484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485 f.debug_struct("OtlpTraceSink")
486 .field("endpoint", &self.endpoint.url)
487 .field("retry", &self.retry_policy)
488 .finish_non_exhaustive()
489 }
490}
491
492#[derive(Default)]
494pub struct OtlpTraceSinkBuilder {
495 exporter: Option<Arc<dyn OtlpExporter>>,
496 endpoint: Option<OtlpEndpoint>,
497 resource: Option<OtlpResourceAttrs>,
498 retry: Option<OtlpRetry>,
499 registry: Option<Arc<SchemaRegistry>>,
500 pair_timeout: Option<Duration>,
501}
502
503impl std::fmt::Debug for OtlpTraceSinkBuilder {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct("OtlpTraceSinkBuilder")
506 .finish_non_exhaustive()
507 }
508}
509
510impl OtlpTraceSink {
511 #[must_use]
513 pub fn builder() -> OtlpTraceSinkBuilder {
514 OtlpTraceSinkBuilder::default()
515 }
516
517 pub fn from_env() -> Result<Self, OtlpError> {
523 Self::builder()
524 .endpoint(crate::env_config::endpoint_from_env())
525 .resource(crate::env_config::resource_from_env())
526 .build()
527 }
528}
529
530impl OtlpTraceSinkBuilder {
531 #[must_use]
533 pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
534 self.endpoint = Some(e);
535 self
536 }
537 #[must_use]
539 pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
540 self.resource = Some(r);
541 self
542 }
543 #[must_use]
545 pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
546 self.exporter = Some(e);
547 self
548 }
549 #[must_use]
551 pub fn retry(mut self, r: OtlpRetry) -> Self {
552 self.retry = Some(r);
553 self
554 }
555
556 #[must_use]
559 pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
560 self.registry = Some(registry);
561 self
562 }
563
564 #[must_use]
566 pub fn pair_timeout(mut self, timeout: Duration) -> Self {
567 self.pair_timeout = Some(timeout);
568 self
569 }
570
571 pub fn build(self) -> Result<OtlpTraceSink, OtlpError> {
577 let endpoint = self.endpoint.unwrap_or_default();
578 let resource = self.resource.unwrap_or_default();
579 let exporter = self
580 .exporter
581 .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
582 let retry_policy = self.retry.unwrap_or_default();
583 let registry = self
584 .registry
585 .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
586 let pair_timeout = self
587 .pair_timeout
588 .unwrap_or(crate::traces::DEFAULT_PAIR_TIMEOUT);
589 Ok(OtlpTraceSink {
590 exporter,
591 batch: Arc::new(Batch::new(
592 DEFAULT_BATCH_RECORDS,
593 Duration::from_millis(DEFAULT_BATCH_AGE_MS),
594 )),
595 retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
596 resource: Arc::new(resource),
597 endpoint: Arc::new(endpoint),
598 retry_policy,
599 pair_tracker: Arc::new(SpanPairTracker::with_timeout(pair_timeout)),
600 registry,
601 })
602 }
603}
604
605impl Sink for OtlpTraceSink {
606 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
607 let mut owned = env.envelope().clone();
609 owned.payload = env.payload().to_vec();
610 if let Some(batch) = self.batch.push(owned) {
611 self.dispatch(batch);
612 }
613 }
614
615 fn flush(&self) -> SinkFut<'_> {
616 Box::pin(async move {
617 let leftover = self.batch.drain();
618 if !leftover.is_empty() {
619 self.dispatch(leftover);
620 }
621 })
622 }
623
624 fn shutdown(&self) -> SinkFut<'_> {
625 Box::pin(async move {
626 let leftover = self.batch.drain();
627 if !leftover.is_empty() {
628 self.dispatch(leftover);
629 }
630 while let Some(payload) = self.retry.pop() {
631 let _ = self.exporter.export_traces(&payload);
632 }
633 })
634 }
635}
636
637impl OtlpTraceSink {
638 fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
639 let attrs = obs_core::observer().resource_attrs();
640 if attrs.service_name.is_empty()
641 && attrs.service_version.is_empty()
642 && attrs.extra.is_empty()
643 {
644 Arc::clone(&self.resource)
645 } else {
646 Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
647 }
648 }
649
650 fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
651 let resource = self.live_resource();
652 let payload = OtlpTracePayload::from_envelopes(
653 &envelopes,
654 &resource,
655 &self.endpoint,
656 &self.pair_tracker,
657 &self.registry,
658 );
659 for full_name in &payload.orphaned {
662 obs_core::self_events_public::emit_span_pair_orphaned(full_name);
663 }
664 match self.exporter.export_traces(&payload) {
665 Ok(()) => {}
666 Err(_) => {
667 let _ = self.retry.push(payload);
668 }
669 }
670 }
671}