Skip to main content

helios_persistence/backends/postgres/
storage.rs

1//! ResourceStorage and VersionedStorage implementations for PostgreSQL.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7
8use crate::core::history::{
9    DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
10    InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
11};
12use crate::core::transaction::{
13    BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
14};
15use crate::core::{
16    ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
17    PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
18};
19use crate::error::TransactionError;
20use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
21use crate::search::loader::SearchParameterLoader;
22use crate::search::registry::SearchParameterStatus;
23use crate::search::reindex::{ReindexableStorage, ResourcePage};
24use crate::tenant::TenantContext;
25use crate::types::Pagination;
26use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
27use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
28
29use super::PostgresBackend;
30use super::search::writer::PostgresSearchIndexWriter;
31
32fn internal_error(message: String) -> StorageError {
33    StorageError::Backend(BackendError::Internal {
34        backend_name: "postgres".to_string(),
35        message,
36        source: None,
37    })
38}
39
40#[allow(dead_code)]
41fn serialization_error(message: String) -> StorageError {
42    StorageError::Backend(BackendError::SerializationError { message })
43}
44
45#[async_trait]
46impl ResourceStorage for PostgresBackend {
47    fn backend_name(&self) -> &'static str {
48        "postgres"
49    }
50
51    async fn create(
52        &self,
53        tenant: &TenantContext,
54        resource_type: &str,
55        resource: Value,
56        fhir_version: FhirVersion,
57    ) -> StorageResult<StoredResource> {
58        let client = self.get_client().await?;
59        let tenant_id = tenant.tenant_id().as_str();
60
61        // Extract or generate ID
62        let id = resource
63            .get("id")
64            .and_then(|v| v.as_str())
65            .map(String::from)
66            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
67
68        // Check if resource already exists
69        let exists = client
70            .query_opt(
71                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
72                &[&tenant_id, &resource_type, &id],
73            )
74            .await
75            .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
76
77        if exists.is_some() {
78            return Err(StorageError::Resource(ResourceError::AlreadyExists {
79                resource_type: resource_type.to_string(),
80                id: id.clone(),
81            }));
82        }
83
84        // Ensure the resource has correct type and id
85        let mut resource = resource;
86        if let Some(obj) = resource.as_object_mut() {
87            obj.insert(
88                "resourceType".to_string(),
89                Value::String(resource_type.to_string()),
90            );
91            obj.insert("id".to_string(), Value::String(id.clone()));
92        }
93
94        let now = Utc::now();
95        let version_id = "1";
96        let fhir_version_str = fhir_version.as_mime_param();
97        let is_deleted = false;
98
99        // Insert the resource
100        client
101            .execute(
102                "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
103                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
104                &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
105            )
106            .await
107            .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
108
109        // Insert into history
110        client
111            .execute(
112                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
113                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
114                &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
115            )
116            .await
117            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
118
119        // Index the resource for search
120        self.index_resource(&client, tenant_id, resource_type, &id, &resource)
121            .await?;
122
123        // Handle SearchParameter resources specially - update registry
124        if resource_type == "SearchParameter" {
125            self.handle_search_parameter_create(&resource)?;
126        }
127
128        // Return the stored resource with updated metadata
129        Ok(StoredResource::from_storage(
130            resource_type,
131            &id,
132            version_id,
133            tenant.tenant_id().clone(),
134            resource,
135            now,
136            now,
137            None,
138            fhir_version,
139        ))
140    }
141
142    async fn create_or_update(
143        &self,
144        tenant: &TenantContext,
145        resource_type: &str,
146        id: &str,
147        resource: Value,
148        fhir_version: FhirVersion,
149    ) -> StorageResult<(StoredResource, bool)> {
150        // Check if exists
151        let existing = self.read(tenant, resource_type, id).await?;
152
153        if let Some(current) = existing {
154            // Update existing (preserves original FHIR version)
155            let updated = self.update(tenant, &current, resource).await?;
156            Ok((updated, false))
157        } else {
158            // Create new with specific ID
159            let mut resource = resource;
160            if let Some(obj) = resource.as_object_mut() {
161                obj.insert("id".to_string(), Value::String(id.to_string()));
162            }
163            let created = self
164                .create(tenant, resource_type, resource, fhir_version)
165                .await?;
166            Ok((created, true))
167        }
168    }
169
170    async fn read(
171        &self,
172        tenant: &TenantContext,
173        resource_type: &str,
174        id: &str,
175    ) -> StorageResult<Option<StoredResource>> {
176        let client = self.get_client().await?;
177        let tenant_id = tenant.tenant_id().as_str();
178
179        let row = client
180            .query_opt(
181                "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
182                 FROM resources
183                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
184                &[&tenant_id, &resource_type, &id],
185            )
186            .await
187            .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
188
189        match row {
190            Some(row) => {
191                let version_id: String = row.get(0);
192                let data: Value = row.get(1);
193                let last_updated: DateTime<Utc> = row.get(2);
194                let is_deleted: bool = row.get(3);
195                let deleted_at: Option<DateTime<Utc>> = row.get(4);
196                let fhir_version_str: String = row.get(5);
197
198                // If deleted, return Gone error
199                if is_deleted {
200                    return Err(StorageError::Resource(ResourceError::Gone {
201                        resource_type: resource_type.to_string(),
202                        id: id.to_string(),
203                        deleted_at,
204                    }));
205                }
206
207                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
208
209                Ok(Some(StoredResource::from_storage(
210                    resource_type,
211                    id,
212                    version_id,
213                    tenant.tenant_id().clone(),
214                    data,
215                    last_updated,
216                    last_updated,
217                    None,
218                    fhir_version,
219                )))
220            }
221            None => Ok(None),
222        }
223    }
224
225    async fn update(
226        &self,
227        tenant: &TenantContext,
228        current: &StoredResource,
229        resource: Value,
230    ) -> StorageResult<StoredResource> {
231        let client = self.get_client().await?;
232        let tenant_id = tenant.tenant_id().as_str();
233        let resource_type = current.resource_type();
234        let id = current.id();
235
236        // Check that the resource still exists with the expected version
237        let row = client
238            .query_opt(
239                "SELECT version_id FROM resources
240                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
241                &[&tenant_id, &resource_type, &id],
242            )
243            .await
244            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
245
246        let actual_version = match row {
247            Some(row) => row.get::<_, String>(0),
248            None => {
249                return Err(StorageError::Resource(ResourceError::NotFound {
250                    resource_type: resource_type.to_string(),
251                    id: id.to_string(),
252                }));
253            }
254        };
255
256        // Check version match
257        if actual_version != current.version_id() {
258            return Err(StorageError::Concurrency(
259                ConcurrencyError::VersionConflict {
260                    resource_type: resource_type.to_string(),
261                    id: id.to_string(),
262                    expected_version: current.version_id().to_string(),
263                    actual_version,
264                },
265            ));
266        }
267
268        // Calculate new version
269        let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
270        let new_version_str = new_version.to_string();
271
272        // Ensure the resource has correct type and id
273        let mut resource = resource;
274        if let Some(obj) = resource.as_object_mut() {
275            obj.insert(
276                "resourceType".to_string(),
277                Value::String(resource_type.to_string()),
278            );
279            obj.insert("id".to_string(), Value::String(id.to_string()));
280        }
281
282        let now = Utc::now();
283        let fhir_version_str = current.fhir_version().as_mime_param();
284        let is_deleted = false;
285
286        // Update the resource
287        client
288            .execute(
289                "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
290                 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
291                &[
292                    &new_version_str,
293                    &resource,
294                    &now,
295                    &tenant_id,
296                    &resource_type,
297                    &id,
298                ],
299            )
300            .await
301            .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
302
303        // Insert into history (preserve the original FHIR version)
304        client
305            .execute(
306                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
307                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
308                &[&tenant_id, &resource_type, &id, &new_version_str, &resource, &now, &is_deleted, &fhir_version_str],
309            )
310            .await
311            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
312
313        // Re-index the resource (delete old entries, add new)
314        self.delete_search_index(&client, tenant_id, resource_type, id)
315            .await?;
316        self.index_resource(&client, tenant_id, resource_type, id, &resource)
317            .await?;
318
319        // Handle SearchParameter resources specially - update registry
320        if resource_type == "SearchParameter" {
321            self.handle_search_parameter_update(current.content(), &resource)?;
322        }
323
324        Ok(StoredResource::from_storage(
325            resource_type,
326            id,
327            new_version_str,
328            tenant.tenant_id().clone(),
329            resource,
330            now,
331            now,
332            None,
333            current.fhir_version(),
334        ))
335    }
336
337    async fn delete(
338        &self,
339        tenant: &TenantContext,
340        resource_type: &str,
341        id: &str,
342    ) -> StorageResult<()> {
343        let client = self.get_client().await?;
344        let tenant_id = tenant.tenant_id().as_str();
345
346        // Check if resource exists and get its fhir_version
347        let row = client
348            .query_opt(
349                "SELECT version_id, data, fhir_version FROM resources
350                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
351                &[&tenant_id, &resource_type, &id],
352            )
353            .await
354            .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
355
356        let (current_version, data, fhir_version_str) = match row {
357            Some(row) => {
358                let v: String = row.get(0);
359                let d: Value = row.get(1);
360                let f: String = row.get(2);
361                (v, d, f)
362            }
363            None => {
364                return Err(StorageError::Resource(ResourceError::NotFound {
365                    resource_type: resource_type.to_string(),
366                    id: id.to_string(),
367                }));
368            }
369        };
370
371        let now = Utc::now();
372
373        // Calculate new version for the deletion record
374        let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
375        let new_version_str = new_version.to_string();
376        let is_deleted = true;
377
378        // Soft delete the resource
379        client
380            .execute(
381                "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
382                 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
383                &[&now, &new_version_str, &tenant_id, &resource_type, &id],
384            )
385            .await
386            .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
387
388        // Insert deletion record into history (preserve fhir_version)
389        client
390            .execute(
391                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
392                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
393                &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
394            )
395            .await
396            .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
397
398        // Delete search index entries (skip when search is offloaded)
399        if !self.is_search_offloaded() {
400            client
401                .execute(
402                    "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
403                    &[&tenant_id, &resource_type, &id],
404                )
405                .await
406                .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
407        }
408
409        // Handle SearchParameter resources specially - update registry
410        if resource_type == "SearchParameter" {
411            self.handle_search_parameter_delete(&data)?;
412        }
413
414        Ok(())
415    }
416
417    async fn count(
418        &self,
419        tenant: &TenantContext,
420        resource_type: Option<&str>,
421    ) -> StorageResult<u64> {
422        let client = self.get_client().await?;
423        let tenant_id = tenant.tenant_id().as_str();
424
425        let count: i64 = if let Some(rt) = resource_type {
426            let row = client
427                .query_one(
428                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE",
429                    &[&tenant_id, &rt],
430                )
431                .await
432                .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
433            row.get(0)
434        } else {
435            let row = client
436                .query_one(
437                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
438                    &[&tenant_id],
439                )
440                .await
441                .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
442            row.get(0)
443        };
444
445        Ok(count as u64)
446    }
447}
448
449// ============================================================================
450// Search Index Helpers
451// ============================================================================
452
453impl PostgresBackend {
454    /// Index a resource for search.
455    ///
456    /// This method uses the SearchParameterExtractor to dynamically extract
457    /// searchable values based on the configured SearchParameterRegistry.
458    pub(crate) async fn index_resource(
459        &self,
460        client: &deadpool_postgres::Client,
461        tenant_id: &str,
462        resource_type: &str,
463        resource_id: &str,
464        resource: &Value,
465    ) -> StorageResult<()> {
466        // When search is offloaded to a secondary backend, skip local indexing
467        if self.is_search_offloaded() {
468            return Ok(());
469        }
470
471        // Delete existing index entries
472        client
473            .execute(
474                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
475                &[&tenant_id, &resource_type, &resource_id],
476            )
477            .await
478            .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
479
480        // Extract values using the registry-driven extractor
481        match self.search_extractor().extract(resource, resource_type) {
482            Ok(values) => {
483                let mut count = 0;
484                for value in values {
485                    PostgresSearchIndexWriter::write_entry(
486                        client,
487                        tenant_id,
488                        resource_type,
489                        resource_id,
490                        &value,
491                    )
492                    .await?;
493                    count += 1;
494                }
495                tracing::debug!(
496                    "Dynamically indexed {} values for {}/{}",
497                    count,
498                    resource_type,
499                    resource_id
500                );
501            }
502            Err(e) => {
503                tracing::warn!(
504                    "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
505                    resource_type,
506                    resource_id,
507                    e
508                );
509                // Fall back to minimal extraction (just _id and _lastUpdated)
510                self.index_minimal_fallback(
511                    client,
512                    tenant_id,
513                    resource_type,
514                    resource_id,
515                    resource,
516                )
517                .await?;
518            }
519        }
520
521        // Index FTS content for _text and _content searches
522        self.index_fts_content(client, tenant_id, resource_type, resource_id, resource)
523            .await?;
524
525        Ok(())
526    }
527
528    /// Index full-text search content for _text and _content searches.
529    ///
530    /// Populates the resource_fts table using PostgreSQL tsvector/tsquery.
531    async fn index_fts_content(
532        &self,
533        client: &deadpool_postgres::Client,
534        tenant_id: &str,
535        resource_type: &str,
536        resource_id: &str,
537        resource: &Value,
538    ) -> StorageResult<()> {
539        // Check if FTS table exists
540        let fts_exists = client
541            .query_opt(
542                "SELECT 1 FROM information_schema.tables WHERE table_name = 'resource_fts'",
543                &[],
544            )
545            .await
546            .map_err(|e| internal_error(format!("Failed to check FTS table: {}", e)))?;
547
548        if fts_exists.is_none() {
549            return Ok(());
550        }
551
552        // Extract searchable content
553        let content = extract_searchable_content(resource);
554
555        if content.is_empty() {
556            return Ok(());
557        }
558
559        // Delete existing FTS entry first
560        let _ = client
561            .execute(
562                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
563                &[&tenant_id, &resource_type, &resource_id],
564            )
565            .await;
566
567        // Insert into FTS table (the trigger will populate tsvector columns)
568        client
569            .execute(
570                "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
571                 VALUES ($1, $2, $3, $4, $5)",
572                &[
573                    &resource_id,
574                    &resource_type,
575                    &tenant_id,
576                    &content.narrative,
577                    &content.full_content,
578                ],
579            )
580            .await
581            .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
582
583        Ok(())
584    }
585
586    /// Index minimal fallback search parameters.
587    ///
588    /// Only indexes `_id` and `_lastUpdated` when dynamic extraction fails.
589    async fn index_minimal_fallback(
590        &self,
591        client: &deadpool_postgres::Client,
592        tenant_id: &str,
593        resource_type: &str,
594        resource_id: &str,
595        resource: &Value,
596    ) -> StorageResult<()> {
597        // _id - always available from resource.id
598        if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
599            client
600                .execute(
601                    "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_code)
602                     VALUES ($1, $2, $3, '_id', $4)",
603                    &[&tenant_id, &resource_type, &resource_id, &id],
604                )
605                .await
606                .map_err(|e| internal_error(format!("Failed to insert _id index: {}", e)))?;
607        }
608
609        // _lastUpdated - from resource.meta.lastUpdated
610        if let Some(last_updated) = resource
611            .get("meta")
612            .and_then(|m| m.get("lastUpdated"))
613            .and_then(|v| v.as_str())
614        {
615            let normalized = normalize_date_for_pg(last_updated);
616            client
617                .execute(
618                    "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
619                     VALUES ($1, $2, $3, '_lastUpdated', $4::timestamptz)",
620                    &[&tenant_id, &resource_type, &resource_id, &normalized],
621                )
622                .await
623                .map_err(|e| {
624                    internal_error(format!("Failed to insert _lastUpdated index: {}", e))
625                })?;
626        }
627
628        Ok(())
629    }
630
631    /// Delete search index entries for a resource.
632    pub(crate) async fn delete_search_index(
633        &self,
634        client: &deadpool_postgres::Client,
635        tenant_id: &str,
636        resource_type: &str,
637        resource_id: &str,
638    ) -> StorageResult<()> {
639        // When search is offloaded to a secondary backend, skip local index cleanup
640        if self.is_search_offloaded() {
641            return Ok(());
642        }
643
644        // Delete from main search index
645        client
646            .execute(
647                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
648                &[&tenant_id, &resource_type, &resource_id],
649            )
650            .await
651            .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
652
653        // Delete from FTS table if it exists
654        let _ = client
655            .execute(
656                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
657                &[&tenant_id, &resource_type, &resource_id],
658            )
659            .await;
660
661        Ok(())
662    }
663}
664
665// ============================================================================
666// SearchParameter Resource Handling
667// ============================================================================
668
669impl PostgresBackend {
670    /// Handle creation of a SearchParameter resource.
671    ///
672    /// If the SearchParameter has status=active, it will be registered in the
673    /// search parameter registry, making it available for searches on new resources.
674    /// Existing resources will NOT be indexed for this parameter until $reindex is run.
675    fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
676        let loader = SearchParameterLoader::new(self.config().fhir_version);
677
678        match loader.parse_resource(resource) {
679            Ok(def) => {
680                // Only register if status is active
681                if def.status == SearchParameterStatus::Active {
682                    let mut registry = self.search_registry().write();
683                    // Ignore duplicate URL errors - the param may already be embedded
684                    if let Err(e) = registry.register(def) {
685                        tracing::debug!("SearchParameter registration skipped: {}", e);
686                    }
687                }
688            }
689            Err(e) => {
690                // Log but don't fail - the resource is still stored
691                tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
692            }
693        }
694
695        Ok(())
696    }
697
698    /// Handle update of a SearchParameter resource.
699    ///
700    /// Updates the registry based on status changes:
701    /// - active -> retired: Parameter disabled for searches
702    /// - retired -> active: Parameter re-enabled for searches
703    /// - Any other change: Updates the registry entry
704    fn handle_search_parameter_update(
705        &self,
706        old_resource: &Value,
707        new_resource: &Value,
708    ) -> StorageResult<()> {
709        let loader = SearchParameterLoader::new(self.config().fhir_version);
710
711        let old_def = loader.parse_resource(old_resource).ok();
712        let new_def = loader.parse_resource(new_resource).ok();
713
714        match (old_def, new_def) {
715            (Some(old), Some(new)) => {
716                let mut registry = self.search_registry().write();
717
718                // If URL changed, unregister old and register new
719                if old.url != new.url {
720                    let _ = registry.unregister(&old.url);
721                    if new.status == SearchParameterStatus::Active {
722                        let _ = registry.register(new);
723                    }
724                } else if old.status != new.status {
725                    // Status change - update in registry
726                    if let Err(e) = registry.update_status(&new.url, new.status) {
727                        tracing::debug!("SearchParameter status update skipped: {}", e);
728                    }
729                } else {
730                    // Other changes - re-register (unregister then register)
731                    let _ = registry.unregister(&old.url);
732                    if new.status == SearchParameterStatus::Active {
733                        let _ = registry.register(new);
734                    }
735                }
736            }
737            (None, Some(new)) => {
738                // Old wasn't valid, try to register new
739                if new.status == SearchParameterStatus::Active {
740                    let mut registry = self.search_registry().write();
741                    let _ = registry.register(new);
742                }
743            }
744            (Some(old), None) => {
745                // New isn't valid, unregister old
746                let mut registry = self.search_registry().write();
747                let _ = registry.unregister(&old.url);
748            }
749            (None, None) => {
750                // Neither valid - nothing to do
751            }
752        }
753
754        Ok(())
755    }
756
757    /// Handle deletion of a SearchParameter resource.
758    ///
759    /// Removes the parameter from the registry. Search index entries for this
760    /// parameter are NOT automatically cleaned up (use $reindex for that).
761    fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
762        if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
763            let mut registry = self.search_registry().write();
764            if let Err(e) = registry.unregister(url) {
765                tracing::debug!("SearchParameter unregistration skipped: {}", e);
766            }
767        }
768
769        Ok(())
770    }
771}
772
773// ============================================================================
774// VersionedStorage Implementation
775// ============================================================================
776
777#[async_trait]
778impl VersionedStorage for PostgresBackend {
779    async fn vread(
780        &self,
781        tenant: &TenantContext,
782        resource_type: &str,
783        id: &str,
784        version_id: &str,
785    ) -> StorageResult<Option<StoredResource>> {
786        let client = self.get_client().await?;
787        let tenant_id = tenant.tenant_id().as_str();
788
789        let row = client
790            .query_opt(
791                "SELECT data, last_updated, is_deleted, fhir_version
792                 FROM resource_history
793                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
794                &[&tenant_id, &resource_type, &id, &version_id],
795            )
796            .await
797            .map_err(|e| internal_error(format!("Failed to read version: {}", e)))?;
798
799        match row {
800            Some(row) => {
801                let data: Value = row.get(0);
802                let last_updated: DateTime<Utc> = row.get(1);
803                let is_deleted: bool = row.get(2);
804                let fhir_version_str: String = row.get(3);
805
806                // For deleted versions, use last_updated as deleted_at
807                let deleted_at = if is_deleted { Some(last_updated) } else { None };
808
809                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
810
811                Ok(Some(StoredResource::from_storage(
812                    resource_type,
813                    id,
814                    version_id,
815                    tenant.tenant_id().clone(),
816                    data,
817                    last_updated,
818                    last_updated,
819                    deleted_at,
820                    fhir_version,
821                )))
822            }
823            None => Ok(None),
824        }
825    }
826
827    async fn update_with_match(
828        &self,
829        tenant: &TenantContext,
830        resource_type: &str,
831        id: &str,
832        expected_version: &str,
833        resource: Value,
834    ) -> StorageResult<StoredResource> {
835        // Read current resource
836        let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
837            StorageError::Resource(ResourceError::NotFound {
838                resource_type: resource_type.to_string(),
839                id: id.to_string(),
840            })
841        })?;
842
843        // Check version match
844        if current.version_id() != expected_version {
845            return Err(StorageError::Concurrency(
846                ConcurrencyError::VersionConflict {
847                    resource_type: resource_type.to_string(),
848                    id: id.to_string(),
849                    expected_version: expected_version.to_string(),
850                    actual_version: current.version_id().to_string(),
851                },
852            ));
853        }
854
855        // Perform update
856        self.update(tenant, &current, resource).await
857    }
858
859    async fn delete_with_match(
860        &self,
861        tenant: &TenantContext,
862        resource_type: &str,
863        id: &str,
864        expected_version: &str,
865    ) -> StorageResult<()> {
866        let client = self.get_client().await?;
867        let tenant_id = tenant.tenant_id().as_str();
868
869        // Check version match
870        let row = client
871            .query_opt(
872                "SELECT version_id FROM resources
873                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
874                &[&tenant_id, &resource_type, &id],
875            )
876            .await
877            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
878
879        let current_version = match row {
880            Some(row) => row.get::<_, String>(0),
881            None => {
882                return Err(StorageError::Resource(ResourceError::NotFound {
883                    resource_type: resource_type.to_string(),
884                    id: id.to_string(),
885                }));
886            }
887        };
888
889        if current_version != expected_version {
890            return Err(StorageError::Concurrency(
891                ConcurrencyError::VersionConflict {
892                    resource_type: resource_type.to_string(),
893                    id: id.to_string(),
894                    expected_version: expected_version.to_string(),
895                    actual_version: current_version,
896                },
897            ));
898        }
899
900        // Perform delete
901        self.delete(tenant, resource_type, id).await
902    }
903
904    async fn list_versions(
905        &self,
906        tenant: &TenantContext,
907        resource_type: &str,
908        id: &str,
909    ) -> StorageResult<Vec<String>> {
910        let client = self.get_client().await?;
911        let tenant_id = tenant.tenant_id().as_str();
912
913        let rows = client
914            .query(
915                "SELECT version_id FROM resource_history
916                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3
917                 ORDER BY CAST(version_id AS INTEGER) ASC",
918                &[&tenant_id, &resource_type, &id],
919            )
920            .await
921            .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?;
922
923        let versions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
924        Ok(versions)
925    }
926}
927
928// ============================================================================
929// InstanceHistoryProvider Implementation
930// ============================================================================
931
932#[async_trait]
933impl InstanceHistoryProvider for PostgresBackend {
934    async fn history_instance(
935        &self,
936        tenant: &TenantContext,
937        resource_type: &str,
938        id: &str,
939        params: &HistoryParams,
940    ) -> StorageResult<HistoryPage> {
941        let client = self.get_client().await?;
942        let tenant_id = tenant.tenant_id().as_str();
943
944        // Build the query with filters
945        let mut sql = String::from(
946            "SELECT version_id, data, last_updated, is_deleted, fhir_version
947             FROM resource_history
948             WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
949        );
950        let mut param_index: usize = 4;
951        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
952            Box::new(tenant_id.to_string()),
953            Box::new(resource_type.to_string()),
954            Box::new(id.to_string()),
955        ];
956
957        // Apply deleted filter
958        if !params.include_deleted {
959            sql.push_str(" AND is_deleted = FALSE");
960        }
961
962        // Apply since filter
963        if let Some(since) = &params.since {
964            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
965            query_params.push(Box::new(*since));
966            param_index += 1;
967        }
968
969        // Apply before filter
970        if let Some(before) = &params.before {
971            sql.push_str(&format!(" AND last_updated < ${}", param_index));
972            query_params.push(Box::new(*before));
973            param_index += 1;
974        }
975
976        // Apply cursor filter if present
977        if let Some(cursor) = params.pagination.cursor_value() {
978            if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
979                if let Ok(version_int) = version_str.parse::<i64>() {
980                    sql.push_str(&format!(
981                        " AND CAST(version_id AS INTEGER) < ${}",
982                        param_index
983                    ));
984                    query_params.push(Box::new(version_int));
985                    param_index += 1;
986                }
987            }
988        }
989
990        // Order by version descending (newest first) and limit
991        let limit = params.pagination.count as i64 + 1; // +1 to detect if there are more
992        sql.push_str(&format!(
993            " ORDER BY CAST(version_id AS INTEGER) DESC LIMIT ${}",
994            param_index
995        ));
996        query_params.push(Box::new(limit));
997
998        // Execute the query
999        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1000            .iter()
1001            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1002            .collect();
1003
1004        let rows = client
1005            .query(&sql, &param_refs)
1006            .await
1007            .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1008
1009        let mut entries = Vec::new();
1010        let mut last_version: Option<String> = None;
1011
1012        for row in &rows {
1013            // Stop if we've collected enough items (we fetched count+1 to detect more)
1014            if entries.len() >= params.pagination.count as usize {
1015                break;
1016            }
1017
1018            let version_id: String = row.get(0);
1019            let data: Value = row.get(1);
1020            let last_updated: DateTime<Utc> = row.get(2);
1021            let is_deleted: bool = row.get(3);
1022            let fhir_version_str: String = row.get(4);
1023
1024            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1025
1026            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1027
1028            let resource = StoredResource::from_storage(
1029                resource_type,
1030                id,
1031                &version_id,
1032                tenant.tenant_id().clone(),
1033                data,
1034                last_updated,
1035                last_updated,
1036                deleted_at,
1037                fhir_version,
1038            );
1039
1040            // Determine the method based on version and deletion status
1041            let method = if is_deleted {
1042                HistoryMethod::Delete
1043            } else if version_id == "1" {
1044                HistoryMethod::Post
1045            } else {
1046                HistoryMethod::Put
1047            };
1048
1049            last_version = Some(version_id);
1050
1051            entries.push(HistoryEntry {
1052                resource,
1053                method,
1054                timestamp: last_updated,
1055            });
1056        }
1057
1058        // Determine if there are more results
1059        let has_more = rows.len() > params.pagination.count as usize;
1060
1061        // Build page info
1062        let page_info = if let (true, Some(version)) = (has_more, last_version) {
1063            let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1064            PageInfo::with_next(cursor)
1065        } else {
1066            PageInfo::end()
1067        };
1068
1069        Ok(Page::new(entries, page_info))
1070    }
1071
1072    async fn history_instance_count(
1073        &self,
1074        tenant: &TenantContext,
1075        resource_type: &str,
1076        id: &str,
1077    ) -> StorageResult<u64> {
1078        let client = self.get_client().await?;
1079        let tenant_id = tenant.tenant_id().as_str();
1080
1081        let row = client
1082            .query_one(
1083                "SELECT COUNT(*) FROM resource_history
1084                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1085                &[&tenant_id, &resource_type, &id],
1086            )
1087            .await
1088            .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1089
1090        let count: i64 = row.get(0);
1091        Ok(count as u64)
1092    }
1093
1094    async fn delete_instance_history(
1095        &self,
1096        tenant: &TenantContext,
1097        resource_type: &str,
1098        id: &str,
1099    ) -> StorageResult<u64> {
1100        let client = self.get_client().await?;
1101        let tenant_id = tenant.tenant_id().as_str();
1102
1103        // First, verify the resource exists
1104        let exists = client
1105            .query_opt(
1106                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1107                &[&tenant_id, &resource_type, &id],
1108            )
1109            .await
1110            .map_err(|e| internal_error(format!("Failed to check resource existence: {}", e)))?;
1111
1112        if exists.is_none() {
1113            return Err(StorageError::Resource(ResourceError::NotFound {
1114                resource_type: resource_type.to_string(),
1115                id: id.to_string(),
1116            }));
1117        }
1118
1119        // Get the current version from resources table (to preserve it)
1120        let current_row = client
1121            .query_one(
1122                "SELECT version_id FROM resources
1123                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1124                &[&tenant_id, &resource_type, &id],
1125            )
1126            .await
1127            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1128
1129        let current_version: String = current_row.get(0);
1130
1131        // Delete all history entries EXCEPT the current version
1132        let deleted = client
1133            .execute(
1134                "DELETE FROM resource_history
1135                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id != $4",
1136                &[&tenant_id, &resource_type, &id, &current_version],
1137            )
1138            .await
1139            .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1140
1141        Ok(deleted)
1142    }
1143
1144    async fn delete_version(
1145        &self,
1146        tenant: &TenantContext,
1147        resource_type: &str,
1148        id: &str,
1149        version_id: &str,
1150    ) -> StorageResult<()> {
1151        let client = self.get_client().await?;
1152        let tenant_id = tenant.tenant_id().as_str();
1153
1154        // First, get the current version to ensure we're not deleting it
1155        let current_row = client
1156            .query_opt(
1157                "SELECT version_id FROM resources
1158                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1159                &[&tenant_id, &resource_type, &id],
1160            )
1161            .await
1162            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1163
1164        let current_version = match current_row {
1165            Some(row) => row.get::<_, String>(0),
1166            None => {
1167                return Err(StorageError::Resource(ResourceError::NotFound {
1168                    resource_type: resource_type.to_string(),
1169                    id: id.to_string(),
1170                }));
1171            }
1172        };
1173
1174        // Prevent deletion of the current version
1175        if version_id == current_version {
1176            return Err(StorageError::Validation(
1177                crate::error::ValidationError::InvalidResource {
1178                    message: format!(
1179                        "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1180                        version_id, resource_type, id
1181                    ),
1182                    details: vec![],
1183                },
1184            ));
1185        }
1186
1187        // Check if the version exists in history
1188        let version_exists = client
1189            .query_opt(
1190                "SELECT 1 FROM resource_history
1191                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1192                &[&tenant_id, &resource_type, &id, &version_id],
1193            )
1194            .await
1195            .map_err(|e| internal_error(format!("Failed to check version existence: {}", e)))?;
1196
1197        if version_exists.is_none() {
1198            return Err(StorageError::Resource(ResourceError::VersionNotFound {
1199                resource_type: resource_type.to_string(),
1200                id: id.to_string(),
1201                version_id: version_id.to_string(),
1202            }));
1203        }
1204
1205        // Delete the specific version
1206        client
1207            .execute(
1208                "DELETE FROM resource_history
1209                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1210                &[&tenant_id, &resource_type, &id, &version_id],
1211            )
1212            .await
1213            .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1214
1215        Ok(())
1216    }
1217}
1218
1219// ============================================================================
1220// TypeHistoryProvider Implementation
1221// ============================================================================
1222
1223#[async_trait]
1224impl TypeHistoryProvider for PostgresBackend {
1225    async fn history_type(
1226        &self,
1227        tenant: &TenantContext,
1228        resource_type: &str,
1229        params: &HistoryParams,
1230    ) -> StorageResult<HistoryPage> {
1231        let client = self.get_client().await?;
1232        let tenant_id = tenant.tenant_id().as_str();
1233
1234        // Build the query with filters
1235        let mut sql = String::from(
1236            "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1237             FROM resource_history
1238             WHERE tenant_id = $1 AND resource_type = $2",
1239        );
1240        let mut param_index: usize = 3;
1241        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1242            Box::new(tenant_id.to_string()),
1243            Box::new(resource_type.to_string()),
1244        ];
1245
1246        // Apply deleted filter
1247        if !params.include_deleted {
1248            sql.push_str(" AND is_deleted = FALSE");
1249        }
1250
1251        // Apply since filter
1252        if let Some(since) = &params.since {
1253            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1254            query_params.push(Box::new(*since));
1255            param_index += 1;
1256        }
1257
1258        // Apply before filter
1259        if let Some(before) = &params.before {
1260            sql.push_str(&format!(" AND last_updated < ${}", param_index));
1261            query_params.push(Box::new(*before));
1262            param_index += 1;
1263        }
1264
1265        // Apply cursor filter if present
1266        if let Some(cursor) = params.pagination.cursor_value() {
1267            let sort_values = cursor.sort_values();
1268            if sort_values.len() >= 2 {
1269                if let (
1270                    Some(CursorValue::String(timestamp)),
1271                    Some(CursorValue::String(resource_id)),
1272                ) = (sort_values.first(), sort_values.get(1))
1273                {
1274                    sql.push_str(&format!(
1275                        " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND id < ${}))",
1276                        param_index, param_index, param_index + 1
1277                    ));
1278                    query_params.push(Box::new(timestamp.clone()));
1279                    query_params.push(Box::new(resource_id.clone()));
1280                    param_index += 2;
1281                }
1282            }
1283        }
1284
1285        // Order by last_updated descending (newest first), then by id for consistency
1286        let limit = params.pagination.count as i64 + 1;
1287        sql.push_str(&format!(
1288            " ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1289            param_index
1290        ));
1291        query_params.push(Box::new(limit));
1292
1293        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1294            .iter()
1295            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1296            .collect();
1297
1298        let rows = client
1299            .query(&sql, &param_refs)
1300            .await
1301            .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1302
1303        let mut entries = Vec::new();
1304        let mut last_entry: Option<(String, String)> = None; // (last_updated, id)
1305
1306        for row in &rows {
1307            if entries.len() >= params.pagination.count as usize {
1308                break;
1309            }
1310
1311            let row_id: String = row.get(0);
1312            let version_id: String = row.get(1);
1313            let data: Value = row.get(2);
1314            let last_updated: DateTime<Utc> = row.get(3);
1315            let is_deleted: bool = row.get(4);
1316            let fhir_version_str: String = row.get(5);
1317
1318            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1319
1320            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1321
1322            let resource = StoredResource::from_storage(
1323                resource_type,
1324                &row_id,
1325                &version_id,
1326                tenant.tenant_id().clone(),
1327                data,
1328                last_updated,
1329                last_updated,
1330                deleted_at,
1331                fhir_version,
1332            );
1333
1334            let method = if is_deleted {
1335                HistoryMethod::Delete
1336            } else if version_id == "1" {
1337                HistoryMethod::Post
1338            } else {
1339                HistoryMethod::Put
1340            };
1341
1342            last_entry = Some((last_updated.to_rfc3339(), row_id));
1343
1344            entries.push(HistoryEntry {
1345                resource,
1346                method,
1347                timestamp: last_updated,
1348            });
1349        }
1350
1351        // Determine if there are more results
1352        let has_more = rows.len() > params.pagination.count as usize;
1353
1354        // Build page info
1355        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1356            let cursor = PageCursor::new(
1357                vec![CursorValue::String(timestamp), CursorValue::String(id)],
1358                resource_type.to_string(),
1359            );
1360            PageInfo::with_next(cursor)
1361        } else {
1362            PageInfo::end()
1363        };
1364
1365        Ok(Page::new(entries, page_info))
1366    }
1367
1368    async fn history_type_count(
1369        &self,
1370        tenant: &TenantContext,
1371        resource_type: &str,
1372    ) -> StorageResult<u64> {
1373        let client = self.get_client().await?;
1374        let tenant_id = tenant.tenant_id().as_str();
1375
1376        let row = client
1377            .query_one(
1378                "SELECT COUNT(*) FROM resource_history
1379                 WHERE tenant_id = $1 AND resource_type = $2",
1380                &[&tenant_id, &resource_type],
1381            )
1382            .await
1383            .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1384
1385        let count: i64 = row.get(0);
1386        Ok(count as u64)
1387    }
1388}
1389
1390// ============================================================================
1391// SystemHistoryProvider Implementation
1392// ============================================================================
1393
1394#[async_trait]
1395impl SystemHistoryProvider for PostgresBackend {
1396    async fn history_system(
1397        &self,
1398        tenant: &TenantContext,
1399        params: &HistoryParams,
1400    ) -> StorageResult<HistoryPage> {
1401        let client = self.get_client().await?;
1402        let tenant_id = tenant.tenant_id().as_str();
1403
1404        // Build the query with filters
1405        let mut sql = String::from(
1406            "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1407             FROM resource_history
1408             WHERE tenant_id = $1",
1409        );
1410        let mut param_index: usize = 2;
1411        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1412            vec![Box::new(tenant_id.to_string())];
1413
1414        // Apply deleted filter
1415        if !params.include_deleted {
1416            sql.push_str(" AND is_deleted = FALSE");
1417        }
1418
1419        // Apply since filter
1420        if let Some(since) = &params.since {
1421            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1422            query_params.push(Box::new(*since));
1423            param_index += 1;
1424        }
1425
1426        // Apply before filter
1427        if let Some(before) = &params.before {
1428            sql.push_str(&format!(" AND last_updated < ${}", param_index));
1429            query_params.push(Box::new(*before));
1430            param_index += 1;
1431        }
1432
1433        // Apply cursor filter if present
1434        if let Some(cursor) = params.pagination.cursor_value() {
1435            let sort_values = cursor.sort_values();
1436            if sort_values.len() >= 3 {
1437                if let (
1438                    Some(CursorValue::String(timestamp)),
1439                    Some(CursorValue::String(res_type)),
1440                    Some(CursorValue::String(res_id)),
1441                ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1442                {
1443                    sql.push_str(&format!(
1444                        " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND (resource_type < ${} OR (resource_type = ${} AND id < ${}))))",
1445                        param_index, param_index, param_index + 1, param_index + 1, param_index + 2
1446                    ));
1447                    query_params.push(Box::new(timestamp.clone()));
1448                    query_params.push(Box::new(res_type.clone()));
1449                    query_params.push(Box::new(res_id.clone()));
1450                    param_index += 3;
1451                }
1452            }
1453        }
1454
1455        // Order by last_updated descending (newest first)
1456        let limit = params.pagination.count as i64 + 1;
1457        sql.push_str(&format!(
1458            " ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1459            param_index
1460        ));
1461        query_params.push(Box::new(limit));
1462
1463        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1464            .iter()
1465            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1466            .collect();
1467
1468        let rows = client
1469            .query(&sql, &param_refs)
1470            .await
1471            .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1472
1473        let mut entries = Vec::new();
1474        let mut last_entry: Option<(String, String, String)> = None;
1475
1476        for row in &rows {
1477            if entries.len() >= params.pagination.count as usize {
1478                break;
1479            }
1480
1481            let row_resource_type: String = row.get(0);
1482            let row_id: String = row.get(1);
1483            let version_id: String = row.get(2);
1484            let data: Value = row.get(3);
1485            let last_updated: DateTime<Utc> = row.get(4);
1486            let is_deleted: bool = row.get(5);
1487            let fhir_version_str: String = row.get(6);
1488
1489            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1490
1491            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1492
1493            let resource = StoredResource::from_storage(
1494                &row_resource_type,
1495                &row_id,
1496                &version_id,
1497                tenant.tenant_id().clone(),
1498                data,
1499                last_updated,
1500                last_updated,
1501                deleted_at,
1502                fhir_version,
1503            );
1504
1505            let method = if is_deleted {
1506                HistoryMethod::Delete
1507            } else if version_id == "1" {
1508                HistoryMethod::Post
1509            } else {
1510                HistoryMethod::Put
1511            };
1512
1513            last_entry = Some((last_updated.to_rfc3339(), row_resource_type, row_id));
1514
1515            entries.push(HistoryEntry {
1516                resource,
1517                method,
1518                timestamp: last_updated,
1519            });
1520        }
1521
1522        let has_more = rows.len() > params.pagination.count as usize;
1523
1524        let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1525        {
1526            let cursor = PageCursor::new(
1527                vec![
1528                    CursorValue::String(timestamp),
1529                    CursorValue::String(resource_type),
1530                    CursorValue::String(id),
1531                ],
1532                "system".to_string(),
1533            );
1534            PageInfo::with_next(cursor)
1535        } else {
1536            PageInfo::end()
1537        };
1538
1539        Ok(Page::new(entries, page_info))
1540    }
1541
1542    async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1543        let client = self.get_client().await?;
1544        let tenant_id = tenant.tenant_id().as_str();
1545
1546        let row = client
1547            .query_one(
1548                "SELECT COUNT(*) FROM resource_history WHERE tenant_id = $1",
1549                &[&tenant_id],
1550            )
1551            .await
1552            .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1553
1554        let count: i64 = row.get(0);
1555        Ok(count as u64)
1556    }
1557}
1558
1559// ============================================================================
1560// DifferentialHistoryProvider Implementation
1561// ============================================================================
1562
1563#[async_trait]
1564impl DifferentialHistoryProvider for PostgresBackend {
1565    async fn modified_since(
1566        &self,
1567        tenant: &TenantContext,
1568        resource_type: Option<&str>,
1569        since: DateTime<Utc>,
1570        pagination: &Pagination,
1571    ) -> StorageResult<Page<StoredResource>> {
1572        let client = self.get_client().await?;
1573        let tenant_id = tenant.tenant_id().as_str();
1574
1575        // Build query for current versions of resources modified since timestamp
1576        let mut sql = String::from(
1577            "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1578             FROM resources
1579             WHERE tenant_id = $1 AND last_updated > $2 AND is_deleted = FALSE",
1580        );
1581        let mut param_index: usize = 3;
1582        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1583            vec![Box::new(tenant_id.to_string()), Box::new(since)];
1584
1585        // Filter by resource type if specified
1586        if let Some(rt) = resource_type {
1587            sql.push_str(&format!(" AND resource_type = ${}", param_index));
1588            query_params.push(Box::new(rt.to_string()));
1589            param_index += 1;
1590        }
1591
1592        // Apply cursor filter if present
1593        if let Some(cursor) = pagination.cursor_value() {
1594            let sort_values = cursor.sort_values();
1595            if sort_values.len() >= 2 {
1596                if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
1597                    (sort_values.first(), sort_values.get(1))
1598                {
1599                    sql.push_str(&format!(
1600                        " AND (last_updated > ${}::timestamptz OR (last_updated = ${}::timestamptz AND id > ${}))",
1601                        param_index, param_index, param_index + 1
1602                    ));
1603                    query_params.push(Box::new(timestamp.clone()));
1604                    query_params.push(Box::new(res_id.clone()));
1605                    param_index += 2;
1606                }
1607            }
1608        }
1609
1610        // Order by last_updated ascending (oldest first for sync)
1611        let limit = pagination.count as i64 + 1;
1612        sql.push_str(&format!(
1613            " ORDER BY last_updated ASC, id ASC LIMIT ${}",
1614            param_index
1615        ));
1616        query_params.push(Box::new(limit));
1617
1618        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1619            .iter()
1620            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1621            .collect();
1622
1623        let rows = client
1624            .query(&sql, &param_refs)
1625            .await
1626            .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
1627
1628        let mut resources = Vec::new();
1629        let mut last_entry: Option<(String, String)> = None;
1630
1631        for row in &rows {
1632            if resources.len() >= pagination.count as usize {
1633                break;
1634            }
1635
1636            let row_resource_type: String = row.get(0);
1637            let row_id: String = row.get(1);
1638            let version_id: String = row.get(2);
1639            let data: Value = row.get(3);
1640            let last_updated: DateTime<Utc> = row.get(4);
1641            let fhir_version_str: String = row.get(5);
1642
1643            let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
1644
1645            let resource = StoredResource::from_storage(
1646                &row_resource_type,
1647                &row_id,
1648                &version_id,
1649                tenant.tenant_id().clone(),
1650                data,
1651                last_updated,
1652                last_updated,
1653                None,
1654                fhir_version,
1655            );
1656
1657            last_entry = Some((last_updated.to_rfc3339(), row_id));
1658            resources.push(resource);
1659        }
1660
1661        let has_more = rows.len() > pagination.count as usize;
1662
1663        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1664            let cursor = PageCursor::new(
1665                vec![CursorValue::String(timestamp), CursorValue::String(id)],
1666                "modified_since".to_string(),
1667            );
1668            PageInfo::with_next(cursor)
1669        } else {
1670            PageInfo::end()
1671        };
1672
1673        Ok(Page::new(resources, page_info))
1674    }
1675}
1676
1677// ============================================================================
1678// PurgableStorage Implementation
1679// ============================================================================
1680
1681#[async_trait]
1682impl PurgableStorage for PostgresBackend {
1683    async fn purge(
1684        &self,
1685        tenant: &TenantContext,
1686        resource_type: &str,
1687        id: &str,
1688    ) -> StorageResult<()> {
1689        let client = self.get_client().await?;
1690        let tenant_id = tenant.tenant_id().as_str();
1691
1692        // Check if resource exists (in any state)
1693        let exists = client
1694            .query_opt(
1695                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1696                &[&tenant_id, &resource_type, &id],
1697            )
1698            .await
1699            .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
1700
1701        if exists.is_none() {
1702            // Also check history in case it was already purged from main table
1703            let history_exists = client
1704                .query_opt(
1705                    "SELECT 1 FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1706                    &[&tenant_id, &resource_type, &id],
1707                )
1708                .await
1709                .map_err(|e| internal_error(format!("Failed to check history: {}", e)))?;
1710
1711            if history_exists.is_none() {
1712                return Err(StorageError::Resource(ResourceError::NotFound {
1713                    resource_type: resource_type.to_string(),
1714                    id: id.to_string(),
1715                }));
1716            }
1717        }
1718
1719        // Delete from search index first (due to FK constraint)
1720        client
1721            .execute(
1722                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1723                &[&tenant_id, &resource_type, &id],
1724            )
1725            .await
1726            .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1727
1728        // Delete from FTS table
1729        let _ = client
1730            .execute(
1731                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1732                &[&tenant_id, &resource_type, &id],
1733            )
1734            .await;
1735
1736        // Delete from history table (before resources due to FK)
1737        client
1738            .execute(
1739                "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1740                &[&tenant_id, &resource_type, &id],
1741            )
1742            .await
1743            .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1744
1745        // Delete from resources table
1746        client
1747            .execute(
1748                "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1749                &[&tenant_id, &resource_type, &id],
1750            )
1751            .await
1752            .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1753
1754        Ok(())
1755    }
1756
1757    async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1758        let client = self.get_client().await?;
1759        let tenant_id = tenant.tenant_id().as_str();
1760
1761        // Count how many we're about to delete
1762        let row = client
1763            .query_one(
1764                "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1765                &[&tenant_id, &resource_type],
1766            )
1767            .await
1768            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1769        let count: i64 = row.get(0);
1770
1771        // Delete from search index first (due to FK constraint)
1772        client
1773            .execute(
1774                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2",
1775                &[&tenant_id, &resource_type],
1776            )
1777            .await
1778            .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1779
1780        // Delete from FTS table
1781        let _ = client
1782            .execute(
1783                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2",
1784                &[&tenant_id, &resource_type],
1785            )
1786            .await;
1787
1788        // Delete from history table
1789        client
1790            .execute(
1791                "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2",
1792                &[&tenant_id, &resource_type],
1793            )
1794            .await
1795            .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1796
1797        // Delete from resources table
1798        client
1799            .execute(
1800                "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1801                &[&tenant_id, &resource_type],
1802            )
1803            .await
1804            .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1805
1806        Ok(count as u64)
1807    }
1808}
1809
1810// ============================================================================
1811// ConditionalStorage Implementation
1812// ============================================================================
1813
1814// Helper function to parse simple search parameters
1815// Supports basic formats like: identifier=X, _id=Y, name=Z
1816fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
1817    params
1818        .split('&')
1819        .filter_map(|pair| {
1820            let parts: Vec<&str> = pair.splitn(2, '=').collect();
1821            if parts.len() == 2 {
1822                Some((parts[0].to_string(), parts[1].to_string()))
1823            } else {
1824                None
1825            }
1826        })
1827        .collect()
1828}
1829
1830#[async_trait]
1831impl ConditionalStorage for PostgresBackend {
1832    async fn conditional_create(
1833        &self,
1834        tenant: &TenantContext,
1835        resource_type: &str,
1836        resource: Value,
1837        search_params: &str,
1838        fhir_version: FhirVersion,
1839    ) -> StorageResult<ConditionalCreateResult> {
1840        // Find matching resources based on search parameters
1841        let matches = self
1842            .find_matching_resources(tenant, resource_type, search_params)
1843            .await?;
1844
1845        match matches.len() {
1846            0 => {
1847                // No match - create the resource
1848                let created = self
1849                    .create(tenant, resource_type, resource, fhir_version)
1850                    .await?;
1851                Ok(ConditionalCreateResult::Created(created))
1852            }
1853            1 => {
1854                // Exactly one match - return the existing resource
1855                Ok(ConditionalCreateResult::Exists(
1856                    matches.into_iter().next().unwrap(),
1857                ))
1858            }
1859            n => {
1860                // Multiple matches - error condition
1861                Ok(ConditionalCreateResult::MultipleMatches(n))
1862            }
1863        }
1864    }
1865
1866    async fn conditional_update(
1867        &self,
1868        tenant: &TenantContext,
1869        resource_type: &str,
1870        resource: Value,
1871        search_params: &str,
1872        upsert: bool,
1873        fhir_version: FhirVersion,
1874    ) -> StorageResult<ConditionalUpdateResult> {
1875        // Find matching resources based on search parameters
1876        let matches = self
1877            .find_matching_resources(tenant, resource_type, search_params)
1878            .await?;
1879
1880        match matches.len() {
1881            0 => {
1882                if upsert {
1883                    // No match, but upsert is true - create new resource
1884                    let created = self
1885                        .create(tenant, resource_type, resource, fhir_version)
1886                        .await?;
1887                    Ok(ConditionalUpdateResult::Created(created))
1888                } else {
1889                    // No match and no upsert
1890                    Ok(ConditionalUpdateResult::NoMatch)
1891                }
1892            }
1893            1 => {
1894                // Exactly one match - update it (preserves existing FHIR version)
1895                let existing = matches.into_iter().next().unwrap();
1896                let updated = self.update(tenant, &existing, resource).await?;
1897                Ok(ConditionalUpdateResult::Updated(updated))
1898            }
1899            n => {
1900                // Multiple matches - error condition
1901                Ok(ConditionalUpdateResult::MultipleMatches(n))
1902            }
1903        }
1904    }
1905
1906    async fn conditional_delete(
1907        &self,
1908        tenant: &TenantContext,
1909        resource_type: &str,
1910        search_params: &str,
1911    ) -> StorageResult<ConditionalDeleteResult> {
1912        // Find matching resources based on search parameters
1913        let matches = self
1914            .find_matching_resources(tenant, resource_type, search_params)
1915            .await?;
1916
1917        match matches.len() {
1918            0 => {
1919                // No match
1920                Ok(ConditionalDeleteResult::NoMatch)
1921            }
1922            1 => {
1923                // Exactly one match - delete it
1924                let existing = matches.into_iter().next().unwrap();
1925                self.delete(tenant, resource_type, existing.id()).await?;
1926                Ok(ConditionalDeleteResult::Deleted)
1927            }
1928            n => {
1929                // Multiple matches - error condition
1930                Ok(ConditionalDeleteResult::MultipleMatches(n))
1931            }
1932        }
1933    }
1934
1935    async fn conditional_patch(
1936        &self,
1937        tenant: &TenantContext,
1938        resource_type: &str,
1939        search_params: &str,
1940        patch: &crate::core::PatchFormat,
1941    ) -> StorageResult<crate::core::ConditionalPatchResult> {
1942        use crate::core::{ConditionalPatchResult, PatchFormat};
1943
1944        // Find matching resources based on search parameters
1945        let matches = self
1946            .find_matching_resources(tenant, resource_type, search_params)
1947            .await?;
1948
1949        match matches.len() {
1950            0 => Ok(ConditionalPatchResult::NoMatch),
1951            1 => {
1952                // Exactly one match - apply the patch
1953                let existing = matches.into_iter().next().unwrap();
1954                let current_content = existing.content().clone();
1955
1956                // Apply the patch based on format
1957                let patched_content = match patch {
1958                    PatchFormat::JsonPatch(patch_doc) => {
1959                        self.apply_json_patch(&current_content, patch_doc)?
1960                    }
1961                    PatchFormat::FhirPathPatch(patch_params) => {
1962                        self.apply_fhirpath_patch(&current_content, patch_params)?
1963                    }
1964                    PatchFormat::MergePatch(merge_doc) => {
1965                        self.apply_merge_patch(&current_content, merge_doc)
1966                    }
1967                };
1968
1969                // Update the resource with the patched content
1970                let updated = self.update(tenant, &existing, patched_content).await?;
1971                Ok(ConditionalPatchResult::Patched(updated))
1972            }
1973            n => Ok(ConditionalPatchResult::MultipleMatches(n)),
1974        }
1975    }
1976}
1977
1978impl PostgresBackend {
1979    /// Find resources matching the given search parameters.
1980    ///
1981    /// Uses the SearchProvider implementation to leverage the pre-computed search index.
1982    async fn find_matching_resources(
1983        &self,
1984        tenant: &TenantContext,
1985        resource_type: &str,
1986        search_params_str: &str,
1987    ) -> StorageResult<Vec<StoredResource>> {
1988        // Parse search parameters into (name, value) pairs
1989        let parsed_params = parse_simple_search_params(search_params_str);
1990
1991        if parsed_params.is_empty() {
1992            // No search params means match all - but for conditional ops this is unusual
1993            return Ok(Vec::new());
1994        }
1995
1996        // Build SearchParameter objects by looking up types from the registry
1997        let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
1998
1999        // Build a SearchQuery
2000        let query = SearchQuery {
2001            resource_type: resource_type.to_string(),
2002            parameters: search_params,
2003            count: Some(1000),
2004            ..Default::default()
2005        };
2006
2007        // Use the SearchProvider implementation
2008        let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2009
2010        Ok(result.resources.items)
2011    }
2012
2013    /// Builds SearchParameter objects from parsed (name, value) pairs.
2014    fn build_search_parameters(
2015        &self,
2016        resource_type: &str,
2017        params: &[(String, String)],
2018    ) -> StorageResult<Vec<SearchParameter>> {
2019        let registry = self.search_registry().read();
2020        let mut search_params = Vec::with_capacity(params.len());
2021
2022        for (name, value) in params {
2023            let param_type = self
2024                .lookup_param_type(&registry, resource_type, name)
2025                .unwrap_or({
2026                    match name.as_str() {
2027                        "_id" => SearchParamType::Token,
2028                        "_lastUpdated" => SearchParamType::Date,
2029                        "_tag" | "_profile" | "_security" => SearchParamType::Token,
2030                        "identifier" => SearchParamType::Token,
2031                        "patient" | "subject" | "encounter" | "performer" | "author"
2032                        | "requester" | "recorder" | "asserter" | "practitioner"
2033                        | "organization" | "location" | "device" => SearchParamType::Reference,
2034                        _ => SearchParamType::String,
2035                    }
2036                });
2037
2038            search_params.push(SearchParameter {
2039                name: name.clone(),
2040                param_type,
2041                modifier: None,
2042                values: vec![SearchValue::parse(value)],
2043                chain: vec![],
2044                components: vec![],
2045            });
2046        }
2047
2048        Ok(search_params)
2049    }
2050
2051    /// Looks up a search parameter type from the registry.
2052    fn lookup_param_type(
2053        &self,
2054        registry: &crate::search::SearchParameterRegistry,
2055        resource_type: &str,
2056        param_name: &str,
2057    ) -> Option<SearchParamType> {
2058        if let Some(def) = registry.get_param(resource_type, param_name) {
2059            return Some(def.param_type);
2060        }
2061        if let Some(def) = registry.get_param("Resource", param_name) {
2062            return Some(def.param_type);
2063        }
2064        None
2065    }
2066
2067    // ========================================================================
2068    // Patch Helper Methods
2069    // ========================================================================
2070
2071    /// Applies a JSON Patch (RFC 6902) to a resource.
2072    fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2073        use crate::error::ValidationError;
2074
2075        let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2076            StorageError::Validation(ValidationError::InvalidResource {
2077                message: format!("Invalid JSON Patch document: {}", e),
2078                details: vec![],
2079            })
2080        })?;
2081
2082        let mut patched = resource.clone();
2083        json_patch::patch(&mut patched, &patch).map_err(|e| {
2084            StorageError::Validation(ValidationError::InvalidResource {
2085                message: format!("Failed to apply JSON Patch: {}", e),
2086                details: vec![],
2087            })
2088        })?;
2089
2090        Ok(patched)
2091    }
2092
2093    /// Applies a FHIRPath Patch to a resource.
2094    fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2095        use crate::error::ValidationError;
2096
2097        let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2098        if parameter.is_none() {
2099            return Err(StorageError::Validation(ValidationError::InvalidResource {
2100                message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2101                details: vec![],
2102            }));
2103        }
2104
2105        let mut patched = resource.clone();
2106
2107        for operation in parameter.unwrap() {
2108            let parts = operation.get("part").and_then(|p| p.as_array());
2109            if parts.is_none() {
2110                continue;
2111            }
2112
2113            let mut op_type = None;
2114            let mut op_path = None;
2115            let mut op_name = None;
2116            let mut op_value = None;
2117
2118            for part in parts.unwrap() {
2119                match part.get("name").and_then(|n| n.as_str()) {
2120                    Some("type") => {
2121                        op_type = part
2122                            .get("valueCode")
2123                            .and_then(|v| v.as_str())
2124                            .map(|s| s.to_string());
2125                    }
2126                    Some("path") => {
2127                        op_path = part
2128                            .get("valueString")
2129                            .and_then(|v| v.as_str())
2130                            .map(|s| s.to_string());
2131                    }
2132                    Some("name") => {
2133                        op_name = part
2134                            .get("valueString")
2135                            .and_then(|v| v.as_str())
2136                            .map(|s| s.to_string());
2137                    }
2138                    Some("value") => {
2139                        op_value = part
2140                            .get("valueString")
2141                            .or_else(|| part.get("valueBoolean"))
2142                            .or_else(|| part.get("valueInteger"))
2143                            .or_else(|| part.get("valueDecimal"))
2144                            .or_else(|| part.get("valueCode"))
2145                            .cloned();
2146                    }
2147                    _ => {}
2148                }
2149            }
2150
2151            match op_type.as_deref() {
2152                Some("replace") => {
2153                    if let (Some(path), Some(value)) = (&op_path, &op_value) {
2154                        self.fhirpath_replace(&mut patched, path, value)?;
2155                    }
2156                }
2157                Some("add") => {
2158                    if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2159                        self.fhirpath_add(&mut patched, path, name, value)?;
2160                    }
2161                }
2162                Some("delete") => {
2163                    if let Some(path) = &op_path {
2164                        self.fhirpath_delete(&mut patched, path)?;
2165                    }
2166                }
2167                _ => {
2168                    // Unsupported operation type - skip
2169                }
2170            }
2171        }
2172
2173        Ok(patched)
2174    }
2175
2176    /// Helper for FHIRPath replace operation.
2177    fn fhirpath_replace(
2178        &self,
2179        resource: &mut Value,
2180        path: &str,
2181        value: &Value,
2182    ) -> StorageResult<()> {
2183        let parts: Vec<&str> = path.split('.').collect();
2184        if parts.len() == 2 {
2185            if let Some(obj) = resource.as_object_mut() {
2186                obj.insert(parts[1].to_string(), value.clone());
2187            }
2188        }
2189        Ok(())
2190    }
2191
2192    /// Helper for FHIRPath add operation.
2193    fn fhirpath_add(
2194        &self,
2195        resource: &mut Value,
2196        path: &str,
2197        name: &str,
2198        value: &Value,
2199    ) -> StorageResult<()> {
2200        let parts: Vec<&str> = path.split('.').collect();
2201        if parts.len() == 1
2202            && parts[0]
2203                == resource
2204                    .get("resourceType")
2205                    .and_then(|r| r.as_str())
2206                    .unwrap_or("")
2207        {
2208            if let Some(obj) = resource.as_object_mut() {
2209                obj.insert(name.to_string(), value.clone());
2210            }
2211        }
2212        Ok(())
2213    }
2214
2215    /// Helper for FHIRPath delete operation.
2216    fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2217        let parts: Vec<&str> = path.split('.').collect();
2218        if parts.len() == 2 {
2219            if let Some(obj) = resource.as_object_mut() {
2220                obj.remove(parts[1]);
2221            }
2222        }
2223        Ok(())
2224    }
2225
2226    /// Applies a JSON Merge Patch (RFC 7386) to a resource.
2227    fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2228        let mut patched = resource.clone();
2229        json_patch::merge(&mut patched, merge_doc);
2230        patched
2231    }
2232}
2233
2234// ============================================================================
2235// BundleProvider Implementation
2236// ============================================================================
2237
2238#[async_trait]
2239impl BundleProvider for PostgresBackend {
2240    async fn process_transaction(
2241        &self,
2242        tenant: &TenantContext,
2243        entries: Vec<BundleEntry>,
2244    ) -> Result<BundleResult, TransactionError> {
2245        use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2246        use std::collections::HashMap;
2247
2248        // Start a transaction
2249        let mut tx = self
2250            .begin_transaction(tenant, TransactionOptions::new())
2251            .await
2252            .map_err(|e| TransactionError::RolledBack {
2253                reason: format!("Failed to begin transaction: {}", e),
2254            })?;
2255
2256        let mut results = Vec::with_capacity(entries.len());
2257        let mut error_info: Option<(usize, String)> = None;
2258
2259        // Build a map of fullUrl -> assigned reference for reference resolution
2260        let mut reference_map: HashMap<String, String> = HashMap::new();
2261
2262        // Make entries mutable for reference resolution
2263        let mut entries = entries;
2264
2265        // Process each entry within the transaction
2266        for (idx, entry) in entries.iter_mut().enumerate() {
2267            // Resolve references in this entry's resource before processing
2268            if let Some(ref mut resource) = entry.resource {
2269                resolve_bundle_references(resource, &reference_map);
2270            }
2271
2272            let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2273
2274            match result {
2275                Ok(entry_result) => {
2276                    if entry_result.status >= 400 {
2277                        error_info = Some((
2278                            idx,
2279                            format!("Entry failed with status {}", entry_result.status),
2280                        ));
2281                        break;
2282                    }
2283
2284                    // If this was a create (POST) and we have a fullUrl, record the mapping
2285                    if entry.method == BundleMethod::Post {
2286                        if let Some(ref full_url) = entry.full_url {
2287                            if let Some(ref location) = entry_result.location {
2288                                let reference = location
2289                                    .split("/_history")
2290                                    .next()
2291                                    .unwrap_or(location)
2292                                    .to_string();
2293                                reference_map.insert(full_url.clone(), reference);
2294                            }
2295                        }
2296                    }
2297
2298                    results.push(entry_result);
2299                }
2300                Err(e) => {
2301                    error_info = Some((idx, format!("Entry processing failed: {}", e)));
2302                    break;
2303                }
2304            }
2305        }
2306
2307        // Handle error or commit
2308        if let Some((index, message)) = error_info {
2309            let _ = Box::new(tx).rollback().await;
2310            return Err(TransactionError::BundleError { index, message });
2311        }
2312
2313        // Commit the transaction
2314        Box::new(tx)
2315            .commit()
2316            .await
2317            .map_err(|e| TransactionError::RolledBack {
2318                reason: format!("Commit failed: {}", e),
2319            })?;
2320
2321        Ok(BundleResult {
2322            bundle_type: BundleType::Transaction,
2323            entries: results,
2324        })
2325    }
2326
2327    async fn process_batch(
2328        &self,
2329        tenant: &TenantContext,
2330        entries: Vec<BundleEntry>,
2331    ) -> StorageResult<BundleResult> {
2332        let mut results = Vec::with_capacity(entries.len());
2333
2334        // Process each entry independently
2335        for entry in &entries {
2336            let result = self.process_batch_entry(tenant, entry).await;
2337            results.push(result);
2338        }
2339
2340        Ok(BundleResult {
2341            bundle_type: BundleType::Batch,
2342            entries: results,
2343        })
2344    }
2345}
2346
2347impl PostgresBackend {
2348    /// Process a single bundle entry within a transaction.
2349    async fn process_bundle_entry_tx(
2350        &self,
2351        tx: &mut super::transaction::PostgresTransaction,
2352        entry: &BundleEntry,
2353    ) -> StorageResult<BundleEntryResult> {
2354        use crate::core::transaction::Transaction;
2355
2356        match entry.method {
2357            BundleMethod::Get => {
2358                let (resource_type, id) = self.parse_url(&entry.url)?;
2359                match tx.read(&resource_type, &id).await? {
2360                    Some(resource) => Ok(BundleEntryResult::ok(resource)),
2361                    None => Ok(BundleEntryResult::error(
2362                        404,
2363                        serde_json::json!({
2364                            "resourceType": "OperationOutcome",
2365                            "issue": [{"severity": "error", "code": "not-found"}]
2366                        }),
2367                    )),
2368                }
2369            }
2370            BundleMethod::Post => {
2371                let resource = entry.resource.clone().ok_or_else(|| {
2372                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2373                        field: "resource".to_string(),
2374                    })
2375                })?;
2376
2377                let resource_type = resource
2378                    .get("resourceType")
2379                    .and_then(|v| v.as_str())
2380                    .map(|s| s.to_string())
2381                    .ok_or_else(|| {
2382                        StorageError::Validation(
2383                            crate::error::ValidationError::MissingRequiredField {
2384                                field: "resourceType".to_string(),
2385                            },
2386                        )
2387                    })?;
2388
2389                let created = tx.create(&resource_type, resource).await?;
2390                Ok(BundleEntryResult::created(created))
2391            }
2392            BundleMethod::Put => {
2393                let resource = entry.resource.clone().ok_or_else(|| {
2394                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2395                        field: "resource".to_string(),
2396                    })
2397                })?;
2398
2399                let (resource_type, id) = self.parse_url(&entry.url)?;
2400
2401                match tx.read(&resource_type, &id).await? {
2402                    Some(existing) => {
2403                        // Check If-Match if provided
2404                        if let Some(ref if_match) = entry.if_match {
2405                            let current_etag = existing.etag();
2406                            if current_etag != if_match.as_str() {
2407                                return Ok(BundleEntryResult::error(
2408                                    412,
2409                                    serde_json::json!({
2410                                        "resourceType": "OperationOutcome",
2411                                        "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2412                                    }),
2413                                ));
2414                            }
2415                        }
2416                        let updated = tx.update(&existing, resource).await?;
2417                        Ok(BundleEntryResult::ok(updated))
2418                    }
2419                    None => {
2420                        // Create new resource with specified ID
2421                        let mut resource_with_id = resource;
2422                        resource_with_id["id"] = serde_json::json!(id);
2423                        let created = tx.create(&resource_type, resource_with_id).await?;
2424                        Ok(BundleEntryResult::created(created))
2425                    }
2426                }
2427            }
2428            BundleMethod::Delete => {
2429                let (resource_type, id) = self.parse_url(&entry.url)?;
2430                tx.delete(&resource_type, &id).await?;
2431                Ok(BundleEntryResult::deleted())
2432            }
2433            BundleMethod::Patch => {
2434                // PATCH is not fully implemented yet
2435                Ok(BundleEntryResult::error(
2436                    501,
2437                    serde_json::json!({
2438                        "resourceType": "OperationOutcome",
2439                        "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented in transaction bundles"}]
2440                    }),
2441                ))
2442            }
2443        }
2444    }
2445
2446    /// Process a single batch entry (independent, no transaction).
2447    async fn process_batch_entry(
2448        &self,
2449        tenant: &TenantContext,
2450        entry: &BundleEntry,
2451    ) -> BundleEntryResult {
2452        match self.process_batch_entry_inner(tenant, entry).await {
2453            Ok(result) => result,
2454            Err(e) => BundleEntryResult::error(
2455                500,
2456                serde_json::json!({
2457                    "resourceType": "OperationOutcome",
2458                    "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2459                }),
2460            ),
2461        }
2462    }
2463
2464    async fn process_batch_entry_inner(
2465        &self,
2466        tenant: &TenantContext,
2467        entry: &BundleEntry,
2468    ) -> StorageResult<BundleEntryResult> {
2469        match entry.method {
2470            BundleMethod::Get => {
2471                let (resource_type, id) = self.parse_url(&entry.url)?;
2472                match self.read(tenant, &resource_type, &id).await? {
2473                    Some(resource) => Ok(BundleEntryResult::ok(resource)),
2474                    None => Ok(BundleEntryResult::error(
2475                        404,
2476                        serde_json::json!({
2477                            "resourceType": "OperationOutcome",
2478                            "issue": [{"severity": "error", "code": "not-found"}]
2479                        }),
2480                    )),
2481                }
2482            }
2483            BundleMethod::Post => {
2484                let resource = entry.resource.clone().ok_or_else(|| {
2485                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2486                        field: "resource".to_string(),
2487                    })
2488                })?;
2489
2490                let resource_type = resource
2491                    .get("resourceType")
2492                    .and_then(|v| v.as_str())
2493                    .map(|s| s.to_string())
2494                    .ok_or_else(|| {
2495                        StorageError::Validation(
2496                            crate::error::ValidationError::MissingRequiredField {
2497                                field: "resourceType".to_string(),
2498                            },
2499                        )
2500                    })?;
2501
2502                let created = self
2503                    .create(tenant, &resource_type, resource, FhirVersion::default())
2504                    .await?;
2505                Ok(BundleEntryResult::created(created))
2506            }
2507            BundleMethod::Put => {
2508                let resource = entry.resource.clone().ok_or_else(|| {
2509                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2510                        field: "resource".to_string(),
2511                    })
2512                })?;
2513
2514                let (resource_type, id) = self.parse_url(&entry.url)?;
2515                let (stored, _created) = self
2516                    .create_or_update(
2517                        tenant,
2518                        &resource_type,
2519                        &id,
2520                        resource,
2521                        FhirVersion::default(),
2522                    )
2523                    .await?;
2524                Ok(BundleEntryResult::ok(stored))
2525            }
2526            BundleMethod::Delete => {
2527                let (resource_type, id) = self.parse_url(&entry.url)?;
2528                match self.delete(tenant, &resource_type, &id).await {
2529                    Ok(()) => Ok(BundleEntryResult::deleted()),
2530                    Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2531                        Ok(BundleEntryResult::deleted()) // Idempotent delete
2532                    }
2533                    Err(e) => Err(e),
2534                }
2535            }
2536            BundleMethod::Patch => Ok(BundleEntryResult::error(
2537                501,
2538                serde_json::json!({
2539                    "resourceType": "OperationOutcome",
2540                    "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2541                }),
2542            )),
2543        }
2544    }
2545
2546    /// Parse a FHIR URL into resource type and ID.
2547    fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2548        let path = url
2549            .strip_prefix("http://")
2550            .or_else(|| url.strip_prefix("https://"))
2551            .map(|s| s.find('/').map(|i| &s[i..]).unwrap_or(s))
2552            .unwrap_or(url);
2553
2554        let path = path.trim_start_matches('/');
2555        let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2556
2557        if parts.len() >= 2 {
2558            let len = parts.len();
2559            Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2560        } else {
2561            Err(StorageError::Validation(
2562                crate::error::ValidationError::InvalidReference {
2563                    reference: url.to_string(),
2564                    message: "URL must be in format ResourceType/id".to_string(),
2565                },
2566            ))
2567        }
2568    }
2569}
2570
2571/// Recursively resolves urn:uuid references in a JSON value using the reference map.
2572fn resolve_bundle_references(
2573    value: &mut serde_json::Value,
2574    reference_map: &std::collections::HashMap<String, String>,
2575) {
2576    use serde_json::Value;
2577    match value {
2578        Value::Object(map) => {
2579            if let Some(Value::String(ref_str)) = map.get("reference") {
2580                if ref_str.starts_with("urn:uuid:") {
2581                    if let Some(resolved) = reference_map.get(ref_str) {
2582                        map.insert("reference".to_string(), Value::String(resolved.clone()));
2583                    }
2584                }
2585            }
2586            for v in map.values_mut() {
2587                resolve_bundle_references(v, reference_map);
2588            }
2589        }
2590        Value::Array(arr) => {
2591            for item in arr {
2592                resolve_bundle_references(item, reference_map);
2593            }
2594        }
2595        _ => {}
2596    }
2597}
2598
2599// ============================================================================
2600// ReindexableStorage Implementation
2601// ============================================================================
2602
2603#[async_trait]
2604impl ReindexableStorage for PostgresBackend {
2605    async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2606        let client = self.get_client().await?;
2607        let tenant_id = tenant.tenant_id().as_str();
2608
2609        let rows = client
2610            .query(
2611                "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
2612                &[&tenant_id],
2613            )
2614            .await
2615            .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?;
2616
2617        let types: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
2618        Ok(types)
2619    }
2620
2621    async fn count_resources(
2622        &self,
2623        tenant: &TenantContext,
2624        resource_type: &str,
2625    ) -> StorageResult<u64> {
2626        self.count(tenant, Some(resource_type)).await
2627    }
2628
2629    async fn fetch_resources_page(
2630        &self,
2631        tenant: &TenantContext,
2632        resource_type: &str,
2633        cursor: Option<&str>,
2634        limit: u32,
2635    ) -> StorageResult<ResourcePage> {
2636        let client = self.get_client().await?;
2637        let tenant_id = tenant.tenant_id().as_str();
2638
2639        // Parse cursor if provided (format: "last_updated|id")
2640        let (cursor_ts, cursor_id) = if let Some(c) = cursor {
2641            let parts: Vec<&str> = c.split('|').collect();
2642            if parts.len() == 2 {
2643                let ts = DateTime::parse_from_rfc3339(parts[0])
2644                    .map(|dt| dt.with_timezone(&Utc))
2645                    .map_err(|e| internal_error(format!("Invalid cursor timestamp: {}", e)))?;
2646                (Some(ts), Some(parts[1].to_string()))
2647            } else {
2648                (None, None)
2649            }
2650        } else {
2651            (None, None)
2652        };
2653
2654        let rows = if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
2655            client
2656                .query(
2657                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2658                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2659                     AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
2660                     ORDER BY last_updated ASC, id ASC LIMIT $5",
2661                    &[
2662                        &tenant_id,
2663                        &resource_type,
2664                        ts,
2665                        &id.as_str(),
2666                        &(limit as i64),
2667                    ],
2668                )
2669                .await
2670                .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2671        } else {
2672            client
2673                .query(
2674                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2675                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2676                     ORDER BY last_updated ASC, id ASC LIMIT $3",
2677                    &[&tenant_id, &resource_type, &(limit as i64)],
2678                )
2679                .await
2680                .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2681        };
2682
2683        let resources: Vec<StoredResource> = rows
2684            .iter()
2685            .map(|row| {
2686                let id: String = row.get(0);
2687                let version_id: String = row.get(1);
2688                let data: Value = row.get(2);
2689                let last_updated: DateTime<Utc> = row.get(3);
2690                let fhir_version_str: String = row.get(4);
2691                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
2692
2693                StoredResource::from_storage(
2694                    resource_type,
2695                    id,
2696                    version_id,
2697                    tenant.tenant_id().clone(),
2698                    data,
2699                    last_updated,
2700                    last_updated,
2701                    None,
2702                    fhir_version,
2703                )
2704            })
2705            .collect();
2706
2707        // Determine next cursor
2708        let next_cursor = if resources.len() == limit as usize {
2709            resources
2710                .last()
2711                .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
2712        } else {
2713            None
2714        };
2715
2716        Ok(ResourcePage {
2717            resources,
2718            next_cursor,
2719        })
2720    }
2721
2722    async fn delete_search_entries(
2723        &self,
2724        tenant: &TenantContext,
2725        resource_type: &str,
2726        resource_id: &str,
2727    ) -> StorageResult<()> {
2728        let client = self.get_client().await?;
2729        self.delete_search_index(
2730            &client,
2731            tenant.tenant_id().as_str(),
2732            resource_type,
2733            resource_id,
2734        )
2735        .await
2736    }
2737
2738    async fn write_search_entries(
2739        &self,
2740        tenant: &TenantContext,
2741        resource_type: &str,
2742        resource_id: &str,
2743        resource: &Value,
2744    ) -> StorageResult<usize> {
2745        let client = self.get_client().await?;
2746        let tenant_id = tenant.tenant_id().as_str();
2747
2748        // Use the dynamic extraction
2749        let values = self
2750            .search_extractor()
2751            .extract(resource, resource_type)
2752            .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
2753
2754        let mut count = 0;
2755        for value in values {
2756            PostgresSearchIndexWriter::write_entry(
2757                &client,
2758                tenant_id,
2759                resource_type,
2760                resource_id,
2761                &value,
2762            )
2763            .await?;
2764            count += 1;
2765        }
2766
2767        Ok(count)
2768    }
2769
2770    async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
2771        let client = self.get_client().await?;
2772        let tenant_id = tenant.tenant_id().as_str();
2773
2774        let deleted = client
2775            .execute(
2776                "DELETE FROM search_index WHERE tenant_id = $1",
2777                &[&tenant_id],
2778            )
2779            .await
2780            .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
2781
2782        // Also clear FTS entries
2783        let _ = client
2784            .execute(
2785                "DELETE FROM resource_fts WHERE tenant_id = $1",
2786                &[&tenant_id],
2787            )
2788            .await;
2789
2790        Ok(deleted)
2791    }
2792}
2793
2794// ============================================================================
2795// Helper Functions
2796// ============================================================================
2797
2798/// Normalize a date string for PostgreSQL TIMESTAMPTZ.
2799fn normalize_date_for_pg(value: &str) -> String {
2800    if value.contains('T') {
2801        if value.contains('+') || value.contains('Z') || value.ends_with("-00:00") {
2802            value.to_string()
2803        } else {
2804            format!("{}+00:00", value)
2805        }
2806    } else if value.len() == 10 {
2807        format!("{}T00:00:00+00:00", value)
2808    } else if value.len() == 7 {
2809        format!("{}-01T00:00:00+00:00", value)
2810    } else if value.len() == 4 {
2811        format!("{}-01-01T00:00:00+00:00", value)
2812    } else {
2813        value.to_string()
2814    }
2815}
2816
2817// ============================================================================
2818// FTS Content Extraction (local copy to avoid cross-feature dependency on sqlite)
2819// ============================================================================
2820
2821/// Content extracted from a resource for full-text search.
2822struct SearchableContent {
2823    narrative: String,
2824    full_content: String,
2825}
2826
2827impl SearchableContent {
2828    fn is_empty(&self) -> bool {
2829        self.narrative.is_empty() && self.full_content.is_empty()
2830    }
2831}
2832
2833/// Extracts searchable text content from a FHIR resource.
2834fn extract_searchable_content(resource: &Value) -> SearchableContent {
2835    SearchableContent {
2836        narrative: extract_narrative(resource),
2837        full_content: extract_all_strings(resource),
2838    }
2839}
2840
2841/// Extracts narrative text from resource.text.div, stripping HTML tags.
2842fn extract_narrative(resource: &Value) -> String {
2843    resource
2844        .get("text")
2845        .and_then(|t| t.get("div"))
2846        .and_then(|d| d.as_str())
2847        .map(strip_html_tags)
2848        .unwrap_or_default()
2849}
2850
2851/// Strips HTML tags from a string, returning plain text.
2852fn strip_html_tags(html: &str) -> String {
2853    let mut result = String::with_capacity(html.len());
2854    let mut in_tag = false;
2855
2856    for c in html.chars() {
2857        match c {
2858            '<' => in_tag = true,
2859            '>' if in_tag => {
2860                in_tag = false;
2861                result.push(' ');
2862            }
2863            _ if !in_tag => result.push(c),
2864            _ => {}
2865        }
2866    }
2867
2868    result.split_whitespace().collect::<Vec<_>>().join(" ")
2869}
2870
2871/// Extracts all string values from a JSON value recursively.
2872fn extract_all_strings(value: &Value) -> String {
2873    let mut parts = Vec::new();
2874    collect_strings(value, &mut parts);
2875    parts.join(" ")
2876}
2877
2878fn collect_strings(value: &Value, parts: &mut Vec<String>) {
2879    match value {
2880        Value::String(s) => {
2881            if !s.is_empty() {
2882                parts.push(s.clone());
2883            }
2884        }
2885        Value::Object(map) => {
2886            for (key, val) in map {
2887                if key == "div" || key == "data" {
2888                    continue;
2889                }
2890                collect_strings(val, parts);
2891            }
2892        }
2893        Value::Array(arr) => {
2894            for val in arr {
2895                collect_strings(val, parts);
2896            }
2897        }
2898        _ => {}
2899    }
2900}