1use std::path::Path;
6use std::sync::Arc;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use rustc_hash::FxHashMap;
10use tokio::sync::Mutex as AsyncMutex;
11use tokio::sync::{mpsc, oneshot};
12use tokio::task::JoinHandle;
13
14use crate::DocId;
15use crate::directories::DirectoryWriter;
16use crate::dsl::{Document, Field, Schema};
17use crate::error::{Error, Result};
18use crate::segment::{
19 SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
20};
21use crate::tokenizer::BoxedTokenizer;
22
23use super::IndexConfig;
24
/// Messages sent from the `IndexWriter` to its background indexing workers.
enum WorkerMessage {
    /// A document to add to the worker's in-progress segment builder.
    Document(Document),
    /// Seal the current builder (if it holds any docs), hand it to a
    /// background build, and acknowledge via the oneshot sender.
    Flush(oneshot::Sender<()>),
}
32
/// Asynchronous index writer.
///
/// Documents are fanned out round-robin to background worker tasks; each
/// worker accumulates an in-memory `SegmentBuilder` and spawns a background
/// build when its share of the memory budget is exceeded or a flush arrives.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    // Retained for introspection; workers hold their own copy via WorkerState.
    #[allow(dead_code)] builder_config: SegmentBuilderConfig,
    // Per-field tokenizers set via `set_tokenizer`.
    // NOTE(review): workers snapshot an *empty* tokenizer map at construction
    // (see WorkerState creation) — confirm whether entries added here are
    // ever propagated to the indexing workers.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    // One unbounded channel per worker task.
    worker_senders: Vec<mpsc::UnboundedSender<WorkerMessage>>,
    // Round-robin cursor for distributing documents across workers.
    next_worker: AtomicUsize,
    // Join handles for the spawned worker tasks (kept alive, not awaited here).
    #[allow(dead_code)]
    workers: Vec<JoinHandle<()>>,
    // Shared state handed to each worker at spawn time.
    #[allow(dead_code)]
    worker_state: Arc<WorkerState<D>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Receives the hex id of each segment whose background build finished.
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    // Count of segment builds currently running in the background.
    pending_builds: Arc<AtomicUsize>,
    #[allow(dead_code)]
    global_memory_bytes: Arc<AtomicUsize>,
}
76
/// State shared by every background indexing worker of one `IndexWriter`.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    // Tokenizers cloned into each freshly created SegmentBuilder.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    // Reports completed segment ids back to the writer.
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    // Shared with the writer so it can wait for in-flight builds.
    pending_builds: Arc<AtomicUsize>,
}
88
89impl<D: DirectoryWriter + 'static> IndexWriter<D> {
90 pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
92 Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
93 }
94
    /// Create a fresh index in `directory`: persist the schema and an empty
    /// segment list, set up the segment manager, and spawn the background
    /// indexing workers.
    ///
    /// # Errors
    /// Fails if the schema or segment metadata cannot be serialized or
    /// written to the directory.
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        // Persist the schema so the index can be reopened later.
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        // Start with an empty segment list on disk.
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        // Channel on which background build tasks report completed segment ids.
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        // No-op update forces an initial metadata write so the index is
        // valid on disk immediately.
        segment_manager.update_metadata(|_| {}).await?;

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // State shared by every worker task.
        // NOTE(review): workers get an empty tokenizer map here; tokenizers
        // registered later via `set_tokenizer` only land in the writer's own
        // map — confirm this is intended.
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // One indexing task per configured thread, at least one.
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
184
185 pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
187 Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
188 }
189
    /// Open an existing index: load the persisted schema and metadata, set
    /// up the segment manager, and spawn the background indexing workers.
    ///
    /// # Errors
    /// Fails if `schema.json` or the index metadata is missing, unreadable,
    /// or cannot be deserialized.
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        // Load the schema written at creation time.
        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;

        // Channel on which background build tasks report completed segment ids.
        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        let pending_builds = Arc::new(AtomicUsize::new(0));
        let global_memory_bytes = Arc::new(AtomicUsize::new(0));

        // State shared by every worker task.
        // NOTE(review): as in `create_with_config`, workers get an empty
        // tokenizer map — entries added via `set_tokenizer` stay on the
        // writer; confirm intended.
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            config: config.clone(),
            builder_config: builder_config.clone(),
            tokenizers: FxHashMap::default(),
            segment_id_sender,
            segment_manager: Arc::clone(&segment_manager),
            pending_builds: Arc::clone(&pending_builds),
        });

        // One indexing task per configured thread, at least one.
        let num_workers = config.num_indexing_threads.max(1);
        let mut worker_senders = Vec::with_capacity(num_workers);
        let mut workers = Vec::with_capacity(num_workers);

        for _ in 0..num_workers {
            let (tx, rx) = mpsc::unbounded_channel::<WorkerMessage>();
            worker_senders.push(tx);

            let state = Arc::clone(&worker_state);
            let handle = tokio::spawn(async move {
                Self::worker_loop(state, rx).await;
            });
            workers.push(handle);
        }

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            worker_senders,
            next_worker: AtomicUsize::new(0),
            workers,
            worker_state,
            segment_manager,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds,
            global_memory_bytes,
        })
    }
267
268 pub fn schema(&self) -> &Schema {
270 &self.schema
271 }
272
273 pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
275 self.tokenizers.insert(field, Box::new(tokenizer));
276 }
277
    /// Queue a document for asynchronous indexing on one of the workers.
    ///
    /// Documents are distributed round-robin. Indexing happens in the
    /// background; errors during segment building are not reported here.
    ///
    /// # Errors
    /// Returns `Error::Internal` if the chosen worker's channel is closed.
    ///
    /// NOTE(review): always returns DocId 0 — the real doc id is only
    /// assigned during the background build; confirm callers don't rely on
    /// this value.
    pub fn add_document(&self, doc: Document) -> Result<DocId> {
        // Relaxed is sufficient: the counter only balances load.
        let idx = self.next_worker.fetch_add(1, Ordering::Relaxed) % self.worker_senders.len();
        self.worker_senders[idx]
            .send(WorkerMessage::Document(doc))
            .map_err(|_| Error::Internal("Document channel closed".into()))?;
        Ok(0)
    }
291
292 pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
297 let num_workers = self.worker_senders.len();
298 let count = documents.len();
299 let base = self.next_worker.fetch_add(count, Ordering::Relaxed);
300 for (i, doc) in documents.into_iter().enumerate() {
301 let idx = (base + i) % num_workers;
302 let _ = self.worker_senders[idx].send(WorkerMessage::Document(doc));
303 }
304 Ok(count)
305 }
306
    /// Per-worker loop: receive messages, accumulate documents into a
    /// lazily-created `SegmentBuilder`, and hand the builder to a background
    /// build when the memory budget is exceeded, a flush arrives, or the
    /// channel closes.
    ///
    /// Per-document errors are logged to stderr and skipped so one bad
    /// document does not stall the worker.
    async fn worker_loop(
        state: Arc<WorkerState<D>>,
        mut receiver: mpsc::UnboundedReceiver<WorkerMessage>,
    ) {
        // In-progress segment; created on the first document after a seal.
        let mut builder: Option<SegmentBuilder> = None;
        // Documents added since the builder was (re)created; paces the
        // (relatively costly) memory estimate below.
        let mut doc_count = 0u32;

        loop {
            let msg = receiver.recv().await;

            // Channel closed (writer dropped): seal any remaining docs, exit.
            let Some(msg) = msg else {
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::spawn_segment_build(&state, b);
                }
                return;
            };

            match msg {
                WorkerMessage::Document(doc) => {
                    // Lazily create the builder so idle workers hold no memory.
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            (*state.schema).clone(),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                // Install the shared per-field tokenizers.
                                for (field, tokenizer) in &state.tokenizers {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                // Builder creation failed: drop this document.
                                eprintln!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        eprintln!("Failed to index document: {:?}", e);
                        continue;
                    }

                    doc_count += 1;

                    // The global memory budget is split evenly across workers.
                    let per_worker_limit = state.config.max_indexing_memory_bytes
                        / state.config.num_indexing_threads.max(1);
                    // Tight budget (< 1 MiB): check every doc; otherwise
                    // every 100 docs.
                    let check_interval = if per_worker_limit < 1024 * 1024 {
                        1
                    } else {
                        100
                    };

                    if doc_count.is_multiple_of(check_interval) {
                        let builder_memory = b.stats().estimated_memory_bytes;

                        // Over budget: seal this builder into a background build.
                        if builder_memory >= per_worker_limit {
                            let full_builder = builder.take().unwrap();
                            Self::spawn_segment_build(&state, full_builder);
                            doc_count = 0;
                        }
                    }
                }
                WorkerMessage::Flush(respond) => {
                    // Seal the current builder (if it has docs) and ack the
                    // flush even when there was nothing to build.
                    if let Some(b) = builder.take()
                        && b.num_docs() > 0
                    {
                        Self::spawn_segment_build(&state, b);
                    }
                    doc_count = 0;
                    let _ = respond.send(());
                }
            }
        }
    }
393 fn spawn_segment_build(state: &Arc<WorkerState<D>>, builder: SegmentBuilder) {
394 let directory = Arc::clone(&state.directory);
395 let segment_id = SegmentId::new();
396 let segment_hex = segment_id.to_hex();
397 let sender = state.segment_id_sender.clone();
398 let segment_manager = Arc::clone(&state.segment_manager);
399 let pending_builds = Arc::clone(&state.pending_builds);
400
401 pending_builds.fetch_add(1, Ordering::SeqCst);
402
403 tokio::spawn(async move {
404 match builder.build(directory.as_ref(), segment_id).await {
405 Ok(_) => {
406 let _ = segment_manager.register_segment(segment_hex.clone()).await;
407 }
408 Err(e) => {
409 eprintln!("Background segment build failed: {:?}", e);
410 }
411 }
412 let _ = sender.send(segment_hex);
415 pending_builds.fetch_sub(1, Ordering::SeqCst);
416 });
417 }
418
419 async fn collect_completed_segments(&self) {
421 let mut receiver = self.segment_id_receiver.lock().await;
422 while receiver.try_recv().is_ok() {
423 }
425 }
426
    /// Number of segment builds currently running in the background.
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }
431
    /// Number of segment merges currently running in the background.
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }
436
    /// Ask the segment manager to start merges if its merge policy says any
    /// are warranted. Returns once merges are *scheduled*, not completed.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }
444
    /// Block until all in-flight background merges have finished.
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }
449
    /// Delete segment files present in the directory but absent from the
    /// live segment list (e.g. leftovers from crashed builds/merges).
    /// Returns the number of segments removed.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
460
461 pub async fn flush(&self) -> Result<()> {
466 let mut responses = Vec::with_capacity(self.worker_senders.len());
468
469 for sender in &self.worker_senders {
470 let (tx, rx) = oneshot::channel();
471 if sender.send(WorkerMessage::Flush(tx)).is_err() {
472 continue;
474 }
475 responses.push(rx);
476 }
477
478 for rx in responses {
480 let _ = rx.await;
481 }
482
483 self.collect_completed_segments().await;
485
486 Ok(())
487 }
488
    /// Flush all workers, wait for every in-flight background segment build
    /// to finish, then rebuild the vector index if needed.
    ///
    /// NOTE(review): the wait loop pairs one completion message per pending
    /// build — it assumes a build's completion id is observable only after
    /// (or together with) its `pending_builds` decrement; verify the
    /// ordering in `spawn_segment_build` upholds this.
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        // Wait until every build observed via `pending_builds` has
        // completed; each finished build sends exactly one id.
        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            if receiver.recv().await.is_none() {
                // All senders dropped: no more completions can arrive.
                break; }
        }
        drop(receiver);

        self.maybe_build_vector_index().await?;

        Ok(())
    }
513
514 async fn do_merge(&self) -> Result<()> {
518 let segment_ids = self.segment_manager.get_segment_ids().await;
519
520 if segment_ids.len() < 2 {
521 return Ok(());
522 }
523
524 let ids_to_merge: Vec<String> = segment_ids.clone();
525 drop(segment_ids);
526
527 let mut readers = Vec::new();
529 let mut doc_offset = 0u32;
530
531 for id_str in &ids_to_merge {
532 let segment_id = SegmentId::from_hex(id_str)
533 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
534 let reader = SegmentReader::open(
535 self.directory.as_ref(),
536 segment_id,
537 Arc::clone(&self.schema),
538 doc_offset,
539 self.config.term_cache_blocks,
540 )
541 .await?;
542 doc_offset += reader.meta().num_docs;
543 readers.push(reader);
544 }
545
546 let merger = SegmentMerger::new(Arc::clone(&self.schema));
548 let new_segment_id = SegmentId::new();
549 merger
550 .merge(self.directory.as_ref(), &readers, new_segment_id)
551 .await?;
552
553 self.segment_manager
555 .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
556 .await?;
557
558 Ok(())
559 }
560
    /// Commit all pending work, then merge every segment into one.
    pub async fn force_merge(&self) -> Result<()> {
        // Commit first so the merge sees every queued document.
        self.commit().await?;
        self.do_merge().await
    }
568
569 }