Skip to main content

flusso_query/
msearch.rs

1//! [`Client::msearch`] — several typed searches in one `_msearch` round-trip.
2//!
3//! Each search keeps its own index, body, and document type; OpenSearch
4//! answers per slot, in order, and each slot decodes with its own type. The
5//! bundle is a tuple of `&Search<T>` (arity 1–8, types may differ per slot),
6//! so the searches survive the call and stay reusable. For many searches of
7//! *one* type, see [`Client::msearch_all`].
8
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18    /// Run several typed searches in one `_msearch` request, returning one
19    /// typed [`SearchResponse`] per search, in slot order. The bundle is a
20    /// tuple of `&Search<T>` whose document types may differ:
21    ///
22    /// ```no_run
23    /// # use flusso_query::{Client, Search};
24    /// # #[derive(serde::Deserialize)] struct User { email: String }
25    /// # #[derive(serde::Deserialize)] struct Order { status: String }
26    /// # async fn run() -> flusso_query::Result<()> {
27    /// # let client = Client::connect("http://localhost:9200")?;
28    /// # let users_query = Search::<User>::new("users", "xxxxxx");
29    /// # let orders_query = Search::<Order>::new("orders", "yyyyyy");
30    /// let (users, orders) = client.msearch((&users_query, &orders_query)).await?;
31    /// # Ok(())
32    /// # }
33    /// ```
34    ///
35    /// A slot-level failure fails the whole call with [`Error::Msearch`]
36    /// naming the slot — there are no partial results.
37    #[tracing::instrument(
38        name = "search.msearch",
39        skip_all,
40        fields(searches = B::LEN),
41        err,
42    )]
43    pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44        let envelope = self.msearch_raw(bundle.ndjson()?).await?;
45        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46        bundle.decode(raw.responses)
47    }
48
49    /// Run many searches of **one** document type in a single `_msearch`
50    /// request, returning one [`SearchResponse`] per search, in order. The
51    /// heterogeneous (mixed-type) form is [`Client::msearch`].
52    #[tracing::instrument(
53        name = "search.msearch_all",
54        skip_all,
55        fields(searches = searches.len()),
56        err,
57    )]
58    pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59    where
60        T: DeserializeOwned,
61    {
62        if searches.is_empty() {
63            return Ok(Vec::new());
64        }
65        let mut lines = String::new();
66        for search in searches {
67            append_lines(search, &mut lines)?;
68        }
69        let envelope = self.msearch_raw(lines).await?;
70        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71        let mut entries = raw.responses.into_iter();
72        searches
73            .iter()
74            .enumerate()
75            .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76            .collect()
77    }
78}
79
80/// A bundle of searches runnable in one [`Client::msearch`] request —
81/// implemented for tuples of `&Search<T>` up to arity 8, each slot with its
82/// own document type. You don't implement this; you pass tuples.
83pub trait MsearchBundle {
84    /// One typed [`SearchResponse`] per slot, in order.
85    type Output;
86
87    /// How many searches the bundle holds.
88    const LEN: usize;
89
90    /// Render the bundle as `_msearch` NDJSON: a `{"index": …}` header line
91    /// and a body line per slot.
92    fn ndjson(&self) -> Result<String>;
93
94    /// Decode the envelope's `responses` entries, in slot order.
95    fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
96}
97
98/// Append one search's two NDJSON lines: the `{"index": …}` header (the
99/// physical index, exactly what the sink writes) and the `_search` body.
100fn append_lines<T>(search: &Search<T>, ndjson: &mut String) -> Result<()> {
101    let header = serde_json::to_string(&json!({ "index": search.physical_index() }))?;
102    let body = serde_json::to_string(&search.body())?;
103    ndjson.push_str(&header);
104    ndjson.push('\n');
105    ndjson.push_str(&body);
106    ndjson.push('\n');
107    Ok(())
108}
109
110/// Decode one slot: surface its per-slot error if present, merge inner hits
111/// for the slot's own nested projections, then decode the typed page.
112fn decode_slot<T>(
113    search: &Search<T>,
114    slot: usize,
115    entry: Option<Value>,
116) -> Result<SearchResponse<T>>
117where
118    T: DeserializeOwned,
119{
120    let mut entry = entry.ok_or_else(|| Error::Msearch {
121        slot,
122        status: 0,
123        body: "missing response slot".to_owned(),
124    })?;
125    if let Some(error) = entry.get("error") {
126        let status = entry
127            .get("status")
128            .and_then(Value::as_u64)
129            .and_then(|status| u16::try_from(status).ok())
130            .unwrap_or(0);
131        return Err(Error::Msearch {
132            slot,
133            status,
134            body: error.to_string(),
135        });
136    }
137    let paths = search.nested_paths();
138    if !paths.is_empty() {
139        merge_inner_hits(&mut entry, &paths);
140    }
141    SearchResponse::from_value(entry)
142}
143
144/// The `_msearch` response envelope.
145#[derive(Deserialize)]
146struct RawMsearchResponse {
147    responses: Vec<Value>,
148}
149
150/// Implement [`MsearchBundle`] for one tuple arity of `&Search<T>`.
151macro_rules! impl_msearch_bundle {
152    ($len:expr => $( $T:ident . $idx:tt ),+) => {
153        impl<$($T),+> MsearchBundle for ($(&Search<$T>,)+)
154        where
155            $($T: DeserializeOwned,)+
156        {
157            type Output = ($(SearchResponse<$T>,)+);
158
159            const LEN: usize = $len;
160
161            fn ndjson(&self) -> Result<String> {
162                let mut lines = String::new();
163                $( append_lines(self.$idx, &mut lines)?; )+
164                Ok(lines)
165            }
166
167            fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
168                let mut entries = responses.into_iter();
169                Ok(( $( decode_slot(self.$idx, $idx, entries.next())?, )+ ))
170            }
171        }
172    };
173}
174
175impl_msearch_bundle!(1 => T0.0);
176impl_msearch_bundle!(2 => T0.0, T1.1);
177impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
178impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
179impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
180impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
181impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
182impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);