use std::{
collections::HashMap,
time::{Duration, Instant},
};
use obs_core::SchemaRegistry;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::{
env_config::{OtlpEndpoint, OtlpResourceAttrs},
logs::ResourceMessage,
mapping::{SpanEventRecord, SpanRecord, project_span},
};
pub const DEFAULT_PAIR_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpTracePayload {
pub resource: ResourceMessage,
pub endpoint: String,
pub spans: Vec<SpanRecord>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub orphaned: Vec<String>,
}
#[derive(Debug)]
pub struct SpanPairTracker {
pending: Mutex<HashMap<(String, String), Pending>>,
timeout: Duration,
}
#[derive(Debug)]
struct Pending {
record: SpanEventRecord,
full_name: String,
queued_at: Instant,
}
impl Default for SpanPairTracker {
fn default() -> Self {
Self::with_timeout(DEFAULT_PAIR_TIMEOUT)
}
}
impl SpanPairTracker {
#[must_use]
pub fn with_timeout(timeout: Duration) -> Self {
Self {
pending: Mutex::new(HashMap::new()),
timeout,
}
}
fn note_started(&self, env: &obs_proto::obs::v1::ObsEnvelope) {
if env.trace_id.is_empty() && env.span_id.is_empty() {
return;
}
let key = (env.trace_id.clone(), env.span_id.clone());
let event = SpanEventRecord {
name: "started".to_string(),
time_unix_nano: env.ts_ns,
attributes: env
.labels
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
};
self.pending.lock().insert(
key,
Pending {
record: event,
full_name: env.full_name.clone(),
queued_at: Instant::now(),
},
);
}
fn pop_started(&self, env: &obs_proto::obs::v1::ObsEnvelope) -> Option<SpanEventRecord> {
let key = (env.trace_id.clone(), env.span_id.clone());
self.pending.lock().remove(&key).map(|p| p.record)
}
pub fn collect_orphaned(&self) -> Vec<String> {
let mut out = Vec::new();
let now = Instant::now();
let timeout = self.timeout;
let mut pending = self.pending.lock();
pending.retain(|_, p| {
let stale = now.saturating_duration_since(p.queued_at) >= timeout;
if stale {
out.push(p.full_name.clone());
}
!stale
});
out
}
}
impl OtlpTracePayload {
#[must_use]
pub fn from_envelopes(
envs: &[obs_proto::obs::v1::ObsEnvelope],
resource: &OtlpResourceAttrs,
endpoint: &OtlpEndpoint,
tracker: &SpanPairTracker,
registry: &SchemaRegistry,
) -> Self {
let mut spans = Vec::with_capacity(envs.len());
for env in envs {
let paired = registry.lookup(env).and_then(|s| s.spans_paired_with());
let is_started_half = match paired {
Some(_) => {
let key = (env.trace_id.clone(), env.span_id.clone());
let already_pending = tracker.pending.lock().contains_key(&key);
!already_pending
}
None => false,
};
let legacy_started =
env.full_name.ends_with("Started") || env.full_name.ends_with(".Start");
if is_started_half || legacy_started {
tracker.note_started(env);
continue;
}
let duration_ns = env
.labels
.get("latency_ns")
.and_then(|v| v.parse::<u64>().ok())
.or_else(|| {
env.labels
.get("latency_ms")
.and_then(|v| v.parse::<u64>().ok())
.map(|ms| ms.saturating_mul(1_000_000))
});
let mut span = project_span(env, duration_ns);
if let Some(started) = tracker.pop_started(env) {
span.events.push(started);
}
spans.push(span);
}
let orphaned = tracker.collect_orphaned();
Self {
resource: ResourceMessage::from_attrs(resource),
endpoint: endpoint.url.clone(),
spans,
orphaned,
}
}
}