use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use futures::stream::{StreamExt, TryStreamExt};
use itertools::Itertools;
use quickwit_proto::{FetchDocsResponse, PartialHit, SplitIdAndFooterOffsets};
use quickwit_storage::Storage;
use tantivy::{IndexReader, ReloadPolicy};
use tracing::error;
use crate::leaf::open_index;
use crate::GlobalDocAddress;
/// Fetches the documents designated by `global_doc_addrs` from `index_storage`
/// and returns them as a map from doc address to the doc's JSON representation.
///
/// One fetch future is spawned per split; all futures are awaited concurrently.
/// Fails if any doc address references a split absent from `splits`, or if any
/// per-split fetch fails.
#[allow(clippy::needless_lifetimes)]
async fn fetch_docs_to_map(
    mut global_doc_addrs: Vec<GlobalDocAddress>,
    index_storage: Arc<dyn Storage>,
    splits: &[SplitIdAndFooterOffsets],
) -> anyhow::Result<HashMap<GlobalDocAddress, String>> {
    // Index the split metadata by split id for the lookups below.
    let split_offsets_map: HashMap<&str, &SplitIdAndFooterOffsets> = splits
        .iter()
        .map(|split| (split.split_id.as_str(), split))
        .collect();
    // Sorting by split id makes the doc addresses of each split contiguous,
    // so we can walk the slice one split-chunk at a time.
    global_doc_addrs.sort_by(|left, right| left.split.cmp(&right.split));
    let mut fetch_docs_futures = Vec::new();
    let mut remaining: &[GlobalDocAddress] = &global_doc_addrs;
    while let Some(first_doc_addr) = remaining.first() {
        let split_id = first_doc_addr.split.as_str();
        // Length of the leading run of doc addresses that share `split_id`.
        let chunk_len = remaining
            .iter()
            .take_while(|doc_addr| doc_addr.split == split_id)
            .count();
        let (chunk, rest) = remaining.split_at(chunk_len);
        let split_and_offset = split_offsets_map
            .get(split_id)
            .ok_or_else(|| anyhow::anyhow!("Failed to find offset for split {}", split_id))?;
        fetch_docs_futures.push(fetch_docs_in_split(
            chunk.to_vec(),
            index_storage.clone(),
            *split_and_offset,
        ));
        remaining = rest;
    }
    let per_split_docs: Vec<Vec<(GlobalDocAddress, String)>> =
        futures::future::try_join_all(fetch_docs_futures)
            .await
            .map_err(|error| {
                // Log all split ids involved in the request: we do not know
                // which specific split the failure came from.
                let split_ids: Vec<String> = splits
                    .iter()
                    .map(|split| split.split_id.clone())
                    .collect();
                error!(split_ids = ?split_ids, error = ?error, "Error when fetching docs in splits.");
                anyhow::anyhow!(
                    "Error when fetching docs for splits {:?}: {:?}.",
                    split_ids,
                    error
                )
            })?;
    Ok(per_split_docs.into_iter().flatten().collect())
}
/// Fetches the JSON documents for the given `partial_hits` and pairs each hit
/// with its document, returning them in the same order as `partial_hits`.
///
/// Hits whose document could not be fetched are silently dropped from the
/// response. NOTE(review): presumably a doc can only be missing if the split
/// lookup raced with a merge/GC — confirm whether dropping is intended here.
pub async fn fetch_docs(
    partial_hits: Vec<PartialHit>,
    index_storage: Arc<dyn Storage>,
    splits: &[SplitIdAndFooterOffsets],
) -> anyhow::Result<FetchDocsResponse> {
    let global_doc_addrs: Vec<GlobalDocAddress> = partial_hits
        .iter()
        .map(GlobalDocAddress::from_partial_hit)
        .collect();
    let mut global_doc_addr_to_doc_json =
        fetch_docs_to_map(global_doc_addrs, index_storage, splits).await?;
    // Re-associate each partial hit with its fetched document, preserving the
    // caller-supplied hit order. `remove` (not `remove_entry`) suffices: the
    // key is not needed, and removing avoids cloning the JSON payload.
    let hits: Vec<quickwit_proto::LeafHit> = partial_hits
        .iter()
        .filter_map(|partial_hit| {
            let global_doc_addr = GlobalDocAddress::from_partial_hit(partial_hit);
            let leaf_json = global_doc_addr_to_doc_json.remove(&global_doc_addr)?;
            Some(quickwit_proto::LeafHit {
                leaf_json,
                partial_hit: Some(partial_hit.clone()),
            })
        })
        .collect();
    Ok(FetchDocsResponse { hits })
}
/// Opens the tantivy index stored in `split` and builds an [`IndexReader`]
/// with a pool of `num_searchers` searchers.
///
/// The reload policy is `Manual` because splits are immutable once published:
/// the index can never change under the reader, so auto-reload is pure overhead.
async fn get_searcher_for_split(
    num_searchers: usize,
    index_storage: Arc<dyn Storage>,
    split: &SplitIdAndFooterOffsets,
) -> anyhow::Result<IndexReader> {
    let index = open_index(index_storage, split)
        .await
        // `context` (eager) over `with_context` (lazy): the message is a
        // static literal, so there is nothing to defer.
        .context("open-index-for-split")?;
    let reader = index
        .reader_builder()
        .num_searchers(num_searchers)
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;
    Ok(reader)
}
/// Fetches from a single split the documents designated by `global_doc_addrs`,
/// returning `(doc address, doc JSON)` pairs in completion order (not input
/// order — `buffer_unordered` yields results as they finish).
///
/// Fails if opening the split index fails or if any individual doc fetch fails.
#[tracing::instrument(skip(global_doc_addrs, index_storage, split))]
#[allow(clippy::needless_lifetimes)]
async fn fetch_docs_in_split(
    global_doc_addrs: Vec<GlobalDocAddress>,
    index_storage: Arc<dyn Storage>,
    split: &SplitIdAndFooterOffsets,
) -> anyhow::Result<Vec<(GlobalDocAddress, String)>> {
    /// Maximum number of concurrent doc fetches within a single split.
    const MAX_CONCURRENT_DOC_FETCHES: usize = 10;
    // One searcher per requested doc, so concurrent fetches never contend
    // for a searcher from the pool.
    let index_reader = get_searcher_for_split(global_doc_addrs.len(), index_storage, split).await?;
    let doc_futures = global_doc_addrs.into_iter().map(|global_doc_addr| {
        let searcher = index_reader.searcher();
        async move {
            let doc = searcher
                .doc_async(global_doc_addr.doc_addr)
                .await
                .context("searcher-doc-async")?;
            let doc_json = searcher.schema().to_json(&doc);
            Ok((global_doc_addr, doc_json))
        }
    });
    let stream = futures::stream::iter(doc_futures).buffer_unordered(MAX_CONCURRENT_DOC_FETCHES);
    stream.try_collect::<Vec<_>>().await
}