1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Upper bound on the number of background merge tasks in flight at once;
/// `spawn_merge` refuses to start a new merge beyond this limit.
const MAX_CONCURRENT_MERGES: usize = 2;

/// Coordinates the on-disk segments of an index: maintains the authoritative
/// segment list, schedules background merges according to a [`MergePolicy`],
/// and defers physical deletion of segments still referenced by snapshots.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory where segments and index metadata are persisted.
    directory: Arc<D>,
    /// Index schema, shared with readers/mergers spawned by this manager.
    schema: Arc<crate::dsl::Schema>,
    /// Authoritative segment list; async lock because it is held across awaits.
    metadata: Arc<RwLock<IndexMetadata>>,
    /// Policy deciding which segments should be merged together.
    merge_policy: Box<dyn MergePolicy>,
    /// Count of merge tasks currently running (capped by MAX_CONCURRENT_MERGES).
    pending_merges: Arc<AtomicUsize>,
    /// IDs of segments currently participating in a merge, so no segment joins
    /// two merges at once. Sync lock: only held for short, non-await scopes.
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Term-cache size (in blocks) passed to segment readers opened for merges.
    term_cache_blocks: usize,
    /// Notified whenever a background merge task finishes (see wait_for_merges).
    merge_complete: Arc<Notify>,
    /// Reference tracker that defers segment deletion until no snapshot uses it.
    tracker: Arc<SegmentTracker>,
    /// When true, spawn_merge becomes a no-op (see pause_merges/resume_merges).
    merge_paused: Arc<AtomicBool>,
}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75 pub fn new(
77 directory: Arc<D>,
78 schema: Arc<crate::dsl::Schema>,
79 metadata: IndexMetadata,
80 merge_policy: Box<dyn MergePolicy>,
81 term_cache_blocks: usize,
82 ) -> Self {
83 let tracker = Arc::new(SegmentTracker::new());
85 for seg_id in metadata.segment_metas.keys() {
86 tracker.register(seg_id);
87 }
88
89 Self {
90 directory,
91 schema,
92 metadata: Arc::new(RwLock::new(metadata)),
93 merge_policy,
94 pending_merges: Arc::new(AtomicUsize::new(0)),
95 merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96 term_cache_blocks,
97 merge_complete: Arc::new(Notify::new()),
98 tracker,
99 merge_paused: Arc::new(AtomicBool::new(false)),
100 }
101 }
102
103 pub async fn get_segment_ids(&self) -> Vec<String> {
105 self.metadata.read().await.segment_ids()
106 }
107
108 pub fn pending_merge_count(&self) -> usize {
110 self.pending_merges.load(Ordering::SeqCst)
111 }
112
113 pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115 Arc::clone(&self.metadata)
116 }
117
118 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120 where
121 F: FnOnce(&mut IndexMetadata),
122 {
123 let mut meta = self.metadata.write().await;
124 f(&mut meta);
125 meta.save(self.directory.as_ref()).await
126 }
127
128 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131 let acquired = {
132 let meta = self.metadata.read().await;
133 let segment_ids = meta.segment_ids();
134 self.tracker.acquire(&segment_ids)
135 };
136
137 let dir = Arc::clone(&self.directory);
140 let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = Arc::new(move |segment_ids| {
141 let dir = Arc::clone(&dir);
142 tokio::spawn(async move {
143 for segment_id in segment_ids {
144 log::info!(
145 "[segment_cleanup] deleting deferred segment {}",
146 segment_id.0
147 );
148 let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
149 }
150 });
151 });
152
153 SegmentSnapshot::with_delete_fn(
154 Arc::clone(&self.tracker),
155 Arc::clone(&self.directory),
156 acquired,
157 delete_fn,
158 )
159 }
160
161 pub fn tracker(&self) -> Arc<SegmentTracker> {
163 Arc::clone(&self.tracker)
164 }
165
166 pub fn directory(&self) -> Arc<D> {
168 Arc::clone(&self.directory)
169 }
170}
171
#[cfg(feature = "native")]
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
    /// Adds a freshly written segment to the metadata (idempotently), persists
    /// the metadata, then checks whether a background merge should be started.
    pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
        {
            let mut meta = self.metadata.write().await;
            if !meta.has_segment(&segment_id) {
                meta.add_segment(segment_id.clone(), num_docs);
                self.tracker.register(&segment_id);
            }
            // Saved unconditionally, even when the segment was already present.
            meta.save(self.directory.as_ref()).await?;
        }

        self.maybe_merge().await;
        Ok(())
    }

    /// Asks the merge policy for candidates among segments that are neither
    /// already merging nor pending deletion, and spawns a background merge for
    /// every candidate containing at least two segments.
    pub async fn maybe_merge(&self) {
        // Snapshot the eligible segment list while briefly holding both locks;
        // the sync `merging` guard is dropped before anything awaits below.
        let segments: Vec<SegmentInfo> = {
            let meta = self.metadata.read().await;
            let merging = self.merging_segments.read();

            meta.segment_metas
                .iter()
                .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
                .map(|(id, info)| SegmentInfo {
                    id: id.clone(),
                    num_docs: info.num_docs,
                    // Size is not tracked here; the policy only sees doc counts.
                    size_bytes: None,
                })
                .collect()
        };

        let candidates = self.merge_policy.find_merges(&segments);

        for candidate in candidates {
            // Merging fewer than two segments would just rewrite one segment.
            if candidate.segment_ids.len() >= 2 {
                self.spawn_merge(candidate.segment_ids);
            }
        }
    }

    /// Stops new merges from being spawned; running merges are unaffected.
    pub fn pause_merges(&self) {
        self.merge_paused.store(true, Ordering::SeqCst);
    }

    /// Re-enables spawning of merges after `pause_merges`.
    pub fn resume_merges(&self) {
        self.merge_paused.store(false, Ordering::SeqCst);
    }

    /// Launches a background task that merges `segment_ids_to_merge` into a
    /// new segment. Bails out silently when merges are paused, the concurrency
    /// cap is reached, or any of the segments is already part of a merge.
    fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
        if self.merge_paused.load(Ordering::SeqCst) {
            return;
        }
        // Best-effort cap check: load-then-add is racy, but the worst case is
        // briefly exceeding MAX_CONCURRENT_MERGES, not corruption.
        if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
            return;
        }

        {
            // Claim every segment atomically under one write guard: either all
            // are reserved for this merge or none are.
            let mut merging = self.merging_segments.write();
            if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
                return;
            }
            for id in &segment_ids_to_merge {
                merging.insert(id.clone());
            }
        }

        // Clone all shared state into the task; `self` is not moved in.
        let directory = Arc::clone(&self.directory);
        let schema = Arc::clone(&self.schema);
        let metadata = Arc::clone(&self.metadata);
        let merging_segments = Arc::clone(&self.merging_segments);
        let pending_merges = Arc::clone(&self.pending_merges);
        let merge_complete = Arc::clone(&self.merge_complete);
        let tracker = Arc::clone(&self.tracker);
        let term_cache_blocks = self.term_cache_blocks;

        // Incremented before spawning so pending_merge_count/wait_for_merges
        // observe the merge as soon as this function returns.
        pending_merges.fetch_add(1, Ordering::SeqCst);

        tokio::spawn(async move {
            let result = Self::do_merge(
                directory.as_ref(),
                &schema,
                &segment_ids_to_merge,
                term_cache_blocks,
                &metadata,
            )
            .await;

            match result {
                Ok((new_segment_id, merged_doc_count)) => {
                    // Register before publishing so snapshots taken right after
                    // the metadata swap can acquire the new segment.
                    tracker.register(&new_segment_id);

                    {
                        let mut meta = metadata.write().await;
                        for id in &segment_ids_to_merge {
                            meta.remove_segment(id);
                        }
                        meta.add_segment(new_segment_id, merged_doc_count);
                        // Save failure is logged but not propagated; the
                        // in-memory state is already updated at this point.
                        if let Err(e) = meta.save(directory.as_ref()).await {
                            eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
                        }
                    }

                    // Source segments still referenced by snapshots are only
                    // marked; the tracker returns the ones safe to delete now.
                    let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
                    for segment_id in ready_to_delete {
                        let _ =
                            crate::segment::delete_segment(directory.as_ref(), segment_id).await;
                    }
                }
                Err(e) => {
                    eprintln!(
                        "Background merge failed for segments {:?}: {:?}",
                        segment_ids_to_merge, e
                    );
                }
            }

            {
                // Release the merge reservation whether the merge succeeded or
                // failed, so the segments become eligible again.
                let mut merging = merging_segments.write();
                for id in &segment_ids_to_merge {
                    merging.remove(id);
                }
            }

            pending_merges.fetch_sub(1, Ordering::SeqCst);
            merge_complete.notify_waiters();
        });
    }

    /// Opens every source segment, merges them into one new segment, and
    /// returns the new segment's hex ID together with its total doc count.
    /// Does NOT touch metadata or delete sources; the caller does that.
    async fn do_merge(
        directory: &D,
        schema: &crate::dsl::Schema,
        segment_ids_to_merge: &[String],
        term_cache_blocks: usize,
        metadata: &RwLock<IndexMetadata>,
    ) -> Result<(String, u32)> {
        let mut readers = Vec::new();
        // Each reader is opened at a cumulative doc offset so doc IDs stay
        // contiguous across the concatenated source segments.
        let mut doc_offset = 0u32;
        let mut total_docs = 0u32;

        for id_str in segment_ids_to_merge {
            let segment_id = SegmentId::from_hex(id_str)
                .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
            let reader = match SegmentReader::open(
                directory,
                segment_id,
                Arc::new(schema.clone()),
                doc_offset,
                term_cache_blocks,
            )
            .await
            {
                Ok(r) => r,
                Err(e) => {
                    eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
                    return Err(e);
                }
            };
            let num_docs = reader.meta().num_docs;
            total_docs += num_docs;
            doc_offset += num_docs;
            readers.push(reader);
        }

        // Trained ANN structures (centroids/codebooks) are loaded from the
        // metadata; when present the merge rebuilds ANN indexes as well.
        let (trained_centroids, trained_codebooks) = {
            let meta = metadata.read().await;
            meta.load_trained_structures(directory).await
        };

        let merger = SegmentMerger::new(Arc::new(schema.clone()));
        let new_segment_id = SegmentId::new();

        let merge_result = if !trained_centroids.is_empty() {
            let trained = crate::segment::TrainedVectorStructures {
                centroids: trained_centroids,
                codebooks: trained_codebooks,
            };
            merger
                .merge_with_ann(directory, &readers, new_segment_id, &trained)
                .await
        } else {
            merger.merge(directory, &readers, new_segment_id).await
        };

        if let Err(e) = merge_result {
            eprintln!(
                "[merge] Merge failed for segments {:?} -> {}: {:?}",
                segment_ids_to_merge,
                new_segment_id.to_hex(),
                e
            );
            return Err(e);
        }

        Ok((new_segment_id.to_hex(), total_docs))
    }

    /// Blocks until no merge task is in flight. Loops because a single
    /// notification only means one merge finished, not all of them.
    pub async fn wait_for_merges(&self) {
        while self.pending_merges.load(Ordering::SeqCst) > 0 {
            self.merge_complete.notified().await;
        }
    }

    /// Atomically replaces the whole segment set: clears the metadata,
    /// installs `new_segments`, persists, then marks `old_to_delete` for
    /// deferred deletion (deleting immediately those no snapshot still uses).
    pub async fn replace_segments(
        &self,
        new_segments: Vec<(String, u32)>,
        old_to_delete: Vec<String>,
    ) -> Result<()> {
        // Register new segments before publishing them so snapshots taken
        // right after the swap can acquire them.
        for (seg_id, _) in &new_segments {
            self.tracker.register(seg_id);
        }

        {
            let mut meta = self.metadata.write().await;
            meta.segment_metas.clear();
            for (seg_id, num_docs) in new_segments {
                meta.add_segment(seg_id, num_docs);
            }
            meta.save(self.directory.as_ref()).await?;
        }

        let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
        for segment_id in ready_to_delete {
            let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
        }
        Ok(())
    }

    /// Scans the directory for segment files whose ID is not in the metadata
    /// (e.g. left behind by a crash mid-merge) and deletes them. Returns the
    /// number of orphan segments successfully deleted.
    pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
        let registered_set: HashSet<String> = {
            let meta = self.metadata.read().await;
            meta.segment_metas.keys().cloned().collect()
        };

        let mut orphan_ids: HashSet<String> = HashSet::new();

        if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
            for entry in entries {
                let filename = entry.to_string_lossy();
                // Assumes filenames shaped like `seg_<32-hex-chars><suffix>`;
                // the length check keeps the byte-slice below in bounds for
                // ASCII names — TODO confirm naming scheme against the writer.
                if filename.starts_with("seg_") && filename.len() > 37 {
                    let hex_part = &filename[4..36];
                    if !registered_set.contains(hex_part) {
                        orphan_ids.insert(hex_part.to_string());
                    }
                }
            }
        }

        let mut deleted = 0;
        for hex_id in &orphan_ids {
            if let Some(segment_id) = SegmentId::from_hex(hex_id)
                && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
                    .await
                    .is_ok()
            {
                deleted += 1;
            }
        }

        Ok(deleted)
    }
}