Skip to main content

flusso_query/
search.rs

1//! The [`Search`] builder and the typed [`SearchResponse`] / [`Hit`] results.
2
3use std::marker::PhantomData;
4use std::time::Duration;
5
6use serde::Deserialize;
7use serde::de::DeserializeOwned;
8use serde_json::{Map, Value};
9
10use crate::Client;
11use crate::error::Result;
12use crate::handles::{NestedProjection, Sort};
13use crate::query::{AsQuery, BoolBuilder, Root};
14
15/// A document type bound to a flusso-maintained index — the trait that
16/// `#[derive(FlussoDocument)]` implements.
17///
18/// The derive supplies [`INDEX`](Self::INDEX) and [`SCHEMA_HASH`](Self::SCHEMA_HASH)
19/// (the physical index is `{INDEX}_{SCHEMA_HASH}`, exactly what the OpenSearch
20/// sink writes); [`query`](Self::query) and [`get`](Self::get) are provided.
21/// `DeserializeOwned` is required so search hits and fetched documents decode.
22pub trait FlussoDocument: DeserializeOwned {
23    /// The logical index name this binding queries.
24    const INDEX: &'static str;
25
26    /// The schema hash this binding was generated from (the physical-index suffix).
27    const SCHEMA_HASH: &'static str;
28
29    /// The physical index this binding addresses — `{INDEX}_{SCHEMA_HASH}`,
30    /// exactly what the sink writes. Useful for logging, admin, and
31    /// hand-written [`FlussoMultiDocument`](crate::FlussoMultiDocument) impls
32    /// dispatching hits by their `_index`.
33    fn physical_index() -> String {
34        format!("{}_{}", Self::INDEX, Self::SCHEMA_HASH)
35    }
36
37    /// Start a typed query against this index. No client is involved: the
38    /// returned [`Search`] is a plain value — build it anywhere, store it,
39    /// clone it, and hand a [`Client`] to a terminal
40    /// ([`send`](Search::send) / [`ids`](Search::ids) / [`count`](Search::count))
41    /// when it's time to run.
42    fn query() -> Search<Self> {
43        Search::new(Self::INDEX, Self::SCHEMA_HASH)
44    }
45
46    /// Fetch one document by id; `None` when absent.
47    fn get(
48        client: &Client,
49        id: impl std::fmt::Display,
50    ) -> impl std::future::Future<Output = Result<Option<Self>>> {
51        client.get_one::<Self>(Self::INDEX, Self::SCHEMA_HASH, id)
52    }
53}
54
55/// A typed query against one index — a plain, client-free value.
56///
57/// Built from [`FlussoDocument::query`] (or `Search::new(index, hash)` by
58/// hand). Clauses accumulate into a bool query: `query`/`should` score,
59/// `filter`/`must_not` don't. Because no client (and no lifetime) is
60/// involved, a `Search` can be named, stored, cloned, and reused; a [`Client`]
61/// appears only at the terminals — [`Search::send`] for a page of hits,
62/// [`Search::ids`] for a page of bare document ids, [`Search::count`] for
63/// just the number of matches (all `&self`, so running consumes nothing) —
64/// or several searches go in one round-trip via [`Client::msearch`].
65#[derive(Debug, Clone)]
66pub struct Search<T> {
67    index: String,
68    hash: String,
69    bool_query: BoolBuilder,
70    raw: Option<Value>,
71    sort: Vec<Sort>,
72    from: Option<u64>,
73    size: Option<u64>,
74    nested: Vec<NestedProjection>,
75    min_score: Option<f32>,
76    track_total_hits: Option<Value>,
77    track_scores: Option<bool>,
78    search_after: Option<Vec<Value>>,
79    collapse: Option<Value>,
80    post_filter: Option<Value>,
81    highlight: Option<Highlight>,
82    _marker: PhantomData<fn() -> T>,
83}
84
85impl<T> Search<T> {
86    /// Start a query against `index` (the logical name) and its schema `hash`.
87    pub fn new(index: impl Into<String>, hash: impl Into<String>) -> Self {
88        Self {
89            index: index.into(),
90            hash: hash.into(),
91            bool_query: BoolBuilder::default(),
92            raw: None,
93            sort: Vec::new(),
94            from: None,
95            size: None,
96            nested: Vec::new(),
97            min_score: None,
98            track_total_hits: None,
99            track_scores: None,
100            search_after: None,
101            collapse: None,
102            post_filter: None,
103            highlight: None,
104            _marker: PhantomData,
105        }
106    }
107
108    /// A scoring clause (`bool.must`). Accepts any root-scope [`AsQuery`]; an
109    /// absent one (e.g. a `None` optional) adds nothing.
110    #[must_use]
111    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
112        if let Some(query) = query.into_query() {
113            self.bool_query.push_must(query.into_inner());
114        }
115        self
116    }
117
118    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
119    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
120    #[must_use]
121    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
122        if let Some(query) = query.into_query() {
123            self.bool_query.push_filter(query.into_inner());
124        }
125        self
126    }
127
128    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
129    #[must_use]
130    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
131        if let Some(query) = query.into_query() {
132            self.bool_query.push_must_not(query.into_inner());
133        }
134        self
135    }
136
137    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
138    #[must_use]
139    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
140        if let Some(query) = query.into_query() {
141            self.bool_query.push_should(query.into_inner());
142        }
143        self
144    }
145
146    /// Require at least this many `should` clauses to match. Beside `query` /
147    /// `filter` clauses, `should` defaults to non-constraining (scoring only);
148    /// setting this makes a top-level `should`-group a real filter. Accepts an
149    /// integer (`1`) or an expression string (`"75%"`).
150    #[must_use]
151    pub fn min_should_match(mut self, value: impl Into<Value>) -> Self {
152        self.bool_query.set_min_should_match(value.into());
153        self
154    }
155
156    #[must_use]
157    pub fn sort(mut self, sort: Sort) -> Self {
158        self.sort.push(sort);
159        self
160    }
161
162    /// Drop hits scoring below `min_score`.
163    #[must_use]
164    pub fn min_score(mut self, min_score: f32) -> Self {
165        self.min_score = Some(min_score);
166        self
167    }
168
169    /// Control how the hit total is counted. `true` counts exactly, `false`
170    /// disables counting, an integer caps accuracy at that many (e.g. `10_000`).
171    #[must_use]
172    pub fn track_total_hits(mut self, track: impl Into<Value>) -> Self {
173        self.track_total_hits = Some(track.into());
174        self
175    }
176
177    /// Compute relevance scores even when sorting by a field.
178    #[must_use]
179    pub fn track_scores(mut self, track: bool) -> Self {
180        self.track_scores = Some(track);
181        self
182    }
183
184    /// Deep-paginate after the given sort values (the last hit's `sort` array
185    /// from the previous page). Pair with a deterministic [`sort`](Self::sort).
186    #[must_use]
187    pub fn search_after(mut self, values: impl IntoIterator<Item = impl Into<Value>>) -> Self {
188        self.search_after = Some(values.into_iter().map(Into::into).collect());
189        self
190    }
191
192    /// Collapse hits so only the top hit per `field` value is returned.
193    #[must_use]
194    pub fn collapse(mut self, field: impl Into<String>) -> Self {
195        let mut body = Map::new();
196        body.insert("field".to_string(), Value::String(field.into()));
197        self.collapse = Some(Value::Object(body));
198        self
199    }
200
201    /// A filter applied **after** scoring/aggregation — narrows the returned
202    /// hits without affecting scores or aggregations.
203    #[must_use]
204    pub fn post_filter(mut self, query: impl AsQuery<Root>) -> Self {
205        if let Some(query) = query.into_query() {
206            self.post_filter = Some(query.to_value());
207        }
208        self
209    }
210
211    /// Attach match highlighting (see [`Highlight`]).
212    #[must_use]
213    pub fn highlight(mut self, highlight: Highlight) -> Self {
214        self.highlight = Some(highlight);
215        self
216    }
217
218    /// Offset of the first hit to return.
219    #[must_use]
220    pub fn from(mut self, from: u64) -> Self {
221        self.from = Some(from);
222        self
223    }
224
225    /// Maximum number of hits to return.
226    #[must_use]
227    pub fn size(mut self, size: u64) -> Self {
228        self.size = Some(size);
229        self
230    }
231
232    /// Replace the query body with a raw OpenSearch query DSL value. The
233    /// pressure-release valve for anything the typed builder can't express;
234    /// results still deserialize into `T`.
235    #[must_use]
236    pub fn raw(mut self, query: Value) -> Self {
237        self.raw = Some(query);
238        self
239    }
240
241    /// Shape a nested array in the results (built via `Nested::matching` /
242    /// `Nested::project`). Each hit's `source.<path>` is replaced with the
243    /// matching subset; this does **not** change which parents match.
244    #[must_use]
245    pub fn filter_nested(mut self, projection: NestedProjection) -> Self {
246        self.nested.push(projection);
247        self
248    }
249
250    /// The accumulated query alone: the raw override, the bool clauses, or
251    /// `match_all` when nothing was added. Shared by [`body`](Self::body) and
252    /// [`count_body`](Self::count_body).
253    fn query_value(&self) -> Value {
254        match &self.raw {
255            Some(raw) => raw.clone(),
256            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
257            None => self.bool_query.to_value(),
258        }
259    }
260
261    /// The request body this search will POST to `_search`. Pure — useful for
262    /// tests and debugging.
263    #[must_use]
264    pub fn body(&self) -> Value {
265        let query = self.query_value();
266
267        // `filter_nested` projections collect `inner_hits` without filtering
268        // parents: they sit in `should` of a bool whose `must` holds the real
269        // query, so (with `must` present) they're optional and only attach hits.
270        let query = if self.nested.is_empty() {
271            query
272        } else {
273            let mut bool_body = Map::new();
274            bool_body.insert("must".to_string(), Value::Array(vec![query]));
275            let shoulds = self.nested.iter().map(NestedProjection::to_value).collect();
276            bool_body.insert("should".to_string(), Value::Array(shoulds));
277            let mut outer = Map::new();
278            outer.insert("bool".to_string(), Value::Object(bool_body));
279            Value::Object(outer)
280        };
281
282        let mut root = Map::new();
283        root.insert("query".to_string(), query);
284        self.insert_page_params(&mut root);
285        self.insert_search_level(&mut root, true);
286        Value::Object(root)
287    }
288
289    /// Add the page-shaping keys (`sort` / `from` / `size`) to a request body.
290    /// Shared by [`body`](Self::body) and [`ids_body`](Self::ids_body).
291    fn insert_page_params(&self, root: &mut Map<String, Value>) {
292        if !self.sort.is_empty() {
293            let keys = self.sort.iter().map(Sort::to_value).collect();
294            root.insert("sort".to_string(), Value::Array(keys));
295        }
296        if let Some(from) = self.from {
297            root.insert("from".to_string(), Value::from(from));
298        }
299        if let Some(size) = self.size {
300            root.insert("size".to_string(), Value::from(size));
301        }
302    }
303
304    /// Add the search-level keys that shape *which* hits return and how the
305    /// total/scores are reported (`min_score`, `track_total_hits`,
306    /// `track_scores`, `search_after`, `collapse`, `post_filter`, and —
307    /// `with_highlight` only — `highlight`). Shared by [`body`](Self::body) and
308    /// [`ids_body`](Self::ids_body); `highlight` is skipped for ids (no source
309    /// to highlight). `_count` gets none of these.
310    fn insert_search_level(&self, root: &mut Map<String, Value>, with_highlight: bool) {
311        if let Some(min_score) = self.min_score {
312            root.insert("min_score".to_string(), Value::from(min_score));
313        }
314        if let Some(track) = &self.track_total_hits {
315            root.insert("track_total_hits".to_string(), track.clone());
316        }
317        if let Some(track) = self.track_scores {
318            root.insert("track_scores".to_string(), Value::Bool(track));
319        }
320        if let Some(values) = &self.search_after {
321            root.insert("search_after".to_string(), Value::Array(values.clone()));
322        }
323        if let Some(collapse) = &self.collapse {
324            root.insert("collapse".to_string(), collapse.clone());
325        }
326        if let Some(post_filter) = &self.post_filter {
327            root.insert("post_filter".to_string(), post_filter.clone());
328        }
329        if with_highlight && let Some(highlight) = &self.highlight {
330            root.insert("highlight".to_string(), highlight.to_value());
331        }
332    }
333
334    /// The request body [`count`](Self::count) will POST to `_count`: just the
335    /// query. Sort, `from`/`size`, and `filter_nested` projections are dropped —
336    /// `_count` accepts none of them, and none of them changes which documents
337    /// match. Pure — useful for tests and debugging.
338    #[must_use]
339    pub fn count_body(&self) -> Value {
340        let mut root = Map::new();
341        root.insert("query".to_string(), self.query_value());
342        Value::Object(root)
343    }
344
345    /// The request body [`ids`](Self::ids) will POST to `_search`: the query
346    /// plus sort and pagination, with `_source: false` so hits carry only
347    /// their `_id` and nothing is fetched from stored source. `filter_nested`
348    /// projections are dropped — they shape returned sources, and there are
349    /// none. Pure — useful for tests and debugging.
350    #[must_use]
351    pub fn ids_body(&self) -> Value {
352        let mut root = Map::new();
353        root.insert("query".to_string(), self.query_value());
354        self.insert_page_params(&mut root);
355        self.insert_search_level(&mut root, false);
356        root.insert("_source".to_string(), Value::Bool(false));
357        Value::Object(root)
358    }
359
360    /// Execute the search and return only the matching document ids (the root
361    /// primary keys, stringified by OpenSearch) — no sources are fetched, so
362    /// this is the cheap way to feed another lookup (e.g. load the rows from
363    /// Postgres). Sort, [`from`](Self::from), and [`size`](Self::size) apply
364    /// as in [`send`](Self::send); the page's ids are returned in order.
365    #[tracing::instrument(
366        name = "search.ids",
367        skip_all,
368        fields(index = %self.index, returned = tracing::field::Empty),
369        err,
370    )]
371    pub async fn ids(&self, client: &Client) -> Result<Vec<String>> {
372        let body = self.ids_body();
373        let response = client.search_at(&self.physical_index(), &body).await?;
374        let raw: RawIdsResponse = serde_json::from_value(response)?;
375        let ids: Vec<String> = raw.hits.hits.into_iter().map(|hit| hit.id).collect();
376        tracing::Span::current().record("returned", ids.len());
377        tracing::debug!(returned = ids.len(), "ids search completed");
378        Ok(ids)
379    }
380
381    /// The physical index this query addresses (`{index}_{hash}` — exactly
382    /// what the sink writes). Crate-internal: [`Client::msearch`] renders it
383    /// into each NDJSON header line.
384    pub(crate) fn physical_index(&self) -> String {
385        format!("{}_{}", self.index, self.hash)
386    }
387
388    /// The paths of the accumulated [`filter_nested`](Self::filter_nested)
389    /// projections, for post-processing a response with [`merge_inner_hits`].
390    pub(crate) fn nested_paths(&self) -> Vec<&str> {
391        self.nested.iter().map(NestedProjection::path).collect()
392    }
393
394    /// Execute the query as a count: how many documents match, without fetching
395    /// (or scoring) any hits — cheaper than [`send`](Self::send) when only the
396    /// total is needed. Sort, pagination, and nested projections are ignored
397    /// (see [`count_body`](Self::count_body)).
398    #[tracing::instrument(
399        name = "search.count",
400        skip_all,
401        fields(index = %self.index, count = tracing::field::Empty),
402        err,
403    )]
404    pub async fn count(&self, client: &Client) -> Result<u64> {
405        let body = self.count_body();
406        let response = client.count_at(&self.physical_index(), &body).await?;
407        let raw: RawCount = serde_json::from_value(response)?;
408        tracing::Span::current().record("count", raw.count);
409        tracing::debug!(count = raw.count, "count completed");
410        Ok(raw.count)
411    }
412}
413
414impl<T> Search<T>
415where
416    T: DeserializeOwned,
417{
418    /// Execute the search and decode the hits into `SearchResponse<T>`.
419    #[tracing::instrument(
420        name = "search.send",
421        skip_all,
422        fields(
423            index = %self.index,
424            from = ?self.from,
425            size = ?self.size,
426            total = tracing::field::Empty,
427            took_ms = tracing::field::Empty,
428        ),
429        err,
430    )]
431    pub async fn send(&self, client: &Client) -> Result<SearchResponse<T>> {
432        let body = self.body();
433        let mut response = client.search_at(&self.physical_index(), &body).await?;
434        let paths = self.nested_paths();
435        if !paths.is_empty() {
436            merge_inner_hits(&mut response, &paths);
437        }
438        let page = SearchResponse::from_value(response)?;
439        let span = tracing::Span::current();
440        span.record("total", page.total);
441        span.record("took_ms", page.took.as_millis() as u64);
442        tracing::debug!(
443            total = page.total,
444            hits = page.hits.len(),
445            "search completed"
446        );
447        Ok(page)
448    }
449}
450
451/// Replace each `paths` array in every hit's `_source` with that path's
452/// `inner_hits` subset, so the typed source carries the filtered nested array.
453pub(crate) fn merge_inner_hits(response: &mut Value, paths: &[&str]) {
454    let Some(hits) = response
455        .get_mut("hits")
456        .and_then(|hits| hits.get_mut("hits"))
457        .and_then(Value::as_array_mut)
458    else {
459        return;
460    };
461    for hit in hits {
462        let inner = match hit.get("inner_hits") {
463            Some(inner) => inner.clone(),
464            None => continue,
465        };
466        let Some(source) = hit.get_mut("_source").and_then(Value::as_object_mut) else {
467            continue;
468        };
469        for path in paths {
470            let subset: Vec<Value> = inner
471                .get(*path)
472                .and_then(|hit| hit.get("hits"))
473                .and_then(|hits| hits.get("hits"))
474                .and_then(Value::as_array)
475                .map(|hits| {
476                    hits.iter()
477                        .filter_map(|h| h.get("_source").cloned())
478                        .collect()
479                })
480                .unwrap_or_default();
481            source.insert((*path).to_string(), Value::Array(subset));
482        }
483    }
484}
485
/// Match highlighting for a [`Search`] (the `highlight` block). Name the fields
/// to highlight and tune the tags / fragments; pass it to
/// [`Search::highlight`].
#[derive(Debug, Clone, Default)]
pub struct Highlight {
    /// Per-field settings, keyed by field name; rendered under `fields`.
    fields: Map<String, Value>,
    /// Block-level options (`pre_tags`, `post_tags`, `fragment_size`, …),
    /// rendered as siblings of `fields`.
    opts: Map<String, Value>,
}
494
495impl Highlight {
496    /// An empty highlight config — add fields with [`field`](Self::field).
497    #[must_use]
498    pub fn new() -> Self {
499        Self::default()
500    }
501
502    /// Highlight `field` with the default settings.
503    #[must_use]
504    pub fn field(mut self, field: impl Into<String>) -> Self {
505        self.fields.insert(field.into(), Value::Object(Map::new()));
506        self
507    }
508
509    /// Highlight `field` with explicit per-field settings (e.g. a custom
510    /// `fragment_size` / `matched_fields`).
511    #[must_use]
512    pub fn field_with(mut self, field: impl Into<String>, settings: Value) -> Self {
513        self.fields.insert(field.into(), settings);
514        self
515    }
516
517    /// Tags wrapping each highlighted snippet's start.
518    #[must_use]
519    pub fn pre_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
520        self.opts.insert(
521            "pre_tags".to_string(),
522            Value::Array(tags.into_iter().map(|t| Value::String(t.into())).collect()),
523        );
524        self
525    }
526
527    /// Tags wrapping each highlighted snippet's end.
528    #[must_use]
529    pub fn post_tags(mut self, tags: impl IntoIterator<Item = impl Into<String>>) -> Self {
530        self.opts.insert(
531            "post_tags".to_string(),
532            Value::Array(tags.into_iter().map(|t| Value::String(t.into())).collect()),
533        );
534        self
535    }
536
537    /// Character length of each highlighted fragment.
538    #[must_use]
539    pub fn fragment_size(mut self, fragment_size: u32) -> Self {
540        self.opts
541            .insert("fragment_size".to_string(), Value::from(fragment_size));
542        self
543    }
544
545    /// Maximum number of fragments returned per field.
546    #[must_use]
547    pub fn number_of_fragments(mut self, number_of_fragments: u32) -> Self {
548        self.opts.insert(
549            "number_of_fragments".to_string(),
550            Value::from(number_of_fragments),
551        );
552        self
553    }
554
555    /// Only highlight fields that the query matched (default `true`).
556    #[must_use]
557    pub fn require_field_match(mut self, require: bool) -> Self {
558        self.opts
559            .insert("require_field_match".to_string(), Value::Bool(require));
560        self
561    }
562
563    fn to_value(&self) -> Value {
564        let mut body = self.opts.clone();
565        body.insert("fields".to_string(), Value::Object(self.fields.clone()));
566        Value::Object(body)
567    }
568}
569
/// A page of search results, decoded from an OpenSearch `_search` response by
/// [`SearchResponse::from_value`].
#[derive(Debug)]
pub struct SearchResponse<T> {
    /// Total matches across the whole index, not the page size. Read from the
    /// response's `hits.total.value`.
    pub total: u64,
    /// The top score in this page, if scored.
    pub max_score: Option<f32>,
    /// The hits in this page.
    pub hits: Vec<Hit<T>>,
    /// How long OpenSearch reported the query took (the response's `took`,
    /// interpreted as milliseconds).
    pub took: Duration,
}
582
583impl<T> SearchResponse<T>
584where
585    T: DeserializeOwned,
586{
587    /// Decode an OpenSearch `_search` response body into a typed page.
588    pub fn from_value(value: Value) -> Result<Self> {
589        let raw: RawResponse<T> = serde_json::from_value(value)?;
590        let hits = raw
591            .hits
592            .hits
593            .into_iter()
594            .map(|hit| Hit {
595                id: hit.id,
596                score: hit.score.unwrap_or(0.0),
597                source: hit.source,
598            })
599            .collect();
600        Ok(Self {
601            total: raw.hits.total.value,
602            max_score: raw.hits.max_score,
603            hits,
604            took: Duration::from_millis(raw.took),
605        })
606    }
607}
608
/// One search hit: the typed document plus its envelope metadata.
#[derive(Debug)]
pub struct Hit<T> {
    /// The document id (root primary key, stringified by OpenSearch); the
    /// hit's `_id`.
    pub id: String,
    /// The relevance score (`0.0` when the query didn't score, i.e. the hit's
    /// `_score` was missing or null).
    pub score: f32,
    /// The fully-typed document, decoded from the hit's `_source`.
    pub source: T,
}
619
/// Wire shape of a `_search` response: only the parts this module reads.
#[derive(Deserialize)]
struct RawResponse<T> {
    /// Reported query time; defaults to `0` when the field is absent.
    #[serde(default)]
    took: u64,
    /// The `hits` envelope (total, max score, and the hits themselves).
    hits: RawHits<T>,
}
626
627#[derive(Deserialize)]
628struct RawHits<T> {
629    total: RawTotal,
630    #[serde(default)]
631    max_score: Option<f32>,
632    hits: Vec<RawHit<T>>,
633}
634
635#[derive(Deserialize)]
636struct RawTotal {
637    value: u64,
638}
639
/// The `_count` response envelope (`{ "count": N, "_shards": … }`) — shared
/// with the combined-search [`count`](crate::MultiSearch::count). Only
/// `count` is read; other keys are ignored.
#[derive(Deserialize)]
pub(crate) struct RawCount {
    /// Number of documents matching the query.
    pub(crate) count: u64,
}
646
/// A `_search` response read for its hit ids only (`_source: false`, so hits
/// carry no source to decode). Used by [`Search::ids`].
#[derive(Deserialize)]
struct RawIdsResponse {
    /// The `hits` envelope, reduced to the list of hits.
    hits: RawIdsHits,
}
653
/// The `hits` envelope of an ids-only `_search` response; only the inner
/// `hits` array is read.
#[derive(Deserialize)]
struct RawIdsHits {
    /// The hits of this page, each reduced to its id.
    hits: Vec<RawIdHit>,
}
658
/// One hit of an ids-only `_search` response, reduced to its `_id`.
#[derive(Deserialize)]
struct RawIdHit {
    /// The document id (the hit's `_id`).
    #[serde(rename = "_id")]
    id: String,
}
664
/// Wire shape of one full search hit: id, optional score, and typed source.
#[derive(Deserialize)]
struct RawHit<T> {
    /// The document id (the hit's `_id`).
    #[serde(rename = "_id")]
    id: String,
    /// The hit's `_score`; `None` when the field is missing or null.
    #[serde(rename = "_score", default)]
    score: Option<f32>,
    /// The hit's `_source`, decoded into the document type. Required — a hit
    /// without `_source` fails to decode.
    #[serde(rename = "_source")]
    source: T,
}
673}