use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::mpsc;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

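/// Writer half of an index: buffers documents in memory and turns them into
/// on-disk segments.
///
/// Incoming documents are spread across a pool of [`SegmentBuilder`] slots
/// (one per indexing thread). When a builder reaches `max_docs_per_segment`,
/// it is handed to a background Tokio task that writes the segment, registers
/// it with the shared `SegmentManager`, and reports the new segment id back
/// over an mpsc channel; `pending_builds` tracks builds that have not been
/// collected yet.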
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
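    /// Creates a new index in `directory` with the default
    /// [`SegmentBuilderConfig`].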
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

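    /// Creates a new index in `directory` with an explicit
    /// [`SegmentBuilderConfig`]: persists `schema.json` and an empty
    /// `segments.json`, then sets up one builder slot per indexing thread and
    /// the shared `SegmentManager`.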
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let metadata = super::IndexMetadata::new();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

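    /// Opens an existing index in `directory` with the default
    /// [`SegmentBuilderConfig`].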
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

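    /// Opens an existing index with an explicit [`SegmentBuilderConfig`],
    /// loading `schema.json` and the index metadata from `directory` before
    /// wiring up the builder slots and the `SegmentManager`.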
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;

        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

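    /// Returns the schema this index was created with.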
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

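    /// Registers a tokenizer for `field`. It is installed on segment builders
    /// created after this call.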
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

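    /// Adds a document and returns its [`DocId`].
    ///
    /// The document is routed to a randomly chosen builder slot to spread
    /// lock contention across callers. When a builder reaches
    /// `max_docs_per_segment`, it is flushed to a background segment build.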
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        use rand::Rng;

        let builder_idx = rand::rng().random_range(0..self.builders.len());

        let mut builder_guard = self.builders[builder_idx].lock().await;

        if builder_guard.is_none() {
            let mut builder =
                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
            for (field, tokenizer) in &self.tokenizers {
                builder.set_tokenizer(*field, tokenizer.clone_box());
            }
            *builder_guard = Some(builder);
        }

        let builder = builder_guard.as_mut().unwrap();
        let doc_id = builder.add_document(doc)?;

        if builder.num_docs() >= self.config.max_docs_per_segment {
            let full_builder = builder_guard.take().unwrap();
            // Release this slot's lock before handing the full builder off,
            // so other writers can start filling a fresh builder immediately.
            drop(builder_guard);
            self.spawn_background_build(full_builder);
        }

        Ok(doc_id)
    }

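    /// Hands a full builder to a background task that writes the segment,
    /// registers it with the `SegmentManager`, and sends the segment id back
    /// over the channel. `pending_builds` is incremented here and decremented
    /// when the id is collected.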
    fn spawn_background_build(&self, builder: SegmentBuilder) {
        let directory = Arc::clone(&self.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = self.segment_id_sender.clone();
        let segment_manager = Arc::clone(&self.segment_manager);

        self.pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    let _ = segment_manager.register_segment(segment_hex.clone()).await;
                    let _ = sender.send(segment_hex);
                }
                Err(e) => {
                    eprintln!("Background segment build failed: {:?}", e);
                    // Still report the id so `pending_builds` is decremented by
                    // the collector; the segment was never registered, so the id
                    // is simply discarded. Without this, commit() would wait
                    // forever for a build that will never be reported.
                    let _ = sender.send(segment_hex);
                }
            }
        });
    }

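    /// Drains the completion channel without blocking, decrementing
    /// `pending_builds` once per reported background build.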
    async fn collect_completed_segments(&self) {
        let mut receiver = self.segment_id_receiver.lock().await;
        while let Ok(_segment_hex) = receiver.try_recv() {
            self.pending_builds.fetch_sub(1, Ordering::SeqCst);
        }
    }

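    /// Number of background segment builds that have not been collected yet.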
    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

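    /// Number of merges the `SegmentManager` currently reports as pending.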
    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

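    /// Asks the `SegmentManager` to evaluate its merge policy and start any
    /// merges it selects.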
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

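    /// Waits until the `SegmentManager` has no outstanding merges.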
    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

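    /// Deletes segment files that are no longer referenced by the index
    /// metadata and returns the number of segments cleaned up.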
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

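    /// Aggregates [`SegmentBuilderStats`] across all active in-memory
    /// builders; returns `None` when no builder is currently allocated.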
    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;

        for builder_mutex in &self.builders {
            let guard = builder_mutex.lock().await;
            if let Some(builder) = guard.as_ref() {
                let stats = builder.stats();
                if let Some(ref mut total) = total_stats {
                    total.num_docs += stats.num_docs;
                    total.unique_terms += stats.unique_terms;
                    total.postings_in_memory += stats.postings_in_memory;
                    total.interned_strings += stats.interned_strings;
                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
                } else {
                    total_stats = Some(stats);
                }
            }
        }

        total_stats
    }

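    /// Collects already-finished builds, then sends every non-empty in-memory
    /// builder off to a background segment build. Does not wait for those
    /// builds to finish; use `commit` for that.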
    pub async fn flush(&self) -> Result<()> {
        self.collect_completed_segments().await;

        for builder_mutex in &self.builders {
            let mut guard = builder_mutex.lock().await;
            if let Some(builder) = guard.take()
                && builder.num_docs() > 0
            {
                self.spawn_background_build(builder);
            }
        }

        Ok(())
    }

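    /// Flushes all in-memory builders and waits for every pending background
    /// build to complete, then runs `maybe_build_vector_index`.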
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            match receiver.recv().await {
                Some(_segment_hex) => {
                    self.pending_builds.fetch_sub(1, Ordering::SeqCst);
                }
                None => break,
            }
        }
        drop(receiver);

        self.maybe_build_vector_index().await?;

        Ok(())
    }

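    /// Merges all current segments into one new segment and swaps it into the
    /// segment list via the `SegmentManager`. No-op when there are fewer than
    /// two segments.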
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        self.segment_manager
            .replace_segments(vec![new_segment_id.to_hex()], ids_to_merge)
            .await?;

        Ok(())
    }

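    /// Commits any buffered documents, then merges all segments into one.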
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.do_merge().await
    }
}