1use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7
8use crate::core::history::{
9 DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
10 InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
11};
12use crate::core::transaction::{
13 BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
14};
15use crate::core::{
16 ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
17 PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
18};
19use crate::error::TransactionError;
20use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
21use crate::search::loader::SearchParameterLoader;
22use crate::search::registry::SearchParameterStatus;
23use crate::search::reindex::{ReindexableStorage, ResourcePage};
24use crate::tenant::TenantContext;
25use crate::types::Pagination;
26use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
27use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
28
29use super::PostgresBackend;
30use super::search::writer::PostgresSearchIndexWriter;
31
32fn internal_error(message: String) -> StorageError {
33 StorageError::Backend(BackendError::Internal {
34 backend_name: "postgres".to_string(),
35 message,
36 source: None,
37 })
38}
39
40#[allow(dead_code)]
41fn serialization_error(message: String) -> StorageError {
42 StorageError::Backend(BackendError::SerializationError { message })
43}
44
45#[async_trait]
46impl ResourceStorage for PostgresBackend {
47 fn backend_name(&self) -> &'static str {
48 "postgres"
49 }
50
51 async fn create(
52 &self,
53 tenant: &TenantContext,
54 resource_type: &str,
55 resource: Value,
56 fhir_version: FhirVersion,
57 ) -> StorageResult<StoredResource> {
58 let client = self.get_client().await?;
59 let tenant_id = tenant.tenant_id().as_str();
60
61 let id = resource
63 .get("id")
64 .and_then(|v| v.as_str())
65 .map(String::from)
66 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
67
68 let exists = client
70 .query_opt(
71 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
72 &[&tenant_id, &resource_type, &id],
73 )
74 .await
75 .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
76
77 if exists.is_some() {
78 return Err(StorageError::Resource(ResourceError::AlreadyExists {
79 resource_type: resource_type.to_string(),
80 id: id.clone(),
81 }));
82 }
83
84 let mut resource = resource;
86 if let Some(obj) = resource.as_object_mut() {
87 obj.insert(
88 "resourceType".to_string(),
89 Value::String(resource_type.to_string()),
90 );
91 obj.insert("id".to_string(), Value::String(id.clone()));
92 }
93
94 let now = Utc::now();
95 let version_id = "1";
96 let fhir_version_str = fhir_version.as_mime_param();
97 let is_deleted = false;
98
99 client
101 .execute(
102 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
103 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
104 &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
105 )
106 .await
107 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
108
109 client
111 .execute(
112 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
113 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
114 &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
115 )
116 .await
117 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
118
119 self.index_resource(&client, tenant_id, resource_type, &id, &resource)
121 .await?;
122
123 if resource_type == "SearchParameter" {
125 self.handle_search_parameter_create(&resource)?;
126 }
127
128 Ok(StoredResource::from_storage(
130 resource_type,
131 &id,
132 version_id,
133 tenant.tenant_id().clone(),
134 resource,
135 now,
136 now,
137 None,
138 fhir_version,
139 ))
140 }
141
142 async fn create_or_update(
143 &self,
144 tenant: &TenantContext,
145 resource_type: &str,
146 id: &str,
147 resource: Value,
148 fhir_version: FhirVersion,
149 ) -> StorageResult<(StoredResource, bool)> {
150 let existing = self.read(tenant, resource_type, id).await?;
152
153 if let Some(current) = existing {
154 let updated = self.update(tenant, ¤t, resource).await?;
156 Ok((updated, false))
157 } else {
158 let mut resource = resource;
160 if let Some(obj) = resource.as_object_mut() {
161 obj.insert("id".to_string(), Value::String(id.to_string()));
162 }
163 let created = self
164 .create(tenant, resource_type, resource, fhir_version)
165 .await?;
166 Ok((created, true))
167 }
168 }
169
170 async fn read(
171 &self,
172 tenant: &TenantContext,
173 resource_type: &str,
174 id: &str,
175 ) -> StorageResult<Option<StoredResource>> {
176 let client = self.get_client().await?;
177 let tenant_id = tenant.tenant_id().as_str();
178
179 let row = client
180 .query_opt(
181 "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
182 FROM resources
183 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
184 &[&tenant_id, &resource_type, &id],
185 )
186 .await
187 .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
188
189 match row {
190 Some(row) => {
191 let version_id: String = row.get(0);
192 let data: Value = row.get(1);
193 let last_updated: DateTime<Utc> = row.get(2);
194 let is_deleted: bool = row.get(3);
195 let deleted_at: Option<DateTime<Utc>> = row.get(4);
196 let fhir_version_str: String = row.get(5);
197
198 if is_deleted {
200 return Err(StorageError::Resource(ResourceError::Gone {
201 resource_type: resource_type.to_string(),
202 id: id.to_string(),
203 deleted_at,
204 }));
205 }
206
207 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
208
209 Ok(Some(StoredResource::from_storage(
210 resource_type,
211 id,
212 version_id,
213 tenant.tenant_id().clone(),
214 data,
215 last_updated,
216 last_updated,
217 None,
218 fhir_version,
219 )))
220 }
221 None => Ok(None),
222 }
223 }
224
225 async fn update(
226 &self,
227 tenant: &TenantContext,
228 current: &StoredResource,
229 resource: Value,
230 ) -> StorageResult<StoredResource> {
231 let client = self.get_client().await?;
232 let tenant_id = tenant.tenant_id().as_str();
233 let resource_type = current.resource_type();
234 let id = current.id();
235
236 let row = client
238 .query_opt(
239 "SELECT version_id FROM resources
240 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
241 &[&tenant_id, &resource_type, &id],
242 )
243 .await
244 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
245
246 let actual_version = match row {
247 Some(row) => row.get::<_, String>(0),
248 None => {
249 return Err(StorageError::Resource(ResourceError::NotFound {
250 resource_type: resource_type.to_string(),
251 id: id.to_string(),
252 }));
253 }
254 };
255
256 if actual_version != current.version_id() {
258 return Err(StorageError::Concurrency(
259 ConcurrencyError::VersionConflict {
260 resource_type: resource_type.to_string(),
261 id: id.to_string(),
262 expected_version: current.version_id().to_string(),
263 actual_version,
264 },
265 ));
266 }
267
268 let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
270 let new_version_str = new_version.to_string();
271
272 let mut resource = resource;
274 if let Some(obj) = resource.as_object_mut() {
275 obj.insert(
276 "resourceType".to_string(),
277 Value::String(resource_type.to_string()),
278 );
279 obj.insert("id".to_string(), Value::String(id.to_string()));
280 }
281
282 let now = Utc::now();
283 let fhir_version_str = current.fhir_version().as_mime_param();
284 let is_deleted = false;
285
286 client
288 .execute(
289 "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
290 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
291 &[
292 &new_version_str,
293 &resource,
294 &now,
295 &tenant_id,
296 &resource_type,
297 &id,
298 ],
299 )
300 .await
301 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
302
303 client
305 .execute(
306 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
307 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
308 &[&tenant_id, &resource_type, &id, &new_version_str, &resource, &now, &is_deleted, &fhir_version_str],
309 )
310 .await
311 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
312
313 self.delete_search_index(&client, tenant_id, resource_type, id)
315 .await?;
316 self.index_resource(&client, tenant_id, resource_type, id, &resource)
317 .await?;
318
319 if resource_type == "SearchParameter" {
321 self.handle_search_parameter_update(current.content(), &resource)?;
322 }
323
324 Ok(StoredResource::from_storage(
325 resource_type,
326 id,
327 new_version_str,
328 tenant.tenant_id().clone(),
329 resource,
330 now,
331 now,
332 None,
333 current.fhir_version(),
334 ))
335 }
336
337 async fn delete(
338 &self,
339 tenant: &TenantContext,
340 resource_type: &str,
341 id: &str,
342 ) -> StorageResult<()> {
343 let client = self.get_client().await?;
344 let tenant_id = tenant.tenant_id().as_str();
345
346 let row = client
348 .query_opt(
349 "SELECT version_id, data, fhir_version FROM resources
350 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
351 &[&tenant_id, &resource_type, &id],
352 )
353 .await
354 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
355
356 let (current_version, data, fhir_version_str) = match row {
357 Some(row) => {
358 let v: String = row.get(0);
359 let d: Value = row.get(1);
360 let f: String = row.get(2);
361 (v, d, f)
362 }
363 None => {
364 return Err(StorageError::Resource(ResourceError::NotFound {
365 resource_type: resource_type.to_string(),
366 id: id.to_string(),
367 }));
368 }
369 };
370
371 let now = Utc::now();
372
373 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
375 let new_version_str = new_version.to_string();
376 let is_deleted = true;
377
378 client
380 .execute(
381 "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
382 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
383 &[&now, &new_version_str, &tenant_id, &resource_type, &id],
384 )
385 .await
386 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
387
388 client
390 .execute(
391 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
392 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
393 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
394 )
395 .await
396 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
397
398 if !self.is_search_offloaded() {
400 client
401 .execute(
402 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
403 &[&tenant_id, &resource_type, &id],
404 )
405 .await
406 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
407 }
408
409 if resource_type == "SearchParameter" {
411 self.handle_search_parameter_delete(&data)?;
412 }
413
414 Ok(())
415 }
416
417 async fn count(
418 &self,
419 tenant: &TenantContext,
420 resource_type: Option<&str>,
421 ) -> StorageResult<u64> {
422 let client = self.get_client().await?;
423 let tenant_id = tenant.tenant_id().as_str();
424
425 let count: i64 = if let Some(rt) = resource_type {
426 let row = client
427 .query_one(
428 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE",
429 &[&tenant_id, &rt],
430 )
431 .await
432 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
433 row.get(0)
434 } else {
435 let row = client
436 .query_one(
437 "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
438 &[&tenant_id],
439 )
440 .await
441 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
442 row.get(0)
443 };
444
445 Ok(count as u64)
446 }
447}
448
449impl PostgresBackend {
454 pub(crate) async fn index_resource(
459 &self,
460 client: &deadpool_postgres::Client,
461 tenant_id: &str,
462 resource_type: &str,
463 resource_id: &str,
464 resource: &Value,
465 ) -> StorageResult<()> {
466 if self.is_search_offloaded() {
468 return Ok(());
469 }
470
471 client
473 .execute(
474 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
475 &[&tenant_id, &resource_type, &resource_id],
476 )
477 .await
478 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
479
480 match self.search_extractor().extract(resource, resource_type) {
482 Ok(values) => {
483 let mut count = 0;
484 for value in values {
485 PostgresSearchIndexWriter::write_entry(
486 client,
487 tenant_id,
488 resource_type,
489 resource_id,
490 &value,
491 )
492 .await?;
493 count += 1;
494 }
495 tracing::debug!(
496 "Dynamically indexed {} values for {}/{}",
497 count,
498 resource_type,
499 resource_id
500 );
501 }
502 Err(e) => {
503 tracing::warn!(
504 "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
505 resource_type,
506 resource_id,
507 e
508 );
509 self.index_minimal_fallback(
511 client,
512 tenant_id,
513 resource_type,
514 resource_id,
515 resource,
516 )
517 .await?;
518 }
519 }
520
521 self.index_fts_content(client, tenant_id, resource_type, resource_id, resource)
523 .await?;
524
525 Ok(())
526 }
527
528 async fn index_fts_content(
532 &self,
533 client: &deadpool_postgres::Client,
534 tenant_id: &str,
535 resource_type: &str,
536 resource_id: &str,
537 resource: &Value,
538 ) -> StorageResult<()> {
539 let fts_exists = client
541 .query_opt(
542 "SELECT 1 FROM information_schema.tables WHERE table_name = 'resource_fts'",
543 &[],
544 )
545 .await
546 .map_err(|e| internal_error(format!("Failed to check FTS table: {}", e)))?;
547
548 if fts_exists.is_none() {
549 return Ok(());
550 }
551
552 let content = extract_searchable_content(resource);
554
555 if content.is_empty() {
556 return Ok(());
557 }
558
559 let _ = client
561 .execute(
562 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
563 &[&tenant_id, &resource_type, &resource_id],
564 )
565 .await;
566
567 client
569 .execute(
570 "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
571 VALUES ($1, $2, $3, $4, $5)",
572 &[
573 &resource_id,
574 &resource_type,
575 &tenant_id,
576 &content.narrative,
577 &content.full_content,
578 ],
579 )
580 .await
581 .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
582
583 Ok(())
584 }
585
586 async fn index_minimal_fallback(
590 &self,
591 client: &deadpool_postgres::Client,
592 tenant_id: &str,
593 resource_type: &str,
594 resource_id: &str,
595 resource: &Value,
596 ) -> StorageResult<()> {
597 if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
599 client
600 .execute(
601 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_code)
602 VALUES ($1, $2, $3, '_id', $4)",
603 &[&tenant_id, &resource_type, &resource_id, &id],
604 )
605 .await
606 .map_err(|e| internal_error(format!("Failed to insert _id index: {}", e)))?;
607 }
608
609 if let Some(last_updated) = resource
611 .get("meta")
612 .and_then(|m| m.get("lastUpdated"))
613 .and_then(|v| v.as_str())
614 {
615 let normalized = normalize_date_for_pg(last_updated);
616 client
617 .execute(
618 "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
619 VALUES ($1, $2, $3, '_lastUpdated', $4::timestamptz)",
620 &[&tenant_id, &resource_type, &resource_id, &normalized],
621 )
622 .await
623 .map_err(|e| {
624 internal_error(format!("Failed to insert _lastUpdated index: {}", e))
625 })?;
626 }
627
628 Ok(())
629 }
630
631 pub(crate) async fn delete_search_index(
633 &self,
634 client: &deadpool_postgres::Client,
635 tenant_id: &str,
636 resource_type: &str,
637 resource_id: &str,
638 ) -> StorageResult<()> {
639 if self.is_search_offloaded() {
641 return Ok(());
642 }
643
644 client
646 .execute(
647 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
648 &[&tenant_id, &resource_type, &resource_id],
649 )
650 .await
651 .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
652
653 let _ = client
655 .execute(
656 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
657 &[&tenant_id, &resource_type, &resource_id],
658 )
659 .await;
660
661 Ok(())
662 }
663}
664
665impl PostgresBackend {
670 fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
676 let loader = SearchParameterLoader::new(self.config().fhir_version);
677
678 match loader.parse_resource(resource) {
679 Ok(def) => {
680 if def.status == SearchParameterStatus::Active {
682 let mut registry = self.search_registry().write();
683 if let Err(e) = registry.register(def) {
685 tracing::debug!("SearchParameter registration skipped: {}", e);
686 }
687 }
688 }
689 Err(e) => {
690 tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
692 }
693 }
694
695 Ok(())
696 }
697
698 fn handle_search_parameter_update(
705 &self,
706 old_resource: &Value,
707 new_resource: &Value,
708 ) -> StorageResult<()> {
709 let loader = SearchParameterLoader::new(self.config().fhir_version);
710
711 let old_def = loader.parse_resource(old_resource).ok();
712 let new_def = loader.parse_resource(new_resource).ok();
713
714 match (old_def, new_def) {
715 (Some(old), Some(new)) => {
716 let mut registry = self.search_registry().write();
717
718 if old.url != new.url {
720 let _ = registry.unregister(&old.url);
721 if new.status == SearchParameterStatus::Active {
722 let _ = registry.register(new);
723 }
724 } else if old.status != new.status {
725 if let Err(e) = registry.update_status(&new.url, new.status) {
727 tracing::debug!("SearchParameter status update skipped: {}", e);
728 }
729 } else {
730 let _ = registry.unregister(&old.url);
732 if new.status == SearchParameterStatus::Active {
733 let _ = registry.register(new);
734 }
735 }
736 }
737 (None, Some(new)) => {
738 if new.status == SearchParameterStatus::Active {
740 let mut registry = self.search_registry().write();
741 let _ = registry.register(new);
742 }
743 }
744 (Some(old), None) => {
745 let mut registry = self.search_registry().write();
747 let _ = registry.unregister(&old.url);
748 }
749 (None, None) => {
750 }
752 }
753
754 Ok(())
755 }
756
757 fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
762 if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
763 let mut registry = self.search_registry().write();
764 if let Err(e) = registry.unregister(url) {
765 tracing::debug!("SearchParameter unregistration skipped: {}", e);
766 }
767 }
768
769 Ok(())
770 }
771}
772
773#[async_trait]
778impl VersionedStorage for PostgresBackend {
779 async fn vread(
780 &self,
781 tenant: &TenantContext,
782 resource_type: &str,
783 id: &str,
784 version_id: &str,
785 ) -> StorageResult<Option<StoredResource>> {
786 let client = self.get_client().await?;
787 let tenant_id = tenant.tenant_id().as_str();
788
789 let row = client
790 .query_opt(
791 "SELECT data, last_updated, is_deleted, fhir_version
792 FROM resource_history
793 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
794 &[&tenant_id, &resource_type, &id, &version_id],
795 )
796 .await
797 .map_err(|e| internal_error(format!("Failed to read version: {}", e)))?;
798
799 match row {
800 Some(row) => {
801 let data: Value = row.get(0);
802 let last_updated: DateTime<Utc> = row.get(1);
803 let is_deleted: bool = row.get(2);
804 let fhir_version_str: String = row.get(3);
805
806 let deleted_at = if is_deleted { Some(last_updated) } else { None };
808
809 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
810
811 Ok(Some(StoredResource::from_storage(
812 resource_type,
813 id,
814 version_id,
815 tenant.tenant_id().clone(),
816 data,
817 last_updated,
818 last_updated,
819 deleted_at,
820 fhir_version,
821 )))
822 }
823 None => Ok(None),
824 }
825 }
826
827 async fn update_with_match(
828 &self,
829 tenant: &TenantContext,
830 resource_type: &str,
831 id: &str,
832 expected_version: &str,
833 resource: Value,
834 ) -> StorageResult<StoredResource> {
835 let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
837 StorageError::Resource(ResourceError::NotFound {
838 resource_type: resource_type.to_string(),
839 id: id.to_string(),
840 })
841 })?;
842
843 if current.version_id() != expected_version {
845 return Err(StorageError::Concurrency(
846 ConcurrencyError::VersionConflict {
847 resource_type: resource_type.to_string(),
848 id: id.to_string(),
849 expected_version: expected_version.to_string(),
850 actual_version: current.version_id().to_string(),
851 },
852 ));
853 }
854
855 self.update(tenant, ¤t, resource).await
857 }
858
859 async fn delete_with_match(
860 &self,
861 tenant: &TenantContext,
862 resource_type: &str,
863 id: &str,
864 expected_version: &str,
865 ) -> StorageResult<()> {
866 let client = self.get_client().await?;
867 let tenant_id = tenant.tenant_id().as_str();
868
869 let row = client
871 .query_opt(
872 "SELECT version_id FROM resources
873 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
874 &[&tenant_id, &resource_type, &id],
875 )
876 .await
877 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
878
879 let current_version = match row {
880 Some(row) => row.get::<_, String>(0),
881 None => {
882 return Err(StorageError::Resource(ResourceError::NotFound {
883 resource_type: resource_type.to_string(),
884 id: id.to_string(),
885 }));
886 }
887 };
888
889 if current_version != expected_version {
890 return Err(StorageError::Concurrency(
891 ConcurrencyError::VersionConflict {
892 resource_type: resource_type.to_string(),
893 id: id.to_string(),
894 expected_version: expected_version.to_string(),
895 actual_version: current_version,
896 },
897 ));
898 }
899
900 self.delete(tenant, resource_type, id).await
902 }
903
904 async fn list_versions(
905 &self,
906 tenant: &TenantContext,
907 resource_type: &str,
908 id: &str,
909 ) -> StorageResult<Vec<String>> {
910 let client = self.get_client().await?;
911 let tenant_id = tenant.tenant_id().as_str();
912
913 let rows = client
914 .query(
915 "SELECT version_id FROM resource_history
916 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3
917 ORDER BY CAST(version_id AS INTEGER) ASC",
918 &[&tenant_id, &resource_type, &id],
919 )
920 .await
921 .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?;
922
923 let versions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
924 Ok(versions)
925 }
926}
927
928#[async_trait]
933impl InstanceHistoryProvider for PostgresBackend {
934 async fn history_instance(
935 &self,
936 tenant: &TenantContext,
937 resource_type: &str,
938 id: &str,
939 params: &HistoryParams,
940 ) -> StorageResult<HistoryPage> {
941 let client = self.get_client().await?;
942 let tenant_id = tenant.tenant_id().as_str();
943
944 let mut sql = String::from(
946 "SELECT version_id, data, last_updated, is_deleted, fhir_version
947 FROM resource_history
948 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
949 );
950 let mut param_index: usize = 4;
951 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
952 Box::new(tenant_id.to_string()),
953 Box::new(resource_type.to_string()),
954 Box::new(id.to_string()),
955 ];
956
957 if !params.include_deleted {
959 sql.push_str(" AND is_deleted = FALSE");
960 }
961
962 if let Some(since) = ¶ms.since {
964 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
965 query_params.push(Box::new(*since));
966 param_index += 1;
967 }
968
969 if let Some(before) = ¶ms.before {
971 sql.push_str(&format!(" AND last_updated < ${}", param_index));
972 query_params.push(Box::new(*before));
973 param_index += 1;
974 }
975
976 if let Some(cursor) = params.pagination.cursor_value() {
978 if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
979 if let Ok(version_int) = version_str.parse::<i64>() {
980 sql.push_str(&format!(
981 " AND CAST(version_id AS INTEGER) < ${}",
982 param_index
983 ));
984 query_params.push(Box::new(version_int));
985 param_index += 1;
986 }
987 }
988 }
989
990 let limit = params.pagination.count as i64 + 1; sql.push_str(&format!(
993 " ORDER BY CAST(version_id AS INTEGER) DESC LIMIT ${}",
994 param_index
995 ));
996 query_params.push(Box::new(limit));
997
998 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1000 .iter()
1001 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1002 .collect();
1003
1004 let rows = client
1005 .query(&sql, ¶m_refs)
1006 .await
1007 .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1008
1009 let mut entries = Vec::new();
1010 let mut last_version: Option<String> = None;
1011
1012 for row in &rows {
1013 if entries.len() >= params.pagination.count as usize {
1015 break;
1016 }
1017
1018 let version_id: String = row.get(0);
1019 let data: Value = row.get(1);
1020 let last_updated: DateTime<Utc> = row.get(2);
1021 let is_deleted: bool = row.get(3);
1022 let fhir_version_str: String = row.get(4);
1023
1024 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1025
1026 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1027
1028 let resource = StoredResource::from_storage(
1029 resource_type,
1030 id,
1031 &version_id,
1032 tenant.tenant_id().clone(),
1033 data,
1034 last_updated,
1035 last_updated,
1036 deleted_at,
1037 fhir_version,
1038 );
1039
1040 let method = if is_deleted {
1042 HistoryMethod::Delete
1043 } else if version_id == "1" {
1044 HistoryMethod::Post
1045 } else {
1046 HistoryMethod::Put
1047 };
1048
1049 last_version = Some(version_id);
1050
1051 entries.push(HistoryEntry {
1052 resource,
1053 method,
1054 timestamp: last_updated,
1055 });
1056 }
1057
1058 let has_more = rows.len() > params.pagination.count as usize;
1060
1061 let page_info = if let (true, Some(version)) = (has_more, last_version) {
1063 let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1064 PageInfo::with_next(cursor)
1065 } else {
1066 PageInfo::end()
1067 };
1068
1069 Ok(Page::new(entries, page_info))
1070 }
1071
1072 async fn history_instance_count(
1073 &self,
1074 tenant: &TenantContext,
1075 resource_type: &str,
1076 id: &str,
1077 ) -> StorageResult<u64> {
1078 let client = self.get_client().await?;
1079 let tenant_id = tenant.tenant_id().as_str();
1080
1081 let row = client
1082 .query_one(
1083 "SELECT COUNT(*) FROM resource_history
1084 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1085 &[&tenant_id, &resource_type, &id],
1086 )
1087 .await
1088 .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1089
1090 let count: i64 = row.get(0);
1091 Ok(count as u64)
1092 }
1093
1094 async fn delete_instance_history(
1095 &self,
1096 tenant: &TenantContext,
1097 resource_type: &str,
1098 id: &str,
1099 ) -> StorageResult<u64> {
1100 let client = self.get_client().await?;
1101 let tenant_id = tenant.tenant_id().as_str();
1102
1103 let exists = client
1105 .query_opt(
1106 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1107 &[&tenant_id, &resource_type, &id],
1108 )
1109 .await
1110 .map_err(|e| internal_error(format!("Failed to check resource existence: {}", e)))?;
1111
1112 if exists.is_none() {
1113 return Err(StorageError::Resource(ResourceError::NotFound {
1114 resource_type: resource_type.to_string(),
1115 id: id.to_string(),
1116 }));
1117 }
1118
1119 let current_row = client
1121 .query_one(
1122 "SELECT version_id FROM resources
1123 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1124 &[&tenant_id, &resource_type, &id],
1125 )
1126 .await
1127 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1128
1129 let current_version: String = current_row.get(0);
1130
1131 let deleted = client
1133 .execute(
1134 "DELETE FROM resource_history
1135 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id != $4",
1136 &[&tenant_id, &resource_type, &id, ¤t_version],
1137 )
1138 .await
1139 .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1140
1141 Ok(deleted)
1142 }
1143
1144 async fn delete_version(
1145 &self,
1146 tenant: &TenantContext,
1147 resource_type: &str,
1148 id: &str,
1149 version_id: &str,
1150 ) -> StorageResult<()> {
1151 let client = self.get_client().await?;
1152 let tenant_id = tenant.tenant_id().as_str();
1153
1154 let current_row = client
1156 .query_opt(
1157 "SELECT version_id FROM resources
1158 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1159 &[&tenant_id, &resource_type, &id],
1160 )
1161 .await
1162 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1163
1164 let current_version = match current_row {
1165 Some(row) => row.get::<_, String>(0),
1166 None => {
1167 return Err(StorageError::Resource(ResourceError::NotFound {
1168 resource_type: resource_type.to_string(),
1169 id: id.to_string(),
1170 }));
1171 }
1172 };
1173
1174 if version_id == current_version {
1176 return Err(StorageError::Validation(
1177 crate::error::ValidationError::InvalidResource {
1178 message: format!(
1179 "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1180 version_id, resource_type, id
1181 ),
1182 details: vec![],
1183 },
1184 ));
1185 }
1186
1187 let version_exists = client
1189 .query_opt(
1190 "SELECT 1 FROM resource_history
1191 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1192 &[&tenant_id, &resource_type, &id, &version_id],
1193 )
1194 .await
1195 .map_err(|e| internal_error(format!("Failed to check version existence: {}", e)))?;
1196
1197 if version_exists.is_none() {
1198 return Err(StorageError::Resource(ResourceError::VersionNotFound {
1199 resource_type: resource_type.to_string(),
1200 id: id.to_string(),
1201 version_id: version_id.to_string(),
1202 }));
1203 }
1204
1205 client
1207 .execute(
1208 "DELETE FROM resource_history
1209 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1210 &[&tenant_id, &resource_type, &id, &version_id],
1211 )
1212 .await
1213 .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1214
1215 Ok(())
1216 }
1217}
1218
1219#[async_trait]
1224impl TypeHistoryProvider for PostgresBackend {
1225 async fn history_type(
1226 &self,
1227 tenant: &TenantContext,
1228 resource_type: &str,
1229 params: &HistoryParams,
1230 ) -> StorageResult<HistoryPage> {
1231 let client = self.get_client().await?;
1232 let tenant_id = tenant.tenant_id().as_str();
1233
1234 let mut sql = String::from(
1236 "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1237 FROM resource_history
1238 WHERE tenant_id = $1 AND resource_type = $2",
1239 );
1240 let mut param_index: usize = 3;
1241 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1242 Box::new(tenant_id.to_string()),
1243 Box::new(resource_type.to_string()),
1244 ];
1245
1246 if !params.include_deleted {
1248 sql.push_str(" AND is_deleted = FALSE");
1249 }
1250
1251 if let Some(since) = ¶ms.since {
1253 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1254 query_params.push(Box::new(*since));
1255 param_index += 1;
1256 }
1257
1258 if let Some(before) = ¶ms.before {
1260 sql.push_str(&format!(" AND last_updated < ${}", param_index));
1261 query_params.push(Box::new(*before));
1262 param_index += 1;
1263 }
1264
1265 if let Some(cursor) = params.pagination.cursor_value() {
1267 let sort_values = cursor.sort_values();
1268 if sort_values.len() >= 2 {
1269 if let (
1270 Some(CursorValue::String(timestamp)),
1271 Some(CursorValue::String(resource_id)),
1272 ) = (sort_values.first(), sort_values.get(1))
1273 {
1274 sql.push_str(&format!(
1275 " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND id < ${}))",
1276 param_index, param_index, param_index + 1
1277 ));
1278 query_params.push(Box::new(timestamp.clone()));
1279 query_params.push(Box::new(resource_id.clone()));
1280 param_index += 2;
1281 }
1282 }
1283 }
1284
1285 let limit = params.pagination.count as i64 + 1;
1287 sql.push_str(&format!(
1288 " ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1289 param_index
1290 ));
1291 query_params.push(Box::new(limit));
1292
1293 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1294 .iter()
1295 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1296 .collect();
1297
1298 let rows = client
1299 .query(&sql, ¶m_refs)
1300 .await
1301 .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1302
1303 let mut entries = Vec::new();
1304 let mut last_entry: Option<(String, String)> = None; for row in &rows {
1307 if entries.len() >= params.pagination.count as usize {
1308 break;
1309 }
1310
1311 let row_id: String = row.get(0);
1312 let version_id: String = row.get(1);
1313 let data: Value = row.get(2);
1314 let last_updated: DateTime<Utc> = row.get(3);
1315 let is_deleted: bool = row.get(4);
1316 let fhir_version_str: String = row.get(5);
1317
1318 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1319
1320 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1321
1322 let resource = StoredResource::from_storage(
1323 resource_type,
1324 &row_id,
1325 &version_id,
1326 tenant.tenant_id().clone(),
1327 data,
1328 last_updated,
1329 last_updated,
1330 deleted_at,
1331 fhir_version,
1332 );
1333
1334 let method = if is_deleted {
1335 HistoryMethod::Delete
1336 } else if version_id == "1" {
1337 HistoryMethod::Post
1338 } else {
1339 HistoryMethod::Put
1340 };
1341
1342 last_entry = Some((last_updated.to_rfc3339(), row_id));
1343
1344 entries.push(HistoryEntry {
1345 resource,
1346 method,
1347 timestamp: last_updated,
1348 });
1349 }
1350
1351 let has_more = rows.len() > params.pagination.count as usize;
1353
1354 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1356 let cursor = PageCursor::new(
1357 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1358 resource_type.to_string(),
1359 );
1360 PageInfo::with_next(cursor)
1361 } else {
1362 PageInfo::end()
1363 };
1364
1365 Ok(Page::new(entries, page_info))
1366 }
1367
1368 async fn history_type_count(
1369 &self,
1370 tenant: &TenantContext,
1371 resource_type: &str,
1372 ) -> StorageResult<u64> {
1373 let client = self.get_client().await?;
1374 let tenant_id = tenant.tenant_id().as_str();
1375
1376 let row = client
1377 .query_one(
1378 "SELECT COUNT(*) FROM resource_history
1379 WHERE tenant_id = $1 AND resource_type = $2",
1380 &[&tenant_id, &resource_type],
1381 )
1382 .await
1383 .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1384
1385 let count: i64 = row.get(0);
1386 Ok(count as u64)
1387 }
1388}
1389
1390#[async_trait]
1395impl SystemHistoryProvider for PostgresBackend {
1396 async fn history_system(
1397 &self,
1398 tenant: &TenantContext,
1399 params: &HistoryParams,
1400 ) -> StorageResult<HistoryPage> {
1401 let client = self.get_client().await?;
1402 let tenant_id = tenant.tenant_id().as_str();
1403
1404 let mut sql = String::from(
1406 "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1407 FROM resource_history
1408 WHERE tenant_id = $1",
1409 );
1410 let mut param_index: usize = 2;
1411 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1412 vec![Box::new(tenant_id.to_string())];
1413
1414 if !params.include_deleted {
1416 sql.push_str(" AND is_deleted = FALSE");
1417 }
1418
1419 if let Some(since) = ¶ms.since {
1421 sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1422 query_params.push(Box::new(*since));
1423 param_index += 1;
1424 }
1425
1426 if let Some(before) = ¶ms.before {
1428 sql.push_str(&format!(" AND last_updated < ${}", param_index));
1429 query_params.push(Box::new(*before));
1430 param_index += 1;
1431 }
1432
1433 if let Some(cursor) = params.pagination.cursor_value() {
1435 let sort_values = cursor.sort_values();
1436 if sort_values.len() >= 3 {
1437 if let (
1438 Some(CursorValue::String(timestamp)),
1439 Some(CursorValue::String(res_type)),
1440 Some(CursorValue::String(res_id)),
1441 ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1442 {
1443 sql.push_str(&format!(
1444 " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND (resource_type < ${} OR (resource_type = ${} AND id < ${}))))",
1445 param_index, param_index, param_index + 1, param_index + 1, param_index + 2
1446 ));
1447 query_params.push(Box::new(timestamp.clone()));
1448 query_params.push(Box::new(res_type.clone()));
1449 query_params.push(Box::new(res_id.clone()));
1450 param_index += 3;
1451 }
1452 }
1453 }
1454
1455 let limit = params.pagination.count as i64 + 1;
1457 sql.push_str(&format!(
1458 " ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1459 param_index
1460 ));
1461 query_params.push(Box::new(limit));
1462
1463 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1464 .iter()
1465 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1466 .collect();
1467
1468 let rows = client
1469 .query(&sql, ¶m_refs)
1470 .await
1471 .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1472
1473 let mut entries = Vec::new();
1474 let mut last_entry: Option<(String, String, String)> = None;
1475
1476 for row in &rows {
1477 if entries.len() >= params.pagination.count as usize {
1478 break;
1479 }
1480
1481 let row_resource_type: String = row.get(0);
1482 let row_id: String = row.get(1);
1483 let version_id: String = row.get(2);
1484 let data: Value = row.get(3);
1485 let last_updated: DateTime<Utc> = row.get(4);
1486 let is_deleted: bool = row.get(5);
1487 let fhir_version_str: String = row.get(6);
1488
1489 let deleted_at = if is_deleted { Some(last_updated) } else { None };
1490
1491 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1492
1493 let resource = StoredResource::from_storage(
1494 &row_resource_type,
1495 &row_id,
1496 &version_id,
1497 tenant.tenant_id().clone(),
1498 data,
1499 last_updated,
1500 last_updated,
1501 deleted_at,
1502 fhir_version,
1503 );
1504
1505 let method = if is_deleted {
1506 HistoryMethod::Delete
1507 } else if version_id == "1" {
1508 HistoryMethod::Post
1509 } else {
1510 HistoryMethod::Put
1511 };
1512
1513 last_entry = Some((last_updated.to_rfc3339(), row_resource_type, row_id));
1514
1515 entries.push(HistoryEntry {
1516 resource,
1517 method,
1518 timestamp: last_updated,
1519 });
1520 }
1521
1522 let has_more = rows.len() > params.pagination.count as usize;
1523
1524 let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1525 {
1526 let cursor = PageCursor::new(
1527 vec![
1528 CursorValue::String(timestamp),
1529 CursorValue::String(resource_type),
1530 CursorValue::String(id),
1531 ],
1532 "system".to_string(),
1533 );
1534 PageInfo::with_next(cursor)
1535 } else {
1536 PageInfo::end()
1537 };
1538
1539 Ok(Page::new(entries, page_info))
1540 }
1541
1542 async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1543 let client = self.get_client().await?;
1544 let tenant_id = tenant.tenant_id().as_str();
1545
1546 let row = client
1547 .query_one(
1548 "SELECT COUNT(*) FROM resource_history WHERE tenant_id = $1",
1549 &[&tenant_id],
1550 )
1551 .await
1552 .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1553
1554 let count: i64 = row.get(0);
1555 Ok(count as u64)
1556 }
1557}
1558
1559#[async_trait]
1564impl DifferentialHistoryProvider for PostgresBackend {
1565 async fn modified_since(
1566 &self,
1567 tenant: &TenantContext,
1568 resource_type: Option<&str>,
1569 since: DateTime<Utc>,
1570 pagination: &Pagination,
1571 ) -> StorageResult<Page<StoredResource>> {
1572 let client = self.get_client().await?;
1573 let tenant_id = tenant.tenant_id().as_str();
1574
1575 let mut sql = String::from(
1577 "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1578 FROM resources
1579 WHERE tenant_id = $1 AND last_updated > $2 AND is_deleted = FALSE",
1580 );
1581 let mut param_index: usize = 3;
1582 let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1583 vec![Box::new(tenant_id.to_string()), Box::new(since)];
1584
1585 if let Some(rt) = resource_type {
1587 sql.push_str(&format!(" AND resource_type = ${}", param_index));
1588 query_params.push(Box::new(rt.to_string()));
1589 param_index += 1;
1590 }
1591
1592 if let Some(cursor) = pagination.cursor_value() {
1594 let sort_values = cursor.sort_values();
1595 if sort_values.len() >= 2 {
1596 if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
1597 (sort_values.first(), sort_values.get(1))
1598 {
1599 sql.push_str(&format!(
1600 " AND (last_updated > ${}::timestamptz OR (last_updated = ${}::timestamptz AND id > ${}))",
1601 param_index, param_index, param_index + 1
1602 ));
1603 query_params.push(Box::new(timestamp.clone()));
1604 query_params.push(Box::new(res_id.clone()));
1605 param_index += 2;
1606 }
1607 }
1608 }
1609
1610 let limit = pagination.count as i64 + 1;
1612 sql.push_str(&format!(
1613 " ORDER BY last_updated ASC, id ASC LIMIT ${}",
1614 param_index
1615 ));
1616 query_params.push(Box::new(limit));
1617
1618 let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1619 .iter()
1620 .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1621 .collect();
1622
1623 let rows = client
1624 .query(&sql, ¶m_refs)
1625 .await
1626 .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
1627
1628 let mut resources = Vec::new();
1629 let mut last_entry: Option<(String, String)> = None;
1630
1631 for row in &rows {
1632 if resources.len() >= pagination.count as usize {
1633 break;
1634 }
1635
1636 let row_resource_type: String = row.get(0);
1637 let row_id: String = row.get(1);
1638 let version_id: String = row.get(2);
1639 let data: Value = row.get(3);
1640 let last_updated: DateTime<Utc> = row.get(4);
1641 let fhir_version_str: String = row.get(5);
1642
1643 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1644
1645 let resource = StoredResource::from_storage(
1646 &row_resource_type,
1647 &row_id,
1648 &version_id,
1649 tenant.tenant_id().clone(),
1650 data,
1651 last_updated,
1652 last_updated,
1653 None,
1654 fhir_version,
1655 );
1656
1657 last_entry = Some((last_updated.to_rfc3339(), row_id));
1658 resources.push(resource);
1659 }
1660
1661 let has_more = rows.len() > pagination.count as usize;
1662
1663 let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1664 let cursor = PageCursor::new(
1665 vec![CursorValue::String(timestamp), CursorValue::String(id)],
1666 "modified_since".to_string(),
1667 );
1668 PageInfo::with_next(cursor)
1669 } else {
1670 PageInfo::end()
1671 };
1672
1673 Ok(Page::new(resources, page_info))
1674 }
1675}
1676
1677#[async_trait]
1682impl PurgableStorage for PostgresBackend {
1683 async fn purge(
1684 &self,
1685 tenant: &TenantContext,
1686 resource_type: &str,
1687 id: &str,
1688 ) -> StorageResult<()> {
1689 let client = self.get_client().await?;
1690 let tenant_id = tenant.tenant_id().as_str();
1691
1692 let exists = client
1694 .query_opt(
1695 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1696 &[&tenant_id, &resource_type, &id],
1697 )
1698 .await
1699 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
1700
1701 if exists.is_none() {
1702 let history_exists = client
1704 .query_opt(
1705 "SELECT 1 FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1706 &[&tenant_id, &resource_type, &id],
1707 )
1708 .await
1709 .map_err(|e| internal_error(format!("Failed to check history: {}", e)))?;
1710
1711 if history_exists.is_none() {
1712 return Err(StorageError::Resource(ResourceError::NotFound {
1713 resource_type: resource_type.to_string(),
1714 id: id.to_string(),
1715 }));
1716 }
1717 }
1718
1719 client
1721 .execute(
1722 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1723 &[&tenant_id, &resource_type, &id],
1724 )
1725 .await
1726 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1727
1728 let _ = client
1730 .execute(
1731 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1732 &[&tenant_id, &resource_type, &id],
1733 )
1734 .await;
1735
1736 client
1738 .execute(
1739 "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1740 &[&tenant_id, &resource_type, &id],
1741 )
1742 .await
1743 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1744
1745 client
1747 .execute(
1748 "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1749 &[&tenant_id, &resource_type, &id],
1750 )
1751 .await
1752 .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1753
1754 Ok(())
1755 }
1756
1757 async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1758 let client = self.get_client().await?;
1759 let tenant_id = tenant.tenant_id().as_str();
1760
1761 let row = client
1763 .query_one(
1764 "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1765 &[&tenant_id, &resource_type],
1766 )
1767 .await
1768 .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1769 let count: i64 = row.get(0);
1770
1771 client
1773 .execute(
1774 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2",
1775 &[&tenant_id, &resource_type],
1776 )
1777 .await
1778 .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1779
1780 let _ = client
1782 .execute(
1783 "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2",
1784 &[&tenant_id, &resource_type],
1785 )
1786 .await;
1787
1788 client
1790 .execute(
1791 "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2",
1792 &[&tenant_id, &resource_type],
1793 )
1794 .await
1795 .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1796
1797 client
1799 .execute(
1800 "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1801 &[&tenant_id, &resource_type],
1802 )
1803 .await
1804 .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1805
1806 Ok(count as u64)
1807 }
1808}
1809
/// Splits a query string of the form `key=value&key=value` into pairs.
///
/// Each `&`-separated segment is split at its first `=`; segments without an
/// `=` are dropped. Keys and values are copied verbatim: no percent-decoding
/// or trimming is performed, and empty keys or values are kept.
fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
    let mut pairs = Vec::new();
    for segment in params.split('&') {
        if let Some((key, value)) = segment.split_once('=') {
            pairs.push((key.to_owned(), value.to_owned()));
        }
    }
    pairs
}
1829
1830#[async_trait]
1831impl ConditionalStorage for PostgresBackend {
1832 async fn conditional_create(
1833 &self,
1834 tenant: &TenantContext,
1835 resource_type: &str,
1836 resource: Value,
1837 search_params: &str,
1838 fhir_version: FhirVersion,
1839 ) -> StorageResult<ConditionalCreateResult> {
1840 let matches = self
1842 .find_matching_resources(tenant, resource_type, search_params)
1843 .await?;
1844
1845 match matches.len() {
1846 0 => {
1847 let created = self
1849 .create(tenant, resource_type, resource, fhir_version)
1850 .await?;
1851 Ok(ConditionalCreateResult::Created(created))
1852 }
1853 1 => {
1854 Ok(ConditionalCreateResult::Exists(
1856 matches.into_iter().next().unwrap(),
1857 ))
1858 }
1859 n => {
1860 Ok(ConditionalCreateResult::MultipleMatches(n))
1862 }
1863 }
1864 }
1865
1866 async fn conditional_update(
1867 &self,
1868 tenant: &TenantContext,
1869 resource_type: &str,
1870 resource: Value,
1871 search_params: &str,
1872 upsert: bool,
1873 fhir_version: FhirVersion,
1874 ) -> StorageResult<ConditionalUpdateResult> {
1875 let matches = self
1877 .find_matching_resources(tenant, resource_type, search_params)
1878 .await?;
1879
1880 match matches.len() {
1881 0 => {
1882 if upsert {
1883 let created = self
1885 .create(tenant, resource_type, resource, fhir_version)
1886 .await?;
1887 Ok(ConditionalUpdateResult::Created(created))
1888 } else {
1889 Ok(ConditionalUpdateResult::NoMatch)
1891 }
1892 }
1893 1 => {
1894 let existing = matches.into_iter().next().unwrap();
1896 let updated = self.update(tenant, &existing, resource).await?;
1897 Ok(ConditionalUpdateResult::Updated(updated))
1898 }
1899 n => {
1900 Ok(ConditionalUpdateResult::MultipleMatches(n))
1902 }
1903 }
1904 }
1905
1906 async fn conditional_delete(
1907 &self,
1908 tenant: &TenantContext,
1909 resource_type: &str,
1910 search_params: &str,
1911 ) -> StorageResult<ConditionalDeleteResult> {
1912 let matches = self
1914 .find_matching_resources(tenant, resource_type, search_params)
1915 .await?;
1916
1917 match matches.len() {
1918 0 => {
1919 Ok(ConditionalDeleteResult::NoMatch)
1921 }
1922 1 => {
1923 let existing = matches.into_iter().next().unwrap();
1925 self.delete(tenant, resource_type, existing.id()).await?;
1926 Ok(ConditionalDeleteResult::Deleted)
1927 }
1928 n => {
1929 Ok(ConditionalDeleteResult::MultipleMatches(n))
1931 }
1932 }
1933 }
1934
1935 async fn conditional_patch(
1936 &self,
1937 tenant: &TenantContext,
1938 resource_type: &str,
1939 search_params: &str,
1940 patch: &crate::core::PatchFormat,
1941 ) -> StorageResult<crate::core::ConditionalPatchResult> {
1942 use crate::core::{ConditionalPatchResult, PatchFormat};
1943
1944 let matches = self
1946 .find_matching_resources(tenant, resource_type, search_params)
1947 .await?;
1948
1949 match matches.len() {
1950 0 => Ok(ConditionalPatchResult::NoMatch),
1951 1 => {
1952 let existing = matches.into_iter().next().unwrap();
1954 let current_content = existing.content().clone();
1955
1956 let patched_content = match patch {
1958 PatchFormat::JsonPatch(patch_doc) => {
1959 self.apply_json_patch(¤t_content, patch_doc)?
1960 }
1961 PatchFormat::FhirPathPatch(patch_params) => {
1962 self.apply_fhirpath_patch(¤t_content, patch_params)?
1963 }
1964 PatchFormat::MergePatch(merge_doc) => {
1965 self.apply_merge_patch(¤t_content, merge_doc)
1966 }
1967 };
1968
1969 let updated = self.update(tenant, &existing, patched_content).await?;
1971 Ok(ConditionalPatchResult::Patched(updated))
1972 }
1973 n => Ok(ConditionalPatchResult::MultipleMatches(n)),
1974 }
1975 }
1976}
1977
1978impl PostgresBackend {
1979 async fn find_matching_resources(
1983 &self,
1984 tenant: &TenantContext,
1985 resource_type: &str,
1986 search_params_str: &str,
1987 ) -> StorageResult<Vec<StoredResource>> {
1988 let parsed_params = parse_simple_search_params(search_params_str);
1990
1991 if parsed_params.is_empty() {
1992 return Ok(Vec::new());
1994 }
1995
1996 let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
1998
1999 let query = SearchQuery {
2001 resource_type: resource_type.to_string(),
2002 parameters: search_params,
2003 count: Some(1000),
2004 ..Default::default()
2005 };
2006
2007 let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2009
2010 Ok(result.resources.items)
2011 }
2012
2013 fn build_search_parameters(
2015 &self,
2016 resource_type: &str,
2017 params: &[(String, String)],
2018 ) -> StorageResult<Vec<SearchParameter>> {
2019 let registry = self.search_registry().read();
2020 let mut search_params = Vec::with_capacity(params.len());
2021
2022 for (name, value) in params {
2023 let param_type = self
2024 .lookup_param_type(®istry, resource_type, name)
2025 .unwrap_or({
2026 match name.as_str() {
2027 "_id" => SearchParamType::Token,
2028 "_lastUpdated" => SearchParamType::Date,
2029 "_tag" | "_profile" | "_security" => SearchParamType::Token,
2030 "identifier" => SearchParamType::Token,
2031 "patient" | "subject" | "encounter" | "performer" | "author"
2032 | "requester" | "recorder" | "asserter" | "practitioner"
2033 | "organization" | "location" | "device" => SearchParamType::Reference,
2034 _ => SearchParamType::String,
2035 }
2036 });
2037
2038 search_params.push(SearchParameter {
2039 name: name.clone(),
2040 param_type,
2041 modifier: None,
2042 values: vec![SearchValue::parse(value)],
2043 chain: vec![],
2044 components: vec![],
2045 });
2046 }
2047
2048 Ok(search_params)
2049 }
2050
2051 fn lookup_param_type(
2053 &self,
2054 registry: &crate::search::SearchParameterRegistry,
2055 resource_type: &str,
2056 param_name: &str,
2057 ) -> Option<SearchParamType> {
2058 if let Some(def) = registry.get_param(resource_type, param_name) {
2059 return Some(def.param_type);
2060 }
2061 if let Some(def) = registry.get_param("Resource", param_name) {
2062 return Some(def.param_type);
2063 }
2064 None
2065 }
2066
2067 fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2073 use crate::error::ValidationError;
2074
2075 let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2076 StorageError::Validation(ValidationError::InvalidResource {
2077 message: format!("Invalid JSON Patch document: {}", e),
2078 details: vec![],
2079 })
2080 })?;
2081
2082 let mut patched = resource.clone();
2083 json_patch::patch(&mut patched, &patch).map_err(|e| {
2084 StorageError::Validation(ValidationError::InvalidResource {
2085 message: format!("Failed to apply JSON Patch: {}", e),
2086 details: vec![],
2087 })
2088 })?;
2089
2090 Ok(patched)
2091 }
2092
2093 fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2095 use crate::error::ValidationError;
2096
2097 let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2098 if parameter.is_none() {
2099 return Err(StorageError::Validation(ValidationError::InvalidResource {
2100 message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2101 details: vec![],
2102 }));
2103 }
2104
2105 let mut patched = resource.clone();
2106
2107 for operation in parameter.unwrap() {
2108 let parts = operation.get("part").and_then(|p| p.as_array());
2109 if parts.is_none() {
2110 continue;
2111 }
2112
2113 let mut op_type = None;
2114 let mut op_path = None;
2115 let mut op_name = None;
2116 let mut op_value = None;
2117
2118 for part in parts.unwrap() {
2119 match part.get("name").and_then(|n| n.as_str()) {
2120 Some("type") => {
2121 op_type = part
2122 .get("valueCode")
2123 .and_then(|v| v.as_str())
2124 .map(|s| s.to_string());
2125 }
2126 Some("path") => {
2127 op_path = part
2128 .get("valueString")
2129 .and_then(|v| v.as_str())
2130 .map(|s| s.to_string());
2131 }
2132 Some("name") => {
2133 op_name = part
2134 .get("valueString")
2135 .and_then(|v| v.as_str())
2136 .map(|s| s.to_string());
2137 }
2138 Some("value") => {
2139 op_value = part
2140 .get("valueString")
2141 .or_else(|| part.get("valueBoolean"))
2142 .or_else(|| part.get("valueInteger"))
2143 .or_else(|| part.get("valueDecimal"))
2144 .or_else(|| part.get("valueCode"))
2145 .cloned();
2146 }
2147 _ => {}
2148 }
2149 }
2150
2151 match op_type.as_deref() {
2152 Some("replace") => {
2153 if let (Some(path), Some(value)) = (&op_path, &op_value) {
2154 self.fhirpath_replace(&mut patched, path, value)?;
2155 }
2156 }
2157 Some("add") => {
2158 if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2159 self.fhirpath_add(&mut patched, path, name, value)?;
2160 }
2161 }
2162 Some("delete") => {
2163 if let Some(path) = &op_path {
2164 self.fhirpath_delete(&mut patched, path)?;
2165 }
2166 }
2167 _ => {
2168 }
2170 }
2171 }
2172
2173 Ok(patched)
2174 }
2175
2176 fn fhirpath_replace(
2178 &self,
2179 resource: &mut Value,
2180 path: &str,
2181 value: &Value,
2182 ) -> StorageResult<()> {
2183 let parts: Vec<&str> = path.split('.').collect();
2184 if parts.len() == 2 {
2185 if let Some(obj) = resource.as_object_mut() {
2186 obj.insert(parts[1].to_string(), value.clone());
2187 }
2188 }
2189 Ok(())
2190 }
2191
2192 fn fhirpath_add(
2194 &self,
2195 resource: &mut Value,
2196 path: &str,
2197 name: &str,
2198 value: &Value,
2199 ) -> StorageResult<()> {
2200 let parts: Vec<&str> = path.split('.').collect();
2201 if parts.len() == 1
2202 && parts[0]
2203 == resource
2204 .get("resourceType")
2205 .and_then(|r| r.as_str())
2206 .unwrap_or("")
2207 {
2208 if let Some(obj) = resource.as_object_mut() {
2209 obj.insert(name.to_string(), value.clone());
2210 }
2211 }
2212 Ok(())
2213 }
2214
2215 fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2217 let parts: Vec<&str> = path.split('.').collect();
2218 if parts.len() == 2 {
2219 if let Some(obj) = resource.as_object_mut() {
2220 obj.remove(parts[1]);
2221 }
2222 }
2223 Ok(())
2224 }
2225
2226 fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2228 let mut patched = resource.clone();
2229 json_patch::merge(&mut patched, merge_doc);
2230 patched
2231 }
2232}
2233
2234#[async_trait]
2239impl BundleProvider for PostgresBackend {
2240 async fn process_transaction(
2241 &self,
2242 tenant: &TenantContext,
2243 entries: Vec<BundleEntry>,
2244 ) -> Result<BundleResult, TransactionError> {
2245 use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2246 use std::collections::HashMap;
2247
2248 let mut tx = self
2250 .begin_transaction(tenant, TransactionOptions::new())
2251 .await
2252 .map_err(|e| TransactionError::RolledBack {
2253 reason: format!("Failed to begin transaction: {}", e),
2254 })?;
2255
2256 let mut results = Vec::with_capacity(entries.len());
2257 let mut error_info: Option<(usize, String)> = None;
2258
2259 let mut reference_map: HashMap<String, String> = HashMap::new();
2261
2262 let mut entries = entries;
2264
2265 for (idx, entry) in entries.iter_mut().enumerate() {
2267 if let Some(ref mut resource) = entry.resource {
2269 resolve_bundle_references(resource, &reference_map);
2270 }
2271
2272 let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2273
2274 match result {
2275 Ok(entry_result) => {
2276 if entry_result.status >= 400 {
2277 error_info = Some((
2278 idx,
2279 format!("Entry failed with status {}", entry_result.status),
2280 ));
2281 break;
2282 }
2283
2284 if entry.method == BundleMethod::Post {
2286 if let Some(ref full_url) = entry.full_url {
2287 if let Some(ref location) = entry_result.location {
2288 let reference = location
2289 .split("/_history")
2290 .next()
2291 .unwrap_or(location)
2292 .to_string();
2293 reference_map.insert(full_url.clone(), reference);
2294 }
2295 }
2296 }
2297
2298 results.push(entry_result);
2299 }
2300 Err(e) => {
2301 error_info = Some((idx, format!("Entry processing failed: {}", e)));
2302 break;
2303 }
2304 }
2305 }
2306
2307 if let Some((index, message)) = error_info {
2309 let _ = Box::new(tx).rollback().await;
2310 return Err(TransactionError::BundleError { index, message });
2311 }
2312
2313 Box::new(tx)
2315 .commit()
2316 .await
2317 .map_err(|e| TransactionError::RolledBack {
2318 reason: format!("Commit failed: {}", e),
2319 })?;
2320
2321 Ok(BundleResult {
2322 bundle_type: BundleType::Transaction,
2323 entries: results,
2324 })
2325 }
2326
2327 async fn process_batch(
2328 &self,
2329 tenant: &TenantContext,
2330 entries: Vec<BundleEntry>,
2331 ) -> StorageResult<BundleResult> {
2332 let mut results = Vec::with_capacity(entries.len());
2333
2334 for entry in &entries {
2336 let result = self.process_batch_entry(tenant, entry).await;
2337 results.push(result);
2338 }
2339
2340 Ok(BundleResult {
2341 bundle_type: BundleType::Batch,
2342 entries: results,
2343 })
2344 }
2345}
2346
2347impl PostgresBackend {
2348 async fn process_bundle_entry_tx(
2350 &self,
2351 tx: &mut super::transaction::PostgresTransaction,
2352 entry: &BundleEntry,
2353 ) -> StorageResult<BundleEntryResult> {
2354 use crate::core::transaction::Transaction;
2355
2356 match entry.method {
2357 BundleMethod::Get => {
2358 let (resource_type, id) = self.parse_url(&entry.url)?;
2359 match tx.read(&resource_type, &id).await? {
2360 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2361 None => Ok(BundleEntryResult::error(
2362 404,
2363 serde_json::json!({
2364 "resourceType": "OperationOutcome",
2365 "issue": [{"severity": "error", "code": "not-found"}]
2366 }),
2367 )),
2368 }
2369 }
2370 BundleMethod::Post => {
2371 let resource = entry.resource.clone().ok_or_else(|| {
2372 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2373 field: "resource".to_string(),
2374 })
2375 })?;
2376
2377 let resource_type = resource
2378 .get("resourceType")
2379 .and_then(|v| v.as_str())
2380 .map(|s| s.to_string())
2381 .ok_or_else(|| {
2382 StorageError::Validation(
2383 crate::error::ValidationError::MissingRequiredField {
2384 field: "resourceType".to_string(),
2385 },
2386 )
2387 })?;
2388
2389 let created = tx.create(&resource_type, resource).await?;
2390 Ok(BundleEntryResult::created(created))
2391 }
2392 BundleMethod::Put => {
2393 let resource = entry.resource.clone().ok_or_else(|| {
2394 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2395 field: "resource".to_string(),
2396 })
2397 })?;
2398
2399 let (resource_type, id) = self.parse_url(&entry.url)?;
2400
2401 match tx.read(&resource_type, &id).await? {
2402 Some(existing) => {
2403 if let Some(ref if_match) = entry.if_match {
2405 let current_etag = existing.etag();
2406 if current_etag != if_match.as_str() {
2407 return Ok(BundleEntryResult::error(
2408 412,
2409 serde_json::json!({
2410 "resourceType": "OperationOutcome",
2411 "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2412 }),
2413 ));
2414 }
2415 }
2416 let updated = tx.update(&existing, resource).await?;
2417 Ok(BundleEntryResult::ok(updated))
2418 }
2419 None => {
2420 let mut resource_with_id = resource;
2422 resource_with_id["id"] = serde_json::json!(id);
2423 let created = tx.create(&resource_type, resource_with_id).await?;
2424 Ok(BundleEntryResult::created(created))
2425 }
2426 }
2427 }
2428 BundleMethod::Delete => {
2429 let (resource_type, id) = self.parse_url(&entry.url)?;
2430 tx.delete(&resource_type, &id).await?;
2431 Ok(BundleEntryResult::deleted())
2432 }
2433 BundleMethod::Patch => {
2434 Ok(BundleEntryResult::error(
2436 501,
2437 serde_json::json!({
2438 "resourceType": "OperationOutcome",
2439 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented in transaction bundles"}]
2440 }),
2441 ))
2442 }
2443 }
2444 }
2445
2446 async fn process_batch_entry(
2448 &self,
2449 tenant: &TenantContext,
2450 entry: &BundleEntry,
2451 ) -> BundleEntryResult {
2452 match self.process_batch_entry_inner(tenant, entry).await {
2453 Ok(result) => result,
2454 Err(e) => BundleEntryResult::error(
2455 500,
2456 serde_json::json!({
2457 "resourceType": "OperationOutcome",
2458 "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2459 }),
2460 ),
2461 }
2462 }
2463
2464 async fn process_batch_entry_inner(
2465 &self,
2466 tenant: &TenantContext,
2467 entry: &BundleEntry,
2468 ) -> StorageResult<BundleEntryResult> {
2469 match entry.method {
2470 BundleMethod::Get => {
2471 let (resource_type, id) = self.parse_url(&entry.url)?;
2472 match self.read(tenant, &resource_type, &id).await? {
2473 Some(resource) => Ok(BundleEntryResult::ok(resource)),
2474 None => Ok(BundleEntryResult::error(
2475 404,
2476 serde_json::json!({
2477 "resourceType": "OperationOutcome",
2478 "issue": [{"severity": "error", "code": "not-found"}]
2479 }),
2480 )),
2481 }
2482 }
2483 BundleMethod::Post => {
2484 let resource = entry.resource.clone().ok_or_else(|| {
2485 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2486 field: "resource".to_string(),
2487 })
2488 })?;
2489
2490 let resource_type = resource
2491 .get("resourceType")
2492 .and_then(|v| v.as_str())
2493 .map(|s| s.to_string())
2494 .ok_or_else(|| {
2495 StorageError::Validation(
2496 crate::error::ValidationError::MissingRequiredField {
2497 field: "resourceType".to_string(),
2498 },
2499 )
2500 })?;
2501
2502 let created = self
2503 .create(tenant, &resource_type, resource, FhirVersion::default())
2504 .await?;
2505 Ok(BundleEntryResult::created(created))
2506 }
2507 BundleMethod::Put => {
2508 let resource = entry.resource.clone().ok_or_else(|| {
2509 StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2510 field: "resource".to_string(),
2511 })
2512 })?;
2513
2514 let (resource_type, id) = self.parse_url(&entry.url)?;
2515 let (stored, _created) = self
2516 .create_or_update(
2517 tenant,
2518 &resource_type,
2519 &id,
2520 resource,
2521 FhirVersion::default(),
2522 )
2523 .await?;
2524 Ok(BundleEntryResult::ok(stored))
2525 }
2526 BundleMethod::Delete => {
2527 let (resource_type, id) = self.parse_url(&entry.url)?;
2528 match self.delete(tenant, &resource_type, &id).await {
2529 Ok(()) => Ok(BundleEntryResult::deleted()),
2530 Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2531 Ok(BundleEntryResult::deleted()) }
2533 Err(e) => Err(e),
2534 }
2535 }
2536 BundleMethod::Patch => Ok(BundleEntryResult::error(
2537 501,
2538 serde_json::json!({
2539 "resourceType": "OperationOutcome",
2540 "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2541 }),
2542 )),
2543 }
2544 }
2545
2546 fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2548 let path = url
2549 .strip_prefix("http://")
2550 .or_else(|| url.strip_prefix("https://"))
2551 .map(|s| s.find('/').map(|i| &s[i..]).unwrap_or(s))
2552 .unwrap_or(url);
2553
2554 let path = path.trim_start_matches('/');
2555 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2556
2557 if parts.len() >= 2 {
2558 let len = parts.len();
2559 Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2560 } else {
2561 Err(StorageError::Validation(
2562 crate::error::ValidationError::InvalidReference {
2563 reference: url.to_string(),
2564 message: "URL must be in format ResourceType/id".to_string(),
2565 },
2566 ))
2567 }
2568 }
2569}
2570
2571fn resolve_bundle_references(
2573 value: &mut serde_json::Value,
2574 reference_map: &std::collections::HashMap<String, String>,
2575) {
2576 use serde_json::Value;
2577 match value {
2578 Value::Object(map) => {
2579 if let Some(Value::String(ref_str)) = map.get("reference") {
2580 if ref_str.starts_with("urn:uuid:") {
2581 if let Some(resolved) = reference_map.get(ref_str) {
2582 map.insert("reference".to_string(), Value::String(resolved.clone()));
2583 }
2584 }
2585 }
2586 for v in map.values_mut() {
2587 resolve_bundle_references(v, reference_map);
2588 }
2589 }
2590 Value::Array(arr) => {
2591 for item in arr {
2592 resolve_bundle_references(item, reference_map);
2593 }
2594 }
2595 _ => {}
2596 }
2597}
2598
2599#[async_trait]
2604impl ReindexableStorage for PostgresBackend {
2605 async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2606 let client = self.get_client().await?;
2607 let tenant_id = tenant.tenant_id().as_str();
2608
2609 let rows = client
2610 .query(
2611 "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
2612 &[&tenant_id],
2613 )
2614 .await
2615 .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?;
2616
2617 let types: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
2618 Ok(types)
2619 }
2620
2621 async fn count_resources(
2622 &self,
2623 tenant: &TenantContext,
2624 resource_type: &str,
2625 ) -> StorageResult<u64> {
2626 self.count(tenant, Some(resource_type)).await
2627 }
2628
2629 async fn fetch_resources_page(
2630 &self,
2631 tenant: &TenantContext,
2632 resource_type: &str,
2633 cursor: Option<&str>,
2634 limit: u32,
2635 ) -> StorageResult<ResourcePage> {
2636 let client = self.get_client().await?;
2637 let tenant_id = tenant.tenant_id().as_str();
2638
2639 let (cursor_ts, cursor_id) = if let Some(c) = cursor {
2641 let parts: Vec<&str> = c.split('|').collect();
2642 if parts.len() == 2 {
2643 let ts = DateTime::parse_from_rfc3339(parts[0])
2644 .map(|dt| dt.with_timezone(&Utc))
2645 .map_err(|e| internal_error(format!("Invalid cursor timestamp: {}", e)))?;
2646 (Some(ts), Some(parts[1].to_string()))
2647 } else {
2648 (None, None)
2649 }
2650 } else {
2651 (None, None)
2652 };
2653
2654 let rows = if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
2655 client
2656 .query(
2657 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2658 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2659 AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
2660 ORDER BY last_updated ASC, id ASC LIMIT $5",
2661 &[
2662 &tenant_id,
2663 &resource_type,
2664 ts,
2665 &id.as_str(),
2666 &(limit as i64),
2667 ],
2668 )
2669 .await
2670 .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2671 } else {
2672 client
2673 .query(
2674 "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2675 WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2676 ORDER BY last_updated ASC, id ASC LIMIT $3",
2677 &[&tenant_id, &resource_type, &(limit as i64)],
2678 )
2679 .await
2680 .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2681 };
2682
2683 let resources: Vec<StoredResource> = rows
2684 .iter()
2685 .map(|row| {
2686 let id: String = row.get(0);
2687 let version_id: String = row.get(1);
2688 let data: Value = row.get(2);
2689 let last_updated: DateTime<Utc> = row.get(3);
2690 let fhir_version_str: String = row.get(4);
2691 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
2692
2693 StoredResource::from_storage(
2694 resource_type,
2695 id,
2696 version_id,
2697 tenant.tenant_id().clone(),
2698 data,
2699 last_updated,
2700 last_updated,
2701 None,
2702 fhir_version,
2703 )
2704 })
2705 .collect();
2706
2707 let next_cursor = if resources.len() == limit as usize {
2709 resources
2710 .last()
2711 .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
2712 } else {
2713 None
2714 };
2715
2716 Ok(ResourcePage {
2717 resources,
2718 next_cursor,
2719 })
2720 }
2721
2722 async fn delete_search_entries(
2723 &self,
2724 tenant: &TenantContext,
2725 resource_type: &str,
2726 resource_id: &str,
2727 ) -> StorageResult<()> {
2728 let client = self.get_client().await?;
2729 self.delete_search_index(
2730 &client,
2731 tenant.tenant_id().as_str(),
2732 resource_type,
2733 resource_id,
2734 )
2735 .await
2736 }
2737
2738 async fn write_search_entries(
2739 &self,
2740 tenant: &TenantContext,
2741 resource_type: &str,
2742 resource_id: &str,
2743 resource: &Value,
2744 ) -> StorageResult<usize> {
2745 let client = self.get_client().await?;
2746 let tenant_id = tenant.tenant_id().as_str();
2747
2748 let values = self
2750 .search_extractor()
2751 .extract(resource, resource_type)
2752 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
2753
2754 let mut count = 0;
2755 for value in values {
2756 PostgresSearchIndexWriter::write_entry(
2757 &client,
2758 tenant_id,
2759 resource_type,
2760 resource_id,
2761 &value,
2762 )
2763 .await?;
2764 count += 1;
2765 }
2766
2767 Ok(count)
2768 }
2769
2770 async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
2771 let client = self.get_client().await?;
2772 let tenant_id = tenant.tenant_id().as_str();
2773
2774 let deleted = client
2775 .execute(
2776 "DELETE FROM search_index WHERE tenant_id = $1",
2777 &[&tenant_id],
2778 )
2779 .await
2780 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
2781
2782 let _ = client
2784 .execute(
2785 "DELETE FROM resource_fts WHERE tenant_id = $1",
2786 &[&tenant_id],
2787 )
2788 .await;
2789
2790 Ok(deleted)
2791 }
2792}
2793
/// Expands a date or date-time string into a full timestamp with an explicit
/// UTC offset.
///
/// - A value containing `T` is returned unchanged when its time portion
///   already carries an offset (`Z`, `+hh:mm` or `-hh:mm`); otherwise `+00:00`
///   is appended.
/// - A 10-character value (`YYYY-MM-DD`) gets `T00:00:00+00:00` appended.
/// - A 7-character value (`YYYY-MM`) is completed with day `01`.
/// - A 4-character value (`YYYY`) is completed with `-01-01`.
/// - Anything else is returned unchanged.
///
/// No validation is performed; lengths are byte lengths.
fn normalize_date_for_pg(value: &str) -> String {
    if let Some((_, time)) = value.split_once('T') {
        // Only the time portion is inspected: the date portion always contains
        // '-' separators, whereas a '-' after 'T' can only be a negative offset.
        let has_offset = time.contains(|c: char| matches!(c, '+' | '-' | 'Z'));
        return if has_offset {
            value.to_string()
        } else {
            format!("{}+00:00", value)
        };
    }

    match value.len() {
        10 => format!("{}T00:00:00+00:00", value),
        7 => format!("{}-01T00:00:00+00:00", value),
        4 => format!("{}-01-01T00:00:00+00:00", value),
        _ => value.to_string(),
    }
}
2816
/// Text extracted from a resource, split into two parts.
struct SearchableContent {
    /// Narrative text of the resource.
    narrative: String,
    /// Concatenated string content of the resource.
    full_content: String,
}

impl SearchableContent {
    /// Returns `true` only when both `narrative` and `full_content` are empty.
    fn is_empty(&self) -> bool {
        [&self.narrative, &self.full_content]
            .iter()
            .all(|text| text.is_empty())
    }
}
2832
2833fn extract_searchable_content(resource: &Value) -> SearchableContent {
2835 SearchableContent {
2836 narrative: extract_narrative(resource),
2837 full_content: extract_all_strings(resource),
2838 }
2839}
2840
2841fn extract_narrative(resource: &Value) -> String {
2843 resource
2844 .get("text")
2845 .and_then(|t| t.get("div"))
2846 .and_then(|d| d.as_str())
2847 .map(strip_html_tags)
2848 .unwrap_or_default()
2849}
2850
/// Removes markup from `html` and collapses whitespace.
///
/// Everything from a `<` up to the next `>` is dropped and replaced by a
/// single space; a `>` that does not close a tag is kept as text. Runs of
/// whitespace in the result are collapsed to one space and leading/trailing
/// whitespace is removed. Character entities are not decoded.
fn strip_html_tags(html: &str) -> String {
    let mut text = String::with_capacity(html.len());
    let mut inside_markup = false;

    for ch in html.chars() {
        if ch == '<' {
            inside_markup = true;
        } else if inside_markup {
            // Swallow tag contents; the closing bracket becomes a separator.
            if ch == '>' {
                inside_markup = false;
                text.push(' ');
            }
        } else {
            text.push(ch);
        }
    }

    // Re-join the words with single spaces.
    let mut normalized = String::with_capacity(text.len());
    for word in text.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        normalized.push_str(word);
    }
    normalized
}
2870
2871fn extract_all_strings(value: &Value) -> String {
2873 let mut parts = Vec::new();
2874 collect_strings(value, &mut parts);
2875 parts.join(" ")
2876}
2877
2878fn collect_strings(value: &Value, parts: &mut Vec<String>) {
2879 match value {
2880 Value::String(s) => {
2881 if !s.is_empty() {
2882 parts.push(s.clone());
2883 }
2884 }
2885 Value::Object(map) => {
2886 for (key, val) in map {
2887 if key == "div" || key == "data" {
2888 continue;
2889 }
2890 collect_strings(val, parts);
2891 }
2892 }
2893 Value::Array(arr) => {
2894 for val in arr {
2895 collect_strings(val, parts);
2896 }
2897 }
2898 _ => {}
2899 }
2900}