use std::fmt;
use std::str::FromStr as _;
use std::sync::Arc;
use std::time::Duration;
use delta_kernel_derive::internal_api;
use strum::{AsRefStr, Display as StrumDisplay, EnumString, IntoStaticStr};
use tracing::field::{Field, Visit};
use tracing::span::Attributes;
use tracing::warn;
use uuid::Uuid;
/// Identifier attached to metric events; a thin wrapper around a [`Uuid`].
///
/// `MetricId::new` produces a random (v4) UUID, while `MetricId::from_attrs` parses the
/// `operation_id` field of a span's attributes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MetricId(pub(crate) Uuid);
impl MetricId {
    /// Creates an id backed by a freshly generated random (v4) UUID.
    pub fn new() -> Self {
        let uuid = Uuid::new_v4();
        Self(uuid)
    }

    /// Returns a copy of the 16 raw bytes of the wrapped UUID.
    pub fn as_bytes(&self) -> [u8; 16] {
        let Self(uuid) = self;
        *uuid.as_bytes()
    }

    /// Extracts the id from the `operation_id` field of `attrs`.
    ///
    /// The field value is rendered through its `Debug` output and parsed as a UUID. When the
    /// field is absent, or its text does not parse (a warning is logged in that case), the
    /// returned id wraps `Uuid::default()`.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        /// Visitor that remembers the parsed `operation_id`, if any.
        struct OperationIdVisitor {
            parsed: Uuid,
        }

        impl Visit for OperationIdVisitor {
            fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
                // Every other field is irrelevant to this visitor.
                if field.name() != "operation_id" {
                    return;
                }
                let s = format!("{value:?}");
                match Uuid::from_str(&s) {
                    Err(e) => warn!("Invalid uuid '{s}' on span: {e}. Using default."),
                    Ok(u) => self.parsed = u,
                }
            }
        }

        let mut visitor = OperationIdVisitor {
            parsed: Uuid::default(),
        };
        attrs.record(&mut visitor);
        Self(visitor.parsed)
    }
}
impl Default for MetricId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for MetricId {
    /// Formats the id exactly as the wrapped [`Uuid`] formats itself via `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully-qualified call: only the `fmt` module is imported by this file, so the
        // method-call form `self.0.fmt(f)` has no formatting trait in scope to resolve
        // against. Naming `Display` explicitly also rules out any ambiguity with `Debug`.
        fmt::Display::fmt(&self.0, f)
    }
}
/// A metric event; one variant per reported outcome.
///
/// `*Success` and `*Failure` variants come in pairs: `MetricEvent::into_failure` maps each
/// success variant to its failure counterpart. The `*Completed` variants have no failure
/// form and pass through `into_failure` unchanged.
#[derive(Debug, Clone)]
pub enum MetricEvent {
/// Success event for the log segment load span.
LogSegmentLoadSuccess(LogSegmentLoadSuccess),
/// Failure counterpart of `LogSegmentLoadSuccess`.
LogSegmentLoadFailure(LogSegmentLoadFailure),
/// Success event for the protocol/metadata load span.
ProtocolMetadataLoadSuccess(ProtocolMetadataLoadSuccess),
/// Failure counterpart of `ProtocolMetadataLoadSuccess`.
ProtocolMetadataLoadFailure(ProtocolMetadataLoadFailure),
/// Success event for the snapshot build span.
SnapshotBuildSuccess(SnapshotBuildSuccess),
/// Failure counterpart of `SnapshotBuildSuccess`.
SnapshotBuildFailure(SnapshotBuildFailure),
/// Success event for the transaction commit span.
TransactionCommitSuccess(TransactionCommitSuccess),
/// Failure counterpart of `TransactionCommitSuccess`; carries a `CommitFailureReason`.
TransactionCommitFailure(TransactionCommitFailure),
/// Success event for the domain metadata load span.
DomainMetadataLoadSuccess(DomainMetadataLoadSuccess),
/// Failure counterpart of `DomainMetadataLoadSuccess`; carries no data.
DomainMetadataLoadFailure,
/// Success event for the set-transaction load span.
SetTransactionLoadSuccess(SetTransactionLoadSuccess),
/// Failure counterpart of `SetTransactionLoadSuccess`; carries no data.
SetTransactionLoadFailure,
/// Success event for the CRC read span.
CrcReadSuccess(CrcReadSuccess),
/// Failure counterpart of `CrcReadSuccess`; carries no data.
CrcReadFailure,
/// File count and byte count reported by a JSON read span.
JsonReadCompleted(JsonReadCompleted),
/// File count and byte count reported by a Parquet read span.
ParquetReadCompleted(ParquetReadCompleted),
/// Counters and timings reported by a scan metadata span.
ScanMetadataCompleted(ScanMetadataCompleted),
/// Storage span whose `name` field is `StorageListCompleted::NAME`.
StorageListCompleted(StorageListCompleted),
/// Storage span whose `name` field is `StorageReadCompleted::NAME`.
StorageReadCompleted(StorageReadCompleted),
/// Storage span whose `name` field is `StorageCopyCompleted::NAME`.
StorageCopyCompleted(StorageCopyCompleted),
}
impl MetricEvent {
pub(crate) fn set_duration_if_applicable(&mut self, d: Duration) {
match self {
Self::LogSegmentLoadSuccess(e) => e.set_duration(d),
Self::ProtocolMetadataLoadSuccess(e) => e.set_duration(d),
Self::SnapshotBuildSuccess(e) => e.set_duration(d),
Self::TransactionCommitSuccess(e) => e.set_duration(d),
Self::DomainMetadataLoadSuccess(e) => e.set_duration(d),
Self::SetTransactionLoadSuccess(e) => e.set_duration(d),
Self::CrcReadSuccess(e) => e.set_duration(d),
Self::LogSegmentLoadFailure(_)
| Self::ProtocolMetadataLoadFailure(_)
| Self::SnapshotBuildFailure(_)
| Self::TransactionCommitFailure(_)
| Self::DomainMetadataLoadFailure
| Self::SetTransactionLoadFailure
| Self::CrcReadFailure
| Self::ScanMetadataCompleted(_)
| Self::StorageListCompleted(_)
| Self::StorageReadCompleted(_)
| Self::StorageCopyCompleted(_)
| Self::JsonReadCompleted(_)
| Self::ParquetReadCompleted(_) => {}
}
}
pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
match self {
Self::LogSegmentLoadSuccess(e) => e.record_u64(name, value),
Self::SnapshotBuildSuccess(e) => e.record_u64(name, value),
Self::TransactionCommitSuccess(e) => e.record_u64(name, value),
Self::DomainMetadataLoadSuccess(e) => e.record_u64(name, value),
Self::CrcReadSuccess(e) => e.record_u64(name, value),
Self::ProtocolMetadataLoadSuccess(_) => Err(ProtocolMetadataLoadSuccess::SPAN_NAME),
Self::SetTransactionLoadSuccess(_) => Err(SetTransactionLoadSuccess::SPAN_NAME),
Self::ScanMetadataCompleted(_) => Err(ScanMetadataCompleted::SPAN_NAME),
Self::JsonReadCompleted(_) => Err(JsonReadCompleted::SPAN_NAME),
Self::ParquetReadCompleted(_) => Err(ParquetReadCompleted::SPAN_NAME),
Self::StorageListCompleted(_)
| Self::StorageReadCompleted(_)
| Self::StorageCopyCompleted(_) => Err(STORAGE_SPAN),
Self::LogSegmentLoadFailure(_) => Err(LogSegmentLoadSuccess::SPAN_NAME),
Self::ProtocolMetadataLoadFailure(_) => Err(ProtocolMetadataLoadSuccess::SPAN_NAME),
Self::SnapshotBuildFailure(_) => Err(SnapshotBuildSuccess::SPAN_NAME),
Self::TransactionCommitFailure(_) => Err(TransactionCommitSuccess::SPAN_NAME),
Self::DomainMetadataLoadFailure => Err(DomainMetadataLoadSuccess::SPAN_NAME),
Self::SetTransactionLoadFailure => Err(SetTransactionLoadSuccess::SPAN_NAME),
Self::CrcReadFailure => Err(CrcReadSuccess::SPAN_NAME),
}
}
pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
match self {
Self::LogSegmentLoadSuccess(e) => e.record_bool(name, value),
Self::TransactionCommitSuccess(e) => e.record_bool(name, value),
Self::DomainMetadataLoadSuccess(e) => e.record_bool(name, value),
Self::SetTransactionLoadSuccess(e) => e.record_bool(name, value),
Self::ProtocolMetadataLoadSuccess(_) => Err(ProtocolMetadataLoadSuccess::SPAN_NAME),
Self::SnapshotBuildSuccess(_) => Err(SnapshotBuildSuccess::SPAN_NAME),
Self::CrcReadSuccess(_) => Err(CrcReadSuccess::SPAN_NAME),
Self::ScanMetadataCompleted(_) => Err(ScanMetadataCompleted::SPAN_NAME),
Self::JsonReadCompleted(_) => Err(JsonReadCompleted::SPAN_NAME),
Self::ParquetReadCompleted(_) => Err(ParquetReadCompleted::SPAN_NAME),
Self::StorageListCompleted(_)
| Self::StorageReadCompleted(_)
| Self::StorageCopyCompleted(_) => Err(STORAGE_SPAN),
Self::LogSegmentLoadFailure(_) => Err(LogSegmentLoadSuccess::SPAN_NAME),
Self::ProtocolMetadataLoadFailure(_) => Err(ProtocolMetadataLoadSuccess::SPAN_NAME),
Self::SnapshotBuildFailure(_) => Err(SnapshotBuildSuccess::SPAN_NAME),
Self::TransactionCommitFailure(_) => Err(TransactionCommitSuccess::SPAN_NAME),
Self::DomainMetadataLoadFailure => Err(DomainMetadataLoadSuccess::SPAN_NAME),
Self::SetTransactionLoadFailure => Err(SetTransactionLoadSuccess::SPAN_NAME),
Self::CrcReadFailure => Err(CrcReadSuccess::SPAN_NAME),
}
}
pub(crate) fn record_str(&mut self, name: &str, value: &str) -> Result<(), &'static str> {
if name == "failure_reason" {
if let Self::TransactionCommitSuccess(e) = self {
let operation_id = e.operation_id;
let table_type = e.table_type;
let correlation_id = e.correlation_id.take();
*self = Self::TransactionCommitFailure(TransactionCommitFailure {
operation_id,
table_type,
correlation_id,
reason: value.parse().unwrap_or_else(|e| {
warn!("Invalid failure_reason '{value}' on span: {e}. Using Error.");
CommitFailureReason::Error
}),
});
}
return Ok(());
}
match self {
Self::TransactionCommitSuccess(e) => e.record_str(name, value),
_ => Ok(()),
}
}
pub(crate) fn into_failure(self) -> Self {
match self {
Self::LogSegmentLoadSuccess(e) => Self::LogSegmentLoadFailure(LogSegmentLoadFailure {
operation_id: e.operation_id,
table_type: e.table_type,
correlation_id: e.correlation_id,
}),
Self::ProtocolMetadataLoadSuccess(e) => {
Self::ProtocolMetadataLoadFailure(ProtocolMetadataLoadFailure {
operation_id: e.operation_id,
table_type: e.table_type,
correlation_id: e.correlation_id,
})
}
Self::SnapshotBuildSuccess(e) => Self::SnapshotBuildFailure(SnapshotBuildFailure {
operation_id: e.operation_id,
table_type: e.table_type,
correlation_id: e.correlation_id,
}),
Self::TransactionCommitSuccess(e) => {
Self::TransactionCommitFailure(TransactionCommitFailure {
operation_id: e.operation_id,
table_type: e.table_type,
correlation_id: e.correlation_id,
reason: CommitFailureReason::Error,
})
}
Self::DomainMetadataLoadSuccess(_) => Self::DomainMetadataLoadFailure,
Self::SetTransactionLoadSuccess(_) => Self::SetTransactionLoadFailure,
Self::CrcReadSuccess(_) => Self::CrcReadFailure,
e @ (Self::LogSegmentLoadFailure(_)
| Self::ProtocolMetadataLoadFailure(_)
| Self::SnapshotBuildFailure(_)
| Self::TransactionCommitFailure(_)
| Self::DomainMetadataLoadFailure
| Self::SetTransactionLoadFailure
| Self::CrcReadFailure
| Self::JsonReadCompleted(_)
| Self::ParquetReadCompleted(_)
| Self::ScanMetadataCompleted(_)
| Self::StorageListCompleted(_)
| Self::StorageReadCompleted(_)
| Self::StorageCopyCompleted(_)) => e,
}
}
}
impl fmt::Display for MetricEvent {
    /// Formats the event by delegating to the payload's `Display` impl; the payload-less
    /// failure variants write their variant name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `Display::fmt` is called by its full path: this file imports only the `fmt`
        // module, so the method-call form `e.fmt(f)` has no trait in scope to resolve
        // against, and the payloads implement `Debug` as well, which would make an unqualified
        // call ambiguous if both traits were imported.
        match self {
            Self::LogSegmentLoadSuccess(e) => fmt::Display::fmt(e, f),
            Self::LogSegmentLoadFailure(e) => fmt::Display::fmt(e, f),
            Self::ProtocolMetadataLoadSuccess(e) => fmt::Display::fmt(e, f),
            Self::ProtocolMetadataLoadFailure(e) => fmt::Display::fmt(e, f),
            Self::SnapshotBuildSuccess(e) => fmt::Display::fmt(e, f),
            Self::SnapshotBuildFailure(e) => fmt::Display::fmt(e, f),
            Self::TransactionCommitSuccess(e) => fmt::Display::fmt(e, f),
            Self::TransactionCommitFailure(e) => fmt::Display::fmt(e, f),
            Self::DomainMetadataLoadSuccess(e) => fmt::Display::fmt(e, f),
            Self::DomainMetadataLoadFailure => f.write_str("DomainMetadataLoadFailure"),
            Self::SetTransactionLoadSuccess(e) => fmt::Display::fmt(e, f),
            Self::SetTransactionLoadFailure => f.write_str("SetTransactionLoadFailure"),
            Self::CrcReadSuccess(e) => fmt::Display::fmt(e, f),
            Self::CrcReadFailure => f.write_str("CrcReadFailure"),
            Self::JsonReadCompleted(e) => fmt::Display::fmt(e, f),
            Self::ParquetReadCompleted(e) => fmt::Display::fmt(e, f),
            Self::ScanMetadataCompleted(e) => fmt::Display::fmt(e, f),
            Self::StorageListCompleted(e) => fmt::Display::fmt(e, f),
            Self::StorageReadCompleted(e) => fmt::Display::fmt(e, f),
            Self::StorageCopyCompleted(e) => fmt::Display::fmt(e, f),
        }
    }
}
/// Span name exposed as `LogSegmentLoadSuccess::SPAN_NAME`.
pub(crate) const LOG_SEGMENT_LOADED_SPAN: &str = "segment.for_snapshot";
/// Event describing a successful log segment load.
///
/// `from_attrs` fills the identifying fields from the span attributes; the counters, the
/// flag and the duration start at zero/`false` and are filled in through `record_u64`,
/// `record_bool` and `set_duration`.
#[derive(Debug, Clone)]
pub struct LogSegmentLoadSuccess {
/// Id parsed from the span's `operation_id` field.
pub operation_id: MetricId,
/// Value of the span's `correlation_id` field, when present and non-empty.
pub correlation_id: Option<Arc<str>>,
/// Table type derived from the span's `is_catalog_managed` field.
pub table_type: TableType,
/// Value recorded for the `num_commit_files` field.
pub num_commit_files: u64,
/// Value recorded for the `num_checkpoint_files` field.
pub num_checkpoint_files: u64,
/// Value recorded for the `num_compaction_files` field.
pub num_compaction_files: u64,
/// Value recorded for the `has_latest_crc_file` field.
pub has_latest_crc_file: bool,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl LogSegmentLoadSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = LOG_SEGMENT_LOADED_SPAN;

    /// Builds the event from the span's attributes.
    ///
    /// Only the identifying fields are read from `attrs`; all counters start at `0`, the CRC
    /// flag at `false` and the duration at zero.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        let is_catalog_managed = read_is_catalog_managed(attrs);
        let table_type = TableType::from_catalog_managed(is_catalog_managed);
        let correlation_id = correlation_id_from_attrs(attrs);
        Self {
            operation_id,
            correlation_id,
            table_type,
            num_commit_files: 0,
            num_checkpoint_files: 0,
            num_compaction_files: 0,
            has_latest_crc_file: false,
            duration: Duration::ZERO,
        }
    }

    /// Stores `value` in the counter called `name`; unknown names yield `Err(SPAN_NAME)`.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        let counter = match name {
            "num_commit_files" => &mut self.num_commit_files,
            "num_checkpoint_files" => &mut self.num_checkpoint_files,
            "num_compaction_files" => &mut self.num_compaction_files,
            _ => return Err(Self::SPAN_NAME),
        };
        *counter = value;
        Ok(())
    }

    /// Stores `value` in the flag called `name`; unknown names yield `Err(SPAN_NAME)`.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        if name != "has_latest_crc_file" {
            return Err(Self::SPAN_NAME);
        }
        self.has_latest_crc_file = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for LogSegmentLoadSuccess {
    /// Writes a single-line `LogSegmentLoadSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            num_commit_files,
            num_checkpoint_files,
            num_compaction_files,
            has_latest_crc_file,
            duration,
        } = *self;
        write!(
            f,
            "LogSegmentLoadSuccess(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, \
             duration={duration:?}, commits={num_commit_files}, \
             checkpoints={num_checkpoint_files}, compactions={num_compaction_files}, \
             has_latest_crc={has_latest_crc_file})"
        )
    }
}
/// Failure counterpart of `LogSegmentLoadSuccess`.
///
/// `MetricEvent::into_failure` builds it by carrying over the identifying fields of the
/// success event.
#[derive(Debug, Clone)]
pub struct LogSegmentLoadFailure {
/// Operation id carried over from the success event.
pub operation_id: MetricId,
/// Correlation id carried over from the success event, if any.
pub correlation_id: Option<Arc<str>>,
/// Table type carried over from the success event.
pub table_type: TableType,
}
impl fmt::Display for LogSegmentLoadFailure {
    /// Writes a single-line `LogSegmentLoadFailure(...)` summary of the identifying fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            operation_id,
            correlation_id,
            table_type,
        } = self;
        write!(
            f,
            "LogSegmentLoadFailure(id={}, table_type={}, correlation_id={:?})",
            operation_id, table_type, correlation_id
        )
    }
}
/// Span name exposed as `ProtocolMetadataLoadSuccess::SPAN_NAME`.
pub(crate) const PROTOCOL_METADATA_LOADED_SPAN: &str = "segment.read_metadata";
/// Event describing a successful protocol/metadata load.
///
/// `from_attrs` fills the identifying fields from the span attributes; the duration starts at
/// zero and is stored by `set_duration`.
#[derive(Debug, Clone)]
pub struct ProtocolMetadataLoadSuccess {
/// Id parsed from the span's `operation_id` field.
pub operation_id: MetricId,
/// Value of the span's `correlation_id` field, when present and non-empty.
pub correlation_id: Option<Arc<str>>,
/// Table type derived from the span's `is_catalog_managed` field.
pub table_type: TableType,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl ProtocolMetadataLoadSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = PROTOCOL_METADATA_LOADED_SPAN;

    /// Builds the event from the span's attributes, with a zero duration.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        let is_catalog_managed = read_is_catalog_managed(attrs);
        let table_type = TableType::from_catalog_managed(is_catalog_managed);
        let correlation_id = correlation_id_from_attrs(attrs);
        Self {
            operation_id,
            correlation_id,
            table_type,
            duration: Duration::ZERO,
        }
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for ProtocolMetadataLoadSuccess {
    /// Writes a single-line `ProtocolMetadataLoadSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            duration,
        } = *self;
        write!(
            f,
            "ProtocolMetadataLoadSuccess(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, duration={duration:?})"
        )
    }
}
/// Failure counterpart of `ProtocolMetadataLoadSuccess`.
///
/// `MetricEvent::into_failure` builds it by carrying over the identifying fields of the
/// success event.
#[derive(Debug, Clone)]
pub struct ProtocolMetadataLoadFailure {
/// Operation id carried over from the success event.
pub operation_id: MetricId,
/// Correlation id carried over from the success event, if any.
pub correlation_id: Option<Arc<str>>,
/// Table type carried over from the success event.
pub table_type: TableType,
}
impl fmt::Display for ProtocolMetadataLoadFailure {
    /// Writes a single-line `ProtocolMetadataLoadFailure(...)` summary of the identifying
    /// fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            operation_id,
            correlation_id,
            table_type,
        } = self;
        write!(
            f,
            "ProtocolMetadataLoadFailure(id={}, table_type={}, correlation_id={:?})",
            operation_id, table_type, correlation_id
        )
    }
}
/// Span name exposed as `SnapshotBuildSuccess::SPAN_NAME`.
pub(crate) const SNAPSHOT_COMPLETED_SPAN: &str = "snap.build";
/// Event describing a successful snapshot build.
///
/// `from_attrs` fills the identifying fields from the span attributes; `version` and
/// `duration` start at zero and are filled in through `record_u64` and `set_duration`.
#[derive(Debug, Clone)]
pub struct SnapshotBuildSuccess {
/// Id parsed from the span's `operation_id` field.
pub operation_id: MetricId,
/// Value of the span's `correlation_id` field, when present and non-empty.
pub correlation_id: Option<Arc<str>>,
/// Table type derived from the span's `is_catalog_managed` field.
pub table_type: TableType,
/// Value recorded for the `version` field.
pub version: u64,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl SnapshotBuildSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = SNAPSHOT_COMPLETED_SPAN;

    /// Builds the event from the span's attributes, with `version` 0 and a zero duration.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        let is_catalog_managed = read_is_catalog_managed(attrs);
        let table_type = TableType::from_catalog_managed(is_catalog_managed);
        let correlation_id = correlation_id_from_attrs(attrs);
        Self {
            operation_id,
            correlation_id,
            table_type,
            version: 0,
            duration: Duration::ZERO,
        }
    }

    /// Stores `value` as the version when `name` is `"version"`; any other name yields
    /// `Err(SPAN_NAME)`.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "version" {
            return Err(Self::SPAN_NAME);
        }
        self.version = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for SnapshotBuildSuccess {
    /// Writes a single-line `SnapshotBuildSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            version,
            duration,
        } = *self;
        write!(
            f,
            "SnapshotBuildSuccess(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, version={version}, duration={duration:?})"
        )
    }
}
/// Failure counterpart of `SnapshotBuildSuccess`.
///
/// `MetricEvent::into_failure` builds it by carrying over the identifying fields of the
/// success event.
#[derive(Debug, Clone)]
pub struct SnapshotBuildFailure {
/// Operation id carried over from the success event.
pub operation_id: MetricId,
/// Correlation id carried over from the success event, if any.
pub correlation_id: Option<Arc<str>>,
/// Table type carried over from the success event.
pub table_type: TableType,
}
impl fmt::Display for SnapshotBuildFailure {
    /// Writes a single-line `SnapshotBuildFailure(...)` summary of the identifying fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            operation_id,
            correlation_id,
            table_type,
        } = self;
        write!(
            f,
            "SnapshotBuildFailure(id={}, table_type={}, correlation_id={:?})",
            operation_id, table_type, correlation_id
        )
    }
}
/// Span name exposed as `TransactionCommitSuccess::SPAN_NAME`.
pub(crate) const TRANSACTION_COMMIT_SPAN: &str = "txn.commit";
/// Event describing a successful transaction commit.
///
/// `from_attrs` reads the identifying fields and `commit_version` from the span attributes;
/// every other field starts at zero/`false`/`None` and is filled in through `record_u64`,
/// `record_bool`, `record_str` and `set_duration`.
#[derive(Debug, Clone)]
pub struct TransactionCommitSuccess {
/// Id parsed from the span's `operation_id` field.
pub operation_id: MetricId,
/// Value of the span's `correlation_id` field, when present and non-empty.
pub correlation_id: Option<Arc<str>>,
/// Table type derived from the span's `is_catalog_managed` field.
pub table_type: TableType,
/// Value of the span's `commit_version` attribute (0 when absent).
pub commit_version: u64,
/// Value recorded for the `num_add_files` field.
pub num_add_files: u64,
/// Value recorded for the `num_remove_files` field.
pub num_remove_files: u64,
/// Value recorded for the `num_dv_updates` field.
pub num_dv_updates: u64,
/// Value recorded for the `add_files_bytes` field.
pub add_files_bytes: u64,
/// Value recorded for the `remove_files_bytes` field.
pub remove_files_bytes: u64,
/// Value recorded for the `is_blind_append` field.
pub is_blind_append: bool,
/// Value recorded for the `data_change` field.
pub data_change: bool,
/// Value recorded for the `operation` string field, if any.
pub operation: Option<String>,
/// Duration built from the `prepare_duration_ns` field (nanoseconds).
pub prepare_duration: Duration,
/// Duration built from the `committer_duration_ns` field (nanoseconds).
pub committer_duration: Duration,
/// Duration stored by `set_duration`.
pub total_duration: Duration,
}
impl TransactionCommitSuccess {
pub(crate) const SPAN_NAME: &'static str = TRANSACTION_COMMIT_SPAN;
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
let mut v = TransactionCommitAttrs::default();
attrs.record(&mut v);
Self {
operation_id: MetricId::from_attrs(attrs),
table_type: TableType::from_catalog_managed(read_is_catalog_managed(attrs)),
correlation_id: correlation_id_from_attrs(attrs),
commit_version: v.commit_version,
num_add_files: 0,
num_remove_files: 0,
num_dv_updates: 0,
add_files_bytes: 0,
remove_files_bytes: 0,
is_blind_append: false,
data_change: false,
operation: None,
prepare_duration: Duration::default(),
committer_duration: Duration::default(),
total_duration: Duration::default(),
}
}
pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
match name {
"num_add_files" => self.num_add_files = value,
"num_remove_files" => self.num_remove_files = value,
"num_dv_updates" => self.num_dv_updates = value,
"add_files_bytes" => self.add_files_bytes = value,
"remove_files_bytes" => self.remove_files_bytes = value,
"prepare_duration_ns" => self.prepare_duration = Duration::from_nanos(value),
"committer_duration_ns" => self.committer_duration = Duration::from_nanos(value),
_ => return Err(Self::SPAN_NAME),
}
Ok(())
}
pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
match name {
"is_blind_append" => self.is_blind_append = value,
"data_change" => self.data_change = value,
_ => return Err(Self::SPAN_NAME),
}
Ok(())
}
pub(crate) fn record_str(&mut self, name: &str, value: &str) -> Result<(), &'static str> {
match name {
"operation" => self.operation = Some(value.to_string()),
_ => return Err(Self::SPAN_NAME),
}
Ok(())
}
pub(crate) fn set_duration(&mut self, d: Duration) {
self.total_duration = d;
}
}
impl fmt::Display for TransactionCommitSuccess {
    /// Writes a single-line `TransactionCommitSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            commit_version,
            num_add_files,
            num_remove_files,
            num_dv_updates,
            add_files_bytes,
            remove_files_bytes,
            is_blind_append,
            data_change,
            ref operation,
            prepare_duration,
            committer_duration,
            total_duration,
        } = *self;
        write!(
            f,
            "TransactionCommitSuccess(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, version={commit_version}, \
             total_duration={total_duration:?}, prepare={prepare_duration:?}, committer={committer_duration:?}, \
             add_files={num_add_files}, remove_files={num_remove_files}, dv_updates={num_dv_updates}, \
             add_bytes={add_files_bytes}, remove_bytes={remove_files_bytes}, \
             is_blind_append={is_blind_append}, data_change={data_change}, operation={operation:?})"
        )
    }
}
/// Reason attached to a `TransactionCommitFailure`.
///
/// The strum derives parse and display the variants in `snake_case` (`serialize_all`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, StrumDisplay, AsRefStr, IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub enum CommitFailureReason {
/// Serialized as `conflict`.
Conflict,
/// Serialized as `retryable_io`.
RetryableIo,
/// Serialized as `error`. Also the fallback used when a `failure_reason` string does not
/// parse, and the reason assigned by `MetricEvent::into_failure`.
Error,
}
/// Failure counterpart of `TransactionCommitSuccess`.
///
/// Built either by `MetricEvent::record_str` when a `failure_reason` field is recorded, or by
/// `MetricEvent::into_failure`.
#[derive(Debug, Clone)]
pub struct TransactionCommitFailure {
/// Operation id carried over from the success event.
pub operation_id: MetricId,
/// Correlation id carried over from the success event, if any.
pub correlation_id: Option<Arc<str>>,
/// Table type carried over from the success event.
pub table_type: TableType,
/// Why the commit failed.
pub reason: CommitFailureReason,
}
impl fmt::Display for TransactionCommitFailure {
    /// Writes a single-line `TransactionCommitFailure(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            reason,
        } = *self;
        write!(
            f,
            "TransactionCommitFailure(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, reason={reason})"
        )
    }
}
/// Visitor that captures the `commit_version` attribute of a transaction commit span.
#[derive(Default)]
struct TransactionCommitAttrs {
    /// Last `commit_version` value seen; 0 when the attribute is absent.
    commit_version: u64,
}

impl Visit for TransactionCommitAttrs {
    fn record_u64(&mut self, field: &Field, value: u64) {
        let is_commit_version = field.name() == "commit_version";
        if is_commit_version {
            self.commit_version = value;
        }
    }

    // Required by `Visit`; fields that reach this method are ignored.
    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
}
/// Span name exposed as `DomainMetadataLoadSuccess::SPAN_NAME`.
pub(crate) const DOMAIN_METADATA_LOADED_SPAN: &str = "snap.get_domain_metadata";
/// Event describing a successful domain metadata load.
///
/// All fields start at `false`/zero and are filled in through `record_bool`, `record_u64`
/// and `set_duration`.
#[derive(Debug, Clone)]
pub struct DomainMetadataLoadSuccess {
/// Value recorded for the `from_cache` field.
pub from_cache: bool,
/// Value recorded for the `num_domains_returned` field.
pub num_domains_returned: u64,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl DomainMetadataLoadSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = DOMAIN_METADATA_LOADED_SPAN;

    /// Builds an all-zero event; the span attributes are not consulted.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        let duration = Duration::ZERO;
        Self {
            from_cache: false,
            num_domains_returned: 0,
            duration,
        }
    }

    /// Stores `value` when `name` is `"num_domains_returned"`; any other name yields
    /// `Err(SPAN_NAME)`.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "num_domains_returned" {
            return Err(Self::SPAN_NAME);
        }
        self.num_domains_returned = value;
        Ok(())
    }

    /// Stores `value` when `name` is `"from_cache"`; any other name yields `Err(SPAN_NAME)`.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        if name != "from_cache" {
            return Err(Self::SPAN_NAME);
        }
        self.from_cache = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for DomainMetadataLoadSuccess {
    /// Writes a single-line `DomainMetadataLoadSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so they can be bound by value.
        let Self {
            from_cache,
            num_domains_returned,
            duration,
        } = *self;
        write!(
            f,
            "DomainMetadataLoadSuccess(duration={duration:?}, from_cache={from_cache}, \
             num_domains_returned={num_domains_returned})"
        )
    }
}
/// Span name exposed as `SetTransactionLoadSuccess::SPAN_NAME`.
pub(crate) const SET_TRANSACTION_LOADED_SPAN: &str = "snap.get_app_id_version";
/// Event describing a successful set-transaction load.
///
/// All fields start at `false`/zero and are filled in through `record_bool` and
/// `set_duration`.
#[derive(Debug, Clone)]
pub struct SetTransactionLoadSuccess {
/// Value recorded for the `from_cache` field.
pub from_cache: bool,
/// Value recorded for the `found` field.
pub found: bool,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl SetTransactionLoadSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = SET_TRANSACTION_LOADED_SPAN;

    /// Builds an event with both flags `false` and a zero duration; the span attributes are
    /// not consulted.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        let duration = Duration::ZERO;
        Self {
            from_cache: false,
            found: false,
            duration,
        }
    }

    /// Stores `value` in the flag called `name`; unknown names yield `Err(SPAN_NAME)`.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        let flag = match name {
            "from_cache" => &mut self.from_cache,
            "found" => &mut self.found,
            _ => return Err(Self::SPAN_NAME),
        };
        *flag = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for SetTransactionLoadSuccess {
    /// Writes a single-line `SetTransactionLoadSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so they can be bound by value.
        let Self {
            from_cache,
            found,
            duration,
        } = *self;
        write!(
            f,
            "SetTransactionLoadSuccess(duration={duration:?}, from_cache={from_cache}, found={found})"
        )
    }
}
/// Span name exposed as `CrcReadSuccess::SPAN_NAME`.
pub(crate) const CRC_READ_COMPLETED_SPAN: &str = "crc_read_completed";
/// Event describing a successful CRC read.
///
/// Both fields start at zero and are filled in through `record_u64` and `set_duration`.
#[derive(Debug, Clone)]
pub struct CrcReadSuccess {
/// Value recorded for the `bytes_read` field.
pub bytes_read: u64,
/// Duration stored by `set_duration`.
pub duration: Duration,
}
impl CrcReadSuccess {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = CRC_READ_COMPLETED_SPAN;

    /// Builds an all-zero event; the span attributes are not consulted.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        let duration = Duration::ZERO;
        Self {
            bytes_read: 0,
            duration,
        }
    }

    /// Stores `value` when `name` is `"bytes_read"`; any other name yields `Err(SPAN_NAME)`.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "bytes_read" {
            return Err(Self::SPAN_NAME);
        }
        self.bytes_read = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}
impl fmt::Display for CrcReadSuccess {
    /// Writes a single-line `CrcReadSuccess(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so they can be bound by value.
        let Self {
            bytes_read,
            duration,
        } = *self;
        write!(
            f,
            "CrcReadSuccess(duration={duration:?}, bytes={bytes_read})"
        )
    }
}
/// Event carrying the counters of a JSON read span.
///
/// Both fields are read from the span attributes by `from_attrs` and default to 0.
#[derive(Debug, Clone)]
pub struct JsonReadCompleted {
/// Value of the span's `num_files` attribute.
pub num_files: u64,
/// Value of the span's `bytes_read` attribute.
pub bytes_read: u64,
}
impl JsonReadCompleted {
pub(crate) const SPAN_NAME: &'static str = "json_read_completed";
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
let mut v = FileReadAttrs::default();
attrs.record(&mut v);
Self {
num_files: v.num_files,
bytes_read: v.bytes_read,
}
}
}
impl fmt::Display for JsonReadCompleted {
    /// Writes a single-line `JsonReadCompleted(...)` summary of both counters.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both fields are `Copy`, so they can be bound by value.
        let Self {
            num_files,
            bytes_read,
        } = *self;
        write!(
            f,
            "JsonReadCompleted(files={num_files}, bytes={bytes_read})"
        )
    }
}
/// Event carrying the counters of a Parquet read span.
///
/// Both fields are read from the span attributes by `from_attrs` and default to 0.
#[derive(Debug, Clone)]
pub struct ParquetReadCompleted {
/// Value of the span's `num_files` attribute.
pub num_files: u64,
/// Value of the span's `bytes_read` attribute.
pub bytes_read: u64,
}
impl ParquetReadCompleted {
pub(crate) const SPAN_NAME: &'static str = "parquet_read_completed";
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
let mut v = FileReadAttrs::default();
attrs.record(&mut v);
Self {
num_files: v.num_files,
bytes_read: v.bytes_read,
}
}
}
impl fmt::Display for ParquetReadCompleted {
    /// Writes a single-line `ParquetReadCompleted(...)` summary of both counters.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both fields are `Copy`, so they can be bound by value.
        let Self {
            num_files,
            bytes_read,
        } = *self;
        write!(
            f,
            "ParquetReadCompleted(files={num_files}, bytes={bytes_read})"
        )
    }
}
/// Visitor that captures the `num_files` and `bytes_read` attributes shared by the JSON and
/// Parquet read spans.
#[derive(Default)]
struct FileReadAttrs {
    /// Last `num_files` value seen; 0 when absent.
    num_files: u64,
    /// Last `bytes_read` value seen; 0 when absent.
    bytes_read: u64,
}

impl Visit for FileReadAttrs {
    fn record_u64(&mut self, field: &Field, value: u64) {
        let name = field.name();
        if name == "num_files" {
            self.num_files = value;
        } else if name == "bytes_read" {
            self.bytes_read = value;
        }
    }

    // Required by `Visit`; fields that reach this method are ignored.
    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
}
/// Kind of scan reported by `ScanMetadataCompleted`.
///
/// The strum derives parse and display each variant using its `serialize` string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString, StrumDisplay, AsRefStr, IntoStaticStr)]
pub enum ScanType {
/// Serialized as `sequential`.
#[strum(serialize = "sequential")]
SequentialPhase,
/// Serialized as `parallel`.
#[strum(serialize = "parallel")]
ParallelPhase,
/// Serialized as `full`; also the fallback chosen by `ScanType::parse_lenient`.
#[strum(serialize = "full")]
Full,
}
impl ScanType {
    /// Parses `s` into a scan type, logging a warning and falling back to [`ScanType::Full`]
    /// when the string is not a known serialization.
    fn parse_lenient(s: &str) -> Self {
        match Self::from_str(s) {
            Ok(scan_type) => scan_type,
            Err(e) => {
                warn!("Invalid scan_type '{s}' on span: {e}. Using Full.");
                Self::Full
            }
        }
    }
}
/// Name of the boolean span field from which the table type is derived.
pub(crate) const IS_CATALOG_MANAGED_FIELD: &str = "is_catalog_managed";
/// Whether a table is path-based or catalog-managed.
///
/// Displayed as `path_based` / `catalog_managed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
pub enum TableType {
/// The default; chosen when the `is_catalog_managed` flag is `false`.
#[default]
PathBased,
/// Chosen when the `is_catalog_managed` flag is `true`.
CatalogManaged,
}
impl TableType {
    /// Maps the `is_catalog_managed` flag to the corresponding table type.
    #[internal_api]
    pub(crate) fn from_catalog_managed(is_catalog_managed: bool) -> Self {
        match is_catalog_managed {
            true => Self::CatalogManaged,
            false => Self::PathBased,
        }
    }

    /// Returns `true` only for [`TableType::CatalogManaged`].
    pub(crate) fn is_catalog_managed(self) -> bool {
        matches!(self, Self::CatalogManaged)
    }
}
impl fmt::Display for TableType {
    /// Writes `path_based` or `catalog_managed`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::CatalogManaged => "catalog_managed",
            Self::PathBased => "path_based",
        };
        f.write_str(label)
    }
}
/// Identifying context for snapshot-load metrics.
///
/// The derived `Default` yields a random operation id (via `MetricId::default`), no
/// correlation id and `is_catalog_managed == false`.
#[derive(Debug, Clone, Default)]
pub struct SnapshotLoadMetricContext {
/// Operation id for the load.
pub(crate) operation_id: MetricId,
/// Optional correlation id.
pub(crate) correlation_id: Option<Arc<str>>,
/// Whether the table is catalog-managed.
pub(crate) is_catalog_managed: bool,
}
/// Reads the boolean `is_catalog_managed` field from `attrs`.
///
/// Returns `false` when the field is absent or was not recorded as a `bool`.
pub(crate) fn read_is_catalog_managed(attrs: &Attributes<'_>) -> bool {
    /// Visitor that remembers the last `is_catalog_managed` value it saw.
    struct CatalogManagedFlag {
        value: bool,
    }

    impl Visit for CatalogManagedFlag {
        fn record_bool(&mut self, field: &Field, value: bool) {
            if field.name() != IS_CATALOG_MANAGED_FIELD {
                return;
            }
            self.value = value;
        }

        // Required by `Visit`; fields that reach this method are ignored.
        fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
    }

    let mut flag = CatalogManagedFlag { value: false };
    attrs.record(&mut flag);
    flag.value
}
/// Name of the string span field that carries the correlation id.
pub(crate) const CORRELATION_ID_FIELD: &str = "correlation_id";

/// Reads the `correlation_id` string field from `attrs`.
///
/// Returns `None` when the field is absent, empty, or was not recorded as a string.
pub(crate) fn correlation_id_from_attrs(attrs: &Attributes<'_>) -> Option<Arc<str>> {
    /// Visitor that remembers the last non-empty correlation id it saw.
    struct CorrelationIdVisitor {
        found: Option<Arc<str>>,
    }

    impl Visit for CorrelationIdVisitor {
        fn record_str(&mut self, field: &Field, value: &str) {
            let is_correlation_id = field.name() == CORRELATION_ID_FIELD;
            // An empty string is treated the same as "no correlation id".
            if is_correlation_id && !value.is_empty() {
                self.found = Some(Arc::from(value));
            }
        }

        // Required by `Visit`; fields that reach this method are ignored.
        fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
    }

    let mut visitor = CorrelationIdVisitor { found: None };
    attrs.record(&mut visitor);
    visitor.found
}
/// Event carrying the counters and timings of a scan metadata span.
///
/// Every field is read from the span attributes by `from_attrs`; the same attributes are
/// written by `emit_scan_metadata_completed`.
#[derive(Debug, Clone)]
pub struct ScanMetadataCompleted {
/// Id parsed from the span's `operation_id` field.
pub operation_id: MetricId,
/// Value of the span's `correlation_id` field, when present and non-empty.
pub correlation_id: Option<Arc<str>>,
/// Table type derived from the span's `is_catalog_managed` field.
pub table_type: TableType,
/// Scan type parsed leniently from the span's `scan_type` field.
pub scan_type: ScanType,
/// Duration built from the span's `duration_ns` field (nanoseconds).
pub duration: Duration,
/// Value of the span's `num_add_files_seen` field.
pub num_add_files_seen: u64,
/// Value of the span's `num_active_add_files` field.
pub num_active_add_files: u64,
/// Value of the span's `active_add_files_bytes` field.
pub active_add_files_bytes: u64,
/// Value of the span's `num_remove_files_seen` field.
pub num_remove_files_seen: u64,
/// Value of the span's `num_non_file_actions` field.
pub num_non_file_actions: u64,
/// Value of the span's `num_predicate_filtered` field.
pub num_predicate_filtered: u64,
/// Value of the span's `peak_hash_set_size` field, converted from `u64`.
pub peak_hash_set_size: usize,
/// Value of the span's `dedup_visitor_time_ms` field.
pub dedup_visitor_time_ms: u64,
/// Value of the span's `predicate_eval_time_ms` field.
pub predicate_eval_time_ms: u64,
}
impl ScanMetadataCompleted {
    /// Name of the span this event is associated with.
    pub(crate) const SPAN_NAME: &'static str = "scan.metadata_completed";

    /// Builds the event from the span's attributes.
    ///
    /// Missing numeric attributes are reported as 0, a missing or unparsable `operation_id`
    /// as the default UUID, and a missing or unknown `scan_type` as `ScanType::Full` (with a
    /// warning from `ScanType::parse_lenient`).
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let mut v = ScanMetadataCompletedAttrs::default();
        attrs.record(&mut v);
        Self {
            operation_id: MetricId(v.operation_id),
            table_type: TableType::from_catalog_managed(v.is_catalog_managed),
            correlation_id: v.correlation_id,
            scan_type: ScanType::parse_lenient(&v.scan_type),
            duration: Duration::from_nanos(v.duration_ns),
            num_add_files_seen: v.num_add_files_seen,
            num_active_add_files: v.num_active_add_files,
            active_add_files_bytes: v.active_add_files_bytes,
            num_remove_files_seen: v.num_remove_files_seen,
            num_non_file_actions: v.num_non_file_actions,
            num_predicate_filtered: v.num_predicate_filtered,
            // The attribute travels as `u64`; saturate instead of silently truncating on
            // targets where `usize` is narrower than 64 bits.
            peak_hash_set_size: usize::try_from(v.peak_hash_set_size).unwrap_or(usize::MAX),
            dedup_visitor_time_ms: v.dedup_visitor_time_ms,
            predicate_eval_time_ms: v.predicate_eval_time_ms,
        }
    }
}
impl fmt::Display for ScanMetadataCompleted {
    /// Writes a single-line `ScanMetadataCompleted(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            ref correlation_id,
            table_type,
            scan_type,
            duration,
            num_add_files_seen,
            num_active_add_files,
            active_add_files_bytes,
            num_remove_files_seen,
            num_non_file_actions,
            num_predicate_filtered,
            peak_hash_set_size,
            dedup_visitor_time_ms,
            predicate_eval_time_ms,
        } = *self;
        write!(
            f,
            "ScanMetadataCompleted(id={operation_id}, table_type={table_type}, \
             correlation_id={correlation_id:?}, scan_type={scan_type}, duration={duration:?}, \
             add_files_seen={num_add_files_seen}, active_add_files={num_active_add_files}, \
             active_add_files_bytes={active_add_files_bytes}, \
             remove_files_seen={num_remove_files_seen}, non_file_actions={num_non_file_actions}, \
             predicate_filtered={num_predicate_filtered}, peak_hash_set_size={peak_hash_set_size}, \
             dedup_visitor_time_ms={dedup_visitor_time_ms}, predicate_eval_time_ms={predicate_eval_time_ms})"
        )
    }
}
/// Visitor that captures every attribute of a scan metadata span.
///
/// All fields keep their `Default` value (0, `false`, `None`, empty string, default UUID)
/// when the corresponding attribute is absent.
#[derive(Default)]
struct ScanMetadataCompletedAttrs {
    operation_id: Uuid,
    is_catalog_managed: bool,
    correlation_id: Option<Arc<str>>,
    /// Raw text of the `scan_type` field; parsed later by the caller.
    scan_type: String,
    duration_ns: u64,
    num_add_files_seen: u64,
    num_active_add_files: u64,
    active_add_files_bytes: u64,
    num_remove_files_seen: u64,
    num_non_file_actions: u64,
    num_predicate_filtered: u64,
    peak_hash_set_size: u64,
    dedup_visitor_time_ms: u64,
    predicate_eval_time_ms: u64,
}

impl Visit for ScanMetadataCompletedAttrs {
    fn record_bool(&mut self, field: &Field, value: bool) {
        if field.name() == IS_CATALOG_MANAGED_FIELD {
            self.is_catalog_managed = value;
        }
    }

    fn record_str(&mut self, field: &Field, value: &str) {
        // An empty string is treated the same as "no correlation id".
        if field.name() == CORRELATION_ID_FIELD && !value.is_empty() {
            self.correlation_id = Some(value.into());
        }
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        match field.name() {
            "duration_ns" => self.duration_ns = value,
            "num_add_files_seen" => self.num_add_files_seen = value,
            "num_active_add_files" => self.num_active_add_files = value,
            "active_add_files_bytes" => self.active_add_files_bytes = value,
            "num_remove_files_seen" => self.num_remove_files_seen = value,
            "num_non_file_actions" => self.num_non_file_actions = value,
            "num_predicate_filtered" => self.num_predicate_filtered = value,
            "peak_hash_set_size" => self.peak_hash_set_size = value,
            "dedup_visitor_time_ms" => self.dedup_visitor_time_ms = value,
            "predicate_eval_time_ms" => self.predicate_eval_time_ms = value,
            _ => {}
        }
    }

    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        // Render the value only inside the arms that use it, so fields this visitor ignores
        // do not cost a `String` allocation.
        match field.name() {
            "operation_id" => {
                let s = format!("{value:?}");
                match Uuid::from_str(&s) {
                    Ok(u) => self.operation_id = u,
                    Err(e) => warn!(
                        "Invalid uuid '{s}' on {}: {e}",
                        ScanMetadataCompleted::SPAN_NAME
                    ),
                }
            }
            "scan_type" => self.scan_type = format!("{value:?}"),
            _ => {}
        }
    }
}
pub(crate) const STORAGE_SPAN: &str = "storage";
pub(crate) fn storage_metric_from_attrs(attrs: &Attributes<'_>) -> Option<MetricEvent> {
let mut v = StorageAttrs::default();
attrs.record(&mut v);
let duration = Duration::from_nanos(v.duration_ns);
match v.kind {
StorageKind::List => Some(MetricEvent::StorageListCompleted(StorageListCompleted {
duration,
num_files: v.num_files,
})),
StorageKind::Read => Some(MetricEvent::StorageReadCompleted(StorageReadCompleted {
duration,
num_files: v.num_files,
bytes_read: v.bytes_read,
})),
StorageKind::Copy => Some(MetricEvent::StorageCopyCompleted(StorageCopyCompleted {
duration,
})),
StorageKind::Unknown => None,
}
}
/// Which storage event a storage span describes, as selected by its `name` field.
#[derive(Default)]
enum StorageKind {
/// No recognised `name` was seen; `storage_metric_from_attrs` returns `None` for this.
#[default]
Unknown,
/// `name` equals `StorageListCompleted::NAME`.
List,
/// `name` equals `StorageReadCompleted::NAME`.
Read,
/// `name` equals `StorageCopyCompleted::NAME`.
Copy,
}
/// Visitor that captures the attributes of a storage span.
///
/// Fields keep their `Default` value (`Unknown` kind, 0 counters) when the corresponding
/// attribute is absent.
#[derive(Default)]
struct StorageAttrs {
    /// Event kind selected by the span's `name` field.
    kind: StorageKind,
    num_files: u64,
    bytes_read: u64,
    duration_ns: u64,
}

impl Visit for StorageAttrs {
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() != "name" {
            return;
        }
        self.kind = if value == StorageListCompleted::NAME {
            StorageKind::List
        } else if value == StorageReadCompleted::NAME {
            StorageKind::Read
        } else if value == StorageCopyCompleted::NAME {
            StorageKind::Copy
        } else {
            warn!("Storage span with unknown name: {value}");
            StorageKind::Unknown
        };
    }

    fn record_u64(&mut self, field: &Field, value: u64) {
        let counter = match field.name() {
            "num_files" => &mut self.num_files,
            "bytes_read" => &mut self.bytes_read,
            "duration_ns" => &mut self.duration_ns,
            _ => return,
        };
        *counter = value;
    }

    // Required by `Visit`; fields that reach this method are ignored.
    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
}
/// Event carrying the duration and file count of a storage list span.
#[derive(Debug, Clone)]
pub struct StorageListCompleted {
    /// Duration built from the span's `duration_ns` field.
    pub duration: Duration,
    /// Value of the span's `num_files` field.
    pub num_files: u64,
}

impl StorageListCompleted {
    /// Value of the storage span's `name` field that selects this event.
    pub(crate) const NAME: &'static str = "list_completed";
}

impl fmt::Display for StorageListCompleted {
    /// Writes a single-line `StorageListCompleted(...)` summary of both fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Both fields are `Copy`, so they can be bound by value.
        let Self {
            duration,
            num_files,
        } = *self;
        write!(
            f,
            "StorageListCompleted(duration={duration:?}, files={num_files})"
        )
    }
}
/// Event carrying the duration, file count and byte count of a storage read span.
#[derive(Debug, Clone)]
pub struct StorageReadCompleted {
    /// Duration built from the span's `duration_ns` field.
    pub duration: Duration,
    /// Value of the span's `num_files` field.
    pub num_files: u64,
    /// Value of the span's `bytes_read` field.
    pub bytes_read: u64,
}

impl StorageReadCompleted {
    /// Value of the storage span's `name` field that selects this event.
    pub(crate) const NAME: &'static str = "read_completed";
}

impl fmt::Display for StorageReadCompleted {
    /// Writes a single-line `StorageReadCompleted(...)` summary of every field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // All fields are `Copy`, so they can be bound by value.
        let Self {
            duration,
            num_files,
            bytes_read,
        } = *self;
        write!(
            f,
            "StorageReadCompleted(duration={duration:?}, files={num_files}, bytes={bytes_read})"
        )
    }
}
/// Metric payload describing a completed storage copy operation.
#[derive(Debug, Clone)]
pub struct StorageCopyCompleted {
    /// Elapsed time reported for the operation.
    pub duration: Duration,
}

impl StorageCopyCompleted {
    /// Value of the span's `name` field that identifies this event.
    pub(crate) const NAME: &'static str = "copy_completed";
}

impl fmt::Display for StorageCopyCompleted {
    /// Renders as `StorageCopyCompleted(duration=<Debug of duration>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field to the struct forces this impl to be revisited.
        let Self { duration: elapsed } = self;
        f.write_fmt(format_args!("StorageCopyCompleted(duration={elapsed:?})"))
    }
}
/// Creates an INFO-level span named `JsonReadCompleted::SPAN_NAME` carrying `num_files` and
/// `bytes_read`, then drops it immediately.
///
/// The `report` field is declared with an empty value.
/// NOTE(review): presumably a subscriber layer reacts to this span to produce the
/// corresponding metric event; confirm against the layer implementation.
pub fn emit_json_read_completed(num_files: u64, bytes_read: u64) {
    drop(tracing::span!(
        tracing::Level::INFO,
        JsonReadCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        num_files = num_files,
        bytes_read = bytes_read,
    ));
}
/// Creates an INFO-level span named `ParquetReadCompleted::SPAN_NAME` carrying `num_files` and
/// `bytes_read`, then drops it immediately.
///
/// The `report` field is declared with an empty value.
/// NOTE(review): presumably a subscriber layer reacts to this span to produce the
/// corresponding metric event; confirm against the layer implementation.
pub fn emit_parquet_read_completed(num_files: u64, bytes_read: u64) {
    drop(tracing::span!(
        tracing::Level::INFO,
        ParquetReadCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        num_files = num_files,
        bytes_read = bytes_read,
    ));
}
/// Creates an INFO-level span named `ScanMetadataCompleted::SPAN_NAME` whose fields mirror `e`,
/// then drops it at the end of the function.
///
/// Field notes:
/// - `report` is declared with an empty value.
/// - `operation_id` and `scan_type` are recorded through their `Display` output.
/// - `correlation_id` is recorded as the empty string when `e.correlation_id` is `None`.
/// - `duration_ns` is the duration in nanoseconds, saturated at `u64::MAX`.
///
/// NOTE(review): presumably a subscriber layer reacts to this span to produce the
/// corresponding metric event; confirm against the layer implementation.
pub(crate) fn emit_scan_metadata_completed(e: &ScanMetadataCompleted) {
    let _span = tracing::span!(
        tracing::Level::INFO,
        ScanMetadataCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        operation_id = %e.operation_id,
        is_catalog_managed = e.table_type.is_catalog_managed(),
        correlation_id = e.correlation_id.as_deref().unwrap_or(""),
        scan_type = %e.scan_type,
        // `as_nanos` yields a u128; clamp before narrowing so an out-of-range duration
        // saturates at `u64::MAX` instead of silently wrapping to a small value.
        duration_ns = e.duration.as_nanos().min(u128::from(u64::MAX)) as u64,
        num_add_files_seen = e.num_add_files_seen,
        num_active_add_files = e.num_active_add_files,
        active_add_files_bytes = e.active_add_files_bytes,
        num_remove_files_seen = e.num_remove_files_seen,
        num_non_file_actions = e.num_non_file_actions,
        num_predicate_filtered = e.num_predicate_filtered,
        peak_hash_set_size = e.peak_hash_set_size as u64,
        dedup_visitor_time_ms = e.dedup_visitor_time_ms,
        predicate_eval_time_ms = e.predicate_eval_time_ms,
    );
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Builds a path-based commit-success payload for `operation_id` with no correlation id,
    /// commit version 1, and every counter, flag and duration at its zero/false/`None` value.
    fn commit_success(operation_id: MetricId) -> TransactionCommitSuccess {
        let zero = Duration::default();
        TransactionCommitSuccess {
            operation_id,
            table_type: TableType::PathBased,
            correlation_id: None,
            commit_version: 1,
            num_add_files: 0,
            num_remove_files: 0,
            num_dv_updates: 0,
            add_files_bytes: 0,
            remove_files_bytes: 0,
            is_blind_append: false,
            data_change: false,
            operation: None,
            prepare_duration: zero,
            committer_duration: zero,
            total_duration: zero,
        }
    }

    /// Unwraps the commit-failure payload, panicking on any other variant.
    fn expect_commit_failure(event: MetricEvent) -> TransactionCommitFailure {
        match event {
            MetricEvent::TransactionCommitFailure(failure) => failure,
            _ => panic!("expected TransactionCommitFailure"),
        }
    }

    /// Unwraps the commit-success payload, panicking on any other variant.
    fn expect_commit_success(event: MetricEvent) -> TransactionCommitSuccess {
        match event {
            MetricEvent::TransactionCommitSuccess(success) => success,
            _ => panic!("expected TransactionCommitSuccess"),
        }
    }

    // Recording `failure_reason` on a commit-success event turns it into a failure carrying the
    // parsed reason; unrecognized values fall back to `Error`.
    #[rstest]
    #[case::conflict("conflict", CommitFailureReason::Conflict)]
    #[case::retryable_io("retryable_io", CommitFailureReason::RetryableIo)]
    #[case::error("error", CommitFailureReason::Error)]
    #[case::unknown_defaults_to_error("totally_unknown", CommitFailureReason::Error)]
    fn record_str_failure_reason_flips_to_expected_reason(
        #[case] value: &str,
        #[case] expected: CommitFailureReason,
    ) {
        let operation_id = MetricId::new();
        let mut event = MetricEvent::TransactionCommitSuccess(commit_success(operation_id));
        event.record_str("failure_reason", value).unwrap();

        let failure = expect_commit_failure(event);
        assert_eq!(failure.operation_id, operation_id);
        assert_eq!(failure.reason, expected);
    }

    // Recording `operation` only fills the field; the event stays a success.
    #[test]
    fn record_str_operation_sets_field_without_flipping() {
        let mut event = MetricEvent::TransactionCommitSuccess(commit_success(MetricId::new()));
        event.record_str("operation", "WRITE").unwrap();

        let success = expect_commit_success(event);
        assert_eq!(success.operation.as_deref(), Some("WRITE"));
    }

    // Recording `num_dv_updates` stores the value on the success payload.
    #[test]
    fn record_u64_num_dv_updates_sets_field() {
        let mut event = MetricEvent::TransactionCommitSuccess(commit_success(MetricId::new()));
        event.record_u64("num_dv_updates", 7).unwrap();

        let success = expect_commit_success(event);
        assert_eq!(success.num_dv_updates, 7);
    }

    // Each failure reason converts to its wire name and parses back to the same variant.
    #[rstest]
    #[case::conflict(CommitFailureReason::Conflict, "conflict")]
    #[case::retryable_io(CommitFailureReason::RetryableIo, "retryable_io")]
    #[case::error(CommitFailureReason::Error, "error")]
    fn commit_failure_reason_serializes_to_wire_name_and_parses_back(
        #[case] reason: CommitFailureReason,
        #[case] wire: &str,
    ) {
        let wire_name: &'static str = reason.into();
        assert_eq!(wire_name, wire);

        let round_tripped = CommitFailureReason::from_str(wire).unwrap();
        assert_eq!(round_tripped, reason);
    }

    // Each scan type converts to its wire name and parses back to the same variant.
    #[rstest]
    #[case::sequential(ScanType::SequentialPhase, "sequential")]
    #[case::parallel(ScanType::ParallelPhase, "parallel")]
    #[case::full(ScanType::Full, "full")]
    fn scan_type_serializes_to_wire_name_and_parses_back(
        #[case] scan_type: ScanType,
        #[case] wire: &str,
    ) {
        let wire_name: &'static str = scan_type.into();
        assert_eq!(wire_name, wire);

        let round_tripped = ScanType::from_str(wire).unwrap();
        assert_eq!(round_tripped, scan_type);
    }

    // Lenient parsing accepts known names and maps anything else to `Full`.
    #[rstest]
    #[case::known("parallel", ScanType::ParallelPhase)]
    #[case::unknown_defaults_to_full("totally_unknown", ScanType::Full)]
    fn scan_type_parse_lenient_maps_unknown_to_full(
        #[case] value: &str,
        #[case] expected: ScanType,
    ) {
        let parsed = ScanType::parse_lenient(value);
        assert_eq!(parsed, expected);
    }

    // `into_failure` on a commit success keeps the operation id and uses the `Error` reason.
    #[test]
    fn into_failure_maps_commit_success_to_error_reason() {
        let operation_id = MetricId::new();
        let converted =
            MetricEvent::TransactionCommitSuccess(commit_success(operation_id)).into_failure();

        let failure = expect_commit_failure(converted);
        assert_eq!(failure.operation_id, operation_id);
        assert_eq!(failure.reason, CommitFailureReason::Error);
    }

    // Flipping via `failure_reason` carries the correlation id over to the failure.
    #[test]
    fn record_str_failure_reason_flip_preserves_correlation_id() {
        let success = TransactionCommitSuccess {
            correlation_id: Some("commit-req-1".into()),
            ..commit_success(MetricId::new())
        };
        let mut event = MetricEvent::TransactionCommitSuccess(success);
        event.record_str("failure_reason", "conflict").unwrap();

        let failure = expect_commit_failure(event);
        assert_eq!(failure.correlation_id.as_deref(), Some("commit-req-1"));
    }

    // `into_failure` carries the correlation id over for both commit and snapshot events.
    #[test]
    fn into_failure_preserves_correlation_id() {
        let commit = TransactionCommitSuccess {
            correlation_id: Some("commit-req-2".into()),
            ..commit_success(MetricId::new())
        };
        let commit_failure =
            expect_commit_failure(MetricEvent::TransactionCommitSuccess(commit).into_failure());
        assert_eq!(
            commit_failure.correlation_id.as_deref(),
            Some("commit-req-2")
        );

        let snapshot = SnapshotBuildSuccess {
            operation_id: MetricId::new(),
            table_type: TableType::PathBased,
            correlation_id: Some("snap-req-3".into()),
            version: 0,
            duration: Duration::default(),
        };
        match MetricEvent::SnapshotBuildSuccess(snapshot).into_failure() {
            MetricEvent::SnapshotBuildFailure(snapshot_failure) => {
                assert_eq!(
                    snapshot_failure.correlation_id.as_deref(),
                    Some("snap-req-3")
                );
            }
            _ => panic!("expected SnapshotBuildFailure"),
        }
    }
}