use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

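/// Messages sent from the [`IndexWriter`] to its background indexing workers.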
enum WorkerMessage {
    /// A document to add to the worker's in-progress segment.
    Document(Document),
    /// Finalize the current segment (if any) and acknowledge on the channel.
    Flush(oneshot::Sender<()>),
}

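/// Asynchronous, multi-worker index writer.
///
/// Documents are distributed round-robin to background indexing workers; each
/// worker accumulates a [`SegmentBuilder`] and hands it off to a background build
/// once its share of the memory budget is reached. [`flush`](Self::flush) waits for
/// in-flight builds, and [`commit`](Self::commit) additionally registers the built
/// segments with the segment manager.
///
/// A minimal usage sketch (hypothetical setup: `directory`, `schema`, `config`, and
/// `doc` are assumed to be constructed elsewhere):
///
/// ```ignore
/// let writer = IndexWriter::create(directory, schema, config).await?;
/// writer.add_document(doc)?;
/// writer.commit().await?;
/// ```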
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// Builder configuration applied to worker segment builders.
    #[allow(dead_code)]
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizer overrides registered via `set_tokenizer`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// One unbounded channel per background indexing worker.
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin cursor used to pick the next worker.
    next_worker: AtomicUsize,
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Receives `(segment_hex, num_docs)` results from background builds.
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<(String, u32)>>,
    /// Number of segment builds currently in flight.
    pending_builds: Arc<AtomicUsize>,
    /// Segments built since the last commit, awaiting registration.
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
    /// Global indexing memory counter (currently unused).
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}

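/// State shared by all background indexing workers.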
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<(String, u32)>,
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
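    /// Creates a new index in `directory` with the default [`SegmentBuilderConfig`].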
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

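    /// Creates a new index with an explicit [`SegmentBuilderConfig`], writes the
    /// initial metadata, and spawns `num_indexing_threads` background workers.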
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Channel on which background builds report (segment_hex, num_docs).
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Write the initial index metadata through the segment manager.
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        // Spawn one indexing worker per configured thread (at least one).
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

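    /// Opens an existing index from `directory` with the default [`SegmentBuilderConfig`].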
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

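    /// Opens an existing index, loading its metadata and schema from `directory`,
    /// and spawns the background indexing workers.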
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load existing metadata; the schema stored there becomes the writer's schema.
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }

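    /// Creates a writer that shares the segment manager of an existing [`super::Index`].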
    pub fn from_index(index: &super::Index<D>) -> Self {
        // Reuse the index's segment manager, directory, schema, and config.
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = SegmentBuilderConfig::default();

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            pending_builds: Arc::clone(&pending_builds),
        });

        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }

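    /// Returns the schema this writer indexes against.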
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

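    /// Registers a tokenizer for `field` on this writer.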
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

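    /// Queues a document for indexing on one of the background workers (round-robin).
    ///
    /// Indexing is asynchronous, so the returned [`DocId`] is a placeholder (always 0).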
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Pick the next worker round-robin.
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }

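    /// Queues a batch of documents, spreading them round-robin across the workers.
    ///
    /// Returns the number of documents queued.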
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        // Reserve a contiguous block of round-robin slots for the whole batch.
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            // Send errors are ignored; they can only occur if a worker task has terminated.
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

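    /// Background worker: consumes [`WorkerMessage`]s, accumulating documents in a
    /// [`SegmentBuilder`] and spawning a segment build whenever the per-worker memory
    /// budget is reached or a flush is requested.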
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut _doc_count = 0u32;

        loop {
            let msg = receiver.recv().await;

            // Channel closed: the writer was dropped. Build whatever is left.
            let Some(msg) = msg else {
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Lazily create a builder for the first document of a new segment.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    _doc_count += 1;

                    // Each worker gets an equal share of the total indexing memory budget.
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
                    let builder_memory = b.estimated_memory_bytes();

                    if _doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Avoid flushing tiny segments even if the memory estimate spikes.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder);
                        _doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                             interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b);
                    }
                    _doc_count = 0;
                    // Acknowledge the flush even if there was nothing to build.
                    let _ = respond.send(());
                }
            }
        }
    }
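
    /// Builds `builder` into a new segment on a background task, reporting
    /// `(segment_hex, num_docs)` on the segment-id channel; failures are reported
    /// with `num_docs == 0` so they are skipped at registration time.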
    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let pending_builds = Arc::clone(&state.pending_builds);

        let doc_count = builder.num_docs();
        let memory_bytes = builder.estimated_memory_bytes();

        log::info!(
            "[segment_build_started] segment_id={} doc_count={} memory_bytes={}",
            segment_hex,
            doc_count,
            memory_bytes
        );

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let build_start = std::time::Instant::now();
            let result = match builder.build(directory.as_ref(), segment_id).await {
                Ok(meta) => {
                    let build_duration_ms = build_start.elapsed().as_millis() as u64;
                    log::info!(
                        "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
                        segment_hex,
                        meta.num_docs,
                        build_duration_ms
                    );
                    (segment_hex, meta.num_docs)
                }
                Err(e) => {
                    log::error!(
                        "[segment_build_failed] segment_id={} error={}",
                        segment_hex,
                        e
                    );
                    eprintln!("Background segment build failed: {:?}", e);
                    // Report zero documents so the failed segment is never registered.
                    (segment_hex, 0)
                }
            };
            // Send the result before decrementing so flush() sees it once the count drops.
            let _ = sender.send(result);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

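    /// Returns the number of segment builds currently running in the background.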
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

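    /// Returns the number of background merges currently running.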
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

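    /// Asks the segment manager to evaluate the merge policy and start any merges it selects.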
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

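    /// Waits for all in-flight background merges to complete.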
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

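    /// Returns the shared segment tracker.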
    pub fn tracker(&self) -> Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

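    /// Acquires a snapshot of the current segment set from the segment manager.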
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

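    /// Delegates orphan-segment cleanup to the segment manager, returning the number
    /// of segments removed.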
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

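    /// Flushes all workers: every in-progress builder is handed off to a background
    /// build, and this call waits until all pending builds have finished. Built
    /// segments are recorded for the next [`commit`](Self::commit) but not yet registered.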
    pub async fn flush(&self) -> Result<()> {
        // Ask every worker to hand off its current builder and acknowledge.
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                // Worker has shut down; nothing to flush there.
                continue;
            }
            responses.push(rx);
        }

        for rx in responses {
            let _ = rx.await;
        }

        // Collect results from all in-flight builds. Results are sent before
        // `pending_builds` is decremented, so once the counter reaches zero every
        // completed build's result is already in the channel. The timeout guards
        // against blocking on `recv()` forever in the window where the final result
        // has been consumed but its counter decrement has not yet happened.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            match tokio::time::timeout(std::time::Duration::from_millis(50), receiver.recv()).await
            {
                Ok(Some((segment_hex, num_docs))) => {
                    if num_docs > 0 {
                        self.flushed_segments
                            .lock()
                            .await
                            .push((segment_hex, num_docs));
                    }
                }
                // Channel closed: no more results will arrive.
                Ok(None) => break,
                // Timed out: re-check the pending-build counter.
                Err(_) => {}
            }
        }

        // Drain any results that arrived after the counter reached zero.
        while let Ok((segment_hex, num_docs)) = receiver.try_recv() {
            if num_docs > 0 {
                self.flushed_segments
                    .lock()
                    .await
                    .push((segment_hex, num_docs));
            }
        }

        Ok(())
    }

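    /// Flushes all workers, then registers every newly built segment with the
    /// segment manager and triggers vector-index building if needed.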
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        // Register every segment built since the last commit.
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        for (segment_hex, num_docs) in segments {
            self.segment_manager
                .register_segment(segment_hex, num_docs)
                .await?;
        }

        self.maybe_build_vector_index().await?;

        Ok(())
    }

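    /// Merges all currently registered segments into one new segment, then replaces
    /// them with it in the segment manager.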
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        // Nothing to do with fewer than two segments.
        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        // Open a reader for every segment, assigning contiguous doc-id offsets.
        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let total_docs: u32 = readers.iter().map(|r| r.meta().num_docs).sum();

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        // Swap the merged inputs for the newly written segment.
        self.segment_manager
            .replace_segments(vec![(new_segment_id.to_hex(), total_docs)], ids_to_merge)
            .await?;

        Ok(())
    }

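    /// Commits pending documents, waits for background merges, then merges all
    /// remaining segments into one.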
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.wait_for_merges().await;
        self.do_merge().await
    }
}