hermes_core/index/writer.rs
//! IndexWriter — async document indexing with parallel segment building.
//!
//! This module is only compiled with the "native" feature.
//!
//! # Architecture
//!
//! ```text
//! add_document() ──try_send──► [shared bounded MPMC] ◄──recv── worker 0
//!                                                    ◄──recv── worker 1
//!                                                    ◄──recv── worker N
//! ```
//!
//! - **Shared MPMC queue** (`async_channel`): all workers compete for documents.
//!   Busy workers (building segments) naturally stop pulling; free workers pick up the slack.
//! - **Zero-copy pipeline**: `Document` is moved (never cloned) through every stage:
//!   `add_document()` → channel → `recv_blocking()` → `SegmentBuilder::add_document()`.
//! - `add_document` returns `QueueFull` when the queue is at capacity.
//! - **Workers are OS threads**: CPU-intensive work (tokenization, posting-list building)
//!   runs on dedicated threads, never blocking the tokio async runtime.
//!   Async I/O (segment file writes) is bridged via `Handle::block_on()`.
//! - **Fixed per-worker memory budget**: `max_indexing_memory_bytes / num_workers`.
//! - **Two-phase commit**:
//!   1. `prepare_commit()` — closes the queue; workers flush builders to disk.
//!      Returns a `PreparedCommit` guard. No new documents are accepted until resolved.
//!   2. `PreparedCommit::commit()` — registers segments in metadata, resumes workers.
//!   3. `PreparedCommit::abort()` — discards prepared segments, resumes workers.
//!   4. `commit()` — convenience: `prepare_commit().await?.commit().await`.
//!
//! Since `prepare_commit`/`commit` take `&mut self`, Rust’s borrow checker
//! guarantees no concurrent `add_document` calls during the commit window.
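//!
//! A minimal usage sketch (directory, schema, and config construction elided;
//! `doc` is an already-built `Document`):
//!
//! ```ignore
//! let mut writer = IndexWriter::create(directory, schema, config).await?;
//! writer.add_document(doc)?; // sync enqueue; may fail with Error::QueueFull
//! writer.commit().await?;    // = prepare_commit().await?.commit().await
//! ```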

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Total pipeline capacity (in documents).
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Async IndexWriter for adding documents and committing segments.
///
/// **Backpressure:** `add_document()` is sync, O(1). Returns `Error::QueueFull`
/// when the shared queue is at capacity — caller must back off.
///
/// **Two-phase commit:**
/// - `prepare_commit()` → `PreparedCommit::commit()` or `PreparedCommit::abort()`
/// - `commit()` is a convenience that does both phases.
/// - Between prepare and commit, the caller can do external work (WAL, sync, etc.)
///   knowing that abort is possible if something fails.
/// - Dropping `PreparedCommit` without calling commit/abort auto-aborts.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// MPMC sender — `try_send(&self)` is thread-safe, no lock needed.
    /// Replaced on each commit cycle (workers get a new receiver via resume).
    doc_sender: async_channel::Sender<Document>,
    /// Worker OS thread handles — long-lived, survive across commits.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Shared worker state (immutable config + mutable segment output + sync).
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager — owns metadata.json, handles segments and background merging.
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata.
    flushed_segments: Vec<(String, u32)>,
}

/// Shared state for worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Fixed per-worker memory budget (bytes). When a builder exceeds this, a segment is built.
    memory_budget_per_worker: usize,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built by workers, collected by `prepare_commit()`. Sync mutex for sub-μs push.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // === Worker lifecycle synchronization ===
    // Workers survive across commits. On prepare_commit the channel is closed;
    // workers flush their builders, increment flush_count, then wait on
    // resume_cvar for a new receiver. commit/abort creates a fresh channel
    // and wakes them.
    /// Number of workers that have completed their flush.
    flush_count: AtomicUsize,
    /// Mutex + condvar for prepare_commit to wait until all workers have flushed.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Holds the new channel receiver after commit/abort. Workers clone from this.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Monotonically increasing epoch, bumped by each resume_workers call.
    /// Workers compare against their local epoch to avoid re-cloning a stale receiver.
    resume_epoch: AtomicUsize,
    /// Condvar for workers to wait for resume (new channel) or shutdown.
    resume_cvar: parking_lot::Condvar,
    /// When true, workers should exit permanently (IndexWriter dropped).
    shutdown: AtomicBool,
    /// Total number of worker threads.
    num_workers: usize,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Create an IndexWriter from an existing Index.
    /// Shares the SegmentManager for consistent segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }

    // ========================================================================
    // Construction + pipeline management
    // ========================================================================

    /// Common construction: creates worker state, spawns workers, assembles `Self`.
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        // Auto-configure tokenizers from schema for all text fields
        let registry = crate::tokenizer::TokenizerRegistry::new();
        let mut tokenizers = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if matches!(entry.field_type, crate::dsl::FieldType::Text)
                && let Some(ref tok_name) = entry.tokenizer
                && let Some(tok) = registry.get(tok_name)
            {
                tokenizers.insert(field, tok);
            }
        }

        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(tokenizers),
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
            flush_count: AtomicUsize::new(0),
            flush_mutex: parking_lot::Mutex::new(()),
            flush_cvar: parking_lot::Condvar::new(),
            resume_receiver: parking_lot::Mutex::new(None),
            resume_epoch: AtomicUsize::new(0),
            resume_cvar: parking_lot::Condvar::new(),
            shutdown: AtomicBool::new(false),
            num_workers,
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
        }
    }

    fn spawn_workers(
        worker_state: &Arc<WorkerState<D>>,
        num_workers: usize,
    ) -> (
        async_channel::Sender<Document>,
        Vec<std::thread::JoinHandle<()>>,
    ) {
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        let handle = tokio::runtime::Handle::current();
        let mut workers = Vec::with_capacity(num_workers);
        for i in 0..num_workers {
            let state = Arc::clone(worker_state);
            let rx = receiver.clone();
            let rt = handle.clone();
            workers.push(
                std::thread::Builder::new()
                    .name(format!("index-worker-{}", i))
                    .spawn(move || Self::worker_loop(state, rx, rt))
                    .expect("failed to spawn index worker thread"),
            );
        }
        (sender, workers)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field.
    /// Propagated to worker threads — takes effect for the next SegmentBuilder they create.
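    ///
    /// # Example
    ///
    /// Sketch (`WhitespaceTokenizer` is a hypothetical type implementing
    /// `crate::tokenizer::Tokenizer`; substitute any real implementation):
    ///
    /// ```ignore
    /// writer.set_tokenizer(title_field, WhitespaceTokenizer::default());
    /// ```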
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.worker_state
            .tokenizers
            .write()
            .insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue (sync, O(1), lock-free).
    ///
    /// `Document` is moved into the channel (zero-copy). Workers compete to pull it.
    /// Returns `Error::QueueFull` when the queue is at capacity — caller must back
    /// off. The rejected document is dropped, so keep a copy if you intend to retry.
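    ///
    /// # Example
    ///
    /// A retry sketch (illustrative only; the backoff interval and the
    /// surrounding async context are assumptions, not part of this API):
    ///
    /// ```ignore
    /// loop {
    ///     match writer.add_document(doc.clone()) {
    ///         Ok(()) => break,
    ///         Err(Error::QueueFull) => tokio::time::sleep(Duration::from_millis(10)).await,
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// ```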
    pub fn add_document(&self, doc: Document) -> Result<()> {
        self.doc_sender.try_send(doc).map_err(|e| match e {
            async_channel::TrySendError::Full(_) => Error::QueueFull,
            async_channel::TrySendError::Closed(_) => {
                Error::Internal("Document channel closed".into())
            }
        })
    }

    /// Add multiple documents to the indexing queue.
    ///
    /// Returns the number of documents successfully queued. Stops at the first
    /// `QueueFull` and returns the count queued so far; the document that hit
    /// `QueueFull` is dropped, so resend from a retained copy of the batch.
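    ///
    /// # Example
    ///
    /// A draining sketch (illustrative; assumes the caller keeps a clone of the
    /// batch so rejected documents can be resent):
    ///
    /// ```ignore
    /// let queued = writer.add_documents(batch.clone())?;
    /// if queued < batch.len() {
    ///     // Queue saturated — back off (or commit), then resend batch[queued..].
    /// }
    /// ```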
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let total = documents.len();
        for (i, doc) in documents.into_iter().enumerate() {
            match self.add_document(doc) {
                Ok(()) => {}
                Err(Error::QueueFull) => return Ok(i),
                Err(e) => return Err(e),
            }
        }
        Ok(total)
    }

    // ========================================================================
    // Worker loop
    // ========================================================================

    /// Worker loop — runs on a dedicated OS thread, survives across commits.
    ///
    /// Outer loop: each iteration processes one commit cycle.
    /// Inner loop: pull documents from the MPMC queue, index them, build segments
    /// when the memory budget is exceeded.
    /// On channel close (prepare_commit): flush the current builder, signal
    /// flush_count, wait for resume with a new receiver.
    /// On shutdown (Drop): exit permanently.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        let mut my_epoch = 0usize;

        loop {
            // Wrap the recv+build phase in catch_unwind so a panic doesn't
            // prevent flush_count from being signaled (which would hang
            // prepare_commit forever).
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                while let Ok(doc) = receiver.recv_blocking() {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

                    // Log progress every 16,384 documents (0x3FFF masks the low 14 bits).
                    if b.num_docs() & 0x3FFF == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    if builder_memory >= state.memory_budget_per_worker
                        && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
                    {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed — flush current builder
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Signal flush completion (always, even after panic — prevents
            // prepare_commit from hanging)
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                // Last worker — wake prepare_commit
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Wait for resume (new channel) or shutdown.
            // Check resume_epoch to avoid re-cloning a stale receiver from
            // a previous cycle.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }

    /// Build a segment on the worker thread. Uses `Handle::block_on()` to bridge
    /// into async context for I/O (streaming writers). CPU work (rayon) stays on
    /// the worker thread / rayon pool.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }

    // ========================================================================
    // Public API — commit, merge, etc.
    // ========================================================================

    /// Check merge policy and spawn a background merge if needed.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for the in-flight background merge to complete (if any).
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Wait for all eligible merges to complete, including cascading merges.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Get the segment tracker for sharing with readers.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files not registered in metadata.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Prepare commit — signal workers to flush, wait for completion, collect segments.
    ///
    /// All documents sent via `add_document` before this call are guaranteed
    /// to be written to segment files on disk. Segments are NOT yet registered
    /// in metadata — call `PreparedCommit::commit()` for that.
    ///
    /// Workers are NOT destroyed — they flush their builders and wait for
    /// `resume_workers()` to give them a new channel.
    ///
    /// `add_document` will return an error (the channel is closed) until
    /// commit/abort resumes the workers.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // 1. Close channel → workers drain remaining docs and flush builders
        self.doc_sender.close();

        // Wake any workers still waiting on resume_cvar from a previous cycle.
        // They'll clone the stale receiver, enter recv_blocking, get Err
        // immediately (sender already closed), flush, and signal completion.
        self.worker_state.resume_cvar.notify_all();

        // 2. Wait for all workers to complete their flush (via spawn_blocking
        // to avoid blocking the tokio runtime)
        let state = Arc::clone(&self.worker_state);
        tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            let deadline = std::time::Instant::now() + std::time::Duration::from_secs(300);
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
                if remaining.is_zero() {
                    log::error!(
                        "[prepare_commit] timed out waiting for workers: {}/{} flushed",
                        state.flush_count.load(Ordering::Acquire),
                        state.num_workers
                    );
                    break;
                }
                state.flush_cvar.wait_for(&mut lock, remaining);
            }
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        // 3. Collect built segments
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }

    /// Commit (convenience): prepare_commit + commit in one call.
    ///
    /// Guarantees all prior `add_document` calls are committed.
    /// Vector training is decoupled — call `build_vector_index()` manually.
    pub async fn commit(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await
    }

    /// Force merge all segments into one.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Resume workers with a fresh channel. Called after commit or abort.
    ///
    /// Workers are already alive — just give them a new channel and wake them.
    /// If the tokio runtime has shut down (e.g., program exit), workers are
    /// signaled to shut down permanently instead.
    fn resume_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            // Runtime is gone — signal permanent shutdown so workers don't
            // hang forever on resume_cvar.
            self.worker_state.shutdown.store(true, Ordering::Release);
            self.worker_state.resume_cvar.notify_all();
            return;
        }

        // Reset flush count for next cycle
        self.worker_state.flush_count.store(0, Ordering::Release);

        // Create new channel
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        // Set new receiver, bump epoch, and wake all workers
        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }

    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
}

impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    fn drop(&mut self) {
        // 1. Signal permanent shutdown
        self.worker_state.shutdown.store(true, Ordering::Release);
        // 2. Close channel to wake workers blocked on recv_blocking
        self.doc_sender.close();
        // 3. Wake workers that might be waiting on resume_cvar
        self.worker_state.resume_cvar.notify_all();
        // 4. Join worker threads
        for w in std::mem::take(&mut self.workers) {
            let _ = w.join();
        }
    }
}

/// A prepared commit that can be finalized or aborted.
///
/// Two-phase commit guard. Between `prepare_commit()` and
/// `commit()`/`abort()`, segments are on disk but NOT in metadata.
/// Dropping without calling either will auto-abort (discard segments,
/// resume workers).
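///
/// # Example
///
/// A sketch of the prepare/commit window; `wal.sync()` is a hypothetical
/// stand-in for whatever external work the caller performs between the
/// two phases:
///
/// ```ignore
/// let prepared = writer.prepare_commit().await?;
/// match wal.sync() {
///     Ok(()) => prepared.commit().await?,
///     Err(_) => prepared.abort(), // segments become orphans; workers resume
/// }
/// ```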
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    is_resolved: bool,
}

impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Finalize: register segments in metadata, evaluate merge policy, resume workers.
    pub async fn commit(mut self) -> Result<()> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);
        self.writer.segment_manager.commit(segments).await?;
        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(())
    }

    /// Abort: discard prepared segments, resume workers.
    /// Segment files become orphans (cleaned up by `cleanup_orphan_segments`).
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        self.writer.resume_workers();
    }
}

impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            self.writer.resume_workers();
        }
    }
}