1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7
8use crate::core::history::{
9 DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
10 InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
11};
12use crate::core::transaction::{
13 BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
14};
15use crate::core::{
16 ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
17 PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
18};
19use crate::error::TransactionError;
20use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
21use crate::search::loader::SearchParameterLoader;
22use crate::search::registry::SearchParameterStatus;
23use crate::search::reindex::{ReindexableStorage, ResourcePage};
24use crate::tenant::TenantContext;
25use crate::types::Pagination;
26use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
27use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
28
29use super::PostgresBackend;
30use super::search::writer::PostgresSearchIndexWriter;
31
32fn internal_error(message: String) -> StorageError {
33 StorageError::Backend(BackendError::Internal {
34 backend_name: "postgres".to_string(),
35 message,
36 source: None,
37 })
38}
39
40#[allow(dead_code)]
41fn serialization_error(message: String) -> StorageError {
42 StorageError::Backend(BackendError::SerializationError { message })
43}
44
45fn extract_part_value(part: &Value) -> Option<Value> {
51 part.as_object()?.iter().find_map(|(k, v)| {
52 let suffix = k.strip_prefix("value")?;
53 suffix
54 .chars()
55 .next()?
56 .is_ascii_uppercase()
57 .then(|| v.clone())
58 })
59}
60
61#[async_trait]
62impl ResourceStorage for PostgresBackend {
63 fn backend_name(&self) -> &'static str {
64 "postgres"
65 }
66
67 fn sof_runner(&self) -> Option<std::sync::Arc<dyn crate::core::sof_runner::SofRunner>> {
68 use crate::sof::postgres::PgInDbRunner;
69 Some(std::sync::Arc::new(PgInDbRunner::new(self.pool())))
70 }
71
72 async fn create(
73 &self,
74 tenant: &TenantContext,
75 resource_type: &str,
76 resource: Value,
77 fhir_version: FhirVersion,
78 ) -> StorageResult<StoredResource> {
79 let client = self.get_client().await?;
80 let tenant_id = tenant.tenant_id().as_str();
81
82 let id = resource
84 .get("id")
85 .and_then(|v| v.as_str())
86 .map(String::from)
87 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
88
89 let exists = client
91 .query_opt(
92 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
93 &[&tenant_id, &resource_type, &id],
94 )
95 .await
96 .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
97
98 if exists.is_some() {
99 return Err(StorageError::Resource(ResourceError::AlreadyExists {
100 resource_type: resource_type.to_string(),
101 id: id.clone(),
102 }));
103 }
104
105 let mut resource = resource;
107 if let Some(obj) = resource.as_object_mut() {
108 obj.insert(
109 "resourceType".to_string(),
110 Value::String(resource_type.to_string()),
111 );
112 obj.insert("id".to_string(), Value::String(id.clone()));
113 }
114
115 let now = Utc::now();
116 let version_id = "1";
117 let fhir_version_str = fhir_version.as_mime_param();
118 let is_deleted = false;
119
120 client
122 .execute(
123 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
124 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
125 &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
126 )
127 .await
128 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
129
130 client
132 .execute(
133 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
134 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
135 &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
136 )
137 .await
138 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
139
140 self.index_resource(&client, tenant_id, resource_type, &id, &resource)
142 .await?;
143
144 if resource_type == "SearchParameter" {
146 self.handle_search_parameter_create(&resource)?;
147 }
148
149 Ok(StoredResource::from_storage(
151 resource_type,
152 &id,
153 version_id,
154 tenant.tenant_id().clone(),
155 resource,
156 now,
157 now,
158 None,
159 fhir_version,
160 ))
161 }
162
163 async fn create_or_update(
164 &self,
165 tenant: &TenantContext,
166 resource_type: &str,
167 id: &str,
168 resource: Value,
169 fhir_version: FhirVersion,
170 ) -> StorageResult<(StoredResource, bool)> {
171 let existing = self.read(tenant, resource_type, id).await?;
173
174 if let Some(current) = existing {
175 let updated = self.update(tenant, ¤t, resource).await?;
177 Ok((updated, false))
178 } else {
179 let mut resource = resource;
181 if let Some(obj) = resource.as_object_mut() {
182 obj.insert("id".to_string(), Value::String(id.to_string()));
183 }
184 let created = self
185 .create(tenant, resource_type, resource, fhir_version)
186 .await?;
187 Ok((created, true))
188 }
189 }
190
191 async fn read(
192 &self,
193 tenant: &TenantContext,
194 resource_type: &str,
195 id: &str,
196 ) -> StorageResult<Option<StoredResource>> {
197 let client = self.get_client().await?;
198 let tenant_id = tenant.tenant_id().as_str();
199
200 let row = client
201 .query_opt(
202 "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
203 FROM resources
204 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
205 &[&tenant_id, &resource_type, &id],
206 )
207 .await
208 .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
209
210 match row {
211 Some(row) => {
212 let version_id: String = row.get(0);
213 let data: Value = row.get(1);
214 let last_updated: DateTime<Utc> = row.get(2);
215 let is_deleted: bool = row.get(3);
216 let deleted_at: Option<DateTime<Utc>> = row.get(4);
217 let fhir_version_str: String = row.get(5);
218
219 if is_deleted {
221 return Err(StorageError::Resource(ResourceError::Gone {
222 resource_type: resource_type.to_string(),
223 id: id.to_string(),
224 deleted_at,
225 }));
226 }
227
228 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
229 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
230
231 Ok(Some(StoredResource::from_storage(
232 resource_type,
233 id,
234 version_id,
235 tenant.tenant_id().clone(),
236 data,
237 last_updated,
238 last_updated,
239 None,
240 fhir_version,
241 )))
242 }
243 None => Ok(None),
244 }
245 }
246
247 async fn update(
248 &self,
249 tenant: &TenantContext,
250 current: &StoredResource,
251 resource: Value,
252 ) -> StorageResult<StoredResource> {
253 let client = self.get_client().await?;
254 let tenant_id = tenant.tenant_id().as_str();
255 let resource_type = current.resource_type();
256 let id = current.id();
257
258 let row = client
260 .query_opt(
261 "SELECT version_id FROM resources
262 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
263 &[&tenant_id, &resource_type, &id],
264 )
265 .await
266 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
267
268 let actual_version = match row {
269 Some(row) => row.get::<_, String>(0),
270 None => {
271 return Err(StorageError::Resource(ResourceError::NotFound {
272 resource_type: resource_type.to_string(),
273 id: id.to_string(),
274 }));
275 }
276 };
277
278 if actual_version != current.version_id() {
280 return Err(StorageError::Concurrency(
281 ConcurrencyError::VersionConflict {
282 resource_type: resource_type.to_string(),
283 id: id.to_string(),
284 expected_version: current.version_id().to_string(),
285 actual_version,
286 },
287 ));
288 }
289
290 let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
292 let new_version_str = new_version.to_string();
293
294 let mut resource = resource;
296 if let Some(obj) = resource.as_object_mut() {
297 obj.insert(
298 "resourceType".to_string(),
299 Value::String(resource_type.to_string()),
300 );
301 obj.insert("id".to_string(), Value::String(id.to_string()));
302 }
303
304 let now = Utc::now();
305 let fhir_version_str = current.fhir_version().as_mime_param();
306 let is_deleted = false;
307
308 client
310 .execute(
311 "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
312 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
313 &[
314 &new_version_str,
315 &resource,
316 &now,
317 &tenant_id,
318 &resource_type,
319 &id,
320 ],
321 )
322 .await
323 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
324
325 client
327 .execute(
328 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
329 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
330 &[&tenant_id, &resource_type, &id, &new_version_str, &resource, &now, &is_deleted, &fhir_version_str],
331 )
332 .await
333 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
334
335 self.delete_search_index(&client, tenant_id, resource_type, id)
337 .await?;
338 self.index_resource(&client, tenant_id, resource_type, id, &resource)
339 .await?;
340
341 if resource_type == "SearchParameter" {
343 self.handle_search_parameter_update(current.content(), &resource)?;
344 }
345
346 Ok(StoredResource::from_storage(
347 resource_type,
348 id,
349 new_version_str,
350 tenant.tenant_id().clone(),
351 resource,
352 now,
353 now,
354 None,
355 current.fhir_version(),
356 ))
357 }
358
359 async fn delete(
360 &self,
361 tenant: &TenantContext,
362 resource_type: &str,
363 id: &str,
364 ) -> StorageResult<()> {
365 let client = self.get_client().await?;
366 let tenant_id = tenant.tenant_id().as_str();
367
368 let row = client
370 .query_opt(
371 "SELECT version_id, data, fhir_version FROM resources
372 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
373 &[&tenant_id, &resource_type, &id],
374 )
375 .await
376 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
377
378 let (current_version, data, fhir_version_str) = match row {
379 Some(row) => {
380 let v: String = row.get(0);
381 let d: Value = row.get(1);
382 let f: String = row.get(2);
383 (v, d, f)
384 }
385 None => {
386 return Err(StorageError::Resource(ResourceError::NotFound {
387 resource_type: resource_type.to_string(),
388 id: id.to_string(),
389 }));
390 }
391 };
392
393 let now = Utc::now();
394
395 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
397 let new_version_str = new_version.to_string();
398 let is_deleted = true;
399
400 client
402 .execute(
403 "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
404 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
405 &[&now, &new_version_str, &tenant_id, &resource_type, &id],
406 )
407 .await
408 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
409
410 client
412 .execute(
413 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
414 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
415 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
416 )
417 .await
418 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
419
420 if !self.is_search_offloaded() {
422 client
423 .execute(
424 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
425 &[&tenant_id, &resource_type, &id],
426 )
427 .await
428 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
429 }
430
431 if resource_type == "SearchParameter" {
433 self.handle_search_parameter_delete(&data)?;
434 }
435
436 Ok(())
437 }
438
439 async fn count(
440 &self,
441 tenant: &TenantContext,
442 resource_type: Option<&str>,
443 ) -> StorageResult<u64> {
444 let client = self.get_client().await?;
445 let tenant_id = tenant.tenant_id().as_str();
446
447 let count: i64 = if let Some(rt) = resource_type {
448 let row = client
449 .query_one(
450 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE",
451 &[&tenant_id, &rt],
452 )
453 .await
454 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
455 row.get(0)
456 } else {
457 let row = client
458 .query_one(
459 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
460 &[&tenant_id],
461 )
462 .await
463 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
464 row.get(0)
465 };
466
467 Ok(count as u64)
468 }
469}
470
471impl PostgresBackend {
476 pub(crate) async fn index_resource(
481 &self,
482 client: &deadpool_postgres::Client,
483 tenant_id: &str,
484 resource_type: &str,
485 resource_id: &str,
486 resource: &Value,
487 ) -> StorageResult<()> {
488 if self.is_search_offloaded() {
490 return Ok(());
491 }
492
493 client
495 .execute(
496 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
497 &[&tenant_id, &resource_type, &resource_id],
498 )
499 .await
500 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
501
502 match self.search_extractor().extract(resource, resource_type) {
504 Ok(values) => {
505 let mut count = 0;
506 for value in values {
507 PostgresSearchIndexWriter::write_entry(
508 client,
509 tenant_id,
510 resource_type,
511 resource_id,
512 &value,
513 )
514 .await?;
515 count += 1;
516 }
517 tracing::debug!(
518 "Dynamically indexed {} values for {}/{}",
519 count,
520 resource_type,
521 resource_id
522 );
523 }
524 Err(e) => {
525 tracing::warn!(
526 "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
527 resource_type,
528 resource_id,
529 e
530 );
531 self.index_minimal_fallback(
533 client,
534 tenant_id,
535 resource_type,
536 resource_id,
537 resource,
538 )
539 .await?;
540 }
541 }
542
543 self.index_contained_resources(client, tenant_id, resource_type, resource_id, resource)
545 .await?;
546
547 self.index_fts_content(client, tenant_id, resource_type, resource_id, resource)
549 .await?;
550
551 Ok(())
552 }
553
554 async fn index_contained_resources(
559 &self,
560 client: &deadpool_postgres::Client,
561 tenant_id: &str,
562 container_type: &str,
563 container_id: &str,
564 resource: &Value,
565 ) -> StorageResult<usize> {
566 let mut count = 0;
567 let container = (container_type, container_id);
568 for contained in self.search_extractor().extract_contained(resource) {
569 for value in &contained.values {
570 PostgresSearchIndexWriter::write_contained_entry(
571 client,
572 tenant_id,
573 container,
574 (&contained.contained_type, &contained.local_id),
575 value,
576 )
577 .await?;
578 count += 1;
579 }
580 }
581 Ok(count)
582 }
583
584 async fn index_fts_content(
588 &self,
589 client: &deadpool_postgres::Client,
590 tenant_id: &str,
591 resource_type: &str,
592 resource_id: &str,
593 resource: &Value,
594 ) -> StorageResult<()> {
595 let fts_exists = client
597 .query_opt(
598 "SELECT 1 FROM information_schema.tables WHERE table_name = 'resource_fts'",
599 &[],
600 )
601 .await
602 .map_err(|e| internal_error(format!("Failed to check FTS table: {}", e)))?;
603
604 if fts_exists.is_none() {
605 return Ok(());
606 }
607
608 let content = extract_searchable_content(resource);
610
611 if content.is_empty() {
612 return Ok(());
613 }
614
615 let _ = client
617 .execute(
618 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
619 &[&tenant_id, &resource_type, &resource_id],
620 )
621 .await;
622
623 client
625 .execute(
626 "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
627 VALUES ($1, $2, $3, $4, $5)",
628 &[
629 &resource_id,
630 &resource_type,
631 &tenant_id,
632 &content.narrative,
633 &content.full_content,
634 ],
635 )
636 .await
637 .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
638
639 Ok(())
640 }
641
642 async fn index_minimal_fallback(
646 &self,
647 client: &deadpool_postgres::Client,
648 tenant_id: &str,
649 resource_type: &str,
650 resource_id: &str,
651 resource: &Value,
652 ) -> StorageResult<()> {
653 if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
655 client
656 .execute(
657 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_code)
658 VALUES ($1, $2, $3, '_id', $4)",
659 &[&tenant_id, &resource_type, &resource_id, &id],
660 )
661 .await
662 .map_err(|e| internal_error(format!("Failed to insert _id index: {}", e)))?;
663 }
664
665 if let Some(last_updated) = resource
667 .get("meta")
668 .and_then(|m| m.get("lastUpdated"))
669 .and_then(|v| v.as_str())
670 {
671 let normalized = normalize_date_for_pg(last_updated);
672 client
673 .execute(
674 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
675 VALUES ($1, $2, $3, '_lastUpdated', $4::timestamptz)",
676 &[&tenant_id, &resource_type, &resource_id, &normalized],
677 )
678 .await
679 .map_err(|e| {
680 internal_error(format!("Failed to insert _lastUpdated index: {}", e))
681 })?;
682 }
683
684 Ok(())
685 }
686
687 pub(crate) async fn delete_search_index(
689 &self,
690 client: &deadpool_postgres::Client,
691 tenant_id: &str,
692 resource_type: &str,
693 resource_id: &str,
694 ) -> StorageResult<()> {
695 if self.is_search_offloaded() {
697 return Ok(());
698 }
699
700 client
702 .execute(
703 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
704 &[&tenant_id, &resource_type, &resource_id],
705 )
706 .await
707 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
708
709 let _ = client
711 .execute(
712 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
713 &[&tenant_id, &resource_type, &resource_id],
714 )
715 .await;
716
717 Ok(())
718 }
719}
720
721impl PostgresBackend {
726 fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
732 let loader = SearchParameterLoader::new(self.config().fhir_version);
733
734 match loader.parse_resource(resource) {
735 Ok(def) => {
736 if def.status == SearchParameterStatus::Active {
738 let mut registry = self.search_registry().write();
739 if let Err(e) = registry.register(def) {
741 tracing::debug!("SearchParameter registration skipped: {}", e);
742 }
743 }
744 }
745 Err(e) => {
746 tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
748 }
749 }
750
751 Ok(())
752 }
753
754 fn handle_search_parameter_update(
761 &self,
762 old_resource: &Value,
763 new_resource: &Value,
764 ) -> StorageResult<()> {
765 let loader = SearchParameterLoader::new(self.config().fhir_version);
766
767 let old_def = loader.parse_resource(old_resource).ok();
768 let new_def = loader.parse_resource(new_resource).ok();
769
770 match (old_def, new_def) {
771 (Some(old), Some(new)) => {
772 let mut registry = self.search_registry().write();
773
774 if old.url != new.url {
776 let _ = registry.unregister(&old.url);
777 if new.status == SearchParameterStatus::Active {
778 let _ = registry.register(new);
779 }
780 } else if old.status != new.status {
781 if let Err(e) = registry.update_status(&new.url, new.status) {
783 tracing::debug!("SearchParameter status update skipped: {}", e);
784 }
785 } else {
786 let _ = registry.unregister(&old.url);
788 if new.status == SearchParameterStatus::Active {
789 let _ = registry.register(new);
790 }
791 }
792 }
793 (None, Some(new)) => {
794 if new.status == SearchParameterStatus::Active {
796 let mut registry = self.search_registry().write();
797 let _ = registry.register(new);
798 }
799 }
800 (Some(old), None) => {
801 let mut registry = self.search_registry().write();
803 let _ = registry.unregister(&old.url);
804 }
805 (None, None) => {
806 }
808 }
809
810 Ok(())
811 }
812
813 fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
818 if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
819 let mut registry = self.search_registry().write();
820 if let Err(e) = registry.unregister(url) {
821 tracing::debug!("SearchParameter unregistration skipped: {}", e);
822 }
823 }
824
825 Ok(())
826 }
827}
828
829#[async_trait]
834impl VersionedStorage for PostgresBackend {
835 async fn vread(
836 &self,
837 tenant: &TenantContext,
838 resource_type: &str,
839 id: &str,
840 version_id: &str,
841 ) -> StorageResult<Option<StoredResource>> {
842 let client = self.get_client().await?;
843 let tenant_id = tenant.tenant_id().as_str();
844
845 let row = client
846 .query_opt(
847 "SELECT data, last_updated, is_deleted, fhir_version
848 FROM resource_history
849 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
850 &[&tenant_id, &resource_type, &id, &version_id],
851 )
852 .await
853 .map_err(|e| internal_error(format!("Failed to read version: {}", e)))?;
854
855 match row {
856 Some(row) => {
857 let data: Value = row.get(0);
858 let last_updated: DateTime<Utc> = row.get(1);
859 let is_deleted: bool = row.get(2);
860 let fhir_version_str: String = row.get(3);
861
862 let deleted_at = if is_deleted { Some(last_updated) } else { None };
864
865 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
866 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
867
868 Ok(Some(StoredResource::from_storage(
869 resource_type,
870 id,
871 version_id,
872 tenant.tenant_id().clone(),
873 data,
874 last_updated,
875 last_updated,
876 deleted_at,
877 fhir_version,
878 )))
879 }
880 None => Ok(None),
881 }
882 }
883
884 async fn update_with_match(
885 &self,
886 tenant: &TenantContext,
887 resource_type: &str,
888 id: &str,
889 expected_version: &str,
890 resource: Value,
891 ) -> StorageResult<StoredResource> {
892 let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
894 StorageError::Resource(ResourceError::NotFound {
895 resource_type: resource_type.to_string(),
896 id: id.to_string(),
897 })
898 })?;
899
900 if current.version_id() != expected_version {
902 return Err(StorageError::Concurrency(
903 ConcurrencyError::VersionConflict {
904 resource_type: resource_type.to_string(),
905 id: id.to_string(),
906 expected_version: expected_version.to_string(),
907 actual_version: current.version_id().to_string(),
908 },
909 ));
910 }
911
912 self.update(tenant, ¤t, resource).await
914 }
915
916 async fn delete_with_match(
917 &self,
918 tenant: &TenantContext,
919 resource_type: &str,
920 id: &str,
921 expected_version: &str,
922 ) -> StorageResult<()> {
923 let client = self.get_client().await?;
924 let tenant_id = tenant.tenant_id().as_str();
925
926 let row = client
928 .query_opt(
929 "SELECT version_id FROM resources
930 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
931 &[&tenant_id, &resource_type, &id],
932 )
933 .await
934 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
935
936 let current_version = match row {
937 Some(row) => row.get::<_, String>(0),
938 None => {
939 return Err(StorageError::Resource(ResourceError::NotFound {
940 resource_type: resource_type.to_string(),
941 id: id.to_string(),
942 }));
943 }
944 };
945
946 if current_version != expected_version {
947 return Err(StorageError::Concurrency(
948 ConcurrencyError::VersionConflict {
949 resource_type: resource_type.to_string(),
950 id: id.to_string(),
951 expected_version: expected_version.to_string(),
952 actual_version: current_version,
953 },
954 ));
955 }
956
957 self.delete(tenant, resource_type, id).await
959 }
960
961 async fn list_versions(
962 &self,
963 tenant: &TenantContext,
964 resource_type: &str,
965 id: &str,
966 ) -> StorageResult<Vec<String>> {
967 let client = self.get_client().await?;
968 let tenant_id = tenant.tenant_id().as_str();
969
970 let rows = client
971 .query(
972 "SELECT version_id FROM resource_history
973 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3
974 ORDER BY CAST(version_id AS INTEGER) ASC",
975 &[&tenant_id, &resource_type, &id],
976 )
977 .await
978 .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?;
979
980 let versions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
981 Ok(versions)
982 }
983}
984
985#[async_trait]
990impl InstanceHistoryProvider for PostgresBackend {
991 async fn history_instance(
992 &self,
993 tenant: &TenantContext,
994 resource_type: &str,
995 id: &str,
996 params: &HistoryParams,
997 ) -> StorageResult<HistoryPage> {
998 let client = self.get_client().await?;
999 let tenant_id = tenant.tenant_id().as_str();
1000
1001 let mut sql = String::from(
1003 "SELECT version_id, data, last_updated, is_deleted, fhir_version
1004 FROM resource_history
1005 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1006 );
1007 let mut param_index: usize = 4;
1008 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1009 Box::new(tenant_id.to_string()),
1010 Box::new(resource_type.to_string()),
1011 Box::new(id.to_string()),
1012 ];
1013
1014 if !params.include_deleted {
1016 sql.push_str(" AND is_deleted = FALSE");
1017 }
1018
1019 if let Some(since) = ¶ms.since {
1021 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1022 query_params.push(Box::new(*since));
1023 param_index += 1;
1024 }
1025
1026 if let Some(before) = ¶ms.before {
1028 sql.push_str(&format!(" AND last_updated < ${}", param_index));
1029 query_params.push(Box::new(*before));
1030 param_index += 1;
1031 }
1032
1033 if let Some(cursor) = params.pagination.cursor_value() {
1035 if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
1036 if let Ok(version_int) = version_str.parse::<i64>() {
1037 sql.push_str(&format!(
1038 " AND CAST(version_id AS INTEGER) < ${}",
1039 param_index
1040 ));
1041 query_params.push(Box::new(version_int));
1042 param_index += 1;
1043 }
1044 }
1045 }
1046
1047 let limit = params.pagination.count as i64 + 1; sql.push_str(&format!(
1050 " ORDER BY CAST(version_id AS INTEGER) DESC LIMIT ${}",
1051 param_index
1052 ));
1053 query_params.push(Box::new(limit));
1054
1055 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1057 .iter()
1058 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1059 .collect();
1060
1061 let rows = client
1062 .query(&sql, ¶m_refs)
1063 .await
1064 .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1065
1066 let mut entries = Vec::new();
1067 let mut last_version: Option<String> = None;
1068
1069 for row in &rows {
1070 if entries.len() >= params.pagination.count as usize {
1072 break;
1073 }
1074
1075 let version_id: String = row.get(0);
1076 let data: Value = row.get(1);
1077 let last_updated: DateTime<Utc> = row.get(2);
1078 let is_deleted: bool = row.get(3);
1079 let fhir_version_str: String = row.get(4);
1080
1081 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1082
1083 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1084 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1085
1086 let resource = StoredResource::from_storage(
1087 resource_type,
1088 id,
1089 &version_id,
1090 tenant.tenant_id().clone(),
1091 data,
1092 last_updated,
1093 last_updated,
1094 deleted_at,
1095 fhir_version,
1096 );
1097
1098 let method = if is_deleted {
1100 HistoryMethod::Delete
1101 } else if version_id == "1" {
1102 HistoryMethod::Post
1103 } else {
1104 HistoryMethod::Put
1105 };
1106
1107 last_version = Some(version_id);
1108
1109 entries.push(HistoryEntry {
1110 resource,
1111 method,
1112 timestamp: last_updated,
1113 });
1114 }
1115
1116 let has_more = rows.len() > params.pagination.count as usize;
1118
1119 let page_info = if let (true, Some(version)) = (has_more, last_version) {
1121 let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1122 PageInfo::with_next(cursor)
1123 } else {
1124 PageInfo::end()
1125 };
1126
1127 Ok(Page::new(entries, page_info))
1128 }
1129
1130 async fn history_instance_count(
1131 &self,
1132 tenant: &TenantContext,
1133 resource_type: &str,
1134 id: &str,
1135 ) -> StorageResult<u64> {
1136 let client = self.get_client().await?;
1137 let tenant_id = tenant.tenant_id().as_str();
1138
1139 let row = client
1140 .query_one(
1141 "SELECT COUNT(*) FROM resource_history
1142 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1143 &[&tenant_id, &resource_type, &id],
1144 )
1145 .await
1146 .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1147
1148 let count: i64 = row.get(0);
1149 Ok(count as u64)
1150 }
1151
1152 async fn delete_instance_history(
1153 &self,
1154 tenant: &TenantContext,
1155 resource_type: &str,
1156 id: &str,
1157 ) -> StorageResult<u64> {
1158 let client = self.get_client().await?;
1159 let tenant_id = tenant.tenant_id().as_str();
1160
1161 let exists = client
1163 .query_opt(
1164 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1165 &[&tenant_id, &resource_type, &id],
1166 )
1167 .await
1168 .map_err(|e| internal_error(format!("Failed to check resource existence: {}", e)))?;
1169
1170 if exists.is_none() {
1171 return Err(StorageError::Resource(ResourceError::NotFound {
1172 resource_type: resource_type.to_string(),
1173 id: id.to_string(),
1174 }));
1175 }
1176
1177 let current_row = client
1179 .query_one(
1180 "SELECT version_id FROM resources
1181 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1182 &[&tenant_id, &resource_type, &id],
1183 )
1184 .await
1185 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1186
1187 let current_version: String = current_row.get(0);
1188
1189 let deleted = client
1191 .execute(
1192 "DELETE FROM resource_history
1193 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id != $4",
1194 &[&tenant_id, &resource_type, &id, ¤t_version],
1195 )
1196 .await
1197 .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1198
1199 Ok(deleted)
1200 }
1201
1202 async fn delete_version(
1203 &self,
1204 tenant: &TenantContext,
1205 resource_type: &str,
1206 id: &str,
1207 version_id: &str,
1208 ) -> StorageResult<()> {
1209 let client = self.get_client().await?;
1210 let tenant_id = tenant.tenant_id().as_str();
1211
1212 let current_row = client
1214 .query_opt(
1215 "SELECT version_id FROM resources
1216 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1217 &[&tenant_id, &resource_type, &id],
1218 )
1219 .await
1220 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1221
1222 let current_version = match current_row {
1223 Some(row) => row.get::<_, String>(0),
1224 None => {
1225 return Err(StorageError::Resource(ResourceError::NotFound {
1226 resource_type: resource_type.to_string(),
1227 id: id.to_string(),
1228 }));
1229 }
1230 };
1231
1232 if version_id == current_version {
1234 return Err(StorageError::Validation(
1235 crate::error::ValidationError::InvalidResource {
1236 message: format!(
1237 "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1238 version_id, resource_type, id
1239 ),
1240 details: vec![],
1241 },
1242 ));
1243 }
1244
1245 let version_exists = client
1247 .query_opt(
1248 "SELECT 1 FROM resource_history
1249 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1250 &[&tenant_id, &resource_type, &id, &version_id],
1251 )
1252 .await
1253 .map_err(|e| internal_error(format!("Failed to check version existence: {}", e)))?;
1254
1255 if version_exists.is_none() {
1256 return Err(StorageError::Resource(ResourceError::VersionNotFound {
1257 resource_type: resource_type.to_string(),
1258 id: id.to_string(),
1259 version_id: version_id.to_string(),
1260 }));
1261 }
1262
1263 client
1265 .execute(
1266 "DELETE FROM resource_history
1267 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1268 &[&tenant_id, &resource_type, &id, &version_id],
1269 )
1270 .await
1271 .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1272
1273 Ok(())
1274 }
1275}
1276
1277#[async_trait]
1282impl TypeHistoryProvider for PostgresBackend {
1283 async fn history_type(
1284 &self,
1285 tenant: &TenantContext,
1286 resource_type: &str,
1287 params: &HistoryParams,
1288 ) -> StorageResult<HistoryPage> {
1289 let client = self.get_client().await?;
1290 let tenant_id = tenant.tenant_id().as_str();
1291
1292 let mut sql = String::from(
1294 "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1295 FROM resource_history
1296 WHERE tenant_id = $1 AND resource_type = $2",
1297 );
1298 let mut param_index: usize = 3;
1299 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1300 Box::new(tenant_id.to_string()),
1301 Box::new(resource_type.to_string()),
1302 ];
1303
1304 if !params.include_deleted {
1306 sql.push_str(" AND is_deleted = FALSE");
1307 }
1308
1309 if let Some(since) = ¶ms.since {
1311 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1312 query_params.push(Box::new(*since));
1313 param_index += 1;
1314 }
1315
1316 if let Some(before) = ¶ms.before {
1318 sql.push_str(&format!(" AND last_updated < ${}", param_index));
1319 query_params.push(Box::new(*before));
1320 param_index += 1;
1321 }
1322
1323 if let Some(cursor) = params.pagination.cursor_value() {
1325 let sort_values = cursor.sort_values();
1326 if sort_values.len() >= 2 {
1327 if let (
1328 Some(CursorValue::String(timestamp)),
1329 Some(CursorValue::String(resource_id)),
1330 ) = (sort_values.first(), sort_values.get(1))
1331 {
1332 sql.push_str(&format!(
1333 " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND id < ${}))",
1334 param_index, param_index, param_index + 1
1335 ));
1336 query_params.push(Box::new(timestamp.clone()));
1337 query_params.push(Box::new(resource_id.clone()));
1338 param_index += 2;
1339 }
1340 }
1341 }
1342
1343 let limit = params.pagination.count as i64 + 1;
1345 sql.push_str(&format!(
1346 " ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1347 param_index
1348 ));
1349 query_params.push(Box::new(limit));
1350
1351 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1352 .iter()
1353 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1354 .collect();
1355
1356 let rows = client
1357 .query(&sql, ¶m_refs)
1358 .await
1359 .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1360
1361 let mut entries = Vec::new();
1362 let mut last_entry: Option<(String, String)> = None; for row in &rows {
1365 if entries.len() >= params.pagination.count as usize {
1366 break;
1367 }
1368
1369 let row_id: String = row.get(0);
1370 let version_id: String = row.get(1);
1371 let data: Value = row.get(2);
1372 let last_updated: DateTime<Utc> = row.get(3);
1373 let is_deleted: bool = row.get(4);
1374 let fhir_version_str: String = row.get(5);
1375
1376 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1377
1378 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1379 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1380
1381 let resource = StoredResource::from_storage(
1382 resource_type,
1383 &row_id,
1384 &version_id,
1385 tenant.tenant_id().clone(),
1386 data,
1387 last_updated,
1388 last_updated,
1389 deleted_at,
1390 fhir_version,
1391 );
1392
1393 let method = if is_deleted {
1394 HistoryMethod::Delete
1395 } else if version_id == "1" {
1396 HistoryMethod::Post
1397 } else {
1398 HistoryMethod::Put
1399 };
1400
1401 last_entry = Some((last_updated.to_rfc3339(), row_id));
1402
1403 entries.push(HistoryEntry {
1404 resource,
1405 method,
1406 timestamp: last_updated,
1407 });
1408 }
1409
1410 let has_more = rows.len() > params.pagination.count as usize;
1412
1413 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1415 let cursor = PageCursor::new(
1416 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1417 resource_type.to_string(),
1418 );
1419 PageInfo::with_next(cursor)
1420 } else {
1421 PageInfo::end()
1422 };
1423
1424 Ok(Page::new(entries, page_info))
1425 }
1426
1427 async fn history_type_count(
1428 &self,
1429 tenant: &TenantContext,
1430 resource_type: &str,
1431 ) -> StorageResult<u64> {
1432 let client = self.get_client().await?;
1433 let tenant_id = tenant.tenant_id().as_str();
1434
1435 let row = client
1436 .query_one(
1437 "SELECT COUNT(*) FROM resource_history
1438 WHERE tenant_id = $1 AND resource_type = $2",
1439 &[&tenant_id, &resource_type],
1440 )
1441 .await
1442 .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1443
1444 let count: i64 = row.get(0);
1445 Ok(count as u64)
1446 }
1447}
1448
1449#[async_trait]
1454impl SystemHistoryProvider for PostgresBackend {
1455 async fn history_system(
1456 &self,
1457 tenant: &TenantContext,
1458 params: &HistoryParams,
1459 ) -> StorageResult<HistoryPage> {
1460 let client = self.get_client().await?;
1461 let tenant_id = tenant.tenant_id().as_str();
1462
1463 let mut sql = String::from(
1465 "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1466 FROM resource_history
1467 WHERE tenant_id = $1",
1468 );
1469 let mut param_index: usize = 2;
1470 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1471 vec![Box::new(tenant_id.to_string())];
1472
1473 if !params.include_deleted {
1475 sql.push_str(" AND is_deleted = FALSE");
1476 }
1477
1478 if let Some(since) = ¶ms.since {
1480 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1481 query_params.push(Box::new(*since));
1482 param_index += 1;
1483 }
1484
1485 if let Some(before) = ¶ms.before {
1487 sql.push_str(&format!(" AND last_updated < ${}", param_index));
1488 query_params.push(Box::new(*before));
1489 param_index += 1;
1490 }
1491
1492 if let Some(cursor) = params.pagination.cursor_value() {
1494 let sort_values = cursor.sort_values();
1495 if sort_values.len() >= 3 {
1496 if let (
1497 Some(CursorValue::String(timestamp)),
1498 Some(CursorValue::String(res_type)),
1499 Some(CursorValue::String(res_id)),
1500 ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1501 {
1502 sql.push_str(&format!(
1503 " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND (resource_type < ${} OR (resource_type = ${} AND id < ${}))))",
1504 param_index, param_index, param_index + 1, param_index + 1, param_index + 2
1505 ));
1506 query_params.push(Box::new(timestamp.clone()));
1507 query_params.push(Box::new(res_type.clone()));
1508 query_params.push(Box::new(res_id.clone()));
1509 param_index += 3;
1510 }
1511 }
1512 }
1513
1514 let limit = params.pagination.count as i64 + 1;
1516 sql.push_str(&format!(
1517 " ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1518 param_index
1519 ));
1520 query_params.push(Box::new(limit));
1521
1522 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1523 .iter()
1524 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1525 .collect();
1526
1527 let rows = client
1528 .query(&sql, ¶m_refs)
1529 .await
1530 .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1531
1532 let mut entries = Vec::new();
1533 let mut last_entry: Option<(String, String, String)> = None;
1534
1535 for row in &rows {
1536 if entries.len() >= params.pagination.count as usize {
1537 break;
1538 }
1539
1540 let row_resource_type: String = row.get(0);
1541 let row_id: String = row.get(1);
1542 let version_id: String = row.get(2);
1543 let data: Value = row.get(3);
1544 let last_updated: DateTime<Utc> = row.get(4);
1545 let is_deleted: bool = row.get(5);
1546 let fhir_version_str: String = row.get(6);
1547
1548 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1549
1550 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1551 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1552
1553 let resource = StoredResource::from_storage(
1554 &row_resource_type,
1555 &row_id,
1556 &version_id,
1557 tenant.tenant_id().clone(),
1558 data,
1559 last_updated,
1560 last_updated,
1561 deleted_at,
1562 fhir_version,
1563 );
1564
1565 let method = if is_deleted {
1566 HistoryMethod::Delete
1567 } else if version_id == "1" {
1568 HistoryMethod::Post
1569 } else {
1570 HistoryMethod::Put
1571 };
1572
1573 last_entry = Some((last_updated.to_rfc3339(), row_resource_type, row_id));
1574
1575 entries.push(HistoryEntry {
1576 resource,
1577 method,
1578 timestamp: last_updated,
1579 });
1580 }
1581
1582 let has_more = rows.len() > params.pagination.count as usize;
1583
1584 let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1585 {
1586 let cursor = PageCursor::new(
1587 vec![
1588 CursorValue::String(timestamp),
1589 CursorValue::String(resource_type),
1590 CursorValue::String(id),
1591 ],
1592 "system".to_string(),
1593 );
1594 PageInfo::with_next(cursor)
1595 } else {
1596 PageInfo::end()
1597 };
1598
1599 Ok(Page::new(entries, page_info))
1600 }
1601
1602 async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1603 let client = self.get_client().await?;
1604 let tenant_id = tenant.tenant_id().as_str();
1605
1606 let row = client
1607 .query_one(
1608 "SELECT COUNT(*) FROM resource_history WHERE tenant_id = $1",
1609 &[&tenant_id],
1610 )
1611 .await
1612 .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1613
1614 let count: i64 = row.get(0);
1615 Ok(count as u64)
1616 }
1617}
1618
1619#[async_trait]
1624impl DifferentialHistoryProvider for PostgresBackend {
1625 async fn modified_since(
1626 &self,
1627 tenant: &TenantContext,
1628 resource_type: Option<&str>,
1629 since: DateTime<Utc>,
1630 pagination: &Pagination,
1631 ) -> StorageResult<Page<StoredResource>> {
1632 let client = self.get_client().await?;
1633 let tenant_id = tenant.tenant_id().as_str();
1634
1635 let mut sql = String::from(
1637 "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1638 FROM resources
1639 WHERE tenant_id = $1 AND last_updated > $2 AND is_deleted = FALSE",
1640 );
1641 let mut param_index: usize = 3;
1642 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1643 vec![Box::new(tenant_id.to_string()), Box::new(since)];
1644
1645 if let Some(rt) = resource_type {
1647 sql.push_str(&format!(" AND resource_type = ${}", param_index));
1648 query_params.push(Box::new(rt.to_string()));
1649 param_index += 1;
1650 }
1651
1652 if let Some(cursor) = pagination.cursor_value() {
1654 let sort_values = cursor.sort_values();
1655 if sort_values.len() >= 2 {
1656 if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
1657 (sort_values.first(), sort_values.get(1))
1658 {
1659 sql.push_str(&format!(
1660 " AND (last_updated > ${}::timestamptz OR (last_updated = ${}::timestamptz AND id > ${}))",
1661 param_index, param_index, param_index + 1
1662 ));
1663 query_params.push(Box::new(timestamp.clone()));
1664 query_params.push(Box::new(res_id.clone()));
1665 param_index += 2;
1666 }
1667 }
1668 }
1669
1670 let limit = pagination.count as i64 + 1;
1672 sql.push_str(&format!(
1673 " ORDER BY last_updated ASC, id ASC LIMIT ${}",
1674 param_index
1675 ));
1676 query_params.push(Box::new(limit));
1677
1678 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1679 .iter()
1680 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1681 .collect();
1682
1683 let rows = client
1684 .query(&sql, ¶m_refs)
1685 .await
1686 .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
1687
1688 let mut resources = Vec::new();
1689 let mut last_entry: Option<(String, String)> = None;
1690
1691 for row in &rows {
1692 if resources.len() >= pagination.count as usize {
1693 break;
1694 }
1695
1696 let row_resource_type: String = row.get(0);
1697 let row_id: String = row.get(1);
1698 let version_id: String = row.get(2);
1699 let data: Value = row.get(3);
1700 let last_updated: DateTime<Utc> = row.get(4);
1701 let fhir_version_str: String = row.get(5);
1702
1703 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1704 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1705
1706 let resource = StoredResource::from_storage(
1707 &row_resource_type,
1708 &row_id,
1709 &version_id,
1710 tenant.tenant_id().clone(),
1711 data,
1712 last_updated,
1713 last_updated,
1714 None,
1715 fhir_version,
1716 );
1717
1718 last_entry = Some((last_updated.to_rfc3339(), row_id));
1719 resources.push(resource);
1720 }
1721
1722 let has_more = rows.len() > pagination.count as usize;
1723
1724 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1725 let cursor = PageCursor::new(
1726 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1727 "modified_since".to_string(),
1728 );
1729 PageInfo::with_next(cursor)
1730 } else {
1731 PageInfo::end()
1732 };
1733
1734 Ok(Page::new(resources, page_info))
1735 }
1736}
1737
1738#[async_trait]
1743impl PurgableStorage for PostgresBackend {
1744 async fn purge(
1745 &self,
1746 tenant: &TenantContext,
1747 resource_type: &str,
1748 id: &str,
1749 ) -> StorageResult<()> {
1750 let client = self.get_client().await?;
1751 let tenant_id = tenant.tenant_id().as_str();
1752
1753 let exists = client
1755 .query_opt(
1756 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1757 &[&tenant_id, &resource_type, &id],
1758 )
1759 .await
1760 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
1761
1762 if exists.is_none() {
1763 let history_exists = client
1765 .query_opt(
1766 "SELECT 1 FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1767 &[&tenant_id, &resource_type, &id],
1768 )
1769 .await
1770 .map_err(|e| internal_error(format!("Failed to check history: {}", e)))?;
1771
1772 if history_exists.is_none() {
1773 return Err(StorageError::Resource(ResourceError::NotFound {
1774 resource_type: resource_type.to_string(),
1775 id: id.to_string(),
1776 }));
1777 }
1778 }
1779
1780 client
1782 .execute(
1783 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1784 &[&tenant_id, &resource_type, &id],
1785 )
1786 .await
1787 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1788
1789 let _ = client
1791 .execute(
1792 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1793 &[&tenant_id, &resource_type, &id],
1794 )
1795 .await;
1796
1797 client
1799 .execute(
1800 "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1801 &[&tenant_id, &resource_type, &id],
1802 )
1803 .await
1804 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1805
1806 client
1808 .execute(
1809 "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1810 &[&tenant_id, &resource_type, &id],
1811 )
1812 .await
1813 .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1814
1815 Ok(())
1816 }
1817
1818 async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1819 let client = self.get_client().await?;
1820 let tenant_id = tenant.tenant_id().as_str();
1821
1822 let row = client
1824 .query_one(
1825 "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1826 &[&tenant_id, &resource_type],
1827 )
1828 .await
1829 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1830 let count: i64 = row.get(0);
1831
1832 client
1834 .execute(
1835 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2",
1836 &[&tenant_id, &resource_type],
1837 )
1838 .await
1839 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1840
1841 let _ = client
1843 .execute(
1844 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2",
1845 &[&tenant_id, &resource_type],
1846 )
1847 .await;
1848
1849 client
1851 .execute(
1852 "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2",
1853 &[&tenant_id, &resource_type],
1854 )
1855 .await
1856 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1857
1858 client
1860 .execute(
1861 "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1862 &[&tenant_id, &resource_type],
1863 )
1864 .await
1865 .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1866
1867 Ok(count as u64)
1868 }
1869}
1870
1871fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
1878 params
1879 .split('&')
1880 .filter_map(|pair| {
1881 let parts: Vec<&str> = pair.splitn(2, '=').collect();
1882 if parts.len() == 2 {
1883 Some((parts[0].to_string(), parts[1].to_string()))
1884 } else {
1885 None
1886 }
1887 })
1888 .collect()
1889}
1890
1891#[async_trait]
1892impl ConditionalStorage for PostgresBackend {
1893 async fn conditional_create(
1894 &self,
1895 tenant: &TenantContext,
1896 resource_type: &str,
1897 resource: Value,
1898 search_params: &str,
1899 fhir_version: FhirVersion,
1900 ) -> StorageResult<ConditionalCreateResult> {
1901 let matches = self
1903 .find_matching_resources(tenant, resource_type, search_params)
1904 .await?;
1905
1906 match matches.len() {
1907 0 => {
1908 let created = self
1910 .create(tenant, resource_type, resource, fhir_version)
1911 .await?;
1912 Ok(ConditionalCreateResult::Created(created))
1913 }
1914 1 => {
1915 Ok(ConditionalCreateResult::Exists(
1917 matches.into_iter().next().unwrap(),
1918 ))
1919 }
1920 n => {
1921 Ok(ConditionalCreateResult::MultipleMatches(n))
1923 }
1924 }
1925 }
1926
1927 async fn conditional_update(
1928 &self,
1929 tenant: &TenantContext,
1930 resource_type: &str,
1931 resource: Value,
1932 search_params: &str,
1933 upsert: bool,
1934 fhir_version: FhirVersion,
1935 ) -> StorageResult<ConditionalUpdateResult> {
1936 let matches = self
1938 .find_matching_resources(tenant, resource_type, search_params)
1939 .await?;
1940
1941 match matches.len() {
1942 0 => {
1943 if upsert {
1944 let created = self
1946 .create(tenant, resource_type, resource, fhir_version)
1947 .await?;
1948 Ok(ConditionalUpdateResult::Created(created))
1949 } else {
1950 Ok(ConditionalUpdateResult::NoMatch)
1952 }
1953 }
1954 1 => {
1955 let existing = matches.into_iter().next().unwrap();
1957 let updated = self.update(tenant, &existing, resource).await?;
1958 Ok(ConditionalUpdateResult::Updated(updated))
1959 }
1960 n => {
1961 Ok(ConditionalUpdateResult::MultipleMatches(n))
1963 }
1964 }
1965 }
1966
1967 async fn conditional_delete(
1968 &self,
1969 tenant: &TenantContext,
1970 resource_type: &str,
1971 search_params: &str,
1972 ) -> StorageResult<ConditionalDeleteResult> {
1973 let matches = self
1975 .find_matching_resources(tenant, resource_type, search_params)
1976 .await?;
1977
1978 match matches.len() {
1979 0 => {
1980 Ok(ConditionalDeleteResult::NoMatch)
1982 }
1983 1 => {
1984 let existing = matches.into_iter().next().unwrap();
1986 self.delete(tenant, resource_type, existing.id()).await?;
1987 Ok(ConditionalDeleteResult::Deleted)
1988 }
1989 n => {
1990 Ok(ConditionalDeleteResult::MultipleMatches(n))
1992 }
1993 }
1994 }
1995
1996 async fn conditional_patch(
1997 &self,
1998 tenant: &TenantContext,
1999 resource_type: &str,
2000 search_params: &str,
2001 patch: &crate::core::PatchFormat,
2002 ) -> StorageResult<crate::core::ConditionalPatchResult> {
2003 use crate::core::{ConditionalPatchResult, PatchFormat};
2004
2005 let matches = self
2007 .find_matching_resources(tenant, resource_type, search_params)
2008 .await?;
2009
2010 match matches.len() {
2011 0 => Ok(ConditionalPatchResult::NoMatch),
2012 1 => {
2013 let existing = matches.into_iter().next().unwrap();
2015 let current_content = existing.content().clone();
2016
2017 let patched_content = match patch {
2019 PatchFormat::JsonPatch(patch_doc) => {
2020 self.apply_json_patch(¤t_content, patch_doc)?
2021 }
2022 PatchFormat::FhirPathPatch(patch_params) => {
2023 self.apply_fhirpath_patch(¤t_content, patch_params)?
2024 }
2025 PatchFormat::MergePatch(merge_doc) => {
2026 self.apply_merge_patch(¤t_content, merge_doc)
2027 }
2028 };
2029
2030 let updated = self.update(tenant, &existing, patched_content).await?;
2032 Ok(ConditionalPatchResult::Patched(updated))
2033 }
2034 n => Ok(ConditionalPatchResult::MultipleMatches(n)),
2035 }
2036 }
2037}
2038
2039impl PostgresBackend {
2040 async fn find_matching_resources(
2044 &self,
2045 tenant: &TenantContext,
2046 resource_type: &str,
2047 search_params_str: &str,
2048 ) -> StorageResult<Vec<StoredResource>> {
2049 let parsed_params = parse_simple_search_params(search_params_str);
2051
2052 if parsed_params.is_empty() {
2053 return Ok(Vec::new());
2055 }
2056
2057 let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
2059
2060 let query = SearchQuery {
2062 resource_type: resource_type.to_string(),
2063 parameters: search_params,
2064 count: Some(1000),
2065 ..Default::default()
2066 };
2067
2068 let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2070
2071 Ok(result.resources.items)
2072 }
2073
2074 fn build_search_parameters(
2076 &self,
2077 resource_type: &str,
2078 params: &[(String, String)],
2079 ) -> StorageResult<Vec<SearchParameter>> {
2080 let registry = self.search_registry().read();
2081 let mut search_params = Vec::with_capacity(params.len());
2082
2083 for (name, value) in params {
2084 let param_type = self
2085 .lookup_param_type(®istry, resource_type, name)
2086 .unwrap_or({
2087 match name.as_str() {
2088 "_id" => SearchParamType::Token,
2089 "_lastUpdated" => SearchParamType::Date,
2090 "_tag" | "_profile" | "_security" => SearchParamType::Token,
2091 "identifier" => SearchParamType::Token,
2092 "patient" | "subject" | "encounter" | "performer" | "author"
2093 | "requester" | "recorder" | "asserter" | "practitioner"
2094 | "organization" | "location" | "device" => SearchParamType::Reference,
2095 _ => SearchParamType::String,
2096 }
2097 });
2098
2099 search_params.push(SearchParameter {
2100 name: name.clone(),
2101 param_type,
2102 modifier: None,
2103 values: vec![SearchValue::parse(value)],
2104 chain: vec![],
2105 components: vec![],
2106 });
2107 }
2108
2109 Ok(search_params)
2110 }
2111
2112 fn lookup_param_type(
2114 &self,
2115 registry: &crate::search::SearchParameterRegistry,
2116 resource_type: &str,
2117 param_name: &str,
2118 ) -> Option<SearchParamType> {
2119 if let Some(def) = registry.get_param(resource_type, param_name) {
2120 return Some(def.param_type);
2121 }
2122 if let Some(def) = registry.get_param("Resource", param_name) {
2123 return Some(def.param_type);
2124 }
2125 None
2126 }
2127
2128 fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2134 use crate::error::ValidationError;
2135
2136 let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2137 StorageError::Validation(ValidationError::InvalidResource {
2138 message: format!("Invalid JSON Patch document: {}", e),
2139 details: vec![],
2140 })
2141 })?;
2142
2143 let mut patched = resource.clone();
2144 json_patch::patch(&mut patched, &patch).map_err(|e| {
2145 StorageError::Validation(ValidationError::InvalidResource {
2146 message: format!("Failed to apply JSON Patch: {}", e),
2147 details: vec![],
2148 })
2149 })?;
2150
2151 Ok(patched)
2152 }
2153
2154 fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2156 use crate::error::ValidationError;
2157
2158 let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2159 if parameter.is_none() {
2160 return Err(StorageError::Validation(ValidationError::InvalidResource {
2161 message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2162 details: vec![],
2163 }));
2164 }
2165
2166 let mut patched = resource.clone();
2167
2168 for operation in parameter.unwrap() {
2169 let parts = operation.get("part").and_then(|p| p.as_array());
2170 if parts.is_none() {
2171 continue;
2172 }
2173
2174 let mut op_type = None;
2175 let mut op_path = None;
2176 let mut op_name = None;
2177 let mut op_value = None;
2178
2179 for part in parts.unwrap() {
2180 match part.get("name").and_then(|n| n.as_str()) {
2181 Some("type") => {
2182 op_type = part
2183 .get("valueCode")
2184 .and_then(|v| v.as_str())
2185 .map(|s| s.to_string());
2186 }
2187 Some("path") => {
2188 op_path = part
2189 .get("valueString")
2190 .and_then(|v| v.as_str())
2191 .map(|s| s.to_string());
2192 }
2193 Some("name") => {
2194 op_name = part
2195 .get("valueString")
2196 .and_then(|v| v.as_str())
2197 .map(|s| s.to_string());
2198 }
2199 Some("value") => {
2200 op_value = extract_part_value(part);
2201 }
2202 _ => {}
2203 }
2204 }
2205
2206 match op_type.as_deref() {
2207 Some("replace") => {
2208 if let (Some(path), Some(value)) = (&op_path, &op_value) {
2209 self.fhirpath_replace(&mut patched, path, value)?;
2210 }
2211 }
2212 Some("add") => {
2213 if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2214 self.fhirpath_add(&mut patched, path, name, value)?;
2215 }
2216 }
2217 Some("delete") => {
2218 if let Some(path) = &op_path {
2219 self.fhirpath_delete(&mut patched, path)?;
2220 }
2221 }
2222 _ => {
2223 }
2225 }
2226 }
2227
2228 Ok(patched)
2229 }
2230
2231 fn fhirpath_replace(
2233 &self,
2234 resource: &mut Value,
2235 path: &str,
2236 value: &Value,
2237 ) -> StorageResult<()> {
2238 let parts: Vec<&str> = path.split('.').collect();
2239 if parts.len() == 2 {
2240 if let Some(obj) = resource.as_object_mut() {
2241 obj.insert(parts[1].to_string(), value.clone());
2242 }
2243 }
2244 Ok(())
2245 }
2246
2247 fn fhirpath_add(
2249 &self,
2250 resource: &mut Value,
2251 path: &str,
2252 name: &str,
2253 value: &Value,
2254 ) -> StorageResult<()> {
2255 let parts: Vec<&str> = path.split('.').collect();
2256 if parts.len() == 1
2257 && parts[0]
2258 == resource
2259 .get("resourceType")
2260 .and_then(|r| r.as_str())
2261 .unwrap_or("")
2262 {
2263 if let Some(obj) = resource.as_object_mut() {
2264 obj.insert(name.to_string(), value.clone());
2265 }
2266 }
2267 Ok(())
2268 }
2269
2270 fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2272 let parts: Vec<&str> = path.split('.').collect();
2273 if parts.len() == 2 {
2274 if let Some(obj) = resource.as_object_mut() {
2275 obj.remove(parts[1]);
2276 }
2277 }
2278 Ok(())
2279 }
2280
2281 fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2283 let mut patched = resource.clone();
2284 json_patch::merge(&mut patched, merge_doc);
2285 patched
2286 }
2287}
2288
2289#[async_trait]
2294impl BundleProvider for PostgresBackend {
2295 async fn process_transaction(
2296 &self,
2297 tenant: &TenantContext,
2298 entries: Vec<BundleEntry>,
2299 ) -> Result<BundleResult, TransactionError> {
2300 use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2301 use std::collections::HashMap;
2302
2303 let mut tx = self
2305 .begin_transaction(tenant, TransactionOptions::new())
2306 .await
2307 .map_err(|e| TransactionError::RolledBack {
2308 reason: format!("Failed to begin transaction: {}", e),
2309 })?;
2310
2311 let mut results = Vec::with_capacity(entries.len());
2312 let mut error_info: Option<(usize, String)> = None;
2313
2314 let mut reference_map: HashMap<String, String> = HashMap::new();
2316
2317 let mut entries = entries;
2319
2320 for (idx, entry) in entries.iter_mut().enumerate() {
2322 if let Some(ref mut resource) = entry.resource {
2324 resolve_bundle_references(resource, &reference_map);
2325 }
2326
2327 let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2328
2329 match result {
2330 Ok(entry_result) => {
2331 if entry_result.status >= 400 {
2332 error_info = Some((
2333 idx,
2334 format!("Entry failed with status {}", entry_result.status),
2335 ));
2336 break;
2337 }
2338
2339 if entry.method == BundleMethod::Post {
2341 if let Some(ref full_url) = entry.full_url {
2342 if let Some(ref location) = entry_result.location {
2343 let reference = location
2344 .split("/_history")
2345 .next()
2346 .unwrap_or(location)
2347 .to_string();
2348 reference_map.insert(full_url.clone(), reference);
2349 }
2350 }
2351 }
2352
2353 results.push(entry_result);
2354 }
2355 Err(e) => {
2356 error_info = Some((idx, format!("Entry processing failed: {}", e)));
2357 break;
2358 }
2359 }
2360 }
2361
2362 if let Some((index, message)) = error_info {
2364 let _ = Box::new(tx).rollback().await;
2365 return Err(TransactionError::BundleError { index, message });
2366 }
2367
2368 Box::new(tx)
2370 .commit()
2371 .await
2372 .map_err(|e| TransactionError::RolledBack {
2373 reason: format!("Commit failed: {}", e),
2374 })?;
2375
2376 Ok(BundleResult {
2377 bundle_type: BundleType::Transaction,
2378 entries: results,
2379 })
2380 }
2381
2382 async fn process_batch(
2383 &self,
2384 tenant: &TenantContext,
2385 entries: Vec<BundleEntry>,
2386 ) -> StorageResult<BundleResult> {
2387 let mut results = Vec::with_capacity(entries.len());
2388
2389 for entry in &entries {
2391 let result = self.process_batch_entry(tenant, entry).await;
2392 results.push(result);
2393 }
2394
2395 Ok(BundleResult {
2396 bundle_type: BundleType::Batch,
2397 entries: results,
2398 })
2399 }
2400}
2401
2402impl PostgresBackend {
2403 async fn process_bundle_entry_tx(
2405 &self,
2406 tx: &mut super::transaction::PostgresTransaction,
2407 entry: &BundleEntry,
2408 ) -> StorageResult<BundleEntryResult> {
2409 use crate::core::transaction::Transaction;
2410
2411 match entry.method {
2412 BundleMethod::Get => {
2413 let (resource_type, id) = self.parse_url(&entry.url)?;
2414 match tx.read(&resource_type, &id).await? {
2415 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2416 None => Ok(BundleEntryResult::error(
2417 404,
2418 serde_json::json!({
2419 "resourceType": "OperationOutcome",
2420 "issue": [{"severity": "error", "code": "not-found"}]
2421 }),
2422 )),
2423 }
2424 }
2425 BundleMethod::Post => {
2426 let resource = entry.resource.clone().ok_or_else(|| {
2427 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2428 field: "resource".to_string(),
2429 })
2430 })?;
2431
2432 let resource_type = resource
2433 .get("resourceType")
2434 .and_then(|v| v.as_str())
2435 .map(|s| s.to_string())
2436 .ok_or_else(|| {
2437 StorageError::Validation(
2438 crate::error::ValidationError::MissingRequiredField {
2439 field: "resourceType".to_string(),
2440 },
2441 )
2442 })?;
2443
2444 let created = tx.create(&resource_type, resource).await?;
2445 Ok(BundleEntryResult::created(created))
2446 }
2447 BundleMethod::Put => {
2448 let resource = entry.resource.clone().ok_or_else(|| {
2449 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2450 field: "resource".to_string(),
2451 })
2452 })?;
2453
2454 let (resource_type, id) = self.parse_url(&entry.url)?;
2455
2456 match tx.read(&resource_type, &id).await? {
2457 Some(existing) => {
2458 if let Some(ref if_match) = entry.if_match {
2460 let current_etag = existing.etag();
2461 if current_etag != if_match.as_str() {
2462 return Ok(BundleEntryResult::error(
2463 412,
2464 serde_json::json!({
2465 "resourceType": "OperationOutcome",
2466 "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2467 }),
2468 ));
2469 }
2470 }
2471 let updated = tx.update(&existing, resource).await?;
2472 Ok(BundleEntryResult::ok(updated))
2473 }
2474 None => {
2475 let mut resource_with_id = resource;
2477 resource_with_id["id"] = serde_json::json!(id);
2478 let created = tx.create(&resource_type, resource_with_id).await?;
2479 Ok(BundleEntryResult::created(created))
2480 }
2481 }
2482 }
2483 BundleMethod::Delete => {
2484 let (resource_type, id) = self.parse_url(&entry.url)?;
2485 tx.delete(&resource_type, &id).await?;
2486 Ok(BundleEntryResult::deleted())
2487 }
2488 BundleMethod::Patch => {
2489 Ok(BundleEntryResult::error(
2491 501,
2492 serde_json::json!({
2493 "resourceType": "OperationOutcome",
2494 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented in transaction bundles"}]
2495 }),
2496 ))
2497 }
2498 }
2499 }
2500
2501 async fn process_batch_entry(
2503 &self,
2504 tenant: &TenantContext,
2505 entry: &BundleEntry,
2506 ) -> BundleEntryResult {
2507 match self.process_batch_entry_inner(tenant, entry).await {
2508 Ok(result) => result,
2509 Err(e) => BundleEntryResult::error(
2510 500,
2511 serde_json::json!({
2512 "resourceType": "OperationOutcome",
2513 "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2514 }),
2515 ),
2516 }
2517 }
2518
2519 async fn process_batch_entry_inner(
2520 &self,
2521 tenant: &TenantContext,
2522 entry: &BundleEntry,
2523 ) -> StorageResult<BundleEntryResult> {
2524 match entry.method {
2525 BundleMethod::Get => {
2526 let (resource_type, id) = self.parse_url(&entry.url)?;
2527 match self.read(tenant, &resource_type, &id).await? {
2528 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2529 None => Ok(BundleEntryResult::error(
2530 404,
2531 serde_json::json!({
2532 "resourceType": "OperationOutcome",
2533 "issue": [{"severity": "error", "code": "not-found"}]
2534 }),
2535 )),
2536 }
2537 }
2538 BundleMethod::Post => {
2539 let resource = entry.resource.clone().ok_or_else(|| {
2540 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2541 field: "resource".to_string(),
2542 })
2543 })?;
2544
2545 let resource_type = resource
2546 .get("resourceType")
2547 .and_then(|v| v.as_str())
2548 .map(|s| s.to_string())
2549 .ok_or_else(|| {
2550 StorageError::Validation(
2551 crate::error::ValidationError::MissingRequiredField {
2552 field: "resourceType".to_string(),
2553 },
2554 )
2555 })?;
2556
2557 let created = self
2558 .create(
2559 tenant,
2560 &resource_type,
2561 resource,
2562 FhirVersion::default_enabled(),
2563 )
2564 .await?;
2565 Ok(BundleEntryResult::created(created))
2566 }
2567 BundleMethod::Put => {
2568 let resource = entry.resource.clone().ok_or_else(|| {
2569 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2570 field: "resource".to_string(),
2571 })
2572 })?;
2573
2574 let (resource_type, id) = self.parse_url(&entry.url)?;
2575 let (stored, _created) = self
2576 .create_or_update(
2577 tenant,
2578 &resource_type,
2579 &id,
2580 resource,
2581 FhirVersion::default_enabled(),
2582 )
2583 .await?;
2584 Ok(BundleEntryResult::ok(stored))
2585 }
2586 BundleMethod::Delete => {
2587 let (resource_type, id) = self.parse_url(&entry.url)?;
2588 match self.delete(tenant, &resource_type, &id).await {
2589 Ok(()) => Ok(BundleEntryResult::deleted()),
2590 Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2591 Ok(BundleEntryResult::deleted()) }
2593 Err(e) => Err(e),
2594 }
2595 }
2596 BundleMethod::Patch => Ok(BundleEntryResult::error(
2597 501,
2598 serde_json::json!({
2599 "resourceType": "OperationOutcome",
2600 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2601 }),
2602 )),
2603 }
2604 }
2605
2606 fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2608 let path = url
2609 .strip_prefix("http://")
2610 .or_else(|| url.strip_prefix("https://"))
2611 .map(|s| s.find('/').map(|i| &s[i..]).unwrap_or(s))
2612 .unwrap_or(url);
2613
2614 let path = path.trim_start_matches('/');
2615 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2616
2617 if parts.len() >= 2 {
2618 let len = parts.len();
2619 Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2620 } else {
2621 Err(StorageError::Validation(
2622 crate::error::ValidationError::InvalidReference {
2623 reference: url.to_string(),
2624 message: "URL must be in format ResourceType/id".to_string(),
2625 },
2626 ))
2627 }
2628 }
2629}
2630
2631fn resolve_bundle_references(
2633 value: &mut serde_json::Value,
2634 reference_map: &std::collections::HashMap<String, String>,
2635) {
2636 use serde_json::Value;
2637 match value {
2638 Value::Object(map) => {
2639 if let Some(Value::String(ref_str)) = map.get("reference") {
2640 if ref_str.starts_with("urn:uuid:") {
2641 if let Some(resolved) = reference_map.get(ref_str) {
2642 map.insert("reference".to_string(), Value::String(resolved.clone()));
2643 }
2644 }
2645 }
2646 for v in map.values_mut() {
2647 resolve_bundle_references(v, reference_map);
2648 }
2649 }
2650 Value::Array(arr) => {
2651 for item in arr {
2652 resolve_bundle_references(item, reference_map);
2653 }
2654 }
2655 _ => {}
2656 }
2657}
2658
2659#[async_trait]
2664impl ReindexableStorage for PostgresBackend {
2665 async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2666 let client = self.get_client().await?;
2667 let tenant_id = tenant.tenant_id().as_str();
2668
2669 let rows = client
2670 .query(
2671 "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
2672 &[&tenant_id],
2673 )
2674 .await
2675 .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?;
2676
2677 let types: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
2678 Ok(types)
2679 }
2680
2681 async fn count_resources(
2682 &self,
2683 tenant: &TenantContext,
2684 resource_type: &str,
2685 ) -> StorageResult<u64> {
2686 self.count(tenant, Some(resource_type)).await
2687 }
2688
2689 async fn fetch_resources_page(
2690 &self,
2691 tenant: &TenantContext,
2692 resource_type: &str,
2693 cursor: Option<&str>,
2694 limit: u32,
2695 ) -> StorageResult<ResourcePage> {
2696 let client = self.get_client().await?;
2697 let tenant_id = tenant.tenant_id().as_str();
2698
2699 let (cursor_ts, cursor_id) = if let Some(c) = cursor {
2701 let parts: Vec<&str> = c.split('|').collect();
2702 if parts.len() == 2 {
2703 let ts = DateTime::parse_from_rfc3339(parts[0])
2704 .map(|dt| dt.with_timezone(&Utc))
2705 .map_err(|e| internal_error(format!("Invalid cursor timestamp: {}", e)))?;
2706 (Some(ts), Some(parts[1].to_string()))
2707 } else {
2708 (None, None)
2709 }
2710 } else {
2711 (None, None)
2712 };
2713
2714 let rows = if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
2715 client
2716 .query(
2717 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2718 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2719 AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
2720 ORDER BY last_updated ASC, id ASC LIMIT $5",
2721 &[
2722 &tenant_id,
2723 &resource_type,
2724 ts,
2725 &id.as_str(),
2726 &(limit as i64),
2727 ],
2728 )
2729 .await
2730 .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2731 } else {
2732 client
2733 .query(
2734 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2735 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2736 ORDER BY last_updated ASC, id ASC LIMIT $3",
2737 &[&tenant_id, &resource_type, &(limit as i64)],
2738 )
2739 .await
2740 .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2741 };
2742
2743 let resources: Vec<StoredResource> = rows
2744 .iter()
2745 .map(|row| {
2746 let id: String = row.get(0);
2747 let version_id: String = row.get(1);
2748 let data: Value = row.get(2);
2749 let last_updated: DateTime<Utc> = row.get(3);
2750 let fhir_version_str: String = row.get(4);
2751 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
2752 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
2753
2754 StoredResource::from_storage(
2755 resource_type,
2756 id,
2757 version_id,
2758 tenant.tenant_id().clone(),
2759 data,
2760 last_updated,
2761 last_updated,
2762 None,
2763 fhir_version,
2764 )
2765 })
2766 .collect();
2767
2768 let next_cursor = if resources.len() == limit as usize {
2770 resources
2771 .last()
2772 .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
2773 } else {
2774 None
2775 };
2776
2777 Ok(ResourcePage {
2778 resources,
2779 next_cursor,
2780 })
2781 }
2782
2783 async fn delete_search_entries(
2784 &self,
2785 tenant: &TenantContext,
2786 resource_type: &str,
2787 resource_id: &str,
2788 ) -> StorageResult<()> {
2789 let client = self.get_client().await?;
2790 self.delete_search_index(
2791 &client,
2792 tenant.tenant_id().as_str(),
2793 resource_type,
2794 resource_id,
2795 )
2796 .await
2797 }
2798
2799 async fn write_search_entries(
2800 &self,
2801 tenant: &TenantContext,
2802 resource_type: &str,
2803 resource_id: &str,
2804 resource: &Value,
2805 ) -> StorageResult<usize> {
2806 let client = self.get_client().await?;
2807 let tenant_id = tenant.tenant_id().as_str();
2808
2809 let values = self
2811 .search_extractor()
2812 .extract(resource, resource_type)
2813 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
2814
2815 let mut count = 0;
2816 for value in values {
2817 PostgresSearchIndexWriter::write_entry(
2818 &client,
2819 tenant_id,
2820 resource_type,
2821 resource_id,
2822 &value,
2823 )
2824 .await?;
2825 count += 1;
2826 }
2827
2828 count += self
2831 .index_contained_resources(&client, tenant_id, resource_type, resource_id, resource)
2832 .await?;
2833
2834 Ok(count)
2835 }
2836
2837 async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
2838 let client = self.get_client().await?;
2839 let tenant_id = tenant.tenant_id().as_str();
2840
2841 let deleted = client
2842 .execute(
2843 "DELETE FROM search_index WHERE tenant_id = $1",
2844 &[&tenant_id],
2845 )
2846 .await
2847 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
2848
2849 let _ = client
2851 .execute(
2852 "DELETE FROM resource_fts WHERE tenant_id = $1",
2853 &[&tenant_id],
2854 )
2855 .await;
2856
2857 Ok(deleted)
2858 }
2859}
2860
2861fn normalize_date_for_pg(value: &str) -> String {
2867 if value.contains('T') {
2868 if value.contains('+') || value.contains('Z') || value.ends_with("-00:00") {
2869 value.to_string()
2870 } else {
2871 format!("{}+00:00", value)
2872 }
2873 } else if value.len() == 10 {
2874 format!("{}T00:00:00+00:00", value)
2875 } else if value.len() == 7 {
2876 format!("{}-01T00:00:00+00:00", value)
2877 } else if value.len() == 4 {
2878 format!("{}-01-01T00:00:00+00:00", value)
2879 } else {
2880 value.to_string()
2881 }
2882}
2883
2884struct SearchableContent {
2890 narrative: String,
2891 full_content: String,
2892}
2893
2894impl SearchableContent {
2895 fn is_empty(&self) -> bool {
2896 self.narrative.is_empty() && self.full_content.is_empty()
2897 }
2898}
2899
2900fn extract_searchable_content(resource: &Value) -> SearchableContent {
2902 SearchableContent {
2903 narrative: extract_narrative(resource),
2904 full_content: extract_all_strings(resource),
2905 }
2906}
2907
2908fn extract_narrative(resource: &Value) -> String {
2910 resource
2911 .get("text")
2912 .and_then(|t| t.get("div"))
2913 .and_then(|d| d.as_str())
2914 .map(strip_html_tags)
2915 .unwrap_or_default()
2916}
2917
2918fn strip_html_tags(html: &str) -> String {
2920 let mut result = String::with_capacity(html.len());
2921 let mut in_tag = false;
2922
2923 for c in html.chars() {
2924 match c {
2925 '<' => in_tag = true,
2926 '>' if in_tag => {
2927 in_tag = false;
2928 result.push(' ');
2929 }
2930 _ if !in_tag => result.push(c),
2931 _ => {}
2932 }
2933 }
2934
2935 result.split_whitespace().collect::<Vec<_>>().join(" ")
2936}
2937
2938fn extract_all_strings(value: &Value) -> String {
2940 let mut parts = Vec::new();
2941 collect_strings(value, &mut parts);
2942 parts.join(" ")
2943}
2944
2945fn collect_strings(value: &Value, parts: &mut Vec<String>) {
2946 match value {
2947 Value::String(s) => {
2948 if !s.is_empty() {
2949 parts.push(s.clone());
2950 }
2951 }
2952 Value::Object(map) => {
2953 for (key, val) in map {
2954 if key == "div" || key == "data" {
2955 continue;
2956 }
2957 collect_strings(val, parts);
2958 }
2959 }
2960 Value::Array(arr) => {
2961 for val in arr {
2962 collect_strings(val, parts);
2963 }
2964 }
2965 _ => {}
2966 }
2967}