flusso_query/multi.rs
1//! Combined search: one query over several indexes, hits ranked together.
2//!
3//! Where [`Client::msearch`](crate::Client::msearch) runs *independent*
4//! searches in one round-trip (separate result lists), a [`MultiSearch`] runs
5//! **one** query across every index a [`FlussoMultiDocument`] union spans and
6//! returns a single, blended, relevance-ranked result list. Each hit decodes
7//! into the union variant matching its physical `_index`.
8//!
9//! A subtlety: the sink addresses each index by the stable alias
10//! `{INDEX}_{SCHEMA_HASH}`, but the alias points at a generation-suffixed
11//! concrete index (`{INDEX}_{SCHEMA_HASH}_{n}`), and OpenSearch always reports
12//! that *concrete* name in a hit's `_index` — never the alias the query went
//! through. So before dispatching, each hit's `_index` has the client's index
//! prefix stripped and the generation suffix normalized back to the
//! `{INDEX}_{SCHEMA_HASH}` a variant claims (see the crate-private
//! `decode_response`).
15//!
16//! The union enum is yours: one single-field variant per document type, named
17//! after the search surface it serves. `#[derive(FlussoMultiDocument)]` (the
18//! `derive` feature) writes the impl; without it, a hand-written impl is two
19//! short members — see the trait docs.
20//!
21//! Root-scope queries already compose across document types ([`Query<Root>`]
22//! carries no document type), so any handle mix works in the builder. A field
23//! unmapped in one of the indexes simply doesn't match there — but **sorting**
24//! on it errors on the OpenSearch side unless the sort carries an
25//! `unmapped_type`; prefer sorting on fields all indexes share (or relevance).
26//!
27//! [`Query<Root>`]: crate::Query
28
29use std::borrow::Cow;
30use std::marker::PhantomData;
31use std::time::Duration;
32
33use serde::Deserialize;
34use serde_json::{Map, Value};
35
36use crate::Client;
37use crate::error::Result;
38use crate::handles::Sort;
39use crate::query::{AsQuery, BoolBuilder, Root};
40use crate::search::{Hit, RawCount, SearchResponse};
41
42/// A union of [`FlussoDocument`](crate::FlussoDocument) types searched
43/// together — one query, one blended result list, each hit decoded into the
44/// variant matching its index.
45///
46/// `#[derive(FlussoMultiDocument)]` (the `derive` feature) implements it for
47/// an enum with one single-field variant per document type. Without the
48/// derive, the impl is written by hand — exactly what the derive generates:
49///
50/// ```no_run
51/// use flusso_query::{FlussoDocument, FlussoIndex, FlussoMultiDocument, Error, Result, Segment};
52/// use serde_json::Value;
53/// # #[derive(serde::Deserialize)] struct User { email: String }
54/// # impl FlussoDocument for User { const PATH: &'static [Segment] = &[]; }
55/// # impl FlussoIndex for User {
56/// # const INDEX: &'static str = "users";
57/// # const SCHEMA_HASH: &'static str = "xxxxxx";
58/// # }
59/// # #[derive(serde::Deserialize)] struct Order { status: String }
60/// # impl FlussoDocument for Order { const PATH: &'static [Segment] = &[]; }
61/// # impl FlussoIndex for Order {
62/// # const INDEX: &'static str = "orders";
63/// # const SCHEMA_HASH: &'static str = "yyyyyy";
64/// # }
65///
66/// /// One item in the storefront's blended search — name it after the
67/// /// surface it serves, like your document structs.
68/// enum StoreItem {
69/// User(User),
70/// Order(Order),
71/// }
72///
73/// impl FlussoMultiDocument for StoreItem {
74/// const TARGETS: &'static [(&'static str, &'static str)] = &[
75/// (User::INDEX, User::SCHEMA_HASH),
76/// (Order::INDEX, Order::SCHEMA_HASH),
77/// ];
78///
79/// fn decode(physical_index: &str, source: Value) -> Result<Self> {
80/// if physical_index == User::physical_index() {
81/// return Ok(Self::User(serde_json::from_value(source)?));
82/// }
83/// if physical_index == Order::physical_index() {
84/// return Ok(Self::Order(serde_json::from_value(source)?));
85/// }
86/// Err(Error::UnexpectedIndex { index: physical_index.to_owned() })
87/// }
88/// }
89/// ```
90pub trait FlussoMultiDocument: Sized {
91 /// The `(logical index, schema hash)` pair of every document type in the
92 /// union, in variant order — each is that type's
93 /// [`INDEX`](crate::FlussoIndex::INDEX) /
94 /// [`SCHEMA_HASH`](crate::FlussoIndex::SCHEMA_HASH).
95 const TARGETS: &'static [(&'static str, &'static str)];
96
97 /// Decode one hit's `_source` into the right variant, dispatching on the
98 /// hit's physical index name. A hit from an index no variant claims is
99 /// [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
100 fn decode(physical_index: &str, source: Value) -> Result<Self>;
101
102 /// Start a typed query across all of this union's indexes. Like
103 /// [`FlussoIndex::query`](crate::FlussoIndex::query), the returned
104 /// builder is a plain client-free value.
105 fn query() -> MultiSearch<Self> {
106 MultiSearch::new()
107 }
108}
109
110/// A typed query across every index of a [`FlussoMultiDocument`] union — the
111/// blended counterpart of [`Search`](crate::Search), with the same clause
112/// builder and the same client-free shape.
113///
114/// Hits come back in **one** relevance-ranked list; `from`/`size` page that
115/// blended list, not each index. Terminals: [`send`](Self::send) for a typed
116/// page of union values, [`count`](Self::count) for the total matches across
117/// all the indexes.
118#[derive(Debug, Clone)]
119pub struct MultiSearch<U> {
120 /// The comma-joined physical index list the request addresses.
121 path: String,
122 bool_query: BoolBuilder,
123 raw: Option<Value>,
124 sort: Vec<Sort>,
125 from: Option<u64>,
126 size: Option<u64>,
127 _marker: PhantomData<fn() -> U>,
128}
129
130impl<U: FlussoMultiDocument> MultiSearch<U> {
131 /// Start a query across the union's indexes (usually via
132 /// [`FlussoMultiDocument::query`]).
133 #[must_use]
134 pub fn new() -> Self {
135 let path = U::TARGETS
136 .iter()
137 .map(|(index, hash)| format!("{index}_{hash}"))
138 .collect::<Vec<_>>()
139 .join(",");
140 Self {
141 path,
142 bool_query: BoolBuilder::default(),
143 raw: None,
144 sort: Vec::new(),
145 from: None,
146 size: None,
147 _marker: PhantomData,
148 }
149 }
150
151 /// A scoring clause (`bool.must`). Root-scope queries from *any* of the
152 /// union's document types compose here; a field unmapped in one index
153 /// simply doesn't match there. An absent clause adds nothing.
154 #[must_use]
155 pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
156 if let Some(query) = query.into_query() {
157 self.bool_query.push_must(query.into_inner());
158 }
159 self
160 }
161
162 /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
163 /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
164 #[must_use]
165 pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
166 if let Some(query) = query.into_query() {
167 self.bool_query.push_filter(query.into_inner());
168 }
169 self
170 }
171
172 /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
173 #[must_use]
174 pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
175 if let Some(query) = query.into_query() {
176 self.bool_query.push_must_not(query.into_inner());
177 }
178 self
179 }
180
181 /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
182 #[must_use]
183 pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
184 if let Some(query) = query.into_query() {
185 self.bool_query.push_should(query.into_inner());
186 }
187 self
188 }
189
190 /// Append a sort key. It applies to the **blended** list, so the field
191 /// must exist in every index of the union (or carry an `unmapped_type` in
192 /// its options) — OpenSearch rejects a sort on a field one index lacks.
193 /// Relevance (no sort) is always safe.
194 #[must_use]
195 pub fn sort(mut self, sort: Sort) -> Self {
196 self.sort.push(sort);
197 self
198 }
199
200 /// Append several sort keys at once — e.g. from a
201 /// [`SortBuilder`](crate::SortBuilder). Equivalent to repeated [`sort`](Self::sort).
202 #[must_use]
203 pub fn sorts(mut self, sorts: impl IntoIterator<Item = Sort>) -> Self {
204 self.sort.extend(sorts);
205 self
206 }
207
208 /// Offset of the first hit to return, in the blended list.
209 #[must_use]
210 pub fn from(mut self, from: u64) -> Self {
211 self.from = Some(from);
212 self
213 }
214
215 /// Maximum number of hits to return, across all the indexes combined.
216 #[must_use]
217 pub fn size(mut self, size: u64) -> Self {
218 self.size = Some(size);
219 self
220 }
221
222 /// Replace the query body with a raw OpenSearch query DSL value. The
223 /// pressure-release valve, as on [`Search`](crate::Search); hits still
224 /// decode into the union.
225 #[must_use]
226 pub fn raw(mut self, query: Value) -> Self {
227 self.raw = Some(query);
228 self
229 }
230
231 /// The comma-joined physical index list this query addresses — one
232 /// `{index}_{hash}` per union variant. For logging and debugging.
233 #[must_use]
234 pub fn physical_path(&self) -> &str {
235 &self.path
236 }
237
238 /// The accumulated query alone: the raw override, the bool clauses, or
239 /// `match_all` when nothing was added.
240 fn query_value(&self) -> Value {
241 match &self.raw {
242 Some(raw) => raw.clone(),
243 None if self.bool_query.is_empty() => crate::handles::match_all_value(),
244 None => self.bool_query.to_value(),
245 }
246 }
247
248 /// The request body this search will POST to `_search`. Pure — useful for
249 /// tests and debugging.
250 #[must_use]
251 pub fn body(&self) -> Value {
252 let mut root = Map::new();
253 root.insert("query".to_string(), self.query_value());
254 if !self.sort.is_empty() {
255 let keys = self.sort.iter().map(Sort::to_value).collect();
256 root.insert("sort".to_string(), Value::Array(keys));
257 }
258 if let Some(from) = self.from {
259 root.insert("from".to_string(), Value::from(from));
260 }
261 if let Some(size) = self.size {
262 root.insert("size".to_string(), Value::from(size));
263 }
264 Value::Object(root)
265 }
266
267 /// The request body [`count`](Self::count) will POST to `_count`: just
268 /// the query (as on [`Search::count_body`](crate::Search::count_body)).
269 #[must_use]
270 pub fn count_body(&self) -> Value {
271 let mut root = Map::new();
272 root.insert("query".to_string(), self.query_value());
273 Value::Object(root)
274 }
275
276 /// Execute the search and decode the blended hits into the union.
277 #[tracing::instrument(
278 name = "search.multi",
279 skip_all,
280 fields(
281 path = %self.path,
282 from = ?self.from,
283 size = ?self.size,
284 total = tracing::field::Empty,
285 took_ms = tracing::field::Empty,
286 ),
287 err,
288 )]
289 pub async fn send(&self, client: &Client) -> Result<SearchResponse<U>> {
290 let body = self.body();
291 let response = client.search_at(&self.path, &body).await?;
292 let page = decode_response::<U>(response, &client.index_prefix)?;
293 let span = tracing::Span::current();
294 span.record("total", page.total);
295 span.record("took_ms", page.took.as_millis() as u64);
296 if page.is_partial() {
297 tracing::warn!(
298 path = %self.path,
299 timed_out = page.timed_out,
300 shards_failed = page.shards.failed,
301 shards_total = page.shards.total,
302 "combined search returned partial results"
303 );
304 }
305 tracing::debug!(
306 total = page.total,
307 hits = page.hits.len(),
308 "combined search completed"
309 );
310 Ok(page)
311 }
312
313 /// Count the matches across all the union's indexes, without fetching
314 /// any hits.
315 #[tracing::instrument(
316 name = "search.multi_count",
317 skip_all,
318 fields(path = %self.path, count = tracing::field::Empty),
319 err,
320 )]
321 pub async fn count(&self, client: &Client) -> Result<u64> {
322 let body = self.count_body();
323 let response = client.count_at(&self.path, &body).await?;
324 let raw: RawCount = serde_json::from_value(response)?;
325 tracing::Span::current().record("count", raw.count);
326 tracing::debug!(count = raw.count, "combined count completed");
327 Ok(raw.count)
328 }
329}
330
331impl<U: FlussoMultiDocument> Default for MultiSearch<U> {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337/// Decode a combined `_search` response: the usual envelope, but each hit's
338/// `_source` is dispatched by the hit's `_index` into the union.
339///
340/// Each `_index` is normalized to a variant's `physical_index()` in two steps
341/// before dispatch:
342///
343/// 1. **Prefix.** `prefix` (the client's index prefix) is stripped — empty for
344/// an unprefixed deployment.
345/// 2. **Generation suffix.** A hit's `_index` is the *concrete* index behind the
346/// hash alias (`{INDEX}_{SCHEMA_HASH}_{n}`), so the trailing `_{n}` is
347/// normalized away. This is anchored on the union's known targets rather than
348/// blindly trimming a trailing `_{digits}`: a name maps to a target when it
349/// equals that target's `{INDEX}_{SCHEMA_HASH}` or is `{that}_{digits}`. The
350/// anchor matters because the eight-hex `SCHEMA_HASH` can itself be all digits
351/// — so a bare `{INDEX}_{numeric-hash}` (a legacy un-suffixed index) must not
352/// be mistaken for a generation of `{INDEX}`.
353///
354/// A name matching no target is passed through unchanged, so the variant's
355/// `decode` reports it as [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
356pub(crate) fn decode_response<U: FlussoMultiDocument>(
357 value: Value,
358 prefix: &str,
359) -> Result<SearchResponse<U>> {
360 let raw: RawMultiResponse = serde_json::from_value(value)?;
361 let hits = raw
362 .hits
363 .hits
364 .into_iter()
365 .map(|hit| {
366 let stripped = hit.index.strip_prefix(prefix).unwrap_or(&hit.index);
367 let index = dispatch_index::<U>(stripped);
368 Ok(Hit {
369 id: hit.id,
370 score: hit.score.unwrap_or(0.0),
371 source: U::decode(index.as_ref(), hit.source)?,
372 })
373 })
374 .collect::<Result<Vec<_>>>()?;
375 Ok(SearchResponse {
376 total: raw.hits.total.value,
377 max_score: raw.hits.max_score,
378 hits,
379 took: Duration::from_millis(raw.took),
380 timed_out: raw.timed_out,
381 shards: raw.shards.into(),
382 })
383}
384
385/// Normalize a (prefix-stripped) hit `_index` to the `physical_index()` of the
386/// union target it belongs to, collapsing the generation suffix the sink's hash
387/// alias hides. Returns the canonical `{INDEX}_{SCHEMA_HASH}` when the name is
388/// that target or a `{target}_{digits}` generation of it; otherwise the name
389/// unchanged (so dispatch misses and surfaces `UnexpectedIndex`).
390fn dispatch_index<U: FlussoMultiDocument>(stripped: &str) -> Cow<'_, str> {
391 for (index, hash) in U::TARGETS {
392 let physical = format!("{index}_{hash}");
393 if stripped == physical || is_generation_of(stripped, &physical) {
394 return Cow::Owned(physical);
395 }
396 }
397 Cow::Borrowed(stripped)
398}
399
/// Report whether `name` is a concrete generation of the hash alias
/// `physical`.
///
/// The name must be exactly `{physical}_` followed by one or more ASCII digits
/// (the sink's generation counter). `physical` on its own, a bare trailing
/// `_`, or a tail with any non-digit character does not qualify.
fn is_generation_of(name: &str, physical: &str) -> bool {
    let Some(generation) = name
        .strip_prefix(physical)
        .and_then(|tail| tail.strip_prefix('_'))
    else {
        return false;
    };
    !generation.is_empty() && generation.chars().all(|ch| ch.is_ascii_digit())
}
410
411#[derive(Deserialize)]
412struct RawMultiResponse {
413 #[serde(default)]
414 took: u64,
415 #[serde(default)]
416 timed_out: bool,
417 #[serde(rename = "_shards", default)]
418 shards: crate::search::RawShards,
419 hits: RawMultiHits,
420}
421
422#[derive(Deserialize)]
423struct RawMultiHits {
424 total: RawMultiTotal,
425 #[serde(default)]
426 max_score: Option<f32>,
427 hits: Vec<RawMultiHit>,
428}
429
430#[derive(Deserialize)]
431struct RawMultiTotal {
432 value: u64,
433}
434
435#[derive(Deserialize)]
436struct RawMultiHit {
437 #[serde(rename = "_index")]
438 index: String,
439 #[serde(rename = "_id")]
440 id: String,
441 #[serde(rename = "_score", default)]
442 score: Option<f32>,
443 #[serde(rename = "_source")]
444 source: Value,
445}