Skip to main content

helios_persistence/backends/postgres/
search_impl.rs

1//! Search implementation for PostgreSQL backend.
2//!
3//! This module provides search functionality for the PostgreSQL backend including:
4//! - Basic single-type search
5//! - Multi-type search
6//! - _include and _revinclude support
7//! - Chained search parameter support
8//! - Full-text search using tsvector/tsquery
9
10use std::collections::HashSet;
11
12use async_trait::async_trait;
13use chrono::Utc;
14use helios_fhir::FhirVersion;
15
16use crate::core::{
17    ChainedSearchProvider, IncludeProvider, MultiTypeSearchProvider, ResourceStorage,
18    RevincludeProvider, SearchProvider, SearchResult, TextSearchProvider,
19};
20use crate::error::{BackendError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23    CursorDirection, CursorValue, IncludeDirective, Page, PageCursor, PageInfo, Pagination,
24    ReverseChainedParameter, SearchQuery, StoredResource,
25};
26
27use super::PostgresBackend;
28use super::search::chain_builder::ChainQueryBuilder;
29use super::search::query_builder::{PostgresQueryBuilder, SortValueKind, SqlParam};
30
31fn internal_error(message: String) -> StorageError {
32    StorageError::Backend(BackendError::Internal {
33        backend_name: "postgres".to_string(),
34        message,
35        source: None,
36    })
37}
38
39#[async_trait]
40impl SearchProvider for PostgresBackend {
41    async fn search(
42        &self,
43        tenant: &TenantContext,
44        query: &SearchQuery,
45    ) -> StorageResult<SearchResult> {
46        // `_contained` search uses a dedicated path (different index columns and
47        // heterogeneous result types); standard search handles `_contained=false`.
48        if query.contained != crate::types::ContainedMode::Off {
49            return self.search_contained(tenant, query).await;
50        }
51
52        // Populate Bundle.total only when the client asked for it
53        // (`_total=accurate|estimate`). Computed up-front so the count query's
54        // client is not held across the main query's await points.
55        let total = if query.wants_total() {
56            Some(self.search_count(tenant, query).await?)
57        } else {
58            None
59        };
60
61        let client = self.get_client().await?;
62        let tenant_id = tenant.tenant_id().as_str();
63        let resource_type = &query.resource_type;
64
65        // Get count with default
66        let count = query.count.unwrap_or(100) as usize;
67
68        // Keyset key for cursor pagination. `None` for multi-field sorts, which
69        // are returned as a single page rather than paged with an inconsistent
70        // keyset.
71        let keyset = PostgresQueryBuilder::primary_keyset_key(query);
72
73        // Only honor an inbound cursor when we can build a keyset comparison.
74        let cursor = if keyset.is_some() {
75            query
76                .cursor
77                .as_ref()
78                .and_then(|c| PageCursor::decode(c).ok())
79        } else {
80            None
81        };
82
83        // Param layout: $1=tenant, $2=type, then (cursor) $3=sort value, $4=id,
84        // then the search-filter params.
85        let param_offset = if cursor.is_some() { 4 } else { 2 };
86
87        let search_filter = if !query.parameters.is_empty() || query.compartment.is_some() {
88            PostgresQueryBuilder::build_search_query(query, param_offset)
89        } else {
90            None
91        };
92        let filter_clause = search_filter
93            .as_ref()
94            .map(|f| format!(" AND ({})", f.sql))
95            .unwrap_or_default();
96        let search_params = search_filter.map(|f| f.params).unwrap_or_default();
97
98        // SELECT the sort key alongside the row so the next cursor can be built.
99        let select_cols = match &keyset {
100            Some(k) => format!(
101                "id, version_id, data, last_updated, fhir_version, {} AS sort_key",
102                k.expr
103            ),
104            None => "id, version_id, data, last_updated, fhir_version".to_string(),
105        };
106
107        // ORDER BY for the first-page / offset paths.
108        let order_by = if query.sort.is_empty() {
109            "ORDER BY last_updated DESC, id ASC".to_string()
110        } else {
111            PostgresQueryBuilder::build_order_by(query)
112        };
113
114        // Build query based on pagination mode.
115        let (sql, has_previous) = if let (Some(cursor), Some(k)) = (&cursor, &keyset) {
116            let e = &k.expr;
117            let asc = k.direction == crate::types::SortDirection::Ascending;
118            match cursor.direction() {
119                CursorDirection::Next => {
120                    let e_op = if asc { ">" } else { "<" };
121                    let sql = format!(
122                        "SELECT {cols} FROM resources \
123                         WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
124                         AND ({e} {e_op} $3 OR ({e} = $3 AND id > $4)) \
125                         ORDER BY {e} {dir}, id ASC LIMIT {lim}",
126                        cols = select_cols,
127                        filter = filter_clause,
128                        e = e,
129                        e_op = e_op,
130                        dir = if asc { "ASC" } else { "DESC" },
131                        lim = count + 1,
132                    );
133                    (sql, true)
134                }
135                CursorDirection::Previous => {
136                    let e_op = if asc { "<" } else { ">" };
137                    let sql = format!(
138                        "SELECT {cols} FROM resources \
139                         WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
140                         AND ({e} {e_op} $3 OR ({e} = $3 AND id < $4)) \
141                         ORDER BY {e} {dir}, id DESC LIMIT {lim}",
142                        cols = select_cols,
143                        filter = filter_clause,
144                        e = e,
145                        e_op = e_op,
146                        dir = if asc { "DESC" } else { "ASC" },
147                        lim = count + 1,
148                    );
149                    (sql, false)
150                }
151            }
152        } else if let Some(offset) = query.offset {
153            let sql = format!(
154                "SELECT {cols} FROM resources \
155                 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
156                 {order} LIMIT {lim} OFFSET {off}",
157                cols = select_cols,
158                filter = filter_clause,
159                order = order_by,
160                lim = count + 1,
161                off = offset,
162            );
163            (sql, offset > 0)
164        } else {
165            let sql = format!(
166                "SELECT {cols} FROM resources \
167                 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
168                 {order} LIMIT {lim}",
169                cols = select_cols,
170                filter = filter_clause,
171                order = order_by,
172                lim = count + 1,
173            );
174            (sql, false)
175        };
176
177        // Build parameter list for binding.
178        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
179            Box::new(tenant_id.to_string()),
180            Box::new(resource_type.to_string()),
181        ];
182        if let (Some(cursor), Some(k)) = (&cursor, &keyset) {
183            Self::bind_cursor_value(&mut params, k.kind, cursor)?;
184            params.push(Box::new(cursor.resource_id().to_string()));
185        }
186        for param in &search_params {
187            match param {
188                SqlParam::Text(s) => params.push(Box::new(s.clone())),
189                SqlParam::Float(f) => params.push(Box::new(*f)),
190                SqlParam::Integer(i) => params.push(Box::new(*i)),
191                SqlParam::Bool(b) => params.push(Box::new(*b)),
192                SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
193                SqlParam::Null => params.push(Box::new(Option::<String>::None)),
194            }
195        }
196        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
197            .iter()
198            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
199            .collect();
200        let rows = client
201            .query(&sql, &param_refs)
202            .await
203            .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?;
204
205        // Parse rows, capturing the sort key for cursor construction.
206        let mut parsed: Vec<(StoredResource, Option<CursorValue>)> = Vec::new();
207        for row in &rows {
208            let id: String = row.get(0);
209            let version_id: String = row.get(1);
210            let json_data: serde_json::Value = row.get(2);
211            let last_updated: chrono::DateTime<Utc> = row.get(3);
212            let fhir_version_str: String = row.get(4);
213            let sort_key = keyset
214                .as_ref()
215                .map(|k| Self::read_cursor_value(row, 5, k.kind));
216
217            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
218                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
219            let resource = StoredResource::from_storage(
220                resource_type.clone(),
221                id,
222                version_id,
223                tenant.tenant_id().clone(),
224                json_data,
225                last_updated,
226                last_updated,
227                None,
228                fhir_version,
229            );
230            parsed.push((resource, sort_key));
231        }
232
233        // Backward pagination fetched in reverse order — restore sort order.
234        if cursor
235            .as_ref()
236            .map(|c| c.direction() == CursorDirection::Previous)
237            .unwrap_or(false)
238        {
239            parsed.reverse();
240        }
241
242        // We fetched one extra to detect a further page.
243        let has_next = parsed.len() > count;
244        if has_next {
245            parsed.pop();
246        }
247
248        let next_cursor = if has_next {
249            parsed.last().map(|(r, sk)| {
250                PageCursor::new(vec![sk.clone().unwrap_or(CursorValue::Null)], r.id()).encode()
251            })
252        } else {
253            None
254        };
255        let previous_cursor = if has_previous {
256            parsed.first().map(|(r, sk)| {
257                PageCursor::previous(vec![sk.clone().unwrap_or(CursorValue::Null)], r.id()).encode()
258            })
259        } else {
260            None
261        };
262
263        let resources: Vec<StoredResource> = parsed.into_iter().map(|(r, _)| r).collect();
264
265        // `total` was computed up-front (before acquiring `client`) to avoid
266        // holding a non-Send guard across the count query's await.
267        let page_info = PageInfo {
268            next_cursor,
269            previous_cursor,
270            total,
271            has_next,
272            has_previous,
273        };
274        let page = Page::new(resources, page_info);
275
276        Ok(SearchResult {
277            resources: page,
278            included: Vec::new(),
279            total,
280            scores: Default::default(),
281        })
282    }
283
284    async fn search_count(
285        &self,
286        tenant: &TenantContext,
287        query: &SearchQuery,
288    ) -> StorageResult<u64> {
289        let client = self.get_client().await?;
290        let tenant_id = tenant.tenant_id().as_str();
291        let resource_type = &query.resource_type;
292
293        let (sql, params): (
294            String,
295            Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
296        ) = if !query.parameters.is_empty() || query.compartment.is_some() {
297            let filter = PostgresQueryBuilder::build_search_query(query, 2);
298
299            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
300                Box::new(tenant_id.to_string()),
301                Box::new(resource_type.to_string()),
302            ];
303
304            if let Some(ref fragment) = filter {
305                for param in &fragment.params {
306                    match param {
307                        SqlParam::Text(s) => params.push(Box::new(s.clone())),
308                        SqlParam::Float(f) => params.push(Box::new(*f)),
309                        SqlParam::Integer(i) => params.push(Box::new(*i)),
310                        SqlParam::Bool(b) => params.push(Box::new(*b)),
311                        SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
312                        SqlParam::Null => params.push(Box::new(Option::<String>::None)),
313                    }
314                }
315
316                let sql = format!(
317                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND ({})",
318                    fragment.sql
319                );
320                (sql, params)
321            } else {
322                let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
323                (sql, params)
324            }
325        } else {
326            let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
327            let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
328                Box::new(tenant_id.to_string()),
329                Box::new(resource_type.to_string()),
330            ];
331            (sql, params)
332        };
333
334        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
335            .iter()
336            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
337            .collect();
338
339        let row = client
340            .query_one(&sql, &param_refs)
341            .await
342            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
343
344        let count: i64 = row.get(0);
345        Ok(count as u64)
346    }
347
348    fn search_param_registry(
349        &self,
350    ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
351        self.search_registry()
352    }
353
354    fn supports_contained_search(&self) -> bool {
355        true
356    }
357
358    fn modifiers_for_param_type(
359        &self,
360        param_type: crate::types::SearchParamType,
361    ) -> Vec<&'static str> {
362        Self::modifiers_for_type(param_type)
363    }
364}
365
366#[async_trait]
367impl MultiTypeSearchProvider for PostgresBackend {
368    async fn search_multi(
369        &self,
370        tenant: &TenantContext,
371        resource_types: &[&str],
372        query: &SearchQuery,
373    ) -> StorageResult<SearchResult> {
374        let client = self.get_client().await?;
375        let tenant_id = tenant.tenant_id().as_str();
376
377        let count = query.count.unwrap_or(100) as usize;
378        let offset = query.offset.unwrap_or(0) as usize;
379
380        // Build the type filter
381        let type_filter = if resource_types.is_empty() {
382            String::new()
383        } else {
384            let types: Vec<String> = resource_types
385                .iter()
386                .map(|t| format!("'{}'", t.replace('\'', "''")))
387                .collect();
388            format!(" AND resource_type IN ({})", types.join(", "))
389        };
390
391        let sql = format!(
392            "SELECT resource_type, id, version_id, data, last_updated, fhir_version FROM resources
393             WHERE tenant_id = $1 AND is_deleted = FALSE{}
394             ORDER BY last_updated DESC, id DESC
395             LIMIT {} OFFSET {}",
396            type_filter,
397            count + 1,
398            offset
399        );
400
401        let rows = client
402            .query(&sql, &[&tenant_id])
403            .await
404            .map_err(|e| internal_error(format!("Failed to execute multi-type search: {}", e)))?;
405
406        let mut resources = Vec::new();
407        for row in &rows {
408            let res_type: String = row.get(0);
409            let id: String = row.get(1);
410            let version_id: String = row.get(2);
411            let json_data: serde_json::Value = row.get(3);
412            let last_updated: chrono::DateTime<Utc> = row.get(4);
413            let fhir_version_str: String = row.get(5);
414
415            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
416                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
417
418            let resource = StoredResource::from_storage(
419                res_type,
420                id,
421                version_id,
422                tenant.tenant_id().clone(),
423                json_data,
424                last_updated,
425                last_updated,
426                None,
427                fhir_version,
428            );
429
430            resources.push(resource);
431        }
432
433        let has_next = resources.len() > count;
434        if has_next {
435            resources.pop();
436        }
437
438        let page_info = PageInfo {
439            next_cursor: None,
440            previous_cursor: None,
441            total: None,
442            has_next,
443            has_previous: offset > 0,
444        };
445
446        Ok(SearchResult {
447            resources: Page::new(resources, page_info),
448            included: Vec::new(),
449            total: None,
450            scores: Default::default(),
451        })
452    }
453}
454
455#[async_trait]
456impl IncludeProvider for PostgresBackend {
457    async fn resolve_includes(
458        &self,
459        tenant: &TenantContext,
460        resources: &[StoredResource],
461        includes: &[IncludeDirective],
462    ) -> StorageResult<Vec<StoredResource>> {
463        if resources.is_empty() || includes.is_empty() {
464            return Ok(Vec::new());
465        }
466
467        let client = self.get_client().await?;
468        let tenant_id = tenant.tenant_id().as_str();
469
470        let mut included = Vec::new();
471        let mut seen_refs: HashSet<String> = HashSet::new();
472
473        for include in includes {
474            for resource in resources {
475                if resource.resource_type() != include.source_type {
476                    continue;
477                }
478
479                let refs = Self::extract_references(resource.content(), &include.search_param);
480
481                for reference in refs {
482                    if let Some((ref_type, ref_id)) = Self::parse_reference(&reference) {
483                        if let Some(ref target) = include.target_type {
484                            if ref_type != *target {
485                                continue;
486                            }
487                        }
488
489                        let ref_key = format!("{}/{}", ref_type, ref_id);
490                        if seen_refs.contains(&ref_key) {
491                            continue;
492                        }
493                        seen_refs.insert(ref_key);
494
495                        if let Some(included_resource) =
496                            Self::fetch_resource(&client, tenant_id, &ref_type, &ref_id).await?
497                        {
498                            included.push(included_resource);
499                        }
500                    }
501                }
502            }
503        }
504
505        Ok(included)
506    }
507}
508
509#[async_trait]
510impl RevincludeProvider for PostgresBackend {
511    async fn resolve_revincludes(
512        &self,
513        tenant: &TenantContext,
514        resources: &[StoredResource],
515        revincludes: &[IncludeDirective],
516    ) -> StorageResult<Vec<StoredResource>> {
517        if resources.is_empty() || revincludes.is_empty() {
518            return Ok(Vec::new());
519        }
520
521        let client = self.get_client().await?;
522        let tenant_id = tenant.tenant_id().as_str();
523
524        let mut included = Vec::new();
525        let mut seen_ids: HashSet<String> = HashSet::new();
526
527        for revinclude in revincludes {
528            let mut reference_values: Vec<String> = Vec::new();
529            for resource in resources {
530                reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
531                reference_values.push(resource.id().to_string());
532            }
533
534            if reference_values.is_empty() {
535                continue;
536            }
537
538            // Use the search index to find resources referencing our results
539            let placeholders: Vec<String> = (0..reference_values.len())
540                .map(|i| format!("${}", i + 3))
541                .collect();
542
543            let sql = format!(
544                "SELECT DISTINCT r.id, r.version_id, r.data, r.last_updated, r.fhir_version
545                 FROM resources r
546                 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
547                    AND r.resource_type = si.resource_type
548                    AND r.id = si.resource_id
549                 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
550                 AND si.param_name = '{}'
551                 AND si.value_reference IN ({})",
552                revinclude.search_param,
553                placeholders.join(", ")
554            );
555
556            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
557                Box::new(tenant_id.to_string()),
558                Box::new(revinclude.source_type.clone()),
559            ];
560            for rv in &reference_values {
561                params.push(Box::new(rv.clone()));
562            }
563
564            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
565                .iter()
566                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
567                .collect();
568
569            let rows = client.query(&sql, &param_refs).await.map_err(|e| {
570                internal_error(format!("Failed to execute revinclude query: {}", e))
571            })?;
572
573            for row in &rows {
574                let id: String = row.get(0);
575                let version_id: String = row.get(1);
576                let json_data: serde_json::Value = row.get(2);
577                let last_updated: chrono::DateTime<Utc> = row.get(3);
578                let fhir_version_str: String = row.get(4);
579
580                let resource_key = format!("{}/{}", revinclude.source_type, id);
581                if seen_ids.contains(&resource_key) {
582                    continue;
583                }
584                seen_ids.insert(resource_key);
585
586                let fhir_version = FhirVersion::from_storage(&fhir_version_str)
587                    .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
588
589                let resource = StoredResource::from_storage(
590                    &revinclude.source_type,
591                    id,
592                    version_id,
593                    tenant.tenant_id().clone(),
594                    json_data,
595                    last_updated,
596                    last_updated,
597                    None,
598                    fhir_version,
599                );
600
601                included.push(resource);
602            }
603        }
604
605        Ok(included)
606    }
607}
608
609#[async_trait]
610impl ChainedSearchProvider for PostgresBackend {
611    async fn resolve_chain(
612        &self,
613        tenant: &TenantContext,
614        base_type: &str,
615        chain: &str,
616        value: &str,
617    ) -> StorageResult<Vec<String>> {
618        if chain.is_empty() {
619            return Ok(Vec::new());
620        }
621
622        let client = self.get_client().await?;
623        let tenant_id = tenant.tenant_id().as_str();
624
625        // Build a multi-step chain query via the registry-driven builder.
626        // The builder produces a `r.id IN (... nested SELECTs ...)` fragment
627        // that handles arbitrary chain depth (was previously stubbed for >2
628        // segments).
629        let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone())
630            .with_param_offset(1);
631        let parsed = builder
632            .parse_chain(chain)
633            .map_err(|e| internal_error(format!("Failed to parse chain: {}", e)))?;
634        let parsed_value = crate::types::SearchValue::parse(value);
635        let fragment = builder.build_forward_chain_sql(&parsed, &parsed_value)?;
636
637        let sql = format!(
638            "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \
639             AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}",
640            base = base_type,
641            clause = fragment.sql,
642        );
643
644        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
645            vec![Box::new(tenant_id.to_string())];
646        for p in &fragment.params {
647            match p {
648                SqlParam::Text(s) => params.push(Box::new(s.clone())),
649                SqlParam::Float(f) => params.push(Box::new(*f)),
650                SqlParam::Integer(i) => params.push(Box::new(*i)),
651                SqlParam::Bool(b) => params.push(Box::new(*b)),
652                SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
653                SqlParam::Null => params.push(Box::new(Option::<String>::None)),
654            }
655        }
656        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
657            .iter()
658            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
659            .collect();
660
661        let rows = client
662            .query(&sql, &param_refs)
663            .await
664            .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?;
665
666        Ok(rows.iter().map(|r| r.get(0)).collect())
667    }
668
669    async fn resolve_reverse_chain(
670        &self,
671        tenant: &TenantContext,
672        base_type: &str,
673        reverse_chain: &ReverseChainedParameter,
674    ) -> StorageResult<Vec<String>> {
675        let client = self.get_client().await?;
676        let tenant_id = tenant.tenant_id().as_str();
677
678        // Use the registry-driven builder so we handle nested `_has` chains
679        // and any param type (was previously single-level only with hardcoded
680        // token-or-string-or-empty fallback).
681        let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone())
682            .with_param_offset(1);
683        let fragment = builder.build_reverse_chain_sql(reverse_chain)?;
684
685        let sql = format!(
686            "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \
687             AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}",
688            base = base_type,
689            clause = fragment.sql,
690        );
691
692        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
693            vec![Box::new(tenant_id.to_string())];
694        for p in &fragment.params {
695            match p {
696                SqlParam::Text(s) => params.push(Box::new(s.clone())),
697                SqlParam::Float(f) => params.push(Box::new(*f)),
698                SqlParam::Integer(i) => params.push(Box::new(*i)),
699                SqlParam::Bool(b) => params.push(Box::new(*b)),
700                SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
701                SqlParam::Null => params.push(Box::new(Option::<String>::None)),
702            }
703        }
704        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
705            .iter()
706            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
707            .collect();
708
709        let rows = client
710            .query(&sql, &param_refs)
711            .await
712            .map_err(|e| internal_error(format!("Failed to execute reverse chain query: {}", e)))?;
713
714        Ok(rows.iter().map(|r| r.get(0)).collect())
715    }
716}
717
718#[async_trait]
719impl TextSearchProvider for PostgresBackend {
720    async fn search_text(
721        &self,
722        tenant: &TenantContext,
723        resource_type: &str,
724        text: &str,
725        pagination: &Pagination,
726    ) -> StorageResult<SearchResult> {
727        let client = self.get_client().await?;
728        let tenant_id = tenant.tenant_id().as_str();
729        let count = pagination.count as usize;
730
731        // Use PostgreSQL native FTS with tsvector/tsquery
732        let sql = format!(
733            "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
734                    ts_rank(fts.narrative_tsvector, plainto_tsquery('english', $3)) AS rank
735             FROM resources r
736             INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
737                AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
738             WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
739             AND fts.narrative_tsvector @@ plainto_tsquery('english', $3)
740             ORDER BY rank DESC, r.last_updated DESC
741             LIMIT {}",
742            count + 1
743        );
744
745        let rows = client
746            .query(&sql, &[&tenant_id, &resource_type, &text])
747            .await
748            .map_err(|e| internal_error(format!("Failed to execute text search: {}", e)))?;
749
750        let mut resources = Vec::new();
751        for row in &rows {
752            let id: String = row.get(0);
753            let version_id: String = row.get(1);
754            let json_data: serde_json::Value = row.get(2);
755            let last_updated: chrono::DateTime<Utc> = row.get(3);
756            let fhir_version_str: String = row.get(4);
757
758            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
759                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
760
761            resources.push(StoredResource::from_storage(
762                resource_type,
763                id,
764                version_id,
765                tenant.tenant_id().clone(),
766                json_data,
767                last_updated,
768                last_updated,
769                None,
770                fhir_version,
771            ));
772        }
773
774        let has_next = resources.len() > count;
775        if has_next {
776            resources.pop();
777        }
778
779        let page_info = PageInfo {
780            next_cursor: None,
781            previous_cursor: None,
782            total: None,
783            has_next,
784            has_previous: false,
785        };
786
787        Ok(SearchResult {
788            resources: Page::new(resources, page_info),
789            included: Vec::new(),
790            total: None,
791            scores: Default::default(),
792        })
793    }
794
795    async fn search_content(
796        &self,
797        tenant: &TenantContext,
798        resource_type: &str,
799        content: &str,
800        pagination: &Pagination,
801    ) -> StorageResult<SearchResult> {
802        let client = self.get_client().await?;
803        let tenant_id = tenant.tenant_id().as_str();
804        let count = pagination.count as usize;
805
806        // Use content_tsvector for _content search
807        let sql = format!(
808            "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
809                    ts_rank(fts.content_tsvector, plainto_tsquery('english', $3)) AS rank
810             FROM resources r
811             INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
812                AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
813             WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
814             AND fts.content_tsvector @@ plainto_tsquery('english', $3)
815             ORDER BY rank DESC, r.last_updated DESC
816             LIMIT {}",
817            count + 1
818        );
819
820        let rows = client
821            .query(&sql, &[&tenant_id, &resource_type, &content])
822            .await
823            .map_err(|e| internal_error(format!("Failed to execute content search: {}", e)))?;
824
825        let mut resources = Vec::new();
826        for row in &rows {
827            let id: String = row.get(0);
828            let version_id: String = row.get(1);
829            let json_data: serde_json::Value = row.get(2);
830            let last_updated: chrono::DateTime<Utc> = row.get(3);
831            let fhir_version_str: String = row.get(4);
832
833            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
834                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
835
836            resources.push(StoredResource::from_storage(
837                resource_type,
838                id,
839                version_id,
840                tenant.tenant_id().clone(),
841                json_data,
842                last_updated,
843                last_updated,
844                None,
845                fhir_version,
846            ));
847        }
848
849        let has_next = resources.len() > count;
850        if has_next {
851            resources.pop();
852        }
853
854        let page_info = PageInfo {
855            next_cursor: None,
856            previous_cursor: None,
857            total: None,
858            has_next,
859            has_previous: false,
860        };
861
862        Ok(SearchResult {
863            resources: Page::new(resources, page_info),
864            included: Vec::new(),
865            total: None,
866            scores: Default::default(),
867        })
868    }
869}
870
871/// Finds the `contained[]` entry with the given local `id` in a container's
872/// content.
873fn extract_contained_resource(
874    content: &serde_json::Value,
875    local_id: &str,
876) -> Option<serde_json::Value> {
877    content
878        .get("contained")?
879        .as_array()?
880        .iter()
881        .find(|e| e.get("id").and_then(|v| v.as_str()) == Some(local_id))
882        .cloned()
883}
884
885/// Builds a `StoredResource` for a contained resource, inheriting the
886/// container's version/tenant/timestamps. Used for `_containedType=contained`.
887fn build_contained_stored(
888    container: &StoredResource,
889    contained_type: &str,
890    local_id: &str,
891    content: serde_json::Value,
892) -> StoredResource {
893    StoredResource::from_storage(
894        contained_type.to_string(),
895        local_id.to_string(),
896        container.version_id().to_string(),
897        container.tenant_id().clone(),
898        content,
899        container.created_at(),
900        container.last_modified(),
901        None,
902        container.fhir_version(),
903    )
904}
905
906// Contained (`_contained`) search.
907impl PostgresBackend {
908    /// Executes a `_contained=true|both` search. See the SQLite backend's
909    /// `search_contained` for the shared semantics: matches contained resources
910    /// of `query.resource_type` via the `is_contained` index rows, returns the
911    /// containers (default) or the contained resources (`_containedType=contained`),
912    /// and for `both` merges top-level matches first. Paginated by
913    /// `_offset`/`_count` as a single window (no keyset cursor).
914    async fn search_contained(
915        &self,
916        tenant: &TenantContext,
917        query: &SearchQuery,
918    ) -> StorageResult<SearchResult> {
919        use crate::types::{ContainedMode, ContainedReturn};
920
921        let tenant_id = tenant.tenant_id().as_str();
922        let contained_type = query.resource_type.as_str();
923
924        // 1. Resolve contained matches → (container_type, container_id, local_id).
925        let matches: Vec<(String, String, Option<String>)> =
926            match PostgresQueryBuilder::build_contained(query) {
927                Some(fragment) => {
928                    let client = self.get_client().await?;
929                    let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
930                        Box::new(tenant_id.to_string()),
931                        Box::new(contained_type.to_string()),
932                    ];
933                    for param in &fragment.params {
934                        match param {
935                            SqlParam::Text(s) => params.push(Box::new(s.clone())),
936                            SqlParam::Float(f) => params.push(Box::new(*f)),
937                            SqlParam::Integer(i) => params.push(Box::new(*i)),
938                            SqlParam::Bool(b) => params.push(Box::new(*b)),
939                            SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
940                            SqlParam::Null => params.push(Box::new(Option::<String>::None)),
941                        }
942                    }
943                    let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
944                        .iter()
945                        .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
946                        .collect();
947                    let rows = client
948                        .query(&fragment.sql, &param_refs)
949                        .await
950                        .map_err(|e| {
951                            internal_error(format!("Failed to execute contained query: {e}"))
952                        })?;
953                    rows.iter()
954                        .map(|row| {
955                            (
956                                row.get::<_, String>(0),
957                                row.get::<_, String>(1),
958                                row.get::<_, Option<String>>(2),
959                            )
960                        })
961                        .collect()
962                }
963                None => Vec::new(),
964            };
965
966        // 2. Materialize result items (container or contained), de-duplicated.
967        let mut items: Vec<StoredResource> = Vec::new();
968        let mut seen: HashSet<String> = HashSet::new();
969        match query.contained_return {
970            ContainedReturn::Container => {
971                for (ctype, cid, _) in &matches {
972                    if !seen.insert(format!("{ctype}/{cid}")) {
973                        continue;
974                    }
975                    if let Some(container) = self.read(tenant, ctype, cid).await? {
976                        items.push(container);
977                    }
978                }
979            }
980            ContainedReturn::Contained => {
981                for (ctype, cid, local) in &matches {
982                    let Some(local_id) = local else { continue };
983                    if !seen.insert(format!("{ctype}/{cid}#{local_id}")) {
984                        continue;
985                    }
986                    if let Some(container) = self.read(tenant, ctype, cid).await? {
987                        if let Some(c) = extract_contained_resource(container.content(), local_id) {
988                            items.push(build_contained_stored(
989                                &container,
990                                contained_type,
991                                local_id,
992                                c,
993                            ));
994                        }
995                    }
996                }
997            }
998        }
999
1000        // 3. For `both`, merge top-level matches ahead of contained ones.
1001        if query.contained == ContainedMode::Both {
1002            let mut top_query = query.clone();
1003            top_query.contained = ContainedMode::Off;
1004            top_query.contained_return = ContainedReturn::Container;
1005            let top = self.search(tenant, &top_query).await?;
1006            let mut merged = top.resources.items;
1007            let top_urls: HashSet<String> = merged.iter().map(|r| r.url()).collect();
1008            for item in items {
1009                if !top_urls.contains(&item.url()) {
1010                    merged.push(item);
1011                }
1012            }
1013            items = merged;
1014        }
1015
1016        // 4. Apply the offset/count window.
1017        let count = query.count.unwrap_or(100) as usize;
1018        let offset = query.offset.unwrap_or(0) as usize;
1019        let total_matches = items.len() as u64;
1020        let windowed: Vec<StoredResource> = items.into_iter().skip(offset).take(count).collect();
1021
1022        let total = if query.wants_total() {
1023            Some(total_matches)
1024        } else {
1025            None
1026        };
1027        let page = Page::new(windowed, PageInfo::end());
1028        let mut result = SearchResult::new(page);
1029        if let Some(t) = total {
1030            result = result.with_total(t);
1031        }
1032        Ok(result)
1033    }
1034}
1035
1036// Helper methods for search implementations
1037impl PostgresBackend {
1038    /// Extract timestamp and ID from a cursor for keyset pagination.
1039    /// Binds the cursor's boundary sort value as `$3`, typed per the sort key
1040    /// kind so PostgreSQL compares it correctly against the sort expression.
1041    fn bind_cursor_value(
1042        params: &mut Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
1043        kind: SortValueKind,
1044        cursor: &PageCursor,
1045    ) -> StorageResult<()> {
1046        let value = cursor.sort_values().first();
1047        match kind {
1048            SortValueKind::Timestamp => {
1049                let dt = match value {
1050                    Some(CursorValue::String(s)) => chrono::DateTime::parse_from_rfc3339(s)
1051                        .map(|d| d.with_timezone(&Utc))
1052                        .map_err(|_| internal_error("Invalid cursor timestamp".to_string()))?,
1053                    _ => {
1054                        return Err(internal_error(
1055                            "Invalid cursor: expected timestamp".to_string(),
1056                        ));
1057                    }
1058                };
1059                params.push(Box::new(dt));
1060            }
1061            SortValueKind::Number => {
1062                let n = match value {
1063                    Some(CursorValue::Decimal(f)) => *f,
1064                    Some(CursorValue::Number(i)) => *i as f64,
1065                    Some(CursorValue::String(s)) => s.parse().unwrap_or(0.0),
1066                    _ => {
1067                        return Err(internal_error(
1068                            "Invalid cursor: expected number".to_string(),
1069                        ));
1070                    }
1071                };
1072                params.push(Box::new(n));
1073            }
1074            SortValueKind::Text => match value {
1075                Some(CursorValue::String(s)) => params.push(Box::new(s.clone())),
1076                Some(CursorValue::Null) | None => params.push(Box::new(Option::<String>::None)),
1077                _ => {
1078                    return Err(internal_error("Invalid cursor: expected text".to_string()));
1079                }
1080            },
1081        }
1082        Ok(())
1083    }
1084
1085    /// Reads the `sort_key` column (index 5) as a `CursorValue` per the key kind.
1086    fn read_cursor_value(
1087        row: &tokio_postgres::Row,
1088        idx: usize,
1089        kind: SortValueKind,
1090    ) -> CursorValue {
1091        match kind {
1092            SortValueKind::Timestamp => {
1093                let v: Option<chrono::DateTime<Utc>> = row.try_get(idx).ok().flatten();
1094                v.map(|d| CursorValue::String(d.to_rfc3339()))
1095                    .unwrap_or(CursorValue::Null)
1096            }
1097            SortValueKind::Number => {
1098                let v: Option<f64> = row.try_get(idx).ok().flatten();
1099                v.map(CursorValue::Decimal).unwrap_or(CursorValue::Null)
1100            }
1101            SortValueKind::Text => {
1102                let v: Option<String> = row.try_get(idx).ok().flatten();
1103                v.map(CursorValue::String).unwrap_or(CursorValue::Null)
1104            }
1105        }
1106    }
1107
1108    /// Extract references from a resource for a given search parameter.
1109    fn extract_references(content: &serde_json::Value, search_param: &str) -> Vec<String> {
1110        let mut refs = Vec::new();
1111        if let Some(value) = content.get(search_param) {
1112            Self::collect_references_from_value(value, &mut refs);
1113        }
1114        refs
1115    }
1116
1117    /// Recursively collect reference strings from a JSON value.
1118    fn collect_references_from_value(value: &serde_json::Value, refs: &mut Vec<String>) {
1119        match value {
1120            serde_json::Value::Object(obj) => {
1121                if let Some(serde_json::Value::String(ref_str)) = obj.get("reference") {
1122                    refs.push(ref_str.clone());
1123                }
1124                for v in obj.values() {
1125                    Self::collect_references_from_value(v, refs);
1126                }
1127            }
1128            serde_json::Value::Array(arr) => {
1129                for item in arr {
1130                    Self::collect_references_from_value(item, refs);
1131                }
1132            }
1133            _ => {}
1134        }
1135    }
1136
1137    /// Parse a reference string into (type, id).
1138    fn parse_reference(reference: &str) -> Option<(String, String)> {
1139        let path = reference
1140            .strip_prefix("http://")
1141            .or_else(|| reference.strip_prefix("https://"))
1142            .map(|s| s.rsplit('/').take(2).collect::<Vec<_>>())
1143            .unwrap_or_else(|| reference.split('/').collect());
1144
1145        if path.len() >= 2 {
1146            if reference.starts_with("http") {
1147                Some((path[1].to_string(), path[0].to_string()))
1148            } else {
1149                Some((path[0].to_string(), path[1].to_string()))
1150            }
1151        } else {
1152            None
1153        }
1154    }
1155
1156    /// Fetch a single resource by type and ID.
1157    async fn fetch_resource(
1158        client: &deadpool_postgres::Client,
1159        tenant_id: &str,
1160        resource_type: &str,
1161        id: &str,
1162    ) -> StorageResult<Option<StoredResource>> {
1163        let rows = client
1164            .query(
1165                "SELECT version_id, data, last_updated, fhir_version FROM resources
1166                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
1167                &[&tenant_id, &resource_type, &id],
1168            )
1169            .await
1170            .map_err(|e| internal_error(format!("Failed to fetch resource: {}", e)))?;
1171
1172        if rows.is_empty() {
1173            return Ok(None);
1174        }
1175
1176        let row = &rows[0];
1177        let version_id: String = row.get(0);
1178        let json_data: serde_json::Value = row.get(1);
1179        let last_updated: chrono::DateTime<Utc> = row.get(2);
1180        let fhir_version_str: String = row.get(3);
1181        let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1182            .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1183
1184        Ok(Some(StoredResource::from_storage(
1185            resource_type,
1186            id,
1187            version_id,
1188            crate::tenant::TenantId::new(tenant_id),
1189            json_data,
1190            last_updated,
1191            last_updated,
1192            None,
1193            fhir_version,
1194        )))
1195    }
1196}