Skip to main content

helios_persistence/backends/postgres/
transaction.rs

1//! Transaction support for PostgreSQL backend.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use deadpool_postgres::Client;
8use helios_fhir::FhirVersion;
9use serde_json::Value;
10
11use crate::core::{Transaction, TransactionOptions, TransactionProvider};
12use crate::error::{
13    BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
14};
15use crate::search::SearchParameterExtractor;
16use crate::tenant::TenantContext;
17use crate::types::StoredResource;
18
19use super::PostgresBackend;
20use super::search::writer::PostgresSearchIndexWriter;
21
22fn internal_error(message: String) -> StorageError {
23    StorageError::Backend(BackendError::Internal {
24        backend_name: "postgres".to_string(),
25        message,
26        source: None,
27    })
28}
29
30#[allow(dead_code)]
31fn serialization_error(message: String) -> StorageError {
32    StorageError::Backend(BackendError::SerializationError { message })
33}
34
/// A PostgreSQL transaction.
///
/// Wraps a `deadpool_postgres` [`Client`] on which `BEGIN` has been issued.
/// Callers should finish the transaction with `commit` or `rollback`: the
/// `Drop` impl cannot run async SQL, so it never issues a `ROLLBACK`
/// statement itself.
pub struct PostgresTransaction {
    /// The client with the active transaction.
    /// Held in an `Option`; the `client()` accessor reports
    /// `InvalidTransaction` when it is absent.
    client: Option<Client>,
    /// Whether the transaction is still active (set to `false` once
    /// `commit` or `rollback` succeeds).
    active: bool,
    /// The tenant context for this transaction; its tenant id scopes every
    /// statement issued through this transaction.
    tenant: TenantContext,
    /// Search parameter extractor for indexing resources.
    search_extractor: Arc<SearchParameterExtractor>,
    /// When true, search indexing is offloaded to a secondary backend and
    /// `index_resource` returns without touching `search_index`.
    search_offloaded: bool,
}
52
53impl std::fmt::Debug for PostgresTransaction {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("PostgresTransaction")
56            .field("active", &self.active)
57            .field("tenant", &self.tenant)
58            .finish()
59    }
60}
61
62impl PostgresTransaction {
63    /// Create a new transaction.
64    async fn new(
65        client: Client,
66        tenant: TenantContext,
67        search_extractor: Arc<SearchParameterExtractor>,
68        search_offloaded: bool,
69    ) -> StorageResult<Self> {
70        // Start the transaction
71        client.execute("BEGIN", &[]).await.map_err(|e| {
72            StorageError::Transaction(TransactionError::RolledBack {
73                reason: format!("Failed to begin transaction: {}", e),
74            })
75        })?;
76
77        Ok(Self {
78            client: Some(client),
79            active: true,
80            tenant,
81            search_extractor,
82            search_offloaded,
83        })
84    }
85
86    fn client(&self) -> StorageResult<&Client> {
87        self.client
88            .as_ref()
89            .ok_or_else(|| StorageError::Transaction(TransactionError::InvalidTransaction))
90    }
91
92    /// Index a resource for search within the transaction.
93    async fn index_resource(
94        &self,
95        tenant_id: &str,
96        resource_type: &str,
97        resource_id: &str,
98        resource: &Value,
99    ) -> StorageResult<()> {
100        if self.search_offloaded {
101            return Ok(());
102        }
103
104        let client = self.client()?;
105
106        // Delete existing index entries
107        client
108            .execute(
109                "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
110                &[&tenant_id, &resource_type, &resource_id],
111            )
112            .await
113            .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
114
115        // Extract values using the registry-driven extractor
116        let values = self
117            .search_extractor
118            .extract(resource, resource_type)
119            .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
120
121        // Write each extracted value to the index
122        for value in values {
123            PostgresSearchIndexWriter::write_entry(
124                client,
125                tenant_id,
126                resource_type,
127                resource_id,
128                &value,
129            )
130            .await?;
131        }
132
133        tracing::debug!(
134            "Indexed resource {}/{} within transaction",
135            resource_type,
136            resource_id
137        );
138
139        Ok(())
140    }
141}
142
143#[async_trait]
144impl Transaction for PostgresTransaction {
145    async fn create(
146        &mut self,
147        resource_type: &str,
148        resource: Value,
149    ) -> StorageResult<StoredResource> {
150        if !self.active {
151            return Err(StorageError::Transaction(
152                TransactionError::InvalidTransaction,
153            ));
154        }
155
156        let client = self.client()?;
157        let tenant_id = self.tenant.tenant_id().as_str();
158
159        // Get or generate ID
160        let id = resource
161            .get("id")
162            .and_then(|v| v.as_str())
163            .map(|s| s.to_string())
164            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
165
166        // Check if resource already exists
167        let exists = client
168            .query_opt(
169                "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
170                &[&tenant_id, &resource_type, &id],
171            )
172            .await
173            .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
174
175        if exists.is_some() {
176            return Err(StorageError::Resource(ResourceError::AlreadyExists {
177                resource_type: resource_type.to_string(),
178                id: id.to_string(),
179            }));
180        }
181
182        // Build the resource with id and resourceType
183        let mut data = resource.clone();
184        if let Some(obj) = data.as_object_mut() {
185            obj.insert("id".to_string(), Value::String(id.clone()));
186            obj.insert(
187                "resourceType".to_string(),
188                Value::String(resource_type.to_string()),
189            );
190        }
191
192        let now = Utc::now();
193        let version_id = "1";
194        let fhir_version = FhirVersion::default();
195        let fhir_version_str = fhir_version.as_mime_param();
196        let is_deleted = false;
197
198        // Insert the resource
199        client
200            .execute(
201                "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
202                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
203                &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
204            )
205            .await
206            .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
207
208        // Insert into history
209        client
210            .execute(
211                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
212                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
213                &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
214            )
215            .await
216            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
217
218        // Index the resource for search
219        self.index_resource(tenant_id, resource_type, &id, &data)
220            .await?;
221
222        Ok(StoredResource::from_storage(
223            resource_type,
224            &id,
225            version_id,
226            self.tenant.tenant_id().clone(),
227            data,
228            now,
229            now,
230            None,
231            fhir_version,
232        ))
233    }
234
235    async fn read(
236        &mut self,
237        resource_type: &str,
238        id: &str,
239    ) -> StorageResult<Option<StoredResource>> {
240        if !self.active {
241            return Err(StorageError::Transaction(
242                TransactionError::InvalidTransaction,
243            ));
244        }
245
246        let client = self.client()?;
247        let tenant_id = self.tenant.tenant_id().as_str();
248
249        let row = client
250            .query_opt(
251                "SELECT version_id, data, last_updated, is_deleted, fhir_version
252                 FROM resources
253                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
254                &[&tenant_id, &resource_type, &id],
255            )
256            .await
257            .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
258
259        match row {
260            Some(row) => {
261                let version_id: String = row.get(0);
262                let data: serde_json::Value = row.get(1);
263                let last_updated: chrono::DateTime<Utc> = row.get(2);
264                let is_deleted: bool = row.get(3);
265                let fhir_version_str: String = row.get(4);
266
267                if is_deleted {
268                    return Ok(None);
269                }
270
271                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
272
273                Ok(Some(StoredResource::from_storage(
274                    resource_type,
275                    id,
276                    version_id,
277                    self.tenant.tenant_id().clone(),
278                    data,
279                    last_updated,
280                    last_updated,
281                    None,
282                    fhir_version,
283                )))
284            }
285            None => Ok(None),
286        }
287    }
288
289    async fn update(
290        &mut self,
291        current: &StoredResource,
292        resource: Value,
293    ) -> StorageResult<StoredResource> {
294        if !self.active {
295            return Err(StorageError::Transaction(
296                TransactionError::InvalidTransaction,
297            ));
298        }
299
300        let client = self.client()?;
301        let tenant_id = self.tenant.tenant_id().as_str();
302        let resource_type = current.resource_type();
303        let id = current.id();
304
305        // Verify current version still matches (optimistic locking)
306        let row = client
307            .query_opt(
308                "SELECT version_id FROM resources
309                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
310                &[&tenant_id, &resource_type, &id],
311            )
312            .await
313            .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
314
315        let db_version = match row {
316            Some(row) => row.get::<_, String>(0),
317            None => {
318                return Err(StorageError::Resource(ResourceError::NotFound {
319                    resource_type: resource_type.to_string(),
320                    id: id.to_string(),
321                }));
322            }
323        };
324
325        if db_version != current.version_id() {
326            return Err(StorageError::Concurrency(
327                ConcurrencyError::VersionConflict {
328                    resource_type: resource_type.to_string(),
329                    id: id.to_string(),
330                    expected_version: current.version_id().to_string(),
331                    actual_version: db_version,
332                },
333            ));
334        }
335
336        // Calculate new version
337        let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
338        let new_version_str = new_version.to_string();
339
340        // Build the resource with id and resourceType
341        let mut data = resource.clone();
342        if let Some(obj) = data.as_object_mut() {
343            obj.insert("id".to_string(), Value::String(id.to_string()));
344            obj.insert(
345                "resourceType".to_string(),
346                Value::String(resource_type.to_string()),
347            );
348        }
349
350        let now = Utc::now();
351        let fhir_version = current.fhir_version();
352        let fhir_version_str = fhir_version.as_mime_param();
353        let is_deleted = false;
354
355        // Update the resource
356        client
357            .execute(
358                "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
359                 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
360                &[
361                    &new_version_str,
362                    &data,
363                    &now,
364                    &tenant_id,
365                    &resource_type,
366                    &id,
367                ],
368            )
369            .await
370            .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
371
372        // Insert into history
373        client
374            .execute(
375                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
376                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
377                &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
378            )
379            .await
380            .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
381
382        // Re-index the resource for search
383        self.index_resource(tenant_id, resource_type, id, &data)
384            .await?;
385
386        Ok(StoredResource::from_storage(
387            resource_type,
388            id,
389            new_version_str,
390            self.tenant.tenant_id().clone(),
391            data,
392            now,
393            now,
394            None,
395            fhir_version,
396        ))
397    }
398
399    async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
400        if !self.active {
401            return Err(StorageError::Transaction(
402                TransactionError::InvalidTransaction,
403            ));
404        }
405
406        let client = self.client()?;
407        let tenant_id = self.tenant.tenant_id().as_str();
408
409        // Check if resource exists
410        let row = client
411            .query_opt(
412                "SELECT version_id, data, fhir_version FROM resources
413                 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
414                &[&tenant_id, &resource_type, &id],
415            )
416            .await
417            .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
418
419        let (current_version, data, fhir_version_str) = match row {
420            Some(row) => {
421                let v: String = row.get(0);
422                let d: serde_json::Value = row.get(1);
423                let f: String = row.get(2);
424                (v, d, f)
425            }
426            None => {
427                return Err(StorageError::Resource(ResourceError::NotFound {
428                    resource_type: resource_type.to_string(),
429                    id: id.to_string(),
430                }));
431            }
432        };
433
434        let now = Utc::now();
435        let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
436        let new_version_str = new_version.to_string();
437        let is_deleted = true;
438
439        // Soft delete the resource
440        client
441            .execute(
442                "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
443                 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
444                &[&now, &new_version_str, &tenant_id, &resource_type, &id],
445            )
446            .await
447            .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
448
449        // Insert deletion record into history
450        client
451            .execute(
452                "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
453                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
454                &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
455            )
456            .await
457            .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
458
459        Ok(())
460    }
461
462    async fn commit(mut self: Box<Self>) -> StorageResult<()> {
463        if !self.active {
464            return Err(StorageError::Transaction(
465                TransactionError::InvalidTransaction,
466            ));
467        }
468
469        if let Some(client) = self.client.as_ref() {
470            client.execute("COMMIT", &[]).await.map_err(|e| {
471                StorageError::Transaction(TransactionError::RolledBack {
472                    reason: format!("Commit failed: {}", e),
473                })
474            })?;
475        }
476
477        self.active = false;
478        Ok(())
479    }
480
481    async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
482        if !self.active {
483            return Err(StorageError::Transaction(
484                TransactionError::InvalidTransaction,
485            ));
486        }
487
488        if let Some(client) = self.client.as_ref() {
489            client.execute("ROLLBACK", &[]).await.map_err(|e| {
490                StorageError::Transaction(TransactionError::RolledBack {
491                    reason: format!("Rollback failed: {}", e),
492                })
493            })?;
494        }
495
496        self.active = false;
497        Ok(())
498    }
499
500    fn tenant(&self) -> &TenantContext {
501        &self.tenant
502    }
503
504    fn is_active(&self) -> bool {
505        self.active
506    }
507}
508
509impl Drop for PostgresTransaction {
510    fn drop(&mut self) {
511        // If transaction wasn't explicitly committed or rolled back, attempt rollback.
512        // Note: We can't do async in Drop, so we just log a warning.
513        // The connection will be returned to the pool and PostgreSQL will auto-rollback
514        // any uncommitted transaction when the connection is recycled.
515        if self.active {
516            tracing::warn!("PostgreSQL transaction dropped without explicit commit or rollback");
517        }
518    }
519}
520
521#[async_trait]
522impl TransactionProvider for PostgresBackend {
523    type Transaction = PostgresTransaction;
524
525    async fn begin_transaction(
526        &self,
527        tenant: &TenantContext,
528        _options: TransactionOptions,
529    ) -> StorageResult<Self::Transaction> {
530        let client = self.get_client().await?;
531        PostgresTransaction::new(
532            client,
533            tenant.clone(),
534            self.search_extractor().clone(),
535            self.is_search_offloaded(),
536        )
537        .await
538    }
539}