1use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
34
35use rustc_hash::FxHashMap;
36
37use crate::directories::DirectoryWriter;
38use crate::dsl::{Document, Field, Schema};
39use crate::error::{Error, Result};
40use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
41use crate::tokenizer::BoxedTokenizer;
42
43use super::IndexConfig;
44
45const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
47
/// Multi-threaded index writer.
///
/// Documents are pushed onto a bounded channel and drained by background
/// worker threads; each worker accumulates an in-memory `SegmentBuilder`
/// and flushes it to an on-disk segment when its memory budget is reached.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    // Producer side of the bounded document pipeline; replaced by
    // `resume_workers` after each commit cycle.
    doc_sender: async_channel::Sender<Document>,
    // Background indexing threads; joined in `Drop`.
    workers: Vec<std::thread::JoinHandle<()>>,
    // State shared with the worker threads.
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // (segment hex id, doc count) pairs built by workers but not yet committed.
    flushed_segments: Vec<(String, u32)>,
    // Optional primary-key dedup index; `None` until `init_primary_key_dedup`.
    primary_key_index: Option<super::primary_key::PrimaryKeyIndex>,
}
77
/// State shared between the `IndexWriter` and its worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    // Per-field tokenizers; read by workers when creating a builder,
    // writable via `IndexWriter::set_tokenizer`.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    // Total indexing memory budget divided evenly across workers.
    memory_budget_per_worker: usize,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Segments built by workers this cycle; drained by `prepare_commit`.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // Number of workers that have finished the current indexing cycle.
    flush_count: AtomicUsize,
    // Mutex/condvar pair `prepare_commit` waits on until all workers flush.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    // Replacement receiver installed by `resume_workers`; its lock doubles
    // as the condvar mutex for `resume_cvar`.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    // Bumped on every resume; workers compare against their local epoch.
    resume_epoch: AtomicUsize,
    resume_cvar: parking_lot::Condvar,
    // Set in `Drop` (or when resuming is impossible) to make workers exit.
    shutdown: AtomicBool,
    num_workers: usize,
}
113
114impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Creates a new index in `directory` using the default segment-builder
    /// configuration. See [`Self::create_with_config`].
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }
119
    /// Creates a new index in `directory` with an explicit segment-builder
    /// configuration.
    ///
    /// Builds fresh index metadata from `schema`, constructs the segment
    /// manager, persists the initial metadata, then spawns worker threads.
    ///
    /// # Errors
    /// Returns any error raised while persisting the initial metadata.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // Persist the initial (empty) metadata so the index exists on disk.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }
149
    /// Opens an existing index in `directory` using the default
    /// segment-builder configuration. See [`Self::open_with_config`].
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }
154
    /// Opens an existing index in `directory` with an explicit
    /// segment-builder configuration.
    ///
    /// Loads the persisted metadata (the schema comes from disk, not the
    /// caller), constructs the segment manager, publishes any previously
    /// trained structures, then spawns worker threads.
    ///
    /// # Errors
    /// Returns an error if the index metadata cannot be loaded.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // Make any persisted trained state available to future segment builds.
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }
183
    /// Creates a writer that shares an existing index's directory, schema and
    /// segment manager (and therefore its committed segment set).
    ///
    /// Uses the default `SegmentBuilderConfig`.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }
195
196 fn new_with_parts(
202 directory: Arc<D>,
203 schema: Arc<Schema>,
204 config: IndexConfig,
205 builder_config: SegmentBuilderConfig,
206 segment_manager: Arc<crate::merge::SegmentManager<D>>,
207 ) -> Self {
208 let registry = crate::tokenizer::TokenizerRegistry::new();
210 let mut tokenizers = FxHashMap::default();
211 for (field, entry) in schema.fields() {
212 if matches!(entry.field_type, crate::dsl::FieldType::Text)
213 && let Some(ref tok_name) = entry.tokenizer
214 && let Some(tok) = registry.get(tok_name)
215 {
216 tokenizers.insert(field, tok);
217 }
218 }
219
220 let num_workers = config.num_indexing_threads.max(1);
221 let worker_state = Arc::new(WorkerState {
222 directory: Arc::clone(&directory),
223 schema: Arc::clone(&schema),
224 builder_config,
225 tokenizers: parking_lot::RwLock::new(tokenizers),
226 memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
227 segment_manager: Arc::clone(&segment_manager),
228 built_segments: parking_lot::Mutex::new(Vec::new()),
229 flush_count: AtomicUsize::new(0),
230 flush_mutex: parking_lot::Mutex::new(()),
231 flush_cvar: parking_lot::Condvar::new(),
232 resume_receiver: parking_lot::Mutex::new(None),
233 resume_epoch: AtomicUsize::new(0),
234 resume_cvar: parking_lot::Condvar::new(),
235 shutdown: AtomicBool::new(false),
236 num_workers,
237 });
238 let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);
239
240 Self {
241 directory,
242 schema,
243 config,
244 doc_sender,
245 workers,
246 worker_state,
247 segment_manager,
248 flushed_segments: Vec::new(),
249 primary_key_index: None,
250 }
251 }
252
253 fn spawn_workers(
254 worker_state: &Arc<WorkerState<D>>,
255 num_workers: usize,
256 ) -> (
257 async_channel::Sender<Document>,
258 Vec<std::thread::JoinHandle<()>>,
259 ) {
260 let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
261 let handle = tokio::runtime::Handle::current();
262 let mut workers = Vec::with_capacity(num_workers);
263 for i in 0..num_workers {
264 let state = Arc::clone(worker_state);
265 let rx = receiver.clone();
266 let rt = handle.clone();
267 workers.push(
268 std::thread::Builder::new()
269 .name(format!("index-worker-{}", i))
270 .spawn(move || Self::worker_loop(state, rx, rt))
271 .expect("failed to spawn index worker thread"),
272 );
273 }
274 (sender, workers)
275 }
276
277 pub fn schema(&self) -> &Schema {
279 &self.schema
280 }
281
282 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
285 self.worker_state
286 .tokenizers
287 .write()
288 .insert(field, Box::new(tokenizer));
289 }
290
    /// Enables primary-key de-duplication for subsequent `add_document` calls.
    ///
    /// No-op if the schema declares no primary field. Otherwise opens a
    /// reader for every committed segment (concurrently) and builds a
    /// `PrimaryKeyIndex` over them so duplicate keys can be rejected.
    ///
    /// # Errors
    /// Returns an error if a segment id fails to parse or a reader fails to
    /// open.
    pub async fn init_primary_key_dedup(&mut self) -> Result<()> {
        let field = match self.schema.primary_field() {
            Some(f) => f,
            None => return Ok(()),
        };

        // The snapshot pins the current segment set while readers are opened.
        let snapshot = self.segment_manager.acquire_snapshot().await;
        let open_futures: Vec<_> = snapshot
            .segment_ids()
            .iter()
            .map(|seg_id_str| {
                let seg_id_str = seg_id_str.clone();
                let dir = self.directory.as_ref();
                let schema = Arc::clone(&self.schema);
                let cache_blocks = self.config.term_cache_blocks;
                async move {
                    let seg_id =
                        crate::segment::SegmentId::from_hex(&seg_id_str).ok_or_else(|| {
                            Error::Internal(format!("Invalid segment id: {}", seg_id_str))
                        })?;
                    let reader =
                        crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
                            .await?;
                    Ok::<_, Error>(Arc::new(reader))
                }
            })
            .collect();
        // Open all readers concurrently; fail fast on the first error.
        let readers = futures::future::try_join_all(open_futures).await?;

        self.primary_key_index = Some(super::primary_key::PrimaryKeyIndex::new(
            field, readers, snapshot,
        ));
        Ok(())
    }
331
332 pub fn add_document(&self, doc: Document) -> Result<()> {
337 if let Some(ref pk_index) = self.primary_key_index {
338 pk_index.check_and_insert(&doc)?;
339 }
340 match self.doc_sender.try_send(doc) {
341 Ok(()) => Ok(()),
342 Err(async_channel::TrySendError::Full(doc)) => {
343 if let Some(ref pk_index) = self.primary_key_index {
345 pk_index.rollback_uncommitted_key(&doc);
346 }
347 Err(Error::QueueFull)
348 }
349 Err(async_channel::TrySendError::Closed(doc)) => {
350 if let Some(ref pk_index) = self.primary_key_index {
352 pk_index.rollback_uncommitted_key(&doc);
353 }
354 Err(Error::Internal("Document channel closed".into()))
355 }
356 }
357 }
358
359 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
364 let total = documents.len();
365 for (i, doc) in documents.into_iter().enumerate() {
366 match self.add_document(doc) {
367 Ok(()) => {}
368 Err(Error::QueueFull) => return Ok(i),
369 Err(e) => return Err(e),
370 }
371 }
372 Ok(total)
373 }
374
    /// Body of each background indexing thread.
    ///
    /// Each outer-loop iteration is one "cycle": drain the document channel
    /// (building and flushing segments as the memory budget fills), report
    /// completion via `flush_count`, then park until `resume_workers`
    /// installs a fresh channel (epoch bump) or `shutdown` is set.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        let mut my_epoch = 0usize;

        loop {
            // Catch panics so one bad cycle doesn't take the thread down;
            // documents already pulled off the channel in that cycle are lost.
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                // recv_blocking returns Err once the sender closes — that is
                // how `prepare_commit` ends the cycle.
                while let Ok(doc) = receiver.recv_blocking() {
                    // Lazily create the builder for the first document of
                    // each new segment, seeding it with the shared tokenizers.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // Drop this document and retry builder
                                // creation on the next one.
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Progress log every 16384 documents (0x4000 boundary).
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Never flush a near-empty segment, even if the memory
                    // estimate trips early.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    // Flush at 80% of budget to leave headroom for the build.
                    let effective_budget = state.memory_budget_per_worker * 4 / 5;

                    if builder_memory >= effective_budget && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed: flush whatever is left in the builder.
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Report cycle completion; only the last worker to finish wakes
            // the committer blocked in `prepare_commit`.
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                // Hold the flush mutex so the notify cannot slip between the
                // committer's count check and its wait.
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Park until shut down, or resumed with a new channel. The
            // `resume_receiver` lock is also the condvar mutex, so epoch and
            // receiver are observed consistently.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }
505
    /// Builds `builder`'s buffered documents into an on-disk segment.
    ///
    /// Runs on a worker thread; the async segment build is driven to
    /// completion with `handle.block_on`. Successful non-empty builds are
    /// recorded in `state.built_segments` for the next commit; failures are
    /// logged and the buffered documents are dropped.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        // Optional pre-trained structures published by the segment manager
        // (used for ANN when present).
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                // Stage the segment for the next commit.
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            // Build produced an empty segment: nothing to stage.
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }
555
    /// Asks the segment manager to start merges if its policy calls for any.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Aborts all in-flight merges.
    pub async fn abort_merges(&self) {
        self.segment_manager.abort_merges().await;
    }

    /// Waits for the merging thread to finish its current work.
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Waits until no merges remain in flight.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Returns the shared segment tracker.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquires a snapshot pinning the current committed segment set.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Deletes segments on disk that are no longer referenced; returns the
    /// number removed.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
594
    /// Flushes all in-flight documents and stages built segments for commit.
    ///
    /// Closes the document channel so each worker drains and flushes, then
    /// waits (off the async runtime) until every worker reports completion or
    /// a 300-second deadline passes. On success, the workers' built segments
    /// are moved into `flushed_segments` and a [`PreparedCommit`] is
    /// returned; the caller must `commit()` or `abort()` it to resume the
    /// workers.
    ///
    /// # Errors
    /// Fails if the blocking wait task cannot be joined or the deadline is
    /// reached before all workers flush.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // Closing the sender makes `recv_blocking` return Err in every
        // worker, ending the current indexing cycle.
        self.doc_sender.close();

        // Wake any workers still parked from a previous cycle so they can
        // re-check state.
        self.worker_state.resume_cvar.notify_all();

        let state = Arc::clone(&self.worker_state);
        // parking_lot condvar waits block the thread, so run them on the
        // blocking pool rather than an async worker.
        let all_flushed = tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    return false;
                }
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
            true
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        if !all_flushed {
            // NOTE(review): a straggler worker can still increment
            // `flush_count` after `resume_workers` resets it to 0, which
            // could skew the next cycle's count — verify whether this matters
            // after a 300 s timeout.
            self.resume_workers();
            return Err(Error::Internal(format!(
                "prepare_commit timed out: {}/{} workers flushed",
                self.worker_state.flush_count.load(Ordering::Acquire),
                self.worker_state.num_workers
            )));
        }

        // Move this cycle's built segments into the commit staging list.
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }
656
    /// Convenience: `prepare_commit` immediately followed by `commit`.
    /// Returns `Ok(true)` if any segments were published.
    pub async fn commit(&mut self) -> Result<bool> {
        self.prepare_commit().await?.commit().await
    }

    /// Commits pending documents, then merges all segments down.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Commits pending documents, then rewrites segments in reordered form.
    pub async fn reorder(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.reorder_segments().await
    }
679
    /// Restarts the worker threads with a fresh document channel after a
    /// commit/abort cycle.
    ///
    /// If no tokio runtime is available (e.g. invoked from a plain `Drop`),
    /// workers cannot be resumed, so they are told to shut down instead.
    fn resume_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            self.worker_state.shutdown.store(true, Ordering::Release);
            self.worker_state.resume_cvar.notify_all();
            return;
        }

        // Reset the per-cycle flush counter before the next cycle starts.
        self.worker_state.flush_count.store(0, Ordering::Release);

        // Fresh pipeline; workers pick up the new receiver below.
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        // Publish the receiver and bump the epoch *before* waking workers,
        // so a woken worker always observes both.
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }
710
711 }
713
impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    /// Signals shutdown, unblocks every worker, and joins their threads.
    fn drop(&mut self) {
        // Order matters: set the flag first so woken workers see it and exit.
        self.worker_state.shutdown.store(true, Ordering::Release);
        // Unblocks workers waiting in `recv_blocking`.
        self.doc_sender.close();
        // Wakes workers parked on the resume condvar.
        self.worker_state.resume_cvar.notify_all();
        for w in std::mem::take(&mut self.workers) {
            let _ = w.join();
        }
    }
}
728
/// A staged commit returned by `IndexWriter::prepare_commit`.
///
/// Holds the writer mutably (workers stay paused) until resolved with
/// `commit()` or `abort()`. Dropping it unresolved auto-aborts: the staged
/// segments are discarded and the workers are resumed.
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    // True once commit()/abort() has run; checked by Drop.
    is_resolved: bool,
}
739
impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Publishes the staged segments and resumes the workers.
    ///
    /// Returns `Ok(false)` when nothing was staged. On success the
    /// primary-key index (if enabled) is refreshed against the new segment
    /// snapshot and a background merge is considered.
    ///
    /// # Errors
    /// Propagates failures from the segment-manager commit or from reopening
    /// segment readers for the primary-key refresh.
    pub async fn commit(mut self) -> Result<bool> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);

        if segments.is_empty() {
            log::debug!("[commit] no segments to commit, skipping");
            self.writer.resume_workers();
            return Ok(false);
        }

        self.writer.segment_manager.commit(segments).await?;

        // Rebuild the primary-key dedup index over the newly committed set.
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            let snapshot = self.writer.segment_manager.acquire_snapshot().await;
            let open_futures: Vec<_> = snapshot
                .segment_ids()
                .iter()
                .filter_map(|seg_id_str| {
                    // filter_map silently skips unparseable segment ids.
                    let seg_id = crate::segment::SegmentId::from_hex(seg_id_str)?;
                    let dir = self.writer.directory.as_ref();
                    let schema = Arc::clone(&self.writer.schema);
                    let cache_blocks = self.writer.config.term_cache_blocks;
                    Some(async move {
                        crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
                            .await
                            .map(Arc::new)
                    })
                })
                .collect();
            let readers = futures::future::try_join_all(open_futures).await?;
            pk_index.refresh(readers, snapshot);
        }

        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(true)
    }

    /// Discards the staged segments (and any uncommitted primary keys) and
    /// resumes the workers without publishing anything.
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            pk_index.clear_uncommitted();
        }
        self.writer.resume_workers();
    }
}
795
impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    /// Safety net: an unresolved `PreparedCommit` aborts on drop so the
    /// paused workers are always resumed.
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            if let Some(ref mut pk_index) = self.writer.primary_key_index {
                pk_index.clear_uncommitted();
            }
            self.writer.resume_workers();
        }
    }
}