1use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18 #[tracing::instrument(
38 name = "search.msearch",
39 skip_all,
40 fields(searches = B::LEN),
41 err,
42 )]
43 pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44 let envelope = self.msearch_raw(bundle.ndjson()?).await?;
45 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46 bundle.decode(raw.responses)
47 }
48
49 #[tracing::instrument(
53 name = "search.msearch_all",
54 skip_all,
55 fields(searches = searches.len()),
56 err,
57 )]
58 pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59 where
60 T: DeserializeOwned,
61 {
62 if searches.is_empty() {
63 return Ok(Vec::new());
64 }
65 let mut lines = String::new();
66 for search in searches {
67 append_lines(search, &mut lines)?;
68 }
69 let envelope = self.msearch_raw(lines).await?;
70 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71 let mut entries = raw.responses.into_iter();
72 searches
73 .iter()
74 .enumerate()
75 .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76 .collect()
77 }
78}
79
80pub trait MsearchBundle {
84 type Output;
86
87 const LEN: usize;
89
90 fn ndjson(&self) -> Result<String>;
93
94 fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
96}
97
98fn append_lines<T>(search: &Search<T>, ndjson: &mut String) -> Result<()> {
101 let header = serde_json::to_string(&json!({ "index": search.physical_index() }))?;
102 let body = serde_json::to_string(&search.body())?;
103 ndjson.push_str(&header);
104 ndjson.push('\n');
105 ndjson.push_str(&body);
106 ndjson.push('\n');
107 Ok(())
108}
109
110fn decode_slot<T>(
113 search: &Search<T>,
114 slot: usize,
115 entry: Option<Value>,
116) -> Result<SearchResponse<T>>
117where
118 T: DeserializeOwned,
119{
120 let mut entry = entry.ok_or_else(|| Error::Msearch {
121 slot,
122 status: 0,
123 body: "missing response slot".to_owned(),
124 })?;
125 if let Some(error) = entry.get("error") {
126 let status = entry
127 .get("status")
128 .and_then(Value::as_u64)
129 .and_then(|status| u16::try_from(status).ok())
130 .unwrap_or(0);
131 return Err(Error::Msearch {
132 slot,
133 status,
134 body: error.to_string(),
135 });
136 }
137 let paths = search.nested_paths();
138 if !paths.is_empty() {
139 merge_inner_hits(&mut entry, &paths);
140 }
141 SearchResponse::from_value(entry)
142}
143
144#[derive(Deserialize)]
146struct RawMsearchResponse {
147 responses: Vec<Value>,
148}
149
// Implements `MsearchBundle` for a tuple of `&Search<_>` references.
//
// Invocation shape: `ARITY => Type.index, Type.index, ...`, where each `Type`
// names a generic parameter and `index` is the matching tuple field. `ARITY`
// becomes `LEN` verbatim, so it must equal the number of entries listed.
macro_rules! impl_msearch_bundle {
    ($arity:expr => $( $doc:ident . $pos:tt ),+) => {
        impl<$($doc: DeserializeOwned),+> MsearchBundle for ($(&Search<$doc>,)+) {
            type Output = ($(SearchResponse<$doc>,)+);

            const LEN: usize = $arity;

            // Serializes the tuple's searches in field order.
            fn ndjson(&self) -> Result<String> {
                let mut payload = String::new();
                $( append_lines(self.$pos, &mut payload)?; )+
                Ok(payload)
            }

            // Decodes responses by position; the tuple index doubles as the
            // slot number reported in errors.
            fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
                let mut remaining = responses.into_iter();
                let decoded = ( $( decode_slot(self.$pos, $pos, remaining.next())?, )+ );
                Ok(decoded)
            }
        }
    };
}
174
175impl_msearch_bundle!(1 => T0.0);
176impl_msearch_bundle!(2 => T0.0, T1.1);
177impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
178impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
179impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
180impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
181impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
182impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);