hermes_core/index/writer.rs
//! IndexWriter — async document indexing with parallel segment building.
//!
//! This module is only compiled with the "native" feature.
//!
//! # Architecture
//!
//! ```text
//! add_document() ──try_send──► [shared bounded MPMC] ◄──recv── worker 0
//!                                                    ◄──recv── worker 1
//!                                                    ◄──recv── worker N
//! ```
//!
//! - **Shared MPMC queue** (`async_channel`): all workers compete for documents.
//!   Busy workers (building segments) naturally stop pulling; free workers pick up slack.
//! - **Zero-copy pipeline**: `Document` is moved (never cloned) through every stage:
//!   `add_document()` → channel → `recv_blocking()` → `SegmentBuilder::add_document()`.
//! - `add_document` returns `QueueFull` when the queue is at capacity.
//! - **Workers are OS threads**: CPU-intensive work (tokenization, posting list building)
//!   runs on dedicated threads, never blocking the tokio async runtime.
//!   Async I/O (segment file writes) is bridged via `Handle::block_on()`.
//! - **Fixed per-worker memory budget**: `max_indexing_memory_bytes / num_workers`.
//! - **Two-phase commit**:
//!   1. `prepare_commit()` — closes queue, workers flush builders to disk.
//!      Returns a `PreparedCommit` guard. No new documents accepted until resolved.
//!   2. `PreparedCommit::commit()` — registers segments in metadata, resumes workers.
//!   3. `PreparedCommit::abort()` — discards prepared segments, resumes workers.
//!   4. `commit()` — convenience: `prepare_commit().await?.commit().await`.
//!
//! Since `prepare_commit`/`commit` take `&mut self`, Rust’s borrow checker
//! guarantees no concurrent `add_document` calls during the commit window.
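//!
//! # Example
//!
//! A minimal usage sketch (illustrative only — `directory`, `schema`, and the
//! document come from the caller; error handling is elided):
//!
//! ```ignore
//! let mut writer = IndexWriter::create(directory, schema, IndexConfig::default()).await?;
//! writer.add_document(doc)?;                     // sync, O(1); may return Error::QueueFull
//! let prepared = writer.prepare_commit().await?; // segments flushed to disk
//! // ... external work (e.g. WAL sync) — `prepared.abort()` is still possible here ...
//! prepared.commit().await?;                      // segments registered in metadata
//! ```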

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Total pipeline capacity (in documents).
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Async IndexWriter for adding documents and committing segments.
///
/// **Backpressure:** `add_document()` is sync, O(1). Returns `Error::QueueFull`
/// when the shared queue is at capacity — caller must back off.
///
/// **Two-phase commit:**
/// - `prepare_commit()` → `PreparedCommit::commit()` or `PreparedCommit::abort()`
/// - `commit()` is a convenience that does both phases.
/// - Between prepare and commit, the caller can do external work (WAL, sync, etc.)
///   knowing that abort is possible if something fails.
/// - Dropping `PreparedCommit` without calling commit/abort auto-aborts.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// MPMC sender — `try_send(&self)` is thread-safe, no lock needed.
    /// Replaced on each commit cycle (workers get new receiver via resume).
    doc_sender: async_channel::Sender<Document>,
    /// Worker OS thread handles — long-lived, survive across commits.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Shared worker state (immutable config + mutable segment output + sync).
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager — owns metadata.json, handles segments and background merging.
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata.
    flushed_segments: Vec<(String, u32)>,
}

/// Shared state for worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Fixed per-worker memory budget (bytes). When a builder exceeds this, its segment is built.
    memory_budget_per_worker: usize,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built by workers, collected by `prepare_commit()`. Sync mutex for sub-μs push.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // === Worker lifecycle synchronization ===
    // Workers survive across commits. On prepare_commit the channel is closed;
    // workers flush their builders, increment flush_count, then wait on
    // resume_cvar for a new receiver. commit/abort creates a fresh channel
    // and wakes them.
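    //
    //   recv loop ──channel closed──► flush builder ──► flush_count += 1
    //       ▲                                                │
    //       └── clone new receiver (epoch bump) ◄── wait on resume_cvar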
    /// Number of workers that have completed their flush.
    flush_count: AtomicUsize,
    /// Mutex + condvar for prepare_commit to wait until all workers have flushed.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Holds the new channel receiver after commit/abort. Workers clone from this.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Monotonically increasing epoch, bumped by each resume_workers call.
    /// Workers compare against their local epoch to avoid re-cloning a stale receiver.
    resume_epoch: AtomicUsize,
    /// Condvar for workers to wait for resume (new channel) or shutdown.
    resume_cvar: parking_lot::Condvar,
    /// When true, workers should exit permanently (IndexWriter dropped).
    shutdown: AtomicBool,
    /// Total number of worker threads.
    num_workers: usize,
}

impl<D: DirectoryWriter + 'static> IndexWriter<D> {
    /// Create a new index in the directory
    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
    }

    /// Create a new index with custom builder config
    pub async fn create_with_config(
        directory: D,
        schema: Schema,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let schema = Arc::new(schema);
        let metadata = super::IndexMetadata::new((*schema).clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.update_metadata(|_| {}).await?;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Open an existing index for writing
    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
    }

    /// Open an existing index with custom builder config
    pub async fn open_with_config(
        directory: D,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
    ) -> Result<Self> {
        let directory = Arc::new(directory);
        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
        let schema = Arc::new(metadata.schema.clone());

        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
            Arc::clone(&directory),
            Arc::clone(&schema),
            metadata,
            config.merge_policy.clone_box(),
            config.term_cache_blocks,
            config.max_concurrent_merges,
        ));
        segment_manager.load_and_publish_trained().await;

        Ok(Self::new_with_parts(
            directory,
            schema,
            config,
            builder_config,
            segment_manager,
        ))
    }

    /// Create an IndexWriter from an existing Index.
    /// Shares the SegmentManager for consistent segment lifecycle management.
    pub fn from_index(index: &super::Index<D>) -> Self {
        Self::new_with_parts(
            Arc::clone(&index.directory),
            Arc::clone(&index.schema),
            index.config.clone(),
            SegmentBuilderConfig::default(),
            Arc::clone(&index.segment_manager),
        )
    }

    // ========================================================================
    // Construction + pipeline management
    // ========================================================================

    /// Common construction: creates worker state, spawns workers, assembles `Self`.
    fn new_with_parts(
        directory: Arc<D>,
        schema: Arc<Schema>,
        config: IndexConfig,
        builder_config: SegmentBuilderConfig,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
    ) -> Self {
        let num_workers = config.num_indexing_threads.max(1);
        let worker_state = Arc::new(WorkerState {
            directory: Arc::clone(&directory),
            schema: Arc::clone(&schema),
            builder_config,
            tokenizers: parking_lot::RwLock::new(FxHashMap::default()),
            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
            segment_manager: Arc::clone(&segment_manager),
            built_segments: parking_lot::Mutex::new(Vec::new()),
            flush_count: AtomicUsize::new(0),
            flush_mutex: parking_lot::Mutex::new(()),
            flush_cvar: parking_lot::Condvar::new(),
            resume_receiver: parking_lot::Mutex::new(None),
            resume_epoch: AtomicUsize::new(0),
            resume_cvar: parking_lot::Condvar::new(),
            shutdown: AtomicBool::new(false),
            num_workers,
        });
        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);

        Self {
            directory,
            schema,
            config,
            doc_sender,
            workers,
            worker_state,
            segment_manager,
            flushed_segments: Vec::new(),
        }
    }

    fn spawn_workers(
        worker_state: &Arc<WorkerState<D>>,
        num_workers: usize,
    ) -> (
        async_channel::Sender<Document>,
        Vec<std::thread::JoinHandle<()>>,
    ) {
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        let handle = tokio::runtime::Handle::current();
        let mut workers = Vec::with_capacity(num_workers);
        for i in 0..num_workers {
            let state = Arc::clone(worker_state);
            let rx = receiver.clone();
            let rt = handle.clone();
            workers.push(
                std::thread::Builder::new()
                    .name(format!("index-worker-{}", i))
                    .spawn(move || Self::worker_loop(state, rx, rt))
                    .expect("failed to spawn index worker thread"),
            );
        }
        (sender, workers)
    }

    /// Get the schema
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Set tokenizer for a field.
    /// Propagated to worker threads — takes effect for the next SegmentBuilder they create.
    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
        self.worker_state
            .tokenizers
            .write()
            .insert(field, Box::new(tokenizer));
    }

    /// Add a document to the indexing queue (sync, O(1), lock-free).
    ///
    /// `Document` is moved into the channel (zero-copy). Workers compete to pull it.
    /// Returns `Error::QueueFull` when the queue is at capacity — caller must back off.
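    ///
    /// A backpressure sketch (hypothetical caller code; note that the rejected
    /// document is consumed by this call, so retrying requires a copy held by
    /// the caller):
    ///
    /// ```ignore
    /// match writer.add_document(doc) {
    ///     Ok(()) => {}
    ///     Err(Error::QueueFull) => { /* queue at capacity — back off, then resume sending */ }
    ///     Err(e) => return Err(e),
    /// }
    /// ```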
    pub fn add_document(&self, doc: Document) -> Result<()> {
        self.doc_sender.try_send(doc).map_err(|e| match e {
            async_channel::TrySendError::Full(_) => Error::QueueFull,
            async_channel::TrySendError::Closed(_) => {
                Error::Internal("Document channel closed".into())
            }
        })
    }

    /// Add multiple documents to the indexing queue.
    ///
    /// Returns the number of documents successfully queued. Stops at the first
    /// `QueueFull` and returns the count queued so far; the rejected document
    /// and the rest of the batch are dropped.
    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
        let total = documents.len();
        for (i, doc) in documents.into_iter().enumerate() {
            match self.add_document(doc) {
                Ok(()) => {}
                Err(Error::QueueFull) => return Ok(i),
                Err(e) => return Err(e),
            }
        }
        Ok(total)
    }

    // ========================================================================
    // Worker loop
    // ========================================================================

    /// Worker loop — runs on a dedicated OS thread, survives across commits.
    ///
    /// Outer loop: each iteration processes one commit cycle.
    /// Inner loop: pull documents from MPMC queue, index them, build segments
    /// when memory budget is exceeded.
    /// On channel close (prepare_commit): flush current builder, signal
    /// flush_count, wait for resume with new receiver.
    /// On shutdown (Drop): exit permanently.
    fn worker_loop(
        state: Arc<WorkerState<D>>,
        initial_receiver: async_channel::Receiver<Document>,
        handle: tokio::runtime::Handle,
    ) {
        let mut receiver = initial_receiver;
        let mut my_epoch = 0usize;

        loop {
            // Wrap the recv+build phase in catch_unwind so a panic doesn't
            // prevent flush_count from being signaled (which would hang
            // prepare_commit forever).
            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let mut builder: Option<SegmentBuilder> = None;

                while let Ok(doc) = receiver.recv_blocking() {
                    // Initialize builder if needed
                    if builder.is_none() {
                        match SegmentBuilder::new(
                            Arc::clone(&state.schema),
                            state.builder_config.clone(),
                        ) {
                            Ok(mut b) => {
                                for (field, tokenizer) in state.tokenizers.read().iter() {
                                    b.set_tokenizer(*field, tokenizer.clone_box());
                                }
                                builder = Some(b);
                            }
                            Err(e) => {
                                log::error!("Failed to create segment builder: {:?}", e);
                                continue;
                            }
                        }
                    }

                    let b = builder.as_mut().unwrap();
                    if let Err(e) = b.add_document(doc) {
                        log::error!("Failed to index document: {:?}", e);
                        continue;
                    }

                    let builder_memory = b.estimated_memory_bytes();

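                    // Progress log every 16_384 docs (0x3FFF mask keeps the check cheap).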
                    if (b.num_docs() & 0x3FFF) == 0 {
                        log::debug!(
                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
                        );
                    }

                    // Require minimum 100 docs before flushing to avoid tiny segments
                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;

                    if builder_memory >= state.memory_budget_per_worker
                        && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
                    {
                        log::info!(
                            "[indexing] memory budget reached, building segment: \
                             docs={}, memory={:.2} MB, budget={:.2} MB",
                            b.num_docs(),
                            builder_memory as f64 / (1024.0 * 1024.0),
                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
                        );
                        let full_builder = builder.take().unwrap();
                        Self::build_segment_inline(&state, full_builder, &handle);
                    }
                }

                // Channel closed — flush current builder
                if let Some(b) = builder.take()
                    && b.num_docs() > 0
                {
                    Self::build_segment_inline(&state, b, &handle);
                }
            }));

            if build_result.is_err() {
                log::error!(
                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
                );
            }

            // Signal flush completion (always, even after panic — prevents
            // prepare_commit from hanging)
            let prev = state.flush_count.fetch_add(1, Ordering::Release);
            if prev + 1 == state.num_workers {
                // Last worker — wake prepare_commit
                let _lock = state.flush_mutex.lock();
                state.flush_cvar.notify_one();
            }

            // Wait for resume (new channel) or shutdown.
            // Check resume_epoch to avoid re-cloning a stale receiver from
            // a previous cycle.
            {
                let mut lock = state.resume_receiver.lock();
                loop {
                    if state.shutdown.load(Ordering::Acquire) {
                        return;
                    }
                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
                    if current_epoch > my_epoch
                        && let Some(rx) = lock.as_ref()
                    {
                        receiver = rx.clone();
                        my_epoch = current_epoch;
                        break;
                    }
                    state.resume_cvar.wait(&mut lock);
                }
            }
        }
    }

    /// Build a segment on the worker thread. Uses `Handle::block_on()` to bridge
    /// into async context for I/O (streaming writers). CPU work (rayon) stays on
    /// the worker thread / rayon pool.
    fn build_segment_inline(
        state: &WorkerState<D>,
        builder: SegmentBuilder,
        handle: &tokio::runtime::Handle,
    ) {
        let segment_id = SegmentId::new();
        let segment_hex = segment_id.to_hex();
        let trained = state.segment_manager.trained();
        let doc_count = builder.num_docs();
        let build_start = std::time::Instant::now();

        log::info!(
            "[segment_build] segment_id={} doc_count={} ann={}",
            segment_hex,
            doc_count,
            trained.is_some()
        );

        match handle.block_on(builder.build(
            state.directory.as_ref(),
            segment_id,
            trained.as_deref(),
        )) {
            Ok(meta) if meta.num_docs > 0 => {
                let duration_ms = build_start.elapsed().as_millis() as u64;
                log::info!(
                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
                    segment_hex,
                    meta.num_docs,
                    duration_ms,
                );
                state
                    .built_segments
                    .lock()
                    .push((segment_hex, meta.num_docs));
            }
            // Empty segment — nothing to register.
            Ok(_) => {}
            Err(e) => {
                log::error!(
                    "[segment_build_failed] segment_id={} error={:?}",
                    segment_hex,
                    e
                );
            }
        }
    }

    // ========================================================================
    // Public API — commit, merge, etc.
    // ========================================================================

    /// Check merge policy and spawn a background merge if needed.
    pub async fn maybe_merge(&self) {
        self.segment_manager.maybe_merge().await;
    }

    /// Wait for the in-flight background merge to complete (if any).
    pub async fn wait_for_merging_thread(&self) {
        self.segment_manager.wait_for_merging_thread().await;
    }

    /// Wait for all eligible merges to complete, including cascading merges.
    pub async fn wait_for_all_merges(&self) {
        self.segment_manager.wait_for_all_merges().await;
    }

    /// Get the segment tracker for sharing with readers.
    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
        self.segment_manager.tracker()
    }

    /// Acquire a snapshot of current segments for reading.
    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
        self.segment_manager.acquire_snapshot().await
    }

    /// Clean up orphan segment files not registered in metadata.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        self.segment_manager.cleanup_orphan_segments().await
    }

    /// Prepare commit — signal workers to flush, wait for completion, collect segments.
    ///
    /// All documents sent via `add_document` before this call are guaranteed
    /// to be written to segment files on disk. Segments are NOT yet registered
    /// in metadata — call `PreparedCommit::commit()` for that.
    ///
    /// Workers are NOT destroyed — they flush their builders and wait for
    /// `resume_workers()` to give them a new channel.
    ///
    /// `add_document` will fail with an `Internal("Document channel closed")`
    /// error until commit/abort resumes the workers.
    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
        // 1. Close channel → workers drain remaining docs and flush builders
        self.doc_sender.close();

        // Wake any workers still waiting on resume_cvar from a previous cycle.
        // They'll clone the stale receiver, enter recv_blocking, get Err
        // immediately (sender already closed), flush, and signal completion.
        self.worker_state.resume_cvar.notify_all();

        // 2. Wait for all workers to complete their flush (via spawn_blocking
        // to avoid blocking the tokio runtime)
        let state = Arc::clone(&self.worker_state);
        tokio::task::spawn_blocking(move || {
            let mut lock = state.flush_mutex.lock();
            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
                state.flush_cvar.wait(&mut lock);
            }
        })
        .await
        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;

        // 3. Collect built segments
        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
        self.flushed_segments.extend(built);

        Ok(PreparedCommit {
            writer: self,
            is_resolved: false,
        })
    }

    /// Commit (convenience): prepare_commit + commit in one call.
    ///
    /// Guarantees all prior `add_document` calls are committed.
    /// Vector training is decoupled — call `build_vector_index()` manually.
    pub async fn commit(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await
    }

    /// Force merge all segments into one.
    pub async fn force_merge(&mut self) -> Result<()> {
        self.prepare_commit().await?.commit().await?;
        self.segment_manager.force_merge().await
    }

    /// Resume workers with a fresh channel. Called after commit or abort.
    ///
    /// Workers are already alive — just give them a new channel and wake them.
    /// If the tokio runtime has shut down (e.g., program exit), this is a no-op.
    fn resume_workers(&mut self) {
        if tokio::runtime::Handle::try_current().is_err() {
            return;
        }

        // Reset flush count for next cycle
        self.worker_state.flush_count.store(0, Ordering::Release);

        // Create new channel
        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
        self.doc_sender = sender;

        // Set new receiver, bump epoch, and wake all workers
        {
            let mut lock = self.worker_state.resume_receiver.lock();
            *lock = Some(receiver);
        }
        self.worker_state
            .resume_epoch
            .fetch_add(1, Ordering::Release);
        self.worker_state.resume_cvar.notify_all();
    }

    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
}

impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
    fn drop(&mut self) {
        // 1. Signal permanent shutdown
        self.worker_state.shutdown.store(true, Ordering::Release);
        // 2. Close channel to wake workers blocked on recv_blocking
        self.doc_sender.close();
        // 3. Wake workers that might be waiting on resume_cvar
        self.worker_state.resume_cvar.notify_all();
        // 4. Join worker threads
        for w in std::mem::take(&mut self.workers) {
            let _ = w.join();
        }
    }
}

/// A prepared commit that can be finalized or aborted.
///
/// Two-phase commit guard. Between `prepare_commit()` and
/// `commit()`/`abort()`, segments are on disk but NOT in metadata.
/// Dropping without calling either will auto-abort (discard segments,
/// resume workers).
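///
/// A typical two-phase sequence (sketch; the WAL step stands in for any
/// external work the caller wants between the phases):
///
/// ```ignore
/// let prepared = writer.prepare_commit().await?; // segments on disk, not in metadata
/// match wal.sync().await {
///     Ok(()) => prepared.commit().await?,        // register segments, resume workers
///     Err(_) => prepared.abort(),                // discard segments, resume workers
/// }
/// ```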
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    writer: &'a mut IndexWriter<D>,
    is_resolved: bool,
}

impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
    /// Finalize: register segments in metadata, evaluate merge policy, resume workers.
    pub async fn commit(mut self) -> Result<()> {
        self.is_resolved = true;
        let segments = std::mem::take(&mut self.writer.flushed_segments);
        self.writer.segment_manager.commit(segments).await?;
        self.writer.segment_manager.maybe_merge().await;
        self.writer.resume_workers();
        Ok(())
    }

    /// Abort: discard prepared segments, resume workers.
    /// Segment files become orphans (cleaned up by `cleanup_orphan_segments`).
    pub fn abort(mut self) {
        self.is_resolved = true;
        self.writer.flushed_segments.clear();
        self.writer.resume_workers();
    }
}

impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
    fn drop(&mut self) {
        if !self.is_resolved {
            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
            self.writer.flushed_segments.clear();
            self.writer.resume_workers();
        }
    }
}