hermes_core/index/writer.rs

//! IndexWriter — async document indexing with parallel segment building.
//!
//! This module is only compiled with the "native" feature.
//!
//! # Architecture
//!
//! ```text
//! add_document() ──try_send──► [shared bounded MPMC] ◄──recv── worker 0
//!                                                     ◄──recv── worker 1
//!                                                     ◄──recv── worker N
//! ```
//!
//! - **Shared MPMC queue** (`async_channel`): all workers compete for documents.
//!   Busy workers (building segments) naturally stop pulling; free workers pick up slack.
//! - **Zero-copy pipeline**: `Document` is moved (never cloned) through every stage:
//!   `add_document()` → channel → `recv_blocking()` → `SegmentBuilder::add_document()`.
//! - `add_document` returns `QueueFull` when the queue is at capacity.
//! - **Workers are OS threads**: CPU-intensive work (tokenization, posting list building)
//!   runs on dedicated threads, never blocking the tokio async runtime.
//!   Async I/O (segment file writes) is bridged via `Handle::block_on()`.
//! - **Fixed per-worker memory budget**: `max_indexing_memory_bytes / num_workers`.
//! - **Two-phase commit**:
//!   1. `prepare_commit()` — closes the queue; workers flush builders to disk.
//!      Returns a `PreparedCommit` guard. No new documents are accepted until resolved.
//!   2. `PreparedCommit::commit()` — registers segments in metadata and resumes workers,
//!      or `PreparedCommit::abort()` — discards prepared segments and resumes workers.
//!
//!   `commit()` is a convenience for `prepare_commit().await?.commit().await`.
//!
//! Since `prepare_commit`/`commit` take `&mut self`, Rust’s borrow checker
//! guarantees no concurrent `add_document` calls during the commit window.
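//!
//! A minimal usage sketch (illustrative; assumes a `directory` implementing
//! `DirectoryWriter`, a populated `schema`, and an `IndexConfig` in `config`,
//! with error handling elided):
//!
//! ```ignore
//! let mut writer = IndexWriter::create(directory, schema, config).await?;
//! writer.add_document(doc)?;  // sync, O(1); may fail with Error::QueueFull
//! writer.commit().await?;     // prepare_commit().await?.commit().await
//! ```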

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Total pipeline capacity (in documents).
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Async IndexWriter for adding documents and committing segments.
///
/// **Backpressure:** `add_document()` is sync, O(1). Returns `Error::QueueFull`
/// when the shared queue is at capacity — caller must back off.
///
/// **Two-phase commit:**
/// - `prepare_commit()` → `PreparedCommit::commit()` or `PreparedCommit::abort()`
/// - `commit()` is a convenience that does both phases.
/// - Between prepare and commit, the caller can do external work (WAL, sync, etc.)
///   knowing that abort is possible if something fails.
/// - Dropping `PreparedCommit` without calling commit/abort auto-aborts.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// MPMC sender — `try_send(&self)` is thread-safe, no lock needed.
    /// Replaced on each commit cycle (workers get new receiver via resume).
    doc_sender: async_channel::Sender<Document>,
    /// Worker OS thread handles — long-lived, survive across commits.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Shared worker state (immutable config + mutable segment output + sync)
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager — owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata
    flushed_segments: Vec<(String, u32)>,
    /// Primary key dedup index (None if schema has no primary field)
    primary_key_index: Option<super::primary_key::PrimaryKeyIndex>,
}

/// Shared state for worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Fixed per-worker memory budget (bytes). When a builder exceeds this, segment is built.
    memory_budget_per_worker: usize,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built by workers, collected by `prepare_commit()`. Sync mutex for sub-μs push.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // === Worker lifecycle synchronization ===
    // Workers survive across commits. On prepare_commit the channel is closed;
    // workers flush their builders, increment flush_count, then wait on
    // resume_cvar for a new receiver. commit/abort creates a fresh channel
    // and wakes them.
    /// Number of workers that have completed their flush.
    flush_count: AtomicUsize,
    /// Mutex + condvar for prepare_commit to wait on all workers flushed.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Holds the new channel receiver after commit/abort. Workers clone from this.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Monotonically increasing epoch, bumped by each resume_workers call.
    /// Workers compare against their local epoch to avoid re-cloning a stale receiver.
    resume_epoch: AtomicUsize,
    /// Condvar for workers to wait for resume (new channel) or shutdown.
    resume_cvar: parking_lot::Condvar,
    /// When true, workers should exit permanently (IndexWriter dropped).
    shutdown: AtomicBool,
    /// Total number of worker threads.
    num_workers: usize,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Create an IndexWriter from an existing Index.
    /// Shares the SegmentManager for consistent segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }

    // ========================================================================
    // Construction + pipeline management
    // ========================================================================

    /// Common construction: creates worker state, spawns workers, assembles `Self`.
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        // Auto-configure tokenizers from schema for all text fields
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if matches!(entry.field_type, crate::dsl::FieldType::Text)
                && let Some(ref tok_name) = entry.tokenizer
                && let Some(tok) = registry.get(tok_name)
            {
                tokenizers.insert(field, tok);
            }
        }

        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(tokenizers),
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
            flush_count: AtomicUsize::new(0),
            flush_mutex: parking_lot::Mutex::new(()),
            flush_cvar: parking_lot::Condvar::new(),
            resume_receiver: parking_lot::Mutex::new(None),
            resume_epoch: AtomicUsize::new(0),
            resume_cvar: parking_lot::Condvar::new(),
            shutdown: AtomicBool::new(false),
            num_workers,
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
            primary_key_index: None,
        }
    }

    fn spawn_workers(
        worker_state: &Arc<WorkerState<D>>,
        num_workers: usize,
    ) -> (
        async_channel::Sender<Document>,
        Vec<std::thread::JoinHandle<()>>,
    ) {
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        let handle = tokio::runtime::Handle::current();
        let mut workers = Vec::with_capacity(num_workers);
        for i in 0..num_workers {
            let state = Arc::clone(worker_state);
            let rx = receiver.clone();
            let rt = handle.clone();
            workers.push(
                std::thread::Builder::new()
                    .name(format!("index-worker-{}", i))
                    .spawn(move || Self::worker_loop(state, rx, rt))
                    .expect("failed to spawn index worker thread"),
            );
        }
        (sender, workers)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field.
    /// Propagated to worker threads — takes effect for the next SegmentBuilder they create.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.worker_state
            .tokenizers
            .write()
            .insert(field, Box::new(tokenizer));
    }

    /// Initialize primary key deduplication from committed segments.
    ///
    /// Tries to load a cached bloom filter from `pk_bloom.bin` first. If the
    /// cache covers all current segments, the bloom is reused directly (fast
    /// path). If new segments appeared since the cache was written, only their
    /// keys are iterated (incremental). Falls back to a full rebuild when no
    /// cache exists.
    ///
    /// Only loads fast-field data (text dictionaries) per segment — NOT full
    /// `SegmentReader`s — to avoid duplicating dense/sparse index memory.
    ///
    /// The CPU-intensive bloom build is offloaded via `spawn_blocking` so it
    /// does not block the tokio runtime.
    ///
    /// No-op if schema has no primary field.
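    ///
    /// A sketch of the intended call order (illustrative; error handling and
    /// setup elided):
    ///
    /// ```ignore
    /// let mut writer = IndexWriter::open(directory, config).await?;
    /// writer.init_primary_key_dedup().await?; // before the first add_document
    /// ```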
    pub async fn init_primary_key_dedup(&mut self) -> Result<()> {
        use super::primary_key::{PK_BLOOM_FILE, deserialize_pk_bloom};

        let field = match self.schema.primary_field() {
            Some(f) => f,
            None => return Ok(()),
        };

        let snapshot = self.segment_manager.acquire_snapshot().await;
        let current_seg_ids: Vec<String> = snapshot.segment_ids().to_vec();

        // Try to load persisted bloom filter.
        let cached = match self
            .directory
            .open_read(std::path::Path::new(PK_BLOOM_FILE))
            .await
        {
            Ok(handle) => {
                let data = handle.read_bytes_range(0..handle.len()).await;
                match data {
                    Ok(bytes) => deserialize_pk_bloom(bytes.as_slice()),
                    Err(_) => None,
                }
            }
            Err(_) => None,
        };

        // Load lightweight fast-field data for all segments concurrently.
        let load_futures: Vec<_> = current_seg_ids
            .iter()
            .map(|seg_id_str| {
                let seg_id_str = seg_id_str.clone();
                let dir = self.directory.as_ref();
                let schema = Arc::clone(&self.schema);
                async move { load_pk_segment_data(dir, &seg_id_str, &schema).await }
            })
            .collect();
        let all_data = futures::future::try_join_all(load_futures).await?;

        if let Some((persisted_seg_ids, bloom)) = cached {
            // Partition: old segments (covered by bloom) first, new segments at end.
            let mut pk_data = Vec::with_capacity(all_data.len());
            let mut new_data = Vec::new();
            for d in all_data {
                if persisted_seg_ids.contains(&d.segment_id) {
                    pk_data.push(d);
                } else {
                    new_data.push(d);
                }
            }
            let needs_persist = !new_data.is_empty();
            let new_start = pk_data.len();
            pk_data.extend(new_data);

            let pk_index = if new_start == pk_data.len() {
                // Fast path: all segments covered by cache.
                super::primary_key::PrimaryKeyIndex::from_persisted(
                    field,
                    bloom,
                    pk_data,
                    &[],
                    snapshot,
                )
            } else {
                // Incremental: only iterate new segments' keys.
                tokio::task::spawn_blocking(move || {
                    // Insert new segments' keys into the bloom, then construct
                    // PrimaryKeyIndex with the pre-populated bloom.
                    let mut bloom = bloom;
                    let mut added = 0usize;
                    let num_new = pk_data.len() - new_start;
                    for data in &pk_data[new_start..] {
                        if let Some(ff) = data.fast_fields.get(&field.0)
                            && let Some(dict) = ff.text_dict()
                        {
                            for key in dict.iter() {
                                bloom.insert(key.as_bytes());
                                added += 1;
                            }
                        }
                    }
                    if added > 0 {
                        log::info!(
                            "[primary_key] bloom: added {} keys from {} new segment(s)",
                            added,
                            num_new,
                        );
                    }
                    super::primary_key::PrimaryKeyIndex::from_persisted(
                        field,
                        bloom,
                        pk_data,
                        &[],
                        snapshot,
                    )
                })
                .await
                .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?
            };

            if needs_persist {
                self.persist_pk_bloom(&pk_index, &current_seg_ids).await;
            }

            self.primary_key_index = Some(pk_index);
        } else {
            // No cache — full rebuild, offloaded to blocking thread.
            let pk_index = tokio::task::spawn_blocking(move || {
                super::primary_key::PrimaryKeyIndex::new(field, all_data, snapshot)
            })
            .await
            .map_err(|e| Error::Internal(format!("spawn_blocking failed: {}", e)))?;

            self.persist_pk_bloom(&pk_index, &current_seg_ids).await;
            self.primary_key_index = Some(pk_index);
        }

        Ok(())
    }

    /// Persist the primary-key bloom filter to `pk_bloom.bin`.
    /// Best-effort: errors are logged but not propagated.
    async fn persist_pk_bloom(
        &self,
        pk_index: &super::primary_key::PrimaryKeyIndex,
        segment_ids: &[String],
    ) {
        use super::primary_key::{PK_BLOOM_FILE, serialize_pk_bloom};

        let bloom_bytes = pk_index.bloom_to_bytes();
        let data = serialize_pk_bloom(segment_ids, &bloom_bytes);
        if let Err(e) = self
            .directory
            .write(std::path::Path::new(PK_BLOOM_FILE), &data)
            .await
        {
            log::warn!("[primary_key] failed to persist bloom cache: {}", e);
        }
    }

    /// Add a document to the indexing queue (sync, O(1), lock-free).
    ///
    /// `Document` is moved into the channel (zero-copy). Workers compete to pull it.
    /// Returns `Error::QueueFull` when the queue is at capacity — caller must back off.
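    ///
    /// A possible backoff loop (illustrative; assumes `Document: Clone`, since
    /// the document is consumed even when `QueueFull` is returned):
    ///
    /// ```ignore
    /// while let Err(Error::QueueFull) = writer.add_document(doc.clone()) {
    ///     // NB: a non-QueueFull error also exits this loop; handle it in real code.
    ///     tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    /// }
    /// ```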
    pub fn add_document(&self, doc: Document) -> Result<()> {
        if let Some(ref pk_index) = self.primary_key_index {
            pk_index.check_and_insert(&doc)?;
        }
        match self.doc_sender.try_send(doc) {
            Ok(()) => Ok(()),
            Err(async_channel::TrySendError::Full(doc)) => {
                // Roll back PK registration so the caller can retry later
                if let Some(ref pk_index) = self.primary_key_index {
                    pk_index.rollback_uncommitted_key(&doc);
                }
                Err(Error::QueueFull)
            }
            Err(async_channel::TrySendError::Closed(doc)) => {
                // Roll back PK registration for defense-in-depth
                if let Some(ref pk_index) = self.primary_key_index {
                    pk_index.rollback_uncommitted_key(&doc);
                }
                Err(Error::Internal("Document channel closed".into()))
            }
        }
    }

    /// Add multiple documents to the indexing queue.
    ///
    /// Returns the number of documents successfully queued. Stops at the first
    /// `QueueFull` and returns the count queued so far.
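    ///
    /// A resubmission sketch (illustrative; assumes `Document: Clone` and that
    /// the caller retains the batch, since unqueued documents are dropped):
    ///
    /// ```ignore
    /// let mut remaining: Vec<Document> = batch;
    /// while !remaining.is_empty() {
    ///     let queued = writer.add_documents(remaining.clone())?;
    ///     remaining.drain(..queued);
    ///     if queued == 0 { /* queue full: back off before retrying */ }
    /// }
    /// ```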
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let total = documents.len();
        for (i, doc) in documents.into_iter().enumerate() {
            match self.add_document(doc) {
                Ok(()) => {}
                Err(Error::QueueFull) => return Ok(i),
                Err(e) => return Err(e),
            }
        }
        Ok(total)
    }

    // ========================================================================
    // Worker loop
    // ========================================================================

    /// Worker loop — runs on a dedicated OS thread, survives across commits.
    ///
    /// Outer loop: each iteration processes one commit cycle.
    ///   Inner loop: pull documents from MPMC queue, index them, build segments
    ///   when memory budget is exceeded.
    ///   On channel close (prepare_commit): flush current builder, signal
    ///   flush_count, wait for resume with new receiver.
    ///   On shutdown (Drop): exit permanently.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        let mut my_epoch = 0usize;

        loop {
            // Wrap the recv+build phase in catch_unwind so a panic doesn't
            // prevent flush_count from being signaled (which would hang
            // prepare_commit forever).
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                while let Ok(doc) = receiver.recv_blocking() {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Periodic progress log: every 16384 documents.
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    // Reserve 20% headroom for segment build overhead (vid_set,
                    // VidLookup, postings_flat, grid_entries). These temporary
                    // allocations exist alongside the builder's data during build.
                    let effective_budget = state.memory_budget_per_worker * 4 / 5;

                    if builder_memory >= effective_budget && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed — flush current builder
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Signal flush completion (always, even after panic — prevents
            // prepare_commit from hanging)
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                // Last worker — wake prepare_commit
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Wait for resume (new channel) or shutdown.
            // Check resume_epoch to avoid re-cloning a stale receiver from
            // a previous cycle.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }

    /// Build a segment on the worker thread. Uses `Handle::block_on()` to bridge
    /// into async context for I/O (streaming writers). CPU work (rayon) stays on
    /// the worker thread / rayon pool.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }

    // ========================================================================
    // Public API — commit, merge, etc.
    // ========================================================================

    /// Check merge policy and spawn a background merge if needed.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Abort all in-flight merge tasks without waiting for completion.
    pub async fn abort_merges(&self) {
        self.segment_manager.abort_merges().await;
    }

    /// Wait for the in-flight background merge to complete (if any).
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Wait for all eligible merges to complete, including cascading merges.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Get the segment tracker for sharing with readers.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files not registered in metadata.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Prepare commit — signal workers to flush, wait for completion, collect segments.
    ///
    /// All documents sent via `add_document` before this call are guaranteed
    /// to be written to segment files on disk. Segments are NOT yet registered
    /// in metadata — call `PreparedCommit::commit()` for that.
    ///
    /// Workers are NOT destroyed — they flush their builders and wait for
    /// `resume_workers()` to give them a new channel.
    ///
    /// `add_document` returns a "Document channel closed" error until
    /// commit/abort resumes workers.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // 1. Close channel → workers drain remaining docs and flush builders
        self.doc_sender.close();

        // Wake any workers still waiting on resume_cvar from previous cycle.
        // They'll clone the stale receiver, enter recv_blocking, get Err
        // immediately (sender already closed), flush, and signal completion.
        self.worker_state.resume_cvar.notify_all();

        // 2. Wait for all workers to complete their flush (via spawn_blocking
        //    to avoid blocking the tokio runtime)
        let state = Arc::clone(&self.worker_state);
        let all_flushed = tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    return false;
                }
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
            true
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        if !all_flushed {
            // Resume workers so the system isn't stuck, then return error
            self.resume_workers();
            return Err(Error::Internal(format!(
                "prepare_commit timed out: {}/{} workers flushed",
                self.worker_state.flush_count.load(Ordering::Acquire),
                self.worker_state.num_workers
            )));
        }

        // 3. Collect built segments
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }

    /// Commit (convenience): prepare_commit + commit in one call.
    ///
    /// Guarantees all prior `add_document` calls are committed.
    /// Vector training is decoupled — call `build_vector_index()` manually.
    pub async fn commit(&mut self) -> Result<bool> {
        self.prepare_commit().await?.commit().await
    }

    /// Force merge all segments into one.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Reorder all segments via Recursive Graph Bisection (BP) for better BMP pruning.
    ///
    /// Each segment is individually rebuilt with record-level BP reordering:
    /// ordinals are shuffled across blocks so that similar content clusters tightly.
    pub async fn reorder(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.reorder_segments().await
    }

    /// Get the segment manager (for background optimizer access).
    pub fn segment_manager(&self) -> &Arc<crate::merge::SegmentManager<D>> {
        &self.segment_manager
    }

    /// Resume workers with a fresh channel. Called after commit or abort.
    ///
    /// Workers are already alive — just give them a new channel and wake them.
    /// If the tokio runtime has shut down (e.g., program exit), workers are
    /// signaled to shut down permanently instead.
    fn resume_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            // Runtime is gone — signal permanent shutdown so workers don't
            // hang forever on resume_cvar.
            self.worker_state.shutdown.store(true, Ordering::Release);
            self.worker_state.resume_cvar.notify_all();
            return;
        }

        // Reset flush count for next cycle
        self.worker_state.flush_count.store(0, Ordering::Release);

        // Create new channel
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        // Set new receiver, bump epoch, and wake all workers
        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }

    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
}

impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    fn drop(&mut self) {
        // 1. Signal permanent shutdown
        self.worker_state.shutdown.store(true, Ordering::Release);
        // 2. Close channel to wake workers blocked on recv_blocking
        self.doc_sender.close();
        // 3. Wake workers that might be waiting on resume_cvar
        self.worker_state.resume_cvar.notify_all();
        // 4. Join worker threads
        for w in std::mem::take(&mut self.workers) {
            let _ = w.join();
        }
    }
}

/// A prepared commit that can be finalized or aborted.
///
/// Two-phase commit guard. Between `prepare_commit()` and
/// `commit()`/`abort()`, segments are on disk but NOT in metadata.
/// Dropping without calling either will auto-abort (discard segments,
/// resume workers).
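///
/// A sketch of the prepare/commit window (illustrative; `wal.sync()` is a
/// hypothetical stand-in for whatever external work the caller fences between
/// the two phases):
///
/// ```ignore
/// let prepared = writer.prepare_commit().await?;
/// match wal.sync().await {
///     Ok(()) => { prepared.commit().await?; } // register segments in metadata
///     Err(_) => prepared.abort(),             // discard; files become orphans
/// }
/// ```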
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    is_resolved: bool,
}

impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Finalize: register segments in metadata, evaluate merge policy, resume workers.
    ///
    /// Returns `true` if new segments were committed, `false` if nothing changed.
    pub async fn commit(mut self) -> Result<bool> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);

        // Fast path: nothing to commit
        if segments.is_empty() {
            log::debug!("[commit] no segments to commit, skipping");
            self.writer.resume_workers();
            return Ok(false);
        }

        self.writer.segment_manager.commit(segments).await?;

        // Refresh primary key index: only load fast fields for NEW segments.
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            let snapshot = self.writer.segment_manager.acquire_snapshot().await;
            let existing_ids: std::collections::HashSet<&str> =
                pk_index.committed_segment_ids().collect();

            // Only load fast fields for segments not already held.
            let load_futures: Vec<_> = snapshot
                .segment_ids()
                .iter()
                .filter(|id| !existing_ids.contains(id.as_str()))
                .map(|seg_id_str| {
                    let seg_id_str = seg_id_str.clone();
                    let dir = self.writer.directory.as_ref();
                    let schema = Arc::clone(&self.writer.schema);
                    async move { load_pk_segment_data(dir, &seg_id_str, &schema).await }
                })
                .collect();
            let new_data = futures::future::try_join_all(load_futures).await?;

            let seg_ids: Vec<String> = snapshot.segment_ids().to_vec();
            pk_index.refresh_incremental(new_data, snapshot);

            // Persist bloom cache (extract bytes to avoid borrow conflict).
            let bloom_bytes = pk_index.bloom_to_bytes();
            let data = super::primary_key::serialize_pk_bloom(&seg_ids, &bloom_bytes);
            if let Err(e) = self
                .writer
                .directory
                .write(
                    std::path::Path::new(super::primary_key::PK_BLOOM_FILE),
                    &data,
                )
                .await
            {
                log::warn!("[primary_key] failed to persist bloom cache: {}", e);
            }
        }

        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(true)
    }

    /// Abort: discard prepared segments, resume workers.
    /// Segment files become orphans (cleaned up by `cleanup_orphan_segments`).
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        if let Some(ref mut pk_index) = self.writer.primary_key_index {
            pk_index.clear_uncommitted();
        }
        self.writer.resume_workers();
    }
}

impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            if let Some(ref mut pk_index) = self.writer.primary_key_index {
                pk_index.clear_uncommitted();
            }
            self.writer.resume_workers();
        }
    }
}

/// Load only fast-field data for a segment (lightweight alternative to full SegmentReader).
async fn load_pk_segment_data<D: crate::directories::Directory>(
    dir: &D,
    seg_id_str: &str,
    schema: &Arc<crate::dsl::Schema>,
) -> Result<super::primary_key::PkSegmentData> {
    let seg_id = crate::segment::SegmentId::from_hex(seg_id_str)
        .ok_or_else(|| Error::Internal(format!("Invalid segment id: {}", seg_id_str)))?;
    let files = crate::segment::SegmentFiles::new(seg_id.0);
    let fast_fields =
        crate::segment::reader::loader::load_fast_fields_file(dir, &files, schema).await?;
    Ok(super::primary_key::PkSegmentData {
        segment_id: seg_id_str.to_string(),
        fast_fields,
    })
}