1use std::sync::Arc;
33use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
34
35use rustc_hash::FxHashMap;
36
37use crate::directories::DirectoryWriter;
38use crate::dsl::{Document, Field, Schema};
39use crate::error::{Error, Result};
40use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
41use crate::tokenizer::BoxedTokenizer;
42
43use super::IndexConfig;
44
45const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
47
/// Multi-threaded index writer: documents are pushed into a bounded channel
/// and consumed by background worker threads that build segments.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    // Producer side of the bounded document pipeline consumed by the workers.
    doc_sender: async_channel::Sender<Document>,
    // Join handles for the indexing worker threads; joined on drop.
    workers: Vec<std::thread::JoinHandle<()>>,
    // State shared with every worker thread (builders, flush/resume signals).
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // (segment hex id, doc count) pairs drained from workers, awaiting commit.
    flushed_segments: Vec<(String, u32)>,
    // Optional primary-key dedup index; `None` until `init_primary_key_dedup`.
    primary_key_index: Option<super::primary_key::PrimaryKeyIndex>,
}
77
/// State shared between the `IndexWriter` and its worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    // Template configuration cloned for each new `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    // Per-field tokenizers; read by workers, written via `set_tokenizer`.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    // Memory threshold at which a worker flushes its builder into a segment.
    memory_budget_per_worker: usize,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // (segment hex id, doc count) of segments built but not yet committed.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // Number of workers that have finished flushing in the current cycle.
    flush_count: AtomicUsize,
    // Mutex/condvar pair `prepare_commit` uses to wait for all flushes.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    // Replacement receiver published by `resume_workers`; this mutex is also
    // the one workers hold while parked on `resume_cvar`.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    // Bumped when a new pipeline is published; workers compare it against a
    // local epoch to detect a resume.
    resume_epoch: AtomicUsize,
    resume_cvar: parking_lot::Condvar,
    // Set on writer drop so parked workers exit instead of resuming.
    shutdown: AtomicBool,
    num_workers: usize,
}
113
114impl<D: DirectoryWriter + 'static> IndexWriter<D> {
115 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
117 Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
118 }
119
120 pub async fn create_with_config(
122 directory: D,
123 schema: Schema,
124 config: IndexConfig,
125 builder_config: SegmentBuilderConfig,
126 ) -> Result<Self> {
127 let directory = Arc::new(directory);
128 let schema = Arc::new(schema);
129 let metadata = super::IndexMetadata::new((*schema).clone());
130
131 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
132 Arc::clone(&directory),
133 Arc::clone(&schema),
134 metadata,
135 config.merge_policy.clone_box(),
136 config.term_cache_blocks,
137 config.max_concurrent_merges,
138 ));
139 segment_manager.update_metadata(|_| {}).await?;
140
141 Ok(Self::new_with_parts(
142 directory,
143 schema,
144 config,
145 builder_config,
146 segment_manager,
147 ))
148 }
149
150 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
152 Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
153 }
154
155 pub async fn open_with_config(
157 directory: D,
158 config: IndexConfig,
159 builder_config: SegmentBuilderConfig,
160 ) -> Result<Self> {
161 let directory = Arc::new(directory);
162 let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
163 let schema = Arc::new(metadata.schema.clone());
164
165 let segment_manager = Arc::new(crate::merge::SegmentManager::new(
166 Arc::clone(&directory),
167 Arc::clone(&schema),
168 metadata,
169 config.merge_policy.clone_box(),
170 config.term_cache_blocks,
171 config.max_concurrent_merges,
172 ));
173 segment_manager.load_and_publish_trained().await;
174
175 Ok(Self::new_with_parts(
176 directory,
177 schema,
178 config,
179 builder_config,
180 segment_manager,
181 ))
182 }
183
184 pub fn from_index(index: &super::Index<D>) -> Self {
187 Self::new_with_parts(
188 Arc::clone(&index.directory),
189 Arc::clone(&index.schema),
190 index.config.clone(),
191 SegmentBuilderConfig::default(),
192 Arc::clone(&index.segment_manager),
193 )
194 }
195
196 fn new_with_parts(
202 directory: Arc<D>,
203 schema: Arc<Schema>,
204 config: IndexConfig,
205 builder_config: SegmentBuilderConfig,
206 segment_manager: Arc<crate::merge::SegmentManager<D>>,
207 ) -> Self {
208 let registry = crate::tokenizer::TokenizerRegistry::new();
210 let mut tokenizers = FxHashMap::default();
211 for (field, entry) in schema.fields() {
212 if matches!(entry.field_type, crate::dsl::FieldType::Text)
213 && let Some(ref tok_name) = entry.tokenizer
214 && let Some(tok) = registry.get(tok_name)
215 {
216 tokenizers.insert(field, tok);
217 }
218 }
219
220 let num_workers = config.num_indexing_threads.max(1);
221 let worker_state = Arc::new(WorkerState {
222 directory: Arc::clone(&directory),
223 schema: Arc::clone(&schema),
224 builder_config,
225 tokenizers: parking_lot::RwLock::new(tokenizers),
226 memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
227 segment_manager: Arc::clone(&segment_manager),
228 built_segments: parking_lot::Mutex::new(Vec::new()),
229 flush_count: AtomicUsize::new(0),
230 flush_mutex: parking_lot::Mutex::new(()),
231 flush_cvar: parking_lot::Condvar::new(),
232 resume_receiver: parking_lot::Mutex::new(None),
233 resume_epoch: AtomicUsize::new(0),
234 resume_cvar: parking_lot::Condvar::new(),
235 shutdown: AtomicBool::new(false),
236 num_workers,
237 });
238 let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);
239
240 Self {
241 directory,
242 schema,
243 config,
244 doc_sender,
245 workers,
246 worker_state,
247 segment_manager,
248 flushed_segments: Vec::new(),
249 primary_key_index: None,
250 }
251 }
252
253 fn spawn_workers(
254 worker_state: &Arc<WorkerState<D>>,
255 num_workers: usize,
256 ) -> (
257 async_channel::Sender<Document>,
258 Vec<std::thread::JoinHandle<()>>,
259 ) {
260 let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
261 let handle = tokio::runtime::Handle::current();
262 let mut workers = Vec::with_capacity(num_workers);
263 for i in 0..num_workers {
264 let state = Arc::clone(worker_state);
265 let rx = receiver.clone();
266 let rt = handle.clone();
267 workers.push(
268 std::thread::Builder::new()
269 .name(format!("index-worker-{}", i))
270 .spawn(move || Self::worker_loop(state, rx, rt))
271 .expect("failed to spawn index worker thread"),
272 );
273 }
274 (sender, workers)
275 }
276
277 pub fn schema(&self) -> &Schema {
279 &self.schema
280 }
281
282 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
285 self.worker_state
286 .tokenizers
287 .write()
288 .insert(field, Box::new(tokenizer));
289 }
290
291 pub async fn init_primary_key_dedup(&mut self) -> Result<()> {
297 let field = match self.schema.primary_field() {
298 Some(f) => f,
299 None => return Ok(()),
300 };
301
302 let snapshot = self.segment_manager.acquire_snapshot().await;
303 let open_futures: Vec<_> = snapshot
305 .segment_ids()
306 .iter()
307 .map(|seg_id_str| {
308 let seg_id_str = seg_id_str.clone();
309 let dir = self.directory.as_ref();
310 let schema = Arc::clone(&self.schema);
311 let cache_blocks = self.config.term_cache_blocks;
312 async move {
313 let seg_id =
314 crate::segment::SegmentId::from_hex(&seg_id_str).ok_or_else(|| {
315 Error::Internal(format!("Invalid segment id: {}", seg_id_str))
316 })?;
317 let reader =
318 crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
319 .await?;
320 Ok::<_, Error>(Arc::new(reader))
321 }
322 })
323 .collect();
324 let readers = futures::future::try_join_all(open_futures).await?;
325
326 self.primary_key_index = Some(super::primary_key::PrimaryKeyIndex::new(
327 field, readers, snapshot,
328 ));
329 Ok(())
330 }
331
332 pub fn add_document(&self, doc: Document) -> Result<()> {
337 if let Some(ref pk_index) = self.primary_key_index {
338 pk_index.check_and_insert(&doc)?;
339 }
340 match self.doc_sender.try_send(doc) {
341 Ok(()) => Ok(()),
342 Err(async_channel::TrySendError::Full(doc)) => {
343 if let Some(ref pk_index) = self.primary_key_index {
345 pk_index.rollback_uncommitted_key(&doc);
346 }
347 Err(Error::QueueFull)
348 }
349 Err(async_channel::TrySendError::Closed(doc)) => {
350 if let Some(ref pk_index) = self.primary_key_index {
352 pk_index.rollback_uncommitted_key(&doc);
353 }
354 Err(Error::Internal("Document channel closed".into()))
355 }
356 }
357 }
358
359 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
364 let total = documents.len();
365 for (i, doc) in documents.into_iter().enumerate() {
366 match self.add_document(doc) {
367 Ok(()) => {}
368 Err(Error::QueueFull) => return Ok(i),
369 Err(e) => return Err(e),
370 }
371 }
372 Ok(total)
373 }
374
    /// Body of each indexing worker thread.
    ///
    /// Each cycle: drain documents from the channel into a `SegmentBuilder`,
    /// flushing a segment whenever the memory budget is reached; when the
    /// channel closes (commit/shutdown), flush the remainder, report
    /// completion via `flush_count`/`flush_cvar`, then park until shutdown is
    /// signaled or `resume_workers` publishes a new channel and epoch.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        // Epoch of the channel this worker currently consumes; a larger value
        // in `state.resume_epoch` means a replacement channel was published.
        let mut my_epoch = 0usize;

        loop {
            // Catch panics so one bad cycle cannot kill the thread; documents
            // buffered in that cycle are lost (logged below).
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                // Runs until the sender side is closed.
                while let Ok(doc) = receiver.recv_blocking() {
                    // Lazily create a builder for the segment in progress.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // NOTE(review): `doc` is dropped here — the
                                // document is lost if builder creation fails.
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Progress log every 16384 docs (low 14 bits all zero).
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Avoid flushing pathologically tiny segments even if the
                    // budget is exceeded early.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    if builder_memory >= state.memory_budget_per_worker
                        && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
                    {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed: flush whatever is left, if non-empty.
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Report this worker's flush; the last one to finish wakes the
            // committer blocked on `flush_cvar`.
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                // Take the mutex so the notify pairs with the committer's wait.
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Park until shutdown or a newer pipeline epoch is published.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }
502
503 fn build_segment_inline(
507 state: &WorkerState<D>,
508 builder: SegmentBuilder,
509 handle: &tokio::runtime::Handle,
510 ) {
511 let segment_id = SegmentId::new();
512 let segment_hex = segment_id.to_hex();
513 let trained = state.segment_manager.trained();
514 let doc_count = builder.num_docs();
515 let build_start = std::time::Instant::now();
516
517 log::info!(
518 "[segment_build] segment_id={} doc_count={} ann={}",
519 segment_hex,
520 doc_count,
521 trained.is_some()
522 );
523
524 match handle.block_on(builder.build(
525 state.directory.as_ref(),
526 segment_id,
527 trained.as_deref(),
528 )) {
529 Ok(meta) if meta.num_docs > 0 => {
530 let duration_ms = build_start.elapsed().as_millis() as u64;
531 log::info!(
532 "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
533 segment_hex,
534 meta.num_docs,
535 duration_ms,
536 );
537 state
538 .built_segments
539 .lock()
540 .push((segment_hex, meta.num_docs));
541 }
542 Ok(_) => {}
543 Err(e) => {
544 log::error!(
545 "[segment_build_failed] segment_id={} error={:?}",
546 segment_hex,
547 e
548 );
549 }
550 }
551 }
552
    /// Ask the segment manager to start merges if its merge policy calls for
    /// any; delegates entirely to `SegmentManager::maybe_merge`.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }
561
    /// Wait for the segment manager's merging thread; delegates to
    /// `SegmentManager::wait_for_merging_thread`.
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }
566
    /// Wait until all in-flight merges complete; delegates to
    /// `SegmentManager::wait_for_all_merges`.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }
571
    /// The segment manager's `SegmentTracker` handle.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }
576
    /// Acquire a snapshot of the currently committed segments; delegates to
    /// `SegmentManager::acquire_snapshot`.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }
581
    /// Remove orphaned segment files, returning how many were cleaned up;
    /// delegates to `SegmentManager::cleanup_orphan_segments`.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
586
    /// Drain the pipeline and wait until every worker has flushed its
    /// in-progress segment, returning a [`PreparedCommit`] that must then be
    /// committed or aborted (workers stay parked until it is resolved).
    ///
    /// # Errors
    /// Fails when waiting for the workers exceeds the 300s deadline, or when
    /// the blocking wait task cannot be joined.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // Closing the channel ends each worker's `recv_blocking` loop, which
        // makes it flush whatever it has buffered.
        self.doc_sender.close();

        // Nudge workers that may already be parked in the resume-wait loop.
        self.worker_state.resume_cvar.notify_all();

        let state = Arc::clone(&self.worker_state);
        // Wait off the async runtime: workers bump `flush_count` as they
        // finish and the last one signals `flush_cvar`.
        let all_flushed = tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    return false;
                }
                // `wait_for` may wake spuriously; the loop re-checks both the
                // count and the deadline.
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
            true
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        if !all_flushed {
            // Timed out: restart the pipeline so the writer stays usable.
            self.resume_workers();
            return Err(Error::Internal(format!(
                "prepare_commit timed out: {}/{} workers flushed",
                self.worker_state.flush_count.load(Ordering::Acquire),
                self.worker_state.num_workers
            )));
        }

        // Move the workers' built segments into the writer's pending list.
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }
648
649 pub async fn commit(&mut self) -> Result<()> {
654 self.prepare_commit().await?.commit().await
655 }
656
657 pub async fn force_merge(&mut self) -> Result<()> {
659 self.prepare_commit().await?.commit().await?;
660 self.segment_manager.force_merge().await
661 }
662
663 fn resume_workers(&mut self) {
668 if tokio::runtime::Handle::try_current().is_err() {
669 self.worker_state.shutdown.store(true, Ordering::Release);
672 self.worker_state.resume_cvar.notify_all();
673 return;
674 }
675
676 self.worker_state.flush_count.store(0, Ordering::Release);
678
679 let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
681 self.doc_sender = sender;
682
683 {
685 let mut lock = self.worker_state.resume_receiver.lock();
686 *lock = Some(receiver);
687 }
688 self.worker_state
689 .resume_epoch
690 .fetch_add(1, Ordering::Release);
691 self.worker_state.resume_cvar.notify_all();
692 }
693
694 }
696
697impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
698 fn drop(&mut self) {
699 self.worker_state.shutdown.store(true, Ordering::Release);
701 self.doc_sender.close();
703 self.worker_state.resume_cvar.notify_all();
705 for w in std::mem::take(&mut self.workers) {
707 let _ = w.join();
708 }
709 }
710}
711
/// A prepared (but not yet published) commit returned by
/// [`IndexWriter::prepare_commit`]. Must be resolved via `commit` or `abort`;
/// dropping it unresolved auto-aborts.
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    // True once commit/abort has run; checked by Drop to decide auto-abort.
    is_resolved: bool,
}
722
723impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Publish the flushed segments through the segment manager, refresh the
    /// primary-key index (if enabled) against the post-commit snapshot, kick
    /// off policy-driven merges, and resume the paused workers.
    ///
    /// NOTE(review): `is_resolved` is set before the fallible awaits, so if
    /// one of them errors the Drop impl will not auto-abort and workers are
    /// not resumed — confirm whether that is intended.
    pub async fn commit(mut self) -> Result<()> {
        self.is_resolved = true;
        // Hand all (segment hex id, doc count) pairs to the segment manager
        // in a single commit.
        let segments = std::mem::take(&mut self.writer.flushed_segments);
        self.writer.segment_manager.commit(segments).await?;

        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            // Rebuild segment readers from the new snapshot so dedup checks
            // see the just-committed documents.
            let snapshot = self.writer.segment_manager.acquire_snapshot().await;
            let open_futures: Vec<_> = snapshot
                .segment_ids()
                .iter()
                .filter_map(|seg_id_str| {
                    // NOTE(review): ids that fail hex-parsing are silently
                    // skipped here, whereas init_primary_key_dedup errors on
                    // them — confirm the asymmetry is intentional.
                    let seg_id = crate::segment::SegmentId::from_hex(seg_id_str)?;
                    let dir = self.writer.directory.as_ref();
                    let schema = Arc::clone(&self.writer.schema);
                    let cache_blocks = self.writer.config.term_cache_blocks;
                    Some(async move {
                        crate::segment::SegmentReader::open(dir, seg_id, schema, cache_blocks)
                            .await
                            .map(Arc::new)
                    })
                })
                .collect();
            let readers = futures::future::try_join_all(open_futures).await?;
            pk_index.refresh(readers, snapshot);
        }

        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(())
    }
756
757 pub fn abort(mut self) {
760 self.is_resolved = true;
761 self.writer.flushed_segments.clear();
762 if let Some(ref mut pk_index) = self.writer.primary_key_index {
763 pk_index.clear_uncommitted();
764 }
765 self.writer.resume_workers();
766 }
767}
768
769impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
770 fn drop(&mut self) {
771 if !self.is_resolved {
772 log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
773 self.writer.flushed_segments.clear();
774 if let Some(ref mut pk_index) = self.writer.primary_key_index {
775 pk_index.clear_uncommitted();
776 }
777 self.writer.resume_workers();
778 }
779 }
780}