Skip to main content

flusso_query/
multi.rs

//! Combined search: one query over several indexes, hits ranked together.
//!
//! Where [`Client::msearch`](crate::Client::msearch) runs *independent*
//! searches in one round-trip (separate result lists), a [`MultiSearch`] runs
//! **one** query across every index a [`FlussoMultiDocument`] union spans and
//! returns a single, blended result list — ranked by relevance, or by the
//! sort keys when any are set. Each hit decodes into the union variant
//! matching its physical `_index`: the sink writes exactly
//! `{INDEX}_{SCHEMA_HASH}`, so dispatch is exact and no alias is involved.
//!
//! The union enum is yours: one single-field variant per document type, named
//! after the search surface it serves. `#[derive(FlussoMultiDocument)]` (the
//! `derive` feature) writes the impl; without it, a hand-written impl is two
//! short members — see the trait docs.
//!
//! Root-scope queries already compose across document types ([`Query<Root>`]
//! carries no document type), so any mix of handles works in the builder. A
//! field unmapped in one of the indexes simply doesn't match there — but
//! **sorting** on it fails on the OpenSearch side unless the sort carries an
//! `unmapped_type`; prefer sorting on fields all indexes share (or on
//! relevance).
//!
//! [`Query<Root>`]: crate::Query
22
23use std::marker::PhantomData;
24use std::time::Duration;
25
26use serde::Deserialize;
27use serde_json::{Map, Value};
28
29use crate::Client;
30use crate::error::Result;
31use crate::handles::Sort;
32use crate::query::{AsQuery, BoolBuilder, Root};
33use crate::search::{Hit, RawCount, SearchResponse};
34
35/// A union of [`FlussoDocument`](crate::FlussoDocument) types searched
36/// together — one query, one blended result list, each hit decoded into the
37/// variant matching its index.
38///
39/// `#[derive(FlussoMultiDocument)]` (the `derive` feature) implements it for
40/// an enum with one single-field variant per document type. Without the
41/// derive, the impl is written by hand — exactly what the derive generates:
42///
43/// ```no_run
44/// use flusso_query::{FlussoDocument, FlussoMultiDocument, Error, Result};
45/// use serde_json::Value;
46/// # #[derive(serde::Deserialize)] struct User { email: String }
47/// # impl FlussoDocument for User {
48/// #     const INDEX: &'static str = "users";
49/// #     const SCHEMA_HASH: &'static str = "xxxxxx";
50/// # }
51/// # #[derive(serde::Deserialize)] struct Order { status: String }
52/// # impl FlussoDocument for Order {
53/// #     const INDEX: &'static str = "orders";
54/// #     const SCHEMA_HASH: &'static str = "yyyyyy";
55/// # }
56///
57/// /// One item in the storefront's blended search — name it after the
58/// /// surface it serves, like your document structs.
59/// enum StoreItem {
60///     User(User),
61///     Order(Order),
62/// }
63///
64/// impl FlussoMultiDocument for StoreItem {
65///     const TARGETS: &'static [(&'static str, &'static str)] = &[
66///         (User::INDEX, User::SCHEMA_HASH),
67///         (Order::INDEX, Order::SCHEMA_HASH),
68///     ];
69///
70///     fn decode(physical_index: &str, source: Value) -> Result<Self> {
71///         if physical_index == User::physical_index() {
72///             return Ok(Self::User(serde_json::from_value(source)?));
73///         }
74///         if physical_index == Order::physical_index() {
75///             return Ok(Self::Order(serde_json::from_value(source)?));
76///         }
77///         Err(Error::UnexpectedIndex { index: physical_index.to_owned() })
78///     }
79/// }
80/// ```
81pub trait FlussoMultiDocument: Sized {
82    /// The `(logical index, schema hash)` pair of every document type in the
83    /// union, in variant order — each is that type's
84    /// [`INDEX`](crate::FlussoDocument::INDEX) /
85    /// [`SCHEMA_HASH`](crate::FlussoDocument::SCHEMA_HASH).
86    const TARGETS: &'static [(&'static str, &'static str)];
87
88    /// Decode one hit's `_source` into the right variant, dispatching on the
89    /// hit's physical index name. A hit from an index no variant claims is
90    /// [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
91    fn decode(physical_index: &str, source: Value) -> Result<Self>;
92
93    /// Start a typed query across all of this union's indexes. Like
94    /// [`FlussoDocument::query`](crate::FlussoDocument::query), the returned
95    /// builder is a plain client-free value.
96    fn query() -> MultiSearch<Self> {
97        MultiSearch::new()
98    }
99}
100
101/// A typed query across every index of a [`FlussoMultiDocument`] union — the
102/// blended counterpart of [`Search`](crate::Search), with the same clause
103/// builder and the same client-free shape.
104///
105/// Hits come back in **one** relevance-ranked list; `from`/`size` page that
106/// blended list, not each index. Terminals: [`send`](Self::send) for a typed
107/// page of union values, [`count`](Self::count) for the total matches across
108/// all the indexes.
109#[derive(Debug, Clone)]
110pub struct MultiSearch<U> {
111    /// The comma-joined physical index list the request addresses.
112    path: String,
113    bool_query: BoolBuilder,
114    raw: Option<Value>,
115    sort: Vec<Sort>,
116    from: Option<u64>,
117    size: Option<u64>,
118    _marker: PhantomData<fn() -> U>,
119}
120
121impl<U: FlussoMultiDocument> MultiSearch<U> {
122    /// Start a query across the union's indexes (usually via
123    /// [`FlussoMultiDocument::query`]).
124    #[must_use]
125    pub fn new() -> Self {
126        let path = U::TARGETS
127            .iter()
128            .map(|(index, hash)| format!("{index}_{hash}"))
129            .collect::<Vec<_>>()
130            .join(",");
131        Self {
132            path,
133            bool_query: BoolBuilder::default(),
134            raw: None,
135            sort: Vec::new(),
136            from: None,
137            size: None,
138            _marker: PhantomData,
139        }
140    }
141
142    /// A scoring clause (`bool.must`). Root-scope queries from *any* of the
143    /// union's document types compose here; a field unmapped in one index
144    /// simply doesn't match there. An absent clause adds nothing.
145    #[must_use]
146    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
147        if let Some(query) = query.into_query() {
148            self.bool_query.push_must(query.into_inner());
149        }
150        self
151    }
152
153    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
154    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
155    #[must_use]
156    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
157        if let Some(query) = query.into_query() {
158            self.bool_query.push_filter(query.into_inner());
159        }
160        self
161    }
162
163    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
164    #[must_use]
165    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
166        if let Some(query) = query.into_query() {
167            self.bool_query.push_must_not(query.into_inner());
168        }
169        self
170    }
171
172    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
173    #[must_use]
174    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
175        if let Some(query) = query.into_query() {
176            self.bool_query.push_should(query.into_inner());
177        }
178        self
179    }
180
181    /// Append a sort key. It applies to the **blended** list, so the field
182    /// must exist in every index of the union (or carry an `unmapped_type` in
183    /// its options) — OpenSearch rejects a sort on a field one index lacks.
184    /// Relevance (no sort) is always safe.
185    #[must_use]
186    pub fn sort(mut self, sort: Sort) -> Self {
187        self.sort.push(sort);
188        self
189    }
190
191    /// Offset of the first hit to return, in the blended list.
192    #[must_use]
193    pub fn from(mut self, from: u64) -> Self {
194        self.from = Some(from);
195        self
196    }
197
198    /// Maximum number of hits to return, across all the indexes combined.
199    #[must_use]
200    pub fn size(mut self, size: u64) -> Self {
201        self.size = Some(size);
202        self
203    }
204
205    /// Replace the query body with a raw OpenSearch query DSL value. The
206    /// pressure-release valve, as on [`Search`](crate::Search); hits still
207    /// decode into the union.
208    #[must_use]
209    pub fn raw(mut self, query: Value) -> Self {
210        self.raw = Some(query);
211        self
212    }
213
214    /// The comma-joined physical index list this query addresses — one
215    /// `{index}_{hash}` per union variant. For logging and debugging.
216    #[must_use]
217    pub fn physical_path(&self) -> &str {
218        &self.path
219    }
220
221    /// The accumulated query alone: the raw override, the bool clauses, or
222    /// `match_all` when nothing was added.
223    fn query_value(&self) -> Value {
224        match &self.raw {
225            Some(raw) => raw.clone(),
226            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
227            None => self.bool_query.to_value(),
228        }
229    }
230
231    /// The request body this search will POST to `_search`. Pure — useful for
232    /// tests and debugging.
233    #[must_use]
234    pub fn body(&self) -> Value {
235        let mut root = Map::new();
236        root.insert("query".to_string(), self.query_value());
237        if !self.sort.is_empty() {
238            let keys = self.sort.iter().map(Sort::to_value).collect();
239            root.insert("sort".to_string(), Value::Array(keys));
240        }
241        if let Some(from) = self.from {
242            root.insert("from".to_string(), Value::from(from));
243        }
244        if let Some(size) = self.size {
245            root.insert("size".to_string(), Value::from(size));
246        }
247        Value::Object(root)
248    }
249
250    /// The request body [`count`](Self::count) will POST to `_count`: just
251    /// the query (as on [`Search::count_body`](crate::Search::count_body)).
252    #[must_use]
253    pub fn count_body(&self) -> Value {
254        let mut root = Map::new();
255        root.insert("query".to_string(), self.query_value());
256        Value::Object(root)
257    }
258
259    /// Execute the search and decode the blended hits into the union.
260    #[tracing::instrument(
261        name = "search.multi",
262        skip_all,
263        fields(
264            path = %self.path,
265            from = ?self.from,
266            size = ?self.size,
267            total = tracing::field::Empty,
268            took_ms = tracing::field::Empty,
269        ),
270        err,
271    )]
272    pub async fn send(&self, client: &Client) -> Result<SearchResponse<U>> {
273        let body = self.body();
274        let response = client.search_at(&self.path, &body).await?;
275        let page = decode_response::<U>(response)?;
276        let span = tracing::Span::current();
277        span.record("total", page.total);
278        span.record("took_ms", page.took.as_millis() as u64);
279        tracing::debug!(
280            total = page.total,
281            hits = page.hits.len(),
282            "combined search completed"
283        );
284        Ok(page)
285    }
286
287    /// Count the matches across all the union's indexes, without fetching
288    /// any hits.
289    #[tracing::instrument(
290        name = "search.multi_count",
291        skip_all,
292        fields(path = %self.path, count = tracing::field::Empty),
293        err,
294    )]
295    pub async fn count(&self, client: &Client) -> Result<u64> {
296        let body = self.count_body();
297        let response = client.count_at(&self.path, &body).await?;
298        let raw: RawCount = serde_json::from_value(response)?;
299        tracing::Span::current().record("count", raw.count);
300        tracing::debug!(count = raw.count, "combined count completed");
301        Ok(raw.count)
302    }
303}
304
305impl<U: FlussoMultiDocument> Default for MultiSearch<U> {
306    fn default() -> Self {
307        Self::new()
308    }
309}
310
311/// Decode a combined `_search` response: the usual envelope, but each hit's
312/// `_source` is dispatched by the hit's `_index` into the union.
313pub(crate) fn decode_response<U: FlussoMultiDocument>(value: Value) -> Result<SearchResponse<U>> {
314    let raw: RawMultiResponse = serde_json::from_value(value)?;
315    let hits = raw
316        .hits
317        .hits
318        .into_iter()
319        .map(|hit| {
320            Ok(Hit {
321                id: hit.id,
322                score: hit.score.unwrap_or(0.0),
323                source: U::decode(&hit.index, hit.source)?,
324            })
325        })
326        .collect::<Result<Vec<_>>>()?;
327    Ok(SearchResponse {
328        total: raw.hits.total.value,
329        max_score: raw.hits.max_score,
330        hits,
331        took: Duration::from_millis(raw.took),
332    })
333}
334
// ---- wire types ------------------------------------------------------------
//
// Mirrors `search.rs`'s response shapes, but keeps each hit's `_index` (the
// dispatch key) and defers `_source` to a `Value` (decoded per variant).
339
340#[derive(Deserialize)]
341struct RawMultiResponse {
342    #[serde(default)]
343    took: u64,
344    hits: RawMultiHits,
345}
346
347#[derive(Deserialize)]
348struct RawMultiHits {
349    total: RawMultiTotal,
350    #[serde(default)]
351    max_score: Option<f32>,
352    hits: Vec<RawMultiHit>,
353}
354
355#[derive(Deserialize)]
356struct RawMultiTotal {
357    value: u64,
358}
359
360#[derive(Deserialize)]
361struct RawMultiHit {
362    #[serde(rename = "_index")]
363    index: String,
364    #[serde(rename = "_id")]
365    id: String,
366    #[serde(rename = "_score", default)]
367    score: Option<f32>,
368    #[serde(rename = "_source")]
369    source: Value,
370}