use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use arc_swap::ArcSwap;
use parking_lot::RwLock;
use crate::directories::DirectoryWriter;
use crate::dsl::Schema;
use crate::error::Result;
use super::Searcher;
/// Pairing of a [`Searcher`] with the segment ids recorded when it was built.
///
/// The two are published together as one value through the `ArcSwap` in
/// [`IndexReader`], so a reader always sees a searcher together with the id
/// list that was stored alongside it.
struct SearcherState<D: DirectoryWriter + 'static> {
    /// The searcher returned to callers of `IndexReader::searcher`.
    searcher: Arc<Searcher<D>>,
    /// Segment ids recorded for `searcher`; compared against the segment
    /// manager's current ids to decide whether a reload is needed.
    segment_ids: Vec<String>,
}
/// Hands out [`Searcher`]s over the segments of a `SegmentManager`, rebuilding
/// the searcher when the manager's segment ids change.
///
/// A reload happens either lazily from [`IndexReader::searcher`] (when at least
/// `reload_check_interval` has elapsed since the last check) or explicitly via
/// [`IndexReader::reload`].
pub struct IndexReader<D: DirectoryWriter + 'static> {
    /// Schema shared with every searcher built by this reader.
    schema: Arc<Schema>,
    /// Source of segment ids, snapshots and trained centroids.
    segment_manager: Arc<crate::merge::SegmentManager<D>>,
    /// Current searcher plus its recorded segment ids; replaced wholesale on reload.
    state: ArcSwap<SearcherState<D>>,
    /// Passed to every `Searcher` this reader constructs.
    term_cache_blocks: usize,
    /// Time of the last automatic segment check (initially, construction time).
    last_reload_check: RwLock<std::time::Instant>,
    /// Minimum time between automatic segment checks in `searcher()`.
    reload_check_interval: std::time::Duration,
    /// `true` while a check/reload is running, so only one runs at a time.
    reloading: AtomicBool,
}
/// RAII guard over the `reloading` flag of [`IndexReader`].
///
/// Acquiring it flips the flag from `false` to `true`; dropping it stores
/// `false` again. Tying the release to `Drop` clears the flag on every exit
/// path of the holder: normal return, `?` propagation, a panic unwinding
/// through it, or the owning future being dropped (cancelled) at an `.await`.
/// A manual `store(false)` after the `.await` only covers the first two. In the
/// other cases the flag stayed `true` forever, which disabled periodic reloads
/// and made `IndexReader::reload` spin indefinitely.
struct ReloadGuard<'a> {
    flag: &'a AtomicBool,
}

impl<'a> ReloadGuard<'a> {
    /// Tries to take the flag without waiting; `None` if another task holds it.
    fn try_acquire(flag: &'a AtomicBool) -> Option<Self> {
        flag.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| Self { flag })
    }
}

impl Drop for ReloadGuard<'_> {
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Release);
    }
}

impl<D: DirectoryWriter + 'static> IndexReader<D> {
    /// Builds a reader over the segments currently held by `segment_manager`.
    ///
    /// `term_cache_blocks` is forwarded to every [`Searcher`] this reader
    /// builds. `reload_interval_ms` sets the minimum delay between the
    /// automatic segment checks performed by [`IndexReader::searcher`].
    ///
    /// # Errors
    ///
    /// Returns any error produced while building the initial [`Searcher`].
    pub async fn from_segment_manager(
        schema: Arc<Schema>,
        segment_manager: Arc<crate::merge::SegmentManager<D>>,
        term_cache_blocks: usize,
        reload_interval_ms: u64,
    ) -> Result<Self> {
        let initial_segment_ids = segment_manager.get_segment_ids().await;
        let reader = Self::create_reader(&schema, &segment_manager, term_cache_blocks).await?;
        Ok(Self {
            schema,
            segment_manager,
            state: ArcSwap::from_pointee(SearcherState {
                searcher: Arc::new(reader),
                segment_ids: initial_segment_ids,
            }),
            term_cache_blocks,
            last_reload_check: RwLock::new(std::time::Instant::now()),
            reload_check_interval: std::time::Duration::from_millis(reload_interval_ms),
            reloading: AtomicBool::new(false),
        })
    }

    /// Builds a fresh [`Searcher`] from a new snapshot of `segment_manager`.
    ///
    /// It uses the manager's trained centroids when present, and the default
    /// (empty) value otherwise.
    async fn create_reader(
        schema: &Arc<Schema>,
        segment_manager: &Arc<crate::merge::SegmentManager<D>>,
        term_cache_blocks: usize,
    ) -> Result<Searcher<D>> {
        let trained = segment_manager.trained();
        let trained_centroids = trained
            .as_ref()
            .map(|t| t.centroids.clone())
            .unwrap_or_default();
        let snapshot = segment_manager.acquire_snapshot().await;
        Searcher::from_snapshot(
            segment_manager.directory(),
            Arc::clone(schema),
            snapshot,
            trained_centroids,
            term_cache_blocks,
        )
        .await
    }

    /// Changes the minimum delay between automatic segment checks.
    pub fn set_reload_interval(&mut self, interval: std::time::Duration) {
        self.reload_check_interval = interval;
    }

    /// Returns the current searcher.
    ///
    /// It first checks for segment changes when at least
    /// `reload_check_interval` has passed since the last check. Only one task
    /// performs the check at a time. Callers that find a check already running
    /// do not wait; they get the current (possibly stale) searcher.
    ///
    /// # Errors
    ///
    /// Propagates any error from the segment check / reload.
    pub async fn searcher(&self) -> Result<Arc<Searcher<D>>> {
        // Scope the parking_lot read guard so it is released before any `.await`.
        let check_due = {
            let last_check = self.last_reload_check.read();
            last_check.elapsed() >= self.reload_check_interval
        };
        if check_due {
            // The named guard lives until the end of this `if let`, and its
            // `Drop` clears the flag even if this future is cancelled.
            if let Some(_reload_guard) = ReloadGuard::try_acquire(&self.reloading) {
                self.do_reload_check().await?;
            }
        }
        Ok(Arc::clone(&self.state.load().searcher))
    }

    /// Periodic check: reloads if the manager's segment ids differ from the
    /// recorded ones. The caller must hold the reload flag.
    async fn do_reload_check(&self) -> Result<()> {
        // The time is stamped before the check, so a failing reload is retried
        // at most once per interval rather than on every `searcher()` call.
        *self.last_reload_check.write() = std::time::Instant::now();
        let new_segment_ids = self.segment_manager.get_segment_ids().await;
        // Read the published state once. The ArcSwap guard is dropped at the
        // end of this block, before the `.await` below.
        let (segments_changed, old_count) = {
            let state = self.state.load();
            (state.segment_ids != new_segment_ids, state.segment_ids.len())
        };
        if segments_changed {
            log::info!(
                "[index_reload] old_count={} new_count={}",
                old_count,
                new_segment_ids.len()
            );
            self.reload_with_segments(new_segment_ids).await?;
        }
        Ok(())
    }

    /// Checks for segment changes now, regardless of the reload interval, and
    /// rebuilds the searcher if the ids changed.
    ///
    /// If another check/reload is running, this waits for it by repeatedly
    /// yielding to the scheduler until the flag can be taken.
    ///
    /// # Errors
    ///
    /// Propagates any error from rebuilding the searcher.
    pub async fn reload(&self) -> Result<()> {
        // The guard is held for the rest of the function and released on drop,
        // including on error or cancellation.
        let _reload_guard = loop {
            if let Some(guard) = ReloadGuard::try_acquire(&self.reloading) {
                break guard;
            }
            tokio::task::yield_now().await;
        };
        let new_segment_ids = self.segment_manager.get_segment_ids().await;
        let segments_changed = self.state.load().segment_ids != new_segment_ids;
        if segments_changed {
            self.reload_with_segments(new_segment_ids).await
        } else {
            log::debug!("[reload] segments unchanged, skipping");
            Ok(())
        }
    }

    /// Builds a new searcher from a fresh snapshot and publishes it together
    /// with `new_segment_ids`.
    ///
    /// The current searcher's segment readers are passed to
    /// `Searcher::from_snapshot_reuse`, presumably so unchanged segments are
    /// not reopened (verify against `from_snapshot_reuse`).
    ///
    /// NOTE(review): `new_segment_ids` is read by the caller before the
    /// snapshot is acquired here. If segments change in between, the stored
    /// ids lag the snapshot, and the next check reloads again.
    async fn reload_with_segments(&self, new_segment_ids: Vec<String>) -> Result<()> {
        // Copy the reader handles out so the ArcSwap guard is not held across `.await`.
        let existing_segments: Vec<Arc<crate::segment::SegmentReader>> =
            self.state.load().searcher.segment_readers().to_vec();
        let trained = self.segment_manager.trained();
        let trained_centroids = trained
            .as_ref()
            .map(|t| t.centroids.clone())
            .unwrap_or_default();
        let snapshot = self.segment_manager.acquire_snapshot().await;
        let new_reader = Searcher::from_snapshot_reuse(
            self.segment_manager.directory(),
            Arc::clone(&self.schema),
            snapshot,
            trained_centroids,
            self.term_cache_blocks,
            &existing_segments,
        )
        .await?;
        // Searcher and ids are replaced in a single atomic store.
        self.state.store(Arc::new(SearcherState {
            searcher: Arc::new(new_reader),
            segment_ids: new_segment_ids,
        }));
        Ok(())
    }

    /// Returns the schema shared by all searchers built by this reader.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}