Skip to main content

helios_persistence/backends/postgres/
search_impl.rs

1//! Search implementation for PostgreSQL backend.
2//!
3//! This module provides search functionality for the PostgreSQL backend including:
4//! - Basic single-type search
5//! - Multi-type search
6//! - _include and _revinclude support
7//! - Chained search parameter support
8//! - Full-text search using tsvector/tsquery
9
10use std::collections::HashSet;
11
12use async_trait::async_trait;
13use chrono::Utc;
14use helios_fhir::FhirVersion;
15
16use crate::core::{
17    ChainedSearchProvider, IncludeProvider, MultiTypeSearchProvider, RevincludeProvider,
18    SearchProvider, SearchResult, TextSearchProvider,
19};
20use crate::error::{BackendError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23    CursorDirection, CursorValue, IncludeDirective, Page, PageCursor, PageInfo, Pagination,
24    ReverseChainedParameter, SearchQuery, StoredResource,
25};
26
27use super::PostgresBackend;
28use super::search::query_builder::{PostgresQueryBuilder, SqlParam};
29
30fn internal_error(message: String) -> StorageError {
31    StorageError::Backend(BackendError::Internal {
32        backend_name: "postgres".to_string(),
33        message,
34        source: None,
35    })
36}
37
38#[async_trait]
39impl SearchProvider for PostgresBackend {
40    async fn search(
41        &self,
42        tenant: &TenantContext,
43        query: &SearchQuery,
44    ) -> StorageResult<SearchResult> {
45        let client = self.get_client().await?;
46        let tenant_id = tenant.tenant_id().as_str();
47        let resource_type = &query.resource_type;
48
49        // Get count with default
50        let count = query.count.unwrap_or(100) as usize;
51
52        // Check for cursor-based pagination
53        let cursor = query
54            .cursor
55            .as_ref()
56            .and_then(|c| PageCursor::decode(c).ok());
57
58        // Determine param offset based on pagination mode
59        // Cursor pagination: $1=tenant, $2=type, $3=timestamp, $4=id -> offset=4
60        // Non-cursor: $1=tenant, $2=type -> offset=2
61        let param_offset = if cursor.is_some() { 4 } else { 2 };
62
63        // Build the search filter subquery if there are search parameters
64        let search_filter = if !query.parameters.is_empty() {
65            PostgresQueryBuilder::build_search_query(query, param_offset)
66        } else {
67            None
68        };
69
70        // Build query based on pagination mode
71        let (sql, has_previous, search_params) = if let Some(ref cursor) = cursor {
72            match cursor.direction() {
73                CursorDirection::Next => {
74                    let sql = if let Some(ref filter) = search_filter {
75                        format!(
76                            "SELECT id, version_id, data, last_updated, fhir_version FROM resources
77                             WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
78                             AND ({})
79                             AND (last_updated < $3 OR (last_updated = $3 AND id < $4))
80                             ORDER BY last_updated DESC, id DESC
81                             LIMIT {}",
82                            filter.sql,
83                            count + 1
84                        )
85                    } else {
86                        format!(
87                            "SELECT id, version_id, data, last_updated, fhir_version FROM resources
88                             WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
89                             AND (last_updated < $3 OR (last_updated = $3 AND id < $4))
90                             ORDER BY last_updated DESC, id DESC
91                             LIMIT {}",
92                            count + 1
93                        )
94                    };
95                    (
96                        sql,
97                        true,
98                        search_filter.map(|f| f.params).unwrap_or_default(),
99                    )
100                }
101                CursorDirection::Previous => {
102                    let sql = if let Some(ref filter) = search_filter {
103                        format!(
104                            "SELECT id, version_id, data, last_updated, fhir_version FROM resources
105                             WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
106                             AND ({})
107                             AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
108                             ORDER BY last_updated ASC, id ASC
109                             LIMIT {}",
110                            filter.sql,
111                            count + 1
112                        )
113                    } else {
114                        format!(
115                            "SELECT id, version_id, data, last_updated, fhir_version FROM resources
116                             WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
117                             AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
118                             ORDER BY last_updated ASC, id ASC
119                             LIMIT {}",
120                            count + 1
121                        )
122                    };
123                    (
124                        sql,
125                        false,
126                        search_filter.map(|f| f.params).unwrap_or_default(),
127                    )
128                }
129            }
130        } else if let Some(offset) = query.offset {
131            // Offset-based pagination (legacy support)
132            let sql = if let Some(ref filter) = search_filter {
133                format!(
134                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
135                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
136                     AND ({})
137                     ORDER BY last_updated DESC, id DESC
138                     LIMIT {} OFFSET {}",
139                    filter.sql,
140                    count + 1,
141                    offset
142                )
143            } else {
144                format!(
145                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
146                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
147                     ORDER BY last_updated DESC, id DESC
148                     LIMIT {} OFFSET {}",
149                    count + 1,
150                    offset
151                )
152            };
153            (
154                sql,
155                offset > 0,
156                search_filter.map(|f| f.params).unwrap_or_default(),
157            )
158        } else {
159            // First page (no cursor, no offset)
160            let sql = if let Some(ref filter) = search_filter {
161                format!(
162                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
163                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
164                     AND ({})
165                     ORDER BY last_updated DESC, id DESC
166                     LIMIT {}",
167                    filter.sql,
168                    count + 1
169                )
170            } else {
171                format!(
172                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
173                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
174                     ORDER BY last_updated DESC, id DESC
175                     LIMIT {}",
176                    count + 1
177                )
178            };
179            (
180                sql,
181                false,
182                search_filter.map(|f| f.params).unwrap_or_default(),
183            )
184        };
185
186        // Build parameter list for binding
187        let rows = if let Some(ref cursor) = cursor {
188            let (cursor_timestamp, cursor_id) = Self::extract_cursor_values(cursor)?;
189
190            // Build params: [tenant_id, resource_type, cursor_timestamp, cursor_id, ...search_params]
191            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
192                Box::new(tenant_id.to_string()),
193                Box::new(resource_type.to_string()),
194                Box::new(cursor_timestamp),
195                Box::new(cursor_id),
196            ];
197
198            for param in &search_params {
199                match param {
200                    SqlParam::Text(s) => params.push(Box::new(s.clone())),
201                    SqlParam::Float(f) => params.push(Box::new(*f)),
202                    SqlParam::Integer(i) => params.push(Box::new(*i)),
203                    SqlParam::Bool(b) => params.push(Box::new(*b)),
204                    SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
205                    SqlParam::Null => params.push(Box::new(Option::<String>::None)),
206                }
207            }
208
209            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
210                .iter()
211                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
212                .collect();
213
214            client
215                .query(&sql, &param_refs)
216                .await
217                .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?
218        } else {
219            // Build params: [tenant_id, resource_type, ...search_params]
220            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
221                Box::new(tenant_id.to_string()),
222                Box::new(resource_type.to_string()),
223            ];
224
225            for param in &search_params {
226                match param {
227                    SqlParam::Text(s) => params.push(Box::new(s.clone())),
228                    SqlParam::Float(f) => params.push(Box::new(*f)),
229                    SqlParam::Integer(i) => params.push(Box::new(*i)),
230                    SqlParam::Bool(b) => params.push(Box::new(*b)),
231                    SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
232                    SqlParam::Null => params.push(Box::new(Option::<String>::None)),
233                }
234            }
235
236            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
237                .iter()
238                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
239                .collect();
240
241            client
242                .query(&sql, &param_refs)
243                .await
244                .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?
245        };
246
247        let mut resources = Vec::new();
248        for row in &rows {
249            let id: String = row.get(0);
250            let version_id: String = row.get(1);
251            let json_data: serde_json::Value = row.get(2);
252            let last_updated: chrono::DateTime<Utc> = row.get(3);
253            let fhir_version_str: String = row.get(4);
254
255            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
256
257            let resource = StoredResource::from_storage(
258                resource_type.clone(),
259                id,
260                version_id,
261                tenant.tenant_id().clone(),
262                json_data,
263                last_updated,
264                last_updated,
265                None,
266                fhir_version,
267            );
268
269            resources.push(resource);
270        }
271
272        // For backward pagination, reverse the results to maintain DESC order
273        if cursor
274            .as_ref()
275            .map(|c| c.direction() == CursorDirection::Previous)
276            .unwrap_or(false)
277        {
278            resources.reverse();
279        }
280
281        // Check if there are more results (we fetched one extra)
282        let has_next = resources.len() > count;
283        if has_next {
284            resources.pop();
285        }
286
287        // Generate cursors for pagination
288        let next_cursor = if has_next {
289            resources.last().map(|r| {
290                let cursor = PageCursor::new(
291                    vec![CursorValue::String(r.last_modified().to_rfc3339())],
292                    r.id(),
293                );
294                cursor.encode()
295            })
296        } else {
297            None
298        };
299
300        let previous_cursor = if has_previous {
301            resources.first().map(|r| {
302                let cursor = PageCursor::previous(
303                    vec![CursorValue::String(r.last_modified().to_rfc3339())],
304                    r.id(),
305                );
306                cursor.encode()
307            })
308        } else {
309            None
310        };
311
312        let page_info = PageInfo {
313            next_cursor,
314            previous_cursor,
315            total: None,
316            has_next,
317            has_previous,
318        };
319
320        let page = Page::new(resources, page_info);
321
322        Ok(SearchResult {
323            resources: page,
324            included: Vec::new(),
325            total: None,
326        })
327    }
328
329    async fn search_count(
330        &self,
331        tenant: &TenantContext,
332        query: &SearchQuery,
333    ) -> StorageResult<u64> {
334        let client = self.get_client().await?;
335        let tenant_id = tenant.tenant_id().as_str();
336        let resource_type = &query.resource_type;
337
338        let (sql, params): (
339            String,
340            Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
341        ) = if !query.parameters.is_empty() {
342            let filter = PostgresQueryBuilder::build_search_query(query, 2);
343
344            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
345                Box::new(tenant_id.to_string()),
346                Box::new(resource_type.to_string()),
347            ];
348
349            if let Some(ref fragment) = filter {
350                for param in &fragment.params {
351                    match param {
352                        SqlParam::Text(s) => params.push(Box::new(s.clone())),
353                        SqlParam::Float(f) => params.push(Box::new(*f)),
354                        SqlParam::Integer(i) => params.push(Box::new(*i)),
355                        SqlParam::Bool(b) => params.push(Box::new(*b)),
356                        SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
357                        SqlParam::Null => params.push(Box::new(Option::<String>::None)),
358                    }
359                }
360
361                let sql = format!(
362                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND ({})",
363                    fragment.sql
364                );
365                (sql, params)
366            } else {
367                let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
368                (sql, params)
369            }
370        } else {
371            let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
372            let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
373                Box::new(tenant_id.to_string()),
374                Box::new(resource_type.to_string()),
375            ];
376            (sql, params)
377        };
378
379        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
380            .iter()
381            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
382            .collect();
383
384        let row = client
385            .query_one(&sql, &param_refs)
386            .await
387            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
388
389        let count: i64 = row.get(0);
390        Ok(count as u64)
391    }
392}
393
394#[async_trait]
395impl MultiTypeSearchProvider for PostgresBackend {
396    async fn search_multi(
397        &self,
398        tenant: &TenantContext,
399        resource_types: &[&str],
400        query: &SearchQuery,
401    ) -> StorageResult<SearchResult> {
402        let client = self.get_client().await?;
403        let tenant_id = tenant.tenant_id().as_str();
404
405        let count = query.count.unwrap_or(100) as usize;
406        let offset = query.offset.unwrap_or(0) as usize;
407
408        // Build the type filter
409        let type_filter = if resource_types.is_empty() {
410            String::new()
411        } else {
412            let types: Vec<String> = resource_types
413                .iter()
414                .map(|t| format!("'{}'", t.replace('\'', "''")))
415                .collect();
416            format!(" AND resource_type IN ({})", types.join(", "))
417        };
418
419        let sql = format!(
420            "SELECT resource_type, id, version_id, data, last_updated, fhir_version FROM resources
421             WHERE tenant_id = $1 AND is_deleted = FALSE{}
422             ORDER BY last_updated DESC, id DESC
423             LIMIT {} OFFSET {}",
424            type_filter,
425            count + 1,
426            offset
427        );
428
429        let rows = client
430            .query(&sql, &[&tenant_id])
431            .await
432            .map_err(|e| internal_error(format!("Failed to execute multi-type search: {}", e)))?;
433
434        let mut resources = Vec::new();
435        for row in &rows {
436            let res_type: String = row.get(0);
437            let id: String = row.get(1);
438            let version_id: String = row.get(2);
439            let json_data: serde_json::Value = row.get(3);
440            let last_updated: chrono::DateTime<Utc> = row.get(4);
441            let fhir_version_str: String = row.get(5);
442
443            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
444
445            let resource = StoredResource::from_storage(
446                res_type,
447                id,
448                version_id,
449                tenant.tenant_id().clone(),
450                json_data,
451                last_updated,
452                last_updated,
453                None,
454                fhir_version,
455            );
456
457            resources.push(resource);
458        }
459
460        let has_next = resources.len() > count;
461        if has_next {
462            resources.pop();
463        }
464
465        let page_info = PageInfo {
466            next_cursor: None,
467            previous_cursor: None,
468            total: None,
469            has_next,
470            has_previous: offset > 0,
471        };
472
473        Ok(SearchResult {
474            resources: Page::new(resources, page_info),
475            included: Vec::new(),
476            total: None,
477        })
478    }
479}
480
481#[async_trait]
482impl IncludeProvider for PostgresBackend {
483    async fn resolve_includes(
484        &self,
485        tenant: &TenantContext,
486        resources: &[StoredResource],
487        includes: &[IncludeDirective],
488    ) -> StorageResult<Vec<StoredResource>> {
489        if resources.is_empty() || includes.is_empty() {
490            return Ok(Vec::new());
491        }
492
493        let client = self.get_client().await?;
494        let tenant_id = tenant.tenant_id().as_str();
495
496        let mut included = Vec::new();
497        let mut seen_refs: HashSet<String> = HashSet::new();
498
499        for include in includes {
500            for resource in resources {
501                if resource.resource_type() != include.source_type {
502                    continue;
503                }
504
505                let refs = Self::extract_references(resource.content(), &include.search_param);
506
507                for reference in refs {
508                    if let Some((ref_type, ref_id)) = Self::parse_reference(&reference) {
509                        if let Some(ref target) = include.target_type {
510                            if ref_type != *target {
511                                continue;
512                            }
513                        }
514
515                        let ref_key = format!("{}/{}", ref_type, ref_id);
516                        if seen_refs.contains(&ref_key) {
517                            continue;
518                        }
519                        seen_refs.insert(ref_key);
520
521                        if let Some(included_resource) =
522                            Self::fetch_resource(&client, tenant_id, &ref_type, &ref_id).await?
523                        {
524                            included.push(included_resource);
525                        }
526                    }
527                }
528            }
529        }
530
531        Ok(included)
532    }
533}
534
535#[async_trait]
536impl RevincludeProvider for PostgresBackend {
537    async fn resolve_revincludes(
538        &self,
539        tenant: &TenantContext,
540        resources: &[StoredResource],
541        revincludes: &[IncludeDirective],
542    ) -> StorageResult<Vec<StoredResource>> {
543        if resources.is_empty() || revincludes.is_empty() {
544            return Ok(Vec::new());
545        }
546
547        let client = self.get_client().await?;
548        let tenant_id = tenant.tenant_id().as_str();
549
550        let mut included = Vec::new();
551        let mut seen_ids: HashSet<String> = HashSet::new();
552
553        for revinclude in revincludes {
554            let mut reference_values: Vec<String> = Vec::new();
555            for resource in resources {
556                reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
557                reference_values.push(resource.id().to_string());
558            }
559
560            if reference_values.is_empty() {
561                continue;
562            }
563
564            // Use the search index to find resources referencing our results
565            let placeholders: Vec<String> = (0..reference_values.len())
566                .map(|i| format!("${}", i + 3))
567                .collect();
568
569            let sql = format!(
570                "SELECT DISTINCT r.id, r.version_id, r.data, r.last_updated, r.fhir_version
571                 FROM resources r
572                 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
573                    AND r.resource_type = si.resource_type
574                    AND r.id = si.resource_id
575                 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
576                 AND si.param_name = '{}'
577                 AND si.value_reference IN ({})",
578                revinclude.search_param,
579                placeholders.join(", ")
580            );
581
582            let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
583                Box::new(tenant_id.to_string()),
584                Box::new(revinclude.source_type.clone()),
585            ];
586            for rv in &reference_values {
587                params.push(Box::new(rv.clone()));
588            }
589
590            let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
591                .iter()
592                .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
593                .collect();
594
595            let rows = client.query(&sql, &param_refs).await.map_err(|e| {
596                internal_error(format!("Failed to execute revinclude query: {}", e))
597            })?;
598
599            for row in &rows {
600                let id: String = row.get(0);
601                let version_id: String = row.get(1);
602                let json_data: serde_json::Value = row.get(2);
603                let last_updated: chrono::DateTime<Utc> = row.get(3);
604                let fhir_version_str: String = row.get(4);
605
606                let resource_key = format!("{}/{}", revinclude.source_type, id);
607                if seen_ids.contains(&resource_key) {
608                    continue;
609                }
610                seen_ids.insert(resource_key);
611
612                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
613
614                let resource = StoredResource::from_storage(
615                    &revinclude.source_type,
616                    id,
617                    version_id,
618                    tenant.tenant_id().clone(),
619                    json_data,
620                    last_updated,
621                    last_updated,
622                    None,
623                    fhir_version,
624                );
625
626                included.push(resource);
627            }
628        }
629
630        Ok(included)
631    }
632}
633
634#[async_trait]
635impl ChainedSearchProvider for PostgresBackend {
636    async fn resolve_chain(
637        &self,
638        tenant: &TenantContext,
639        base_type: &str,
640        chain: &str,
641        value: &str,
642    ) -> StorageResult<Vec<String>> {
643        let client = self.get_client().await?;
644        let tenant_id = tenant.tenant_id().as_str();
645
646        if chain.is_empty() {
647            return Ok(Vec::new());
648        }
649
650        // Parse the chain path (e.g., "patient.organization.name")
651        let parts: Vec<&str> = chain.split('.').collect();
652        if parts.is_empty() {
653            return Ok(Vec::new());
654        }
655
656        // Simple single-step chain: param_name.target_param = value
657        // For multi-step chains, build nested subqueries
658        if parts.len() == 2 {
659            // Single step: e.g., patient.name=Smith
660            // Find resources of the target type matching the value,
661            // then find base resources referencing them
662            let ref_param = parts[0];
663            let target_param = parts[1];
664
665            let sql = format!(
666                "SELECT DISTINCT si_ref.resource_id
667                 FROM search_index si_ref
668                 WHERE si_ref.tenant_id = $1
669                   AND si_ref.resource_type = $2
670                   AND si_ref.param_name = '{}'
671                   AND si_ref.value_reference IN (
672                       SELECT resource_type || '/' || resource_id
673                       FROM search_index si_target
674                       WHERE si_target.tenant_id = $1
675                         AND si_target.param_name = '{}'
676                         AND si_target.value_string ILIKE $3
677                   )",
678                ref_param, target_param
679            );
680
681            let rows = client
682                .query(&sql, &[&tenant_id, &base_type, &format!("{}%", value)])
683                .await
684                .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?;
685
686            let ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
687            Ok(ids)
688        } else {
689            // Multi-step or single parameter chain - simplified implementation
690            Ok(Vec::new())
691        }
692    }
693
694    async fn resolve_reverse_chain(
695        &self,
696        tenant: &TenantContext,
697        base_type: &str,
698        reverse_chain: &ReverseChainedParameter,
699    ) -> StorageResult<Vec<String>> {
700        let client = self.get_client().await?;
701        let tenant_id = tenant.tenant_id().as_str();
702
703        // _has:Observation:patient:code=1234-5
704        // Find Observations with code=1234-5, then find the Patient IDs they reference
705        let value_str = reverse_chain
706            .value
707            .as_ref()
708            .map(|v| v.value.clone())
709            .unwrap_or_default();
710
711        let sql = format!(
712            "SELECT DISTINCT si_ref.value_reference
713             FROM search_index si_ref
714             INNER JOIN search_index si_val
715                ON si_ref.tenant_id = si_val.tenant_id
716                AND si_ref.resource_type = si_val.resource_type
717                AND si_ref.resource_id = si_val.resource_id
718             WHERE si_ref.tenant_id = $1
719               AND si_ref.resource_type = '{}'
720               AND si_ref.param_name = '{}'
721               AND si_val.param_name = '{}'
722               AND (si_val.value_token_code = $2
723                    OR si_val.value_string ILIKE $3)",
724            reverse_chain.source_type, reverse_chain.reference_param, reverse_chain.search_param
725        );
726
727        let like_value = format!("{}%", value_str);
728        let rows = client
729            .query(
730                &sql,
731                &[&tenant_id, &value_str.as_str(), &like_value.as_str()],
732            )
733            .await
734            .map_err(|e| internal_error(format!("Failed to execute reverse chain query: {}", e)))?;
735
736        let mut ids = Vec::new();
737        for row in &rows {
738            let reference: String = row.get(0);
739            // Extract ID from "ResourceType/ID" reference
740            let expected_prefix = format!("{}/", base_type);
741            if let Some(id) = reference.strip_prefix(&expected_prefix) {
742                ids.push(id.to_string());
743            }
744        }
745
746        Ok(ids)
747    }
748}
749
750#[async_trait]
751impl TextSearchProvider for PostgresBackend {
752    async fn search_text(
753        &self,
754        tenant: &TenantContext,
755        resource_type: &str,
756        text: &str,
757        pagination: &Pagination,
758    ) -> StorageResult<SearchResult> {
759        let client = self.get_client().await?;
760        let tenant_id = tenant.tenant_id().as_str();
761        let count = pagination.count as usize;
762
763        // Use PostgreSQL native FTS with tsvector/tsquery
764        let sql = format!(
765            "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
766                    ts_rank(fts.narrative_tsvector, plainto_tsquery('english', $3)) AS rank
767             FROM resources r
768             INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
769                AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
770             WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
771             AND fts.narrative_tsvector @@ plainto_tsquery('english', $3)
772             ORDER BY rank DESC, r.last_updated DESC
773             LIMIT {}",
774            count + 1
775        );
776
777        let rows = client
778            .query(&sql, &[&tenant_id, &resource_type, &text])
779            .await
780            .map_err(|e| internal_error(format!("Failed to execute text search: {}", e)))?;
781
782        let mut resources = Vec::new();
783        for row in &rows {
784            let id: String = row.get(0);
785            let version_id: String = row.get(1);
786            let json_data: serde_json::Value = row.get(2);
787            let last_updated: chrono::DateTime<Utc> = row.get(3);
788            let fhir_version_str: String = row.get(4);
789
790            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
791
792            resources.push(StoredResource::from_storage(
793                resource_type,
794                id,
795                version_id,
796                tenant.tenant_id().clone(),
797                json_data,
798                last_updated,
799                last_updated,
800                None,
801                fhir_version,
802            ));
803        }
804
805        let has_next = resources.len() > count;
806        if has_next {
807            resources.pop();
808        }
809
810        let page_info = PageInfo {
811            next_cursor: None,
812            previous_cursor: None,
813            total: None,
814            has_next,
815            has_previous: false,
816        };
817
818        Ok(SearchResult {
819            resources: Page::new(resources, page_info),
820            included: Vec::new(),
821            total: None,
822        })
823    }
824
825    async fn search_content(
826        &self,
827        tenant: &TenantContext,
828        resource_type: &str,
829        content: &str,
830        pagination: &Pagination,
831    ) -> StorageResult<SearchResult> {
832        let client = self.get_client().await?;
833        let tenant_id = tenant.tenant_id().as_str();
834        let count = pagination.count as usize;
835
836        // Use content_tsvector for _content search
837        let sql = format!(
838            "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
839                    ts_rank(fts.content_tsvector, plainto_tsquery('english', $3)) AS rank
840             FROM resources r
841             INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
842                AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
843             WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
844             AND fts.content_tsvector @@ plainto_tsquery('english', $3)
845             ORDER BY rank DESC, r.last_updated DESC
846             LIMIT {}",
847            count + 1
848        );
849
850        let rows = client
851            .query(&sql, &[&tenant_id, &resource_type, &content])
852            .await
853            .map_err(|e| internal_error(format!("Failed to execute content search: {}", e)))?;
854
855        let mut resources = Vec::new();
856        for row in &rows {
857            let id: String = row.get(0);
858            let version_id: String = row.get(1);
859            let json_data: serde_json::Value = row.get(2);
860            let last_updated: chrono::DateTime<Utc> = row.get(3);
861            let fhir_version_str: String = row.get(4);
862
863            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
864
865            resources.push(StoredResource::from_storage(
866                resource_type,
867                id,
868                version_id,
869                tenant.tenant_id().clone(),
870                json_data,
871                last_updated,
872                last_updated,
873                None,
874                fhir_version,
875            ));
876        }
877
878        let has_next = resources.len() > count;
879        if has_next {
880            resources.pop();
881        }
882
883        let page_info = PageInfo {
884            next_cursor: None,
885            previous_cursor: None,
886            total: None,
887            has_next,
888            has_previous: false,
889        };
890
891        Ok(SearchResult {
892            resources: Page::new(resources, page_info),
893            included: Vec::new(),
894            total: None,
895        })
896    }
897}
898
899// Helper methods for search implementations
900impl PostgresBackend {
901    /// Extract timestamp and ID from a cursor for keyset pagination.
902    fn extract_cursor_values(cursor: &PageCursor) -> StorageResult<(String, String)> {
903        let sort_values = cursor.sort_values();
904        let timestamp = match sort_values.first() {
905            Some(CursorValue::String(s)) => s.clone(),
906            _ => {
907                return Err(internal_error(
908                    "Invalid cursor: missing or invalid timestamp".to_string(),
909                ));
910            }
911        };
912        let id = cursor.resource_id().to_string();
913        Ok((timestamp, id))
914    }
915
916    /// Extract references from a resource for a given search parameter.
917    fn extract_references(content: &serde_json::Value, search_param: &str) -> Vec<String> {
918        let mut refs = Vec::new();
919        if let Some(value) = content.get(search_param) {
920            Self::collect_references_from_value(value, &mut refs);
921        }
922        refs
923    }
924
925    /// Recursively collect reference strings from a JSON value.
926    fn collect_references_from_value(value: &serde_json::Value, refs: &mut Vec<String>) {
927        match value {
928            serde_json::Value::Object(obj) => {
929                if let Some(serde_json::Value::String(ref_str)) = obj.get("reference") {
930                    refs.push(ref_str.clone());
931                }
932                for v in obj.values() {
933                    Self::collect_references_from_value(v, refs);
934                }
935            }
936            serde_json::Value::Array(arr) => {
937                for item in arr {
938                    Self::collect_references_from_value(item, refs);
939                }
940            }
941            _ => {}
942        }
943    }
944
945    /// Parse a reference string into (type, id).
946    fn parse_reference(reference: &str) -> Option<(String, String)> {
947        let path = reference
948            .strip_prefix("http://")
949            .or_else(|| reference.strip_prefix("https://"))
950            .map(|s| s.rsplit('/').take(2).collect::<Vec<_>>())
951            .unwrap_or_else(|| reference.split('/').collect());
952
953        if path.len() >= 2 {
954            if reference.starts_with("http") {
955                Some((path[1].to_string(), path[0].to_string()))
956            } else {
957                Some((path[0].to_string(), path[1].to_string()))
958            }
959        } else {
960            None
961        }
962    }
963
964    /// Fetch a single resource by type and ID.
965    async fn fetch_resource(
966        client: &deadpool_postgres::Client,
967        tenant_id: &str,
968        resource_type: &str,
969        id: &str,
970    ) -> StorageResult<Option<StoredResource>> {
971        let rows = client
972            .query(
973                "SELECT version_id, data, last_updated, fhir_version FROM resources
974                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
975                &[&tenant_id, &resource_type, &id],
976            )
977            .await
978            .map_err(|e| internal_error(format!("Failed to fetch resource: {}", e)))?;
979
980        if rows.is_empty() {
981            return Ok(None);
982        }
983
984        let row = &rows[0];
985        let version_id: String = row.get(0);
986        let json_data: serde_json::Value = row.get(1);
987        let last_updated: chrono::DateTime<Utc> = row.get(2);
988        let fhir_version_str: String = row.get(3);
989        let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
990
991        Ok(Some(StoredResource::from_storage(
992            resource_type,
993            id,
994            version_id,
995            crate::tenant::TenantId::new(tenant_id),
996            json_data,
997            last_updated,
998            last_updated,
999            None,
1000            fhir_version,
1001        )))
1002    }
1003}