use std::fmt;
use std::str::FromStr as _;
use std::time::Duration;
use tracing::field::{Field, Visit};
use tracing::span::Attributes;
use tracing::warn;
use uuid::Uuid;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MetricId(pub(crate) Uuid);
impl MetricId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
#[derive(Default)]
struct V(Uuid);
impl Visit for V {
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
if field.name() == "operation_id" {
let s = format!("{value:?}");
match Uuid::from_str(&s) {
Ok(u) => self.0 = u,
Err(e) => warn!("Invalid uuid '{s}' on span: {e}. Using default."),
}
}
}
}
let mut v = V::default();
attrs.record(&mut v);
Self(v.0)
}
}
impl Default for MetricId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for MetricId {
    /// Formats the identifier exactly like the wrapped UUID's `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fully-qualified call: a bare `self.0.fmt(f)` only resolves when a formatting trait
        // is imported, and becomes ambiguous as soon as more than one of them is in scope.
        fmt::Display::fmt(&self.0, f)
    }
}
/// A metric event; each variant wraps the payload struct of the same name.
#[derive(Debug, Clone)]
pub enum MetricEvent {
    LogSegmentLoaded(LogSegmentLoaded),
    ProtocolMetadataLoaded(ProtocolMetadataLoaded),
    SnapshotCompleted(SnapshotCompleted),
    SnapshotFailed(SnapshotFailed),
    DomainMetadataLoaded(DomainMetadataLoaded),
    SetTransactionLoaded(SetTransactionLoaded),
    CrcReadCompleted(CrcReadCompleted),
    JsonReadCompleted(JsonReadCompleted),
    ParquetReadCompleted(ParquetReadCompleted),
    ScanMetadataCompleted(ScanMetadataCompleted),
    StorageListCompleted(StorageListCompleted),
    StorageReadCompleted(StorageReadCompleted),
    StorageCopyCompleted(StorageCopyCompleted),
}

impl MetricEvent {
    /// Stores `d` as the duration of the wrapped payload when that payload offers a
    /// `set_duration` method; every other variant is left untouched.
    pub(crate) fn set_duration_if_applicable(&mut self, d: Duration) {
        match self {
            // These payloads expose no `set_duration`, so there is nothing to update.
            Self::JsonReadCompleted(_)
            | Self::ParquetReadCompleted(_)
            | Self::ScanMetadataCompleted(_)
            | Self::StorageListCompleted(_)
            | Self::StorageReadCompleted(_)
            | Self::StorageCopyCompleted(_) => {}
            Self::LogSegmentLoaded(event) => event.set_duration(d),
            Self::ProtocolMetadataLoaded(event) => event.set_duration(d),
            Self::SnapshotCompleted(event) => event.set_duration(d),
            Self::SnapshotFailed(event) => event.set_duration(d),
            Self::DomainMetadataLoaded(event) => event.set_duration(d),
            Self::SetTransactionLoaded(event) => event.set_duration(d),
            Self::CrcReadCompleted(event) => event.set_duration(d),
        }
    }

    /// Forwards a `u64` field named `name` to the wrapped payload.
    ///
    /// # Errors
    ///
    /// Returns a span name when the payload rejects `name`, or when the variant accepts no
    /// `u64` fields at all.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        // Variants that accept `u64` fields return early; the rest only yield a span name.
        let rejecting_span = match self {
            Self::LogSegmentLoaded(event) => return event.record_u64(name, value),
            Self::SnapshotCompleted(event) => return event.record_u64(name, value),
            Self::DomainMetadataLoaded(event) => return event.record_u64(name, value),
            Self::CrcReadCompleted(event) => return event.record_u64(name, value),
            Self::ProtocolMetadataLoaded(_) => ProtocolMetadataLoaded::SPAN_NAME,
            // `SnapshotFailed` has no span name of its own; it reports the snapshot one.
            Self::SnapshotFailed(_) => SnapshotCompleted::SPAN_NAME,
            Self::SetTransactionLoaded(_) => SetTransactionLoaded::SPAN_NAME,
            Self::ScanMetadataCompleted(_) => ScanMetadataCompleted::SPAN_NAME,
            Self::JsonReadCompleted(_) => JsonReadCompleted::SPAN_NAME,
            Self::ParquetReadCompleted(_) => ParquetReadCompleted::SPAN_NAME,
            Self::StorageListCompleted(_)
            | Self::StorageReadCompleted(_)
            | Self::StorageCopyCompleted(_) => STORAGE_SPAN,
        };
        Err(rejecting_span)
    }

    /// Forwards a `bool` field named `name` to the wrapped payload.
    ///
    /// # Errors
    ///
    /// Returns a span name when the payload rejects `name`, or when the variant accepts no
    /// `bool` fields at all.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        // Variants that accept `bool` fields return early; the rest only yield a span name.
        let rejecting_span = match self {
            Self::LogSegmentLoaded(event) => return event.record_bool(name, value),
            Self::DomainMetadataLoaded(event) => return event.record_bool(name, value),
            Self::SetTransactionLoaded(event) => return event.record_bool(name, value),
            Self::ProtocolMetadataLoaded(_) => ProtocolMetadataLoaded::SPAN_NAME,
            // Both snapshot variants report the snapshot span name.
            Self::SnapshotCompleted(_) | Self::SnapshotFailed(_) => SnapshotCompleted::SPAN_NAME,
            Self::CrcReadCompleted(_) => CrcReadCompleted::SPAN_NAME,
            Self::ScanMetadataCompleted(_) => ScanMetadataCompleted::SPAN_NAME,
            Self::JsonReadCompleted(_) => JsonReadCompleted::SPAN_NAME,
            Self::ParquetReadCompleted(_) => ParquetReadCompleted::SPAN_NAME,
            Self::StorageListCompleted(_)
            | Self::StorageReadCompleted(_)
            | Self::StorageCopyCompleted(_) => STORAGE_SPAN,
        };
        Err(rejecting_span)
    }
}
impl fmt::Display for MetricEvent {
    /// Delegates to the `Display` implementation of the wrapped payload.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Every payload also derives `Debug`, so `Display` is selected explicitly instead of
        // relying on which formatting trait happens to be in scope for a bare `.fmt(f)`.
        let payload: &dyn fmt::Display = match self {
            Self::LogSegmentLoaded(e) => e,
            Self::ProtocolMetadataLoaded(e) => e,
            Self::SnapshotCompleted(e) => e,
            Self::SnapshotFailed(e) => e,
            Self::DomainMetadataLoaded(e) => e,
            Self::SetTransactionLoaded(e) => e,
            Self::CrcReadCompleted(e) => e,
            Self::JsonReadCompleted(e) => e,
            Self::ParquetReadCompleted(e) => e,
            Self::ScanMetadataCompleted(e) => e,
            Self::StorageListCompleted(e) => e,
            Self::StorageReadCompleted(e) => e,
            Self::StorageCopyCompleted(e) => e,
        };
        fmt::Display::fmt(payload, f)
    }
}
/// Span name associated with [`LogSegmentLoaded`].
pub(crate) const LOG_SEGMENT_LOADED_SPAN: &str = "segment.for_snapshot";

/// Metric payload populated from a `segment.for_snapshot` span.
#[derive(Debug, Clone)]
pub struct LogSegmentLoaded {
    /// Id parsed from the span's `operation_id` attribute.
    pub operation_id: MetricId,
    /// Value of the span's `num_commit_files` field (0 until recorded).
    pub num_commit_files: u64,
    /// Value of the span's `num_checkpoint_files` field (0 until recorded).
    pub num_checkpoint_files: u64,
    /// Value of the span's `num_compaction_files` field (0 until recorded).
    pub num_compaction_files: u64,
    /// Value of the span's `has_latest_crc_file` field (`false` until recorded).
    pub has_latest_crc_file: bool,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl LogSegmentLoaded {
    /// Span name returned as the error for unrecognised fields.
    pub(crate) const SPAN_NAME: &'static str = LOG_SEGMENT_LOADED_SPAN;

    /// Starts an event with the operation id taken from `attrs` and all other fields zeroed.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        Self {
            operation_id,
            duration: Duration::default(),
            has_latest_crc_file: false,
            num_commit_files: 0,
            num_checkpoint_files: 0,
            num_compaction_files: 0,
        }
    }

    /// Stores `value` into the counter called `name`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] when `name` is not one of the `u64` fields.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        let slot = match name {
            "num_commit_files" => &mut self.num_commit_files,
            "num_checkpoint_files" => &mut self.num_checkpoint_files,
            "num_compaction_files" => &mut self.num_compaction_files,
            _ => return Err(Self::SPAN_NAME),
        };
        *slot = value;
        Ok(())
    }

    /// Stores `value` into the flag called `name`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] when `name` is not `has_latest_crc_file`.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        if name != "has_latest_crc_file" {
            return Err(Self::SPAN_NAME);
        }
        self.has_latest_crc_file = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for LogSegmentLoaded {
    /// Writes a single-line `LogSegmentLoaded(...)` summary of all fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            has_latest_crc_file,
            num_compaction_files,
            num_checkpoint_files,
            num_commit_files,
            duration,
            operation_id,
        } = self;
        f.write_fmt(format_args!(
            "LogSegmentLoaded(id={operation_id}, duration={duration:?}, \
             commits={num_commit_files}, checkpoints={num_checkpoint_files}, \
             compactions={num_compaction_files}, has_latest_crc={has_latest_crc_file})"
        ))
    }
}
/// Span name associated with [`ProtocolMetadataLoaded`].
pub(crate) const PROTOCOL_METADATA_LOADED_SPAN: &str = "segment.read_metadata";

/// Metric payload populated from a `segment.read_metadata` span.
#[derive(Debug, Clone)]
pub struct ProtocolMetadataLoaded {
    /// Id parsed from the span's `operation_id` attribute.
    pub operation_id: MetricId,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl ProtocolMetadataLoaded {
    /// Span name reported when a field cannot be recorded on this event.
    pub(crate) const SPAN_NAME: &'static str = PROTOCOL_METADATA_LOADED_SPAN;

    /// Starts an event with the operation id taken from `attrs` and a zero duration.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        Self {
            duration: Duration::default(),
            operation_id,
        }
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for ProtocolMetadataLoaded {
    /// Writes a single-line `ProtocolMetadataLoaded(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            duration,
            operation_id,
        } = self;
        f.write_fmt(format_args!(
            "ProtocolMetadataLoaded(id={operation_id}, duration={duration:?})"
        ))
    }
}
/// Span name associated with [`SnapshotCompleted`].
pub(crate) const SNAPSHOT_COMPLETED_SPAN: &str = "snap.build";

/// Metric payload populated from a `snap.build` span.
#[derive(Debug, Clone)]
pub struct SnapshotCompleted {
    /// Id parsed from the span's `operation_id` attribute.
    pub operation_id: MetricId,
    /// Value of the span's `version` field (0 until recorded).
    pub version: u64,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl SnapshotCompleted {
    /// Span name returned as the error for unrecognised fields.
    pub(crate) const SPAN_NAME: &'static str = SNAPSHOT_COMPLETED_SPAN;

    /// Starts an event with the operation id taken from `attrs`, version 0 and zero duration.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let operation_id = MetricId::from_attrs(attrs);
        Self {
            duration: Duration::default(),
            version: 0,
            operation_id,
        }
    }

    /// Stores `value` as the version when `name` is `version`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] for any other field name.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "version" {
            return Err(Self::SPAN_NAME);
        }
        self.version = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }

    /// Converts this event into a [`SnapshotFailed`], keeping the id and duration and
    /// discarding the version.
    pub(crate) fn into_failed(self) -> SnapshotFailed {
        let Self {
            operation_id,
            duration,
            ..
        } = self;
        SnapshotFailed {
            operation_id,
            duration,
        }
    }
}

impl fmt::Display for SnapshotCompleted {
    /// Writes a single-line `SnapshotCompleted(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            duration,
            version,
            operation_id,
        } = self;
        f.write_fmt(format_args!(
            "SnapshotCompleted(id={operation_id}, version={version}, duration={duration:?})"
        ))
    }
}
/// Metric payload for a failed snapshot; `SnapshotCompleted::into_failed` produces it.
#[derive(Debug, Clone)]
pub struct SnapshotFailed {
    /// Operation id of the failed snapshot.
    pub operation_id: MetricId,
    /// Duration assigned through `set_duration` or carried over from the completed event.
    pub duration: Duration,
}

impl SnapshotFailed {
    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for SnapshotFailed {
    /// Writes a single-line `SnapshotFailed(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            duration,
            operation_id,
        } = self;
        f.write_fmt(format_args!(
            "SnapshotFailed(id={operation_id}, duration={duration:?})"
        ))
    }
}
/// Span name associated with [`DomainMetadataLoaded`].
pub(crate) const DOMAIN_METADATA_LOADED_SPAN: &str = "snap.get_domain_metadata";

/// Metric payload populated from a `snap.get_domain_metadata` span.
#[derive(Debug, Clone)]
pub struct DomainMetadataLoaded {
    /// Value of the span's `from_cache` field (`false` until recorded).
    pub from_cache: bool,
    /// Value of the span's `num_domains_returned` field (0 until recorded).
    pub num_domains_returned: u64,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl DomainMetadataLoaded {
    /// Span name returned as the error for unrecognised fields.
    pub(crate) const SPAN_NAME: &'static str = DOMAIN_METADATA_LOADED_SPAN;

    /// Starts an event with every field at its zero value; the attributes are not inspected.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        Self {
            duration: Duration::default(),
            num_domains_returned: 0,
            from_cache: false,
        }
    }

    /// Stores `value` when `name` is `num_domains_returned`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] for any other field name.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "num_domains_returned" {
            return Err(Self::SPAN_NAME);
        }
        self.num_domains_returned = value;
        Ok(())
    }

    /// Stores `value` when `name` is `from_cache`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] for any other field name.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        if name != "from_cache" {
            return Err(Self::SPAN_NAME);
        }
        self.from_cache = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for DomainMetadataLoaded {
    /// Writes a single-line `DomainMetadataLoaded(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            duration,
            num_domains_returned,
            from_cache,
        } = self;
        f.write_fmt(format_args!(
            "DomainMetadataLoaded(duration={duration:?}, from_cache={from_cache}, \
             num_domains_returned={num_domains_returned})"
        ))
    }
}
/// Span name associated with [`SetTransactionLoaded`].
pub(crate) const SET_TRANSACTION_LOADED_SPAN: &str = "snap.get_app_id_version";

/// Metric payload populated from a `snap.get_app_id_version` span.
#[derive(Debug, Clone)]
pub struct SetTransactionLoaded {
    /// Value of the span's `from_cache` field (`false` until recorded).
    pub from_cache: bool,
    /// Value of the span's `found` field (`false` until recorded).
    pub found: bool,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl SetTransactionLoaded {
    /// Span name returned as the error for unrecognised fields.
    pub(crate) const SPAN_NAME: &'static str = SET_TRANSACTION_LOADED_SPAN;

    /// Starts an event with every field at its zero value; the attributes are not inspected.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        Self {
            duration: Duration::default(),
            found: false,
            from_cache: false,
        }
    }

    /// Stores `value` into the flag called `name` (`from_cache` or `found`).
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] for any other field name.
    pub(crate) fn record_bool(&mut self, name: &str, value: bool) -> Result<(), &'static str> {
        let flag = match name {
            "from_cache" => &mut self.from_cache,
            "found" => &mut self.found,
            _ => return Err(Self::SPAN_NAME),
        };
        *flag = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for SetTransactionLoaded {
    /// Writes a single-line `SetTransactionLoaded(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            duration,
            found,
            from_cache,
        } = self;
        f.write_fmt(format_args!(
            "SetTransactionLoaded(duration={duration:?}, from_cache={from_cache}, found={found})"
        ))
    }
}
/// Span name associated with [`CrcReadCompleted`].
pub(crate) const CRC_READ_COMPLETED_SPAN: &str = "crc_read_completed";

/// Metric payload populated from a `crc_read_completed` span.
#[derive(Debug, Clone)]
pub struct CrcReadCompleted {
    /// Value of the span's `bytes_read` field (0 until recorded).
    pub bytes_read: u64,
    /// Duration assigned through `set_duration` (zero until then).
    pub duration: Duration,
}

impl CrcReadCompleted {
    /// Span name returned as the error for unrecognised fields.
    pub(crate) const SPAN_NAME: &'static str = CRC_READ_COMPLETED_SPAN;

    /// Starts an event with every field at its zero value; the attributes are not inspected.
    pub(crate) fn from_attrs(_attrs: &Attributes<'_>) -> Self {
        Self {
            duration: Duration::default(),
            bytes_read: 0,
        }
    }

    /// Stores `value` when `name` is `bytes_read`.
    ///
    /// # Errors
    ///
    /// Returns [`Self::SPAN_NAME`] for any other field name.
    pub(crate) fn record_u64(&mut self, name: &str, value: u64) -> Result<(), &'static str> {
        if name != "bytes_read" {
            return Err(Self::SPAN_NAME);
        }
        self.bytes_read = value;
        Ok(())
    }

    /// Overwrites the stored duration with `d`.
    pub(crate) fn set_duration(&mut self, d: Duration) {
        self.duration = d;
    }
}

impl fmt::Display for CrcReadCompleted {
    /// Writes a single-line `CrcReadCompleted(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            bytes_read,
            duration,
        } = self;
        f.write_fmt(format_args!(
            "CrcReadCompleted(duration={duration:?}, bytes={bytes_read})"
        ))
    }
}
#[derive(Debug, Clone)]
pub struct JsonReadCompleted {
pub num_files: u64,
pub bytes_read: u64,
}
impl JsonReadCompleted {
pub(crate) const SPAN_NAME: &'static str = "json_read_completed";
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
let mut v = FileReadAttrs::default();
attrs.record(&mut v);
Self {
num_files: v.num_files,
bytes_read: v.bytes_read,
}
}
}
impl fmt::Display for JsonReadCompleted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
num_files,
bytes_read,
} = self;
write!(
f,
"JsonReadCompleted(files={num_files}, bytes={bytes_read})"
)
}
}
#[derive(Debug, Clone)]
pub struct ParquetReadCompleted {
pub num_files: u64,
pub bytes_read: u64,
}
impl ParquetReadCompleted {
pub(crate) const SPAN_NAME: &'static str = "parquet_read_completed";
pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
let mut v = FileReadAttrs::default();
attrs.record(&mut v);
Self {
num_files: v.num_files,
bytes_read: v.bytes_read,
}
}
}
impl fmt::Display for ParquetReadCompleted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let Self {
num_files,
bytes_read,
} = self;
write!(
f,
"ParquetReadCompleted(files={num_files}, bytes={bytes_read})"
)
}
}
/// Visitor collecting the `num_files` and `bytes_read` span fields; both default to 0.
#[derive(Default)]
struct FileReadAttrs {
    num_files: u64,
    bytes_read: u64,
}

impl Visit for FileReadAttrs {
    /// Captures the two known `u64` fields and ignores every other one.
    fn record_u64(&mut self, field: &Field, value: u64) {
        let slot = match field.name() {
            "num_files" => &mut self.num_files,
            "bytes_read" => &mut self.bytes_read,
            _ => return,
        };
        *slot = value;
    }

    /// Intentionally a no-op: values that are not `u64` are not collected.
    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
}
/// Kind of scan a `ScanMetadataCompleted` event refers to.
///
/// The textual forms are `sequential`, `parallel` and `full`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanType {
    SequentialPhase,
    ParallelPhase,
    Full,
}

impl ScanType {
    /// Maps the textual form back to a variant; any unrecognised text yields [`ScanType::Full`].
    fn parse(s: &str) -> Self {
        if s == "sequential" {
            Self::SequentialPhase
        } else if s == "parallel" {
            Self::ParallelPhase
        } else {
            Self::Full
        }
    }
}

impl fmt::Display for ScanType {
    /// Writes the lowercase textual form accepted by `ScanType::parse`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Full => "full",
            Self::ParallelPhase => "parallel",
            Self::SequentialPhase => "sequential",
        };
        f.write_str(label)
    }
}
/// Metric payload built from the attributes of a `scan.metadata_completed` span.
///
/// Every field is read from the span attribute of the same name, except `duration`, which
/// is rebuilt from the `duration_ns` attribute. Attributes that are absent leave the
/// corresponding field at its zero/default value.
#[derive(Debug, Clone)]
pub struct ScanMetadataCompleted {
    pub operation_id: MetricId,
    pub scan_type: ScanType,
    pub duration: Duration,
    pub num_add_files_seen: u64,
    pub num_active_add_files: u64,
    pub active_add_files_bytes: u64,
    pub num_remove_files_seen: u64,
    pub num_non_file_actions: u64,
    pub num_predicate_filtered: u64,
    pub peak_hash_set_size: usize,
    pub dedup_visitor_time_ms: u64,
    pub predicate_eval_time_ms: u64,
}

impl ScanMetadataCompleted {
    /// Name of the span that carries this event's attributes.
    pub(crate) const SPAN_NAME: &'static str = "scan.metadata_completed";

    /// Reconstructs the event from the attributes recorded on the span.
    pub(crate) fn from_attrs(attrs: &Attributes<'_>) -> Self {
        let mut v = ScanMetadataCompletedAttrs::default();
        attrs.record(&mut v);
        Self {
            operation_id: MetricId(v.operation_id),
            scan_type: ScanType::parse(&v.scan_type),
            duration: Duration::from_nanos(v.duration_ns),
            num_add_files_seen: v.num_add_files_seen,
            num_active_add_files: v.num_active_add_files,
            active_add_files_bytes: v.active_add_files_bytes,
            num_remove_files_seen: v.num_remove_files_seen,
            num_non_file_actions: v.num_non_file_actions,
            num_predicate_filtered: v.num_predicate_filtered,
            // The attribute travels as `u64`; saturate instead of silently truncating on
            // targets where `usize` is narrower than 64 bits.
            peak_hash_set_size: usize::try_from(v.peak_hash_set_size).unwrap_or(usize::MAX),
            dedup_visitor_time_ms: v.dedup_visitor_time_ms,
            predicate_eval_time_ms: v.predicate_eval_time_ms,
        }
    }
}

impl fmt::Display for ScanMetadataCompleted {
    /// Writes a single-line `ScanMetadataCompleted(...)` summary of all fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            operation_id,
            scan_type,
            duration,
            num_add_files_seen,
            num_active_add_files,
            active_add_files_bytes,
            num_remove_files_seen,
            num_non_file_actions,
            num_predicate_filtered,
            peak_hash_set_size,
            dedup_visitor_time_ms,
            predicate_eval_time_ms,
        } = self;
        write!(
            f,
            "ScanMetadataCompleted(id={operation_id}, scan_type={scan_type}, duration={duration:?}, \
             add_files_seen={num_add_files_seen}, active_add_files={num_active_add_files}, \
             active_add_files_bytes={active_add_files_bytes}, \
             remove_files_seen={num_remove_files_seen}, non_file_actions={num_non_file_actions}, \
             predicate_filtered={num_predicate_filtered}, peak_hash_set_size={peak_hash_set_size}, \
             dedup_visitor_time_ms={dedup_visitor_time_ms}, predicate_eval_time_ms={predicate_eval_time_ms})"
        )
    }
}
/// Visitor collecting the raw attributes of a `ScanMetadataCompleted` span.
///
/// Every field starts at its `Default` value and is overwritten when the matching span
/// field is visited.
#[derive(Default)]
struct ScanMetadataCompletedAttrs {
    operation_id: Uuid,
    scan_type: String,
    duration_ns: u64,
    num_add_files_seen: u64,
    num_active_add_files: u64,
    active_add_files_bytes: u64,
    num_remove_files_seen: u64,
    num_non_file_actions: u64,
    num_predicate_filtered: u64,
    peak_hash_set_size: u64,
    dedup_visitor_time_ms: u64,
    predicate_eval_time_ms: u64,
}

impl Visit for ScanMetadataCompletedAttrs {
    /// Stores `value` in the counter named like the field; unknown names are ignored.
    fn record_u64(&mut self, field: &Field, value: u64) {
        let slot = match field.name() {
            "duration_ns" => &mut self.duration_ns,
            "num_add_files_seen" => &mut self.num_add_files_seen,
            "num_active_add_files" => &mut self.num_active_add_files,
            "active_add_files_bytes" => &mut self.active_add_files_bytes,
            "num_remove_files_seen" => &mut self.num_remove_files_seen,
            "num_non_file_actions" => &mut self.num_non_file_actions,
            "num_predicate_filtered" => &mut self.num_predicate_filtered,
            "peak_hash_set_size" => &mut self.peak_hash_set_size,
            "dedup_visitor_time_ms" => &mut self.dedup_visitor_time_ms,
            "predicate_eval_time_ms" => &mut self.predicate_eval_time_ms,
            _ => return,
        };
        *slot = value;
    }

    /// Handles the textual fields: `scan_type` is kept as rendered, `operation_id` is parsed
    /// as a UUID (a parse failure is logged and leaves the current id in place).
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        let s = format!("{value:?}");
        let name = field.name();
        if name == "scan_type" {
            self.scan_type = s;
        } else if name == "operation_id" {
            match Uuid::from_str(&s) {
                Ok(parsed) => self.operation_id = parsed,
                Err(e) => warn!(
                    "Invalid uuid '{s}' on {}: {e}",
                    ScanMetadataCompleted::SPAN_NAME
                ),
            }
        }
    }
}
pub(crate) const STORAGE_SPAN: &str = "storage";
pub(crate) fn storage_metric_from_attrs(attrs: &Attributes<'_>) -> Option<MetricEvent> {
let mut v = StorageAttrs::default();
attrs.record(&mut v);
let duration = Duration::from_nanos(v.duration_ns);
match v.kind {
StorageKind::List => Some(MetricEvent::StorageListCompleted(StorageListCompleted {
duration,
num_files: v.num_files,
})),
StorageKind::Read => Some(MetricEvent::StorageReadCompleted(StorageReadCompleted {
duration,
num_files: v.num_files,
bytes_read: v.bytes_read,
})),
StorageKind::Copy => Some(MetricEvent::StorageCopyCompleted(StorageCopyCompleted {
duration,
})),
StorageKind::Unknown => None,
}
}
/// Which storage event a span describes, as decided by its `name` attribute.
#[derive(Default)]
enum StorageKind {
    /// No `name` attribute was seen, or it matched none of the known names.
    #[default]
    Unknown,
    List,
    Read,
    Copy,
}

/// Visitor collecting the raw attributes of a storage span.
#[derive(Default)]
struct StorageAttrs {
    kind: StorageKind,
    num_files: u64,
    bytes_read: u64,
    duration_ns: u64,
}

impl Visit for StorageAttrs {
    /// Resolves the `name` field into a [`StorageKind`]; an unrecognised name is logged and
    /// recorded as `Unknown`. Other string fields are ignored.
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() != "name" {
            return;
        }
        self.kind = if value == StorageListCompleted::NAME {
            StorageKind::List
        } else if value == StorageReadCompleted::NAME {
            StorageKind::Read
        } else if value == StorageCopyCompleted::NAME {
            StorageKind::Copy
        } else {
            warn!("Storage span with unknown name: {value}");
            StorageKind::Unknown
        };
    }

    /// Stores `value` in the counter named like the field; unknown names are ignored.
    fn record_u64(&mut self, field: &Field, value: u64) {
        let slot = match field.name() {
            "num_files" => &mut self.num_files,
            "bytes_read" => &mut self.bytes_read,
            "duration_ns" => &mut self.duration_ns,
            _ => return,
        };
        *slot = value;
    }

    /// Intentionally a no-op: values without a dedicated handler above are not collected.
    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
}
/// Metric payload for a storage span whose `name` attribute is `list_completed`.
#[derive(Debug, Clone)]
pub struct StorageListCompleted {
    /// Duration rebuilt from the span's `duration_ns` attribute.
    pub duration: Duration,
    /// Value of the span's `num_files` attribute.
    pub num_files: u64,
}

impl StorageListCompleted {
    /// Value of the `name` attribute that selects this event.
    pub(crate) const NAME: &'static str = "list_completed";
}

impl fmt::Display for StorageListCompleted {
    /// Writes a single-line `StorageListCompleted(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            num_files,
            duration,
        } = self;
        f.write_fmt(format_args!(
            "StorageListCompleted(duration={duration:?}, files={num_files})"
        ))
    }
}
/// Metric payload for a storage span whose `name` attribute is `read_completed`.
#[derive(Debug, Clone)]
pub struct StorageReadCompleted {
    /// Duration rebuilt from the span's `duration_ns` attribute.
    pub duration: Duration,
    /// Value of the span's `num_files` attribute.
    pub num_files: u64,
    /// Value of the span's `bytes_read` attribute.
    pub bytes_read: u64,
}

impl StorageReadCompleted {
    /// Value of the `name` attribute that selects this event.
    pub(crate) const NAME: &'static str = "read_completed";
}

impl fmt::Display for StorageReadCompleted {
    /// Writes a single-line `StorageReadCompleted(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self {
            bytes_read,
            num_files,
            duration,
        } = self;
        f.write_fmt(format_args!(
            "StorageReadCompleted(duration={duration:?}, files={num_files}, bytes={bytes_read})"
        ))
    }
}
/// Metric payload for a storage span whose `name` attribute is `copy_completed`.
#[derive(Debug, Clone)]
pub struct StorageCopyCompleted {
    /// Duration rebuilt from the span's `duration_ns` attribute.
    pub duration: Duration,
}

impl StorageCopyCompleted {
    /// Value of the `name` attribute that selects this event.
    pub(crate) const NAME: &'static str = "copy_completed";
}

impl fmt::Display for StorageCopyCompleted {
    /// Writes a single-line `StorageCopyCompleted(...)` summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without formatting it fails to compile.
        let Self { duration } = self;
        f.write_fmt(format_args!("StorageCopyCompleted(duration={duration:?})"))
    }
}
/// Creates an INFO-level `json_read_completed` span carrying `num_files` and `bytes_read`,
/// then drops it immediately.
///
/// The `report` field is declared but left empty. Presumably a subscriber layer turns the
/// span into a `JsonReadCompleted` event (see `JsonReadCompleted::from_attrs`) — confirm
/// against the layer implementation.
pub fn emit_json_read_completed(num_files: u64, bytes_read: u64) {
    let span = tracing::info_span!(
        JsonReadCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        num_files,
        bytes_read,
    );
    // The span is never entered; creating and closing it is all this function does.
    drop(span);
}
/// Creates an INFO-level `parquet_read_completed` span carrying `num_files` and
/// `bytes_read`, then drops it immediately.
///
/// The `report` field is declared but left empty. Presumably a subscriber layer turns the
/// span into a `ParquetReadCompleted` event (see `ParquetReadCompleted::from_attrs`) —
/// confirm against the layer implementation.
pub fn emit_parquet_read_completed(num_files: u64, bytes_read: u64) {
    let span = tracing::info_span!(
        ParquetReadCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        num_files,
        bytes_read,
    );
    // The span is never entered; creating and closing it is all this function does.
    drop(span);
}
/// Creates an INFO-level `scan.metadata_completed` span whose attributes mirror the fields
/// of `e`, then drops it at the end of the function.
///
/// The attribute names match the ones read back by `ScanMetadataCompleted::from_attrs`;
/// `duration` is sent as `duration_ns`, and the `report` field is declared but left empty.
pub(crate) fn emit_scan_metadata_completed(e: &ScanMetadataCompleted) {
    // `as_nanos` yields a `u128`; saturate rather than silently wrap if it exceeds `u64`.
    let duration_ns = u64::try_from(e.duration.as_nanos()).unwrap_or(u64::MAX);
    let _span = tracing::span!(
        tracing::Level::INFO,
        ScanMetadataCompleted::SPAN_NAME,
        report = tracing::field::Empty,
        operation_id = %e.operation_id,
        scan_type = %e.scan_type,
        duration_ns = duration_ns,
        num_add_files_seen = e.num_add_files_seen,
        num_active_add_files = e.num_active_add_files,
        active_add_files_bytes = e.active_add_files_bytes,
        num_remove_files_seen = e.num_remove_files_seen,
        num_non_file_actions = e.num_non_file_actions,
        num_predicate_filtered = e.num_predicate_filtered,
        peak_hash_set_size = e.peak_hash_set_size as u64,
        dedup_visitor_time_ms = e.dedup_visitor_time_ms,
        predicate_eval_time_ms = e.predicate_eval_time_ms,
    );
}