use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

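/// Control messages delivered to each background indexing worker.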
enum WorkerMessage {
    /// A document to add to the worker's in-memory segment builder.
    Document(Document),
    /// Flush the current builder (if any) and acknowledge on the channel.
    Flush(oneshot::Sender<()>),
}

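/// Multi-threaded writer that builds index segments in the background.
///
/// Documents are distributed round-robin to worker tasks. Each worker fills an
/// in-memory [`SegmentBuilder`] and spawns a segment build once its share of the
/// indexing memory budget is exceeded; finished segments are registered with the
/// [`crate::merge::SegmentManager`], which also drives background merges.
///
/// A minimal usage sketch (the `FsDirectory` type and the exact `Schema` /
/// `Document` construction APIs are placeholders, not part of this module):
///
/// ```ignore
/// let writer = IndexWriter::create(FsDirectory::new("/tmp/idx"), schema, IndexConfig::default()).await?;
/// writer.add_document(doc)?;   // queued; indexed asynchronously by a worker
/// writer.commit().await?;      // flush builders and wait for pending segment builds
/// ```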
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)]
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// One unbounded channel per indexing worker.
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    /// Round-robin counter used to pick the next worker.
    next_worker: AtomicUsize,
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Receives the hex id of every segment whose background build has finished.
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    /// Number of segment builds currently running in the background.
    pending_builds: Arc<AtomicUsize>,
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}

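/// Shared, read-only state handed to every indexing worker task.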
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
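    /// Creates a new index in `directory` with the default [`SegmentBuilderConfig`].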
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

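    /// Creates a new index in `directory` with an explicit segment-builder configuration.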
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Persist the initial (empty) metadata so the index can be opened by readers.
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // Spawn one indexing worker per configured thread; each owns its own builder.
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }

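    /// Opens an existing index from `directory` with the default [`SegmentBuilderConfig`].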
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

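    /// Opens an existing index, loading the schema from the stored index metadata.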
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }

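    /// Creates a writer that shares the segment manager (and thus the segment
    /// metadata) of an already opened [`super::Index`].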
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        let (segment_id_sender, segment_id_receiver) = tokio::sync::mpsc::unbounded_channel();

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        }
    }

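    /// Returns the schema this writer indexes documents against.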
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

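    /// Registers a tokenizer for `field` on this writer handle.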
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

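    /// Queues `doc` for indexing on one of the background workers (round-robin).
    ///
    /// Indexing happens asynchronously, so the returned `DocId` is a placeholder (`0`).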
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }

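    /// Queues a batch of documents, spreading them across the workers round-robin.
    /// Returns the number of documents queued.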
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let num_workers = self.worker_senders.len();
        let count = documents.len();
        let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
        for (i, doc) in documents.into_iter().enumerate() {
            let idx = (base + i) % num_workers;
            let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
        }
        Ok(count)
    }

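    /// Per-worker loop: accumulates documents in a [`SegmentBuilder`] and hands it off
    /// as a new segment when the per-worker memory budget is exceeded, on an explicit
    /// flush request, or when the channel closes.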
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        let mut builder: Option<SegmentBuilder> = None;
        let mut docs_since_flush = 0u32;

        loop {
            // When the channel closes, flush whatever is left and exit the worker.
            let Some(msg) = receiver.recv().await else {
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Lazily create a builder for the segment currently being filled.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    docs_since_flush += 1;

                    // Each worker gets an equal share of the global indexing memory budget.
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
                    let builder_memory = b.estimated_memory_bytes();

                    if docs_since_flush.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Never flush a segment with fewer than this many documents, even if
                    // the memory estimate spikes early.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let doc_count = b.num_docs();

                    if builder_memory >= per_worker_limit && doc_count >= MIN_DOCS_BEFORE_FLUSH {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                             postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                             unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder);
                        docs_since_flush = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        Self::spawn_segment_build(&state, b);
                    }
                    docs_since_flush = 0;
                    let _ = respond.send(());
                }
            }
        }
    }

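    /// Builds `builder` into an on-disk segment on a background task, registers the
    /// finished segment with the segment manager, and signals completion over the
    /// segment-id channel.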
    fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
        let directory = Arc::clone(&state.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = state.segment_id_sender.clone();
        let segment_manager = Arc::clone(&state.segment_manager);
        let pending_builds = Arc::clone(&state.pending_builds);

        pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
                }
                Err(e) => {
                    eprintln!("Background segment build failed: {:?}", e);
                }
            }
            let _ = sender.send(segment_hex);
            pending_builds.fetch_sub(1, Ordering::SeqCst);
        });
    }

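    /// Drains completion notifications for finished segment builds without blocking.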
    async fn collect_completed_segments(&self) {
        let mut receiver = self.segment_id_receiver.lock().await;
        // The segment ids themselves are not needed here; the channel is only drained
        // so completed builds do not accumulate in the queue.
        while receiver.try_recv().is_ok() {}
    }

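    /// Number of segment builds currently running in the background.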
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

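    /// Number of segment merges currently in flight.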
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

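    /// Gives the merge policy a chance to schedule merges over the current segments.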
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

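    /// Waits for all in-flight merges to complete.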
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

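    /// Returns the segment tracker shared with the segment manager.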
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

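    /// Acquires a snapshot of the current segment set for reading.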
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot<D> {
        self.segment_manager.acquire_snapshot().await
    }

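    /// Removes segment files that are no longer referenced by the index metadata.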
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

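    /// Asks every worker to hand off its in-memory builder as a new segment.
    ///
    /// Returns once all workers have acknowledged the flush; the segment builds
    /// themselves continue in the background (use [`Self::commit`] to wait for them).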
    pub async fn flush(&self) -> Result<()> {
        let mut responses = Vec::with_capacity(self.worker_senders.len());

        for sender in &self.worker_senders {
            let (tx, rx) = oneshot::channel();
            if sender.send(WorkerMessage::Flush(tx)).is_err() {
                continue;
            }
            responses.push(rx);
        }

        for rx in responses {
            let _ = rx.await;
        }

        self.collect_completed_segments().await;

        Ok(())
    }

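    /// Flushes all workers and waits for every pending segment build to finish,
    /// then gives the writer a chance to build vector indexes.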
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        // Wait for all in-flight segment builds to finish; each build sends its
        // segment id on this channel when it completes.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if receiver.recv().await.is_none() {
                break;
            }
        }
        drop(receiver);

        self.maybe_build_vector_index().await?;

        Ok(())
    }

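    /// Merges all current segments into a single new segment and replaces them in
    /// the segment manager's metadata.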
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids.clone();
        drop(segment_ids);

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        self.segment_manager
            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
            .await?;

        Ok(())
    }

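    /// Commits pending work, waits for background merges, then merges all segments into one.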
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.wait_for_merges().await;
        self.do_merge().await
    }
}