1use std::sync::Arc;
33
34use rustc_hash::FxHashMap;
35
36use crate::directories::DirectoryWriter;
37use crate::dsl::{Document, Field, Schema};
38use crate::error::{Error, Result};
39use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
40use crate::tokenizer::BoxedTokenizer;
41
42use super::IndexConfig;
43
/// Capacity of the bounded document channel between `add_document` and the
/// worker threads. Once this many documents are queued, `add_document`
/// returns `Error::QueueFull` instead of blocking.
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Multi-threaded index writer: documents are fanned out over a bounded
/// channel to background worker threads, which build segments and hand them
/// to the shared segment manager at commit time.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    // Producer side of the document pipeline; closed on commit/drop to make
    // the workers drain, flush and exit.
    doc_sender: async_channel::Sender<Document>,
    // Join handles of the indexing threads spawned by `spawn_workers`.
    workers: Vec<std::thread::JoinHandle<()>>,
    // State shared with the worker threads (tokenizers, memory budget,
    // built-segment results).
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // `(segment_id_hex, num_docs)` pairs collected from the workers during
    // `prepare_commit`, awaiting publication via the segment manager.
    flushed_segments: Vec<(String, u32)>,
}
74
/// State shared between the `IndexWriter` and its worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    // Per-field tokenizers; each new `SegmentBuilder` takes a snapshot of
    // this map when it is created (see `worker_loop`).
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    // Memory threshold at which a worker flushes its in-progress segment
    // (total budget divided evenly across workers).
    memory_budget_per_worker: usize,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // `(segment_id_hex, num_docs)` of segments built by workers but not yet
    // collected by `prepare_commit`.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,
}
88
impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Creates a new index in `directory` using default segment-builder
    /// settings.
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Creates a new index in `directory`, persisting fresh metadata for
    /// `schema` before any worker threads are spawned.
    ///
    /// Must be called from within a tokio runtime (worker spawning captures
    /// `Handle::current()`).
    ///
    /// # Errors
    /// Returns an error if the initial metadata write fails.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // No-op update: forces the freshly created metadata to be written out.
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Opens an existing index from `directory` using default
    /// segment-builder settings.
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Opens an existing index, loading metadata (including the schema) from
    /// `directory`.
    ///
    /// # Errors
    /// Returns an error if the index metadata cannot be loaded.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        // Restore previously trained state; `build_segment_inline` passes it
        // to segment builds (presumably ANN-related — see the `ann=` log).
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Builds a writer sharing the directory, schema, config and segment
    /// manager of an already-open `Index`.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }

    /// Common constructor: wires up the shared worker state and spawns the
    /// indexing threads.
    ///
    /// NOTE(review): requires an ambient tokio runtime — `spawn_workers`
    /// calls `Handle::current()`, which panics outside one.
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(FxHashMap::default()),
            // Split the global indexing budget evenly across workers.
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
        }
    }

    /// Spawns `num_workers` OS threads running `worker_loop`, all consuming
    /// from one bounded MPMC channel. Returns the producer end plus the join
    /// handles. Each thread receives a clone of the current runtime handle so
    /// it can `block_on` async segment builds.
    fn spawn_workers(
        worker_state: &Arc<WorkerState<D>>,
        num_workers: usize,
    ) -> (
        async_channel::Sender<Document>,
        Vec<std::thread::JoinHandle<()>>,
    ) {
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        let handle = tokio::runtime::Handle::current();
        let mut workers = Vec::with_capacity(num_workers);
        for i in 0..num_workers {
            let state = Arc::clone(worker_state);
            let rx = receiver.clone();
            let rt = handle.clone();
            workers.push(
                std::thread::Builder::new()
                    .name(format!("index-worker-{}", i))
                    .spawn(move || Self::worker_loop(state, rx, rt))
                    .expect("failed to spawn index worker thread"),
            );
        }
        (sender, workers)
    }

    /// Returns the schema this writer indexes against.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Registers a tokenizer for `field`, replacing any previous one.
    ///
    /// Workers snapshot the tokenizer map when they create a new segment
    /// builder, so this only affects builders created after the call.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.worker_state
            .tokenizers
            .write()
            .insert(field, Box::new(tokenizer));
    }

    /// Enqueues a document for indexing without blocking.
    ///
    /// # Errors
    /// - `Error::QueueFull` if the pipeline already holds
    ///   `PIPELINE_MAX_SIZE_IN_DOCS` documents (caller may retry later).
    /// - `Error::Internal` if the channel is closed (writer mid-commit or
    ///   dropped).
    pub fn add_document(&self, doc: Document) -> Result<()> {
        self.doc_sender.try_send(doc).map_err(|e| match e {
            async_channel::TrySendError::Full(_) => Error::QueueFull,
            async_channel::TrySendError::Closed(_) => {
                Error::Internal("Document channel closed".into())
            }
        })
    }

    /// Enqueues a batch, returning how many documents were accepted.
    ///
    /// Stops at the first `QueueFull` and returns the count enqueued so far
    /// (the remaining documents are dropped — callers wanting delivery must
    /// re-submit the tail). Any other error is propagated.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let total = documents.len();
        for (i, doc) in documents.into_iter().enumerate() {
            match self.add_document(doc) {
                Ok(()) => {}
                Err(Error::QueueFull) => return Ok(i),
                Err(e) => return Err(e),
            }
        }
        Ok(total)
    }

    /// Body of each indexing thread.
    ///
    /// Pulls documents until the channel is closed, lazily creating a
    /// `SegmentBuilder` (with a snapshot of the current tokenizers) and
    /// flushing it to a segment whenever the per-worker memory budget is hit.
    /// On channel close, any non-empty builder is flushed as a final segment.
    /// Per-document indexing errors are logged and skipped, not fatal.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut builder: Option<SegmentBuilder> = None;

        while let Ok(doc) = receiver.recv_blocking() {
            if builder.is_none() {
                match SegmentBuilder::new(Arc::clone(&state.schema), state.builder_config.clone()) {
                    Ok(mut b) => {
                        for (field, tokenizer) in state.tokenizers.read().iter() {
                            b.set_tokenizer(*field, tokenizer.clone_box());
                        }
                        builder = Some(b);
                    }
                    Err(e) => {
                        // NOTE(review): the received document is dropped here;
                        // retry only happens with the next incoming document.
                        log::error!("Failed to create segment builder: {:?}", e);
                        continue;
                    }
                }
            }

            let b = builder.as_mut().unwrap();
            if let Err(e) = b.add_document(doc) {
                log::error!("Failed to index document: {:?}", e);
                continue;
            }

            let builder_memory = b.estimated_memory_bytes();

            // Progress log every 16384 docs (cheap power-of-two modulo).
            if b.num_docs() & 0x3FFF == 0 {
                log::debug!(
                    "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                    b.num_docs(),
                    builder_memory as f64 / (1024.0 * 1024.0),
                    state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                );
            }

            // Guard against pathological flushing of tiny segments when the
            // budget is very small relative to document size.
            const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

            if builder_memory >= state.memory_budget_per_worker
                && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
            {
                log::info!(
                    "[indexing] memory budget reached, building segment: \
                     docs={}, memory={:.2} MB, budget={:.2} MB",
                    b.num_docs(),
                    builder_memory as f64 / (1024.0 * 1024.0),
                    state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                );
                let full_builder = builder.take().unwrap();
                Self::build_segment_inline(&state, full_builder, &handle);
            }
        }

        // Channel closed (commit or drop): flush whatever is left.
        if let Some(b) = builder.take()
            && b.num_docs() > 0
        {
            Self::build_segment_inline(&state, b, &handle);
        }
    }

    /// Builds a segment from `builder` on the current (worker) thread,
    /// blocking on the async build via the runtime handle.
    ///
    /// Successful non-empty builds are recorded in
    /// `state.built_segments` for later collection by `prepare_commit`;
    /// empty results are discarded and failures are only logged.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }

    /// Asks the segment manager to start any merges its policy suggests.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Waits for the segment manager's merging thread to finish.
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Waits until all in-flight merges have completed.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Returns the shared segment tracker.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquires a consistent snapshot of the current segment set.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Deletes segment files not referenced by the index; returns how many
    /// were removed.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// First phase of a commit: drains and stops the indexing pipeline.
    ///
    /// Closes the document channel (so workers flush their builders and
    /// exit), joins the worker threads off the async executor via
    /// `spawn_blocking`, and moves the built segments into
    /// `flushed_segments`. Until the returned `PreparedCommit` is resolved
    /// (commit/abort), the writer has no workers and `add_document` fails.
    ///
    /// # Errors
    /// Returns `Error::Internal` if the blocking join task itself fails.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        self.doc_sender.close();

        let workers = std::mem::take(&mut self.workers);
        tokio::task::spawn_blocking(move || {
            for w in workers {
                let _ = w.join();
            }
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to join workers: {}", e)))?;

        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }

    /// Convenience: prepare and immediately commit.
    pub async fn commit(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await
    }

    /// Commits pending documents, then forces a full merge of all segments.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Restarts the indexing pipeline after a commit/abort tore it down.
    ///
    /// Silently does nothing outside a tokio runtime (e.g. when reached from
    /// a `Drop` on a non-runtime thread) — in that case the writer keeps its
    /// closed channel and `add_document` will keep failing.
    fn respawn_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            return;
        }
        let num_workers = self.config.num_indexing_threads.max(1);
        let (sender, workers) = Self::spawn_workers(&self.worker_state, num_workers);
        self.doc_sender = sender;
        self.workers = workers;
    }

}
503
504impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
505 fn drop(&mut self) {
506 self.doc_sender.close();
507 for w in std::mem::take(&mut self.workers) {
508 let _ = w.join();
509 }
510 }
511}
512
/// A commit staged by `IndexWriter::prepare_commit`: the workers have been
/// drained and joined and their segments moved into the writer's
/// `flushed_segments`. Resolve it with `commit` or `abort`; dropping it
/// unresolved auto-aborts (see the `Drop` impl).
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    // Set once commit/abort has run, so `Drop` does not auto-abort twice.
    is_resolved: bool,
}
523
524impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
525 pub async fn commit(mut self) -> Result<()> {
527 self.is_resolved = true;
528 let segments = std::mem::take(&mut self.writer.flushed_segments);
529 self.writer.segment_manager.commit(segments).await?;
530 self.writer.segment_manager.maybe_merge().await;
531 self.writer.respawn_workers();
532 Ok(())
533 }
534
535 pub fn abort(mut self) {
538 self.is_resolved = true;
539 self.writer.flushed_segments.clear();
540 self.writer.respawn_workers();
541 }
542}
543
544impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
545 fn drop(&mut self) {
546 if !self.is_resolved {
547 log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
548 self.writer.flushed_segments.clear();
549 self.writer.respawn_workers();
550 }
551 }
552}