1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use deadpool_postgres::Client;
8use helios_fhir::FhirVersion;
9use serde_json::Value;
10
11use crate::core::{Transaction, TransactionOptions, TransactionProvider};
12use crate::error::{
13 BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
14};
15use crate::search::SearchParameterExtractor;
16use crate::tenant::TenantContext;
17use crate::types::StoredResource;
18
19use super::PostgresBackend;
20use super::search::writer::PostgresSearchIndexWriter;
21
22fn internal_error(message: String) -> StorageError {
23 StorageError::Backend(BackendError::Internal {
24 backend_name: "postgres".to_string(),
25 message,
26 source: None,
27 })
28}
29
30#[allow(dead_code)]
31fn serialization_error(message: String) -> StorageError {
32 StorageError::Backend(BackendError::SerializationError { message })
33}
34
/// State for one transaction opened with `BEGIN` on a pooled PostgreSQL connection.
pub struct PostgresTransaction {
    /// Pooled connection that every statement of this transaction runs on.
    client: Option<Client>,
    /// `true` from construction until `commit` or `rollback` succeeds.
    active: bool,
    /// Tenant whose id is bound into the statements issued by this transaction.
    tenant: TenantContext,
    /// Produces the search values that `index_resource` writes to `search_index`.
    search_extractor: Arc<SearchParameterExtractor>,
    /// When `true`, `index_resource` returns without touching `search_index`.
    search_offloaded: bool,
}
52
53impl std::fmt::Debug for PostgresTransaction {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("PostgresTransaction")
56 .field("active", &self.active)
57 .field("tenant", &self.tenant)
58 .finish()
59 }
60}
61
62impl PostgresTransaction {
63 async fn new(
65 client: Client,
66 tenant: TenantContext,
67 search_extractor: Arc<SearchParameterExtractor>,
68 search_offloaded: bool,
69 ) -> StorageResult<Self> {
70 client.execute("BEGIN", &[]).await.map_err(|e| {
72 StorageError::Transaction(TransactionError::RolledBack {
73 reason: format!("Failed to begin transaction: {}", e),
74 })
75 })?;
76
77 Ok(Self {
78 client: Some(client),
79 active: true,
80 tenant,
81 search_extractor,
82 search_offloaded,
83 })
84 }
85
86 fn client(&self) -> StorageResult<&Client> {
87 self.client
88 .as_ref()
89 .ok_or_else(|| StorageError::Transaction(TransactionError::InvalidTransaction))
90 }
91
92 async fn index_resource(
94 &self,
95 tenant_id: &str,
96 resource_type: &str,
97 resource_id: &str,
98 resource: &Value,
99 ) -> StorageResult<()> {
100 if self.search_offloaded {
101 return Ok(());
102 }
103
104 let client = self.client()?;
105
106 client
108 .execute(
109 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
110 &[&tenant_id, &resource_type, &resource_id],
111 )
112 .await
113 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
114
115 let values = self
117 .search_extractor
118 .extract(resource, resource_type)
119 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
120
121 for value in values {
123 PostgresSearchIndexWriter::write_entry(
124 client,
125 tenant_id,
126 resource_type,
127 resource_id,
128 &value,
129 )
130 .await?;
131 }
132
133 tracing::debug!(
134 "Indexed resource {}/{} within transaction",
135 resource_type,
136 resource_id
137 );
138
139 Ok(())
140 }
141}
142
143#[async_trait]
144impl Transaction for PostgresTransaction {
145 async fn create(
146 &mut self,
147 resource_type: &str,
148 resource: Value,
149 ) -> StorageResult<StoredResource> {
150 if !self.active {
151 return Err(StorageError::Transaction(
152 TransactionError::InvalidTransaction,
153 ));
154 }
155
156 let client = self.client()?;
157 let tenant_id = self.tenant.tenant_id().as_str();
158
159 let id = resource
161 .get("id")
162 .and_then(|v| v.as_str())
163 .map(|s| s.to_string())
164 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
165
166 let exists = client
168 .query_opt(
169 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
170 &[&tenant_id, &resource_type, &id],
171 )
172 .await
173 .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
174
175 if exists.is_some() {
176 return Err(StorageError::Resource(ResourceError::AlreadyExists {
177 resource_type: resource_type.to_string(),
178 id: id.to_string(),
179 }));
180 }
181
182 let mut data = resource.clone();
184 if let Some(obj) = data.as_object_mut() {
185 obj.insert("id".to_string(), Value::String(id.clone()));
186 obj.insert(
187 "resourceType".to_string(),
188 Value::String(resource_type.to_string()),
189 );
190 }
191
192 let now = Utc::now();
193 let version_id = "1";
194 let fhir_version = FhirVersion::default();
195 let fhir_version_str = fhir_version.as_mime_param();
196 let is_deleted = false;
197
198 client
200 .execute(
201 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
202 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
203 &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
204 )
205 .await
206 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
207
208 client
210 .execute(
211 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
212 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
213 &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
214 )
215 .await
216 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
217
218 self.index_resource(tenant_id, resource_type, &id, &data)
220 .await?;
221
222 Ok(StoredResource::from_storage(
223 resource_type,
224 &id,
225 version_id,
226 self.tenant.tenant_id().clone(),
227 data,
228 now,
229 now,
230 None,
231 fhir_version,
232 ))
233 }
234
235 async fn read(
236 &mut self,
237 resource_type: &str,
238 id: &str,
239 ) -> StorageResult<Option<StoredResource>> {
240 if !self.active {
241 return Err(StorageError::Transaction(
242 TransactionError::InvalidTransaction,
243 ));
244 }
245
246 let client = self.client()?;
247 let tenant_id = self.tenant.tenant_id().as_str();
248
249 let row = client
250 .query_opt(
251 "SELECT version_id, data, last_updated, is_deleted, fhir_version
252 FROM resources
253 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
254 &[&tenant_id, &resource_type, &id],
255 )
256 .await
257 .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
258
259 match row {
260 Some(row) => {
261 let version_id: String = row.get(0);
262 let data: serde_json::Value = row.get(1);
263 let last_updated: chrono::DateTime<Utc> = row.get(2);
264 let is_deleted: bool = row.get(3);
265 let fhir_version_str: String = row.get(4);
266
267 if is_deleted {
268 return Ok(None);
269 }
270
271 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
272
273 Ok(Some(StoredResource::from_storage(
274 resource_type,
275 id,
276 version_id,
277 self.tenant.tenant_id().clone(),
278 data,
279 last_updated,
280 last_updated,
281 None,
282 fhir_version,
283 )))
284 }
285 None => Ok(None),
286 }
287 }
288
289 async fn update(
290 &mut self,
291 current: &StoredResource,
292 resource: Value,
293 ) -> StorageResult<StoredResource> {
294 if !self.active {
295 return Err(StorageError::Transaction(
296 TransactionError::InvalidTransaction,
297 ));
298 }
299
300 let client = self.client()?;
301 let tenant_id = self.tenant.tenant_id().as_str();
302 let resource_type = current.resource_type();
303 let id = current.id();
304
305 let row = client
307 .query_opt(
308 "SELECT version_id FROM resources
309 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
310 &[&tenant_id, &resource_type, &id],
311 )
312 .await
313 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
314
315 let db_version = match row {
316 Some(row) => row.get::<_, String>(0),
317 None => {
318 return Err(StorageError::Resource(ResourceError::NotFound {
319 resource_type: resource_type.to_string(),
320 id: id.to_string(),
321 }));
322 }
323 };
324
325 if db_version != current.version_id() {
326 return Err(StorageError::Concurrency(
327 ConcurrencyError::VersionConflict {
328 resource_type: resource_type.to_string(),
329 id: id.to_string(),
330 expected_version: current.version_id().to_string(),
331 actual_version: db_version,
332 },
333 ));
334 }
335
336 let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
338 let new_version_str = new_version.to_string();
339
340 let mut data = resource.clone();
342 if let Some(obj) = data.as_object_mut() {
343 obj.insert("id".to_string(), Value::String(id.to_string()));
344 obj.insert(
345 "resourceType".to_string(),
346 Value::String(resource_type.to_string()),
347 );
348 }
349
350 let now = Utc::now();
351 let fhir_version = current.fhir_version();
352 let fhir_version_str = fhir_version.as_mime_param();
353 let is_deleted = false;
354
355 client
357 .execute(
358 "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
359 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
360 &[
361 &new_version_str,
362 &data,
363 &now,
364 &tenant_id,
365 &resource_type,
366 &id,
367 ],
368 )
369 .await
370 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
371
372 client
374 .execute(
375 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
376 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
377 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
378 )
379 .await
380 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
381
382 self.index_resource(tenant_id, resource_type, id, &data)
384 .await?;
385
386 Ok(StoredResource::from_storage(
387 resource_type,
388 id,
389 new_version_str,
390 self.tenant.tenant_id().clone(),
391 data,
392 now,
393 now,
394 None,
395 fhir_version,
396 ))
397 }
398
399 async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
400 if !self.active {
401 return Err(StorageError::Transaction(
402 TransactionError::InvalidTransaction,
403 ));
404 }
405
406 let client = self.client()?;
407 let tenant_id = self.tenant.tenant_id().as_str();
408
409 let row = client
411 .query_opt(
412 "SELECT version_id, data, fhir_version FROM resources
413 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
414 &[&tenant_id, &resource_type, &id],
415 )
416 .await
417 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
418
419 let (current_version, data, fhir_version_str) = match row {
420 Some(row) => {
421 let v: String = row.get(0);
422 let d: serde_json::Value = row.get(1);
423 let f: String = row.get(2);
424 (v, d, f)
425 }
426 None => {
427 return Err(StorageError::Resource(ResourceError::NotFound {
428 resource_type: resource_type.to_string(),
429 id: id.to_string(),
430 }));
431 }
432 };
433
434 let now = Utc::now();
435 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
436 let new_version_str = new_version.to_string();
437 let is_deleted = true;
438
439 client
441 .execute(
442 "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
443 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
444 &[&now, &new_version_str, &tenant_id, &resource_type, &id],
445 )
446 .await
447 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
448
449 client
451 .execute(
452 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
453 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
454 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
455 )
456 .await
457 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
458
459 Ok(())
460 }
461
462 async fn commit(mut self: Box<Self>) -> StorageResult<()> {
463 if !self.active {
464 return Err(StorageError::Transaction(
465 TransactionError::InvalidTransaction,
466 ));
467 }
468
469 if let Some(client) = self.client.as_ref() {
470 client.execute("COMMIT", &[]).await.map_err(|e| {
471 StorageError::Transaction(TransactionError::RolledBack {
472 reason: format!("Commit failed: {}", e),
473 })
474 })?;
475 }
476
477 self.active = false;
478 Ok(())
479 }
480
481 async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
482 if !self.active {
483 return Err(StorageError::Transaction(
484 TransactionError::InvalidTransaction,
485 ));
486 }
487
488 if let Some(client) = self.client.as_ref() {
489 client.execute("ROLLBACK", &[]).await.map_err(|e| {
490 StorageError::Transaction(TransactionError::RolledBack {
491 reason: format!("Rollback failed: {}", e),
492 })
493 })?;
494 }
495
496 self.active = false;
497 Ok(())
498 }
499
500 fn tenant(&self) -> &TenantContext {
501 &self.tenant
502 }
503
504 fn is_active(&self) -> bool {
505 self.active
506 }
507}
508
509impl Drop for PostgresTransaction {
510 fn drop(&mut self) {
511 if self.active {
516 tracing::warn!("PostgreSQL transaction dropped without explicit commit or rollback");
517 }
518 }
519}
520
521#[async_trait]
522impl TransactionProvider for PostgresBackend {
523 type Transaction = PostgresTransaction;
524
525 async fn begin_transaction(
526 &self,
527 tenant: &TenantContext,
528 _options: TransactionOptions,
529 ) -> StorageResult<Self::Transaction> {
530 let client = self.get_client().await?;
531 PostgresTransaction::new(
532 client,
533 tenant.clone(),
534 self.search_extractor().clone(),
535 self.is_search_offloaded(),
536 )
537 .await
538 }
539}