1#[cfg(test)]
6use std::cell::Cell;
7use std::cell::OnceCell;
8
9use super::writer::{ColumnType, DICT_BLOCK_MASK, DICT_BLOCK_SHIFT, DICT_BLOCK_SIZE, unpack_i64};
10use crate::core::FieldId;
11
/// Summary statistics read from the stats record of a numeric column.
///
/// `ColumnReader::open` fills this from the 20 bytes that follow the null
/// bitset: a little-endian `u32` followed by two little-endian `f64`s.
///
/// `PartialEq` is derived so that stats can be compared and asserted on
/// directly. `Eq` is deliberately absent because the bounds are `f64`.
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnStats {
    /// Null count stored in the first four bytes of the stats record.
    pub null_count: u32,
    /// Minimum stored in the stats record.
    pub min: f64,
    /// Maximum stored in the stats record.
    pub max: f64,
}
19
/// Dictionary state of a column, decided when the column is opened.
enum KeywordDict<'a> {
    /// The column has no dictionary; every ordinal lookup yields `None`.
    None,
    /// All dictionary entries decoded up front, indexed by ordinal.
    Eager(Vec<&'a str>),
    /// Block-addressed dictionary whose entries are decoded on demand.
    Blocked {
        /// Offset in the column bytes where the length-prefixed entries begin.
        dict_body_start: usize,
        /// Offset of the table holding one little-endian `u64` address per block.
        block_addrs_start: usize,
        /// Number of entries in the dictionary.
        num_entries: u32,
        /// Fully decoded dictionary, built at most once by `ensure_dict`.
        dict: OnceCell<Vec<&'a str>>,
    },
}
42
43pub struct ColumnReader<'a> {
45 data: &'a [u8],
46 field_id: FieldId,
47 col_type: ColumnType,
48 doc_count: u32,
49 null_bitset_start: usize,
50 body_start: usize,
51 keyword: KeywordDict<'a>,
53 ordinals_start: usize,
55 stats: Option<ColumnStats>,
57}
58
59const _: fn() = || {
70 fn assert_send<T: Send>() {}
71 assert_send::<ColumnReader<'static>>();
72};
73
// Test-only, per-thread counter. `ColumnReader::ensure_dict` bumps it every
// time it actually builds a blocked dictionary, so tests can check that the
// build happens once.
#[cfg(test)]
thread_local! {
    static DICT_BUILDS: Cell<u32> = const { Cell::new(0) };
}
84
85impl<'a> ColumnReader<'a> {
86 pub fn open(data: &'a [u8]) -> Self {
87 if data.len() < 7 {
88 return Self::empty(data);
89 }
90
91 let field_id = FieldId::new(u16::from_le_bytes([data[0], data[1]]));
92 let col_type_byte = data[2];
93 let doc_count = u32::from_le_bytes([data[3], data[4], data[5], data[6]]);
94
95 let col_type = match col_type_byte {
96 0 => ColumnType::Empty,
97 1 => ColumnType::Keyword,
98 2 => ColumnType::I64,
99 3 => ColumnType::F64,
100 4 => ColumnType::Bool,
101 5 => ColumnType::ConstantI64,
102 6 => ColumnType::ConstantF64,
103 7 => ColumnType::BitpackedI64,
104 8 => ColumnType::KeywordBlocked,
105 _ => ColumnType::Empty,
106 };
107
108 if doc_count == 0 || col_type == ColumnType::Empty {
109 return Self {
110 data,
111 field_id,
112 col_type,
113 doc_count: 0,
114 null_bitset_start: 7,
115 body_start: 7,
116 keyword: KeywordDict::None,
117 ordinals_start: 0,
118 stats: None,
119 };
120 }
121
122 let null_bytes = (doc_count as usize + 7) / 8;
123 let null_bitset_start = 7;
124 let mut body_start = null_bitset_start + null_bytes;
125
126 let stats = if col_type.is_numeric() && body_start + 20 <= data.len() {
128 let null_count =
129 u32::from_le_bytes(data[body_start..body_start + 4].try_into().unwrap());
130 let min = f64::from_le_bytes(data[body_start + 4..body_start + 12].try_into().unwrap());
131 let max =
132 f64::from_le_bytes(data[body_start + 12..body_start + 20].try_into().unwrap());
133 body_start += 20; Some(ColumnStats {
135 null_count,
136 min,
137 max,
138 })
139 } else {
140 None
141 };
142
143 let mut reader = Self {
144 data,
145 field_id,
146 col_type,
147 doc_count,
148 null_bitset_start,
149 body_start,
150 keyword: KeywordDict::None,
151 ordinals_start: 0,
152 stats,
153 };
154
155 match col_type {
156 ColumnType::Keyword => reader.parse_keyword_dict(),
158 ColumnType::KeywordBlocked => {
161 let mut pos = reader.body_start;
162 let num_entries = u32::from_le_bytes(reader.data[pos..pos + 4].try_into().unwrap());
163 pos += 4;
164 let body_len =
165 u64::from_le_bytes(reader.data[pos..pos + 8].try_into().unwrap()) as usize;
166 pos += 8;
167 debug_assert!(body_len as u64 <= usize::MAX as u64);
171 let dict_body_start = pos;
172 let block_addrs_start = dict_body_start + body_len;
173 let num_blocks = (num_entries as usize).div_ceil(DICT_BLOCK_SIZE);
174 reader.ordinals_start = block_addrs_start + num_blocks * 8;
175 reader.keyword = KeywordDict::Blocked {
176 dict_body_start,
177 block_addrs_start,
178 num_entries,
179 dict: OnceCell::new(),
180 };
181 }
182 _ => {}
183 }
184
185 reader
186 }
187
188 fn empty(data: &'a [u8]) -> Self {
189 Self {
190 data,
191 field_id: FieldId::new(0),
192 col_type: ColumnType::Empty,
193 doc_count: 0,
194 null_bitset_start: 0,
195 body_start: 0,
196 keyword: KeywordDict::None,
197 ordinals_start: 0,
198 stats: None,
199 }
200 }
201
202 fn parse_keyword_dict(&mut self) {
203 let mut pos = self.body_start;
204 let num_entries = u32::from_le_bytes(self.data[pos..pos + 4].try_into().unwrap()) as usize;
205 pos += 4;
206
207 let mut dict = Vec::with_capacity(num_entries);
208 for _ in 0..num_entries {
209 let len = u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
210 pos += 2;
211 let s = std::str::from_utf8(&self.data[pos..pos + len]).unwrap();
212 pos += len;
213 dict.push(s);
214 }
215 self.ordinals_start = pos;
216 self.keyword = KeywordDict::Eager(dict);
217 }
218
219 pub fn field_id(&self) -> FieldId {
220 self.field_id
221 }
222
223 pub(crate) fn col_type(&self) -> ColumnType {
224 self.col_type
225 }
226
227 pub fn doc_count(&self) -> u32 {
228 self.doc_count
229 }
230
231 pub fn is_null(&self, doc_id: u32) -> bool {
232 if doc_id >= self.doc_count {
233 return true;
234 }
235 let byte_idx = self.null_bitset_start + (doc_id as usize / 8);
236 let bit_idx = doc_id as usize % 8;
237 (self.data[byte_idx] >> bit_idx) & 1 == 1
238 }
239
240 pub fn keyword_ordinal(&self, doc_id: u32) -> Option<u32> {
242 if doc_id >= self.doc_count || self.is_null(doc_id) {
243 return None;
244 }
245 let pos = self.ordinals_start + doc_id as usize * 4;
246 let ordinal = u32::from_le_bytes(self.data[pos..pos + 4].try_into().unwrap());
247 if ordinal == u32::MAX {
248 None
249 } else {
250 Some(ordinal)
251 }
252 }
253
254 pub fn dict_size(&self) -> usize {
256 match &self.keyword {
257 KeywordDict::None => 0,
258 KeywordDict::Eager(dict) => dict.len(),
259 KeywordDict::Blocked { num_entries, .. } => *num_entries as usize,
264 }
265 }
266
267 pub fn ordinal_to_string(&self, ordinal: u32) -> Option<&'a str> {
271 match &self.keyword {
272 KeywordDict::None => None,
273 KeywordDict::Eager(dict) => dict.get(ordinal as usize).copied(),
274 KeywordDict::Blocked {
275 dict_body_start,
276 block_addrs_start,
277 num_entries,
278 dict,
279 } => {
280 if ordinal >= *num_entries {
281 return None;
282 }
283 if let Some(v) = dict.get() {
287 return v.get(ordinal as usize).copied();
288 }
289 let block = (ordinal >> DICT_BLOCK_SHIFT) as usize;
291 let off = block_addrs_start + block * 8;
292 let addr = u64::from_le_bytes(self.data[off..off + 8].try_into().unwrap()) as usize;
293 let mut pos = dict_body_start + addr;
294 for _ in 0..(ordinal & DICT_BLOCK_MASK) {
295 let len =
296 u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
297 pos += 2 + len;
298 }
299 let len = u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
300 pos += 2;
301 Some(
302 std::str::from_utf8(&self.data[pos..pos + len])
303 .expect("keyword dict bytes are valid UTF-8 by construction"),
304 )
305 }
306 }
307 }
308
309 pub fn ensure_dict(&self) {
316 if let KeywordDict::Blocked {
317 dict_body_start,
318 num_entries,
319 dict,
320 ..
321 } = &self.keyword
322 {
323 dict.get_or_init(|| {
324 #[cfg(test)]
325 DICT_BUILDS.with(|c| c.set(c.get() + 1));
326 let mut v = Vec::with_capacity(*num_entries as usize);
327 let mut pos = *dict_body_start;
328 for _ in 0..*num_entries {
329 let len =
330 u16::from_le_bytes(self.data[pos..pos + 2].try_into().unwrap()) as usize;
331 pos += 2;
332 v.push(
333 std::str::from_utf8(&self.data[pos..pos + len])
334 .expect("keyword dict bytes are valid UTF-8 by construction"),
335 );
336 pos += len;
337 }
338 v
339 });
340 }
341 }
342
343 pub fn keyword_value(&self, doc_id: u32) -> Option<&'a str> {
349 self.keyword_ordinal(doc_id)
350 .and_then(|o| self.ordinal_to_string(o))
351 }
352
353 pub fn i64_value(&self, doc_id: u32) -> Option<i64> {
354 if doc_id >= self.doc_count || self.is_null(doc_id) {
355 return None;
356 }
357 if self.col_type == ColumnType::BitpackedI64 {
358 let min_val = i64::from_le_bytes(
359 self.data[self.body_start..self.body_start + 8]
360 .try_into()
361 .unwrap(),
362 );
363 let bit_width = self.data[self.body_start + 8];
364 let packed_start = self.body_start + 9;
365 return Some(unpack_i64(
366 &self.data[packed_start..],
367 doc_id as usize,
368 min_val,
369 bit_width,
370 ));
371 }
372 let pos = self.body_start + doc_id as usize * 8;
373 Some(i64::from_le_bytes(
374 self.data[pos..pos + 8].try_into().unwrap(),
375 ))
376 }
377
378 pub fn f64_value(&self, doc_id: u32) -> Option<f64> {
379 if doc_id >= self.doc_count || self.is_null(doc_id) {
380 return None;
381 }
382 let pos = self.body_start + doc_id as usize * 8;
383 Some(f64::from_le_bytes(
384 self.data[pos..pos + 8].try_into().unwrap(),
385 ))
386 }
387
388 pub fn bool_value(&self, doc_id: u32) -> Option<bool> {
389 if doc_id >= self.doc_count || self.is_null(doc_id) {
390 return None;
391 }
392 let byte_idx = self.body_start + (doc_id as usize / 8);
393 let bit_idx = doc_id as usize % 8;
394 Some((self.data[byte_idx] >> bit_idx) & 1 == 1)
395 }
396
397 pub fn numeric_value(&self, doc_id: u32) -> Option<f64> {
401 match self.col_type {
402 ColumnType::I64 | ColumnType::BitpackedI64 => self.i64_value(doc_id).map(|v| v as f64),
403 ColumnType::F64 => self.f64_value(doc_id),
404 _ => None,
405 }
406 }
407
408 pub fn is_constant(&self) -> bool {
411 matches!(
412 self.col_type,
413 ColumnType::ConstantI64 | ColumnType::ConstantF64
414 )
415 }
416
417 pub fn constant_value(&self) -> Option<f64> {
419 match self.col_type {
420 ColumnType::ConstantI64 => Some(i64::from_le_bytes(
421 self.data[self.body_start..self.body_start + 8]
422 .try_into()
423 .unwrap(),
424 ) as f64),
425 ColumnType::ConstantF64 => Some(f64::from_le_bytes(
426 self.data[self.body_start..self.body_start + 8]
427 .try_into()
428 .unwrap(),
429 )),
430 _ => None,
431 }
432 }
433
434 pub fn stats(&self) -> Option<&ColumnStats> {
438 self.stats.as_ref()
439 }
440}
441
442pub struct ColumnarReader<'a> {
444 data: &'a [u8],
445 columns: Vec<(FieldId, usize, usize)>, }
447
448impl<'a> ColumnarReader<'a> {
449 pub fn open(data: &'a [u8]) -> Self {
450 if data.len() < 2 {
451 return Self {
452 data,
453 columns: Vec::new(),
454 };
455 }
456
457 let num_columns = u16::from_le_bytes([data[0], data[1]]) as usize;
458 let mut pos = 2usize;
459 let mut columns = Vec::with_capacity(num_columns);
460
461 for _ in 0..num_columns {
462 let start = pos;
463 if pos + 7 > data.len() {
465 break;
466 }
467 let field_id = FieldId::new(u16::from_le_bytes([data[pos], data[pos + 1]]));
468 let col_type = data[pos + 2];
469 let doc_count = u32::from_le_bytes(data[pos + 3..pos + 7].try_into().unwrap()) as usize;
470 pos += 7;
471
472 if doc_count == 0 || col_type == 0 {
473 columns.push((field_id, start, pos));
474 continue;
475 }
476
477 let null_bytes = (doc_count + 7) / 8;
479 pos += null_bytes;
480
481 let is_numeric = matches!(col_type, 2 | 3 | 5 | 6 | 7);
483 if is_numeric {
484 pos += 20;
485 }
486
487 match col_type {
489 1 => {
490 let num_entries =
492 u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
493 pos += 4;
494 for _ in 0..num_entries {
495 let len =
496 u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
497 pos += 2 + len;
498 }
499 pos += doc_count * 4; }
501 8 => {
502 let num_entries =
505 u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
506 pos += 4;
507 let body_len =
508 u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
509 pos += 8;
510 pos += body_len; pos += num_entries.div_ceil(DICT_BLOCK_SIZE) * 8; pos += doc_count * 4; }
514 2 | 3 => {
515 pos += doc_count * 8;
517 }
518 4 => {
519 pos += (doc_count + 7) / 8;
521 }
522 5 | 6 => {
523 pos += 8;
525 }
526 7 => {
527 let bit_width = data[pos + 8] as usize;
529 pos += 9; pos += (doc_count * bit_width + 7) / 8; }
532 _ => {}
533 }
534
535 columns.push((field_id, start, pos));
536 }
537
538 Self { data, columns }
539 }
540
541 pub fn column(&self, field_id: FieldId) -> Option<ColumnReader<'a>> {
543 for &(fid, start, end) in &self.columns {
544 if fid == field_id {
545 return Some(ColumnReader::open(&self.data[start..end]));
546 }
547 }
548 None
549 }
550}
551
#[cfg(test)]
mod tests {
    use super::{ColumnReader, ColumnarReader, DICT_BUILDS, KeywordDict};
    use crate::columnar::writer::{
        ColumnType, ColumnValue, ColumnWriter, ColumnarWriter, DICT_BLOCK_SIZE,
    };
    use crate::core::FieldId;
    use std::collections::HashMap;

    /// Hand-encodes a keyword column with type tag 1 (the eagerly decoded
    /// layout): header, zeroed null bitset, sorted and deduplicated
    /// dictionary, then one `u32` ordinal per value.
    fn write_legacy_keyword_column(field_id: u16, values: &[&str]) -> Vec<u8> {
        let mut terms: Vec<&str> = values.to_vec();
        terms.sort();
        terms.dedup();
        let ordinal_of: HashMap<&str, u32> = terms
            .iter()
            .enumerate()
            .map(|(ordinal, term)| (*term, ordinal as u32))
            .collect();

        let mut out = Vec::new();
        out.extend_from_slice(&field_id.to_le_bytes());
        out.push(1u8);
        out.extend_from_slice(&(values.len() as u32).to_le_bytes());
        out.extend(std::iter::repeat(0u8).take(values.len().div_ceil(8)));
        out.extend_from_slice(&(terms.len() as u32).to_le_bytes());
        for term in &terms {
            out.extend_from_slice(&(term.len() as u16).to_le_bytes());
            out.extend_from_slice(term.as_bytes());
        }
        for value in values {
            out.extend_from_slice(&ordinal_of[*value].to_le_bytes());
        }
        out
    }

    /// Every ordinal of a 200-entry dictionary resolves to its string.
    #[test]
    fn keyword_offset_index_roundtrip() {
        let keys: Vec<String> = (0..200).map(|i| format!("key_{i:04}")).collect();
        let mut writer = ColumnWriter::new(FieldId::new(0));
        keys.iter().cloned().for_each(|key| {
            writer.add(ColumnValue::Keyword(key));
        });
        let bytes = writer.finish();
        let reader = ColumnReader::open(&bytes);

        assert_eq!(reader.dict_size(), 200);
        for (i, key) in keys.iter().enumerate() {
            assert_eq!(
                reader.ordinal_to_string(i as u32),
                Some(key.as_str()),
                "ordinal {i} mismatch"
            );
        }
        assert_eq!(reader.ordinal_to_string(200), None);
    }

    /// Per-document lookups return the value written for that document,
    /// including the interleaved nulls.
    #[test]
    fn keyword_value_matches_doc() {
        let expected: Vec<Option<String>> = (0..1000u32)
            .map(|doc| {
                (doc % 7 != 3)
                    .then(|| format!("val_{:03}", (doc.wrapping_mul(31).wrapping_add(17)) % 80))
            })
            .collect();

        let mut writer = ColumnWriter::new(FieldId::new(0));
        for slot in &expected {
            let value = match slot {
                Some(text) => ColumnValue::Keyword(text.clone()),
                None => ColumnValue::Null,
            };
            writer.add(value);
        }
        let bytes = writer.finish();
        let reader = ColumnReader::open(&bytes);

        for (doc, want) in expected.iter().enumerate() {
            assert_eq!(
                reader.keyword_value(doc as u32),
                want.as_deref(),
                "doc {doc} mismatch"
            );
        }
    }

    /// A one-entry dictionary works at both of its boundaries.
    #[test]
    fn keyword_single_entry() {
        let mut writer = ColumnWriter::new(FieldId::new(0));
        writer.add(ColumnValue::Keyword("only".into()));
        let bytes = writer.finish();
        let reader = ColumnReader::open(&bytes);

        assert_eq!(reader.dict_size(), 1);
        assert_eq!(reader.ordinal_to_string(0), Some("only"));
        assert_eq!(reader.ordinal_to_string(1), None);
        assert_eq!(reader.keyword_value(0), Some("only"));
    }

    /// A column without values has no dictionary, and keyword columns read
    /// back correctly through `ColumnarReader`.
    #[test]
    fn keyword_none_column() {
        let empty_writer = ColumnWriter::new(FieldId::new(0));
        let empty_bytes = empty_writer.finish();
        let empty = ColumnReader::open(&empty_bytes);
        assert!(matches!(empty.keyword, KeywordDict::None));
        assert_eq!(empty.dict_size(), 0);
        assert_eq!(empty.ordinal_to_string(0), None);
        assert_eq!(empty.keyword_value(0), None);

        let mut group = ColumnarWriter::new();
        group.add(FieldId::new(0), ColumnValue::Keyword("alpha".into()));
        group.add(FieldId::new(1), ColumnValue::Keyword("beta".into()));
        group.pad_to(1);
        let group_bytes = group.finish();
        let columns = ColumnarReader::open(&group_bytes);
        assert_eq!(
            columns.column(FieldId::new(0)).unwrap().keyword_value(0),
            Some("alpha")
        );
        assert_eq!(
            columns.column(FieldId::new(1)).unwrap().keyword_value(0),
            Some("beta")
        );
    }

    /// Hand-encoded type-1 keyword columns are decoded eagerly and read back.
    #[test]
    fn legacy_keyword_column_still_reads() {
        let docs = ["cherry", "apple", "banana", "apple"];
        let bytes = write_legacy_keyword_column(0, &docs);
        let reader = ColumnReader::open(&bytes);

        assert_eq!(reader.col_type(), ColumnType::Keyword);
        assert!(matches!(reader.keyword, KeywordDict::Eager(_)));
        assert_eq!(reader.dict_size(), 3);
        assert_eq!(reader.keyword_value(0), Some("cherry"));
        assert_eq!(reader.keyword_value(1), Some("apple"));
        assert_eq!(reader.keyword_value(2), Some("banana"));
        assert_eq!(reader.keyword_value(3), Some("apple"));
        assert_eq!(reader.ordinal_to_string(0), Some("apple"));
        assert_eq!(reader.ordinal_to_string(2), Some("cherry"));
    }

    /// Dictionaries sized exactly at, just past, and at twice the block size
    /// resolve every ordinal.
    #[test]
    fn block_boundary_exhaustive() {
        let block = DICT_BLOCK_SIZE;
        for n in [block, block + 1, 2 * block] {
            let keys: Vec<String> = (0..n).map(|i| format!("k{i:05}")).collect();
            let mut writer = ColumnWriter::new(FieldId::new(0));
            keys.iter().cloned().for_each(|key| {
                writer.add(ColumnValue::Keyword(key));
            });
            let bytes = writer.finish();
            let reader = ColumnReader::open(&bytes);

            assert_eq!(reader.dict_size(), n, "N={n} dict_size");
            for (i, key) in keys.iter().enumerate() {
                assert_eq!(
                    reader.ordinal_to_string(i as u32),
                    Some(key.as_str()),
                    "N={n} ordinal {i}"
                );
            }
            assert_eq!(
                reader.ordinal_to_string(n as u32),
                None,
                "N={n} out-of-range"
            );
        }
    }

    /// Repeated `ensure_dict` calls and later lookups build the dictionary
    /// exactly once.
    #[test]
    fn ensure_dict_builds_once() {
        let keys: Vec<String> = (0..200).map(|i| format!("v{i:04}")).collect();
        let mut writer = ColumnWriter::new(FieldId::new(0));
        keys.iter().cloned().for_each(|key| {
            writer.add(ColumnValue::Keyword(key));
        });
        let bytes = writer.finish();
        let reader = ColumnReader::open(&bytes);

        assert_eq!(reader.dict_size(), 200);

        DICT_BUILDS.with(|builds| builds.set(0));
        (0..1000).for_each(|_| reader.ensure_dict());
        assert_eq!(DICT_BUILDS.with(|builds| builds.get()), 1);

        for ordinal in (0..5000u32).map(|i| i % 200) {
            assert_eq!(
                reader.ordinal_to_string(ordinal),
                Some(keys[ordinal as usize].as_str())
            );
        }
        assert_eq!(DICT_BUILDS.with(|builds| builds.get()), 1);
    }

    /// Corrupting a dictionary byte makes lookups panic instead of returning
    /// a wrong string.
    #[test]
    #[should_panic(expected = "valid UTF-8")]
    fn blocked_corrupt_entry_fails_loud() {
        let mut writer = ColumnWriter::new(FieldId::new(0));
        (0..10).for_each(|i| {
            writer.add(ColumnValue::Keyword(format!("value{i}")));
        });
        let mut bytes = writer.finish();

        // The first `v` byte belongs to the first dictionary entry.
        let first_v = bytes
            .iter()
            .position(|&b| b == b'v')
            .expect("dict body contains 'v'");
        bytes[first_v] = 0xFF;

        let reader = ColumnReader::open(&bytes);
        (0..reader.dict_size() as u32).for_each(|ordinal| {
            let _ = reader.ordinal_to_string(ordinal);
        });
    }
}