1use rkyv::{Archive, Deserialize, Serialize};
8
9use crate::dtype::DType;
10use crate::error::{Error, Result};
11use crate::layout::FillValue;
12use crate::storage::Storage;
13
/// Four-byte tag written as the very last bytes of a serialized stats blob.
const MAGIC: [u8; 4] = *b"ARST";
/// Size in bytes of the trailer: an 8-byte little-endian payload length followed by the 4-byte magic.
const TRAILER_SIZE: usize = 12;
16
/// A single min or max statistic value recorded for an array.
///
/// The variant produced depends on the array dtype handled by
/// `compute_chunk_partial`.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub enum StatValue {
    /// Signed integer value (produced for the `Int8`..`Int64` dtypes, widened to `i64`).
    Int(i64),
    /// Unsigned integer value (produced for `UInt8`..`UInt64` and `Bool`, widened to `u64`).
    UInt(u64),
    /// Floating-point value (produced for `Float32`/`Float64`, widened to `f64`).
    Float(f64),
    /// Raw bytes of a variable-length value (produced for `String`/`Binary`); ordered bytewise.
    Bytes(Vec<u8>),
    /// Timestamp value stored as an `i64` (produced for the `TimestampNs` dtype).
    TimestampNs(i64),
}
31
/// Aggregated statistics for one named array.
#[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
pub struct ArrayStats {
    /// Name of the array; used as the lookup key inside a `StatsFile`.
    pub name: String,
    /// Smallest non-fill value seen so far, or `None` if no such value was merged.
    pub min: Option<StatValue>,
    /// Largest non-fill value seen so far, or `None` if no such value was merged.
    pub max: Option<StatValue>,
    /// Running count of elements that matched the fill value.
    pub null_count: u64,
    /// Running count of all elements, fill values included.
    pub row_count: u64,
}
46
47impl ArrayStats {
48 pub(crate) fn new(name: String) -> Self {
49 Self {
50 name,
51 min: None,
52 max: None,
53 null_count: 0,
54 row_count: 0,
55 }
56 }
57}
58
/// Collection of per-array statistics that is serialized as one blob
/// (rkyv payload followed by a length + magic trailer).
#[derive(Debug, Clone, PartialEq, Default, Archive, Serialize, Deserialize)]
pub struct StatsFile {
    /// One entry per array; `upsert` keeps names unique when entries are added through it.
    pub arrays: Vec<ArrayStats>,
}
69
70impl StatsFile {
71 pub(crate) fn upsert(&mut self, new_stats: ArrayStats) {
72 if let Some(existing) = self.arrays.iter_mut().find(|a| a.name == new_stats.name) {
73 *existing = new_stats;
74 } else {
75 self.arrays.push(new_stats);
76 }
77 }
78
79 pub fn get_array(&self, name: &str) -> Option<&ArrayStats> {
81 self.arrays.iter().find(|a| a.name == name)
82 }
83
84 pub(crate) fn serialize(&self) -> Result<Vec<u8>> {
85 let rkyv_bytes = rkyv::to_bytes::<rkyv::rancor::Error>(self)
86 .map_err(|e| Error::Serialization(e.to_string()))?;
87 let size = rkyv_bytes.len() as u64;
88 let mut out = Vec::with_capacity(rkyv_bytes.len() + TRAILER_SIZE);
89 out.extend_from_slice(&rkyv_bytes);
90 out.extend_from_slice(&size.to_le_bytes());
91 out.extend_from_slice(&MAGIC);
92 Ok(out)
93 }
94
95 fn deserialize(data: &[u8]) -> Result<Self> {
96 if data.len() < TRAILER_SIZE {
97 return Err(Error::InvalidFooter("stats data too short".into()));
98 }
99 let magic_start = data.len() - 4;
100 if data[magic_start..] != MAGIC {
101 return Err(Error::InvalidFooter("invalid stats magic".into()));
102 }
103 let size_start = magic_start - 8;
104 let size = u64::from_le_bytes(data[size_start..magic_start].try_into().unwrap()) as usize;
105 if size > size_start {
106 return Err(Error::InvalidFooter("stats size exceeds data".into()));
107 }
108 let rkyv_start = size_start - size;
109 let mut aligned: rkyv::util::AlignedVec = rkyv::util::AlignedVec::new();
110 aligned.extend_from_slice(&data[rkyv_start..size_start]);
111 rkyv::from_bytes::<Self, rkyv::rancor::Error>(&aligned)
112 .map_err(|e| Error::Serialization(e.to_string()))
113 }
114}
115
116pub(crate) async fn read_stats_file(storage: &(dyn Storage + Sync)) -> Result<StatsFile> {
118 let file_size = storage.size().await?;
119 if (file_size as usize) < TRAILER_SIZE {
120 return Err(Error::InvalidFooter("stats file too short".into()));
121 }
122 let trailer = storage
123 .read_range(file_size - TRAILER_SIZE as u64..file_size)
124 .await?;
125 if trailer[8..] != MAGIC {
126 return Err(Error::InvalidFooter("invalid stats magic".into()));
127 }
128 let size = u64::from_le_bytes(trailer[..8].try_into().unwrap()) as usize;
129 let total = size + TRAILER_SIZE;
130 let start = file_size - total as u64;
131 let data = storage.read_range(start..file_size).await?;
132 StatsFile::deserialize(&data)
133}
134
/// Computes `(min, max, null_count, row_count)` for a buffer of little-endian
/// signed integers of type `$ty`.
///
/// Elements equal to the fill value count as nulls and are excluded from
/// min/max. Trailing bytes that do not form a whole element are ignored.
macro_rules! int_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        // NOTE(review): `as` wraps fill values that do not fit in `$ty`;
        // presumably fill values are validated against the dtype upstream — confirm.
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::Int(v)) => Some(*v as $ty),
            Some(FillValue::UInt(v)) => Some(*v as $ty),
            _ => None,
        };
        // Running (lowest, highest) pair over the non-fill elements.
        let mut bounds: Option<($ty, $ty)> = None;
        let mut nulls = 0u64;
        for raw in $bytes.chunks_exact(width) {
            let value = <$ty>::from_le_bytes(raw.try_into().unwrap());
            if sentinel == Some(value) {
                nulls += 1;
                continue;
            }
            bounds = Some(match bounds {
                None => (value, value),
                Some((lo, hi)) => (lo.min(value), hi.max(value)),
            });
        }
        (
            bounds.map(|(lo, _)| StatValue::Int(lo as i64)),
            bounds.map(|(_, hi)| StatValue::Int(hi as i64)),
            nulls,
            ($bytes.len() / width) as u64,
        )
    }};
}
166
/// Computes `(min, max, null_count, row_count)` for a buffer of little-endian
/// unsigned integers of type `$ty`.
///
/// Elements equal to the fill value count as nulls and are excluded from
/// min/max. Trailing bytes that do not form a whole element are ignored.
macro_rules! uint_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        // NOTE(review): `as` wraps fill values that do not fit in `$ty`
        // (e.g. a negative `Int`); presumably validated upstream — confirm.
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::UInt(v)) => Some(*v as $ty),
            Some(FillValue::Int(v)) => Some(*v as $ty),
            _ => None,
        };
        // Running (lowest, highest) pair over the non-fill elements.
        let mut bounds: Option<($ty, $ty)> = None;
        let mut nulls = 0u64;
        for raw in $bytes.chunks_exact(width) {
            let value = <$ty>::from_le_bytes(raw.try_into().unwrap());
            if sentinel == Some(value) {
                nulls += 1;
                continue;
            }
            bounds = Some(match bounds {
                None => (value, value),
                Some((lo, hi)) => (lo.min(value), hi.max(value)),
            });
        }
        (
            bounds.map(|(lo, _)| StatValue::UInt(lo as u64)),
            bounds.map(|(_, hi)| StatValue::UInt(hi as u64)),
            nulls,
            ($bytes.len() / width) as u64,
        )
    }};
}
196
/// Computes `(min, max, null_count, row_count)` for a buffer of little-endian
/// floats of type `$ty`.
///
/// A NaN fill value matches every NaN element; any other fill value matches
/// by `==`. Non-fill elements are ordered with `total_cmp`, so NaNs that are
/// not treated as fill still take part in min/max.
macro_rules! float_partial {
    ($bytes:expr, $fill:expr, $ty:ty) => {{
        let width = std::mem::size_of::<$ty>();
        let sentinel: Option<$ty> = match $fill {
            Some(FillValue::Float(v)) => Some(*v as $ty),
            _ => None,
        };
        let is_fill = |value: $ty| match sentinel {
            Some(f) if f.is_nan() => value.is_nan(),
            Some(f) => value == f,
            None => false,
        };
        let mut lowest: Option<$ty> = None;
        let mut highest: Option<$ty> = None;
        let mut nulls = 0u64;
        for raw in $bytes.chunks_exact(width) {
            let value = <$ty>::from_le_bytes(raw.try_into().unwrap());
            if is_fill(value) {
                nulls += 1;
                continue;
            }
            // Keep the current extreme on ties; replace it only when `value`
            // is strictly smaller / larger in the total order.
            lowest = Some(match lowest {
                Some(cur) if value.total_cmp(&cur).is_ge() => cur,
                _ => value,
            });
            highest = Some(match highest {
                Some(cur) if value.total_cmp(&cur).is_le() => cur,
                _ => value,
            });
        }
        (
            lowest.map(|v| StatValue::Float(v as f64)),
            highest.map(|v| StatValue::Float(v as f64)),
            nulls,
            ($bytes.len() / width) as u64,
        )
    }};
}
245
246pub(crate) fn compute_chunk_partial(
254 bytes: &[u8],
255 dtype: &DType,
256 fill_value: Option<&FillValue>,
257) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
258 if bytes.is_empty() {
259 return (None, None, 0, 0);
260 }
261 match dtype {
262 DType::Int8 => int_partial!(bytes, fill_value, i8),
263 DType::Int16 => int_partial!(bytes, fill_value, i16),
264 DType::Int32 => int_partial!(bytes, fill_value, i32),
265 DType::Int64 => int_partial!(bytes, fill_value, i64),
266 DType::UInt8 => uint_partial!(bytes, fill_value, u8),
267 DType::UInt16 => uint_partial!(bytes, fill_value, u16),
268 DType::UInt32 => uint_partial!(bytes, fill_value, u32),
269 DType::UInt64 => uint_partial!(bytes, fill_value, u64),
270 DType::Bool => bool_partial(bytes, fill_value),
271 DType::Float32 => float_partial!(bytes, fill_value, f32),
272 DType::Float64 => float_partial!(bytes, fill_value, f64),
273 DType::String | DType::Binary => vlen_partial(bytes, fill_value),
274 DType::TimestampNs => timestamp_partial(bytes, fill_value),
275 DType::FixedSizeList { .. } | DType::List { .. } => (None, None, 0, 0),
276 }
277}
278
279pub(crate) fn merge_partial(
281 stats: &mut ArrayStats,
282 min: Option<StatValue>,
283 max: Option<StatValue>,
284 null_count: u64,
285 row_count: u64,
286) {
287 stats.null_count += null_count;
288 stats.row_count += row_count;
289 stats.min = stat_min(stats.min.take(), min);
290 stats.max = stat_max(stats.max.take(), max);
291}
292
293fn stat_min(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
296 match (a, b) {
297 (None, x) | (x, None) => x,
298 (Some(a), Some(b)) => Some(if stat_le(&a, &b) { a } else { b }),
299 }
300}
301
302fn stat_max(a: Option<StatValue>, b: Option<StatValue>) -> Option<StatValue> {
303 match (a, b) {
304 (None, x) | (x, None) => x,
305 (Some(a), Some(b)) => Some(if stat_le(&a, &b) { b } else { a }),
306 }
307}
308
309fn stat_le(a: &StatValue, b: &StatValue) -> bool {
310 match (a, b) {
311 (StatValue::Int(a), StatValue::Int(b)) => a <= b,
312 (StatValue::UInt(a), StatValue::UInt(b)) => a <= b,
313 (StatValue::Float(a), StatValue::Float(b)) => a.total_cmp(b).is_le(),
314 (StatValue::Bytes(a), StatValue::Bytes(b)) => a <= b,
315 (StatValue::TimestampNs(a), StatValue::TimestampNs(b)) => a <= b,
316 _ => false,
317 }
318}
319
320fn bool_partial(
321 bytes: &[u8],
322 fill: Option<&FillValue>,
323) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
324 let fill_val: Option<u8> = match fill {
325 Some(FillValue::Bool(b)) => Some(u8::from(*b)),
326 _ => None,
327 };
328 let mut min: Option<u8> = None;
329 let mut max: Option<u8> = None;
330 let mut null_count = 0u64;
331 for &e in bytes {
332 if fill_val == Some(e) {
333 null_count += 1;
334 } else {
335 min = Some(min.map_or(e, |m| m.min(e)));
336 max = Some(max.map_or(e, |m| m.max(e)));
337 }
338 }
339 (
340 min.map(|v| StatValue::UInt(v as u64)),
341 max.map(|v| StatValue::UInt(v as u64)),
342 null_count,
343 bytes.len() as u64,
344 )
345}
346
347fn vlen_partial(
348 bytes: &[u8],
349 fill: Option<&FillValue>,
350) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
351 let n = find_vlen_count(bytes);
352 if n == 0 {
353 return (None, None, 0, 0);
354 }
355 let values_base = (n + 1) * 4;
356 let fill_bytes: Option<&[u8]> = match fill {
357 Some(FillValue::String(s)) => Some(s.as_bytes()),
358 _ => None,
359 };
360 let mut min: Option<Vec<u8>> = None;
361 let mut max: Option<Vec<u8>> = None;
362 let mut null_count = 0u64;
363 for i in 0..n {
364 let start = u32::from_le_bytes(bytes[i * 4..i * 4 + 4].try_into().unwrap()) as usize;
365 let end =
366 u32::from_le_bytes(bytes[(i + 1) * 4..(i + 1) * 4 + 4].try_into().unwrap()) as usize;
367 let val = &bytes[values_base + start..values_base + end];
368 if fill_bytes == Some(val) {
369 null_count += 1;
370 } else {
371 match &mut min {
372 slot @ None => *slot = Some(val.to_vec()),
373 Some(m) if val < m.as_slice() => *m = val.to_vec(),
374 _ => {}
375 }
376 match &mut max {
377 slot @ None => *slot = Some(val.to_vec()),
378 Some(m) if val > m.as_slice() => *m = val.to_vec(),
379 _ => {}
380 }
381 }
382 }
383 (
384 min.map(StatValue::Bytes),
385 max.map(StatValue::Bytes),
386 null_count,
387 n as u64,
388 )
389}
390
391fn timestamp_partial(
392 bytes: &[u8],
393 fill: Option<&FillValue>,
394) -> (Option<StatValue>, Option<StatValue>, u64, u64) {
395 let n = bytes.len() / 8;
396 let fill_val: Option<i64> = match fill {
397 Some(FillValue::TimestampNs(v)) => Some(*v),
398 Some(FillValue::Int(v)) => Some(*v),
399 _ => None,
400 };
401 let mut min: Option<i64> = None;
402 let mut max: Option<i64> = None;
403 let mut null_count = 0u64;
404 for i in 0..n {
405 let e = i64::from_le_bytes(bytes[i * 8..(i + 1) * 8].try_into().unwrap());
406 if fill_val == Some(e) {
407 null_count += 1;
408 } else {
409 min = Some(min.map_or(e, |m| m.min(e)));
410 max = Some(max.map_or(e, |m| m.max(e)));
411 }
412 }
413 (
414 min.map(StatValue::TimestampNs),
415 max.map(StatValue::TimestampNs),
416 null_count,
417 n as u64,
418 )
419}
420
/// Infers how many values a variable-length chunk holds.
///
/// The chunk is expected to be `count + 1` little-endian `u32` offsets
/// followed by the value bytes, with the last offset equal to the length of
/// the value region. The count is therefore the first slot index `k >= 1`
/// for which `(k + 1) * 4 + slot[k] == bytes.len()`.
///
/// Returns 0 when no slot satisfies that equation (buffer too short or not a
/// well-formed chunk), so callers never slice with a made-up count.
fn find_vlen_count(bytes: &[u8]) -> usize {
    bytes
        .chunks_exact(4)
        .enumerate()
        // Slot 0 is the start offset of the first value, never the last slot.
        .skip(1)
        .find_map(|(count, raw)| {
            let end = u32::from_le_bytes(
                raw.try_into()
                    .expect("chunks_exact(4) yields 4-byte slices"),
            ) as usize;
            let header_len = (count + 1) * 4;
            (header_len.checked_add(end) == Some(bytes.len())).then_some(count)
        })
        .unwrap_or(0)
}
439
// Unit tests for chunk statistics, merging and the stats blob round-trip.
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `values` as consecutive little-endian `i32`s.
    fn i32_bytes(values: &[i32]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Encodes `values` as consecutive little-endian `i64`s.
    fn i64_bytes(values: &[i64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Encodes `values` as consecutive little-endian `f64`s.
    fn f64_bytes(values: &[f64]) -> Vec<u8> {
        values.iter().flat_map(|v| v.to_le_bytes()).collect()
    }

    /// Encodes `values` in the variable-length layout read by `vlen_partial`:
    /// `len + 1` little-endian `u32` offsets (starting at 0) followed by the
    /// concatenated string bytes.
    fn string_bytes(values: &[&str]) -> Vec<u8> {
        let mut offsets: Vec<u32> = vec![0];
        let mut data: Vec<u8> = Vec::new();
        for s in values {
            data.extend_from_slice(s.as_bytes());
            offsets.push(data.len() as u32);
        }
        let mut out: Vec<u8> = Vec::new();
        for o in &offsets {
            out.extend_from_slice(&o.to_le_bytes());
        }
        out.extend_from_slice(&data);
        out
    }

    #[test]
    fn i32_min_max_null_row_count() {
        let bytes = i32_bytes(&[3, 1, 4, 1, 5, 9]);
        // Fill value 1 appears twice, so it is excluded from min/max and
        // counted as null.
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(1)));
        assert_eq!(min, Some(StatValue::Int(3)));
        assert_eq!(max, Some(StatValue::Int(9)));
        assert_eq!(null_count, 2);
        // Row count includes the fill-valued elements.
        assert_eq!(row_count, 6);
    }

    #[test]
    fn i32_no_fill_value() {
        let bytes = i32_bytes(&[10, 20, 30]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::Int32, None);
        assert_eq!(min, Some(StatValue::Int(10)));
        assert_eq!(max, Some(StatValue::Int(30)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn all_elements_equal_fill_value() {
        let bytes = i32_bytes(&[7, 7, 7]);
        let (_, _, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Int32, Some(&FillValue::Int(7)));
        // Every element is the fill value, so all rows are nulls.
        assert_eq!(null_count, row_count);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn f64_min_max() {
        let bytes = f64_bytes(&[3.0, 1.0, 4.0, 1.5]);
        let (min, max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::Float64, None);
        assert_eq!(min, Some(StatValue::Float(1.0)));
        assert_eq!(max, Some(StatValue::Float(4.0)));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn string_lexicographic_min_max() {
        let bytes = string_bytes(&["banana", "apple", "cherry"]);
        let (min, max, null_count, row_count) = compute_chunk_partial(&bytes, &DType::String, None);
        assert_eq!(min, Some(StatValue::Bytes(b"apple".to_vec())));
        assert_eq!(max, Some(StatValue::Bytes(b"cherry".to_vec())));
        assert_eq!(null_count, 0);
        assert_eq!(row_count, 3);
    }

    #[test]
    fn string_fill_value_null_count() {
        let bytes = string_bytes(&["a", "", "b", ""]);
        // The empty string is the fill value here; it occurs twice.
        let fill = FillValue::String(String::new());
        let (_, _, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::String, Some(&fill));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 4);
    }

    #[test]
    fn merge_partial_aggregates_correctly() {
        let mut stats = ArrayStats::new("x".into());
        merge_partial(
            &mut stats,
            Some(StatValue::Int(5)),
            Some(StatValue::Int(10)),
            1,
            3,
        );
        merge_partial(
            &mut stats,
            Some(StatValue::Int(2)),
            Some(StatValue::Int(8)),
            0,
            2,
        );
        // Min comes from the second partial, max from the first; counters add up.
        assert_eq!(stats.min, Some(StatValue::Int(2)));
        assert_eq!(stats.max, Some(StatValue::Int(10)));
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.row_count, 5);
    }

    #[test]
    fn statsfile_serialize_deserialize_roundtrip() {
        let sf = StatsFile {
            arrays: vec![ArrayStats {
                name: "arr".into(),
                min: Some(StatValue::Int(-1)),
                max: Some(StatValue::Int(99)),
                null_count: 3,
                row_count: 100,
            }],
        };
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn statsfile_empty_roundtrip() {
        // A stats file with no arrays must still round-trip.
        let sf = StatsFile::default();
        let bytes = sf.serialize().unwrap();
        let restored = StatsFile::deserialize(&bytes).unwrap();
        assert_eq!(sf, restored);
    }

    #[test]
    fn timestamp_min_max() {
        let bytes = i64_bytes(&[10, 20, -5, 7, 20]);
        // 20 is the fill value, so the largest remaining element is 10.
        let (min, max, null_count, row_count) = compute_chunk_partial(
            &bytes,
            &DType::TimestampNs,
            Some(&FillValue::TimestampNs(20)),
        );
        assert_eq!(min, Some(StatValue::TimestampNs(-5)));
        assert_eq!(max, Some(StatValue::TimestampNs(10)));
        assert_eq!(null_count, 2);
        assert_eq!(row_count, 5);
    }

    #[test]
    fn timestamp_fill_value_int_fallback() {
        let bytes = i64_bytes(&[1, 2, 3]);
        // A plain `Int` fill value is also accepted for timestamp arrays.
        let (_min, _max, null_count, row_count) =
            compute_chunk_partial(&bytes, &DType::TimestampNs, Some(&FillValue::Int(2)));
        assert_eq!(null_count, 1);
        assert_eq!(row_count, 3);
    }
}