1use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8use rustc_hash::FxHashMap;
9use tokio::sync::Mutex as AsyncMutex;
10use tokio::sync::{mpsc, oneshot};
11use tokio::task::JoinHandle;
12
13use crate::DocId;
14use crate::directories::DirectoryWriter;
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
18use crate::tokenizer::BoxedTokenizer;
19
20use super::IndexConfig;
21
/// Messages sent from the `IndexWriter` front-end to its background
/// indexing worker tasks.
enum WorkerMessage {
    /// A document to add to the worker's in-progress segment.
    Document(Document),
    /// Flush the worker's in-progress segment (if it has any documents) and
    /// acknowledge completion over the provided oneshot channel.
    Flush(oneshot::Sender<()>),
}
29
/// Asynchronous, multi-worker index writer.
///
/// Documents are distributed round-robin across a pool of background worker
/// tasks. Each worker accumulates documents into an in-memory
/// `SegmentBuilder` and spawns a bounded background build when its share of
/// the memory budget is exhausted. `flush`/`commit` drain the workers and
/// register finished segments with the segment manager.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    /// Destination directory that built segments are written to.
    pub(super) directory: Arc<D>,
    /// Index schema, shared with workers and the segment manager.
    pub(super) schema: Arc<Schema>,
    /// Writer configuration (thread count, memory budget, merge policy, ...).
    pub(super) config: IndexConfig,
    /// Configuration used when constructing segment builders.
    #[allow(dead_code)] builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers registered via `set_tokenizer`.
    /// NOTE(review): workers read tokenizers from `WorkerState`, which is
    /// always constructed with an empty map in this file — tokenizers
    /// registered here do not appear to reach the workers. Verify intent.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// One unbounded channel per spawned worker task.
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin cursor used to pick the next worker for a document.
    next_worker: AtomicUsize,
    /// Join handles of the spawned worker tasks (not currently awaited).
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    /// Shared worker state; held here to keep it alive alongside the writer.
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    /// Tracks live segments, merges, and on-disk metadata.
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Trained vector structures (centroids/codebooks) applied to segments
    /// at build time, when present.
    pub(super) trained_structures:
        Arc<std::sync::RwLock<Option<crate::segment::TrainedVectorStructures>>>,
    /// Receives `(segment hex id, doc count)` for every finished build.
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Number of background segment builds currently in flight.
    pending_builds: Arc<AtomicUsize>,
    /// Built-but-unregistered segments, consumed by `commit`.
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}
75
/// State shared (via `Arc`) by every indexing worker task.
struct WorkerState<D: DirectoryWriter + 'static> {
    /// Destination directory for built segments.
    directory: Arc<D>,
    /// Schema used to create each worker's `SegmentBuilder`.
    schema: Arc<Schema>,
    /// Writer configuration (memory budget, thread count, ...).
    config: IndexConfig,
    /// Configuration passed to each new `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers installed on newly created builders.
    /// NOTE(review): every construction site in this file initializes this
    /// as an empty map — verify whether custom tokenizers should propagate
    /// here from `IndexWriter::set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Reports `(segment hex id, doc count)` when a background build ends.
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    /// Count of builds in flight, shared with the `IndexWriter`.
    pending_builds: Arc<AtomicUsize>,
    /// Bounds the number of concurrent background segment builds.
    build_semaphore: Arc<tokio::sync::Semaphore>,
    /// Trained vector structures applied to segments at build time.
    trained_structures: Arc<std::sync::RwLock<Option<crate::segment::TrainedVectorStructures>>>,
}
93
94impl<D: DirectoryWriter + 'static> IndexWriter<D> {
95 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
97 Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
98 }
99
    /// Create a new index in `directory` with an explicit segment-builder
    /// configuration: writes initial metadata, then spawns the worker pool.
    ///
    /// # Errors
    /// Returns an error if the initial metadata cannot be persisted.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Workers report finished builds as (segment hex id, doc count).
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Persist the initial (empty) metadata so the index exists on disk.
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        // Cap concurrent segment builds at roughly half the worker count so
        // build tasks cannot monopolize the runtime.
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // No trained vector structures exist yet for a brand-new index.
        let trained_structures = Arc::new(std::sync::RwLock::new(None));
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        // Spawn one indexing worker task per configured thread; each gets
        // its own unbounded document channel.
        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }
182
183 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
185 Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
186 }
187
    /// Open an existing index from `directory` with an explicit
    /// segment-builder configuration. The schema is read from the stored
    /// metadata, then the worker pool is spawned.
    ///
    /// # Errors
    /// Returns an error if the index metadata cannot be loaded.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Unlike `create_*`, the schema comes from the on-disk metadata.
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        // Workers report finished builds as (segment hex id, doc count).
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        // Cap concurrent segment builds at roughly half the worker count.
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Starts empty; populated below via publish_trained_structures().
        let trained_structures = Arc::new(std::sync::RwLock::new(None));
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        // Spawn one indexing worker task per configured thread.
        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        let writer = Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        };

        // Publish trained vector structures to the shared slot (helper is
        // defined elsewhere in this file; presumably restores persisted
        // centroids/codebooks — confirm).
        writer.publish_trained_structures().await;

        Ok(writer)
    }
270
    /// Build a writer that shares segment management with an existing
    /// `Index`, seeding the trained vector structures (centroids and
    /// codebooks) from the index when it has any.
    pub fn from_index(index: &super::Index<D>) -> Self {
        // Share the index's segment manager rather than creating a new one,
        // so both views stay consistent.
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        // Workers report finished builds as (segment hex id, doc count).
        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));

        let num_workers = config.num_indexing_threads.max(1);
        // Cap concurrent segment builds at roughly half the worker count.
        let max_concurrent_builds = num_workers.div_ceil(2).max(1);
        let build_semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds));

        // Seed trained ANN structures from the index, if present.
        let initial_trained = if !index.trained_centroids.is_empty() {
            Some(crate::segment::TrainedVectorStructures {
                centroids: index.trained_centroids.clone(),
                codebooks: index.trained_codebooks.clone(),
            })
        } else {
            None
        };
        let trained_structures = Arc::new(std::sync::RwLock::new(initial_trained));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
            build_semaphore,
            trained_structures: Arc::clone(&trained_structures),
        });

        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        // Spawn one indexing worker task per configured thread.
        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            trained_structures,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }
347
348 pub fn schema(&self) -> &Schema {
350 &self.schema
351 }
352
353 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
355 self.tokenizers.insert(field, Box::new(tokenizer));
356 }
357
358 pub fn add_document(&self, doc: Document) -> Result<DocId> {
364 let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
366 self.worker_senders[idx]
367 .send(WorkerMessage::Document(doc))
368 .map_err(|_| Error::Internal("Document channel closed".into()))?;
369 Ok(0)
370 }
371
372 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
377 let num_workers = self.worker_senders.len();
378 let count = documents.len();
379 let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
380 for (i, doc) in documents.into_iter().enumerate() {
381 let idx = (base + i) % num_workers;
382 let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
383 }
384 Ok(count)
385 }
386
    /// Per-worker task body: receives `WorkerMessage`s, accumulates
    /// documents into an in-memory `SegmentBuilder`, and hands full
    /// builders to background build tasks when the worker's share of the
    /// memory budget is exceeded or a flush is requested.
    ///
    /// Runs until the sender side of `receiver` is dropped, at which point
    /// any remaining documents are flushed as a final segment.
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        // Documents indexed since the last flush; drives periodic logging.
        let mut _doc_count = 0u32;

        loop {
            let msg = receiver.recv().await;

            // Channel closed: the writer is gone. Build whatever is left.
            let Some(msg) = msg else {
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Lazily create a builder on the first document after a
                    // flush; a failure here drops this document.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                // NOTE(review): `state.tokenizers` is built
                                // empty at every construction site in this
                                // file, so this loop currently installs
                                // nothing — verify.
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    // Periodically refresh the builder's memory estimate.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Compute this worker's share of the global memory
                    // budget. In-flight builds add slots to the divisor,
                    // shrinking each worker's share while builds hold
                    // memory of their own.
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Avoid producing tiny segments even under memory
                    // pressure.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        // Hand the full builder off; back-pressures via the
                        // build semaphore inside spawn_segment_build.
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Build the current segment (if non-empty), then ack.
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    _doc_count = 0;
                    // Receiver may have been dropped; a failed ack is fine.
                    let _ = respond.send(());
                }
            }
        }
    }
    /// Serialize `builder` into a new segment on a background task.
    ///
    /// Awaits a slot on the build semaphore before spawning, so callers are
    /// back-pressured when too many builds are already in flight. The
    /// outcome is reported through `segment_id_sender` as
    /// `(segment hex id, doc count)`, with a doc count of 0 on failure.
    async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        // The semaphore is never closed in this module, so acquire_owned
        // failing would indicate a bug; unwrap is intentional.
        let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();

        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        // Snapshot the trained vector structures (if any) for this build;
        // a poisoned lock is treated as "none trained".
        let trained = state
            .trained_structures
            .read()
            .ok()
            .and_then(|guard| guard.clone());

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={} ann={}",
            segment_hex,
            doc_count,
            memory_bytes,
            trained.is_some()
        );

        // Incremented before spawning so flush() can observe this build.
        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            // Hold the semaphore permit for the entire build.
            let _permit = permit;
            let build_start = std::time::Instant::now();
            let result = match builder
                .build(directory.as_ref(), segment_id, trained.as_ref())
                .await
            {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Zero docs marks the failed segment as ignorable.
                    (segment_hex, 0)
                }
            };
            // Send before decrementing: flush() drains results while
            // pending_builds > 0 and relies on this ordering.
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }
590
591 pub fn pending_build_count(&self) -> usize {
593 self.pending_builds.load(Ordering::SeqCst)
594 }
595
596 pub fn pending_merge_count(&self) -> usize {
598 self.segment_manager.pending_merge_count()
599 }
600
601 pub async fn maybe_merge(&self) {
606 self.segment_manager.maybe_merge().await;
607 }
608
609 pub async fn wait_for_merges(&self) {
611 self.segment_manager.wait_for_merges().await;
612 }
613
614 pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
617 self.segment_manager.tracker()
618 }
619
620 pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
623 self.segment_manager.acquire_snapshot().await
624 }
625
626 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
634 self.segment_manager.cleanup_orphan_segments().await
635 }
636
    /// Flush every worker's in-progress segment and wait for all pending
    /// background builds to finish. Successful builds are recorded in
    /// `flushed_segments` for a later `commit`; this method does not
    /// register segments itself.
    pub async fn flush(&self) -> Result<()> {
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        // Ask each worker to flush and collect its ack channel. A closed
        // sender means that worker already exited (nothing to flush).
        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                continue;
            }
            responses.push(rx);
        }

        // Wait for every worker to acknowledge; a dropped ack is ignored.
        for rx in responses {
            let _ = rx.await;
        }

        // Drain build results while builds remain in flight. Build tasks
        // send their result before decrementing pending_builds, so a result
        // is forthcoming whenever the counter is observed > 0.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if let Some((segment_hex, num_docs)) = receiver.recv().await {
                // num_docs == 0 marks a failed build; skip it.
                if num_docs > 0 {
                    self.flushed_segments
                        .lock()
                        .await
                        .push((segment_hex, num_docs));
                }
            } else {
                // Channel closed: no more results will ever arrive.
                break;
            }
        }

        // Collect any results already buffered from earlier builds.
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }
690
691 pub async fn commit(&self) -> Result<()> {
700 self.flush().await?;
702
703 let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
705 for (segment_hex, num_docs) in segments {
706 self.segment_manager
707 .register_segment(segment_hex, num_docs)
708 .await?;
709 }
710
711 self.maybe_build_vector_index().await?;
713
714 Ok(())
715 }
716
717 async fn do_merge(&self) -> Result<()> {
722 let ids_to_merge = self.segment_manager.get_segment_ids().await;
723
724 if ids_to_merge.len() < 2 {
725 return Ok(());
726 }
727
728 let metadata_arc = self.segment_manager.metadata();
729 let (new_segment_id, total_docs) = crate::merge::SegmentManager::do_merge(
730 self.directory.as_ref(),
731 &self.schema,
732 &ids_to_merge,
733 self.config.term_cache_blocks,
734 &metadata_arc,
735 )
736 .await?;
737
738 self.segment_manager
740 .replace_segments(vec![(new_segment_id, total_docs)], ids_to_merge)
741 .await?;
742
743 Ok(())
744 }
745
    /// Commit all outstanding work, wait for background merges to settle,
    /// then collapse all remaining segments into a single segment.
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.wait_for_merges().await;
        self.do_merge().await
    }
755
756 }