Skip to main content

flusso_query/
search.rs

1//! The [`Search`] builder and the typed [`SearchResponse`] / [`Hit`] results.
2
3use std::marker::PhantomData;
4use std::time::Duration;
5
6use serde::Deserialize;
7use serde::de::DeserializeOwned;
8use serde_json::{Map, Value};
9
10use crate::Client;
11use crate::error::Result;
12use crate::handles::{NestedProjection, Sort};
13use crate::query::{AsQuery, BoolBuilder, Root};
14
15/// A document type bound to a flusso-maintained index — the trait that
16/// `#[derive(FlussoDocument)]` implements.
17///
18/// The derive supplies [`INDEX`](Self::INDEX) and [`SCHEMA_HASH`](Self::SCHEMA_HASH)
19/// (the physical index is `{INDEX}_{SCHEMA_HASH}`, exactly what the OpenSearch
20/// sink writes); [`query`](Self::query) and [`get`](Self::get) are provided.
21/// `DeserializeOwned` is required so search hits and fetched documents decode.
22pub trait FlussoDocument: DeserializeOwned {
23    /// The logical index name this binding queries.
24    const INDEX: &'static str;
25
26    /// The schema hash this binding was generated from (the physical-index suffix).
27    const SCHEMA_HASH: &'static str;
28
29    /// The physical index this binding addresses — `{INDEX}_{SCHEMA_HASH}`,
30    /// exactly what the sink writes. Useful for logging, admin, and
31    /// hand-written [`FlussoMultiDocument`](crate::FlussoMultiDocument) impls
32    /// dispatching hits by their `_index`.
33    fn physical_index() -> String {
34        format!("{}_{}", Self::INDEX, Self::SCHEMA_HASH)
35    }
36
37    /// Start a typed query against this index. No client is involved: the
38    /// returned [`Search`] is a plain value — build it anywhere, store it,
39    /// clone it, and hand a [`Client`] to a terminal
40    /// ([`send`](Search::send) / [`ids`](Search::ids) / [`count`](Search::count))
41    /// when it's time to run.
42    fn query() -> Search<Self> {
43        Search::new(Self::INDEX, Self::SCHEMA_HASH)
44    }
45
46    /// Fetch one document by id; `None` when absent.
47    fn get(
48        client: &Client,
49        id: impl std::fmt::Display,
50    ) -> impl std::future::Future<Output = Result<Option<Self>>> {
51        client.get_one::<Self>(Self::INDEX, Self::SCHEMA_HASH, id)
52    }
53}
54
55/// A typed query against one index — a plain, client-free value.
56///
57/// Built from [`FlussoDocument::query`] (or `Search::new(index, hash)` by
58/// hand). Clauses accumulate into a bool query: `query`/`should` score,
59/// `filter`/`must_not` don't. Because no client (and no lifetime) is
60/// involved, a `Search` can be named, stored, cloned, and reused; a [`Client`]
61/// appears only at the terminals — [`Search::send`] for a page of hits,
62/// [`Search::ids`] for a page of bare document ids, [`Search::count`] for
63/// just the number of matches (all `&self`, so running consumes nothing) —
64/// or several searches go in one round-trip via [`Client::msearch`].
65#[derive(Debug, Clone)]
66pub struct Search<T> {
67    index: String,
68    hash: String,
69    bool_query: BoolBuilder,
70    raw: Option<Value>,
71    sort: Vec<Sort>,
72    from: Option<u64>,
73    size: Option<u64>,
74    nested: Vec<NestedProjection>,
75    _marker: PhantomData<fn() -> T>,
76}
77
78impl<T> Search<T> {
79    /// Start a query against `index` (the logical name) and its schema `hash`.
80    pub fn new(index: impl Into<String>, hash: impl Into<String>) -> Self {
81        Self {
82            index: index.into(),
83            hash: hash.into(),
84            bool_query: BoolBuilder::default(),
85            raw: None,
86            sort: Vec::new(),
87            from: None,
88            size: None,
89            nested: Vec::new(),
90            _marker: PhantomData,
91        }
92    }
93
94    /// A scoring clause (`bool.must`). Accepts any root-scope [`AsQuery`]; an
95    /// absent one (e.g. a `None` optional) adds nothing.
96    #[must_use]
97    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
98        if let Some(query) = query.into_query() {
99            self.bool_query.push_must(query.into_inner());
100        }
101        self
102    }
103
104    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
105    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
106    #[must_use]
107    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
108        if let Some(query) = query.into_query() {
109            self.bool_query.push_filter(query.into_inner());
110        }
111        self
112    }
113
114    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
115    #[must_use]
116    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
117        if let Some(query) = query.into_query() {
118            self.bool_query.push_must_not(query.into_inner());
119        }
120        self
121    }
122
123    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
124    #[must_use]
125    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
126        if let Some(query) = query.into_query() {
127            self.bool_query.push_should(query.into_inner());
128        }
129        self
130    }
131
132    #[must_use]
133    pub fn sort(mut self, sort: Sort) -> Self {
134        self.sort.push(sort);
135        self
136    }
137
138    /// Offset of the first hit to return.
139    #[must_use]
140    pub fn from(mut self, from: u64) -> Self {
141        self.from = Some(from);
142        self
143    }
144
145    /// Maximum number of hits to return.
146    #[must_use]
147    pub fn size(mut self, size: u64) -> Self {
148        self.size = Some(size);
149        self
150    }
151
152    /// Replace the query body with a raw OpenSearch query DSL value. The
153    /// pressure-release valve for anything the typed builder can't express;
154    /// results still deserialize into `T`.
155    #[must_use]
156    pub fn raw(mut self, query: Value) -> Self {
157        self.raw = Some(query);
158        self
159    }
160
161    /// Shape a nested array in the results (built via `Nested::matching` /
162    /// `Nested::project`). Each hit's `source.<path>` is replaced with the
163    /// matching subset; this does **not** change which parents match.
164    #[must_use]
165    pub fn filter_nested(mut self, projection: NestedProjection) -> Self {
166        self.nested.push(projection);
167        self
168    }
169
170    /// The accumulated query alone: the raw override, the bool clauses, or
171    /// `match_all` when nothing was added. Shared by [`body`](Self::body) and
172    /// [`count_body`](Self::count_body).
173    fn query_value(&self) -> Value {
174        match &self.raw {
175            Some(raw) => raw.clone(),
176            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
177            None => self.bool_query.to_value(),
178        }
179    }
180
181    /// The request body this search will POST to `_search`. Pure — useful for
182    /// tests and debugging.
183    #[must_use]
184    pub fn body(&self) -> Value {
185        let query = self.query_value();
186
187        // `filter_nested` projections collect `inner_hits` without filtering
188        // parents: they sit in `should` of a bool whose `must` holds the real
189        // query, so (with `must` present) they're optional and only attach hits.
190        let query = if self.nested.is_empty() {
191            query
192        } else {
193            let mut bool_body = Map::new();
194            bool_body.insert("must".to_string(), Value::Array(vec![query]));
195            let shoulds = self.nested.iter().map(NestedProjection::to_value).collect();
196            bool_body.insert("should".to_string(), Value::Array(shoulds));
197            let mut outer = Map::new();
198            outer.insert("bool".to_string(), Value::Object(bool_body));
199            Value::Object(outer)
200        };
201
202        let mut root = Map::new();
203        root.insert("query".to_string(), query);
204        self.insert_page_params(&mut root);
205        Value::Object(root)
206    }
207
208    /// Add the page-shaping keys (`sort` / `from` / `size`) to a request body.
209    /// Shared by [`body`](Self::body) and [`ids_body`](Self::ids_body).
210    fn insert_page_params(&self, root: &mut Map<String, Value>) {
211        if !self.sort.is_empty() {
212            let keys = self.sort.iter().map(Sort::to_value).collect();
213            root.insert("sort".to_string(), Value::Array(keys));
214        }
215        if let Some(from) = self.from {
216            root.insert("from".to_string(), Value::from(from));
217        }
218        if let Some(size) = self.size {
219            root.insert("size".to_string(), Value::from(size));
220        }
221    }
222
223    /// The request body [`count`](Self::count) will POST to `_count`: just the
224    /// query. Sort, `from`/`size`, and `filter_nested` projections are dropped —
225    /// `_count` accepts none of them, and none of them changes which documents
226    /// match. Pure — useful for tests and debugging.
227    #[must_use]
228    pub fn count_body(&self) -> Value {
229        let mut root = Map::new();
230        root.insert("query".to_string(), self.query_value());
231        Value::Object(root)
232    }
233
234    /// The request body [`ids`](Self::ids) will POST to `_search`: the query
235    /// plus sort and pagination, with `_source: false` so hits carry only
236    /// their `_id` and nothing is fetched from stored source. `filter_nested`
237    /// projections are dropped — they shape returned sources, and there are
238    /// none. Pure — useful for tests and debugging.
239    #[must_use]
240    pub fn ids_body(&self) -> Value {
241        let mut root = Map::new();
242        root.insert("query".to_string(), self.query_value());
243        self.insert_page_params(&mut root);
244        root.insert("_source".to_string(), Value::Bool(false));
245        Value::Object(root)
246    }
247
248    /// Execute the search and return only the matching document ids (the root
249    /// primary keys, stringified by OpenSearch) — no sources are fetched, so
250    /// this is the cheap way to feed another lookup (e.g. load the rows from
251    /// Postgres). Sort, [`from`](Self::from), and [`size`](Self::size) apply
252    /// as in [`send`](Self::send); the page's ids are returned in order.
253    #[tracing::instrument(
254        name = "search.ids",
255        skip_all,
256        fields(index = %self.index, returned = tracing::field::Empty),
257        err,
258    )]
259    pub async fn ids(&self, client: &Client) -> Result<Vec<String>> {
260        let body = self.ids_body();
261        let response = client.search_at(&self.physical_index(), &body).await?;
262        let raw: RawIdsResponse = serde_json::from_value(response)?;
263        let ids: Vec<String> = raw.hits.hits.into_iter().map(|hit| hit.id).collect();
264        tracing::Span::current().record("returned", ids.len());
265        tracing::debug!(returned = ids.len(), "ids search completed");
266        Ok(ids)
267    }
268
269    /// The physical index this query addresses (`{index}_{hash}` — exactly
270    /// what the sink writes). Crate-internal: [`Client::msearch`] renders it
271    /// into each NDJSON header line.
272    pub(crate) fn physical_index(&self) -> String {
273        format!("{}_{}", self.index, self.hash)
274    }
275
276    /// The paths of the accumulated [`filter_nested`](Self::filter_nested)
277    /// projections, for post-processing a response with [`merge_inner_hits`].
278    pub(crate) fn nested_paths(&self) -> Vec<&str> {
279        self.nested.iter().map(NestedProjection::path).collect()
280    }
281
282    /// Execute the query as a count: how many documents match, without fetching
283    /// (or scoring) any hits — cheaper than [`send`](Self::send) when only the
284    /// total is needed. Sort, pagination, and nested projections are ignored
285    /// (see [`count_body`](Self::count_body)).
286    #[tracing::instrument(
287        name = "search.count",
288        skip_all,
289        fields(index = %self.index, count = tracing::field::Empty),
290        err,
291    )]
292    pub async fn count(&self, client: &Client) -> Result<u64> {
293        let body = self.count_body();
294        let response = client.count_at(&self.physical_index(), &body).await?;
295        let raw: RawCount = serde_json::from_value(response)?;
296        tracing::Span::current().record("count", raw.count);
297        tracing::debug!(count = raw.count, "count completed");
298        Ok(raw.count)
299    }
300}
301
302impl<T> Search<T>
303where
304    T: DeserializeOwned,
305{
306    /// Execute the search and decode the hits into `SearchResponse<T>`.
307    #[tracing::instrument(
308        name = "search.send",
309        skip_all,
310        fields(
311            index = %self.index,
312            from = ?self.from,
313            size = ?self.size,
314            total = tracing::field::Empty,
315            took_ms = tracing::field::Empty,
316        ),
317        err,
318    )]
319    pub async fn send(&self, client: &Client) -> Result<SearchResponse<T>> {
320        let body = self.body();
321        let mut response = client.search_at(&self.physical_index(), &body).await?;
322        let paths = self.nested_paths();
323        if !paths.is_empty() {
324            merge_inner_hits(&mut response, &paths);
325        }
326        let page = SearchResponse::from_value(response)?;
327        let span = tracing::Span::current();
328        span.record("total", page.total);
329        span.record("took_ms", page.took.as_millis() as u64);
330        tracing::debug!(
331            total = page.total,
332            hits = page.hits.len(),
333            "search completed"
334        );
335        Ok(page)
336    }
337}
338
339/// Replace each `paths` array in every hit's `_source` with that path's
340/// `inner_hits` subset, so the typed source carries the filtered nested array.
341pub(crate) fn merge_inner_hits(response: &mut Value, paths: &[&str]) {
342    let Some(hits) = response
343        .get_mut("hits")
344        .and_then(|hits| hits.get_mut("hits"))
345        .and_then(Value::as_array_mut)
346    else {
347        return;
348    };
349    for hit in hits {
350        let inner = match hit.get("inner_hits") {
351            Some(inner) => inner.clone(),
352            None => continue,
353        };
354        let Some(source) = hit.get_mut("_source").and_then(Value::as_object_mut) else {
355            continue;
356        };
357        for path in paths {
358            let subset: Vec<Value> = inner
359                .get(*path)
360                .and_then(|hit| hit.get("hits"))
361                .and_then(|hits| hits.get("hits"))
362                .and_then(Value::as_array)
363                .map(|hits| {
364                    hits.iter()
365                        .filter_map(|h| h.get("_source").cloned())
366                        .collect()
367                })
368                .unwrap_or_default();
369            source.insert((*path).to_string(), Value::Array(subset));
370        }
371    }
372}
373
374/// A page of search results.
375#[derive(Debug)]
376pub struct SearchResponse<T> {
377    /// Total matches across the whole index, not the page size.
378    pub total: u64,
379    /// The top score in this page, if scored.
380    pub max_score: Option<f32>,
381    /// The hits in this page.
382    pub hits: Vec<Hit<T>>,
383    /// How long OpenSearch reported the query took.
384    pub took: Duration,
385}
386
387impl<T> SearchResponse<T>
388where
389    T: DeserializeOwned,
390{
391    /// Decode an OpenSearch `_search` response body into a typed page.
392    pub fn from_value(value: Value) -> Result<Self> {
393        let raw: RawResponse<T> = serde_json::from_value(value)?;
394        let hits = raw
395            .hits
396            .hits
397            .into_iter()
398            .map(|hit| Hit {
399                id: hit.id,
400                score: hit.score.unwrap_or(0.0),
401                source: hit.source,
402            })
403            .collect();
404        Ok(Self {
405            total: raw.hits.total.value,
406            max_score: raw.hits.max_score,
407            hits,
408            took: Duration::from_millis(raw.took),
409        })
410    }
411}
412
/// A single search hit: the decoded document together with its envelope
/// metadata.
#[derive(Debug)]
pub struct Hit<T> {
    /// The hit's `_id`: the document id (root primary key, stringified by
    /// OpenSearch).
    pub id: String,
    /// The hit's `_score`, or `0.0` when OpenSearch returned no score.
    pub score: f32,
    /// The hit's `_source`, deserialized into `T`.
    pub source: T,
}
423
424#[derive(Deserialize)]
425struct RawResponse<T> {
426    #[serde(default)]
427    took: u64,
428    hits: RawHits<T>,
429}
430
431#[derive(Deserialize)]
432struct RawHits<T> {
433    total: RawTotal,
434    #[serde(default)]
435    max_score: Option<f32>,
436    hits: Vec<RawHit<T>>,
437}
438
439#[derive(Deserialize)]
440struct RawTotal {
441    value: u64,
442}
443
444/// The `_count` response envelope (`{ "count": N, "_shards": … }`) — shared
445/// with the combined-search [`count`](crate::MultiSearch::count).
446#[derive(Deserialize)]
447pub(crate) struct RawCount {
448    pub(crate) count: u64,
449}
450
451/// A `_search` response read for its hit ids only (`_source: false`, so hits
452/// carry no source to decode).
453#[derive(Deserialize)]
454struct RawIdsResponse {
455    hits: RawIdsHits,
456}
457
458#[derive(Deserialize)]
459struct RawIdsHits {
460    hits: Vec<RawIdHit>,
461}
462
463#[derive(Deserialize)]
464struct RawIdHit {
465    #[serde(rename = "_id")]
466    id: String,
467}
468
469#[derive(Deserialize)]
470struct RawHit<T> {
471    #[serde(rename = "_id")]
472    id: String,
473    #[serde(rename = "_score", default)]
474    score: Option<f32>,
475    #[serde(rename = "_source")]
476    source: T,
477}