#![doc(html_logo_url = "https://raw.githubusercontent.com/emit-rs/emit/main/asset/logo.svg")]
#![deny(missing_docs)]
use emit::well_known::KEY_SPAN_LINKS;
use emit::{
well_known::{
KEY_ERR, KEY_EVT_KIND, KEY_LVL, KEY_SPAN_ID, KEY_SPAN_KIND, KEY_SPAN_NAME, KEY_SPAN_PARENT,
KEY_TRACE_ID, LVL_DEBUG, LVL_ERROR, LVL_INFO, LVL_WARN,
},
Filter, Props,
};
use opentelemetry::trace::{Link, SamplingDecision, SamplingResult, TraceState};
use opentelemetry::{
logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity},
trace::{
SpanContext, SpanId, SpanKind, Status, TraceContextExt, TraceId, Tracer, TracerProvider,
},
Context, ContextGuard, Key, KeyValue, TraceFlags, Value,
};
use std::fmt::{Display, Formatter};
use std::{borrow::Cow, cell::RefCell, fmt, ops::ControlFlow, sync::Arc};
mod internal_metrics;
pub use internal_metrics::*;
/// Begin configuring `emit` so that its diagnostics are sent to OpenTelemetry.
///
/// A logger and a tracer, both named `"emit"`, are requested from the given providers.
/// The returned [`emit::Setup`] is pre-configured with:
///
/// - an [`OpenTelemetryEmitter`] built on the logger,
/// - an [`OpenTelemetryIsSampledFilter`] as its filter,
/// - an [`OpenTelemetryCtxt`] that wraps the default context together with the tracer.
///
/// The emitter and the context share one set of internal metrics.
pub fn setup<L: Logger + Send + Sync + 'static, T: Tracer + Send + Sync + 'static>(
    logger_provider: impl LoggerProvider<Logger = L>,
    tracer_provider: impl TracerProvider<Tracer = T>,
) -> emit::Setup<
    OpenTelemetryEmitter<L>,
    OpenTelemetryIsSampledFilter,
    OpenTelemetryCtxt<emit::setup::DefaultCtxt, T>,
>
where
    T::Span: Send + Sync + 'static,
{
    let name = "emit";
    let metrics = Arc::new(InternalMetrics::default());

    // The emitter and the context both report into the same metrics instance.
    let emitter = OpenTelemetryEmitter::new(Arc::clone(&metrics), logger_provider.logger(name));
    let filter = OpenTelemetryIsSampledFilter {};

    emit::setup()
        .emit_to(emitter)
        .emit_when(filter)
        .map_ctxt(|default_ctxt| {
            let tracer = tracer_provider.tracer(name);
            OpenTelemetryCtxt::wrap(Arc::clone(&metrics), tracer, default_ctxt)
        })
}
/// An [`emit::Ctxt`] that wraps another context `C` and keeps the ambient
/// OpenTelemetry [`Context`] in step with it using the tracer `T`.
pub struct OpenTelemetryCtxt<C, T> {
    // Tracer used to start OpenTelemetry spans for incoming span contexts.
    tracer: T,
    // Internal counters, shared with the emitter created by `setup`.
    metrics: Arc<InternalMetrics>,
    // The wrapped context that stores the actual properties.
    inner: C,
}
/// The frame type used by [`OpenTelemetryCtxt`].
///
/// It pairs the wrapped context's frame with the OpenTelemetry [`Context`]
/// (if any) that should be attached while the frame is entered.
pub struct OpenTelemetryFrame<F> {
    // The OpenTelemetry context to attach on `enter`; taken while the frame is active
    // and put back on `exit`. `None` means there is nothing to attach.
    slot: Option<Context>,
    // Whether `enter` attached a context that `exit` needs to detach again.
    active: bool,
    // How the span carried by `slot` should be finished.
    options: OpenTelemetryFrameOptions,
    // The wrapped context's own frame.
    inner: F,
}
// Flags describing what still has to happen to the span owned by a frame.
#[derive(Debug, Clone, Copy, Default)]
struct OpenTelemetryFrameOptions {
    // The frame owns a started span that has not been ended yet. Cleared by the emitter
    // once it ends the span; if still set in `Ctxt::close`, the span is ended there.
    end_on_close: bool,
    // The span was started with the fallback name because no span name property was
    // supplied, so the emitter should rename it when the span event arrives.
    update_default_name_on_close: bool,
}
/// The properties exposed by [`OpenTelemetryCtxt`] for the current context.
///
/// They consist of the span context read from the current OpenTelemetry
/// [`Context`], followed by the wrapped context's properties.
pub struct OpenTelemetryProps<P: ?Sized> {
    // Trace and span ids captured from the current OpenTelemetry span.
    ctxt: emit::span::SpanCtxt,
    // Borrowed properties of the wrapped context, stored as a raw pointer.
    // It is only dereferenced inside the `Props::for_each` implementation.
    inner: *const P,
}
impl<C, T> OpenTelemetryCtxt<C, T> {
    // Build a context around `ctxt` that uses `tracer` for spans and reports to `metrics`.
    fn wrap(metrics: Arc<InternalMetrics>, tracer: T, ctxt: C) -> Self {
        Self {
            inner: ctxt,
            metrics,
            tracer,
        }
    }

    /// Get a source for the internal metrics tracked by this context.
    ///
    /// The returned value shares the same underlying counters as the context itself.
    pub fn metric_source(&self) -> EmitOpenTelemetryMetrics {
        let metrics = Arc::clone(&self.metrics);

        EmitOpenTelemetryMetrics { metrics }
    }
}
impl<P: emit::Props + ?Sized> emit::Props for OpenTelemetryProps<P> {
fn for_each<'kv, F: FnMut(emit::Str<'kv>, emit::Value<'kv>) -> ControlFlow<()>>(
&'kv self,
mut for_each: F,
) -> ControlFlow<()> {
self.ctxt.for_each(&mut for_each)?;
unsafe { &*self.inner }.for_each(|k, v| {
if k != emit::well_known::KEY_TRACE_ID && k != emit::well_known::KEY_SPAN_ID {
for_each(k, v)?;
}
ControlFlow::Continue(())
})
}
}
thread_local! {
    // Per-thread stack of frames that currently have an OpenTelemetry context attached.
    // `Ctxt::enter` pushes onto it and `Ctxt::exit` pops from it.
    static ACTIVE_FRAME_STACK: RefCell<Vec<ActiveFrame>> = RefCell::new(Vec::new());
}
// An entry in `ACTIVE_FRAME_STACK`: an entered frame and its attached context.
struct ActiveFrame {
    // Guard returned by `Context::attach`; held only so it is dropped when the entry is popped.
    _guard: ContextGuard,
    // The frame's options while it is active; the emitter may update them in place.
    options: OpenTelemetryFrameOptions,
}
// Put a newly entered frame on top of this thread's active frame stack.
fn push(guard: ActiveFrame) {
    ACTIVE_FRAME_STACK.with(|frames| {
        let mut frames = frames.borrow_mut();
        frames.push(guard);
    });
}
// Remove and return the top of this thread's active frame stack, if there is one.
fn pop() -> Option<ActiveFrame> {
    ACTIVE_FRAME_STACK.with(|frames| {
        let mut frames = frames.borrow_mut();
        frames.pop()
    })
}
// Run `f` against the top of this thread's active frame stack.
// `f` is not called when the stack is empty. The stack stays mutably borrowed while `f` runs.
fn with_current(f: impl FnOnce(&mut ActiveFrame)) {
    ACTIVE_FRAME_STACK.with(|frames| {
        let mut frames = frames.borrow_mut();

        match frames.last_mut() {
            Some(top) => f(top),
            None => {}
        }
    })
}
impl<C: emit::Ctxt, T: Tracer> emit::Ctxt for OpenTelemetryCtxt<C, T>
where
    T::Span: Send + Sync + 'static,
{
    type Current = OpenTelemetryProps<C::Current>;
    type Frame = OpenTelemetryFrame<C::Frame>;

    // Open a frame that does not inherit from the current context.
    //
    // Span context properties are stripped from `props` before they reach the wrapped
    // context; they are represented by the OpenTelemetry context in `slot` instead.
    fn open_root<P: emit::Props>(&self, props: P) -> Self::Frame {
        let (incoming, props) = incoming_span_ctxt(&self.tracer, props, false);

        let (slot, options) = match incoming {
            // Without a span, a root frame attaches a fresh, empty OpenTelemetry context.
            IncomingSpanCtxt::None => (Some(Context::new()), Default::default()),
            IncomingSpanCtxt::Same => (None, Default::default()),
            IncomingSpanCtxt::Different(context, options) => (Some(context), options),
        };

        OpenTelemetryFrame {
            active: false,
            options,
            slot,
            inner: self.inner.open_root(props),
        }
    }

    // Open a frame for a disabled span. Any OpenTelemetry span started for it is
    // created with a `Drop` sampling decision (`suppress = true`).
    fn open_disabled<P: Props>(&self, props: P) -> Self::Frame {
        let (incoming, props) = incoming_span_ctxt(&self.tracer, props, true);

        let (slot, options) = match incoming {
            IncomingSpanCtxt::None => (None, Default::default()),
            IncomingSpanCtxt::Same => (None, Default::default()),
            IncomingSpanCtxt::Different(context, options) => (Some(context), options),
        };

        OpenTelemetryFrame {
            active: false,
            options,
            slot,
            inner: self.inner.open_disabled(props),
        }
    }

    // Open a frame that pushes `props` onto the current context. A new OpenTelemetry
    // context is only prepared when `props` carry a span id that differs from the current one.
    fn open_push<P: Props>(&self, props: P) -> Self::Frame {
        let (incoming, props) = incoming_span_ctxt(&self.tracer, props, false);

        let (slot, options) = match incoming {
            IncomingSpanCtxt::None => (None, Default::default()),
            IncomingSpanCtxt::Same => (None, Default::default()),
            IncomingSpanCtxt::Different(context, options) => (Some(context), options),
        };

        OpenTelemetryFrame {
            active: false,
            options,
            slot,
            inner: self.inner.open_push(props),
        }
    }

    // Enter the wrapped frame, then attach the frame's OpenTelemetry context (if it has
    // one) and record it on the thread-local active frame stack.
    fn enter(&self, local: &mut Self::Frame) {
        self.inner.enter(&mut local.inner);

        if let Some(ctxt) = local.slot.take() {
            let guard = ctxt.attach();

            push(ActiveFrame {
                _guard: guard,
                options: local.options,
            });
            local.active = true;
        }
    }

    // Expose the current properties: the trace and span ids of the current OpenTelemetry
    // span followed by the wrapped context's properties.
    fn with_current<R, F: FnOnce(&Self::Current) -> R>(&self, with: F) -> R {
        let ctxt = Context::current();
        let span = ctxt.span();

        let trace_id = span.span_context().trace_id().to_bytes();
        let span_id = span.span_context().span_id().to_bytes();

        self.inner.with_current(|props| {
            let props = OpenTelemetryProps {
                ctxt: emit::span::SpanCtxt::new(
                    emit::TraceId::from_bytes(trace_id),
                    None,
                    emit::SpanId::from_bytes(span_id),
                ),
                // The pointer is only used while `props` is borrowed by this closure.
                inner: props as *const C::Current,
            };

            with(&props)
        })
    }

    // Exit the wrapped frame and, if `enter` attached a context, detach it again.
    fn exit(&self, frame: &mut Self::Frame) {
        self.inner.exit(&mut frame.inner);

        if frame.active {
            // Capture the context before popping: dropping the popped guard is what
            // detaches it, and the frame needs it back in case it is entered again.
            frame.slot = Some(Context::current());

            // The emitter may have changed the options while the frame was active.
            if let Some(active) = pop() {
                frame.options = active.options;
            }
            frame.active = false;
        }
    }

    // Close the frame. A span that is still owned by the frame at this point was never
    // completed through a span event, so it is ended here and counted as unexpected.
    fn close(&self, mut frame: Self::Frame) {
        if frame.options.end_on_close {
            if let Some(ctxt) = frame.slot.take() {
                self.metrics.span_unexpected_close.increment();

                let span = ctxt.span();
                span.end();
            }
        }

        // Hand the wrapped frame back to the wrapped context so it can release whatever
        // it holds, instead of dropping it without the context's involvement.
        self.inner.close(frame.inner);
    }
}
fn incoming_span_ctxt<T: Tracer>(
tracer: &T,
props: impl emit::Props,
suppress: bool,
) -> (IncomingSpanCtxt, impl Props)
where
T::Span: Send + Sync + 'static,
{
let span_id = props.pull::<emit::SpanId, _>(KEY_SPAN_ID);
let Some(span_id) = span_id else {
return (
IncomingSpanCtxt::None,
ExcludeSpanCtxtProps { inner: props },
);
};
let ctxt = Context::current();
let span_id = otel_span_id(span_id);
if span_id == ctxt.span().span_context().span_id() {
return (
IncomingSpanCtxt::Same,
ExcludeSpanCtxtProps { inner: props },
);
}
let span_links = props
.get(KEY_SPAN_LINKS)
.map(|links| {
otel_span_links(
links,
ctxt.span().span_context().trace_flags(),
ctxt.span().span_context().is_remote(),
ctxt.span().span_context().trace_state(),
)
})
.unwrap_or_default();
let span_kind = props
.pull::<emit::span::SpanKind, _>(KEY_SPAN_KIND)
.map(|kind| match kind {
emit::span::SpanKind::Client => SpanKind::Client,
emit::span::SpanKind::Server => SpanKind::Server,
emit::span::SpanKind::Producer => SpanKind::Producer,
emit::span::SpanKind::Consumer => SpanKind::Consumer,
_ => SpanKind::Internal,
})
.unwrap_or(SpanKind::Internal);
let span_name = props.pull::<String, _>(KEY_SPAN_NAME).map(Cow::Owned);
let update_default_name_on_close = span_name.is_none();
let trace_id = props.pull::<emit::TraceId, _>(KEY_TRACE_ID);
let trace_id = trace_id.map(otel_trace_id);
let mut span = tracer
.span_builder(span_name.unwrap_or(Cow::Borrowed("emit_span")))
.with_kind(span_kind)
.with_span_id(span_id);
if let Some(trace_id) = trace_id {
span = span.with_trace_id(trace_id);
}
span = span.with_links(span_links);
if suppress {
span = span.with_sampling_result(SamplingResult {
decision: SamplingDecision::Drop,
attributes: vec![],
trace_state: Default::default(),
});
}
let ctxt = ctxt.with_span(span.start(tracer));
(
IncomingSpanCtxt::Different(
ctxt,
OpenTelemetryFrameOptions {
update_default_name_on_close,
end_on_close: true,
},
),
ExcludeSpanCtxtProps { inner: props },
)
}
// The relationship between incoming properties and the current OpenTelemetry span,
// as computed by `incoming_span_ctxt`.
enum IncomingSpanCtxt {
    // The properties carry no span id.
    None,
    // The properties carry the same span id as the current OpenTelemetry span.
    Same,
    // The properties carry a different span id; holds the context with the newly
    // started span and the options for finishing it.
    Different(Context, OpenTelemetryFrameOptions),
}
// A view over `inner` that hides the well-known span context properties
// (see the `Props` implementation for the exact keys).
struct ExcludeSpanCtxtProps<P> {
    // The unfiltered properties.
    inner: P,
}
impl<P: emit::Props> emit::Props for ExcludeSpanCtxtProps<P> {
    // Visit every inner property except the trace id, span id, span parent, span name,
    // span kind and span links.
    fn for_each<'kv, F: FnMut(emit::Str<'kv>, emit::Value<'kv>) -> ControlFlow<()>>(
        &'kv self,
        mut for_each: F,
    ) -> ControlFlow<()> {
        self.inner.for_each(|key, value| {
            let is_span_ctxt_key = matches!(
                key.get(),
                KEY_TRACE_ID
                    | KEY_SPAN_ID
                    | KEY_SPAN_PARENT
                    | KEY_SPAN_NAME
                    | KEY_SPAN_KIND
                    | KEY_SPAN_LINKS
            );

            if is_span_ctxt_key {
                ControlFlow::Continue(())
            } else {
                for_each(key, value)
            }
        })
    }
}
/// An [`emit::Emitter`] that forwards events to OpenTelemetry.
///
/// Span events complete the OpenTelemetry span of the currently active frame when their
/// ids match; all other events are written as log records through the logger `L`.
pub struct OpenTelemetryEmitter<L> {
    // Logger that receives log records.
    logger: L,
    // Renders the name applied to spans that were started without one.
    span_name: Box<MessageFormatter>,
    // Renders the body of log records.
    log_body: Box<MessageFormatter>,
    // Internal counters, shared with the context created by `setup`.
    metrics: Arc<InternalMetrics>,
}
// A thread-safe callback that writes some textual rendering of an event into a formatter.
// Used for both span names and log bodies.
type MessageFormatter = dyn Fn(&emit::event::Event<&dyn emit::props::ErasedProps>, &mut fmt::Formatter) -> fmt::Result
    + Send
    + Sync;
// The default span name: the event's span name property when it has one,
// otherwise the event's rendered message.
fn default_span_name() -> Box<MessageFormatter> {
    Box::new(|evt, f| match evt.props().get(KEY_SPAN_NAME) {
        Some(name) => write!(f, "{}", name),
        None => write!(f, "{}", evt.msg()),
    })
}
// The default log body: the event's rendered message.
fn default_log_body() -> Box<MessageFormatter> {
    Box::new(|evt, f| {
        let msg = evt.msg();

        write!(f, "{}", msg)
    })
}
// Adapts a `MessageFormatter` and an event into something that implements `Display`,
// so the rendering can be driven by `format!` / `to_string`.
struct MessageRenderer<'a, P> {
    // The formatting callback to run.
    pub fmt: &'a MessageFormatter,
    // The event passed to the callback.
    pub evt: &'a emit::event::Event<'a, P>,
}
impl<'a, P: emit::props::Props> fmt::Display for MessageRenderer<'a, P> {
    // Erase the event's property type and hand it to the stored formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let erased = self.evt.erase();

        (self.fmt)(&erased, f)
    }
}
impl<L> OpenTelemetryEmitter<L> {
    // Create an emitter over `logger` that uses the default span name and log body formatters.
    fn new(metrics: Arc<InternalMetrics>, logger: L) -> Self {
        Self {
            metrics,
            logger,
            log_body: default_log_body(),
            span_name: default_span_name(),
        }
    }

    /// Replace the function used to render span names.
    ///
    /// The emitter only applies this name to spans that were started without an
    /// explicit span name. By default, the span name property is used if the event
    /// carries one, and the event's message otherwise.
    pub fn with_span_name(
        self,
        writer: impl Fn(
                &emit::event::Event<&dyn emit::props::ErasedProps>,
                &mut fmt::Formatter,
            ) -> fmt::Result
            + Send
            + Sync
            + 'static,
    ) -> Self {
        Self {
            span_name: Box::new(writer),
            ..self
        }
    }

    /// Replace the function used to render the body of log records.
    ///
    /// By default, the body is the event's message.
    pub fn with_log_body(
        self,
        writer: impl Fn(
                &emit::event::Event<&dyn emit::props::ErasedProps>,
                &mut fmt::Formatter,
            ) -> fmt::Result
            + Send
            + Sync
            + 'static,
    ) -> Self {
        Self {
            log_body: Box::new(writer),
            ..self
        }
    }

    /// Get a source for the internal metrics tracked by this emitter.
    ///
    /// The returned value shares the same underlying counters as the emitter itself.
    pub fn metric_source(&self) -> EmitOpenTelemetryMetrics {
        let metrics = Arc::clone(&self.metrics);

        EmitOpenTelemetryMetrics { metrics }
    }
}
impl<L: Logger> emit::Emitter for OpenTelemetryEmitter<L> {
    // Emit an event.
    //
    // A span event whose span id matches the current OpenTelemetry span, and whose frame
    // still owns that span, is used to finish the span. Every other event, including span
    // events that could not be matched, is written as a log record.
    fn emit<E: emit::event::ToEvent>(&self, evt: E) {
        let evt = evt.to_event();
        if emit::kind::is_span_filter().matches(&evt) {
            let mut emitted = false;
            // Look at the innermost active frame on this thread, if there is one.
            with_current(|frame| {
                // Only frames that still own an un-ended span are candidates.
                if frame.options.end_on_close {
                    let ctxt = Context::current();
                    let span = ctxt.span();
                    let span_id = span.span_context().span_id();
                    let evt_span_id = evt
                        .props()
                        .pull::<emit::SpanId, _>(KEY_SPAN_ID)
                        .map(otel_span_id);
                    // The event must describe the span that is current right now.
                    if Some(span_id) == evt_span_id {
                        let mut status = Status::Ok;
                        // Set once an error property has supplied a status message, so a
                        // later error level does not replace it with the generic one.
                        let mut status_is_descriptive_err = false;
                        let mut span_links = Vec::new();
                        let _ = evt.props().for_each(|k, v| {
                            // An error level marks the span as failed. Other levels fall
                            // through and are recorded as a regular attribute.
                            if k == KEY_LVL {
                                if let Some(emit::Level::Error) = v.by_ref().cast::<emit::Level>() {
                                    if !status_is_descriptive_err {
                                        status = Status::error("error");
                                    }
                                    return ControlFlow::Continue(());
                                }
                            }
                            // An error property becomes an `exception` span event and the
                            // status description.
                            if k == KEY_ERR {
                                span.add_event(
                                    "exception",
                                    vec![KeyValue::new("exception.message", v.to_string())],
                                );
                                status = Status::error(v.to_string());
                                status_is_descriptive_err = true;
                                return ControlFlow::Continue(());
                            }
                            // Links are collected here and added after the loop.
                            if k == KEY_SPAN_LINKS {
                                span_links = otel_span_links(
                                    v,
                                    span.span_context().trace_flags(),
                                    span.span_context().is_remote(),
                                    span.span_context().trace_state(),
                                );
                                return ControlFlow::Continue(());
                            }
                            // These well-known properties are not copied onto the span
                            // as attributes.
                            if k == KEY_TRACE_ID
                                || k == KEY_SPAN_ID
                                || k == KEY_SPAN_PARENT
                                || k == KEY_EVT_KIND
                                || k == KEY_SPAN_KIND
                                || k == KEY_SPAN_NAME
                            {
                                return ControlFlow::Continue(());
                            }
                            // Everything else becomes a span attribute.
                            if let Some(v) = otel_span_value(v) {
                                span.set_attribute(KeyValue::new(k.to_cow(), v));
                            }
                            ControlFlow::Continue(())
                        });
                        // The span was started with a placeholder name; replace it now.
                        if frame.options.update_default_name_on_close {
                            span.update_name(
                                MessageRenderer {
                                    fmt: &self.span_name,
                                    evt: &evt,
                                }
                                .to_string(),
                            );
                        }
                        span.set_status(status);
                        for link in span_links {
                            span.add_link(link.span_context, Default::default());
                        }
                        // Prefer the end of the event's time range as the span's end time.
                        if let Some(extent) = evt.extent().and_then(|ex| ex.as_range()) {
                            span.end_with_timestamp(extent.end.to_system_time());
                        } else {
                            span.end();
                        }
                        // The span is finished; `Ctxt::close` must not end it again.
                        frame.options.end_on_close = false;
                        emitted = true;
                    }
                }
            });
            if emitted {
                return;
            } else {
                // The span event did not match an active frame; count it and fall
                // through to emit it as a log record instead.
                self.metrics.span_unexpected_emit.increment();
            }
        }
        let mut record = self.logger.create_log_record();
        let body = format!(
            "{}",
            MessageRenderer {
                fmt: &self.log_body,
                evt: &evt,
            }
        );
        record.set_body(AnyValue::String(body.into()));
        // Ids found on the event take priority over the ambient span's ids below.
        let mut trace_id = None;
        let mut span_id = None;
        let mut attributes = Vec::new();
        {
            let _ = evt.props().for_each(|k, v| {
                // The level maps to the record's severity and is never an attribute.
                if k == KEY_LVL {
                    match v.by_ref().cast::<emit::Level>() {
                        Some(emit::Level::Debug) => {
                            record.set_severity_number(Severity::Debug);
                            record.set_severity_text(LVL_DEBUG);
                        }
                        Some(emit::Level::Info) => {
                            record.set_severity_number(Severity::Info);
                            record.set_severity_text(LVL_INFO);
                        }
                        Some(emit::Level::Warn) => {
                            record.set_severity_number(Severity::Warn);
                            record.set_severity_text(LVL_WARN);
                        }
                        Some(emit::Level::Error) => {
                            record.set_severity_number(Severity::Error);
                            record.set_severity_text(LVL_ERROR);
                        }
                        None => {
                            record.set_severity_text("unknown");
                        }
                    }
                    return ControlFlow::Continue(());
                }
                // Trace, span and parent ids are consumed when they have the expected
                // type; a value of another type falls through and becomes an attribute.
                if k == KEY_TRACE_ID {
                    if let Some(id) = v.by_ref().cast::<emit::TraceId>() {
                        trace_id = Some(otel_trace_id(id));
                        return ControlFlow::Continue(());
                    }
                }
                if k == KEY_SPAN_ID {
                    if let Some(id) = v.by_ref().cast::<emit::SpanId>() {
                        span_id = Some(otel_span_id(id));
                        return ControlFlow::Continue(());
                    }
                }
                if k == KEY_SPAN_PARENT {
                    if v.by_ref().cast::<emit::SpanId>().is_some() {
                        return ControlFlow::Continue(());
                    }
                }
                // Span-only properties are not written to log records.
                if k == KEY_SPAN_NAME || k == KEY_SPAN_KIND || k == KEY_SPAN_LINKS {
                    return ControlFlow::Continue(());
                }
                if let Some(v) = otel_log_value(v) {
                    attributes.push((Key::new(k.to_cow()), v));
                }
                ControlFlow::Continue(())
            });
        }
        record.add_attributes(attributes);
        if let Some(extent) = evt.extent() {
            record.set_timestamp(extent.as_point().to_system_time());
        }
        // Fill in any missing trace context from the current OpenTelemetry span.
        let context = Context::current();
        let span = context.span();
        let span = span.span_context();
        let trace_id = trace_id.unwrap_or_else(|| span.trace_id());
        let span_id = span_id.unwrap_or_else(|| span.span_id());
        let trace_flags = Some(span.trace_flags());
        record.set_trace_context(trace_id, span_id, trace_flags);
        self.logger.emit(record);
    }

    // Does no work and always returns `false`; the timeout is ignored.
    fn blocking_flush(&self, _: std::time::Duration) -> bool {
        false
    }
}
/// An [`emit::Filter`] that excludes span events when the current OpenTelemetry
/// span exists but is not sampled. All other events pass.
pub struct OpenTelemetryIsSampledFilter {}
impl emit::Filter for OpenTelemetryIsSampledFilter {
    // Non-span events always match. Span events match when there is no current
    // OpenTelemetry span context, or when the current one is sampled.
    fn matches<E: emit::event::ToEvent>(&self, evt: E) -> bool {
        let evt = evt.to_event();

        if !emit::kind::is_span_filter().matches(&evt) {
            return true;
        }

        let ctxt = Context::current();
        let span = ctxt.span();
        let span_ctxt = span.span_context();

        let has_no_span = span_ctxt == &SpanContext::NONE;

        has_no_span || span_ctxt.is_sampled()
    }
}
// Convert an `emit` trace id into an OpenTelemetry one via its byte representation.
fn otel_trace_id(trace_id: emit::TraceId) -> TraceId {
    let bytes = trace_id.to_bytes();

    TraceId::from_bytes(bytes)
}
// Convert an `emit` span id into an OpenTelemetry one via its byte representation.
fn otel_span_id(span_id: emit::SpanId) -> SpanId {
    let bytes = span_id.to_bytes();

    SpanId::from_bytes(bytes)
}
// Convert an `emit` value into a span attribute value.
//
// Integers, floats, strings and booleans map to their direct counterparts. Any other
// shape, and any value that fails to serialize, is stored as its `Display` text.
// Values that serialize to nothing yield `None`.
fn otel_span_value(v: emit::Value) -> Option<Value> {
    let serialized = match any_value::serialize(&v) {
        Ok(Some(serialized)) => serialized,
        Ok(None) => return None,
        Err(()) => return Some(Value::String(v.to_string().into())),
    };

    let converted = match serialized {
        AnyValue::Int(int) => Value::I64(int),
        AnyValue::Double(double) => Value::F64(double),
        AnyValue::String(string) => Value::String(string),
        AnyValue::Boolean(boolean) => Value::Bool(boolean),
        _ => Value::String(v.to_string().into()),
    };

    Some(converted)
}
// Convert an `emit` value into a log attribute value, falling back to its `Display`
// text when it fails to serialize. Values that serialize to nothing yield `None`.
fn otel_log_value(v: emit::Value) -> Option<AnyValue> {
    any_value::serialize(&v).unwrap_or_else(|()| Some(AnyValue::String(v.to_string().into())))
}
fn otel_span_links(
v: emit::Value,
trace_flags: TraceFlags,
is_remote: bool,
trace_state: &TraceState,
) -> Vec<Link> {
use serde::ser::{
Error, Impossible, Serialize, SerializeSeq, SerializeTuple, SerializeTupleStruct,
SerializeTupleVariant, Serializer, StdError,
};
struct Extract<'a> {
links: Vec<Link>,
trace_flags: TraceFlags,
is_remote: bool,
trace_state: &'a TraceState,
}
#[derive(Debug)]
struct ExtractError;
impl fmt::Display for ExtractError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str("unsupported value for span links")
}
}
impl StdError for ExtractError {}
impl Error for ExtractError {
fn custom<T>(_: T) -> Self
where
T: Display,
{
ExtractError
}
}
impl<'a> Serializer for Extract<'a> {
type Ok = Vec<Link>;
type Error = ExtractError;
type SerializeSeq = Self;
type SerializeTuple = Self;
type SerializeTupleStruct = Self;
type SerializeTupleVariant = Self;
type SerializeMap = Impossible<Self::Ok, Self::Error>;
type SerializeStruct = Impossible<Self::Ok, Self::Error>;
type SerializeStructVariant = Impossible<Self::Ok, Self::Error>;
fn serialize_bool(self, _: bool) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i8(self, _: i8) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i16(self, _: i16) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i32(self, _: i32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i64(self, _: i64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u8(self, _: u8) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u16(self, _: u16) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u32(self, _: u32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u64(self, _: u64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_f32(self, _: f32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_f64(self, _: f64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_char(self, _: char) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_str(self, _: &str) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_bytes(self, _: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_unit_struct(self, _: &'static str) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_unit_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_newtype_struct<T>(
self,
_: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T>(
self,
_: &'static str,
_: u32,
_: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_seq(self, _: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Ok(self)
}
fn serialize_tuple(self, _: usize) -> Result<Self::SerializeTuple, Self::Error> {
Ok(self)
}
fn serialize_tuple_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeTupleStruct, Self::Error> {
Ok(self)
}
fn serialize_tuple_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
_: usize,
) -> Result<Self::SerializeTupleVariant, Self::Error> {
Ok(self)
}
fn serialize_map(self, _: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Err(ExtractError)
}
fn serialize_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeStruct, Self::Error> {
Err(ExtractError)
}
fn serialize_struct_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
_: usize,
) -> Result<Self::SerializeStructVariant, Self::Error> {
Err(ExtractError)
}
}
impl<'a> SerializeSeq for Extract<'a> {
type Ok = Vec<Link>;
type Error = ExtractError;
fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
where
T: ?Sized + Serialize,
{
self.links.push(value.serialize(ParseLink {
trace_flags: self.trace_flags,
is_remote: self.is_remote,
trace_state: self.trace_state,
})?);
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(self.links)
}
}
impl<'a> SerializeTuple for Extract<'a> {
type Ok = Vec<Link>;
type Error = ExtractError;
fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
where
T: ?Sized + Serialize,
{
self.links.push(value.serialize(ParseLink {
trace_flags: self.trace_flags,
is_remote: self.is_remote,
trace_state: self.trace_state,
})?);
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(self.links)
}
}
impl<'a> SerializeTupleStruct for Extract<'a> {
type Ok = Vec<Link>;
type Error = ExtractError;
fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
where
T: ?Sized + Serialize,
{
self.links.push(value.serialize(ParseLink {
trace_flags: self.trace_flags,
is_remote: self.is_remote,
trace_state: self.trace_state,
})?);
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(self.links)
}
}
impl<'a> SerializeTupleVariant for Extract<'a> {
type Ok = Vec<Link>;
type Error = ExtractError;
fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
where
T: ?Sized + Serialize,
{
self.links.push(value.serialize(ParseLink {
trace_flags: self.trace_flags,
is_remote: self.is_remote,
trace_state: &self.trace_state,
})?);
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(self.links)
}
}
struct ParseLink<'a> {
trace_flags: TraceFlags,
is_remote: bool,
trace_state: &'a TraceState,
}
impl<'a> Serializer for ParseLink<'a> {
type Ok = Link;
type Error = ExtractError;
type SerializeSeq = Impossible<Self::Ok, Self::Error>;
type SerializeTuple = Impossible<Self::Ok, Self::Error>;
type SerializeTupleStruct = Impossible<Self::Ok, Self::Error>;
type SerializeTupleVariant = Impossible<Self::Ok, Self::Error>;
type SerializeMap = Impossible<Self::Ok, Self::Error>;
type SerializeStruct = Impossible<Self::Ok, Self::Error>;
type SerializeStructVariant = Impossible<Self::Ok, Self::Error>;
fn serialize_bool(self, _: bool) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i8(self, _: i8) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i16(self, _: i16) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i32(self, _: i32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_i64(self, _: i64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u8(self, _: u8) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u16(self, _: u16) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u32(self, _: u32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_u64(self, _: u64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_f32(self, _: f32) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_f64(self, _: f64) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_char(self, _: char) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
let link = emit::span::SpanLink::try_from_str(v).map_err(|_| ExtractError)?;
Ok(Link::new(
SpanContext::new(
otel_trace_id(*link.trace_id()),
otel_span_id(*link.span_id()),
self.trace_flags,
self.is_remote,
self.trace_state.clone(),
),
Vec::new(),
0,
))
}
fn serialize_bytes(self, _: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_unit_struct(self, _: &'static str) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_unit_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
) -> Result<Self::Ok, Self::Error> {
Err(ExtractError)
}
fn serialize_newtype_struct<T>(
self,
_: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_newtype_variant<T>(
self,
_: &'static str,
_: u32,
_: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error>
where
T: ?Sized + Serialize,
{
value.serialize(self)
}
fn serialize_seq(self, _: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(ExtractError)
}
fn serialize_tuple(self, _: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(ExtractError)
}
fn serialize_tuple_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeTupleStruct, Self::Error> {
Err(ExtractError)
}
fn serialize_tuple_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
_: usize,
) -> Result<Self::SerializeTupleVariant, Self::Error> {
Err(ExtractError)
}
fn serialize_map(self, _: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Err(ExtractError)
}
fn serialize_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeStruct, Self::Error> {
Err(ExtractError)
}
fn serialize_struct_variant(
self,
_: &'static str,
_: u32,
_: &'static str,
_: usize,
) -> Result<Self::SerializeStructVariant, Self::Error> {
Err(ExtractError)
}
}
v.serialize(Extract {
links: Vec::new(),
trace_flags,
is_remote,
trace_state,
})
.unwrap_or_default()
}
mod any_value {
use std::{collections::HashMap, fmt};
use opentelemetry::{logs::AnyValue, Key, StringValue};
use serde::ser::{
Error, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeStructVariant,
SerializeTuple, SerializeTupleStruct, SerializeTupleVariant, Serializer, StdError,
};
pub(crate) fn serialize(value: impl Serialize) -> Result<Option<AnyValue>, ()> {
value.serialize(ValueSerializer).map_err(|_| ())
}
struct ValueSerializer;
struct ValueSerializeSeq {
value: Vec<AnyValue>,
}
struct ValueSerializeTuple {
value: Vec<AnyValue>,
}
struct ValueSerializeTupleStruct {
value: Vec<AnyValue>,
}
struct ValueSerializeMap {
key: Option<Key>,
value: HashMap<Key, AnyValue>,
}
struct ValueSerializeStruct {
value: HashMap<Key, AnyValue>,
}
struct ValueSerializeTupleVariant {
variant: &'static str,
value: Vec<AnyValue>,
}
struct ValueSerializeStructVariant {
variant: &'static str,
value: HashMap<Key, AnyValue>,
}
#[derive(Debug)]
struct ValueError(String);
impl fmt::Display for ValueError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl Error for ValueError {
fn custom<T>(msg: T) -> Self
where
T: fmt::Display,
{
ValueError(msg.to_string())
}
}
impl StdError for ValueError {}
impl Serializer for ValueSerializer {
type Ok = Option<AnyValue>;
type Error = ValueError;
type SerializeSeq = ValueSerializeSeq;
type SerializeTuple = ValueSerializeTuple;
type SerializeTupleStruct = ValueSerializeTupleStruct;
type SerializeTupleVariant = ValueSerializeTupleVariant;
type SerializeMap = ValueSerializeMap;
type SerializeStruct = ValueSerializeStruct;
type SerializeStructVariant = ValueSerializeStructVariant;
fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::Boolean(v)))
}
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::Int(v)))
}
fn serialize_i128(self, v: i128) -> Result<Self::Ok, Self::Error> {
if let Ok(v) = v.try_into() {
self.serialize_i64(v)
} else {
self.collect_str(&v)
}
}
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
self.serialize_i64(v as i64)
}
fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
if let Ok(v) = v.try_into() {
self.serialize_i64(v)
} else {
self.collect_str(&v)
}
}
fn serialize_u128(self, v: u128) -> Result<Self::Ok, Self::Error> {
if let Ok(v) = v.try_into() {
self.serialize_i64(v)
} else {
self.collect_str(&v)
}
}
fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
self.serialize_f64(v as f64)
}
fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::Double(v)))
}
fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
self.collect_str(&v)
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::String(StringValue::from(v.to_owned()))))
}
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::Bytes(Box::new(v.to_owned()))))
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Ok(None)
}
fn serialize_some<T: serde::Serialize + ?Sized>(
self,
value: &T,
) -> Result<Self::Ok, Self::Error> {
value.serialize(self)
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Ok(None)
}
fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> {
name.serialize(self)
}
fn serialize_unit_variant(
self,
_: &'static str,
_: u32,
variant: &'static str,
) -> Result<Self::Ok, Self::Error> {
variant.serialize(self)
}
fn serialize_newtype_struct<T: serde::Serialize + ?Sized>(
self,
_: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error> {
value.serialize(self)
}
fn serialize_newtype_variant<T: serde::Serialize + ?Sized>(
self,
_: &'static str,
_: u32,
variant: &'static str,
value: &T,
) -> Result<Self::Ok, Self::Error> {
let mut map = self.serialize_map(Some(1))?;
map.serialize_entry(variant, value)?;
map.end()
}
fn serialize_seq(self, _: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Ok(ValueSerializeSeq { value: Vec::new() })
}
fn serialize_tuple(self, _: usize) -> Result<Self::SerializeTuple, Self::Error> {
Ok(ValueSerializeTuple { value: Vec::new() })
}
fn serialize_tuple_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeTupleStruct, Self::Error> {
Ok(ValueSerializeTupleStruct { value: Vec::new() })
}
fn serialize_tuple_variant(
self,
_: &'static str,
_: u32,
variant: &'static str,
_: usize,
) -> Result<Self::SerializeTupleVariant, Self::Error> {
Ok(ValueSerializeTupleVariant {
variant,
value: Vec::new(),
})
}
fn serialize_map(self, _: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Ok(ValueSerializeMap {
key: None,
value: HashMap::new(),
})
}
fn serialize_struct(
self,
_: &'static str,
_: usize,
) -> Result<Self::SerializeStruct, Self::Error> {
Ok(ValueSerializeStruct {
value: HashMap::new(),
})
}
fn serialize_struct_variant(
self,
_: &'static str,
_: u32,
variant: &'static str,
_: usize,
) -> Result<Self::SerializeStructVariant, Self::Error> {
Ok(ValueSerializeStructVariant {
variant,
value: HashMap::new(),
})
}
}
impl SerializeSeq for ValueSerializeSeq {
type Ok = Option<AnyValue>;
type Error = ValueError;
fn serialize_element<T: serde::Serialize + ?Sized>(
&mut self,
value: &T,
) -> Result<(), Self::Error> {
if let Some(value) = value.serialize(ValueSerializer)? {
self.value.push(value);
}
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::ListAny(Box::new(self.value))))
}
}
impl SerializeTuple for ValueSerializeTuple {
type Ok = Option<AnyValue>;
type Error = ValueError;
fn serialize_element<T: serde::Serialize + ?Sized>(
&mut self,
value: &T,
) -> Result<(), Self::Error> {
if let Some(value) = value.serialize(ValueSerializer)? {
self.value.push(value);
}
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::ListAny(Box::new(self.value))))
}
}
impl SerializeTupleStruct for ValueSerializeTupleStruct {
type Ok = Option<AnyValue>;
type Error = ValueError;
fn serialize_field<T: serde::Serialize + ?Sized>(
&mut self,
value: &T,
) -> Result<(), Self::Error> {
if let Some(value) = value.serialize(ValueSerializer)? {
self.value.push(value);
}
Ok(())
}
fn end(self) -> Result<Self::Ok, Self::Error> {
Ok(Some(AnyValue::ListAny(Box::new(self.value))))
}
}
/// Collects the fields of a tuple-like enum variant.
///
/// The result is a single-entry map from the variant's name to the list of its fields.
impl SerializeTupleVariant for ValueSerializeTupleVariant {
    type Ok = Option<AnyValue>;
    type Error = ValueError;

    /// Serialize one positional field and append it to the variant's list.
    ///
    /// A field that serializes to `None` is left out of the list.
    fn serialize_field<T: serde::Serialize + ?Sized>(
        &mut self,
        value: &T,
    ) -> Result<(), Self::Error> {
        let field = value.serialize(ValueSerializer)?;

        if let Some(field) = field {
            self.value.push(field);
        }

        Ok(())
    }

    /// Finish the variant, wrapping the collected fields in a map keyed by the variant name.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        let name = Key::from(self.variant);
        let fields = AnyValue::ListAny(Box::new(self.value));

        let mut wrapper = HashMap::new();
        wrapper.insert(name, fields);

        Ok(Some(AnyValue::Map(Box::new(wrapper))))
    }
}
/// Collects the entries of a map into an `AnyValue::Map`.
///
/// Keys and values arrive separately: `serialize_key` stashes the key, and the
/// following `serialize_value` consumes it.
impl SerializeMap for ValueSerializeMap {
    type Ok = Option<AnyValue>;
    type Error = ValueError;

    /// Serialize a key and hold onto it for the next call to `serialize_value`.
    ///
    /// Keys that serialize to a string are used directly. Anything else, including
    /// a key that serializes to `None`, is converted using its `Debug` output.
    fn serialize_key<T: serde::Serialize + ?Sized>(
        &mut self,
        key: &T,
    ) -> Result<(), Self::Error> {
        self.key = Some(match key.serialize(ValueSerializer)? {
            Some(AnyValue::String(text)) => Key::from(String::from(text)),
            other => Key::from(format!("{:?}", other)),
        });

        Ok(())
    }

    /// Serialize a value and insert it under the most recently serialized key.
    ///
    /// Fails with a "missing key" error if no key is pending. The pending key is
    /// consumed even when the value serializes to `None`, in which case no entry
    /// is inserted.
    fn serialize_value<T: serde::Serialize + ?Sized>(
        &mut self,
        value: &T,
    ) -> Result<(), Self::Error> {
        let Some(pending_key) = self.key.take() else {
            return Err(Self::Error::custom("missing key"));
        };

        if let Some(serialized) = value.serialize(ValueSerializer)? {
            self.value.insert(pending_key, serialized);
        }

        Ok(())
    }

    /// Finish the map, producing a boxed map of the collected entries.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        let entries = Box::new(self.value);

        Ok(Some(AnyValue::Map(entries)))
    }
}
/// Collects the fields of a struct into an `AnyValue::Map` keyed by field name.
impl SerializeStruct for ValueSerializeStruct {
    type Ok = Option<AnyValue>;
    type Error = ValueError;

    /// Serialize a named field and insert it into the map.
    ///
    /// A field that serializes to `None` is left out of the map.
    fn serialize_field<T: serde::Serialize + ?Sized>(
        &mut self,
        key: &'static str,
        value: &T,
    ) -> Result<(), Self::Error> {
        if let Some(serialized) = value.serialize(ValueSerializer)? {
            self.value.insert(Key::from(key), serialized);
        }

        Ok(())
    }

    /// Finish the struct, producing a boxed map of the collected fields.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        let fields = Box::new(self.value);

        Ok(Some(AnyValue::Map(fields)))
    }
}
/// Collects the fields of a struct-like enum variant.
///
/// The result is a single-entry map from the variant's name to a map of its fields.
impl SerializeStructVariant for ValueSerializeStructVariant {
    type Ok = Option<AnyValue>;
    type Error = ValueError;

    /// Serialize a named field and insert it into the variant's map.
    ///
    /// A field that serializes to `None` is left out of the map.
    fn serialize_field<T: serde::Serialize + ?Sized>(
        &mut self,
        key: &'static str,
        value: &T,
    ) -> Result<(), Self::Error> {
        if let Some(serialized) = value.serialize(ValueSerializer)? {
            self.value.insert(Key::from(key), serialized);
        }

        Ok(())
    }

    /// Finish the variant, wrapping the collected fields in a map keyed by the variant name.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        let name = Key::from(self.variant);
        let fields = AnyValue::Map(Box::new(self.value));

        let mut wrapper = HashMap::new();
        wrapper.insert(name, fields);

        Ok(Some(AnyValue::Map(Box::new(wrapper))))
    }
}
}
#[cfg(test)]
mod tests {
use std::io;
use super::*;
use emit::runtime::AmbientSlot;
use opentelemetry::trace::{SamplingDecision, SamplingResult, TraceState};
use opentelemetry_sdk::{
logs::{in_memory_exporter::InMemoryLogExporter, SdkLoggerProvider},
trace::{in_memory_exporter::InMemorySpanExporter, SdkTracerProvider},
};
/// Initialize `slot` with a runtime from `setup` that exports to in-memory exporters.
///
/// Returns, in order: the log exporter, the span exporter, the logger provider,
/// and the tracer provider. Clones of both providers are handed to `setup`.
fn build(
    slot: &AmbientSlot,
) -> (
    InMemoryLogExporter,
    InMemorySpanExporter,
    SdkLoggerProvider,
    SdkTracerProvider,
) {
    let log_sink = InMemoryLogExporter::default();
    let span_sink = InMemorySpanExporter::default();

    let loggers = SdkLoggerProvider::builder()
        .with_simple_exporter(log_sink.clone())
        .build();
    let tracers = SdkTracerProvider::builder()
        .with_simple_exporter(span_sink.clone())
        .build();

    // The value returned by `init_slot` is deliberately discarded
    let _ = setup(loggers.clone(), tracers.clone()).init_slot(slot);

    (log_sink, span_sink, loggers, tracers)
}
/// An event emitted through `emit!` is exported as one log record whose body
/// is the rendered message.
#[test]
fn emit_log() {
    let slot = AmbientSlot::new();
    let (log_exporter, ..) = build(&slot);

    emit::emit!(rt: slot.get(), "test {attr}", attr: "log");

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    match emitted[0].record.body() {
        Some(AnyValue::String(body)) => assert_eq!("test log", body.as_str()),
        _ => panic!("unexpected log body value"),
    }
}
/// `trace_id` and `span_id` properties on an event end up in the log record's
/// trace context.
#[test]
fn emit_log_trace_context() {
    let slot = AmbientSlot::new();
    let (log_exporter, ..) = build(&slot);

    emit::emit!(
        rt: slot.get(),
        "test log",
        trace_id: "4bf92f3577b34da6a3ce929d0e0e4736",
        span_id: "00f067aa0ba902b7"
    );

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    let trace_context = emitted[0].record.trace_context().unwrap();

    assert_eq!(
        "4bf92f3577b34da6a3ce929d0e0e4736",
        trace_context.trace_id.to_string()
    );
    assert_eq!("00f067aa0ba902b7", trace_context.span_id.to_string());
}
/// A root `#[emit::span]` function is exported as one OpenTelemetry span.
///
/// The exported span is named after the template, shares its ids with the
/// `emit` span context observed inside the function, and has no parent.
#[test]
fn emit_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit {attr}", attr: "span")]
    fn emit_span() -> emit::span::SpanCtxt {
        emit::span::SpanCtxt::current(SLOT.get().ctxt())
    }

    let emit_ctxt = emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];

    assert_eq!("emit {attr}", exported.name);
    assert_eq!(
        emit_ctxt.trace_id().unwrap().to_bytes(),
        exported.span_context.trace_id().to_bytes()
    );
    assert_eq!(
        emit_ctxt.span_id().unwrap().to_bytes(),
        exported.span_context.span_id().to_bytes()
    );

    // Neither side should report a parent for a root span
    assert!(emit_ctxt.span_parent().is_none());
    assert_eq!(
        opentelemetry::trace::SpanId::INVALID,
        exported.parent_span_id
    );
}
/// A span event passed straight to `emit!` is exported as a log record, not as a span.
#[test]
fn emit_span_direct() {
    let slot = AmbientSlot::new();
    let (log_exporter, span_exporter, ..) = build(&slot);

    let start = "2024-01-01T00:00:00.000Z"
        .parse::<emit::Timestamp>()
        .unwrap();
    let end = "2024-01-01T00:00:01.000Z"
        .parse::<emit::Timestamp>()
        .unwrap();

    emit::emit!(
        rt: slot.get(),
        evt: emit::Span::new(
            emit::mdl!(),
            "test",
            start..end,
            emit::span::SpanCtxt::new_root(slot.get().rng()),
        ),
    );

    let emitted = log_exporter.get_emitted_logs().unwrap();
    let finished = span_exporter.get_finished_spans().unwrap();

    assert_eq!(1, emitted.len());
    assert_eq!(0, finished.len());
}
/// An OpenTelemetry span started inside an `emit` span becomes its child.
///
/// Both spans are exported, the inner OpenTelemetry span first. They share a
/// trace id, and the OpenTelemetry span's parent is the `emit` span.
#[test]
fn emit_span_otel_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, _, tracer_provider) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit span")]
    fn emit_span(tracer_provider: &SdkTracerProvider) -> (SpanContext, emit::span::SpanCtxt) {
        fn otel_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
            use opentelemetry::trace::TracerProvider;
            let tracer = tracer_provider.tracer("otel_span");
            tracer.in_span("otel span", |cx| cx.span().span_context().clone())
        }

        // Run the OpenTelemetry span first, then capture the surrounding emit context
        let otel_ctxt = otel_span(tracer_provider);
        let emit_ctxt = emit::span::SpanCtxt::current(SLOT.get().ctxt());

        (otel_ctxt, emit_ctxt)
    }

    let (otel_ctxt, emit_ctxt) = emit_span(&tracer_provider);

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(2, finished.len());

    let emit_trace_id = emit_ctxt.trace_id().unwrap().to_bytes();
    let emit_span_id = emit_ctxt.span_id().unwrap().to_bytes();

    assert_eq!(otel_ctxt.trace_id().to_bytes(), emit_trace_id);

    // The inner OpenTelemetry span finishes, and so is exported, first
    let inner = &finished[0];
    assert_eq!("otel span", inner.name);
    assert_eq!(
        otel_ctxt.trace_id().to_bytes(),
        inner.span_context.trace_id().to_bytes()
    );
    assert_eq!(
        otel_ctxt.span_id().to_bytes(),
        inner.span_context.span_id().to_bytes()
    );
    assert_eq!(emit_span_id, inner.parent_span_id.to_bytes());

    let outer = &finished[1];
    assert_eq!("emit span", outer.name);
    assert_eq!(emit_trace_id, outer.span_context.trace_id().to_bytes());
    assert_eq!(emit_span_id, outer.span_context.span_id().to_bytes());
    assert!(emit_ctxt.span_parent().is_none());
}
/// A log emitted through the OpenTelemetry logger inside an `emit` span carries
/// that span's trace context, flagged as sampled.
#[test]
fn emit_span_otel_log() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (log_exporter, _, logger_provider, _) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit span")]
    fn emit_span(logger_provider: &SdkLoggerProvider) -> emit::span::SpanCtxt {
        use opentelemetry::logs::LoggerProvider;

        let logger = logger_provider.logger("otel_logger");

        let mut record = logger.create_log_record();
        record.set_body(AnyValue::String("test log".into()));
        logger.emit(record);

        emit::span::SpanCtxt::current(SLOT.get().ctxt())
    }

    let emit_ctxt = emit_span(&logger_provider);

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    let trace_context = emitted[0].record.trace_context().unwrap();

    assert!(trace_context.trace_flags.unwrap().is_sampled());
    assert_eq!(
        emit_ctxt.trace_id().unwrap().to_bytes(),
        trace_context.trace_id.to_bytes()
    );
    assert_eq!(
        emit_ctxt.span_id().unwrap().to_bytes(),
        trace_context.span_id.to_bytes()
    );
}
/// A `span_name` property overrides the name of the exported span.
#[test]
fn emit_span_span_name() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit span", span_name: "custom name")]
    fn emit_span() {}

    emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];
    assert_eq!("custom name", exported.name);
}
/// A `span_kind` property of `"producer"` is exported as `SpanKind::Producer`.
#[test]
fn emit_span_span_kind() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit span", span_kind: "producer")]
    fn emit_span() {}

    emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];
    assert_eq!(SpanKind::Producer, exported.span_kind);
}
/// A `span_links` property formatted as `{trace_id}-{span_id}` is exported as
/// a link on the span.
#[test]
fn emit_span_span_links() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(
        rt: SLOT.get(),
        "emit span",
        #[emit::as_serde]
        span_links: [
            "4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7",
        ],
    )]
    fn emit_span() {}

    emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let links = &finished[0].links.links;
    assert_eq!(1, links.len());

    let linked = &links[0].span_context;

    assert_eq!(
        "4bf92f3577b34da6a3ce929d0e0e4736",
        linked.trace_id().to_string()
    );
    assert_eq!("00f067aa0ba902b7", linked.span_id().to_string());
}
/// A span over a function returning `Ok`, with an `ok_lvl` of `"info"`, is
/// exported with `Status::Ok`.
#[test]
fn emit_span_ok() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), ok_lvl: "info", "emit span")]
    fn emit_span() -> Result<(), io::Error> {
        Ok(())
    }

    // Only the exported span matters here, not the function's result
    let _ = emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];
    assert_eq!(Status::Ok, exported.status);
}
/// A span over a function returning `Err` is exported with an error status
/// whose description is the error's message.
#[test]
fn emit_span_err() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), err_lvl: "error", "emit span")]
    fn emit_span() -> Result<(), io::Error> {
        Err(io::Error::new(io::ErrorKind::Other, "something failed!"))
    }

    // Only the exported span matters here, not the function's result
    let _ = emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];
    assert_eq!(Status::error("something failed!"), exported.status);
}
/// A span that completes with `Ok` but an `ok_lvl` of `"error"` is exported
/// with the status `Status::error("error")`.
#[test]
fn emit_span_lvl_err() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), ok_lvl: "error", "emit span")]
    fn emit_span() -> Result<(), io::Error> {
        Ok(())
    }

    // Only the exported span matters here, not the function's result
    let _ = emit_span();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(1, finished.len());

    let exported = &finished[0];
    assert_eq!(Status::error("error"), exported.status);
}
/// An OpenTelemetry span started inside a filtered-out `emit` span is itself unsampled.
///
/// The `emit` span's `when` filter always returns `false`. No spans are exported
/// and the inner OpenTelemetry span context is not flagged as sampled.
#[test]
fn emit_span_unsampled_otel_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, _, tracer_provider) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), when: emit::filter::from_fn(|_| false), "emit span")]
    fn emit_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
        fn otel_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
            use opentelemetry::trace::TracerProvider;
            let tracer = tracer_provider.tracer("otel_span");
            tracer.in_span("otel span", |cx| cx.span().span_context().clone())
        }

        otel_span(tracer_provider)
    }

    let otel_ctxt = emit_span(&tracer_provider);

    let finished = span_exporter.get_finished_spans().unwrap();

    assert_eq!(0, finished.len());
    assert!(!otel_ctxt.is_sampled());
}
/// An `emit` span nested inside a filtered-out `emit` span is not exported either.
///
/// The outer span's `when` filter always returns `false`; the inner span has no filter.
#[test]
fn emit_span_unsampled_emit_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, span_exporter, ..) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), when: emit::filter::from_fn(|_| false), "emit span")]
    fn emit_span_outer() {
        #[emit::span(rt: SLOT.get(), "emit span")]
        fn emit_span_inner() {}

        emit_span_inner()
    }

    emit_span_outer();

    let finished = span_exporter.get_finished_spans().unwrap();
    assert_eq!(0, finished.len());
}
/// A log emitted through the OpenTelemetry logger inside a filtered-out `emit`
/// span still carries that span's ids, but is not flagged as sampled.
#[test]
fn emit_span_unsampled_otel_log() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (log_exporter, _, logger_provider, _) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), when: emit::filter::from_fn(|_| false), "emit span")]
    fn emit_span(logger_provider: &SdkLoggerProvider) -> emit::span::SpanCtxt {
        use opentelemetry::logs::LoggerProvider;

        let logger = logger_provider.logger("otel_logger");

        let mut record = logger.create_log_record();
        record.set_body(AnyValue::String("test log".into()));
        logger.emit(record);

        emit::span::SpanCtxt::current(SLOT.get().ctxt())
    }

    let emit_ctxt = emit_span(&logger_provider);

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    let trace_context = emitted[0].record.trace_context().unwrap();

    assert!(!trace_context.trace_flags.unwrap().is_sampled());
    assert_eq!(
        emit_ctxt.trace_id().unwrap().to_bytes(),
        trace_context.trace_id.to_bytes()
    );
    assert_eq!(
        emit_ctxt.span_id().unwrap().to_bytes(),
        trace_context.span_id.to_bytes()
    );
}
/// Calling through a root `emit` frame hides the active OpenTelemetry span.
///
/// Inside `emit::Frame::root(..).call(..)` the current OpenTelemetry span
/// context is `SpanContext::NONE`, even though an OpenTelemetry span is active
/// around the call.
#[test]
fn emit_ctxt_clear_otel_ctxt() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (.., tracer_provider) = build(&SLOT);

    fn otel_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
        use opentelemetry::trace::TracerProvider;

        let tracer = tracer_provider.tracer("otel_span");

        tracer.in_span("otel span", |_| {
            let root = emit::Frame::root(SLOT.get().ctxt(), emit::Empty);

            root.call(|| Context::current().span().span_context().clone())
        })
    }

    let observed = otel_span(&tracer_provider);

    assert_eq!(SpanContext::NONE, observed);
}
/// Pushing an empty `emit` frame keeps the active OpenTelemetry span current.
///
/// The span context seen inside `emit::Frame::push(..).call(..)` has the same
/// trace id, span id, and trace flags as the one seen just outside it.
#[test]
fn emit_ctxt_push_otel_ctxt() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (.., tracer_provider) = build(&SLOT);

    fn otel_span(tracer_provider: &SdkTracerProvider) -> (SpanContext, SpanContext) {
        use opentelemetry::trace::TracerProvider;

        let tracer = tracer_provider.tracer("otel_span");

        tracer.in_span("otel span", |_| {
            let before = Context::current().span().span_context().clone();

            let frame = emit::Frame::push(SLOT.get().ctxt(), emit::Empty);
            let within = frame.call(|| Context::current().span().span_context().clone());

            (before, within)
        })
    }

    let (outer, inner) = otel_span(&tracer_provider);

    assert_eq!(outer.trace_id(), inner.trace_id());
    assert_eq!(outer.span_id(), inner.span_id());
    assert_eq!(outer.trace_flags(), inner.trace_flags());
}
/// An `emit` span started inside an OpenTelemetry span becomes its child.
///
/// Both spans are exported, the inner `emit` span first. They share a trace id,
/// the exported `emit` span is parented to the OpenTelemetry span, and the
/// OpenTelemetry span itself has no parent.
#[test]
fn otel_span_emit_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (_, spans, _, tracer_provider) = build(&SLOT);

    fn otel_span(tracer_provider: &SdkTracerProvider) -> (SpanContext, emit::span::SpanCtxt) {
        use opentelemetry::trace::TracerProvider;

        #[emit::span(rt: SLOT.get(), "emit span")]
        fn emit_span() -> emit::span::SpanCtxt {
            emit::span::SpanCtxt::current(SLOT.get().ctxt())
        }

        tracer_provider
            .tracer("otel_span")
            .in_span("otel span", |cx| {
                (cx.span().span_context().clone(), emit_span())
            })
    }

    let (otel_ctxt, emit_ctxt) = otel_span(&tracer_provider);

    let spans = spans.get_finished_spans().unwrap();
    assert_eq!(2, spans.len());

    assert_eq!(
        otel_ctxt.trace_id().to_bytes(),
        emit_ctxt.trace_id().unwrap().to_bytes()
    );

    // The inner emit span finishes, and so is exported, first
    assert_eq!("emit span", spans[0].name);
    assert_eq!(
        emit_ctxt.trace_id().unwrap().to_bytes(),
        spans[0].span_context.trace_id().to_bytes()
    );
    assert_eq!(
        emit_ctxt.span_id().unwrap().to_bytes(),
        spans[0].span_context.span_id().to_bytes()
    );
    // The emit context observed inside the span doesn't report a parent
    assert!(emit_ctxt.span_parent().is_none());
    // The exported span is still parented to the enclosing OpenTelemetry span
    assert_eq!(
        otel_ctxt.span_id().to_bytes(),
        spans[0].parent_span_id.to_bytes()
    );

    assert_eq!("otel span", spans[1].name);
    assert_eq!(
        otel_ctxt.trace_id().to_bytes(),
        spans[1].span_context.trace_id().to_bytes()
    );
    assert_eq!(
        otel_ctxt.span_id().to_bytes(),
        spans[1].span_context.span_id().to_bytes()
    );
    assert_eq!(SpanId::INVALID, spans[1].parent_span_id);
}
/// An `emit` event emitted inside an OpenTelemetry span is exported as a log
/// record carrying that span's trace context, flagged as sampled.
#[test]
fn otel_span_emit_log() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (log_exporter, _, _, tracer_provider) = build(&SLOT);

    fn otel_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
        use opentelemetry::trace::TracerProvider;

        let tracer = tracer_provider.tracer("otel_span");

        tracer.in_span("otel span", |cx| {
            emit::emit!(rt: SLOT.get(), "emit event");

            cx.span().span_context().clone()
        })
    }

    let otel_ctxt = otel_span(&tracer_provider);

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    let trace_context = emitted[0].record.trace_context().unwrap();

    assert!(trace_context.trace_flags.unwrap().is_sampled());
    assert_eq!(otel_ctxt.trace_id(), trace_context.trace_id);
    assert_eq!(otel_ctxt.span_id(), trace_context.span_id);
}
/// An `emit` span started inside a record-only OpenTelemetry span is not exported.
///
/// The event emitted inside the `emit` span is still exported as a log record,
/// but its trace context is not flagged as sampled.
#[test]
fn otel_span_unsampled_emit_span() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (log_exporter, span_exporter, _, tracer_provider) = build(&SLOT);

    #[emit::span(rt: SLOT.get(), "emit {attr}", attr: "span")]
    fn emit_span() {
        emit::emit!(rt: SLOT.get(), "emit event");
    }

    fn otel_span(tracer_provider: &SdkTracerProvider) {
        use opentelemetry::trace::TracerProvider;

        let tracer = tracer_provider.tracer("otel_span");

        // Force the sampling decision instead of leaving it to the provider
        let record_only = SamplingResult {
            decision: SamplingDecision::RecordOnly,
            attributes: Vec::new(),
            trace_state: TraceState::NONE,
        };

        let span = tracer
            .span_builder("otel span")
            .with_sampling_result(record_only)
            .start(&tracer);

        // Keep the span current until the end of this function
        let _guard = Context::current_with_span(span).attach();

        emit_span();
    }

    otel_span(&tracer_provider);

    let emitted = log_exporter.get_emitted_logs().unwrap();
    let finished = span_exporter.get_finished_spans().unwrap();

    assert_eq!(1, emitted.len());
    assert_eq!(0, finished.len());

    let trace_context = emitted[0].record.trace_context().unwrap();
    assert!(!trace_context.trace_flags.unwrap().is_sampled());
}
/// An `emit` event emitted inside a dropped OpenTelemetry span is still
/// exported as a log record, carrying that span's ids but not flagged as sampled.
#[test]
fn otel_span_unsampled_emit_log() {
    static SLOT: AmbientSlot = AmbientSlot::new();
    let (log_exporter, _, _, tracer_provider) = build(&SLOT);

    fn otel_span(tracer_provider: &SdkTracerProvider) -> SpanContext {
        use opentelemetry::trace::TracerProvider;

        let tracer = tracer_provider.tracer("otel_span");

        // Force the sampling decision instead of leaving it to the provider
        let dropped = SamplingResult {
            decision: SamplingDecision::Drop,
            attributes: vec![],
            trace_state: Default::default(),
        };

        let span = tracer
            .span_builder("otel span")
            .with_sampling_result(dropped)
            .start(&tracer);

        // Keep the span current until the end of this function
        let _guard = Context::current_with_span(span).attach();

        emit::emit!(rt: SLOT.get(), "emit event");

        Context::current().span().span_context().clone()
    }

    let otel_ctxt = otel_span(&tracer_provider);

    let emitted = log_exporter.get_emitted_logs().unwrap();
    assert_eq!(1, emitted.len());

    let trace_context = emitted[0].record.trace_context().unwrap();

    assert!(!trace_context.trace_flags.unwrap().is_sampled());
    assert_eq!(otel_ctxt.trace_id(), trace_context.trace_id);
    assert_eq!(otel_ctxt.span_id(), trace_context.span_id);
}
/// Property values on an `emit` event are converted into OpenTelemetry log attributes.
///
/// One event is emitted with a property for each kind of value, and a subset of
/// the resulting attributes is checked:
///
/// - strings, signed integers, floats, and booleans map to the matching `AnyValue` variant;
/// - an `i128` that doesn't fit in 64 bits becomes a string;
/// - unit and `None` produce no attribute, while `Some` is unwrapped;
/// - slices and tuples become lists, maps and structs become maps;
/// - unit variants become their name, and other enum variants become a
///   single-entry map keyed by the variant name.
#[test]
fn emit_value_to_otel_attribute() {
    use opentelemetry::Key;
    use std::collections::HashMap;

    #[derive(serde::Serialize)]
    struct Struct {
        a: i32,
        b: i32,
        c: i32,
    }

    #[derive(serde::Serialize)]
    struct Newtype(i32);

    #[derive(serde::Serialize)]
    enum Enum {
        Unit,
        Newtype(i32),
        Struct { a: i32, b: i32, c: i32 },
        Tuple(i32, i32, i32),
    }

    // Serializes its contents through `serialize_bytes`
    struct Bytes<B>(B);

    impl<B: AsRef<[u8]>> serde::Serialize for Bytes<B> {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            serializer.serialize_bytes(self.0.as_ref())
        }
    }

    // Serializes its fields through `serialize_map` rather than as a struct
    struct Map {
        a: i32,
        b: i32,
        c: i32,
    }

    impl serde::Serialize for Map {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            use serde::ser::SerializeMap;
            let mut map = serializer.serialize_map(Some(3))?;
            map.serialize_entry(&"a", &self.a)?;
            map.serialize_entry(&"b", &self.b)?;
            map.serialize_entry(&"c", &self.c)?;
            map.end()
        }
    }

    let slot = AmbientSlot::new();
    let (logs, _, _, _) = build(&slot);

    // Each property key appears exactly once
    emit::emit!(
        rt: slot.get(),
        "test log",
        str_value: "a string",
        u8_value: 1u8,
        u16_value: 2u16,
        u32_value: 42u32,
        u64_value: 2147483660u64,
        u128_small_value: 2147483660u128,
        u128_big_value: 9223372036854775820u128,
        i8_value: 1i8,
        i16_value: 2i16,
        i32_value: 42i32,
        i64_value: 2147483660i64,
        i128_small_value: 2147483660i128,
        i128_big_value: 9223372036854775820i128,
        f64_value: 4.2,
        bool_value: true,
        #[emit::as_serde]
        bytes_value: Bytes([1, 1, 1]),
        #[emit::as_serde]
        unit_value: (),
        #[emit::as_serde]
        some_value: Some(42),
        #[emit::as_serde]
        none_value: None::<i32>,
        #[emit::as_serde]
        slice_value: [1, 1, 1] as [i32; 3],
        #[emit::as_serde]
        map_value: Map { a: 1, b: 1, c: 1 },
        #[emit::as_serde]
        struct_value: Struct { a: 1, b: 1, c: 1 },
        #[emit::as_serde]
        tuple_value: (1, 1, 1),
        #[emit::as_serde]
        newtype_value: Newtype(42),
        #[emit::as_serde]
        unit_variant_value: Enum::Unit,
        #[emit::as_serde]
        newtype_variant_value: Enum::Newtype(42),
        #[emit::as_serde]
        struct_variant_value: Enum::Struct { a: 1, b: 1, c: 1 },
        #[emit::as_serde]
        tuple_variant_value: Enum::Tuple(1, 1, 1),
    );

    let logs = logs.get_emitted_logs().unwrap();

    // Fail with a clear message rather than an index panic if nothing was exported
    assert_eq!(1, logs.len());

    // Look up an attribute on the exported record by its key
    let get = |needle: &str| -> Option<AnyValue> {
        logs[0].record.attributes_iter().find_map(|(k, v)| {
            if k.as_str() == needle {
                Some(v.clone())
            } else {
                None
            }
        })
    };

    assert_eq!(
        AnyValue::String("a string".into()),
        get("str_value").unwrap()
    );

    assert_eq!(AnyValue::Int(1), get("i8_value").unwrap());
    assert_eq!(AnyValue::Int(2), get("i16_value").unwrap());
    assert_eq!(AnyValue::Int(42), get("i32_value").unwrap());
    assert_eq!(AnyValue::Int(2147483660), get("i64_value").unwrap());
    assert_eq!(AnyValue::Int(2147483660), get("i128_small_value").unwrap());
    assert_eq!(
        AnyValue::String("9223372036854775820".into()),
        get("i128_big_value").unwrap()
    );

    assert_eq!(AnyValue::Double(4.2), get("f64_value").unwrap());
    assert_eq!(AnyValue::Boolean(true), get("bool_value").unwrap());

    assert_eq!(None, get("unit_value"));
    assert_eq!(None, get("none_value"));
    assert_eq!(AnyValue::Int(42), get("some_value").unwrap());

    assert_eq!(
        AnyValue::ListAny(Box::new(vec![
            AnyValue::Int(1),
            AnyValue::Int(1),
            AnyValue::Int(1)
        ])),
        get("slice_value").unwrap()
    );

    assert_eq!(
        AnyValue::Map({
            let mut map = HashMap::<Key, AnyValue>::default();
            map.insert(Key::from("a"), AnyValue::Int(1));
            map.insert(Key::from("b"), AnyValue::Int(1));
            map.insert(Key::from("c"), AnyValue::Int(1));
            Box::new(map)
        }),
        get("map_value").unwrap()
    );

    assert_eq!(
        AnyValue::Map({
            let mut map = HashMap::<Key, AnyValue>::default();
            map.insert(Key::from("a"), AnyValue::Int(1));
            map.insert(Key::from("b"), AnyValue::Int(1));
            map.insert(Key::from("c"), AnyValue::Int(1));
            Box::new(map)
        }),
        get("struct_value").unwrap()
    );

    assert_eq!(
        AnyValue::ListAny(Box::new(vec![
            AnyValue::Int(1),
            AnyValue::Int(1),
            AnyValue::Int(1)
        ])),
        get("tuple_value").unwrap()
    );

    assert_eq!(
        AnyValue::String("Unit".into()),
        get("unit_variant_value").unwrap()
    );

    assert_eq!(
        AnyValue::Map({
            let mut map = HashMap::new();
            map.insert(Key::from("Newtype"), AnyValue::Int(42));
            Box::new(map)
        }),
        get("newtype_variant_value").unwrap()
    );

    assert_eq!(
        AnyValue::Map({
            let mut map = HashMap::new();
            map.insert(
                Key::from("Struct"),
                AnyValue::Map({
                    let mut map = HashMap::new();
                    map.insert(Key::from("a"), AnyValue::Int(1));
                    map.insert(Key::from("b"), AnyValue::Int(1));
                    map.insert(Key::from("c"), AnyValue::Int(1));
                    Box::new(map)
                }),
            );
            Box::new(map)
        }),
        get("struct_variant_value").unwrap()
    );

    assert_eq!(
        AnyValue::Map({
            let mut map = HashMap::new();
            map.insert(
                Key::from("Tuple"),
                AnyValue::ListAny(Box::new(vec![
                    AnyValue::Int(1),
                    AnyValue::Int(1),
                    AnyValue::Int(1),
                ])),
            );
            Box::new(map)
        }),
        get("tuple_variant_value").unwrap()
    );
}
}