Skip to main content

flusso_query/
msearch.rs

1//! [`Client::msearch`] — several typed searches in one `_msearch` round-trip.
2//!
3//! Each search keeps its own index, body, and document type; OpenSearch
4//! answers per slot, in order, and each slot decodes with its own type. The
5//! bundle is a tuple of `&Search<T>` (arity 1–8, types may differ per slot),
6//! so the searches survive the call and stay reusable. For many searches of
7//! *one* type, see [`Client::msearch_all`].
8
9use serde::Deserialize;
10use serde::de::DeserializeOwned;
11use serde_json::{Value, json};
12
13use crate::error::{Error, Result};
14use crate::search::merge_inner_hits;
15use crate::{Client, Search, SearchResponse};
16
17impl Client {
18    /// Run several typed searches in one `_msearch` request, returning one
19    /// typed [`SearchResponse`] per search, in slot order. The bundle is a
20    /// tuple of `&Search<T>` whose document types may differ:
21    ///
22    /// ```no_run
23    /// # use flusso_query::{Client, Search};
24    /// # #[derive(serde::Deserialize)] struct User { email: String }
25    /// # #[derive(serde::Deserialize)] struct Order { status: String }
26    /// # async fn run() -> flusso_query::Result<()> {
27    /// # let client = Client::connect("http://localhost:9200")?;
28    /// # let users_query = Search::<User>::new("users", "xxxxxx");
29    /// # let orders_query = Search::<Order>::new("orders", "yyyyyy");
30    /// let (users, orders) = client.msearch((&users_query, &orders_query)).await?;
31    /// # Ok(())
32    /// # }
33    /// ```
34    ///
35    /// A slot-level failure fails the whole call with [`Error::Msearch`]
36    /// naming the slot — there are no partial results.
37    #[tracing::instrument(
38        name = "search.msearch",
39        skip_all,
40        fields(searches = B::LEN),
41        err,
42    )]
43    pub async fn msearch<B: MsearchBundle>(&self, bundle: B) -> Result<B::Output> {
44        let envelope = self.msearch_raw(bundle.ndjson(&self.index_prefix)?).await?;
45        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
46        bundle.decode(raw.responses)
47    }
48
49    /// Run many searches of **one** document type in a single `_msearch`
50    /// request, returning one [`SearchResponse`] per search, in order. The
51    /// heterogeneous (mixed-type) form is [`Client::msearch`].
52    #[tracing::instrument(
53        name = "search.msearch_all",
54        skip_all,
55        fields(searches = searches.len()),
56        err,
57    )]
58    pub async fn msearch_all<T>(&self, searches: &[Search<T>]) -> Result<Vec<SearchResponse<T>>>
59    where
60        T: DeserializeOwned,
61    {
62        if searches.is_empty() {
63            return Ok(Vec::new());
64        }
65        let mut lines = String::new();
66        for search in searches {
67            append_lines(search, &self.index_prefix, &mut lines)?;
68        }
69        let envelope = self.msearch_raw(lines).await?;
70        let raw: RawMsearchResponse = serde_json::from_value(envelope)?;
71        let mut entries = raw.responses.into_iter();
72        searches
73            .iter()
74            .enumerate()
75            .map(|(slot, search)| decode_slot(search, slot, entries.next()))
76            .collect()
77    }
78}
79
80/// A bundle of searches runnable in one [`Client::msearch`] request —
81/// implemented for tuples of `&Search<T>` up to arity 8, each slot with its
82/// own document type. You don't implement this; you pass tuples.
83pub trait MsearchBundle {
84    /// One typed [`SearchResponse`] per slot, in order.
85    type Output;
86
87    /// How many searches the bundle holds.
88    const LEN: usize;
89
90    /// Render the bundle as `_msearch` NDJSON: a `{"index": …}` header line
91    /// and a body line per slot. `prefix` is prepended to each slot's index
92    /// (empty for an unprefixed deployment).
93    fn ndjson(&self, prefix: &str) -> Result<String>;
94
95    /// Decode the envelope's `responses` entries, in slot order.
96    fn decode(&self, responses: Vec<Value>) -> Result<Self::Output>;
97}
98
99/// Append one search's two NDJSON lines: the `{"index": …}` header (the
100/// physical index `{prefix}{INDEX}_{SCHEMA_HASH}`, exactly what the sink
101/// writes) and the `_search` body.
102fn append_lines<T>(search: &Search<T>, prefix: &str, ndjson: &mut String) -> Result<()> {
103    let index = format!("{prefix}{}", search.physical_index());
104    let header = serde_json::to_string(&json!({ "index": index }))?;
105    let body = serde_json::to_string(&search.body())?;
106    ndjson.push_str(&header);
107    ndjson.push('\n');
108    ndjson.push_str(&body);
109    ndjson.push('\n');
110    Ok(())
111}
112
113/// Decode one slot: surface its per-slot error if present, merge inner hits
114/// for the slot's own nested projections, then decode the typed page.
115fn decode_slot<T>(
116    search: &Search<T>,
117    slot: usize,
118    entry: Option<Value>,
119) -> Result<SearchResponse<T>>
120where
121    T: DeserializeOwned,
122{
123    let mut entry = entry.ok_or_else(|| Error::Msearch {
124        slot,
125        status: 0,
126        body: "missing response slot".to_owned(),
127    })?;
128    if let Some(error) = entry.get("error") {
129        let status = entry
130            .get("status")
131            .and_then(Value::as_u64)
132            .and_then(|status| u16::try_from(status).ok())
133            .unwrap_or(0);
134        tracing::warn!(
135            slot,
136            index = %search.physical_index(),
137            status,
138            "msearch slot failed"
139        );
140        return Err(Error::Msearch {
141            slot,
142            status,
143            body: error.to_string(),
144        });
145    }
146    let paths = search.nested_paths();
147    if !paths.is_empty() {
148        merge_inner_hits(&mut entry, &paths);
149    }
150    let page = SearchResponse::from_value(entry)?;
151    if page.is_partial() {
152        tracing::warn!(
153            slot,
154            index = %search.physical_index(),
155            timed_out = page.timed_out,
156            shards_failed = page.shards.failed,
157            "msearch slot returned partial results"
158        );
159    }
160    tracing::debug!(
161        slot,
162        total = page.total,
163        hits = page.hits.len(),
164        "msearch slot decoded"
165    );
166    Ok(page)
167}
168
169/// The `_msearch` response envelope.
170#[derive(Deserialize)]
171struct RawMsearchResponse {
172    responses: Vec<Value>,
173}
174
175/// Implement [`MsearchBundle`] for one tuple arity of `&Search<T>`.
176macro_rules! impl_msearch_bundle {
177    ($len:expr => $( $T:ident . $idx:tt ),+) => {
178        impl<$($T),+> MsearchBundle for ($(&Search<$T>,)+)
179        where
180            $($T: DeserializeOwned,)+
181        {
182            type Output = ($(SearchResponse<$T>,)+);
183
184            const LEN: usize = $len;
185
186            fn ndjson(&self, prefix: &str) -> Result<String> {
187                let mut lines = String::new();
188                $( append_lines(self.$idx, prefix, &mut lines)?; )+
189                Ok(lines)
190            }
191
192            fn decode(&self, responses: Vec<Value>) -> Result<Self::Output> {
193                let mut entries = responses.into_iter();
194                Ok(( $( decode_slot(self.$idx, $idx, entries.next())?, )+ ))
195            }
196        }
197    };
198}
199
200impl_msearch_bundle!(1 => T0.0);
201impl_msearch_bundle!(2 => T0.0, T1.1);
202impl_msearch_bundle!(3 => T0.0, T1.1, T2.2);
203impl_msearch_bundle!(4 => T0.0, T1.1, T2.2, T3.3);
204impl_msearch_bundle!(5 => T0.0, T1.1, T2.2, T3.3, T4.4);
205impl_msearch_bundle!(6 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5);
206impl_msearch_bundle!(7 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6);
207impl_msearch_bundle!(8 => T0.0, T1.1, T2.2, T3.3, T4.4, T5.5, T6.6, T7.7);