1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
/// Upper bound on the number of background merge tasks that may run at once.
const MAX_CONCURRENT_MERGES: usize = 2;
/// Owns the live segment set of an index: registration, reference tracking
/// for safe deletion, and background merging driven by a [`MergePolicy`].
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory all segment files are read from and written to.
    directory: Arc<D>,
    /// Index schema, shared with segment readers/mergers.
    schema: Arc<crate::dsl::Schema>,
    /// Persistent index metadata (segment ids + doc counts), async-guarded.
    metadata: Arc<RwLock<IndexMetadata>>,
    /// Policy that decides which segments should be merged together.
    merge_policy: Box<dyn MergePolicy>,
    /// Count of background merges currently in flight.
    pending_merges: Arc<AtomicUsize>,
    /// Ids of segments currently participating in a merge (prevents a
    /// segment from being picked for two merges at once).
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Cache size (in blocks) forwarded to `SegmentReader::open`.
    term_cache_blocks: usize,
    /// Notified each time a background merge task finishes.
    merge_complete: Arc<Notify>,
    /// Reference tracker used to defer deletion of segments still in use.
    tracker: Arc<SegmentTracker>,
    /// When set, `spawn_merge` becomes a no-op (see `pause_merges`).
    merge_paused: Arc<AtomicBool>,
}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75 pub fn new(
77 directory: Arc<D>,
78 schema: Arc<crate::dsl::Schema>,
79 metadata: IndexMetadata,
80 merge_policy: Box<dyn MergePolicy>,
81 term_cache_blocks: usize,
82 ) -> Self {
83 let tracker = Arc::new(SegmentTracker::new());
85 for seg_id in metadata.segment_metas.keys() {
86 tracker.register(seg_id);
87 }
88
89 Self {
90 directory,
91 schema,
92 metadata: Arc::new(RwLock::new(metadata)),
93 merge_policy,
94 pending_merges: Arc::new(AtomicUsize::new(0)),
95 merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96 term_cache_blocks,
97 merge_complete: Arc::new(Notify::new()),
98 tracker,
99 merge_paused: Arc::new(AtomicBool::new(false)),
100 }
101 }
102
103 pub async fn get_segment_ids(&self) -> Vec<String> {
105 self.metadata.read().await.segment_ids()
106 }
107
108 pub fn pending_merge_count(&self) -> usize {
110 self.pending_merges.load(Ordering::SeqCst)
111 }
112
113 pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115 Arc::clone(&self.metadata)
116 }
117
118 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120 where
121 F: FnOnce(&mut IndexMetadata),
122 {
123 let mut meta = self.metadata.write().await;
124 f(&mut meta);
125 meta.save(self.directory.as_ref()).await
126 }
127
128 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131 let acquired = {
132 let meta = self.metadata.read().await;
133 let segment_ids = meta.segment_ids();
134 self.tracker.acquire(&segment_ids)
135 };
136
137 SegmentSnapshot::new(
138 Arc::clone(&self.tracker),
139 Arc::clone(&self.directory),
140 acquired,
141 )
142 }
143
144 pub fn tracker(&self) -> Arc<SegmentTracker> {
146 Arc::clone(&self.tracker)
147 }
148
149 pub fn directory(&self) -> Arc<D> {
151 Arc::clone(&self.directory)
152 }
153}
154
155#[cfg(feature = "native")]
157impl<D: DirectoryWriter + 'static> SegmentManager<D> {
158 pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
162 {
163 let mut meta = self.metadata.write().await;
164 if !meta.has_segment(&segment_id) {
165 meta.add_segment(segment_id.clone(), num_docs);
166 self.tracker.register(&segment_id);
167 }
168 meta.save(self.directory.as_ref()).await?;
169 }
170
171 self.maybe_merge().await;
173 Ok(())
174 }
175
176 pub async fn maybe_merge(&self) {
179 let segments: Vec<SegmentInfo> = {
181 let meta = self.metadata.read().await;
182 let merging = self.merging_segments.read();
183
184 meta.segment_metas
186 .iter()
187 .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
188 .map(|(id, info)| SegmentInfo {
189 id: id.clone(),
190 num_docs: info.num_docs,
191 size_bytes: None,
192 })
193 .collect()
194 };
195
196 let candidates = self.merge_policy.find_merges(&segments);
198
199 for candidate in candidates {
200 if candidate.segment_ids.len() >= 2 {
201 self.spawn_merge(candidate.segment_ids);
202 }
203 }
204 }
205
206 pub fn pause_merges(&self) {
208 self.merge_paused.store(true, Ordering::SeqCst);
209 }
210
211 pub fn resume_merges(&self) {
213 self.merge_paused.store(false, Ordering::SeqCst);
214 }
215
216 fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
218 if self.merge_paused.load(Ordering::SeqCst) {
220 return;
221 }
222 if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
224 return;
225 }
226
227 {
231 let mut merging = self.merging_segments.write();
232 if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
234 return;
236 }
237 for id in &segment_ids_to_merge {
239 merging.insert(id.clone());
240 }
241 }
242
243 let directory = Arc::clone(&self.directory);
244 let schema = Arc::clone(&self.schema);
245 let metadata = Arc::clone(&self.metadata);
246 let merging_segments = Arc::clone(&self.merging_segments);
247 let pending_merges = Arc::clone(&self.pending_merges);
248 let merge_complete = Arc::clone(&self.merge_complete);
249 let tracker = Arc::clone(&self.tracker);
250 let term_cache_blocks = self.term_cache_blocks;
251
252 pending_merges.fetch_add(1, Ordering::SeqCst);
253
254 tokio::spawn(async move {
255 let result = Self::do_merge(
256 directory.as_ref(),
257 &schema,
258 &segment_ids_to_merge,
259 term_cache_blocks,
260 &metadata,
261 )
262 .await;
263
264 match result {
265 Ok((new_segment_id, merged_doc_count)) => {
266 tracker.register(&new_segment_id);
268
269 {
271 let mut meta = metadata.write().await;
272 for id in &segment_ids_to_merge {
273 meta.remove_segment(id);
274 }
275 meta.add_segment(new_segment_id, merged_doc_count);
276 if let Err(e) = meta.save(directory.as_ref()).await {
277 eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
278 }
279 }
280
281 let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
283 for segment_id in ready_to_delete {
284 let _ =
285 crate::segment::delete_segment(directory.as_ref(), segment_id).await;
286 }
287 }
288 Err(e) => {
289 eprintln!(
290 "Background merge failed for segments {:?}: {:?}",
291 segment_ids_to_merge, e
292 );
293 }
294 }
295
296 {
298 let mut merging = merging_segments.write();
299 for id in &segment_ids_to_merge {
300 merging.remove(id);
301 }
302 }
303
304 pending_merges.fetch_sub(1, Ordering::SeqCst);
306 merge_complete.notify_waiters();
307 });
308 }
309
310 async fn do_merge(
313 directory: &D,
314 schema: &crate::dsl::Schema,
315 segment_ids_to_merge: &[String],
316 term_cache_blocks: usize,
317 metadata: &RwLock<IndexMetadata>,
318 ) -> Result<(String, u32)> {
319 let mut readers = Vec::new();
321 let mut doc_offset = 0u32;
322 let mut total_docs = 0u32;
323
324 for id_str in segment_ids_to_merge {
325 let segment_id = SegmentId::from_hex(id_str)
326 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
327 let reader = match SegmentReader::open(
328 directory,
329 segment_id,
330 Arc::new(schema.clone()),
331 doc_offset,
332 term_cache_blocks,
333 )
334 .await
335 {
336 Ok(r) => r,
337 Err(e) => {
338 eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
339 return Err(e);
340 }
341 };
342 let num_docs = reader.meta().num_docs;
343 total_docs += num_docs;
344 doc_offset += num_docs;
345 readers.push(reader);
346 }
347
348 let (trained_centroids, trained_codebooks) = {
351 let meta = metadata.read().await;
352 meta.load_trained_structures(directory).await
353 };
354
355 let merger = SegmentMerger::new(Arc::new(schema.clone()));
356 let new_segment_id = SegmentId::new();
357
358 let merge_result = if !trained_centroids.is_empty() {
359 let trained = crate::segment::TrainedVectorStructures {
360 centroids: trained_centroids,
361 codebooks: trained_codebooks,
362 };
363 merger
364 .merge_with_ann(directory, &readers, new_segment_id, &trained)
365 .await
366 } else {
367 merger.merge(directory, &readers, new_segment_id).await
368 };
369
370 if let Err(e) = merge_result {
371 eprintln!(
372 "[merge] Merge failed for segments {:?} -> {}: {:?}",
373 segment_ids_to_merge,
374 new_segment_id.to_hex(),
375 e
376 );
377 return Err(e);
378 }
379
380 Ok((new_segment_id.to_hex(), total_docs))
382 }
383
384 pub async fn wait_for_merges(&self) {
386 while self.pending_merges.load(Ordering::SeqCst) > 0 {
387 self.merge_complete.notified().await;
388 }
389 }
390
391 pub async fn replace_segments(
394 &self,
395 new_segments: Vec<(String, u32)>,
396 old_to_delete: Vec<String>,
397 ) -> Result<()> {
398 for (seg_id, _) in &new_segments {
400 self.tracker.register(seg_id);
401 }
402
403 {
404 let mut meta = self.metadata.write().await;
405 meta.segment_metas.clear();
406 for (seg_id, num_docs) in new_segments {
407 meta.add_segment(seg_id, num_docs);
408 }
409 meta.save(self.directory.as_ref()).await?;
410 }
411
412 let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
414 for segment_id in ready_to_delete {
415 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
416 }
417 Ok(())
418 }
419
420 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
428 let registered_set: HashSet<String> = {
429 let meta = self.metadata.read().await;
430 meta.segment_metas.keys().cloned().collect()
431 };
432
433 let mut orphan_ids: HashSet<String> = HashSet::new();
435
436 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
438 for entry in entries {
439 let filename = entry.to_string_lossy();
440 if filename.starts_with("seg_") && filename.len() > 37 {
442 let hex_part = &filename[4..36];
444 if !registered_set.contains(hex_part) {
445 orphan_ids.insert(hex_part.to_string());
446 }
447 }
448 }
449 }
450
451 let mut deleted = 0;
453 for hex_id in &orphan_ids {
454 if let Some(segment_id) = SegmentId::from_hex(hex_id)
455 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
456 .await
457 .is_ok()
458 {
459 deleted += 1;
460 }
461 }
462
463 Ok(deleted)
464 }
465}