1use std::collections::HashSet;
11
12use async_trait::async_trait;
13use chrono::Utc;
14use helios_fhir::FhirVersion;
15
16use crate::core::{
17 ChainedSearchProvider, IncludeProvider, MultiTypeSearchProvider, RevincludeProvider,
18 SearchProvider, SearchResult, TextSearchProvider,
19};
20use crate::error::{BackendError, StorageError, StorageResult};
21use crate::tenant::TenantContext;
22use crate::types::{
23 CursorDirection, CursorValue, IncludeDirective, Page, PageCursor, PageInfo, Pagination,
24 ReverseChainedParameter, SearchQuery, StoredResource,
25};
26
27use super::PostgresBackend;
28use super::search::query_builder::{PostgresQueryBuilder, SqlParam};
29
30fn internal_error(message: String) -> StorageError {
31 StorageError::Backend(BackendError::Internal {
32 backend_name: "postgres".to_string(),
33 message,
34 source: None,
35 })
36}
37
38#[async_trait]
39impl SearchProvider for PostgresBackend {
40 async fn search(
41 &self,
42 tenant: &TenantContext,
43 query: &SearchQuery,
44 ) -> StorageResult<SearchResult> {
45 let client = self.get_client().await?;
46 let tenant_id = tenant.tenant_id().as_str();
47 let resource_type = &query.resource_type;
48
49 let count = query.count.unwrap_or(100) as usize;
51
52 let cursor = query
54 .cursor
55 .as_ref()
56 .and_then(|c| PageCursor::decode(c).ok());
57
58 let param_offset = if cursor.is_some() { 4 } else { 2 };
62
63 let search_filter = if !query.parameters.is_empty() {
65 PostgresQueryBuilder::build_search_query(query, param_offset)
66 } else {
67 None
68 };
69
70 let (sql, has_previous, search_params) = if let Some(ref cursor) = cursor {
72 match cursor.direction() {
73 CursorDirection::Next => {
74 let sql = if let Some(ref filter) = search_filter {
75 format!(
76 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
77 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
78 AND ({})
79 AND (last_updated < $3 OR (last_updated = $3 AND id < $4))
80 ORDER BY last_updated DESC, id DESC
81 LIMIT {}",
82 filter.sql,
83 count + 1
84 )
85 } else {
86 format!(
87 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
88 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
89 AND (last_updated < $3 OR (last_updated = $3 AND id < $4))
90 ORDER BY last_updated DESC, id DESC
91 LIMIT {}",
92 count + 1
93 )
94 };
95 (
96 sql,
97 true,
98 search_filter.map(|f| f.params).unwrap_or_default(),
99 )
100 }
101 CursorDirection::Previous => {
102 let sql = if let Some(ref filter) = search_filter {
103 format!(
104 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
105 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
106 AND ({})
107 AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
108 ORDER BY last_updated ASC, id ASC
109 LIMIT {}",
110 filter.sql,
111 count + 1
112 )
113 } else {
114 format!(
115 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
116 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
117 AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
118 ORDER BY last_updated ASC, id ASC
119 LIMIT {}",
120 count + 1
121 )
122 };
123 (
124 sql,
125 false,
126 search_filter.map(|f| f.params).unwrap_or_default(),
127 )
128 }
129 }
130 } else if let Some(offset) = query.offset {
131 let sql = if let Some(ref filter) = search_filter {
133 format!(
134 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
135 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
136 AND ({})
137 ORDER BY last_updated DESC, id DESC
138 LIMIT {} OFFSET {}",
139 filter.sql,
140 count + 1,
141 offset
142 )
143 } else {
144 format!(
145 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
146 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
147 ORDER BY last_updated DESC, id DESC
148 LIMIT {} OFFSET {}",
149 count + 1,
150 offset
151 )
152 };
153 (
154 sql,
155 offset > 0,
156 search_filter.map(|f| f.params).unwrap_or_default(),
157 )
158 } else {
159 let sql = if let Some(ref filter) = search_filter {
161 format!(
162 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
163 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
164 AND ({})
165 ORDER BY last_updated DESC, id DESC
166 LIMIT {}",
167 filter.sql,
168 count + 1
169 )
170 } else {
171 format!(
172 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
173 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
174 ORDER BY last_updated DESC, id DESC
175 LIMIT {}",
176 count + 1
177 )
178 };
179 (
180 sql,
181 false,
182 search_filter.map(|f| f.params).unwrap_or_default(),
183 )
184 };
185
186 let rows = if let Some(ref cursor) = cursor {
188 let (cursor_timestamp, cursor_id) = Self::extract_cursor_values(cursor)?;
189
190 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
192 Box::new(tenant_id.to_string()),
193 Box::new(resource_type.to_string()),
194 Box::new(cursor_timestamp),
195 Box::new(cursor_id),
196 ];
197
198 for param in &search_params {
199 match param {
200 SqlParam::Text(s) => params.push(Box::new(s.clone())),
201 SqlParam::Float(f) => params.push(Box::new(*f)),
202 SqlParam::Integer(i) => params.push(Box::new(*i)),
203 SqlParam::Bool(b) => params.push(Box::new(*b)),
204 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
205 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
206 }
207 }
208
209 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
210 .iter()
211 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
212 .collect();
213
214 client
215 .query(&sql, ¶m_refs)
216 .await
217 .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?
218 } else {
219 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
221 Box::new(tenant_id.to_string()),
222 Box::new(resource_type.to_string()),
223 ];
224
225 for param in &search_params {
226 match param {
227 SqlParam::Text(s) => params.push(Box::new(s.clone())),
228 SqlParam::Float(f) => params.push(Box::new(*f)),
229 SqlParam::Integer(i) => params.push(Box::new(*i)),
230 SqlParam::Bool(b) => params.push(Box::new(*b)),
231 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
232 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
233 }
234 }
235
236 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
237 .iter()
238 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
239 .collect();
240
241 client
242 .query(&sql, ¶m_refs)
243 .await
244 .map_err(|e| internal_error(format!("Failed to execute search: {}", e)))?
245 };
246
247 let mut resources = Vec::new();
248 for row in &rows {
249 let id: String = row.get(0);
250 let version_id: String = row.get(1);
251 let json_data: serde_json::Value = row.get(2);
252 let last_updated: chrono::DateTime<Utc> = row.get(3);
253 let fhir_version_str: String = row.get(4);
254
255 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
256
257 let resource = StoredResource::from_storage(
258 resource_type.clone(),
259 id,
260 version_id,
261 tenant.tenant_id().clone(),
262 json_data,
263 last_updated,
264 last_updated,
265 None,
266 fhir_version,
267 );
268
269 resources.push(resource);
270 }
271
272 if cursor
274 .as_ref()
275 .map(|c| c.direction() == CursorDirection::Previous)
276 .unwrap_or(false)
277 {
278 resources.reverse();
279 }
280
281 let has_next = resources.len() > count;
283 if has_next {
284 resources.pop();
285 }
286
287 let next_cursor = if has_next {
289 resources.last().map(|r| {
290 let cursor = PageCursor::new(
291 vec![CursorValue::String(r.last_modified().to_rfc3339())],
292 r.id(),
293 );
294 cursor.encode()
295 })
296 } else {
297 None
298 };
299
300 let previous_cursor = if has_previous {
301 resources.first().map(|r| {
302 let cursor = PageCursor::previous(
303 vec![CursorValue::String(r.last_modified().to_rfc3339())],
304 r.id(),
305 );
306 cursor.encode()
307 })
308 } else {
309 None
310 };
311
312 let page_info = PageInfo {
313 next_cursor,
314 previous_cursor,
315 total: None,
316 has_next,
317 has_previous,
318 };
319
320 let page = Page::new(resources, page_info);
321
322 Ok(SearchResult {
323 resources: page,
324 included: Vec::new(),
325 total: None,
326 })
327 }
328
329 async fn search_count(
330 &self,
331 tenant: &TenantContext,
332 query: &SearchQuery,
333 ) -> StorageResult<u64> {
334 let client = self.get_client().await?;
335 let tenant_id = tenant.tenant_id().as_str();
336 let resource_type = &query.resource_type;
337
338 let (sql, params): (
339 String,
340 Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>>,
341 ) = if !query.parameters.is_empty() {
342 let filter = PostgresQueryBuilder::build_search_query(query, 2);
343
344 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
345 Box::new(tenant_id.to_string()),
346 Box::new(resource_type.to_string()),
347 ];
348
349 if let Some(ref fragment) = filter {
350 for param in &fragment.params {
351 match param {
352 SqlParam::Text(s) => params.push(Box::new(s.clone())),
353 SqlParam::Float(f) => params.push(Box::new(*f)),
354 SqlParam::Integer(i) => params.push(Box::new(*i)),
355 SqlParam::Bool(b) => params.push(Box::new(*b)),
356 SqlParam::Timestamp(dt) => params.push(Box::new(*dt)),
357 SqlParam::Null => params.push(Box::new(Option::<String>::None)),
358 }
359 }
360
361 let sql = format!(
362 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE AND ({})",
363 fragment.sql
364 );
365 (sql, params)
366 } else {
367 let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
368 (sql, params)
369 }
370 } else {
371 let sql = "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE".to_string();
372 let params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
373 Box::new(tenant_id.to_string()),
374 Box::new(resource_type.to_string()),
375 ];
376 (sql, params)
377 };
378
379 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
380 .iter()
381 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
382 .collect();
383
384 let row = client
385 .query_one(&sql, ¶m_refs)
386 .await
387 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
388
389 let count: i64 = row.get(0);
390 Ok(count as u64)
391 }
392}
393
394#[async_trait]
395impl MultiTypeSearchProvider for PostgresBackend {
396 async fn search_multi(
397 &self,
398 tenant: &TenantContext,
399 resource_types: &[&str],
400 query: &SearchQuery,
401 ) -> StorageResult<SearchResult> {
402 let client = self.get_client().await?;
403 let tenant_id = tenant.tenant_id().as_str();
404
405 let count = query.count.unwrap_or(100) as usize;
406 let offset = query.offset.unwrap_or(0) as usize;
407
408 let type_filter = if resource_types.is_empty() {
410 String::new()
411 } else {
412 let types: Vec<String> = resource_types
413 .iter()
414 .map(|t| format!("'{}'", t.replace('\'', "''")))
415 .collect();
416 format!(" AND resource_type IN ({})", types.join(", "))
417 };
418
419 let sql = format!(
420 "SELECT resource_type, id, version_id, data, last_updated, fhir_version FROM resources
421 WHERE tenant_id = $1 AND is_deleted = FALSE{}
422 ORDER BY last_updated DESC, id DESC
423 LIMIT {} OFFSET {}",
424 type_filter,
425 count + 1,
426 offset
427 );
428
429 let rows = client
430 .query(&sql, &[&tenant_id])
431 .await
432 .map_err(|e| internal_error(format!("Failed to execute multi-type search: {}", e)))?;
433
434 let mut resources = Vec::new();
435 for row in &rows {
436 let res_type: String = row.get(0);
437 let id: String = row.get(1);
438 let version_id: String = row.get(2);
439 let json_data: serde_json::Value = row.get(3);
440 let last_updated: chrono::DateTime<Utc> = row.get(4);
441 let fhir_version_str: String = row.get(5);
442
443 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
444
445 let resource = StoredResource::from_storage(
446 res_type,
447 id,
448 version_id,
449 tenant.tenant_id().clone(),
450 json_data,
451 last_updated,
452 last_updated,
453 None,
454 fhir_version,
455 );
456
457 resources.push(resource);
458 }
459
460 let has_next = resources.len() > count;
461 if has_next {
462 resources.pop();
463 }
464
465 let page_info = PageInfo {
466 next_cursor: None,
467 previous_cursor: None,
468 total: None,
469 has_next,
470 has_previous: offset > 0,
471 };
472
473 Ok(SearchResult {
474 resources: Page::new(resources, page_info),
475 included: Vec::new(),
476 total: None,
477 })
478 }
479}
480
481#[async_trait]
482impl IncludeProvider for PostgresBackend {
483 async fn resolve_includes(
484 &self,
485 tenant: &TenantContext,
486 resources: &[StoredResource],
487 includes: &[IncludeDirective],
488 ) -> StorageResult<Vec<StoredResource>> {
489 if resources.is_empty() || includes.is_empty() {
490 return Ok(Vec::new());
491 }
492
493 let client = self.get_client().await?;
494 let tenant_id = tenant.tenant_id().as_str();
495
496 let mut included = Vec::new();
497 let mut seen_refs: HashSet<String> = HashSet::new();
498
499 for include in includes {
500 for resource in resources {
501 if resource.resource_type() != include.source_type {
502 continue;
503 }
504
505 let refs = Self::extract_references(resource.content(), &include.search_param);
506
507 for reference in refs {
508 if let Some((ref_type, ref_id)) = Self::parse_reference(&reference) {
509 if let Some(ref target) = include.target_type {
510 if ref_type != *target {
511 continue;
512 }
513 }
514
515 let ref_key = format!("{}/{}", ref_type, ref_id);
516 if seen_refs.contains(&ref_key) {
517 continue;
518 }
519 seen_refs.insert(ref_key);
520
521 if let Some(included_resource) =
522 Self::fetch_resource(&client, tenant_id, &ref_type, &ref_id).await?
523 {
524 included.push(included_resource);
525 }
526 }
527 }
528 }
529 }
530
531 Ok(included)
532 }
533}
534
535#[async_trait]
536impl RevincludeProvider for PostgresBackend {
537 async fn resolve_revincludes(
538 &self,
539 tenant: &TenantContext,
540 resources: &[StoredResource],
541 revincludes: &[IncludeDirective],
542 ) -> StorageResult<Vec<StoredResource>> {
543 if resources.is_empty() || revincludes.is_empty() {
544 return Ok(Vec::new());
545 }
546
547 let client = self.get_client().await?;
548 let tenant_id = tenant.tenant_id().as_str();
549
550 let mut included = Vec::new();
551 let mut seen_ids: HashSet<String> = HashSet::new();
552
553 for revinclude in revincludes {
554 let mut reference_values: Vec<String> = Vec::new();
555 for resource in resources {
556 reference_values.push(format!("{}/{}", resource.resource_type(), resource.id()));
557 reference_values.push(resource.id().to_string());
558 }
559
560 if reference_values.is_empty() {
561 continue;
562 }
563
564 let placeholders: Vec<String> = (0..reference_values.len())
566 .map(|i| format!("${}", i + 3))
567 .collect();
568
569 let sql = format!(
570 "SELECT DISTINCT r.id, r.version_id, r.data, r.last_updated, r.fhir_version
571 FROM resources r
572 INNER JOIN search_index si ON r.tenant_id = si.tenant_id
573 AND r.resource_type = si.resource_type
574 AND r.id = si.resource_id
575 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
576 AND si.param_name = '{}'
577 AND si.value_reference IN ({})",
578 revinclude.search_param,
579 placeholders.join(", ")
580 );
581
582 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
583 Box::new(tenant_id.to_string()),
584 Box::new(revinclude.source_type.clone()),
585 ];
586 for rv in &reference_values {
587 params.push(Box::new(rv.clone()));
588 }
589
590 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = params
591 .iter()
592 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
593 .collect();
594
595 let rows = client.query(&sql, ¶m_refs).await.map_err(|e| {
596 internal_error(format!("Failed to execute revinclude query: {}", e))
597 })?;
598
599 for row in &rows {
600 let id: String = row.get(0);
601 let version_id: String = row.get(1);
602 let json_data: serde_json::Value = row.get(2);
603 let last_updated: chrono::DateTime<Utc> = row.get(3);
604 let fhir_version_str: String = row.get(4);
605
606 let resource_key = format!("{}/{}", revinclude.source_type, id);
607 if seen_ids.contains(&resource_key) {
608 continue;
609 }
610 seen_ids.insert(resource_key);
611
612 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
613
614 let resource = StoredResource::from_storage(
615 &revinclude.source_type,
616 id,
617 version_id,
618 tenant.tenant_id().clone(),
619 json_data,
620 last_updated,
621 last_updated,
622 None,
623 fhir_version,
624 );
625
626 included.push(resource);
627 }
628 }
629
630 Ok(included)
631 }
632}
633
634#[async_trait]
635impl ChainedSearchProvider for PostgresBackend {
636 async fn resolve_chain(
637 &self,
638 tenant: &TenantContext,
639 base_type: &str,
640 chain: &str,
641 value: &str,
642 ) -> StorageResult<Vec<String>> {
643 let client = self.get_client().await?;
644 let tenant_id = tenant.tenant_id().as_str();
645
646 if chain.is_empty() {
647 return Ok(Vec::new());
648 }
649
650 let parts: Vec<&str> = chain.split('.').collect();
652 if parts.is_empty() {
653 return Ok(Vec::new());
654 }
655
656 if parts.len() == 2 {
659 let ref_param = parts[0];
663 let target_param = parts[1];
664
665 let sql = format!(
666 "SELECT DISTINCT si_ref.resource_id
667 FROM search_index si_ref
668 WHERE si_ref.tenant_id = $1
669 AND si_ref.resource_type = $2
670 AND si_ref.param_name = '{}'
671 AND si_ref.value_reference IN (
672 SELECT resource_type || '/' || resource_id
673 FROM search_index si_target
674 WHERE si_target.tenant_id = $1
675 AND si_target.param_name = '{}'
676 AND si_target.value_string ILIKE $3
677 )",
678 ref_param, target_param
679 );
680
681 let rows = client
682 .query(&sql, &[&tenant_id, &base_type, &format!("{}%", value)])
683 .await
684 .map_err(|e| internal_error(format!("Failed to execute chain query: {}", e)))?;
685
686 let ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
687 Ok(ids)
688 } else {
689 Ok(Vec::new())
691 }
692 }
693
694 async fn resolve_reverse_chain(
695 &self,
696 tenant: &TenantContext,
697 base_type: &str,
698 reverse_chain: &ReverseChainedParameter,
699 ) -> StorageResult<Vec<String>> {
700 let client = self.get_client().await?;
701 let tenant_id = tenant.tenant_id().as_str();
702
703 let value_str = reverse_chain
706 .value
707 .as_ref()
708 .map(|v| v.value.clone())
709 .unwrap_or_default();
710
711 let sql = format!(
712 "SELECT DISTINCT si_ref.value_reference
713 FROM search_index si_ref
714 INNER JOIN search_index si_val
715 ON si_ref.tenant_id = si_val.tenant_id
716 AND si_ref.resource_type = si_val.resource_type
717 AND si_ref.resource_id = si_val.resource_id
718 WHERE si_ref.tenant_id = $1
719 AND si_ref.resource_type = '{}'
720 AND si_ref.param_name = '{}'
721 AND si_val.param_name = '{}'
722 AND (si_val.value_token_code = $2
723 OR si_val.value_string ILIKE $3)",
724 reverse_chain.source_type, reverse_chain.reference_param, reverse_chain.search_param
725 );
726
727 let like_value = format!("{}%", value_str);
728 let rows = client
729 .query(
730 &sql,
731 &[&tenant_id, &value_str.as_str(), &like_value.as_str()],
732 )
733 .await
734 .map_err(|e| internal_error(format!("Failed to execute reverse chain query: {}", e)))?;
735
736 let mut ids = Vec::new();
737 for row in &rows {
738 let reference: String = row.get(0);
739 let expected_prefix = format!("{}/", base_type);
741 if let Some(id) = reference.strip_prefix(&expected_prefix) {
742 ids.push(id.to_string());
743 }
744 }
745
746 Ok(ids)
747 }
748}
749
750#[async_trait]
751impl TextSearchProvider for PostgresBackend {
752 async fn search_text(
753 &self,
754 tenant: &TenantContext,
755 resource_type: &str,
756 text: &str,
757 pagination: &Pagination,
758 ) -> StorageResult<SearchResult> {
759 let client = self.get_client().await?;
760 let tenant_id = tenant.tenant_id().as_str();
761 let count = pagination.count as usize;
762
763 let sql = format!(
765 "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
766 ts_rank(fts.narrative_tsvector, plainto_tsquery('english', $3)) AS rank
767 FROM resources r
768 INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
769 AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
770 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
771 AND fts.narrative_tsvector @@ plainto_tsquery('english', $3)
772 ORDER BY rank DESC, r.last_updated DESC
773 LIMIT {}",
774 count + 1
775 );
776
777 let rows = client
778 .query(&sql, &[&tenant_id, &resource_type, &text])
779 .await
780 .map_err(|e| internal_error(format!("Failed to execute text search: {}", e)))?;
781
782 let mut resources = Vec::new();
783 for row in &rows {
784 let id: String = row.get(0);
785 let version_id: String = row.get(1);
786 let json_data: serde_json::Value = row.get(2);
787 let last_updated: chrono::DateTime<Utc> = row.get(3);
788 let fhir_version_str: String = row.get(4);
789
790 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
791
792 resources.push(StoredResource::from_storage(
793 resource_type,
794 id,
795 version_id,
796 tenant.tenant_id().clone(),
797 json_data,
798 last_updated,
799 last_updated,
800 None,
801 fhir_version,
802 ));
803 }
804
805 let has_next = resources.len() > count;
806 if has_next {
807 resources.pop();
808 }
809
810 let page_info = PageInfo {
811 next_cursor: None,
812 previous_cursor: None,
813 total: None,
814 has_next,
815 has_previous: false,
816 };
817
818 Ok(SearchResult {
819 resources: Page::new(resources, page_info),
820 included: Vec::new(),
821 total: None,
822 })
823 }
824
825 async fn search_content(
826 &self,
827 tenant: &TenantContext,
828 resource_type: &str,
829 content: &str,
830 pagination: &Pagination,
831 ) -> StorageResult<SearchResult> {
832 let client = self.get_client().await?;
833 let tenant_id = tenant.tenant_id().as_str();
834 let count = pagination.count as usize;
835
836 let sql = format!(
838 "SELECT r.id, r.version_id, r.data, r.last_updated, r.fhir_version,
839 ts_rank(fts.content_tsvector, plainto_tsquery('english', $3)) AS rank
840 FROM resources r
841 INNER JOIN resource_fts fts ON r.tenant_id = fts.tenant_id
842 AND r.resource_type = fts.resource_type AND r.id = fts.resource_id
843 WHERE r.tenant_id = $1 AND r.resource_type = $2 AND r.is_deleted = FALSE
844 AND fts.content_tsvector @@ plainto_tsquery('english', $3)
845 ORDER BY rank DESC, r.last_updated DESC
846 LIMIT {}",
847 count + 1
848 );
849
850 let rows = client
851 .query(&sql, &[&tenant_id, &resource_type, &content])
852 .await
853 .map_err(|e| internal_error(format!("Failed to execute content search: {}", e)))?;
854
855 let mut resources = Vec::new();
856 for row in &rows {
857 let id: String = row.get(0);
858 let version_id: String = row.get(1);
859 let json_data: serde_json::Value = row.get(2);
860 let last_updated: chrono::DateTime<Utc> = row.get(3);
861 let fhir_version_str: String = row.get(4);
862
863 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
864
865 resources.push(StoredResource::from_storage(
866 resource_type,
867 id,
868 version_id,
869 tenant.tenant_id().clone(),
870 json_data,
871 last_updated,
872 last_updated,
873 None,
874 fhir_version,
875 ));
876 }
877
878 let has_next = resources.len() > count;
879 if has_next {
880 resources.pop();
881 }
882
883 let page_info = PageInfo {
884 next_cursor: None,
885 previous_cursor: None,
886 total: None,
887 has_next,
888 has_previous: false,
889 };
890
891 Ok(SearchResult {
892 resources: Page::new(resources, page_info),
893 included: Vec::new(),
894 total: None,
895 })
896 }
897}
898
899impl PostgresBackend {
901 fn extract_cursor_values(cursor: &PageCursor) -> StorageResult<(String, String)> {
903 let sort_values = cursor.sort_values();
904 let timestamp = match sort_values.first() {
905 Some(CursorValue::String(s)) => s.clone(),
906 _ => {
907 return Err(internal_error(
908 "Invalid cursor: missing or invalid timestamp".to_string(),
909 ));
910 }
911 };
912 let id = cursor.resource_id().to_string();
913 Ok((timestamp, id))
914 }
915
916 fn extract_references(content: &serde_json::Value, search_param: &str) -> Vec<String> {
918 let mut refs = Vec::new();
919 if let Some(value) = content.get(search_param) {
920 Self::collect_references_from_value(value, &mut refs);
921 }
922 refs
923 }
924
925 fn collect_references_from_value(value: &serde_json::Value, refs: &mut Vec<String>) {
927 match value {
928 serde_json::Value::Object(obj) => {
929 if let Some(serde_json::Value::String(ref_str)) = obj.get("reference") {
930 refs.push(ref_str.clone());
931 }
932 for v in obj.values() {
933 Self::collect_references_from_value(v, refs);
934 }
935 }
936 serde_json::Value::Array(arr) => {
937 for item in arr {
938 Self::collect_references_from_value(item, refs);
939 }
940 }
941 _ => {}
942 }
943 }
944
945 fn parse_reference(reference: &str) -> Option<(String, String)> {
947 let path = reference
948 .strip_prefix("http://")
949 .or_else(|| reference.strip_prefix("https://"))
950 .map(|s| s.rsplit('/').take(2).collect::<Vec<_>>())
951 .unwrap_or_else(|| reference.split('/').collect());
952
953 if path.len() >= 2 {
954 if reference.starts_with("http") {
955 Some((path[1].to_string(), path[0].to_string()))
956 } else {
957 Some((path[0].to_string(), path[1].to_string()))
958 }
959 } else {
960 None
961 }
962 }
963
964 async fn fetch_resource(
966 client: &deadpool_postgres::Client,
967 tenant_id: &str,
968 resource_type: &str,
969 id: &str,
970 ) -> StorageResult<Option<StoredResource>> {
971 let rows = client
972 .query(
973 "SELECT version_id, data, last_updated, fhir_version FROM resources
974 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
975 &[&tenant_id, &resource_type, &id],
976 )
977 .await
978 .map_err(|e| internal_error(format!("Failed to fetch resource: {}", e)))?;
979
980 if rows.is_empty() {
981 return Ok(None);
982 }
983
984 let row = &rows[0];
985 let version_id: String = row.get(0);
986 let json_data: serde_json::Value = row.get(1);
987 let last_updated: chrono::DateTime<Utc> = row.get(2);
988 let fhir_version_str: String = row.get(3);
989 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
990
991 Ok(Some(StoredResource::from_storage(
992 resource_type,
993 id,
994 version_id,
995 crate::tenant::TenantId::new(tenant_id),
996 json_data,
997 last_updated,
998 last_updated,
999 None,
1000 fhir_version,
1001 )))
1002 }
1003}