// hermes_core/index/writer.rs
//! IndexWriter — async document indexing with parallel segment building.
//!
//! This module is only compiled with the "native" feature.
//!
//! # Architecture
//!
//! ```text
//! add_document() ──try_send──► [shared bounded MPMC] ◄──recv── worker 0
//!                                                     ◄──recv── worker 1
//!                                                     ◄──recv── worker N
//! ```
//!
//! - **Shared MPMC queue** (`async_channel`): all workers compete for documents.
//!   Busy workers (building segments) naturally stop pulling; free workers pick up slack.
//! - **Zero-copy pipeline**: `Document` is moved (never cloned) through every stage:
//!   `add_document()` → channel → `recv_blocking()` → `SegmentBuilder::add_document()`.
//! - `add_document` returns `QueueFull` when the queue is at capacity.
//! - **Workers are OS threads**: CPU-intensive work (tokenization, posting list building)
//!   runs on dedicated threads, never blocking the tokio async runtime.
//!   Async I/O (segment file writes) is bridged via `Handle::block_on()`.
//! - **Fixed per-worker memory budget**: `max_indexing_memory_bytes / num_workers`.
//! - **Two-phase commit**:
//!   1. `prepare_commit()` — closes queue, workers flush builders to disk.
//!      Returns a `PreparedCommit` guard. No new documents accepted until resolved.
//!   2. `PreparedCommit::commit()` — registers segments in metadata, resumes workers.
//!   3. `PreparedCommit::abort()` — discards prepared segments, resumes workers.
//!   4. `commit()` — convenience: `prepare_commit().await?.commit().await`.
//!
//! Since `prepare_commit`/`commit` take `&mut self`, Rust’s borrow checker
//! guarantees no concurrent `add_document` calls during the commit window.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

use rustc_hash::FxHashMap;

use crate::directories::DirectoryWriter;
use crate::dsl::{Document, Field, Schema};
use crate::error::{Error, Result};
use crate::segment::{SegmentBuilder, SegmentBuilderConfig, SegmentId};
use crate::tokenizer::BoxedTokenizer;

use super::IndexConfig;

/// Total pipeline capacity (in documents).
///
/// Bound for the shared MPMC queue between `add_document()` and the worker
/// threads; when full, `add_document()` returns `Error::QueueFull`.
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;

/// Async IndexWriter for adding documents and committing segments.
///
/// **Backpressure:** `add_document()` is sync, O(1). Returns `Error::QueueFull`
/// when the shared queue is at capacity — caller must back off.
///
/// **Two-phase commit:**
/// - `prepare_commit()` → `PreparedCommit::commit()` or `PreparedCommit::abort()`
/// - `commit()` is a convenience that does both phases.
/// - Between prepare and commit, the caller can do external work (WAL, sync, etc.)
///   knowing that abort is possible if something fails.
/// - Dropping `PreparedCommit` without calling commit/abort auto-aborts.
pub struct IndexWriter<D: DirectoryWriter + 'static> {
    pub(super) directory: Arc<D>,
    pub(super) schema: Arc<Schema>,
    pub(super) config: IndexConfig,
    /// MPMC sender — `try_send(&self)` is thread-safe, no lock needed.
    /// Replaced on each commit cycle (workers get new receiver via resume).
    doc_sender: async_channel::Sender<Document>,
    /// Worker OS thread handles — long-lived, survive across commits.
    /// Joined in `Drop`.
    workers: Vec<std::thread::JoinHandle<()>>,
    /// Shared worker state (immutable config + mutable segment output + sync)
    worker_state: Arc<WorkerState<D>>,
    /// Segment manager — owns metadata.json, handles segments and background merging
    pub(super) segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments flushed to disk but not yet registered in metadata.
    /// Populated by `prepare_commit()`, drained by `PreparedCommit::commit()`
    /// (or cleared by abort). Pairs are `(segment_id_hex, num_docs)`.
    flushed_segments: Vec<(String, u32)>,
}

/// Shared state for worker threads.
struct WorkerState<D: DirectoryWriter + 'static> {
    directory: Arc<D>,
    schema: Arc<Schema>,
    builder_config: SegmentBuilderConfig,
    /// Per-field tokenizers; read by workers when creating a new SegmentBuilder,
    /// written by `IndexWriter::set_tokenizer`.
    tokenizers: parking_lot::RwLock<FxHashMap<Field, BoxedTokenizer>>,
    /// Fixed per-worker memory budget (bytes). When a builder exceeds this, segment is built.
    memory_budget_per_worker: usize,
    /// Segment manager — workers read trained structures from its ArcSwap (lock-free).
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Segments built by workers, collected by `prepare_commit()`. Sync mutex for sub-μs push.
    /// Pairs are `(segment_id_hex, num_docs)`.
    built_segments: parking_lot::Mutex<Vec<(String, u32)>>,

    // === Worker lifecycle synchronization ===
    // Workers survive across commits. On prepare_commit the channel is closed;
    // workers flush their builders, increment flush_count, then wait on
    // resume_cvar for a new receiver. commit/abort creates a fresh channel
    // and wakes them.
    //
    // Pairing: `flush_cvar` waits on `flush_mutex`; `resume_cvar` waits on the
    // `resume_receiver` mutex (workers check `shutdown` and `resume_epoch`
    // while holding that mutex before parking).
    /// Number of workers that have completed their flush.
    flush_count: AtomicUsize,
    /// Mutex + condvar for prepare_commit to wait on all workers flushed.
    flush_mutex: parking_lot::Mutex<()>,
    flush_cvar: parking_lot::Condvar,
    /// Holds the new channel receiver after commit/abort. Workers clone from this.
    /// Its mutex also guards the resume/shutdown wait condition.
    resume_receiver: parking_lot::Mutex<Option<async_channel::Receiver<Document>>>,
    /// Monotonically increasing epoch, bumped by each resume_workers call.
    /// Workers compare against their local epoch to avoid re-cloning a stale receiver.
    resume_epoch: AtomicUsize,
    /// Condvar for workers to wait for resume (new channel) or shutdown.
    resume_cvar: parking_lot::Condvar,
    /// When true, workers should exit permanently (IndexWriter dropped).
    shutdown: AtomicBool,
    /// Total number of worker threads.
    num_workers: usize,
}

112impl<D: DirectoryWriter + 'static> IndexWriter<D> {
113    /// Create a new index in the directory
114    pub async fn create(directory: D, schema: Schema, config: IndexConfig) -> Result<Self> {
115        Self::create_with_config(directory, schema, config, SegmentBuilderConfig::default()).await
116    }
117
118    /// Create a new index with custom builder config
119    pub async fn create_with_config(
120        directory: D,
121        schema: Schema,
122        config: IndexConfig,
123        builder_config: SegmentBuilderConfig,
124    ) -> Result<Self> {
125        let directory = Arc::new(directory);
126        let schema = Arc::new(schema);
127        let metadata = super::IndexMetadata::new((*schema).clone());
128
129        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
130            Arc::clone(&directory),
131            Arc::clone(&schema),
132            metadata,
133            config.merge_policy.clone_box(),
134            config.term_cache_blocks,
135            config.max_concurrent_merges,
136        ));
137        segment_manager.update_metadata(|_| {}).await?;
138
139        Ok(Self::new_with_parts(
140            directory,
141            schema,
142            config,
143            builder_config,
144            segment_manager,
145        ))
146    }
147
148    /// Open an existing index for writing
149    pub async fn open(directory: D, config: IndexConfig) -> Result<Self> {
150        Self::open_with_config(directory, config, SegmentBuilderConfig::default()).await
151    }
152
153    /// Open an existing index with custom builder config
154    pub async fn open_with_config(
155        directory: D,
156        config: IndexConfig,
157        builder_config: SegmentBuilderConfig,
158    ) -> Result<Self> {
159        let directory = Arc::new(directory);
160        let metadata = super::IndexMetadata::load(directory.as_ref()).await?;
161        let schema = Arc::new(metadata.schema.clone());
162
163        let segment_manager = Arc::new(crate::merge::SegmentManager::new(
164            Arc::clone(&directory),
165            Arc::clone(&schema),
166            metadata,
167            config.merge_policy.clone_box(),
168            config.term_cache_blocks,
169            config.max_concurrent_merges,
170        ));
171        segment_manager.load_and_publish_trained().await;
172
173        Ok(Self::new_with_parts(
174            directory,
175            schema,
176            config,
177            builder_config,
178            segment_manager,
179        ))
180    }
181
182    /// Create an IndexWriter from an existing Index.
183    /// Shares the SegmentManager for consistent segment lifecycle management.
184    pub fn from_index(index: &super::Index<D>) -> Self {
185        Self::new_with_parts(
186            Arc::clone(&index.directory),
187            Arc::clone(&index.schema),
188            index.config.clone(),
189            SegmentBuilderConfig::default(),
190            Arc::clone(&index.segment_manager),
191        )
192    }
193
194    // ========================================================================
195    // Construction + pipeline management
196    // ========================================================================
197
198    /// Common construction: creates worker state, spawns workers, assembles `Self`.
199    fn new_with_parts(
200        directory: Arc<D>,
201        schema: Arc<Schema>,
202        config: IndexConfig,
203        builder_config: SegmentBuilderConfig,
204        segment_manager: Arc<crate::merge::SegmentManager<D>>,
205    ) -> Self {
206        let num_workers = config.num_indexing_threads.max(1);
207        let worker_state = Arc::new(WorkerState {
208            directory: Arc::clone(&directory),
209            schema: Arc::clone(&schema),
210            builder_config,
211            tokenizers: parking_lot::RwLock::new(FxHashMap::default()),
212            memory_budget_per_worker: config.max_indexing_memory_bytes / num_workers,
213            segment_manager: Arc::clone(&segment_manager),
214            built_segments: parking_lot::Mutex::new(Vec::new()),
215            flush_count: AtomicUsize::new(0),
216            flush_mutex: parking_lot::Mutex::new(()),
217            flush_cvar: parking_lot::Condvar::new(),
218            resume_receiver: parking_lot::Mutex::new(None),
219            resume_epoch: AtomicUsize::new(0),
220            resume_cvar: parking_lot::Condvar::new(),
221            shutdown: AtomicBool::new(false),
222            num_workers,
223        });
224        let (doc_sender, workers) = Self::spawn_workers(&worker_state, num_workers);
225
226        Self {
227            directory,
228            schema,
229            config,
230            doc_sender,
231            workers,
232            worker_state,
233            segment_manager,
234            flushed_segments: Vec::new(),
235        }
236    }
237
238    fn spawn_workers(
239        worker_state: &Arc<WorkerState<D>>,
240        num_workers: usize,
241    ) -> (
242        async_channel::Sender<Document>,
243        Vec<std::thread::JoinHandle<()>>,
244    ) {
245        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
246        let handle = tokio::runtime::Handle::current();
247        let mut workers = Vec::with_capacity(num_workers);
248        for i in 0..num_workers {
249            let state = Arc::clone(worker_state);
250            let rx = receiver.clone();
251            let rt = handle.clone();
252            workers.push(
253                std::thread::Builder::new()
254                    .name(format!("index-worker-{}", i))
255                    .spawn(move || Self::worker_loop(state, rx, rt))
256                    .expect("failed to spawn index worker thread"),
257            );
258        }
259        (sender, workers)
260    }
261
262    /// Get the schema
263    pub fn schema(&self) -> &Schema {
264        &self.schema
265    }
266
267    /// Set tokenizer for a field.
268    /// Propagated to worker threads — takes effect for the next SegmentBuilder they create.
269    pub fn set_tokenizer<T: crate::tokenizer::Tokenizer>(&mut self, field: Field, tokenizer: T) {
270        self.worker_state
271            .tokenizers
272            .write()
273            .insert(field, Box::new(tokenizer));
274    }
275
276    /// Add a document to the indexing queue (sync, O(1), lock-free).
277    ///
278    /// `Document` is moved into the channel (zero-copy). Workers compete to pull it.
279    /// Returns `Error::QueueFull` when the queue is at capacity — caller must back off.
280    pub fn add_document(&self, doc: Document) -> Result<()> {
281        self.doc_sender.try_send(doc).map_err(|e| match e {
282            async_channel::TrySendError::Full(_) => Error::QueueFull,
283            async_channel::TrySendError::Closed(_) => {
284                Error::Internal("Document channel closed".into())
285            }
286        })
287    }
288
289    /// Add multiple documents to the indexing queue.
290    ///
291    /// Returns the number of documents successfully queued. Stops at the first
292    /// `QueueFull` and returns the count queued so far.
293    pub fn add_documents(&self, documents: Vec<Document>) -> Result<usize> {
294        let total = documents.len();
295        for (i, doc) in documents.into_iter().enumerate() {
296            match self.add_document(doc) {
297                Ok(()) => {}
298                Err(Error::QueueFull) => return Ok(i),
299                Err(e) => return Err(e),
300            }
301        }
302        Ok(total)
303    }
304
305    // ========================================================================
306    // Worker loop
307    // ========================================================================
308
309    /// Worker loop — runs on a dedicated OS thread, survives across commits.
310    ///
311    /// Outer loop: each iteration processes one commit cycle.
312    ///   Inner loop: pull documents from MPMC queue, index them, build segments
313    ///   when memory budget is exceeded.
314    ///   On channel close (prepare_commit): flush current builder, signal
315    ///   flush_count, wait for resume with new receiver.
316    ///   On shutdown (Drop): exit permanently.
317    fn worker_loop(
318        state: Arc<WorkerState<D>>,
319        initial_receiver: async_channel::Receiver<Document>,
320        handle: tokio::runtime::Handle,
321    ) {
322        let mut receiver = initial_receiver;
323        let mut my_epoch = 0usize;
324
325        loop {
326            // Wrap the recv+build phase in catch_unwind so a panic doesn't
327            // prevent flush_count from being signaled (which would hang
328            // prepare_commit forever).
329            let build_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
330                let mut builder: Option<SegmentBuilder> = None;
331
332                while let Ok(doc) = receiver.recv_blocking() {
333                    // Initialize builder if needed
334                    if builder.is_none() {
335                        match SegmentBuilder::new(
336                            Arc::clone(&state.schema),
337                            state.builder_config.clone(),
338                        ) {
339                            Ok(mut b) => {
340                                for (field, tokenizer) in state.tokenizers.read().iter() {
341                                    b.set_tokenizer(*field, tokenizer.clone_box());
342                                }
343                                builder = Some(b);
344                            }
345                            Err(e) => {
346                                log::error!("Failed to create segment builder: {:?}", e);
347                                continue;
348                            }
349                        }
350                    }
351
352                    let b = builder.as_mut().unwrap();
353                    if let Err(e) = b.add_document(doc) {
354                        log::error!("Failed to index document: {:?}", e);
355                        continue;
356                    }
357
358                    let builder_memory = b.estimated_memory_bytes();
359
360                    if b.num_docs() & 0x3FFF == 0 {
361                        log::debug!(
362                            "[indexing] docs={}, memory={:.2} MB, budget={:.2} MB",
363                            b.num_docs(),
364                            builder_memory as f64 / (1024.0 * 1024.0),
365                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0)
366                        );
367                    }
368
369                    // Require minimum 100 docs before flushing to avoid tiny segments
370                    const MIN_DOCS_BEFORE_FLUSH: u32 = 100;
371
372                    if builder_memory >= state.memory_budget_per_worker
373                        && b.num_docs() >= MIN_DOCS_BEFORE_FLUSH
374                    {
375                        log::info!(
376                            "[indexing] memory budget reached, building segment: \
377                             docs={}, memory={:.2} MB, budget={:.2} MB",
378                            b.num_docs(),
379                            builder_memory as f64 / (1024.0 * 1024.0),
380                            state.memory_budget_per_worker as f64 / (1024.0 * 1024.0),
381                        );
382                        let full_builder = builder.take().unwrap();
383                        Self::build_segment_inline(&state, full_builder, &handle);
384                    }
385                }
386
387                // Channel closed — flush current builder
388                if let Some(b) = builder.take()
389                    && b.num_docs() > 0
390                {
391                    Self::build_segment_inline(&state, b, &handle);
392                }
393            }));
394
395            if build_result.is_err() {
396                log::error!(
397                    "[worker] panic during indexing cycle — documents in this cycle may be lost"
398                );
399            }
400
401            // Signal flush completion (always, even after panic — prevents
402            // prepare_commit from hanging)
403            let prev = state.flush_count.fetch_add(1, Ordering::Release);
404            if prev + 1 == state.num_workers {
405                // Last worker — wake prepare_commit
406                let _lock = state.flush_mutex.lock();
407                state.flush_cvar.notify_one();
408            }
409
410            // Wait for resume (new channel) or shutdown.
411            // Check resume_epoch to avoid re-cloning a stale receiver from
412            // a previous cycle.
413            {
414                let mut lock = state.resume_receiver.lock();
415                loop {
416                    if state.shutdown.load(Ordering::Acquire) {
417                        return;
418                    }
419                    let current_epoch = state.resume_epoch.load(Ordering::Acquire);
420                    if current_epoch > my_epoch
421                        && let Some(rx) = lock.as_ref()
422                    {
423                        receiver = rx.clone();
424                        my_epoch = current_epoch;
425                        break;
426                    }
427                    state.resume_cvar.wait(&mut lock);
428                }
429            }
430        }
431    }
432
433    /// Build a segment on the worker thread. Uses `Handle::block_on()` to bridge
434    /// into async context for I/O (streaming writers). CPU work (rayon) stays on
435    /// the worker thread / rayon pool.
436    fn build_segment_inline(
437        state: &WorkerState<D>,
438        builder: SegmentBuilder,
439        handle: &tokio::runtime::Handle,
440    ) {
441        let segment_id = SegmentId::new();
442        let segment_hex = segment_id.to_hex();
443        let trained = state.segment_manager.trained();
444        let doc_count = builder.num_docs();
445        let build_start = std::time::Instant::now();
446
447        log::info!(
448            "[segment_build] segment_id={} doc_count={} ann={}",
449            segment_hex,
450            doc_count,
451            trained.is_some()
452        );
453
454        match handle.block_on(builder.build(
455            state.directory.as_ref(),
456            segment_id,
457            trained.as_deref(),
458        )) {
459            Ok(meta) if meta.num_docs > 0 => {
460                let duration_ms = build_start.elapsed().as_millis() as u64;
461                log::info!(
462                    "[segment_build_done] segment_id={} doc_count={} duration_ms={}",
463                    segment_hex,
464                    meta.num_docs,
465                    duration_ms,
466                );
467                state
468                    .built_segments
469                    .lock()
470                    .push((segment_hex, meta.num_docs));
471            }
472            Ok(_) => {}
473            Err(e) => {
474                log::error!(
475                    "[segment_build_failed] segment_id={} error={:?}",
476                    segment_hex,
477                    e
478                );
479            }
480        }
481    }
482
483    // ========================================================================
484    // Public API — commit, merge, etc.
485    // ========================================================================
486
487    /// Check merge policy and spawn a background merge if needed.
488    pub async fn maybe_merge(&self) {
489        self.segment_manager.maybe_merge().await;
490    }
491
492    /// Wait for the in-flight background merge to complete (if any).
493    pub async fn wait_for_merging_thread(&self) {
494        self.segment_manager.wait_for_merging_thread().await;
495    }
496
497    /// Wait for all eligible merges to complete, including cascading merges.
498    pub async fn wait_for_all_merges(&self) {
499        self.segment_manager.wait_for_all_merges().await;
500    }
501
502    /// Get the segment tracker for sharing with readers.
503    pub fn tracker(&self) -> std::sync::Arc<crate::segment::SegmentTracker> {
504        self.segment_manager.tracker()
505    }
506
507    /// Acquire a snapshot of current segments for reading.
508    pub async fn acquire_snapshot(&self) -> crate::segment::SegmentSnapshot {
509        self.segment_manager.acquire_snapshot().await
510    }
511
512    /// Clean up orphan segment files not registered in metadata.
513    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
514        self.segment_manager.cleanup_orphan_segments().await
515    }
516
517    /// Prepare commit — signal workers to flush, wait for completion, collect segments.
518    ///
519    /// All documents sent via `add_document` before this call are guaranteed
520    /// to be written to segment files on disk. Segments are NOT yet registered
521    /// in metadata — call `PreparedCommit::commit()` for that.
522    ///
523    /// Workers are NOT destroyed — they flush their builders and wait for
524    /// `resume_workers()` to give them a new channel.
525    ///
526    /// `add_document` will return `Closed` error until commit/abort resumes workers.
527    pub async fn prepare_commit(&mut self) -> Result<PreparedCommit<'_, D>> {
528        // 1. Close channel → workers drain remaining docs and flush builders
529        self.doc_sender.close();
530
531        // Wake any workers still waiting on resume_cvar from previous cycle.
532        // They'll clone the stale receiver, enter recv_blocking, get Err
533        // immediately (sender already closed), flush, and signal completion.
534        self.worker_state.resume_cvar.notify_all();
535
536        // 2. Wait for all workers to complete their flush (via spawn_blocking
537        //    to avoid blocking the tokio runtime)
538        let state = Arc::clone(&self.worker_state);
539        tokio::task::spawn_blocking(move || {
540            let mut lock = state.flush_mutex.lock();
541            while state.flush_count.load(Ordering::Acquire) < state.num_workers {
542                state.flush_cvar.wait(&mut lock);
543            }
544        })
545        .await
546        .map_err(|e| Error::Internal(format!("Failed to wait for workers: {}", e)))?;
547
548        // 3. Collect built segments
549        let built = std::mem::take(&mut *self.worker_state.built_segments.lock());
550        self.flushed_segments.extend(built);
551
552        Ok(PreparedCommit {
553            writer: self,
554            is_resolved: false,
555        })
556    }
557
558    /// Commit (convenience): prepare_commit + commit in one call.
559    ///
560    /// Guarantees all prior `add_document` calls are committed.
561    /// Vector training is decoupled — call `build_vector_index()` manually.
562    pub async fn commit(&mut self) -> Result<()> {
563        self.prepare_commit().await?.commit().await
564    }
565
566    /// Force merge all segments into one.
567    pub async fn force_merge(&mut self) -> Result<()> {
568        self.prepare_commit().await?.commit().await?;
569        self.segment_manager.force_merge().await
570    }
571
572    /// Resume workers with a fresh channel. Called after commit or abort.
573    ///
574    /// Workers are already alive — just give them a new channel and wake them.
575    /// If the tokio runtime has shut down (e.g., program exit), this is a no-op.
576    fn resume_workers(&mut self) {
577        if tokio::runtime::Handle::try_current().is_err() {
578            return;
579        }
580
581        // Reset flush count for next cycle
582        self.worker_state.flush_count.store(0, Ordering::Release);
583
584        // Create new channel
585        let (sender, receiver) = async_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
586        self.doc_sender = sender;
587
588        // Set new receiver, bump epoch, and wake all workers
589        {
590            let mut lock = self.worker_state.resume_receiver.lock();
591            *lock = Some(receiver);
592        }
593        self.worker_state
594            .resume_epoch
595            .fetch_add(1, Ordering::Release);
596        self.worker_state.resume_cvar.notify_all();
597    }
598
599    // Vector index methods (build_vector_index, etc.) are in vector_builder.rs
600}
601
602impl<D: DirectoryWriter + 'static> Drop for IndexWriter<D> {
603    fn drop(&mut self) {
604        // 1. Signal permanent shutdown
605        self.worker_state.shutdown.store(true, Ordering::Release);
606        // 2. Close channel to wake workers blocked on recv_blocking
607        self.doc_sender.close();
608        // 3. Wake workers that might be waiting on resume_cvar
609        self.worker_state.resume_cvar.notify_all();
610        // 4. Join worker threads
611        for w in std::mem::take(&mut self.workers) {
612            let _ = w.join();
613        }
614    }
615}
616
/// A prepared commit that can be finalized or aborted.
///
/// Two-phase commit guard. Between `prepare_commit()` and
/// `commit()`/`abort()`, segments are on disk but NOT in metadata.
/// Dropping without calling either will auto-abort (discard the prepared
/// segment list, resume workers).
pub struct PreparedCommit<'a, D: DirectoryWriter + 'static> {
    /// Mutably borrowed writer — prevents concurrent add/commit while prepared.
    writer: &'a mut IndexWriter<D>,
    /// Set once commit() or abort() has run; suppresses the Drop auto-abort.
    is_resolved: bool,
}

628impl<'a, D: DirectoryWriter + 'static> PreparedCommit<'a, D> {
629    /// Finalize: register segments in metadata, evaluate merge policy, resume workers.
630    pub async fn commit(mut self) -> Result<()> {
631        self.is_resolved = true;
632        let segments = std::mem::take(&mut self.writer.flushed_segments);
633        self.writer.segment_manager.commit(segments).await?;
634        self.writer.segment_manager.maybe_merge().await;
635        self.writer.resume_workers();
636        Ok(())
637    }
638
639    /// Abort: discard prepared segments, resume workers.
640    /// Segment files become orphans (cleaned up by `cleanup_orphan_segments`).
641    pub fn abort(mut self) {
642        self.is_resolved = true;
643        self.writer.flushed_segments.clear();
644        self.writer.resume_workers();
645    }
646}
647
648impl<D: DirectoryWriter + 'static> Drop for PreparedCommit<'_, D> {
649    fn drop(&mut self) {
650        if !self.is_resolved {
651            log::warn!("PreparedCommit dropped without commit/abort — auto-aborting");
652            self.writer.flushed_segments.clear();
653            self.writer.resume_workers();
654        }
655    }
656}