1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use deadpool_postgres::Client;
8use helios_fhir::FhirVersion;
9use serde_json::Value;
10
11use crate::core::{Transaction, TransactionOptions, TransactionProvider};
12use crate::error::{
13 BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
14};
15use crate::search::SearchParameterExtractor;
16use crate::tenant::TenantContext;
17use crate::types::StoredResource;
18
19use super::PostgresBackend;
20use super::search::writer::PostgresSearchIndexWriter;
21
22fn internal_error(message: String) -> StorageError {
23 StorageError::Backend(BackendError::Internal {
24 backend_name: "postgres".to_string(),
25 message,
26 source: None,
27 })
28}
29
30#[allow(dead_code)]
31fn serialization_error(message: String) -> StorageError {
32 StorageError::Backend(BackendError::SerializationError { message })
33}
34
35pub struct PostgresTransaction {
40 client: Option<Client>,
43 active: bool,
45 tenant: TenantContext,
47 search_extractor: Arc<SearchParameterExtractor>,
49 search_offloaded: bool,
51}
52
53impl std::fmt::Debug for PostgresTransaction {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("PostgresTransaction")
56 .field("active", &self.active)
57 .field("tenant", &self.tenant)
58 .finish()
59 }
60}
61
62impl PostgresTransaction {
63 async fn new(
65 client: Client,
66 tenant: TenantContext,
67 search_extractor: Arc<SearchParameterExtractor>,
68 search_offloaded: bool,
69 ) -> StorageResult<Self> {
70 client.execute("BEGIN", &[]).await.map_err(|e| {
72 StorageError::Transaction(TransactionError::RolledBack {
73 reason: format!("Failed to begin transaction: {}", e),
74 })
75 })?;
76
77 Ok(Self {
78 client: Some(client),
79 active: true,
80 tenant,
81 search_extractor,
82 search_offloaded,
83 })
84 }
85
86 fn client(&self) -> StorageResult<&Client> {
87 self.client
88 .as_ref()
89 .ok_or_else(|| StorageError::Transaction(TransactionError::InvalidTransaction))
90 }
91
92 async fn index_resource(
94 &self,
95 tenant_id: &str,
96 resource_type: &str,
97 resource_id: &str,
98 resource: &Value,
99 ) -> StorageResult<()> {
100 if self.search_offloaded {
101 return Ok(());
102 }
103
104 let client = self.client()?;
105
106 client
108 .execute(
109 "DELETE FROM search_index WHERE tenant_id = $1 AND resource_type = $2 AND resource_id = $3",
110 &[&tenant_id, &resource_type, &resource_id],
111 )
112 .await
113 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
114
115 let values = self
117 .search_extractor
118 .extract(resource, resource_type)
119 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
120
121 for value in values {
123 PostgresSearchIndexWriter::write_entry(
124 client,
125 tenant_id,
126 resource_type,
127 resource_id,
128 &value,
129 )
130 .await?;
131 }
132
133 tracing::debug!(
134 "Indexed resource {}/{} within transaction",
135 resource_type,
136 resource_id
137 );
138
139 Ok(())
140 }
141}
142
143#[async_trait]
144impl Transaction for PostgresTransaction {
145 async fn create(
146 &mut self,
147 resource_type: &str,
148 resource: Value,
149 ) -> StorageResult<StoredResource> {
150 if !self.active {
151 return Err(StorageError::Transaction(
152 TransactionError::InvalidTransaction,
153 ));
154 }
155
156 let client = self.client()?;
157 let tenant_id = self.tenant.tenant_id().as_str();
158
159 let id = resource
161 .get("id")
162 .and_then(|v| v.as_str())
163 .map(|s| s.to_string())
164 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
165
166 let exists = client
168 .query_opt(
169 "SELECT 1 FROM resources WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
170 &[&tenant_id, &resource_type, &id],
171 )
172 .await
173 .map_err(|e| internal_error(format!("Failed to check existence: {}", e)))?;
174
175 if exists.is_some() {
176 return Err(StorageError::Resource(ResourceError::AlreadyExists {
177 resource_type: resource_type.to_string(),
178 id: id.to_string(),
179 }));
180 }
181
182 let mut data = resource.clone();
184 if let Some(obj) = data.as_object_mut() {
185 obj.insert("id".to_string(), Value::String(id.clone()));
186 obj.insert(
187 "resourceType".to_string(),
188 Value::String(resource_type.to_string()),
189 );
190 }
191
192 let now = Utc::now();
193 let version_id = "1";
194 let fhir_version = FhirVersion::default_enabled();
195 let fhir_version_str = fhir_version.as_mime_param();
196 let is_deleted = false;
197
198 client
200 .execute(
201 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
202 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
203 &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
204 )
205 .await
206 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
207
208 client
210 .execute(
211 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
212 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
213 &[&tenant_id, &resource_type, &id, &version_id, &data, &now, &is_deleted, &fhir_version_str],
214 )
215 .await
216 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
217
218 self.index_resource(tenant_id, resource_type, &id, &data)
220 .await?;
221
222 Ok(StoredResource::from_storage(
223 resource_type,
224 &id,
225 version_id,
226 self.tenant.tenant_id().clone(),
227 data,
228 now,
229 now,
230 None,
231 fhir_version,
232 ))
233 }
234
235 async fn read(
236 &mut self,
237 resource_type: &str,
238 id: &str,
239 ) -> StorageResult<Option<StoredResource>> {
240 if !self.active {
241 return Err(StorageError::Transaction(
242 TransactionError::InvalidTransaction,
243 ));
244 }
245
246 let client = self.client()?;
247 let tenant_id = self.tenant.tenant_id().as_str();
248
249 let row = client
250 .query_opt(
251 "SELECT version_id, data, last_updated, is_deleted, fhir_version
252 FROM resources
253 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3",
254 &[&tenant_id, &resource_type, &id],
255 )
256 .await
257 .map_err(|e| internal_error(format!("Failed to read resource: {}", e)))?;
258
259 match row {
260 Some(row) => {
261 let version_id: String = row.get(0);
262 let data: serde_json::Value = row.get(1);
263 let last_updated: chrono::DateTime<Utc> = row.get(2);
264 let is_deleted: bool = row.get(3);
265 let fhir_version_str: String = row.get(4);
266
267 if is_deleted {
268 return Ok(None);
269 }
270
271 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
272 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
273
274 Ok(Some(StoredResource::from_storage(
275 resource_type,
276 id,
277 version_id,
278 self.tenant.tenant_id().clone(),
279 data,
280 last_updated,
281 last_updated,
282 None,
283 fhir_version,
284 )))
285 }
286 None => Ok(None),
287 }
288 }
289
290 async fn update(
291 &mut self,
292 current: &StoredResource,
293 resource: Value,
294 ) -> StorageResult<StoredResource> {
295 if !self.active {
296 return Err(StorageError::Transaction(
297 TransactionError::InvalidTransaction,
298 ));
299 }
300
301 let client = self.client()?;
302 let tenant_id = self.tenant.tenant_id().as_str();
303 let resource_type = current.resource_type();
304 let id = current.id();
305
306 let row = client
308 .query_opt(
309 "SELECT version_id FROM resources
310 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
311 &[&tenant_id, &resource_type, &id],
312 )
313 .await
314 .map_err(|e| internal_error(format!("Failed to get current version: {}", e)))?;
315
316 let db_version = match row {
317 Some(row) => row.get::<_, String>(0),
318 None => {
319 return Err(StorageError::Resource(ResourceError::NotFound {
320 resource_type: resource_type.to_string(),
321 id: id.to_string(),
322 }));
323 }
324 };
325
326 if db_version != current.version_id() {
327 return Err(StorageError::Concurrency(
328 ConcurrencyError::VersionConflict {
329 resource_type: resource_type.to_string(),
330 id: id.to_string(),
331 expected_version: current.version_id().to_string(),
332 actual_version: db_version,
333 },
334 ));
335 }
336
337 let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
339 let new_version_str = new_version.to_string();
340
341 let mut data = resource.clone();
343 if let Some(obj) = data.as_object_mut() {
344 obj.insert("id".to_string(), Value::String(id.to_string()));
345 obj.insert(
346 "resourceType".to_string(),
347 Value::String(resource_type.to_string()),
348 );
349 }
350
351 let now = Utc::now();
352 let fhir_version = current.fhir_version();
353 let fhir_version_str = fhir_version.as_mime_param();
354 let is_deleted = false;
355
356 client
358 .execute(
359 "UPDATE resources SET version_id = $1, data = $2, last_updated = $3
360 WHERE tenant_id = $4 AND resource_type = $5 AND id = $6",
361 &[
362 &new_version_str,
363 &data,
364 &now,
365 &tenant_id,
366 &resource_type,
367 &id,
368 ],
369 )
370 .await
371 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
372
373 client
375 .execute(
376 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
377 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
378 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
379 )
380 .await
381 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
382
383 self.index_resource(tenant_id, resource_type, id, &data)
385 .await?;
386
387 Ok(StoredResource::from_storage(
388 resource_type,
389 id,
390 new_version_str,
391 self.tenant.tenant_id().clone(),
392 data,
393 now,
394 now,
395 None,
396 fhir_version,
397 ))
398 }
399
400 async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
401 if !self.active {
402 return Err(StorageError::Transaction(
403 TransactionError::InvalidTransaction,
404 ));
405 }
406
407 let client = self.client()?;
408 let tenant_id = self.tenant.tenant_id().as_str();
409
410 let row = client
412 .query_opt(
413 "SELECT version_id, data, fhir_version FROM resources
414 WHERE tenant_id = $1 AND resource_type = $2 AND id = $3 AND is_deleted = FALSE",
415 &[&tenant_id, &resource_type, &id],
416 )
417 .await
418 .map_err(|e| internal_error(format!("Failed to check resource: {}", e)))?;
419
420 let (current_version, data, fhir_version_str) = match row {
421 Some(row) => {
422 let v: String = row.get(0);
423 let d: serde_json::Value = row.get(1);
424 let f: String = row.get(2);
425 (v, d, f)
426 }
427 None => {
428 return Err(StorageError::Resource(ResourceError::NotFound {
429 resource_type: resource_type.to_string(),
430 id: id.to_string(),
431 }));
432 }
433 };
434
435 let now = Utc::now();
436 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
437 let new_version_str = new_version.to_string();
438 let is_deleted = true;
439
440 client
442 .execute(
443 "UPDATE resources SET is_deleted = TRUE, deleted_at = $1, version_id = $2, last_updated = $1
444 WHERE tenant_id = $3 AND resource_type = $4 AND id = $5",
445 &[&now, &new_version_str, &tenant_id, &resource_type, &id],
446 )
447 .await
448 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
449
450 client
452 .execute(
453 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
454 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
455 &[&tenant_id, &resource_type, &id, &new_version_str, &data, &now, &is_deleted, &fhir_version_str],
456 )
457 .await
458 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
459
460 Ok(())
461 }
462
463 async fn commit(mut self: Box<Self>) -> StorageResult<()> {
464 if !self.active {
465 return Err(StorageError::Transaction(
466 TransactionError::InvalidTransaction,
467 ));
468 }
469
470 if let Some(client) = self.client.as_ref() {
471 client.execute("COMMIT", &[]).await.map_err(|e| {
472 StorageError::Transaction(TransactionError::RolledBack {
473 reason: format!("Commit failed: {}", e),
474 })
475 })?;
476 }
477
478 self.active = false;
479 Ok(())
480 }
481
482 async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
483 if !self.active {
484 return Err(StorageError::Transaction(
485 TransactionError::InvalidTransaction,
486 ));
487 }
488
489 if let Some(client) = self.client.as_ref() {
490 client.execute("ROLLBACK", &[]).await.map_err(|e| {
491 StorageError::Transaction(TransactionError::RolledBack {
492 reason: format!("Rollback failed: {}", e),
493 })
494 })?;
495 }
496
497 self.active = false;
498 Ok(())
499 }
500
501 fn tenant(&self) -> &TenantContext {
502 &self.tenant
503 }
504
505 fn is_active(&self) -> bool {
506 self.active
507 }
508}
509
510impl Drop for PostgresTransaction {
511 fn drop(&mut self) {
512 if self.active {
517 tracing::warn!("PostgreSQL transaction dropped without explicit commit or rollback");
518 }
519 }
520}
521
522#[async_trait]
523impl TransactionProvider for PostgresBackend {
524 type Transaction = PostgresTransaction;
525
526 async fn begin_transaction(
527 &self,
528 tenant: &TenantContext,
529 _options: TransactionOptions,
530 ) -> StorageResult<Self::Transaction> {
531 let client = self.get_client().await?;
532 PostgresTransaction::new(
533 client,
534 tenant.clone(),
535 self.search_extractor().clone(),
536 self.is_search_offloaded(),
537 )
538 .await
539 }
540}