use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use opentelemetry_proto::tonic::{
collector::trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
common::v1::any_value::Value,
common::v1::{AnyValue, KeyValue},
resource::v1::Resource,
trace::v1::{ResourceSpans, ScopeSpans, Span, span::Link, span::SpanKind},
};
use re_dataframe::QueryExpression;
use re_uri::Origin;
use tokio::sync::OnceCell;
use web_time::{Duration, SystemTime};
// Transport used for the analytics gRPC channel: a native tonic channel on
// desktop, and a gRPC-Web client when compiled for the browser.
#[cfg(not(target_arch = "wasm32"))]
type Channel = tonic::transport::Channel;
#[cfg(target_arch = "wasm32")]
type Channel = tonic_web_wasm_client::Client;

// Full gRPC method path of the standard OTLP trace `Export` endpoint.
const EXPORT_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export";
/// Handle for exporting query analytics (as OTLP trace spans) to the server
/// identified by its origin.
///
/// Cheap to clone: all state lives behind an `Arc`.
#[derive(Clone)]
pub(crate) struct ConnectionAnalytics {
    inner: Arc<Inner>,
}
impl std::fmt::Debug for ConnectionAnalytics {
    /// Shows only the origin; client/channel state is elided (`..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("ConnectionAnalytics");
        dbg.field("origin", &self.inner.origin);
        dbg.finish_non_exhaustive()
    }
}
/// State shared by every clone of a [`ConnectionAnalytics`].
struct Inner {
    /// Server the analytics spans are exported to.
    origin: Origin,
    /// Lazily-created gRPC client; populated on the first span export.
    client: tokio::sync::Mutex<Option<tonic::client::Grpc<Channel>>>,
    /// Server version, settable at most once (may be explicitly `None`).
    server_version: OnceCell<Option<String>>,
}
impl ConnectionAnalytics {
    /// Creates a handle for `origin`.
    ///
    /// No connection is made until the first span is actually sent.
    pub fn new(origin: Origin) -> Self {
        Self {
            inner: Arc::new(Inner {
                origin,
                client: tokio::sync::Mutex::new(None),
                server_version: OnceCell::new(),
            }),
        }
    }

    /// Records the server version for inclusion as a span attribute.
    /// Only the first call has any effect.
    pub fn set_server_version(&self, version: Option<String>) {
        #[expect(clippy::let_underscore_must_use)]
        let _ = self.inner.server_version.set(version);
    }

    /// Returns the server version, if one was recorded via
    /// [`Self::set_server_version`].
    fn server_version(&self) -> Option<String> {
        self.inner.server_version.get().and_then(Clone::clone)
    }

    /// Starts tracking one query. The analytics span is emitted when the
    /// returned handle (and all of its clones) are dropped.
    pub fn begin_query(
        &self,
        query_info: QueryInfo,
        scan_start: web_time::Instant,
    ) -> PendingQueryAnalytics {
        PendingQueryAnalytics {
            inner: Arc::new(PendingInner {
                connection: self.clone(),
                query_info,
                fetch_stats: SharedFetchStats::default(),
                scan_start,
                time_to_first_chunk: OnceLock::new(),
                direct_terminal_reason: OnceLock::new(),
                error_kind: OnceLock::new(),
            }),
        }
    }

    /// Fire-and-forget export of `span`.
    ///
    /// Runs on the ambient tokio runtime when one exists, otherwise on a
    /// short-lived dedicated thread (native), or via `spawn_local` on wasm.
    /// Failures are only logged at debug level — analytics must never break
    /// the viewer.
    fn send_span(&self, span: Span, trace_id: Option<opentelemetry::TraceId>) {
        let this = self.clone();
        let fut = async move {
            if let Err(err) = this.send_span_impl(span, trace_id).await {
                re_log::debug_once!(
                    "Failed to send analytics to Rerun Cloud: {} ({})",
                    err.code(),
                    err.message()
                );
            }
        };
        #[cfg(target_arch = "wasm32")]
        wasm_bindgen_futures::spawn_local(fut);
        #[cfg(not(target_arch = "wasm32"))]
        {
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                handle.spawn(fut);
            } else {
                // No ambient runtime (e.g. the pending analytics were dropped
                // from a sync context): drive the future on a throwaway
                // current-thread runtime.
                std::thread::Builder::new()
                    .name("query-analytics-sender".to_owned())
                    .spawn(move || {
                        let rt = tokio::runtime::Builder::new_current_thread()
                            .enable_all()
                            .build();
                        match rt {
                            Ok(rt) => rt.block_on(fut),
                            Err(err) => {
                                re_log::debug_once!("Failed to create analytics runtime: {err}");
                            }
                        }
                    })
                    .ok();
            }
        }
    }

    /// Connects (lazily, caching the channel for reuse) and performs a unary
    /// OTLP `Export` call carrying a single span.
    async fn send_span_impl(
        &self,
        span: Span,
        trace_id: Option<opentelemetry::TraceId>,
    ) -> tonic::Result<()> {
        let mut guard = self.inner.client.lock().await;
        let grpc = if let Some(grpc) = guard.as_mut() {
            grpc
        } else {
            match re_redap_client::channel(self.inner.origin.clone()).await {
                Ok(channel) => guard.get_or_insert(tonic::client::Grpc::new(channel)),
                Err(err) => {
                    return Err(tonic::Status::unavailable(format!(
                        "failed to connect for analytics: {err}"
                    )));
                }
            }
        };

        let mut resource_attributes = vec![kv_string("service.name", "rerun-viewer")];
        if let Some(analytics) = re_analytics::Analytics::global_get() {
            resource_attributes.push(kv_string("analytics_id", &analytics.config().analytics_id));
        }
        let export_request = ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                resource: Some(Resource {
                    attributes: resource_attributes,
                    dropped_attributes_count: 0,
                    entity_refs: Vec::new(),
                }),
                scope_spans: vec![ScopeSpans {
                    scope: None,
                    spans: vec![span],
                    schema_url: String::new(),
                }],
                schema_url: String::new(),
            }],
        };

        let mut request = tonic::Request::new(export_request);
        // Forward the query's trace id so the server can correlate this
        // analytics span with the original request.
        if let Some(trace_id) = trace_id
            && let Ok(value) = trace_id.to_string().parse()
        {
            request.metadata_mut().insert("x-request-trace-id", value);
        }

        grpc.ready().await.map_err(|err| {
            tonic::Status::unavailable(format!("analytics channel not ready: {err}"))
        })?;
        let path = http::uri::PathAndQuery::from_static(EXPORT_PATH);
        let codec = tonic_prost::ProstCodec::default();
        // NOTE: the original code passed `request.map(|m| m)` here — a
        // needless identity map; the request is passed through unchanged.
        let _response: tonic::Response<ExportTraceServiceResponse> =
            grpc.unary(request, path, codec).await?;
        Ok(())
    }
}
/// Coarse classification of a [`QueryExpression`], reported as the
/// `query_type` span attribute.
#[derive(Clone, Copy, Debug)]
pub(crate) enum QueryType {
    /// Static-only query (no temporal component).
    Static,
    /// Latest-at query without a range component.
    LatestAt,
    /// Range query without a latest-at component.
    Range,
    /// Both latest-at and range components (dataframe-style query).
    Dataframe,
    /// Neither component: a full scan.
    FullScan,
}
impl QueryType {
pub(crate) fn classify(query_expression: &QueryExpression) -> Self {
if query_expression.is_static() {
Self::Static
} else {
let has_latest_at = query_expression.min_latest_at().is_some();
let has_range = query_expression.max_range().is_some();
match (has_latest_at, has_range) {
(true, true) => Self::Dataframe,
(true, false) => Self::LatestAt,
(false, true) => Self::Range,
(false, false) => Self::FullScan,
}
}
}
const fn as_str(self) -> &'static str {
match self {
Self::Static => "static",
Self::LatestAt => "latest_at",
Self::Range => "range",
Self::Dataframe => "dataframe",
Self::FullScan => "full_scan",
}
}
}
/// Immutable description of one dataset query, captured up-front and turned
/// into span attributes when the query finishes (see `Drop for PendingInner`).
#[derive(Clone, Debug)]
pub struct QueryInfo {
    /// Id of the queried dataset; reported as `dataset_id`.
    pub dataset_id: String,
    /// Reported as `query_chunks`.
    pub query_chunks: usize,
    /// Reported as `query_segments`.
    pub query_segments: usize,
    /// Reported as `query_layers`.
    pub query_layers: usize,
    /// Reported as `query_columns`.
    pub query_columns: usize,
    /// Reported as `query_entities`.
    pub query_entities: usize,
    /// Reported as `query_bytes`.
    pub query_bytes: u64,
    /// Reported as `query_chunks_per_segment_max`.
    pub query_chunks_per_segment_max: u32,
    /// Reported as `query_chunks_per_segment_mean`.
    pub query_chunks_per_segment_mean: f32,
    /// Classification of the query; reported as `query_type`.
    pub query_type: QueryType,
    /// Reported as `primary_index_name` when present.
    pub primary_index_name: Option<String>,
    /// Wall-clock range used as the span's start/end timestamps.
    pub time_range: Range<SystemTime>,
    /// Reported as `time_to_first_chunk_info_us` when present.
    pub time_to_first_chunk_info: Option<Duration>,
    /// Trace to link the span to and to forward in the request metadata.
    pub trace_id: Option<opentelemetry::TraceId>,
}
/// Atomic fetch counters shared across the tasks serving one query.
///
/// Tasks flush their local [`TaskFetchStats`] into this via `flush_into`;
/// the totals are read once via `snapshot` when analytics are emitted.
/// "direct" counters track non-gRPC fetches — presumably ranged object-store
/// reads, given the retry/range-merge counters; confirm against callers.
#[derive(Default)]
pub(crate) struct SharedFetchStats {
    /// Number of gRPC fetch requests.
    grpc_requests: AtomicU64,
    /// Bytes fetched over gRPC.
    grpc_bytes: AtomicU64,
    /// Number of direct fetch requests.
    direct_requests: AtomicU64,
    /// Bytes fetched directly.
    direct_bytes: AtomicU64,
    /// Total number of direct-fetch retries.
    direct_retries_total: AtomicU64,
    /// Number of direct requests that needed at least one retry.
    direct_requests_retried: AtomicU64,
    /// Total back-off sleep across all retries, in microseconds.
    direct_retry_sleep_us: AtomicU64,
    /// Highest attempt number seen on any direct request.
    direct_max_attempt: AtomicU64,
    /// Byte-range count before merging.
    direct_original_ranges: AtomicU64,
    /// Byte-range count after merging.
    direct_merged_ranges: AtomicU64,
}
impl SharedFetchStats {
    /// Reads every counter into a plain [`TaskFetchStats`].
    ///
    /// Takes `&mut self`, so the non-atomic `get_mut` reads are exact —
    /// no other thread can be updating the counters concurrently.
    fn snapshot(&mut self) -> TaskFetchStats {
        // Exhaustive destructuring: adding a field without snapshotting it
        // becomes a compile error.
        let Self {
            grpc_requests,
            grpc_bytes,
            direct_requests,
            direct_bytes,
            direct_retries_total,
            direct_requests_retried,
            direct_retry_sleep_us,
            direct_max_attempt,
            direct_original_ranges,
            direct_merged_ranges,
        } = self;
        TaskFetchStats {
            grpc_requests: *grpc_requests.get_mut(),
            grpc_bytes: *grpc_bytes.get_mut(),
            direct_requests: *direct_requests.get_mut(),
            direct_bytes: *direct_bytes.get_mut(),
            direct_retries_total: *direct_retries_total.get_mut(),
            direct_requests_retried: *direct_requests_retried.get_mut(),
            direct_retry_sleep_us: *direct_retry_sleep_us.get_mut(),
            direct_max_attempt: *direct_max_attempt.get_mut(),
            direct_original_ranges: *direct_original_ranges.get_mut(),
            direct_merged_ranges: *direct_merged_ranges.get_mut(),
        }
    }
}
/// Clonable handle for a single in-flight query's analytics.
///
/// When the last clone is dropped, the underlying [`PendingInner`] builds
/// and sends the analytics span.
#[derive(Clone)]
pub(crate) struct PendingQueryAnalytics {
    inner: Arc<PendingInner>,
}
impl std::fmt::Debug for PendingQueryAnalytics {
    /// Opaque representation: `PendingQueryAnalytics { .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("PendingQueryAnalytics");
        dbg.finish_non_exhaustive()
    }
}
/// State for one tracked query; the `Drop` impl builds and sends the span.
struct PendingInner {
    /// Connection used to export the final span.
    connection: ConnectionAnalytics,
    /// Snapshot of the query, taken at `begin_query` time.
    query_info: QueryInfo,
    /// Counters that fetch tasks flush into while the query runs.
    fetch_stats: SharedFetchStats,
    /// When the scan started; basis for all duration measurements.
    scan_start: web_time::Instant,
    /// Time from `scan_start` to the first chunk; set at most once.
    time_to_first_chunk: OnceLock<Duration>,
    /// First terminal direct-fetch failure, if any (native only).
    // Consistency fix: these two fields previously spelled out
    // `std::sync::OnceLock` while the field above used the imported alias.
    direct_terminal_reason: OnceLock<DirectFetchFailureReason>,
    /// First recorded error kind, if any.
    error_kind: OnceLock<&'static str>,
}
/// Broad classification of where a query failed; reported as the
/// `error_kind` span attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub enum QueryErrorKind {
    GrpcFetch,
    DirectFetch,
    Decode,
    Other,
}

impl QueryErrorKind {
    /// Stable string form used as the span attribute value.
    pub fn as_str(self) -> &'static str {
        match self {
            QueryErrorKind::GrpcFetch => "grpc_fetch",
            QueryErrorKind::DirectFetch => "direct_fetch",
            QueryErrorKind::Decode => "decode",
            QueryErrorKind::Other => "other",
        }
    }
}
/// Why a direct (non-gRPC) fetch ultimately failed; reported as the
/// `fetch_direct_terminal_reason` span attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub(crate) enum DirectFetchFailureReason {
    Timeout,
    Http4xx,
    Http5xx,
    Connection,
    Decode,
    Other,
}

impl DirectFetchFailureReason {
    /// Stable string form used as the span attribute value.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            DirectFetchFailureReason::Timeout => "timeout",
            DirectFetchFailureReason::Http4xx => "http_4xx",
            DirectFetchFailureReason::Http5xx => "http_5xx",
            DirectFetchFailureReason::Connection => "connection",
            DirectFetchFailureReason::Decode => "decode",
            DirectFetchFailureReason::Other => "other",
        }
    }
}
impl PendingQueryAnalytics {
    /// Records the time-to-first-chunk; only the first call wins.
    #[cfg_attr(target_arch = "wasm32", expect(dead_code))]
    pub fn record_first_chunk(&self) {
        let inner = &self.inner;
        inner
            .time_to_first_chunk
            .get_or_init(|| inner.scan_start.elapsed());
    }

    /// Shared counters that fetch tasks flush their per-task stats into.
    pub(crate) fn fetch_stats(&self) -> &SharedFetchStats {
        &self.inner.fetch_stats
    }

    /// Records why a direct fetch terminally failed; only the first call wins.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn record_direct_terminal_failure(&self, reason: DirectFetchFailureReason) {
        // Ignore the error: a value was already recorded, first one wins.
        self.inner.direct_terminal_reason.set(reason).ok();
    }

    /// Records the kind of error the query hit; only the first call wins.
    pub fn record_error(&self, kind: QueryErrorKind) {
        // Ignore the error: a value was already recorded, first one wins.
        self.inner.error_kind.set(kind.as_str()).ok();
    }
}
/// Per-task fetch counters, accumulated locally without atomics and flushed
/// into a [`SharedFetchStats`] when the task finishes.
///
/// `#[must_use]`: dropping stats without flushing them loses data silently.
#[derive(Default)]
#[must_use]
pub(crate) struct TaskFetchStats {
    /// Number of gRPC fetch requests.
    grpc_requests: u64,
    /// Bytes fetched over gRPC.
    grpc_bytes: u64,
    /// Number of direct fetch requests.
    direct_requests: u64,
    /// Bytes fetched directly.
    direct_bytes: u64,
    /// Total number of direct-fetch retries.
    direct_retries_total: u64,
    /// Number of direct requests that needed at least one retry.
    direct_requests_retried: u64,
    /// Total back-off sleep across all retries, in microseconds.
    direct_retry_sleep_us: u64,
    /// Highest attempt number seen on any direct request.
    direct_max_attempt: u64,
    /// Byte-range count before merging.
    direct_original_ranges: u64,
    /// Byte-range count after merging.
    direct_merged_ranges: u64,
}
// On wasm there are no direct fetches, so several methods would be unused.
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
impl TaskFetchStats {
    /// Counts one gRPC fetch that returned `bytes` bytes.
    pub fn record_grpc_fetch(&mut self, bytes: u64) {
        self.grpc_requests += 1;
        self.grpc_bytes += bytes;
    }

    /// Counts one direct fetch that returned `bytes` bytes.
    pub fn record_direct_fetch(&mut self, bytes: u64) {
        self.direct_requests += 1;
        self.direct_bytes += bytes;
    }

    /// Counts one direct-fetch retry: accumulates the back-off `sleep`
    /// (in whole microseconds) and keeps the highest `attempt` seen.
    pub fn record_direct_retry(&mut self, sleep: Duration, attempt: u64) {
        self.direct_retries_total += 1;
        self.direct_retry_sleep_us += sleep.as_micros() as u64;
        self.direct_max_attempt = self.direct_max_attempt.max(attempt);
    }

    /// Counts one request that needed at least one retry.
    pub fn record_direct_request_was_retried(&mut self) {
        self.direct_requests_retried += 1;
    }

    /// Accumulates byte-range counts before (`original`) and after
    /// (`merged`) range coalescing.
    pub fn record_direct_ranges(&mut self, original: u64, merged: u64) {
        self.direct_original_ranges += original;
        self.direct_merged_ranges += merged;
    }

    /// Folds another task's stats into this one.
    #[expect(
        clippy::needless_pass_by_value,
        reason = "Prevent double-counting stats"
    )]
    pub fn merge_from(&mut self, other: Self) {
        // Exhaustive destructuring: a newly added field cannot be forgotten
        // here without a compile error.
        let Self {
            grpc_requests,
            grpc_bytes,
            direct_requests,
            direct_bytes,
            direct_retries_total,
            direct_requests_retried,
            direct_retry_sleep_us,
            direct_max_attempt,
            direct_original_ranges,
            direct_merged_ranges,
        } = other;
        self.grpc_requests += grpc_requests;
        self.grpc_bytes += grpc_bytes;
        self.direct_requests += direct_requests;
        self.direct_bytes += direct_bytes;
        self.direct_retries_total += direct_retries_total;
        self.direct_requests_retried += direct_requests_retried;
        self.direct_retry_sleep_us += direct_retry_sleep_us;
        // `max_attempt` is a high-water mark, not a sum.
        self.direct_max_attempt = self.direct_max_attempt.max(direct_max_attempt);
        self.direct_original_ranges += direct_original_ranges;
        self.direct_merged_ranges += direct_merged_ranges;
    }

    /// Publishes these per-task counters into the shared atomics, consuming
    /// `self` so the same stats cannot be flushed twice.
    pub fn flush_into(self, shared: &SharedFetchStats) {
        // `sum` fields are `fetch_add`ed, `max` fields are `fetch_max`ed.
        // The exhaustive `let Self { .. }` inside the macro turns a field
        // missing from the invocation below into a compile error.
        // Zero values are skipped to avoid needless atomic traffic.
        macro_rules! flush_stats {
            {sum $($sum_id:ident),*; max $($max_id:ident),*;} => {
                let Self {
                    $($sum_id,)*
                    $($max_id,)*
                } = self;
                $(
                    if $sum_id != 0 {
                        shared.$sum_id
                            .fetch_add($sum_id, Ordering::Relaxed);
                    }
                )+
                $(
                    if $max_id != 0 {
                        shared.$max_id
                            .fetch_max($max_id, Ordering::Relaxed);
                    }
                )*
            };
        }
        flush_stats! {
            sum
            grpc_requests,
            grpc_bytes,
            direct_requests,
            direct_bytes,
            direct_retries_total,
            direct_requests_retried,
            direct_retry_sleep_us,
            direct_original_ranges,
            direct_merged_ranges;
            max
            direct_max_attempt;
        };
    }

    /// Convenience wrapper: flushes into `analytics` (when tracking is
    /// active) and records `result`'s error kind on failure.
    /// When `analytics` is `None`, the stats are intentionally discarded.
    pub fn try_flush_into(
        self,
        analytics: Option<&PendingQueryAnalytics>,
        result: Result<(), QueryErrorKind>,
    ) {
        if let Some(analytics) = analytics {
            self.flush_into(analytics.fetch_stats());
            if let Err(err) = result {
                analytics.record_error(err);
            }
        }
    }
}
impl Drop for PendingInner {
    /// Runs when the last [`PendingQueryAnalytics`] clone is dropped: builds
    /// an OTLP span summarizing the query and hands it to the connection for
    /// fire-and-forget export.
    fn drop(&mut self) {
        // Exhaustive destructuring so a new field must be handled here.
        let Self {
            connection,
            query_info,
            fetch_stats,
            scan_start,
            time_to_first_chunk,
            direct_terminal_reason,
            error_kind,
        } = self;
        let total_duration = scan_start.elapsed();
        let QueryInfo {
            ref dataset_id,
            query_chunks,
            query_segments,
            query_layers,
            query_columns,
            query_entities,
            query_bytes,
            query_chunks_per_segment_max,
            query_chunks_per_segment_mean,
            query_type,
            ref primary_index_name,
            ref time_range,
            time_to_first_chunk_info,
            trace_id,
        } = *query_info;
        // We have exclusive access in `drop`, so the non-atomic snapshot is
        // exact.
        let fetch = fetch_stats.snapshot();
        let time_to_first_chunk = time_to_first_chunk.get().copied();
        let direct_terminal_reason = direct_terminal_reason.get().copied();
        let error_kind = error_kind.get().copied();
        // The span's start/end timestamps come from the query's wall-clock
        // time range, not from the scan duration.
        let [start_ns, end_ns] = [
            nanos_since_epoch(&time_range.start),
            nanos_since_epoch(&time_range.end),
        ];
        #[expect(
            clippy::cast_possible_wrap,
            reason = "OTLP proto uses i64 for int values"
        )]
        let mut attributes = vec![
            kv_string("dataset_id", dataset_id),
            kv_int("query_chunks", query_chunks as i64),
            kv_int("query_segments", query_segments as i64),
            kv_int("query_layers", query_layers as i64),
            kv_int("query_columns", query_columns as i64),
            kv_int("query_entities", query_entities as i64),
            kv_int("query_bytes", query_bytes as i64),
            kv_int(
                "query_chunks_per_segment_max",
                i64::from(query_chunks_per_segment_max),
            ),
            kv_double(
                "query_chunks_per_segment_mean",
                f64::from(query_chunks_per_segment_mean),
            ),
            kv_string("query_type", query_type.as_str()),
            kv_int("total_duration_us", total_duration.as_micros() as i64),
            // Success is defined as "no error kind was ever recorded".
            kv_bool("is_success", error_kind.is_none()),
            kv_int("fetch_grpc_requests", fetch.grpc_requests as i64),
            kv_int("fetch_grpc_bytes", fetch.grpc_bytes as i64),
            kv_int("fetch_direct_requests", fetch.direct_requests as i64),
            kv_int("fetch_direct_bytes", fetch.direct_bytes as i64),
            kv_int("fetch_direct_retries", fetch.direct_retries_total as i64),
            kv_int(
                "fetch_direct_requests_retried",
                fetch.direct_requests_retried as i64,
            ),
            kv_int(
                "fetch_direct_retry_sleep_us",
                fetch.direct_retry_sleep_us as i64,
            ),
            kv_int("fetch_direct_max_attempt", fetch.direct_max_attempt as i64),
            kv_int(
                "fetch_direct_original_ranges",
                fetch.direct_original_ranges as i64,
            ),
            kv_int(
                "fetch_direct_merged_ranges",
                fetch.direct_merged_ranges as i64,
            ),
        ];
        // Optional attributes are attached only when a value exists.
        if let Some(name) = primary_index_name {
            attributes.push(kv_string("primary_index_name", name));
        }
        if let Some(ttfci) = time_to_first_chunk_info {
            attributes.push(kv_int(
                "time_to_first_chunk_info_us",
                ttfci.as_micros() as i64,
            ));
        }
        if let Some(ttfr) = time_to_first_chunk {
            attributes.push(kv_int("time_to_first_chunk_us", ttfr.as_micros() as i64));
        }
        if let Some(reason) = direct_terminal_reason {
            attributes.push(kv_string("fetch_direct_terminal_reason", reason.as_str()));
        }
        if let Some(kind) = error_kind {
            attributes.push(kv_string("error_kind", kind));
        }
        if let Some(version) = connection.server_version() {
            attributes.push(kv_string("server_version", &version));
        }
        // Link the span back to the originating trace, if we have one.
        let links = trace_id
            .map(|id| {
                vec![Link {
                    trace_id: id.to_bytes().to_vec(),
                    ..Default::default()
                }]
            })
            .unwrap_or_default();
        let span = Span {
            name: "cloud_query_dataset".to_owned(),
            kind: SpanKind::Client.into(),
            start_time_unix_nano: start_ns,
            end_time_unix_nano: end_ns,
            attributes,
            links,
            ..Default::default()
        };
        connection.send_span(span, trace_id);
    }
}
/// Nanoseconds between the Unix epoch and `time`, truncated to `u64`.
/// Times before the epoch map to 0.
fn nanos_since_epoch(time: &SystemTime) -> u64 {
    time.duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos() as u64)
}
fn kv_string(key: &str, value: &str) -> KeyValue {
KeyValue {
key: key.to_owned(),
value: Some(AnyValue {
value: Some(Value::StringValue(value.to_owned())),
}),
}
}
fn kv_int(key: &str, value: i64) -> KeyValue {
KeyValue {
key: key.to_owned(),
value: Some(AnyValue {
value: Some(Value::IntValue(value)),
}),
}
}
fn kv_bool(key: &str, value: bool) -> KeyValue {
KeyValue {
key: key.to_owned(),
value: Some(AnyValue {
value: Some(Value::BoolValue(value)),
}),
}
}
fn kv_double(key: &str, value: f64) -> KeyValue {
KeyValue {
key: key.to_owned(),
value: Some(AnyValue {
value: Some(Value::DoubleValue(value)),
}),
}
}