use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use opentelemetry_proto::tonic::{
collector::trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
common::v1::any_value::Value,
common::v1::{AnyValue, KeyValue},
resource::v1::Resource,
trace::v1::{ResourceSpans, ScopeSpans, Span, span::Link, span::SpanKind},
};
use re_dataframe::QueryExpression;
use re_protos::cloud::v1alpha1::SystemTableKind;
use re_protos::cloud::v1alpha1::ext::ProviderDetails;
use re_uri::Origin;
use tokio::sync::OnceCell;
use web_time::{Duration, SystemTime};
/// gRPC method path passed to `Grpc::unary` when exporting a span
/// (the `Export` method of the OTLP `TraceService`).
const EXPORT_PATH: &str = "/opentelemetry.proto.collector.trace.v1.TraceService/Export";
/// Handle used to send analytics spans for one connection.
///
/// Cloning is cheap: all clones share the same state through an [`Arc`].
#[derive(Clone)]
pub(crate) struct ConnectionAnalytics {
// Shared state (origin, gRPC client, server version).
inner: Arc<Inner>,
}
impl std::fmt::Debug for ConnectionAnalytics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectionAnalytics")
.field("origin", &self.inner.origin)
.finish_non_exhaustive()
}
}
/// State shared by every clone of [`ConnectionAnalytics`].
struct Inner {
// Origin of the connection; only used for `Debug` output in this file.
origin: Origin,
// gRPC client used to export spans; cloned for every request.
grpc: tonic::client::Grpc<re_redap_client::RedapClientInner>,
// Write-once server version, attached to spans as `server_version` when present.
server_version: OnceCell<Option<String>>,
}
impl ConnectionAnalytics {
    /// Creates an analytics handle for `origin` that exports spans through the
    /// gRPC service of `client`.
    pub fn new(origin: Origin, client: &re_redap_client::ConnectionClient) -> Self {
        Self {
            inner: Arc::new(Inner {
                origin,
                grpc: tonic::client::Grpc::new(client.service()),
                server_version: OnceCell::new(),
            }),
        }
    }

    /// Stores the server version for this connection.
    ///
    /// The value is write-once: if a version was already stored, the call is
    /// silently ignored.
    pub fn set_server_version(&self, version: Option<String>) {
        #[expect(clippy::let_underscore_must_use)]
        let _ = self.inner.server_version.set(version);
    }

    /// Returns the stored server version, or `None` when it was never set or
    /// was explicitly set to `None`.
    fn server_version(&self) -> Option<String> {
        self.inner.server_version.get().and_then(Clone::clone)
    }

    /// Starts tracking a dataset query.
    ///
    /// The returned handle can be cloned freely; the span is built and sent
    /// when the last clone is dropped.
    pub fn begin_query(
        &self,
        query_info: QueryInfo,
        scan_start: web_time::Instant,
        scan_start_wall: SystemTime,
    ) -> PendingQueryAnalytics {
        PendingQueryAnalytics {
            inner: Arc::new(PendingInner {
                connection: self.clone(),
                query_info,
                fetch_stats: SharedFetchStats::default(),
                scan_start,
                scan_start_wall,
                time_to_first_chunk: OnceLock::new(),
                direct_terminal_reason: OnceLock::new(),
                error_kind: OnceLock::new(),
            }),
        }
    }

    /// Starts tracking a table scan.
    ///
    /// The returned handle can be cloned freely; the span is built and sent
    /// when the last clone is dropped.
    pub fn begin_table_query(
        &self,
        info: TableQueryInfo,
        scan_start: web_time::Instant,
    ) -> PendingTableQueryAnalytics {
        PendingTableQueryAnalytics {
            inner: Arc::new(PendingTableInner {
                connection: self.clone(),
                info,
                stats: SharedTableScanStats::default(),
                scan_start,
                time_to_first_response: OnceLock::new(),
                time_to_first_batch: OnceLock::new(),
                trace_id: OnceLock::new(),
                error_kind: OnceLock::new(),
            }),
        }
    }

    /// Sends `span` in the background without waiting for the result.
    ///
    /// Failures are only reported through debug-level logging. On native
    /// targets the request is spawned on the current Tokio runtime when there
    /// is one; otherwise a dedicated thread with its own current-thread
    /// runtime is started for it.
    fn send_span(&self, span: Span, trace_id: Option<opentelemetry::TraceId>) {
        let this = self.clone();
        let fut = async move {
            if let Err(err) = this.send_span_impl(span, trace_id).await {
                re_log::debug_once!(
                    "Failed to send analytics to Rerun Hub: {} ({})",
                    err.code(),
                    err.message()
                );
            }
        };
        #[cfg(target_arch = "wasm32")]
        wasm_bindgen_futures::spawn_local(fut);
        #[cfg(not(target_arch = "wasm32"))]
        {
            if let Ok(handle) = tokio::runtime::Handle::try_current() {
                handle.spawn(fut);
            } else {
                let spawned = std::thread::Builder::new()
                    .name("query-analytics-sender".to_owned())
                    .spawn(move || {
                        let rt = tokio::runtime::Builder::new_current_thread()
                            .enable_all()
                            .build();
                        match rt {
                            Ok(rt) => rt.block_on(fut),
                            Err(err) => {
                                re_log::debug_once!("Failed to create analytics runtime: {err}");
                            }
                        }
                    });
                // Analytics stay best-effort, but report the failure like every
                // other error path here instead of dropping it silently.
                if let Err(err) = spawned {
                    re_log::debug_once!("Failed to spawn analytics sender thread: {err}");
                }
            }
        }
    }

    /// Wraps `span` in an OTLP export request and sends it as a unary call to
    /// [`EXPORT_PATH`].
    ///
    /// When `trace_id` is given and converts to a valid metadata value, it is
    /// attached as the `x-request-trace-id` request header.
    ///
    /// # Errors
    ///
    /// Returns `Unavailable` when the channel does not become ready, or the
    /// status returned by the unary call itself.
    async fn send_span_impl(
        &self,
        span: Span,
        trace_id: Option<opentelemetry::TraceId>,
    ) -> tonic::Result<()> {
        let mut grpc = self.inner.grpc.clone();
        let mut resource_attributes = vec![kv_string("service.name", "rerun-viewer")];
        if let Some(analytics) = re_analytics::Analytics::global_get() {
            resource_attributes.push(kv_string("analytics_id", &analytics.config().analytics_id));
        }
        let export_request = ExportTraceServiceRequest {
            resource_spans: vec![ResourceSpans {
                resource: Some(Resource {
                    attributes: resource_attributes,
                    dropped_attributes_count: 0,
                    entity_refs: Vec::new(),
                }),
                scope_spans: vec![ScopeSpans {
                    scope: None,
                    spans: vec![span],
                    schema_url: String::new(),
                }],
                schema_url: String::new(),
            }],
        };
        let mut request = tonic::Request::new(export_request);
        if let Some(trace_id) = trace_id
            && let Ok(value) = trace_id.to_string().parse()
        {
            request.metadata_mut().insert("x-request-trace-id", value);
        }
        grpc.ready().await.map_err(|err| {
            tonic::Status::unavailable(format!("analytics channel not ready: {err}"))
        })?;
        let path = http::uri::PathAndQuery::from_static(EXPORT_PATH);
        let codec = tonic_prost::ProstCodec::default();
        // The request is passed as-is: mapping the message through an identity
        // closure would only rebuild the same request.
        let _response: tonic::Response<ExportTraceServiceResponse> =
            grpc.unary(request, path, codec).await?;
        Ok(())
    }
}
/// Coarse classification of a dataset query, reported as the `query_type` span attribute.
#[derive(Clone, Copy, Debug)]
pub(crate) enum QueryType {
    Static,
    LatestAt,
    Range,
    Dataframe,
    FullScan,
}

impl QueryType {
    /// Derives the query type from `query_expression`.
    ///
    /// Static queries are `Static`. Otherwise the result depends on whether the
    /// expression has a latest-at bound, a range bound, both (`Dataframe`) or
    /// neither (`FullScan`).
    pub(crate) fn classify(query_expression: &QueryExpression) -> Self {
        if query_expression.is_static() {
            return Self::Static;
        }

        let bounded_by_latest_at = query_expression.min_latest_at().is_some();
        let bounded_by_range = query_expression.max_range().is_some();

        if bounded_by_latest_at && bounded_by_range {
            Self::Dataframe
        } else if bounded_by_latest_at {
            Self::LatestAt
        } else if bounded_by_range {
            Self::Range
        } else {
            Self::FullScan
        }
    }

    /// Stable snake_case label for this query type.
    const fn as_str(self) -> &'static str {
        match self {
            QueryType::Static => "static",
            QueryType::LatestAt => "latest_at",
            QueryType::Range => "range",
            QueryType::Dataframe => "dataframe",
            QueryType::FullScan => "full_scan",
        }
    }
}
/// Description of a dataset query, handed to `ConnectionAnalytics::begin_query`.
///
/// `build_query_span` reports the fields as span attributes under the same names
/// (`time_to_first_chunk_info` becomes `time_to_first_chunk_info_us`), except for
/// `trace_id`, which becomes a span link.
#[derive(Clone, Debug)]
pub struct QueryInfo {
pub dataset_id: String,
pub query_chunks: usize,
pub query_segments: usize,
pub query_layers: usize,
pub query_columns: usize,
pub query_entities: usize,
pub query_bytes: u64,
pub query_chunks_per_segment_min: u32,
pub query_chunks_per_segment_max: u32,
pub query_chunks_per_segment_mean: f32,
pub query_type: QueryType,
// Only reported when present.
pub primary_index_name: Option<String>,
// Only reported when present; converted to microseconds.
pub time_to_first_chunk_info: Option<Duration>,
// When present, also sent as the `x-request-trace-id` header of the export request.
pub trace_id: Option<opentelemetry::TraceId>,
}
/// Fetch counters stored as atomics so they can be updated through a shared reference.
///
/// `TaskFetchStats::flush_into` adds into these; `snapshot` reads them back out.
#[derive(Default)]
pub(crate) struct SharedFetchStats {
grpc_requests: AtomicU64,
grpc_bytes: AtomicU64,
direct_requests: AtomicU64,
direct_bytes: AtomicU64,
direct_retries_total: AtomicU64,
direct_requests_retried: AtomicU64,
direct_retry_sleep_us: AtomicU64,
// Combined with `fetch_max` rather than summed.
direct_max_attempt: AtomicU64,
direct_original_ranges: AtomicU64,
direct_merged_ranges: AtomicU64,
}
impl SharedFetchStats {
    /// Copies the current counter values into a plain [`TaskFetchStats`].
    ///
    /// Takes `&mut self`, so the values are read through `get_mut` without any
    /// atomic operation.
    fn snapshot(&mut self) -> TaskFetchStats {
        let Self {
            grpc_requests,
            grpc_bytes,
            direct_requests,
            direct_bytes,
            direct_retries_total,
            direct_requests_retried,
            direct_retry_sleep_us,
            direct_max_attempt,
            direct_original_ranges,
            direct_merged_ranges,
        } = self;

        TaskFetchStats {
            grpc_requests: *grpc_requests.get_mut(),
            grpc_bytes: *grpc_bytes.get_mut(),
            direct_requests: *direct_requests.get_mut(),
            direct_bytes: *direct_bytes.get_mut(),
            direct_retries_total: *direct_retries_total.get_mut(),
            direct_requests_retried: *direct_requests_retried.get_mut(),
            direct_retry_sleep_us: *direct_retry_sleep_us.get_mut(),
            direct_max_attempt: *direct_max_attempt.get_mut(),
            direct_original_ranges: *direct_original_ranges.get_mut(),
            direct_merged_ranges: *direct_merged_ranges.get_mut(),
        }
    }
}
/// Clonable handle for a dataset query that is being tracked.
///
/// All clones share one `PendingInner`; its `Drop` impl builds and sends the span.
#[derive(Clone)]
pub(crate) struct PendingQueryAnalytics {
inner: Arc<PendingInner>,
}
impl std::fmt::Debug for PendingQueryAnalytics {
    /// Prints the type name only; all fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PendingQueryAnalytics");
        out.finish_non_exhaustive()
    }
}
/// State of one tracked dataset query; the span is built and sent from its `Drop` impl.
struct PendingInner {
// Connection the span is sent through.
connection: ConnectionAnalytics,
query_info: QueryInfo,
fetch_stats: SharedFetchStats,
// Monotonic start, used to compute durations.
scan_start: web_time::Instant,
// Wall-clock start, used as the span start time.
scan_start_wall: SystemTime,
// The fields below are write-once: only the first recorded value is kept.
time_to_first_chunk: OnceLock<Duration>,
direct_terminal_reason: OnceLock<DirectFetchFailureReason>,
error_kind: OnceLock<&'static str>,
}
/// Category of a failed query, reported as the `error_kind` span attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub enum QueryErrorKind {
    GrpcFetch,
    DirectFetch,
    Decode,
    Other,
}

impl QueryErrorKind {
    /// Stable snake_case label for this error kind.
    pub fn as_str(self) -> &'static str {
        match self {
            QueryErrorKind::GrpcFetch => "grpc_fetch",
            QueryErrorKind::DirectFetch => "direct_fetch",
            QueryErrorKind::Decode => "decode",
            QueryErrorKind::Other => "other",
        }
    }
}
/// Reason a direct fetch ultimately failed, reported as the
/// `fetch_direct_terminal_reason` span attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub(crate) enum DirectFetchFailureReason {
    Timeout,
    Http4xx,
    Http5xx,
    Connection,
    Decode,
    SourceChanged,
    Other,
}

impl DirectFetchFailureReason {
    /// Stable snake_case label for this failure reason.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            DirectFetchFailureReason::Timeout => "timeout",
            DirectFetchFailureReason::Http4xx => "http_4xx",
            DirectFetchFailureReason::Http5xx => "http_5xx",
            DirectFetchFailureReason::Connection => "connection",
            DirectFetchFailureReason::Decode => "decode",
            DirectFetchFailureReason::SourceChanged => "source_changed",
            DirectFetchFailureReason::Other => "other",
        }
    }
}
impl PendingQueryAnalytics {
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub fn record_first_chunk(&self) {
self.inner
.time_to_first_chunk
.get_or_init(|| self.inner.scan_start.elapsed());
}
pub(crate) fn fetch_stats(&self) -> &SharedFetchStats {
&self.inner.fetch_stats
}
#[cfg(not(target_arch = "wasm32"))]
pub fn record_direct_terminal_failure(&self, reason: DirectFetchFailureReason) {
#[expect(clippy::let_underscore_must_use)]
let _ = self.inner.direct_terminal_reason.set(reason);
}
pub fn record_error(&self, kind: QueryErrorKind) {
#[expect(clippy::let_underscore_must_use)]
let _ = self.inner.error_kind.set(kind.as_str());
}
}
/// Plain (non-atomic) fetch counters accumulated locally by a single task.
///
/// The values are meant to be handed over exactly once, either merged into
/// another `TaskFetchStats` ([`Self::merge_from`]) or added to the shared atomic
/// counters ([`Self::flush_into`]); both take the stats by value.
#[derive(Default)]
#[must_use]
pub(crate) struct TaskFetchStats {
    grpc_requests: u64,
    grpc_bytes: u64,
    direct_requests: u64,
    direct_bytes: u64,
    direct_retries_total: u64,
    direct_requests_retried: u64,
    direct_retry_sleep_us: u64,
    direct_max_attempt: u64,
    direct_original_ranges: u64,
    direct_merged_ranges: u64,
}

#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
impl TaskFetchStats {
    /// Counts one gRPC fetch that returned `bytes` bytes.
    pub fn record_grpc_fetch(&mut self, bytes: u64) {
        self.grpc_requests += 1;
        self.grpc_bytes += bytes;
    }

    /// Counts one direct fetch that returned `bytes` bytes.
    pub fn record_direct_fetch(&mut self, bytes: u64) {
        self.direct_requests += 1;
        self.direct_bytes += bytes;
    }

    /// Counts one retry of a direct fetch.
    ///
    /// `sleep` is added to the total retry sleep time (in microseconds) and
    /// `attempt` raises the highest attempt number seen so far.
    pub fn record_direct_retry(&mut self, sleep: Duration, attempt: u64) {
        self.direct_retries_total += 1;
        self.direct_retry_sleep_us += sleep.as_micros() as u64;
        self.direct_max_attempt = self.direct_max_attempt.max(attempt);
    }

    /// Counts one direct request that needed at least one retry.
    pub fn record_direct_request_was_retried(&mut self) {
        self.direct_requests_retried += 1;
    }

    /// Adds the number of byte ranges before (`original`) and after (`merged`) merging.
    pub fn record_direct_ranges(&mut self, original: u64, merged: u64) {
        self.direct_original_ranges += original;
        self.direct_merged_ranges += merged;
    }

    /// Folds `other` into `self`: counters are summed, the max attempt is the maximum of both.
    ///
    /// `other` is destructured exhaustively so that adding a field without
    /// handling it here fails to compile.
    #[expect(
        clippy::needless_pass_by_value,
        reason = "Prevent double-counting stats"
    )]
    pub fn merge_from(&mut self, other: Self) {
        let Self {
            grpc_requests,
            grpc_bytes,
            direct_requests,
            direct_bytes,
            direct_retries_total,
            direct_requests_retried,
            direct_retry_sleep_us,
            direct_max_attempt,
            direct_original_ranges,
            direct_merged_ranges,
        } = other;
        self.grpc_requests += grpc_requests;
        self.grpc_bytes += grpc_bytes;
        self.direct_requests += direct_requests;
        self.direct_bytes += direct_bytes;
        self.direct_retries_total += direct_retries_total;
        self.direct_requests_retried += direct_requests_retried;
        self.direct_retry_sleep_us += direct_retry_sleep_us;
        self.direct_max_attempt = self.direct_max_attempt.max(direct_max_attempt);
        self.direct_original_ranges += direct_original_ranges;
        self.direct_merged_ranges += direct_merged_ranges;
    }

    /// Adds these stats to the shared atomic counters, consuming `self`.
    ///
    /// Zero values are skipped so untouched counters cost no atomic operation.
    pub fn flush_into(self, shared: &SharedFetchStats) {
        // `sum` fields are added with `fetch_add`, `max` fields are combined with
        // `fetch_max`. Listing every field also makes the destructuring of `self`
        // exhaustive, so a new field must be classified here.
        macro_rules! flush_stats {
            {sum $($sum_id:ident),*; max $($max_id:ident),*;} => {
                let Self {
                    $($sum_id,)*
                    $($max_id,)*
                } = self;
                // The repetition kind (`*`) matches the one used in the matcher.
                $(
                    if $sum_id != 0 {
                        shared.$sum_id
                            .fetch_add($sum_id, Ordering::Relaxed);
                    }
                )*
                $(
                    if $max_id != 0 {
                        shared.$max_id
                            .fetch_max($max_id, Ordering::Relaxed);
                    }
                )*
            };
        }
        flush_stats! {
            sum
                grpc_requests,
                grpc_bytes,
                direct_requests,
                direct_bytes,
                direct_retries_total,
                direct_requests_retried,
                direct_retry_sleep_us,
                direct_original_ranges,
                direct_merged_ranges;
            max
                direct_max_attempt;
        };
    }

    /// Flushes into `analytics` when present and records the error of a failed `result`.
    ///
    /// Without `analytics` the stats are simply dropped.
    pub fn try_flush_into(
        self,
        analytics: Option<&PendingQueryAnalytics>,
        result: Result<(), QueryErrorKind>,
    ) {
        if let Some(analytics) = analytics {
            self.flush_into(analytics.fetch_stats());
            if let Err(err) = result {
                analytics.record_error(err);
            }
        }
    }
}
impl Drop for PendingInner {
    /// Builds the `cloud_query_dataset` span from everything recorded so far and
    /// hands it to the connection for sending.
    fn drop(&mut self) {
        let elapsed = self.scan_start.elapsed();
        let wall_clock_range = self.scan_start_wall..SystemTime::now();
        let fetch = self.fetch_stats.snapshot();
        let first_chunk = self.time_to_first_chunk.get().copied();
        let terminal_reason = self.direct_terminal_reason.get().copied();
        let error = self.error_kind.get().copied();
        let server_version = self.connection.server_version();

        let span = build_query_span(
            &self.query_info,
            &fetch,
            wall_clock_range,
            elapsed,
            first_chunk,
            terminal_reason,
            error,
            server_version.as_deref(),
        );
        self.connection.send_span(span, self.query_info.trace_id);
    }
}
#[expect(
clippy::too_many_arguments,
reason = "pure builder fn; grouping these would be churn without clarity"
)]
fn build_query_span(
query_info: &QueryInfo,
fetch: &TaskFetchStats,
wall_clock_range: Range<SystemTime>,
total_duration: Duration,
time_to_first_chunk: Option<Duration>,
direct_terminal_reason: Option<DirectFetchFailureReason>,
error_kind: Option<&'static str>,
server_version: Option<&str>,
) -> Span {
let QueryInfo {
ref dataset_id,
query_chunks,
query_segments,
query_layers,
query_columns,
query_entities,
query_bytes,
query_chunks_per_segment_min,
query_chunks_per_segment_max,
query_chunks_per_segment_mean,
query_type,
ref primary_index_name,
time_to_first_chunk_info,
trace_id,
} = *query_info;
let start_time_unix_nano = nanos_since_epoch(&wall_clock_range.start);
let end_time_unix_nano = nanos_since_epoch(&wall_clock_range.end);
#[expect(
clippy::cast_possible_wrap,
reason = "OTLP proto uses i64 for int values"
)]
let mut attributes = vec![
kv_string("dataset_id", dataset_id),
kv_int("query_chunks", query_chunks as i64),
kv_int("query_segments", query_segments as i64),
kv_int("query_layers", query_layers as i64),
kv_int("query_columns", query_columns as i64),
kv_int("query_entities", query_entities as i64),
kv_int("query_bytes", query_bytes as i64),
kv_int(
"query_chunks_per_segment_min",
i64::from(query_chunks_per_segment_min),
),
kv_int(
"query_chunks_per_segment_max",
i64::from(query_chunks_per_segment_max),
),
kv_double(
"query_chunks_per_segment_mean",
f64::from(query_chunks_per_segment_mean),
),
kv_string("query_type", query_type.as_str()),
kv_int("total_duration_us", total_duration.as_micros() as i64),
kv_bool("is_success", error_kind.is_none()),
kv_int("fetch_grpc_requests", fetch.grpc_requests as i64),
kv_int("fetch_grpc_bytes", fetch.grpc_bytes as i64),
kv_int("fetch_direct_requests", fetch.direct_requests as i64),
kv_int("fetch_direct_bytes", fetch.direct_bytes as i64),
kv_int("fetch_direct_retries", fetch.direct_retries_total as i64),
kv_int(
"fetch_direct_requests_retried",
fetch.direct_requests_retried as i64,
),
kv_int(
"fetch_direct_retry_sleep_us",
fetch.direct_retry_sleep_us as i64,
),
kv_int("fetch_direct_max_attempt", fetch.direct_max_attempt as i64),
kv_int(
"fetch_direct_original_ranges",
fetch.direct_original_ranges as i64,
),
kv_int(
"fetch_direct_merged_ranges",
fetch.direct_merged_ranges as i64,
),
];
if let Some(name) = primary_index_name {
attributes.push(kv_string("primary_index_name", name));
}
if let Some(ttfci) = time_to_first_chunk_info {
attributes.push(kv_int(
"time_to_first_chunk_info_us",
ttfci.as_micros() as i64,
));
}
if let Some(ttfr) = time_to_first_chunk {
attributes.push(kv_int("time_to_first_chunk_us", ttfr.as_micros() as i64));
}
if let Some(reason) = direct_terminal_reason {
attributes.push(kv_string("fetch_direct_terminal_reason", reason.as_str()));
}
if let Some(kind) = error_kind {
attributes.push(kv_string("error_kind", kind));
}
if let Some(version) = server_version {
attributes.push(kv_string("server_version", version));
}
let links = trace_id
.map(|id| {
vec![Link {
trace_id: id.to_bytes().to_vec(),
..Default::default()
}]
})
.unwrap_or_default();
Span {
name: "cloud_query_dataset".to_owned(),
kind: SpanKind::Client.into(),
start_time_unix_nano,
end_time_unix_nano,
attributes,
links,
..Default::default()
}
}
/// Kind of table being scanned, reported as the `table_kind` span attribute.
#[derive(Clone, Copy, Debug)]
pub enum TableKind {
    Lance,
    SystemEntries,
    SystemNamespaces,
    Unknown,
}

impl TableKind {
    /// Stable snake_case label for this table kind.
    const fn as_str(self) -> &'static str {
        match self {
            TableKind::Lance => "lance",
            TableKind::SystemEntries => "system_entries",
            TableKind::SystemNamespaces => "system_namespaces",
            TableKind::Unknown => "unknown",
        }
    }
}
impl From<&ProviderDetails> for TableKind {
    /// Maps provider details to a table kind; an unspecified system table is `Unknown`.
    fn from(details: &ProviderDetails) -> Self {
        let system_table = match details {
            ProviderDetails::LanceTable(_) => return Self::Lance,
            ProviderDetails::SystemTable(table) => table,
        };

        match system_table.kind {
            SystemTableKind::Entries => Self::SystemEntries,
            SystemTableKind::Namespaces => Self::SystemNamespaces,
            SystemTableKind::Unspecified => Self::Unknown,
        }
    }
}
/// Which part of the application issued a table scan, reported as the `caller` span attribute.
#[derive(Clone, Copy, Debug)]
pub enum TableQueryCaller {
    CatalogResolver,
    EntriesTable,
    BrowserDetailView,
}

impl TableQueryCaller {
    /// Stable snake_case label for this caller.
    const fn as_str(self) -> &'static str {
        match self {
            TableQueryCaller::CatalogResolver => "catalog_resolver",
            TableQueryCaller::EntriesTable => "entries_table",
            TableQueryCaller::BrowserDetailView => "browser_detail_view",
        }
    }
}
/// Description of a table scan, handed to `ConnectionAnalytics::begin_table_query`.
///
/// `build_table_query_span` reports every field except `time_range` as a span attribute.
#[derive(Clone, Debug)]
pub struct TableQueryInfo {
pub table_id: String,
pub table_kind: TableKind,
pub caller: TableQueryCaller,
pub schema_total_columns: u32,
pub projected_columns: u32,
pub has_limit: bool,
// Only reported when present.
pub limit_value: Option<u64>,
// In this file only `start` is read: `PendingTableInner::build_span` uses it as the
// span start time. NOTE(review): the intended meaning of `end` is not visible here.
pub time_range: Range<SystemTime>,
}
/// Table-scan counters stored as atomics so they can be updated through a shared reference.
#[derive(Default)]
pub(crate) struct SharedTableScanStats {
    grpc_requests: AtomicU64,
    batches: AtomicU64,
    rows_returned: AtomicU64,
    bytes_returned: AtomicU64,
}

/// Plain copy of the [`SharedTableScanStats`] counters.
#[derive(Default, Clone, Copy)]
pub(crate) struct TableScanStatsSnapshot {
    pub grpc_requests: u64,
    pub batches: u64,
    pub rows_returned: u64,
    pub bytes_returned: u64,
}

impl SharedTableScanStats {
    /// Reads every counter with relaxed ordering.
    ///
    /// Each counter is loaded separately, so the snapshot is not one atomic
    /// view across all four values.
    fn snapshot(&self) -> TableScanStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let Self {
            grpc_requests,
            batches,
            rows_returned,
            bytes_returned,
        } = self;

        TableScanStatsSnapshot {
            grpc_requests: read(grpc_requests),
            batches: read(batches),
            rows_returned: read(rows_returned),
            bytes_returned: read(bytes_returned),
        }
    }
}
/// Clonable handle for a table scan that is being tracked.
///
/// All clones share one `PendingTableInner`; its `Drop` impl builds and sends the span.
#[derive(Clone)]
pub(crate) struct PendingTableQueryAnalytics {
inner: Arc<PendingTableInner>,
}
impl std::fmt::Debug for PendingTableQueryAnalytics {
    /// Prints the type name only; all fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PendingTableQueryAnalytics");
        out.finish_non_exhaustive()
    }
}
/// State of one tracked table scan; the span is built and sent from its `Drop` impl.
struct PendingTableInner {
// Connection the span is sent through.
connection: ConnectionAnalytics,
info: TableQueryInfo,
stats: SharedTableScanStats,
// Monotonic start, used to compute durations.
scan_start: web_time::Instant,
// The fields below are write-once: only the first recorded value is kept.
time_to_first_response: OnceLock<Duration>,
time_to_first_batch: OnceLock<Duration>,
trace_id: OnceLock<opentelemetry::TraceId>,
error_kind: OnceLock<&'static str>,
}
impl PendingTableQueryAnalytics {
pub fn record_trace_id(&self, trace_id: opentelemetry::TraceId) {
#[expect(clippy::let_underscore_must_use)]
let _ = self.inner.trace_id.set(trace_id);
}
pub fn record_first_response(&self) {
self.inner
.time_to_first_response
.get_or_init(|| self.inner.scan_start.elapsed());
}
pub fn record_first_batch(&self) {
self.inner
.time_to_first_batch
.get_or_init(|| self.inner.scan_start.elapsed());
}
pub fn record_batch(&self, num_rows: u64, num_bytes: u64) {
self.inner
.stats
.grpc_requests
.fetch_add(1, Ordering::Relaxed);
self.inner.stats.batches.fetch_add(1, Ordering::Relaxed);
if num_rows != 0 {
self.inner
.stats
.rows_returned
.fetch_add(num_rows, Ordering::Relaxed);
}
if num_bytes != 0 {
self.inner
.stats
.bytes_returned
.fetch_add(num_bytes, Ordering::Relaxed);
}
}
pub fn record_error(&self, kind: QueryErrorKind) {
#[expect(clippy::let_underscore_must_use)]
let _ = self.inner.error_kind.set(kind.as_str());
}
#[cfg(test)]
pub(crate) fn build_span_for_test(&self) -> Span {
self.inner.build_span()
}
}
impl PendingTableInner {
    /// Builds the `cloud_scan_table` span from everything recorded so far.
    ///
    /// The span starts at `info.time_range.start` and ends at the current wall-clock time.
    fn build_span(&self) -> Span {
        let elapsed = self.scan_start.elapsed();
        let wall_clock_range = self.info.time_range.start..SystemTime::now();
        let counters = self.stats.snapshot();
        let first_response = self.time_to_first_response.get().copied();
        let first_batch = self.time_to_first_batch.get().copied();
        let trace_id = self.trace_id.get().copied();
        let error = self.error_kind.get().copied();
        let server_version = self.connection.server_version();

        build_table_query_span(
            &self.info,
            counters,
            wall_clock_range,
            elapsed,
            first_response,
            first_batch,
            trace_id,
            error,
            server_version.as_deref(),
        )
    }
}
impl Drop for PendingTableInner {
    /// Builds the span for this scan and hands it to the connection for sending.
    fn drop(&mut self) {
        let span = self.build_span();
        self.connection.send_span(span, self.trace_id.get().copied());
    }
}
#[expect(
clippy::too_many_arguments,
reason = "pure builder fn; grouping these would be churn without clarity"
)]
pub(crate) fn build_table_query_span(
info: &TableQueryInfo,
stats: TableScanStatsSnapshot,
wall_clock_range: Range<SystemTime>,
total_duration: Duration,
time_to_first_response: Option<Duration>,
time_to_first_batch: Option<Duration>,
trace_id: Option<opentelemetry::TraceId>,
error_kind: Option<&'static str>,
server_version: Option<&str>,
) -> Span {
let TableQueryInfo {
ref table_id,
table_kind,
caller,
schema_total_columns,
projected_columns,
has_limit,
limit_value,
time_range: _,
} = *info;
let start_time_unix_nano = nanos_since_epoch(&wall_clock_range.start);
let end_time_unix_nano = nanos_since_epoch(&wall_clock_range.end);
#[expect(
clippy::cast_possible_wrap,
reason = "OTLP proto uses i64 for int values"
)]
let mut attributes = vec![
kv_string("table_id", table_id),
kv_string("table_kind", table_kind.as_str()),
kv_string("caller", caller.as_str()),
kv_int("schema_total_columns", i64::from(schema_total_columns)),
kv_int("projected_columns", i64::from(projected_columns)),
kv_bool("has_limit", has_limit),
kv_bool("is_success", error_kind.is_none()),
kv_int("total_duration_us", total_duration.as_micros() as i64),
kv_int("fetch_grpc_requests", stats.grpc_requests as i64),
kv_int("num_record_batches", stats.batches as i64),
kv_int("rows_returned", stats.rows_returned as i64),
kv_int("bytes_returned", stats.bytes_returned as i64),
];
if let Some(value) = limit_value {
#[expect(
clippy::cast_possible_wrap,
reason = "OTLP proto uses i64 for int values"
)]
attributes.push(kv_int("limit_value", value as i64));
}
if let Some(ttfr) = time_to_first_response {
attributes.push(kv_int("time_to_first_response_us", ttfr.as_micros() as i64));
}
if let Some(ttfb) = time_to_first_batch {
attributes.push(kv_int("time_to_first_batch_us", ttfb.as_micros() as i64));
}
if let Some(kind) = error_kind {
attributes.push(kv_string("error_kind", kind));
}
if let Some(version) = server_version {
attributes.push(kv_string("server_version", version));
}
let links = trace_id
.map(|id| {
vec![Link {
trace_id: id.to_bytes().to_vec(),
..Default::default()
}]
})
.unwrap_or_default();
Span {
name: "cloud_scan_table".to_owned(),
kind: SpanKind::Client.into(),
start_time_unix_nano,
end_time_unix_nano,
attributes,
links,
..Default::default()
}
}
/// Converts `time` to whole nanoseconds since the Unix epoch.
///
/// Times before the epoch map to `0`. Times too far in the future for the
/// nanosecond count to fit in a `u64` saturate to `u64::MAX` instead of
/// silently wrapping around.
fn nanos_since_epoch(time: &SystemTime) -> u64 {
    let since_epoch = time
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    // `as_nanos` returns a `u128`; a plain `as u64` cast would truncate it.
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
/// Builds an OTLP attribute whose value is a string.
fn kv_string(key: &str, value: &str) -> KeyValue {
    let payload = Value::StringValue(value.to_owned());
    let wrapped = AnyValue {
        value: Some(payload),
    };
    KeyValue {
        key: key.to_owned(),
        value: Some(wrapped),
    }
}
/// Builds an OTLP attribute whose value is a signed 64-bit integer.
fn kv_int(key: &str, value: i64) -> KeyValue {
    let payload = Value::IntValue(value);
    let wrapped = AnyValue {
        value: Some(payload),
    };
    KeyValue {
        key: key.to_owned(),
        value: Some(wrapped),
    }
}
/// Builds an OTLP attribute whose value is a boolean.
fn kv_bool(key: &str, value: bool) -> KeyValue {
    let payload = Value::BoolValue(value);
    let wrapped = AnyValue {
        value: Some(payload),
    };
    KeyValue {
        key: key.to_owned(),
        value: Some(wrapped),
    }
}
/// Builds an OTLP attribute whose value is a 64-bit float.
fn kv_double(key: &str, value: f64) -> KeyValue {
    let payload = Value::DoubleValue(value);
    let wrapped = AnyValue {
        value: Some(payload),
    };
    KeyValue {
        key: key.to_owned(),
        value: Some(wrapped),
    }
}
/// Unit tests for [`build_query_span`].
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// A `QueryInfo` with every optional field unset.
    fn dummy_query_info() -> QueryInfo {
        QueryInfo {
            dataset_id: "ds-123".to_owned(),
            query_chunks: 42,
            query_segments: 5,
            query_layers: 2,
            query_columns: 7,
            query_entities: 3,
            query_bytes: 1234,
            query_chunks_per_segment_min: 4,
            query_chunks_per_segment_max: 12,
            query_chunks_per_segment_mean: 8.4,
            query_type: QueryType::LatestAt,
            primary_index_name: None,
            time_to_first_chunk_info: None,
            trace_id: None,
        }
    }

    /// Returns the set of attribute keys of `span`, failing the test on duplicates.
    fn attribute_keys(span: &Span) -> HashSet<&str> {
        let keys: HashSet<_> = span.attributes.iter().map(|kv| kv.key.as_str()).collect();
        // A hard assertion: the duplicate check must fail the test in every build profile.
        assert_eq!(
            keys.len(),
            span.attributes.len(),
            "span contains duplicate attribute keys"
        );
        keys
    }

    /// Value of the integer attribute `key`, if present with that type.
    fn find_int(span: &Span, key: &str) -> Option<i64> {
        span.attributes
            .iter()
            .find(|kv| kv.key == key)
            .and_then(|kv| match kv.value.as_ref()?.value.as_ref()? {
                Value::IntValue(i) => Some(*i),
                _ => None,
            })
    }

    /// Value of the string attribute `key`, if present with that type.
    fn find_string<'a>(span: &'a Span, key: &str) -> Option<&'a str> {
        span.attributes
            .iter()
            .find(|kv| kv.key == key)
            .and_then(|kv| match kv.value.as_ref()?.value.as_ref()? {
                Value::StringValue(s) => Some(s.as_str()),
                _ => None,
            })
    }

    /// Value of the double attribute `key`, if present with that type.
    fn find_double(span: &Span, key: &str) -> Option<f64> {
        span.attributes
            .iter()
            .find(|kv| kv.key == key)
            .and_then(|kv| match kv.value.as_ref()?.value.as_ref()? {
                Value::DoubleValue(d) => Some(*d),
                _ => None,
            })
    }

    /// Value of the boolean attribute `key`, if present with that type.
    fn find_bool(span: &Span, key: &str) -> Option<bool> {
        span.attributes
            .iter()
            .find(|kv| kv.key == key)
            .and_then(|kv| match kv.value.as_ref()?.value.as_ref()? {
                Value::BoolValue(b) => Some(*b),
                _ => None,
            })
    }

    /// Attribute keys that every query span must carry, whatever the optional inputs are.
    const REQUIRED_KEYS: &[&str] = &[
        "dataset_id",
        "query_chunks",
        "query_segments",
        "query_layers",
        "query_columns",
        "query_entities",
        "query_bytes",
        "query_chunks_per_segment_min",
        "query_chunks_per_segment_max",
        "query_chunks_per_segment_mean",
        "query_type",
        "total_duration_us",
        "is_success",
        "fetch_grpc_requests",
        "fetch_grpc_bytes",
        "fetch_direct_requests",
        "fetch_direct_bytes",
        "fetch_direct_retries",
        "fetch_direct_requests_retried",
        "fetch_direct_retry_sleep_us",
        "fetch_direct_max_attempt",
        "fetch_direct_original_ranges",
        "fetch_direct_merged_ranges",
    ];

    #[test]
    fn build_query_span_minimal_emits_only_required_attributes() {
        let qi = dummy_query_info();
        let fetch = TaskFetchStats::default();
        let span = build_query_span(
            &qi,
            &fetch,
            SystemTime::UNIX_EPOCH..SystemTime::UNIX_EPOCH + Duration::from_secs(1),
            Duration::from_micros(500),
            None,
            None,
            None,
            None,
        );
        assert_eq!(span.name, "cloud_query_dataset");
        assert_eq!(span.kind, i32::from(SpanKind::Client));
        assert!(span.links.is_empty());
        let expected: HashSet<&str> = REQUIRED_KEYS.iter().copied().collect();
        let actual = attribute_keys(&span);
        assert_eq!(
            actual,
            expected,
            "extra/missing attribute keys: {:?}",
            actual.symmetric_difference(&expected).collect::<Vec<_>>()
        );
        assert_eq!(find_string(&span, "dataset_id"), Some("ds-123"));
        assert_eq!(find_int(&span, "query_chunks"), Some(42));
        assert_eq!(find_int(&span, "query_chunks_per_segment_min"), Some(4));
        assert_eq!(find_int(&span, "query_chunks_per_segment_max"), Some(12));
        assert_eq!(
            find_double(&span, "query_chunks_per_segment_mean"),
            Some(f64::from(8.4_f32))
        );
        assert_eq!(find_string(&span, "query_type"), Some("latest_at"));
        assert_eq!(find_int(&span, "total_duration_us"), Some(500));
        assert_eq!(find_bool(&span, "is_success"), Some(true));
    }

    #[test]
    fn build_query_span_records_fetch_stats() {
        let qi = dummy_query_info();
        let mut fetch = TaskFetchStats::default();
        fetch.record_grpc_fetch(2_000);
        fetch.record_grpc_fetch(3_000);
        fetch.record_direct_fetch(10_000);
        fetch.record_direct_retry(Duration::from_millis(5), 2);
        fetch.record_direct_retry(Duration::from_millis(7), 3);
        fetch.record_direct_request_was_retried();
        fetch.record_direct_ranges(8, 4);
        let span = build_query_span(
            &qi,
            &fetch,
            SystemTime::UNIX_EPOCH..SystemTime::UNIX_EPOCH + Duration::from_secs(1),
            Duration::from_micros(1_000),
            None,
            None,
            None,
            None,
        );
        assert_eq!(find_int(&span, "fetch_grpc_requests"), Some(2));
        assert_eq!(find_int(&span, "fetch_grpc_bytes"), Some(5_000));
        assert_eq!(find_int(&span, "fetch_direct_requests"), Some(1));
        assert_eq!(find_int(&span, "fetch_direct_bytes"), Some(10_000));
        assert_eq!(find_int(&span, "fetch_direct_retries"), Some(2));
        assert_eq!(find_int(&span, "fetch_direct_requests_retried"), Some(1));
        assert_eq!(find_int(&span, "fetch_direct_retry_sleep_us"), Some(12_000));
        assert_eq!(find_int(&span, "fetch_direct_max_attempt"), Some(3));
        assert_eq!(find_int(&span, "fetch_direct_original_ranges"), Some(8));
        assert_eq!(find_int(&span, "fetch_direct_merged_ranges"), Some(4));
    }

    #[test]
    fn build_query_span_emits_all_optional_attributes_when_present() {
        let trace_id = opentelemetry::TraceId::from_bytes([7u8; 16]);
        let mut qi = dummy_query_info();
        qi.primary_index_name = Some("log_time".to_owned());
        qi.time_to_first_chunk_info = Some(Duration::from_micros(123));
        qi.trace_id = Some(trace_id);
        let span = build_query_span(
            &qi,
            &TaskFetchStats::default(),
            SystemTime::UNIX_EPOCH..SystemTime::UNIX_EPOCH + Duration::from_secs(1),
            Duration::from_micros(999),
            Some(Duration::from_micros(456)),
            Some(DirectFetchFailureReason::Http5xx),
            Some(QueryErrorKind::DirectFetch.as_str()),
            Some("redap-1.2.3"),
        );
        let optional = [
            "primary_index_name",
            "time_to_first_chunk_info_us",
            "time_to_first_chunk_us",
            "fetch_direct_terminal_reason",
            "error_kind",
            "server_version",
        ];
        let keys = attribute_keys(&span);
        for k in optional {
            assert!(keys.contains(k), "missing optional attribute: {k}");
        }
        assert_eq!(find_bool(&span, "is_success"), Some(false));
        assert_eq!(find_string(&span, "primary_index_name"), Some("log_time"));
        assert_eq!(find_int(&span, "time_to_first_chunk_info_us"), Some(123));
        assert_eq!(find_int(&span, "time_to_first_chunk_us"), Some(456));
        assert_eq!(
            find_string(&span, "fetch_direct_terminal_reason"),
            Some("http_5xx")
        );
        assert_eq!(find_string(&span, "error_kind"), Some("direct_fetch"));
        assert_eq!(find_string(&span, "server_version"), Some("redap-1.2.3"));
        assert_eq!(span.links.len(), 1);
        assert_eq!(span.links[0].trace_id, trace_id.to_bytes().to_vec());
    }

    #[test]
    fn build_query_span_uses_wall_clock_range() {
        let qi = dummy_query_info();
        let start = SystemTime::UNIX_EPOCH + Duration::from_millis(2_000);
        let end = SystemTime::UNIX_EPOCH + Duration::from_millis(2_500);
        let span = build_query_span(
            &qi,
            &TaskFetchStats::default(),
            start..end,
            Duration::from_micros(0),
            None,
            None,
            None,
            None,
        );
        assert_eq!(span.start_time_unix_nano, 2_000_000_000);
        assert_eq!(span.end_time_unix_nano, 2_500_000_000);
    }
}
#[cfg(test)]
mod table_query_tests {
use std::collections::HashSet;
use re_protos::cloud::v1alpha1::ext::{LanceTable, ProviderDetails, SystemTable};
use super::*;
/// Provider details describing a Lance table at a fixed S3 URL.
fn lance_provider_details() -> ProviderDetails {
    let table_url = "s3://bucket/path".to_owned();
    let lance_table =
        LanceTable::try_from(re_protos::cloud::v1alpha1::LanceTable { table_url }).unwrap();
    ProviderDetails::LanceTable(lance_table)
}
/// A `TableQueryInfo` for a Lance table without a limit.
fn dummy_table_query_info() -> TableQueryInfo {
    let epoch = SystemTime::UNIX_EPOCH;
    let one_second_later = epoch + Duration::from_secs(1);
    TableQueryInfo {
        table_id: String::from("tbl-42"),
        table_kind: TableKind::Lance,
        caller: TableQueryCaller::BrowserDetailView,
        schema_total_columns: 12,
        projected_columns: 5,
        has_limit: false,
        limit_value: None,
        time_range: epoch..one_second_later,
    }
}
fn empty_stats() -> TableScanStatsSnapshot {
TableScanStatsSnapshot::default()
}
/// Returns the set of attribute keys of `span`, failing the test on duplicates.
fn attribute_keys(span: &Span) -> HashSet<&str> {
    let mut unique = HashSet::new();
    for attribute in &span.attributes {
        unique.insert(attribute.key.as_str());
    }
    assert_eq!(
        unique.len(),
        span.attributes.len(),
        "span contains duplicate attribute keys"
    );
    unique
}
/// Value of the integer attribute `key`, if present with that type.
fn find_int(span: &Span, key: &str) -> Option<i64> {
    let attribute = span.attributes.iter().find(|kv| kv.key == key)?;
    match attribute.value.as_ref()?.value.as_ref()? {
        Value::IntValue(number) => Some(*number),
        _ => None,
    }
}
/// Value of the string attribute `key`, if present with that type.
fn find_string<'a>(span: &'a Span, key: &str) -> Option<&'a str> {
    let attribute = span.attributes.iter().find(|kv| kv.key == key)?;
    match attribute.value.as_ref()?.value.as_ref()? {
        Value::StringValue(text) => Some(text.as_str()),
        _ => None,
    }
}
/// Value of the boolean attribute `key`, if present with that type.
fn find_bool(span: &Span, key: &str) -> Option<bool> {
    let attribute = span.attributes.iter().find(|kv| kv.key == key)?;
    match attribute.value.as_ref()?.value.as_ref()? {
        Value::BoolValue(flag) => Some(*flag),
        _ => None,
    }
}
/// Attribute keys expected on a table-scan span built without any optional input.
const REQUIRED_KEYS: &[&str] = &[
"table_id",
"table_kind",
"caller",
"schema_total_columns",
"projected_columns",
"has_limit",
"is_success",
"total_duration_us",
"fetch_grpc_requests",
"num_record_batches",
"rows_returned",
"bytes_returned",
];
/// Without optional inputs the span carries exactly the required attributes and no links.
#[test]
fn build_table_query_span_minimal_emits_only_required_attributes() {
    let table_info = dummy_table_query_info();
    let epoch = SystemTime::UNIX_EPOCH;
    let span = build_table_query_span(
        &table_info,
        empty_stats(),
        epoch..epoch + Duration::from_secs(1),
        Duration::from_micros(500),
        None,
        None,
        None,
        None,
        None,
    );

    assert_eq!(span.name, "cloud_scan_table");
    assert_eq!(span.kind, i32::from(SpanKind::Client));
    assert!(span.links.is_empty());

    let actual = attribute_keys(&span);
    let expected: HashSet<&str> = REQUIRED_KEYS.iter().copied().collect();
    assert_eq!(
        actual,
        expected,
        "extra/missing attribute keys: {:?}",
        actual.symmetric_difference(&expected).collect::<Vec<_>>()
    );

    assert_eq!(find_string(&span, "table_id"), Some("tbl-42"));
    assert_eq!(find_string(&span, "table_kind"), Some("lance"));
    assert_eq!(find_string(&span, "caller"), Some("browser_detail_view"));
    assert_eq!(find_int(&span, "schema_total_columns"), Some(12));
    assert_eq!(find_int(&span, "projected_columns"), Some(5));
    assert_eq!(find_bool(&span, "has_limit"), Some(false));
    assert_eq!(find_bool(&span, "is_success"), Some(true));
    assert_eq!(find_int(&span, "total_duration_us"), Some(500));
}
/// The four counters of the stats snapshot are surfaced as the
/// `fetch_grpc_requests`, `num_record_batches`, `rows_returned` and
/// `bytes_returned` integer attributes.
#[test]
fn build_table_query_span_records_scan_stats() {
    let wall_end = SystemTime::UNIX_EPOCH + Duration::from_secs(1);
    let span = build_table_query_span(
        &dummy_table_query_info(),
        TableScanStatsSnapshot {
            grpc_requests: 7,
            batches: 7,
            rows_returned: 12_345,
            bytes_returned: 4_567_890,
        },
        SystemTime::UNIX_EPOCH..wall_end,
        Duration::from_micros(2_000),
        None,
        None,
        None,
        None,
        None,
    );

    assert_eq!(find_int(&span, "fetch_grpc_requests"), Some(7));
    assert_eq!(find_int(&span, "num_record_batches"), Some(7));
    assert_eq!(find_int(&span, "rows_returned"), Some(12_345));
    assert_eq!(find_int(&span, "bytes_returned"), Some(4_567_890));
}
/// When a limit, both first-response/first-batch timings, a trace id, an
/// error kind and a server version are supplied, each shows up as an
/// attribute (or, for the trace id, as a single span link), and `is_success`
/// flips to `false`.
#[test]
fn build_table_query_span_emits_optional_attributes_when_present() {
    let trace_id = opentelemetry::TraceId::from_bytes([3u8; 16]);
    let limited_info = TableQueryInfo {
        has_limit: true,
        limit_value: Some(500),
        ..dummy_table_query_info()
    };
    let error_kind = QueryErrorKind::Decode.as_str();
    let wall_end = SystemTime::UNIX_EPOCH + Duration::from_secs(1);

    let span = build_table_query_span(
        &limited_info,
        empty_stats(),
        SystemTime::UNIX_EPOCH..wall_end,
        Duration::from_micros(1_000),
        Some(Duration::from_micros(50)),
        Some(Duration::from_micros(75)),
        Some(trace_id),
        Some(error_kind),
        Some("redap-9.9.9"),
    );

    // Presence check first, then the concrete values.
    let present = attribute_keys(&span);
    let optional_keys = [
        "limit_value",
        "time_to_first_response_us",
        "time_to_first_batch_us",
        "error_kind",
        "server_version",
    ];
    for k in optional_keys {
        assert!(present.contains(k), "missing optional attribute: {k}");
    }

    assert_eq!(find_bool(&span, "is_success"), Some(false));
    assert_eq!(find_int(&span, "limit_value"), Some(500));
    assert_eq!(find_int(&span, "time_to_first_response_us"), Some(50));
    assert_eq!(find_int(&span, "time_to_first_batch_us"), Some(75));
    assert_eq!(find_string(&span, "error_kind"), Some("decode"));
    assert_eq!(find_string(&span, "server_version"), Some("redap-9.9.9"));

    assert_eq!(span.links.len(), 1);
    assert_eq!(span.links[0].trace_id, trace_id.to_bytes().to_vec());
}
/// The span's start/end timestamps are the supplied wall-clock range
/// expressed in nanoseconds since the Unix epoch.
#[test]
fn build_table_query_span_uses_wall_clock_range() {
    let at_millis = |ms: u64| SystemTime::UNIX_EPOCH + Duration::from_millis(ms);
    let wall_range = at_millis(2_000)..at_millis(2_500);

    let span = build_table_query_span(
        &dummy_table_query_info(),
        empty_stats(),
        wall_range,
        Duration::from_micros(0),
        None,
        None,
        None,
        None,
        None,
    );

    assert_eq!(span.start_time_unix_nano, 2_000_000_000);
    assert_eq!(span.end_time_unix_nano, 2_500_000_000);
}
/// Every `TableKind` and `TableQueryCaller` variant listed here is rendered
/// as its expected snake_case string in the `table_kind` / `caller`
/// attributes.
#[test]
fn build_table_query_span_records_table_kind_and_caller_strings() {
    // Same span construction for every case; only the query info varies.
    let span_for = |info: &TableQueryInfo| {
        build_table_query_span(
            info,
            empty_stats(),
            SystemTime::UNIX_EPOCH..SystemTime::UNIX_EPOCH,
            Duration::ZERO,
            None,
            None,
            None,
            None,
            None,
        )
    };

    let kind_cases = [
        (TableKind::Lance, "lance"),
        (TableKind::SystemEntries, "system_entries"),
        (TableKind::SystemNamespaces, "system_namespaces"),
        (TableKind::Unknown, "unknown"),
    ];
    for (table_kind, expected) in kind_cases {
        let info = TableQueryInfo {
            table_kind,
            ..dummy_table_query_info()
        };
        let span = span_for(&info);
        assert_eq!(find_string(&span, "table_kind"), Some(expected));
    }

    let caller_cases = [
        (TableQueryCaller::CatalogResolver, "catalog_resolver"),
        (TableQueryCaller::EntriesTable, "entries_table"),
        (TableQueryCaller::BrowserDetailView, "browser_detail_view"),
    ];
    for (caller, expected) in caller_cases {
        let info = TableQueryInfo {
            caller,
            ..dummy_table_query_info()
        };
        let span = span_for(&info);
        assert_eq!(find_string(&span, "caller"), Some(expected));
    }
}
/// With the fixture's `limit_value: None`, the span has no `limit_value`
/// attribute at all.
#[test]
fn build_table_query_span_no_limit_value_when_no_limit() {
    let unlimited_info = dummy_table_query_info();
    let empty_range = SystemTime::UNIX_EPOCH..SystemTime::UNIX_EPOCH;
    let span = build_table_query_span(
        &unlimited_info,
        empty_stats(),
        empty_range,
        Duration::ZERO,
        None,
        None,
        None,
        None,
        None,
    );

    assert!(!attribute_keys(&span).contains("limit_value"));
}
/// Lance provider details convert to `TableKind::Lance`.
#[test]
fn table_kind_from_lance_provider() {
    let pd = lance_provider_details();
    assert!(matches!(TableKind::from(&pd), TableKind::Lance));
}
/// A system table of kind `Entries` converts to `TableKind::SystemEntries`.
#[test]
fn table_kind_from_system_entries_provider() {
    let system_table = SystemTable {
        kind: SystemTableKind::Entries,
    };
    let pd = ProviderDetails::SystemTable(system_table);
    assert!(matches!(TableKind::from(&pd), TableKind::SystemEntries));
}
/// A system table of kind `Namespaces` converts to
/// `TableKind::SystemNamespaces`.
#[test]
fn table_kind_from_system_namespaces_provider() {
    let system_table = SystemTable {
        kind: SystemTableKind::Namespaces,
    };
    let pd = ProviderDetails::SystemTable(system_table);
    assert!(matches!(TableKind::from(&pd), TableKind::SystemNamespaces));
}
/// A system table whose kind is `Unspecified` converts to
/// `TableKind::Unknown`.
#[test]
fn table_kind_from_system_unspecified_falls_back_to_unknown() {
    let system_table = SystemTable {
        kind: SystemTableKind::Unspecified,
    };
    let pd = ProviderDetails::SystemTable(system_table);
    assert!(matches!(TableKind::from(&pd), TableKind::Unknown));
}
/// Creates a pending table-query analytics handle for the `record_*` tests,
/// backed by a disconnected client and the shared query-info fixture.
///
/// # Panics
///
/// Panics if the hard-coded origin URL fails to parse.
fn make_pending() -> PendingTableQueryAnalytics {
    let origin = "rerun+http://localhost:51234".parse::<Origin>().unwrap();
    let disconnected = re_redap_client::ConnectionClient::new_disconnected();
    let connection = ConnectionAnalytics::new(origin, &disconnected);

    let query_info = dummy_table_query_info();
    let started_at = web_time::Instant::now();
    connection.begin_table_query(query_info, started_at)
}
/// Calling `record_first_response` a second time leaves the stored
/// time-to-first-response unchanged.
#[tokio::test]
async fn record_first_response_is_once_only() {
    let pending = make_pending();
    let recorded = || *pending.inner.time_to_first_response.get().unwrap();

    pending.record_first_response();
    let first = recorded();

    // Short pause between the two calls; presumably so that an overwriting
    // second call would store a visibly different value.
    std::thread::sleep(Duration::from_millis(2));

    pending.record_first_response();
    let second = recorded();

    assert_eq!(first, second, "second call must not overwrite");
}
/// Calling `record_first_batch` a second time leaves the stored
/// time-to-first-batch unchanged.
#[tokio::test]
async fn record_first_batch_is_once_only() {
    let pending = make_pending();
    let recorded = || *pending.inner.time_to_first_batch.get().unwrap();

    pending.record_first_batch();
    let first = recorded();

    // Short pause between the two calls; presumably so that an overwriting
    // second call would store a visibly different value.
    std::thread::sleep(Duration::from_millis(2));

    pending.record_first_batch();
    let second = recorded();

    assert_eq!(first, second);
}
/// Only the first recorded error kind is kept; a later `record_error` call
/// with a different kind does not replace it.
#[tokio::test]
async fn record_error_is_once_only() {
    let pending = make_pending();
    for kind in [QueryErrorKind::GrpcFetch, QueryErrorKind::Decode] {
        pending.record_error(kind);
    }

    let recorded = pending.inner.error_kind.get().copied();
    assert_eq!(recorded, Some(QueryErrorKind::GrpcFetch.as_str()));
}
/// Only the first recorded trace id is kept; a later `record_trace_id` call
/// with a different id does not replace it.
#[tokio::test]
async fn record_trace_id_is_once_only() {
    let kept_id = opentelemetry::TraceId::from_bytes([1u8; 16]);
    let ignored_id = opentelemetry::TraceId::from_bytes([2u8; 16]);

    let pending = make_pending();
    pending.record_trace_id(kept_id);
    pending.record_trace_id(ignored_id);

    let recorded = pending.inner.trace_id.get().copied();
    assert_eq!(recorded, Some(kept_id));
}
/// Three `record_batch` calls (including an all-zero one) add up in the
/// stats snapshot: request and batch counts reach 3, and rows/bytes are the
/// sums of the recorded values.
#[tokio::test]
async fn record_batch_accumulates_across_calls() {
    let pending = make_pending();
    for (rows, bytes) in [(100, 1_000), (50, 500), (0, 0)] {
        pending.record_batch(rows, bytes);
    }

    let totals = pending.inner.stats.snapshot();
    assert_eq!(totals.grpc_requests, 3);
    assert_eq!(totals.batches, 3);
    assert_eq!(totals.rows_returned, 150);
    assert_eq!(totals.bytes_returned, 1_500);
}
}