1use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18 #[tracing::instrument(
38 name = "search.msearch",
39 skip_all,
40 fields(searches = B::LEN),
41 err,
42 )]
43 pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44 let envelope = self.msearch_raw(bundle.ndjson(&self.index_prefix)?).await?;
45 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46 bundle.decode(raw.responses)
47 }
48
49 #[tracing::instrument(
53 name = "search.msearch_all",
54 skip_all,
55 fields(searches = searches.len()),
56 err,
57 )]
58 pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59 where
60 T: DeserializeOwned,
61 {
62 if searches.is_empty() {
63 return Ok(Vec::new());
64 }
65 let mut lines = String::new();
66 for search in searches {
67 append_lines(search, &self.index_prefix, &mut lines)?;
68 }
69 let envelope = self.msearch_raw(lines).await?;
70 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71 let mut entries = raw.responses.into_iter();
72 searches
73 .iter()
74 .enumerate()
75 .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76 .collect()
77 }
78}
79
/// A group of searches that `Client::msearch` can send as one request and
/// decode into a single typed output.
///
/// This file implements it for tuples of one to eight `&Search<_>` references.
pub trait MsearchBundle {
    /// The decoded result produced by [`MsearchBundle::decode`].
    type Output;

    /// Number of searches in the bundle; `Client::msearch` records it as the
    /// `searches` field of its tracing span.
    const LEN: usize;

    /// Renders the request payload for every search in the bundle.
    ///
    /// `prefix` is the index prefix supplied by the client.
    fn ndjson(&self, prefix: &str) -> Result<String>;

    /// Converts the `responses` array of the reply into [`Self::Output`].
    fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
}
98
99fn append_lines<T>(search: &Search<T>, prefix: &str, ndjson: &mut String) -> Result<()> {
103 let index = format!("{prefix}{}", search.physical_index());
104 let header = serde_json::to_string(&json!({ "index": index }))?;
105 let body = serde_json::to_string(&search.body())?;
106 ndjson.push_str(&header);
107 ndjson.push('\n');
108 ndjson.push_str(&body);
109 ndjson.push('\n');
110 Ok(())
111}
112
113fn decode_slot<T>(
116 search: &Search<T>,
117 slot: usize,
118 entry: Option<Value>,
119) -> Result<SearchResponse<T>>
120where
121 T: DeserializeOwned,
122{
123 let mut entry = entry.ok_or_else(|| Error::Msearch {
124 slot,
125 status: 0,
126 body: "missing response slot".to_owned(),
127 })?;
128 if let Some(error) = entry.get("error") {
129 let status = entry
130 .get("status")
131 .and_then(Value::as_u64)
132 .and_then(|status| u16::try_from(status).ok())
133 .unwrap_or(0);
134 tracing::warn!(
135 slot,
136 index = %search.physical_index(),
137 status,
138 "msearch slot failed"
139 );
140 return Err(Error::Msearch {
141 slot,
142 status,
143 body: error.to_string(),
144 });
145 }
146 let paths = search.nested_paths();
147 if !paths.is_empty() {
148 merge_inner_hits(&mut entry, &paths);
149 }
150 let page = SearchResponse::from_value(entry)?;
151 if page.is_partial() {
152 tracing::warn!(
153 slot,
154 index = %search.physical_index(),
155 timed_out = page.timed_out,
156 shards_failed = page.shards.failed,
157 "msearch slot returned partial results"
158 );
159 }
160 tracing::debug!(
161 slot,
162 total = page.total,
163 hits = page.hits.len(),
164 "msearch slot decoded"
165 );
166 Ok(page)
167}
168
/// Envelope of an msearch reply as deserialized from the raw JSON value.
///
/// Only the `responses` array is captured; each element is kept as untyped
/// JSON so it can be decoded per slot later.
#[derive(Deserialize)]
struct RawMsearchResponse {
    /// Per-slot response entries, consumed in order by the decoders.
    responses: Vec<Value>,
}
174
175macro_rules! impl_msearch_bundle {
177 ($len:expr => $( $T:ident . $idx:tt ),+) => {
178 impl<$($T),+> MsearchBundle for ($(&Search<$T>,)+)
179 where
180 $($T: DeserializeOwned,)+
181 {
182 type Output = ($(SearchResponse<$T>,)+);
183
184 const LEN: usize = $len;
185
186 fn ndjson(&self, prefix: &str) -> Result<String> {
187 let mut lines = String::new();
188 $( append_lines(self.$idx, prefix, &mut lines)?; )+
189 Ok(lines)
190 }
191
192 fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
193 let mut entries = responses.into_iter();
194 Ok(( $( decode_slot(self.$idx, $idx, entries.next())?, )+ ))
195 }
196 }
197 };
198}
199
200impl_msearch_bundle!(1 => T0.0);
201impl_msearch_bundle!(2 => T0.0, T1.1);
202impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
203impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
204impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
205impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
206impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
207impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);