use std::{future::Future, sync::Arc, time::Instant};
use futures::future::try_join_all;
use uuid::Uuid;
use super::SuperfileHit;
use crate::{
storage::StorageProvider,
superfile::SuperfileReader,
supertable::{
error::QueryError,
handle::SupertableReader,
manifest::SuperfileEntry,
query::superfile_reader::superfile_reader,
reader_cache::{DiskCacheStore, SuperfileReaderCache},
tombstones::SidecarCache,
},
};
/// Opens the [`SuperfileReader`] that backs a single manifest entry.
///
/// This is a thin wrapper around [`superfile_reader`]: it forwards the reader
/// cache, the optional disk cache and the optional storage provider, together
/// with the entry's URI and its subsection offsets.
///
/// # Errors
///
/// Any error returned by `superfile_reader` is converted to its string form and
/// reported as [`QueryError::Store`].
pub(crate) async fn open_reader(
    store: &Arc<dyn SuperfileReaderCache>,
    disk_cache: Option<&Arc<DiskCacheStore>>,
    storage: Option<&Arc<dyn StorageProvider>>,
    entry: &SuperfileEntry,
) -> Result<Arc<SuperfileReader>, QueryError> {
    let offsets = entry.subsection_offsets.as_ref();
    let opened = superfile_reader(store, disk_cache, storage, &entry.uri, offsets).await;
    match opened {
        Ok(reader) => Ok(reader),
        // The underlying error is flattened to text; callers only see `Store`.
        Err(e) => Err(QueryError::Store(e.to_string())),
    }
}
/// Converts raw `(local_doc_id, score)` pairs into [`SuperfileHit`]s.
///
/// Every produced hit carries the same `superfile` value, taken from `entry`;
/// the document id and score are copied through unchanged and the input order
/// is preserved.
pub(crate) fn tag_hits(entry: &SuperfileEntry, hits: Vec<(u32, f32)>) -> Vec<SuperfileHit> {
    let mut tagged = Vec::with_capacity(hits.len());
    for (local_doc_id, score) in hits {
        // NOTE(review): `entry.uri` is read by value through a shared reference,
        // which requires the field type to be `Copy`. Tombstone filtering keys
        // on `entry.superfile_id` instead — confirm this is the intended field.
        tagged.push(SuperfileHit {
            superfile: entry.uri,
            local_doc_id,
            score,
        });
    }
    tagged
}
/// Removes hits whose `local_doc_id` is present in the entry's tombstone bitmap.
///
/// When `cache` is `None` the hits are left untouched. Otherwise the bitmap for
/// `entry.superfile_id` is looked up (using `now` as the lookup instant) and
/// `hits` is filtered in place; an empty bitmap leaves `hits` unchanged.
///
/// # Errors
///
/// A failed bitmap lookup is reported as [`QueryError::Store`] with a
/// `tombstone cache: ` prefix; `hits` is not modified in that case.
pub(crate) fn apply_tombstone_filter(
    cache: Option<&Arc<SidecarCache>>,
    entry: &SuperfileEntry,
    hits: &mut Vec<SuperfileHit>,
    now: Instant,
) -> Result<(), QueryError> {
    if let Some(sidecars) = cache {
        let deleted = match sidecars.bitmap_for(entry.superfile_id, now) {
            Ok(bitmap) => bitmap,
            Err(e) => return Err(QueryError::Store(format!("tombstone cache: {e}"))),
        };
        // Skip the retain pass entirely when nothing has been tombstoned.
        if !deleted.is_empty() {
            hits.retain(|hit| !deleted.contains(hit.local_doc_id));
        }
    }
    Ok(())
}
pub(crate) async fn fanout<P, K, Fut>(
reader: &SupertableReader,
units: Vec<(Arc<SuperfileEntry>, P)>,
kernel: K,
) -> Result<Vec<Vec<SuperfileHit>>, QueryError>
where
P: Send + 'static,
K: Fn(Arc<SuperfileReader>, P) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Result<Vec<(u32, f32)>, QueryError>> + Send + 'static,
{
fanout_with(
reader,
units,
move |r, entry, tombstone_cache, now, params| {
let kernel = kernel.clone();
async move {
let hits = kernel(r, params).await?;
let mut tagged = tag_hits(&entry, hits);
apply_tombstone_filter(tombstone_cache.as_ref(), &entry, &mut tagged, now)?;
Ok::<Vec<SuperfileHit>, QueryError>(tagged)
}
},
)
.await
}
/// Spawns one Tokio task per `(entry, params)` unit, opens the unit's
/// [`SuperfileReader`] inside the task, and runs `body` on it.
///
/// Before any task is spawned, and only when the reader has a tombstone cache,
/// the distinct superfile ids of all units are prefetched into that cache. A
/// single [`Instant`] is captured up front and handed both to the prefetch and
/// to every `body` invocation.
///
/// `body` receives the opened reader, the entry, a clone of the optional
/// tombstone cache, the shared instant and the unit's params.
///
/// Returns an empty vector immediately when `units` is empty. Otherwise the
/// per-unit results are gathered with `try_join_all`.
///
/// # Errors
///
/// * [`QueryError::Store`] from [`open_reader`] if a reader cannot be opened.
/// * Whatever error `body` returns.
/// * [`QueryError::Store`] prefixed with `fan-out task join: ` if a spawned
///   task fails to join.
///
/// NOTE(review): on the first error `try_join_all` stops awaiting the remaining
/// join handles; presumably those tasks keep running detached rather than being
/// aborted — confirm this is acceptable.
pub(crate) async fn fanout_with<P, R, B, Fut>(
    reader: &SupertableReader,
    units: Vec<(Arc<SuperfileEntry>, P)>,
    body: B,
) -> Result<Vec<R>, QueryError>
where
    P: Send + 'static,
    R: Send + 'static,
    B: Fn(Arc<SuperfileReader>, Arc<SuperfileEntry>, Option<Arc<SidecarCache>>, Instant, P) -> Fut
        + Clone
        + Send
        + 'static,
    Fut: Future<Output = Result<R, QueryError>> + Send + 'static,
{
    if units.is_empty() {
        return Ok(Vec::new());
    }

    // Shared handles, cloned once here and then once more per spawned task.
    let manifest = reader.manifest();
    let options = &manifest.options;
    let store = Arc::clone(&options.store);
    let disk_cache = options.disk_cache.clone();
    let storage = options.storage.clone();
    let tombstone_cache = reader.tombstone_cache.clone();
    let now = Instant::now();

    // Warm the tombstone cache for every distinct superfile up front.
    if let Some(sidecars) = tombstone_cache.as_ref() {
        let mut ids: Vec<Uuid> = Vec::with_capacity(units.len());
        for (entry, _) in &units {
            ids.push(entry.superfile_id);
        }
        ids.sort_unstable();
        ids.dedup();
        sidecars.prefetch(&ids, now).await;
    }

    let mut joins = Vec::with_capacity(units.len());
    for (entry, params) in units {
        let task_store = Arc::clone(&store);
        let task_disk_cache = disk_cache.clone();
        let task_storage = storage.clone();
        let task_tombstones = tombstone_cache.clone();
        let task_body = body.clone();

        let task = tokio::spawn(async move {
            let opened = open_reader(
                &task_store,
                task_disk_cache.as_ref(),
                task_storage.as_ref(),
                &entry,
            )
            .await?;
            task_body(opened, entry, task_tombstones, now, params).await
        });

        // Flatten the join result: a join failure becomes a `Store` error,
        // otherwise the task's own result is passed through untouched.
        joins.push(async move {
            match task.await {
                Ok(outcome) => outcome,
                Err(e) => Err(QueryError::Store(format!("fan-out task join: {e}"))),
            }
        });
    }

    try_join_all(joins).await
}