use std::collections::HashMap;
use crate::block::CompactPosting;
use crate::codec::smallfloat;
use super::merge;
use super::segment::{reader::SegmentReader, writer};
pub struct WorkerResult {
pub term_postings: HashMap<String, Vec<CompactPosting>>,
}
impl WorkerResult {
pub fn new() -> Self {
Self {
term_postings: HashMap::new(),
}
}
pub fn insert(&mut self, term: &str, posting: CompactPosting) {
self.term_postings
.entry(term.to_string())
.or_default()
.push(posting);
}
pub fn flush_to_segment(self) -> Vec<u8> {
writer::flush_to_segment(self.term_postings)
}
}
impl Default for WorkerResult {
fn default() -> Self {
Self::new()
}
}
pub fn merge_worker_segments(worker_segments: Vec<Vec<u8>>) -> Vec<u8> {
let readers: Vec<SegmentReader> = worker_segments
.into_iter()
.filter_map(SegmentReader::open)
.collect();
if readers.is_empty() {
return writer::build_from_blocks(&[]);
}
let merged_term_blocks = merge::merge_segments(&readers);
writer::build_from_blocks(&merged_term_blocks)
}
pub fn partition_doc_range(total_docs: u32, num_workers: u32) -> Vec<(u32, u32)> {
if total_docs == 0 || num_workers == 0 {
return Vec::new();
}
let chunk = total_docs / num_workers;
let remainder = total_docs % num_workers;
let mut ranges = Vec::with_capacity(num_workers as usize);
let mut start = 0;
for i in 0..num_workers {
let extra = if i < remainder { 1 } else { 0 };
let end = start + chunk + extra;
ranges.push((start, end));
start = end;
}
ranges
}
pub fn make_compact_posting(
doc_id: u32,
term_freq: u32,
doc_len: u32,
positions: Vec<u32>,
) -> CompactPosting {
CompactPosting {
doc_id,
term_freq,
fieldnorm: smallfloat::encode(doc_len),
positions,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn partition_even() {
let ranges = partition_doc_range(100, 4);
assert_eq!(ranges, vec![(0, 25), (25, 50), (50, 75), (75, 100)]);
}
#[test]
fn partition_uneven() {
let ranges = partition_doc_range(10, 3);
assert_eq!(ranges, vec![(0, 4), (4, 7), (7, 10)]);
}
#[test]
fn partition_zero() {
assert!(partition_doc_range(0, 4).is_empty());
assert!(partition_doc_range(10, 0).is_empty());
}
#[test]
fn worker_result_flush_and_merge() {
let mut w1 = WorkerResult::new();
w1.insert("hello", make_compact_posting(0, 2, 100, vec![0, 3]));
w1.insert("hello", make_compact_posting(1, 1, 50, vec![5]));
w1.insert("world", make_compact_posting(0, 1, 100, vec![1]));
let mut w2 = WorkerResult::new();
w2.insert("hello", make_compact_posting(2, 1, 80, vec![0]));
w2.insert("foo", make_compact_posting(3, 3, 120, vec![0, 2, 7]));
let seg1 = w1.flush_to_segment();
let seg2 = w2.flush_to_segment();
let merged = merge_worker_segments(vec![seg1, seg2]);
let reader = SegmentReader::open(merged).unwrap();
assert_eq!(reader.num_terms(), 3); assert_eq!(reader.df("hello"), 3); assert_eq!(reader.df("world"), 1); assert_eq!(reader.df("foo"), 1); }
#[test]
fn merge_empty_workers() {
let merged = merge_worker_segments(Vec::new());
let reader = SegmentReader::open(merged).unwrap();
assert_eq!(reader.num_terms(), 0);
}
}