Skip to main content

array_format/
stats.rs

1use rkyv::{Archive, Deserialize, Serialize};
2
3use crate::dtype::DType;
4use crate::error::{Error, Result};
5use crate::layout::FillValue;
6use crate::storage::Storage;
7
/// Magic bytes that end every stats file; they are the last 4 bytes of the trailer.
const MAGIC: [u8; 4] = *b"ARST";
/// Trailer length in bytes: an 8-byte little-endian payload size followed by [`MAGIC`].
const TRAILER_SIZE: usize = 12;
10
/// A typed min or max value.
///
/// [`compute_chunk_partial`] picks the variant from the array's dtype. Narrower integer and
/// float dtypes are widened into the 64-bit variants.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub enum StatValue {
    /// Signed integer dtypes (`Int8`..`Int64`), widened to `i64`.
    Int(i64),
    /// Unsigned integer dtypes (`UInt8`..`UInt64`) and `Bool` (as 0/1), widened to `u64`.
    UInt(u64),
    /// `Float32` / `Float64`, widened to `f64`.
    Float(f64),
    /// String or binary: raw bytes in lexicographic order.
    Bytes(Vec<u8>),
    /// Nanoseconds since the Unix epoch — matches [`DType::TimestampNs`].
    TimestampNs(i64),
}
22
/// Aggregate statistics for a single array covering all its chunks.
///
/// Elements equal to the array's fill value are counted in `null_count` and are left out
/// of `min` / `max`.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub struct ArrayStats {
    /// Name of the array these statistics describe; the lookup key in [`StatsFile`].
    pub name: String,
    /// Global min across all chunks; `None` for unsupported dtypes.
    /// Also `None` when no non-fill element has been seen.
    pub min: Option<StatValue>,
    /// Global max across all chunks; `None` for unsupported dtypes.
    /// Also `None` when no non-fill element has been seen.
    pub max: Option<StatValue>,
    /// Count of elements equal to the array's fill value; 0 if none set.
    pub null_count: u64,
    /// Total element count across all chunks.
    pub row_count: u64,
}
36
37impl ArrayStats {
38    pub(crate) fn new(name: String) -> Self {
39        Self {
40            name,
41            min: None,
42            max: None,
43            null_count: 0,
44            row_count: 0,
45        }
46    }
47}
48
/// The stats file: one [`ArrayStats`] per array.
///
/// Stored in `{stem}.stats` alongside the `.af` file using the same
/// rkyv + trailer format as the footer:
/// `[rkyv_bytes][size: u64 LE][MAGIC: b"ARST"]`
#[derive(Debug, Clone, PartialEq, Default, Archive, Serialize, Deserialize)]
pub struct StatsFile {
    /// Per-array statistics. Entries are looked up by `ArrayStats::name`, and `upsert`
    /// keeps at most one entry per name.
    pub arrays: Vec<ArrayStats>,
}
58
59impl StatsFile {
60    pub(crate) fn upsert(&mut self, new_stats: ArrayStats) {
61        if let Some(existing) = self.arrays.iter_mut().find(|a| a.name == new_stats.name) {
62            *existing = new_stats;
63        } else {
64            self.arrays.push(new_stats);
65        }
66    }
67
68    pub fn get_array(&self, name: &str) -> Option<&ArrayStats> {
69        self.arrays.iter().find(|a| a.name == name)
70    }
71
72    pub(crate) fn serialize(&self) -> Result<Vec<u8>> {
73        let rkyv_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(self)
74            .map_err(|e| Error::Serialization(e.to_string()))?;
75        let size = rkyv_bytes.len() as u64;
76        let mut out = Vec::with_capacity(rkyv_bytes.len() + TRAILER_SIZE);
77        out.extend_from_slice(&rkyv_bytes);
78        out.extend_from_slice(&size.to_le_bytes());
79        out.extend_from_slice(&MAGIC);
80        Ok(out)
81    }
82
83    fn deserialize(data: &[u8]) -> Result<Self> {
84        if data.len() < TRAILER_SIZE {
85            return Err(Error::InvalidFooter("stats data too short".into()));
86        }
87        let magic_start = data.len() - 4;
88        if data[magic_start..] != MAGIC {
89            return Err(Error::InvalidFooter("invalid stats magic".into()));
90        }
91        let size_start = magic_start - 8;
92        let size = u64::from_le_bytes(data[size_start..magic_start].try_into().unwrap()) as usize;
93        if size > size_start {
94            return Err(Error::InvalidFooter("stats size exceeds data".into()));
95        }
96        let rkyv_start = size_start - size;
97        let mut aligned: rkyv::util::AlignedVec = rkyv::util::AlignedVec::new();
98        aligned.extend_from_slice(&data[rkyv_start..size_start]);
99        rkyv::from_bytes::<Self, rkyv::rancor::Error>(&aligned)
100            .map_err(|e| Error::Serialization(e.to_string()))
101    }
102}
103
104/// Reads and deserializes a stats file from `storage`.
105pub(crate) async fn read_stats_file(storage: &(dyn Storage + Sync)) -> Result<StatsFile> {
106    let file_size = storage.size().await?;
107    if (file_size as usize) < TRAILER_SIZE {
108        return Err(Error::InvalidFooter("stats file too short".into()));
109    }
110    let trailer = storage
111        .read_range(file_size - TRAILER_SIZE as u64..file_size)
112        .await?;
113    if trailer[8..] != MAGIC {
114        return Err(Error::InvalidFooter("invalid stats magic".into()));
115    }
116    let size = u64::from_le_bytes(trailer[..8].try_into().unwrap()) as usize;
117    let total = size + TRAILER_SIZE;
118    let start = file_size - total as u64;
119    let data = storage.read_range(start..file_size).await?;
120    StatsFile::deserialize(&data)
121}
122
123// ── Macros (must appear before use) ──────────────────────────────────────────
124
// Scans little-endian signed integers of type `$ty` from `$bytes`.
//
// The fill value (an `Int` or `UInt` `FillValue`, cast with `as` to `$ty`) is counted as null
// and skipped for min/max. Trailing bytes that do not form a whole element are ignored.
// Evaluates to `(min, max, null_count, row_count)` with min/max widened to `StatValue::Int`.
macro_rules! int_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        let fill_elem: Option<$ty> = match $fill {
            Some(FillValue::Int(v)) => Some(*v as $ty),
            Some(FillValue::UInt(v)) => Some(*v as $ty),
            _ => None,
        };
        let mut rows = 0u64;
        let mut nulls = 0u64;
        let mut lo: Option<$ty> = None;
        let mut hi: Option<$ty> = None;
        for chunk in $bytes.chunks_exact(width) {
            rows += 1;
            let elem = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            if Some(elem) == fill_elem {
                nulls += 1;
                continue;
            }
            lo = Some(match lo {
                Some(m) if m <= elem => m,
                _ => elem,
            });
            hi = Some(match hi {
                Some(m) if m >= elem => m,
                _ => elem,
            });
        }
        (
            lo.map(|v| StatValue::Int(v as i64)),
            hi.map(|v| StatValue::Int(v as i64)),
            nulls,
            rows,
        )
    }};
}
154
// Scans little-endian unsigned integers of type `$ty` from `$bytes`.
//
// The fill value (a `UInt` or `Int` `FillValue`, cast with `as` to `$ty`) is counted as null
// and skipped for min/max. Trailing bytes that do not form a whole element are ignored.
// Evaluates to `(min, max, null_count, row_count)` with min/max widened to `StatValue::UInt`.
macro_rules! uint_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        let fill_elem: Option<$ty> = match $fill {
            Some(FillValue::UInt(v)) => Some(*v as $ty),
            Some(FillValue::Int(v)) => Some(*v as $ty),
            _ => None,
        };
        let mut rows = 0u64;
        let mut nulls = 0u64;
        let mut lo: Option<$ty> = None;
        let mut hi: Option<$ty> = None;
        for chunk in $bytes.chunks_exact(width) {
            rows += 1;
            let elem = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            if Some(elem) == fill_elem {
                nulls += 1;
                continue;
            }
            lo = Some(match lo {
                Some(m) if m <= elem => m,
                _ => elem,
            });
            hi = Some(match hi {
                Some(m) if m >= elem => m,
                _ => elem,
            });
        }
        (
            lo.map(|v| StatValue::UInt(v as u64)),
            hi.map(|v| StatValue::UInt(v as u64)),
            nulls,
            rows,
        )
    }};
}
184
// Scans little-endian floats of type `$ty` from `$bytes`.
//
// A `Float` fill value is cast to `$ty`. Elements equal to it count as null, and when the
// fill is NaN, any NaN element counts as null. Null elements are skipped for min/max.
// Ordering uses `total_cmp`, so NaNs participate in min/max. Trailing bytes that do not
// form a whole element are ignored.
// Evaluates to `(min, max, null_count, row_count)` with min/max widened to `StatValue::Float`.
macro_rules! float_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        let fill_elem: Option<$ty> = match $fill {
            Some(FillValue::Float(v)) => Some(*v as $ty),
            _ => None,
        };
        // NaN never compares equal to itself, so a NaN fill is matched with `is_nan`.
        let matches_fill = |x: $ty| match fill_elem {
            Some(f) if f.is_nan() => x.is_nan(),
            Some(f) => x == f,
            None => false,
        };
        let mut rows = 0u64;
        let mut nulls = 0u64;
        let mut lo: Option<$ty> = None;
        let mut hi: Option<$ty> = None;
        for chunk in $bytes.chunks_exact(width) {
            rows += 1;
            let elem = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            if matches_fill(elem) {
                nulls += 1;
                continue;
            }
            lo = Some(match lo {
                Some(m) if !elem.total_cmp(&m).is_lt() => m,
                _ => elem,
            });
            hi = Some(match hi {
                Some(m) if !elem.total_cmp(&m).is_gt() => m,
                _ => elem,
            });
        }
        (
            lo.map(|v| StatValue::Float(v as f64)),
            hi.map(|v| StatValue::Float(v as f64)),
            nulls,
            rows,
        )
    }};
}
233
234// ── Computation ──────────────────────────────────────────────────────────────
235
236/// Computes `(min, max, null_count, row_count)` for a single chunk's raw bytes.
237///
238/// `null_count` counts elements equal to `fill_value`; 0 when none is set.
239/// Floats use `total_cmp` so NaN sorts last (max). `FixedSizeList` / `List`
240/// return `(None, None, 0, 0)`.
241pub fn compute_chunk_partial(
242    bytes: &[u8],
243    dtype: &DType,
244    fill_value: Option<&FillValue>,
245) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
246    if bytes.is_empty() {
247        return (None, None, 0, 0);
248    }
249    match dtype {
250        DType::Int8 => int_partial!(bytes, fill_value, i8),
251        DType::Int16 => int_partial!(bytes, fill_value, i16),
252        DType::Int32 => int_partial!(bytes, fill_value, i32),
253        DType::Int64 => int_partial!(bytes, fill_value, i64),
254        DType::UInt8 => uint_partial!(bytes, fill_value, u8),
255        DType::UInt16 => uint_partial!(bytes, fill_value, u16),
256        DType::UInt32 => uint_partial!(bytes, fill_value, u32),
257        DType::UInt64 => uint_partial!(bytes, fill_value, u64),
258        DType::Bool => bool_partial(bytes, fill_value),
259        DType::Float32 => float_partial!(bytes, fill_value, f32),
260        DType::Float64 => float_partial!(bytes, fill_value, f64),
261        DType::String | DType::Binary => vlen_partial(bytes, fill_value),
262        DType::TimestampNs => timestamp_partial(bytes, fill_value),
263        DType::FixedSizeList { .. } | DType::List { .. } => (None, None, 0, 0),
264    }
265}
266
267/// Merges one chunk's partial results into an aggregate [`ArrayStats`].
268pub fn merge_partial(
269    stats: &mut ArrayStats,
270    min: Option<StatValue>,
271    max: Option<StatValue>,
272    null_count: u64,
273    row_count: u64,
274) {
275    stats.null_count += null_count;
276    stats.row_count += row_count;
277    stats.min = stat_min(stats.min.take(), min);
278    stats.max = stat_max(stats.max.take(), max);
279}
280
281// ── Internal helpers ─────────────────────────────────────────────────────────
282
283fn stat_min(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
284    match (a, b) {
285        (None, x) | (x, None) => x,
286        (Some(a), Some(b)) => Some(if stat_le(&a, &b) { a } else { b }),
287    }
288}
289
290fn stat_max(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
291    match (a, b) {
292        (None, x) | (x, None) => x,
293        (Some(a), Some(b)) => Some(if stat_le(&a, &b) { b } else { a }),
294    }
295}
296
297fn stat_le(a: &StatValue, b: &StatValue) -> bool {
298    match (a, b) {
299        (StatValue::Int(a), StatValue::Int(b)) => a <= b,
300        (StatValue::UInt(a), StatValue::UInt(b)) => a <= b,
301        (StatValue::Float(a), StatValue::Float(b)) => a.total_cmp(b).is_le(),
302        (StatValue::Bytes(a), StatValue::Bytes(b)) => a <= b,
303        (StatValue::TimestampNs(a), StatValue::TimestampNs(b)) => a <= b,
304        _ => false,
305    }
306}
307
308fn bool_partial(
309    bytes: &[u8],
310    fill: Option<&FillValue>,
311) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
312    let fill_val: Option<u8> = match fill {
313        Some(FillValue::Bool(b)) => Some(u8::from(*b)),
314        _ => None,
315    };
316    let mut min: Option<u8> = None;
317    let mut max: Option<u8> = None;
318    let mut null_count = 0u64;
319    for &e in bytes {
320        if fill_val == Some(e) {
321            null_count += 1;
322        } else {
323            min = Some(min.map_or(e, |m| m.min(e)));
324            max = Some(max.map_or(e, |m| m.max(e)));
325        }
326    }
327    (
328        min.map(|v| StatValue::UInt(v as u64)),
329        max.map(|v| StatValue::UInt(v as u64)),
330        null_count,
331        bytes.len() as u64,
332    )
333}
334
335fn vlen_partial(
336    bytes: &[u8],
337    fill: Option<&FillValue>,
338) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
339    let n = find_vlen_count(bytes);
340    if n == 0 {
341        return (None, None, 0, 0);
342    }
343    let values_base = (n + 1) * 4;
344    let fill_bytes: Option<&[u8]> = match fill {
345        Some(FillValue::String(s)) => Some(s.as_bytes()),
346        _ => None,
347    };
348    let mut min: Option<Vec<u8>> = None;
349    let mut max: Option<Vec<u8>> = None;
350    let mut null_count = 0u64;
351    for i in 0..n {
352        let start = u32::from_le_bytes(bytes[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
353        let end =
354            u32::from_le_bytes(bytes[(i + 1) * 4..(i + 1) * 4 + 4].try_into().unwrap()) as usize;
355        let val = &bytes[values_base + start..values_base + end];
356        if fill_bytes == Some(val) {
357            null_count += 1;
358        } else {
359            match &mut min {
360                slot @ None => *slot = Some(val.to_vec()),
361                Some(m) if val < m.as_slice() => *m = val.to_vec(),
362                _ => {}
363            }
364            match &mut max {
365                slot @ None => *slot = Some(val.to_vec()),
366                Some(m) if val > m.as_slice() => *m = val.to_vec(),
367                _ => {}
368            }
369        }
370    }
371    (
372        min.map(StatValue::Bytes),
373        max.map(StatValue::Bytes),
374        null_count,
375        n as u64,
376    )
377}
378
379fn timestamp_partial(
380    bytes: &[u8],
381    fill: Option<&FillValue>,
382) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
383    let n = bytes.len() / 8;
384    let fill_val: Option<i64> = match fill {
385        Some(FillValue::TimestampNs(v)) => Some(*v),
386        Some(FillValue::Int(v)) => Some(*v),
387        _ => None,
388    };
389    let mut min: Option<i64> = None;
390    let mut max: Option<i64> = None;
391    let mut null_count = 0u64;
392    for i in 0..n {
393        let e = i64::from_le_bytes(bytes[i * 8..(i + 1) * 8].try_into().unwrap());
394        if fill_val == Some(e) {
395            null_count += 1;
396        } else {
397            min = Some(min.map_or(e, |m| m.min(e)));
398            max = Some(max.map_or(e, |m| m.max(e)));
399        }
400    }
401    (
402        min.map(StatValue::TimestampNs),
403        max.map(StatValue::TimestampNs),
404        null_count,
405        n as u64,
406    )
407}
408
/// Determines the number of elements from an offset-buffer encoded chunk.
/// Uses the same algorithm as `decode_offsets` in `array.rs`.
///
/// The offsets are little-endian u32 words read after the leading offset. The scan stops at
/// the first offset `k` whose value, added to the length of a `k + 1`-word offsets table,
/// equals the buffer length. It also stops when no further whole word is available, in
/// which case the count is whatever was scanned so far.
fn find_vlen_count(bytes: &[u8]) -> usize {
    let mut count = 0usize;
    while let Some(word) = bytes.get((count + 1) * 4..(count + 2) * 4) {
        count += 1;
        let end = u32::from_le_bytes(word.try_into().unwrap()) as usize;
        if (count + 1) * 4 + end == bytes.len() {
            break;
        }
    }
    count
}
427
428// ── Tests ────────────────────────────────────────────────────────────────────
429
#[cfg(test)]
mod tests {
    use super::*;

    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    fn i64_bytes(values: &[i64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    fn string_bytes(values: &[&str]) -> Vec<u8> {
        let mut offsets: Vec<u32> = vec![0];
        let mut data: Vec<u8> = Vec::new();
        for s in values {
            data.extend_from_slice(s.as_bytes());
            offsets.push(data.len() as u32);
        }
        let mut out: Vec<u8> = Vec::new();
        for o in &offsets {
            out.extend_from_slice(&o.to_le_bytes());
        }
        out.extend_from_slice(&data);
        out
    }

    #[test]
    fn i32_min_max_null_row_count() {
        // values: [3, 1, 4, 1, 5, 9], fill=1 — the two 1s count as nulls and are
        // excluded from min/max, so the range covers only non-fill elements.
        let bytes = i32_bytes(&[3, 1, 4, 1, 5, 9]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(1)));
        assert_eq!(min, Some(StatValue::Int(3)));
        assert_eq!(max, Some(StatValue::Int(9)));
        assert_eq!(null_count, 2); // two 1s
        assert_eq!(row_count, 6);
    }

    #[test]
    fn i32_no_fill_value() {
        let bytes = i32_bytes(&[10, 20, 30]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::Int32, None);
        assert_eq!(min, Some(StatValue::Int(10)));
        assert_eq!(max, Some(StatValue::Int(30)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn all_elements_equal_fill_value() {
        let bytes = i32_bytes(&[7, 7, 7]);
        let (_, _, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(7)));
        assert_eq!(null_count, row_count);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn f64_min_max() {
        let bytes = f64_bytes(&[3.0, 1.0, 4.0, 1.5]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Float64, None);
        assert_eq!(min, Some(StatValue::Float(1.0)));
        assert_eq!(max, Some(StatValue::Float(4.0)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn string_lexicographic_min_max() {
        let bytes = string_bytes(&["banana", "apple", "cherry"]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::String, None);
        assert_eq!(min, Some(StatValue::Bytes(b"apple".to_vec())));
        assert_eq!(max, Some(StatValue::Bytes(b"cherry".to_vec())));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn string_fill_value_null_count() {
        let bytes = string_bytes(&["a", "", "b", ""]);
        let fill = FillValue::String(String::new());
        let (_, _, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::String, Some(&fill));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn merge_partial_aggregates_correctly() {
        let mut stats = ArrayStats::new("x".into());
        merge_partial(
            &mut stats,
            Some(StatValue::Int(5)),
            Some(StatValue::Int(10)),
            1,
            3,
        );
        merge_partial(
            &mut stats,
            Some(StatValue::Int(2)),
            Some(StatValue::Int(8)),
            0,
            2,
        );
        assert_eq!(stats.min, Some(StatValue::Int(2)));
        assert_eq!(stats.max, Some(StatValue::Int(10)));
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.row_count, 5);
    }

    #[test]
    fn statsfile_serialize_deserialize_roundtrip() {
        let sf = StatsFile {
            arrays: vec![ArrayStats {
                name: "arr".into(),
                min: Some(StatValue::Int(-1)),
                max: Some(StatValue::Int(99)),
                null_count: 3,
                row_count: 100,
            }],
        };
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn statsfile_empty_roundtrip() {
        let sf = StatsFile::default();
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn timestamp_min_max() {
        // values: [10, 20, -5, 7, 20] with fill=20 — two 20s are nulls, excluded
        // from min/max; min comes from -5, max from 10.
        let bytes = i64_bytes(&[10, 20, -5, 7, 20]);
        let (min, max, null_count, row_count) = compute_chunk_partial(
            &bytes,
            &DType::TimestampNs,
            Some(&FillValue::TimestampNs(20)),
        );
        assert_eq!(min, Some(StatValue::TimestampNs(-5)));
        assert_eq!(max, Some(StatValue::TimestampNs(10)));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 5);
    }

    #[test]
    fn timestamp_fill_value_int_fallback() {
        // FillValue::Int is accepted as a fallback for the TimestampNs path,
        // so a value matching it still counts as a null.
        let bytes = i64_bytes(&[1, 2, 3]);
        let (_min, _max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::TimestampNs, Some(&FillValue::Int(2)));
        assert_eq!(null_count, 1);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn u8_min_max_with_int_fill() {
        // An `Int` fill value also applies to unsigned dtypes.
        let bytes = vec![200u8, 3, 255, 3];
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::UInt8, Some(&FillValue::Int(3)));
        assert_eq!(min, Some(StatValue::UInt(200)));
        assert_eq!(max, Some(StatValue::UInt(255)));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn bool_min_max_with_fill() {
        let bytes = vec![1u8, 0, 1];
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Bool, Some(&FillValue::Bool(false)));
        assert_eq!(min, Some(StatValue::UInt(1)));
        assert_eq!(max, Some(StatValue::UInt(1)));
        assert_eq!(null_count, 1);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn f64_positive_nan_becomes_max() {
        // Under `total_cmp` a positive NaN ranks above every other value.
        let nan = f64::from_bits(0x7FF8_0000_0000_0000);
        let bytes = f64_bytes(&[1.0, nan, -2.0]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Float64, None);
        assert_eq!(min, Some(StatValue::Float(-2.0)));
        assert!(matches!(max, Some(StatValue::Float(v)) if v.is_nan()));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn string_chunk_with_no_elements() {
        // A lone zero offset is a valid, non-empty encoding of zero strings.
        let bytes = string_bytes(&[]);
        assert_eq!(
            compute_chunk_partial(&bytes, &DType::String, None),
            (None, None, 0, 0)
        );
    }

    #[test]
    fn find_vlen_count_with_empty_values() {
        assert_eq!(find_vlen_count(&string_bytes(&[])), 0);
        assert_eq!(find_vlen_count(&string_bytes(&[""])), 1);
        assert_eq!(find_vlen_count(&string_bytes(&["", "", "abc"])), 3);
    }

    #[test]
    fn merge_partial_with_all_null_chunk_keeps_range() {
        let mut stats = ArrayStats::new("x".into());
        merge_partial(
            &mut stats,
            Some(StatValue::Int(1)),
            Some(StatValue::Int(4)),
            0,
            2,
        );
        merge_partial(&mut stats, None, None, 3, 3);
        assert_eq!(stats.min, Some(StatValue::Int(1)));
        assert_eq!(stats.max, Some(StatValue::Int(4)));
        assert_eq!(stats.null_count, 3);
        assert_eq!(stats.row_count, 5);
    }

    #[test]
    fn upsert_replaces_entry_with_same_name() {
        let mut sf = StatsFile::default();
        sf.upsert(ArrayStats::new("a".into()));
        let mut updated = ArrayStats::new("a".into());
        updated.row_count = 7;
        sf.upsert(updated);
        sf.upsert(ArrayStats::new("b".into()));
        assert_eq!(sf.arrays.len(), 2);
        assert_eq!(sf.get_array("a").map(|a| a.row_count), Some(7));
        assert!(sf.get_array("b").is_some());
        assert!(sf.get_array("c").is_none());
    }

    #[test]
    fn statsfile_deserialize_rejects_malformed_trailers() {
        // Shorter than the trailer.
        assert!(StatsFile::deserialize(b"short").is_err());

        // Corrupted magic.
        let mut bad_magic = StatsFile::default().serialize().unwrap();
        let last = bad_magic.len() - 1;
        bad_magic[last] ^= 0xFF;
        assert!(StatsFile::deserialize(&bad_magic).is_err());

        // Recorded payload size larger than the bytes before the trailer.
        let mut oversized = 100u64.to_le_bytes().to_vec();
        oversized.extend_from_slice(b"ARST");
        assert!(StatsFile::deserialize(&oversized).is_err());
    }
}