1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use helios_fhir::FhirVersion;
8use parking_lot::Mutex;
9use r2d2::PooledConnection;
10use r2d2_sqlite::SqliteConnectionManager;
11use rusqlite::params;
12use serde_json::Value;
13
14use crate::core::{Transaction, TransactionOptions, TransactionProvider};
15use crate::error::{
16 BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
17};
18use crate::search::SearchParameterExtractor;
19use crate::tenant::TenantContext;
20use crate::types::StoredResource;
21
22use super::SqliteBackend;
23
24fn internal_error(message: String) -> StorageError {
25 StorageError::Backend(BackendError::Internal {
26 backend_name: "sqlite".to_string(),
27 message,
28 source: None,
29 })
30}
31
32fn serialization_error(message: String) -> StorageError {
33 StorageError::Backend(BackendError::SerializationError { message })
34}
35
36pub struct SqliteTransaction {
38 conn: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
40 active: bool,
42 tenant: TenantContext,
44 search_extractor: Arc<SearchParameterExtractor>,
46 search_offloaded: bool,
48}
49
50impl std::fmt::Debug for SqliteTransaction {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("SqliteTransaction")
53 .field("active", &self.active)
54 .field("tenant", &self.tenant)
55 .finish()
56 }
57}
58
59impl SqliteTransaction {
60 fn new(
62 conn: PooledConnection<SqliteConnectionManager>,
63 tenant: TenantContext,
64 search_extractor: Arc<SearchParameterExtractor>,
65 search_offloaded: bool,
66 ) -> StorageResult<Self> {
67 conn.execute("BEGIN IMMEDIATE", []).map_err(|e| {
69 StorageError::Transaction(TransactionError::RolledBack {
70 reason: format!("Failed to begin transaction: {}", e),
71 })
72 })?;
73
74 Ok(Self {
75 conn: Arc::new(Mutex::new(conn)),
76 active: true,
77 tenant,
78 search_extractor,
79 search_offloaded,
80 })
81 }
82
83 fn generate_id() -> String {
85 uuid::Uuid::new_v4().to_string()
86 }
87
88 fn index_resource(
90 &self,
91 conn: &rusqlite::Connection,
92 tenant_id: &str,
93 resource_type: &str,
94 resource_id: &str,
95 resource: &Value,
96 ) -> StorageResult<()> {
97 if self.search_offloaded {
99 return Ok(());
100 }
101
102 use super::search::writer::SqliteSearchIndexWriter;
103 use rusqlite::ToSql;
104
105 conn.execute(
107 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
108 params![tenant_id, resource_type, resource_id],
109 )
110 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
111
112 let values = self
114 .search_extractor
115 .extract(resource, resource_type)
116 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
117
118 for value in values {
120 let sql_params = SqliteSearchIndexWriter::to_sql_params(
121 tenant_id,
122 resource_type,
123 resource_id,
124 &value,
125 );
126
127 let param_refs: Vec<&dyn ToSql> =
129 sql_params.iter().map(Self::sql_value_to_ref).collect();
130
131 conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
132 .map_err(|e| {
133 internal_error(format!("Failed to insert search index entry: {}", e))
134 })?;
135 }
136
137 tracing::debug!(
138 "Indexed resource {}/{} within transaction",
139 resource_type,
140 resource_id
141 );
142
143 Ok(())
144 }
145
146 fn sql_value_to_ref(value: &super::search::writer::SqlValue) -> &dyn rusqlite::ToSql {
148 use super::search::writer::SqlValue;
149 match value {
150 SqlValue::String(s) => s,
151 SqlValue::OptString(opt) => opt,
152 SqlValue::Int(i) => i,
153 SqlValue::OptInt(opt) => opt,
154 SqlValue::Float(f) => f,
155 SqlValue::Null => &rusqlite::types::Null,
156 }
157 }
158}
159
160#[async_trait]
161impl Transaction for SqliteTransaction {
162 async fn create(
163 &mut self,
164 resource_type: &str,
165 resource: Value,
166 ) -> StorageResult<StoredResource> {
167 if !self.active {
168 return Err(StorageError::Transaction(
169 TransactionError::InvalidTransaction,
170 ));
171 }
172
173 let conn = self.conn.lock();
174 let tenant_id = self.tenant.tenant_id().as_str();
175
176 let id = resource
178 .get("id")
179 .and_then(|v| v.as_str())
180 .map(|s| s.to_string())
181 .unwrap_or_else(Self::generate_id);
182
183 let exists: bool = conn
185 .query_row(
186 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
187 params![tenant_id, resource_type, id],
188 |_| Ok(true),
189 )
190 .unwrap_or(false);
191
192 if exists {
193 return Err(StorageError::Resource(ResourceError::AlreadyExists {
194 resource_type: resource_type.to_string(),
195 id: id.to_string(),
196 }));
197 }
198
199 let mut data = resource.clone();
201 if let Some(obj) = data.as_object_mut() {
202 obj.insert("id".to_string(), Value::String(id.clone()));
203 obj.insert(
204 "resourceType".to_string(),
205 Value::String(resource_type.to_string()),
206 );
207 }
208
209 let data_bytes = serde_json::to_vec(&data)
211 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
212
213 let now = Utc::now();
214 let last_updated = now.to_rfc3339();
215 let version_id = "1";
216
217 let fhir_version = FhirVersion::default_enabled();
219 let fhir_version_str = fhir_version.as_mime_param();
220
221 conn.execute(
223 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
224 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
225 params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
226 )
227 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
228
229 conn.execute(
231 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
232 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
233 params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
234 )
235 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
236
237 self.index_resource(&conn, tenant_id, resource_type, &id, &data)?;
239
240 Ok(StoredResource::from_storage(
241 resource_type,
242 &id,
243 version_id,
244 self.tenant.tenant_id().clone(),
245 data,
246 now,
247 now,
248 None,
249 fhir_version,
250 ))
251 }
252
253 async fn read(
254 &mut self,
255 resource_type: &str,
256 id: &str,
257 ) -> StorageResult<Option<StoredResource>> {
258 if !self.active {
259 return Err(StorageError::Transaction(
260 TransactionError::InvalidTransaction,
261 ));
262 }
263
264 let conn = self.conn.lock();
265 let tenant_id = self.tenant.tenant_id().as_str();
266
267 let result = conn.query_row(
268 "SELECT version_id, data, last_updated, is_deleted, fhir_version
269 FROM resources
270 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
271 params![tenant_id, resource_type, id],
272 |row| {
273 let version_id: String = row.get(0)?;
274 let data: Vec<u8> = row.get(1)?;
275 let last_updated: String = row.get(2)?;
276 let is_deleted: i32 = row.get(3)?;
277 let fhir_version: String = row.get(4)?;
278 Ok((version_id, data, last_updated, is_deleted, fhir_version))
279 },
280 );
281
282 match result {
283 Ok((version_id, data, last_updated, is_deleted, fhir_version_str)) => {
284 if is_deleted != 0 {
285 return Ok(None);
286 }
287
288 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
289 serialization_error(format!("Failed to deserialize resource: {}", e))
290 })?;
291
292 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
293 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
294 .with_timezone(&Utc);
295
296 let fhir_version = FhirVersion::from_storage(&fhir_version_str)
297 .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
298
299 Ok(Some(StoredResource::from_storage(
300 resource_type,
301 id,
302 version_id,
303 self.tenant.tenant_id().clone(),
304 json_data,
305 last_updated,
306 last_updated,
307 None,
308 fhir_version,
309 )))
310 }
311 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
312 Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
313 }
314 }
315
316 async fn update(
317 &mut self,
318 current: &StoredResource,
319 resource: Value,
320 ) -> StorageResult<StoredResource> {
321 if !self.active {
322 return Err(StorageError::Transaction(
323 TransactionError::InvalidTransaction,
324 ));
325 }
326
327 let conn = self.conn.lock();
328 let tenant_id = self.tenant.tenant_id().as_str();
329 let resource_type = current.resource_type();
330 let id = current.id();
331
332 let db_version: Result<String, _> = conn.query_row(
334 "SELECT version_id FROM resources
335 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
336 params![tenant_id, resource_type, id],
337 |row| row.get(0),
338 );
339
340 let db_version = match db_version {
341 Ok(v) => v,
342 Err(rusqlite::Error::QueryReturnedNoRows) => {
343 return Err(StorageError::Resource(ResourceError::NotFound {
344 resource_type: resource_type.to_string(),
345 id: id.to_string(),
346 }));
347 }
348 Err(e) => {
349 return Err(internal_error(format!(
350 "Failed to get current version: {}",
351 e
352 )));
353 }
354 };
355
356 if db_version != current.version_id() {
357 return Err(StorageError::Concurrency(
358 ConcurrencyError::VersionConflict {
359 resource_type: resource_type.to_string(),
360 id: id.to_string(),
361 expected_version: current.version_id().to_string(),
362 actual_version: db_version,
363 },
364 ));
365 }
366
367 let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
369 let new_version_str = new_version.to_string();
370
371 let mut data = resource.clone();
373 if let Some(obj) = data.as_object_mut() {
374 obj.insert("id".to_string(), Value::String(id.to_string()));
375 obj.insert(
376 "resourceType".to_string(),
377 Value::String(resource_type.to_string()),
378 );
379 }
380
381 let data_bytes = serde_json::to_vec(&data)
383 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
384
385 let now = Utc::now();
386 let last_updated = now.to_rfc3339();
387
388 let fhir_version = current.fhir_version();
390 let fhir_version_str = fhir_version.as_mime_param();
391
392 conn.execute(
394 "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
395 WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
396 params![
397 new_version_str,
398 data_bytes,
399 last_updated,
400 tenant_id,
401 resource_type,
402 id
403 ],
404 )
405 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
406
407 conn.execute(
409 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
410 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
411 params![tenant_id, resource_type, id, new_version_str, data_bytes, last_updated, fhir_version_str],
412 )
413 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
414
415 self.index_resource(&conn, tenant_id, resource_type, id, &data)?;
417
418 Ok(StoredResource::from_storage(
419 resource_type,
420 id,
421 new_version_str,
422 self.tenant.tenant_id().clone(),
423 data,
424 now,
425 now,
426 None,
427 fhir_version,
428 ))
429 }
430
431 async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
432 if !self.active {
433 return Err(StorageError::Transaction(
434 TransactionError::InvalidTransaction,
435 ));
436 }
437
438 let conn = self.conn.lock();
439 let tenant_id = self.tenant.tenant_id().as_str();
440
441 let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
443 "SELECT version_id, data, fhir_version FROM resources
444 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
445 params![tenant_id, resource_type, id],
446 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
447 );
448
449 let (current_version, data, fhir_version_str) = match result {
450 Ok(v) => v,
451 Err(rusqlite::Error::QueryReturnedNoRows) => {
452 return Err(StorageError::Resource(ResourceError::NotFound {
453 resource_type: resource_type.to_string(),
454 id: id.to_string(),
455 }));
456 }
457 Err(e) => {
458 return Err(internal_error(format!("Failed to check resource: {}", e)));
459 }
460 };
461
462 let now = Utc::now();
463 let deleted_at = now.to_rfc3339();
464 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
465 let new_version_str = new_version.to_string();
466
467 conn.execute(
469 "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
470 WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
471 params![deleted_at, new_version_str, tenant_id, resource_type, id],
472 )
473 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
474
475 conn.execute(
477 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
478 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
479 params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
480 )
481 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
482
483 Ok(())
484 }
485
486 async fn commit(mut self: Box<Self>) -> StorageResult<()> {
487 if !self.active {
488 return Err(StorageError::Transaction(
489 TransactionError::InvalidTransaction,
490 ));
491 }
492
493 let conn = self.conn.lock();
494 conn.execute("COMMIT", []).map_err(|e| {
495 StorageError::Transaction(TransactionError::RolledBack {
496 reason: format!("Commit failed: {}", e),
497 })
498 })?;
499
500 self.active = false;
501 Ok(())
502 }
503
504 async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
505 if !self.active {
506 return Err(StorageError::Transaction(
507 TransactionError::InvalidTransaction,
508 ));
509 }
510
511 let conn = self.conn.lock();
512 conn.execute("ROLLBACK", []).map_err(|e| {
513 StorageError::Transaction(TransactionError::RolledBack {
514 reason: format!("Rollback failed: {}", e),
515 })
516 })?;
517
518 self.active = false;
519 Ok(())
520 }
521
522 fn tenant(&self) -> &TenantContext {
523 &self.tenant
524 }
525
526 fn is_active(&self) -> bool {
527 self.active
528 }
529}
530
531impl Drop for SqliteTransaction {
532 fn drop(&mut self) {
533 if self.active {
535 let conn = self.conn.lock();
536 let _ = conn.execute("ROLLBACK", []);
537 }
538 }
539}
540
541#[async_trait]
542impl TransactionProvider for SqliteBackend {
543 type Transaction = SqliteTransaction;
544
545 async fn begin_transaction(
546 &self,
547 tenant: &TenantContext,
548 _options: TransactionOptions,
549 ) -> StorageResult<Self::Transaction> {
550 let conn = self.get_connection()?;
551 SqliteTransaction::new(
552 conn,
553 tenant.clone(),
554 self.search_extractor().clone(),
555 self.is_search_offloaded(),
556 )
557 }
558}
559
560#[cfg(test)]
561mod tests {
562 use super::*;
563 use crate::core::ResourceStorage;
564 use crate::tenant::{TenantId, TenantPermissions};
565 use serde_json::json;
566
567 fn create_test_backend() -> SqliteBackend {
568 let backend = SqliteBackend::in_memory().unwrap();
569 backend.init_schema().unwrap();
570 backend
571 }
572
573 fn create_test_tenant() -> TenantContext {
574 TenantContext::new(
575 TenantId::new("test-tenant"),
576 TenantPermissions::full_access(),
577 )
578 }
579
580 #[tokio::test]
581 async fn test_transaction_commit() {
582 let backend = create_test_backend();
583 let tenant = create_test_tenant();
584
585 let mut tx = backend
587 .begin_transaction(&tenant, TransactionOptions::default())
588 .await
589 .unwrap();
590
591 let resource = json!({
593 "resourceType": "Patient",
594 "id": "patient-1",
595 "name": [{"family": "Test"}]
596 });
597 tx.create("Patient", resource).await.unwrap();
598
599 Box::new(tx).commit().await.unwrap();
601
602 let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
604 assert!(result.is_some());
605 }
606
607 #[tokio::test]
608 async fn test_transaction_rollback() {
609 let backend = create_test_backend();
610 let tenant = create_test_tenant();
611
612 let mut tx = backend
614 .begin_transaction(&tenant, TransactionOptions::default())
615 .await
616 .unwrap();
617
618 let resource = json!({
620 "resourceType": "Patient",
621 "id": "patient-1"
622 });
623 tx.create("Patient", resource).await.unwrap();
624
625 Box::new(tx).rollback().await.unwrap();
627
628 let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
630 assert!(result.is_none());
631 }
632
633 #[tokio::test]
634 async fn test_transaction_read_own_writes() {
635 let backend = create_test_backend();
636 let tenant = create_test_tenant();
637
638 let mut tx = backend
640 .begin_transaction(&tenant, TransactionOptions::default())
641 .await
642 .unwrap();
643
644 let resource = json!({
646 "resourceType": "Patient",
647 "id": "patient-1"
648 });
649 tx.create("Patient", resource).await.unwrap();
650
651 let read = tx.read("Patient", "patient-1").await.unwrap();
653 assert!(read.is_some());
654
655 Box::new(tx).rollback().await.unwrap();
656 }
657
658 #[tokio::test]
659 async fn test_transaction_update() {
660 let backend = create_test_backend();
661 let tenant = create_test_tenant();
662
663 let resource = json!({
665 "resourceType": "Patient",
666 "name": [{"family": "Original"}]
667 });
668 let created = backend
669 .create(&tenant, "Patient", resource, FhirVersion::default())
670 .await
671 .unwrap();
672
673 let mut tx = backend
675 .begin_transaction(&tenant, TransactionOptions::default())
676 .await
677 .unwrap();
678
679 let updated_data = json!({
680 "resourceType": "Patient",
681 "name": [{"family": "Updated"}]
682 });
683 let result = tx.update(&created, updated_data).await.unwrap();
684 assert_eq!(result.version_id(), "2");
685
686 Box::new(tx).commit().await.unwrap();
687
688 let read = backend
690 .read(&tenant, "Patient", created.id())
691 .await
692 .unwrap()
693 .unwrap();
694 assert_eq!(read.version_id(), "2");
695 }
696
697 #[tokio::test]
698 async fn test_transaction_delete() {
699 let backend = create_test_backend();
700 let tenant = create_test_tenant();
701
702 let resource = json!({
704 "resourceType": "Patient",
705 "id": "patient-1"
706 });
707 backend
708 .create(&tenant, "Patient", resource, FhirVersion::default())
709 .await
710 .unwrap();
711
712 let mut tx = backend
714 .begin_transaction(&tenant, TransactionOptions::default())
715 .await
716 .unwrap();
717
718 tx.delete("Patient", "patient-1").await.unwrap();
719 Box::new(tx).commit().await.unwrap();
720
721 let result = backend.read(&tenant, "Patient", "patient-1").await;
723 assert!(matches!(
724 result,
725 Err(StorageError::Resource(ResourceError::Gone { .. }))
726 ));
727 }
728
729 #[tokio::test]
730 async fn test_transaction_auto_rollback_on_drop() {
731 let backend = create_test_backend();
732 let tenant = create_test_tenant();
733
734 {
735 let mut tx = backend
737 .begin_transaction(&tenant, TransactionOptions::default())
738 .await
739 .unwrap();
740
741 let resource = json!({
743 "resourceType": "Patient",
744 "id": "patient-1"
745 });
746 tx.create("Patient", resource).await.unwrap();
747
748 }
750
751 let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
753 assert!(result.is_none());
754 }
755
756 #[tokio::test]
757 async fn test_transaction_is_active() {
758 let backend = create_test_backend();
759 let tenant = create_test_tenant();
760
761 let tx = backend
762 .begin_transaction(&tenant, TransactionOptions::default())
763 .await
764 .unwrap();
765
766 assert!(tx.is_active());
767
768 Box::new(tx).commit().await.unwrap();
769 }
771}