use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde_json::{Value, json};
use crate::error::{Error, Result};
use crate::search::merge_inner_hits;
use crate::{Client, Search, SearchResponse};
impl Client {
    /// Runs a statically sized bundle of searches as one request.
    ///
    /// The bundle renders its own NDJSON payload (using this client's index
    /// prefix), the payload is sent through `msearch_raw`, and the `responses`
    /// array of the returned envelope is handed back to the bundle for
    /// decoding.
    ///
    /// # Errors
    ///
    /// Fails if the payload cannot be built, if `msearch_raw` fails, if the
    /// envelope cannot be deserialized, or if the bundle fails to decode the
    /// responses.
    #[tracing::instrument(
        name = "search.msearch",
        skip_all,
        fields(searches = B::LEN),
        err,
    )]
    pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
        let payload = bundle.ndjson(&self.index_prefix)?;
        let envelope = self.msearch_raw(payload).await?;
        let parsed = serde_json::from_value::<RawMsearchResponse>(envelope)?;
        bundle.decode(parsed.responses)
    }

    /// Runs a slice of same-typed searches as one request.
    ///
    /// Returns one decoded page per search, in the order of `searches`. An
    /// empty slice short-circuits to an empty vector without issuing a
    /// request.
    ///
    /// # Errors
    ///
    /// Fails if any search cannot be serialized, if `msearch_raw` fails, if
    /// the envelope cannot be deserialized, or at the first slot that fails to
    /// decode (later slots are then not decoded).
    #[tracing::instrument(
        name = "search.msearch_all",
        skip_all,
        fields(searches = searches.len()),
        err,
    )]
    pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
    where
        T: DeserializeOwned,
    {
        if searches.is_empty() {
            return Ok(Vec::new());
        }

        let mut payload = String::new();
        searches
            .iter()
            .try_for_each(|search| append_lines(search, &self.index_prefix, &mut payload))?;

        let envelope = self.msearch_raw(payload).await?;
        let mut entries = serde_json::from_value::<RawMsearchResponse>(envelope)?
            .responses
            .into_iter();

        // Responses are matched to searches by position; a missing entry is
        // passed on as `None` so `decode_slot` can report it for that slot.
        let mut pages = Vec::with_capacity(searches.len());
        for (slot, search) in searches.iter().enumerate() {
            pages.push(decode_slot(search, slot, entries.next())?);
        }
        Ok(pages)
    }
}
/// A fixed-size group of searches that can be sent together by
/// `Client::msearch`.
pub trait MsearchBundle {
    /// Number of searches in the bundle; recorded as the `searches` field of
    /// the `search.msearch` tracing span.
    const LEN: usize;

    /// Value produced by [`MsearchBundle::decode`] for the whole bundle.
    type Output;

    /// Renders the bundle as an NDJSON request payload, given the index
    /// `prefix` supplied by the client.
    fn ndjson(&self, prefix: &str) -> Result<String>;

    /// Turns the raw `responses` array of the reply into the bundle's typed
    /// output.
    fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
}
fn append_lines<T>(search: &Search<T>, prefix: &str, ndjson: &mut String) -> Result<()> {
let index = format!("{prefix}{}", search.physical_index());
let header = serde_json::to_string(&json!({ "index": index }))?;
let body = serde_json::to_string(&search.body())?;
ndjson.push_str(&header);
ndjson.push('\n');
ndjson.push_str(&body);
ndjson.push('\n');
Ok(())
}
/// Decodes the raw response entry at position `slot` into a typed page for
/// `search`.
///
/// # Errors
///
/// * `Error::Msearch` with status `0` and body `"missing response slot"` when
///   `entry` is `None`.
/// * `Error::Msearch` when the entry carries an `"error"` key; the status is
///   taken from the entry's `"status"` field, or `0` when that field is
///   absent, not an unsigned integer, or does not fit in a `u16`.
/// * Whatever `SearchResponse::from_value` returns when the entry cannot be
///   converted.
///
/// A page for which `is_partial()` is true is still returned successfully;
/// it is only logged at warn level.
fn decode_slot<T>(
    search: &Search<T>,
    slot: usize,
    entry: Option<Value>,
) -> Result<SearchResponse<T>>
where
    T: DeserializeOwned,
{
    let mut entry = match entry {
        Some(value) => value,
        None => {
            return Err(Error::Msearch {
                slot,
                status: 0,
                body: "missing response slot".to_owned(),
            });
        }
    };

    if let Some(error) = entry.get("error") {
        // Fall back to 0 whenever no usable status code is present.
        let status = match entry.get("status").and_then(Value::as_u64) {
            Some(code) => u16::try_from(code).unwrap_or(0),
            None => 0,
        };
        tracing::warn!(
            slot,
            index = %search.physical_index(),
            status,
            "msearch slot failed"
        );
        return Err(Error::Msearch {
            slot,
            status,
            body: error.to_string(),
        });
    }

    // Only touch the entry when the search actually declares nested paths.
    let nested = search.nested_paths();
    if !nested.is_empty() {
        merge_inner_hits(&mut entry, &nested);
    }

    let decoded = SearchResponse::from_value(entry)?;
    if decoded.is_partial() {
        tracing::warn!(
            slot,
            index = %search.physical_index(),
            timed_out = decoded.timed_out,
            shards_failed = decoded.shards.failed,
            "msearch slot returned partial results"
        );
    }
    tracing::debug!(
        slot,
        total = decoded.total,
        hits = decoded.hits.len(),
        "msearch slot decoded"
    );
    Ok(decoded)
}
/// Envelope of a raw multi-search reply; only the `responses` field is
/// deserialized.
#[derive(Deserialize)]
struct RawMsearchResponse {
    /// Raw per-search entries, consumed positionally by the decoders.
    responses: Vec<Value>,
}
// Implements `MsearchBundle` for a tuple of `&Search<_>` references.
//
// Invocation shape: `impl_msearch_bundle!(<len> => T0.0, T1.1, ...)`, where
// each `Type.index` pair names a generic parameter and the tuple position it
// occupies. The position doubles as the slot number passed to `decode_slot`.
macro_rules! impl_msearch_bundle {
    ($arity:expr => $( $Ty:ident . $pos:tt ),+) => {
        impl<$($Ty),+> MsearchBundle for ($(&Search<$Ty>,)+)
        where
            $($Ty: DeserializeOwned,)+
        {
            type Output = ($(SearchResponse<$Ty>,)+);
            const LEN: usize = $arity;

            fn ndjson(&self, prefix: &str) -> Result<String> {
                let mut payload = String::new();
                // One header/body line pair per tuple element, in tuple order.
                $(
                    append_lines(self.$pos, prefix, &mut payload)?;
                )+
                Ok(payload)
            }

            fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
                let mut slots = responses.into_iter();
                // Tuple fields are evaluated left to right, so each element
                // consumes the response at its own position.
                Ok((
                    $(
                        decode_slot(self.$pos, $pos, slots.next())?,
                    )+
                ))
            }
        }
    };
}
// `MsearchBundle` implementations for tuples of one through eight
// `&Search<_>` references; the leading number becomes the impl's `LEN`.
impl_msearch_bundle!(1 => T0.0);
impl_msearch_bundle!(2 => T0.0, T1.1);
impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);