1use std::collections::HashSet;
11
12use async_trait::async_trait;
13use chrono::Utc;
14use helios_fhir::FhirVersion;
15
16use crate::core::{
17 ChainedSearchProvider, IncludeProvider, MultiTypeSearchProvider, ResourceStorage,
18 RevincludeProvider, SearchProvider, SearchResult, TextSearchProvider,
19};
20use crate::error::{BackendError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23 CursorDirection, CursorValue, IncludeDirective, Page, PageCursor, PageInfo, Pagination,
24 ReverseChainedParameter, SearchQuery, StoredResource,
25};
26
27use super::PostgresBackend;
28use super::search::chain_builder::ChainQueryBuilder;
29use super::search::query_builder::{PostgresQueryBuilder, SortValueKind, SqlParam};
30
31fn internal_error(message: String) -> StorageError {
32 StorageError::Backend(BackendError::Internal {
33 backend_name: "postgres".to_string(),
34 message,
35 source: None,
36 })
37}
38
39#[async_trait]
40impl SearchProvider for PostgresBackend {
41 async fn search(
42 &self,
43 tenant: &TenantContext,
44 query: &SearchQuery,
45 ) -> StorageResult<SearchResult> {
46 if query.contained != crate::types::ContainedMode::Off {
49 return self.search_contained(tenant, query).await;
50 }
51
52 let total = if query.wants_total() {
56 Some(self.search_count(tenant, query).await?)
57 } else {
58 None
59 };
60
61 let client = self.get_client().await?;
62 let tenant_id = tenant.tenant_id().as_str();
63 let resource_type = &query.resource_type;
64
65 let count = query.count.unwrap_or(100) as usize;
67
68 let keyset = PostgresQueryBuilder::primary_keyset_key(query);
72
73 let cursor = if keyset.is_some() {
75 query
76 .cursor
77 .as_ref()
78 .and_then(|c| PageCursor::decode(c).ok())
79 } else {
80 None
81 };
82
83 let param_offset = if cursor.is_some() { 4 } else { 2 };
86
87 let search_filter = if !query.parameters.is_empty() || query.compartment.is_some() {
88 PostgresQueryBuilder::build_search_query(query, param_offset)
89 } else {
90 None
91 };
92 let filter_clause = search_filter
93 .as_ref()
94 .map(|f| format!(" AND ({})", f.sql))
95 .unwrap_or_default();
96 let search_params = search_filter.map(|f| f.params).unwrap_or_default();
97
98 let select_cols = match &keyset {
100 Some(k) => format!(
101 "id, version_id, data, last_updated, fhir_version, {} AS sort_key",
102 k.expr
103 ),
104 None => "id, version_id, data, last_updated, fhir_version".to_string(),
105 };
106
107 let order_by = if query.sort.is_empty() {
109 "ORDER BY last_updated DESC, id ASC".to_string()
110 } else {
111 PostgresQueryBuilder::build_order_by(query)
112 };
113
114 let (sql, has_previous) = if let (Some(cursor), Some(k)) = (&cursor, &keyset) {
116 let e = &k.expr;
117 let asc = k.direction == crate::types::SortDirection::Ascending;
118 match cursor.direction() {
119 CursorDirection::Next => {
120 let e_op = if asc { ">" } else { "<" };
121 let sql = format!(
122 "SELECT {cols} FROM resources \
123 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
124 AND ({e} {e_op} $3 OR ({e} = $3 AND id > $4)) \
125 ORDER BY {e} {dir}, id ASC LIMIT {lim}",
126 cols = select_cols,
127 filter = filter_clause,
128 e = e,
129 e_op = e_op,
130 dir = if asc { "ASC" } else { "DESC" },
131 lim = count + 1,
132 );
133 (sql, true)
134 }
135 CursorDirection::Previous => {
136 let e_op = if asc { "<" } else { ">" };
137 let sql = format!(
138 "SELECT {cols} FROM resources \
139 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
140 AND ({e} {e_op} $3 OR ({e} = $3 AND id < $4)) \
141 ORDER BY {e} {dir}, id DESC LIMIT {lim}",
142 cols = select_cols,
143 filter = filter_clause,
144 e = e,
145 e_op = e_op,
146 dir = if asc { "DESC" } else { "ASC" },
147 lim = count + 1,
148 );
149 (sql, false)
150 }
151 }
152 } else if let Some(offset) = query.offset {
153 let sql = format!(
154 "SELECT {cols} FROM resources \
155 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
156 {order} LIMIT {lim} OFFSET {off}",
157 cols = select_cols,
158 filter = filter_clause,
159 order = order_by,
160 lim = count + 1,
161 off = offset,
162 );
163 (sql, offset > 0)
164 } else {
165 let sql = format!(
166 "SELECT {cols} FROM resources \
167 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE{filter} \
168 {order} LIMIT {lim}",
169 cols = select_cols,
170 filter = filter_clause,
171 order = order_by,
172 lim = count + 1,
173 );
174 (sql, false)
175 };
176
177 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
179 Box::new(tenant_id.to_string()),
180 Box::new(resource_type.to_string()),
181 ];
182 if let (Some(cursor), Some(k)) = (&cursor, &keyset) {
183 Self::bind_cursor_value(&mut params, k.kind, cursor)?;
184 params.push(Box::new(cursor.resource_id().to_string()));
185 }
186 for param in &search_params {
187 match param {
188 SqlParam::Text(s) => params.push(Box::new(s.clone())),
189 SqlParam::Float(f) => params.push(Box::new(*f)),
190 SqlParam::Integer(i) => params.push(Box::new(*i)),
191 SqlParam::Bool(b) => params.push(Box::new(*b)),
192 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
193 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
194 }
195 }
196 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
197 .iter()
198 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
199 .collect();
200 let rows = client
201 .query(&sql, ¶m_refs)
202 .await
203 .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?;
204
205 let mut parsed: Vec<(StoredResource, Option<CursorValue>)> = Vec::new();
207 for row in &rows {
208 let id: String = row.get(0);
209 let version_id: String = row.get(1);
210 let json_data: serde_json::Value = row.get(2);
211 let last_updated: chrono::DateTime<Utc> = row.get(3);
212 let fhir_version_str: String = row.get(4);
213 let sort_key = keyset
214 .as_ref()
215 .map(|k| Self::read_cursor_value(row, 5, k.kind));
216
217 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
218 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
219 let resource = StoredResource::from_storage(
220 resource_type.clone(),
221 id,
222 version_id,
223 tenant.tenant_id().clone(),
224 json_data,
225 last_updated,
226 last_updated,
227 None,
228 fhir_version,
229 );
230 parsed.push((resource, sort_key));
231 }
232
233 if cursor
235 .as_ref()
236 .map(|c| c.direction() == CursorDirection::Previous)
237 .unwrap_or(false)
238 {
239 parsed.reverse();
240 }
241
242 let has_next = parsed.len() > count;
244 if has_next {
245 parsed.pop();
246 }
247
248 let next_cursor = if has_next {
249 parsed.last().map(|(r, sk)| {
250 PageCursor::new(vec![sk.clone().unwrap_or(CursorValue::Null)], r.id()).encode()
251 })
252 } else {
253 None
254 };
255 let previous_cursor = if has_previous {
256 parsed.first().map(|(r, sk)| {
257 PageCursor::previous(vec![sk.clone().unwrap_or(CursorValue::Null)], r.id()).encode()
258 })
259 } else {
260 None
261 };
262
263 let resources: Vec<StoredResource> = parsed.into_iter().map(|(r, _)| r).collect();
264
265 let page_info = PageInfo {
268 next_cursor,
269 previous_cursor,
270 total,
271 has_next,
272 has_previous,
273 };
274 let page = Page::new(resources, page_info);
275
276 Ok(SearchResult {
277 resources: page,
278 included: Vec::new(),
279 total,
280 scores: Default::default(),
281 })
282 }
283
284 async fn search_count(
285 &self,
286 tenant: &TenantContext,
287 query: &SearchQuery,
288 ) -> StorageResult<u64> {
289 let client = self.get_client().await?;
290 let tenant_id = tenant.tenant_id().as_str();
291 let resource_type = &query.resource_type;
292
293 let (sql, params): (
294 String,
295 Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
296 ) = if !query.parameters.is_empty() || query.compartment.is_some() {
297 let filter = PostgresQueryBuilder::build_search_query(query, 2);
298
299 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
300 Box::new(tenant_id.to_string()),
301 Box::new(resource_type.to_string()),
302 ];
303
304 if let Some(ref fragment) = filter {
305 for param in &fragment.params {
306 match param {
307 SqlParam::Text(s) => params.push(Box::new(s.clone())),
308 SqlParam::Float(f) => params.push(Box::new(*f)),
309 SqlParam::Integer(i) => params.push(Box::new(*i)),
310 SqlParam::Bool(b) => params.push(Box::new(*b)),
311 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
312 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
313 }
314 }
315
316 let sql = format!(
317 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND ({})",
318 fragment.sql
319 );
320 (sql, params)
321 } else {
322 let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
323 (sql, params)
324 }
325 } else {
326 let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
327 let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
328 Box::new(tenant_id.to_string()),
329 Box::new(resource_type.to_string()),
330 ];
331 (sql, params)
332 };
333
334 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
335 .iter()
336 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
337 .collect();
338
339 let row = client
340 .query_one(&sql, ¶m_refs)
341 .await
342 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
343
344 let count: i64 = row.get(0);
345 Ok(count as u64)
346 }
347
348 fn search_param_registry(
349 &self,
350 ) -> &std::sync::Arc<parking_lot::RwLock<crate::search::SearchParameterRegistry>> {
351 self.search_registry()
352 }
353
354 fn supports_contained_search(&self) -> bool {
355 true
356 }
357
358 fn modifiers_for_param_type(
359 &self,
360 param_type: crate::types::SearchParamType,
361 ) -> Vec<&'static str> {
362 Self::modifiers_for_type(param_type)
363 }
364}
365
366#[async_trait]
367impl MultiTypeSearchProvider for PostgresBackend {
368 async fn search_multi(
369 &self,
370 tenant: &TenantContext,
371 resource_types: &[&str],
372 query: &SearchQuery,
373 ) -> StorageResult<SearchResult> {
374 let client = self.get_client().await?;
375 let tenant_id = tenant.tenant_id().as_str();
376
377 let count = query.count.unwrap_or(100) as usize;
378 let offset = query.offset.unwrap_or(0) as usize;
379
380 let type_filter = if resource_types.is_empty() {
382 String::new()
383 } else {
384 let types: Vec<String> = resource_types
385 .iter()
386 .map(|t| format!("'{}'", t.replace('\'', "''")))
387 .collect();
388 format!(" AND resource_type IN ({})", types.join(", "))
389 };
390
391 let sql = format!(
392 "SELECT resource_type, id, version_id, data, last_updated, fhir_version FROM resources
393 WHERE tenant_id = $1 AND is_deleted = FALSE{}
394 ORDER BY last_updated DESC, id DESC
395 LIMIT {} OFFSET {}",
396 type_filter,
397 count + 1,
398 offset
399 );
400
401 let rows = client
402 .query(&sql, &[&tenant_id])
403 .await
404 .map_err(|e| internal_error(format!("Failed to execute multi-type search: {}", e)))?;
405
406 let mut resources = Vec::new();
407 for row in &rows {
408 let res_type: String = row.get(0);
409 let id: String = row.get(1);
410 let version_id: String = row.get(2);
411 let json_data: serde_json::Value = row.get(3);
412 let last_updated: chrono::DateTime<Utc> = row.get(4);
413 let fhir_version_str: String = row.get(5);
414
415 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
416 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
417
418 let resource = StoredResource::from_storage(
419 res_type,
420 id,
421 version_id,
422 tenant.tenant_id().clone(),
423 json_data,
424 last_updated,
425 last_updated,
426 None,
427 fhir_version,
428 );
429
430 resources.push(resource);
431 }
432
433 let has_next = resources.len() > count;
434 if has_next {
435 resources.pop();
436 }
437
438 let page_info = PageInfo {
439 next_cursor: None,
440 previous_cursor: None,
441 total: None,
442 has_next,
443 has_previous: offset > 0,
444 };
445
446 Ok(SearchResult {
447 resources: Page::new(resources, page_info),
448 included: Vec::new(),
449 total: None,
450 scores: Default::default(),
451 })
452 }
453}
454
455#[async_trait]
456impl IncludeProvider for PostgresBackend {
457 async fn resolve_includes(
458 &self,
459 tenant: &TenantContext,
460 resources: &[StoredResource],
461 includes: &[IncludeDirective],
462 ) -> StorageResult<Vec<StoredResource>> {
463 if resources.is_empty() || includes.is_empty() {
464 return Ok(Vec::new());
465 }
466
467 let client = self.get_client().await?;
468 let tenant_id = tenant.tenant_id().as_str();
469
470 let mut included = Vec::new();
471 let mut seen_refs: HashSet<String> = HashSet::new();
472
473 for include in includes {
474 for resource in resources {
475 if resource.resource_type() != include.source_type {
476 continue;
477 }
478
479 let refs = Self::extract_references(resource.content(), &include.search_param);
480
481 for reference in refs {
482 if let Some((ref_type, ref_id)) = Self::parse_reference(&reference) {
483 if let Some(ref target) = include.target_type {
484 if ref_type != *target {
485 continue;
486 }
487 }
488
489 let ref_key = format!("{}/{}", ref_type, ref_id);
490 if seen_refs.contains(&ref_key) {
491 continue;
492 }
493 seen_refs.insert(ref_key);
494
495 if let Some(included_resource) =
496 Self::fetch_resource(&client, tenant_id, &ref_type, &ref_id).await?
497 {
498 included.push(included_resource);
499 }
500 }
501 }
502 }
503 }
504
505 Ok(included)
506 }
507}
508
509#[async_trait]
510impl RevincludeProvider for PostgresBackend {
511 async fn resolve_revincludes(
512 &self,
513 tenant: &TenantContext,
514 resources: &[StoredResource],
515 revincludes: &[IncludeDirective],
516 ) -> StorageResult<Vec<StoredResource>> {
517 if resources.is_empty() || revincludes.is_empty() {
518 return Ok(Vec::new());
519 }
520
521 let client = self.get_client().await?;
522 let tenant_id = tenant.tenant_id().as_str();
523
524 let mut included = Vec::new();
525 let mut seen_ids: HashSet<String> = HashSet::new();
526
527 for revinclude in revincludes {
528 let mut reference_values: Vec<String> = Vec::new();
529 for resource in resources {
530 reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
531 reference_values.push(resource.id().to_string());
532 }
533
534 if reference_values.is_empty() {
535 continue;
536 }
537
538 let placeholders: Vec<String> = (0..reference_values.len())
540 .map(|i| format!("${}", i + 3))
541 .collect();
542
543 let sql = format!(
544 "SELECT DISTINCT r.id, r.version_id, r.data, r.last_updated, r.fhir_version
545 FROM resources r
546 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
547 AND r.resource_type = si.resource_type
548 AND r.id = si.resource_id
549 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
550 AND si.param_name = '{}'
551 AND si.value_reference IN ({})",
552 revinclude.search_param,
553 placeholders.join(", ")
554 );
555
556 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
557 Box::new(tenant_id.to_string()),
558 Box::new(revinclude.source_type.clone()),
559 ];
560 for rv in &reference_values {
561 params.push(Box::new(rv.clone()));
562 }
563
564 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
565 .iter()
566 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
567 .collect();
568
569 let rows = client.query(&sql, ¶m_refs).await.map_err(|e| {
570 internal_error(format!("Failed to execute revinclude query: {}", e))
571 })?;
572
573 for row in &rows {
574 let id: String = row.get(0);
575 let version_id: String = row.get(1);
576 let json_data: serde_json::Value = row.get(2);
577 let last_updated: chrono::DateTime<Utc> = row.get(3);
578 let fhir_version_str: String = row.get(4);
579
580 let resource_key = format!("{}/{}", revinclude.source_type, id);
581 if seen_ids.contains(&resource_key) {
582 continue;
583 }
584 seen_ids.insert(resource_key);
585
586 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
587 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
588
589 let resource = StoredResource::from_storage(
590 &revinclude.source_type,
591 id,
592 version_id,
593 tenant.tenant_id().clone(),
594 json_data,
595 last_updated,
596 last_updated,
597 None,
598 fhir_version,
599 );
600
601 included.push(resource);
602 }
603 }
604
605 Ok(included)
606 }
607}
608
609#[async_trait]
610impl ChainedSearchProvider for PostgresBackend {
611 async fn resolve_chain(
612 &self,
613 tenant: &TenantContext,
614 base_type: &str,
615 chain: &str,
616 value: &str,
617 ) -> StorageResult<Vec<String>> {
618 if chain.is_empty() {
619 return Ok(Vec::new());
620 }
621
622 let client = self.get_client().await?;
623 let tenant_id = tenant.tenant_id().as_str();
624
625 let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone())
630 .with_param_offset(1);
631 let parsed = builder
632 .parse_chain(chain)
633 .map_err(|e| internal_error(format!("Failed to parse chain: {}", e)))?;
634 let parsed_value = crate::types::SearchValue::parse(value);
635 let fragment = builder.build_forward_chain_sql(&parsed, &parsed_value)?;
636
637 let sql = format!(
638 "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \
639 AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}",
640 base = base_type,
641 clause = fragment.sql,
642 );
643
644 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
645 vec![Box::new(tenant_id.to_string())];
646 for p in &fragment.params {
647 match p {
648 SqlParam::Text(s) => params.push(Box::new(s.clone())),
649 SqlParam::Float(f) => params.push(Box::new(*f)),
650 SqlParam::Integer(i) => params.push(Box::new(*i)),
651 SqlParam::Bool(b) => params.push(Box::new(*b)),
652 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
653 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
654 }
655 }
656 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
657 .iter()
658 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
659 .collect();
660
661 let rows = client
662 .query(&sql, ¶m_refs)
663 .await
664 .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?;
665
666 Ok(rows.iter().map(|r| r.get(0)).collect())
667 }
668
669 async fn resolve_reverse_chain(
670 &self,
671 tenant: &TenantContext,
672 base_type: &str,
673 reverse_chain: &ReverseChainedParameter,
674 ) -> StorageResult<Vec<String>> {
675 let client = self.get_client().await?;
676 let tenant_id = tenant.tenant_id().as_str();
677
678 let builder = ChainQueryBuilder::new(tenant_id, base_type, self.search_registry().clone())
682 .with_param_offset(1);
683 let fragment = builder.build_reverse_chain_sql(reverse_chain)?;
684
685 let sql = format!(
686 "SELECT r.id FROM resources r WHERE r.tenant_id = $1 \
687 AND r.resource_type = '{base}' AND r.is_deleted = FALSE AND {clause}",
688 base = base_type,
689 clause = fragment.sql,
690 );
691
692 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
693 vec![Box::new(tenant_id.to_string())];
694 for p in &fragment.params {
695 match p {
696 SqlParam::Text(s) => params.push(Box::new(s.clone())),
697 SqlParam::Float(f) => params.push(Box::new(*f)),
698 SqlParam::Integer(i) => params.push(Box::new(*i)),
699 SqlParam::Bool(b) => params.push(Box::new(*b)),
700 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
701 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
702 }
703 }
704 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
705 .iter()
706 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
707 .collect();
708
709 let rows = client
710 .query(&sql, ¶m_refs)
711 .await
712 .map_err(|e| internal_error(format!("Failed to execute reverse chain query: {}", e)))?;
713
714 Ok(rows.iter().map(|r| r.get(0)).collect())
715 }
716}
717
718#[async_trait]
719impl TextSearchProvider for PostgresBackend {
720 async fn search_text(
721 &self,
722 tenant: &TenantContext,
723 resource_type: &str,
724 text: &str,
725 pagination: &Pagination,
726 ) -> StorageResult<SearchResult> {
727 let client = self.get_client().await?;
728 let tenant_id = tenant.tenant_id().as_str();
729 let count = pagination.count as usize;
730
731 let sql = format!(
733 "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
734 ts_rank(fts.narrative_tsvector, plainto_tsquery('english', $3)) AS rank
735 FROM resources r
736 INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
737 AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
738 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
739 AND fts.narrative_tsvector @@ plainto_tsquery('english', $3)
740 ORDER BY rank DESC, r.last_updated DESC
741 LIMIT {}",
742 count + 1
743 );
744
745 let rows = client
746 .query(&sql, &[&tenant_id, &resource_type, &text])
747 .await
748 .map_err(|e| internal_error(format!("Failed to execute text search: {}", e)))?;
749
750 let mut resources = Vec::new();
751 for row in &rows {
752 let id: String = row.get(0);
753 let version_id: String = row.get(1);
754 let json_data: serde_json::Value = row.get(2);
755 let last_updated: chrono::DateTime<Utc> = row.get(3);
756 let fhir_version_str: String = row.get(4);
757
758 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
759 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
760
761 resources.push(StoredResource::from_storage(
762 resource_type,
763 id,
764 version_id,
765 tenant.tenant_id().clone(),
766 json_data,
767 last_updated,
768 last_updated,
769 None,
770 fhir_version,
771 ));
772 }
773
774 let has_next = resources.len() > count;
775 if has_next {
776 resources.pop();
777 }
778
779 let page_info = PageInfo {
780 next_cursor: None,
781 previous_cursor: None,
782 total: None,
783 has_next,
784 has_previous: false,
785 };
786
787 Ok(SearchResult {
788 resources: Page::new(resources, page_info),
789 included: Vec::new(),
790 total: None,
791 scores: Default::default(),
792 })
793 }
794
795 async fn search_content(
796 &self,
797 tenant: &TenantContext,
798 resource_type: &str,
799 content: &str,
800 pagination: &Pagination,
801 ) -> StorageResult<SearchResult> {
802 let client = self.get_client().await?;
803 let tenant_id = tenant.tenant_id().as_str();
804 let count = pagination.count as usize;
805
806 let sql = format!(
808 "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
809 ts_rank(fts.content_tsvector, plainto_tsquery('english', $3)) AS rank
810 FROM resources r
811 INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
812 AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
813 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
814 AND fts.content_tsvector @@ plainto_tsquery('english', $3)
815 ORDER BY rank DESC, r.last_updated DESC
816 LIMIT {}",
817 count + 1
818 );
819
820 let rows = client
821 .query(&sql, &[&tenant_id, &resource_type, &content])
822 .await
823 .map_err(|e| internal_error(format!("Failed to execute content search: {}", e)))?;
824
825 let mut resources = Vec::new();
826 for row in &rows {
827 let id: String = row.get(0);
828 let version_id: String = row.get(1);
829 let json_data: serde_json::Value = row.get(2);
830 let last_updated: chrono::DateTime<Utc> = row.get(3);
831 let fhir_version_str: String = row.get(4);
832
833 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
834 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
835
836 resources.push(StoredResource::from_storage(
837 resource_type,
838 id,
839 version_id,
840 tenant.tenant_id().clone(),
841 json_data,
842 last_updated,
843 last_updated,
844 None,
845 fhir_version,
846 ));
847 }
848
849 let has_next = resources.len() > count;
850 if has_next {
851 resources.pop();
852 }
853
854 let page_info = PageInfo {
855 next_cursor: None,
856 previous_cursor: None,
857 total: None,
858 has_next,
859 has_previous: false,
860 };
861
862 Ok(SearchResult {
863 resources: Page::new(resources, page_info),
864 included: Vec::new(),
865 total: None,
866 scores: Default::default(),
867 })
868 }
869}
870
871fn extract_contained_resource(
874 content: &serde_json::Value,
875 local_id: &str,
876) -> Option<serde_json::Value> {
877 content
878 .get("contained")?
879 .as_array()?
880 .iter()
881 .find(|e| e.get("id").and_then(|v| v.as_str()) == Some(local_id))
882 .cloned()
883}
884
885fn build_contained_stored(
888 container: &StoredResource,
889 contained_type: &str,
890 local_id: &str,
891 content: serde_json::Value,
892) -> StoredResource {
893 StoredResource::from_storage(
894 contained_type.to_string(),
895 local_id.to_string(),
896 container.version_id().to_string(),
897 container.tenant_id().clone(),
898 content,
899 container.created_at(),
900 container.last_modified(),
901 None,
902 container.fhir_version(),
903 )
904}
905
906impl PostgresBackend {
908 async fn search_contained(
915 &self,
916 tenant: &TenantContext,
917 query: &SearchQuery,
918 ) -> StorageResult<SearchResult> {
919 use crate::types::{ContainedMode, ContainedReturn};
920
921 let tenant_id = tenant.tenant_id().as_str();
922 let contained_type = query.resource_type.as_str();
923
924 let matches: Vec<(String, String, Option<String>)> =
926 match PostgresQueryBuilder::build_contained(query) {
927 Some(fragment) => {
928 let client = self.get_client().await?;
929 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
930 Box::new(tenant_id.to_string()),
931 Box::new(contained_type.to_string()),
932 ];
933 for param in &fragment.params {
934 match param {
935 SqlParam::Text(s) => params.push(Box::new(s.clone())),
936 SqlParam::Float(f) => params.push(Box::new(*f)),
937 SqlParam::Integer(i) => params.push(Box::new(*i)),
938 SqlParam::Bool(b) => params.push(Box::new(*b)),
939 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
940 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
941 }
942 }
943 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
944 .iter()
945 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
946 .collect();
947 let rows = client
948 .query(&fragment.sql, ¶m_refs)
949 .await
950 .map_err(|e| {
951 internal_error(format!("Failed to execute contained query: {e}"))
952 })?;
953 rows.iter()
954 .map(|row| {
955 (
956 row.get::<_, String>(0),
957 row.get::<_, String>(1),
958 row.get::<_, Option<String>>(2),
959 )
960 })
961 .collect()
962 }
963 None => Vec::new(),
964 };
965
966 let mut items: Vec<StoredResource> = Vec::new();
968 let mut seen: HashSet<String> = HashSet::new();
969 match query.contained_return {
970 ContainedReturn::Container => {
971 for (ctype, cid, _) in &matches {
972 if !seen.insert(format!("{ctype}/{cid}")) {
973 continue;
974 }
975 if let Some(container) = self.read(tenant, ctype, cid).await? {
976 items.push(container);
977 }
978 }
979 }
980 ContainedReturn::Contained => {
981 for (ctype, cid, local) in &matches {
982 let Some(local_id) = local else { continue };
983 if !seen.insert(format!("{ctype}/{cid}#{local_id}")) {
984 continue;
985 }
986 if let Some(container) = self.read(tenant, ctype, cid).await? {
987 if let Some(c) = extract_contained_resource(container.content(), local_id) {
988 items.push(build_contained_stored(
989 &container,
990 contained_type,
991 local_id,
992 c,
993 ));
994 }
995 }
996 }
997 }
998 }
999
1000 if query.contained == ContainedMode::Both {
1002 let mut top_query = query.clone();
1003 top_query.contained = ContainedMode::Off;
1004 top_query.contained_return = ContainedReturn::Container;
1005 let top = self.search(tenant, &top_query).await?;
1006 let mut merged = top.resources.items;
1007 let top_urls: HashSet<String> = merged.iter().map(|r| r.url()).collect();
1008 for item in items {
1009 if !top_urls.contains(&item.url()) {
1010 merged.push(item);
1011 }
1012 }
1013 items = merged;
1014 }
1015
1016 let count = query.count.unwrap_or(100) as usize;
1018 let offset = query.offset.unwrap_or(0) as usize;
1019 let total_matches = items.len() as u64;
1020 let windowed: Vec<StoredResource> = items.into_iter().skip(offset).take(count).collect();
1021
1022 let total = if query.wants_total() {
1023 Some(total_matches)
1024 } else {
1025 None
1026 };
1027 let page = Page::new(windowed, PageInfo::end());
1028 let mut result = SearchResult::new(page);
1029 if let Some(t) = total {
1030 result = result.with_total(t);
1031 }
1032 Ok(result)
1033 }
1034}
1035
1036impl PostgresBackend {
1038 fn bind_cursor_value(
1042 params: &mut Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
1043 kind: SortValueKind,
1044 cursor: &PageCursor,
1045 ) -> StorageResult<()> {
1046 let value = cursor.sort_values().first();
1047 match kind {
1048 SortValueKind::Timestamp => {
1049 let dt = match value {
1050 Some(CursorValue::String(s)) => chrono::DateTime::parse_from_rfc3339(s)
1051 .map(|d| d.with_timezone(&Utc))
1052 .map_err(|_| internal_error("Invalid cursor timestamp".to_string()))?,
1053 _ => {
1054 return Err(internal_error(
1055 "Invalid cursor: expected timestamp".to_string(),
1056 ));
1057 }
1058 };
1059 params.push(Box::new(dt));
1060 }
1061 SortValueKind::Number => {
1062 let n = match value {
1063 Some(CursorValue::Decimal(f)) => *f,
1064 Some(CursorValue::Number(i)) => *i as f64,
1065 Some(CursorValue::String(s)) => s.parse().unwrap_or(0.0),
1066 _ => {
1067 return Err(internal_error(
1068 "Invalid cursor: expected number".to_string(),
1069 ));
1070 }
1071 };
1072 params.push(Box::new(n));
1073 }
1074 SortValueKind::Text => match value {
1075 Some(CursorValue::String(s)) => params.push(Box::new(s.clone())),
1076 Some(CursorValue::Null) | None => params.push(Box::new(Option::<String>::None)),
1077 _ => {
1078 return Err(internal_error("Invalid cursor: expected text".to_string()));
1079 }
1080 },
1081 }
1082 Ok(())
1083 }
1084
1085 fn read_cursor_value(
1087 row: &tokio_postgres::Row,
1088 idx: usize,
1089 kind: SortValueKind,
1090 ) -> CursorValue {
1091 match kind {
1092 SortValueKind::Timestamp => {
1093 let v: Option<chrono::DateTime<Utc>> = row.try_get(idx).ok().flatten();
1094 v.map(|d| CursorValue::String(d.to_rfc3339()))
1095 .unwrap_or(CursorValue::Null)
1096 }
1097 SortValueKind::Number => {
1098 let v: Option<f64> = row.try_get(idx).ok().flatten();
1099 v.map(CursorValue::Decimal).unwrap_or(CursorValue::Null)
1100 }
1101 SortValueKind::Text => {
1102 let v: Option<String> = row.try_get(idx).ok().flatten();
1103 v.map(CursorValue::String).unwrap_or(CursorValue::Null)
1104 }
1105 }
1106 }
1107
1108 fn extract_references(content: &serde_json::Value, search_param: &str) -> Vec<String> {
1110 let mut refs = Vec::new();
1111 if let Some(value) = content.get(search_param) {
1112 Self::collect_references_from_value(value, &mut refs);
1113 }
1114 refs
1115 }
1116
1117 fn collect_references_from_value(value: &serde_json::Value, refs: &mut Vec<String>) {
1119 match value {
1120 serde_json::Value::Object(obj) => {
1121 if let Some(serde_json::Value::String(ref_str)) = obj.get("reference") {
1122 refs.push(ref_str.clone());
1123 }
1124 for v in obj.values() {
1125 Self::collect_references_from_value(v, refs);
1126 }
1127 }
1128 serde_json::Value::Array(arr) => {
1129 for item in arr {
1130 Self::collect_references_from_value(item, refs);
1131 }
1132 }
1133 _ => {}
1134 }
1135 }
1136
1137 fn parse_reference(reference: &str) -> Option<(String, String)> {
1139 let path = reference
1140 .strip_prefix("http://")
1141 .or_else(|| reference.strip_prefix("https://"))
1142 .map(|s| s.rsplit('/').take(2).collect::<Vec<_>>())
1143 .unwrap_or_else(|| reference.split('/').collect());
1144
1145 if path.len() >= 2 {
1146 if reference.starts_with("http") {
1147 Some((path[1].to_string(), path[0].to_string()))
1148 } else {
1149 Some((path[0].to_string(), path[1].to_string()))
1150 }
1151 } else {
1152 None
1153 }
1154 }
1155
1156 async fn fetch_resource(
1158 client: &deadpool_postgres::Client,
1159 tenant_id: &str,
1160 resource_type: &str,
1161 id: &str,
1162 ) -> StorageResult<Option<StoredResource>> {
1163 let rows = client
1164 .query(
1165 "SELECT version_id, data, last_updated, fhir_version FROM resources
1166 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
1167 &[&tenant_id, &resource_type, &id],
1168 )
1169 .await
1170 .map_err(|e| internal_error(format!("Failed to fetch resource: {}", e)))?;
1171
1172 if rows.is_empty() {
1173 return Ok(None);
1174 }
1175
1176 let row = &rows[0];
1177 let version_id: String = row.get(0);
1178 let json_data: serde_json::Value = row.get(1);
1179 let last_updated: chrono::DateTime<Utc> = row.get(2);
1180 let fhir_version_str: String = row.get(3);
1181 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1182 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1183
1184 Ok(Some(StoredResource::from_storage(
1185 resource_type,
1186 id,
1187 version_id,
1188 crate::tenant::TenantId::new(tenant_id),
1189 json_data,
1190 last_updated,
1191 last_updated,
1192 None,
1193 fhir_version,
1194 )))
1195 }
1196}