flusso-query 0.10.1

Backend-neutral OpenSearch/Elasticsearch query client for flusso indexes.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
//! Combined search: one query over several indexes, hits ranked together.
//!
//! Where [`Client::msearch`](crate::Client::msearch) runs *independent*
//! searches in one round-trip (separate result lists), a [`MultiSearch`] runs
//! **one** query across every index a [`FlussoMultiDocument`] union spans and
//! returns a single, blended, relevance-ranked result list. Each hit decodes
//! into the union variant matching its physical `_index`.
//!
//! A subtlety: the sink addresses each index by the stable alias
//! `{INDEX}_{SCHEMA_HASH}`, but the alias points at a generation-suffixed
//! concrete index (`{INDEX}_{SCHEMA_HASH}_{n}`), and OpenSearch always reports
//! that *concrete* name in a hit's `_index` — never the alias the query went
//! through. So before dispatching, the suffix is normalized back to the
//! `{INDEX}_{SCHEMA_HASH}` a variant claims (see [`decode_response`]).
//!
//! The union enum is yours: one single-field variant per document type, named
//! after the search surface it serves. `#[derive(FlussoMultiDocument)]` (the
//! `derive` feature) writes the impl; without it, a hand-written impl is two
//! short members — see the trait docs.
//!
//! Root-scope queries already compose across document types ([`Query<Root>`]
//! carries no document type), so any handle mix works in the builder. A field
//! unmapped in one of the indexes simply doesn't match there — but **sorting**
//! on it errors on the OpenSearch side unless the sort carries an
//! `unmapped_type`; prefer sorting on fields all indexes share (or relevance).
//!
//! [`Query<Root>`]: crate::Query

use std::borrow::Cow;
use std::marker::PhantomData;
use std::time::Duration;

use serde::Deserialize;
use serde_json::{Map, Value};

use crate::Client;
use crate::error::Result;
use crate::handles::Sort;
use crate::query::{AsQuery, BoolBuilder, Root};
use crate::search::{Hit, RawCount, SearchResponse};

/// Several [`FlussoDocument`](crate::FlussoDocument) types queried as one:
/// a single query, a single blended hit list, and every hit decoded into the
/// variant that owns the index it came from.
///
/// With the `derive` feature, `#[derive(FlussoMultiDocument)]` implements the
/// trait for an enum that has one single-field variant per document type.
/// The hand-written equivalent — the same thing the derive expands to — is
/// a constant and one function:
///
/// ```no_run
/// use flusso_query::{FlussoDocument, FlussoIndex, FlussoMultiDocument, Error, Result, Segment};
/// use serde_json::Value;
/// # #[derive(serde::Deserialize)] struct User { email: String }
/// # impl FlussoDocument for User { const PATH: &'static [Segment] = &[]; }
/// # impl FlussoIndex for User {
/// #     const INDEX: &'static str = "users";
/// #     const SCHEMA_HASH: &'static str = "xxxxxx";
/// # }
/// # #[derive(serde::Deserialize)] struct Order { status: String }
/// # impl FlussoDocument for Order { const PATH: &'static [Segment] = &[]; }
/// # impl FlussoIndex for Order {
/// #     const INDEX: &'static str = "orders";
/// #     const SCHEMA_HASH: &'static str = "yyyyyy";
/// # }
///
/// /// One item in the storefront's blended search — name it after the
/// /// surface it serves, like your document structs.
/// enum StoreItem {
///     User(User),
///     Order(Order),
/// }
///
/// impl FlussoMultiDocument for StoreItem {
///     const TARGETS: &'static [(&'static str, &'static str)] = &[
///         (User::INDEX, User::SCHEMA_HASH),
///         (Order::INDEX, Order::SCHEMA_HASH),
///     ];
///
///     fn decode(physical_index: &str, source: Value) -> Result<Self> {
///         if physical_index == User::physical_index() {
///             return Ok(Self::User(serde_json::from_value(source)?));
///         }
///         if physical_index == Order::physical_index() {
///             return Ok(Self::Order(serde_json::from_value(source)?));
///         }
///         Err(Error::UnexpectedIndex { index: physical_index.to_owned() })
///     }
/// }
/// ```
pub trait FlussoMultiDocument: Sized {
    /// One `(logical index, schema hash)` pair per document type in the
    /// union, listed in variant order. Each pair is that type's
    /// [`INDEX`](crate::FlussoIndex::INDEX) and
    /// [`SCHEMA_HASH`](crate::FlussoIndex::SCHEMA_HASH); [`MultiSearch`]
    /// joins them into the `{index}_{hash}` names the request addresses.
    const TARGETS: &'static [(&'static str, &'static str)];

    /// Turn one hit's `_source` into the variant that owns `physical_index`.
    ///
    /// When called by this module's response decoding, `physical_index` is the
    /// hit's `_index` after normalization — client prefix stripped, trailing
    /// generation suffix collapsed — so for a known target it is exactly the
    /// `{INDEX}_{SCHEMA_HASH}` name. A name no variant claims must be reported
    /// as [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
    fn decode(physical_index: &str, source: Value) -> Result<Self>;

    /// Begin a typed query spanning every index of this union. As with
    /// [`FlussoIndex::query`](crate::FlussoIndex::query), the builder it
    /// returns is a plain value that holds no client.
    fn query() -> MultiSearch<Self> {
        MultiSearch::<Self>::new()
    }
}

/// A typed query across every index of a [`FlussoMultiDocument`] union — the
/// blended counterpart of [`Search`](crate::Search), with the same clause
/// builder and the same client-free shape.
///
/// Hits come back in **one** relevance-ranked list; `from`/`size` page that
/// blended list, not each index. Terminals: [`send`](Self::send) for a typed
/// page of union values, [`count`](Self::count) for the total matches across
/// all the indexes.
///
/// `Clone` and `Debug` are implemented by hand instead of derived. A derive
/// would add `U: Clone` / `U: Debug` bounds, yet `U` is only named inside the
/// `PhantomData` marker — no `U` value is ever stored — so the builder is
/// cloneable and printable whatever the union enum itself implements.
pub struct MultiSearch<U> {
    /// The comma-joined physical index list the request addresses.
    path: String,
    /// Accumulated `bool` clauses (`must` / `filter` / `must_not` / `should`).
    bool_query: BoolBuilder,
    /// Raw query DSL override; when set it is sent instead of `bool_query`.
    raw: Option<Value>,
    /// Sort keys, in the order they were appended.
    sort: Vec<Sort>,
    /// Offset of the first hit in the blended list, if set.
    from: Option<u64>,
    /// Maximum number of hits to return, if set.
    size: Option<u64>,
    /// Ties the builder to its union type without owning a `U`.
    _marker: PhantomData<fn() -> U>,
}

impl<U> Clone for MultiSearch<U> {
    fn clone(&self) -> Self {
        Self {
            path: self.path.clone(),
            bool_query: self.bool_query.clone(),
            raw: self.raw.clone(),
            sort: self.sort.clone(),
            from: self.from,
            size: self.size,
            _marker: PhantomData,
        }
    }
}

impl<U> std::fmt::Debug for MultiSearch<U> {
    /// Same struct and field names a derived `Debug` would print, without
    /// requiring `U: Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MultiSearch")
            .field("path", &self.path)
            .field("bool_query", &self.bool_query)
            .field("raw", &self.raw)
            .field("sort", &self.sort)
            .field("from", &self.from)
            .field("size", &self.size)
            .field("_marker", &self._marker)
            .finish()
    }
}

impl<U: FlussoMultiDocument> MultiSearch<U> {
    /// Start a query across the union's indexes (usually via
    /// [`FlussoMultiDocument::query`]).
    #[must_use]
    pub fn new() -> Self {
        let path = U::TARGETS
            .iter()
            .map(|(index, hash)| format!("{index}_{hash}"))
            .collect::<Vec<_>>()
            .join(",");
        Self {
            path,
            bool_query: BoolBuilder::default(),
            raw: None,
            sort: Vec::new(),
            from: None,
            size: None,
            _marker: PhantomData,
        }
    }

    /// A scoring clause (`bool.must`). Root-scope queries from *any* of the
    /// union's document types compose here; a field unmapped in one index
    /// simply doesn't match there. An absent clause adds nothing.
    #[must_use]
    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
        if let Some(query) = query.into_query() {
            self.bool_query.push_must(query.into_inner());
        }
        self
    }

    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
    #[must_use]
    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
        if let Some(query) = query.into_query() {
            self.bool_query.push_filter(query.into_inner());
        }
        self
    }

    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
    #[must_use]
    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
        if let Some(query) = query.into_query() {
            self.bool_query.push_must_not(query.into_inner());
        }
        self
    }

    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
    #[must_use]
    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
        if let Some(query) = query.into_query() {
            self.bool_query.push_should(query.into_inner());
        }
        self
    }

    /// Append a sort key. It applies to the **blended** list, so the field
    /// must exist in every index of the union (or carry an `unmapped_type` in
    /// its options) — OpenSearch rejects a sort on a field one index lacks.
    /// Relevance (no sort) is always safe.
    #[must_use]
    pub fn sort(mut self, sort: Sort) -> Self {
        self.sort.push(sort);
        self
    }

    /// Append several sort keys at once — e.g. from a
    /// [`SortBuilder`](crate::SortBuilder). Equivalent to repeated [`sort`](Self::sort).
    #[must_use]
    pub fn sorts(mut self, sorts: impl IntoIterator<Item = Sort>) -> Self {
        self.sort.extend(sorts);
        self
    }

    /// Offset of the first hit to return, in the blended list.
    #[must_use]
    pub fn from(mut self, from: u64) -> Self {
        self.from = Some(from);
        self
    }

    /// Maximum number of hits to return, across all the indexes combined.
    #[must_use]
    pub fn size(mut self, size: u64) -> Self {
        self.size = Some(size);
        self
    }

    /// Replace the query body with a raw OpenSearch query DSL value. The
    /// pressure-release valve, as on [`Search`](crate::Search); hits still
    /// decode into the union.
    #[must_use]
    pub fn raw(mut self, query: Value) -> Self {
        self.raw = Some(query);
        self
    }

    /// The comma-joined physical index list this query addresses — one
    /// `{index}_{hash}` per union variant. For logging and debugging.
    #[must_use]
    pub fn physical_path(&self) -> &str {
        &self.path
    }

    /// The accumulated query alone: the raw override, the bool clauses, or
    /// `match_all` when nothing was added.
    fn query_value(&self) -> Value {
        match &self.raw {
            Some(raw) => raw.clone(),
            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
            None => self.bool_query.to_value(),
        }
    }

    /// The request body this search will POST to `_search`. Pure — useful for
    /// tests and debugging.
    #[must_use]
    pub fn body(&self) -> Value {
        let mut root = Map::new();
        root.insert("query".to_string(), self.query_value());
        if !self.sort.is_empty() {
            let keys = self.sort.iter().map(Sort::to_value).collect();
            root.insert("sort".to_string(), Value::Array(keys));
        }
        if let Some(from) = self.from {
            root.insert("from".to_string(), Value::from(from));
        }
        if let Some(size) = self.size {
            root.insert("size".to_string(), Value::from(size));
        }
        Value::Object(root)
    }

    /// The request body [`count`](Self::count) will POST to `_count`: just
    /// the query (as on [`Search::count_body`](crate::Search::count_body)).
    #[must_use]
    pub fn count_body(&self) -> Value {
        let mut root = Map::new();
        root.insert("query".to_string(), self.query_value());
        Value::Object(root)
    }

    /// Execute the search and decode the blended hits into the union.
    #[tracing::instrument(
        name = "search.multi",
        skip_all,
        fields(
            path = %self.path,
            from = ?self.from,
            size = ?self.size,
            total = tracing::field::Empty,
            took_ms = tracing::field::Empty,
        ),
        err,
    )]
    pub async fn send(&self, client: &Client) -> Result<SearchResponse<U>> {
        let body = self.body();
        let response = client.search_at(&self.path, &body).await?;
        let page = decode_response::<U>(response, &client.index_prefix)?;
        let span = tracing::Span::current();
        span.record("total", page.total);
        span.record("took_ms", page.took.as_millis() as u64);
        if page.is_partial() {
            tracing::warn!(
                path = %self.path,
                timed_out = page.timed_out,
                shards_failed = page.shards.failed,
                shards_total = page.shards.total,
                "combined search returned partial results"
            );
        }
        tracing::debug!(
            total = page.total,
            hits = page.hits.len(),
            "combined search completed"
        );
        Ok(page)
    }

    /// Count the matches across all the union's indexes, without fetching
    /// any hits.
    #[tracing::instrument(
        name = "search.multi_count",
        skip_all,
        fields(path = %self.path, count = tracing::field::Empty),
        err,
    )]
    pub async fn count(&self, client: &Client) -> Result<u64> {
        let body = self.count_body();
        let response = client.count_at(&self.path, &body).await?;
        let raw: RawCount = serde_json::from_value(response)?;
        tracing::Span::current().record("count", raw.count);
        tracing::debug!(count = raw.count, "combined count completed");
        Ok(raw.count)
    }
}

impl<U: FlussoMultiDocument> Default for MultiSearch<U> {
    fn default() -> Self {
        Self::new()
    }
}

/// Decode a combined `_search` response: the usual envelope, but each hit's
/// `_source` is dispatched by the hit's `_index` into the union.
///
/// Each `_index` is normalized to a variant's `physical_index()` in two steps
/// before dispatch:
///
/// 1. **Prefix.** `prefix` (the client's index prefix) is stripped — empty for
///    an unprefixed deployment.
/// 2. **Generation suffix.** A hit's `_index` is the *concrete* index behind the
///    hash alias (`{INDEX}_{SCHEMA_HASH}_{n}`), so the trailing `_{n}` is
///    normalized away. This is anchored on the union's known targets rather than
///    blindly trimming a trailing `_{digits}`: a name maps to a target when it
///    equals that target's `{INDEX}_{SCHEMA_HASH}` or is `{that}_{digits}`. The
///    anchor matters because the eight-hex `SCHEMA_HASH` can itself be all digits
///    — so a bare `{INDEX}_{numeric-hash}` (a legacy un-suffixed index) must not
///    be mistaken for a generation of `{INDEX}`.
///
/// A name matching no target is passed through unchanged, so the variant's
/// `decode` reports it as [`Error::UnexpectedIndex`](crate::Error::UnexpectedIndex).
pub(crate) fn decode_response<U: FlussoMultiDocument>(
    value: Value,
    prefix: &str,
) -> Result<SearchResponse<U>> {
    let raw: RawMultiResponse = serde_json::from_value(value)?;
    let hits = raw
        .hits
        .hits
        .into_iter()
        .map(|hit| {
            let stripped = hit.index.strip_prefix(prefix).unwrap_or(&hit.index);
            let index = dispatch_index::<U>(stripped);
            Ok(Hit {
                id: hit.id,
                score: hit.score.unwrap_or(0.0),
                source: U::decode(index.as_ref(), hit.source)?,
            })
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(SearchResponse {
        total: raw.hits.total.value,
        max_score: raw.hits.max_score,
        hits,
        took: Duration::from_millis(raw.took),
        timed_out: raw.timed_out,
        shards: raw.shards.into(),
    })
}

/// Normalize a (prefix-stripped) hit `_index` to the `physical_index()` of the
/// union target it belongs to, collapsing the generation suffix the sink's hash
/// alias hides.
///
/// Returns the canonical `{INDEX}_{SCHEMA_HASH}` when `stripped` is that name
/// or a `{target}_{digits}` generation of it; otherwise `stripped` unchanged
/// (so dispatch misses and surfaces `UnexpectedIndex`).
///
/// Resolution runs in two passes so that an exact name always wins:
///
/// 1. If `stripped` *is* some target's `{INDEX}_{SCHEMA_HASH}`, it is returned
///    as is (borrowed, no allocation).
/// 2. Only then is it tried as a `{target}_{digits}` generation, in
///    `TARGETS` order.
///
/// A single "exact or generation" pass per target would get this wrong when
/// one target's name is another's name plus `_{digits}` — possible because a
/// schema hash can be all digits: the earlier target would claim the later
/// target's exact name as one of its generations.
fn dispatch_index<U: FlussoMultiDocument>(stripped: &str) -> Cow<'_, str> {
    // Pass 1: `stripped == "{index}_{hash}"`, checked piecewise so nothing is
    // allocated on the common legacy/alias-named path.
    let is_exact_target = U::TARGETS.iter().any(|(index, hash)| {
        stripped
            .strip_prefix(*index)
            .and_then(|rest| rest.strip_prefix('_'))
            == Some(*hash)
    });
    if is_exact_target {
        return Cow::Borrowed(stripped);
    }

    // Pass 2: a concrete generation of a target; first match in variant order.
    U::TARGETS
        .iter()
        .map(|(index, hash)| format!("{index}_{hash}"))
        .find(|physical| is_generation_of(stripped, physical))
        .map_or(Cow::Borrowed(stripped), Cow::Owned)
}

/// Tells whether `name` has the shape `{physical}_{digits}`, i.e. names a
/// concrete generation behind the hash alias `physical`.
///
/// What follows `physical` must be an underscore and then at least one ASCII
/// digit, with nothing else after (the sink's generation counter). `physical`
/// on its own, or with a bare trailing `_`, is not a generation.
fn is_generation_of(name: &str, physical: &str) -> bool {
    let Some(tail) = name.strip_prefix(physical) else {
        return false;
    };
    let Some(counter) = tail.strip_prefix('_') else {
        return false;
    };
    !counter.is_empty() && counter.chars().all(|digit| digit.is_ascii_digit())
}

/// Wire shape of a combined `_search` response, as far as this module reads
/// it. `decode_response` deserializes into this and then builds the typed
/// `SearchResponse` from it.
#[derive(Deserialize)]
struct RawMultiResponse {
    /// Reported search time; interpreted as milliseconds when converted to a
    /// `Duration`. Falls back to `0` when the field is absent.
    #[serde(default)]
    took: u64,
    /// The response's `timed_out` flag; `false` when absent.
    #[serde(default)]
    timed_out: bool,
    /// The `_shards` section; the type's `Default` when absent.
    #[serde(rename = "_shards", default)]
    shards: crate::search::RawShards,
    /// The `hits` section: totals plus the hit list. Required.
    hits: RawMultiHits,
}

/// The `hits` section of a combined `_search` response.
#[derive(Deserialize)]
struct RawMultiHits {
    /// Match total; only its `value` is read. Required.
    total: RawMultiTotal,
    /// Highest score in the result, passed through to the typed response
    /// unchanged; `None` when absent or `null`.
    #[serde(default)]
    max_score: Option<f32>,
    /// The individual hits, in response order. Required.
    hits: Vec<RawMultiHit>,
}

/// The `hits.total` object of a `_search` response.
#[derive(Deserialize)]
struct RawMultiTotal {
    /// The reported number of matches; becomes the typed response's `total`.
    value: u64,
}

/// One entry of `hits.hits`, before its `_source` is decoded into the union.
#[derive(Deserialize)]
struct RawMultiHit {
    /// The hit's `_index`. `decode_response` strips the client prefix and
    /// collapses the generation suffix before using it to pick a variant.
    #[serde(rename = "_index")]
    index: String,
    /// The document's `_id`, carried into the typed hit unchanged.
    #[serde(rename = "_id")]
    id: String,
    /// The hit's `_score`; `None` when absent or `null`, which the decoder
    /// turns into `0.0`.
    #[serde(rename = "_score", default)]
    score: Option<f32>,
    /// The raw `_source` document, handed to `FlussoMultiDocument::decode`.
    #[serde(rename = "_source")]
    source: Value,
}