Skip to main content

helios_persistence/backends/postgres/
storage.rs

1//! ResourceStorage and VersionedStorage implementations for PostgreSQL.
2
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use helios_fhir::FhirVersion;
6use serde_json::Value;
7
8use crate::core::history::{
9    DifferentialHistoryProvider, HistoryEntry, HistoryMethod, HistoryPage, HistoryParams,
10    InstanceHistoryProvider, SystemHistoryProvider, TypeHistoryProvider,
11};
12use crate::core::transaction::{
13    BundleEntry, BundleEntryResult, BundleMethod, BundleProvider, BundleResult, BundleType,
14};
15use crate::core::{
16    ConditionalCreateResult, ConditionalDeleteResult, ConditionalStorage, ConditionalUpdateResult,
17    PurgableStorage, ResourceStorage, SearchProvider, VersionedStorage,
18};
19use crate::error::TransactionError;
20use crate::error::{BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult};
21use crate::search::loader::SearchParameterLoader;
22use crate::search::registry::SearchParameterStatus;
23use crate::search::reindex::{ReindexableStorage, ResourcePage};
24use crate::tenant::TenantContext;
25use crate::types::Pagination;
26use crate::types::{CursorValue, Page, PageCursor, PageInfo, StoredResource};
27use crate::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue};
28
29use super::PostgresBackend;
30use super::search::writer::PostgresSearchIndexWriter;
31
32fn internal_error(message: String) -> StorageError {
33    StorageError::Backend(BackendError::Internal {
34        backend_name: "postgres".to_string(),
35        message,
36        source: None,
37    })
38}
39
40#[allow(dead_code)]
41fn serialization_error(message: String) -> StorageError {
42    StorageError::Backend(BackendError::SerializationError { message })
43}
44
45/// Extracts the `value[x]` payload from a FHIRPath Patch `Parameters.part`
46/// entry whose `name` is `"value"`. Returns the value of the first key
47/// matching `value[A-Z]…` (e.g. `valueString`, `valueQuantity`,
48/// `valueReference`), so every FHIR polymorphic variant is accepted rather
49/// than only the handful the patch handler used to special-case.
50fn extract_part_value(part: &Value) -> Option<Value> {
51    part.as_object()?.iter().find_map(|(k, v)| {
52        let suffix = k.strip_prefix("value")?;
53        suffix
54            .chars()
55            .next()?
56            .is_ascii_uppercase()
57            .then(|| v.clone())
58    })
59}
60
61#[async_trait]
62impl ResourceStorage for PostgresBackend {
63    fn backend_name(&self) -> &'static str {
64        "postgres"
65    }
66
67    fn sof_runner(&self) -> Option<std::sync::Arc<dyn crate::core::sof_runner::SofRunner>> {
68        use crate::sof::postgres::PgInDbRunner;
69        Some(std::sync::Arc::new(PgInDbRunner::new(self.pool())))
70    }
71
72    async fn create(
73        &self,
74        tenant: &TenantContext,
75        resource_type: &str,
76        resource: Value,
77        fhir_version: FhirVersion,
78    ) -> StorageResult<StoredResource> {
79        let client = self.get_client().await?;
80        let tenant_id = tenant.tenant_id().as_str();
81
82        // Extract or generate ID
83        let id = resource
84            .get("id")
85            .and_then(|v| v.as_str())
86            .map(String::from)
87            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
88
89        // Check if resource already exists
90        let exists = client
91            .query_opt(
92                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
93                &[&tenant_id, &resource_type, &id],
94            )
95            .await
96            .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
97
98        if exists.is_some() {
99            return Err(StorageError::Resource(ResourceError::AlreadyExists {
100                resource_type: resource_type.to_string(),
101                id: id.clone(),
102            }));
103        }
104
105        // Ensure the resource has correct type and id
106        let mut resource = resource;
107        if let Some(obj) = resource.as_object_mut() {
108            obj.insert(
109                "resourceType".to_string(),
110                Value::String(resource_type.to_string()),
111            );
112            obj.insert("id".to_string(), Value::String(id.clone()));
113        }
114
115        let now = Utc::now();
116        let version_id = "1";
117        let fhir_version_str = fhir_version.as_mime_param();
118        let is_deleted = false;
119
120        // Insert the resource
121        client
122            .execute(
123                "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
124                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
125                &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
126            )
127            .await
128            .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
129
130        // Insert into history
131        client
132            .execute(
133                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
134                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
135                &[&tenant_id, &resource_type, &id, &version_id, &resource, &now, &is_deleted, &fhir_version_str],
136            )
137            .await
138            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
139
140        // Index the resource for search
141        self.index_resource(&client, tenant_id, resource_type, &id, &resource)
142            .await?;
143
144        // Handle SearchParameter resources specially - update registry
145        if resource_type == "SearchParameter" {
146            self.handle_search_parameter_create(&resource)?;
147        }
148
149        // Return the stored resource with updated metadata
150        Ok(StoredResource::from_storage(
151            resource_type,
152            &id,
153            version_id,
154            tenant.tenant_id().clone(),
155            resource,
156            now,
157            now,
158            None,
159            fhir_version,
160        ))
161    }
162
163    async fn create_or_update(
164        &self,
165        tenant: &TenantContext,
166        resource_type: &str,
167        id: &str,
168        resource: Value,
169        fhir_version: FhirVersion,
170    ) -> StorageResult<(StoredResource, bool)> {
171        // Check if exists
172        let existing = self.read(tenant, resource_type, id).await?;
173
174        if let Some(current) = existing {
175            // Update existing (preserves original FHIR version)
176            let updated = self.update(tenant, &current, resource).await?;
177            Ok((updated, false))
178        } else {
179            // Create new with specific ID
180            let mut resource = resource;
181            if let Some(obj) = resource.as_object_mut() {
182                obj.insert("id".to_string(), Value::String(id.to_string()));
183            }
184            let created = self
185                .create(tenant, resource_type, resource, fhir_version)
186                .await?;
187            Ok((created, true))
188        }
189    }
190
191    async fn read(
192        &self,
193        tenant: &TenantContext,
194        resource_type: &str,
195        id: &str,
196    ) -> StorageResult<Option<StoredResource>> {
197        let client = self.get_client().await?;
198        let tenant_id = tenant.tenant_id().as_str();
199
200        let row = client
201            .query_opt(
202                "SELECT version_id, data, last_updated, is_deleted, deleted_at, fhir_version
203                 FROM resources
204                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
205                &[&tenant_id, &resource_type, &id],
206            )
207            .await
208            .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
209
210        match row {
211            Some(row) => {
212                let version_id: String = row.get(0);
213                let data: Value = row.get(1);
214                let last_updated: DateTime<Utc> = row.get(2);
215                let is_deleted: bool = row.get(3);
216                let deleted_at: Option<DateTime<Utc>> = row.get(4);
217                let fhir_version_str: String = row.get(5);
218
219                // If deleted, return Gone error
220                if is_deleted {
221                    return Err(StorageError::Resource(ResourceError::Gone {
222                        resource_type: resource_type.to_string(),
223                        id: id.to_string(),
224                        deleted_at,
225                    }));
226                }
227
228                let fhir_version = FhirVersion::from_storage(&fhir_version_str)
229                    .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
230
231                Ok(Some(StoredResource::from_storage(
232                    resource_type,
233                    id,
234                    version_id,
235                    tenant.tenant_id().clone(),
236                    data,
237                    last_updated,
238                    last_updated,
239                    None,
240                    fhir_version,
241                )))
242            }
243            None => Ok(None),
244        }
245    }
246
247    async fn update(
248        &self,
249        tenant: &TenantContext,
250        current: &StoredResource,
251        resource: Value,
252    ) -> StorageResult<StoredResource> {
253        let client = self.get_client().await?;
254        let tenant_id = tenant.tenant_id().as_str();
255        let resource_type = current.resource_type();
256        let id = current.id();
257
258        // Check that the resource still exists with the expected version
259        let row = client
260            .query_opt(
261                "SELECT version_id FROM resources
262                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
263                &[&tenant_id, &resource_type, &id],
264            )
265            .await
266            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
267
268        let actual_version = match row {
269            Some(row) => row.get::<_, String>(0),
270            None => {
271                return Err(StorageError::Resource(ResourceError::NotFound {
272                    resource_type: resource_type.to_string(),
273                    id: id.to_string(),
274                }));
275            }
276        };
277
278        // Check version match
279        if actual_version != current.version_id() {
280            return Err(StorageError::Concurrency(
281                ConcurrencyError::VersionConflict {
282                    resource_type: resource_type.to_string(),
283                    id: id.to_string(),
284                    expected_version: current.version_id().to_string(),
285                    actual_version,
286                },
287            ));
288        }
289
290        // Calculate new version
291        let new_version: u64 = actual_version.parse().unwrap_or(0) + 1;
292        let new_version_str = new_version.to_string();
293
294        // Ensure the resource has correct type and id
295        let mut resource = resource;
296        if let Some(obj) = resource.as_object_mut() {
297            obj.insert(
298                "resourceType".to_string(),
299                Value::String(resource_type.to_string()),
300            );
301            obj.insert("id".to_string(), Value::String(id.to_string()));
302        }
303
304        let now = Utc::now();
305        let fhir_version_str = current.fhir_version().as_mime_param();
306        let is_deleted = false;
307
308        // Update the resource
309        client
310            .execute(
311                "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
312                 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
313                &[
314                    &new_version_str,
315                    &resource,
316                    &now,
317                    &tenant_id,
318                    &resource_type,
319                    &id,
320                ],
321            )
322            .await
323            .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
324
325        // Insert into history (preserve the original FHIR version)
326        client
327            .execute(
328                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
329                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
330                &[&tenant_id, &resource_type, &id, &new_version_str, &resource, &now, &is_deleted, &fhir_version_str],
331            )
332            .await
333            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
334
335        // Re-index the resource (delete old entries, add new)
336        self.delete_search_index(&client, tenant_id, resource_type, id)
337            .await?;
338        self.index_resource(&client, tenant_id, resource_type, id, &resource)
339            .await?;
340
341        // Handle SearchParameter resources specially - update registry
342        if resource_type == "SearchParameter" {
343            self.handle_search_parameter_update(current.content(), &resource)?;
344        }
345
346        Ok(StoredResource::from_storage(
347            resource_type,
348            id,
349            new_version_str,
350            tenant.tenant_id().clone(),
351            resource,
352            now,
353            now,
354            None,
355            current.fhir_version(),
356        ))
357    }
358
359    async fn delete(
360        &self,
361        tenant: &TenantContext,
362        resource_type: &str,
363        id: &str,
364    ) -> StorageResult<()> {
365        let client = self.get_client().await?;
366        let tenant_id = tenant.tenant_id().as_str();
367
368        // Check if resource exists and get its fhir_version
369        let row = client
370            .query_opt(
371                "SELECT version_id, data, fhir_version FROM resources
372                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
373                &[&tenant_id, &resource_type, &id],
374            )
375            .await
376            .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
377
378        let (current_version, data, fhir_version_str) = match row {
379            Some(row) => {
380                let v: String = row.get(0);
381                let d: Value = row.get(1);
382                let f: String = row.get(2);
383                (v, d, f)
384            }
385            None => {
386                return Err(StorageError::Resource(ResourceError::NotFound {
387                    resource_type: resource_type.to_string(),
388                    id: id.to_string(),
389                }));
390            }
391        };
392
393        let now = Utc::now();
394
395        // Calculate new version for the deletion record
396        let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
397        let new_version_str = new_version.to_string();
398        let is_deleted = true;
399
400        // Soft delete the resource
401        client
402            .execute(
403                "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
404                 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
405                &[&now, &new_version_str, &tenant_id, &resource_type, &id],
406            )
407            .await
408            .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
409
410        // Insert deletion record into history (preserve fhir_version)
411        client
412            .execute(
413                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
414                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
415                &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
416            )
417            .await
418            .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
419
420        // Delete search index entries (skip when search is offloaded)
421        if !self.is_search_offloaded() {
422            client
423                .execute(
424                    "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
425                    &[&tenant_id, &resource_type, &id],
426                )
427                .await
428                .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
429        }
430
431        // Handle SearchParameter resources specially - update registry
432        if resource_type == "SearchParameter" {
433            self.handle_search_parameter_delete(&data)?;
434        }
435
436        Ok(())
437    }
438
439    async fn count(
440        &self,
441        tenant: &TenantContext,
442        resource_type: Option<&str>,
443    ) -> StorageResult<u64> {
444        let client = self.get_client().await?;
445        let tenant_id = tenant.tenant_id().as_str();
446
447        let count: i64 = if let Some(rt) = resource_type {
448            let row = client
449                .query_one(
450                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE",
451                    &[&tenant_id, &rt],
452                )
453                .await
454                .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
455            row.get(0)
456        } else {
457            let row = client
458                .query_one(
459                    "SELECT COUNT(*) FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
460                    &[&tenant_id],
461                )
462                .await
463                .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
464            row.get(0)
465        };
466
467        Ok(count as u64)
468    }
469}
470
471// ============================================================================
472// Search Index Helpers
473// ============================================================================
474
475impl PostgresBackend {
476    /// Index a resource for search.
477    ///
478    /// This method uses the SearchParameterExtractor to dynamically extract
479    /// searchable values based on the configured SearchParameterRegistry.
480    pub(crate) async fn index_resource(
481        &self,
482        client: &deadpool_postgres::Client,
483        tenant_id: &str,
484        resource_type: &str,
485        resource_id: &str,
486        resource: &Value,
487    ) -> StorageResult<()> {
488        // When search is offloaded to a secondary backend, skip local indexing
489        if self.is_search_offloaded() {
490            return Ok(());
491        }
492
493        // Delete existing index entries
494        client
495            .execute(
496                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
497                &[&tenant_id, &resource_type, &resource_id],
498            )
499            .await
500            .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
501
502        // Extract values using the registry-driven extractor
503        match self.search_extractor().extract(resource, resource_type) {
504            Ok(values) => {
505                let mut count = 0;
506                for value in values {
507                    PostgresSearchIndexWriter::write_entry(
508                        client,
509                        tenant_id,
510                        resource_type,
511                        resource_id,
512                        &value,
513                    )
514                    .await?;
515                    count += 1;
516                }
517                tracing::debug!(
518                    "Dynamically indexed {} values for {}/{}",
519                    count,
520                    resource_type,
521                    resource_id
522                );
523            }
524            Err(e) => {
525                tracing::warn!(
526                    "Dynamic extraction failed for {}/{}: {}. Using minimal fallback (_id, _lastUpdated only).",
527                    resource_type,
528                    resource_id,
529                    e
530                );
531                // Fall back to minimal extraction (just _id and _lastUpdated)
532                self.index_minimal_fallback(
533                    client,
534                    tenant_id,
535                    resource_type,
536                    resource_id,
537                    resource,
538                )
539                .await?;
540            }
541        }
542
543        // Index any contained resources for `_contained` search.
544        self.index_contained_resources(client, tenant_id, resource_type, resource_id, resource)
545            .await?;
546
547        // Index FTS content for _text and _content searches
548        self.index_fts_content(client, tenant_id, resource_type, resource_id, resource)
549            .await?;
550
551        Ok(())
552    }
553
554    /// Extracts and indexes a container's `contained[]` resources for
555    /// `_contained` search. Each contained resource's search values are written
556    /// as `is_contained = TRUE` rows whose `resource_type` / `resource_id`
557    /// identify the container. Returns the number of entries written.
558    async fn index_contained_resources(
559        &self,
560        client: &deadpool_postgres::Client,
561        tenant_id: &str,
562        container_type: &str,
563        container_id: &str,
564        resource: &Value,
565    ) -> StorageResult<usize> {
566        let mut count = 0;
567        let container = (container_type, container_id);
568        for contained in self.search_extractor().extract_contained(resource) {
569            for value in &contained.values {
570                PostgresSearchIndexWriter::write_contained_entry(
571                    client,
572                    tenant_id,
573                    container,
574                    (&contained.contained_type, &contained.local_id),
575                    value,
576                )
577                .await?;
578                count += 1;
579            }
580        }
581        Ok(count)
582    }
583
584    /// Index full-text search content for _text and _content searches.
585    ///
586    /// Populates the resource_fts table using PostgreSQL tsvector/tsquery.
587    async fn index_fts_content(
588        &self,
589        client: &deadpool_postgres::Client,
590        tenant_id: &str,
591        resource_type: &str,
592        resource_id: &str,
593        resource: &Value,
594    ) -> StorageResult<()> {
595        // Check if FTS table exists
596        let fts_exists = client
597            .query_opt(
598                "SELECT 1 FROM information_schema.tables WHERE table_name = 'resource_fts'",
599                &[],
600            )
601            .await
602            .map_err(|e| internal_error(format!("Failed to check FTS table: {}", e)))?;
603
604        if fts_exists.is_none() {
605            return Ok(());
606        }
607
608        // Extract searchable content
609        let content = extract_searchable_content(resource);
610
611        if content.is_empty() {
612            return Ok(());
613        }
614
615        // Delete existing FTS entry first
616        let _ = client
617            .execute(
618                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
619                &[&tenant_id, &resource_type, &resource_id],
620            )
621            .await;
622
623        // Insert into FTS table (the trigger will populate tsvector columns)
624        client
625            .execute(
626                "INSERT INTO resource_fts (resource_id, resource_type, tenant_id, narrative_text, full_content)
627                 VALUES ($1, $2, $3, $4, $5)",
628                &[
629                    &resource_id,
630                    &resource_type,
631                    &tenant_id,
632                    &content.narrative,
633                    &content.full_content,
634                ],
635            )
636            .await
637            .map_err(|e| internal_error(format!("Failed to insert FTS content: {}", e)))?;
638
639        Ok(())
640    }
641
642    /// Index minimal fallback search parameters.
643    ///
644    /// Only indexes `_id` and `_lastUpdated` when dynamic extraction fails.
645    async fn index_minimal_fallback(
646        &self,
647        client: &deadpool_postgres::Client,
648        tenant_id: &str,
649        resource_type: &str,
650        resource_id: &str,
651        resource: &Value,
652    ) -> StorageResult<()> {
653        // _id - always available from resource.id
654        if let Some(id) = resource.get("id").and_then(|v| v.as_str()) {
655            client
656                .execute(
657                    "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_token_code)
658                     VALUES ($1, $2, $3, '_id', $4)",
659                    &[&tenant_id, &resource_type, &resource_id, &id],
660                )
661                .await
662                .map_err(|e| internal_error(format!("Failed to insert _id index: {}", e)))?;
663        }
664
665        // _lastUpdated - from resource.meta.lastUpdated
666        if let Some(last_updated) = resource
667            .get("meta")
668            .and_then(|m| m.get("lastUpdated"))
669            .and_then(|v| v.as_str())
670        {
671            let normalized = normalize_date_for_pg(last_updated);
672            client
673                .execute(
674                    "INSERT INTO search_index (tenant_id, resource_type, resource_id, param_name, value_date)
675                     VALUES ($1, $2, $3, '_lastUpdated', $4::timestamptz)",
676                    &[&tenant_id, &resource_type, &resource_id, &normalized],
677                )
678                .await
679                .map_err(|e| {
680                    internal_error(format!("Failed to insert _lastUpdated index: {}", e))
681                })?;
682        }
683
684        Ok(())
685    }
686
687    /// Delete search index entries for a resource.
688    pub(crate) async fn delete_search_index(
689        &self,
690        client: &deadpool_postgres::Client,
691        tenant_id: &str,
692        resource_type: &str,
693        resource_id: &str,
694    ) -> StorageResult<()> {
695        // When search is offloaded to a secondary backend, skip local index cleanup
696        if self.is_search_offloaded() {
697            return Ok(());
698        }
699
700        // Delete from main search index
701        client
702            .execute(
703                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
704                &[&tenant_id, &resource_type, &resource_id],
705            )
706            .await
707            .map_err(|e| internal_error(format!("Failed to delete search index: {}", e)))?;
708
709        // Delete from FTS table if it exists
710        let _ = client
711            .execute(
712                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
713                &[&tenant_id, &resource_type, &resource_id],
714            )
715            .await;
716
717        Ok(())
718    }
719}
720
721// ============================================================================
722// SearchParameter Resource Handling
723// ============================================================================
724
725impl PostgresBackend {
726    /// Handle creation of a SearchParameter resource.
727    ///
728    /// If the SearchParameter has status=active, it will be registered in the
729    /// search parameter registry, making it available for searches on new resources.
730    /// Existing resources will NOT be indexed for this parameter until $reindex is run.
731    fn handle_search_parameter_create(&self, resource: &Value) -> StorageResult<()> {
732        let loader = SearchParameterLoader::new(self.config().fhir_version);
733
734        match loader.parse_resource(resource) {
735            Ok(def) => {
736                // Only register if status is active
737                if def.status == SearchParameterStatus::Active {
738                    let mut registry = self.search_registry().write();
739                    // Ignore duplicate URL errors - the param may already be embedded
740                    if let Err(e) = registry.register(def) {
741                        tracing::debug!("SearchParameter registration skipped: {}", e);
742                    }
743                }
744            }
745            Err(e) => {
746                // Log but don't fail - the resource is still stored
747                tracing::warn!("Failed to parse SearchParameter for registry: {}", e);
748            }
749        }
750
751        Ok(())
752    }
753
754    /// Handle update of a SearchParameter resource.
755    ///
756    /// Updates the registry based on status changes:
757    /// - active -> retired: Parameter disabled for searches
758    /// - retired -> active: Parameter re-enabled for searches
759    /// - Any other change: Updates the registry entry
760    fn handle_search_parameter_update(
761        &self,
762        old_resource: &Value,
763        new_resource: &Value,
764    ) -> StorageResult<()> {
765        let loader = SearchParameterLoader::new(self.config().fhir_version);
766
767        let old_def = loader.parse_resource(old_resource).ok();
768        let new_def = loader.parse_resource(new_resource).ok();
769
770        match (old_def, new_def) {
771            (Some(old), Some(new)) => {
772                let mut registry = self.search_registry().write();
773
774                // If URL changed, unregister old and register new
775                if old.url != new.url {
776                    let _ = registry.unregister(&old.url);
777                    if new.status == SearchParameterStatus::Active {
778                        let _ = registry.register(new);
779                    }
780                } else if old.status != new.status {
781                    // Status change - update in registry
782                    if let Err(e) = registry.update_status(&new.url, new.status) {
783                        tracing::debug!("SearchParameter status update skipped: {}", e);
784                    }
785                } else {
786                    // Other changes - re-register (unregister then register)
787                    let _ = registry.unregister(&old.url);
788                    if new.status == SearchParameterStatus::Active {
789                        let _ = registry.register(new);
790                    }
791                }
792            }
793            (None, Some(new)) => {
794                // Old wasn't valid, try to register new
795                if new.status == SearchParameterStatus::Active {
796                    let mut registry = self.search_registry().write();
797                    let _ = registry.register(new);
798                }
799            }
800            (Some(old), None) => {
801                // New isn't valid, unregister old
802                let mut registry = self.search_registry().write();
803                let _ = registry.unregister(&old.url);
804            }
805            (None, None) => {
806                // Neither valid - nothing to do
807            }
808        }
809
810        Ok(())
811    }
812
813    /// Handle deletion of a SearchParameter resource.
814    ///
815    /// Removes the parameter from the registry. Search index entries for this
816    /// parameter are NOT automatically cleaned up (use $reindex for that).
817    fn handle_search_parameter_delete(&self, resource: &Value) -> StorageResult<()> {
818        if let Some(url) = resource.get("url").and_then(|v| v.as_str()) {
819            let mut registry = self.search_registry().write();
820            if let Err(e) = registry.unregister(url) {
821                tracing::debug!("SearchParameter unregistration skipped: {}", e);
822            }
823        }
824
825        Ok(())
826    }
827}
828
829// ============================================================================
830// VersionedStorage Implementation
831// ============================================================================
832
833#[async_trait]
834impl VersionedStorage for PostgresBackend {
835    async fn vread(
836        &self,
837        tenant: &TenantContext,
838        resource_type: &str,
839        id: &str,
840        version_id: &str,
841    ) -> StorageResult<Option<StoredResource>> {
842        let client = self.get_client().await?;
843        let tenant_id = tenant.tenant_id().as_str();
844
845        let row = client
846            .query_opt(
847                "SELECT data, last_updated, is_deleted, fhir_version
848                 FROM resource_history
849                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
850                &[&tenant_id, &resource_type, &id, &version_id],
851            )
852            .await
853            .map_err(|e| internal_error(format!("Failed to read version: {}", e)))?;
854
855        match row {
856            Some(row) => {
857                let data: Value = row.get(0);
858                let last_updated: DateTime<Utc> = row.get(1);
859                let is_deleted: bool = row.get(2);
860                let fhir_version_str: String = row.get(3);
861
862                // For deleted versions, use last_updated as deleted_at
863                let deleted_at = if is_deleted { Some(last_updated) } else { None };
864
865                let fhir_version = FhirVersion::from_storage(&fhir_version_str)
866                    .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
867
868                Ok(Some(StoredResource::from_storage(
869                    resource_type,
870                    id,
871                    version_id,
872                    tenant.tenant_id().clone(),
873                    data,
874                    last_updated,
875                    last_updated,
876                    deleted_at,
877                    fhir_version,
878                )))
879            }
880            None => Ok(None),
881        }
882    }
883
884    async fn update_with_match(
885        &self,
886        tenant: &TenantContext,
887        resource_type: &str,
888        id: &str,
889        expected_version: &str,
890        resource: Value,
891    ) -> StorageResult<StoredResource> {
892        // Read current resource
893        let current = self.read(tenant, resource_type, id).await?.ok_or_else(|| {
894            StorageError::Resource(ResourceError::NotFound {
895                resource_type: resource_type.to_string(),
896                id: id.to_string(),
897            })
898        })?;
899
900        // Check version match
901        if current.version_id() != expected_version {
902            return Err(StorageError::Concurrency(
903                ConcurrencyError::VersionConflict {
904                    resource_type: resource_type.to_string(),
905                    id: id.to_string(),
906                    expected_version: expected_version.to_string(),
907                    actual_version: current.version_id().to_string(),
908                },
909            ));
910        }
911
912        // Perform update
913        self.update(tenant, &current, resource).await
914    }
915
916    async fn delete_with_match(
917        &self,
918        tenant: &TenantContext,
919        resource_type: &str,
920        id: &str,
921        expected_version: &str,
922    ) -> StorageResult<()> {
923        let client = self.get_client().await?;
924        let tenant_id = tenant.tenant_id().as_str();
925
926        // Check version match
927        let row = client
928            .query_opt(
929                "SELECT version_id FROM resources
930                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
931                &[&tenant_id, &resource_type, &id],
932            )
933            .await
934            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
935
936        let current_version = match row {
937            Some(row) => row.get::<_, String>(0),
938            None => {
939                return Err(StorageError::Resource(ResourceError::NotFound {
940                    resource_type: resource_type.to_string(),
941                    id: id.to_string(),
942                }));
943            }
944        };
945
946        if current_version != expected_version {
947            return Err(StorageError::Concurrency(
948                ConcurrencyError::VersionConflict {
949                    resource_type: resource_type.to_string(),
950                    id: id.to_string(),
951                    expected_version: expected_version.to_string(),
952                    actual_version: current_version,
953                },
954            ));
955        }
956
957        // Perform delete
958        self.delete(tenant, resource_type, id).await
959    }
960
961    async fn list_versions(
962        &self,
963        tenant: &TenantContext,
964        resource_type: &str,
965        id: &str,
966    ) -> StorageResult<Vec<String>> {
967        let client = self.get_client().await?;
968        let tenant_id = tenant.tenant_id().as_str();
969
970        let rows = client
971            .query(
972                "SELECT version_id FROM resource_history
973                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3
974                 ORDER BY CAST(version_id AS INTEGER) ASC",
975                &[&tenant_id, &resource_type, &id],
976            )
977            .await
978            .map_err(|e| internal_error(format!("Failed to list versions: {}", e)))?;
979
980        let versions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
981        Ok(versions)
982    }
983}
984
985// ============================================================================
986// InstanceHistoryProvider Implementation
987// ============================================================================
988
989#[async_trait]
990impl InstanceHistoryProvider for PostgresBackend {
991    async fn history_instance(
992        &self,
993        tenant: &TenantContext,
994        resource_type: &str,
995        id: &str,
996        params: &HistoryParams,
997    ) -> StorageResult<HistoryPage> {
998        let client = self.get_client().await?;
999        let tenant_id = tenant.tenant_id().as_str();
1000
1001        // Build the query with filters
1002        let mut sql = String::from(
1003            "SELECT version_id, data, last_updated, is_deleted, fhir_version
1004             FROM resource_history
1005             WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1006        );
1007        let mut param_index: usize = 4;
1008        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1009            Box::new(tenant_id.to_string()),
1010            Box::new(resource_type.to_string()),
1011            Box::new(id.to_string()),
1012        ];
1013
1014        // Apply deleted filter
1015        if !params.include_deleted {
1016            sql.push_str(" AND is_deleted = FALSE");
1017        }
1018
1019        // Apply since filter
1020        if let Some(since) = &params.since {
1021            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1022            query_params.push(Box::new(*since));
1023            param_index += 1;
1024        }
1025
1026        // Apply before filter
1027        if let Some(before) = &params.before {
1028            sql.push_str(&format!(" AND last_updated < ${}", param_index));
1029            query_params.push(Box::new(*before));
1030            param_index += 1;
1031        }
1032
1033        // Apply cursor filter if present
1034        if let Some(cursor) = params.pagination.cursor_value() {
1035            if let Some(CursorValue::String(version_str)) = cursor.sort_values().first() {
1036                if let Ok(version_int) = version_str.parse::<i64>() {
1037                    sql.push_str(&format!(
1038                        " AND CAST(version_id AS INTEGER) < ${}",
1039                        param_index
1040                    ));
1041                    query_params.push(Box::new(version_int));
1042                    param_index += 1;
1043                }
1044            }
1045        }
1046
1047        // Order by version descending (newest first) and limit
1048        let limit = params.pagination.count as i64 + 1; // +1 to detect if there are more
1049        sql.push_str(&format!(
1050            " ORDER BY CAST(version_id AS INTEGER) DESC LIMIT ${}",
1051            param_index
1052        ));
1053        query_params.push(Box::new(limit));
1054
1055        // Execute the query
1056        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1057            .iter()
1058            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1059            .collect();
1060
1061        let rows = client
1062            .query(&sql, &param_refs)
1063            .await
1064            .map_err(|e| internal_error(format!("Failed to query history: {}", e)))?;
1065
1066        let mut entries = Vec::new();
1067        let mut last_version: Option<String> = None;
1068
1069        for row in &rows {
1070            // Stop if we've collected enough items (we fetched count+1 to detect more)
1071            if entries.len() >= params.pagination.count as usize {
1072                break;
1073            }
1074
1075            let version_id: String = row.get(0);
1076            let data: Value = row.get(1);
1077            let last_updated: DateTime<Utc> = row.get(2);
1078            let is_deleted: bool = row.get(3);
1079            let fhir_version_str: String = row.get(4);
1080
1081            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1082
1083            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1084                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1085
1086            let resource = StoredResource::from_storage(
1087                resource_type,
1088                id,
1089                &version_id,
1090                tenant.tenant_id().clone(),
1091                data,
1092                last_updated,
1093                last_updated,
1094                deleted_at,
1095                fhir_version,
1096            );
1097
1098            // Determine the method based on version and deletion status
1099            let method = if is_deleted {
1100                HistoryMethod::Delete
1101            } else if version_id == "1" {
1102                HistoryMethod::Post
1103            } else {
1104                HistoryMethod::Put
1105            };
1106
1107            last_version = Some(version_id);
1108
1109            entries.push(HistoryEntry {
1110                resource,
1111                method,
1112                timestamp: last_updated,
1113            });
1114        }
1115
1116        // Determine if there are more results
1117        let has_more = rows.len() > params.pagination.count as usize;
1118
1119        // Build page info
1120        let page_info = if let (true, Some(version)) = (has_more, last_version) {
1121            let cursor = PageCursor::new(vec![CursorValue::String(version)], id.to_string());
1122            PageInfo::with_next(cursor)
1123        } else {
1124            PageInfo::end()
1125        };
1126
1127        Ok(Page::new(entries, page_info))
1128    }
1129
1130    async fn history_instance_count(
1131        &self,
1132        tenant: &TenantContext,
1133        resource_type: &str,
1134        id: &str,
1135    ) -> StorageResult<u64> {
1136        let client = self.get_client().await?;
1137        let tenant_id = tenant.tenant_id().as_str();
1138
1139        let row = client
1140            .query_one(
1141                "SELECT COUNT(*) FROM resource_history
1142                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1143                &[&tenant_id, &resource_type, &id],
1144            )
1145            .await
1146            .map_err(|e| internal_error(format!("Failed to count history: {}", e)))?;
1147
1148        let count: i64 = row.get(0);
1149        Ok(count as u64)
1150    }
1151
1152    async fn delete_instance_history(
1153        &self,
1154        tenant: &TenantContext,
1155        resource_type: &str,
1156        id: &str,
1157    ) -> StorageResult<u64> {
1158        let client = self.get_client().await?;
1159        let tenant_id = tenant.tenant_id().as_str();
1160
1161        // First, verify the resource exists
1162        let exists = client
1163            .query_opt(
1164                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1165                &[&tenant_id, &resource_type, &id],
1166            )
1167            .await
1168            .map_err(|e| internal_error(format!("Failed to check resource existence: {}", e)))?;
1169
1170        if exists.is_none() {
1171            return Err(StorageError::Resource(ResourceError::NotFound {
1172                resource_type: resource_type.to_string(),
1173                id: id.to_string(),
1174            }));
1175        }
1176
1177        // Get the current version from resources table (to preserve it)
1178        let current_row = client
1179            .query_one(
1180                "SELECT version_id FROM resources
1181                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1182                &[&tenant_id, &resource_type, &id],
1183            )
1184            .await
1185            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1186
1187        let current_version: String = current_row.get(0);
1188
1189        // Delete all history entries EXCEPT the current version
1190        let deleted = client
1191            .execute(
1192                "DELETE FROM resource_history
1193                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id != $4",
1194                &[&tenant_id, &resource_type, &id, &current_version],
1195            )
1196            .await
1197            .map_err(|e| internal_error(format!("Failed to delete history: {}", e)))?;
1198
1199        Ok(deleted)
1200    }
1201
1202    async fn delete_version(
1203        &self,
1204        tenant: &TenantContext,
1205        resource_type: &str,
1206        id: &str,
1207        version_id: &str,
1208    ) -> StorageResult<()> {
1209        let client = self.get_client().await?;
1210        let tenant_id = tenant.tenant_id().as_str();
1211
1212        // First, get the current version to ensure we're not deleting it
1213        let current_row = client
1214            .query_opt(
1215                "SELECT version_id FROM resources
1216                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1217                &[&tenant_id, &resource_type, &id],
1218            )
1219            .await
1220            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
1221
1222        let current_version = match current_row {
1223            Some(row) => row.get::<_, String>(0),
1224            None => {
1225                return Err(StorageError::Resource(ResourceError::NotFound {
1226                    resource_type: resource_type.to_string(),
1227                    id: id.to_string(),
1228                }));
1229            }
1230        };
1231
1232        // Prevent deletion of the current version
1233        if version_id == current_version {
1234            return Err(StorageError::Validation(
1235                crate::error::ValidationError::InvalidResource {
1236                    message: format!(
1237                        "Cannot delete current version {} of {}/{}. Use DELETE on the resource instead.",
1238                        version_id, resource_type, id
1239                    ),
1240                    details: vec![],
1241                },
1242            ));
1243        }
1244
1245        // Check if the version exists in history
1246        let version_exists = client
1247            .query_opt(
1248                "SELECT 1 FROM resource_history
1249                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1250                &[&tenant_id, &resource_type, &id, &version_id],
1251            )
1252            .await
1253            .map_err(|e| internal_error(format!("Failed to check version existence: {}", e)))?;
1254
1255        if version_exists.is_none() {
1256            return Err(StorageError::Resource(ResourceError::VersionNotFound {
1257                resource_type: resource_type.to_string(),
1258                id: id.to_string(),
1259                version_id: version_id.to_string(),
1260            }));
1261        }
1262
1263        // Delete the specific version
1264        client
1265            .execute(
1266                "DELETE FROM resource_history
1267                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND version_id = $4",
1268                &[&tenant_id, &resource_type, &id, &version_id],
1269            )
1270            .await
1271            .map_err(|e| internal_error(format!("Failed to delete version: {}", e)))?;
1272
1273        Ok(())
1274    }
1275}
1276
1277// ============================================================================
1278// TypeHistoryProvider Implementation
1279// ============================================================================
1280
1281#[async_trait]
1282impl TypeHistoryProvider for PostgresBackend {
1283    async fn history_type(
1284        &self,
1285        tenant: &TenantContext,
1286        resource_type: &str,
1287        params: &HistoryParams,
1288    ) -> StorageResult<HistoryPage> {
1289        let client = self.get_client().await?;
1290        let tenant_id = tenant.tenant_id().as_str();
1291
1292        // Build the query with filters
1293        let mut sql = String::from(
1294            "SELECT id, version_id, data, last_updated, is_deleted, fhir_version
1295             FROM resource_history
1296             WHERE tenant_id = $1 AND resource_type = $2",
1297        );
1298        let mut param_index: usize = 3;
1299        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = vec![
1300            Box::new(tenant_id.to_string()),
1301            Box::new(resource_type.to_string()),
1302        ];
1303
1304        // Apply deleted filter
1305        if !params.include_deleted {
1306            sql.push_str(" AND is_deleted = FALSE");
1307        }
1308
1309        // Apply since filter
1310        if let Some(since) = &params.since {
1311            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1312            query_params.push(Box::new(*since));
1313            param_index += 1;
1314        }
1315
1316        // Apply before filter
1317        if let Some(before) = &params.before {
1318            sql.push_str(&format!(" AND last_updated < ${}", param_index));
1319            query_params.push(Box::new(*before));
1320            param_index += 1;
1321        }
1322
1323        // Apply cursor filter if present
1324        if let Some(cursor) = params.pagination.cursor_value() {
1325            let sort_values = cursor.sort_values();
1326            if sort_values.len() >= 2 {
1327                if let (
1328                    Some(CursorValue::String(timestamp)),
1329                    Some(CursorValue::String(resource_id)),
1330                ) = (sort_values.first(), sort_values.get(1))
1331                {
1332                    sql.push_str(&format!(
1333                        " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND id < ${}))",
1334                        param_index, param_index, param_index + 1
1335                    ));
1336                    query_params.push(Box::new(timestamp.clone()));
1337                    query_params.push(Box::new(resource_id.clone()));
1338                    param_index += 2;
1339                }
1340            }
1341        }
1342
1343        // Order by last_updated descending (newest first), then by id for consistency
1344        let limit = params.pagination.count as i64 + 1;
1345        sql.push_str(&format!(
1346            " ORDER BY last_updated DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1347            param_index
1348        ));
1349        query_params.push(Box::new(limit));
1350
1351        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1352            .iter()
1353            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1354            .collect();
1355
1356        let rows = client
1357            .query(&sql, &param_refs)
1358            .await
1359            .map_err(|e| internal_error(format!("Failed to query type history: {}", e)))?;
1360
1361        let mut entries = Vec::new();
1362        let mut last_entry: Option<(String, String)> = None; // (last_updated, id)
1363
1364        for row in &rows {
1365            if entries.len() >= params.pagination.count as usize {
1366                break;
1367            }
1368
1369            let row_id: String = row.get(0);
1370            let version_id: String = row.get(1);
1371            let data: Value = row.get(2);
1372            let last_updated: DateTime<Utc> = row.get(3);
1373            let is_deleted: bool = row.get(4);
1374            let fhir_version_str: String = row.get(5);
1375
1376            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1377
1378            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1379                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1380
1381            let resource = StoredResource::from_storage(
1382                resource_type,
1383                &row_id,
1384                &version_id,
1385                tenant.tenant_id().clone(),
1386                data,
1387                last_updated,
1388                last_updated,
1389                deleted_at,
1390                fhir_version,
1391            );
1392
1393            let method = if is_deleted {
1394                HistoryMethod::Delete
1395            } else if version_id == "1" {
1396                HistoryMethod::Post
1397            } else {
1398                HistoryMethod::Put
1399            };
1400
1401            last_entry = Some((last_updated.to_rfc3339(), row_id));
1402
1403            entries.push(HistoryEntry {
1404                resource,
1405                method,
1406                timestamp: last_updated,
1407            });
1408        }
1409
1410        // Determine if there are more results
1411        let has_more = rows.len() > params.pagination.count as usize;
1412
1413        // Build page info
1414        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1415            let cursor = PageCursor::new(
1416                vec![CursorValue::String(timestamp), CursorValue::String(id)],
1417                resource_type.to_string(),
1418            );
1419            PageInfo::with_next(cursor)
1420        } else {
1421            PageInfo::end()
1422        };
1423
1424        Ok(Page::new(entries, page_info))
1425    }
1426
1427    async fn history_type_count(
1428        &self,
1429        tenant: &TenantContext,
1430        resource_type: &str,
1431    ) -> StorageResult<u64> {
1432        let client = self.get_client().await?;
1433        let tenant_id = tenant.tenant_id().as_str();
1434
1435        let row = client
1436            .query_one(
1437                "SELECT COUNT(*) FROM resource_history
1438                 WHERE tenant_id = $1 AND resource_type = $2",
1439                &[&tenant_id, &resource_type],
1440            )
1441            .await
1442            .map_err(|e| internal_error(format!("Failed to count type history: {}", e)))?;
1443
1444        let count: i64 = row.get(0);
1445        Ok(count as u64)
1446    }
1447}
1448
1449// ============================================================================
1450// SystemHistoryProvider Implementation
1451// ============================================================================
1452
1453#[async_trait]
1454impl SystemHistoryProvider for PostgresBackend {
1455    async fn history_system(
1456        &self,
1457        tenant: &TenantContext,
1458        params: &HistoryParams,
1459    ) -> StorageResult<HistoryPage> {
1460        let client = self.get_client().await?;
1461        let tenant_id = tenant.tenant_id().as_str();
1462
1463        // Build the query with filters
1464        let mut sql = String::from(
1465            "SELECT resource_type, id, version_id, data, last_updated, is_deleted, fhir_version
1466             FROM resource_history
1467             WHERE tenant_id = $1",
1468        );
1469        let mut param_index: usize = 2;
1470        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1471            vec![Box::new(tenant_id.to_string())];
1472
1473        // Apply deleted filter
1474        if !params.include_deleted {
1475            sql.push_str(" AND is_deleted = FALSE");
1476        }
1477
1478        // Apply since filter
1479        if let Some(since) = &params.since {
1480            sql.push_str(&format!(" AND last_updated >= ${}", param_index));
1481            query_params.push(Box::new(*since));
1482            param_index += 1;
1483        }
1484
1485        // Apply before filter
1486        if let Some(before) = &params.before {
1487            sql.push_str(&format!(" AND last_updated < ${}", param_index));
1488            query_params.push(Box::new(*before));
1489            param_index += 1;
1490        }
1491
1492        // Apply cursor filter if present
1493        if let Some(cursor) = params.pagination.cursor_value() {
1494            let sort_values = cursor.sort_values();
1495            if sort_values.len() >= 3 {
1496                if let (
1497                    Some(CursorValue::String(timestamp)),
1498                    Some(CursorValue::String(res_type)),
1499                    Some(CursorValue::String(res_id)),
1500                ) = (sort_values.first(), sort_values.get(1), sort_values.get(2))
1501                {
1502                    sql.push_str(&format!(
1503                        " AND (last_updated < ${}::timestamptz OR (last_updated = ${}::timestamptz AND (resource_type < ${} OR (resource_type = ${} AND id < ${}))))",
1504                        param_index, param_index, param_index + 1, param_index + 1, param_index + 2
1505                    ));
1506                    query_params.push(Box::new(timestamp.clone()));
1507                    query_params.push(Box::new(res_type.clone()));
1508                    query_params.push(Box::new(res_id.clone()));
1509                    param_index += 3;
1510                }
1511            }
1512        }
1513
1514        // Order by last_updated descending (newest first)
1515        let limit = params.pagination.count as i64 + 1;
1516        sql.push_str(&format!(
1517            " ORDER BY last_updated DESC, resource_type DESC, id DESC, CAST(version_id AS INTEGER) DESC LIMIT ${}",
1518            param_index
1519        ));
1520        query_params.push(Box::new(limit));
1521
1522        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1523            .iter()
1524            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1525            .collect();
1526
1527        let rows = client
1528            .query(&sql, &param_refs)
1529            .await
1530            .map_err(|e| internal_error(format!("Failed to query system history: {}", e)))?;
1531
1532        let mut entries = Vec::new();
1533        let mut last_entry: Option<(String, String, String)> = None;
1534
1535        for row in &rows {
1536            if entries.len() >= params.pagination.count as usize {
1537                break;
1538            }
1539
1540            let row_resource_type: String = row.get(0);
1541            let row_id: String = row.get(1);
1542            let version_id: String = row.get(2);
1543            let data: Value = row.get(3);
1544            let last_updated: DateTime<Utc> = row.get(4);
1545            let is_deleted: bool = row.get(5);
1546            let fhir_version_str: String = row.get(6);
1547
1548            let deleted_at = if is_deleted { Some(last_updated) } else { None };
1549
1550            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1551                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1552
1553            let resource = StoredResource::from_storage(
1554                &row_resource_type,
1555                &row_id,
1556                &version_id,
1557                tenant.tenant_id().clone(),
1558                data,
1559                last_updated,
1560                last_updated,
1561                deleted_at,
1562                fhir_version,
1563            );
1564
1565            let method = if is_deleted {
1566                HistoryMethod::Delete
1567            } else if version_id == "1" {
1568                HistoryMethod::Post
1569            } else {
1570                HistoryMethod::Put
1571            };
1572
1573            last_entry = Some((last_updated.to_rfc3339(), row_resource_type, row_id));
1574
1575            entries.push(HistoryEntry {
1576                resource,
1577                method,
1578                timestamp: last_updated,
1579            });
1580        }
1581
1582        let has_more = rows.len() > params.pagination.count as usize;
1583
1584        let page_info = if let (true, Some((timestamp, resource_type, id))) = (has_more, last_entry)
1585        {
1586            let cursor = PageCursor::new(
1587                vec![
1588                    CursorValue::String(timestamp),
1589                    CursorValue::String(resource_type),
1590                    CursorValue::String(id),
1591                ],
1592                "system".to_string(),
1593            );
1594            PageInfo::with_next(cursor)
1595        } else {
1596            PageInfo::end()
1597        };
1598
1599        Ok(Page::new(entries, page_info))
1600    }
1601
1602    async fn history_system_count(&self, tenant: &TenantContext) -> StorageResult<u64> {
1603        let client = self.get_client().await?;
1604        let tenant_id = tenant.tenant_id().as_str();
1605
1606        let row = client
1607            .query_one(
1608                "SELECT COUNT(*) FROM resource_history WHERE tenant_id = $1",
1609                &[&tenant_id],
1610            )
1611            .await
1612            .map_err(|e| internal_error(format!("Failed to count system history: {}", e)))?;
1613
1614        let count: i64 = row.get(0);
1615        Ok(count as u64)
1616    }
1617}
1618
1619// ============================================================================
1620// DifferentialHistoryProvider Implementation
1621// ============================================================================
1622
1623#[async_trait]
1624impl DifferentialHistoryProvider for PostgresBackend {
1625    async fn modified_since(
1626        &self,
1627        tenant: &TenantContext,
1628        resource_type: Option<&str>,
1629        since: DateTime<Utc>,
1630        pagination: &Pagination,
1631    ) -> StorageResult<Page<StoredResource>> {
1632        let client = self.get_client().await?;
1633        let tenant_id = tenant.tenant_id().as_str();
1634
1635        // Build query for current versions of resources modified since timestamp
1636        let mut sql = String::from(
1637            "SELECT resource_type, id, version_id, data, last_updated, fhir_version
1638             FROM resources
1639             WHERE tenant_id = $1 AND last_updated > $2 AND is_deleted = FALSE",
1640        );
1641        let mut param_index: usize = 3;
1642        let mut query_params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> =
1643            vec![Box::new(tenant_id.to_string()), Box::new(since)];
1644
1645        // Filter by resource type if specified
1646        if let Some(rt) = resource_type {
1647            sql.push_str(&format!(" AND resource_type = ${}", param_index));
1648            query_params.push(Box::new(rt.to_string()));
1649            param_index += 1;
1650        }
1651
1652        // Apply cursor filter if present
1653        if let Some(cursor) = pagination.cursor_value() {
1654            let sort_values = cursor.sort_values();
1655            if sort_values.len() >= 2 {
1656                if let (Some(CursorValue::String(timestamp)), Some(CursorValue::String(res_id))) =
1657                    (sort_values.first(), sort_values.get(1))
1658                {
1659                    sql.push_str(&format!(
1660                        " AND (last_updated > ${}::timestamptz OR (last_updated = ${}::timestamptz AND id > ${}))",
1661                        param_index, param_index, param_index + 1
1662                    ));
1663                    query_params.push(Box::new(timestamp.clone()));
1664                    query_params.push(Box::new(res_id.clone()));
1665                    param_index += 2;
1666                }
1667            }
1668        }
1669
1670        // Order by last_updated ascending (oldest first for sync)
1671        let limit = pagination.count as i64 + 1;
1672        sql.push_str(&format!(
1673            " ORDER BY last_updated ASC, id ASC LIMIT ${}",
1674            param_index
1675        ));
1676        query_params.push(Box::new(limit));
1677
1678        let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = query_params
1679            .iter()
1680            .map(|p| p.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
1681            .collect();
1682
1683        let rows = client
1684            .query(&sql, &param_refs)
1685            .await
1686            .map_err(|e| internal_error(format!("Failed to query modified resources: {}", e)))?;
1687
1688        let mut resources = Vec::new();
1689        let mut last_entry: Option<(String, String)> = None;
1690
1691        for row in &rows {
1692            if resources.len() >= pagination.count as usize {
1693                break;
1694            }
1695
1696            let row_resource_type: String = row.get(0);
1697            let row_id: String = row.get(1);
1698            let version_id: String = row.get(2);
1699            let data: Value = row.get(3);
1700            let last_updated: DateTime<Utc> = row.get(4);
1701            let fhir_version_str: String = row.get(5);
1702
1703            let fhir_version = FhirVersion::from_storage(&fhir_version_str)
1704                .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
1705
1706            let resource = StoredResource::from_storage(
1707                &row_resource_type,
1708                &row_id,
1709                &version_id,
1710                tenant.tenant_id().clone(),
1711                data,
1712                last_updated,
1713                last_updated,
1714                None,
1715                fhir_version,
1716            );
1717
1718            last_entry = Some((last_updated.to_rfc3339(), row_id));
1719            resources.push(resource);
1720        }
1721
1722        let has_more = rows.len() > pagination.count as usize;
1723
1724        let page_info = if let (true, Some((timestamp, id))) = (has_more, last_entry) {
1725            let cursor = PageCursor::new(
1726                vec![CursorValue::String(timestamp), CursorValue::String(id)],
1727                "modified_since".to_string(),
1728            );
1729            PageInfo::with_next(cursor)
1730        } else {
1731            PageInfo::end()
1732        };
1733
1734        Ok(Page::new(resources, page_info))
1735    }
1736}
1737
1738// ============================================================================
1739// PurgableStorage Implementation
1740// ============================================================================
1741
1742#[async_trait]
1743impl PurgableStorage for PostgresBackend {
1744    async fn purge(
1745        &self,
1746        tenant: &TenantContext,
1747        resource_type: &str,
1748        id: &str,
1749    ) -> StorageResult<()> {
1750        let client = self.get_client().await?;
1751        let tenant_id = tenant.tenant_id().as_str();
1752
1753        // Check if resource exists (in any state)
1754        let exists = client
1755            .query_opt(
1756                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1757                &[&tenant_id, &resource_type, &id],
1758            )
1759            .await
1760            .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
1761
1762        if exists.is_none() {
1763            // Also check history in case it was already purged from main table
1764            let history_exists = client
1765                .query_opt(
1766                    "SELECT 1 FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1767                    &[&tenant_id, &resource_type, &id],
1768                )
1769                .await
1770                .map_err(|e| internal_error(format!("Failed to check history: {}", e)))?;
1771
1772            if history_exists.is_none() {
1773                return Err(StorageError::Resource(ResourceError::NotFound {
1774                    resource_type: resource_type.to_string(),
1775                    id: id.to_string(),
1776                }));
1777            }
1778        }
1779
1780        // Delete from search index first (due to FK constraint)
1781        client
1782            .execute(
1783                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1784                &[&tenant_id, &resource_type, &id],
1785            )
1786            .await
1787            .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1788
1789        // Delete from FTS table
1790        let _ = client
1791            .execute(
1792                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
1793                &[&tenant_id, &resource_type, &id],
1794            )
1795            .await;
1796
1797        // Delete from history table (before resources due to FK)
1798        client
1799            .execute(
1800                "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1801                &[&tenant_id, &resource_type, &id],
1802            )
1803            .await
1804            .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1805
1806        // Delete from resources table
1807        client
1808            .execute(
1809                "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
1810                &[&tenant_id, &resource_type, &id],
1811            )
1812            .await
1813            .map_err(|e| internal_error(format!("Failed to purge resource: {}", e)))?;
1814
1815        Ok(())
1816    }
1817
1818    async fn purge_all(&self, tenant: &TenantContext, resource_type: &str) -> StorageResult<u64> {
1819        let client = self.get_client().await?;
1820        let tenant_id = tenant.tenant_id().as_str();
1821
1822        // Count how many we're about to delete
1823        let row = client
1824            .query_one(
1825                "SELECT COUNT(DISTINCT id) FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1826                &[&tenant_id, &resource_type],
1827            )
1828            .await
1829            .map_err(|e| internal_error(format!("Failed to count resources: {}", e)))?;
1830        let count: i64 = row.get(0);
1831
1832        // Delete from search index first (due to FK constraint)
1833        client
1834            .execute(
1835                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2",
1836                &[&tenant_id, &resource_type],
1837            )
1838            .await
1839            .map_err(|e| internal_error(format!("Failed to purge search index: {}", e)))?;
1840
1841        // Delete from FTS table
1842        let _ = client
1843            .execute(
1844                "DELETE FROM resource_fts WHERE tenant_id = $1 AND resource_type = $2",
1845                &[&tenant_id, &resource_type],
1846            )
1847            .await;
1848
1849        // Delete from history table
1850        client
1851            .execute(
1852                "DELETE FROM resource_history WHERE tenant_id = $1 AND resource_type = $2",
1853                &[&tenant_id, &resource_type],
1854            )
1855            .await
1856            .map_err(|e| internal_error(format!("Failed to purge resource history: {}", e)))?;
1857
1858        // Delete from resources table
1859        client
1860            .execute(
1861                "DELETE FROM resources WHERE tenant_id = $1 AND resource_type = $2",
1862                &[&tenant_id, &resource_type],
1863            )
1864            .await
1865            .map_err(|e| internal_error(format!("Failed to purge resources: {}", e)))?;
1866
1867        Ok(count as u64)
1868    }
1869}
1870
1871// ============================================================================
1872// ConditionalStorage Implementation
1873// ============================================================================
1874
1875// Helper function to parse simple search parameters
1876// Supports basic formats like: identifier=X, _id=Y, name=Z
1877fn parse_simple_search_params(params: &str) -> Vec<(String, String)> {
1878    params
1879        .split('&')
1880        .filter_map(|pair| {
1881            let parts: Vec<&str> = pair.splitn(2, '=').collect();
1882            if parts.len() == 2 {
1883                Some((parts[0].to_string(), parts[1].to_string()))
1884            } else {
1885                None
1886            }
1887        })
1888        .collect()
1889}
1890
1891#[async_trait]
1892impl ConditionalStorage for PostgresBackend {
1893    async fn conditional_create(
1894        &self,
1895        tenant: &TenantContext,
1896        resource_type: &str,
1897        resource: Value,
1898        search_params: &str,
1899        fhir_version: FhirVersion,
1900    ) -> StorageResult<ConditionalCreateResult> {
1901        // Find matching resources based on search parameters
1902        let matches = self
1903            .find_matching_resources(tenant, resource_type, search_params)
1904            .await?;
1905
1906        match matches.len() {
1907            0 => {
1908                // No match - create the resource
1909                let created = self
1910                    .create(tenant, resource_type, resource, fhir_version)
1911                    .await?;
1912                Ok(ConditionalCreateResult::Created(created))
1913            }
1914            1 => {
1915                // Exactly one match - return the existing resource
1916                Ok(ConditionalCreateResult::Exists(
1917                    matches.into_iter().next().unwrap(),
1918                ))
1919            }
1920            n => {
1921                // Multiple matches - error condition
1922                Ok(ConditionalCreateResult::MultipleMatches(n))
1923            }
1924        }
1925    }
1926
1927    async fn conditional_update(
1928        &self,
1929        tenant: &TenantContext,
1930        resource_type: &str,
1931        resource: Value,
1932        search_params: &str,
1933        upsert: bool,
1934        fhir_version: FhirVersion,
1935    ) -> StorageResult<ConditionalUpdateResult> {
1936        // Find matching resources based on search parameters
1937        let matches = self
1938            .find_matching_resources(tenant, resource_type, search_params)
1939            .await?;
1940
1941        match matches.len() {
1942            0 => {
1943                if upsert {
1944                    // No match, but upsert is true - create new resource
1945                    let created = self
1946                        .create(tenant, resource_type, resource, fhir_version)
1947                        .await?;
1948                    Ok(ConditionalUpdateResult::Created(created))
1949                } else {
1950                    // No match and no upsert
1951                    Ok(ConditionalUpdateResult::NoMatch)
1952                }
1953            }
1954            1 => {
1955                // Exactly one match - update it (preserves existing FHIR version)
1956                let existing = matches.into_iter().next().unwrap();
1957                let updated = self.update(tenant, &existing, resource).await?;
1958                Ok(ConditionalUpdateResult::Updated(updated))
1959            }
1960            n => {
1961                // Multiple matches - error condition
1962                Ok(ConditionalUpdateResult::MultipleMatches(n))
1963            }
1964        }
1965    }
1966
1967    async fn conditional_delete(
1968        &self,
1969        tenant: &TenantContext,
1970        resource_type: &str,
1971        search_params: &str,
1972    ) -> StorageResult<ConditionalDeleteResult> {
1973        // Find matching resources based on search parameters
1974        let matches = self
1975            .find_matching_resources(tenant, resource_type, search_params)
1976            .await?;
1977
1978        match matches.len() {
1979            0 => {
1980                // No match
1981                Ok(ConditionalDeleteResult::NoMatch)
1982            }
1983            1 => {
1984                // Exactly one match - delete it
1985                let existing = matches.into_iter().next().unwrap();
1986                self.delete(tenant, resource_type, existing.id()).await?;
1987                Ok(ConditionalDeleteResult::Deleted)
1988            }
1989            n => {
1990                // Multiple matches - error condition
1991                Ok(ConditionalDeleteResult::MultipleMatches(n))
1992            }
1993        }
1994    }
1995
1996    async fn conditional_patch(
1997        &self,
1998        tenant: &TenantContext,
1999        resource_type: &str,
2000        search_params: &str,
2001        patch: &crate::core::PatchFormat,
2002    ) -> StorageResult<crate::core::ConditionalPatchResult> {
2003        use crate::core::{ConditionalPatchResult, PatchFormat};
2004
2005        // Find matching resources based on search parameters
2006        let matches = self
2007            .find_matching_resources(tenant, resource_type, search_params)
2008            .await?;
2009
2010        match matches.len() {
2011            0 => Ok(ConditionalPatchResult::NoMatch),
2012            1 => {
2013                // Exactly one match - apply the patch
2014                let existing = matches.into_iter().next().unwrap();
2015                let current_content = existing.content().clone();
2016
2017                // Apply the patch based on format
2018                let patched_content = match patch {
2019                    PatchFormat::JsonPatch(patch_doc) => {
2020                        self.apply_json_patch(&current_content, patch_doc)?
2021                    }
2022                    PatchFormat::FhirPathPatch(patch_params) => {
2023                        self.apply_fhirpath_patch(&current_content, patch_params)?
2024                    }
2025                    PatchFormat::MergePatch(merge_doc) => {
2026                        self.apply_merge_patch(&current_content, merge_doc)
2027                    }
2028                };
2029
2030                // Update the resource with the patched content
2031                let updated = self.update(tenant, &existing, patched_content).await?;
2032                Ok(ConditionalPatchResult::Patched(updated))
2033            }
2034            n => Ok(ConditionalPatchResult::MultipleMatches(n)),
2035        }
2036    }
2037}
2038
2039impl PostgresBackend {
2040    /// Find resources matching the given search parameters.
2041    ///
2042    /// Uses the SearchProvider implementation to leverage the pre-computed search index.
2043    async fn find_matching_resources(
2044        &self,
2045        tenant: &TenantContext,
2046        resource_type: &str,
2047        search_params_str: &str,
2048    ) -> StorageResult<Vec<StoredResource>> {
2049        // Parse search parameters into (name, value) pairs
2050        let parsed_params = parse_simple_search_params(search_params_str);
2051
2052        if parsed_params.is_empty() {
2053            // No search params means match all - but for conditional ops this is unusual
2054            return Ok(Vec::new());
2055        }
2056
2057        // Build SearchParameter objects by looking up types from the registry
2058        let search_params = self.build_search_parameters(resource_type, &parsed_params)?;
2059
2060        // Build a SearchQuery
2061        let query = SearchQuery {
2062            resource_type: resource_type.to_string(),
2063            parameters: search_params,
2064            count: Some(1000),
2065            ..Default::default()
2066        };
2067
2068        // Use the SearchProvider implementation
2069        let result = <Self as SearchProvider>::search(self, tenant, &query).await?;
2070
2071        Ok(result.resources.items)
2072    }
2073
2074    /// Builds SearchParameter objects from parsed (name, value) pairs.
2075    fn build_search_parameters(
2076        &self,
2077        resource_type: &str,
2078        params: &[(String, String)],
2079    ) -> StorageResult<Vec<SearchParameter>> {
2080        let registry = self.search_registry().read();
2081        let mut search_params = Vec::with_capacity(params.len());
2082
2083        for (name, value) in params {
2084            let param_type = self
2085                .lookup_param_type(&registry, resource_type, name)
2086                .unwrap_or({
2087                    match name.as_str() {
2088                        "_id" => SearchParamType::Token,
2089                        "_lastUpdated" => SearchParamType::Date,
2090                        "_tag" | "_profile" | "_security" => SearchParamType::Token,
2091                        "identifier" => SearchParamType::Token,
2092                        "patient" | "subject" | "encounter" | "performer" | "author"
2093                        | "requester" | "recorder" | "asserter" | "practitioner"
2094                        | "organization" | "location" | "device" => SearchParamType::Reference,
2095                        _ => SearchParamType::String,
2096                    }
2097                });
2098
2099            search_params.push(SearchParameter {
2100                name: name.clone(),
2101                param_type,
2102                modifier: None,
2103                values: vec![SearchValue::parse(value)],
2104                chain: vec![],
2105                components: vec![],
2106            });
2107        }
2108
2109        Ok(search_params)
2110    }
2111
2112    /// Looks up a search parameter type from the registry.
2113    fn lookup_param_type(
2114        &self,
2115        registry: &crate::search::SearchParameterRegistry,
2116        resource_type: &str,
2117        param_name: &str,
2118    ) -> Option<SearchParamType> {
2119        if let Some(def) = registry.get_param(resource_type, param_name) {
2120            return Some(def.param_type);
2121        }
2122        if let Some(def) = registry.get_param("Resource", param_name) {
2123            return Some(def.param_type);
2124        }
2125        None
2126    }
2127
2128    // ========================================================================
2129    // Patch Helper Methods
2130    // ========================================================================
2131
2132    /// Applies a JSON Patch (RFC 6902) to a resource.
2133    fn apply_json_patch(&self, resource: &Value, patch_doc: &Value) -> StorageResult<Value> {
2134        use crate::error::ValidationError;
2135
2136        let patch: json_patch::Patch = serde_json::from_value(patch_doc.clone()).map_err(|e| {
2137            StorageError::Validation(ValidationError::InvalidResource {
2138                message: format!("Invalid JSON Patch document: {}", e),
2139                details: vec![],
2140            })
2141        })?;
2142
2143        let mut patched = resource.clone();
2144        json_patch::patch(&mut patched, &patch).map_err(|e| {
2145            StorageError::Validation(ValidationError::InvalidResource {
2146                message: format!("Failed to apply JSON Patch: {}", e),
2147                details: vec![],
2148            })
2149        })?;
2150
2151        Ok(patched)
2152    }
2153
2154    /// Applies a FHIRPath Patch to a resource.
2155    fn apply_fhirpath_patch(&self, resource: &Value, patch_params: &Value) -> StorageResult<Value> {
2156        use crate::error::ValidationError;
2157
2158        let parameter = patch_params.get("parameter").and_then(|p| p.as_array());
2159        if parameter.is_none() {
2160            return Err(StorageError::Validation(ValidationError::InvalidResource {
2161                message: "FHIRPath Patch must have a 'parameter' array".to_string(),
2162                details: vec![],
2163            }));
2164        }
2165
2166        let mut patched = resource.clone();
2167
2168        for operation in parameter.unwrap() {
2169            let parts = operation.get("part").and_then(|p| p.as_array());
2170            if parts.is_none() {
2171                continue;
2172            }
2173
2174            let mut op_type = None;
2175            let mut op_path = None;
2176            let mut op_name = None;
2177            let mut op_value = None;
2178
2179            for part in parts.unwrap() {
2180                match part.get("name").and_then(|n| n.as_str()) {
2181                    Some("type") => {
2182                        op_type = part
2183                            .get("valueCode")
2184                            .and_then(|v| v.as_str())
2185                            .map(|s| s.to_string());
2186                    }
2187                    Some("path") => {
2188                        op_path = part
2189                            .get("valueString")
2190                            .and_then(|v| v.as_str())
2191                            .map(|s| s.to_string());
2192                    }
2193                    Some("name") => {
2194                        op_name = part
2195                            .get("valueString")
2196                            .and_then(|v| v.as_str())
2197                            .map(|s| s.to_string());
2198                    }
2199                    Some("value") => {
2200                        op_value = extract_part_value(part);
2201                    }
2202                    _ => {}
2203                }
2204            }
2205
2206            match op_type.as_deref() {
2207                Some("replace") => {
2208                    if let (Some(path), Some(value)) = (&op_path, &op_value) {
2209                        self.fhirpath_replace(&mut patched, path, value)?;
2210                    }
2211                }
2212                Some("add") => {
2213                    if let (Some(path), Some(name), Some(value)) = (&op_path, &op_name, &op_value) {
2214                        self.fhirpath_add(&mut patched, path, name, value)?;
2215                    }
2216                }
2217                Some("delete") => {
2218                    if let Some(path) = &op_path {
2219                        self.fhirpath_delete(&mut patched, path)?;
2220                    }
2221                }
2222                _ => {
2223                    // Unsupported operation type - skip
2224                }
2225            }
2226        }
2227
2228        Ok(patched)
2229    }
2230
2231    /// Helper for FHIRPath replace operation.
2232    fn fhirpath_replace(
2233        &self,
2234        resource: &mut Value,
2235        path: &str,
2236        value: &Value,
2237    ) -> StorageResult<()> {
2238        let parts: Vec<&str> = path.split('.').collect();
2239        if parts.len() == 2 {
2240            if let Some(obj) = resource.as_object_mut() {
2241                obj.insert(parts[1].to_string(), value.clone());
2242            }
2243        }
2244        Ok(())
2245    }
2246
2247    /// Helper for FHIRPath add operation.
2248    fn fhirpath_add(
2249        &self,
2250        resource: &mut Value,
2251        path: &str,
2252        name: &str,
2253        value: &Value,
2254    ) -> StorageResult<()> {
2255        let parts: Vec<&str> = path.split('.').collect();
2256        if parts.len() == 1
2257            && parts[0]
2258                == resource
2259                    .get("resourceType")
2260                    .and_then(|r| r.as_str())
2261                    .unwrap_or("")
2262        {
2263            if let Some(obj) = resource.as_object_mut() {
2264                obj.insert(name.to_string(), value.clone());
2265            }
2266        }
2267        Ok(())
2268    }
2269
2270    /// Helper for FHIRPath delete operation.
2271    fn fhirpath_delete(&self, resource: &mut Value, path: &str) -> StorageResult<()> {
2272        let parts: Vec<&str> = path.split('.').collect();
2273        if parts.len() == 2 {
2274            if let Some(obj) = resource.as_object_mut() {
2275                obj.remove(parts[1]);
2276            }
2277        }
2278        Ok(())
2279    }
2280
2281    /// Applies a JSON Merge Patch (RFC 7386) to a resource.
2282    fn apply_merge_patch(&self, resource: &Value, merge_doc: &Value) -> Value {
2283        let mut patched = resource.clone();
2284        json_patch::merge(&mut patched, merge_doc);
2285        patched
2286    }
2287}
2288
2289// ============================================================================
2290// BundleProvider Implementation
2291// ============================================================================
2292
2293#[async_trait]
2294impl BundleProvider for PostgresBackend {
2295    async fn process_transaction(
2296        &self,
2297        tenant: &TenantContext,
2298        entries: Vec<BundleEntry>,
2299    ) -> Result<BundleResult, TransactionError> {
2300        use crate::core::transaction::{Transaction, TransactionOptions, TransactionProvider};
2301        use std::collections::HashMap;
2302
2303        // Start a transaction
2304        let mut tx = self
2305            .begin_transaction(tenant, TransactionOptions::new())
2306            .await
2307            .map_err(|e| TransactionError::RolledBack {
2308                reason: format!("Failed to begin transaction: {}", e),
2309            })?;
2310
2311        let mut results = Vec::with_capacity(entries.len());
2312        let mut error_info: Option<(usize, String)> = None;
2313
2314        // Build a map of fullUrl -> assigned reference for reference resolution
2315        let mut reference_map: HashMap<String, String> = HashMap::new();
2316
2317        // Make entries mutable for reference resolution
2318        let mut entries = entries;
2319
2320        // Process each entry within the transaction
2321        for (idx, entry) in entries.iter_mut().enumerate() {
2322            // Resolve references in this entry's resource before processing
2323            if let Some(ref mut resource) = entry.resource {
2324                resolve_bundle_references(resource, &reference_map);
2325            }
2326
2327            let result = self.process_bundle_entry_tx(&mut tx, entry).await;
2328
2329            match result {
2330                Ok(entry_result) => {
2331                    if entry_result.status >= 400 {
2332                        error_info = Some((
2333                            idx,
2334                            format!("Entry failed with status {}", entry_result.status),
2335                        ));
2336                        break;
2337                    }
2338
2339                    // If this was a create (POST) and we have a fullUrl, record the mapping
2340                    if entry.method == BundleMethod::Post {
2341                        if let Some(ref full_url) = entry.full_url {
2342                            if let Some(ref location) = entry_result.location {
2343                                let reference = location
2344                                    .split("/_history")
2345                                    .next()
2346                                    .unwrap_or(location)
2347                                    .to_string();
2348                                reference_map.insert(full_url.clone(), reference);
2349                            }
2350                        }
2351                    }
2352
2353                    results.push(entry_result);
2354                }
2355                Err(e) => {
2356                    error_info = Some((idx, format!("Entry processing failed: {}", e)));
2357                    break;
2358                }
2359            }
2360        }
2361
2362        // Handle error or commit
2363        if let Some((index, message)) = error_info {
2364            let _ = Box::new(tx).rollback().await;
2365            return Err(TransactionError::BundleError { index, message });
2366        }
2367
2368        // Commit the transaction
2369        Box::new(tx)
2370            .commit()
2371            .await
2372            .map_err(|e| TransactionError::RolledBack {
2373                reason: format!("Commit failed: {}", e),
2374            })?;
2375
2376        Ok(BundleResult {
2377            bundle_type: BundleType::Transaction,
2378            entries: results,
2379        })
2380    }
2381
2382    async fn process_batch(
2383        &self,
2384        tenant: &TenantContext,
2385        entries: Vec<BundleEntry>,
2386    ) -> StorageResult<BundleResult> {
2387        let mut results = Vec::with_capacity(entries.len());
2388
2389        // Process each entry independently
2390        for entry in &entries {
2391            let result = self.process_batch_entry(tenant, entry).await;
2392            results.push(result);
2393        }
2394
2395        Ok(BundleResult {
2396            bundle_type: BundleType::Batch,
2397            entries: results,
2398        })
2399    }
2400}
2401
2402impl PostgresBackend {
2403    /// Process a single bundle entry within a transaction.
2404    async fn process_bundle_entry_tx(
2405        &self,
2406        tx: &mut super::transaction::PostgresTransaction,
2407        entry: &BundleEntry,
2408    ) -> StorageResult<BundleEntryResult> {
2409        use crate::core::transaction::Transaction;
2410
2411        match entry.method {
2412            BundleMethod::Get => {
2413                let (resource_type, id) = self.parse_url(&entry.url)?;
2414                match tx.read(&resource_type, &id).await? {
2415                    Some(resource) => Ok(BundleEntryResult::ok(resource)),
2416                    None => Ok(BundleEntryResult::error(
2417                        404,
2418                        serde_json::json!({
2419                            "resourceType": "OperationOutcome",
2420                            "issue": [{"severity": "error", "code": "not-found"}]
2421                        }),
2422                    )),
2423                }
2424            }
2425            BundleMethod::Post => {
2426                let resource = entry.resource.clone().ok_or_else(|| {
2427                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2428                        field: "resource".to_string(),
2429                    })
2430                })?;
2431
2432                let resource_type = resource
2433                    .get("resourceType")
2434                    .and_then(|v| v.as_str())
2435                    .map(|s| s.to_string())
2436                    .ok_or_else(|| {
2437                        StorageError::Validation(
2438                            crate::error::ValidationError::MissingRequiredField {
2439                                field: "resourceType".to_string(),
2440                            },
2441                        )
2442                    })?;
2443
2444                let created = tx.create(&resource_type, resource).await?;
2445                Ok(BundleEntryResult::created(created))
2446            }
2447            BundleMethod::Put => {
2448                let resource = entry.resource.clone().ok_or_else(|| {
2449                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2450                        field: "resource".to_string(),
2451                    })
2452                })?;
2453
2454                let (resource_type, id) = self.parse_url(&entry.url)?;
2455
2456                match tx.read(&resource_type, &id).await? {
2457                    Some(existing) => {
2458                        // Check If-Match if provided
2459                        if let Some(ref if_match) = entry.if_match {
2460                            let current_etag = existing.etag();
2461                            if current_etag != if_match.as_str() {
2462                                return Ok(BundleEntryResult::error(
2463                                    412,
2464                                    serde_json::json!({
2465                                        "resourceType": "OperationOutcome",
2466                                        "issue": [{"severity": "error", "code": "conflict", "diagnostics": "ETag mismatch"}]
2467                                    }),
2468                                ));
2469                            }
2470                        }
2471                        let updated = tx.update(&existing, resource).await?;
2472                        Ok(BundleEntryResult::ok(updated))
2473                    }
2474                    None => {
2475                        // Create new resource with specified ID
2476                        let mut resource_with_id = resource;
2477                        resource_with_id["id"] = serde_json::json!(id);
2478                        let created = tx.create(&resource_type, resource_with_id).await?;
2479                        Ok(BundleEntryResult::created(created))
2480                    }
2481                }
2482            }
2483            BundleMethod::Delete => {
2484                let (resource_type, id) = self.parse_url(&entry.url)?;
2485                tx.delete(&resource_type, &id).await?;
2486                Ok(BundleEntryResult::deleted())
2487            }
2488            BundleMethod::Patch => {
2489                // PATCH is not fully implemented yet
2490                Ok(BundleEntryResult::error(
2491                    501,
2492                    serde_json::json!({
2493                        "resourceType": "OperationOutcome",
2494                        "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented in transaction bundles"}]
2495                    }),
2496                ))
2497            }
2498        }
2499    }
2500
2501    /// Process a single batch entry (independent, no transaction).
2502    async fn process_batch_entry(
2503        &self,
2504        tenant: &TenantContext,
2505        entry: &BundleEntry,
2506    ) -> BundleEntryResult {
2507        match self.process_batch_entry_inner(tenant, entry).await {
2508            Ok(result) => result,
2509            Err(e) => BundleEntryResult::error(
2510                500,
2511                serde_json::json!({
2512                    "resourceType": "OperationOutcome",
2513                    "issue": [{"severity": "error", "code": "exception", "diagnostics": e.to_string()}]
2514                }),
2515            ),
2516        }
2517    }
2518
2519    async fn process_batch_entry_inner(
2520        &self,
2521        tenant: &TenantContext,
2522        entry: &BundleEntry,
2523    ) -> StorageResult<BundleEntryResult> {
2524        match entry.method {
2525            BundleMethod::Get => {
2526                let (resource_type, id) = self.parse_url(&entry.url)?;
2527                match self.read(tenant, &resource_type, &id).await? {
2528                    Some(resource) => Ok(BundleEntryResult::ok(resource)),
2529                    None => Ok(BundleEntryResult::error(
2530                        404,
2531                        serde_json::json!({
2532                            "resourceType": "OperationOutcome",
2533                            "issue": [{"severity": "error", "code": "not-found"}]
2534                        }),
2535                    )),
2536                }
2537            }
2538            BundleMethod::Post => {
2539                let resource = entry.resource.clone().ok_or_else(|| {
2540                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2541                        field: "resource".to_string(),
2542                    })
2543                })?;
2544
2545                let resource_type = resource
2546                    .get("resourceType")
2547                    .and_then(|v| v.as_str())
2548                    .map(|s| s.to_string())
2549                    .ok_or_else(|| {
2550                        StorageError::Validation(
2551                            crate::error::ValidationError::MissingRequiredField {
2552                                field: "resourceType".to_string(),
2553                            },
2554                        )
2555                    })?;
2556
2557                let created = self
2558                    .create(
2559                        tenant,
2560                        &resource_type,
2561                        resource,
2562                        FhirVersion::default_enabled(),
2563                    )
2564                    .await?;
2565                Ok(BundleEntryResult::created(created))
2566            }
2567            BundleMethod::Put => {
2568                let resource = entry.resource.clone().ok_or_else(|| {
2569                    StorageError::Validation(crate::error::ValidationError::MissingRequiredField {
2570                        field: "resource".to_string(),
2571                    })
2572                })?;
2573
2574                let (resource_type, id) = self.parse_url(&entry.url)?;
2575                let (stored, _created) = self
2576                    .create_or_update(
2577                        tenant,
2578                        &resource_type,
2579                        &id,
2580                        resource,
2581                        FhirVersion::default_enabled(),
2582                    )
2583                    .await?;
2584                Ok(BundleEntryResult::ok(stored))
2585            }
2586            BundleMethod::Delete => {
2587                let (resource_type, id) = self.parse_url(&entry.url)?;
2588                match self.delete(tenant, &resource_type, &id).await {
2589                    Ok(()) => Ok(BundleEntryResult::deleted()),
2590                    Err(StorageError::Resource(ResourceError::NotFound { .. })) => {
2591                        Ok(BundleEntryResult::deleted()) // Idempotent delete
2592                    }
2593                    Err(e) => Err(e),
2594                }
2595            }
2596            BundleMethod::Patch => Ok(BundleEntryResult::error(
2597                501,
2598                serde_json::json!({
2599                    "resourceType": "OperationOutcome",
2600                    "issue": [{"severity": "error", "code": "not-supported", "diagnostics": "PATCH not implemented"}]
2601                }),
2602            )),
2603        }
2604    }
2605
2606    /// Parse a FHIR URL into resource type and ID.
2607    fn parse_url(&self, url: &str) -> StorageResult<(String, String)> {
2608        let path = url
2609            .strip_prefix("http://")
2610            .or_else(|| url.strip_prefix("https://"))
2611            .map(|s| s.find('/').map(|i| &s[i..]).unwrap_or(s))
2612            .unwrap_or(url);
2613
2614        let path = path.trim_start_matches('/');
2615        let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
2616
2617        if parts.len() >= 2 {
2618            let len = parts.len();
2619            Ok((parts[len - 2].to_string(), parts[len - 1].to_string()))
2620        } else {
2621            Err(StorageError::Validation(
2622                crate::error::ValidationError::InvalidReference {
2623                    reference: url.to_string(),
2624                    message: "URL must be in format ResourceType/id".to_string(),
2625                },
2626            ))
2627        }
2628    }
2629}
2630
2631/// Recursively resolves urn:uuid references in a JSON value using the reference map.
2632fn resolve_bundle_references(
2633    value: &mut serde_json::Value,
2634    reference_map: &std::collections::HashMap<String, String>,
2635) {
2636    use serde_json::Value;
2637    match value {
2638        Value::Object(map) => {
2639            if let Some(Value::String(ref_str)) = map.get("reference") {
2640                if ref_str.starts_with("urn:uuid:") {
2641                    if let Some(resolved) = reference_map.get(ref_str) {
2642                        map.insert("reference".to_string(), Value::String(resolved.clone()));
2643                    }
2644                }
2645            }
2646            for v in map.values_mut() {
2647                resolve_bundle_references(v, reference_map);
2648            }
2649        }
2650        Value::Array(arr) => {
2651            for item in arr {
2652                resolve_bundle_references(item, reference_map);
2653            }
2654        }
2655        _ => {}
2656    }
2657}
2658
2659// ============================================================================
2660// ReindexableStorage Implementation
2661// ============================================================================
2662
2663#[async_trait]
2664impl ReindexableStorage for PostgresBackend {
2665    async fn list_resource_types(&self, tenant: &TenantContext) -> StorageResult<Vec<String>> {
2666        let client = self.get_client().await?;
2667        let tenant_id = tenant.tenant_id().as_str();
2668
2669        let rows = client
2670            .query(
2671                "SELECT DISTINCT resource_type FROM resources WHERE tenant_id = $1 AND is_deleted = FALSE",
2672                &[&tenant_id],
2673            )
2674            .await
2675            .map_err(|e| internal_error(format!("Failed to query resource types: {}", e)))?;
2676
2677        let types: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
2678        Ok(types)
2679    }
2680
2681    async fn count_resources(
2682        &self,
2683        tenant: &TenantContext,
2684        resource_type: &str,
2685    ) -> StorageResult<u64> {
2686        self.count(tenant, Some(resource_type)).await
2687    }
2688
2689    async fn fetch_resources_page(
2690        &self,
2691        tenant: &TenantContext,
2692        resource_type: &str,
2693        cursor: Option<&str>,
2694        limit: u32,
2695    ) -> StorageResult<ResourcePage> {
2696        let client = self.get_client().await?;
2697        let tenant_id = tenant.tenant_id().as_str();
2698
2699        // Parse cursor if provided (format: "last_updated|id")
2700        let (cursor_ts, cursor_id) = if let Some(c) = cursor {
2701            let parts: Vec<&str> = c.split('|').collect();
2702            if parts.len() == 2 {
2703                let ts = DateTime::parse_from_rfc3339(parts[0])
2704                    .map(|dt| dt.with_timezone(&Utc))
2705                    .map_err(|e| internal_error(format!("Invalid cursor timestamp: {}", e)))?;
2706                (Some(ts), Some(parts[1].to_string()))
2707            } else {
2708                (None, None)
2709            }
2710        } else {
2711            (None, None)
2712        };
2713
2714        let rows = if let (Some(ts), Some(id)) = (&cursor_ts, &cursor_id) {
2715            client
2716                .query(
2717                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2718                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2719                     AND (last_updated > $3 OR (last_updated = $3 AND id > $4))
2720                     ORDER BY last_updated ASC, id ASC LIMIT $5",
2721                    &[
2722                        &tenant_id,
2723                        &resource_type,
2724                        ts,
2725                        &id.as_str(),
2726                        &(limit as i64),
2727                    ],
2728                )
2729                .await
2730                .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2731        } else {
2732            client
2733                .query(
2734                    "SELECT id, version_id, data, last_updated, fhir_version FROM resources
2735                     WHERE tenant_id = $1 AND resource_type = $2 AND is_deleted = FALSE
2736                     ORDER BY last_updated ASC, id ASC LIMIT $3",
2737                    &[&tenant_id, &resource_type, &(limit as i64)],
2738                )
2739                .await
2740                .map_err(|e| internal_error(format!("Failed to fetch resources page: {}", e)))?
2741        };
2742
2743        let resources: Vec<StoredResource> = rows
2744            .iter()
2745            .map(|row| {
2746                let id: String = row.get(0);
2747                let version_id: String = row.get(1);
2748                let data: Value = row.get(2);
2749                let last_updated: DateTime<Utc> = row.get(3);
2750                let fhir_version_str: String = row.get(4);
2751                let fhir_version = FhirVersion::from_storage(&fhir_version_str)
2752                    .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
2753
2754                StoredResource::from_storage(
2755                    resource_type,
2756                    id,
2757                    version_id,
2758                    tenant.tenant_id().clone(),
2759                    data,
2760                    last_updated,
2761                    last_updated,
2762                    None,
2763                    fhir_version,
2764                )
2765            })
2766            .collect();
2767
2768        // Determine next cursor
2769        let next_cursor = if resources.len() == limit as usize {
2770            resources
2771                .last()
2772                .map(|r| format!("{}|{}", r.last_modified().to_rfc3339(), r.id()))
2773        } else {
2774            None
2775        };
2776
2777        Ok(ResourcePage {
2778            resources,
2779            next_cursor,
2780        })
2781    }
2782
2783    async fn delete_search_entries(
2784        &self,
2785        tenant: &TenantContext,
2786        resource_type: &str,
2787        resource_id: &str,
2788    ) -> StorageResult<()> {
2789        let client = self.get_client().await?;
2790        self.delete_search_index(
2791            &client,
2792            tenant.tenant_id().as_str(),
2793            resource_type,
2794            resource_id,
2795        )
2796        .await
2797    }
2798
2799    async fn write_search_entries(
2800        &self,
2801        tenant: &TenantContext,
2802        resource_type: &str,
2803        resource_id: &str,
2804        resource: &Value,
2805    ) -> StorageResult<usize> {
2806        let client = self.get_client().await?;
2807        let tenant_id = tenant.tenant_id().as_str();
2808
2809        // Use the dynamic extraction
2810        let values = self
2811            .search_extractor()
2812            .extract(resource, resource_type)
2813            .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
2814
2815        let mut count = 0;
2816        for value in values {
2817            PostgresSearchIndexWriter::write_entry(
2818                &client,
2819                tenant_id,
2820                resource_type,
2821                resource_id,
2822                &value,
2823            )
2824            .await?;
2825            count += 1;
2826        }
2827
2828        // Re-index contained resources too, so `$reindex` rebuilds `_contained`
2829        // search entries.
2830        count += self
2831            .index_contained_resources(&client, tenant_id, resource_type, resource_id, resource)
2832            .await?;
2833
2834        Ok(count)
2835    }
2836
2837    async fn clear_search_index(&self, tenant: &TenantContext) -> StorageResult<u64> {
2838        let client = self.get_client().await?;
2839        let tenant_id = tenant.tenant_id().as_str();
2840
2841        let deleted = client
2842            .execute(
2843                "DELETE FROM search_index WHERE tenant_id = $1",
2844                &[&tenant_id],
2845            )
2846            .await
2847            .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
2848
2849        // Also clear FTS entries
2850        let _ = client
2851            .execute(
2852                "DELETE FROM resource_fts WHERE tenant_id = $1",
2853                &[&tenant_id],
2854            )
2855            .await;
2856
2857        Ok(deleted)
2858    }
2859}
2860
2861// ============================================================================
2862// Helper Functions
2863// ============================================================================
2864
2865/// Normalize a date string for PostgreSQL TIMESTAMPTZ.
2866fn normalize_date_for_pg(value: &str) -> String {
2867    if value.contains('T') {
2868        if value.contains('+') || value.contains('Z') || value.ends_with("-00:00") {
2869            value.to_string()
2870        } else {
2871            format!("{}+00:00", value)
2872        }
2873    } else if value.len() == 10 {
2874        format!("{}T00:00:00+00:00", value)
2875    } else if value.len() == 7 {
2876        format!("{}-01T00:00:00+00:00", value)
2877    } else if value.len() == 4 {
2878        format!("{}-01-01T00:00:00+00:00", value)
2879    } else {
2880        value.to_string()
2881    }
2882}
2883
2884// ============================================================================
2885// FTS Content Extraction (local copy to avoid cross-feature dependency on sqlite)
2886// ============================================================================
2887
2888/// Content extracted from a resource for full-text search.
2889struct SearchableContent {
2890    narrative: String,
2891    full_content: String,
2892}
2893
2894impl SearchableContent {
2895    fn is_empty(&self) -> bool {
2896        self.narrative.is_empty() && self.full_content.is_empty()
2897    }
2898}
2899
2900/// Extracts searchable text content from a FHIR resource.
2901fn extract_searchable_content(resource: &Value) -> SearchableContent {
2902    SearchableContent {
2903        narrative: extract_narrative(resource),
2904        full_content: extract_all_strings(resource),
2905    }
2906}
2907
2908/// Extracts narrative text from resource.text.div, stripping HTML tags.
2909fn extract_narrative(resource: &Value) -> String {
2910    resource
2911        .get("text")
2912        .and_then(|t| t.get("div"))
2913        .and_then(|d| d.as_str())
2914        .map(strip_html_tags)
2915        .unwrap_or_default()
2916}
2917
2918/// Strips HTML tags from a string, returning plain text.
2919fn strip_html_tags(html: &str) -> String {
2920    let mut result = String::with_capacity(html.len());
2921    let mut in_tag = false;
2922
2923    for c in html.chars() {
2924        match c {
2925            '<' => in_tag = true,
2926            '>' if in_tag => {
2927                in_tag = false;
2928                result.push(' ');
2929            }
2930            _ if !in_tag => result.push(c),
2931            _ => {}
2932        }
2933    }
2934
2935    result.split_whitespace().collect::<Vec<_>>().join(" ")
2936}
2937
2938/// Extracts all string values from a JSON value recursively.
2939fn extract_all_strings(value: &Value) -> String {
2940    let mut parts = Vec::new();
2941    collect_strings(value, &mut parts);
2942    parts.join(" ")
2943}
2944
2945fn collect_strings(value: &Value, parts: &mut Vec<String>) {
2946    match value {
2947        Value::String(s) => {
2948            if !s.is_empty() {
2949                parts.push(s.clone());
2950            }
2951        }
2952        Value::Object(map) => {
2953            for (key, val) in map {
2954                if key == "div" || key == "data" {
2955                    continue;
2956                }
2957                collect_strings(val, parts);
2958            }
2959        }
2960        Value::Array(arr) => {
2961            for val in arr {
2962                collect_strings(val, parts);
2963            }
2964        }
2965        _ => {}
2966    }
2967}