1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use helios_fhir::FhirVersion;
8use parking_lot::Mutex;
9use r2d2::PooledConnection;
10use r2d2_sqlite::SqliteConnectionManager;
11use rusqlite::params;
12use serde_json::Value;
13
14use crate::core::{Transaction, TransactionOptions, TransactionProvider};
15use crate::error::{
16 BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
17};
18use crate::search::SearchParameterExtractor;
19use crate::tenant::TenantContext;
20use crate::types::StoredResource;
21
22use super::SqliteBackend;
23
24fn internal_error(message: String) -> StorageError {
25 StorageError::Backend(BackendError::Internal {
26 backend_name: "sqlite".to_string(),
27 message,
28 source: None,
29 })
30}
31
32fn serialization_error(message: String) -> StorageError {
33 StorageError::Backend(BackendError::SerializationError { message })
34}
35
/// A transaction against the SQLite backend, bound to one pooled connection and one tenant.
///
/// The transaction is opened with `BEGIN IMMEDIATE` when the value is constructed and is
/// finished by `commit` or `rollback`. If the value is dropped while still active, a
/// `ROLLBACK` is issued on the connection.
pub struct SqliteTransaction {
    /// Pooled connection on which every statement of this transaction is executed.
    conn: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
    /// `true` from construction until a commit or rollback succeeds.
    active: bool,
    /// Tenant whose id scopes every statement issued by this transaction.
    tenant: TenantContext,
    /// Extracts search parameter values from resources when the search index is written.
    search_extractor: Arc<SearchParameterExtractor>,
    /// When `true`, `index_resource` returns immediately and `search_index` is not written.
    search_offloaded: bool,
}
49
50impl std::fmt::Debug for SqliteTransaction {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("SqliteTransaction")
53 .field("active", &self.active)
54 .field("tenant", &self.tenant)
55 .finish()
56 }
57}
58
59impl SqliteTransaction {
60 fn new(
62 conn: PooledConnection<SqliteConnectionManager>,
63 tenant: TenantContext,
64 search_extractor: Arc<SearchParameterExtractor>,
65 search_offloaded: bool,
66 ) -> StorageResult<Self> {
67 conn.execute("BEGIN IMMEDIATE", []).map_err(|e| {
69 StorageError::Transaction(TransactionError::RolledBack {
70 reason: format!("Failed to begin transaction: {}", e),
71 })
72 })?;
73
74 Ok(Self {
75 conn: Arc::new(Mutex::new(conn)),
76 active: true,
77 tenant,
78 search_extractor,
79 search_offloaded,
80 })
81 }
82
83 fn generate_id() -> String {
85 uuid::Uuid::new_v4().to_string()
86 }
87
88 fn index_resource(
90 &self,
91 conn: &rusqlite::Connection,
92 tenant_id: &str,
93 resource_type: &str,
94 resource_id: &str,
95 resource: &Value,
96 ) -> StorageResult<()> {
97 if self.search_offloaded {
99 return Ok(());
100 }
101
102 use super::search::writer::SqliteSearchIndexWriter;
103 use rusqlite::ToSql;
104
105 conn.execute(
107 "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
108 params![tenant_id, resource_type, resource_id],
109 )
110 .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
111
112 let values = self
114 .search_extractor
115 .extract(resource, resource_type)
116 .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
117
118 for value in values {
120 let sql_params = SqliteSearchIndexWriter::to_sql_params(
121 tenant_id,
122 resource_type,
123 resource_id,
124 &value,
125 );
126
127 let param_refs: Vec<&dyn ToSql> =
129 sql_params.iter().map(Self::sql_value_to_ref).collect();
130
131 conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
132 .map_err(|e| {
133 internal_error(format!("Failed to insert search index entry: {}", e))
134 })?;
135 }
136
137 tracing::debug!(
138 "Indexed resource {}/{} within transaction",
139 resource_type,
140 resource_id
141 );
142
143 Ok(())
144 }
145
146 fn sql_value_to_ref(value: &super::search::writer::SqlValue) -> &dyn rusqlite::ToSql {
148 use super::search::writer::SqlValue;
149 match value {
150 SqlValue::String(s) => s,
151 SqlValue::OptString(opt) => opt,
152 SqlValue::Int(i) => i,
153 SqlValue::OptInt(opt) => opt,
154 SqlValue::Float(f) => f,
155 SqlValue::Null => &rusqlite::types::Null,
156 }
157 }
158}
159
160#[async_trait]
161impl Transaction for SqliteTransaction {
162 async fn create(
163 &mut self,
164 resource_type: &str,
165 resource: Value,
166 ) -> StorageResult<StoredResource> {
167 if !self.active {
168 return Err(StorageError::Transaction(
169 TransactionError::InvalidTransaction,
170 ));
171 }
172
173 let conn = self.conn.lock();
174 let tenant_id = self.tenant.tenant_id().as_str();
175
176 let id = resource
178 .get("id")
179 .and_then(|v| v.as_str())
180 .map(|s| s.to_string())
181 .unwrap_or_else(Self::generate_id);
182
183 let exists: bool = conn
185 .query_row(
186 "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
187 params![tenant_id, resource_type, id],
188 |_| Ok(true),
189 )
190 .unwrap_or(false);
191
192 if exists {
193 return Err(StorageError::Resource(ResourceError::AlreadyExists {
194 resource_type: resource_type.to_string(),
195 id: id.to_string(),
196 }));
197 }
198
199 let mut data = resource.clone();
201 if let Some(obj) = data.as_object_mut() {
202 obj.insert("id".to_string(), Value::String(id.clone()));
203 obj.insert(
204 "resourceType".to_string(),
205 Value::String(resource_type.to_string()),
206 );
207 }
208
209 let data_bytes = serde_json::to_vec(&data)
211 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
212
213 let now = Utc::now();
214 let last_updated = now.to_rfc3339();
215 let version_id = "1";
216
217 let fhir_version = FhirVersion::default();
219 let fhir_version_str = fhir_version.as_mime_param();
220
221 conn.execute(
223 "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
224 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
225 params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
226 )
227 .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
228
229 conn.execute(
231 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
232 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
233 params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
234 )
235 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
236
237 self.index_resource(&conn, tenant_id, resource_type, &id, &data)?;
239
240 Ok(StoredResource::from_storage(
241 resource_type,
242 &id,
243 version_id,
244 self.tenant.tenant_id().clone(),
245 data,
246 now,
247 now,
248 None,
249 fhir_version,
250 ))
251 }
252
253 async fn read(
254 &mut self,
255 resource_type: &str,
256 id: &str,
257 ) -> StorageResult<Option<StoredResource>> {
258 if !self.active {
259 return Err(StorageError::Transaction(
260 TransactionError::InvalidTransaction,
261 ));
262 }
263
264 let conn = self.conn.lock();
265 let tenant_id = self.tenant.tenant_id().as_str();
266
267 let result = conn.query_row(
268 "SELECT version_id, data, last_updated, is_deleted, fhir_version
269 FROM resources
270 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
271 params![tenant_id, resource_type, id],
272 |row| {
273 let version_id: String = row.get(0)?;
274 let data: Vec<u8> = row.get(1)?;
275 let last_updated: String = row.get(2)?;
276 let is_deleted: i32 = row.get(3)?;
277 let fhir_version: String = row.get(4)?;
278 Ok((version_id, data, last_updated, is_deleted, fhir_version))
279 },
280 );
281
282 match result {
283 Ok((version_id, data, last_updated, is_deleted, fhir_version_str)) => {
284 if is_deleted != 0 {
285 return Ok(None);
286 }
287
288 let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
289 serialization_error(format!("Failed to deserialize resource: {}", e))
290 })?;
291
292 let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
293 .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
294 .with_timezone(&Utc);
295
296 let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
297
298 Ok(Some(StoredResource::from_storage(
299 resource_type,
300 id,
301 version_id,
302 self.tenant.tenant_id().clone(),
303 json_data,
304 last_updated,
305 last_updated,
306 None,
307 fhir_version,
308 )))
309 }
310 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
311 Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
312 }
313 }
314
315 async fn update(
316 &mut self,
317 current: &StoredResource,
318 resource: Value,
319 ) -> StorageResult<StoredResource> {
320 if !self.active {
321 return Err(StorageError::Transaction(
322 TransactionError::InvalidTransaction,
323 ));
324 }
325
326 let conn = self.conn.lock();
327 let tenant_id = self.tenant.tenant_id().as_str();
328 let resource_type = current.resource_type();
329 let id = current.id();
330
331 let db_version: Result<String, _> = conn.query_row(
333 "SELECT version_id FROM resources
334 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
335 params![tenant_id, resource_type, id],
336 |row| row.get(0),
337 );
338
339 let db_version = match db_version {
340 Ok(v) => v,
341 Err(rusqlite::Error::QueryReturnedNoRows) => {
342 return Err(StorageError::Resource(ResourceError::NotFound {
343 resource_type: resource_type.to_string(),
344 id: id.to_string(),
345 }));
346 }
347 Err(e) => {
348 return Err(internal_error(format!(
349 "Failed to get current version: {}",
350 e
351 )));
352 }
353 };
354
355 if db_version != current.version_id() {
356 return Err(StorageError::Concurrency(
357 ConcurrencyError::VersionConflict {
358 resource_type: resource_type.to_string(),
359 id: id.to_string(),
360 expected_version: current.version_id().to_string(),
361 actual_version: db_version,
362 },
363 ));
364 }
365
366 let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
368 let new_version_str = new_version.to_string();
369
370 let mut data = resource.clone();
372 if let Some(obj) = data.as_object_mut() {
373 obj.insert("id".to_string(), Value::String(id.to_string()));
374 obj.insert(
375 "resourceType".to_string(),
376 Value::String(resource_type.to_string()),
377 );
378 }
379
380 let data_bytes = serde_json::to_vec(&data)
382 .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
383
384 let now = Utc::now();
385 let last_updated = now.to_rfc3339();
386
387 let fhir_version = current.fhir_version();
389 let fhir_version_str = fhir_version.as_mime_param();
390
391 conn.execute(
393 "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
394 WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
395 params![
396 new_version_str,
397 data_bytes,
398 last_updated,
399 tenant_id,
400 resource_type,
401 id
402 ],
403 )
404 .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
405
406 conn.execute(
408 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
409 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
410 params![tenant_id, resource_type, id, new_version_str, data_bytes, last_updated, fhir_version_str],
411 )
412 .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
413
414 self.index_resource(&conn, tenant_id, resource_type, id, &data)?;
416
417 Ok(StoredResource::from_storage(
418 resource_type,
419 id,
420 new_version_str,
421 self.tenant.tenant_id().clone(),
422 data,
423 now,
424 now,
425 None,
426 fhir_version,
427 ))
428 }
429
430 async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
431 if !self.active {
432 return Err(StorageError::Transaction(
433 TransactionError::InvalidTransaction,
434 ));
435 }
436
437 let conn = self.conn.lock();
438 let tenant_id = self.tenant.tenant_id().as_str();
439
440 let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
442 "SELECT version_id, data, fhir_version FROM resources
443 WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
444 params![tenant_id, resource_type, id],
445 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
446 );
447
448 let (current_version, data, fhir_version_str) = match result {
449 Ok(v) => v,
450 Err(rusqlite::Error::QueryReturnedNoRows) => {
451 return Err(StorageError::Resource(ResourceError::NotFound {
452 resource_type: resource_type.to_string(),
453 id: id.to_string(),
454 }));
455 }
456 Err(e) => {
457 return Err(internal_error(format!("Failed to check resource: {}", e)));
458 }
459 };
460
461 let now = Utc::now();
462 let deleted_at = now.to_rfc3339();
463 let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
464 let new_version_str = new_version.to_string();
465
466 conn.execute(
468 "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
469 WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
470 params![deleted_at, new_version_str, tenant_id, resource_type, id],
471 )
472 .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
473
474 conn.execute(
476 "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
477 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
478 params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
479 )
480 .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
481
482 Ok(())
483 }
484
485 async fn commit(mut self: Box<Self>) -> StorageResult<()> {
486 if !self.active {
487 return Err(StorageError::Transaction(
488 TransactionError::InvalidTransaction,
489 ));
490 }
491
492 let conn = self.conn.lock();
493 conn.execute("COMMIT", []).map_err(|e| {
494 StorageError::Transaction(TransactionError::RolledBack {
495 reason: format!("Commit failed: {}", e),
496 })
497 })?;
498
499 self.active = false;
500 Ok(())
501 }
502
503 async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
504 if !self.active {
505 return Err(StorageError::Transaction(
506 TransactionError::InvalidTransaction,
507 ));
508 }
509
510 let conn = self.conn.lock();
511 conn.execute("ROLLBACK", []).map_err(|e| {
512 StorageError::Transaction(TransactionError::RolledBack {
513 reason: format!("Rollback failed: {}", e),
514 })
515 })?;
516
517 self.active = false;
518 Ok(())
519 }
520
521 fn tenant(&self) -> &TenantContext {
522 &self.tenant
523 }
524
525 fn is_active(&self) -> bool {
526 self.active
527 }
528}
529
530impl Drop for SqliteTransaction {
531 fn drop(&mut self) {
532 if self.active {
534 let conn = self.conn.lock();
535 let _ = conn.execute("ROLLBACK", []);
536 }
537 }
538}
539
540#[async_trait]
541impl TransactionProvider for SqliteBackend {
542 type Transaction = SqliteTransaction;
543
544 async fn begin_transaction(
545 &self,
546 tenant: &TenantContext,
547 _options: TransactionOptions,
548 ) -> StorageResult<Self::Transaction> {
549 let conn = self.get_connection()?;
550 SqliteTransaction::new(
551 conn,
552 tenant.clone(),
553 self.search_extractor().clone(),
554 self.is_search_offloaded(),
555 )
556 }
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use serde_json::json;

    /// In-memory backend with the schema already created.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Tenant context with full access, used by every test.
    fn create_test_tenant() -> TenantContext {
        let tenant_id = TenantId::new("test-tenant");
        TenantContext::new(tenant_id, TenantPermissions::full_access())
    }

    /// Backend and tenant pair shared by all tests.
    fn fixture() -> (SqliteBackend, TenantContext) {
        (create_test_backend(), create_test_tenant())
    }

    /// Opens a transaction with default options, panicking on failure.
    async fn begin(backend: &SqliteBackend, tenant: &TenantContext) -> SqliteTransaction {
        backend
            .begin_transaction(tenant, TransactionOptions::default())
            .await
            .unwrap()
    }

    /// Minimal Patient resource carrying only the given id.
    fn patient_with_id(id: &str) -> Value {
        json!({
            "resourceType": "Patient",
            "id": id
        })
    }

    #[tokio::test]
    async fn test_transaction_commit() {
        let (backend, tenant) = fixture();
        let mut tx = begin(&backend, &tenant).await;

        let resource = json!({
            "resourceType": "Patient",
            "id": "patient-1",
            "name": [{"family": "Test"}]
        });
        tx.create("Patient", resource).await.unwrap();

        Box::new(tx).commit().await.unwrap();

        // Committed data must be visible outside the transaction.
        let stored = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(stored.is_some());
    }

    #[tokio::test]
    async fn test_transaction_rollback() {
        let (backend, tenant) = fixture();
        let mut tx = begin(&backend, &tenant).await;

        tx.create("Patient", patient_with_id("patient-1"))
            .await
            .unwrap();

        Box::new(tx).rollback().await.unwrap();

        // Rolled-back data must not be visible outside the transaction.
        let stored = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(stored.is_none());
    }

    #[tokio::test]
    async fn test_transaction_read_own_writes() {
        let (backend, tenant) = fixture();
        let mut tx = begin(&backend, &tenant).await;

        tx.create("Patient", patient_with_id("patient-1"))
            .await
            .unwrap();

        // The uncommitted write is visible from inside the same transaction.
        let seen = tx.read("Patient", "patient-1").await.unwrap();
        assert!(seen.is_some());

        Box::new(tx).rollback().await.unwrap();
    }

    #[tokio::test]
    async fn test_transaction_update() {
        let (backend, tenant) = fixture();

        let original = json!({
            "resourceType": "Patient",
            "name": [{"family": "Original"}]
        });
        let created = backend
            .create(&tenant, "Patient", original, FhirVersion::default())
            .await
            .unwrap();

        let mut tx = begin(&backend, &tenant).await;

        let replacement = json!({
            "resourceType": "Patient",
            "name": [{"family": "Updated"}]
        });
        let updated = tx.update(&created, replacement).await.unwrap();
        assert_eq!(updated.version_id(), "2");

        Box::new(tx).commit().await.unwrap();

        let stored = backend
            .read(&tenant, "Patient", created.id())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored.version_id(), "2");
    }

    #[tokio::test]
    async fn test_transaction_delete() {
        let (backend, tenant) = fixture();

        backend
            .create(
                &tenant,
                "Patient",
                patient_with_id("patient-1"),
                FhirVersion::default(),
            )
            .await
            .unwrap();

        let mut tx = begin(&backend, &tenant).await;
        tx.delete("Patient", "patient-1").await.unwrap();
        Box::new(tx).commit().await.unwrap();

        // The backend reports a deleted resource as `Gone`.
        let outcome = backend.read(&tenant, "Patient", "patient-1").await;
        assert!(matches!(
            outcome,
            Err(StorageError::Resource(ResourceError::Gone { .. }))
        ));
    }

    #[tokio::test]
    async fn test_transaction_auto_rollback_on_drop() {
        let (backend, tenant) = fixture();

        {
            let mut tx = begin(&backend, &tenant).await;
            tx.create("Patient", patient_with_id("patient-1"))
                .await
                .unwrap();
            // `tx` goes out of scope here without commit or rollback.
        }

        let stored = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(stored.is_none());
    }

    #[tokio::test]
    async fn test_transaction_is_active() {
        let (backend, tenant) = fixture();
        let tx = begin(&backend, &tenant).await;

        assert!(tx.is_active());

        Box::new(tx).commit().await.unwrap();
    }
}