flusso_query/multi.rs
//! Combined search: one query over several indexes, hits ranked together.
//!
//! Where [`Client::msearch`](crate::Client::msearch) runs *independent*
//! searches in one round-trip (separate result lists), a [`MultiSearch`] runs
//! **one** query across every index a [`FlussoMultiDocument`] union spans and
//! returns a single, blended result list — ranked by relevance unless you
//! sort it. Each hit decodes into the union variant matching its physical
//! `_index`. The sink writes exactly `{INDEX}_{SCHEMA_HASH}`, so dispatch is
//! exact and involves no alias.
//!
//! The union enum is yours: one single-field variant per document type, named
//! after the search surface it serves. `#[derive(FlussoMultiDocument)]` (the
//! `derive` feature) writes the impl; without it, a hand-written impl needs
//! only two short members — see the trait docs.
//!
//! Root-scope queries already compose across document types ([`Query<Root>`]
//! carries no document type), so any mix of handles works in the builder. A
//! field unmapped in one of the indexes simply doesn't match there. But
//! **sorting** on such a field errors on the OpenSearch side unless the sort
//! carries an `unmapped_type`. Prefer sorting on fields every index shares
//! (or on relevance).
//!
//! [`Query<Root>`]: crate::Query

23use std::marker::PhantomData;
24use std::time::Duration;
25
26use serde::Deserialize;
27use serde_json::{Map, Value};
28
29use crate::Client;
30use crate::error::Result;
31use crate::handles::Sort;
32use crate::query::{AsQuery, BoolBuilder, Root};
33use crate::search::{Hit, RawCount, SearchResponse};
34
35/// A union of [`FlussoDocument`](crate::FlussoDocument) types searched
36/// together — one query, one blended result list, each hit decoded into the
37/// variant matching its index.
38///
39/// `#[derive(FlussoMultiDocument)]` (the `derive` feature) implements it for
40/// an enum with one single-field variant per document type. Without the
41/// derive, the impl is written by hand — exactly what the derive generates:
42///
43/// ```no_run
44/// use flusso_query::{FlussoDocument, FlussoMultiDocument, Error, Result};
45/// use serde_json::Value;
46/// # #[derive(serde::Deserialize)] struct User { email: String }
47/// # impl FlussoDocument for User {
48/// # const INDEX: &'static str = "users";
49/// # const SCHEMA_HASH: &'static str = "xxxxxx";
50/// # }
51/// # #[derive(serde::Deserialize)] struct Order { status: String }
52/// # impl FlussoDocument for Order {
53/// # const INDEX: &'static str = "orders";
54/// # const SCHEMA_HASH: &'static str = "yyyyyy";
55/// # }
56///
57/// /// One item in the storefront's blended search — name it after the
58/// /// surface it serves, like your document structs.
59/// enum StoreItem {
60/// User(User),
61/// Order(Order),
62/// }
63///
64/// impl FlussoMultiDocument for StoreItem {
65/// const TARGETS: &'static [(&'static str, &'static str)] = &[
66/// (User::INDEX, User::SCHEMA_HASH),
67/// (Order::INDEX, Order::SCHEMA_HASH),
68/// ];
69///
70/// fn decode(physical_index: &str, source: Value) -> Result<Self> {
71/// if physical_index == User::physical_index() {
72/// return Ok(Self::User(serde_json::from_value(source)?));
73/// }
74/// if physical_index == Order::physical_index() {
75/// return Ok(Self::Order(serde_json::from_value(source)?));
76/// }
77/// Err(Error::UnexpectedIndex { index: physical_index.to_owned() })
78/// }
79/// }
80/// ```
81pub trait FlussoMultiDocument: Sized {
82 /// The `(logical index, schema hash)` pair of every document type in the
83 /// union, in variant order — each is that type's
84 /// [`INDEX`](crate::FlussoDocument::INDEX) /
85 /// [`SCHEMA_HASH`](crate::FlussoDocument::SCHEMA_HASH).
86 const TARGETS: &'static [(&'static str, &'static str)];
87
88 /// Decode one hit's `_source` into the right variant, dispatching on the
89 /// hit's physical index name. A hit from an index no variant claims is
90 /// [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
91 fn decode(physical_index: &str, source: Value) -> Result<Self>;
92
93 /// Start a typed query across all of this union's indexes. Like
94 /// [`FlussoDocument::query`](crate::FlussoDocument::query), the returned
95 /// builder is a plain client-free value.
96 fn query() -> MultiSearch<Self> {
97 MultiSearch::new()
98 }
99}
100
101/// A typed query across every index of a [`FlussoMultiDocument`] union — the
102/// blended counterpart of [`Search`](crate::Search), with the same clause
103/// builder and the same client-free shape.
104///
105/// Hits come back in **one** relevance-ranked list; `from`/`size` page that
106/// blended list, not each index. Terminals: [`send`](Self::send) for a typed
107/// page of union values, [`count`](Self::count) for the total matches across
108/// all the indexes.
109#[derive(Debug, Clone)]
110pub struct MultiSearch<U> {
111 /// The comma-joined physical index list the request addresses.
112 path: String,
113 bool_query: BoolBuilder,
114 raw: Option<Value>,
115 sort: Vec<Sort>,
116 from: Option<u64>,
117 size: Option<u64>,
118 _marker: PhantomData<fn() -> U>,
119}
120
121impl<U: FlussoMultiDocument> MultiSearch<U> {
122 /// Start a query across the union's indexes (usually via
123 /// [`FlussoMultiDocument::query`]).
124 #[must_use]
125 pub fn new() -> Self {
126 let path = U::TARGETS
127 .iter()
128 .map(|(index, hash)| format!("{index}_{hash}"))
129 .collect::<Vec<_>>()
130 .join(",");
131 Self {
132 path,
133 bool_query: BoolBuilder::default(),
134 raw: None,
135 sort: Vec::new(),
136 from: None,
137 size: None,
138 _marker: PhantomData,
139 }
140 }
141
142 /// A scoring clause (`bool.must`). Root-scope queries from *any* of the
143 /// union's document types compose here; a field unmapped in one index
144 /// simply doesn't match there. An absent clause adds nothing.
145 #[must_use]
146 pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
147 if let Some(query) = query.into_query() {
148 self.bool_query.push_must(query.into_inner());
149 }
150 self
151 }
152
153 /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
154 /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
155 #[must_use]
156 pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
157 if let Some(query) = query.into_query() {
158 self.bool_query.push_filter(query.into_inner());
159 }
160 self
161 }
162
163 /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
164 #[must_use]
165 pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
166 if let Some(query) = query.into_query() {
167 self.bool_query.push_must_not(query.into_inner());
168 }
169 self
170 }
171
172 /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
173 #[must_use]
174 pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
175 if let Some(query) = query.into_query() {
176 self.bool_query.push_should(query.into_inner());
177 }
178 self
179 }
180
181 /// Append a sort key. It applies to the **blended** list, so the field
182 /// must exist in every index of the union (or carry an `unmapped_type` in
183 /// its options) — OpenSearch rejects a sort on a field one index lacks.
184 /// Relevance (no sort) is always safe.
185 #[must_use]
186 pub fn sort(mut self, sort: Sort) -> Self {
187 self.sort.push(sort);
188 self
189 }
190
191 /// Offset of the first hit to return, in the blended list.
192 #[must_use]
193 pub fn from(mut self, from: u64) -> Self {
194 self.from = Some(from);
195 self
196 }
197
198 /// Maximum number of hits to return, across all the indexes combined.
199 #[must_use]
200 pub fn size(mut self, size: u64) -> Self {
201 self.size = Some(size);
202 self
203 }
204
205 /// Replace the query body with a raw OpenSearch query DSL value. The
206 /// pressure-release valve, as on [`Search`](crate::Search); hits still
207 /// decode into the union.
208 #[must_use]
209 pub fn raw(mut self, query: Value) -> Self {
210 self.raw = Some(query);
211 self
212 }
213
214 /// The comma-joined physical index list this query addresses — one
215 /// `{index}_{hash}` per union variant. For logging and debugging.
216 #[must_use]
217 pub fn physical_path(&self) -> &str {
218 &self.path
219 }
220
221 /// The accumulated query alone: the raw override, the bool clauses, or
222 /// `match_all` when nothing was added.
223 fn query_value(&self) -> Value {
224 match &self.raw {
225 Some(raw) => raw.clone(),
226 None if self.bool_query.is_empty() => crate::handles::match_all_value(),
227 None => self.bool_query.to_value(),
228 }
229 }
230
231 /// The request body this search will POST to `_search`. Pure — useful for
232 /// tests and debugging.
233 #[must_use]
234 pub fn body(&self) -> Value {
235 let mut root = Map::new();
236 root.insert("query".to_string(), self.query_value());
237 if !self.sort.is_empty() {
238 let keys = self.sort.iter().map(Sort::to_value).collect();
239 root.insert("sort".to_string(), Value::Array(keys));
240 }
241 if let Some(from) = self.from {
242 root.insert("from".to_string(), Value::from(from));
243 }
244 if let Some(size) = self.size {
245 root.insert("size".to_string(), Value::from(size));
246 }
247 Value::Object(root)
248 }
249
250 /// The request body [`count`](Self::count) will POST to `_count`: just
251 /// the query (as on [`Search::count_body`](crate::Search::count_body)).
252 #[must_use]
253 pub fn count_body(&self) -> Value {
254 let mut root = Map::new();
255 root.insert("query".to_string(), self.query_value());
256 Value::Object(root)
257 }
258
259 /// Execute the search and decode the blended hits into the union.
260 #[tracing::instrument(
261 name = "search.multi",
262 skip_all,
263 fields(
264 path = %self.path,
265 from = ?self.from,
266 size = ?self.size,
267 total = tracing::field::Empty,
268 took_ms = tracing::field::Empty,
269 ),
270 err,
271 )]
272 pub async fn send(&self, client: &Client) -> Result<SearchResponse<U>> {
273 let body = self.body();
274 let response = client.search_at(&self.path, &body).await?;
275 let page = decode_response::<U>(response)?;
276 let span = tracing::Span::current();
277 span.record("total", page.total);
278 span.record("took_ms", page.took.as_millis() as u64);
279 tracing::debug!(
280 total = page.total,
281 hits = page.hits.len(),
282 "combined search completed"
283 );
284 Ok(page)
285 }
286
287 /// Count the matches across all the union's indexes, without fetching
288 /// any hits.
289 #[tracing::instrument(
290 name = "search.multi_count",
291 skip_all,
292 fields(path = %self.path, count = tracing::field::Empty),
293 err,
294 )]
295 pub async fn count(&self, client: &Client) -> Result<u64> {
296 let body = self.count_body();
297 let response = client.count_at(&self.path, &body).await?;
298 let raw: RawCount = serde_json::from_value(response)?;
299 tracing::Span::current().record("count", raw.count);
300 tracing::debug!(count = raw.count, "combined count completed");
301 Ok(raw.count)
302 }
303}
304
305impl<U: FlussoMultiDocument> Default for MultiSearch<U> {
306 fn default() -> Self {
307 Self::new()
308 }
309}
310
311/// Decode a combined `_search` response: the usual envelope, but each hit's
312/// `_source` is dispatched by the hit's `_index` into the union.
313pub(crate) fn decode_response<U: FlussoMultiDocument>(value: Value) -> Result<SearchResponse<U>> {
314 let raw: RawMultiResponse = serde_json::from_value(value)?;
315 let hits = raw
316 .hits
317 .hits
318 .into_iter()
319 .map(|hit| {
320 Ok(Hit {
321 id: hit.id,
322 score: hit.score.unwrap_or(0.0),
323 source: U::decode(&hit.index, hit.source)?,
324 })
325 })
326 .collect::<Result<Vec<_>>>()?;
327 Ok(SearchResponse {
328 total: raw.hits.total.value,
329 max_score: raw.hits.max_score,
330 hits,
331 took: Duration::from_millis(raw.took),
332 })
333}
334
// ---- wire types ------------------------------------------------------------
//
// These mirror the response shapes in `search.rs`, with two differences. Each
// hit keeps its `_index`, which is the dispatch key. Its `_source` stays a raw
// `Value` until the matching variant decodes it.

340#[derive(Deserialize)]
341struct RawMultiResponse {
342 #[serde(default)]
343 took: u64,
344 hits: RawMultiHits,
345}
346
347#[derive(Deserialize)]
348struct RawMultiHits {
349 total: RawMultiTotal,
350 #[serde(default)]
351 max_score: Option<f32>,
352 hits: Vec<RawMultiHit>,
353}
354
355#[derive(Deserialize)]
356struct RawMultiTotal {
357 value: u64,
358}
359
360#[derive(Deserialize)]
361struct RawMultiHit {
362 #[serde(rename = "_index")]
363 index: String,
364 #[serde(rename = "_id")]
365 id: String,
366 #[serde(rename = "_score", default)]
367 score: Option<f32>,
368 #[serde(rename = "_source")]
369 source: Value,
370}