Skip to main content

obs_otel/
sink.rs

1//! OTLP sink wrappers around the per-record projection types in
2//! `mapping`.
3
4use std::{sync::Arc, time::Duration};
5
6use obs_core::{SchemaRegistry, Sink, registry::ScrubbedEnvelope, sink::SinkFut};
7use parking_lot::Mutex;
8use serde::{Deserialize, Serialize};
9
10use crate::{
11    OtlpError,
12    backpressure::RetryQueue,
13    batch::Batch,
14    env_config::{OtlpEndpoint, OtlpResourceAttrs},
15    logs::OtlpLogPayload,
16    metrics::OtlpMetricPayload,
17    traces::{OtlpTracePayload, SpanPairTracker},
18};
19
20const DEFAULT_BATCH_RECORDS: usize = 256;
21const DEFAULT_BATCH_AGE_MS: u64 = 1_000;
22const DEFAULT_RETRY_QUEUE: usize = 16_384;
23
24/// Retry policy. Spec 20 § 4.1.
25#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
26pub struct OtlpRetry {
27    /// Maximum attempts including the first.
28    pub max_attempts: u32,
29    /// Initial backoff before retry.
30    pub initial_backoff_ms: u64,
31    /// Cap on the exponential backoff.
32    pub max_backoff_ms: u64,
33}
34
35impl Default for OtlpRetry {
36    fn default() -> Self {
37        Self {
38            max_attempts: 5,
39            initial_backoff_ms: 100,
40            max_backoff_ms: 30_000,
41        }
42    }
43}
44
45/// Pluggable transport. Implementors translate the structured payload
46/// into a network call (gRPC, HTTP+protobuf, HTTP+JSON, …).
47pub trait OtlpExporter: Send + Sync + 'static {
48    /// Send one batch of log records.
49    fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError>;
50    /// Send one batch of metric data points.
51    fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError>;
52    /// Send one batch of span records.
53    fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError>;
54}
55
56/// Default exporter — writes JSON to stdout. Useful for the
57/// `apps/server` end-to-end demo, the unit tests, and as a fallback
58/// when no OTLP endpoint is wired in.
59#[derive(Debug, Default, Clone, Copy)]
60pub struct StdoutDebugExporter;
61
62impl OtlpExporter for StdoutDebugExporter {
63    fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError> {
64        match serde_json::to_string(payload) {
65            Ok(s) => {
66                println!("{s}");
67                Ok(())
68            }
69            Err(e) => Err(OtlpError::Transport(e.to_string())),
70        }
71    }
72    fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError> {
73        match serde_json::to_string(payload) {
74            Ok(s) => {
75                println!("{s}");
76                Ok(())
77            }
78            Err(e) => Err(OtlpError::Transport(e.to_string())),
79        }
80    }
81    fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError> {
82        match serde_json::to_string(payload) {
83            Ok(s) => {
84                println!("{s}");
85                Ok(())
86            }
87            Err(e) => Err(OtlpError::Transport(e.to_string())),
88        }
89    }
90}
91
92// ─── OtlpLogSink ──────────────────────────────────────────────────────
93
94/// OTLP log-tier sink. Spec 20 § 2.3.
95pub struct OtlpLogSink {
96    exporter: Arc<dyn OtlpExporter>,
97    batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
98    retry: Arc<RetryQueue<OtlpLogPayload>>,
99    resource: Arc<OtlpResourceAttrs>,
100    endpoint: Arc<OtlpEndpoint>,
101    retry_policy: OtlpRetry,
102    last_flush: Mutex<()>,
103}
104
105impl std::fmt::Debug for OtlpLogSink {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("OtlpLogSink")
108            .field("endpoint", &self.endpoint.url)
109            .field("retry", &self.retry_policy)
110            .finish_non_exhaustive()
111    }
112}
113
114/// Builder for [`OtlpLogSink`].
115#[derive(Default)]
116pub struct OtlpLogSinkBuilder {
117    exporter: Option<Arc<dyn OtlpExporter>>,
118    endpoint: Option<OtlpEndpoint>,
119    resource: Option<OtlpResourceAttrs>,
120    retry: Option<OtlpRetry>,
121}
122
123impl std::fmt::Debug for OtlpLogSinkBuilder {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct("OtlpLogSinkBuilder").finish_non_exhaustive()
126    }
127}
128
129impl OtlpLogSink {
130    /// Builder entry.
131    #[must_use]
132    pub fn builder() -> OtlpLogSinkBuilder {
133        OtlpLogSinkBuilder::default()
134    }
135
136    /// Convenience: construct the sink with defaults from env vars.
137    ///
138    /// # Errors
139    ///
140    /// Returns `OtlpError::Config` when env-var parsing finds an
141    /// invalid setting.
142    pub fn from_env() -> Result<Self, OtlpError> {
143        Self::builder()
144            .endpoint(crate::env_config::endpoint_from_env())
145            .resource(crate::env_config::resource_from_env())
146            .build()
147    }
148}
149
150impl OtlpLogSinkBuilder {
151    /// Set the endpoint.
152    #[must_use]
153    pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
154        self.endpoint = Some(e);
155        self
156    }
157
158    /// Set the resource attributes.
159    #[must_use]
160    pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
161        self.resource = Some(r);
162        self
163    }
164
165    /// Override the exporter; default is [`StdoutDebugExporter`].
166    #[must_use]
167    pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
168        self.exporter = Some(e);
169        self
170    }
171
172    /// Override the retry policy.
173    #[must_use]
174    pub fn retry(mut self, r: OtlpRetry) -> Self {
175        self.retry = Some(r);
176        self
177    }
178
179    /// Finalise.
180    ///
181    /// # Errors
182    ///
183    /// Returns `OtlpError::Config` when required fields are missing.
184    pub fn build(self) -> Result<OtlpLogSink, OtlpError> {
185        let endpoint = self.endpoint.unwrap_or_default();
186        let resource = self.resource.unwrap_or_default();
187        let exporter = self
188            .exporter
189            .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
190        let retry_policy = self.retry.unwrap_or_default();
191        Ok(OtlpLogSink {
192            exporter,
193            batch: Arc::new(Batch::new(
194                DEFAULT_BATCH_RECORDS,
195                Duration::from_millis(DEFAULT_BATCH_AGE_MS),
196            )),
197            retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
198            resource: Arc::new(resource),
199            endpoint: Arc::new(endpoint),
200            retry_policy,
201            last_flush: Mutex::new(()),
202        })
203    }
204}
205
206impl Sink for OtlpLogSink {
207    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
208        // Spec 14 § 5 / spec 93 P0-8: ship the *scrubbed* payload.
209        let mut owned = env.envelope().clone();
210        owned.payload = env.payload().to_vec();
211        if let Some(batch) = self.batch.push(owned) {
212            self.dispatch(batch);
213        }
214    }
215
216    fn flush(&self) -> SinkFut<'_> {
217        Box::pin(async move {
218            let leftover = self.batch.drain();
219            if !leftover.is_empty() {
220                self.dispatch(leftover);
221            }
222        })
223    }
224
225    fn shutdown(&self) -> SinkFut<'_> {
226        Box::pin(async move {
227            let leftover = self.batch.drain();
228            if !leftover.is_empty() {
229                self.dispatch(leftover);
230            }
231            // Drain retry queue best-effort.
232            while let Some(payload) = self.retry.pop() {
233                let _ = self.exporter.export_logs(&payload);
234            }
235        })
236    }
237}
238
239impl OtlpLogSink {
240    /// Resolve the resource attribute snapshot to ship with this
241    /// batch: prefer the live observer-held set (spec 94 § 2.7) and
242    /// fall back to the sink's builder-provided set when no observer
243    /// exposes one (e.g. unit tests with `NoopObserver`).
244    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
245        let attrs = obs_core::observer().resource_attrs();
246        if attrs.service_name.is_empty()
247            && attrs.service_version.is_empty()
248            && attrs.extra.is_empty()
249        {
250            Arc::clone(&self.resource)
251        } else {
252            Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
253        }
254    }
255
256    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
257        let resource = self.live_resource();
258        let payload = OtlpLogPayload::from_envelopes(&envelopes, &resource, &self.endpoint);
259        let _g = self.last_flush.lock();
260        match self.exporter.export_logs(&payload) {
261            Ok(()) => {}
262            Err(_) => {
263                let _ = self.retry.push(payload);
264            }
265        }
266    }
267
268    /// Test helper.
269    #[must_use]
270    pub fn retry_depth(&self) -> usize {
271        self.retry.depth()
272    }
273}
274
275// ─── OtlpMetricSink ───────────────────────────────────────────────────
276
277/// OTLP metric-tier sink. Spec 20 § 2.4.
278pub struct OtlpMetricSink {
279    exporter: Arc<dyn OtlpExporter>,
280    batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
281    retry: Arc<RetryQueue<OtlpMetricPayload>>,
282    resource: Arc<OtlpResourceAttrs>,
283    endpoint: Arc<OtlpEndpoint>,
284    retry_policy: OtlpRetry,
285    /// Schema lookup used to call `EventSchema::project_metrics` per
286    /// envelope. Spec 93 P1-6.
287    registry: Arc<SchemaRegistry>,
288}
289
290impl std::fmt::Debug for OtlpMetricSink {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        f.debug_struct("OtlpMetricSink")
293            .field("endpoint", &self.endpoint.url)
294            .field("retry", &self.retry_policy)
295            .finish_non_exhaustive()
296    }
297}
298
299/// Builder for [`OtlpMetricSink`].
300#[derive(Default)]
301pub struct OtlpMetricSinkBuilder {
302    exporter: Option<Arc<dyn OtlpExporter>>,
303    endpoint: Option<OtlpEndpoint>,
304    resource: Option<OtlpResourceAttrs>,
305    retry: Option<OtlpRetry>,
306    registry: Option<Arc<SchemaRegistry>>,
307}
308
309impl std::fmt::Debug for OtlpMetricSinkBuilder {
310    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311        f.debug_struct("OtlpMetricSinkBuilder")
312            .finish_non_exhaustive()
313    }
314}
315
316impl OtlpMetricSink {
317    /// Builder entry.
318    #[must_use]
319    pub fn builder() -> OtlpMetricSinkBuilder {
320        OtlpMetricSinkBuilder::default()
321    }
322
323    /// `from_env` convenience.
324    ///
325    /// # Errors
326    ///
327    /// See [`OtlpLogSink::from_env`].
328    pub fn from_env() -> Result<Self, OtlpError> {
329        Self::builder()
330            .endpoint(crate::env_config::endpoint_from_env())
331            .resource(crate::env_config::resource_from_env())
332            .build()
333    }
334}
335
336impl OtlpMetricSinkBuilder {
337    /// Set the endpoint.
338    #[must_use]
339    pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
340        self.endpoint = Some(e);
341        self
342    }
343
344    /// Set the resource attributes.
345    #[must_use]
346    pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
347        self.resource = Some(r);
348        self
349    }
350
351    /// Override the exporter.
352    #[must_use]
353    pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
354        self.exporter = Some(e);
355        self
356    }
357
358    /// Override the retry policy.
359    #[must_use]
360    pub fn retry(mut self, r: OtlpRetry) -> Self {
361        self.retry = Some(r);
362        self
363    }
364
365    /// Inject the schema registry used to project per-event metric
366    /// fields. When unset, the sink falls back to
367    /// `SchemaRegistry::from_link_section()` so apps that don't wire it
368    /// explicitly still get correct dispatch. Spec 93 P1-6.
369    #[must_use]
370    pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
371        self.registry = Some(registry);
372        self
373    }
374
375    /// Finalise.
376    ///
377    /// # Errors
378    ///
379    /// Returns `OtlpError::Config` if required fields are missing.
380    pub fn build(self) -> Result<OtlpMetricSink, OtlpError> {
381        let endpoint = self.endpoint.unwrap_or_default();
382        let resource = self.resource.unwrap_or_default();
383        let exporter = self
384            .exporter
385            .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
386        let retry_policy = self.retry.unwrap_or_default();
387        let registry = self
388            .registry
389            .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
390        Ok(OtlpMetricSink {
391            exporter,
392            batch: Arc::new(Batch::new(
393                DEFAULT_BATCH_RECORDS,
394                Duration::from_millis(DEFAULT_BATCH_AGE_MS),
395            )),
396            retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
397            resource: Arc::new(resource),
398            endpoint: Arc::new(endpoint),
399            retry_policy,
400            registry,
401        })
402    }
403}
404
405impl Sink for OtlpMetricSink {
406    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
407        // Spec 14 § 5 / spec 93 P0-8: ship the *scrubbed* payload.
408        let mut owned = env.envelope().clone();
409        owned.payload = env.payload().to_vec();
410        if let Some(batch) = self.batch.push(owned) {
411            self.dispatch(batch);
412        }
413    }
414
415    fn flush(&self) -> SinkFut<'_> {
416        Box::pin(async move {
417            let leftover = self.batch.drain();
418            if !leftover.is_empty() {
419                self.dispatch(leftover);
420            }
421        })
422    }
423
424    fn shutdown(&self) -> SinkFut<'_> {
425        Box::pin(async move {
426            let leftover = self.batch.drain();
427            if !leftover.is_empty() {
428                self.dispatch(leftover);
429            }
430            while let Some(payload) = self.retry.pop() {
431                let _ = self.exporter.export_metrics(&payload);
432            }
433        })
434    }
435}
436
437impl OtlpMetricSink {
438    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
439        let attrs = obs_core::observer().resource_attrs();
440        if attrs.service_name.is_empty()
441            && attrs.service_version.is_empty()
442            && attrs.extra.is_empty()
443        {
444            Arc::clone(&self.resource)
445        } else {
446            Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
447        }
448    }
449
450    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
451        let resource = self.live_resource();
452        let payload = OtlpMetricPayload::from_envelopes(
453            &envelopes,
454            &resource,
455            &self.endpoint,
456            &self.registry,
457        );
458        match self.exporter.export_metrics(&payload) {
459            Ok(()) => {}
460            Err(_) => {
461                let _ = self.retry.push(payload);
462            }
463        }
464    }
465}
466
467// ─── OtlpTraceSink ────────────────────────────────────────────────────
468
469/// OTLP trace-tier sink. Spec 20 § 2.5.
470pub struct OtlpTraceSink {
471    exporter: Arc<dyn OtlpExporter>,
472    batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
473    retry: Arc<RetryQueue<OtlpTracePayload>>,
474    resource: Arc<OtlpResourceAttrs>,
475    endpoint: Arc<OtlpEndpoint>,
476    retry_policy: OtlpRetry,
477    pair_tracker: Arc<SpanPairTracker>,
478    /// Schema lookup driving Started/Completed pair detection via
479    /// `EventSchemaErased::spans_paired_with()`. Spec 93 P1-7.
480    registry: Arc<SchemaRegistry>,
481}
482
483impl std::fmt::Debug for OtlpTraceSink {
484    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485        f.debug_struct("OtlpTraceSink")
486            .field("endpoint", &self.endpoint.url)
487            .field("retry", &self.retry_policy)
488            .finish_non_exhaustive()
489    }
490}
491
492/// Builder for [`OtlpTraceSink`].
493#[derive(Default)]
494pub struct OtlpTraceSinkBuilder {
495    exporter: Option<Arc<dyn OtlpExporter>>,
496    endpoint: Option<OtlpEndpoint>,
497    resource: Option<OtlpResourceAttrs>,
498    retry: Option<OtlpRetry>,
499    registry: Option<Arc<SchemaRegistry>>,
500    pair_timeout: Option<Duration>,
501}
502
503impl std::fmt::Debug for OtlpTraceSinkBuilder {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct("OtlpTraceSinkBuilder")
506            .finish_non_exhaustive()
507    }
508}
509
510impl OtlpTraceSink {
511    /// Builder entry.
512    #[must_use]
513    pub fn builder() -> OtlpTraceSinkBuilder {
514        OtlpTraceSinkBuilder::default()
515    }
516
517    /// `from_env` convenience.
518    ///
519    /// # Errors
520    ///
521    /// See [`OtlpLogSink::from_env`].
522    pub fn from_env() -> Result<Self, OtlpError> {
523        Self::builder()
524            .endpoint(crate::env_config::endpoint_from_env())
525            .resource(crate::env_config::resource_from_env())
526            .build()
527    }
528}
529
530impl OtlpTraceSinkBuilder {
531    /// Set endpoint.
532    #[must_use]
533    pub fn endpoint(mut self, e: OtlpEndpoint) -> Self {
534        self.endpoint = Some(e);
535        self
536    }
537    /// Set resource attrs.
538    #[must_use]
539    pub fn resource(mut self, r: OtlpResourceAttrs) -> Self {
540        self.resource = Some(r);
541        self
542    }
543    /// Override exporter.
544    #[must_use]
545    pub fn exporter(mut self, e: Arc<dyn OtlpExporter>) -> Self {
546        self.exporter = Some(e);
547        self
548    }
549    /// Override retry policy.
550    #[must_use]
551    pub fn retry(mut self, r: OtlpRetry) -> Self {
552        self.retry = Some(r);
553        self
554    }
555
556    /// Inject the schema registry used for Started/Completed pair
557    /// detection (`EventSchema::SPANS_PAIRED_WITH`). Spec 93 P1-7.
558    #[must_use]
559    pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
560        self.registry = Some(registry);
561        self
562    }
563
564    /// Override the orphan-pair timeout (default 60s). Spec 93 P1-7.
565    #[must_use]
566    pub fn pair_timeout(mut self, timeout: Duration) -> Self {
567        self.pair_timeout = Some(timeout);
568        self
569    }
570
571    /// Finalise.
572    ///
573    /// # Errors
574    ///
575    /// Returns `OtlpError::Config` when required fields are missing.
576    pub fn build(self) -> Result<OtlpTraceSink, OtlpError> {
577        let endpoint = self.endpoint.unwrap_or_default();
578        let resource = self.resource.unwrap_or_default();
579        let exporter = self
580            .exporter
581            .unwrap_or_else(|| Arc::new(StdoutDebugExporter));
582        let retry_policy = self.retry.unwrap_or_default();
583        let registry = self
584            .registry
585            .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
586        let pair_timeout = self
587            .pair_timeout
588            .unwrap_or(crate::traces::DEFAULT_PAIR_TIMEOUT);
589        Ok(OtlpTraceSink {
590            exporter,
591            batch: Arc::new(Batch::new(
592                DEFAULT_BATCH_RECORDS,
593                Duration::from_millis(DEFAULT_BATCH_AGE_MS),
594            )),
595            retry: Arc::new(RetryQueue::new(DEFAULT_RETRY_QUEUE)),
596            resource: Arc::new(resource),
597            endpoint: Arc::new(endpoint),
598            retry_policy,
599            pair_tracker: Arc::new(SpanPairTracker::with_timeout(pair_timeout)),
600            registry,
601        })
602    }
603}
604
605impl Sink for OtlpTraceSink {
606    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
607        // Spec 14 § 5 / spec 93 P0-8: ship the *scrubbed* payload.
608        let mut owned = env.envelope().clone();
609        owned.payload = env.payload().to_vec();
610        if let Some(batch) = self.batch.push(owned) {
611            self.dispatch(batch);
612        }
613    }
614
615    fn flush(&self) -> SinkFut<'_> {
616        Box::pin(async move {
617            let leftover = self.batch.drain();
618            if !leftover.is_empty() {
619                self.dispatch(leftover);
620            }
621        })
622    }
623
624    fn shutdown(&self) -> SinkFut<'_> {
625        Box::pin(async move {
626            let leftover = self.batch.drain();
627            if !leftover.is_empty() {
628                self.dispatch(leftover);
629            }
630            while let Some(payload) = self.retry.pop() {
631                let _ = self.exporter.export_traces(&payload);
632            }
633        })
634    }
635}
636
637impl OtlpTraceSink {
638    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
639        let attrs = obs_core::observer().resource_attrs();
640        if attrs.service_name.is_empty()
641            && attrs.service_version.is_empty()
642            && attrs.extra.is_empty()
643        {
644            Arc::clone(&self.resource)
645        } else {
646            Arc::new(OtlpResourceAttrs::from(attrs.as_ref()))
647        }
648    }
649
650    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
651        let resource = self.live_resource();
652        let payload = OtlpTracePayload::from_envelopes(
653            &envelopes,
654            &resource,
655            &self.endpoint,
656            &self.pair_tracker,
657            &self.registry,
658        );
659        // Surface orphans to the runtime self-event catalogue.
660        // Spec 93 P1-2 + P1-7.
661        for full_name in &payload.orphaned {
662            obs_core::self_events_public::emit_span_pair_orphaned(full_name);
663        }
664        match self.exporter.export_traces(&payload) {
665            Ok(()) => {}
666            Err(_) => {
667                let _ = self.retry.push(payload);
668            }
669        }
670    }
671}