pub(crate) mod metrics;
pub(crate) mod observer;
use anyhow::Context;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer, Registry};
pub(crate) fn init_tracing() -> Option<SdkTracerProvider> {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let json = std::env::var("FLUSSO_LOG_FORMAT")
.map(|value| value.eq_ignore_ascii_case("json"))
.unwrap_or(false);
let fmt_layer: Box<dyn Layer<Registry> + Send + Sync> = if json {
Box::new(
tracing_subscriber::fmt::layer()
.json()
.with_writer(std::io::stderr),
)
} else {
Box::new(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
};
let mut layers: Vec<Box<dyn Layer<Registry> + Send + Sync>> = vec![fmt_layer];
let mut otlp_error: Option<String> = None;
let provider = match otlp_provider() {
Ok(Some(provider)) => {
let tracer = provider.tracer("flusso");
layers.push(Box::new(tracing_opentelemetry::layer().with_tracer(tracer)));
Some(provider)
}
Ok(None) => None,
Err(error) => {
otlp_error = Some(format!("{error:#}"));
None
}
};
Registry::default().with(layers).with(filter).init();
if let Some(error) = otlp_error {
tracing::warn!(error, "OTLP trace export disabled; logging to stderr only");
} else if provider.is_some() {
tracing::info!("OTLP trace export enabled");
}
provider
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum OtlpProtocol {
HttpProtobuf,
Grpc,
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum OtlpSignal {
Traces,
Metrics,
}
impl OtlpSignal {
fn per_signal_var(self) -> &'static str {
match self {
OtlpSignal::Traces => "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL",
OtlpSignal::Metrics => "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL",
}
}
}
pub(crate) fn otlp_protocol(signal: OtlpSignal) -> OtlpProtocol {
let per_signal = std::env::var(signal.per_signal_var()).ok();
let general = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok();
resolve_protocol(per_signal.as_deref(), general.as_deref())
}
fn resolve_protocol(per_signal: Option<&str>, general: Option<&str>) -> OtlpProtocol {
match per_signal.or(general).map(str::trim) {
None | Some("http/protobuf") => OtlpProtocol::HttpProtobuf,
Some("grpc") => OtlpProtocol::Grpc,
Some(other) => {
tracing::warn!(
protocol = other,
"unrecognized OTEL_EXPORTER_OTLP_PROTOCOL; falling back to http/protobuf"
);
OtlpProtocol::HttpProtobuf
}
}
}
#[cfg(test)]
mod tests {
use super::{OtlpProtocol, resolve_protocol};
#[test]
fn unset_defaults_to_http_protobuf() {
assert_eq!(resolve_protocol(None, None), OtlpProtocol::HttpProtobuf);
}
#[test]
fn general_protocol_is_honored() {
assert_eq!(resolve_protocol(None, Some("grpc")), OtlpProtocol::Grpc);
assert_eq!(
resolve_protocol(None, Some(" http/protobuf ")),
OtlpProtocol::HttpProtobuf
);
}
#[test]
fn per_signal_overrides_general() {
assert_eq!(
resolve_protocol(Some("grpc"), Some("http/protobuf")),
OtlpProtocol::Grpc
);
assert_eq!(
resolve_protocol(Some("http/protobuf"), Some("grpc")),
OtlpProtocol::HttpProtobuf
);
}
#[test]
fn unrecognized_falls_back_to_http_protobuf() {
assert_eq!(
resolve_protocol(Some("thrift"), None),
OtlpProtocol::HttpProtobuf
);
}
}
fn otlp_provider() -> anyhow::Result<Option<SdkTracerProvider>> {
let configured = std::env::var_os("OTEL_EXPORTER_OTLP_ENDPOINT").is_some()
|| std::env::var_os("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT").is_some();
if !configured {
return Ok(None);
}
let builder = SpanExporter::builder();
let exporter = match otlp_protocol(OtlpSignal::Traces) {
OtlpProtocol::HttpProtobuf => builder.with_http().build(),
OtlpProtocol::Grpc => builder.with_tonic().build(),
}
.context("building OTLP span exporter")?;
let resource = Resource::builder()
.with_service_name(env!("CARGO_PKG_NAME"))
.build();
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.with_resource(resource)
.build();
Ok(Some(provider))
}