use core::codec::field_infos::{FieldInfos, FieldInfosFormat, FieldNumbers, FieldNumbersRef};
use core::codec::segment_infos::{
file_name_from_generation, get_last_commit_segments_filename, SegmentCommitInfo, SegmentInfo,
SegmentInfoFormat, SegmentInfos, INDEX_FILE_PENDING_SEGMENTS,
};
use core::codec::{Codec, CompoundFormat, LiveDocsFormat};
use core::doc::Fieldable;
use core::doc::Term;
use core::index::merge::MergeRateLimiter;
use core::index::merge::MergeScheduler;
use core::index::merge::SegmentMerger;
use core::index::merge::{DocMap, MergeState};
use core::index::merge::{MergePolicy, MergeSpecification, MergerTrigger};
use core::index::merge::{OneMerge, OneMergeRunningInfo};
use core::index::reader::index_exist;
use core::index::reader::{LeafReader, SegmentReader, StandardDirectoryReader};
use core::index::writer::{
BufferedUpdatesStream, DocumentsWriter, Event, FlushedSegment, FrozenBufferedUpdates,
IndexFileDeleter, IndexWriterConfig, OpenMode,
};
use core::search::query::{MatchAllDocsQuery, Query};
use core::store::directory::{Directory, LockValidatingDirectoryWrapper, TrackingDirectoryWrapper};
use core::store::{FlushInfo, IOContext};
use core::util::random_id;
use core::util::to_base36;
use core::util::{BitsRef, DerefWrapper, DocId, VERSION_LATEST};
use core::index::ErrorKind::MergeAborted;
use error::ErrorKind::{AlreadyClosed, IllegalArgument, IllegalState, Index, RuntimeError};
use error::{Error, Result};
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
use std::time::{Duration, SystemTime};
use core::index::writer::dir_wrapper::RateLimitFilterDirectory;
use thread_local::ThreadLocal;
// Hard limit on the number of documents an index may hold. The -128 headroom
// below i32::MAX mirrors Lucene's IndexWriter.MAX_DOCS convention.
pub const INDEX_MAX_DOCS: i32 = i32::max_value() - 128;
// Hard limit on the value of a token position within a single document.
pub const INDEX_MAX_POSITION: i32 = i32::max_value() - 128;
/// A writer that creates and maintains an index.
///
/// This is a cheaply clonable handle: all state lives in the shared
/// `IndexWriterInner`, so every clone operates on the same underlying writer.
pub struct IndexWriter<
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
> {
    // Shared writer state; see `IndexWriterInner` for the implementation.
    writer: Arc<IndexWriterInner<D, C, MS, MP>>,
}
impl<D, C, MS, MP> Clone for IndexWriter<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn clone(&self) -> Self {
Self {
writer: Arc::clone(&self.writer),
}
}
}
impl<D, C, MS, MP> IndexWriter<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
    /// Opens a writer over directory `d` with configuration `conf`, then
    /// wires up the inner writer's weak self-references via `init`.
    pub fn new(
        d: Arc<D>,
        conf: Arc<IndexWriterConfig<C, MS, MP>>,
    ) -> Result<IndexWriter<D, C, MS, MP>> {
        let mut index_writer = IndexWriter {
            writer: Arc::new(IndexWriterInner::new(d, conf)?),
        };
        index_writer.init();
        Ok(index_writer)
    }
    /// Returns a near-real-time reader over the current index state.
    /// `write_all_deletes` requires `apply_all_deletes` to also be true.
    pub fn get_reader(
        &self,
        apply_all_deletes: bool,
        write_all_deletes: bool,
    ) -> Result<StandardDirectoryReader<D, C, MS, MP>> {
        IndexWriterInner::get_reader(self, apply_all_deletes, write_all_deletes)
    }
    /// The configuration this writer was opened with.
    #[inline]
    pub fn config(&self) -> &Arc<IndexWriterConfig<C, MS, MP>> {
        &self.writer.config
    }
    /// Total doc count: buffered (not yet flushed) docs plus the max-doc of
    /// every committed segment; deleted-but-not-purged docs are included.
    #[inline]
    pub fn max_doc(&self) -> u32 {
        let _l = self.writer.lock.lock().unwrap();
        self.writer.doc_writer.num_docs() + self.writer.segment_infos.total_max_doc() as u32
    }
    /// Live doc count: buffered docs plus per-segment counts with deletions
    /// (committed and pending) subtracted.
    pub fn num_docs(&self) -> u32 {
        let _l = self.writer.lock.lock().unwrap();
        let mut count = self.writer.doc_writer.num_docs();
        for info in &self.writer.segment_infos.segments {
            count += info.info.max_doc() as u32 - self.writer.num_deleted_docs(&info);
        }
        count
    }
    /// The original (unwrapped) directory this writer writes to.
    #[inline]
    pub fn directory(&self) -> &Arc<D> {
        &self.writer.directory_orig
    }
    /// Closes the writer: commits first when `commit_on_close` is set,
    /// otherwise rolls back pending changes.
    pub fn close(&self) -> Result<()> {
        IndexWriterInner::close(self)
    }
    /// Discards all changes since the last commit and closes the writer.
    pub fn rollback(&self) -> Result<()> {
        self.writer.rollback()
    }
    /// Adds a document; returns the operation's sequence number.
    pub fn add_document<F: Fieldable>(&self, doc: Vec<F>) -> Result<u64> {
        IndexWriterInner::update_document(self, doc, None)
    }
    /// Atomically deletes docs matching `term` (when given) and adds `doc`.
    pub fn update_document<F: Fieldable>(&self, doc: Vec<F>, term: Option<Term>) -> Result<u64> {
        IndexWriterInner::update_document(self, doc, term)
    }
    /// Adds a batch of documents atomically.
    pub fn add_documents<F: Fieldable>(&self, docs: Vec<Vec<F>>) -> Result<u64> {
        IndexWriterInner::update_documents(self, docs, None)
    }
    /// Atomically deletes docs matching `term` (when given) and adds `docs`.
    pub fn update_documents<F: Fieldable>(
        &self,
        docs: Vec<Vec<F>>,
        term: Option<Term>,
    ) -> Result<u64> {
        IndexWriterInner::update_documents(self, docs, term)
    }
    /// Buffers deletions for all docs matching any of `terms`.
    pub fn delete_documents_by_terms(&self, terms: Vec<Term>) -> Result<u64> {
        IndexWriterInner::delete_documents_by_terms(self, terms)
    }
    /// Buffers deletions for all docs matching any of `queries`.
    pub fn delete_documents_by_queries(&self, queries: Vec<Arc<dyn Query<C>>>) -> Result<u64> {
        IndexWriterInner::delete_documents_by_queries(self, queries)
    }
    /// Deletes every document in the index.
    pub fn delete_all(&self) -> Result<u64> {
        IndexWriterInner::delete_all(self)
    }
    /// True if a near-real-time reader opened on `infos` is still current.
    pub fn nrt_is_current(&self, infos: &SegmentInfos<D, C>) -> bool {
        self.writer.nrt_is_current(infos)
    }
    /// Requests a forced merge down to at most `max_num_segments` segments,
    /// optionally blocking until the merges complete.
    pub fn force_merge(&self, max_num_segments: u32, do_wait: bool) -> Result<()> {
        IndexWriterInner::force_merge(self, max_num_segments, do_wait)
    }
    /// True if there are changes not yet covered by a commit.
    pub fn has_uncommitted_changes(&self) -> bool {
        self.writer.has_uncommitted_changes()
    }
    /// Commits all pending changes; returns the commit's sequence number.
    pub fn commit(&self) -> Result<i64> {
        IndexWriterInner::commit(self)
    }
    /// Flushes buffered docs and applies deletes without committing.
    pub fn flush(&self) -> Result<()> {
        IndexWriterInner::flush(self, true, true)
    }
    /// True while the writer is neither closed nor closing.
    pub fn is_open(&self) -> bool {
        self.writer.is_open()
    }
    /// The unrecoverable error that poisoned this writer, if any.
    pub fn tragedy(&self) -> Option<&Error> {
        self.writer.tragedy.as_ref()
    }
}
// Internal API used by DocumentsWriter, ReaderPool and the merge machinery.
impl<D, C, MS, MP> IndexWriter<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
    /// Hands the doc writer and reader pool weak back-references to the
    /// inner writer so they can call back into it later.
    fn init(&mut self) {
        // SAFETY: NOTE(review) — this runs once, immediately after
        // construction in `new`, before the writer is shared across threads,
        // so the cast-away-const mutation cannot be observed concurrently.
        // Confirm there are no other call sites.
        unsafe {
            let w1 = Arc::downgrade(&self.writer);
            let w2 = Arc::downgrade(&self.writer);
            let writer = self.writer.as_ref() as *const IndexWriterInner<D, C, MS, MP>
                as *mut IndexWriterInner<D, C, MS, MP>;
            let doc_writer = &mut (*writer).doc_writer as *mut DocumentsWriter<D, C, MS, MP>;
            let reader_pool = &mut (*writer).reader_pool as *mut ReaderPool<D, C, MS, MP>;
            (*doc_writer).init(w1);
            (*reader_pool).init(w2);
        }
    }
    /// Registers a freshly flushed segment (and its frozen global deletes)
    /// with the writer's segment infos.
    pub fn publish_flushed_segment(
        &self,
        new_segment: FlushedSegment<D, C>,
        global_packet: Option<FrozenBufferedUpdates<C>>,
    ) -> Result<()> {
        self.writer
            .publish_flushed_segment(new_segment, global_packet)
    }
    /// Pushes a frozen deletes/updates packet onto the buffered-updates stream.
    pub fn publish_frozen_updates(&self, packet: FrozenBufferedUpdates<C>) -> Result<()> {
        self.writer.publish_frozen_updates(packet)
    }
    /// Wraps an already-constructed inner writer in a handle.
    pub fn with_inner(writer: Arc<IndexWriterInner<D, C, MS, MP>>) -> Self {
        Self { writer }
    }
    /// Applies buffered deletes and purges the doc-writer flush queue.
    pub fn apply_deletes_and_purge(&self, force_purge: bool) -> Result<()> {
        IndexWriterInner::apply_deletes_and_purge(self, force_purge)
    }
    /// Hook invoked after a segment flush completes.
    pub fn do_after_segment_flushed(&self, trigger_merge: bool, force_purge: bool) -> Result<()> {
        IndexWriterInner::do_after_segment_flushed(self, trigger_merge, force_purge)
    }
    /// Purges the doc-writer flush queue; returns the number purged.
    pub fn purge(&self, forced: bool) -> Result<u32> {
        IndexWriterInner::purge(self, forced)
    }
    /// Records that flushing `info` failed so its files can be cleaned up.
    pub fn flush_failed(&self, info: &SegmentInfo<D, C>) -> Result<()> {
        self.writer.flush_failed(info)
    }
    /// True once the writer has fully closed.
    pub fn is_closed(&self) -> bool {
        self.writer.is_closed()
    }
    /// Deletes newly created files that are no longer referenced.
    pub fn delete_new_files(&self, files: &HashSet<String>) -> Result<()> {
        let _l = self.writer.lock.lock()?;
        self.writer.delete_new_files(files)
    }
    /// Committed plus pending delete count for `info`.
    pub fn num_deleted_docs(&self, info: &SegmentCommitInfo<D, C>) -> u32 {
        self.writer.num_deleted_docs(info)
    }
    /// Increments the deleter's ref-count on every file of `segment_infos`.
    pub fn inc_ref_deleter(&self, segment_infos: &SegmentInfos<D, C>) -> Result<()> {
        self.writer.inc_ref_deleter(segment_infos)
    }
    /// Decrements the deleter's ref-count on every file of `segment_infos`.
    pub fn dec_ref_deleter(&self, segment_infos: &SegmentInfos<D, C>) -> Result<()> {
        self.writer.dec_ref_deleter(segment_infos)
    }
    /// The pool of per-segment readers kept open for NRT reopen.
    #[inline]
    pub fn reader_pool(&self) -> &ReaderPool<D, C, MS, MP> {
        &self.writer.reader_pool
    }
    /// Executes one merge; called by the merge scheduler.
    pub fn merge(&self, merge: &mut OneMerge<D, C>) -> Result<()> {
        IndexWriterInner::merge(self, merge)
    }
    /// Names of segments currently being merged.
    #[inline]
    pub fn merging_segments(&self) -> &HashSet<String> {
        &self.writer.merging_segments
    }
    /// Allocates the next unique merge id.
    #[inline]
    pub fn next_merge_id(&self) -> u32 {
        self.writer.next_merge_id()
    }
    /// Pops the next pending merge, if any.
    pub fn next_merge(&self) -> Option<OneMerge<D, C>> {
        self.writer.next_merge()
    }
    /// True if merges are queued but not yet running.
    pub fn has_pending_merges(&self) -> bool {
        let _l = self.writer.lock.lock().unwrap();
        !self.writer.pending_merges.is_empty()
    }
}
impl<D, C, MS, MP> Drop for IndexWriter<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
    /// When the last handle goes away, shut the writer down: commit if the
    /// config requests commit-on-close, otherwise roll back. Errors are only
    /// logged because `drop` cannot propagate them.
    fn drop(&mut self) {
        if Arc::strong_count(&self.writer) != 1 {
            // Other handles still alive; the last one performs the close.
            return;
        }
        if self.writer.config.commit_on_close {
            if let Err(e) = IndexWriterInner::shutdown(self) {
                error!("IndexWriter: shutdown on close failed by: {:?}", e);
            }
        } else if let Err(e) = self.rollback() {
            error!("IndexWriter: rollback on close failed by: {:?}", e);
        }
    }
}
/// Shared state and implementation behind `IndexWriter`.
///
/// NOTE(review): interior mutation happens through `writer_mut`, which casts
/// a shared reference to `&mut` while a `MutexGuard` is held; the soundness
/// of the whole struct hinges on that locking discipline.
pub struct IndexWriterInner<
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
> {
    pub config: Arc<IndexWriterConfig<C, MS, MP>>,
    // The directory as supplied by the user.
    directory_orig: Arc<D>,
    // Same directory, wrapped to validate the write lock on each write.
    directory: Arc<LockValidatingDirectoryWrapper<D>>,
    // Writer-wide lock guarding segment_infos/merge bookkeeping mutation.
    lock: Arc<Mutex<()>>,
    closed: AtomicBool,
    closing: AtomicBool,
    // Signalled on close/commit state transitions (paired with `lock`).
    cond: Condvar,
    global_field_numbers: Arc<FieldNumbers>,
    // Docs admitted but not yet committed; bounds enforcement counter.
    pub pending_num_docs: Arc<AtomicI64>,
    pub doc_writer: DocumentsWriter<D, C, MS, MP>,
    // Ref-counts index files and deletes unreferenced ones.
    pub deleter: IndexFileDeleter<D>,
    // Set once an unrecoverable error occurs; poisons further commits.
    tragedy: Option<Error>,
    segment_infos: SegmentInfos<D, C>,
    segment_infos_lock: Mutex<()>,
    // Bumped on every change; compared against the last-commit counters.
    change_count: AtomicU64,
    last_commit_change_count: AtomicU64,
    // Snapshot of segments at the last commit, used for rollback.
    rollback_segments: Vec<Arc<SegmentCommitInfo<D, C>>>,
    commit_lock: Mutex<()>,
    // Set between prepare_commit and finish_commit.
    pending_commit: Option<SegmentInfos<D, C>>,
    pending_seq_no: AtomicI64,
    pending_commit_change_count: AtomicU64,
    // Files inc-ref'd for the in-flight commit; dec-ref'd when it resolves.
    files_to_commit: HashSet<String>,
    // Per-merge-thread IO rate limiters.
    rate_limiters: Arc<ThreadLocal<Arc<MergeRateLimiter>>>,
    merge_directory: RateLimitFilterDirectory<LockValidatingDirectoryWrapper<D>, MergeRateLimiter>,
    segments_to_merge: HashMap<Arc<SegmentCommitInfo<D, C>>, bool>,
    merge_max_num_segments: u32,
    // Names of segments currently involved in a merge.
    merging_segments: HashSet<String>,
    merge_scheduler: MS,
    merge_id_gen: AtomicU32,
    pending_merges: VecDeque<OneMerge<D, C>>,
    running_merges: HashMap<u32, OneMergeRunningInfo<D, C>>,
    merge_exceptions: Vec<OneMerge<D, C>>,
    merge_gen: u64,
    stop_merges: bool,
    full_flush_lock: Arc<Mutex<()>>,
    flush_count: AtomicU32,
    flush_deletes_count: AtomicU32,
    // Pooled per-segment readers for NRT reopen.
    reader_pool: ReaderPool<D, C, MS, MP>,
    updates_stream_lock: Mutex<()>,
    buffered_updates_stream: BufferedUpdatesStream<C>,
    pool_readers: AtomicBool,
}
// SAFETY: NOTE(review) — these impls assert that every piece of interior
// mutability in `IndexWriterInner` (including raw-pointer access through
// `writer_mut`) is protected by the writer's own locks/atomics. That
// invariant is not machine-checked; confirm before changing locking.
unsafe impl<D, C, MS, MP> Send for IndexWriterInner<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
}
unsafe impl<D, C, MS, MP> Sync for IndexWriterInner<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
}
impl<D, C, MS, MP> IndexWriterInner<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
/// Builds the inner writer: resolves create-vs-append mode, loads or resets
/// the segment infos, seeds the global field-number table from existing
/// segments, and initializes the doc writer and file deleter.
fn new(d: Arc<D>, conf: Arc<IndexWriterConfig<C, MS, MP>>) -> Result<Self> {
    let directory = Arc::new(LockValidatingDirectoryWrapper::new(Arc::clone(&d)));
    let rate_limiters = Arc::new(ThreadLocal::default());
    let merge_directory =
        RateLimitFilterDirectory::new(Arc::clone(&directory), Arc::clone(&rate_limiters));
    let buffered_updates_stream = BufferedUpdatesStream::default();
    let pool_readers = conf.reader_pooling;
    // CreateOrAppend creates only when no index exists yet.
    let create = match conf.open_mode {
        OpenMode::Create => true,
        OpenMode::Append => false,
        OpenMode::CreateOrAppend => !index_exist(directory.as_ref())?,
    };
    let mut initial_index_exists = true;
    let files = directory.list_all()?;
    let mut segment_infos: SegmentInfos<D, C>;
    let rollback_segments: Vec<Arc<SegmentCommitInfo<D, C>>>;
    let change_count = AtomicU64::new(0);
    if create {
        // Try to read the last commit so the new index keeps its generation
        // (avoids clashing with readers holding the old commit open).
        segment_infos = {
            match SegmentInfos::read_latest_commit(&d) {
                Ok(mut sis) => {
                    sis.clear();
                    sis
                }
                Err(_) => {
                    initial_index_exists = false;
                    SegmentInfos::default()
                }
            }
        };
        rollback_segments = segment_infos.create_backup_segment_infos();
        change_count.fetch_add(1, Ordering::AcqRel);
        segment_infos.changed();
    } else {
        // Append mode: an existing commit point is mandatory.
        let last_segments_file = get_last_commit_segments_filename(&files)?;
        if last_segments_file.is_none() {
            bail!(
                "IndexNotFound: no segments* file found in '{}', files: {:?}",
                &directory,
                &files
            );
        }
        let last_segments_file = last_segments_file.unwrap();
        segment_infos = SegmentInfos::read_commit(&d, &last_segments_file)?;
        rollback_segments = segment_infos.create_backup_segment_infos();
    }
    let pending_num_docs = AtomicI64::new(segment_infos.total_max_doc() as i64);
    // Rebuild the global field-number mapping from all existing segments so
    // new fields get consistent numbers.
    let global_field_numbers = Arc::new(FieldNumbers::new());
    for info in &segment_infos.segments {
        let fis = read_field_infos(info.as_ref())?;
        for fi in fis.by_number.values() {
            global_field_numbers.add_or_get(
                &fi.name,
                fi.number,
                fi.doc_values_type,
                fi.point_dimension_count,
                fi.point_num_bytes,
            )?;
        }
    }
    // Reject configs whose index sort conflicts with existing segments.
    Self::validate_index_sort(conf.as_ref(), &segment_infos)?;
    let doc_writer =
        DocumentsWriter::new(Arc::clone(&conf), Arc::clone(&d), Arc::clone(&directory));
    let mut deleter = IndexFileDeleter::new(directory.clone());
    let starting_commit_deleted =
        deleter.init(d.clone(), &files, &mut segment_infos, initial_index_exists)?;
    if starting_commit_deleted {
        // The deleter removed our starting commit; force a new commit point.
        change_count.fetch_add(1, Ordering::AcqRel);
        segment_infos.changed();
    }
    Ok(IndexWriterInner {
        lock: Arc::new(Mutex::new(())),
        cond: Condvar::new(),
        directory_orig: d,
        directory,
        merge_directory,
        change_count,
        last_commit_change_count: AtomicU64::new(0),
        rollback_segments,
        pending_commit: None,
        pending_seq_no: AtomicI64::new(0),
        pending_commit_change_count: AtomicU64::new(0),
        files_to_commit: HashSet::new(),
        segment_infos,
        segment_infos_lock: Mutex::new(()),
        global_field_numbers,
        doc_writer,
        deleter,
        segments_to_merge: HashMap::new(),
        merge_max_num_segments: 0,
        closed: AtomicBool::new(false),
        closing: AtomicBool::new(false),
        merging_segments: HashSet::new(),
        merge_scheduler: conf.merge_scheduler(),
        merge_id_gen: AtomicU32::new(0),
        pending_merges: VecDeque::new(),
        running_merges: HashMap::new(),
        merge_exceptions: vec![],
        merge_gen: 0,
        stop_merges: false,
        flush_count: AtomicU32::new(0),
        flush_deletes_count: AtomicU32::new(0),
        reader_pool: ReaderPool::new(),
        updates_stream_lock: Mutex::new(()),
        buffered_updates_stream,
        pool_readers: AtomicBool::new(pool_readers),
        config: conf,
        pending_num_docs: Arc::new(pending_num_docs),
        full_flush_lock: Arc::new(Mutex::new(())),
        commit_lock: Mutex::new(()),
        rate_limiters,
        tragedy: None,
    })
}
/// Casts `&self` to `&mut self` for fields guarded by the mutex whose guard
/// `_l` the caller holds.
///
/// # Safety
/// The caller must hold the guard protecting every field it will mutate and
/// must not create a second `&mut` alias; otherwise this is UB.
#[allow(clippy::mut_from_ref)]
unsafe fn writer_mut(&self, _l: &MutexGuard<()>) -> &mut IndexWriterInner<D, C, MS, MP> {
    let writer =
        self as *const IndexWriterInner<D, C, MS, MP> as *mut IndexWriterInner<D, C, MS, MP>;
    &mut *writer
}
/// Flushes pending docs and opens a near-real-time reader over the result.
/// Also kicks a (best-effort) merge check after the flush.
fn get_reader(
    index_writer: &IndexWriter<D, C, MS, MP>,
    apply_all_deletes: bool,
    write_all_deletes: bool,
) -> Result<StandardDirectoryReader<D, C, MS, MP>> {
    index_writer.writer.ensure_open(true)?;
    // Writing deletes to disk only makes sense if they were applied first.
    if write_all_deletes && !apply_all_deletes {
        bail!(IllegalArgument(
            "apply_all_deletes must be true when write_all_deletes=true".into()
        ));
    }
    debug!("IW - flush at get_reader");
    let start = SystemTime::now();
    // From now on, keep flushed segments' readers pooled for cheap reopen.
    index_writer
        .writer
        .pool_readers
        .store(true, Ordering::Release);
    index_writer.writer.do_before_flush();
    let mut any_changes = false;
    let reader = Self::do_get_reader(
        index_writer,
        apply_all_deletes,
        write_all_deletes,
        &mut any_changes,
    )?;
    // Merge errors are deliberately ignored; the reader is already open.
    let _ = Self::maybe_merge(index_writer, MergerTrigger::FullFlush, None);
    debug!(
        "IW - get_reader took {} ms",
        SystemTime::now()
            .duration_since(start)
            .unwrap()
            .subsec_millis()
    );
    Ok(reader)
}
/// Runs the full-flush + reader-open sequence under the full-flush lock,
/// always finishing the doc writer's full-flush state afterwards.
fn do_get_reader(
    index_writer: &IndexWriter<D, C, MS, MP>,
    apply_all_deletes: bool,
    write_all_deletes: bool,
    any_changes: &mut bool,
) -> Result<StandardDirectoryReader<D, C, MS, MP>> {
    let _l = index_writer.writer.full_flush_lock.lock()?;
    let res = Self::flush_and_open(
        index_writer,
        apply_all_deletes,
        write_all_deletes,
        any_changes,
    );
    // Always release the full-flush state, success or failure.
    index_writer.writer.doc_writer.finish_full_flush(true);
    match res {
        Ok(reader) => {
            Self::process_events(index_writer, false, true)?;
            index_writer.writer.do_after_flush();
            Ok(reader)
        }
        Err(e) => {
            error!("IW - hit error during NRT reader: {:?}", e);
            Err(e)
        }
    }
}
/// Flushes all doc-writer threads, optionally applies/writes deletes, and
/// opens a reader over the resulting segment infos (under the writer lock).
fn flush_and_open(
    index_writer: &IndexWriter<D, C, MS, MP>,
    apply_all_deletes: bool,
    write_all_deletes: bool,
    any_changes: &mut bool,
) -> Result<StandardDirectoryReader<D, C, MS, MP>> {
    let (changes, _) = index_writer.writer.doc_writer.flush_all_threads()?;
    *any_changes = changes;
    if !changes {
        // Count no-op flushes too (used for diagnostics/tests).
        index_writer
            .writer
            .flush_count
            .fetch_add(1, Ordering::AcqRel);
    }
    let reader = {
        let l = index_writer.writer.lock.lock()?;
        *any_changes |= index_writer
            .writer
            .maybe_apply_deletes(apply_all_deletes, &l)?;
        if write_all_deletes {
            // Persist pooled readers' live-doc changes before opening.
            index_writer
                .writer
                .reader_pool
                .commit(&index_writer.writer.segment_infos)?;
        }
        let r = StandardDirectoryReader::open_by_writer(
            index_writer.clone(),
            &index_writer.writer.segment_infos,
            apply_all_deletes,
            write_all_deletes,
        )?;
        debug!(
            "IW - return reader version: {}, reader: {:?} ",
            r.version(),
            &r
        );
        r
    };
    Ok(reader)
}
/// Verifies that every existing segment's index sort matches the sort
/// configured on the writer; errors out on the first mismatch.
fn validate_index_sort<MS1: MergeScheduler, MP1: MergePolicy>(
    config: &IndexWriterConfig<C, MS1, MP1>,
    segment_infos: &SegmentInfos<D, C>,
) -> Result<()> {
    // No configured sort means nothing can conflict.
    let index_sort = match config.index_sort() {
        Some(sort) => sort,
        None => return Ok(()),
    };
    for info in &segment_infos.segments {
        match info.info.index_sort() {
            Some(segment_sort) if segment_sort != index_sort => {
                bail!(IllegalArgument(format!(
                    "config and segment index sort mismatch. segment: {:?}, config: {:?}",
                    segment_sort, index_sort
                )));
            }
            _ => {}
        }
    }
    Ok(())
}
/// Allocates the next unique merge id (monotonically increasing).
#[inline]
fn next_merge_id(&self) -> u32 {
    self.merge_id_gen.fetch_add(1, Ordering::AcqRel)
}
/// Index-wide field-name to field-number mapping shared with flush/merge.
#[inline]
pub fn global_field_numbers(&self) -> &Arc<FieldNumbers> {
    &self.global_field_numbers
}
/// Stream of frozen delete/update packets awaiting application.
#[inline]
pub fn buffered_updates_stream(&self) -> &BufferedUpdatesStream<C> {
    &self.buffered_updates_stream
}
/// Closes the writer, committing first when the configuration requests
/// commit-on-close, otherwise discarding uncommitted changes.
fn close(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<()> {
    let commit_on_close = index_writer.writer.config.commit_on_close;
    debug!("IW - start close. commit on close: {}", commit_on_close);
    if commit_on_close {
        return Self::shutdown(index_writer);
    }
    index_writer.writer.rollback()
}
/// Gracefully shuts down: flush, wait for merges, commit, then release
/// resources. Falls back to rollback if any step fails.
fn shutdown(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<()> {
    // A dangling prepare_commit must be resolved by commit(), not close().
    if index_writer.writer.pending_commit.is_some() {
        bail!(IllegalState(
            "cannot close: prepareCommit was already called with no corresponding call to \
             commit"
                .into()
        ));
    }
    // should_close(true) waits for any concurrent close to finish and
    // returns true only for the thread that wins the right to close.
    if index_writer.writer.should_close(true) {
        debug!("IW - now flush at close");
        if let Err(e) = Self::do_shutdown(index_writer) {
            // Best-effort rollback; the original shutdown error wins.
            let commit_lock = index_writer.writer.commit_lock.lock()?;
            if let Err(err) = index_writer.writer.rollback_internal(&commit_lock) {
                warn!("rollback internal failed when shutdown by '{:?}'", err);
            }
            return Err(e);
        }
    }
    Ok(())
}
/// The ordered shutdown sequence: flush, wait for merges, commit, then
/// tear down via rollback_internal (nothing is left to roll back after
/// the commit; this just releases resources).
fn do_shutdown(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<()> {
    IndexWriterInner::flush(index_writer, true, true)?;
    Self::wait_for_merges(index_writer)?;
    Self::commit(index_writer)?;
    let commit_lock = index_writer.writer.commit_lock.lock()?;
    index_writer.writer.rollback_internal(&commit_lock)
}
/// Decides whether this thread should perform the close.
///
/// Returns true for exactly one caller (the one that flips `closing`);
/// returns false if already closed, or — when `wait_for_close` is false —
/// if another thread is mid-close. With `wait_for_close` true it blocks
/// (1s condvar waits) until the concurrent close resolves.
fn should_close(&self, wait_for_close: bool) -> bool {
    let mut l = self.lock.lock().unwrap();
    loop {
        if !self.closed.load(Ordering::Acquire) {
            if !self.closing.load(Ordering::Acquire) {
                // We win: mark closing and let the caller do the work.
                self.closing.store(true, Ordering::Release);
                return true;
            } else if !wait_for_close {
                return false;
            } else {
                // Another thread is closing; wait and re-check.
                let (loc, _) = self
                    .cond
                    .wait_timeout(l, Duration::from_millis(1000))
                    .unwrap();
                l = loc;
            }
        } else {
            return false;
        }
    }
}
/// Publishes a flushed segment, then bumps the flush counter and runs the
/// after-flush hook regardless of whether publishing succeeded.
fn publish_flushed_segment(
    &self,
    new_segment: FlushedSegment<D, C>,
    global_packet: Option<FrozenBufferedUpdates<C>>,
) -> Result<()> {
    let res = self.do_publish_flushed_segment(new_segment, global_packet);
    self.flush_count.fetch_add(1, Ordering::AcqRel);
    self.do_after_flush();
    res
}
/// Atomically publishes a flushed segment: pushes its (and the global)
/// delete packets onto the buffered-updates stream, stamps the segment
/// with its delete generation, adds it to segment_infos and checkpoints.
fn do_publish_flushed_segment(
    &self,
    mut new_segment: FlushedSegment<D, C>,
    global_packet: Option<FrozenBufferedUpdates<C>>,
) -> Result<()> {
    // Lock order: writer lock first, then the updates-stream lock.
    let l = self.lock.lock()?;
    self.ensure_open(false)?;
    let _bl = self.updates_stream_lock.lock()?;
    debug!("publish_flushed_segment");
    if let Some(global_packet) = global_packet {
        if global_packet.any() {
            self.buffered_updates_stream.push(global_packet)?;
        }
    }
    // The segment's delete generation comes from pushing its own private
    // packet, or from the stream's next gen when there is none.
    let segment_updates = new_segment.segment_updates.take();
    let next_gen = if let Some(p) = segment_updates {
        if p.any() {
            self.buffered_updates_stream.push(p)?
        } else {
            self.buffered_updates_stream.get_next_gen()
        }
    } else {
        self.buffered_updates_stream.get_next_gen()
    };
    debug!(
        "publish sets new_segment del_gen={}, seg={}",
        next_gen, &new_segment.segment_info
    );
    new_segment
        .segment_info
        .set_buffered_deletes_gen(next_gen as i64);
    // SAFETY: the writer-wide lock `l` is held while mutating segment_infos.
    let writer = unsafe { self.writer_mut(&l) };
    writer
        .segment_infos
        .add(Arc::clone(&new_segment.segment_info));
    writer.check_point(&l)
}
/// Pushes a non-empty frozen deletes/updates packet onto the buffered
/// updates stream, holding the writer lock then the stream lock.
fn publish_frozen_updates(&self, packet: FrozenBufferedUpdates<C>) -> Result<()> {
    debug_assert!(packet.any());
    // Lock order mirrors do_publish_flushed_segment: writer lock first.
    let _gl = self.lock.lock()?;
    let _sl = self.updates_stream_lock.lock()?;
    // The returned delete generation is not needed here.
    self.buffered_updates_stream.push(packet).map(|_| ())
}
/// Rolls back all changes since the last commit, if this thread wins the
/// right to close; a writer already closed (or closing elsewhere) is a no-op.
fn rollback(&self) -> Result<()> {
    if !self.should_close(true) {
        // Another thread closed (or is closing) the writer.
        return Ok(());
    }
    let commit_lock = self.commit_lock.lock()?;
    self.rollback_internal(&commit_lock)
}
/// Bridges to the `&mut self` rollback path.
// SAFETY: the commit lock guard proves exclusive access for the duration
// of the rollback; see `writer_mut`.
fn rollback_internal(&self, commit_lock: &MutexGuard<()>) -> Result<()> {
    let writer_mut = unsafe { self.writer_mut(commit_lock) };
    writer_mut.rollback_internal_no_commit()
}
/// Performs the rollback, then — even on failure — closes the merge
/// scheduler, undoes any in-flight pending commit, drops pooled readers
/// and the deleter, and finally marks the writer closed and wakes waiters.
fn rollback_internal_no_commit(&mut self) -> Result<()> {
    debug!("IW - rollback");
    let res = self.do_rollback_internal_no_commit();
    if res.is_err() {
        // Best-effort: make sure merge threads are stopped even though the
        // main rollback path failed part-way.
        if let Err(e) = self.merge_scheduler.close() {
            warn!(
                // fixed typo: "faile" -> "failed"
                "merge_scheduler close when rollback commit failed by '{:?}'",
                e
            );
        }
    }
    {
        let _l = self.lock.lock()?;
        if res.is_err() {
            // Undo a half-done prepare_commit left behind by the failure.
            if self.pending_commit.is_some() {
                self.pending_commit
                    .as_mut()
                    .unwrap()
                    .rollback_commit(self.directory.as_ref());
                if let Err(e) = self
                    .deleter
                    .dec_ref_files(&self.pending_commit.as_ref().unwrap().files(false))
                {
                    warn!(
                        "index write deleter dec ref by segment failed when rollback with '{}'",
                        e
                    );
                }
                self.pending_commit = None;
            }
            let _res = self.reader_pool.drop_all(false);
            let _res = self.deleter.close();
        }
        // The writer is closed no matter what; wake any thread blocked in
        // should_close / finish_commit.
        self.closed.store(true, Ordering::Release);
        self.closing.store(false, Ordering::Release);
        self.cond.notify_all();
    }
    Ok(())
}
/// The happy-path rollback sequence: abort merges, stop the scheduler and
/// doc writer, undo any pending commit, restore the last-commit segment
/// snapshot and refresh the deleter.
fn do_rollback_internal_no_commit(&mut self) -> Result<()> {
    {
        // Clone the Arc so the guard does not borrow self while we also
        // mutate self inside abort_merges.
        let lock = Arc::clone(&self.lock);
        let l = lock.lock()?;
        let _ = self.abort_merges(l)?;
    }
    // Drop all per-thread merge rate limiters.
    self.rate_limiters = Arc::new(ThreadLocal::new());
    debug!("IW - rollback: done finish merges");
    self.merge_scheduler.close()?;
    self.buffered_updates_stream.clear()?;
    // Stop accepting docs, then discard everything buffered.
    self.doc_writer.close();
    self.doc_writer.abort()?;
    {
        let _l = self.lock.lock()?;
        if self.pending_commit.is_some() {
            // Abort the half-finished two-phase commit.
            self.pending_commit
                .as_mut()
                .unwrap()
                .rollback_commit(self.directory.as_ref());
            let res = self
                .deleter
                .dec_ref_files(&self.pending_commit.as_ref().unwrap().files(false));
            self.pending_commit = None;
            self.cond.notify_all();
            res?;
        }
        self.reader_pool.drop_all(false)?;
        // Restore segment infos to the last-commit snapshot.
        self.segment_infos
            .rollback_segment_infos(self.rollback_segments.clone());
        debug!("IW - rollback segment commit infos");
        if self.tragedy.is_none() {
            // Remove now-unreferenced files created since the last commit.
            self.deleter.checkpoint(&self.segment_infos, false)?;
            self.deleter.refresh()?;
            self.deleter.close()?;
        }
        self.last_commit_change_count
            .store(self.change_count(), Ordering::Release);
        self.closed.store(true, Ordering::Release);
    }
    Ok(())
}
/// Deletes every document: aborts buffered docs, aborts merges, clears all
/// segments and field numbers, and returns the resulting sequence number.
fn delete_all(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<u64> {
    index_writer.writer.ensure_open(true)?;
    let seq_no: u64;
    {
        let l = index_writer.writer.full_flush_lock.lock()?;
        // Drop everything buffered in RAM and adjust the pending-doc count.
        let aborted_doc_count = index_writer.writer.doc_writer.lock_and_abort_all(&l)?;
        index_writer
            .writer
            .pending_num_docs
            .fetch_sub(aborted_doc_count as i64, Ordering::AcqRel);
        Self::process_events(index_writer, false, true)?;
        {
            let mut _gl = index_writer.writer.lock.lock()?;
            // SAFETY: the writer-wide lock `_gl` is held for all mutations.
            let writer_mut = unsafe { index_writer.writer.writer_mut(&_gl) };
            // abort_merges takes and returns the guard (may wait internally).
            let _gl = writer_mut.abort_merges(_gl)?;
            writer_mut.stop_merges = false;
            index_writer.writer.pending_num_docs.fetch_sub(
                index_writer.writer.segment_infos.total_max_doc() as i64,
                Ordering::AcqRel,
            );
            writer_mut.segment_infos.clear();
            // Checkpoint with empty infos so on-disk files get un-referenced.
            writer_mut
                .deleter
                .checkpoint(&index_writer.writer.segment_infos, false)?;
            index_writer.writer.reader_pool.drop_all(false)?;
            index_writer
                .writer
                .change_count
                .fetch_add(1, Ordering::AcqRel);
            writer_mut.segment_infos.changed();
            index_writer.writer.global_field_numbers.clear();
            seq_no = index_writer
                .writer
                .doc_writer
                .delete_queue
                .next_sequence_number();
            writer_mut.doc_writer.last_seq_no = seq_no;
        }
    }
    Ok(seq_no)
}
/// Records that the in-memory index changed and lets the deleter
/// re-evaluate which files are still referenced.
fn check_point(&mut self, lock: &MutexGuard<()>) -> Result<()> {
    self.changed(lock);
    self.deleter.checkpoint(&self.segment_infos, false)
}
/// Bumps the change counter and marks segment infos dirty; callers must
/// hold the writer lock (witnessed by `_lock`).
fn changed(&mut self, _lock: &MutexGuard<()>) {
    self.change_count.fetch_add(1, Ordering::AcqRel);
    self.segment_infos.changed();
}
/// Total delete count for `info`: deletes recorded on the commit point
/// plus deletes still pending in the pooled reader, when one exists.
fn num_deleted_docs(&self, info: &SegmentCommitInfo<D, C>) -> u32 {
    let committed = info.del_count() as u32;
    let pending = match self.reader_pool.get(info) {
        Some(rld) => rld.pending_delete_count(),
        None => 0,
    };
    committed + pending
}
/// Current value of the monotonically increasing change counter.
fn change_count(&self) -> u64 {
    self.change_count.load(Ordering::Acquire)
}
// Extension hooks invoked around every flush; intentionally no-ops here.
fn do_before_flush(&self) {}
fn do_after_flush(&self) {}
/// Fails with `AlreadyClosed` when the writer is closed, or — when
/// `fail_if_closing` is set — when a close is in progress.
fn ensure_open(&self, fail_if_closing: bool) -> Result<()> {
    let closed = self.closed.load(Ordering::Acquire);
    let closing = fail_if_closing && self.closing.load(Ordering::Acquire);
    if closed || closing {
        bail!(AlreadyClosed("this IndexWriter is closed".into()));
    }
    Ok(())
}
/// Purges the doc-writer flush queue, then applies all buffered deletes
/// under the writer lock.
///
/// Deletes are applied even when the purge failed; the purge's error (if
/// any) is what gets returned.
fn apply_deletes_and_purge(
    index_writer: &IndexWriter<D, C, MS, MP>,
    force_purge: bool,
) -> Result<()> {
    let res = Self::purge(index_writer, force_purge);
    let _any_changes = {
        let l = index_writer.writer.lock.lock()?;
        index_writer.writer.apply_all_deletes_and_update(&l)?
    };
    index_writer
        .writer
        .flush_count
        .fetch_add(1, Ordering::AcqRel);
    // Only the purge's success/failure matters; its count is discarded.
    res.map(|_| ())
}
/// Hook invoked after a segment flush: purges the doc-writer flush queue.
/// The `_trigger_merge` flag is currently unused here.
fn do_after_segment_flushed(
    index_writer: &IndexWriter<D, C, MS, MP>,
    _trigger_merge: bool,
    force_purge: bool,
) -> Result<()> {
    // Discard the purge count; only success/failure is propagated.
    Self::purge(index_writer, force_purge).map(|_| ())
}
/// Pins every file referenced by `segment_infos` so the deleter keeps it.
fn inc_ref_deleter(&self, segment_infos: &SegmentInfos<D, C>) -> Result<()> {
    self.ensure_open(true)?;
    self.deleter.inc_ref_files(&segment_infos.files(false));
    Ok(())
}
/// Releases the pin on every file of `segment_infos`; files whose count
/// drops to zero become eligible for deletion.
fn dec_ref_deleter(&self, segment_infos: &SegmentInfos<D, C>) -> Result<()> {
    self.ensure_open(true)?;
    self.deleter.dec_ref_files(&segment_infos.files(false))
}
/// Drains and processes the doc writer's queued events; returns whether
/// at least one event was handled. Skipped entirely once the writer has
/// hit a tragic (unrecoverable) error.
fn process_events(
    index_writer: &IndexWriter<D, C, MS, MP>,
    trigger_merge: bool,
    force_purge: bool,
) -> Result<bool> {
    let mut processed = false;
    if index_writer.writer.tragedy.is_none() {
        // pop() returns Err when the queue is empty, ending the loop.
        while let Ok(event) = index_writer.writer.doc_writer.events.pop() {
            processed = true;
            event.process(index_writer, trigger_merge, force_purge)?;
        }
    }
    Ok(processed)
}
/// Purges the doc writer's flush-ticket queue, publishing finished
/// segments; returns how many tickets were purged.
fn purge(index_writer: &IndexWriter<D, C, MS, MP>, forced: bool) -> Result<u32> {
    index_writer
        .writer
        .doc_writer
        .purge_buffer(index_writer, forced)
}
/// True when anything changed since the last commit: the change counter
/// moved, the doc writer buffers changes, or deletes are still queued.
fn has_uncommitted_changes(&self) -> bool {
    if self.change_count() != self.last_commit_change_count.load(Ordering::Acquire) {
        return true;
    }
    self.doc_writer.any_changes() || self.buffered_updates_stream.any()
}
/// Commits all pending changes to the directory (two-phase: prepare then
/// finish), returning the commit's sequence number.
fn commit(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<i64> {
    debug!("IW - commit: start");
    // NOTE(review): do_maybe_merge is set by prepare_commit_internal but
    // never consulted afterwards — confirm whether a maybe_merge call was
    // intended here.
    let mut do_maybe_merge = false;
    let seq_no: i64;
    {
        let l = index_writer.writer.commit_lock.lock()?;
        // SAFETY: the commit lock `l` is held for the whole mutation.
        let writer = unsafe { index_writer.writer.writer_mut(&l) };
        index_writer.writer.ensure_open(false)?;
        debug!("IW - commit: enter lock");
        seq_no = if index_writer.writer.pending_commit.is_none() {
            debug!("IW - commit: now prepare");
            writer.prepare_commit_internal(&mut do_maybe_merge, index_writer, &l)?
        } else {
            // A prepare_commit already ran; just finish it.
            debug!("IW - commit: already prepared");
            index_writer.writer.pending_seq_no.load(Ordering::Acquire)
        };
        writer.finish_commit(&l)?;
    }
    Ok(seq_no)
}
fn prepare_commit_internal(
&mut self,
do_maybe_merge: &mut bool,
index_writer: &IndexWriter<D, C, MS, MP>,
_l: &MutexGuard<()>,
) -> Result<i64> {
self.ensure_open(false)?;
debug!("IW - prepare commit: flush");
if let Some(ref tragedy) = self.tragedy {
bail!(IllegalState(format!(
"this writer hit an unrecoverable error; cannot commit: {:?}",
tragedy
)));
}
if self.pending_commit.is_some() {
bail!(IllegalState(
"prepareCommit was already called with no corresponding call to commit".into()
));
}
self.do_before_flush();
let mut seq_no = 0u64;
let to_commit: SegmentInfos<D, C>;
let mut any_segments_flushed = false;
{
let full_flush_lock = Arc::clone(&self.full_flush_lock);
let _fl = full_flush_lock.lock()?;
let mut flush_success = true;
let res = self.prepare_commit_internal_inner(
index_writer,
&mut seq_no,
&mut flush_success,
&mut any_segments_flushed,
);
if res.is_err() {
debug!("IW - hit error during perpare commit");
}
self.doc_writer.finish_full_flush(flush_success);
self.do_after_flush();
match res {
Ok(infos) => {
to_commit = infos;
}
Err(e) => {
return Err(e);
}
}
}
if any_segments_flushed {
*do_maybe_merge = true;
}
let res = self.start_commit(to_commit);
if res.is_err() {
let lock = Arc::clone(&self.lock);
let _l = lock.lock().unwrap();
if !self.files_to_commit.is_empty() {
self.deleter.dec_ref_files_no_error(&self.files_to_commit);
self.files_to_commit.clear();
}
}
match res {
Ok(()) => {
if self.pending_commit.is_none() {
Ok(-1)
} else {
Ok(seq_no as i64)
}
}
Err(e) => Err(e),
}
}
/// Flush-and-snapshot step of prepare_commit: flushes all doc-writer
/// threads, applies deletes, commits pooled readers and returns a cloned
/// snapshot of the segment infos with its files ref-counted.
fn prepare_commit_internal_inner(
    &mut self,
    index_writer: &IndexWriter<D, C, MS, MP>,
    seq_no: &mut u64,
    flush_success: &mut bool,
    any_segment_flushed: &mut bool,
) -> Result<SegmentInfos<D, C>> {
    let (any_flushed, no) = self.doc_writer.flush_all_threads()?;
    *seq_no = no;
    *any_segment_flushed = any_flushed;
    if !any_flushed {
        // Count no-op flushes as well.
        self.flush_count.fetch_add(1, Ordering::AcqRel);
    }
    Self::process_events(index_writer, false, true)?;
    *flush_success = true;
    let lock = Arc::clone(&self.lock);
    let gl = lock.lock()?;
    self.maybe_apply_deletes(true, &gl)?;
    // Persist pooled readers' live-docs before snapshotting.
    self.reader_pool.commit(&self.segment_infos)?;
    if self.change_count() != self.last_commit_change_count.load(Ordering::Acquire) {
        self.change_count.fetch_add(1, Ordering::AcqRel);
        self.segment_infos.changed();
    }
    // Snapshot to commit; pin its files until the commit resolves.
    let to_commit = self.segment_infos.clone();
    self.pending_commit_change_count
        .store(self.change_count(), Ordering::Release);
    self.files_to_commit = to_commit.files(false);
    self.deleter.inc_ref_files(&self.files_to_commit);
    Ok(to_commit)
}
/// Writes `to_sync` as a pending segments file and fsyncs it; on failure
/// the file pins are released and the error is escalated as a tragedy.
fn start_commit(&mut self, to_sync: SegmentInfos<D, C>) -> Result<()> {
    debug_assert!(self.pending_commit.is_none());
    if let Some(ref tragedy) = self.tragedy {
        bail!(IllegalState(format!(
            "this writer hit an unrecoverable error; cannot commit: {:?}",
            tragedy
        )));
    }
    debug!("IW - start_commit(): start");
    {
        let lock = Arc::clone(&self.lock);
        let _l = lock.lock()?;
        // Sanity: the counter only moves forward.
        if self.last_commit_change_count.load(Ordering::Acquire) > self.change_count() {
            bail!(IllegalState(
                "change_count is smaller than last_commit_change_count".into()
            ));
        }
        if self.pending_commit_change_count.load(Ordering::Acquire) > self.change_count() {
            // Nothing new since the snapshot; skip and release the pins.
            debug!("IW - skip start_commit(): no changes pending");
            let res = self.deleter.dec_ref_files(&self.files_to_commit);
            self.files_to_commit.clear();
            return res;
        }
        debug_assert!(self.files_exist(&to_sync));
    }
    let mut pending_commit_set = false;
    let last_gen = to_sync.last_generation;
    let gen = to_sync.generation;
    let err = self.start_commit_inner(to_sync, &mut pending_commit_set);
    {
        let lock = Arc::clone(&self.lock);
        let _l = lock.lock()?;
        // Keep our generation in sync even if the commit write failed.
        self.segment_infos.update_generation(last_gen, gen);
        if !pending_commit_set {
            debug!("IW - hit error committing segments file");
            self.deleter.dec_ref_files_no_error(&self.files_to_commit);
            self.files_to_commit.clear();
        }
    }
    if let Err(e) = err {
        // A failed fsync of the segments file is unrecoverable.
        self.tragic_event(e, "start_commit")?;
    }
    Ok(())
}
/// Writes the pending segments_N file and fsyncs every file it references;
/// rolls the pending commit back if the sync fails.
fn start_commit_inner(
    &mut self,
    mut to_sync: SegmentInfos<D, C>,
    pending_commit_set: &mut bool,
) -> Result<()> {
    {
        let lock = Arc::clone(&self.lock);
        let _l = lock.lock()?;
        debug_assert!(self.pending_commit.is_none());
        // Writes the pending_segments_N file (not yet the live commit).
        to_sync.prepare_commit(self.directory.as_ref())?;
        debug!(
            "IW - start_commit: wrote pending segment file '{}' ",
            file_name_from_generation(
                INDEX_FILE_PENDING_SEGMENTS,
                "",
                to_sync.generation as u64,
            )
        );
        *pending_commit_set = true;
        self.pending_commit = Some(to_sync);
    }
    let files_to_sync: HashSet<String> = self.pending_commit.as_ref().unwrap().files(false);
    if let Err(e) = self.directory.sync(&files_to_sync) {
        // Sync failed: undo the pending segments file and report.
        *pending_commit_set = false;
        self.pending_commit
            .as_mut()
            .unwrap()
            .rollback_commit(self.directory.as_ref());
        self.pending_commit = None;
        return Err(e);
    }
    debug!("IW - done all syncs: {:?}", &files_to_sync);
    Ok(())
}
/// Phase two of the commit. If finishing fails *after* the commit became
/// durable, the error is recorded as a tragedy and the writer is rolled
/// back; a failure before that point is simply returned.
fn finish_commit(&mut self, commit_lock: &MutexGuard<()>) -> Result<()> {
    let mut commit_completed = false;
    let res = self.try_finish_commit(&mut commit_completed);
    if let Err(e) = res {
        if commit_completed {
            {
                let l = self.lock.lock()?;
                if self.tragedy.is_some() {
                    // Already poisoned; surface the new error as-is.
                    bail!(e);
                }
                // SAFETY: writer lock `l` held while setting the tragedy.
                let writer = unsafe { self.writer_mut(&l) };
                writer.tragedy = Some(e);
            }
            if self.should_close(false) {
                self.rollback_internal(commit_lock)?;
            }
            bail!(IllegalState(format!(
                "this writer hit an unrecoverable error; {:?}",
                &self.tragedy
            )))
        } else {
            return Err(e);
        }
    }
    debug!("IW - commit: done");
    Ok(())
}
/// Finalizes `pending_commit` under the writer lock. The file refs taken at
/// start_commit are always released, whether or not finishing succeeded.
/// `commit_completed` is set by `do_finish_commit` once the commit is durable.
fn try_finish_commit(&mut self, commit_completed: &mut bool) -> Result<()> {
    let lock = Arc::clone(&self.lock);
    let _l = lock.lock()?;
    self.ensure_open(false)?;
    if let Some(ref tragedy) = self.tragedy {
        bail!(IllegalState(format!(
            "this writer hit an unrecoverable error; cannot complete commit: {:?}",
            tragedy
        )));
    }
    if self.pending_commit.is_some() {
        let mut res = self.do_finish_commit(commit_completed);
        // Wake any threads waiting on commit state.
        self.cond.notify_all();
        if res.is_ok() {
            res = self.deleter.dec_ref_files(&self.files_to_commit);
        } else {
            // Already failing: release refs without masking the original error.
            self.deleter.dec_ref_files_no_error(&self.files_to_commit);
        }
        self.pending_commit = None;
        self.files_to_commit.clear();
        if res.is_err() {
            return res;
        }
    } else {
        debug!("IW - commit: pendingCommit is None; skip");
    }
    Ok(())
}
/// Makes the pending commit durable via `SegmentInfos::finish_commit`, then
/// checkpoints the deleter and records the new commit generation/state.
fn do_finish_commit(&mut self, commit_completed: &mut bool) -> Result<()> {
    debug!("IW - commit: pending_commit is not none");
    let committed_segments_file = self
        .pending_commit
        .as_mut()
        .unwrap()
        .finish_commit(self.directory.as_ref())?;
    // From here on the commit is durable; any later failure is tragic.
    *commit_completed = true;
    debug!(
        "IW - commit: done writing segments file {}",
        &committed_segments_file
    );
    // Tell the deleter a commit happened so it protects the committed files.
    self.deleter
        .checkpoint(self.pending_commit.as_ref().unwrap(), true)?;
    let last_gen = self.pending_commit.as_ref().unwrap().last_generation;
    let gen = self.pending_commit.as_ref().unwrap().generation;
    self.segment_infos.update_generation(last_gen, gen);
    self.last_commit_change_count.store(
        self.pending_commit_change_count.load(Ordering::Acquire),
        Ordering::Release,
    );
    // Snapshot so a later rollback() can return to this commit point.
    self.rollback_segments = self
        .pending_commit
        .as_ref()
        .unwrap()
        .create_backup_segment_infos();
    Ok(())
}
/// Debug helper: asserts that every file referenced by `to_syc` is known to
/// the deleter. Always returns `true` — the assertions are the real check.
fn files_exist(&self, to_syc: &SegmentInfos<D, C>) -> bool {
    let files = to_syc.files(false);
    files
        .iter()
        .for_each(|file_name| assert!(self.deleter.exists(file_name)));
    true
}
/// Flushes buffered documents/deletes to the directory and, when anything was
/// flushed and `trigger_merge` is set, kicks off merge selection afterwards.
fn flush(
    index_writer: &IndexWriter<D, C, MS, MP>,
    trigger_merge: bool,
    apply_all_deletes: bool,
) -> Result<()> {
    index_writer.writer.ensure_open(false)?;
    let flushed = Self::do_flush(index_writer, apply_all_deletes)?;
    if flushed && trigger_merge {
        Self::maybe_merge(index_writer, MergerTrigger::FullFlush, None)?;
    }
    Ok(())
}
/// Flushes all in-memory doc-writer state and optionally applies buffered
/// deletes. Returns whether any change (flush or delete) happened.
fn do_flush(index_writer: &IndexWriter<D, C, MS, MP>, apply_deletes: bool) -> Result<bool> {
    if let Some(ref tragedy) = index_writer.writer.tragedy {
        bail!(IllegalState(format!(
            "this writer hit an unrecoverable error; cannot flush: {:?}",
            tragedy
        )));
    }
    index_writer.writer.do_before_flush();
    debug!("IW - start flush: apply_all_deletes={}", apply_deletes);
    let mut any_changes = false;
    {
        // full_flush_lock serializes whole-index flushes against each other.
        let _l = index_writer.writer.full_flush_lock.lock()?;
        let res = index_writer.writer.doc_writer.flush_all_threads();
        if let Ok((any_flush, _)) = &res {
            any_changes = *any_flush;
            if !any_changes {
                // Count no-op flushes too.
                index_writer
                    .writer
                    .flush_count
                    .fetch_add(1, Ordering::AcqRel);
            }
        }
        // Always release the full-flush state, even if flushing failed.
        index_writer.writer.doc_writer.finish_full_flush(true);
        if let Err(e) = Self::process_events(index_writer, false, true) {
            if res.is_ok() {
                return Err(e);
            } else {
                // Keep the flush error as primary; only log this one.
                error!("process events failed after do_flush by: '{:?}'", e);
            }
        }
        if let Err(e) = res {
            return Err(e);
        }
    }
    {
        let l = index_writer.writer.lock.lock()?;
        any_changes |= index_writer.writer.maybe_apply_deletes(apply_deletes, &l)?;
        index_writer.writer.do_after_flush();
    }
    Ok(any_changes)
}
/// Applies all buffered deletes when requested; returns whether any delete
/// actually changed a segment.
fn maybe_apply_deletes(&self, apply_all_deletes: bool, l: &MutexGuard<()>) -> Result<bool> {
    if !apply_all_deletes {
        debug!(
            "IW - don't apply deletes now del_term_count={}",
            self.buffered_updates_stream.num_terms(),
        );
        return Ok(false);
    }
    debug!("IW - apply all deletes during flush");
    self.apply_all_deletes_and_update(l)
}
/// Resolves all buffered deletes/updates against every live segment,
/// checkpoints on change, and drops segments that became 100% deleted.
fn apply_all_deletes_and_update(&self, l: &MutexGuard<()>) -> Result<bool> {
    self.flush_deletes_count.fetch_add(1, Ordering::AcqRel);
    debug!(
        "IW: now apply all deletes for all segments, max_doc={}",
        self.doc_writer.num_docs() + self.segment_infos.total_max_doc() as u32
    );
    // SAFETY(review): mutation through &self is serialized by the writer lock `l`.
    let writer_mut = unsafe { self.writer_mut(l) };
    let result = self
        .buffered_updates_stream
        .apply_deletes_and_updates(&self.reader_pool, &self.segment_infos.segments)?;
    if result.any_deletes {
        writer_mut.check_point(l)?;
    }
    if !result.all_deleted.is_empty() {
        debug!("IW: drop 100% deleted segments.");
        for info in result.all_deleted {
            // Segments currently being merged are left for the merge to handle.
            if !self.merging_segments.contains(&info.info.name) {
                writer_mut.segment_infos.remove(&info);
                self.pending_num_docs
                    .fetch_sub(info.info.max_doc() as i64, Ordering::AcqRel);
                self.reader_pool.drop(&info)?;
            }
        }
        writer_mut.check_point(l)?;
    }
    // Discard stream state no longer needed for the remaining segments.
    self.buffered_updates_stream.prune(&self.segment_infos);
    Ok(result.any_deletes)
}
/// Called when flushing a segment failed: asks the deleter to remove any
/// files the aborted segment may have created.
fn flush_failed(&self, info: &SegmentInfo<D, C>) -> Result<()> {
    let files: HashSet<String> = info.files().iter().cloned().collect();
    self.deleter.delete_new_files(&files)
}
/// Atomically adds/updates a block of documents, first deleting by `term`
/// when one is given. Returns the operation's sequence number.
fn update_documents<F: Fieldable>(
    index_writer: &IndexWriter<D, C, MS, MP>,
    docs: Vec<Vec<F>>,
    term: Option<Term>,
) -> Result<u64> {
    index_writer.writer.ensure_open(true)?;
    let (seq_no, changed) = index_writer
        .writer
        .doc_writer
        .update_documents(docs, term)?;
    if !changed {
        return Ok(seq_no);
    }
    Self::process_events(index_writer, true, false)?;
    Ok(seq_no)
}
/// Buffers delete-by-term operations; returns their sequence number.
fn delete_documents_by_terms(
    index_writer: &IndexWriter<D, C, MS, MP>,
    terms: Vec<Term>,
) -> Result<u64> {
    index_writer.writer.ensure_open(true)?;
    let (seq_no, changed) = index_writer.writer.doc_writer.delete_terms(terms);
    if !changed {
        return Ok(seq_no);
    }
    Self::process_events(index_writer, true, false)?;
    Ok(seq_no)
}
/// Buffers delete-by-query operations; a MatchAllDocsQuery degenerates to
/// deleting the entire index. Returns the sequence number.
fn delete_documents_by_queries(
    index_writer: &IndexWriter<D, C, MS, MP>,
    queries: Vec<Arc<dyn Query<C>>>,
) -> Result<u64> {
    index_writer.writer.ensure_open(true)?;
    let wants_everything = queries
        .iter()
        .any(|q| q.as_any().is::<MatchAllDocsQuery>());
    if wants_everything {
        return Self::delete_all(index_writer);
    }
    let (seq_no, changed) = index_writer.writer.doc_writer.delete_queries(queries);
    if changed {
        Self::process_events(index_writer, true, false)?;
    }
    Ok(seq_no)
}
/// Adds/updates a single document, first deleting by `term` when one is
/// given. Returns the operation's sequence number.
fn update_document<F: Fieldable>(
    index_writer: &IndexWriter<D, C, MS, MP>,
    doc: Vec<F>,
    term: Option<Term>,
) -> Result<u64> {
    index_writer.writer.ensure_open(true)?;
    let (seq_no, changed) = index_writer.writer.doc_writer.update_document(doc, term)?;
    if !changed {
        return Ok(seq_no);
    }
    Self::process_events(index_writer, true, false)?;
    Ok(seq_no)
}
/// Not yet supported: in-place numeric doc-values update. Always panics.
#[allow(dead_code)]
fn update_numeric_doc_value(&mut self, _field: &str, _value: i64, _term: Term) -> Result<u64> {
    unimplemented!()
}
/// Returns the next unique segment name ("_<base36 counter>") and bumps the
/// segment-infos counter.
pub fn new_segment_name(&self) -> String {
    let _l = self.segment_infos_lock.lock().unwrap();
    // SAFETY(review): casts away &self to mutate the counter; serialized by
    // `segment_infos_lock` — assumes no unsynchronized mutation elsewhere.
    let writer =
        self as *const IndexWriterInner<D, C, MS, MP> as *mut IndexWriterInner<D, C, MS, MP>;
    let count = unsafe {
        // Mark infos changed first so the incremented counter is committed
        // even if no other change happens before the next commit.
        (*writer).segment_infos.changed();
        let counter = self.segment_infos.counter;
        (*writer).segment_infos.counter += 1;
        counter
    };
    format!("_{}", to_base36(count as u64))
}
/// Packs all of `info`'s files into a compound file (CFS) via the codec's
/// compound format, cleaning up any partial output on failure.
///
/// `directory` must be a freshly created tracking wrapper so exactly the
/// files written here can be identified (and deleted on error).
pub fn create_compound_file<DW: Directory, T: Deref<Target = DW>>(
    &self,
    directory: &TrackingDirectoryWrapper<DW, T>,
    info: &mut SegmentInfo<D, C>,
    context: &IOContext,
) -> Result<()> {
    if !directory.create_files().is_empty() {
        bail!(IllegalState(
            "pass a clean tracking dir for CFS creation".into()
        ));
    }
    debug!("IW: create compound file for segment: {}", &info.name);
    let res = info
        .codec()
        .compound_format()
        .write(directory, info, context);
    if let Err(err) = res {
        // Best-effort cleanup of whatever the codec managed to write.
        if let Err(e) = self.delete_new_files(&directory.create_files()) {
            error!(
                "clean up files when create compound file failed, error occur: {:?}",
                e
            );
        }
        return Err(err);
    }
    // Replace the segment's file list with the CFS files just written.
    info.set_files(&directory.create_files())?;
    Ok(())
}
/// True when an NRT reader opened on `infos` is still current: same
/// segment-infos version and no uncommitted docs or buffered deletes.
fn nrt_is_current(&self, infos: &SegmentInfos<D, C>) -> bool {
    let _l = self.lock.lock().unwrap();
    let same_version = infos.version == self.segment_infos.version;
    same_version && !self.doc_writer.any_changes() && !self.buffered_updates_stream.any()
}
/// Whether close/rollback has fully completed.
fn is_closed(&self) -> bool {
    self.closed.load(Ordering::Acquire)
}
/// Reports whether the writer is still usable.
/// NOTE(review): with `||` this is false only when BOTH `closing` and
/// `closed` are set; a writer that is closing but not yet closed reports
/// open. If the intent is "neither closing nor closed", this should be
/// `&&` — confirm against callers before changing.
fn is_open(&self) -> bool {
    !self.closing.load(Ordering::Acquire) || !self.is_closed()
}
/// Delegates to the index file deleter to remove newly created files.
fn delete_new_files(&self, files: &HashSet<String>) -> Result<()> {
    self.deleter.delete_new_files(files)
}
/// Forcibly merges the index down to at most `max_num_segments` segments:
/// flushes, retargets queued/running merges, requests new forced merges, and
/// (with `do_wait`) blocks until all forced merges have completed.
fn force_merge(
    index_writer: &IndexWriter<D, C, MS, MP>,
    max_num_segments: u32,
    do_wait: bool,
) -> Result<()> {
    index_writer.writer.ensure_open(true)?;
    if max_num_segments < 1 {
        bail!(IllegalArgument(format!(
            "max_num_segments must be >= 1, got {}",
            max_num_segments
        )));
    }
    trace!("IW - force_merge: flush at force merge");
    Self::flush(index_writer, true, true)?;
    {
        let l = index_writer.writer.lock.lock()?;
        // SAFETY(review): mutation through &self serialized by the writer lock.
        let writer_mut = unsafe { index_writer.writer.writer_mut(&l) };
        writer_mut.reset_merge_exceptions(&l);
        writer_mut.segments_to_merge.clear();
        // Every current segment becomes a forced-merge candidate.
        for info in &index_writer.writer.segment_infos.segments {
            writer_mut.segments_to_merge.insert(Arc::clone(info), true);
        }
        writer_mut.merge_max_num_segments = max_num_segments;
        // Retarget merges already queued so they count toward this request.
        for merge in &mut writer_mut.pending_merges {
            merge.max_num_segments.set(Some(max_num_segments));
            writer_mut
                .segments_to_merge
                .insert(Arc::clone(merge.info.as_ref().unwrap()), true);
        }
        // Same for merges already running; map is rebuilt to iterate by value.
        let new_running_merges = HashMap::with_capacity(writer_mut.running_merges.len());
        let mut running_merges =
            mem::replace(&mut writer_mut.running_merges, new_running_merges);
        for (_, merge) in running_merges.drain() {
            merge.max_num_segments.set(Some(max_num_segments));
            writer_mut
                .segments_to_merge
                .insert(Arc::clone(merge.info.as_ref().unwrap()), true);
            writer_mut.running_merges.insert(merge.id, merge);
        }
    }
    Self::maybe_merge(
        index_writer,
        MergerTrigger::Explicit,
        Some(max_num_segments),
    )?;
    if do_wait {
        let mut l = index_writer.writer.lock.lock()?;
        loop {
            if let Some(ref tragedy) = index_writer.writer.tragedy {
                bail!(IllegalState(format!(
                    "this writer hit an unrecoverable error; cannot complete forceMerge: {:?}",
                    tragedy
                )));
            }
            // Surface failures from background merges that were part of a
            // forced merge (they carry a max_num_segments target).
            if !index_writer.writer.merge_exceptions.is_empty() {
                for merge in &index_writer.writer.merge_exceptions {
                    if merge.max_num_segments.get().is_some() {
                        bail!(RuntimeError("background merge hit exception".into()));
                    }
                }
            }
            if index_writer.writer.max_num_segments_merges_pending(&l) {
                // Timed wait: re-check every second even if a notify is missed.
                let (guard, _) = index_writer
                    .writer
                    .cond
                    .wait_timeout(l, Duration::from_millis(1000))?;
                l = guard;
            } else {
                break;
            }
        }
        // The writer may have been closed (e.g. tragedy) while waiting.
        index_writer.writer.ensure_open(true)?;
    }
    Ok(())
}
/// True while any queued or running merge belongs to a forced merge
/// (i.e. carries a max_num_segments target).
fn max_num_segments_merges_pending(&self, _lock: &MutexGuard<()>) -> bool {
    for merge in &self.pending_merges {
        if merge.max_num_segments.get().is_some() {
            return true;
        }
    }
    for merge in self.running_merges.values() {
        if merge.max_num_segments.get().is_some() {
            return true;
        }
    }
    false
}
/// Clears recorded merge errors and advances to a new merge generation.
fn reset_merge_exceptions(&mut self, _lock: &MutexGuard<()>) {
    self.merge_exceptions = Vec::new();
    self.merge_gen += 1;
}
/// Registers `merge` with the writer: validates its segments are still live
/// and un-merged, marks them as merging, sizes the merge, and queues it.
/// Returns `Ok(false)` when the merge is no longer viable (a segment is
/// already merging or was dropped), `Ok(true)` once queued.
fn register_merge(
    &mut self,
    mut merge: OneMerge<D, C>,
    _lock: &MutexGuard<()>,
) -> Result<bool> {
    if merge.register_done {
        return Ok(true);
    }
    debug_assert!(!merge.segments.is_empty());
    if self.stop_merges {
        merge.rate_limiter.set_abort();
        bail!(Index(MergeAborted("merge is abort!".into())));
    }
    let mut is_external = false;
    for info in &merge.segments {
        if self.merging_segments.contains(&info.info.name) {
            // Another merge already owns this segment.
            return Ok(false);
        }
        if !self.segment_infos.segments.contains(info) {
            // Segment was dropped (e.g. 100% deleted) since the policy ran.
            return Ok(false);
        }
        if info.info.directory.as_ref() as *const D != self.directory_orig.as_ref() as *const D
        {
            is_external = true;
        }
        if self.segments_to_merge.contains_key(info) {
            // This merge participates in a pending force_merge request.
            merge
                .max_num_segments
                .set(Some(self.merge_max_num_segments));
        }
    }
    for info in &merge.segments {
        if !self.segment_infos.segments.contains(info) {
            bail!(
                "MergeError: MergePolicy selected a segment '{}' that is not in the current \
                 index",
                &info.info.name
            );
        }
    }
    merge.merge_gen = self.merge_gen;
    merge.is_external = is_external;
    for info in &merge.segments {
        self.merging_segments.insert(info.info.name.clone());
    }
    debug_assert_eq!(merge.estimated_merge_bytes.read(), 0);
    debug_assert_eq!(merge.total_merge_bytes, 0);
    for info in &merge.segments {
        if info.info.max_doc > 0 {
            let del_count = self.num_deleted_docs(info.as_ref());
            debug_assert!((del_count as i32) <= info.info.max_doc);
            let total_size = info.size_in_bytes();
            let del_ratio = del_count as f64 / info.info.max_doc as f64;
            merge
                .estimated_merge_bytes
                .update(|bytes| *bytes += (total_size as f64 * (1.0 - del_ratio)) as u64);
            // BUGFIX: accumulate across all segments. This previously used
            // `=`, so only the last segment's size was recorded (Lucene's
            // registerMerge accumulates with `+=`).
            merge.total_merge_bytes += total_size as u64;
        }
    }
    merge.register_done = true;
    self.pending_merges.push_back(merge);
    Ok(true)
}
/// Pops the next queued merge (if any) and records it as running; called by
/// the merge scheduler's worker threads.
fn next_merge(&self) -> Option<OneMerge<D, C>> {
    let l = self.lock.lock().unwrap();
    // SAFETY(review): mutation through &self is serialized by the writer lock.
    let writer_mut = unsafe { self.writer_mut(&l) };
    if let Some(one_merge) = writer_mut.pending_merges.pop_front() {
        writer_mut
            .running_merges
            .insert(one_merge.id, one_merge.running_info());
        Some(one_merge)
    } else {
        None
    }
}
/// Asks the merge policy for new merges (under the writer lock) and then
/// hands control to the merge scheduler, which runs outside the lock.
pub fn maybe_merge(
    index_writer: &IndexWriter<D, C, MS, MP>,
    trigger: MergerTrigger,
    max_num_segments: Option<u32>,
) -> Result<()> {
    index_writer.writer.ensure_open(false)?;
    let new_merges_found = {
        let l = index_writer.writer.lock.lock()?;
        // SAFETY(review): mutation through &self serialized by the writer lock.
        let writer = unsafe { index_writer.writer.writer_mut(&l) };
        writer.update_pending_merges(trigger, max_num_segments, index_writer, &l)?
    };
    // The scheduler pulls registered merges via `next_merge`.
    index_writer
        .writer
        .merge_scheduler
        .merge(index_writer, trigger, new_merges_found)
}
/// Consults the merge policy for new merges — forced merges when
/// `max_num_segments` is given, normal selection otherwise — and registers
/// each one. Returns whether any merge was found.
fn update_pending_merges(
    &mut self,
    trigger: MergerTrigger,
    max_num_segments: Option<u32>,
    index_writer: &IndexWriter<D, C, MS, MP>,
    l: &MutexGuard<()>,
) -> Result<bool> {
    debug_assert!(max_num_segments.is_none() || *max_num_segments.as_ref().unwrap() > 0);
    if self.stop_merges {
        return Ok(false);
    }
    if self.tragedy.is_some() {
        return Ok(false);
    }
    let mut spec: Option<MergeSpecification<D, C>>;
    if let Some(max_num_segments) = max_num_segments {
        debug_assert!(
            trigger == MergerTrigger::Explicit || trigger == MergerTrigger::MergeFinished
        );
        spec = self.config.merge_policy().find_forced_merges(
            &self.segment_infos,
            max_num_segments,
            &self.segments_to_merge,
            index_writer,
        )?;
        if let Some(ref mut spec) = spec {
            // Propagate the forced-merge target onto every selected merge.
            for merge in &mut spec.merges {
                merge.max_num_segments.set(Some(max_num_segments));
            }
        }
    } else {
        spec = self.config.merge_policy().find_merges(
            trigger,
            &self.segment_infos,
            index_writer,
        )?;
    }
    let new_merges_found = spec.is_some();
    if let Some(ref mut spec) = spec {
        let merges = mem::replace(&mut spec.merges, vec![]);
        for merge in merges {
            self.register_merge(merge, &l)?;
        }
    }
    Ok(new_merges_found)
}
/// Runs one merge; any failure is escalated to a tragic event (which itself
/// returns an error via `?`).
fn merge(index_writer: &IndexWriter<D, C, MS, MP>, merge: &mut OneMerge<D, C>) -> Result<()> {
    match Self::do_merge(index_writer, merge) {
        Ok(()) => Ok(()),
        Err(e) => {
            index_writer.writer.tragic_event(e, "merge")?;
            Ok(())
        }
    }
}
/// Executes a merge, unregisters it when done, and — when the merge finished
/// normally — asks the policy for follow-on merges. An aborted merge is
/// reported as success to the caller.
fn do_merge(
    index_writer: &IndexWriter<D, C, MS, MP>,
    merge: &mut OneMerge<D, C>,
) -> Result<()> {
    let res = Self::execute_merge(index_writer, merge);
    {
        let l = index_writer.writer.lock.lock().unwrap();
        // SAFETY(review): mutation through &self serialized by the writer lock.
        let writer_mut = unsafe { index_writer.writer.writer_mut(&l) };
        writer_mut.merge_finish(&l, merge);
        if res.is_err() {
            trace!("IW - hit error during merge");
        } else if !merge.rate_limiter.aborted()
            && (merge.max_num_segments.get().is_some()
                || (!index_writer.writer.closed.load(Ordering::Acquire)
                    && !index_writer.writer.closing.load(Ordering::Acquire)))
        {
            // BUGFIX: parenthesized the disjunction. Previously `&&` bound
            // tighter than `||`, so an ABORTED merge on an open writer still
            // scheduled follow-on merges; the abort check must gate all cases.
            writer_mut.update_pending_merges(
                MergerTrigger::MergeFinished,
                merge.max_num_segments.get(),
                index_writer,
                &l,
            )?;
        }
    }
    match res {
        Err(Error(Index(MergeAborted(_)), _)) => {
            // Abort is an expected outcome (writer closing), not an error.
            let segments: Vec<_> = merge.segments.iter().map(|s| &s.info.name).collect();
            warn!("the merge for segments {:?} is aborted!", segments);
            Ok(())
        }
        Ok(()) => Ok(()),
        Err(e) => Err(e),
    }
}
/// Binds this thread's rate limiter to the merge, initializes the merge, and
/// runs the heavy lifting in `merge_middle`.
fn execute_merge(
    index_writer: &IndexWriter<D, C, MS, MP>,
    merge: &mut OneMerge<D, C>,
) -> Result<()> {
    let writer = &index_writer.writer;
    writer
        .rate_limiters
        .get_or(|| Box::new(Arc::clone(&merge.rate_limiter)));
    writer.merge_init(merge)?;
    trace!("IW - now merge");
    Self::merge_middle(index_writer, merge).map(|_| ())
}
/// Locks the writer and performs per-merge initialization; on failure the
/// merge is unregistered so its segments become mergeable again.
fn merge_init(&self, merge: &mut OneMerge<D, C>) -> Result<()> {
    let lock = Arc::clone(&self.lock);
    let l = lock.lock().unwrap();
    // SAFETY(review): mutation through &self is serialized by the writer lock.
    let writer = unsafe { self.writer_mut(&l) };
    let res = writer.do_merge_init(merge, &l);
    if res.is_err() {
        trace!("IW - hit error in merge_init");
        writer.merge_finish(&l, merge);
    }
    res
}
/// Per-merge setup (under the writer lock): applies buffered deletes to the
/// merge's segments, drops any that became 100% deleted, and allocates the
/// target SegmentCommitInfo for the merged output.
fn do_merge_init(&mut self, merge: &mut OneMerge<D, C>, l: &MutexGuard<()>) -> Result<()> {
    debug_assert!(merge.register_done);
    if self.tragedy.is_some() {
        bail!(IllegalState(
            "this writer hit an unrecoverable error; cannot merge".into()
        ));
    }
    if merge.info.is_some() {
        // Already initialized.
        return Ok(());
    }
    if merge.rate_limiter.aborted() {
        // Abort is re-checked later; skip setup now.
        return Ok(());
    }
    trace!(
        "IW - now apply deletes for {} merging segments.",
        merge.segments.len()
    );
    let result = self
        .buffered_updates_stream
        .apply_deletes_and_updates(&self.reader_pool, &merge.segments)?;
    if result.any_deletes {
        self.check_point(&l)?;
    }
    if !result.all_deleted.is_empty() {
        trace!(
            "IW - drop 100% deleted {} segments",
            result.all_deleted.len()
        );
        for info in &result.all_deleted {
            self.segment_infos.remove(info);
            self.pending_num_docs
                .fetch_sub(info.info.max_doc as i64, Ordering::AcqRel);
            // If a dropped segment belonged to this merge, shrink the merge.
            if merge.segments.contains(info) {
                self.merging_segments.remove(&info.info.name);
                merge.segments.remove_item(info);
            }
            self.reader_pool.drop(info.as_ref())?;
        }
        self.check_point(&l)?;
    }
    // max_doc is -1 until the merge learns how many docs survive.
    let merge_segment_name = self.new_segment_name();
    let mut si = SegmentInfo::new(
        VERSION_LATEST,
        &merge_segment_name,
        -1,
        Arc::clone(&self.directory_orig),
        false,
        Some(Arc::clone(&self.config.codec)),
        HashMap::new(),
        random_id(),
        HashMap::new(),
        self.config.index_sort().map(Clone::clone),
    )?;
    // Diagnostics recorded in the segment for debugging/inspection tools.
    let mut details = HashMap::new();
    details.insert(
        "merge_max_num_segments".into(),
        merge.max_num_segments.get().unwrap_or(0).to_string(),
    );
    details.insert("merge_factor".into(), merge.segments.len().to_string());
    details.insert("source".into(), "merge".into());
    si.set_diagnostics(details);
    let sci = SegmentCommitInfo::new(si, 0, -1, -1, -1, HashMap::new(), HashSet::new());
    merge.info = Some(Arc::new(sci));
    self.buffered_updates_stream.prune(&self.segment_infos);
    Ok(())
}
/// Wraps `do_merge_middle`; on failure the merge's readers are closed (with
/// per-reader errors suppressed internally via `suppress_errors = true`).
fn merge_middle(
    index_writer: &IndexWriter<D, C, MS, MP>,
    merge: &mut OneMerge<D, C>,
) -> Result<i32> {
    let res = Self::do_merge_middle(index_writer, merge);
    if let Err(ref e) = res {
        error!("merge_middle err {:?}", e);
        let guard = index_writer.writer.lock.lock().unwrap();
        index_writer.writer.close_merge_readers(merge, true, &guard)?;
    }
    res
}
/// The heart of a merge: opens delete-adjusted readers for every source
/// segment, runs SegmentMerger, optionally wraps the output in a compound
/// file, writes the segment-info file, and commits the merge. Returns the
/// merged segment's doc count (0 when the merge was aborted or dropped).
fn do_merge_middle(
    index_writer: &IndexWriter<D, C, MS, MP>,
    merge: &mut OneMerge<D, C>,
) -> Result<i32> {
    merge.rate_limiter.check_abort()?;
    let context = IOContext::Merge(merge.store_merge_info());
    // Tracking wrapper records exactly which files this merge creates.
    let dir_wrapper = Arc::new(TrackingDirectoryWrapper::new(DerefWrapper(
        index_writer.writer.merge_directory.clone(),
    )));
    merge.readers = Vec::with_capacity(merge.segments.len());
    let mut seg_upto = 0;
    while seg_upto < merge.segments.len() {
        let rld = index_writer
            .writer
            .reader_pool
            .get_or_create(&merge.segments[seg_upto])?;
        let mut reader: Arc<SegmentReader<D, C>>;
        let live_docs: BitsRef;
        let del_count: i32;
        {
            // Snapshot reader + live docs + delete count atomically.
            let _l = index_writer.writer.lock.lock()?;
            let res = rld.reader_for_merge(&context);
            match res {
                Ok(r) => reader = r,
                Err(e) => {
                    return Err(e);
                }
            }
            live_docs = rld.readonly_live_docs();
            del_count =
                rld.pending_delete_count() as i32 + merge.segments[seg_upto].del_count();
            debug_assert!(rld.verify_doc_counts());
        }
        if reader.num_deleted_docs() != del_count {
            // Deletes arrived after the reader was opened: rebuild the reader
            // over the newer live-docs snapshot.
            debug_assert!(del_count > reader.num_deleted_docs());
            let new_reader = {
                let _l = index_writer.writer.lock.lock()?;
                SegmentReader::build_from(
                    Arc::clone(&merge.segments[seg_upto]),
                    reader.as_ref(),
                    live_docs,
                    merge.segments[seg_upto].info.max_doc - del_count,
                    true,
                )?
            };
            reader = Arc::new(new_reader);
        }
        merge.readers.push(reader);
        debug_assert!(del_count <= merge.segments[seg_upto].info.max_doc);
        seg_upto += 1;
    }
    let merge_readers: Vec<Arc<SegmentReader<D, C>>> =
        merge.readers.iter().map(Arc::clone).collect();
    let mut merger = SegmentMerger::new(
        merge_readers,
        &merge.info.as_ref().unwrap().info,
        Arc::clone(&dir_wrapper),
        FieldNumbersRef::new(Arc::clone(&index_writer.writer.global_field_numbers)),
        context,
    )?;
    merge.rate_limiter.check_abort()?;
    merge.merge_start_time.write(Some(SystemTime::now()));
    if merger.should_merge() {
        merger.merge()?;
    }
    merger
        .merge_state
        .segment_info()
        .set_files(&dir_wrapper.create_files())?;
    if !merger.should_merge() {
        // No live docs to merge: commit so the source segments are replaced,
        // then report zero docs (the merged segment is dropped).
        debug_assert_eq!(merger.merge_state.segment_info().max_doc, 0);
        index_writer
            .writer
            .commit_merge(merge, &merger.merge_state)?;
        return Ok(0);
    }
    debug_assert!(merger.merge_state.segment_info().max_doc > 0);
    let use_compound_file = {
        let _l = index_writer.writer.lock.lock()?;
        index_writer.writer.config.merge_policy().use_compound_file(
            &index_writer.writer.segment_infos,
            merge.info.as_ref().unwrap().as_ref(),
            index_writer,
        )
    };
    if use_compound_file {
        let tracking_cfs_dir =
            TrackingDirectoryWrapper::new(&index_writer.writer.merge_directory);
        let info = merge.info.as_mut().unwrap();
        let segment_info = Arc::get_mut(info).unwrap();
        let files_to_remove = segment_info.files();
        if let Err(e) = index_writer.writer.create_compound_file(
            &tracking_cfs_dir,
            &mut segment_info.info,
            &context,
        ) {
            let _l = index_writer.writer.lock.lock().unwrap();
            if merge.rate_limiter.aborted() {
                // Abort during CFS creation is not an error for the caller.
                return Ok(0);
            } else {
                index_writer
                    .writer
                    .delete_new_files(&segment_info.files())?;
                return Err(e);
            }
        }
        {
            let _l = index_writer.writer.lock.lock().unwrap();
            // The non-CFS originals are no longer needed.
            index_writer.writer.delete_new_files(&files_to_remove)?;
            if merge.rate_limiter.aborted() {
                index_writer
                    .writer
                    .delete_new_files(&segment_info.files())?;
                return Ok(0);
            }
        }
        segment_info.info.set_use_compound_file();
    }
    {
        // Persist the final segment-info file for the merged segment.
        let info = merge.info.as_mut().unwrap();
        let segment_info = Arc::get_mut(info).unwrap();
        if let Err(e) = index_writer
            .writer
            .config
            .codec()
            .segment_info_format()
            .write(
                &index_writer.writer.directory,
                &mut segment_info.info,
                &context,
            )
        {
            index_writer
                .writer
                .delete_new_files(&segment_info.files())?;
            return Err(e);
        }
    }
    if !index_writer
        .writer
        .commit_merge(merge, &merger.merge_state)?
    {
        return Ok(0);
    }
    Ok(merge.info.as_ref().unwrap().info.max_doc)
}
/// Removes a finished (or aborted) merge from the writer's bookkeeping and
/// wakes any threads waiting on merge state.
fn merge_finish(&mut self, _lock: &MutexGuard<()>, merge: &mut OneMerge<D, C>) {
    self.cond.notify_all();
    if merge.register_done {
        // Free the source segments so they become mergeable again.
        for info in merge.segments.iter() {
            self.merging_segments.remove(&info.info.name);
        }
        merge.register_done = false;
    }
    self.running_merges.remove(&merge.id);
}
/// Gives the scheduler one last chance to run, then blocks until every
/// pending and running merge has completed.
fn wait_for_merges(index_writer: &IndexWriter<D, C, MS, MP>) -> Result<()> {
    index_writer
        .writer
        .merge_scheduler
        .merge(index_writer, MergerTrigger::Closing, false)?;
    {
        let mut l = index_writer.writer.lock.lock()?;
        index_writer.writer.ensure_open(false)?;
        debug!("IW - wait for merges");
        while !index_writer.writer.pending_merges.is_empty()
            || !index_writer.writer.running_merges.is_empty()
        {
            // Timed wait so the condition is re-checked even without a notify.
            let (loc, _) = index_writer
                .writer
                .cond
                .wait_timeout(l, Duration::from_millis(1000))?;
            l = loc;
        }
        debug_assert!(index_writer.writer.merging_segments.is_empty());
        debug!("IW - wait for merges done");
    }
    Ok(())
}
/// Aborts all queued and running merges and waits for running ones to drain.
/// Takes and returns the writer lock guard so the caller retains the lock.
fn abort_merges<'a>(&mut self, mut lock: MutexGuard<'a, ()>) -> Result<MutexGuard<'a, ()>> {
    // Prevent any new merges from being registered.
    self.stop_merges = true;
    // Queued merges are just marked aborted and unregistered.
    let pending_merges = mem::replace(&mut self.pending_merges, VecDeque::new());
    for mut merge in pending_merges {
        merge.rate_limiter.set_abort();
        self.merge_finish(&lock, &mut merge);
    }
    // Running merges observe the abort flag and finish on their own.
    for merge in self.running_merges.values() {
        merge.rate_limiter.set_abort();
    }
    while !self.running_merges.is_empty() {
        let (loc, _) = self.cond.wait_timeout(lock, Duration::from_millis(1000))?;
        warn!(
            "IW - abort merges, waiting for running_merges to be empty, current size: {}",
            self.running_merges.len()
        );
        lock = loc;
    }
    self.cond.notify_all();
    debug!(
        "debug abort_merges {} {} {}",
        self.pending_merges.len(),
        self.running_merges.len(),
        self.merging_segments.len()
    );
    debug_assert!(self.merging_segments.is_empty());
    trace!("IW - all running merges have aborted");
    Ok(lock)
}
/// Carries deletes that arrived while the merge was running over to the
/// newly merged segment, remapping old doc ids through the merge-state doc
/// maps. Returns the merged segment's ReadersAndUpdates when any deletes
/// were carried over, and stamps the merged segment with the oldest
/// buffered-deletes generation of its sources.
fn commit_merged_deletes_and_updates(
    &mut self,
    merge: &OneMerge<D, C>,
    merge_state: &MergeState<D, C>,
    _lock: &MutexGuard<()>,
) -> Result<Option<Arc<ReadersAndUpdates<D, C, MS, MP>>>> {
    let mut min_gen = i64::max_value();
    let mut holder = MergedDeletesAndUpdates::default();
    debug_assert_eq!(merge.segments.len(), merge_state.doc_maps.len());
    for i in 0..merge.segments.len() {
        let info = &merge.segments[i];
        min_gen = min_gen.min(info.buffered_deletes_gen());
        let max_doc = info.info.max_doc;
        // Live docs as of when the merge's reader was opened.
        let prev_live_docs = merge.readers[i].live_docs();
        let rld = self.reader_pool.get(info.as_ref()).unwrap();
        let inner = rld.inner.lock()?;
        if !prev_live_docs.is_empty() {
            debug_assert!(inner.live_docs.is_some());
            debug_assert_eq!(prev_live_docs.len(), max_doc as usize);
            let cur_live_doc = inner.live_docs();
            debug_assert_eq!(cur_live_doc.len(), max_doc as usize);
            // BUGFIX: deletes arrived during the merge only when the live-docs
            // instance CHANGED. This previously tested `ptr::eq` (unchanged),
            // under which the loop below is provably a no-op (prev == cur for
            // every doc), so concurrent deletes were silently lost.
            if !ptr::eq(cur_live_doc.as_ref(), prev_live_docs.as_ref()) {
                for j in 0..max_doc as usize {
                    if !prev_live_docs.get(j)? {
                        // Deleted before the merge started: already absent
                        // from the merged segment.
                        debug_assert!(!cur_live_doc.get(j).unwrap());
                    } else if !cur_live_doc.get(j)? {
                        // Newly deleted during the merge: re-apply it against
                        // the merged segment via the doc maps.
                        if holder.merged_deletes_and_updates.is_none()
                            || !holder.inited_writable_live_docs
                        {
                            holder.init(&self.reader_pool, merge, true)?;
                        }
                        let doc_id = merge_state.doc_maps[i]
                            .get(merge_state.leaf_doc_maps[i].get(j as i32)?)?;
                        holder
                            .merged_deletes_and_updates
                            .as_ref()
                            .unwrap()
                            .delete(doc_id)?;
                    }
                }
            }
        } else if inner.live_docs.is_some() && !inner.live_docs().is_empty() {
            // Segment had no deletes when the merge started but gained some
            // since: every current delete is new and must be carried over.
            let current_live_docs = inner.live_docs();
            debug_assert_eq!(current_live_docs.len(), max_doc as usize);
            for j in 0..max_doc {
                if !current_live_docs.get(j as usize)? {
                    if holder.merged_deletes_and_updates.is_none()
                        || !holder.inited_writable_live_docs
                    {
                        holder.init(&self.reader_pool, merge, true)?;
                    }
                    let doc_id =
                        merge_state.doc_maps[i].get(merge_state.leaf_doc_maps[i].get(j)?)?;
                    holder
                        .merged_deletes_and_updates
                        .as_ref()
                        .unwrap()
                        .delete(doc_id)?;
                }
            }
        }
    }
    merge
        .info
        .as_ref()
        .unwrap()
        .set_buffered_deletes_gen(min_gen);
    Ok(holder.merged_deletes_and_updates.take())
}
/// Installs the merged segment into the live SegmentInfos — or drops it when
/// it ended up empty or fully deleted — after carrying over concurrent
/// deletes. Returns false when the merge result was discarded.
fn commit_merge(
    &self,
    merge: &mut OneMerge<D, C>,
    merge_state: &MergeState<D, C>,
) -> Result<bool> {
    let l = self.lock.lock()?;
    // SAFETY(review): mutation through &self is serialized by the writer lock.
    let writer_mut = unsafe { self.writer_mut(&l) };
    if self.tragedy.is_some() {
        bail!(IllegalState(
            "this writer hit an unrecoverable error".into()
        ));
    }
    trace!("IW - commit_merge");
    debug_assert!(merge.register_done);
    if merge.rate_limiter.aborted() {
        // Aborted: discard the merged output entirely.
        trace!("IW - commit_merge: skip, it was aborted");
        self.reader_pool
            .drop(merge.info.as_ref().unwrap().as_ref())?;
        self.delete_new_files(merge_state.segment_info().files())?;
        return Ok(false);
    }
    // Re-apply deletes that arrived while the merge ran.
    let merge_updates = if merge_state.segment_info().max_doc == 0 {
        None
    } else {
        writer_mut.commit_merged_deletes_and_updates(merge, merge_state, &l)?
    };
    debug_assert!(!self
        .segment_infos
        .segments
        .contains(merge.info.as_ref().unwrap()));
    // Drop the merged segment if no live docs remain in it.
    let drop_segment = merge.segments.is_empty()
        || merge.info.as_ref().unwrap().info.max_doc == 0
        || (merge_updates.is_some()
            && merge_updates.as_ref().unwrap().pending_delete_count()
                == merge.info.as_ref().unwrap().info.max_doc as u32);
    debug_assert!(!merge.segments.is_empty() || drop_segment);
    debug_assert!(merge.info.as_ref().unwrap().info.max_doc > 0 || drop_segment);
    if let Some(merged_updates) = merge_updates {
        if drop_segment {
            merged_updates.drop_changes();
        }
        if let Err(e) = self.reader_pool.release(&merged_updates) {
            merged_updates.drop_changes();
            if let Err(e) = self.reader_pool.drop(merge.info.as_ref().unwrap().as_ref()) {
                warn!("IndexWriter: drop segment failed with error: {:?}", e);
            }
            return Err(e);
        }
    }
    // Swap the source segments for the merged one in the live infos.
    writer_mut
        .segment_infos
        .apply_merge_changes(merge, drop_segment);
    let del_doc_count = merge.total_max_doc as i32 - merge.info.as_ref().unwrap().info.max_doc;
    debug_assert!(del_doc_count >= 0);
    self.pending_num_docs
        .fetch_sub(del_doc_count as i64, Ordering::AcqRel);
    if drop_segment {
        self.reader_pool
            .drop(merge.info.as_ref().unwrap().as_ref())?;
        self.delete_new_files(&merge.info.as_ref().unwrap().files())?;
    }
    // Checkpoint in either case; on close failure keep that error primary.
    match self.close_merge_readers(merge, false, &l) {
        Ok(()) => {
            writer_mut.check_point(&l)?;
        }
        Err(e) => {
            let _ = writer_mut.check_point(&l);
            return Err(e);
        }
    }
    if merge.max_num_segments.get().is_some() && !drop_segment {
        // Forced merges track their output so force_merge can cascade.
        if !self
            .segments_to_merge
            .contains_key(merge.info.as_ref().unwrap())
        {
            writer_mut
                .segments_to_merge
                .insert(Arc::clone(merge.info.as_ref().unwrap()), false);
        }
    }
    Ok(true)
}
/// Releases every reader the merge opened. Without `suppress_errors` the
/// readers' pending changes are dropped and the first failure is returned;
/// with it, all failures are ultimately swallowed.
fn close_merge_readers(
    &self,
    merge: &mut OneMerge<D, C>,
    suppress_errors: bool,
    _lock: &MutexGuard<()>,
) -> Result<()> {
    let drop = !suppress_errors;
    let mut res = Ok(());
    for reader in &merge.readers {
        let rld = self.reader_pool.get(reader.si.as_ref());
        debug_assert!(rld.is_some());
        let rld = rld.unwrap();
        if drop {
            rld.drop_changes();
        }
        let mut res_drop = self.reader_pool.release(&rld);
        if res_drop.is_ok() {
            if drop {
                res_drop = self.reader_pool.drop(rld.info.as_ref());
            }
        }
        // Remember only the first failure; keep closing the rest.
        if let Err(e) = res_drop {
            if res.is_ok() {
                res = Err(e);
            }
        }
    }
    merge.readers.clear();
    if !suppress_errors {
        return res;
    }
    Ok(())
}
/// Records `tragedy` as the writer's fatal error, rolls the writer back when
/// it is not already closing, and always returns an IllegalState error.
fn tragic_event(&self, tragedy: Error, location: &str) -> Result<()> {
    trace!("IW - hit tragic '{:?}' inside {}", &tragedy, location);
    {
        let l = self.lock.lock()?;
        if self.tragedy.is_some() {
            // An earlier tragedy wins; just propagate this one.
            bail!(tragedy);
        }
        let writer = unsafe { self.writer_mut(&l) };
        writer.tragedy = Some(tragedy);
    }
    if self.should_close(false) {
        let commit_lock = self.commit_lock.lock()?;
        self.rollback_internal(&commit_lock)?;
    }
    bail!(IllegalState(format!(
        "this writer hit an unrecoverable error; {:?}",
        &self.tragedy
    )))
}
}
/// Pools shared `ReadersAndUpdates` entries (segment readers plus their
/// pending deletes/updates), keyed by segment name.
pub struct ReaderPool<
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
> {
    // Serializes pool-level operations (get/release/drop/commit).
    lock: Mutex<()>,
    // segment name -> pooled readers-and-updates entry.
    reader_map: Arc<Mutex<HashMap<String, Arc<ReadersAndUpdates<D, C, MS, MP>>>>>,
    // Weak back-reference to the owning writer; set by `init`.
    index_writer: Weak<IndexWriterInner<D, C, MS, MP>>,
    inited: bool,
}
impl<D, C, MS, MP> ReaderPool<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
pub fn new() -> Self {
Default::default()
}
fn init(&mut self, index_writer: Weak<IndexWriterInner<D, C, MS, MP>>) {
self.index_writer = index_writer;
self.inited = true;
}
fn writer(&self) -> Arc<IndexWriterInner<D, C, MS, MP>> {
debug_assert!(self.inited);
self.index_writer.upgrade().unwrap()
}
pub fn info_is_live(&self, info: &SegmentCommitInfo<D, C>) -> bool {
let l = self.lock.lock().unwrap();
self.info_is_live_syn(info, &l)
}
fn info_is_live_syn(&self, info: &SegmentCommitInfo<D, C>, _lock: &MutexGuard<()>) -> bool {
self.writer()
.segment_infos
.segments
.iter()
.any(|i| i.info.name == info.info.name)
}
pub fn drop(&self, info: &SegmentCommitInfo<D, C>) -> Result<()> {
let _lock = self.lock.lock().unwrap();
if let Some(rld) = self.reader_map.lock()?.remove(&info.info.name) {
rld.drop_readers()?;
}
Ok(())
}
fn drop_all(&self, do_save: bool) -> Result<()> {
let l = self.lock.lock()?;
let mut prior_err = Ok(());
let keys: Vec<String> = self.reader_map.lock()?.keys().cloned().collect();
let mut reader_map = self.reader_map.lock()?;
for key in keys {
{
let rld = reader_map.get(&key).unwrap();
if let Err(e) = self.flush_and_check(do_save, rld, &l) {
if do_save {
return Err(e);
} else if prior_err.is_ok() {
prior_err = Err(e);
}
}
}
let rld = reader_map.remove(&key).unwrap();
if let Err(e) = rld.drop_readers() {
if do_save {
return Err(e);
} else if prior_err.is_ok() {
prior_err = Err(e);
}
}
}
debug_assert!(reader_map.is_empty());
prior_err
}
fn flush_and_check(
&self,
do_save: bool,
rld: &ReadersAndUpdates<D, C, MS, MP>,
guard: &MutexGuard<()>,
) -> Result<()> {
if do_save && rld.write_live_docs(&self.writer().directory)? {
debug_assert!(self.info_is_live(rld.info.as_ref()));
self.check_point_no_sis(guard)?;
}
Ok(())
}
#[allow(dead_code)]
fn any_pending_deletes(&self) -> bool {
let _lock = self.lock.lock().unwrap();
(&self.reader_map.lock().unwrap())
.values()
.any(|rld| rld.pending_delete_count() > 0)
}
pub fn release(&self, rld: &Arc<ReadersAndUpdates<D, C, MS, MP>>) -> Result<()> {
let _lock = self.lock.lock().unwrap();
rld.dec_ref();
debug_assert!(rld.ref_count() >= 1);
Ok(())
}
fn check_point_no_sis(&self, _guard: &MutexGuard<()>) -> Result<()> {
let writer_inner = self.writer();
unsafe {
let writer = writer_inner.as_ref() as *const IndexWriterInner<D, C, MS, MP>
as *mut IndexWriterInner<D, C, MS, MP>;
(*writer).change_count.fetch_add(1, Ordering::AcqRel);
(*writer)
.deleter
.checkpoint(&(*writer).segment_infos, false)
}
}
/// Writes any pending live docs for each segment in `infos`, checkpointing
/// the deleter whenever a write actually happened.
pub fn commit(&self, infos: &SegmentInfos<D, C>) -> Result<()> {
let l = self.lock.lock()?;
for info in &infos.segments {
// The map guard is a temporary: it stays held through the `if let`
// body (including the live-docs write) and is released between
// iterations.
if let Some(rld) = self.reader_map.lock()?.get(&info.info.name) {
if rld.write_live_docs(&self.writer().directory)? {
debug_assert!(self.info_is_live_syn(info.as_ref(), &l));
self.check_point_no_sis(&l)?;
}
}
}
Ok(())
}
/// Looks up the pooled entry for `info`'s segment, or `None` if the segment
/// is not currently pooled. Does not change the entry's reference count.
pub fn get(
    &self,
    info: &SegmentCommitInfo<D, C>,
) -> Option<Arc<ReadersAndUpdates<D, C, MS, MP>>> {
    let _lock = self.lock.lock().unwrap();
    // The segment must belong to this writer's original directory.
    debug_assert!(ptr::eq(
        info.info.directory.as_ref(),
        self.writer().directory_orig.as_ref(),
    ));
    let map = self.reader_map.lock().unwrap();
    map.get(&info.info.name).cloned()
}
/// Returns the pooled entry for `info`'s segment, creating it on first use,
/// and takes a reference on it (the caller must pair this with `release`).
///
/// Uses the map's entry API so the lookup-or-insert happens under a single
/// acquisition of the map mutex, instead of the previous
/// `contains_key` / `insert` / `get_mut` sequence that locked it three times.
pub fn get_or_create(
    &self,
    info: &Arc<SegmentCommitInfo<D, C>>,
) -> Result<Arc<ReadersAndUpdates<D, C, MS, MP>>> {
    let writer = self.writer();
    writer.ensure_open(false)?;
    let _lock = self.lock.lock().unwrap();
    // The segment must belong to this writer's original directory.
    debug_assert!(ptr::eq(
        info.info.directory.as_ref(),
        writer.directory_orig.as_ref(),
    ));
    let mut map = self.reader_map.lock()?;
    let rld = map.entry(info.info.name.clone()).or_insert_with(|| {
        Arc::new(ReadersAndUpdates::new(
            Weak::clone(&self.index_writer),
            Arc::clone(info),
        ))
    });
    rld.inc_ref();
    Ok(Arc::clone(rld))
}
}
impl<D: Directory + Send + Sync + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> Default
    for ReaderPool<D, C, MS, MP>
{
    /// An empty, un-initialized pool. `index_writer` starts as a dangling
    /// `Weak` until the pool is wired up to its owning writer.
    fn default() -> Self {
        Self {
            inited: false,
            index_writer: Weak::new(),
            reader_map: Arc::new(Mutex::new(HashMap::new())),
            lock: Mutex::new(()),
        }
    }
}
impl<D: Directory + Send + Sync + 'static, C: Codec, MS: MergeScheduler, MP: MergePolicy> Drop
for ReaderPool<D, C, MS, MP>
{
fn drop(&mut self) {
if let Err(e) = self.drop_all(false) {
error!("ReaderPool: drop_all on close failed by: {:?}", e);
}
}
}
/// Reads the `FieldInfos` for a segment, resolving where they live: a gen'd
/// suffix file when the segment has field updates, inside the compound file
/// when the segment is packed, or plain per-segment files otherwise.
fn read_field_infos<D: Directory, C: Codec>(si: &SegmentCommitInfo<D, C>) -> Result<FieldInfos> {
    let codec = si.info.codec();
    let fis_format = codec.field_infos_format();
    if si.has_field_updates() {
        // Updated field infos are written under a generation-derived suffix.
        let suffix = to_base36(si.field_infos_gen() as u64);
        return fis_format.read(&*si.info.directory, &si.info, &suffix, &IOContext::READ_ONCE);
    }
    if si.info.is_compound_file() {
        let cfs_dir = codec.compound_format().get_compound_reader(
            Arc::clone(&si.info.directory),
            &si.info,
            &IOContext::Default,
        )?;
        return fis_format.read(&cfs_dir, &si.info, "", &IOContext::READ_ONCE);
    }
    fis_format.read(&*si.info.directory, &si.info, "", &IOContext::READ_ONCE)
}
/// A pooled `SegmentReader` together with the buffered deletions that have
/// been applied to it in memory but not yet written to live-docs files.
pub struct ReadersAndUpdates<
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
> {
// The segment this entry belongs to.
pub info: Arc<SegmentCommitInfo<D, C>>,
// Mutable state (reader, live docs, pending delete count) behind a mutex.
pub inner: Mutex<ReadersAndUpdatesInner<D, C, MS, MP>>,
// Outstanding pool references; starts at 1 on creation.
ref_count: AtomicU32,
}
impl<D, C, MS, MP> ReadersAndUpdates<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
/// Creates a pooled entry for `info` with no reader opened yet; the initial
/// reference (count = 1) is the pool's own.
fn new(
writer: Weak<IndexWriterInner<D, C, MS, MP>>,
info: Arc<SegmentCommitInfo<D, C>>,
) -> Self {
ReadersAndUpdates {
inner: Mutex::new(ReadersAndUpdatesInner::new(writer)),
ref_count: AtomicU32::new(1),
info,
}
}
/// Creates a pooled entry wrapping an already-open `reader`; the segment
/// info is taken from the reader itself.
#[allow(dead_code)]
fn with_reader(
writer: Weak<IndexWriterInner<D, C, MS, MP>>,
reader: SegmentReader<D, C>,
) -> Self {
let info = Arc::clone(&reader.si);
let inner = ReadersAndUpdatesInner::with_reader(writer, reader);
ReadersAndUpdates {
inner: Mutex::new(inner),
ref_count: AtomicU32::new(1),
info,
}
}
/// Number of deletions buffered in memory but not yet written to disk.
pub fn pending_delete_count(&self) -> u32 {
let guard = self.inner.lock().unwrap();
guard.pending_delete_count
}
/// Current reference count held against this pooled entry.
pub fn ref_count(&self) -> u32 {
self.ref_count.load(Ordering::Acquire)
}
/// Takes one reference on this entry.
pub fn inc_ref(&self) {
self.ref_count.fetch_add(1, Ordering::AcqRel);
}
/// Releases one reference; the count must have been positive.
pub fn dec_ref(&self) {
let org = self.ref_count.fetch_sub(1, Ordering::AcqRel);
debug_assert!(org > 0);
}
/// Opens the segment reader on first use (no-op if already open).
pub fn create_reader_if_not_exist(&self, context: &IOContext) -> Result<()> {
let mut guard = self.inner.lock()?;
guard.create_reader_if_not_exist(&self.info, context)
}
/// Marks `doc_id` deleted; returns whether the doc was live before the call.
pub fn delete(&self, doc_id: DocId) -> Result<bool> {
let mut guard = self.inner.lock()?;
guard.delete(doc_id)
}
/// Ensures this entry owns a private, mutable copy of the live-docs bits.
pub fn init_writable_live_docs(&self) -> Result<()> {
let mut guard = self.inner.lock()?;
guard.init_writable_live_docs(&self.info)
}
/// Writes pending deletions to a new live-docs file in `dir`; returns true
/// if anything was written.
pub fn write_live_docs<D1: Directory>(&self, dir: &Arc<D1>) -> Result<bool> {
let mut guard = self.inner.lock()?;
guard.write_live_docs(&self.info, dir)
}
/// Drops the cached reader and releases the reference the reader held.
pub fn drop_readers(&self) -> Result<()> {
let mut guard = self.inner.lock()?;
guard.reader = None;
self.dec_ref();
Ok(())
}
/// Returns a fresh read-only `SegmentReader` sharing this entry's core.
pub fn get_readonly_clone(&self, context: &IOContext) -> Result<SegmentReader<D, C>> {
let mut guard = self.inner.lock()?;
guard.get_readonly_clone(&self.info, context)
}
/// Returns whether `doc_id` is currently live. Live docs must already exist.
pub fn test_doc_id(&self, doc_id: usize) -> Result<bool> {
let guard = self.inner.lock().unwrap();
debug_assert!(guard.live_docs.is_some());
guard.live_docs().get(doc_id)
}
/// Debug check that live/deleted counts are mutually consistent.
fn verify_doc_counts(&self) -> bool {
let guard = self.inner.lock().unwrap();
guard.verify_doc_counts(self.info.as_ref())
}
/// Shares the current live-docs bits read-only; subsequent mutation will
/// force a private copy (copy-on-write via `live_docs_shared`).
pub fn readonly_live_docs(&self) -> BitsRef {
let mut guard = self.inner.lock().unwrap();
debug_assert!(guard.live_docs.is_some());
guard.live_docs_shared = true;
Arc::clone(guard.live_docs.as_ref().unwrap())
}
/// Discards buffered deletions and clears the merging flag.
pub fn drop_changes(&self) {
let mut guard = self.inner.lock().unwrap();
guard.pending_delete_count = 0;
guard.drop_merging_updates();
}
/// Opens (if needed) and returns the reader for merging, flagging this
/// entry as participating in a merge.
pub fn reader_for_merge(&self, context: &IOContext) -> Result<Arc<SegmentReader<D, C>>> {
self.create_reader_if_not_exist(context)?;
let mut guard = self.inner.lock()?;
guard.is_merging = true;
Ok(Arc::clone(guard.reader.as_ref().unwrap()))
}
}
/// Mutex-protected state of a `ReadersAndUpdates` entry.
pub struct ReadersAndUpdatesInner<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
// Back-reference to the owning writer (currently unused directly).
_writer: Weak<IndexWriterInner<D, C, MS, MP>>,
// Lazily-opened segment reader; `None` until first use.
reader: Option<Arc<SegmentReader<D, C>>>,
// Current live-docs bits; `None` until the reader is opened.
live_docs: Option<BitsRef>,
// Deletions applied in memory but not yet written to a live-docs file.
pub pending_delete_count: u32,
// True while `live_docs` may be aliased by outside readers; mutation
// must first make a private copy (copy-on-write).
live_docs_shared: bool,
// True while this segment's reader is checked out for a merge.
is_merging: bool,
}
impl<D, C, MS, MP> ReadersAndUpdatesInner<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
/// Empty state: no reader, no live docs, nothing pending.
fn new(writer: Weak<IndexWriterInner<D, C, MS, MP>>) -> Self {
ReadersAndUpdatesInner {
_writer: writer,
reader: None,
live_docs: None,
pending_delete_count: 0,
live_docs_shared: true,
is_merging: false,
}
}
/// State seeded from an already-open reader; its deleted-doc count becomes
/// the initial pending count.
#[allow(dead_code)]
fn with_reader(
writer: Weak<IndexWriterInner<D, C, MS, MP>>,
reader: SegmentReader<D, C>,
) -> Self {
let live_docs = reader.live_docs();
let pending_delete_count = reader.num_deleted_docs();
debug_assert!(pending_delete_count >= 0);
ReadersAndUpdatesInner {
_writer: writer,
reader: Some(Arc::new(reader)),
live_docs: Some(live_docs),
pending_delete_count: pending_delete_count as u32,
live_docs_shared: true,
is_merging: false,
}
}
/// Debug check: live bits counted from `live_docs` (or max_doc when there
/// are none) must equal max_doc minus committed and pending deletions.
fn verify_doc_counts(&self, info: &SegmentCommitInfo<D, C>) -> bool {
let mut count = 0;
if let Some(ref live_docs) = self.live_docs {
for i in 0..info.info.max_doc {
if live_docs.get(i as usize).unwrap() {
count += 1;
}
}
} else {
count = info.info.max_doc;
}
info.info.max_doc - info.del_count() - self.pending_delete_count as i32 == count
}
/// Opens the segment reader lazily; also seeds `live_docs` from the reader
/// the first time.
pub fn create_reader_if_not_exist(
&mut self,
info: &Arc<SegmentCommitInfo<D, C>>,
context: &IOContext,
) -> Result<()> {
if self.reader.is_none() {
self.reader = Some(Arc::new(SegmentReader::open(info, context)?));
if self.live_docs.is_none() {
self.live_docs = Some(self.reader.as_ref().unwrap().live_docs());
}
}
Ok(())
}
/// Live-docs accessor; panics (debug) if not yet initialized.
fn live_docs(&self) -> &BitsRef {
debug_assert!(self.live_docs.is_some());
self.live_docs.as_ref().unwrap()
}
/// Reader accessor; panics (debug) if not yet opened.
pub fn reader(&self) -> &Arc<SegmentReader<D, C>> {
debug_assert!(self.reader.is_some());
self.reader.as_ref().unwrap()
}
/// Clears the live bit for `doc_id`. Returns true when the doc was live
/// (i.e. this call actually deleted it). Requires a private live-docs
/// copy (`live_docs_shared` must be false — see `init_writable_live_docs`).
pub fn delete(&mut self, doc_id: DocId) -> Result<bool> {
debug_assert!(self.live_docs.is_some());
let live_docs = self.live_docs.as_mut().unwrap();
debug_assert!(
doc_id >= 0 && doc_id < live_docs.len() as i32,
"doc: {}, live docs len: {}",
doc_id,
live_docs.len()
);
debug_assert!(!self.live_docs_shared);
let did_deleted = live_docs.get(doc_id as usize)?;
if did_deleted {
// Arc::get_mut is safe to unwrap: the bits are private (not shared)
// when `live_docs_shared` is false.
Arc::get_mut(live_docs)
.unwrap()
.as_bit_set_mut()
.clear(doc_id as usize);
self.pending_delete_count += 1;
}
Ok(did_deleted)
}
/// Builds a read-only `SegmentReader` snapshot sharing this entry's core
/// and current live docs; marks the bits shared so later mutation copies.
pub fn get_readonly_clone(
&mut self,
info: &Arc<SegmentCommitInfo<D, C>>,
context: &IOContext,
) -> Result<SegmentReader<D, C>> {
if self.reader.is_none() {
self.create_reader_if_not_exist(info, context)?;
debug_assert!(self.reader.is_some());
}
self.live_docs_shared = true;
let live_docs = if self.live_docs.is_some() {
Arc::clone(self.live_docs.as_ref().unwrap())
} else {
let live_docs = &self.reader.as_ref().unwrap().live_docs;
assert!(live_docs.is_empty());
Arc::clone(live_docs)
};
let reader = self.reader.as_ref().unwrap();
SegmentReader::build(
Arc::clone(info),
live_docs,
// numDocs = maxDoc - committed deletions - pending deletions.
info.info.max_doc - info.del_count() - self.pending_delete_count as i32,
Arc::clone(&reader.core),
)
}
/// Copy-on-write step: if the live docs may be aliased elsewhere, replace
/// them with a private mutable copy (or a fresh all-live set).
pub fn init_writable_live_docs(&mut self, info: &Arc<SegmentCommitInfo<D, C>>) -> Result<()> {
debug_assert!(info.info.max_doc > 0);
if self.live_docs_shared {
let live_docs_format = info.info.codec().live_docs_format();
let mut live_docs = None;
if let Some(ref docs) = self.live_docs {
if !docs.is_empty() {
live_docs = Some(live_docs_format.new_live_docs_from_existing(docs.as_ref())?);
}
};
if live_docs.is_none() {
let bits = live_docs_format.new_live_docs(info.info.max_doc() as usize)?;
live_docs = Some(Arc::new(bits));
}
self.live_docs = live_docs;
self.live_docs_shared = false;
}
Ok(())
}
/// Writes the current live docs to a new generation file in `dir`.
/// Returns Ok(false) when there was nothing pending.
pub fn write_live_docs<D1: Directory>(
&mut self,
info: &Arc<SegmentCommitInfo<D, C>>,
dir: &Arc<D1>,
) -> Result<bool> {
debug_assert!(self.live_docs.is_some());
if self.pending_delete_count == 0 {
return Ok(false);
}
debug_assert_eq!(self.live_docs().len(), info.info.max_doc as usize);
let tracking_dir = TrackingDirectoryWrapper::new(dir.as_ref());
let res = info.info.codec().live_docs_format().write_live_docs(
self.live_docs().as_ref(),
&tracking_dir,
info,
self.pending_delete_count as i32,
&IOContext::Default,
);
if res.is_err() {
// On failure: advance to a fresh write generation and best-effort
// clean up any partially-written files.
info.advance_next_write_del_gen();
for file_name in &tracking_dir.create_files() {
if let Err(e) = tracking_dir.delete_file(file_name.as_str()) {
warn!("delete file '{}' failed by '{:?}'", file_name, e);
}
}
}
// NOTE(review): the del-gen advance, del-count update and pending-count
// reset below run even when the write failed (the error is returned
// only afterwards) — confirm this is intended; it discards the pending
// deletions on the failure path.
info.advance_del_gen();
let del_count = info.del_count() + self.pending_delete_count as i32;
info.set_del_count(del_count)?;
self.pending_delete_count = 0;
match res {
Err(e) => Err(e),
_ => Ok(true),
}
}
/// Clears the merging flag (the pending count is reset by the caller).
fn drop_merging_updates(&mut self) {
self.is_merging = false;
}
/// Writes a new field-infos generation file and returns the set of files
/// it created.
#[allow(dead_code)]
fn write_field_infos_gen<F: FieldInfosFormat>(
&self,
info: &Arc<SegmentCommitInfo<D, C>>,
field_infos: &FieldInfos,
dir: &Arc<D>,
infos_format: &F,
) -> Result<HashSet<String>> {
let next_field_infos_gen = info.next_field_infos_gen();
// Generation is encoded base-36 into the file-name suffix.
let segment_suffix = to_base36(next_field_infos_gen as u64);
let infos_context = IOContext::Flush(FlushInfo::new(info.info.max_doc() as u32));
let tracking_dir = TrackingDirectoryWrapper::new(dir.as_ref());
infos_format.write(
&tracking_dir,
&info.info,
&segment_suffix,
field_infos,
&infos_context,
)?;
info.advance_del_gen();
Ok(tracking_dir.get_create_files())
}
/// Doc-values field updates are not supported in this port.
pub fn write_field_updates<DW: Directory>(&self, _dir: &DW) -> Result<()> {
unreachable!()
}
}
/// Helper used while finalizing a merge: lazily fetches the pooled entry for
/// the freshly-merged segment and remembers whether its writable live docs
/// were initialized.
struct MergedDeletesAndUpdates<
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
> {
// Pool entry for the merged segment; fetched on first `init` call.
merged_deletes_and_updates: Option<Arc<ReadersAndUpdates<D, C, MS, MP>>>,
// True once `init_writable_live_docs` has been run on the entry.
inited_writable_live_docs: bool,
}
impl<D, C, MS, MP> Default for MergedDeletesAndUpdates<D, C, MS, MP>
where
D: Directory + Send + Sync + 'static,
C: Codec,
MS: MergeScheduler,
MP: MergePolicy,
{
fn default() -> Self {
Self {
merged_deletes_and_updates: None,
inited_writable_live_docs: false,
}
}
}
impl<D, C, MS, MP> MergedDeletesAndUpdates<D, C, MS, MP>
where
    D: Directory + Send + Sync + 'static,
    C: Codec,
    MS: MergeScheduler,
    MP: MergePolicy,
{
    /// Lazily fetches the pooled entry for `merge`'s result segment and,
    /// when requested, makes its live docs writable (each step runs at most
    /// once across repeated calls).
    fn init(
        &mut self,
        reader_pool: &ReaderPool<D, C, MS, MP>,
        merge: &OneMerge<D, C>,
        init_writable_live_docs: bool,
    ) -> Result<()> {
        if self.merged_deletes_and_updates.is_none() {
            let merged_info = merge.info.as_ref().unwrap();
            self.merged_deletes_and_updates = Some(reader_pool.get_or_create(merged_info)?);
        }
        let needs_live_docs = init_writable_live_docs && !self.inited_writable_live_docs;
        if needs_live_docs {
            let rld = self.merged_deletes_and_updates.as_ref().unwrap();
            rld.init_writable_live_docs()?;
            self.inited_writable_live_docs = true;
        }
        Ok(())
    }
}