use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;
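
/// Work item sent from the writer to one of its background indexing workers.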
enum WorkerMessage {
    Document(Document),
    Flush(oneshot::Sender<()>),
}
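
/// Multi-threaded index writer.
///
/// Documents are distributed round-robin to a pool of indexing workers, each of
/// which accumulates an in-memory `SegmentBuilder`. When a builder exceeds its
/// share of the memory budget it is handed to a background task that writes the
/// segment out. `flush` waits for those builds to complete and `commit` registers
/// the finished segments with the `SegmentManager`.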
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)]
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    next_worker: AtomicUsize,
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    pending_builds: Arc<AtomicUsize>,
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}
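
/// Shared state handed to each indexing worker task.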
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
    build_semaphore: Arc<tokio::sync::Semaphore>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
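    /// Creates a new index in `directory` with the default `SegmentBuilderConfig`.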
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }
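
    /// Creates a new index with an explicit segment builder configuration.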
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }
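
    /// Opens an existing index in `directory` with the default `SegmentBuilderConfig`.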
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }
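
    /// Opens an existing index with an explicit segment builder configuration.
    /// The schema is loaded from the index metadata stored in the directory.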
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }
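
    /// Creates a writer that shares the segment manager (and therefore metadata and
    /// merge state) of an existing `Index`, while spawning its own worker pool.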
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }
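
    /// Registers a custom tokenizer for `field` on this writer.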
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }
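
    /// Queues a document for indexing on one of the workers (round-robin).
    /// Indexing happens asynchronously; the returned `DocId` is a placeholder
    /// (always 0), not the document's final id.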
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }
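
    /// Queues a batch of documents, distributing them round-robin across the
    /// workers. Returns the number of documents queued.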
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }
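
    /// Per-worker loop: accumulates documents into an in-memory `SegmentBuilder`
    /// and hands it off to a background build when it exceeds its memory budget,
    /// when a `Flush` is requested, or when the channel closes.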
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut _doc_count = 0u32;

        loop {
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }
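
                    // Divide the global indexing budget across worker builders and
                    // in-flight background builds (each weighted twice), so the
                    // per-builder limit shrinks while builds are still holding memory.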
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    _doc_count = 0;
                    let _ = respond.send(());
                }
            }
        }
    }
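
    /// Hands a full `SegmentBuilder` off to a background task that writes the
    /// segment to the directory. A semaphore bounds concurrent builds, and
    /// `pending_builds` tracks in-flight work; the result `(segment_hex, num_docs)`
    /// is reported back over `segment_id_sender` (`num_docs == 0` on failure).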
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let _permit = permit;
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    (segment_hex, 0)
                }
            };
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }
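
    /// Asks the segment manager to start any merges suggested by the merge policy.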
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }
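
    /// Acquires a snapshot of the current segment set from the segment manager.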
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }
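
    /// Delegates to the segment manager to clean up orphaned segment files,
    /// returning the count it reports.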
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
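
    /// Flushes every worker's in-memory builder to a segment build and waits for
    /// all pending builds to finish. Completed segments are collected in
    /// `flushed_segments` but are not registered until `commit` is called.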
    pub async fn flush(&self) -> Result<()> {
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                continue;
            }
            responses.push(rx);
        }

        for rx in responses {
            let _ = rx.await;
        }
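
        // Wait for every in-flight segment build to report back, recording the
        // successful ones (num_docs > 0) for registration at commit time.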
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                break;
            }
        }

        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }
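
    /// Commits buffered documents: flushes the workers, registers each newly built
    /// segment with the segment manager, then calls `maybe_build_vector_index`.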
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        self.maybe_build_vector_index().await?;

        Ok(())
    }
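
    /// Merges all current segments into a single new segment and replaces them via
    /// the segment manager. Does nothing when fewer than two segments exist.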
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await
            .map(|_| ())?;

        self.segment_manager
            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }
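
    /// Commits pending documents, waits for background merges to finish, then
    /// merges all remaining segments into one.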
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.wait_for_merges().await;
        self.do_merge().await
    }
}