use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{Mutex as AsyncMutex, Notify};
use crate::directories::DirectoryWriter;
use crate::error::{Error, Result};
use crate::index::IndexMetadata;
use crate::segment::{SegmentId, SegmentMerger, SegmentReader, SegmentSnapshot, SegmentTracker};
use super::{MergePolicy, SegmentInfo};
/// Owns the set of segments that make up an index: registration,
/// background merging, snapshotting, and orphan cleanup.
pub struct SegmentManager<D: DirectoryWriter + 'static> {
    // Storage backend all segment and metadata I/O goes through.
    directory: Arc<D>,
    // Index schema, shared with the readers/mergers this manager spawns.
    schema: Arc<crate::dsl::Schema>,
    // Persistent list of live segment ids; async mutex because the guard
    // is held across `save()` awaits.
    metadata: Arc<AsyncMutex<IndexMetadata>>,
    // Strategy that decides which segments should be merged together.
    merge_policy: Box<dyn MergePolicy>,
    // Number of background merge tasks currently in flight.
    pending_merges: Arc<AtomicUsize>,
    // Ids currently claimed by an in-flight merge (prevents overlapping
    // merges of the same segment).
    merging_segments: Arc<AsyncMutex<HashSet<String>>>,
    // Cache size (in blocks) handed to segment readers during merges.
    term_cache_blocks: usize,
    // Signalled each time a background merge task finishes.
    merge_complete: Arc<Notify>,
    // Tracks segment lifetimes (acquire/release, deferred deletions).
    tracker: Arc<SegmentTracker>,
}
impl<D: DirectoryWriter + 'static> SegmentManager<D> {
/// Builds a segment manager around existing index metadata.
///
/// Every segment already listed in `metadata` is registered with the
/// lifetime tracker before the manager is returned, so pre-existing
/// segments participate in snapshotting/deletion tracking immediately.
pub fn new(
    directory: Arc<D>,
    schema: Arc<crate::dsl::Schema>,
    metadata: IndexMetadata,
    merge_policy: Box<dyn MergePolicy>,
    term_cache_blocks: usize,
) -> Self {
    let segment_tracker = Arc::new(SegmentTracker::new());
    for existing_id in &metadata.segments {
        segment_tracker.register(existing_id);
    }
    Self {
        tracker: segment_tracker,
        metadata: Arc::new(AsyncMutex::new(metadata)),
        merging_segments: Arc::new(AsyncMutex::new(HashSet::new())),
        pending_merges: Arc::new(AtomicUsize::new(0)),
        merge_complete: Arc::new(Notify::new()),
        directory,
        schema,
        merge_policy,
        term_cache_blocks,
    }
}
/// Returns a copy of the currently registered segment ids.
pub async fn get_segment_ids(&self) -> Vec<String> {
    let meta = self.metadata.lock().await;
    meta.segments.clone()
}
/// Adds `segment_id` to the live set (if new), persists the metadata,
/// and then checks whether a background merge should be scheduled.
///
/// Registering an id that is already present is a no-op apart from the
/// merge check: the metadata file is rewritten only when the segment
/// list actually changed (the previous version saved unconditionally,
/// paying a redundant disk write on duplicate registrations).
///
/// # Errors
/// Propagates failures from persisting the metadata.
pub async fn register_segment(&self, segment_id: String) -> Result<()> {
    {
        let mut meta = self.metadata.lock().await;
        if !meta.segments.contains(&segment_id) {
            meta.segments.push(segment_id.clone());
            self.tracker.register(&segment_id);
            // Persist only on change; the lock is released by the
            // enclosing scope before we consider merging.
            meta.save(self.directory.as_ref()).await?;
        }
    }
    self.maybe_merge().await;
    Ok(())
}
/// Number of background merge tasks currently in flight.
pub fn pending_merge_count(&self) -> usize {
    self.pending_merges.load(Ordering::SeqCst)
}
/// Asks the merge policy for merge candidates among the available
/// segments and spawns a background merge for each candidate that
/// contains at least two segments.
pub async fn maybe_merge(&self) {
    let meta = self.metadata.lock().await;
    let merging = self.merging_segments.lock().await;
    // Only segments that are neither already being merged nor queued
    // for deletion are eligible.
    let available_segments: Vec<String> = meta
        .segments
        .iter()
        .filter(|id| !merging.contains(*id) && !self.tracker.is_pending_deletion(id))
        .cloned()
        .collect();
    // Release both locks before consulting the policy and spawning
    // merges (spawn_merge re-acquires merging_segments).
    drop(merging);
    drop(meta);
    // FIXME: num_docs is a position-based placeholder ((i + 1) * 1000),
    // not the segment's real document count, so the merge policy decides
    // on fabricated sizes. Wire in actual per-segment doc counts.
    let segments: Vec<SegmentInfo> = available_segments
        .iter()
        .enumerate()
        .map(|(i, id)| SegmentInfo {
            id: id.clone(),
            num_docs: ((i + 1) * 1000) as u32,
            size_bytes: None,
        })
        .collect();
    let candidates = self.merge_policy.find_merges(&segments);
    for candidate in candidates {
        if candidate.segment_ids.len() >= 2 {
            self.spawn_merge(candidate.segment_ids).await;
        }
    }
}
/// Claims `segment_ids_to_merge` and runs the merge on a detached tokio
/// task, so the caller never blocks on merge I/O.
async fn spawn_merge(&self, segment_ids_to_merge: Vec<String>) {
    {
        // Claim the inputs so maybe_merge() cannot schedule an
        // overlapping merge for the same segments.
        let mut merging = self.merging_segments.lock().await;
        for id in &segment_ids_to_merge {
            merging.insert(id.clone());
        }
    }
    // Clone the shared handles the detached ('static) task will own.
    let directory = Arc::clone(&self.directory);
    let schema = Arc::clone(&self.schema);
    let metadata = Arc::clone(&self.metadata);
    let merging_segments = Arc::clone(&self.merging_segments);
    let pending_merges = Arc::clone(&self.pending_merges);
    let merge_complete = Arc::clone(&self.merge_complete);
    let tracker = Arc::clone(&self.tracker);
    let term_cache_blocks = self.term_cache_blocks;
    // Incremented before spawning so pending_merge_count() /
    // wait_for_merges() can never miss the task.
    pending_merges.fetch_add(1, Ordering::SeqCst);
    tokio::spawn(async move {
        let result = Self::do_merge(
            directory.as_ref(),
            &schema,
            &segment_ids_to_merge,
            term_cache_blocks,
        )
        .await;
        match result {
            Ok(new_segment_id) => {
                // Register the merged segment before publishing it in
                // the metadata so it is tracked from the moment it
                // becomes visible.
                tracker.register(&new_segment_id);
                let mut meta = metadata.lock().await;
                meta.segments
                    .retain(|id| !segment_ids_to_merge.contains(id));
                meta.segments.push(new_segment_id);
                // Save failure is logged but not fatal here: the merge
                // itself succeeded and in-memory state is consistent.
                if let Err(e) = meta.save(directory.as_ref()).await {
                    eprintln!("[merge] Failed to save metadata after merge: {:?}", e);
                }
                drop(meta);
                // NOTE(review): mark_for_deletion appears to return only
                // the ids that are safe to delete right now, deferring
                // the rest to the tracker — confirm in SegmentTracker.
                // Deletion is best-effort; errors are ignored.
                let ready_to_delete = tracker.mark_for_deletion(&segment_ids_to_merge);
                for segment_id in ready_to_delete {
                    let _ =
                        crate::segment::delete_segment(directory.as_ref(), segment_id).await;
                }
            }
            Err(e) => {
                // A failed merge leaves the source segments in place;
                // they simply become merge candidates again later.
                eprintln!(
                    "Background merge failed for segments {:?}: {:?}",
                    segment_ids_to_merge, e
                );
            }
        }
        // Release the busy claim and announce completion regardless of
        // the merge outcome.
        let mut merging = merging_segments.lock().await;
        for id in &segment_ids_to_merge {
            merging.remove(id);
        }
        pending_merges.fetch_sub(1, Ordering::SeqCst);
        merge_complete.notify_waiters();
    });
}
async fn do_merge(
directory: &D,
schema: &crate::dsl::Schema,
segment_ids_to_merge: &[String],
term_cache_blocks: usize,
) -> Result<String> {
let mut readers = Vec::new();
let mut doc_offset = 0u32;
for id_str in segment_ids_to_merge {
let segment_id = SegmentId::from_hex(id_str)
.ok_or_else(|| Error::Corruption(format!("Invalid segment ID: {}", id_str)))?;
let reader = match SegmentReader::open(
directory,
segment_id,
Arc::new(schema.clone()),
doc_offset,
term_cache_blocks,
)
.await
{
Ok(r) => r,
Err(e) => {
eprintln!("[merge] Failed to open segment {}: {:?}", id_str, e);
return Err(e);
}
};
doc_offset += reader.meta().num_docs;
readers.push(reader);
}
let merger = SegmentMerger::new(Arc::new(schema.clone()));
let new_segment_id = SegmentId::new();
if let Err(e) = merger.merge(directory, &readers, new_segment_id).await {
eprintln!(
"[merge] Merge failed for segments {:?} -> {}: {:?}",
segment_ids_to_merge,
new_segment_id.to_hex(),
e
);
return Err(e);
}
Ok(new_segment_id.to_hex())
}
pub async fn wait_for_merges(&self) {
while self.pending_merges.load(Ordering::SeqCst) > 0 {
self.merge_complete.notified().await;
}
}
/// Returns a shared handle to the index-metadata lock.
pub fn metadata(&self) -> Arc<AsyncMutex<IndexMetadata>> {
    Arc::clone(&self.metadata)
}
/// Applies `f` to the metadata while holding the lock, then persists
/// the mutated metadata to the directory.
///
/// # Errors
/// Propagates any failure from `IndexMetadata::save`.
pub async fn update_metadata<F>(&self, f: F) -> Result<()>
where
    F: FnOnce(&mut IndexMetadata),
{
    let mut guard = self.metadata.lock().await;
    f(&mut guard);
    guard.save(self.directory.as_ref()).await
}
/// Atomically swaps the live segment list for `new_segments`, persists
/// the change, then best-effort deletes `old_to_delete`.
///
/// # Errors
/// Propagates failures from persisting the metadata; deletion errors
/// are deliberately ignored.
pub async fn replace_segments(
    &self,
    new_segments: Vec<String>,
    old_to_delete: Vec<String>,
) -> Result<()> {
    // Register the replacements first so they are tracked before they
    // become visible in the metadata.
    for id in &new_segments {
        self.tracker.register(id);
    }
    {
        let mut guard = self.metadata.lock().await;
        guard.segments = new_segments;
        guard.save(self.directory.as_ref()).await?;
    }
    // Only ids the tracker reports as safe are removed now; the rest
    // are deferred. Each delete is best-effort.
    for segment_id in self.tracker.mark_for_deletion(&old_to_delete) {
        let _ = crate::segment::delete_segment(self.directory.as_ref(), segment_id).await;
    }
    Ok(())
}
/// Pins the current segment set and returns a snapshot over it.
pub async fn acquire_snapshot(&self) -> SegmentSnapshot<D> {
    // Hold the metadata lock only long enough to pin the segment set.
    let acquired = {
        let meta = self.metadata.lock().await;
        self.tracker.acquire(&meta.segments)
    };
    SegmentSnapshot::new(
        Arc::clone(&self.tracker),
        Arc::clone(&self.directory),
        acquired,
    )
}
/// Returns a shared handle to the segment lifetime tracker.
pub fn tracker(&self) -> Arc<SegmentTracker> {
    Arc::clone(&self.tracker)
}
/// Returns a shared handle to the underlying directory.
pub fn directory(&self) -> Arc<D> {
    Arc::clone(&self.directory)
}
/// Scans the directory for segment files whose id is not in the
/// registered metadata and deletes those orphans, returning how many
/// were removed.
///
/// Segment files are expected to be named `seg_<32-hex-id>.<ext>`
/// (hence `len() > 37`: 4 prefix + 32 hex + at least ".x"). The hex id
/// is extracted with checked `str::get` instead of `&filename[4..36]`,
/// which could panic on a char boundary if a non-ASCII file name
/// happened to start with `seg_`.
///
/// # Errors
/// Currently infallible in practice: listing failures are treated as
/// "nothing to clean" and per-segment delete failures are skipped.
pub async fn cleanup_orphan_segments(&self) -> Result<usize> {
    // Snapshot the registered ids so the metadata lock is not held
    // during directory I/O.
    let registered_set: HashSet<String> = {
        let meta = self.metadata.lock().await;
        meta.segments.iter().cloned().collect()
    };
    let mut orphan_ids: HashSet<String> = HashSet::new();
    if let Ok(entries) = self.directory.list_files(std::path::Path::new("")).await {
        for entry in entries {
            let filename = entry.to_string_lossy();
            if filename.starts_with("seg_") && filename.len() > 37 {
                // Checked slice: returns None instead of panicking if
                // byte 4 or 36 is not a char boundary.
                if let Some(hex_part) = filename.get(4..36) {
                    if !registered_set.contains(hex_part) {
                        orphan_ids.insert(hex_part.to_string());
                    }
                }
            }
        }
    }
    let mut deleted = 0;
    for hex_id in &orphan_ids {
        // Skip ids that fail hex parsing or whose deletion errors out.
        if let Some(segment_id) = SegmentId::from_hex(hex_id)
            && crate::segment::delete_segment(self.directory.as_ref(), segment_id)
                .await
                .is_ok()
        {
            deleted += 1;
        }
    }
    Ok(deleted)
}
}