Skip to main content

helios_persistence/backends/sqlite/
transaction.rs

1//! Transaction support for SQLite backend.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use helios_fhir::FhirVersion;
8use parking_lot::Mutex;
9use r2d2::PooledConnection;
10use r2d2_sqlite::SqliteConnectionManager;
11use rusqlite::params;
12use serde_json::Value;
13
14use crate::core::{Transaction, TransactionOptions, TransactionProvider};
15use crate::error::{
16    BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
17};
18use crate::search::SearchParameterExtractor;
19use crate::tenant::TenantContext;
20use crate::types::StoredResource;
21
22use super::SqliteBackend;
23
24fn internal_error(message: String) -> StorageError {
25    StorageError::Backend(BackendError::Internal {
26        backend_name: "sqlite".to_string(),
27        message,
28        source: None,
29    })
30}
31
32fn serialization_error(message: String) -> StorageError {
33    StorageError::Backend(BackendError::SerializationError { message })
34}
35
36/// A SQLite transaction.
37pub struct SqliteTransaction {
38    /// The connection used for this transaction.
39    conn: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
40    /// Whether the transaction is still active.
41    active: bool,
42    /// The tenant context for this transaction.
43    tenant: TenantContext,
44    /// Search parameter extractor for indexing resources.
45    search_extractor: Arc<SearchParameterExtractor>,
46    /// When true, search indexing is offloaded to a secondary backend.
47    search_offloaded: bool,
48}
49
50impl std::fmt::Debug for SqliteTransaction {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("SqliteTransaction")
53            .field("active", &self.active)
54            .field("tenant", &self.tenant)
55            .finish()
56    }
57}
58
59impl SqliteTransaction {
60    /// Create a new transaction.
61    fn new(
62        conn: PooledConnection<SqliteConnectionManager>,
63        tenant: TenantContext,
64        search_extractor: Arc<SearchParameterExtractor>,
65        search_offloaded: bool,
66    ) -> StorageResult<Self> {
67        // Start the transaction
68        conn.execute("BEGIN IMMEDIATE", []).map_err(|e| {
69            StorageError::Transaction(TransactionError::RolledBack {
70                reason: format!("Failed to begin transaction: {}", e),
71            })
72        })?;
73
74        Ok(Self {
75            conn: Arc::new(Mutex::new(conn)),
76            active: true,
77            tenant,
78            search_extractor,
79            search_offloaded,
80        })
81    }
82
83    /// Generate a new ID for a resource.
84    fn generate_id() -> String {
85        uuid::Uuid::new_v4().to_string()
86    }
87
88    /// Index a resource for search.
89    fn index_resource(
90        &self,
91        conn: &rusqlite::Connection,
92        tenant_id: &str,
93        resource_type: &str,
94        resource_id: &str,
95        resource: &Value,
96    ) -> StorageResult<()> {
97        // When search is offloaded to a secondary backend, skip local indexing
98        if self.search_offloaded {
99            return Ok(());
100        }
101
102        use super::search::writer::SqliteSearchIndexWriter;
103        use rusqlite::ToSql;
104
105        // First, delete any existing index entries for this resource
106        conn.execute(
107            "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
108            params![tenant_id, resource_type, resource_id],
109        )
110        .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
111
112        // Extract values using the registry-driven extractor
113        let values = self
114            .search_extractor
115            .extract(resource, resource_type)
116            .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
117
118        // Write each extracted value to the index
119        for value in values {
120            let sql_params = SqliteSearchIndexWriter::to_sql_params(
121                tenant_id,
122                resource_type,
123                resource_id,
124                &value,
125            );
126
127            // Build parameter refs for rusqlite
128            let param_refs: Vec<&dyn ToSql> =
129                sql_params.iter().map(Self::sql_value_to_ref).collect();
130
131            conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
132                .map_err(|e| {
133                    internal_error(format!("Failed to insert search index entry: {}", e))
134                })?;
135        }
136
137        tracing::debug!(
138            "Indexed resource {}/{} within transaction",
139            resource_type,
140            resource_id
141        );
142
143        Ok(())
144    }
145
146    /// Converts a SqlValue to a rusqlite-compatible reference.
147    fn sql_value_to_ref(value: &super::search::writer::SqlValue) -> &dyn rusqlite::ToSql {
148        use super::search::writer::SqlValue;
149        match value {
150            SqlValue::String(s) => s,
151            SqlValue::OptString(opt) => opt,
152            SqlValue::Int(i) => i,
153            SqlValue::OptInt(opt) => opt,
154            SqlValue::Float(f) => f,
155            SqlValue::Null => &rusqlite::types::Null,
156        }
157    }
158}
159
160#[async_trait]
161impl Transaction for SqliteTransaction {
162    async fn create(
163        &mut self,
164        resource_type: &str,
165        resource: Value,
166    ) -> StorageResult<StoredResource> {
167        if !self.active {
168            return Err(StorageError::Transaction(
169                TransactionError::InvalidTransaction,
170            ));
171        }
172
173        let conn = self.conn.lock();
174        let tenant_id = self.tenant.tenant_id().as_str();
175
176        // Get or generate ID
177        let id = resource
178            .get("id")
179            .and_then(|v| v.as_str())
180            .map(|s| s.to_string())
181            .unwrap_or_else(Self::generate_id);
182
183        // Check if resource already exists
184        let exists: bool = conn
185            .query_row(
186                "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
187                params![tenant_id, resource_type, id],
188                |_| Ok(true),
189            )
190            .unwrap_or(false);
191
192        if exists {
193            return Err(StorageError::Resource(ResourceError::AlreadyExists {
194                resource_type: resource_type.to_string(),
195                id: id.to_string(),
196            }));
197        }
198
199        // Build the resource with id and resourceType
200        let mut data = resource.clone();
201        if let Some(obj) = data.as_object_mut() {
202            obj.insert("id".to_string(), Value::String(id.clone()));
203            obj.insert(
204                "resourceType".to_string(),
205                Value::String(resource_type.to_string()),
206            );
207        }
208
209        // Serialize the resource data
210        let data_bytes = serde_json::to_vec(&data)
211            .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
212
213        let now = Utc::now();
214        let last_updated = now.to_rfc3339();
215        let version_id = "1";
216
217        // Use default FHIR version for transaction operations
218        let fhir_version = FhirVersion::default_enabled();
219        let fhir_version_str = fhir_version.as_mime_param();
220
221        // Insert the resource
222        conn.execute(
223            "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
224             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
225            params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
226        )
227        .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
228
229        // Insert into history
230        conn.execute(
231            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
232             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
233            params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
234        )
235        .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
236
237        // Index the resource for search
238        self.index_resource(&conn, tenant_id, resource_type, &id, &data)?;
239
240        Ok(StoredResource::from_storage(
241            resource_type,
242            &id,
243            version_id,
244            self.tenant.tenant_id().clone(),
245            data,
246            now,
247            now,
248            None,
249            fhir_version,
250        ))
251    }
252
253    async fn read(
254        &mut self,
255        resource_type: &str,
256        id: &str,
257    ) -> StorageResult<Option<StoredResource>> {
258        if !self.active {
259            return Err(StorageError::Transaction(
260                TransactionError::InvalidTransaction,
261            ));
262        }
263
264        let conn = self.conn.lock();
265        let tenant_id = self.tenant.tenant_id().as_str();
266
267        let result = conn.query_row(
268            "SELECT version_id, data, last_updated, is_deleted, fhir_version
269             FROM resources
270             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
271            params![tenant_id, resource_type, id],
272            |row| {
273                let version_id: String = row.get(0)?;
274                let data: Vec<u8> = row.get(1)?;
275                let last_updated: String = row.get(2)?;
276                let is_deleted: i32 = row.get(3)?;
277                let fhir_version: String = row.get(4)?;
278                Ok((version_id, data, last_updated, is_deleted, fhir_version))
279            },
280        );
281
282        match result {
283            Ok((version_id, data, last_updated, is_deleted, fhir_version_str)) => {
284                if is_deleted != 0 {
285                    return Ok(None);
286                }
287
288                let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
289                    serialization_error(format!("Failed to deserialize resource: {}", e))
290                })?;
291
292                let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
293                    .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
294                    .with_timezone(&Utc);
295
296                let fhir_version = FhirVersion::from_storage(&fhir_version_str)
297                    .unwrap_or_else(helios_fhir::FhirVersion::default_enabled);
298
299                Ok(Some(StoredResource::from_storage(
300                    resource_type,
301                    id,
302                    version_id,
303                    self.tenant.tenant_id().clone(),
304                    json_data,
305                    last_updated,
306                    last_updated,
307                    None,
308                    fhir_version,
309                )))
310            }
311            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
312            Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
313        }
314    }
315
316    async fn update(
317        &mut self,
318        current: &StoredResource,
319        resource: Value,
320    ) -> StorageResult<StoredResource> {
321        if !self.active {
322            return Err(StorageError::Transaction(
323                TransactionError::InvalidTransaction,
324            ));
325        }
326
327        let conn = self.conn.lock();
328        let tenant_id = self.tenant.tenant_id().as_str();
329        let resource_type = current.resource_type();
330        let id = current.id();
331
332        // Verify current version still matches (optimistic locking)
333        let db_version: Result<String, _> = conn.query_row(
334            "SELECT version_id FROM resources
335             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
336            params![tenant_id, resource_type, id],
337            |row| row.get(0),
338        );
339
340        let db_version = match db_version {
341            Ok(v) => v,
342            Err(rusqlite::Error::QueryReturnedNoRows) => {
343                return Err(StorageError::Resource(ResourceError::NotFound {
344                    resource_type: resource_type.to_string(),
345                    id: id.to_string(),
346                }));
347            }
348            Err(e) => {
349                return Err(internal_error(format!(
350                    "Failed to get current version: {}",
351                    e
352                )));
353            }
354        };
355
356        if db_version != current.version_id() {
357            return Err(StorageError::Concurrency(
358                ConcurrencyError::VersionConflict {
359                    resource_type: resource_type.to_string(),
360                    id: id.to_string(),
361                    expected_version: current.version_id().to_string(),
362                    actual_version: db_version,
363                },
364            ));
365        }
366
367        // Calculate new version
368        let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
369        let new_version_str = new_version.to_string();
370
371        // Build the resource with id and resourceType
372        let mut data = resource.clone();
373        if let Some(obj) = data.as_object_mut() {
374            obj.insert("id".to_string(), Value::String(id.to_string()));
375            obj.insert(
376                "resourceType".to_string(),
377                Value::String(resource_type.to_string()),
378            );
379        }
380
381        // Serialize the resource data
382        let data_bytes = serde_json::to_vec(&data)
383            .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
384
385        let now = Utc::now();
386        let last_updated = now.to_rfc3339();
387
388        // Preserve FHIR version from the current resource
389        let fhir_version = current.fhir_version();
390        let fhir_version_str = fhir_version.as_mime_param();
391
392        // Update the resource
393        conn.execute(
394            "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
395             WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
396            params![
397                new_version_str,
398                data_bytes,
399                last_updated,
400                tenant_id,
401                resource_type,
402                id
403            ],
404        )
405        .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
406
407        // Insert into history
408        conn.execute(
409            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
410             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
411            params![tenant_id, resource_type, id, new_version_str, data_bytes, last_updated, fhir_version_str],
412        )
413        .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
414
415        // Re-index the resource for search
416        self.index_resource(&conn, tenant_id, resource_type, id, &data)?;
417
418        Ok(StoredResource::from_storage(
419            resource_type,
420            id,
421            new_version_str,
422            self.tenant.tenant_id().clone(),
423            data,
424            now,
425            now,
426            None,
427            fhir_version,
428        ))
429    }
430
431    async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
432        if !self.active {
433            return Err(StorageError::Transaction(
434                TransactionError::InvalidTransaction,
435            ));
436        }
437
438        let conn = self.conn.lock();
439        let tenant_id = self.tenant.tenant_id().as_str();
440
441        // Check if resource exists
442        let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
443            "SELECT version_id, data, fhir_version FROM resources
444             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
445            params![tenant_id, resource_type, id],
446            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
447        );
448
449        let (current_version, data, fhir_version_str) = match result {
450            Ok(v) => v,
451            Err(rusqlite::Error::QueryReturnedNoRows) => {
452                return Err(StorageError::Resource(ResourceError::NotFound {
453                    resource_type: resource_type.to_string(),
454                    id: id.to_string(),
455                }));
456            }
457            Err(e) => {
458                return Err(internal_error(format!("Failed to check resource: {}", e)));
459            }
460        };
461
462        let now = Utc::now();
463        let deleted_at = now.to_rfc3339();
464        let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
465        let new_version_str = new_version.to_string();
466
467        // Soft delete the resource
468        conn.execute(
469            "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
470             WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
471            params![deleted_at, new_version_str, tenant_id, resource_type, id],
472        )
473        .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
474
475        // Insert deletion record into history
476        conn.execute(
477            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
478             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
479            params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
480        )
481        .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
482
483        Ok(())
484    }
485
486    async fn commit(mut self: Box<Self>) -> StorageResult<()> {
487        if !self.active {
488            return Err(StorageError::Transaction(
489                TransactionError::InvalidTransaction,
490            ));
491        }
492
493        let conn = self.conn.lock();
494        conn.execute("COMMIT", []).map_err(|e| {
495            StorageError::Transaction(TransactionError::RolledBack {
496                reason: format!("Commit failed: {}", e),
497            })
498        })?;
499
500        self.active = false;
501        Ok(())
502    }
503
504    async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
505        if !self.active {
506            return Err(StorageError::Transaction(
507                TransactionError::InvalidTransaction,
508            ));
509        }
510
511        let conn = self.conn.lock();
512        conn.execute("ROLLBACK", []).map_err(|e| {
513            StorageError::Transaction(TransactionError::RolledBack {
514                reason: format!("Rollback failed: {}", e),
515            })
516        })?;
517
518        self.active = false;
519        Ok(())
520    }
521
522    fn tenant(&self) -> &TenantContext {
523        &self.tenant
524    }
525
526    fn is_active(&self) -> bool {
527        self.active
528    }
529}
530
531impl Drop for SqliteTransaction {
532    fn drop(&mut self) {
533        // If transaction wasn't explicitly committed or rolled back, roll it back
534        if self.active {
535            let conn = self.conn.lock();
536            let _ = conn.execute("ROLLBACK", []);
537        }
538    }
539}
540
541#[async_trait]
542impl TransactionProvider for SqliteBackend {
543    type Transaction = SqliteTransaction;
544
545    async fn begin_transaction(
546        &self,
547        tenant: &TenantContext,
548        _options: TransactionOptions,
549    ) -> StorageResult<Self::Transaction> {
550        let conn = self.get_connection()?;
551        SqliteTransaction::new(
552            conn,
553            tenant.clone(),
554            self.search_extractor().clone(),
555            self.is_search_offloaded(),
556        )
557    }
558}
559
560#[cfg(test)]
561mod tests {
562    use super::*;
563    use crate::core::ResourceStorage;
564    use crate::tenant::{TenantId, TenantPermissions};
565    use serde_json::json;
566
567    fn create_test_backend() -> SqliteBackend {
568        let backend = SqliteBackend::in_memory().unwrap();
569        backend.init_schema().unwrap();
570        backend
571    }
572
573    fn create_test_tenant() -> TenantContext {
574        TenantContext::new(
575            TenantId::new("test-tenant"),
576            TenantPermissions::full_access(),
577        )
578    }
579
580    #[tokio::test]
581    async fn test_transaction_commit() {
582        let backend = create_test_backend();
583        let tenant = create_test_tenant();
584
585        // Start transaction
586        let mut tx = backend
587            .begin_transaction(&tenant, TransactionOptions::default())
588            .await
589            .unwrap();
590
591        // Create resource in transaction
592        let resource = json!({
593            "resourceType": "Patient",
594            "id": "patient-1",
595            "name": [{"family": "Test"}]
596        });
597        tx.create("Patient", resource).await.unwrap();
598
599        // Commit
600        Box::new(tx).commit().await.unwrap();
601
602        // Verify resource exists
603        let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
604        assert!(result.is_some());
605    }
606
607    #[tokio::test]
608    async fn test_transaction_rollback() {
609        let backend = create_test_backend();
610        let tenant = create_test_tenant();
611
612        // Start transaction
613        let mut tx = backend
614            .begin_transaction(&tenant, TransactionOptions::default())
615            .await
616            .unwrap();
617
618        // Create resource in transaction
619        let resource = json!({
620            "resourceType": "Patient",
621            "id": "patient-1"
622        });
623        tx.create("Patient", resource).await.unwrap();
624
625        // Rollback
626        Box::new(tx).rollback().await.unwrap();
627
628        // Verify resource does NOT exist
629        let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
630        assert!(result.is_none());
631    }
632
633    #[tokio::test]
634    async fn test_transaction_read_own_writes() {
635        let backend = create_test_backend();
636        let tenant = create_test_tenant();
637
638        // Start transaction
639        let mut tx = backend
640            .begin_transaction(&tenant, TransactionOptions::default())
641            .await
642            .unwrap();
643
644        // Create resource
645        let resource = json!({
646            "resourceType": "Patient",
647            "id": "patient-1"
648        });
649        tx.create("Patient", resource).await.unwrap();
650
651        // Read within same transaction
652        let read = tx.read("Patient", "patient-1").await.unwrap();
653        assert!(read.is_some());
654
655        Box::new(tx).rollback().await.unwrap();
656    }
657
658    #[tokio::test]
659    async fn test_transaction_update() {
660        let backend = create_test_backend();
661        let tenant = create_test_tenant();
662
663        // Create initial resource
664        let resource = json!({
665            "resourceType": "Patient",
666            "name": [{"family": "Original"}]
667        });
668        let created = backend
669            .create(&tenant, "Patient", resource, FhirVersion::default())
670            .await
671            .unwrap();
672
673        // Start transaction and update
674        let mut tx = backend
675            .begin_transaction(&tenant, TransactionOptions::default())
676            .await
677            .unwrap();
678
679        let updated_data = json!({
680            "resourceType": "Patient",
681            "name": [{"family": "Updated"}]
682        });
683        let result = tx.update(&created, updated_data).await.unwrap();
684        assert_eq!(result.version_id(), "2");
685
686        Box::new(tx).commit().await.unwrap();
687
688        // Verify update persisted
689        let read = backend
690            .read(&tenant, "Patient", created.id())
691            .await
692            .unwrap()
693            .unwrap();
694        assert_eq!(read.version_id(), "2");
695    }
696
697    #[tokio::test]
698    async fn test_transaction_delete() {
699        let backend = create_test_backend();
700        let tenant = create_test_tenant();
701
702        // Create initial resource
703        let resource = json!({
704            "resourceType": "Patient",
705            "id": "patient-1"
706        });
707        backend
708            .create(&tenant, "Patient", resource, FhirVersion::default())
709            .await
710            .unwrap();
711
712        // Start transaction and delete
713        let mut tx = backend
714            .begin_transaction(&tenant, TransactionOptions::default())
715            .await
716            .unwrap();
717
718        tx.delete("Patient", "patient-1").await.unwrap();
719        Box::new(tx).commit().await.unwrap();
720
721        // Verify deleted (returns Gone error)
722        let result = backend.read(&tenant, "Patient", "patient-1").await;
723        assert!(matches!(
724            result,
725            Err(StorageError::Resource(ResourceError::Gone { .. }))
726        ));
727    }
728
729    #[tokio::test]
730    async fn test_transaction_auto_rollback_on_drop() {
731        let backend = create_test_backend();
732        let tenant = create_test_tenant();
733
734        {
735            // Start transaction
736            let mut tx = backend
737                .begin_transaction(&tenant, TransactionOptions::default())
738                .await
739                .unwrap();
740
741            // Create resource
742            let resource = json!({
743                "resourceType": "Patient",
744                "id": "patient-1"
745            });
746            tx.create("Patient", resource).await.unwrap();
747
748            // Drop without commit or rollback
749        }
750
751        // Verify resource does NOT exist (auto-rollback)
752        let result = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
753        assert!(result.is_none());
754    }
755
756    #[tokio::test]
757    async fn test_transaction_is_active() {
758        let backend = create_test_backend();
759        let tenant = create_test_tenant();
760
761        let tx = backend
762            .begin_transaction(&tenant, TransactionOptions::default())
763            .await
764            .unwrap();
765
766        assert!(tx.is_active());
767
768        Box::new(tx).commit().await.unwrap();
769        // After commit, we can't check is_active since tx is consumed
770    }
771}