// array_format/stats.rs

1//! Per-array statistics stored in an optional `.stats` sidecar.
2//!
3//! A [`StatsFile`] holds [`ArrayStats`] (e.g. min/max as [`StatValue`]) for the
4//! arrays in a file. It is serialized to its own sidecar with an `ARST` trailer,
5//! mirroring the footer format, so stats can be read without touching the data.
6
7use rkyv::{Archive, Deserialize, Serialize};
8
9use crate::dtype::DType;
10use crate::error::{Error, Result};
11use crate::layout::FillValue;
12use crate::storage::Storage;
13
/// Magic bytes that end a serialized stats sidecar.
const MAGIC: [u8; 4] = *b"ARST";
/// Trailer length in bytes: a little-endian `u64` payload size followed by [`MAGIC`].
const TRAILER_SIZE: usize = std::mem::size_of::<u64>() + MAGIC.len();
16
17/// A typed min or max value.
18#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
19pub enum StatValue {
20    /// Signed integer value (for the signed integer dtypes).
21    Int(i64),
22    /// Unsigned integer value (for the unsigned integer dtypes).
23    UInt(u64),
24    /// Floating-point value (for `Float32`/`Float64`).
25    Float(f64),
26    /// String or binary: raw bytes in lexicographic order.
27    Bytes(Vec<u8>),
28    /// Nanoseconds since the Unix epoch — matches [`DType::TimestampNs`].
29    TimestampNs(i64),
30}
31
32/// Aggregate statistics for a single array covering all its chunks.
33#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
34pub struct ArrayStats {
35    /// Name of the array these statistics describe.
36    pub name: String,
37    /// Global min across all chunks; `None` for unsupported dtypes.
38    pub min: Option<StatValue>,
39    /// Global max across all chunks; `None` for unsupported dtypes.
40    pub max: Option<StatValue>,
41    /// Count of elements equal to the array's fill value; 0 if none set.
42    pub null_count: u64,
43    /// Total element count across all chunks.
44    pub row_count: u64,
45}
46
47impl ArrayStats {
48    pub(crate) fn new(name: String) -> Self {
49        Self {
50            name,
51            min: None,
52            max: None,
53            null_count: 0,
54            row_count: 0,
55        }
56    }
57}
58
59/// The stats file: one [`ArrayStats`] per array.
60///
61/// Stored in `{stem}.stats` alongside the `.af` file using the same
62/// rkyv + trailer format as the footer:
63/// `[rkyv_bytes][size: u64 LE][MAGIC: b"ARST"]`
64#[derive(Debug, Clone, PartialEq, Default, Archive, Serialize, Deserialize)]
65pub struct StatsFile {
66    /// Per-array statistics, one entry per array in the file.
67    pub arrays: Vec<ArrayStats>,
68}
69
70impl StatsFile {
71    pub(crate) fn upsert(&mut self, new_stats: ArrayStats) {
72        if let Some(existing) = self.arrays.iter_mut().find(|a| a.name == new_stats.name) {
73            *existing = new_stats;
74        } else {
75            self.arrays.push(new_stats);
76        }
77    }
78
79    /// Returns the statistics for the array named `name`, if present.
80    pub fn get_array(&self, name: &str) -> Option<&ArrayStats> {
81        self.arrays.iter().find(|a| a.name == name)
82    }
83
84    pub(crate) fn serialize(&self) -> Result<Vec<u8>> {
85        let rkyv_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(self)
86            .map_err(|e| Error::Serialization(e.to_string()))?;
87        let size = rkyv_bytes.len() as u64;
88        let mut out = Vec::with_capacity(rkyv_bytes.len() + TRAILER_SIZE);
89        out.extend_from_slice(&rkyv_bytes);
90        out.extend_from_slice(&size.to_le_bytes());
91        out.extend_from_slice(&MAGIC);
92        Ok(out)
93    }
94
95    fn deserialize(data: &[u8]) -> Result<Self> {
96        if data.len() < TRAILER_SIZE {
97            return Err(Error::InvalidFooter("stats data too short".into()));
98        }
99        let magic_start = data.len() - 4;
100        if data[magic_start..] != MAGIC {
101            return Err(Error::InvalidFooter("invalid stats magic".into()));
102        }
103        let size_start = magic_start - 8;
104        let size = u64::from_le_bytes(data[size_start..magic_start].try_into().unwrap()) as usize;
105        if size > size_start {
106            return Err(Error::InvalidFooter("stats size exceeds data".into()));
107        }
108        let rkyv_start = size_start - size;
109        let mut aligned: rkyv::util::AlignedVec = rkyv::util::AlignedVec::new();
110        aligned.extend_from_slice(&data[rkyv_start..size_start]);
111        rkyv::from_bytes::<Self, rkyv::rancor::Error>(&aligned)
112            .map_err(|e| Error::Serialization(e.to_string()))
113    }
114}
115
116/// Reads and deserializes a stats file from `storage`.
117pub(crate) async fn read_stats_file(storage: &(dyn Storage + Sync)) -> Result<StatsFile> {
118    let file_size = storage.size().await?;
119    if (file_size as usize) < TRAILER_SIZE {
120        return Err(Error::InvalidFooter("stats file too short".into()));
121    }
122    let trailer = storage
123        .read_range(file_size - TRAILER_SIZE as u64..file_size)
124        .await?;
125    if trailer[8..] != MAGIC {
126        return Err(Error::InvalidFooter("invalid stats magic".into()));
127    }
128    let size = u64::from_le_bytes(trailer[..8].try_into().unwrap()) as usize;
129    let total = size + TRAILER_SIZE;
130    let start = file_size - total as u64;
131    let data = storage.read_range(start..file_size).await?;
132    StatsFile::deserialize(&data)
133}
134
135// ── Macros (must appear before use) ──────────────────────────────────────────
136
/// Computes `(min, max, null_count, row_count)` for a chunk of little-endian
/// signed integers of type `$ty`.
///
/// `$bytes` is the raw chunk, `$fill` an `Option<&FillValue>`. Elements equal
/// to the fill value are counted as nulls and left out of min/max. Trailing
/// bytes that do not form a whole element are ignored.
macro_rules! int_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let data: &[u8] = &$bytes[..];
        let width = std::mem::size_of::<$ty>();
        // A fill value that does not fit in `$ty` can never equal a stored
        // element, so treat it as "no fill" instead of letting an `as` cast
        // wrap it onto an unrelated in-range value.
        let fill_val: Option<$ty> = match $fill {
            Some(FillValue::Int(v)) => <$ty>::try_from(*v).ok(),
            Some(FillValue::UInt(v)) => <$ty>::try_from(*v).ok(),
            _ => None,
        };
        let mut min: Option<$ty> = None;
        let mut max: Option<$ty> = None;
        let mut null_count = 0u64;
        for raw in data.chunks_exact(width) {
            let e = <$ty>::from_le_bytes(raw.try_into().unwrap());
            if fill_val == Some(e) {
                null_count += 1;
            } else {
                min = Some(min.map_or(e, |m| m.min(e)));
                max = Some(max.map_or(e, |m| m.max(e)));
            }
        }
        (
            min.map(|v| StatValue::Int(i64::from(v))),
            max.map(|v| StatValue::Int(i64::from(v))),
            null_count,
            (data.len() / width) as u64,
        )
    }};
}
166
/// Computes `(min, max, null_count, row_count)` for a chunk of little-endian
/// unsigned integers of type `$ty`.
///
/// `$bytes` is the raw chunk, `$fill` an `Option<&FillValue>`. Elements equal
/// to the fill value are counted as nulls and left out of min/max. Trailing
/// bytes that do not form a whole element are ignored.
macro_rules! uint_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let data: &[u8] = &$bytes[..];
        let width = std::mem::size_of::<$ty>();
        // A negative or too-large fill value cannot equal any stored element;
        // a checked conversion keeps it from wrapping onto a real value
        // (e.g. `Int(-1)` matching 255 for `u8`).
        let fill_val: Option<$ty> = match $fill {
            Some(FillValue::UInt(v)) => <$ty>::try_from(*v).ok(),
            Some(FillValue::Int(v)) => <$ty>::try_from(*v).ok(),
            _ => None,
        };
        let mut min: Option<$ty> = None;
        let mut max: Option<$ty> = None;
        let mut null_count = 0u64;
        for raw in data.chunks_exact(width) {
            let e = <$ty>::from_le_bytes(raw.try_into().unwrap());
            if fill_val == Some(e) {
                null_count += 1;
            } else {
                min = Some(min.map_or(e, |m| m.min(e)));
                max = Some(max.map_or(e, |m| m.max(e)));
            }
        }
        (
            min.map(|v| StatValue::UInt(u64::from(v))),
            max.map(|v| StatValue::UInt(u64::from(v))),
            null_count,
            (data.len() / width) as u64,
        )
    }};
}
196
/// Computes `(min, max, null_count, row_count)` for a chunk of little-endian
/// floats of type `$ty`.
///
/// `$bytes` is the raw chunk, `$fill` an `Option<&FillValue>`. A NaN fill value
/// matches every NaN element; any other fill value matches by `==`. Remaining
/// elements are ordered with `total_cmp`. Trailing bytes that do not form a
/// whole element are ignored.
macro_rules! float_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let data: &[u8] = &$bytes[..];
        let width = std::mem::size_of::<$ty>();
        let fill_val: Option<$ty> = match $fill {
            Some(FillValue::Float(v)) => Some(*v as $ty),
            _ => None,
        };
        let mut lowest: Option<$ty> = None;
        let mut highest: Option<$ty> = None;
        let mut nulls = 0u64;
        for raw in data.chunks_exact(width) {
            let e = <$ty>::from_le_bytes(raw.try_into().unwrap());
            let is_fill = match fill_val {
                // NaN never compares equal to itself, so match it by class.
                Some(f) if f.is_nan() => e.is_nan(),
                Some(f) => e == f,
                None => false,
            };
            if is_fill {
                nulls += 1;
                continue;
            }
            lowest = match lowest {
                Some(m) if e.total_cmp(&m).is_ge() => Some(m),
                _ => Some(e),
            };
            highest = match highest {
                Some(m) if e.total_cmp(&m).is_le() => Some(m),
                _ => Some(e),
            };
        }
        (
            lowest.map(|v| StatValue::Float(f64::from(v))),
            highest.map(|v| StatValue::Float(f64::from(v))),
            nulls,
            (data.len() / width) as u64,
        )
    }};
}
245
246// ── Computation ──────────────────────────────────────────────────────────────
247
248/// Computes `(min, max, null_count, row_count)` for a single chunk's raw bytes.
249///
250/// `null_count` counts elements equal to `fill_value`; 0 when none is set.
251/// Floats use `total_cmp` so NaN sorts last (max). `FixedSizeList` / `List`
252/// return `(None, None, 0, 0)`.
253pub(crate) fn compute_chunk_partial(
254    bytes: &[u8],
255    dtype: &DType,
256    fill_value: Option<&FillValue>,
257) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
258    if bytes.is_empty() {
259        return (None, None, 0, 0);
260    }
261    match dtype {
262        DType::Int8 => int_partial!(bytes, fill_value, i8),
263        DType::Int16 => int_partial!(bytes, fill_value, i16),
264        DType::Int32 => int_partial!(bytes, fill_value, i32),
265        DType::Int64 => int_partial!(bytes, fill_value, i64),
266        DType::UInt8 => uint_partial!(bytes, fill_value, u8),
267        DType::UInt16 => uint_partial!(bytes, fill_value, u16),
268        DType::UInt32 => uint_partial!(bytes, fill_value, u32),
269        DType::UInt64 => uint_partial!(bytes, fill_value, u64),
270        DType::Bool => bool_partial(bytes, fill_value),
271        DType::Float32 => float_partial!(bytes, fill_value, f32),
272        DType::Float64 => float_partial!(bytes, fill_value, f64),
273        DType::String | DType::Binary => vlen_partial(bytes, fill_value),
274        DType::TimestampNs => timestamp_partial(bytes, fill_value),
275        DType::FixedSizeList { .. } | DType::List { .. } => (None, None, 0, 0),
276    }
277}
278
279/// Merges one chunk's partial results into an aggregate [`ArrayStats`].
280pub(crate) fn merge_partial(
281    stats: &mut ArrayStats,
282    min: Option<StatValue>,
283    max: Option<StatValue>,
284    null_count: u64,
285    row_count: u64,
286) {
287    stats.null_count += null_count;
288    stats.row_count += row_count;
289    stats.min = stat_min(stats.min.take(), min);
290    stats.max = stat_max(stats.max.take(), max);
291}
292
293// ── Internal helpers ─────────────────────────────────────────────────────────
294
295fn stat_min(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
296    match (a, b) {
297        (None, x) | (x, None) => x,
298        (Some(a), Some(b)) => Some(if stat_le(&a, &b) { a } else { b }),
299    }
300}
301
302fn stat_max(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
303    match (a, b) {
304        (None, x) | (x, None) => x,
305        (Some(a), Some(b)) => Some(if stat_le(&a, &b) { b } else { a }),
306    }
307}
308
309fn stat_le(a: &StatValue, b: &StatValue) -> bool {
310    match (a, b) {
311        (StatValue::Int(a), StatValue::Int(b)) => a <= b,
312        (StatValue::UInt(a), StatValue::UInt(b)) => a <= b,
313        (StatValue::Float(a), StatValue::Float(b)) => a.total_cmp(b).is_le(),
314        (StatValue::Bytes(a), StatValue::Bytes(b)) => a <= b,
315        (StatValue::TimestampNs(a), StatValue::TimestampNs(b)) => a <= b,
316        _ => false,
317    }
318}
319
320fn bool_partial(
321    bytes: &[u8],
322    fill: Option<&FillValue>,
323) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
324    let fill_val: Option<u8> = match fill {
325        Some(FillValue::Bool(b)) => Some(u8::from(*b)),
326        _ => None,
327    };
328    let mut min: Option<u8> = None;
329    let mut max: Option<u8> = None;
330    let mut null_count = 0u64;
331    for &e in bytes {
332        if fill_val == Some(e) {
333            null_count += 1;
334        } else {
335            min = Some(min.map_or(e, |m| m.min(e)));
336            max = Some(max.map_or(e, |m| m.max(e)));
337        }
338    }
339    (
340        min.map(|v| StatValue::UInt(v as u64)),
341        max.map(|v| StatValue::UInt(v as u64)),
342        null_count,
343        bytes.len() as u64,
344    )
345}
346
347fn vlen_partial(
348    bytes: &[u8],
349    fill: Option<&FillValue>,
350) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
351    let n = find_vlen_count(bytes);
352    if n == 0 {
353        return (None, None, 0, 0);
354    }
355    let values_base = (n + 1) * 4;
356    let fill_bytes: Option<&[u8]> = match fill {
357        Some(FillValue::String(s)) => Some(s.as_bytes()),
358        _ => None,
359    };
360    let mut min: Option<Vec<u8>> = None;
361    let mut max: Option<Vec<u8>> = None;
362    let mut null_count = 0u64;
363    for i in 0..n {
364        let start = u32::from_le_bytes(bytes[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
365        let end =
366            u32::from_le_bytes(bytes[(i + 1) * 4..(i + 1) * 4 + 4].try_into().unwrap()) as usize;
367        let val = &bytes[values_base + start..values_base + end];
368        if fill_bytes == Some(val) {
369            null_count += 1;
370        } else {
371            match &mut min {
372                slot @ None => *slot = Some(val.to_vec()),
373                Some(m) if val < m.as_slice() => *m = val.to_vec(),
374                _ => {}
375            }
376            match &mut max {
377                slot @ None => *slot = Some(val.to_vec()),
378                Some(m) if val > m.as_slice() => *m = val.to_vec(),
379                _ => {}
380            }
381        }
382    }
383    (
384        min.map(StatValue::Bytes),
385        max.map(StatValue::Bytes),
386        null_count,
387        n as u64,
388    )
389}
390
391fn timestamp_partial(
392    bytes: &[u8],
393    fill: Option<&FillValue>,
394) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
395    let n = bytes.len() / 8;
396    let fill_val: Option<i64> = match fill {
397        Some(FillValue::TimestampNs(v)) => Some(*v),
398        Some(FillValue::Int(v)) => Some(*v),
399        _ => None,
400    };
401    let mut min: Option<i64> = None;
402    let mut max: Option<i64> = None;
403    let mut null_count = 0u64;
404    for i in 0..n {
405        let e = i64::from_le_bytes(bytes[i * 8..(i + 1) * 8].try_into().unwrap());
406        if fill_val == Some(e) {
407            null_count += 1;
408        } else {
409            min = Some(min.map_or(e, |m| m.min(e)));
410            max = Some(max.map_or(e, |m| m.max(e)));
411        }
412    }
413    (
414        min.map(StatValue::TimestampNs),
415        max.map(StatValue::TimestampNs),
416        null_count,
417        n as u64,
418    )
419}
420
/// Infers the element count of an offset-encoded chunk.
///
/// Walks the little-endian `u32` words starting at the second one and stops at
/// the first word `k` whose value, added to the end position of that word,
/// equals the buffer length; `k` is then the element count. If no word
/// satisfies this, the index of the last whole word is returned. Buffers with
/// fewer than two whole words yield 0.
///
/// The original comment states this mirrors `decode_offsets` in `array.rs`.
fn find_vlen_count(bytes: &[u8]) -> usize {
    let total = bytes.len();
    let mut count = 0usize;
    // Word 0 is the first offset and is never a candidate terminator.
    for (idx, word) in bytes.chunks_exact(4).enumerate().skip(1) {
        let off = u32::from_le_bytes(word.try_into().unwrap()) as usize;
        count = idx;
        if (idx + 1) * 4 + off == total {
            break;
        }
    }
    count
}
439
440// ── Tests ────────────────────────────────────────────────────────────────────
441
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `values` as consecutive little-endian `i32`s.
    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * 4);
        for v in values {
            out.extend_from_slice(&v.to_le_bytes());
        }
        out
    }

    /// Encodes `values` as consecutive little-endian `i64`s.
    fn i64_bytes(values: &[i64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * 8);
        for v in values {
            out.extend_from_slice(&v.to_le_bytes());
        }
        out
    }

    /// Encodes `values` as consecutive little-endian `f64`s.
    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * 8);
        for v in values {
            out.extend_from_slice(&v.to_le_bytes());
        }
        out
    }

    /// Encodes `values` in the offset-buffer layout: `len + 1` little-endian
    /// `u32` offsets (starting at 0) followed by the concatenated bytes.
    fn string_bytes(values: &[&str]) -> Vec<u8> {
        let mut out: Vec<u8> = Vec::new();
        let mut end = 0u32;
        out.extend_from_slice(&end.to_le_bytes());
        for s in values {
            end += s.len() as u32;
            out.extend_from_slice(&end.to_le_bytes());
        }
        for s in values {
            out.extend_from_slice(s.as_bytes());
        }
        out
    }

    #[test]
    fn i32_min_max_null_row_count() {
        // With fill = 1 the two 1s are nulls, so min/max cover only 3, 4, 5, 9.
        let bytes = i32_bytes(&[3, 1, 4, 1, 5, 9]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(1)));
        assert_eq!(min, Some(StatValue::Int(3)));
        assert_eq!(max, Some(StatValue::Int(9)));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 6);
    }

    #[test]
    fn i32_no_fill_value() {
        let bytes = i32_bytes(&[10, 20, 30]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::Int32, None);
        assert_eq!(min, Some(StatValue::Int(10)));
        assert_eq!(max, Some(StatValue::Int(30)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn all_elements_equal_fill_value() {
        let bytes = i32_bytes(&[7, 7, 7]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(7)));
        // Every element is a null, so there is nothing to take a min/max over.
        assert_eq!(min, None);
        assert_eq!(max, None);
        assert_eq!(null_count, row_count);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn f64_min_max() {
        let bytes = f64_bytes(&[3.0, 1.0, 4.0, 1.5]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Float64, None);
        assert_eq!(min, Some(StatValue::Float(1.0)));
        assert_eq!(max, Some(StatValue::Float(4.0)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn string_lexicographic_min_max() {
        let bytes = string_bytes(&["banana", "apple", "cherry"]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::String, None);
        assert_eq!(min, Some(StatValue::Bytes(b"apple".to_vec())));
        assert_eq!(max, Some(StatValue::Bytes(b"cherry".to_vec())));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn string_fill_value_null_count() {
        let bytes = string_bytes(&["a", "", "b", ""]);
        let fill = FillValue::String(String::new());
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::String, Some(&fill));
        // The empty strings are nulls and must not become the minimum.
        assert_eq!(min, Some(StatValue::Bytes(b"a".to_vec())));
        assert_eq!(max, Some(StatValue::Bytes(b"b".to_vec())));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn merge_partial_aggregates_correctly() {
        let mut stats = ArrayStats::new("x".into());
        let chunks = [
            (StatValue::Int(5), StatValue::Int(10), 1u64, 3u64),
            (StatValue::Int(2), StatValue::Int(8), 0, 2),
        ];
        for (min, max, null_count, row_count) in chunks {
            merge_partial(&mut stats, Some(min), Some(max), null_count, row_count);
        }
        assert_eq!(stats.min, Some(StatValue::Int(2)));
        assert_eq!(stats.max, Some(StatValue::Int(10)));
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.row_count, 5);
    }

    #[test]
    fn statsfile_serialize_deserialize_roundtrip() {
        let sf = StatsFile {
            arrays: vec![ArrayStats {
                name: "arr".into(),
                min: Some(StatValue::Int(-1)),
                max: Some(StatValue::Int(99)),
                null_count: 3,
                row_count: 100,
            }],
        };
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn statsfile_empty_roundtrip() {
        let sf = StatsFile::default();
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn timestamp_min_max() {
        // With fill = 20 both 20s are nulls; min comes from -5 and max from 10.
        let bytes = i64_bytes(&[10, 20, -5, 7, 20]);
        let (min, max, null_count, row_count) = compute_chunk_partial(
            &bytes,
            &DType::TimestampNs,
            Some(&FillValue::TimestampNs(20)),
        );
        assert_eq!(min, Some(StatValue::TimestampNs(-5)));
        assert_eq!(max, Some(StatValue::TimestampNs(10)));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 5);
    }

    #[test]
    fn timestamp_fill_value_int_fallback() {
        // `FillValue::Int` is accepted as a fallback on the TimestampNs path,
        // so the matching element is a null and stays out of min/max.
        let bytes = i64_bytes(&[1, 2, 3]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::TimestampNs, Some(&FillValue::Int(2)));
        assert_eq!(min, Some(StatValue::TimestampNs(1)));
        assert_eq!(max, Some(StatValue::TimestampNs(3)));
        assert_eq!(null_count, 1);
        assert_eq!(row_count, 3);
    }
}