use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use rustc_hash::FxHashMap;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::mpsc;

use crate::DocId;
use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{
    SegmentBuilder, SegmentBuilderConfig, SegmentId, SegmentMerger, SegmentReader,
};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

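/// Writes documents to an index, buffering them in in-memory segment builders and
/// flushing each full builder to a new segment on a background task.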
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    config: IndexConfig,
    builder_config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    builders: Vec<AsyncMutex<Option<SegmentBuilder>>>,
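    /// Tracks live segments and coordinates background merges.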
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
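    /// Sends the hex ID of each segment that finishes building in the background.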
    segment_id_sender: mpsc::UnboundedSender<String>,
    segment_id_receiver: AsyncMutex<mpsc::UnboundedReceiver<String>>,
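    /// Number of background segment builds that have not yet been collected.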
    pending_builds: Arc<AtomicUsize>,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
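    /// Creates a new index in `directory`, persisting the schema and an empty segment list.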
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

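    /// Creates a new index with a custom [`SegmentBuilderConfig`].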
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);

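        // Persist the schema so the index can be reopened later.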
        let schema_bytes =
            serde_json::to_vec(&*schema).map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("schema.json"), &schema_bytes)
            .await?;

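        // Start with an empty segment list; `commit` rewrites it as segments are added.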
        let segments_bytes = serde_json::to_vec(&Vec::<String>::new())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

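        // One builder slot per indexing thread; the builders themselves are created lazily.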
        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            Vec::new(),
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

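    /// Opens an existing index, loading the schema and segment list from `directory`.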
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

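    /// Opens an existing index with a custom [`SegmentBuilderConfig`].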
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);

        let schema_slice = directory.open_read(Path::new("schema.json")).await?;
        let schema_bytes = schema_slice.read_bytes().await?;
        let schema: Schema = serde_json::from_slice(schema_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;
        let schema = Arc::new(schema);

        let segments_slice = directory.open_read(Path::new("segments.json")).await?;
        let segments_bytes = segments_slice.read_bytes().await?;
        let segment_ids: Vec<String> = serde_json::from_slice(segments_bytes.as_slice())
            .map_err(|e| Error::Serialization(e.to_string()))?;

        let num_builders = config.num_indexing_threads.max(1);
        let mut builders = Vec::with_capacity(num_builders);
        for _ in 0..num_builders {
            builders.push(AsyncMutex::new(None));
        }

        let (segment_id_sender, segment_id_receiver) = mpsc::unbounded_channel();

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            segment_ids,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
        ));

        Ok(Self {
            directory,
            schema,
            config,
            builder_config,
            tokenizers: FxHashMap::default(),
            builders,
            segment_manager,
            segment_id_sender,
            segment_id_receiver: AsyncMutex::new(segment_id_receiver),
            pending_builds: Arc::new(AtomicUsize::new(0)),
        })
    }

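    /// Returns the index schema.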
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

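    /// Registers the tokenizer to use for `field` in segment builders created after this call.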
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.tokenizers.insert(field, Box::new(tokenizer));
    }

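    /// Adds a document to one of the in-memory segment builders and returns its document ID.
    /// A builder that reaches `max_docs_per_segment` is handed off to a background build.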
    pub async fn add_document(&self, doc: Document) -> Result<DocId> {
        use rand::Rng;

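        // Pick a builder at random to spread documents (and lock contention) across builders.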
        let builder_idx = rand::rng().random_range(0..self.builders.len());

        let mut builder_guard = self.builders[builder_idx].lock().await;

        if builder_guard.is_none() {
            let mut builder =
                SegmentBuilder::new((*self.schema).clone(), self.builder_config.clone())?;
            for (field, tokenizer) in &self.tokenizers {
                builder.set_tokenizer(*field, tokenizer.clone_box());
            }
            *builder_guard = Some(builder);
        }

        let builder = builder_guard.as_mut().unwrap();
        let doc_id = builder.add_document(doc)?;

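        // A full builder is flushed to a new segment in the background; the next
        // add_document call on this slot starts a fresh builder.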
        if builder.num_docs() >= self.config.max_docs_per_segment {
            let full_builder = builder_guard.take().unwrap();
            drop(builder_guard);
            self.spawn_background_build(full_builder);
        }

        Ok(doc_id)
    }

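    /// Builds `builder` into a new segment on a background task, registers the segment with
    /// the segment manager, and reports its ID on the completion channel.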
    fn spawn_background_build(&self, builder: SegmentBuilder) {
        let directory = Arc::clone(&self.directory);
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let sender = self.segment_id_sender.clone();
        let segment_manager = Arc::clone(&self.segment_manager);

        self.pending_builds.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            match builder.build(directory.as_ref(), segment_id).await {
                Ok(_) => {
                    segment_manager.register_segment(segment_hex.clone()).await;
                    let _ = sender.send(segment_hex);
                }
                Err(e) => {
                    eprintln!("Background segment build failed: {:?}", e);
                    // Report completion even on failure so `pending_builds` is drained and
                    // `commit` does not wait forever; the segment was never registered, so
                    // it cannot end up in segments.json.
                    let _ = sender.send(segment_hex);
                }
            }
        });
    }

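    /// Drains the completion channel without blocking and decrements the pending-build count.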
    async fn collect_completed_segments(&self) {
        let mut receiver = self.segment_id_receiver.lock().await;
        while let Ok(_segment_hex) = receiver.try_recv() {
            self.pending_builds.fetch_sub(1, Ordering::SeqCst);
        }
    }

    pub fn pending_build_count(&self) -> usize {
        self.pending_builds.load(Ordering::SeqCst)
    }

    pub fn pending_merge_count(&self) -> usize {
        self.segment_manager.pending_merge_count()
    }

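    /// Asks the segment manager to evaluate its merge policy and schedule any needed merges.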
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    pub async fn wait_for_merges(&self) {
        self.segment_manager.wait_for_merges().await;
    }

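    /// Deletes segment files that are no longer referenced by the index, returning the
    /// number removed.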
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

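    /// Aggregates statistics across all in-memory segment builders, or returns `None` if
    /// no builder is currently allocated.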
    pub async fn get_builder_stats(&self) -> Option<crate::segment::SegmentBuilderStats> {
        let mut total_stats: Option<crate::segment::SegmentBuilderStats> = None;

        for builder_mutex in &self.builders {
            let guard = builder_mutex.lock().await;
            if let Some(builder) = guard.as_ref() {
                let stats = builder.stats();
                if let Some(ref mut total) = total_stats {
                    total.num_docs += stats.num_docs;
                    total.unique_terms += stats.unique_terms;
                    total.postings_in_memory += stats.postings_in_memory;
                    total.interned_strings += stats.interned_strings;
                    total.doc_field_lengths_size += stats.doc_field_lengths_size;
                } else {
                    total_stats = Some(stats);
                }
            }
        }

        total_stats
    }

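    /// Hands every non-empty in-memory builder off to a background segment build.
    /// Does not wait for the builds to finish; `commit` does.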
    pub async fn flush(&self) -> Result<()> {
        self.collect_completed_segments().await;

        for builder_mutex in &self.builders {
            let mut guard = builder_mutex.lock().await;
            if let Some(builder) = guard.take()
                && builder.num_docs() > 0
            {
                self.spawn_background_build(builder);
            }
        }

        Ok(())
    }

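    /// Flushes all in-memory builders, waits for outstanding background builds to finish,
    /// and persists the updated segment list to `segments.json`.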
    pub async fn commit(&self) -> Result<()> {
        self.flush().await?;

        let mut receiver = self.segment_id_receiver.lock().await;
        while self.pending_builds.load(Ordering::SeqCst) > 0 {
            match receiver.recv().await {
                Some(_segment_hex) => {
                    self.pending_builds.fetch_sub(1, Ordering::SeqCst);
                }
                None => break,
            }
        }
        drop(receiver);

        let segment_ids = self.segment_manager.get_segment_ids().await;
        let segments_bytes =
            serde_json::to_vec(&segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        Ok(())
    }

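    /// Merges all current segments into a single new segment, rewrites the segment list,
    /// and deletes the old segment files.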
    async fn do_merge(&self) -> Result<()> {
        let segment_ids = self.segment_manager.get_segment_ids().await;

        if segment_ids.len() < 2 {
            return Ok(());
        }

        let ids_to_merge: Vec<String> = segment_ids;

        let mut readers = Vec::new();
        let mut doc_offset = 0u32;

        for id_str in &ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = SegmentReader::open(
                self.directory.as_ref(),
                segment_id,
                Arc::clone(&self.schema),
                doc_offset,
                self.config.term_cache_blocks,
            )
            .await?;
            doc_offset += reader.meta().num_docs;
            readers.push(reader);
        }

        let merger = SegmentMerger::new(Arc::clone(&self.schema));
        let new_segment_id = SegmentId::new();
        merger
            .merge(self.directory.as_ref(), &readers, new_segment_id)
            .await?;

        {
            let segment_ids_arc = self.segment_manager.segment_ids();
            let mut segment_ids = segment_ids_arc.lock().await;
            segment_ids.clear();
            segment_ids.push(new_segment_id.to_hex());
        }

        let segment_ids = self.segment_manager.get_segment_ids().await;
        let segments_bytes =
            serde_json::to_vec(&segment_ids).map_err(|e| Error::Serialization(e.to_string()))?;
        self.directory
            .write(Path::new("segments.json"), &segments_bytes)
            .await?;

        for id_str in ids_to_merge {
            if let Some(segment_id) = SegmentId::from_hex(&id_str) {
                let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
            }
        }

        Ok(())
    }

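    /// Commits any buffered documents and then merges all segments into one.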
    pub async fn force_merge(&self) -> Result<()> {
        self.commit().await?;
        self.do_merge().await
    }
}