use std::collections::HashMap;
use std::env;
use std::io;
use std::sync::{Arc, Mutex};
use std::thread;
use log::debug;
use crate::codecs::lucene90::compound;
use crate::codecs::lucene99::segment_info_format::{self, SegmentInfoFieldData};
use crate::document::Document;
use crate::index::config::IndexWriterConfig;
use crate::index::index_file_names::{self, radix36 as radix_fmt};
use crate::index::pipeline::channel::{self, Receiver, Sender};
use crate::index::pipeline::flush_control::FlushControl;
use crate::index::pipeline::id_generator::IdGenerator;
use crate::index::pipeline::segment_context::SegmentContext;
use crate::index::pipeline::segment_worker::SegmentWorker;
use crate::index::segment::{FlushedSegment, SegmentId};
use crate::index::segment_infos::SegmentInfos;
use crate::store::memory::MemoryIndexOutput;
use crate::store::{Directory, SegmentFile, SharedDirectory};
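/// Builds a [`SegmentWorker`] plus its I/O context for a newly allocated
/// segment. Implementations choose the consumer chain and the directory
/// the segment's files are written to.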
pub trait WorkerFactory: Send + Sync {
fn create_worker(&self, segment_id: SegmentId) -> (SegmentWorker, SegmentContext);
}
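/// Hands out segment workers with unique names and ids. Only the counter
/// and id generator sit behind the mutex; worker construction happens
/// outside the critical section.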
struct WorkerSource {
state: Mutex<WorkerSourceState>,
factory: Arc<dyn WorkerFactory>,
}
struct WorkerSourceState {
next_segment_num: u64,
id_generator: Box<dyn IdGenerator>,
}
impl WorkerSource {
fn new(id_generator: Box<dyn IdGenerator>, factory: Arc<dyn WorkerFactory>) -> Self {
Self {
state: Mutex::new(WorkerSourceState {
next_segment_num: 0,
id_generator,
}),
factory,
}
}
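    /// Allocates the next segment name (`_0`, `_1`, ... in base 36) and a
    /// fresh id while holding the lock, then builds the worker outside it.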
fn create_worker(&self) -> (SegmentWorker, SegmentContext) {
let segment_id = {
let mut state = self.state.lock().unwrap();
let name = format!("_{}", radix_fmt(state.next_segment_num));
state.next_segment_num += 1;
SegmentId {
name,
id: state.id_generator.next_id(),
}
};
self.factory.create_worker(segment_id)
}
}
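/// Body of one indexing thread: pull documents off the shared channel, feed
/// them to the current worker, and flush a segment whenever `FlushControl`
/// reports the worker has grown too large, replacing it with a fresh worker
/// from the source. When the channel closes, the current worker's remaining
/// buffered documents are flushed; an empty final segment is discarded.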
fn worker_thread_loop(
doc_rx: Receiver,
worker_source: Arc<WorkerSource>,
flush_control: Arc<FlushControl>,
worker_id: usize,
) -> io::Result<Vec<FlushedSegment>> {
let mut segments = Vec::new();
let (mut worker, mut context) = worker_source.create_worker();
loop {
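        // Honor any global stall before pulling the next document; recv
        // returns None once the channel closes.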
flush_control.wait_if_stalled();
let Some(doc) = doc_rx.recv() else { break };
worker.add_document(doc, &context)?;
flush_control.after_document(worker_id, worker.ram_bytes_used() as u64);
if flush_control.should_flush(worker_id) {
segments.push(worker.flush(&context)?);
flush_control.reset_worker(worker_id);
let (new_worker, new_context) = worker_source.create_worker();
worker = new_worker;
context = new_context;
}
}
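    // Channel closed: flush whatever the current worker still buffers and
    // keep the segment only if it actually received documents.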
let flushed = worker.flush(&context)?;
flush_control.reset_worker(worker_id);
if flushed.doc_count > 0 {
segments.push(flushed);
}
Ok(segments)
}
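/// Repackages a flushed segment into the compound format: every file except
/// the `.si` descriptor is folded into a single `.cfs` data file with a
/// `.cfe` entry table, the originals are deleted, and the `.si` is rewritten
/// to reference only the compound files.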
fn package_compound_segment(
segment: &mut FlushedSegment,
directory: &dyn Directory,
) -> io::Result<()> {
let seg_name = &segment.segment_id.name;
let si_name = index_file_names::segment_file_name(seg_name, "", "si");
let cfs_name = index_file_names::segment_file_name(seg_name, "", "cfs");
let cfe_name = index_file_names::segment_file_name(seg_name, "", "cfe");
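    // The .si descriptor stays standalone; every other flushed file gets
    // folded into the compound file.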
let sub_file_names: Vec<&String> = segment
.file_names
.iter()
.filter(|f| !f.ends_with(".si"))
.collect();
let sub_files: Vec<SegmentFile> = sub_file_names
.iter()
.map(|name| {
let data = directory.read_file(name)?;
Ok(SegmentFile {
name: (*name).clone(),
data,
})
})
.collect::<io::Result<Vec<_>>>()?;
let mut cfs_out = MemoryIndexOutput::new(cfs_name.clone());
let cfe = compound::write_to(seg_name, &segment.segment_id.id, &sub_files, &mut cfs_out)?;
directory.write_file(&cfs_name, cfs_out.bytes())?;
directory.write_file(&cfe.name, &cfe.data)?;
for name in &sub_file_names {
directory.delete_file(name)?;
}
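    // Rewrite the segment descriptor so it lists only the surviving files,
    // with flush-style diagnostics and stored-fields attributes.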
let compound_files = vec![si_name.clone(), cfs_name.clone(), cfe_name.clone()];
let mut diagnostics = HashMap::new();
diagnostics.insert("source".to_string(), "flush".to_string());
diagnostics.insert("os.name".to_string(), env::consts::OS.to_string());
diagnostics.insert("os.arch".to_string(), env::consts::ARCH.to_string());
let mut attributes = HashMap::new();
attributes.insert(
"Lucene90StoredFieldsFormat.mode".to_string(),
"BEST_SPEED".to_string(),
);
let si = SegmentInfoFieldData {
name: seg_name.clone(),
max_doc: segment.doc_count,
is_compound_file: true,
id: segment.segment_id.id,
diagnostics,
attributes,
has_blocks: false,
};
directory.delete_file(&si_name)?;
segment_info_format::write(directory, &si, &compound_files)?;
debug!(
"compound: packaged {} ({} sub-files → .cfs/.cfe)",
seg_name,
sub_file_names.len()
);
segment.file_names = compound_files;
Ok(())
}
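/// Fans documents out to a pool of indexing threads over a bounded channel
/// and gathers the flushed segments on shutdown.
///
/// A sketch of typical usage (construction of the config, id generator,
/// directory, and factory elided):
///
/// ```ignore
/// let coordinator = IndexCoordinator::new(&config, id_gen, dir, factory);
/// coordinator.add_document(doc)?;
/// let segments = coordinator.shutdown()?;
/// ```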
pub struct IndexCoordinator {
sender: Sender,
workers: Vec<thread::JoinHandle<io::Result<Vec<FlushedSegment>>>>,
use_compound_file: bool,
directory: SharedDirectory,
segment_infos: SegmentInfos,
}
impl IndexCoordinator {
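    /// Spawns one indexing thread per configured worker thread. The
    /// document channel's capacity matches the thread count, so callers
    /// see backpressure rather than unbounded queueing.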
pub fn new(
config: &IndexWriterConfig,
id_generator: Box<dyn IdGenerator>,
directory: SharedDirectory,
worker_factory: Arc<dyn WorkerFactory>,
) -> Self {
let queue_capacity = config.get_num_threads();
let (sender, receiver) = channel::bounded(queue_capacity);
let worker_source = Arc::new(WorkerSource::new(id_generator, worker_factory));
let flush_control = Arc::new(FlushControl::new(
config.get_num_threads(),
config.get_ram_buffer_size_mb(),
config.get_max_buffered_docs(),
));
let mut workers = Vec::with_capacity(config.get_num_threads());
for worker_id in 0..config.get_num_threads() {
let rx = receiver.clone();
let source = Arc::clone(&worker_source);
let fc = Arc::clone(&flush_control);
let handle = thread::spawn(move || worker_thread_loop(rx, source, fc, worker_id));
workers.push(handle);
}
Self {
sender,
workers,
use_compound_file: config.get_use_compound_file(),
directory,
segment_infos: SegmentInfos::new(),
}
}
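    /// Queues a document for indexing; blocks while the bounded channel
    /// is full.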
pub fn add_document(&self, doc: Document) -> io::Result<()> {
self.sender.send(doc)
}
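    /// Closes the document channel, joins every worker thread, optionally
    /// repackages each flushed segment into the compound format, and
    /// commits the surviving segments. Consumes the coordinator.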
pub fn shutdown(mut self) -> io::Result<Vec<FlushedSegment>> {
drop(self.sender);
let mut all_segments = Vec::new();
for handle in self.workers {
match handle.join() {
Ok(result) => all_segments.extend(result?),
Err(_) => return Err(io::Error::other("worker thread panicked")),
}
}
if self.use_compound_file {
for segment in &mut all_segments {
package_compound_segment(segment, &*self.directory)?;
}
}
if !all_segments.is_empty() {
for segment in &all_segments {
self.segment_infos.add(segment.clone());
}
self.segment_infos.commit(&*self.directory)?;
}
Ok(all_segments)
}
}
#[cfg(test)]
mod tests {
use super::*;
use assertables::*;
use std::collections::HashSet;
use crate::analysis::{StandardAnalyzer, Token};
use crate::index::field::Field;
use crate::index::pipeline::consumer::{FieldConsumer, TokenInterest};
use crate::index::pipeline::flush_control::FlushControl;
use crate::index::pipeline::id_generator::RandomIdGenerator;
use crate::index::pipeline::segment_accumulator::SegmentAccumulator;
use crate::store::MemoryDirectory;
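    // A FlushControl whose triggers are disabled (no RAM budget, doc limit
    // of -1), so workers only flush when the channel closes.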
fn disabled_flush_control() -> Arc<FlushControl> {
Arc::new(FlushControl::new(1, 0.0, -1))
}
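    /// Deterministic id source for tests: the n-th id is sixteen copies
    /// of the byte n.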
struct SequentialIdGenerator(u8);
impl IdGenerator for SequentialIdGenerator {
fn next_id(&mut self) -> [u8; 16] {
let id = [self.0; 16];
self.0 += 1;
id
}
}
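    /// Accepts every pipeline callback and produces no files, letting
    /// tests drive the indexing loop without codec I/O.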
struct NoOpConsumer;
impl mem_dbg::MemSize for NoOpConsumer {
fn mem_size_rec(
&self,
_flags: mem_dbg::SizeFlags,
_refs: &mut mem_dbg::HashMap<usize, usize>,
) -> usize {
0
}
}
impl FieldConsumer for NoOpConsumer {
fn start_document(&mut self, _doc_id: i32) -> io::Result<()> {
Ok(())
}
fn start_field(
&mut self,
_field_id: u32,
_field: &Field,
_acc: &mut SegmentAccumulator,
) -> io::Result<TokenInterest> {
Ok(TokenInterest::NoTokens)
}
fn add_token(
&mut self,
_field_id: u32,
_field: &Field,
_token: &Token<'_>,
_acc: &mut SegmentAccumulator,
) -> io::Result<()> {
Ok(())
}
fn finish_field(
&mut self,
_field_id: u32,
_field: &Field,
_acc: &mut SegmentAccumulator,
) -> io::Result<()> {
Ok(())
}
fn finish_document(
&mut self,
_doc_id: i32,
_acc: &mut SegmentAccumulator,
_context: &SegmentContext,
) -> io::Result<()> {
Ok(())
}
fn flush(
&mut self,
_context: &SegmentContext,
_acc: &SegmentAccumulator,
) -> io::Result<Vec<String>> {
Ok(vec![])
}
}
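    /// Builds workers that run only [`NoOpConsumer`], so tests can exercise
    /// the threading and flush logic without writing index files.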
struct NoOpWorkerFactory {
directory: SharedDirectory,
}
impl WorkerFactory for NoOpWorkerFactory {
fn create_worker(&self, segment_id: SegmentId) -> (SegmentWorker, SegmentContext) {
let context = SegmentContext {
directory: Arc::clone(&self.directory),
segment_name: segment_id.name.clone(),
segment_id: segment_id.id,
};
let worker = SegmentWorker::new(
segment_id,
vec![Box::new(NoOpConsumer)],
Box::new(StandardAnalyzer::default()),
);
(worker, context)
}
}
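    /// Builds workers with real stored-fields and field-infos consumers, so
    /// a flushed segment contains enough files to exercise compound
    /// packaging.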
struct StoredFieldsWorkerFactory {
directory: SharedDirectory,
}
impl WorkerFactory for StoredFieldsWorkerFactory {
fn create_worker(&self, segment_id: SegmentId) -> (SegmentWorker, SegmentContext) {
use crate::index::pipeline::field_infos_consumer::FieldInfosConsumer;
use crate::index::pipeline::stored_fields_consumer::StoredFieldsConsumer;
let context = SegmentContext {
directory: Arc::clone(&self.directory),
segment_name: segment_id.name.clone(),
segment_id: segment_id.id,
};
let consumers: Vec<Box<dyn FieldConsumer>> = vec![
Box::new(StoredFieldsConsumer::new()),
Box::new(FieldInfosConsumer::new()),
];
let worker =
SegmentWorker::new(segment_id, consumers, Box::new(StandardAnalyzer::default()));
(worker, context)
}
}
#[test]
fn worker_source_creates_sequential_segment_names() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory = Arc::new(NoOpWorkerFactory {
directory: Arc::clone(&dir),
});
let source = WorkerSource::new(Box::new(RandomIdGenerator), factory);
let (_, ctx0) = source.create_worker();
let (_, ctx1) = source.create_worker();
let (_, ctx2) = source.create_worker();
assert_eq!(ctx0.segment_name, "_0");
assert_eq!(ctx1.segment_name, "_1");
assert_eq!(ctx2.segment_name, "_2");
}
#[test]
fn worker_source_creates_unique_segment_ids() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory = Arc::new(NoOpWorkerFactory {
directory: Arc::clone(&dir),
});
let source = WorkerSource::new(Box::new(RandomIdGenerator), factory);
let (_, ctx0) = source.create_worker();
let (_, ctx1) = source.create_worker();
assert_ne!(ctx0.segment_id, ctx1.segment_id);
}
fn make_doc() -> Document {
use crate::document::DocumentBuilder;
use crate::index::field::stored;
DocumentBuilder::new()
.add_field(stored("f").string("v"))
.build()
}
#[test]
fn thread_loop_flushes_on_channel_close() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory: Arc<dyn WorkerFactory> = Arc::new(NoOpWorkerFactory {
directory: Arc::clone(&dir),
});
let source = Arc::new(WorkerSource::new(
Box::new(SequentialIdGenerator(0)),
factory,
));
let (tx, rx) = channel::bounded(4);
tx.send(make_doc()).unwrap();
tx.send(make_doc()).unwrap();
drop(tx);
let segments = worker_thread_loop(rx, source, disabled_flush_control(), 0).unwrap();
assert_eq!(segments.len(), 1);
assert_eq!(segments[0].doc_count, 2);
}
#[test]
fn thread_loop_mid_flush_creates_replacement() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory: Arc<dyn WorkerFactory> = Arc::new(NoOpWorkerFactory {
directory: Arc::clone(&dir),
});
let source = Arc::new(WorkerSource::new(
Box::new(SequentialIdGenerator(0)),
factory,
));
let (tx, rx) = channel::bounded(10);
for _ in 0..7 {
tx.send(make_doc()).unwrap();
}
drop(tx);
let fc = Arc::new(FlushControl::new(1, 0.0, 3));
let segments = worker_thread_loop(rx, source, fc, 0).unwrap();
assert_eq!(segments.len(), 3);
assert_eq!(segments[0].doc_count, 3);
assert_eq!(segments[1].doc_count, 3);
assert_eq!(segments[2].doc_count, 1);
let names: HashSet<_> = segments.iter().map(|s| &s.segment_id.name).collect();
assert_eq!(names.len(), 3);
}
#[test]
fn thread_loop_empty_channel_produces_no_segments() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory: Arc<dyn WorkerFactory> = Arc::new(NoOpWorkerFactory {
directory: Arc::clone(&dir),
});
let source = Arc::new(WorkerSource::new(
Box::new(SequentialIdGenerator(0)),
factory,
));
let (tx, rx) = channel::bounded(4);
drop(tx);
let segments = worker_thread_loop(rx, source, disabled_flush_control(), 0).unwrap();
assert_is_empty!(segments);
}
#[test]
fn compound_packaging_creates_cfs_cfe() {
let dir: SharedDirectory = MemoryDirectory::create();
let factory: Arc<dyn WorkerFactory> = Arc::new(StoredFieldsWorkerFactory {
directory: Arc::clone(&dir),
});
let source = Arc::new(WorkerSource::new(
Box::new(SequentialIdGenerator(0)),
factory,
));
let (tx, rx) = channel::bounded(4);
tx.send(make_doc()).unwrap();
drop(tx);
let mut segments = worker_thread_loop(rx, source, disabled_flush_control(), 0).unwrap();
assert_eq!(segments.len(), 1);
let pre_files = segments[0].file_names.clone();
assert_ge!(pre_files.len(), 5);
package_compound_segment(&mut segments[0], &*dir).unwrap();
assert_eq!(segments[0].file_names.len(), 3);
        assert_any!(segments[0].file_names.iter(), |f: &String| f.ends_with(".si"));
        assert_any!(segments[0].file_names.iter(), |f: &String| f.ends_with(".cfs"));
        assert_any!(segments[0].file_names.iter(), |f: &String| f.ends_with(".cfe"));
        // `dir_ref` is a plain borrow of the directory, not a lock guard.
        let dir_ref = &*dir;
        for f in &pre_files {
            if !f.ends_with(".si") {
                assert!(
                    dir_ref.read_file(f).is_err(),
                    "original file {f} should have been deleted"
                );
            }
        }
        assert!(dir_ref.read_file("_0.cfs").is_ok());
        assert!(dir_ref.read_file("_0.cfe").is_ok());
}
}