Skip to main content

flusso_query/
multi.rs

1//! Combined search: one query over several indexes, hits ranked together.
2//!
3//! Where [`Client::msearch`](crate::Client::msearch) runs *independent*
4//! searches in one round-trip (separate result lists), a [`MultiSearch`] runs
5//! **one** query across every index a [`FlussoMultiDocument`] union spans and
6//! returns a single, blended, relevance-ranked result list. Each hit decodes
7//! into the union variant matching its physical `_index`.
8//!
9//! A subtlety: the sink addresses each index by the stable alias
10//! `{INDEX}_{SCHEMA_HASH}`, but the alias points at a generation-suffixed
11//! concrete index (`{INDEX}_{SCHEMA_HASH}_{n}`), and OpenSearch always reports
12//! that *concrete* name in a hit's `_index` — never the alias the query went
13//! through. So before dispatching, the suffix is normalized back to the
14//! `{INDEX}_{SCHEMA_HASH}` a variant claims (see [`decode_response`]).
15//!
16//! The union enum is yours: one single-field variant per document type, named
17//! after the search surface it serves. `#[derive(FlussoMultiDocument)]` (the
18//! `derive` feature) writes the impl; without it, a hand-written impl is two
19//! short members — see the trait docs.
20//!
21//! Root-scope queries already compose across document types ([`Query<Root>`]
22//! carries no document type), so any handle mix works in the builder. A field
23//! unmapped in one of the indexes simply doesn't match there — but **sorting**
24//! on it errors on the OpenSearch side unless the sort carries an
25//! `unmapped_type`; prefer sorting on fields all indexes share (or relevance).
26//!
27//! [`Query<Root>`]: crate::Query
28
29use std::borrow::Cow;
30use std::marker::PhantomData;
31use std::time::Duration;
32
33use serde::Deserialize;
34use serde_json::{Map, Value};
35
36use crate::Client;
37use crate::error::Result;
38use crate::handles::Sort;
39use crate::query::{AsQuery, BoolBuilder, Root};
40use crate::search::{Hit, RawCount, SearchResponse};
41
42/// A union of [`FlussoDocument`](crate::FlussoDocument) types searched
43/// together — one query, one blended result list, each hit decoded into the
44/// variant matching its index.
45///
46/// `#[derive(FlussoMultiDocument)]` (the `derive` feature) implements it for
47/// an enum with one single-field variant per document type. Without the
48/// derive, the impl is written by hand — exactly what the derive generates:
49///
50/// ```no_run
51/// use flusso_query::{FlussoDocument, FlussoIndex, FlussoMultiDocument, Error, Result, Segment};
52/// use serde_json::Value;
53/// # #[derive(serde::Deserialize)] struct User { email: String }
54/// # impl FlussoDocument for User { const PATH: &'static [Segment] = &[]; }
55/// # impl FlussoIndex for User {
56/// #     const INDEX: &'static str = "users";
57/// #     const SCHEMA_HASH: &'static str = "xxxxxx";
58/// # }
59/// # #[derive(serde::Deserialize)] struct Order { status: String }
60/// # impl FlussoDocument for Order { const PATH: &'static [Segment] = &[]; }
61/// # impl FlussoIndex for Order {
62/// #     const INDEX: &'static str = "orders";
63/// #     const SCHEMA_HASH: &'static str = "yyyyyy";
64/// # }
65///
66/// /// One item in the storefront's blended search — name it after the
67/// /// surface it serves, like your document structs.
68/// enum StoreItem {
69///     User(User),
70///     Order(Order),
71/// }
72///
73/// impl FlussoMultiDocument for StoreItem {
74///     const TARGETS: &'static [(&'static str, &'static str)] = &[
75///         (User::INDEX, User::SCHEMA_HASH),
76///         (Order::INDEX, Order::SCHEMA_HASH),
77///     ];
78///
79///     fn decode(physical_index: &str, source: Value) -> Result<Self> {
80///         if physical_index == User::physical_index() {
81///             return Ok(Self::User(serde_json::from_value(source)?));
82///         }
83///         if physical_index == Order::physical_index() {
84///             return Ok(Self::Order(serde_json::from_value(source)?));
85///         }
86///         Err(Error::UnexpectedIndex { index: physical_index.to_owned() })
87///     }
88/// }
89/// ```
90pub trait FlussoMultiDocument: Sized {
91    /// The `(logical index, schema hash)` pair of every document type in the
92    /// union, in variant order — each is that type's
93    /// [`INDEX`](crate::FlussoIndex::INDEX) /
94    /// [`SCHEMA_HASH`](crate::FlussoIndex::SCHEMA_HASH).
95    const TARGETS: &'static [(&'static str, &'static str)];
96
97    /// Decode one hit's `_source` into the right variant, dispatching on the
98    /// hit's physical index name. A hit from an index no variant claims is
99    /// [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
100    fn decode(physical_index: &str, source: Value) -> Result<Self>;
101
102    /// Start a typed query across all of this union's indexes. Like
103    /// [`FlussoIndex::query`](crate::FlussoIndex::query), the returned
104    /// builder is a plain client-free value.
105    fn query() -> MultiSearch<Self> {
106        MultiSearch::new()
107    }
108}
109
110/// A typed query across every index of a [`FlussoMultiDocument`] union — the
111/// blended counterpart of [`Search`](crate::Search), with the same clause
112/// builder and the same client-free shape.
113///
114/// Hits come back in **one** relevance-ranked list; `from`/`size` page that
115/// blended list, not each index. Terminals: [`send`](Self::send) for a typed
116/// page of union values, [`count`](Self::count) for the total matches across
117/// all the indexes.
118#[derive(Debug, Clone)]
119pub struct MultiSearch<U> {
120    /// The comma-joined physical index list the request addresses.
121    path: String,
122    bool_query: BoolBuilder,
123    raw: Option<Value>,
124    sort: Vec<Sort>,
125    from: Option<u64>,
126    size: Option<u64>,
127    _marker: PhantomData<fn() -> U>,
128}
129
130impl<U: FlussoMultiDocument> MultiSearch<U> {
131    /// Start a query across the union's indexes (usually via
132    /// [`FlussoMultiDocument::query`]).
133    #[must_use]
134    pub fn new() -> Self {
135        let path = U::TARGETS
136            .iter()
137            .map(|(index, hash)| format!("{index}_{hash}"))
138            .collect::<Vec<_>>()
139            .join(",");
140        Self {
141            path,
142            bool_query: BoolBuilder::default(),
143            raw: None,
144            sort: Vec::new(),
145            from: None,
146            size: None,
147            _marker: PhantomData,
148        }
149    }
150
151    /// A scoring clause (`bool.must`). Root-scope queries from *any* of the
152    /// union's document types compose here; a field unmapped in one index
153    /// simply doesn't match there. An absent clause adds nothing.
154    #[must_use]
155    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
156        if let Some(query) = query.into_query() {
157            self.bool_query.push_must(query.into_inner());
158        }
159        self
160    }
161
162    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
163    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
164    #[must_use]
165    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
166        if let Some(query) = query.into_query() {
167            self.bool_query.push_filter(query.into_inner());
168        }
169        self
170    }
171
172    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
173    #[must_use]
174    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
175        if let Some(query) = query.into_query() {
176            self.bool_query.push_must_not(query.into_inner());
177        }
178        self
179    }
180
181    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
182    #[must_use]
183    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
184        if let Some(query) = query.into_query() {
185            self.bool_query.push_should(query.into_inner());
186        }
187        self
188    }
189
190    /// Append a sort key. It applies to the **blended** list, so the field
191    /// must exist in every index of the union (or carry an `unmapped_type` in
192    /// its options) — OpenSearch rejects a sort on a field one index lacks.
193    /// Relevance (no sort) is always safe.
194    #[must_use]
195    pub fn sort(mut self, sort: Sort) -> Self {
196        self.sort.push(sort);
197        self
198    }
199
200    /// Append several sort keys at once — e.g. from a
201    /// [`SortBuilder`](crate::SortBuilder). Equivalent to repeated [`sort`](Self::sort).
202    #[must_use]
203    pub fn sorts(mut self, sorts: impl IntoIterator<Item = Sort>) -> Self {
204        self.sort.extend(sorts);
205        self
206    }
207
208    /// Offset of the first hit to return, in the blended list.
209    #[must_use]
210    pub fn from(mut self, from: u64) -> Self {
211        self.from = Some(from);
212        self
213    }
214
215    /// Maximum number of hits to return, across all the indexes combined.
216    #[must_use]
217    pub fn size(mut self, size: u64) -> Self {
218        self.size = Some(size);
219        self
220    }
221
222    /// Replace the query body with a raw OpenSearch query DSL value. The
223    /// pressure-release valve, as on [`Search`](crate::Search); hits still
224    /// decode into the union.
225    #[must_use]
226    pub fn raw(mut self, query: Value) -> Self {
227        self.raw = Some(query);
228        self
229    }
230
231    /// The comma-joined physical index list this query addresses — one
232    /// `{index}_{hash}` per union variant. For logging and debugging.
233    #[must_use]
234    pub fn physical_path(&self) -> &str {
235        &self.path
236    }
237
238    /// The accumulated query alone: the raw override, the bool clauses, or
239    /// `match_all` when nothing was added.
240    fn query_value(&self) -> Value {
241        match &self.raw {
242            Some(raw) => raw.clone(),
243            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
244            None => self.bool_query.to_value(),
245        }
246    }
247
248    /// The request body this search will POST to `_search`. Pure — useful for
249    /// tests and debugging.
250    #[must_use]
251    pub fn body(&self) -> Value {
252        let mut root = Map::new();
253        root.insert("query".to_string(), self.query_value());
254        if !self.sort.is_empty() {
255            let keys = self.sort.iter().map(Sort::to_value).collect();
256            root.insert("sort".to_string(), Value::Array(keys));
257        }
258        if let Some(from) = self.from {
259            root.insert("from".to_string(), Value::from(from));
260        }
261        if let Some(size) = self.size {
262            root.insert("size".to_string(), Value::from(size));
263        }
264        Value::Object(root)
265    }
266
267    /// The request body [`count`](Self::count) will POST to `_count`: just
268    /// the query (as on [`Search::count_body`](crate::Search::count_body)).
269    #[must_use]
270    pub fn count_body(&self) -> Value {
271        let mut root = Map::new();
272        root.insert("query".to_string(), self.query_value());
273        Value::Object(root)
274    }
275
276    /// Execute the search and decode the blended hits into the union.
277    #[tracing::instrument(
278        name = "search.multi",
279        skip_all,
280        fields(
281            path = %self.path,
282            from = ?self.from,
283            size = ?self.size,
284            total = tracing::field::Empty,
285            took_ms = tracing::field::Empty,
286        ),
287        err,
288    )]
289    pub async fn send(&self, client: &Client) -> Result<SearchResponse<U>> {
290        let body = self.body();
291        let response = client.search_at(&self.path, &body).await?;
292        let page = decode_response::<U>(response, &client.index_prefix)?;
293        let span = tracing::Span::current();
294        span.record("total", page.total);
295        span.record("took_ms", page.took.as_millis() as u64);
296        if page.is_partial() {
297            tracing::warn!(
298                path = %self.path,
299                timed_out = page.timed_out,
300                shards_failed = page.shards.failed,
301                shards_total = page.shards.total,
302                "combined search returned partial results"
303            );
304        }
305        tracing::debug!(
306            total = page.total,
307            hits = page.hits.len(),
308            "combined search completed"
309        );
310        Ok(page)
311    }
312
313    /// Count the matches across all the union's indexes, without fetching
314    /// any hits.
315    #[tracing::instrument(
316        name = "search.multi_count",
317        skip_all,
318        fields(path = %self.path, count = tracing::field::Empty),
319        err,
320    )]
321    pub async fn count(&self, client: &Client) -> Result<u64> {
322        let body = self.count_body();
323        let response = client.count_at(&self.path, &body).await?;
324        let raw: RawCount = serde_json::from_value(response)?;
325        tracing::Span::current().record("count", raw.count);
326        tracing::debug!(count = raw.count, "combined count completed");
327        Ok(raw.count)
328    }
329}
330
331impl<U: FlussoMultiDocument> Default for MultiSearch<U> {
332    fn default() -> Self {
333        Self::new()
334    }
335}
336
337/// Decode a combined `_search` response: the usual envelope, but each hit's
338/// `_source` is dispatched by the hit's `_index` into the union.
339///
340/// Each `_index` is normalized to a variant's `physical_index()` in two steps
341/// before dispatch:
342///
343/// 1. **Prefix.** `prefix` (the client's index prefix) is stripped — empty for
344///    an unprefixed deployment.
345/// 2. **Generation suffix.** A hit's `_index` is the *concrete* index behind the
346///    hash alias (`{INDEX}_{SCHEMA_HASH}_{n}`), so the trailing `_{n}` is
347///    normalized away. This is anchored on the union's known targets rather than
348///    blindly trimming a trailing `_{digits}`: a name maps to a target when it
349///    equals that target's `{INDEX}_{SCHEMA_HASH}` or is `{that}_{digits}`. The
350///    anchor matters because the eight-hex `SCHEMA_HASH` can itself be all digits
351///    — so a bare `{INDEX}_{numeric-hash}` (a legacy un-suffixed index) must not
352///    be mistaken for a generation of `{INDEX}`.
353///
354/// A name matching no target is passed through unchanged, so the variant's
355/// `decode` reports it as [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
356pub(crate) fn decode_response<U: FlussoMultiDocument>(
357    value: Value,
358    prefix: &str,
359) -> Result<SearchResponse<U>> {
360    let raw: RawMultiResponse = serde_json::from_value(value)?;
361    let hits = raw
362        .hits
363        .hits
364        .into_iter()
365        .map(|hit| {
366            let stripped = hit.index.strip_prefix(prefix).unwrap_or(&hit.index);
367            let index = dispatch_index::<U>(stripped);
368            Ok(Hit {
369                id: hit.id,
370                score: hit.score.unwrap_or(0.0),
371                source: U::decode(index.as_ref(), hit.source)?,
372            })
373        })
374        .collect::<Result<Vec<_>>>()?;
375    Ok(SearchResponse {
376        total: raw.hits.total.value,
377        max_score: raw.hits.max_score,
378        hits,
379        took: Duration::from_millis(raw.took),
380        timed_out: raw.timed_out,
381        shards: raw.shards.into(),
382    })
383}
384
385/// Normalize a (prefix-stripped) hit `_index` to the `physical_index()` of the
386/// union target it belongs to, collapsing the generation suffix the sink's hash
387/// alias hides. Returns the canonical `{INDEX}_{SCHEMA_HASH}` when the name is
388/// that target or a `{target}_{digits}` generation of it; otherwise the name
389/// unchanged (so dispatch misses and surfaces `UnexpectedIndex`).
390fn dispatch_index<U: FlussoMultiDocument>(stripped: &str) -> Cow<'_, str> {
391    for (index, hash) in U::TARGETS {
392        let physical = format!("{index}_{hash}");
393        if stripped == physical || is_generation_of(stripped, &physical) {
394            return Cow::Owned(physical);
395        }
396    }
397    Cow::Borrowed(stripped)
398}
399
/// Whether `name` is `{physical}_{digits}` — one concrete generation behind
/// the hash alias `physical`. Whatever follows `physical_` must be a non-empty
/// run of ASCII digits (the sink's generation counter); `physical` alone is
/// not a generation of itself.
fn is_generation_of(name: &str, physical: &str) -> bool {
    let Some(rest) = name.strip_prefix(physical) else {
        return false;
    };
    match rest.strip_prefix('_') {
        Some(counter) => !counter.is_empty() && counter.chars().all(|c| c.is_ascii_digit()),
        None => false,
    }
}
410
/// Wire shape of a combined `_search` response envelope. `decode_response`
/// deserializes into this before routing hits into the union.
#[derive(Deserialize)]
struct RawMultiResponse {
    /// Server-side execution time in milliseconds (`took`); `0` when absent.
    #[serde(default)]
    took: u64,
    /// Whether the search hit its timeout (`timed_out`); `false` when absent.
    #[serde(default)]
    timed_out: bool,
    /// Shard accounting from `_shards`; the type's `Default` when absent.
    #[serde(rename = "_shards", default)]
    shards: crate::search::RawShards,
    /// The `hits` section; required — its absence is a deserialization error.
    hits: RawMultiHits,
}
421
/// The `hits` object of a combined `_search` response.
#[derive(Deserialize)]
struct RawMultiHits {
    /// `hits.total`; required.
    total: RawMultiTotal,
    /// `hits.max_score`; `None` when absent or `null`.
    #[serde(default)]
    max_score: Option<f32>,
    /// The individual hits, in the order the server returned them.
    hits: Vec<RawMultiHit>,
}
429
/// `hits.total` in its object form (`{"value": …}`); keys other than `value`
/// are ignored.
// NOTE(review): a bare-integer `total` (e.g. legacy `rest_total_hits_as_int`
// output) would not deserialize into this struct — confirm it never occurs.
#[derive(Deserialize)]
struct RawMultiTotal {
    /// Number of matching documents reported by the server.
    value: u64,
}
434
/// One entry of `hits.hits`.
#[derive(Deserialize)]
struct RawMultiHit {
    /// Concrete index that served the hit (`_index`). It may still carry the
    /// client's index prefix and a generation suffix, both removed before
    /// dispatch.
    #[serde(rename = "_index")]
    index: String,
    /// Document id (`_id`).
    #[serde(rename = "_id")]
    id: String,
    /// Relevance score (`_score`); `None` when absent or `null`, which
    /// `decode_response` turns into `0.0`.
    #[serde(rename = "_score", default)]
    score: Option<f32>,
    /// The stored document (`_source`); there is no `default`, so a hit
    /// without `_source` fails to deserialize.
    #[serde(rename = "_source")]
    source: Value,
}