zarr_datafusion/reader/
stats.rs1use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
10#[derive(Debug, Default)]
15pub struct ZarrIoStats {
16 pub metadata_bytes: AtomicU64,
18 pub coord_bytes: AtomicU64,
19 pub data_bytes: AtomicU64,
20
21 pub disk_bytes: AtomicU64,
23
24 pub coord_arrays: AtomicU64,
26 pub data_arrays: AtomicU64,
27
28 pub metadata_nanos: AtomicU64,
30 pub coord_nanos: AtomicU64,
31 pub data_nanos: AtomicU64,
32}
33
34impl ZarrIoStats {
35 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn total_bytes(&self) -> u64 {
40 self.metadata_bytes.load(Ordering::Relaxed)
41 + self.coord_bytes.load(Ordering::Relaxed)
42 + self.data_bytes.load(Ordering::Relaxed)
43 }
44
45 pub fn total_arrays(&self) -> u64 {
46 self.coord_arrays.load(Ordering::Relaxed) + self.data_arrays.load(Ordering::Relaxed)
47 }
48
49 pub fn metadata_time(&self) -> Duration {
50 Duration::from_nanos(self.metadata_nanos.load(Ordering::Relaxed))
51 }
52
53 pub fn coord_time(&self) -> Duration {
54 Duration::from_nanos(self.coord_nanos.load(Ordering::Relaxed))
55 }
56
57 pub fn data_time(&self) -> Duration {
58 Duration::from_nanos(self.data_nanos.load(Ordering::Relaxed))
59 }
60
61 pub fn record_metadata(&self, bytes: u64, duration: Duration) {
63 self.metadata_bytes.fetch_add(bytes, Ordering::Relaxed);
64 self.metadata_nanos
65 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
66 }
67
68 pub fn record_coord(&self, bytes: u64, duration: Duration) {
70 self.coord_bytes.fetch_add(bytes, Ordering::Relaxed);
71 self.coord_nanos
72 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
73 self.coord_arrays.fetch_add(1, Ordering::Relaxed);
74 }
75
76 pub fn record_data(&self, bytes: u64, duration: Duration) {
78 self.data_bytes.fetch_add(bytes, Ordering::Relaxed);
79 self.data_nanos
80 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
81 self.data_arrays.fetch_add(1, Ordering::Relaxed);
82 }
83
84 pub fn record_disk_read(&self, bytes: u64) {
86 self.disk_bytes.fetch_add(bytes, Ordering::Relaxed);
87 }
88
89 pub fn total_disk_bytes(&self) -> u64 {
91 self.disk_bytes.load(Ordering::Relaxed)
92 }
93}
94
95pub type SharedIoStats = Arc<ZarrIoStats>;
97
98pub fn format_bytes(bytes: u64) -> String {
100 if bytes >= 1_000_000_000 {
101 format!("{:.2} GB", bytes as f64 / 1_000_000_000.0)
102 } else if bytes >= 1_000_000 {
103 format!("{:.2} MB", bytes as f64 / 1_000_000.0)
104 } else if bytes >= 1_000 {
105 format!("{:.2} KB", bytes as f64 / 1_000.0)
106 } else {
107 format!("{} B", bytes)
108 }
109}