1use rkyv::{Archive, Deserialize, Serialize};
2
3use crate::dtype::DType;
4use crate::error::{Error, Result};
5use crate::layout::FillValue;
6use crate::storage::Storage;
7
/// Magic bytes written as the last four bytes of a serialized stats blob.
const MAGIC: [u8; 4] = *b"ARST";
/// Trailer length in bytes: an 8-byte little-endian payload length followed by
/// the 4-byte [`MAGIC`].
const TRAILER_SIZE: usize = 12;
10
/// A single min/max statistic value.
///
/// The variant reflects the element type the statistic was computed from;
/// narrower numeric types are widened to the 64-bit payload of their variant.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub enum StatValue {
    /// Statistic of a signed integer array, widened to `i64`.
    Int(i64),
    /// Statistic of an unsigned integer array, widened to `u64`.
    /// Boolean chunks also report their min/max through this variant.
    UInt(u64),
    /// Statistic of a floating-point array, widened to `f64`.
    Float(f64),
    /// Raw bytes of a variable-length (string or binary) value.
    Bytes(Vec<u8>),
    /// Statistic of a timestamp array, stored as an `i64`
    /// (nanoseconds, going by the variant name).
    TimestampNs(i64),
}
22
/// Aggregated statistics for one named array.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub struct ArrayStats {
    /// Name of the array; used as the lookup key inside a `StatsFile`.
    pub name: String,
    /// Smallest non-null value seen so far, or `None` if there has been none.
    pub min: Option<StatValue>,
    /// Largest non-null value seen so far, or `None` if there has been none.
    pub max: Option<StatValue>,
    /// Running total of elements counted as null. The chunk scanners in this
    /// file count an element as null when it equals the fill value.
    pub null_count: u64,
    /// Running total of elements scanned, nulls included.
    pub row_count: u64,
}
36
37impl ArrayStats {
38 pub(crate) fn new(name: String) -> Self {
39 Self {
40 name,
41 min: None,
42 max: None,
43 null_count: 0,
44 row_count: 0,
45 }
46 }
47}
48
/// The collection of per-array statistics that is serialized as one unit.
///
/// On disk the rkyv-encoded value is followed by a trailer holding the payload
/// length and a magic marker.
#[derive(Debug, Clone, PartialEq, Default, Archive, Serialize, Deserialize)]
pub struct StatsFile {
    /// One entry per array; `upsert` keeps the names unique.
    pub arrays: Vec<ArrayStats>,
}
58
59impl StatsFile {
60 pub(crate) fn upsert(&mut self, new_stats: ArrayStats) {
61 if let Some(existing) = self.arrays.iter_mut().find(|a| a.name == new_stats.name) {
62 *existing = new_stats;
63 } else {
64 self.arrays.push(new_stats);
65 }
66 }
67
68 pub fn get_array(&self, name: &str) -> Option<&ArrayStats> {
69 self.arrays.iter().find(|a| a.name == name)
70 }
71
72 pub(crate) fn serialize(&self) -> Result<Vec<u8>> {
73 let rkyv_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(self)
74 .map_err(|e| Error::Serialization(e.to_string()))?;
75 let size = rkyv_bytes.len() as u64;
76 let mut out = Vec::with_capacity(rkyv_bytes.len() + TRAILER_SIZE);
77 out.extend_from_slice(&rkyv_bytes);
78 out.extend_from_slice(&size.to_le_bytes());
79 out.extend_from_slice(&MAGIC);
80 Ok(out)
81 }
82
83 fn deserialize(data: &[u8]) -> Result<Self> {
84 if data.len() < TRAILER_SIZE {
85 return Err(Error::InvalidFooter("stats data too short".into()));
86 }
87 let magic_start = data.len() - 4;
88 if data[magic_start..] != MAGIC {
89 return Err(Error::InvalidFooter("invalid stats magic".into()));
90 }
91 let size_start = magic_start - 8;
92 let size = u64::from_le_bytes(data[size_start..magic_start].try_into().unwrap()) as usize;
93 if size > size_start {
94 return Err(Error::InvalidFooter("stats size exceeds data".into()));
95 }
96 let rkyv_start = size_start - size;
97 let mut aligned: rkyv::util::AlignedVec = rkyv::util::AlignedVec::new();
98 aligned.extend_from_slice(&data[rkyv_start..size_start]);
99 rkyv::from_bytes::<Self, rkyv::rancor::Error>(&aligned)
100 .map_err(|e| Error::Serialization(e.to_string()))
101 }
102}
103
104pub(crate) async fn read_stats_file(storage: &(dyn Storage + Sync)) -> Result<StatsFile> {
106 let file_size = storage.size().await?;
107 if (file_size as usize) < TRAILER_SIZE {
108 return Err(Error::InvalidFooter("stats file too short".into()));
109 }
110 let trailer = storage
111 .read_range(file_size - TRAILER_SIZE as u64..file_size)
112 .await?;
113 if trailer[8..] != MAGIC {
114 return Err(Error::InvalidFooter("invalid stats magic".into()));
115 }
116 let size = u64::from_le_bytes(trailer[..8].try_into().unwrap()) as usize;
117 let total = size + TRAILER_SIZE;
118 let start = file_size - total as u64;
119 let data = storage.read_range(start..file_size).await?;
120 StatsFile::deserialize(&data)
121}
122
/// Scans `$bytes` as packed little-endian signed integers of type `$ty` and
/// evaluates to `(min, max, null_count, row_count)`.
///
/// Elements equal to the fill value are counted as nulls and excluded from
/// min/max. Trailing bytes that do not form a whole element are ignored.
/// Min and max are reported as `StatValue::Int`.
macro_rules! int_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        const WIDTH: usize = std::mem::size_of::<$ty>();
        let raw: &[u8] = &$bytes[..];
        // NOTE: the `as` casts wrap, so a fill value outside the range of
        // `$ty` is compared after truncation.
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::Int(wide)) => Some(*wide as $ty),
            Some(FillValue::UInt(wide)) => Some(*wide as $ty),
            _ => None,
        };
        let mut lowest: Option<$ty> = None;
        let mut highest: Option<$ty> = None;
        let mut nulls = 0u64;
        for chunk in raw.chunks_exact(WIDTH) {
            let value = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            if sentinel == Some(value) {
                nulls += 1;
                continue;
            }
            lowest = Some(match lowest {
                Some(current) if current <= value => current,
                _ => value,
            });
            highest = Some(match highest {
                Some(current) if current >= value => current,
                _ => value,
            });
        }
        (
            lowest.map(|v| StatValue::Int(i64::from(v))),
            highest.map(|v| StatValue::Int(i64::from(v))),
            nulls,
            (raw.len() / WIDTH) as u64,
        )
    }};
}
154
/// Scans `$bytes` as packed little-endian unsigned integers of type `$ty` and
/// evaluates to `(min, max, null_count, row_count)`.
///
/// Elements equal to the fill value are counted as nulls and excluded from
/// min/max. Trailing bytes that do not form a whole element are ignored.
/// Min and max are reported as `StatValue::UInt`.
macro_rules! uint_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        const WIDTH: usize = std::mem::size_of::<$ty>();
        let raw: &[u8] = &$bytes[..];
        // NOTE: the `as` casts wrap, so a negative or oversized fill value is
        // compared after conversion to `$ty`.
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::UInt(wide)) => Some(*wide as $ty),
            Some(FillValue::Int(wide)) => Some(*wide as $ty),
            _ => None,
        };
        let mut lowest: Option<$ty> = None;
        let mut highest: Option<$ty> = None;
        let mut nulls = 0u64;
        for chunk in raw.chunks_exact(WIDTH) {
            let value = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            if sentinel == Some(value) {
                nulls += 1;
                continue;
            }
            lowest = Some(match lowest {
                Some(current) if current <= value => current,
                _ => value,
            });
            highest = Some(match highest {
                Some(current) if current >= value => current,
                _ => value,
            });
        }
        (
            lowest.map(|v| StatValue::UInt(u64::from(v))),
            highest.map(|v| StatValue::UInt(u64::from(v))),
            nulls,
            (raw.len() / WIDTH) as u64,
        )
    }};
}
184
/// Scans `$bytes` as packed little-endian floats of type `$ty` and evaluates
/// to `(min, max, null_count, row_count)`.
///
/// An element is a null when it equals the fill value; if the fill value is
/// NaN, every NaN element is a null. Non-null elements are ordered with
/// `total_cmp`, so NaNs that are not treated as fill do take part in min/max.
/// Trailing bytes that do not form a whole element are ignored. Min and max
/// are reported as `StatValue::Float`.
macro_rules! float_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        const WIDTH: usize = std::mem::size_of::<$ty>();
        let raw: &[u8] = &$bytes[..];
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::Float(wide)) => Some(*wide as $ty),
            _ => None,
        };
        let mut lowest: Option<$ty> = None;
        let mut highest: Option<$ty> = None;
        let mut nulls = 0u64;
        for chunk in raw.chunks_exact(WIDTH) {
            let value = <$ty>::from_le_bytes(chunk.try_into().unwrap());
            // `==` never matches NaN, so a NaN fill value needs its own test.
            let is_null = match sentinel {
                Some(fill) if fill.is_nan() => value.is_nan(),
                Some(fill) => value == fill,
                None => false,
            };
            if is_null {
                nulls += 1;
                continue;
            }
            lowest = Some(match lowest {
                Some(current) if value.total_cmp(&current).is_ge() => current,
                _ => value,
            });
            highest = Some(match highest {
                Some(current) if value.total_cmp(&current).is_le() => current,
                _ => value,
            });
        }
        (
            lowest.map(|v| StatValue::Float(f64::from(v))),
            highest.map(|v| StatValue::Float(f64::from(v))),
            nulls,
            (raw.len() / WIDTH) as u64,
        )
    }};
}
233
234pub fn compute_chunk_partial(
242 bytes: &[u8],
243 dtype: &DType,
244 fill_value: Option<&FillValue>,
245) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
246 if bytes.is_empty() {
247 return (None, None, 0, 0);
248 }
249 match dtype {
250 DType::Int8 => int_partial!(bytes, fill_value, i8),
251 DType::Int16 => int_partial!(bytes, fill_value, i16),
252 DType::Int32 => int_partial!(bytes, fill_value, i32),
253 DType::Int64 => int_partial!(bytes, fill_value, i64),
254 DType::UInt8 => uint_partial!(bytes, fill_value, u8),
255 DType::UInt16 => uint_partial!(bytes, fill_value, u16),
256 DType::UInt32 => uint_partial!(bytes, fill_value, u32),
257 DType::UInt64 => uint_partial!(bytes, fill_value, u64),
258 DType::Bool => bool_partial(bytes, fill_value),
259 DType::Float32 => float_partial!(bytes, fill_value, f32),
260 DType::Float64 => float_partial!(bytes, fill_value, f64),
261 DType::String | DType::Binary => vlen_partial(bytes, fill_value),
262 DType::TimestampNs => timestamp_partial(bytes, fill_value),
263 DType::FixedSizeList { .. } | DType::List { .. } => (None, None, 0, 0),
264 }
265}
266
267pub fn merge_partial(
269 stats: &mut ArrayStats,
270 min: Option<StatValue>,
271 max: Option<StatValue>,
272 null_count: u64,
273 row_count: u64,
274) {
275 stats.null_count += null_count;
276 stats.row_count += row_count;
277 stats.min = stat_min(stats.min.take(), min);
278 stats.max = stat_max(stats.max.take(), max);
279}
280
281fn stat_min(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
284 match (a, b) {
285 (None, x) | (x, None) => x,
286 (Some(a), Some(b)) => Some(if stat_le(&a, &b) { a } else { b }),
287 }
288}
289
290fn stat_max(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
291 match (a, b) {
292 (None, x) | (x, None) => x,
293 (Some(a), Some(b)) => Some(if stat_le(&a, &b) { b } else { a }),
294 }
295}
296
297fn stat_le(a: &StatValue, b: &StatValue) -> bool {
298 match (a, b) {
299 (StatValue::Int(a), StatValue::Int(b)) => a <= b,
300 (StatValue::UInt(a), StatValue::UInt(b)) => a <= b,
301 (StatValue::Float(a), StatValue::Float(b)) => a.total_cmp(b).is_le(),
302 (StatValue::Bytes(a), StatValue::Bytes(b)) => a <= b,
303 (StatValue::TimestampNs(a), StatValue::TimestampNs(b)) => a <= b,
304 _ => false,
305 }
306}
307
308fn bool_partial(
309 bytes: &[u8],
310 fill: Option<&FillValue>,
311) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
312 let fill_val: Option<u8> = match fill {
313 Some(FillValue::Bool(b)) => Some(u8::from(*b)),
314 _ => None,
315 };
316 let mut min: Option<u8> = None;
317 let mut max: Option<u8> = None;
318 let mut null_count = 0u64;
319 for &e in bytes {
320 if fill_val == Some(e) {
321 null_count += 1;
322 } else {
323 min = Some(min.map_or(e, |m| m.min(e)));
324 max = Some(max.map_or(e, |m| m.max(e)));
325 }
326 }
327 (
328 min.map(|v| StatValue::UInt(v as u64)),
329 max.map(|v| StatValue::UInt(v as u64)),
330 null_count,
331 bytes.len() as u64,
332 )
333}
334
335fn vlen_partial(
336 bytes: &[u8],
337 fill: Option<&FillValue>,
338) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
339 let n = find_vlen_count(bytes);
340 if n == 0 {
341 return (None, None, 0, 0);
342 }
343 let values_base = (n + 1) * 4;
344 let fill_bytes: Option<&[u8]> = match fill {
345 Some(FillValue::String(s)) => Some(s.as_bytes()),
346 _ => None,
347 };
348 let mut min: Option<Vec<u8>> = None;
349 let mut max: Option<Vec<u8>> = None;
350 let mut null_count = 0u64;
351 for i in 0..n {
352 let start = u32::from_le_bytes(bytes[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
353 let end =
354 u32::from_le_bytes(bytes[(i + 1) * 4..(i + 1) * 4 + 4].try_into().unwrap()) as usize;
355 let val = &bytes[values_base + start..values_base + end];
356 if fill_bytes == Some(val) {
357 null_count += 1;
358 } else {
359 match &mut min {
360 slot @ None => *slot = Some(val.to_vec()),
361 Some(m) if val < m.as_slice() => *m = val.to_vec(),
362 _ => {}
363 }
364 match &mut max {
365 slot @ None => *slot = Some(val.to_vec()),
366 Some(m) if val > m.as_slice() => *m = val.to_vec(),
367 _ => {}
368 }
369 }
370 }
371 (
372 min.map(StatValue::Bytes),
373 max.map(StatValue::Bytes),
374 null_count,
375 n as u64,
376 )
377}
378
379fn timestamp_partial(
380 bytes: &[u8],
381 fill: Option<&FillValue>,
382) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
383 let n = bytes.len() / 8;
384 let fill_val: Option<i64> = match fill {
385 Some(FillValue::TimestampNs(v)) => Some(*v),
386 Some(FillValue::Int(v)) => Some(*v),
387 _ => None,
388 };
389 let mut min: Option<i64> = None;
390 let mut max: Option<i64> = None;
391 let mut null_count = 0u64;
392 for i in 0..n {
393 let e = i64::from_le_bytes(bytes[i * 8..(i + 1) * 8].try_into().unwrap());
394 if fill_val == Some(e) {
395 null_count += 1;
396 } else {
397 min = Some(min.map_or(e, |m| m.min(e)));
398 max = Some(max.map_or(e, |m| m.max(e)));
399 }
400 }
401 (
402 min.map(StatValue::TimestampNs),
403 max.map(StatValue::TimestampNs),
404 null_count,
405 n as u64,
406 )
407}
408
/// Recovers the number of values in a variable-length chunk.
///
/// A chunk holding `n` values starts with `n + 1` little-endian `u32` offsets
/// followed by the value bytes, and the last offset equals the length of the
/// value bytes. The count is therefore the first index `k >= 1` for which
/// `(k + 1) * 4 + offsets[k] == bytes.len()`.
///
/// Returns `0` when no offset satisfies that relation: the buffer is empty,
/// holds only the leading offset, or is malformed. An unvalidated count is
/// never reported, so callers do not slice past the end of the buffer on the
/// strength of it.
fn find_vlen_count(bytes: &[u8]) -> usize {
    // Entry 0 is the leading offset and can never be the last one, so skip it.
    for (index, entry) in bytes.chunks_exact(4).enumerate().skip(1) {
        let table_len = (index + 1) * 4;
        let data_len = u32::from_le_bytes(
            entry
                .try_into()
                .expect("chunks_exact(4) yields 4-byte chunks"),
        ) as usize;
        // Checked add: the sum cannot wrap on targets with a 32-bit usize.
        if table_len.checked_add(data_len) == Some(bytes.len()) {
            return index;
        }
    }
    0
}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Packs `values` as consecutive little-endian `i32`s.
    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * std::mem::size_of::<i32>());
        for value in values {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }

    /// Packs `values` as consecutive little-endian `i64`s.
    fn i64_bytes(values: &[i64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * std::mem::size_of::<i64>());
        for value in values {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }

    /// Packs `values` as consecutive little-endian `f64`s.
    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        let mut out = Vec::with_capacity(values.len() * std::mem::size_of::<f64>());
        for value in values {
            out.extend_from_slice(&value.to_le_bytes());
        }
        out
    }

    /// Builds a variable-length chunk: `values.len() + 1` little-endian `u32`
    /// offsets (starting at 0) followed by the concatenated string bytes.
    fn string_bytes(values: &[&str]) -> Vec<u8> {
        let mut chunk = 0u32.to_le_bytes().to_vec();
        let mut payload: Vec<u8> = Vec::new();
        for text in values {
            payload.extend_from_slice(text.as_bytes());
            chunk.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        }
        chunk.extend_from_slice(&payload);
        chunk
    }

    #[test]
    fn i32_min_max_null_row_count() {
        // The fill value 1 appears twice and must not become the minimum.
        let chunk = i32_bytes(&[3, 1, 4, 1, 5, 9]);
        let fill = FillValue::Int(1);
        assert_eq!(
            compute_chunk_partial(&chunk, &DType::Int32, Some(&fill)),
            (Some(StatValue::Int(3)), Some(StatValue::Int(9)), 2, 6)
        );
    }

    #[test]
    fn i32_no_fill_value() {
        let chunk = i32_bytes(&[10, 20, 30]);
        assert_eq!(
            compute_chunk_partial(&chunk, &DType::Int32, None),
            (Some(StatValue::Int(10)), Some(StatValue::Int(30)), 0, 3)
        );
    }

    #[test]
    fn all_elements_equal_fill_value() {
        let chunk = i32_bytes(&[7, 7, 7]);
        let fill = FillValue::Int(7);
        let (_, _, nulls, rows) = compute_chunk_partial(&chunk, &DType::Int32, Some(&fill));
        assert_eq!(rows, 3);
        assert_eq!(nulls, rows);
    }

    #[test]
    fn f64_min_max() {
        let chunk = f64_bytes(&[3.0, 1.0, 4.0, 1.5]);
        assert_eq!(
            compute_chunk_partial(&chunk, &DType::Float64, None),
            (Some(StatValue::Float(1.0)), Some(StatValue::Float(4.0)), 0, 4)
        );
    }

    #[test]
    fn string_lexicographic_min_max() {
        let chunk = string_bytes(&["banana", "apple", "cherry"]);
        assert_eq!(
            compute_chunk_partial(&chunk, &DType::String, None),
            (
                Some(StatValue::Bytes(b"apple".to_vec())),
                Some(StatValue::Bytes(b"cherry".to_vec())),
                0,
                3
            )
        );
    }

    #[test]
    fn string_fill_value_null_count() {
        // Empty strings are the fill value here, so two of the four are nulls.
        let chunk = string_bytes(&["a", "", "b", ""]);
        let fill = FillValue::String(String::new());
        let (_, _, nulls, rows) = compute_chunk_partial(&chunk, &DType::String, Some(&fill));
        assert_eq!((nulls, rows), (2, 4));
    }

    #[test]
    fn merge_partial_aggregates_correctly() {
        let mut stats = ArrayStats::new("x".into());
        for (lo, hi, nulls, rows) in [(5, 10, 1, 3), (2, 8, 0, 2)] {
            merge_partial(
                &mut stats,
                Some(StatValue::Int(lo)),
                Some(StatValue::Int(hi)),
                nulls,
                rows,
            );
        }
        assert_eq!(
            (stats.min, stats.max, stats.null_count, stats.row_count),
            (Some(StatValue::Int(2)), Some(StatValue::Int(10)), 1, 5)
        );
    }

    #[test]
    fn statsfile_serialize_deserialize_roundtrip() {
        let original = StatsFile {
            arrays: vec![ArrayStats {
                name: "arr".into(),
                min: Some(StatValue::Int(-1)),
                max: Some(StatValue::Int(99)),
                null_count: 3,
                row_count: 100,
            }],
        };
        let encoded = original.serialize().unwrap();
        assert_eq!(original, StatsFile::deserialize(&encoded).unwrap());
    }

    #[test]
    fn statsfile_empty_roundtrip() {
        let original = StatsFile::default();
        let encoded = original.serialize().unwrap();
        assert_eq!(original, StatsFile::deserialize(&encoded).unwrap());
    }

    #[test]
    fn timestamp_min_max() {
        // 20 is the fill value, so the maximum falls back to 10.
        let chunk = i64_bytes(&[10, 20, -5, 7, 20]);
        let fill = FillValue::TimestampNs(20);
        assert_eq!(
            compute_chunk_partial(&chunk, &DType::TimestampNs, Some(&fill)),
            (
                Some(StatValue::TimestampNs(-5)),
                Some(StatValue::TimestampNs(10)),
                2,
                5
            )
        );
    }

    #[test]
    fn timestamp_fill_value_int_fallback() {
        // A plain integer fill value is accepted for timestamp arrays.
        let chunk = i64_bytes(&[1, 2, 3]);
        let fill = FillValue::Int(2);
        let (_, _, nulls, rows) = compute_chunk_partial(&chunk, &DType::TimestampNs, Some(&fill));
        assert_eq!((nulls, rows), (1, 3));
    }
}