use std::{sync::Arc, time::Duration};
use obs_core::{SchemaRegistry, Sink, registry::ScrubbedEnvelope, sink::SinkFut};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::{
OtlpError,
backpressure::RetryQueue,
batch::Batch,
env_config::{OtlpEndpoint, OtlpResourceAttrs},
logs::OtlpLogPayload,
metrics::OtlpMetricPayload,
traces::{OtlpTracePayload, SpanPairTracker},
};
/// First argument every sink builder in this file passes to `Batch::new`
/// (presumably the record count at which a batch is handed out — confirm against `Batch`).
const DEFAULT_BATCH_RECORDS: usize = 256;
/// Milliseconds turned into a `Duration` and passed as the second `Batch::new` argument
/// (presumably the maximum age of a batch — confirm against `Batch`).
const DEFAULT_BATCH_AGE_MS: u64 = 1_000;
/// Argument every sink builder in this file passes to `RetryQueue::new`
/// (presumably the queue capacity — confirm against `RetryQueue`).
const DEFAULT_RETRY_QUEUE: usize = 16_384;
/// Retry settings attached to each OTLP sink.
///
/// NOTE(review): in this file the policy is only stored on the sink and printed by the
/// sink's `Debug` impl; no visible code reads the individual fields. The field notes
/// below are inferred from the names — confirm against whatever consumes them.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct OtlpRetry {
/// Presumably the maximum number of export attempts per payload.
pub max_attempts: u32,
/// Presumably the delay before the first retry, in milliseconds.
pub initial_backoff_ms: u64,
/// Presumably the upper bound on the retry delay, in milliseconds.
pub max_backoff_ms: u64,
}
impl Default for OtlpRetry {
    /// Builds the stock policy: 5 attempts, 100 ms initial backoff, 30 000 ms backoff cap.
    fn default() -> Self {
        let max_attempts = 5;
        let initial_backoff_ms = 100;
        let max_backoff_ms = 30_000;
        Self {
            max_attempts,
            initial_backoff_ms,
            max_backoff_ms,
        }
    }
}
/// Transport abstraction the sinks in this file call to ship a finished payload.
///
/// The sinks hold implementations as `Arc<dyn OtlpExporter>`. When a call made from a
/// sink's `dispatch` returns `Err`, the sink pushes the payload onto its retry queue.
pub trait OtlpExporter: Send + Sync + 'static {
/// Exports one log payload.
fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError>;
/// Exports one metric payload.
fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError>;
/// Exports one trace payload.
fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError>;
}
/// Exporter that serializes each payload to JSON and writes it to standard output.
///
/// The sink builders in this file fall back to it when no exporter is supplied.
#[derive(Debug, Default, Clone, Copy)]
pub struct StdoutDebugExporter;
impl StdoutDebugExporter {
    /// Writes one JSON document to stdout, followed by a newline.
    ///
    /// Takes the serializer's `Result` directly so the three trait methods share a
    /// single error-mapping path.
    ///
    /// Note: writing through the stdout handle bypasses the test harness's output
    /// capture, which `println!` would honour.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Transport`] when serialization failed or when stdout
    /// cannot be written (for example a closed pipe).
    fn write_json_line(encoded: serde_json::Result<String>) -> Result<(), OtlpError> {
        let mut line = encoded.map_err(|e| OtlpError::Transport(e.to_string()))?;
        line.push('\n');
        let stdout = std::io::stdout();
        let mut out = stdout.lock();
        // `println!` panics when the write fails; an exporter must hand the failure
        // back to its caller instead of unwinding through the telemetry pipeline.
        std::io::Write::write_all(&mut out, line.as_bytes())
            .map_err(|e| OtlpError::Transport(e.to_string()))
    }
}

impl OtlpExporter for StdoutDebugExporter {
    /// Prints the log payload as a single JSON line on stdout.
    fn export_logs(&self, payload: &OtlpLogPayload) -> Result<(), OtlpError> {
        Self::write_json_line(serde_json::to_string(payload))
    }

    /// Prints the metric payload as a single JSON line on stdout.
    fn export_metrics(&self, payload: &OtlpMetricPayload) -> Result<(), OtlpError> {
        Self::write_json_line(serde_json::to_string(payload))
    }

    /// Prints the trace payload as a single JSON line on stdout.
    fn export_traces(&self, payload: &OtlpTracePayload) -> Result<(), OtlpError> {
        Self::write_json_line(serde_json::to_string(payload))
    }
}
/// Sink that batches delivered envelopes and exports them as OTLP log payloads.
pub struct OtlpLogSink {
/// Transport that receives each finished payload.
exporter: Arc<dyn OtlpExporter>,
/// Collects delivered envelopes until `push` hands back a batch or `drain` is called.
batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
/// Payloads whose export returned an error; drained in `shutdown`.
retry: Arc<RetryQueue<OtlpLogPayload>>,
/// Resource attributes fixed at build time; used when the observer reports none.
resource: Arc<OtlpResourceAttrs>,
/// Endpoint configuration passed to payload construction; its `url` appears in `Debug` output.
endpoint: Arc<OtlpEndpoint>,
/// Retry settings; in this file they are only shown in `Debug` output.
retry_policy: OtlpRetry,
/// Lock that `dispatch` holds while it calls the exporter.
last_flush: Mutex<()>,
}
impl std::fmt::Debug for OtlpLogSink {
    /// Shows only the endpoint URL and the retry policy; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpLogSink");
        out.field("endpoint", &self.endpoint.url);
        out.field("retry", &self.retry_policy);
        out.finish_non_exhaustive()
    }
}
/// Builder for [`OtlpLogSink`]; every option left as `None` gets a fallback in `build`.
#[derive(Default)]
pub struct OtlpLogSinkBuilder {
/// Custom transport; `build` falls back to `StdoutDebugExporter`.
exporter: Option<Arc<dyn OtlpExporter>>,
/// Endpoint configuration; `build` falls back to `OtlpEndpoint::default()`.
endpoint: Option<OtlpEndpoint>,
/// Resource attributes; `build` falls back to `OtlpResourceAttrs::default()`.
resource: Option<OtlpResourceAttrs>,
/// Retry settings; `build` falls back to `OtlpRetry::default()`.
retry: Option<OtlpRetry>,
}
impl std::fmt::Debug for OtlpLogSinkBuilder {
    /// Prints only the type name; no builder field is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpLogSinkBuilder");
        out.finish_non_exhaustive()
    }
}
impl OtlpLogSink {
    /// Returns a builder with no option set.
    #[must_use]
    pub fn builder() -> OtlpLogSinkBuilder {
        OtlpLogSinkBuilder::default()
    }

    /// Builds a sink whose endpoint and resource attributes come from
    /// `crate::env_config`; exporter and retry policy keep the builder fallbacks.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`OtlpLogSinkBuilder::build`] returns.
    pub fn from_env() -> Result<Self, OtlpError> {
        let endpoint = crate::env_config::endpoint_from_env();
        let resource = crate::env_config::resource_from_env();
        Self::builder().endpoint(endpoint).resource(resource).build()
    }
}
impl OtlpLogSinkBuilder {
    /// Overrides the endpoint configuration.
    #[must_use]
    pub fn endpoint(self, e: OtlpEndpoint) -> Self {
        Self {
            endpoint: Some(e),
            ..self
        }
    }

    /// Overrides the build-time resource attributes.
    #[must_use]
    pub fn resource(self, r: OtlpResourceAttrs) -> Self {
        Self {
            resource: Some(r),
            ..self
        }
    }

    /// Overrides the transport used to export payloads.
    #[must_use]
    pub fn exporter(self, e: Arc<dyn OtlpExporter>) -> Self {
        Self {
            exporter: Some(e),
            ..self
        }
    }

    /// Overrides the retry settings stored on the sink.
    #[must_use]
    pub fn retry(self, r: OtlpRetry) -> Self {
        Self {
            retry: Some(r),
            ..self
        }
    }

    /// Assembles the sink, substituting a fallback for every option left unset.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn build(self) -> Result<OtlpLogSink, OtlpError> {
        let Self {
            exporter,
            endpoint,
            resource,
            retry,
        } = self;
        let endpoint = endpoint.unwrap_or_default();
        let resource = resource.unwrap_or_default();
        let exporter: Arc<dyn OtlpExporter> = match exporter {
            Some(custom) => custom,
            None => Arc::new(StdoutDebugExporter),
        };
        let retry_policy = retry.unwrap_or_default();
        let batch = Batch::new(
            DEFAULT_BATCH_RECORDS,
            Duration::from_millis(DEFAULT_BATCH_AGE_MS),
        );
        let retry_queue = RetryQueue::new(DEFAULT_RETRY_QUEUE);
        Ok(OtlpLogSink {
            exporter,
            batch: Arc::new(batch),
            retry: Arc::new(retry_queue),
            resource: Arc::new(resource),
            endpoint: Arc::new(endpoint),
            retry_policy,
            last_flush: Mutex::new(()),
        })
    }
}
impl Sink for OtlpLogSink {
    /// Copies the envelope, swaps in the payload bytes exposed by `env`, and adds
    /// it to the batch; dispatches whenever the batch hands a full set back.
    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
        let mut record = env.envelope().clone();
        record.payload = env.payload().to_vec();
        let ready = self.batch.push(record);
        if let Some(full) = ready {
            self.dispatch(full);
        }
    }

    /// Dispatches whatever is currently batched; does nothing for an empty batch.
    fn flush(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if pending.is_empty() {
                return;
            }
            self.dispatch(pending);
        })
    }

    /// Dispatches the remaining batch, then gives every queued payload one more
    /// export attempt, ignoring the outcome.
    fn shutdown(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if !pending.is_empty() {
                self.dispatch(pending);
            }
            std::iter::from_fn(|| self.retry.pop()).for_each(|queued| {
                let _ = self.exporter.export_logs(&queued);
            });
        })
    }
}
impl OtlpLogSink {
    /// Picks the resource attributes for the next payload: the observer's current
    /// attributes when any of service name, service version or extras is non-empty,
    /// otherwise the attributes fixed at build time.
    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
        let live = obs_core::observer().resource_attrs();
        let observer_has_attrs = !live.service_name.is_empty()
            || !live.service_version.is_empty()
            || !live.extra.is_empty();
        if observer_has_attrs {
            Arc::new(OtlpResourceAttrs::from(live.as_ref()))
        } else {
            Arc::clone(&self.resource)
        }
    }

    /// Converts the envelopes into one log payload and exports it while holding
    /// `last_flush`; a failed export moves the payload onto the retry queue.
    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
        let resource = self.live_resource();
        let payload = OtlpLogPayload::from_envelopes(&envelopes, &resource, &self.endpoint);
        let _export_guard = self.last_flush.lock();
        if self.exporter.export_logs(&payload).is_err() {
            // The result of the push is deliberately ignored, as before.
            let _ = self.retry.push(payload);
        }
    }

    /// Reports the retry queue's current `depth()`.
    #[must_use]
    pub fn retry_depth(&self) -> usize {
        self.retry.depth()
    }
}
/// Sink that batches delivered envelopes and exports them as OTLP metric payloads.
pub struct OtlpMetricSink {
/// Transport that receives each finished payload.
exporter: Arc<dyn OtlpExporter>,
/// Collects delivered envelopes until `push` hands back a batch or `drain` is called.
batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
/// Payloads whose export returned an error; drained in `shutdown`.
retry: Arc<RetryQueue<OtlpMetricPayload>>,
/// Resource attributes fixed at build time; used when the observer reports none.
resource: Arc<OtlpResourceAttrs>,
/// Endpoint configuration passed to payload construction; its `url` appears in `Debug` output.
endpoint: Arc<OtlpEndpoint>,
/// Retry settings; in this file they are only shown in `Debug` output.
retry_policy: OtlpRetry,
/// Schema registry passed to `OtlpMetricPayload::from_envelopes`.
registry: Arc<SchemaRegistry>,
}
impl std::fmt::Debug for OtlpMetricSink {
    /// Shows only the endpoint URL and the retry policy; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpMetricSink");
        out.field("endpoint", &self.endpoint.url);
        out.field("retry", &self.retry_policy);
        out.finish_non_exhaustive()
    }
}
/// Builder for [`OtlpMetricSink`]; every option left as `None` gets a fallback in `build`.
#[derive(Default)]
pub struct OtlpMetricSinkBuilder {
/// Custom transport; `build` falls back to `StdoutDebugExporter`.
exporter: Option<Arc<dyn OtlpExporter>>,
/// Endpoint configuration; `build` falls back to `OtlpEndpoint::default()`.
endpoint: Option<OtlpEndpoint>,
/// Resource attributes; `build` falls back to `OtlpResourceAttrs::default()`.
resource: Option<OtlpResourceAttrs>,
/// Retry settings; `build` falls back to `OtlpRetry::default()`.
retry: Option<OtlpRetry>,
/// Schema registry; `build` falls back to `SchemaRegistry::from_link_section()`.
registry: Option<Arc<SchemaRegistry>>,
}
impl std::fmt::Debug for OtlpMetricSinkBuilder {
    /// Prints only the type name; no builder field is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpMetricSinkBuilder");
        out.finish_non_exhaustive()
    }
}
impl OtlpMetricSink {
    /// Returns a builder with no option set.
    #[must_use]
    pub fn builder() -> OtlpMetricSinkBuilder {
        OtlpMetricSinkBuilder::default()
    }

    /// Builds a sink whose endpoint and resource attributes come from
    /// `crate::env_config`; the other options keep the builder fallbacks.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`OtlpMetricSinkBuilder::build`] returns.
    pub fn from_env() -> Result<Self, OtlpError> {
        let endpoint = crate::env_config::endpoint_from_env();
        let resource = crate::env_config::resource_from_env();
        Self::builder().endpoint(endpoint).resource(resource).build()
    }
}
impl OtlpMetricSinkBuilder {
    /// Overrides the endpoint configuration.
    #[must_use]
    pub fn endpoint(self, e: OtlpEndpoint) -> Self {
        Self {
            endpoint: Some(e),
            ..self
        }
    }

    /// Overrides the build-time resource attributes.
    #[must_use]
    pub fn resource(self, r: OtlpResourceAttrs) -> Self {
        Self {
            resource: Some(r),
            ..self
        }
    }

    /// Overrides the transport used to export payloads.
    #[must_use]
    pub fn exporter(self, e: Arc<dyn OtlpExporter>) -> Self {
        Self {
            exporter: Some(e),
            ..self
        }
    }

    /// Overrides the retry settings stored on the sink.
    #[must_use]
    pub fn retry(self, r: OtlpRetry) -> Self {
        Self {
            retry: Some(r),
            ..self
        }
    }

    /// Overrides the schema registry handed to payload construction.
    #[must_use]
    pub fn registry(self, registry: Arc<SchemaRegistry>) -> Self {
        Self {
            registry: Some(registry),
            ..self
        }
    }

    /// Assembles the sink, substituting a fallback for every option left unset.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn build(self) -> Result<OtlpMetricSink, OtlpError> {
        let Self {
            exporter,
            endpoint,
            resource,
            retry,
            registry,
        } = self;
        let endpoint = endpoint.unwrap_or_default();
        let resource = resource.unwrap_or_default();
        let exporter: Arc<dyn OtlpExporter> = match exporter {
            Some(custom) => custom,
            None => Arc::new(StdoutDebugExporter),
        };
        let retry_policy = retry.unwrap_or_default();
        let registry = match registry {
            Some(shared) => shared,
            None => Arc::new(SchemaRegistry::from_link_section()),
        };
        let batch = Batch::new(
            DEFAULT_BATCH_RECORDS,
            Duration::from_millis(DEFAULT_BATCH_AGE_MS),
        );
        let retry_queue = RetryQueue::new(DEFAULT_RETRY_QUEUE);
        Ok(OtlpMetricSink {
            exporter,
            batch: Arc::new(batch),
            retry: Arc::new(retry_queue),
            resource: Arc::new(resource),
            endpoint: Arc::new(endpoint),
            retry_policy,
            registry,
        })
    }
}
impl Sink for OtlpMetricSink {
    /// Copies the envelope, swaps in the payload bytes exposed by `env`, and adds
    /// it to the batch; dispatches whenever the batch hands a full set back.
    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
        let mut record = env.envelope().clone();
        record.payload = env.payload().to_vec();
        let ready = self.batch.push(record);
        if let Some(full) = ready {
            self.dispatch(full);
        }
    }

    /// Dispatches whatever is currently batched; does nothing for an empty batch.
    fn flush(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if pending.is_empty() {
                return;
            }
            self.dispatch(pending);
        })
    }

    /// Dispatches the remaining batch, then gives every queued payload one more
    /// export attempt, ignoring the outcome.
    fn shutdown(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if !pending.is_empty() {
                self.dispatch(pending);
            }
            std::iter::from_fn(|| self.retry.pop()).for_each(|queued| {
                let _ = self.exporter.export_metrics(&queued);
            });
        })
    }
}
impl OtlpMetricSink {
    /// Picks the resource attributes for the next payload: the observer's current
    /// attributes when any of service name, service version or extras is non-empty,
    /// otherwise the attributes fixed at build time.
    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
        let live = obs_core::observer().resource_attrs();
        let observer_has_attrs = !live.service_name.is_empty()
            || !live.service_version.is_empty()
            || !live.extra.is_empty();
        if observer_has_attrs {
            Arc::new(OtlpResourceAttrs::from(live.as_ref()))
        } else {
            Arc::clone(&self.resource)
        }
    }

    /// Converts the envelopes into one metric payload and exports it; a failed
    /// export moves the payload onto the retry queue.
    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
        let resource = self.live_resource();
        let payload = OtlpMetricPayload::from_envelopes(
            &envelopes,
            &resource,
            &self.endpoint,
            &self.registry,
        );
        if self.exporter.export_metrics(&payload).is_err() {
            // The result of the push is deliberately ignored, as before.
            let _ = self.retry.push(payload);
        }
    }
}
/// Sink that batches delivered envelopes and exports them as OTLP trace payloads.
pub struct OtlpTraceSink {
/// Transport that receives each finished payload.
exporter: Arc<dyn OtlpExporter>,
/// Collects delivered envelopes until `push` hands back a batch or `drain` is called.
batch: Arc<Batch<obs_proto::obs::v1::ObsEnvelope>>,
/// Payloads whose export returned an error; drained in `shutdown`.
retry: Arc<RetryQueue<OtlpTracePayload>>,
/// Resource attributes fixed at build time; used when the observer reports none.
resource: Arc<OtlpResourceAttrs>,
/// Endpoint configuration passed to payload construction; its `url` appears in `Debug` output.
endpoint: Arc<OtlpEndpoint>,
/// Retry settings; in this file they are only shown in `Debug` output.
retry_policy: OtlpRetry,
/// Span pair tracker passed to `OtlpTracePayload::from_envelopes`; built with the pair timeout.
pair_tracker: Arc<SpanPairTracker>,
/// Schema registry passed to `OtlpTracePayload::from_envelopes`.
registry: Arc<SchemaRegistry>,
}
impl std::fmt::Debug for OtlpTraceSink {
    /// Shows only the endpoint URL and the retry policy; all other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpTraceSink");
        out.field("endpoint", &self.endpoint.url);
        out.field("retry", &self.retry_policy);
        out.finish_non_exhaustive()
    }
}
/// Builder for [`OtlpTraceSink`]; every option left as `None` gets a fallback in `build`.
#[derive(Default)]
pub struct OtlpTraceSinkBuilder {
/// Custom transport; `build` falls back to `StdoutDebugExporter`.
exporter: Option<Arc<dyn OtlpExporter>>,
/// Endpoint configuration; `build` falls back to `OtlpEndpoint::default()`.
endpoint: Option<OtlpEndpoint>,
/// Resource attributes; `build` falls back to `OtlpResourceAttrs::default()`.
resource: Option<OtlpResourceAttrs>,
/// Retry settings; `build` falls back to `OtlpRetry::default()`.
retry: Option<OtlpRetry>,
/// Schema registry; `build` falls back to `SchemaRegistry::from_link_section()`.
registry: Option<Arc<SchemaRegistry>>,
/// Timeout given to the span pair tracker; `build` falls back to `crate::traces::DEFAULT_PAIR_TIMEOUT`.
pair_timeout: Option<Duration>,
}
impl std::fmt::Debug for OtlpTraceSinkBuilder {
    /// Prints only the type name; no builder field is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtlpTraceSinkBuilder");
        out.finish_non_exhaustive()
    }
}
impl OtlpTraceSink {
    /// Returns a builder with no option set.
    #[must_use]
    pub fn builder() -> OtlpTraceSinkBuilder {
        OtlpTraceSinkBuilder::default()
    }

    /// Builds a sink whose endpoint and resource attributes come from
    /// `crate::env_config`; the other options keep the builder fallbacks.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`OtlpTraceSinkBuilder::build`] returns.
    pub fn from_env() -> Result<Self, OtlpError> {
        let endpoint = crate::env_config::endpoint_from_env();
        let resource = crate::env_config::resource_from_env();
        Self::builder().endpoint(endpoint).resource(resource).build()
    }
}
impl OtlpTraceSinkBuilder {
    /// Overrides the endpoint configuration.
    #[must_use]
    pub fn endpoint(self, e: OtlpEndpoint) -> Self {
        Self {
            endpoint: Some(e),
            ..self
        }
    }

    /// Overrides the build-time resource attributes.
    #[must_use]
    pub fn resource(self, r: OtlpResourceAttrs) -> Self {
        Self {
            resource: Some(r),
            ..self
        }
    }

    /// Overrides the transport used to export payloads.
    #[must_use]
    pub fn exporter(self, e: Arc<dyn OtlpExporter>) -> Self {
        Self {
            exporter: Some(e),
            ..self
        }
    }

    /// Overrides the retry settings stored on the sink.
    #[must_use]
    pub fn retry(self, r: OtlpRetry) -> Self {
        Self {
            retry: Some(r),
            ..self
        }
    }

    /// Overrides the schema registry handed to payload construction.
    #[must_use]
    pub fn registry(self, registry: Arc<SchemaRegistry>) -> Self {
        Self {
            registry: Some(registry),
            ..self
        }
    }

    /// Overrides the timeout the span pair tracker is created with.
    #[must_use]
    pub fn pair_timeout(self, timeout: Duration) -> Self {
        Self {
            pair_timeout: Some(timeout),
            ..self
        }
    }

    /// Assembles the sink, substituting a fallback for every option left unset.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn build(self) -> Result<OtlpTraceSink, OtlpError> {
        let Self {
            exporter,
            endpoint,
            resource,
            retry,
            registry,
            pair_timeout,
        } = self;
        let endpoint = endpoint.unwrap_or_default();
        let resource = resource.unwrap_or_default();
        let exporter: Arc<dyn OtlpExporter> = match exporter {
            Some(custom) => custom,
            None => Arc::new(StdoutDebugExporter),
        };
        let retry_policy = retry.unwrap_or_default();
        let registry = match registry {
            Some(shared) => shared,
            None => Arc::new(SchemaRegistry::from_link_section()),
        };
        let pair_timeout = pair_timeout.unwrap_or(crate::traces::DEFAULT_PAIR_TIMEOUT);
        let batch = Batch::new(
            DEFAULT_BATCH_RECORDS,
            Duration::from_millis(DEFAULT_BATCH_AGE_MS),
        );
        let retry_queue = RetryQueue::new(DEFAULT_RETRY_QUEUE);
        let pair_tracker = SpanPairTracker::with_timeout(pair_timeout);
        Ok(OtlpTraceSink {
            exporter,
            batch: Arc::new(batch),
            retry: Arc::new(retry_queue),
            resource: Arc::new(resource),
            endpoint: Arc::new(endpoint),
            retry_policy,
            pair_tracker: Arc::new(pair_tracker),
            registry,
        })
    }
}
impl Sink for OtlpTraceSink {
    /// Copies the envelope, swaps in the payload bytes exposed by `env`, and adds
    /// it to the batch; dispatches whenever the batch hands a full set back.
    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
        let mut record = env.envelope().clone();
        record.payload = env.payload().to_vec();
        let ready = self.batch.push(record);
        if let Some(full) = ready {
            self.dispatch(full);
        }
    }

    /// Dispatches whatever is currently batched; does nothing for an empty batch.
    fn flush(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if pending.is_empty() {
                return;
            }
            self.dispatch(pending);
        })
    }

    /// Dispatches the remaining batch, then gives every queued payload one more
    /// export attempt, ignoring the outcome.
    fn shutdown(&self) -> SinkFut<'_> {
        Box::pin(async move {
            let pending = self.batch.drain();
            if !pending.is_empty() {
                self.dispatch(pending);
            }
            std::iter::from_fn(|| self.retry.pop()).for_each(|queued| {
                let _ = self.exporter.export_traces(&queued);
            });
        })
    }
}
impl OtlpTraceSink {
    /// Picks the resource attributes for the next payload: the observer's current
    /// attributes when any of service name, service version or extras is non-empty,
    /// otherwise the attributes fixed at build time.
    fn live_resource(&self) -> Arc<OtlpResourceAttrs> {
        let live = obs_core::observer().resource_attrs();
        let observer_has_attrs = !live.service_name.is_empty()
            || !live.service_version.is_empty()
            || !live.extra.is_empty();
        if observer_has_attrs {
            Arc::new(OtlpResourceAttrs::from(live.as_ref()))
        } else {
            Arc::clone(&self.resource)
        }
    }

    /// Converts the envelopes into one trace payload, emits a self-event for each
    /// name in `payload.orphaned`, then exports; a failed export moves the payload
    /// onto the retry queue.
    fn dispatch(&self, envelopes: Vec<obs_proto::obs::v1::ObsEnvelope>) {
        let resource = self.live_resource();
        let payload = OtlpTracePayload::from_envelopes(
            &envelopes,
            &resource,
            &self.endpoint,
            &self.pair_tracker,
            &self.registry,
        );
        // Orphans are reported before the export attempt, whatever its outcome.
        for orphan in &payload.orphaned {
            obs_core::self_events_public::emit_span_pair_orphaned(orphan);
        }
        if self.exporter.export_traces(&payload).is_err() {
            // The result of the push is deliberately ignored, as before.
            let _ = self.retry.push(payload);
        }
    }
}