use std::fmt;
use std::time::Duration;
use uuid::Uuid;
/// Identifier used to correlate the metric events belonging to one logical
/// operation. Newtype over a randomly generated v4 [`Uuid`] (see [`MetricId::new`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct MetricId(Uuid);
/// The kind of scan a metric event refers to (carried by
/// [`MetricEvent::ScanMetadataCompleted`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanType {
    /// Sequential phase of a scan.
    SequentialPhase,
    /// Parallel phase of a scan.
    ParallelPhase,
    /// A full scan (not split into phases).
    Full,
}

// NOTE: written as `impl fmt::Display` (not `impl std::fmt::Display`) for
// consistency with the other `Display` impls in this file, which rely on the
// `use std::fmt;` import.
impl fmt::Display for ScanType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Stable lowercase labels, suitable for metric tags / log output.
        let scan_type = match self {
            ScanType::SequentialPhase => "sequential",
            ScanType::ParallelPhase => "parallel",
            ScanType::Full => "full",
        };
        write!(f, "{scan_type}")
    }
}
impl MetricId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
}
impl Default for MetricId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for MetricId {
    /// Renders the id exactly as the wrapped UUID renders itself.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate directly to the inner value's Display implementation.
        fmt::Display::fmt(&self.0, f)
    }
}
/// An event emitted by the metrics subsystem. Each variant carries the
/// measurements recorded for one completed (or failed) operation; variants
/// holding a [`MetricId`] can be correlated with other events for the same
/// operation.
#[derive(Debug, Clone)]
pub enum MetricEvent {
/// A log segment was loaded, with counts of the file kinds it contained.
LogSegmentLoaded {
operation_id: MetricId,
duration: Duration,
num_commit_files: u64,
num_checkpoint_files: u64,
num_compaction_files: u64,
},
/// Protocol and metadata were loaded for an operation.
ProtocolMetadataLoaded {
operation_id: MetricId,
duration: Duration,
},
/// A snapshot was constructed successfully at the given version.
SnapshotCompleted {
operation_id: MetricId,
version: u64,
total_duration: Duration,
},
/// Snapshot construction failed after the given elapsed time.
SnapshotFailed {
operation_id: MetricId,
duration: Duration,
},
/// A storage list call finished. Not tied to an operation id.
StorageListCompleted { duration: Duration, num_files: u64 },
/// A storage read finished, with file count and bytes read.
StorageReadCompleted {
duration: Duration,
num_files: u64,
bytes_read: u64,
},
/// A storage copy finished.
StorageCopyCompleted { duration: Duration },
/// A JSON read finished, with file count and bytes read.
JsonReadCompleted { num_files: u64, bytes_read: u64 },
/// A Parquet read finished, with file count and bytes read.
ParquetReadCompleted { num_files: u64, bytes_read: u64 },
/// Scan-metadata processing finished; carries detailed replay counters.
ScanMetadataCompleted {
operation_id: MetricId,
// Which scan phase these counters cover.
scan_type: ScanType,
total_duration: Duration,
num_add_files_seen: u64,
num_active_add_files: u64,
num_remove_files_seen: u64,
num_non_file_actions: u64,
num_predicate_filtered: u64,
// Peak size reached by the dedup hash set (presumably used to
// deduplicate file actions — confirm against the scan implementation).
peak_hash_set_size: usize,
// Sub-timings in milliseconds, unlike the Duration fields above.
dedup_visitor_time_ms: u64,
predicate_eval_time_ms: u64,
},
}
// Human-readable, single-line rendering of each event, in the shape
// `VariantName(key=value, ...)`. Durations use the Debug format (`{:?}`),
// e.g. "1.5ms"; ids and counters use Display. Intended for logs — not a
// stable, machine-parseable format.
impl fmt::Display for MetricEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MetricEvent::LogSegmentLoaded {
operation_id,
duration,
num_commit_files,
num_checkpoint_files,
num_compaction_files,
} => write!(
f,
"LogSegmentLoaded(id={operation_id}, duration={duration:?}, commits={num_commit_files}, checkpoints={num_checkpoint_files}, compactions={num_compaction_files})"
),
MetricEvent::ProtocolMetadataLoaded {
operation_id,
duration,
} => write!(
f,
"ProtocolMetadataLoaded(id={operation_id}, duration={duration:?})"
),
MetricEvent::SnapshotCompleted {
operation_id,
version,
total_duration,
} => write!(
f,
"SnapshotCompleted(id={operation_id}, version={version}, duration={total_duration:?})"
),
MetricEvent::SnapshotFailed {
operation_id,
duration,
} => write!(
f,
"SnapshotFailed(id={operation_id}, duration={duration:?})"
),
MetricEvent::StorageListCompleted {
duration,
num_files,
} => write!(
f,
"StorageListCompleted(duration={duration:?}, files={num_files})"
),
MetricEvent::StorageReadCompleted {
duration,
num_files,
bytes_read,
} => write!(
f,
"StorageReadCompleted(duration={duration:?}, files={num_files}, bytes={bytes_read})"
),
MetricEvent::StorageCopyCompleted { duration } => write!(
f,
"StorageCopyCompleted(duration={duration:?})"
),
MetricEvent::JsonReadCompleted {
num_files,
bytes_read,
} => write!(
f,
"JsonReadCompleted(files={num_files}, bytes={bytes_read})"
),
MetricEvent::ParquetReadCompleted {
num_files,
bytes_read,
} => write!(
f,
"ParquetReadCompleted(files={num_files}, bytes={bytes_read})"
),
MetricEvent::ScanMetadataCompleted {
operation_id,
scan_type,
total_duration,
num_add_files_seen,
num_active_add_files,
num_remove_files_seen,
num_non_file_actions,
num_predicate_filtered,
peak_hash_set_size,
dedup_visitor_time_ms,
predicate_eval_time_ms,
} => write!(
f,
// The trailing `\` continuations splice these source lines into ONE
// format string with single spaces — the rendered output stays on
// one line.
"ScanMetadataCompleted(id={operation_id}, scan_type={scan_type}, duration={total_duration:?}, \
add_files_seen={num_add_files_seen}, active_add_files={num_active_add_files}, \
remove_files_seen={num_remove_files_seen}, non_file_actions={num_non_file_actions}, \
predicate_filtered={num_predicate_filtered}, peak_hash_set_size={peak_hash_set_size}, \
dedup_visitor_time_ms={dedup_visitor_time_ms}, predicate_eval_time_ms={predicate_eval_time_ms})"
),
}
}
}