// hermes_core/index/writer.rs

//! IndexWriter — async document indexing with parallel segment building.
//!
//! This module is only compiled with the "native" feature.
//!
//! # Architecture
//!
//! ```text
//! add_document() ──try_send──► [shared bounded MPMC] ◄──recv── worker 0
//!                                                     ◄──recv── worker 1
//!                                                     ◄──recv── worker N
//! ```
//!
//! - **Shared MPMC queue** (`async_channel`): all workers compete for documents.
//!   Busy workers (building segments) naturally stop pulling; free workers pick up slack.
//! - **Zero-copy pipeline**: `Document` is moved (never cloned) through every stage:
//!   `add_document()` → channel → `recv_blocking()` → `SegmentBuilder::add_document()`.
//! - `add_document` returns `QueueFull` when the queue is at capacity.
//! - **Workers are OS threads**: CPU-intensive work (tokenization, posting list building)
//!   runs on dedicated threads, never blocking the tokio async runtime.
//!   Async I/O (segment file writes) is bridged via `Handle::block_on()`.
//! - **Fixed per-worker memory budget**: `max_indexing_memory_bytes / num_workers`.
//! - **Two-phase commit**:
//!   1. `prepare_commit()` — closes queue, drains workers, segments written to disk.
//!      Returns a `PreparedCommit` guard. No new documents accepted until resolved.
//!   2. `PreparedCommit::commit()` — registers segments in metadata, respawns workers.
//!   3. `PreparedCommit::abort()` — discards prepared segments, respawns workers.
//!   4. `commit()` — convenience: `prepare_commit().await?.commit().await`.
//!
//! Since `prepare_commit`/`commit` take `&mut self`, Rust's borrow checker
//! guarantees no concurrent `add_document` calls during the commit window.
32use std::sync::Arc;
33
34use rustc_hash::FxHashMap;
35
36use crate::directories::DirectoryWriter;
37use crate::dsl::{Document, Field, Schema};
38use crate::error::{Error, Result};
39use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
40use crate::tokenizer::BoxedTokenizer;
41
42use super::IndexConfig;
43
/// Total pipeline capacity (in documents) of the shared bounded MPMC queue.
/// Once this many documents are in flight, `add_document` returns `QueueFull`.
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
46
/// Async IndexWriter for adding documents and committing segments.
///
/// **Backpressure:** `add_document()` is sync, O(1). Returns `Error::QueueFull`
/// when the shared queue is at capacity — caller must back off.
///
/// **Two-phase commit:**
/// - `prepare_commit()` → `PreparedCommit::commit()` or `PreparedCommit::abort()`
/// - `commit()` is a convenience that does both phases.
/// - Between prepare and commit, the caller can do external work (WAL, sync, etc.)
///   knowing that abort is possible if something fails.
/// - Dropping `PreparedCommit` without calling commit/abort auto-aborts.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    /// Backing storage for segment files; shared with workers and the segment manager.
    pub(super) directory: Arc<D>,
    /// Index schema; shared read-only with worker threads.
    pub(super) schema: Arc<Schema>,
    /// Writer configuration (worker count, memory budget, merge policy, ...).
    pub(super) config: IndexConfig,
    /// MPMC sender — `try_send(&self)` is thread-safe, no lock needed.
    /// Replaced on each commit cycle.
    doc_sender: async_channel::Sender<Document>,
    /// Worker OS thread handles — replaced on each commit cycle.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Shared worker state (immutable config + mutable segment output)
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager — owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata
    /// (pairs of segment-id hex string and document count).
    flushed_segments: Vec<(String, u32)>,
}
74
/// Shared state for worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    /// Destination for built segment files.
    directory: Arc<D>,
    /// Schema handed to each `SegmentBuilder`.
    schema: Arc<Schema>,
    /// Template configuration cloned into every `SegmentBuilder`.
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers; snapshot is taken when a worker creates a builder,
    /// so `set_tokenizer` only affects builders created afterwards.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Fixed per-worker memory budget (bytes). When a builder exceeds this, segment is built.
    memory_budget_per_worker: usize,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built by workers, collected by `prepare_commit()`. Sync mutex for sub-μs push.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,
}
88
89impl<D: DirectoryWriter + 'static> IndexWriter<D> {
90    /// Create a new index in the directory
91    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
92        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
93    }
94
95    /// Create a new index with custom builder config
96    pub async fn create_with_config(
97        directory: D,
98        schema: Schema,
99        config: IndexConfig,
100        builder_config: SegmentBuilderConfig,
101    ) -> Result<Self> {
102        let directory = Arc::new(directory);
103        let schema = Arc::new(schema);
104        let metadata = super::IndexMetadata::new((*schema).clone());
105
106        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
107            Arc::clone(&directory),
108            Arc::clone(&schema),
109            metadata,
110            config.merge_policy.clone_box(),
111            config.term_cache_blocks,
112            config.max_concurrent_merges,
113        ));
114        segment_manager.update_metadata(|_| {}).await?;
115
116        Ok(Self::new_with_parts(
117            directory,
118            schema,
119            config,
120            builder_config,
121            segment_manager,
122        ))
123    }
124
125    /// Open an existing index for writing
126    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
127        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
128    }
129
130    /// Open an existing index with custom builder config
131    pub async fn open_with_config(
132        directory: D,
133        config: IndexConfig,
134        builder_config: SegmentBuilderConfig,
135    ) -> Result<Self> {
136        let directory = Arc::new(directory);
137        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
138        let schema = Arc::new(metadata.schema.clone());
139
140        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
141            Arc::clone(&directory),
142            Arc::clone(&schema),
143            metadata,
144            config.merge_policy.clone_box(),
145            config.term_cache_blocks,
146            config.max_concurrent_merges,
147        ));
148        segment_manager.load_and_publish_trained().await;
149
150        Ok(Self::new_with_parts(
151            directory,
152            schema,
153            config,
154            builder_config,
155            segment_manager,
156        ))
157    }
158
159    /// Create an IndexWriter from an existing Index.
160    /// Shares the SegmentManager for consistent segment lifecycle management.
161    pub fn from_index(index: &super::Index<D>) -> Self {
162        Self::new_with_parts(
163            Arc::clone(&index.directory),
164            Arc::clone(&index.schema),
165            index.config.clone(),
166            SegmentBuilderConfig::default(),
167            Arc::clone(&index.segment_manager),
168        )
169    }
170
171    // ========================================================================
172    // Construction + pipeline management
173    // ========================================================================
174
175    /// Common construction: creates worker state, spawns workers, assembles `Self`.
176    fn new_with_parts(
177        directory: Arc<D>,
178        schema: Arc<Schema>,
179        config: IndexConfig,
180        builder_config: SegmentBuilderConfig,
181        segment_manager: Arc<crate::merge::SegmentManager<D>>,
182    ) -> Self {
183        let num_workers = config.num_indexing_threads.max(1);
184        let worker_state = Arc::new(WorkerState {
185            directory: Arc::clone(&directory),
186            schema: Arc::clone(&schema),
187            builder_config,
188            tokenizers: parking_lot::RwLock::new(FxHashMap::default()),
189            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
190            segment_manager: Arc::clone(&segment_manager),
191            built_segments: parking_lot::Mutex::new(Vec::new()),
192        });
193        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);
194
195        Self {
196            directory,
197            schema,
198            config,
199            doc_sender,
200            workers,
201            worker_state,
202            segment_manager,
203            flushed_segments: Vec::new(),
204        }
205    }
206
207    fn spawn_workers(
208        worker_state: &Arc<WorkerState<D>>,
209        num_workers: usize,
210    ) -> (
211        async_channel::Sender<Document>,
212        Vec<std::thread::JoinHandle<()>>,
213    ) {
214        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
215        let handle = tokio::runtime::Handle::current();
216        let mut workers = Vec::with_capacity(num_workers);
217        for i in 0..num_workers {
218            let state = Arc::clone(worker_state);
219            let rx = receiver.clone();
220            let rt = handle.clone();
221            workers.push(
222                std::thread::Builder::new()
223                    .name(format!("index-worker-{}", i))
224                    .spawn(move || Self::worker_loop(state, rx, rt))
225                    .expect("failed to spawn index worker thread"),
226            );
227        }
228        (sender, workers)
229    }
230
231    /// Get the schema
232    pub fn schema(&self) -> &Schema {
233        &self.schema
234    }
235
236    /// Set tokenizer for a field.
237    /// Propagated to worker threads — takes effect for the next SegmentBuilder they create.
238    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
239        self.worker_state
240            .tokenizers
241            .write()
242            .insert(field, Box::new(tokenizer));
243    }
244
245    /// Add a document to the indexing queue (sync, O(1), lock-free).
246    ///
247    /// `Document` is moved into the channel (zero-copy). Workers compete to pull it.
248    /// Returns `Error::QueueFull` when the queue is at capacity — caller must back off.
249    pub fn add_document(&self, doc: Document) -> Result<()> {
250        self.doc_sender.try_send(doc).map_err(|e| match e {
251            async_channel::TrySendError::Full(_) => Error::QueueFull,
252            async_channel::TrySendError::Closed(_) => {
253                Error::Internal("Document channel closed".into())
254            }
255        })
256    }
257
258    /// Add multiple documents to the indexing queue.
259    ///
260    /// Returns the number of documents successfully queued. Stops at the first
261    /// `QueueFull` and returns the count queued so far.
262    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
263        let total = documents.len();
264        for (i, doc) in documents.into_iter().enumerate() {
265            match self.add_document(doc) {
266                Ok(()) => {}
267                Err(Error::QueueFull) => return Ok(i),
268                Err(e) => return Err(e),
269            }
270        }
271        Ok(total)
272    }
273
274    // ========================================================================
275    // Worker loop
276    // ========================================================================
277
    /// Worker loop — runs on a dedicated OS thread.
    ///
    /// Pulls documents from the shared MPMC queue (blocking recv), indexes them
    /// (CPU-intensive: tokenization, posting list updates), and builds segments
    /// inline when the memory budget is exceeded.
    ///
    /// Async I/O (segment file writes) is bridged via `Handle::block_on()`.
    /// Exits when the channel is closed (prepare_commit closes the sender).
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        // Lazily-created builder: `None` until the first document arrives and
        // again after each inline segment build.
        let mut builder: Option<SegmentBuilder> = None;

        // recv_blocking() returns Err only once the channel is closed and drained.
        while let Ok(doc) = receiver.recv_blocking() {
            // Initialize builder if needed
            if builder.is_none() {
                match SegmentBuilder::new(Arc::clone(&state.schema), state.builder_config.clone()) {
                    Ok(mut b) => {
                        // Snapshot the current per-field tokenizers; later
                        // `set_tokenizer` calls affect only future builders.
                        for (field, tokenizer) in state.tokenizers.read().iter() {
                            b.set_tokenizer(*field, tokenizer.clone_box());
                        }
                        builder = Some(b);
                    }
                    Err(e) => {
                        // Builder creation failed: this document is dropped.
                        log::error!("Failed to create segment builder: {:?}", e);
                        continue;
                    }
                }
            }

            let b = builder.as_mut().unwrap();
            if let Err(e) = b.add_document(doc) {
                // Document is dropped on indexing failure; the loop keeps going.
                log::error!("Failed to index document: {:?}", e);
                continue;
            }

            let builder_memory = b.estimated_memory_bytes();

            // Progress log every 16384 docs (num_docs & 0x3FFF == 0).
            if b.num_docs() & 0x3FFF == 0 {
                log::debug!(
                    "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                    b.num_docs(),
                    builder_memory as f64 / (1024.0 * 1024.0),
                    state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                );
            }

            // Require minimum 100 docs before flushing to avoid tiny segments
            const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

            if builder_memory >= state.memory_budget_per_worker
                && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
            {
                log::info!(
                    "[indexing] memory budget reached, building segment: \
                     docs={}, memory={:.2} MB, budget={:.2} MB",
                    b.num_docs(),
                    builder_memory as f64 / (1024.0 * 1024.0),
                    state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                );
                // Move the builder out; a fresh one is created for the next doc.
                let full_builder = builder.take().unwrap();
                Self::build_segment_inline(&state, full_builder, &handle);
            }
        }

        // Channel closed — build remaining docs
        if let Some(b) = builder.take()
            && b.num_docs() > 0
        {
            Self::build_segment_inline(&state, b, &handle);
        }
    }
352
    /// Build a segment on the worker thread. Uses `Handle::block_on()` to bridge
    /// into async context for I/O (streaming writers). CPU work (rayon) stays on
    /// the worker thread / rayon pool.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        // Fresh id for the new segment; its hex form identifies it on disk and in logs.
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        // Lock-free read of trained structures from the segment manager (may be None).
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        // Bridge into the async runtime for the segment's file I/O.
        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                // Record for prepare_commit() to collect later.
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            // Empty segment: nothing to register.
            Ok(_) => {}
            Err(e) => {
                // Failed build is logged and not retried; the builder (and its
                // documents) is consumed either way.
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }
402
403    // ========================================================================
404    // Public API — commit, merge, etc.
405    // ========================================================================
406
    /// Check merge policy and spawn a background merge if needed.
    /// Thin delegate to the shared `SegmentManager`.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }
411
    /// Wait for the in-flight background merge to complete (if any).
    /// Thin delegate to the shared `SegmentManager`.
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }
416
    /// Wait for all eligible merges to complete, including cascading merges.
    /// Thin delegate to the shared `SegmentManager`.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }
421
    /// Get the segment tracker for sharing with readers.
    /// Thin delegate to the shared `SegmentManager`.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }
426
    /// Acquire a snapshot of current segments for reading.
    /// Thin delegate to the shared `SegmentManager`.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }
431
    /// Clean up orphan segment files not registered in metadata.
    /// Returns the number of segments removed. Thin delegate to the `SegmentManager`.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }
436
    /// Prepare commit — close queue, drain workers, collect built segments.
    ///
    /// All documents sent via `add_document` before this call are guaranteed
    /// to be written to segment files on disk. Segments are NOT yet registered
    /// in metadata — call `PreparedCommit::commit()` for that.
    ///
    /// Between prepare and commit, the caller can do external work (WAL sync,
    /// replication, etc.) knowing that `abort()` is possible if something fails.
    ///
    /// `add_document` will return `Closed` error until commit/abort respawns workers.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // 1. Close channel → workers drain remaining docs and exit
        self.doc_sender.close();

        // 2. Join worker OS threads (via spawn_blocking to avoid blocking tokio)
        let workers = std::mem::take(&mut self.workers);
        tokio::task::spawn_blocking(move || {
            for w in workers {
                // Join errors (worker panics) are deliberately ignored.
                let _ = w.join();
            }
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to join workers: {}", e)))?;

        // 3. Collect built segments
        // NOTE(review): if the spawn_blocking join above fails, we return early
        // with `workers` already taken and no guard created — the writer stays
        // degraded (closed channel) until a later commit/abort respawns workers.
        // Confirm this is the intended failure behavior.
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }
470
471    /// Commit (convenience): prepare_commit + commit in one call.
472    ///
473    /// Guarantees all prior `add_document` calls are committed.
474    /// Vector training is decoupled — call `build_vector_index()` manually.
475    pub async fn commit(&mut self) -> Result<()> {
476        self.prepare_commit().await?.commit().await
477    }
478
479    /// Force merge all segments into one.
480    pub async fn force_merge(&mut self) -> Result<()> {
481        self.prepare_commit().await?.commit().await?;
482        self.segment_manager.force_merge().await
483    }
484
485    /// Respawn workers with a fresh channel. Called after commit or abort.
486    ///
487    /// If the tokio runtime has shut down (e.g., program exit), this is a no-op
488    /// to avoid panicking in `Handle::current()`. The writer is left in a
489    /// degraded state (closed channel, no workers) — `add_document` will return
490    /// `Closed` errors, which is acceptable during shutdown.
491    fn respawn_workers(&mut self) {
492        if tokio::runtime::Handle::try_current().is_err() {
493            return;
494        }
495        let num_workers = self.config.num_indexing_threads.max(1);
496        let (sender, workers) = Self::spawn_workers(&self.worker_state, num_workers);
497        self.doc_sender = sender;
498        self.workers = workers;
499    }
500
501    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
502}
503
504impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
505    fn drop(&mut self) {
506        self.doc_sender.close();
507        for w in std::mem::take(&mut self.workers) {
508            let _ = w.join();
509        }
510    }
511}
512
/// A prepared commit that can be finalized or aborted.
///
/// Two-phase commit guard. Between `prepare_commit()` and
/// `commit()`/`abort()`, segments are on disk but NOT in metadata.
/// Dropping without calling either will auto-abort (discard segments,
/// respawn workers).
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    /// Exclusive borrow of the writer — statically prevents `add_document`
    /// while the commit is pending.
    writer: &'a mut IndexWriter<D>,
    /// Set by `commit()`/`abort()`; Drop auto-aborts only when still false.
    is_resolved: bool,
}
523
524impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Finalize: register segments in metadata, evaluate merge policy, respawn workers.
    pub async fn commit(mut self) -> Result<()> {
        // Mark resolved first so Drop does not auto-abort after this point.
        self.is_resolved = true;
        // Move the flushed segment list out of the writer.
        let segments = std::mem::take(&mut self.writer.flushed_segments);
        // NOTE(review): if registration fails here, `is_resolved` is already
        // true, so Drop will NOT respawn workers — the writer stays degraded
        // (closed channel) and the segment list is lost. Confirm this is the
        // intended failure behavior.
        self.writer.segment_manager.commit(segments).await?;
        // New segments are visible — let the merge policy react.
        self.writer.segment_manager.maybe_merge().await;
        // Accept documents again.
        self.writer.respawn_workers();
        Ok(())
    }
534
    /// Abort: discard prepared segments, respawn workers.
    /// Segment files become orphans (cleaned up by `cleanup_orphan_segments`).
    pub fn abort(mut self) {
        // Resolve first so Drop's auto-abort does not run the same steps again.
        self.is_resolved = true;
        // Forget the prepared list; files stay on disk as orphans.
        self.writer.flushed_segments.clear();
        // Accept documents again.
        self.writer.respawn_workers();
    }
542}
543
544impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
545    fn drop(&mut self) {
546        if !self.is_resolved {
547            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
548            self.writer.flushed_segments.clear();
549            self.writer.respawn_workers();
550        }
551    }
552}