1use std::collections::HashSet;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27
28use parking_lot::RwLock as SyncRwLock;
29use tokio::sync::{Notify, RwLock};
30
31use crate::directories::DirectoryWriter;
32use crate::error::{Error, Result};
33use crate::index::IndexMetadata;
34use crate::segment::{SegmentId, SegmentSnapshot, SegmentTracker};
35#[cfg(feature = "native")]
36use crate::segment::{SegmentMerger, SegmentReader};
37
38use super::{MergePolicy, SegmentInfo};
39
40const MAX_CONCURRENT_MERGES: usize = 2;
42
/// Owns the set of live segments for an index: registration, snapshotting,
/// background merging, and deferred deletion of superseded segment files.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    /// Directory all segment and metadata files are read from / written to.
    directory: Arc<D>,
    /// Index schema, shared with segment readers/mergers.
    schema: Arc<crate::dsl::Schema>,
    /// Authoritative segment list; persisted via `IndexMetadata::save`.
    metadata: Arc<RwLock<IndexMetadata>>,
    /// Policy that picks which segments to merge together.
    merge_policy: Box<dyn MergePolicy>,
    /// Count of background merges currently in flight.
    pending_merges: Arc<AtomicUsize>,
    /// Ids of segments participating in an active merge (sync lock — held
    /// only for short, non-await critical sections).
    merging_segments: Arc<SyncRwLock<HashSet<String>>>,
    /// Term-dictionary cache size handed to each `SegmentReader`.
    term_cache_blocks: usize,
    /// Signalled each time a background merge finishes (see `wait_for_merges`).
    merge_complete: Arc<Notify>,
    /// Ref-counts live segment usage so deletion can be deferred until safe.
    tracker: Arc<SegmentTracker>,
    /// When set, `spawn_merge` becomes a no-op (see `pause_merges`).
    merge_paused: Arc<AtomicBool>,
}
73
74impl<D: DirectoryWriter + 'static> SegmentManager<D> {
75 pub fn new(
77 directory: Arc<D>,
78 schema: Arc<crate::dsl::Schema>,
79 metadata: IndexMetadata,
80 merge_policy: Box<dyn MergePolicy>,
81 term_cache_blocks: usize,
82 ) -> Self {
83 let tracker = Arc::new(SegmentTracker::new());
85 for seg_id in metadata.segment_metas.keys() {
86 tracker.register(seg_id);
87 }
88
89 Self {
90 directory,
91 schema,
92 metadata: Arc::new(RwLock::new(metadata)),
93 merge_policy,
94 pending_merges: Arc::new(AtomicUsize::new(0)),
95 merging_segments: Arc::new(SyncRwLock::new(HashSet::new())),
96 term_cache_blocks,
97 merge_complete: Arc::new(Notify::new()),
98 tracker,
99 merge_paused: Arc::new(AtomicBool::new(false)),
100 }
101 }
102
103 pub async fn get_segment_ids(&self) -> Vec<String> {
105 self.metadata.read().await.segment_ids()
106 }
107
108 pub fn pending_merge_count(&self) -> usize {
110 self.pending_merges.load(Ordering::SeqCst)
111 }
112
113 pub fn metadata(&self) -> Arc<RwLock<IndexMetadata>> {
115 Arc::clone(&self.metadata)
116 }
117
118 pub async fn update_metadata<F>(&self, f: F) -> Result<()>
120 where
121 F: FnOnce(&mut IndexMetadata),
122 {
123 let mut meta = self.metadata.write().await;
124 f(&mut meta);
125 meta.save(self.directory.as_ref()).await
126 }
127
128 pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
131 let acquired = {
132 let meta = self.metadata.read().await;
133 let segment_ids = meta.segment_ids();
134 self.tracker.acquire(&segment_ids)
135 };
136
137 let dir = Arc::clone(&self.directory);
140 let delete_fn: Arc<dyn Fn(Vec<SegmentId>) + Send + Sync> = Arc::new(move |segment_ids| {
141 let dir = Arc::clone(&dir);
142 tokio::spawn(async move {
143 for segment_id in segment_ids {
144 log::info!(
145 "[segment_cleanup] deleting deferred segment {}",
146 segment_id.0
147 );
148 let _ = crate::segment::delete_segment(dir.as_ref(), segment_id).await;
149 }
150 });
151 });
152
153 SegmentSnapshot::with_delete_fn(Arc::clone(&self.tracker), acquired, delete_fn)
154 }
155
156 pub fn tracker(&self) -> Arc<SegmentTracker> {
158 Arc::clone(&self.tracker)
159 }
160
161 pub fn directory(&self) -> Arc<D> {
163 Arc::clone(&self.directory)
164 }
165}
166
167#[cfg(feature = "native")]
169impl<D: DirectoryWriter + 'static> SegmentManager<D> {
170 pub async fn register_segment(&self, segment_id: String, num_docs: u32) -> Result<()> {
174 {
175 let mut meta = self.metadata.write().await;
176 if !meta.has_segment(&segment_id) {
177 meta.add_segment(segment_id.clone(), num_docs);
178 self.tracker.register(&segment_id);
179 }
180 meta.save(self.directory.as_ref()).await?;
181 }
182
183 self.maybe_merge().await;
185 Ok(())
186 }
187
188 pub async fn maybe_merge(&self) {
191 let segments: Vec<SegmentInfo> = {
193 let meta = self.metadata.read().await;
194 let merging = self.merging_segments.read();
195
196 meta.segment_metas
198 .iter()
199 .filter(|(id, _)| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
200 .map(|(id, info)| SegmentInfo {
201 id: id.clone(),
202 num_docs: info.num_docs,
203 size_bytes: None,
204 })
205 .collect()
206 };
207
208 let candidates = self.merge_policy.find_merges(&segments);
210
211 for candidate in candidates {
212 if candidate.segment_ids.len() >= 2 {
213 self.spawn_merge(candidate.segment_ids);
214 }
215 }
216 }
217
218 pub fn pause_merges(&self) {
220 self.merge_paused.store(true, Ordering::SeqCst);
221 }
222
223 pub fn resume_merges(&self) {
225 self.merge_paused.store(false, Ordering::SeqCst);
226 }
227
228 fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
230 if self.merge_paused.load(Ordering::SeqCst) {
232 return;
233 }
234 if self.pending_merges.load(Ordering::SeqCst) >= MAX_CONCURRENT_MERGES {
236 return;
237 }
238
239 {
243 let mut merging = self.merging_segments.write();
244 if segment_ids_to_merge.iter().any(|id| merging.contains(id)) {
246 return;
248 }
249 for id in &segment_ids_to_merge {
251 merging.insert(id.clone());
252 }
253 }
254
255 let directory = Arc::clone(&self.directory);
256 let schema = Arc::clone(&self.schema);
257 let metadata = Arc::clone(&self.metadata);
258 let merging_segments = Arc::clone(&self.merging_segments);
259 let pending_merges = Arc::clone(&self.pending_merges);
260 let merge_complete = Arc::clone(&self.merge_complete);
261 let tracker = Arc::clone(&self.tracker);
262 let term_cache_blocks = self.term_cache_blocks;
263
264 pending_merges.fetch_add(1, Ordering::SeqCst);
265
266 tokio::spawn(async move {
267 let result = Self::do_merge(
268 directory.as_ref(),
269 &schema,
270 &segment_ids_to_merge,
271 term_cache_blocks,
272 &metadata,
273 )
274 .await;
275
276 match result {
277 Ok((new_segment_id, merged_doc_count)) => {
278 tracker.register(&new_segment_id);
280
281 {
283 let mut meta = metadata.write().await;
284 for id in &segment_ids_to_merge {
285 meta.remove_segment(id);
286 }
287 meta.add_segment(new_segment_id, merged_doc_count);
288 if let Err(e) = meta.save(directory.as_ref()).await {
289 log::error!("[merge] Failed to save metadata after merge: {:?}", e);
290 }
291 }
292
293 let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
295 for segment_id in ready_to_delete {
296 let _ =
297 crate::segment::delete_segment(directory.as_ref(), segment_id).await;
298 }
299 }
300 Err(e) => {
301 log::error!(
302 "[merge] Background merge failed for segments {:?}: {:?}",
303 segment_ids_to_merge,
304 e
305 );
306 }
307 }
308
309 {
311 let mut merging = merging_segments.write();
312 for id in &segment_ids_to_merge {
313 merging.remove(id);
314 }
315 }
316
317 pending_merges.fetch_sub(1, Ordering::SeqCst);
319 merge_complete.notify_waiters();
320 });
321 }
322
323 pub async fn do_merge(
327 directory: &D,
328 schema: &crate::dsl::Schema,
329 segment_ids_to_merge: &[String],
330 term_cache_blocks: usize,
331 metadata: &RwLock<IndexMetadata>,
332 ) -> Result<(String, u32)> {
333 let load_start = std::time::Instant::now();
334
335 let segment_ids: Vec<SegmentId> = segment_ids_to_merge
337 .iter()
338 .map(|id_str| {
339 SegmentId::from_hex(id_str)
340 .ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))
341 })
342 .collect::<Result<Vec<_>>>()?;
343
344 let schema_arc = Arc::new(schema.clone());
346 let futures: Vec<_> = segment_ids
347 .iter()
348 .map(|&sid| {
349 let sch = Arc::clone(&schema_arc);
350 async move { SegmentReader::open(directory, sid, sch, 0, term_cache_blocks).await }
351 })
352 .collect();
353
354 let results = futures::future::join_all(futures).await;
355 let mut readers = Vec::with_capacity(results.len());
356 let mut total_docs = 0u32;
357 for (i, result) in results.into_iter().enumerate() {
358 match result {
359 Ok(r) => {
360 total_docs += r.meta().num_docs;
361 readers.push(r);
362 }
363 Err(e) => {
364 log::error!(
365 "[merge] Failed to open segment {}: {:?}",
366 segment_ids_to_merge[i],
367 e
368 );
369 return Err(e);
370 }
371 }
372 }
373
374 log::info!(
375 "[merge] loaded {} segment readers in {:.1}s",
376 readers.len(),
377 load_start.elapsed().as_secs_f64()
378 );
379
380 let trained = {
382 let meta = metadata.read().await;
383 meta.load_trained_structures(directory).await
384 };
385
386 let merger = SegmentMerger::new(schema_arc);
387 let new_segment_id = SegmentId::new();
388
389 log::info!(
390 "[merge] {} segments -> {} (trained={})",
391 segment_ids_to_merge.len(),
392 new_segment_id.to_hex(),
393 trained.as_ref().map_or(0, |t| t.centroids.len())
394 );
395
396 let merge_result = merger
397 .merge(directory, &readers, new_segment_id, trained.as_ref())
398 .await;
399
400 if let Err(e) = merge_result {
401 log::error!(
402 "[merge] Merge failed for segments {:?} -> {}: {:?}",
403 segment_ids_to_merge,
404 new_segment_id.to_hex(),
405 e
406 );
407 return Err(e);
408 }
409
410 log::info!(
411 "[merge] total wall-clock: {:.1}s ({} segments, {} docs)",
412 load_start.elapsed().as_secs_f64(),
413 readers.len(),
414 total_docs,
415 );
416
417 Ok((new_segment_id.to_hex(), total_docs))
419 }
420
421 pub async fn wait_for_merges(&self) {
423 while self.pending_merges.load(Ordering::SeqCst) > 0 {
424 self.merge_complete.notified().await;
425 }
426 }
427
428 pub async fn replace_segments(
434 &self,
435 new_segments: Vec<(String, u32)>,
436 old_to_delete: Vec<String>,
437 ) -> Result<()> {
438 for (seg_id, _) in &new_segments {
440 self.tracker.register(seg_id);
441 }
442
443 {
444 let mut meta = self.metadata.write().await;
445 for id in &old_to_delete {
446 meta.remove_segment(id);
447 }
448 for (seg_id, num_docs) in new_segments {
449 meta.add_segment(seg_id, num_docs);
450 }
451 meta.save(self.directory.as_ref()).await?;
452 }
453
454 let ready_to_delete = self.tracker.mark_for_deletion(&old_to_delete);
456 for segment_id in ready_to_delete {
457 let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
458 }
459 Ok(())
460 }
461
462 pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
470 let registered_set: HashSet<String> = {
471 let meta = self.metadata.read().await;
472 meta.segment_metas.keys().cloned().collect()
473 };
474
475 let mut orphan_ids: HashSet<String> = HashSet::new();
477
478 if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
480 for entry in entries {
481 let filename = entry.to_string_lossy();
482 if filename.starts_with("seg_") && filename.len() > 37 {
484 let hex_part = &filename[4..36];
486 if !registered_set.contains(hex_part) {
487 orphan_ids.insert(hex_part.to_string());
488 }
489 }
490 }
491 }
492
493 let mut deleted = 0;
495 for hex_id in &orphan_ids {
496 if let Some(segment_id) = SegmentId::from_hex(hex_id)
497 && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
498 .await
499 .is_ok()
500 {
501 deleted += 1;
502 }
503 }
504
505 Ok(deleted)
506 }
507}