1use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8use rustc_hash::FxHashMap;
9use tokio::sync::Mutex as AsyncMutex;
10use tokio::sync::{mpsc, oneshot};
11use tokio::task::JoinHandle;
12
13use crate::DocId;
14use crate::directories::DirectoryWriter;
15use crate::dsl::{Document, Field, Schema};
16use crate::error::{Error, Result};
17use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
18use crate::tokenizer::BoxedTokenizer;
19
20use super::IndexConfig;
21
/// Messages sent from the `IndexWriter` facade to its indexing worker tasks.
enum WorkerMessage {
    /// A document to be indexed into the worker's current segment builder.
    Document(Document),
    /// Request to flush the worker's in-progress segment; the sender is
    /// signalled once the worker has handed its builder off for building.
    Flush(oneshot::Sender<()>),
}
29
/// Multi-worker index writer: documents are sharded round-robin over a pool
/// of tokio worker tasks, each of which accumulates a `SegmentBuilder` and
/// spawns a segment build when its memory budget is exceeded or on flush.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    #[allow(dead_code)]
    builder_config: SegmentBuilderConfig,
    // NOTE(review): tokenizers registered here are stored on the writer only;
    // workers read `WorkerState::tokenizers`, which `spawn_workers` builds
    // empty — confirm `set_tokenizer` is meant to affect indexing.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    // One unbounded channel per worker; documents are distributed round-robin.
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    // Round-robin cursor over `worker_senders`.
    next_worker: AtomicUsize,
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Segments built since the last commit: (segment hex id, doc count).
    flushed_segments: AsyncMutex<Vec<(String, u32)>>,
}
67
/// State shared by every indexing worker task.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    // NOTE(review): constructed empty in `spawn_workers`; see the matching
    // note on `IndexWriter::tokenizers`.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    // Number of segment builds currently in flight.
    pending_builds: Arc<AtomicUsize>,
    // Bounds the number of concurrent segment builds.
    build_semaphore: Arc<tokio::sync::Semaphore>,
    // Join handles of spawned builds, drained by `IndexWriter::flush`;
    // each resolves to (segment hex id, doc count).
    build_handles: AsyncMutex<Vec<JoinHandle<(String, u32)>>>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
}
86
87impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in `directory` with default segment-builder
    /// settings. See [`Self::create_with_config`].
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }
92
    /// Create a new index in `directory` with an explicit segment-builder
    /// configuration, writing fresh metadata derived from `schema` and
    /// spawning the indexing worker pool.
    ///
    /// # Errors
    /// Returns any error from persisting the initial index metadata.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Persist the (empty) metadata right away so the index exists on
        // disk before the first commit.
        segment_manager.update_metadata(|_| {}).await?;

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }
140
    /// Open an existing index in `directory` with default segment-builder
    /// settings. See [`Self::open_with_config`].
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }
145
    /// Open an existing index, loading its metadata (and schema) from the
    /// directory, then spawn the indexing worker pool.
    ///
    /// # Errors
    /// Returns any error from loading the index metadata.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // The schema is authoritative from the stored metadata, not the caller.
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // Best-effort: republish any previously trained ANN structures.
        segment_manager.load_and_publish_trained().await;

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        })
    }
192
    /// Build a writer that shares the segment manager (and thus the segment
    /// view and merge state) of an already-open `Index`, spawning a fresh
    /// worker pool with default segment-builder settings.
    pub fn from_index(index: &super::Index<D>) -> Self {
        let segment_manager = Arc::clone(&index.segment_manager);
        let directory = Arc::clone(&index.directory);
        let schema = Arc::clone(&index.schema);
        let config = index.config.clone();
        let builder_config = crate::segment::SegmentBuilderConfig::default();

        let (worker_state, worker_senders, workers) = Self::spawn_workers(
            &directory,
            &schema,
            &config,
            &builder_config,
            &segment_manager,
        );

        Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            flushed_segments: AsyncMutex::new(Vec::new()),
        }
    }
226
227 #[allow(clippy::type_complexity)]
229 fn spawn_workers(
230 directory: &Arc<D>,
231 schema: &Arc<Schema>,
232 config: &IndexConfig,
233 builder_config: &SegmentBuilderConfig,
234 segment_manager: &Arc<crate::merge::SegmentManager<D>>,
235 ) -> (
236 Arc<WorkerState<D>>,
237 Vec<tokio::sync::mpsc::UnboundedSender<WorkerMessage>>,
238 Vec<JoinHandle<()>>,
239 ) {
240 let num_workers = config.num_indexing_threads.max(1);
241 let max_concurrent_builds = num_workers.div_ceil(2).max(1);
242
243 let worker_state = Arc::new(WorkerState {
244 directory: Arc::clone(directory),
245 schema: Arc::clone(schema),
246 config: config.clone(),
247 builder_config: builder_config.clone(),
248 tokenizers: FxHashMap::default(),
249 pending_builds: Arc::new(AtomicUsize::new(0)),
250 build_semaphore: Arc::new(tokio::sync::Semaphore::new(max_concurrent_builds)),
251 build_handles: AsyncMutex::new(Vec::new()),
252 segment_manager: Arc::clone(segment_manager),
253 });
254
255 let mut worker_senders = Vec::with_capacity(num_workers);
256 let mut workers = Vec::with_capacity(num_workers);
257
258 for _ in 0..num_workers {
259 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<WorkerMessage>();
260 worker_senders.push(tx);
261
262 let state = Arc::clone(&worker_state);
263 let handle = tokio::spawn(async move {
264 Self::worker_loop(state, rx).await;
265 });
266 workers.push(handle);
267 }
268
269 (worker_state, worker_senders, workers)
270 }
271
272 pub fn schema(&self) -> &Schema {
274 &self.schema
275 }
276
    /// Register a custom tokenizer for `field` on this writer.
    ///
    /// NOTE(review): this only updates `self.tokenizers`; worker tasks read
    /// `WorkerState::tokenizers`, which `spawn_workers` constructs empty —
    /// confirm registered tokenizers are intended to reach the workers.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }
281
    /// Enqueue a single document on the next worker in round-robin order.
    ///
    /// Returns a placeholder `DocId` of 0 — real ids are assigned later
    /// inside the worker's segment builder, not here.
    ///
    /// # Errors
    /// Returns `Error::Internal` if the chosen worker's channel is closed.
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Round-robin shard selection across worker channels.
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }
295
296 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
301 let num_workers = self.worker_senders.len();
302 let count = documents.len();
303 let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
304 for (i, doc) in documents.into_iter().enumerate() {
305 let idx = (base + i) % num_workers;
306 let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
307 }
308 Ok(count)
309 }
310
    /// Per-worker event loop: accumulates documents into a lazily-created
    /// `SegmentBuilder` and hands the builder off for building when its
    /// memory budget is exceeded, on explicit `Flush`, or at shutdown
    /// (channel closed).
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        // Builder is created on the first document after each hand-off.
        let mut builder: Option<SegmentBuilder> = None;
        // Documents added since the builder was last handed off.
        let mut doc_count = 0u32;

        loop {
            let msg = receiver.recv().await;

            let Some(msg) = msg else {
                // Channel closed: build whatever is buffered, then exit.
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b).await;
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                // NOTE(review): `state.tokenizers` is built
                                // empty in `spawn_workers`, so this loop never
                                // runs — confirm custom tokenizers are meant
                                // to be forwarded here.
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // The document is dropped if no builder exists.
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        // Document dropped; indexing continues.
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    doc_count += 1;

                    // Periodically refresh the builder's memory estimate.
                    if b.num_docs().is_multiple_of(1000) {
                        b.recalibrate_memory();
                    }

                    // Per-worker budget: total memory divided over worker
                    // slots plus slots for builds still in flight (in-flight
                    // builders hold memory until their task completes).
                    let in_flight = state.pending_builds.load(Ordering::Relaxed);
                    let num_workers = state.config.num_indexing_threads.max(1);
                    let effective_slots = num_workers * 2 + in_flight * 2;
                    let per_worker_limit = state.config.max_indexing_memory_bytes / effective_slots;
                    let builder_memory = b.estimated_memory_bytes();

                    if doc_count.is_multiple_of(10_000) {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, limit={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            per_worker_limit as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Avoid flushing tiny segments even under memory pressure.
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
                    let num_docs = b.num_docs();

                    if builder_memory >= per_worker_limit && num_docs >= MIN_DOCS_BEFORE_FLUSH {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing] flushing segment: docs={}, est_mem={:.2} MB, actual_mem={:.2} MB, \
                            postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB, interner={:.2} MB, \
                            unique_terms={}, sparse_dims={}",
                            doc_count,
                            builder_memory as f64 / (1024.0 * 1024.0),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                            b.sparse_dim_count(),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::spawn_segment_build(&state, full_builder).await;
                        doc_count = 0;
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Hand off the current builder (if non-empty) and ack.
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        let stats = b.stats();
                        let mb = stats.memory_breakdown;
                        log::info!(
                            "[indexing_flush] docs={}, total_mem={:.2} MB, \
                            postings={:.2} MB, sparse={:.2} MB, dense={:.2} MB ({} vectors), \
                            interner={:.2} MB, positions={:.2} MB, unique_terms={}",
                            b.num_docs(),
                            stats.estimated_memory_bytes as f64 / (1024.0 * 1024.0),
                            mb.postings_bytes as f64 / (1024.0 * 1024.0),
                            mb.sparse_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vectors_bytes as f64 / (1024.0 * 1024.0),
                            mb.dense_vector_count,
                            mb.interner_bytes as f64 / (1024.0 * 1024.0),
                            mb.position_index_bytes as f64 / (1024.0 * 1024.0),
                            stats.unique_terms,
                        );
                        Self::spawn_segment_build(&state, b).await;
                    }
                    doc_count = 0;
                    // Ack is sent after the build task has been spawned, so
                    // `flush` will find its handle in `build_handles`.
                    let _ = respond.send(());
                }
            }
        }
    }
449 async fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
450 let permit = state.build_semaphore.clone().acquire_owned().await.unwrap();
453
454 let directory = Arc::clone(&state.directory);
455 let segment_id = SegmentId::new();
456 let segment_hex = segment_id.to_hex();
457 let pending_builds = Arc::clone(&state.pending_builds);
458
459 let trained = state.segment_manager.trained();
461
462 let doc_count = builder.num_docs();
463 let memory_bytes = builder.estimated_memory_bytes();
464
465 log::info!(
466 "[segment_build_started] segment_id={} doc_count={} memory_bytes={} ann={}",
467 segment_hex,
468 doc_count,
469 memory_bytes,
470 trained.is_some()
471 );
472
473 pending_builds.fetch_add(1, Ordering::SeqCst);
474
475 let handle = tokio::spawn(async move {
476 let _permit = permit; let build_start = std::time::Instant::now();
478 let result = match builder
479 .build(directory.as_ref(), segment_id, trained.as_deref())
480 .await
481 {
482 Ok(meta) => {
483 let build_duration_ms = build_start.elapsed().as_millis() as u64;
484 log::info!(
485 "[segment_build_completed] segment_id={} doc_count={} duration_ms={}",
486 segment_hex,
487 meta.num_docs,
488 build_duration_ms
489 );
490 (segment_hex, meta.num_docs)
491 }
492 Err(e) => {
493 log::error!(
494 "[segment_build_failed] segment_id={} error={:?}",
495 segment_hex,
496 e
497 );
498 (segment_hex, 0)
499 }
500 };
501 pending_builds.fetch_sub(1, Ordering::SeqCst);
502 result
503 });
504
505 state.build_handles.lock().await.push(handle);
507 }
508
    /// Number of segment builds currently in flight across all workers.
    pub fn pending_build_count(&self) -> usize {
        self.worker_state.pending_builds.load(Ordering::SeqCst)
    }
513
    /// Let the segment manager evaluate its merge policy and start merges
    /// if warranted.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }
521
    /// Block until all in-flight merges have completed.
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }
526
    /// Handle to the segment tracker shared with the segment manager.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }
532
    /// Acquire a consistent snapshot of the current segment set.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }
538
    /// Delete segment files not referenced by the committed metadata,
    /// returning how many were removed.
    ///
    /// # Errors
    /// Propagates any error from the underlying cleanup.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
549
550 pub async fn flush(&self) -> Result<()> {
559 let mut responses = Vec::with_capacity(self.worker_senders.len());
561
562 for sender in &self.worker_senders {
563 let (tx, rx) = oneshot::channel();
564 if sender.send(WorkerMessage::Flush(tx)).is_err() {
565 continue;
566 }
567 responses.push(rx);
568 }
569
570 for rx in responses {
574 let _ = rx.await;
575 }
576
577 let handles: Vec<JoinHandle<(String, u32)>> =
580 std::mem::take(&mut *self.worker_state.build_handles.lock().await);
581 for handle in handles {
582 match handle.await {
583 Ok((segment_hex, num_docs)) if num_docs > 0 => {
584 self.flushed_segments
585 .lock()
586 .await
587 .push((segment_hex, num_docs));
588 }
589 Ok(_) => {} Err(e) => log::error!("[flush] build task panicked: {:?}", e),
591 }
592 }
593
594 Ok(())
595 }
596
    /// Flush all pending documents, publish the flushed segments via the
    /// segment manager, then opportunistically trigger merges.
    ///
    /// # Errors
    /// Propagates errors from flushing or committing the segment list.
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        // Hand the accumulated flushed-segment list to the segment manager.
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        self.segment_manager.commit(segments).await?;

        self.segment_manager.maybe_merge().await;

        Ok(())
    }
614
    /// Flush and commit all pending documents, then force a merge of all
    /// segments (unlike `commit`, which only merges when the policy asks).
    ///
    /// # Errors
    /// Propagates errors from flushing, committing, or merging.
    pub async fn force_merge(&self) -> Result<()> {
        self.flush().await?;
        let segments = std::mem::take(&mut *self.flushed_segments.lock().await);
        self.segment_manager.commit(segments).await?;
        self.segment_manager.force_merge().await
    }
625
626 }