#![deny(missing_docs, unreachable_pub, missing_debug_implementations)]
#![cfg_attr(test, deny(warnings))]
mod agent;
#[cfg(feature = "collector_client")]
mod collector;
#[allow(clippy::all, unreachable_pub, dead_code)]
mod thrift;
pub(crate) mod transport;
mod uploader;
use self::thrift::jaeger;
use opentelemetry::{api, exporter::trace, sdk};
use std::sync::{Arc, Mutex};
use std::{
any, net,
time::{Duration, SystemTime},
};
static DEFAULT_SERVICE_NAME: &str = "OpenTelemetry";
static DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831";
#[derive(Debug)]
pub struct Exporter {
process: jaeger::Process,
uploader: Mutex<uploader::BatchUploader>,
}
#[derive(Debug, Default)]
pub struct Process {
pub service_name: String,
pub tags: Vec<api::KeyValue>,
}
impl Into<jaeger::Process> for Process {
fn into(self) -> jaeger::Process {
jaeger::Process::new(
self.service_name,
Some(self.tags.into_iter().map(Into::into).collect()),
)
}
}
impl Exporter {
pub fn builder() -> Builder<String> {
Builder::default()
}
pub fn init_default() -> Result<Self, ::thrift::Error> {
Exporter::builder()
.with_agent_endpoint(DEFAULT_AGENT_ENDPOINT.parse().unwrap())
.init()
}
}
impl trace::SpanExporter for Exporter {
fn export(&self, batch: Vec<Arc<trace::SpanData>>) -> trace::ExportResult {
match self.uploader.lock() {
Ok(mut uploader) => {
let jaeger_spans = batch.into_iter().map(Into::into).collect();
uploader.upload(jaeger::Batch::new(self.process.clone(), jaeger_spans))
}
Err(_) => trace::ExportResult::FailedNotRetryable,
}
}
fn shutdown(&self) {}
fn as_any(&self) -> &dyn any::Any {
self
}
}
#[derive(Debug)]
pub struct Builder<T: net::ToSocketAddrs> {
agent_endpoint: Option<T>,
#[cfg(feature = "collector_client")]
collector_endpoint: Option<String>,
#[cfg(feature = "collector_client")]
collector_username: Option<String>,
#[cfg(feature = "collector_client")]
collector_password: Option<String>,
process: Process,
}
impl<T: net::ToSocketAddrs> Default for Builder<T> {
fn default() -> Self {
Builder {
agent_endpoint: None,
#[cfg(feature = "collector_client")]
collector_endpoint: None,
#[cfg(feature = "collector_client")]
collector_username: None,
#[cfg(feature = "collector_client")]
collector_password: None,
process: Process {
service_name: DEFAULT_SERVICE_NAME.to_string(),
tags: Vec::new(),
},
}
}
}
impl<T: net::ToSocketAddrs> Builder<T> {
pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self {
Builder {
agent_endpoint: Some(agent_endpoint),
..self
}
}
#[cfg(feature = "collector_client")]
pub fn with_collector_endpoint<T: Into<String>>(self, collector_endpoint: T) -> Self {
Builder {
collector_endpoint: Some(collector_endpoint.into()),
..self
}
}
#[cfg(feature = "collector_client")]
pub fn with_collector_username<T: Into<String>>(self, collector_username: T) -> Self {
Builder {
collector_username: Some(collector_username.into()),
..self
}
}
#[cfg(feature = "collector_client")]
pub fn with_collector_password<T: Into<String>>(self, collector_password: T) -> Self {
Builder {
collector_password: Some(collector_password.into()),
..self
}
}
pub fn with_process(self, process: Process) -> Self {
Builder { process, ..self }
}
pub fn init(self) -> ::thrift::Result<Exporter> {
let (process, uploader) = self.init_uploader()?;
Ok(Exporter {
process: process.into(),
uploader: Mutex::new(uploader),
})
}
#[cfg(not(feature = "collector_client"))]
fn init_uploader(self) -> ::thrift::Result<(Process, uploader::BatchUploader)> {
let agent = if let Some(endpoint) = self.agent_endpoint {
agent::AgentSyncClientUDP::new(endpoint, None)?
} else {
agent::AgentSyncClientUDP::new(
DEFAULT_AGENT_ENDPOINT.parse::<net::SocketAddr>().unwrap(),
None,
)?
};
Ok((self.process, uploader::BatchUploader::Agent(agent)))
}
#[cfg(feature = "collector_client")]
fn init_uploader(self) -> ::thrift::Result<(Process, uploader::BatchUploader)> {
if self.agent_endpoint.is_some() {
let agent = agent::AgentSyncClientUDP::new(self.agent_endpoint.unwrap(), None)?;
Ok((self.process, uploader::BatchUploader::Agent(agent)))
} else if self.collector_endpoint.is_some() {
let collector = collector::CollectorSyncClientHttp::new(
self.collector_endpoint.unwrap(),
self.collector_username,
self.collector_password,
)?;
Ok((self.process, uploader::BatchUploader::Collector(collector)))
} else {
Err(::thrift::Error::from(
"Collector endpoint or agent endpoint must be set",
))
}
}
}
#[rustfmt::skip]
impl Into<jaeger::Tag> for api::KeyValue {
fn into(self) -> jaeger::Tag {
let api::KeyValue { key, value } = self;
match value {
api::Value::String(s) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(s), None, None, None, None),
api::Value::F64(f) => jaeger::Tag::new(key.into(), jaeger::TagType::Double, None, Some(f.into()), None, None, None),
api::Value::Bool(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Bool, None, None, Some(b), None, None),
api::Value::I64(i) => jaeger::Tag::new(key.into(), jaeger::TagType::Long, None, None, None, Some(i), None),
api::Value::Bytes(b) => jaeger::Tag::new(key.into(), jaeger::TagType::Binary, None, None, None, None, Some(b)),
api::Value::U64(u) => jaeger::Tag::new(key.into(), jaeger::TagType::String, Some(u.to_string()), None, None, None, None),
}
}
}
impl Into<jaeger::Log> for api::Event {
fn into(self) -> jaeger::Log {
let timestamp = self
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_micros() as i64;
jaeger::Log::new(
timestamp,
vec![jaeger::Tag::new(
"event".to_string(),
jaeger::TagType::String,
Some(self.message),
None,
None,
None,
None,
)],
)
}
}
impl Into<jaeger::Span> for Arc<trace::SpanData> {
fn into(self) -> jaeger::Span {
let trace_id = self.context.trace_id();
let trace_id_high = (trace_id >> 64) as i64;
let trace_id_low = trace_id as i64;
jaeger::Span {
trace_id_low,
trace_id_high,
span_id: self.context.span_id() as i64,
parent_span_id: self.parent_span_id as i64,
operation_name: self.name.clone(),
references: links_to_references(&self.links),
flags: self.context.trace_flags() as i32,
start_time: self
.start_time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_micros() as i64,
duration: self
.end_time
.duration_since(self.start_time)
.unwrap_or_else(|_| Duration::from_secs(0))
.as_micros() as i64,
tags: attrs_to_tags(&self.attributes),
logs: events_to_logs(&self.message_events),
}
}
}
fn links_to_references(_links: &sdk::EvictedQueue<api::Link>) -> Option<Vec<jaeger::SpanRef>> {
None
}
fn attrs_to_tags(attrs: &sdk::EvictedQueue<api::KeyValue>) -> Option<Vec<jaeger::Tag>> {
if attrs.is_empty() {
None
} else {
Some(attrs.iter().cloned().map(Into::into).collect())
}
}
fn events_to_logs(events: &sdk::EvictedQueue<api::Event>) -> Option<Vec<jaeger::Log>> {
if events.is_empty() {
None
} else {
Some(events.iter().cloned().map(Into::into).collect())
}
}