Skip to main content

flusso_query/
search.rs

1//! The [`Search`] builder and the typed [`SearchResponse`] / [`Hit`] results.
2
3use std::marker::PhantomData;
4use std::time::Duration;
5
6use serde::Deserialize;
7use serde::de::DeserializeOwned;
8use serde_json::{Map, Value};
9
10use crate::Client;
11use crate::error::Result;
12use crate::handles::{NestedProjection, Sort};
13use crate::query::{AsQuery, BoolBuilder, Root};
14
15/// A document type bound to a flusso-maintained index — the trait that
16/// `#[derive(FlussoDocument)]` implements.
17///
18/// The derive supplies [`INDEX`](Self::INDEX) and [`SCHEMA_HASH`](Self::SCHEMA_HASH)
19/// (the physical index is `{INDEX}_{SCHEMA_HASH}`, exactly what the OpenSearch
20/// sink writes); [`query`](Self::query) and [`get`](Self::get) are provided.
21/// `DeserializeOwned` is required so search hits and fetched documents decode.
22pub trait FlussoDocument: DeserializeOwned {
23    /// The logical index name this binding queries.
24    const INDEX: &'static str;
25
26    /// The schema hash this binding was generated from (the physical-index suffix).
27    const SCHEMA_HASH: &'static str;
28
29    /// The physical index this binding addresses — `{INDEX}_{SCHEMA_HASH}`,
30    /// exactly what the sink writes. Useful for logging, admin, and
31    /// hand-written [`FlussoMultiDocument`](crate::FlussoMultiDocument) impls
32    /// dispatching hits by their `_index`.
33    fn physical_index() -> String {
34        format!("{}_{}", Self::INDEX, Self::SCHEMA_HASH)
35    }
36
37    /// Start a typed query against this index. No client is involved: the
38    /// returned [`Search`] is a plain value — build it anywhere, store it,
39    /// clone it, and hand a [`Client`] to a terminal
40    /// ([`send`](Search::send) / [`ids`](Search::ids) / [`count`](Search::count))
41    /// when it's time to run.
42    fn query() -> Search<Self> {
43        Search::new(Self::INDEX, Self::SCHEMA_HASH)
44    }
45
46    /// Fetch one document by id; `None` when absent.
47    fn get(
48        client: &Client,
49        id: impl std::fmt::Display,
50    ) -> impl std::future::Future<Output = Result<Option<Self>>> {
51        client.get_one::<Self>(Self::INDEX, Self::SCHEMA_HASH, id)
52    }
53}
54
55/// A typed query against one index — a plain, client-free value.
56///
57/// Built from [`FlussoDocument::query`] (or `Search::new(index, hash)` by
58/// hand). Clauses accumulate into a bool query: `query`/`should` score,
59/// `filter`/`must_not` don't. Because no client (and no lifetime) is
60/// involved, a `Search` can be named, stored, cloned, and reused; a [`Client`]
61/// appears only at the terminals — [`Search::send`] for a page of hits,
62/// [`Search::ids`] for a page of bare document ids, [`Search::count`] for
63/// just the number of matches (all `&self`, so running consumes nothing) —
64/// or several searches go in one round-trip via [`Client::msearch`].
65#[derive(Debug, Clone)]
66pub struct Search<T> {
67    index: String,
68    hash: String,
69    bool_query: BoolBuilder,
70    raw: Option<Value>,
71    sort: Vec<Sort>,
72    from: Option<u64>,
73    size: Option<u64>,
74    nested: Vec<NestedProjection>,
75    _marker: PhantomData<fn() -> T>,
76}
77
78impl<T> Search<T> {
79    /// Start a query against `index` (the logical name) and its schema `hash`.
80    pub fn new(index: impl Into<String>, hash: impl Into<String>) -> Self {
81        Self {
82            index: index.into(),
83            hash: hash.into(),
84            bool_query: BoolBuilder::default(),
85            raw: None,
86            sort: Vec::new(),
87            from: None,
88            size: None,
89            nested: Vec::new(),
90            _marker: PhantomData,
91        }
92    }
93
94    /// A scoring clause (`bool.must`). Accepts any root-scope [`AsQuery`]; an
95    /// absent one (e.g. a `None` optional) adds nothing.
96    #[must_use]
97    pub fn query(mut self, query: impl AsQuery<Root>) -> Self {
98        if let Some(query) = query.into_query() {
99            self.bool_query.push_must(query.into_inner());
100        }
101        self
102    }
103
104    /// A non-scoring, cacheable clause (`bool.filter`). An absent clause adds
105    /// nothing — so `filter(opt.map(|v| handle.eq(v)))` is a conditional filter.
106    #[must_use]
107    pub fn filter(mut self, query: impl AsQuery<Root>) -> Self {
108        if let Some(query) = query.into_query() {
109            self.bool_query.push_filter(query.into_inner());
110        }
111        self
112    }
113
114    /// An exclusion clause (`bool.must_not`). An absent clause excludes nothing.
115    #[must_use]
116    pub fn must_not(mut self, query: impl AsQuery<Root>) -> Self {
117        if let Some(query) = query.into_query() {
118            self.bool_query.push_must_not(query.into_inner());
119        }
120        self
121    }
122
123    /// An optional, scoring clause (`bool.should`). An absent clause adds nothing.
124    #[must_use]
125    pub fn should(mut self, query: impl AsQuery<Root>) -> Self {
126        if let Some(query) = query.into_query() {
127            self.bool_query.push_should(query.into_inner());
128        }
129        self
130    }
131
132    /// Append a sort key.
133    #[must_use]
134    pub fn sort(mut self, sort: Sort) -> Self {
135        self.sort.push(sort);
136        self
137    }
138
139    /// Offset of the first hit to return.
140    #[must_use]
141    pub fn from(mut self, from: u64) -> Self {
142        self.from = Some(from);
143        self
144    }
145
146    /// Maximum number of hits to return.
147    #[must_use]
148    pub fn size(mut self, size: u64) -> Self {
149        self.size = Some(size);
150        self
151    }
152
153    /// Replace the query body with a raw OpenSearch query DSL value. The
154    /// pressure-release valve for anything the typed builder can't express;
155    /// results still deserialize into `T`.
156    #[must_use]
157    pub fn raw(mut self, query: Value) -> Self {
158        self.raw = Some(query);
159        self
160    }
161
162    /// Shape a nested array in the results (built via `Nested::matching` /
163    /// `Nested::project`). Each hit's `source.<path>` is replaced with the
164    /// matching subset; this does **not** change which parents match.
165    #[must_use]
166    pub fn filter_nested(mut self, projection: NestedProjection) -> Self {
167        self.nested.push(projection);
168        self
169    }
170
171    /// The accumulated query alone: the raw override, the bool clauses, or
172    /// `match_all` when nothing was added. Shared by [`body`](Self::body) and
173    /// [`count_body`](Self::count_body).
174    fn query_value(&self) -> Value {
175        match &self.raw {
176            Some(raw) => raw.clone(),
177            None if self.bool_query.is_empty() => crate::handles::match_all_value(),
178            None => self.bool_query.to_value(),
179        }
180    }
181
182    /// The request body this search will POST to `_search`. Pure — useful for
183    /// tests and debugging.
184    #[must_use]
185    pub fn body(&self) -> Value {
186        let query = self.query_value();
187
188        // `filter_nested` projections collect `inner_hits` without filtering
189        // parents: they sit in `should` of a bool whose `must` holds the real
190        // query, so (with `must` present) they're optional and only attach hits.
191        let query = if self.nested.is_empty() {
192            query
193        } else {
194            let mut bool_body = Map::new();
195            bool_body.insert("must".to_string(), Value::Array(vec![query]));
196            let shoulds = self.nested.iter().map(NestedProjection::to_value).collect();
197            bool_body.insert("should".to_string(), Value::Array(shoulds));
198            let mut outer = Map::new();
199            outer.insert("bool".to_string(), Value::Object(bool_body));
200            Value::Object(outer)
201        };
202
203        let mut root = Map::new();
204        root.insert("query".to_string(), query);
205        self.insert_page_params(&mut root);
206        Value::Object(root)
207    }
208
209    /// Add the page-shaping keys (`sort` / `from` / `size`) to a request body.
210    /// Shared by [`body`](Self::body) and [`ids_body`](Self::ids_body).
211    fn insert_page_params(&self, root: &mut Map<String, Value>) {
212        if !self.sort.is_empty() {
213            let keys = self.sort.iter().map(Sort::to_value).collect();
214            root.insert("sort".to_string(), Value::Array(keys));
215        }
216        if let Some(from) = self.from {
217            root.insert("from".to_string(), Value::from(from));
218        }
219        if let Some(size) = self.size {
220            root.insert("size".to_string(), Value::from(size));
221        }
222    }
223
224    /// The request body [`count`](Self::count) will POST to `_count`: just the
225    /// query. Sort, `from`/`size`, and `filter_nested` projections are dropped —
226    /// `_count` accepts none of them, and none of them changes which documents
227    /// match. Pure — useful for tests and debugging.
228    #[must_use]
229    pub fn count_body(&self) -> Value {
230        let mut root = Map::new();
231        root.insert("query".to_string(), self.query_value());
232        Value::Object(root)
233    }
234
235    /// The request body [`ids`](Self::ids) will POST to `_search`: the query
236    /// plus sort and pagination, with `_source: false` so hits carry only
237    /// their `_id` and nothing is fetched from stored source. `filter_nested`
238    /// projections are dropped — they shape returned sources, and there are
239    /// none. Pure — useful for tests and debugging.
240    #[must_use]
241    pub fn ids_body(&self) -> Value {
242        let mut root = Map::new();
243        root.insert("query".to_string(), self.query_value());
244        self.insert_page_params(&mut root);
245        root.insert("_source".to_string(), Value::Bool(false));
246        Value::Object(root)
247    }
248
249    /// Execute the search and return only the matching document ids (the root
250    /// primary keys, stringified by OpenSearch) — no sources are fetched, so
251    /// this is the cheap way to feed another lookup (e.g. load the rows from
252    /// Postgres). Sort, [`from`](Self::from), and [`size`](Self::size) apply
253    /// as in [`send`](Self::send); the page's ids are returned in order.
254    #[tracing::instrument(
255        name = "search.ids",
256        skip_all,
257        fields(index = %self.index, returned = tracing::field::Empty),
258        err,
259    )]
260    pub async fn ids(&self, client: &Client) -> Result<Vec<String>> {
261        let body = self.ids_body();
262        let response = client.search_at(&self.physical_index(), &body).await?;
263        let raw: RawIdsResponse = serde_json::from_value(response)?;
264        let ids: Vec<String> = raw.hits.hits.into_iter().map(|hit| hit.id).collect();
265        tracing::Span::current().record("returned", ids.len());
266        tracing::debug!(returned = ids.len(), "ids search completed");
267        Ok(ids)
268    }
269
270    /// The physical index this query addresses (`{index}_{hash}` — exactly
271    /// what the sink writes). Crate-internal: [`Client::msearch`] renders it
272    /// into each NDJSON header line.
273    pub(crate) fn physical_index(&self) -> String {
274        format!("{}_{}", self.index, self.hash)
275    }
276
277    /// The paths of the accumulated [`filter_nested`](Self::filter_nested)
278    /// projections, for post-processing a response with [`merge_inner_hits`].
279    pub(crate) fn nested_paths(&self) -> Vec<&str> {
280        self.nested.iter().map(NestedProjection::path).collect()
281    }
282
283    /// Execute the query as a count: how many documents match, without fetching
284    /// (or scoring) any hits — cheaper than [`send`](Self::send) when only the
285    /// total is needed. Sort, pagination, and nested projections are ignored
286    /// (see [`count_body`](Self::count_body)).
287    #[tracing::instrument(
288        name = "search.count",
289        skip_all,
290        fields(index = %self.index, count = tracing::field::Empty),
291        err,
292    )]
293    pub async fn count(&self, client: &Client) -> Result<u64> {
294        let body = self.count_body();
295        let response = client.count_at(&self.physical_index(), &body).await?;
296        let raw: RawCount = serde_json::from_value(response)?;
297        tracing::Span::current().record("count", raw.count);
298        tracing::debug!(count = raw.count, "count completed");
299        Ok(raw.count)
300    }
301}
302
303impl<T> Search<T>
304where
305    T: DeserializeOwned,
306{
307    /// Execute the search and decode the hits into `SearchResponse<T>`.
308    #[tracing::instrument(
309        name = "search.send",
310        skip_all,
311        fields(
312            index = %self.index,
313            from = ?self.from,
314            size = ?self.size,
315            total = tracing::field::Empty,
316            took_ms = tracing::field::Empty,
317        ),
318        err,
319    )]
320    pub async fn send(&self, client: &Client) -> Result<SearchResponse<T>> {
321        let body = self.body();
322        let mut response = client.search_at(&self.physical_index(), &body).await?;
323        let paths = self.nested_paths();
324        if !paths.is_empty() {
325            merge_inner_hits(&mut response, &paths);
326        }
327        let page = SearchResponse::from_value(response)?;
328        let span = tracing::Span::current();
329        span.record("total", page.total);
330        span.record("took_ms", page.took.as_millis() as u64);
331        tracing::debug!(
332            total = page.total,
333            hits = page.hits.len(),
334            "search completed"
335        );
336        Ok(page)
337    }
338}
339
340/// Replace each `paths` array in every hit's `_source` with that path's
341/// `inner_hits` subset, so the typed source carries the filtered nested array.
342pub(crate) fn merge_inner_hits(response: &mut Value, paths: &[&str]) {
343    let Some(hits) = response
344        .get_mut("hits")
345        .and_then(|hits| hits.get_mut("hits"))
346        .and_then(Value::as_array_mut)
347    else {
348        return;
349    };
350    for hit in hits {
351        let inner = match hit.get("inner_hits") {
352            Some(inner) => inner.clone(),
353            None => continue,
354        };
355        let Some(source) = hit.get_mut("_source").and_then(Value::as_object_mut) else {
356            continue;
357        };
358        for path in paths {
359            let subset: Vec<Value> = inner
360                .get(*path)
361                .and_then(|hit| hit.get("hits"))
362                .and_then(|hits| hits.get("hits"))
363                .and_then(Value::as_array)
364                .map(|hits| {
365                    hits.iter()
366                        .filter_map(|h| h.get("_source").cloned())
367                        .collect()
368                })
369                .unwrap_or_default();
370            source.insert((*path).to_string(), Value::Array(subset));
371        }
372    }
373}
374
375/// A page of search results.
376#[derive(Debug)]
377pub struct SearchResponse<T> {
378    /// Total matches across the whole index, not the page size.
379    pub total: u64,
380    /// The top score in this page, if scored.
381    pub max_score: Option<f32>,
382    /// The hits in this page.
383    pub hits: Vec<Hit<T>>,
384    /// How long OpenSearch reported the query took.
385    pub took: Duration,
386}
387
388impl<T> SearchResponse<T>
389where
390    T: DeserializeOwned,
391{
392    /// Decode an OpenSearch `_search` response body into a typed page.
393    pub fn from_value(value: Value) -> Result<Self> {
394        let raw: RawResponse<T> = serde_json::from_value(value)?;
395        let hits = raw
396            .hits
397            .hits
398            .into_iter()
399            .map(|hit| Hit {
400                id: hit.id,
401                score: hit.score.unwrap_or(0.0),
402                source: hit.source,
403            })
404            .collect();
405        Ok(Self {
406            total: raw.hits.total.value,
407            max_score: raw.hits.max_score,
408            hits,
409            took: Duration::from_millis(raw.took),
410        })
411    }
412}
413
/// One search hit: the decoded document together with its envelope metadata.
///
/// `Clone` and `PartialEq` are derived so pages can be cached, shared, and
/// compared. Both apply only when `T` itself implements them.
#[derive(Debug, Clone, PartialEq)]
pub struct Hit<T> {
    /// The document's `_id` (the root primary key, stringified by OpenSearch).
    pub id: String,
    /// The hit's relevance score, or `0.0` when the query didn't score.
    pub score: f32,
    /// The fully-typed document decoded from `_source`.
    pub source: T,
}
424
425// ---- wire types ------------------------------------------------------------
426
427#[derive(Deserialize)]
428struct RawResponse<T> {
429    #[serde(default)]
430    took: u64,
431    hits: RawHits<T>,
432}
433
434#[derive(Deserialize)]
435struct RawHits<T> {
436    total: RawTotal,
437    #[serde(default)]
438    max_score: Option<f32>,
439    hits: Vec<RawHit<T>>,
440}
441
442#[derive(Deserialize)]
443struct RawTotal {
444    value: u64,
445}
446
447/// The `_count` response envelope (`{ "count": N, "_shards": … }`) — shared
448/// with the combined-search [`count`](crate::MultiSearch::count).
449#[derive(Deserialize)]
450pub(crate) struct RawCount {
451    pub(crate) count: u64,
452}
453
454/// A `_search` response read for its hit ids only (`_source: false`, so hits
455/// carry no source to decode).
456#[derive(Deserialize)]
457struct RawIdsResponse {
458    hits: RawIdsHits,
459}
460
461#[derive(Deserialize)]
462struct RawIdsHits {
463    hits: Vec<RawIdHit>,
464}
465
466#[derive(Deserialize)]
467struct RawIdHit {
468    #[serde(rename = "_id")]
469    id: String,
470}
471
472#[derive(Deserialize)]
473struct RawHit<T> {
474    #[serde(rename = "_id")]
475    id: String,
476    #[serde(rename = "_score", default)]
477    score: Option<f32>,
478    #[serde(rename = "_source")]
479    source: T,
480}