1use std::collections::HashMap;
10use std::sync::RwLock;
11
12use crate::core::{FieldId, Result, SegmentId};
13
14use crate::inverted::norms::FieldNormsReader;
15use crate::inverted::postings::{
16 BlockMaxPostingListReader, PositionPostingListReader, PostingListReader, has_block_max,
17 has_positions,
18};
19use crate::inverted::term_dict::TermDict;
20use crate::segment::format::{ComponentType, SegmentHeader};
21use crate::spatial::geo::GeoPointStore;
22use crate::store::doc_store::DocStoreReader;
23
/// Borrowed view of a single field's portion of the inverted-index component.
///
/// Built by `SegmentReader::field_index`; every reference points into the
/// reader's backing byte buffer.
struct FieldIndex<'a> {
    /// Term dictionary opened over this field's dictionary bytes.
    term_dict: TermDict<'a>,
    /// Raw postings bytes for this field. Values looked up in `term_dict` are
    /// used as byte offsets into this slice.
    postings_data: &'a [u8],
    /// Raw norms bytes; `None` when the field's norms flag byte is zero.
    norms_data: Option<&'a [u8]>,
}
30
/// Read-only accessor over one serialized segment held fully in memory.
pub struct SegmentReader {
    /// The complete segment bytes; component offsets taken from `header` index
    /// into this buffer.
    data: Vec<u8>,
    /// Header decoded from the front of `data` by `SegmentHeader::from_bytes`.
    header: SegmentHeader,
    /// Second value returned by `SegmentHeader::from_bytes` (presumably the
    /// encoded header length in bytes; confirm against `SegmentHeader`).
    // Stored by `open` but never read in this impl, hence the lint allowance.
    #[allow(dead_code)]
    header_size: usize,
    /// Geo-point stores already decoded by `geo_points`, keyed by field.
    geo_cache: RwLock<HashMap<FieldId, GeoPointStore>>,
    /// Geo-shape stores already decoded by `geo_shapes`, keyed by field.
    geo_shape_cache: RwLock<HashMap<FieldId, crate::spatial::shape::GeoShapeStore>>,
}
44
45impl SegmentReader {
46 pub fn open(data: Vec<u8>) -> Result<Self> {
51 let (header, header_size) = SegmentHeader::from_bytes(&data)?;
52 Ok(Self {
53 data,
54 header,
55 header_size,
56 geo_cache: RwLock::new(HashMap::new()),
57 geo_shape_cache: RwLock::new(HashMap::new()),
58 })
59 }
60
61 pub fn segment_id(&self) -> SegmentId {
62 self.header.segment_id
63 }
64
65 pub fn doc_count(&self) -> u32 {
66 self.header.doc_count
67 }
68
69 pub fn max_doc(&self) -> u32 {
70 self.header.max_doc
71 }
72
73 pub fn header(&self) -> &SegmentHeader {
74 &self.header
75 }
76
77 pub fn doc_store(&self) -> DocStoreReader<'_> {
79 let comp = self
80 .header
81 .component(ComponentType::DocStore)
82 .expect("segment must have a DocStore component");
83 let start = comp.offset as usize;
84 let end = start + comp.length as usize;
85 DocStoreReader::open(&self.data[start..end])
86 }
87
88 pub fn postings(&self, field_id: FieldId, term: &str) -> Option<PostingListReader<'_>> {
94 let field_index = self.field_index(field_id)?;
95 let posting_offset = field_index.term_dict.get(term)?;
96 let postings_data = &field_index.postings_data[posting_offset as usize..];
97 Some(PostingListReader::new(postings_data))
98 }
99
100 pub fn postings_block_max(
103 &self,
104 field_id: FieldId,
105 term: &str,
106 ) -> Option<BlockMaxPostingListReader<'_>> {
107 let field_index = self.field_index(field_id)?;
108 let posting_offset = field_index.term_dict.get(term)?;
109 let postings_data = &field_index.postings_data[posting_offset as usize..];
110 if has_block_max(postings_data) {
111 Some(BlockMaxPostingListReader::new(postings_data))
112 } else {
113 None
114 }
115 }
116
117 pub fn postings_with_positions(
122 &self,
123 field_id: FieldId,
124 term: &str,
125 ) -> Option<PositionPostingListReader<'_>> {
126 let field_index = self.field_index(field_id)?;
127 let posting_offset = field_index.term_dict.get(term)?;
128 let postings_data = &field_index.postings_data[posting_offset as usize..];
129 if has_positions(postings_data) {
130 Some(PositionPostingListReader::new(postings_data))
131 } else {
132 None
133 }
134 }
135
136 pub fn terms_with_prefix(&self, field_id: FieldId, prefix: &str) -> Vec<(String, u32)> {
138 let Some(field_index) = self.field_index(field_id) else {
139 return Vec::new();
140 };
141 field_index
142 .term_dict
143 .prefix_iter(prefix)
144 .into_iter()
145 .map(|(term, offset)| {
146 let postings_data = &field_index.postings_data[offset as usize..];
147 let reader = PostingListReader::new(postings_data);
148 (term, reader.len())
149 })
150 .collect()
151 }
152
153 pub fn automaton_search<A: fst::Automaton>(
156 &self,
157 field_id: FieldId,
158 aut: A,
159 ) -> Vec<(String, u32)> {
160 let Some(field_index) = self.field_index(field_id) else {
161 return Vec::new();
162 };
163 field_index
164 .term_dict
165 .automaton_search(aut)
166 .into_iter()
167 .map(|(term, offset)| {
168 let postings_data = &field_index.postings_data[offset as usize..];
169 let reader = PostingListReader::new(postings_data);
170 (term, reader.len())
171 })
172 .collect()
173 }
174
175 pub fn doc_freq(&self, field_id: FieldId, term: &str) -> u32 {
177 match self.postings(field_id, term) {
178 Some(reader) => reader.len(),
179 None => 0,
180 }
181 }
182
183 pub fn parent_bitset(&self) -> Option<&[bool]> {
185 self.header.parent_bitset.as_deref()
186 }
187
188 pub fn geo_points(&self, field_id: FieldId) -> Option<GeoPointStore> {
193 {
195 let cache = self.geo_cache.read().unwrap();
196 if let Some(store) = cache.get(&field_id) {
197 return Some(store.clone());
198 }
199 }
200
201 let comp = self.header.component(ComponentType::Spatial)?;
203 let start = comp.offset as usize;
204 let spatial_data = &self.data[start..start + comp.length as usize];
205
206 let num_fields = u16::from_le_bytes([spatial_data[0], spatial_data[1]]) as usize;
207 let mut pos = 2;
208 for _ in 0..num_fields {
209 let fid = FieldId::new(u16::from_le_bytes([
210 spatial_data[pos],
211 spatial_data[pos + 1],
212 ]));
213 pos += 2;
214 let sub_type = spatial_data[pos];
215 pos += 1;
216 let data_len =
217 u32::from_le_bytes(spatial_data[pos..pos + 4].try_into().unwrap()) as usize;
218 pos += 4;
219
220 if fid == field_id && sub_type == 0 {
221 let store = GeoPointStore::from_bytes(&spatial_data[pos..pos + data_len]);
222 self.geo_cache
223 .write()
224 .unwrap()
225 .insert(field_id, store.clone());
226 return Some(store);
227 }
228 pos += data_len;
229 }
230 None
231 }
232
233 pub fn geo_shapes(&self, field_id: FieldId) -> Option<crate::spatial::shape::GeoShapeStore> {
237 {
238 let cache = self.geo_shape_cache.read().unwrap();
239 if let Some(store) = cache.get(&field_id) {
240 return Some(store.clone());
241 }
242 }
243
244 let comp = self.header.component(ComponentType::Spatial)?;
245 let start = comp.offset as usize;
246 let spatial_data = &self.data[start..start + comp.length as usize];
247
248 let num_fields = u16::from_le_bytes([spatial_data[0], spatial_data[1]]) as usize;
249 let mut pos = 2;
250 for _ in 0..num_fields {
251 let fid = FieldId::new(u16::from_le_bytes([
252 spatial_data[pos],
253 spatial_data[pos + 1],
254 ]));
255 pos += 2;
256 let sub_type = spatial_data[pos];
257 pos += 1;
258 let data_len =
259 u32::from_le_bytes(spatial_data[pos..pos + 4].try_into().unwrap()) as usize;
260 pos += 4;
261
262 if fid == field_id && sub_type == 1 {
263 let store = crate::spatial::shape::GeoShapeStore::from_bytes(
264 &spatial_data[pos..pos + data_len],
265 );
266 self.geo_shape_cache
267 .write()
268 .unwrap()
269 .insert(field_id, store.clone());
270 return Some(store);
271 }
272 pos += data_len;
273 }
274 None
275 }
276
277 pub fn column(&self, field_id: FieldId) -> Option<crate::columnar::reader::ColumnReader<'_>> {
279 let comp = self.header.component(ComponentType::Columnar)?;
280 let start = comp.offset as usize;
281 let end = start + comp.length as usize;
282 let columnar = crate::columnar::reader::ColumnarReader::open(&self.data[start..end]);
283 columnar.column(field_id)
284 }
285
286 pub fn norms(&self, field_id: FieldId) -> Option<FieldNormsReader<'_>> {
288 let field_index = self.field_index(field_id)?;
289 let norms_data = field_index.norms_data?;
290 Some(FieldNormsReader::open(norms_data))
291 }
292
293 pub fn avg_field_length(&self, field_id: FieldId) -> f32 {
295 match self.norms(field_id) {
296 Some(norms_reader) => {
297 if norms_reader.doc_count() == 0 {
298 return 0.0;
299 }
300 let mut total = 0.0f64;
301 for i in 0..norms_reader.doc_count() {
302 total += norms_reader.norm(crate::core::DocId::new(i)) as f64;
303 }
304 (total / norms_reader.doc_count() as f64) as f32
305 }
306 None => 0.0,
307 }
308 }
309
310 fn field_index(&self, field_id: FieldId) -> Option<FieldIndex<'_>> {
312 let comp = self.header.component(ComponentType::InvertedIndex)?;
313 let inv_start = comp.offset as usize;
314 let inv_data = &self.data[inv_start..inv_start + comp.length as usize];
315
316 let num_fields = u16::from_le_bytes([inv_data[0], inv_data[1]]) as usize;
318 let mut pos = 2;
319
320 for _ in 0..num_fields {
321 let fid = FieldId::new(u16::from_le_bytes([inv_data[pos], inv_data[pos + 1]]));
322 pos += 2;
323
324 let td_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
326 pos += 4;
327 let td_data = &inv_data[pos..pos + td_len];
328 pos += td_len;
329
330 let pd_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
332 pos += 4;
333 let pd_data = &inv_data[pos..pos + pd_len];
334 pos += pd_len;
335
336 let has_norms = inv_data[pos] != 0;
338 pos += 1;
339 let norms_data = if has_norms {
340 let n_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
341 pos += 4;
342 let nd = &inv_data[pos..pos + n_len];
343 pos += n_len;
344 Some(nd)
345 } else {
346 None
347 };
348
349 if fid == field_id {
350 return Some(FieldIndex {
351 term_dict: TermDict::open(td_data),
352 postings_data: pd_data,
353 norms_data,
354 });
355 }
356 }
357
358 None
359 }
360
361 pub fn terms(&self, field_id: FieldId) -> Vec<String> {
363 let Some(field_index) = self.field_index(field_id) else {
364 return Vec::new();
365 };
366 let td = &field_index.term_dict;
372 let mut result = Vec::new();
373
374 if td.len() == 0 {
377 return result;
378 }
379
380 let comp = self.header.component(ComponentType::InvertedIndex).unwrap();
384 let inv_start = comp.offset as usize;
385 let inv_data = &self.data[inv_start..inv_start + comp.length as usize];
386 let mut pos = 2;
387
388 for _ in 0..u16::from_le_bytes([inv_data[0], inv_data[1]]) {
389 let fid = FieldId::new(u16::from_le_bytes([inv_data[pos], inv_data[pos + 1]]));
390 pos += 2;
391
392 let td_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
393 pos += 4;
394 let td_data = &inv_data[pos..pos + td_len];
395 pos += td_len;
396
397 let pd_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
398 pos += 4 + pd_len;
399
400 let has_norms = inv_data[pos] != 0;
401 pos += 1;
402 if has_norms {
403 let n_len = u32::from_le_bytes(inv_data[pos..pos + 4].try_into().unwrap()) as usize;
404 pos += 4 + n_len;
405 }
406
407 if fid == field_id {
408 let td = crate::inverted::term_dict::TermDict::open(td_data);
410 for (term, _) in td.prefix_iter("") {
411 result.push(term);
412 }
413 break;
414 }
415 }
416
417 result
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::Token;
    use crate::core::DocId;
    use crate::mapping::{FieldType, Mapping};
    use crate::segment::builder::SegmentBuilder;

    /// Builds one token per term, using the term's index in `terms` as the
    /// last `Token::new` argument.
    fn make_tokens(terms: &[&str]) -> Vec<Token> {
        let mut tokens = Vec::with_capacity(terms.len());
        for (index, term) in terms.iter().enumerate() {
            tokens.push(Token::new(*term, 0, term.len(), index as u32));
        }
        tokens
    }

    /// Serializes segment 1 holding a single document titled "hello world".
    fn build_single_doc_segment() -> Vec<u8> {
        let mapping = Mapping::builder().field("title", FieldType::Text).build();
        let mut segment = SegmentBuilder::new(SegmentId::new(1), &mapping);
        segment.add_document(
            &[(FieldId::new(0), make_tokens(&["hello", "world"]))],
            br#"{"title":"hello world"}"#,
        );
        segment.build()
    }

    /// Opens the single-document fixture, panicking if it does not parse.
    fn open_single_doc_reader() -> SegmentReader {
        SegmentReader::open(build_single_doc_segment()).unwrap()
    }

    #[test]
    fn open_valid_segment() {
        let reader = open_single_doc_reader();
        assert_eq!(reader.segment_id(), SegmentId::new(1));
        assert_eq!(reader.doc_count(), 1);
    }

    #[test]
    fn reject_invalid_magic() {
        let mut bytes = build_single_doc_segment();
        // Overwrite the very first byte of the serialized segment.
        bytes[0] = b'X';
        assert!(SegmentReader::open(bytes).is_err());
    }

    #[test]
    fn reject_bad_checksum() {
        let mut bytes = build_single_doc_segment();
        // Flip every bit of byte 12 so the stored data no longer validates.
        bytes[12] ^= 0xFF;
        assert!(SegmentReader::open(bytes).is_err());
    }

    #[test]
    fn term_lookup() {
        let reader = open_single_doc_reader();

        let terms = reader.terms(FieldId::new(0));
        for expected in ["hello", "world"] {
            assert!(terms.contains(&expected.to_string()));
        }
    }

    #[test]
    fn posting_iteration() {
        let reader = open_single_doc_reader();

        let mut postings = reader.postings(FieldId::new(0), "hello").unwrap();
        let (doc_id, tf) = postings.next().unwrap();
        assert_eq!(doc_id, DocId::new(0));
        assert_eq!(tf, 1);
        assert!(postings.next().is_none());
    }

    #[test]
    fn doc_retrieval() {
        let reader = open_single_doc_reader();
        let store = reader.doc_store();
        let doc = store.get(0).unwrap();
        assert_eq!(doc, br#"{"title":"hello world"}"#);
    }

    #[test]
    fn norms_lookup() {
        let reader = open_single_doc_reader();

        let norms = reader.norms(FieldId::new(0)).unwrap();
        assert_eq!(norms.norm(DocId::new(0)), 2.0);
    }

    #[test]
    fn missing_term_returns_none() {
        let reader = open_single_doc_reader();
        assert!(reader.postings(FieldId::new(0), "nonexistent").is_none());
    }

    #[test]
    fn missing_field_returns_none() {
        let reader = open_single_doc_reader();
        assert!(reader.postings(FieldId::new(99), "hello").is_none());
    }

    #[test]
    fn end_to_end_multi_doc() {
        let mapping = Mapping::builder()
            .field("body", FieldType::Text)
            .field("tag", FieldType::Keyword)
            .build();
        let mut segment = SegmentBuilder::new(SegmentId::new(42), &mapping);

        segment.add_document(
            &[
                (
                    FieldId::new(0),
                    make_tokens(&["the", "quick", "brown", "fox"]),
                ),
                (FieldId::new(1), make_tokens(&["animal"])),
            ],
            br#"{"body":"the quick brown fox","tag":"animal"}"#,
        );
        segment.add_document(
            &[
                (FieldId::new(0), make_tokens(&["the", "lazy", "dog"])),
                (FieldId::new(1), make_tokens(&["animal"])),
            ],
            br#"{"body":"the lazy dog","tag":"animal"}"#,
        );
        segment.add_document(
            &[
                (FieldId::new(0), make_tokens(&["quick", "search", "engine"])),
                (FieldId::new(1), make_tokens(&["tech"])),
            ],
            br#"{"body":"quick search engine","tag":"tech"}"#,
        );

        let reader = SegmentReader::open(segment.build()).unwrap();

        assert_eq!(reader.doc_count(), 3);

        // The posting list for (field, term) must yield exactly `expected`,
        // in order, and then be exhausted.
        let assert_postings = |field: FieldId, term: &str, expected: &[DocId]| {
            let mut postings = reader.postings(field, term).unwrap();
            for doc in expected {
                assert_eq!(&postings.next().unwrap().0, doc);
            }
            assert!(postings.next().is_none());
        };
        assert_postings(FieldId::new(0), "the", &[DocId::new(0), DocId::new(1)]);
        assert_postings(FieldId::new(0), "quick", &[DocId::new(0), DocId::new(2)]);
        assert_postings(FieldId::new(1), "animal", &[DocId::new(0), DocId::new(1)]);

        // Stored documents round-trip as JSON.
        let store = reader.doc_store();
        let doc0: serde_json::Value = serde_json::from_slice(&store.get(0).unwrap()).unwrap();
        assert_eq!(doc0["tag"], "animal");
        let doc2: serde_json::Value = serde_json::from_slice(&store.get(2).unwrap()).unwrap();
        assert_eq!(doc2["tag"], "tech");

        // Each body norm equals the number of tokens indexed for that document.
        let norms = reader.norms(FieldId::new(0)).unwrap();
        for (doc, expected) in [(0, 4.0), (1, 3.0), (2, 3.0)] {
            assert_eq!(norms.norm(DocId::new(doc)), expected);
        }

        for (term, expected) in [("the", 2), ("quick", 2), ("fox", 1), ("missing", 0)] {
            assert_eq!(reader.doc_freq(FieldId::new(0), term), expected);
        }
    }
}