zarr_datafusion/reader/
stats.rs

1//! I/O statistics for Zarr reads
2//!
3//! Tracks bytes read, arrays accessed, and timing breakdown for metadata,
4//! coordinates, and data variables.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9
/// I/O statistics collected during Zarr reads.
///
/// Every field is an [`AtomicU64`], so one instance can be updated through a
/// shared reference (`&self`) without locks, which is important for
/// async/multi-threaded execution in DataFusion.
///
/// All accesses use [`Ordering::Relaxed`]: each counter is individually
/// consistent, but reading several counters does not give an atomic snapshot
/// across them.
#[derive(Debug, Default)]
pub struct ZarrIoStats {
    /// Metadata bytes read (in-memory/uncompressed).
    pub metadata_bytes: AtomicU64,
    /// Coordinate array bytes read (in-memory/uncompressed).
    pub coord_bytes: AtomicU64,
    /// Data variable bytes read (in-memory/uncompressed).
    pub data_bytes: AtomicU64,

    /// Disk bytes read (actual I/O, compressed).
    pub disk_bytes: AtomicU64,

    /// Number of coordinate array reads recorded via [`ZarrIoStats::record_coord`].
    pub coord_arrays: AtomicU64,
    /// Number of data variable reads recorded via [`ZarrIoStats::record_data`].
    pub data_arrays: AtomicU64,

    /// Accumulated metadata read time, stored as nanoseconds.
    pub metadata_nanos: AtomicU64,
    /// Accumulated coordinate read time, stored as nanoseconds.
    pub coord_nanos: AtomicU64,
    /// Accumulated data variable read time, stored as nanoseconds.
    pub data_nanos: AtomicU64,
}

impl ZarrIoStats {
    /// Creates a stats object with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the sum of the metadata, coordinate and data byte counters.
    ///
    /// `disk_bytes` is not part of this sum. The result saturates at
    /// `u64::MAX` instead of overflowing.
    pub fn total_bytes(&self) -> u64 {
        let metadata = self.metadata_bytes.load(Ordering::Relaxed);
        let coord = self.coord_bytes.load(Ordering::Relaxed);
        let data = self.data_bytes.load(Ordering::Relaxed);
        metadata.saturating_add(coord).saturating_add(data)
    }

    /// Returns the number of coordinate plus data array reads recorded.
    ///
    /// The result saturates at `u64::MAX` instead of overflowing.
    pub fn total_arrays(&self) -> u64 {
        let coord = self.coord_arrays.load(Ordering::Relaxed);
        let data = self.data_arrays.load(Ordering::Relaxed);
        coord.saturating_add(data)
    }

    /// Returns the accumulated metadata read time.
    pub fn metadata_time(&self) -> Duration {
        Duration::from_nanos(self.metadata_nanos.load(Ordering::Relaxed))
    }

    /// Returns the accumulated coordinate read time.
    pub fn coord_time(&self) -> Duration {
        Duration::from_nanos(self.coord_nanos.load(Ordering::Relaxed))
    }

    /// Returns the accumulated data variable read time.
    pub fn data_time(&self) -> Duration {
        Duration::from_nanos(self.data_nanos.load(Ordering::Relaxed))
    }

    /// Record metadata read stats: adds `bytes` and `duration` to the
    /// metadata counters.
    pub fn record_metadata(&self, bytes: u64, duration: Duration) {
        self.metadata_bytes.fetch_add(bytes, Ordering::Relaxed);
        self.metadata_nanos
            .fetch_add(Self::saturating_nanos(duration), Ordering::Relaxed);
    }

    /// Record coordinate array read stats: adds `bytes` and `duration` to the
    /// coordinate counters and counts one more coordinate array.
    pub fn record_coord(&self, bytes: u64, duration: Duration) {
        self.coord_bytes.fetch_add(bytes, Ordering::Relaxed);
        self.coord_nanos
            .fetch_add(Self::saturating_nanos(duration), Ordering::Relaxed);
        self.coord_arrays.fetch_add(1, Ordering::Relaxed);
    }

    /// Record data variable read stats: adds `bytes` and `duration` to the
    /// data counters and counts one more data array.
    pub fn record_data(&self, bytes: u64, duration: Duration) {
        self.data_bytes.fetch_add(bytes, Ordering::Relaxed);
        self.data_nanos
            .fetch_add(Self::saturating_nanos(duration), Ordering::Relaxed);
        self.data_arrays.fetch_add(1, Ordering::Relaxed);
    }

    /// Record disk bytes read (actual I/O).
    pub fn record_disk_read(&self, bytes: u64) {
        self.disk_bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Get total disk bytes read.
    pub fn total_disk_bytes(&self) -> u64 {
        self.disk_bytes.load(Ordering::Relaxed)
    }

    /// Converts `duration` to whole nanoseconds, clamped to `u64::MAX`.
    ///
    /// `Duration::as_nanos` returns a `u128`; a bare `as u64` would silently
    /// drop the high bits of an oversized duration and record a bogus value.
    /// Note that the running totals are still accumulated with `fetch_add`,
    /// which wraps on overflow.
    fn saturating_nanos(duration: Duration) -> u64 {
        // Clamp first so the narrowing cast below can never truncate.
        duration.as_nanos().min(u128::from(u64::MAX)) as u64
    }
}
94
/// Thread-safe handle for sharing stats across async boundaries.
///
/// This is an [`Arc`] around [`ZarrIoStats`]: cloning the handle yields another
/// reference to the same set of counters rather than a copy of them.
pub type SharedIoStats = Arc<ZarrIoStats>;
97
/// Format bytes in human-readable form (B, KB, MB, GB).
///
/// Units are decimal (1 KB = 1,000 bytes). Counts below 1,000 are printed
/// exactly as `"<n> B"`; larger counts are scaled to the largest unit they
/// reach and printed with two decimals, e.g. `"1.50 KB"`. GB is the largest
/// unit, so very large counts print as e.g. `"5000.00 GB"`.
///
/// A value just below a unit boundary that would round up to `"1000.00"` in
/// its own unit (e.g. 999,999 bytes) is promoted to the next unit
/// (`"1.00 MB"`) instead of being printed as `"1000.00 KB"`.
pub fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1_000;
    const MB: u64 = 1_000_000;
    const GB: u64 = 1_000_000_000;

    if bytes < KB {
        return format!("{} B", bytes);
    }

    // Renders `bytes` in multiples of `unit`, rounded to two decimals.
    let scaled = |unit: u64| format!("{:.2}", bytes as f64 / unit as f64);

    if bytes < MB {
        let text = scaled(KB);
        // "1000.00" means rounding crossed the boundary: fall through to MB.
        if text != "1000.00" {
            return format!("{} KB", text);
        }
    }

    if bytes < GB {
        let text = scaled(MB);
        // Same promotion rule at the MB/GB boundary.
        if text != "1000.00" {
            return format!("{} MB", text);
        }
    }

    format!("{} GB", scaled(GB))
}