Skip to main content

flusso_query/
msearch.rs

1//! [`Client::msearch`] — several typed searches in one `_msearch` round-trip.
2//!
3//! Each search keeps its own index, body, and document type; OpenSearch
4//! answers per slot, in order, and each slot decodes with its own type. The
5//! bundle is a tuple of `&Search<T>` (arity 1–8, types may differ per slot),
6//! so the searches survive the call and stay reusable. For many searches of
7//! *one* type, see [`Client::msearch_all`].
8
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18    /// Run several typed searches in one `_msearch` request, returning one
19    /// typed [`SearchResponse`] per search, in slot order. The bundle is a
20    /// tuple of `&Search<T>` whose document types may differ:
21    ///
22    /// ```no_run
23    /// # use flusso_query::{Client, Search};
24    /// # #[derive(serde::Deserialize)] struct User { email: String }
25    /// # #[derive(serde::Deserialize)] struct Order { status: String }
26    /// # async fn run() -> flusso_query::Result<()> {
27    /// # let client = Client::connect("http://localhost:9200")?;
28    /// # let users_query = Search::<User>::new("users", "xxxxxx");
29    /// # let orders_query = Search::<Order>::new("orders", "yyyyyy");
30    /// let (users, orders) = client.msearch((&users_query, &orders_query)).await?;
31    /// # Ok(())
32    /// # }
33    /// ```
34    ///
35    /// A slot-level failure fails the whole call with [`Error::Msearch`]
36    /// naming the slot — there are no partial results.
37    #[tracing::instrument(
38        name = "search.msearch",
39        skip_all,
40        fields(searches = B::LEN),
41        err,
42    )]
43    pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44        let envelope = self.msearch_raw(bundle.ndjson(&self.index_prefix)?).await?;
45        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46        bundle.decode(raw.responses)
47    }
48
49    /// Run many searches of **one** document type in a single `_msearch`
50    /// request, returning one [`SearchResponse`] per search, in order. The
51    /// heterogeneous (mixed-type) form is [`Client::msearch`].
52    #[tracing::instrument(
53        name = "search.msearch_all",
54        skip_all,
55        fields(searches = searches.len()),
56        err,
57    )]
58    pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59    where
60        T: DeserializeOwned,
61    {
62        if searches.is_empty() {
63            return Ok(Vec::new());
64        }
65        let mut lines = String::new();
66        for search in searches {
67            append_lines(search, &self.index_prefix, &mut lines)?;
68        }
69        let envelope = self.msearch_raw(lines).await?;
70        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71        let mut entries = raw.responses.into_iter();
72        searches
73            .iter()
74            .enumerate()
75            .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76            .collect()
77    }
78}
79
80/// A bundle of searches runnable in one [`Client::msearch`] request —
81/// implemented for tuples of `&Search<T>` up to arity 8, each slot with its
82/// own document type. You don't implement this; you pass tuples.
83pub trait MsearchBundle {
84    /// One typed [`SearchResponse`] per slot, in order.
85    type Output;
86
87    /// How many searches the bundle holds.
88    const LEN: usize;
89
90    /// Render the bundle as `_msearch` NDJSON: a `{"index": …}` header line
91    /// and a body line per slot. `prefix` is prepended to each slot's index
92    /// (empty for an unprefixed deployment).
93    fn ndjson(&self, prefix: &str) -> Result<String>;
94
95    /// Decode the envelope's `responses` entries, in slot order.
96    fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
97}
98
99/// Append one search's two NDJSON lines: the `{"index": …}` header (the
100/// physical index `{prefix}{INDEX}_{SCHEMA_HASH}`, exactly what the sink
101/// writes) and the `_search` body.
102fn append_lines<T>(search: &Search<T>, prefix: &str, ndjson: &mut String) -> Result<()> {
103    let index = format!("{prefix}{}", search.physical_index());
104    let header = serde_json::to_string(&json!({ "index": index }))?;
105    let body = serde_json::to_string(&search.body())?;
106    ndjson.push_str(&header);
107    ndjson.push('\n');
108    ndjson.push_str(&body);
109    ndjson.push('\n');
110    Ok(())
111}
112
113/// Decode one slot: surface its per-slot error if present, merge inner hits
114/// for the slot's own nested projections, then decode the typed page.
115fn decode_slot<T>(
116    search: &Search<T>,
117    slot: usize,
118    entry: Option<Value>,
119) -> Result<SearchResponse<T>>
120where
121    T: DeserializeOwned,
122{
123    let mut entry = entry.ok_or_else(|| Error::Msearch {
124        slot,
125        status: 0,
126        body: "missing response slot".to_owned(),
127    })?;
128    if let Some(error) = entry.get("error") {
129        let status = entry
130            .get("status")
131            .and_then(Value::as_u64)
132            .and_then(|status| u16::try_from(status).ok())
133            .unwrap_or(0);
134        return Err(Error::Msearch {
135            slot,
136            status,
137            body: error.to_string(),
138        });
139    }
140    let paths = search.nested_paths();
141    if !paths.is_empty() {
142        merge_inner_hits(&mut entry, &paths);
143    }
144    SearchResponse::from_value(entry)
145}
146
147/// The `_msearch` response envelope.
148#[derive(Deserialize)]
149struct RawMsearchResponse {
150    responses: Vec<Value>,
151}
152
153/// Implement [`MsearchBundle`] for one tuple arity of `&Search<T>`.
154macro_rules! impl_msearch_bundle {
155    ($len:expr => $( $T:ident . $idx:tt ),+) => {
156        impl<$($T),+> MsearchBundle for ($(&Search<$T>,)+)
157        where
158            $($T: DeserializeOwned,)+
159        {
160            type Output = ($(SearchResponse<$T>,)+);
161
162            const LEN: usize = $len;
163
164            fn ndjson(&self, prefix: &str) -> Result<String> {
165                let mut lines = String::new();
166                $( append_lines(self.$idx, prefix, &mut lines)?; )+
167                Ok(lines)
168            }
169
170            fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
171                let mut entries = responses.into_iter();
172                Ok(( $( decode_slot(self.$idx, $idx, entries.next())?, )+ ))
173            }
174        }
175    };
176}
177
178impl_msearch_bundle!(1 => T0.0);
179impl_msearch_bundle!(2 => T0.0, T1.1);
180impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
181impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
182impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
183impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
184impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
185impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);