1use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18 #[tracing::instrument(
38 name = "search.msearch",
39 skip_all,
40 fields(searches = B::LEN),
41 err,
42 )]
43 pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44 let envelope = self.msearch_raw(bundle.ndjson(&self.index_prefix)?).await?;
45 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46 bundle.decode(raw.responses)
47 }
48
49 #[tracing::instrument(
53 name = "search.msearch_all",
54 skip_all,
55 fields(searches = searches.len()),
56 err,
57 )]
58 pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59 where
60 T: DeserializeOwned,
61 {
62 if searches.is_empty() {
63 return Ok(Vec::new());
64 }
65 let mut lines = String::new();
66 for search in searches {
67 append_lines(search, &self.index_prefix, &mut lines)?;
68 }
69 let envelope = self.msearch_raw(lines).await?;
70 let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71 let mut entries = raw.responses.into_iter();
72 searches
73 .iter()
74 .enumerate()
75 .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76 .collect()
77 }
78}
79
80pub trait MsearchBundle {
84 type Output;
86
87 const LEN: usize;
89
90 fn ndjson(&self, prefix: &str) -> Result<String>;
94
95 fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
97}
98
99fn append_lines<T>(search: &Search<T>, prefix: &str, ndjson: &mut String) -> Result<()> {
103 let index = format!("{prefix}{}", search.physical_index());
104 let header = serde_json::to_string(&json!({ "index": index }))?;
105 let body = serde_json::to_string(&search.body())?;
106 ndjson.push_str(&header);
107 ndjson.push('\n');
108 ndjson.push_str(&body);
109 ndjson.push('\n');
110 Ok(())
111}
112
113fn decode_slot<T>(
116 search: &Search<T>,
117 slot: usize,
118 entry: Option<Value>,
119) -> Result<SearchResponse<T>>
120where
121 T: DeserializeOwned,
122{
123 let mut entry = entry.ok_or_else(|| Error::Msearch {
124 slot,
125 status: 0,
126 body: "missing response slot".to_owned(),
127 })?;
128 if let Some(error) = entry.get("error") {
129 let status = entry
130 .get("status")
131 .and_then(Value::as_u64)
132 .and_then(|status| u16::try_from(status).ok())
133 .unwrap_or(0);
134 return Err(Error::Msearch {
135 slot,
136 status,
137 body: error.to_string(),
138 });
139 }
140 let paths = search.nested_paths();
141 if !paths.is_empty() {
142 merge_inner_hits(&mut entry, &paths);
143 }
144 SearchResponse::from_value(entry)
145}
146
147#[derive(Deserialize)]
149struct RawMsearchResponse {
150 responses: Vec<Value>,
151}
152
153macro_rules! impl_msearch_bundle {
155 ($len:expr => $( $T:ident . $idx:tt ),+) => {
156 impl<$($T),+> MsearchBundle for ($(&Search<$T>,)+)
157 where
158 $($T: DeserializeOwned,)+
159 {
160 type Output = ($(SearchResponse<$T>,)+);
161
162 const LEN: usize = $len;
163
164 fn ndjson(&self, prefix: &str) -> Result<String> {
165 let mut lines = String::new();
166 $( append_lines(self.$idx, prefix, &mut lines)?; )+
167 Ok(lines)
168 }
169
170 fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
171 let mut entries = responses.into_iter();
172 Ok(( $( decode_slot(self.$idx, $idx, entries.next())?, )+ ))
173 }
174 }
175 };
176}
177
178impl_msearch_bundle!(1 => T0.0);
179impl_msearch_bundle!(2 => T0.0, T1.1);
180impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
181impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
182impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
183impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
184impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
185impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);