1use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
34
35use rustc_hash::FxHashMap;
36
37use crate::directories::DirectoryWriter;
38use crate::dsl::{Document, Field, Schema};
39use crate::error::{Error, Result};
40use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
41use crate::tokenizer::BoxedTokenizer;
42
43use super::IndexConfig;
44
/// Capacity of the bounded document channel between `add_document` callers
/// and the indexing workers; `try_send` on a full channel surfaces as
/// `Error::QueueFull` to the caller.
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
47
/// High-level writer for an index: accepts documents, fans indexing work out
/// to background worker threads, and commits built segments through the
/// shared `SegmentManager`.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// Sender side of the bounded pipeline feeding the worker threads.
    doc_sender: async_channel::Sender<Document>,
    /// Join handles of the indexing worker threads (joined in `Drop`).
    workers: Vec<std::thread::JoinHandle<()>>,
    /// State shared with every worker thread.
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments collected from workers at `prepare_commit`, staged for the
    /// next commit as `(segment_hex_id, num_docs)` pairs.
    flushed_segments: Vec<(String, u32)>,
    /// Optional primary-key dedup index; populated by `init_primary_key_dedup`.
    primary_key_index: Option<super::primary_key::PrimaryKeyIndex>,
}
77
/// State shared between the `IndexWriter` and its worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers; copied into each new `SegmentBuilder`.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Memory a single worker may accumulate before it flushes a segment.
    memory_budget_per_worker: usize,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments finished by workers: `(segment_hex_id, num_docs)`.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,
    /// Number of workers that have finished their flush in the current cycle.
    flush_count: AtomicUsize,
    /// Mutex/condvar pair `prepare_commit` waits on until `flush_count`
    /// reaches `num_workers`.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Fresh receiver installed by `resume_workers` for the next cycle.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Bumped each time workers should pick up the new receiver.
    resume_epoch: AtomicUsize,
    /// Wakes parked workers when the epoch or the shutdown flag changes.
    resume_cvar: parking_lot::Condvar,
    /// When set, parked workers exit instead of resuming.
    shutdown: AtomicBool,
    num_workers: usize,
}
113
114impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Creates a new index in `directory` with default segment-builder settings.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }
119
    /// Creates a new index in `directory` with an explicit `SegmentBuilderConfig`.
    ///
    /// # Errors
    /// Propagates any error from writing the initial index metadata.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // The closure changes nothing; presumably this forces the fresh
        // metadata to be persisted to the directory — confirm in SegmentManager.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }
149
    /// Opens an existing index with default segment-builder settings.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }
154
    /// Opens an existing index with an explicit `SegmentBuilderConfig`.
    ///
    /// The schema is taken from the persisted index metadata, not supplied by
    /// the caller.
    ///
    /// # Errors
    /// Fails when the index metadata cannot be loaded from `directory`.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // Restore previously trained model state, if any (workers later pass
        // it to segment builds — see `build_segment_inline`).
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }
183
    /// Creates a writer sharing the directory, schema, config, and segment
    /// manager of an already-open `Index`, with default builder settings.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }
195
    /// Common constructor: resolves schema tokenizers, builds the shared
    /// worker state, and spawns the indexing worker threads.
    ///
    /// NOTE(review): `spawn_workers` captures `tokio::runtime::Handle::current()`,
    /// so this must run inside a Tokio runtime context.
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        // Pre-resolve tokenizers for text fields that name one in the schema;
        // unknown tokenizer names are silently skipped.
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if matches!(entry.field_type, crate::dsl::FieldType::Text)
                && let Some(ref tok_name) = entry.tokenizer
                && let Some(tok) = registry.get(tok_name)
            {
                tokenizers.insert(field, tok);
            }
        }

        // At least one worker; the total memory budget is split evenly.
        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(tokenizers),
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
            flush_count: AtomicUsize::new(0),
            flush_mutex: parking_lot::Mutex::new(()),
            flush_cvar: parking_lot::Condvar::new(),
            resume_receiver: parking_lot::Mutex::new(None),
            resume_epoch: AtomicUsize::new(0),
            resume_cvar: parking_lot::Condvar::new(),
            shutdown: AtomicBool::new(false),
            num_workers,
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
            primary_key_index: None,
        }
    }
252
253 fn spawn_workers(
254 worker_state: &Arc<WorkerState<D>>,
255 num_workers: usize,
256 ) -> (
257 async_channel::Sender<Document>,
258 Vec<std::thread::JoinHandle<()>>,
259 ) {
260 let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
261 let handle = tokio::runtime::Handle::current();
262 let mut workers = Vec::with_capacity(num_workers);
263 for i in 0..num_workers {
264 let state = Arc::clone(worker_state);
265 let rx = receiver.clone();
266 let rt = handle.clone();
267 workers.push(
268 std::thread::Builder::new()
269 .name(format!("index-worker-{}", i))
270 .spawn(move || Self::worker_loop(state, rx, rt))
271 .expect("failed to spawn index worker thread"),
272 );
273 }
274 (sender, workers)
275 }
276
    /// Returns the schema this writer indexes against.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
281
282 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
285 self.worker_state
286 .tokenizers
287 .write()
288 .insert(field, Box::new(tokenizer));
289 }
290
291 pub async fn init_primary_key_dedup(&mut self) -> Result<()> {
304 use super::primary_key::{PK_BLOOM_FILE, deserialize_pk_bloom};
305
306 let field = match self.schema.primary_field() {
307 Some(f) => f,
308 None => return Ok(()),
309 };
310
311 let snapshot = self.segment_manager.acquire_snapshot().await;
312 let current_seg_ids: Vec<String> = snapshot.segment_ids().to_vec();
313
314 let cached = match self
316 .directory
317 .open_read(std::path::Path::new(PK_BLOOM_FILE))
318 .await
319 {
320 Ok(handle) => {
321 let data = handle.read_bytes_range(0..handle.len()).await;
322 match data {
323 Ok(bytes) => deserialize_pk_bloom(bytes.as_slice()),
324 Err(_) => None,
325 }
326 }
327 Err(_) => None,
328 };
329
330 let open_futures: Vec<_> = current_seg_ids
333 .iter()
334 .map(|seg_id_str| {
335 let seg_id_str = seg_id_str.clone();
336 let dir = self.directory.as_ref();
337 let schema = Arc::clone(&self.schema);
338 let cache_blocks = self.config.term_cache_blocks;
339 async move {
340 let seg_id =
341 crate::segment::SegmentId::from_hex(&seg_id_str).ok_or_else(|| {
342 Error::Internal(format!("Invalid segment id: {}", seg_id_str))
343 })?;
344 let reader =
345 crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
346 .await?;
347 Ok::<_, Error>(Arc::new(reader))
348 }
349 })
350 .collect();
351 let readers = futures::future::try_join_all(open_futures).await?;
352
353 if let Some((persisted_seg_ids, bloom)) = cached {
354 let new_readers: Vec<Arc<crate::segment::SegmentReader>> = current_seg_ids
356 .iter()
357 .zip(readers.iter())
358 .filter(|(id, _)| !persisted_seg_ids.contains(*id))
359 .map(|(_, r)| Arc::clone(r))
360 .collect();
361
362 let needs_persist = !new_readers.is_empty();
363 let pk_index = if new_readers.is_empty() {
364 super::primary_key::PrimaryKeyIndex::from_persisted(
366 field,
367 bloom,
368 readers,
369 &[],
370 snapshot,
371 )
372 } else {
373 tokio::task::spawn_blocking(move || {
375 super::primary_key::PrimaryKeyIndex::from_persisted(
376 field,
377 bloom,
378 readers,
379 &new_readers,
380 snapshot,
381 )
382 })
383 .await
384 .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?
385 };
386
387 if needs_persist {
388 self.persist_pk_bloom(&pk_index, ¤t_seg_ids).await;
389 }
390
391 self.primary_key_index = Some(pk_index);
392 } else {
393 let pk_index = tokio::task::spawn_blocking(move || {
395 super::primary_key::PrimaryKeyIndex::new(field, readers, snapshot)
396 })
397 .await
398 .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?;
399
400 self.persist_pk_bloom(&pk_index, ¤t_seg_ids).await;
401 self.primary_key_index = Some(pk_index);
402 }
403
404 Ok(())
405 }
406
    /// Best-effort persistence of the primary-key bloom filter together with
    /// the list of segment ids it covers; failures are logged, never
    /// propagated.
    async fn persist_pk_bloom(
        &self,
        pk_index: &super::primary_key::PrimaryKeyIndex,
        segment_ids: &[String],
    ) {
        use super::primary_key::{PK_BLOOM_FILE, serialize_pk_bloom};

        let bloom_bytes = pk_index.bloom_to_bytes();
        let data = serialize_pk_bloom(segment_ids, &bloom_bytes);
        if let Err(e) = self
            .directory
            .write(std::path::Path::new(PK_BLOOM_FILE), &data)
            .await
        {
            log::warn!("[primary_key] failed to persist bloom cache: {}", e);
        }
    }
426
427 pub fn add_document(&self, doc: Document) -> Result<()> {
432 if let Some(ref pk_index) = self.primary_key_index {
433 pk_index.check_and_insert(&doc)?;
434 }
435 match self.doc_sender.try_send(doc) {
436 Ok(()) => Ok(()),
437 Err(async_channel::TrySendError::Full(doc)) => {
438 if let Some(ref pk_index) = self.primary_key_index {
440 pk_index.rollback_uncommitted_key(&doc);
441 }
442 Err(Error::QueueFull)
443 }
444 Err(async_channel::TrySendError::Closed(doc)) => {
445 if let Some(ref pk_index) = self.primary_key_index {
447 pk_index.rollback_uncommitted_key(&doc);
448 }
449 Err(Error::Internal("Document channel closed".into()))
450 }
451 }
452 }
453
454 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
459 let total = documents.len();
460 for (i, doc) in documents.into_iter().enumerate() {
461 match self.add_document(doc) {
462 Ok(()) => {}
463 Err(Error::QueueFull) => return Ok(i),
464 Err(e) => return Err(e),
465 }
466 }
467 Ok(total)
468 }
469
    /// Body of each indexing worker thread.
    ///
    /// Each cycle: drain documents from the shared channel into a
    /// `SegmentBuilder`, flushing a segment whenever the memory budget is
    /// reached; when the channel closes (a commit is in progress), flush the
    /// remainder, report completion via `flush_count`/`flush_cvar`, then park
    /// until `resume_workers` publishes a fresh receiver (epoch bump) or the
    /// shutdown flag is set.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        // Tracks which resume epoch this worker has observed.
        let mut my_epoch = 0usize;

        loop {
            // Catch panics so one bad document/builder cannot kill the worker:
            // the flush handshake below must run even after a panic, or
            // `prepare_commit` would wait forever on this worker.
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                while let Ok(doc) = receiver.recv_blocking() {
                    if builder.is_none() {
                        // Lazily create a builder and copy in the current
                        // tokenizer set.
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // The current document is dropped in this case.
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Progress log every 16384 docs (mask 0x3FFF).
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Avoid flushing tiny segments on pathological memory
                    // estimates.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    // Flush at 80% of the per-worker budget.
                    let effective_budget = state.memory_budget_per_worker * 4 / 5;

                    if builder_memory >= effective_budget && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed: flush whatever is left, if non-empty.
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Flush handshake: last worker to finish wakes `prepare_commit`.
            // Taking the mutex before notifying pairs with the waiter's
            // check-then-wait under the same mutex.
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Park until shutdown or a new epoch with a fresh receiver.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }
600
    /// Builds one segment from `builder` and, on success with at least one
    /// document, records `(hex_id, num_docs)` in `built_segments`.
    ///
    /// Runs on a worker thread; the async build is driven synchronously with
    /// `handle.block_on`. Build failures are logged, not propagated.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        // Trained model state (if any) is handed to the build; its presence
        // is what the `ann=` log field reports.
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            // Empty segment: nothing to record.
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }
650
    /// Runs a merge-policy check; delegates to the segment manager.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Aborts in-flight merges; delegates to the segment manager.
    pub async fn abort_merges(&self) {
        self.segment_manager.abort_merges().await;
    }

    /// Waits for the merging thread; delegates to the segment manager.
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Waits for all outstanding merges; delegates to the segment manager.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Returns the shared segment tracker.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquires a snapshot of the currently published segment set.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Cleans up orphan segments; delegates to the segment manager.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
689
    /// First phase of a commit: stops document intake, waits (up to 300s) for
    /// every worker to flush its in-progress segment, then moves the built
    /// segments into `flushed_segments`.
    ///
    /// Returns a `PreparedCommit` guard that must be resolved with `commit`
    /// or `abort`; dropping it unresolved auto-aborts.
    ///
    /// # Errors
    /// Fails if the blocking wait task panics or the workers do not all flush
    /// before the deadline (workers are resumed before the timeout error is
    /// returned, so the writer stays usable).
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // Closing the channel ends each worker's recv loop, which triggers
        // its end-of-cycle flush (see `worker_loop`).
        self.doc_sender.close();

        self.worker_state.resume_cvar.notify_all();

        // Condvar waiting would block the async runtime; do it on a blocking
        // thread.
        let state = Arc::clone(&self.worker_state);
        let all_flushed = tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    return false;
                }
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
            true
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        if !all_flushed {
            // Put the writer back into a working state before surfacing the
            // timeout.
            self.resume_workers();
            return Err(Error::Internal(format!(
                "prepare_commit timed out: {}/{} workers flushed",
                self.worker_state.flush_count.load(Ordering::Acquire),
                self.worker_state.num_workers
            )));
        }

        // Stage the workers' finished segments for the commit phase.
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }
751
    /// Flushes all pending documents and commits the resulting segments.
    /// Returns `Ok(true)` if segments were committed, `Ok(false)` if there was
    /// nothing to commit.
    pub async fn commit(&mut self) -> Result<bool> {
        self.prepare_commit().await?.commit().await
    }

    /// Commits pending work, then asks the segment manager to force-merge.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Commits pending work, then asks the segment manager to reorder
    /// segments.
    pub async fn reorder(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.reorder_segments().await
    }

    /// Returns the shared segment manager.
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }
779
    /// Restarts the worker cycle after a commit/abort: resets the flush
    /// counter, installs a fresh document channel, bumps the resume epoch,
    /// and wakes the parked workers.
    fn resume_workers(&mut self) {
        // Outside a Tokio runtime context, shut the workers down instead of
        // resuming them. NOTE(review): presumably because a writer without a
        // runtime cannot keep indexing (builds use the runtime handle) —
        // confirm the intent.
        if tokio::runtime::Handle::try_current().is_err() {
            self.worker_state.shutdown.store(true, Ordering::Release);
            self.worker_state.resume_cvar.notify_all();
            return;
        }

        // Reset flush accounting before publishing the new cycle.
        self.worker_state.flush_count.store(0, Ordering::Release);

        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        // Install the receiver first, then bump the epoch, then wake workers:
        // parked workers only pick up a receiver once they see a newer epoch.
        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }
810
811 }
813
impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    /// Signals shutdown, closes the pipeline so workers drain and exit their
    /// recv loop, wakes any parked workers, and joins every worker thread.
    fn drop(&mut self) {
        self.worker_state.shutdown.store(true, Ordering::Release);
        self.doc_sender.close();
        self.worker_state.resume_cvar.notify_all();
        for w in std::mem::take(&mut self.workers) {
            // Worker panics are already caught inside the loop; a join error
            // here is not actionable.
            let _ = w.join();
        }
    }
}
828
/// Two-phase-commit guard returned by `IndexWriter::prepare_commit`.
///
/// Borrows the writer exclusively until resolved with `commit` or `abort`;
/// dropping it unresolved auto-aborts (see the `Drop` impl).
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    /// Set once `commit`/`abort` has run so `Drop` does not abort again.
    is_resolved: bool,
}
839
impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Second phase: publishes the staged segments via the segment manager,
    /// refreshes the primary-key index (when enabled) and persists its bloom
    /// cache, triggers a merge check, and resumes the workers.
    ///
    /// Returns `Ok(false)` when no segments were staged (workers are still
    /// resumed in that case).
    pub async fn commit(mut self) -> Result<bool> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);

        if segments.is_empty() {
            log::debug!("[commit] no segments to commit, skipping");
            self.writer.resume_workers();
            return Ok(false);
        }

        self.writer.segment_manager.commit(segments).await?;

        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            // Re-open readers for the post-commit segment set so the dedup
            // index reflects exactly what is now published.
            let snapshot = self.writer.segment_manager.acquire_snapshot().await;
            let open_futures: Vec<_> = snapshot
                .segment_ids()
                .iter()
                .filter_map(|seg_id_str| {
                    // Malformed ids are silently skipped (filter_map).
                    let seg_id = crate::segment::SegmentId::from_hex(seg_id_str)?;
                    let dir = self.writer.directory.as_ref();
                    let schema = Arc::clone(&self.writer.schema);
                    let cache_blocks = self.writer.config.term_cache_blocks;
                    Some(async move {
                        crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
                            .await
                            .map(Arc::new)
                    })
                })
                .collect();
            let readers = futures::future::try_join_all(open_futures).await?;
            let seg_ids: Vec<String> = snapshot.segment_ids().to_vec();
            pk_index.refresh(readers, snapshot);

            // Best-effort persistence of the refreshed bloom cache; failure
            // is logged and does not fail the commit.
            let bloom_bytes = pk_index.bloom_to_bytes();
            let data = super::primary_key::serialize_pk_bloom(&seg_ids, &bloom_bytes);
            if let Err(e) = self
                .writer
                .directory
                .write(
                    std::path::Path::new(super::primary_key::PK_BLOOM_FILE),
                    &data,
                )
                .await
            {
                log::warn!("[primary_key] failed to persist bloom cache: {}", e);
            }
        }

        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(true)
    }

    /// Discards the staged segment list and any uncommitted primary keys,
    /// then resumes the workers. Segment files on disk are not deleted here.
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            pk_index.clear_uncommitted();
        }
        self.writer.resume_workers();
    }
}
911
impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    /// Safety net: an unresolved guard aborts on drop so the writer's workers
    /// are never left parked.
    ///
    /// NOTE(review): this repeats the body of `abort` because `abort`
    /// consumes `self` and cannot be called from `drop(&mut self)`; keep the
    /// two in sync.
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            if let Some(ref mut pk_index) = self.writer.primary_key_index {
                pk_index.clear_uncommitted();
            }
            self.writer.resume_workers();
        }
    }
}