Skip to main content

helios_persistence/backends/sqlite/
transaction.rs

1//! Transaction support for SQLite backend.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use helios_fhir::FhirVersion;
8use parking_lot::Mutex;
9use r2d2::PooledConnection;
10use r2d2_sqlite::SqliteConnectionManager;
11use rusqlite::params;
12use serde_json::Value;
13
14use crate::core::{Transaction, TransactionOptions, TransactionProvider};
15use crate::error::{
16    BackendError, ConcurrencyError, ResourceError, StorageError, StorageResult, TransactionError,
17};
18use crate::search::SearchParameterExtractor;
19use crate::tenant::TenantContext;
20use crate::types::StoredResource;
21
22use super::SqliteBackend;
23
24fn internal_error(message: String) -> StorageError {
25    StorageError::Backend(BackendError::Internal {
26        backend_name: "sqlite".to_string(),
27        message,
28        source: None,
29    })
30}
31
32fn serialization_error(message: String) -> StorageError {
33    StorageError::Backend(BackendError::SerializationError { message })
34}
35
36/// A SQLite transaction.
37pub struct SqliteTransaction {
38    /// The connection used for this transaction.
39    conn: Arc<Mutex<PooledConnection<SqliteConnectionManager>>>,
40    /// Whether the transaction is still active.
41    active: bool,
42    /// The tenant context for this transaction.
43    tenant: TenantContext,
44    /// Search parameter extractor for indexing resources.
45    search_extractor: Arc<SearchParameterExtractor>,
46    /// When true, search indexing is offloaded to a secondary backend.
47    search_offloaded: bool,
48}
49
50impl std::fmt::Debug for SqliteTransaction {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("SqliteTransaction")
53            .field("active", &self.active)
54            .field("tenant", &self.tenant)
55            .finish()
56    }
57}
58
59impl SqliteTransaction {
60    /// Create a new transaction.
61    fn new(
62        conn: PooledConnection<SqliteConnectionManager>,
63        tenant: TenantContext,
64        search_extractor: Arc<SearchParameterExtractor>,
65        search_offloaded: bool,
66    ) -> StorageResult<Self> {
67        // Start the transaction
68        conn.execute("BEGIN IMMEDIATE", []).map_err(|e| {
69            StorageError::Transaction(TransactionError::RolledBack {
70                reason: format!("Failed to begin transaction: {}", e),
71            })
72        })?;
73
74        Ok(Self {
75            conn: Arc::new(Mutex::new(conn)),
76            active: true,
77            tenant,
78            search_extractor,
79            search_offloaded,
80        })
81    }
82
83    /// Generate a new ID for a resource.
84    fn generate_id() -> String {
85        uuid::Uuid::new_v4().to_string()
86    }
87
88    /// Index a resource for search.
89    fn index_resource(
90        &self,
91        conn: &rusqlite::Connection,
92        tenant_id: &str,
93        resource_type: &str,
94        resource_id: &str,
95        resource: &Value,
96    ) -> StorageResult<()> {
97        // When search is offloaded to a secondary backend, skip local indexing
98        if self.search_offloaded {
99            return Ok(());
100        }
101
102        use super::search::writer::SqliteSearchIndexWriter;
103        use rusqlite::ToSql;
104
105        // First, delete any existing index entries for this resource
106        conn.execute(
107            "DELETE FROM search_index WHERE tenant_id = ?1 AND resource_type = ?2 AND resource_id = ?3",
108            params![tenant_id, resource_type, resource_id],
109        )
110        .map_err(|e| internal_error(format!("Failed to clear search index: {}", e)))?;
111
112        // Extract values using the registry-driven extractor
113        let values = self
114            .search_extractor
115            .extract(resource, resource_type)
116            .map_err(|e| internal_error(format!("Search parameter extraction failed: {}", e)))?;
117
118        // Write each extracted value to the index
119        for value in values {
120            let sql_params = SqliteSearchIndexWriter::to_sql_params(
121                tenant_id,
122                resource_type,
123                resource_id,
124                &value,
125            );
126
127            // Build parameter refs for rusqlite
128            let param_refs: Vec<&dyn ToSql> =
129                sql_params.iter().map(Self::sql_value_to_ref).collect();
130
131            conn.execute(SqliteSearchIndexWriter::insert_sql(), param_refs.as_slice())
132                .map_err(|e| {
133                    internal_error(format!("Failed to insert search index entry: {}", e))
134                })?;
135        }
136
137        tracing::debug!(
138            "Indexed resource {}/{} within transaction",
139            resource_type,
140            resource_id
141        );
142
143        Ok(())
144    }
145
146    /// Converts a SqlValue to a rusqlite-compatible reference.
147    fn sql_value_to_ref(value: &super::search::writer::SqlValue) -> &dyn rusqlite::ToSql {
148        use super::search::writer::SqlValue;
149        match value {
150            SqlValue::String(s) => s,
151            SqlValue::OptString(opt) => opt,
152            SqlValue::Int(i) => i,
153            SqlValue::OptInt(opt) => opt,
154            SqlValue::Float(f) => f,
155            SqlValue::Null => &rusqlite::types::Null,
156        }
157    }
158}
159
160#[async_trait]
161impl Transaction for SqliteTransaction {
162    async fn create(
163        &mut self,
164        resource_type: &str,
165        resource: Value,
166    ) -> StorageResult<StoredResource> {
167        if !self.active {
168            return Err(StorageError::Transaction(
169                TransactionError::InvalidTransaction,
170            ));
171        }
172
173        let conn = self.conn.lock();
174        let tenant_id = self.tenant.tenant_id().as_str();
175
176        // Get or generate ID
177        let id = resource
178            .get("id")
179            .and_then(|v| v.as_str())
180            .map(|s| s.to_string())
181            .unwrap_or_else(Self::generate_id);
182
183        // Check if resource already exists
184        let exists: bool = conn
185            .query_row(
186                "SELECT 1 FROM resources WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
187                params![tenant_id, resource_type, id],
188                |_| Ok(true),
189            )
190            .unwrap_or(false);
191
192        if exists {
193            return Err(StorageError::Resource(ResourceError::AlreadyExists {
194                resource_type: resource_type.to_string(),
195                id: id.to_string(),
196            }));
197        }
198
199        // Build the resource with id and resourceType
200        let mut data = resource.clone();
201        if let Some(obj) = data.as_object_mut() {
202            obj.insert("id".to_string(), Value::String(id.clone()));
203            obj.insert(
204                "resourceType".to_string(),
205                Value::String(resource_type.to_string()),
206            );
207        }
208
209        // Serialize the resource data
210        let data_bytes = serde_json::to_vec(&data)
211            .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
212
213        let now = Utc::now();
214        let last_updated = now.to_rfc3339();
215        let version_id = "1";
216
217        // Use default FHIR version for transaction operations
218        let fhir_version = FhirVersion::default();
219        let fhir_version_str = fhir_version.as_mime_param();
220
221        // Insert the resource
222        conn.execute(
223            "INSERT INTO resources (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
224             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
225            params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
226        )
227        .map_err(|e| internal_error(format!("Failed to insert resource: {}", e)))?;
228
229        // Insert into history
230        conn.execute(
231            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
232             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
233            params![tenant_id, resource_type, id, version_id, data_bytes, last_updated, fhir_version_str],
234        )
235        .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
236
237        // Index the resource for search
238        self.index_resource(&conn, tenant_id, resource_type, &id, &data)?;
239
240        Ok(StoredResource::from_storage(
241            resource_type,
242            &id,
243            version_id,
244            self.tenant.tenant_id().clone(),
245            data,
246            now,
247            now,
248            None,
249            fhir_version,
250        ))
251    }
252
253    async fn read(
254        &mut self,
255        resource_type: &str,
256        id: &str,
257    ) -> StorageResult<Option<StoredResource>> {
258        if !self.active {
259            return Err(StorageError::Transaction(
260                TransactionError::InvalidTransaction,
261            ));
262        }
263
264        let conn = self.conn.lock();
265        let tenant_id = self.tenant.tenant_id().as_str();
266
267        let result = conn.query_row(
268            "SELECT version_id, data, last_updated, is_deleted, fhir_version
269             FROM resources
270             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3",
271            params![tenant_id, resource_type, id],
272            |row| {
273                let version_id: String = row.get(0)?;
274                let data: Vec<u8> = row.get(1)?;
275                let last_updated: String = row.get(2)?;
276                let is_deleted: i32 = row.get(3)?;
277                let fhir_version: String = row.get(4)?;
278                Ok((version_id, data, last_updated, is_deleted, fhir_version))
279            },
280        );
281
282        match result {
283            Ok((version_id, data, last_updated, is_deleted, fhir_version_str)) => {
284                if is_deleted != 0 {
285                    return Ok(None);
286                }
287
288                let json_data: serde_json::Value = serde_json::from_slice(&data).map_err(|e| {
289                    serialization_error(format!("Failed to deserialize resource: {}", e))
290                })?;
291
292                let last_updated = chrono::DateTime::parse_from_rfc3339(&last_updated)
293                    .map_err(|e| internal_error(format!("Failed to parse last_updated: {}", e)))?
294                    .with_timezone(&Utc);
295
296                let fhir_version = FhirVersion::from_storage(&fhir_version_str).unwrap_or_default();
297
298                Ok(Some(StoredResource::from_storage(
299                    resource_type,
300                    id,
301                    version_id,
302                    self.tenant.tenant_id().clone(),
303                    json_data,
304                    last_updated,
305                    last_updated,
306                    None,
307                    fhir_version,
308                )))
309            }
310            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
311            Err(e) => Err(internal_error(format!("Failed to read resource: {}", e))),
312        }
313    }
314
315    async fn update(
316        &mut self,
317        current: &StoredResource,
318        resource: Value,
319    ) -> StorageResult<StoredResource> {
320        if !self.active {
321            return Err(StorageError::Transaction(
322                TransactionError::InvalidTransaction,
323            ));
324        }
325
326        let conn = self.conn.lock();
327        let tenant_id = self.tenant.tenant_id().as_str();
328        let resource_type = current.resource_type();
329        let id = current.id();
330
331        // Verify current version still matches (optimistic locking)
332        let db_version: Result<String, _> = conn.query_row(
333            "SELECT version_id FROM resources
334             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
335            params![tenant_id, resource_type, id],
336            |row| row.get(0),
337        );
338
339        let db_version = match db_version {
340            Ok(v) => v,
341            Err(rusqlite::Error::QueryReturnedNoRows) => {
342                return Err(StorageError::Resource(ResourceError::NotFound {
343                    resource_type: resource_type.to_string(),
344                    id: id.to_string(),
345                }));
346            }
347            Err(e) => {
348                return Err(internal_error(format!(
349                    "Failed to get current version: {}",
350                    e
351                )));
352            }
353        };
354
355        if db_version != current.version_id() {
356            return Err(StorageError::Concurrency(
357                ConcurrencyError::VersionConflict {
358                    resource_type: resource_type.to_string(),
359                    id: id.to_string(),
360                    expected_version: current.version_id().to_string(),
361                    actual_version: db_version,
362                },
363            ));
364        }
365
366        // Calculate new version
367        let new_version: u64 = db_version.parse().unwrap_or(0) + 1;
368        let new_version_str = new_version.to_string();
369
370        // Build the resource with id and resourceType
371        let mut data = resource.clone();
372        if let Some(obj) = data.as_object_mut() {
373            obj.insert("id".to_string(), Value::String(id.to_string()));
374            obj.insert(
375                "resourceType".to_string(),
376                Value::String(resource_type.to_string()),
377            );
378        }
379
380        // Serialize the resource data
381        let data_bytes = serde_json::to_vec(&data)
382            .map_err(|e| serialization_error(format!("Failed to serialize resource: {}", e)))?;
383
384        let now = Utc::now();
385        let last_updated = now.to_rfc3339();
386
387        // Preserve FHIR version from the current resource
388        let fhir_version = current.fhir_version();
389        let fhir_version_str = fhir_version.as_mime_param();
390
391        // Update the resource
392        conn.execute(
393            "UPDATE resources SET version_id = ?1, data = ?2, last_updated = ?3
394             WHERE tenant_id = ?4 AND resource_type = ?5 AND id = ?6",
395            params![
396                new_version_str,
397                data_bytes,
398                last_updated,
399                tenant_id,
400                resource_type,
401                id
402            ],
403        )
404        .map_err(|e| internal_error(format!("Failed to update resource: {}", e)))?;
405
406        // Insert into history
407        conn.execute(
408            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
409             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 0, ?7)",
410            params![tenant_id, resource_type, id, new_version_str, data_bytes, last_updated, fhir_version_str],
411        )
412        .map_err(|e| internal_error(format!("Failed to insert history: {}", e)))?;
413
414        // Re-index the resource for search
415        self.index_resource(&conn, tenant_id, resource_type, id, &data)?;
416
417        Ok(StoredResource::from_storage(
418            resource_type,
419            id,
420            new_version_str,
421            self.tenant.tenant_id().clone(),
422            data,
423            now,
424            now,
425            None,
426            fhir_version,
427        ))
428    }
429
430    async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()> {
431        if !self.active {
432            return Err(StorageError::Transaction(
433                TransactionError::InvalidTransaction,
434            ));
435        }
436
437        let conn = self.conn.lock();
438        let tenant_id = self.tenant.tenant_id().as_str();
439
440        // Check if resource exists
441        let result: Result<(String, Vec<u8>, String), _> = conn.query_row(
442            "SELECT version_id, data, fhir_version FROM resources
443             WHERE tenant_id = ?1 AND resource_type = ?2 AND id = ?3 AND is_deleted = 0",
444            params![tenant_id, resource_type, id],
445            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
446        );
447
448        let (current_version, data, fhir_version_str) = match result {
449            Ok(v) => v,
450            Err(rusqlite::Error::QueryReturnedNoRows) => {
451                return Err(StorageError::Resource(ResourceError::NotFound {
452                    resource_type: resource_type.to_string(),
453                    id: id.to_string(),
454                }));
455            }
456            Err(e) => {
457                return Err(internal_error(format!("Failed to check resource: {}", e)));
458            }
459        };
460
461        let now = Utc::now();
462        let deleted_at = now.to_rfc3339();
463        let new_version: u64 = current_version.parse().unwrap_or(0) + 1;
464        let new_version_str = new_version.to_string();
465
466        // Soft delete the resource
467        conn.execute(
468            "UPDATE resources SET is_deleted = 1, deleted_at = ?1, version_id = ?2, last_updated = ?1
469             WHERE tenant_id = ?3 AND resource_type = ?4 AND id = ?5",
470            params![deleted_at, new_version_str, tenant_id, resource_type, id],
471        )
472        .map_err(|e| internal_error(format!("Failed to delete resource: {}", e)))?;
473
474        // Insert deletion record into history
475        conn.execute(
476            "INSERT INTO resource_history (tenant_id, resource_type, id, version_id, data, last_updated, is_deleted, fhir_version)
477             VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, ?7)",
478            params![tenant_id, resource_type, id, new_version_str, data, deleted_at, fhir_version_str],
479        )
480        .map_err(|e| internal_error(format!("Failed to insert deletion history: {}", e)))?;
481
482        Ok(())
483    }
484
485    async fn commit(mut self: Box<Self>) -> StorageResult<()> {
486        if !self.active {
487            return Err(StorageError::Transaction(
488                TransactionError::InvalidTransaction,
489            ));
490        }
491
492        let conn = self.conn.lock();
493        conn.execute("COMMIT", []).map_err(|e| {
494            StorageError::Transaction(TransactionError::RolledBack {
495                reason: format!("Commit failed: {}", e),
496            })
497        })?;
498
499        self.active = false;
500        Ok(())
501    }
502
503    async fn rollback(mut self: Box<Self>) -> StorageResult<()> {
504        if !self.active {
505            return Err(StorageError::Transaction(
506                TransactionError::InvalidTransaction,
507            ));
508        }
509
510        let conn = self.conn.lock();
511        conn.execute("ROLLBACK", []).map_err(|e| {
512            StorageError::Transaction(TransactionError::RolledBack {
513                reason: format!("Rollback failed: {}", e),
514            })
515        })?;
516
517        self.active = false;
518        Ok(())
519    }
520
521    fn tenant(&self) -> &TenantContext {
522        &self.tenant
523    }
524
525    fn is_active(&self) -> bool {
526        self.active
527    }
528}
529
530impl Drop for SqliteTransaction {
531    fn drop(&mut self) {
532        // If transaction wasn't explicitly committed or rolled back, roll it back
533        if self.active {
534            let conn = self.conn.lock();
535            let _ = conn.execute("ROLLBACK", []);
536        }
537    }
538}
539
540#[async_trait]
541impl TransactionProvider for SqliteBackend {
542    type Transaction = SqliteTransaction;
543
544    async fn begin_transaction(
545        &self,
546        tenant: &TenantContext,
547        _options: TransactionOptions,
548    ) -> StorageResult<Self::Transaction> {
549        let conn = self.get_connection()?;
550        SqliteTransaction::new(
551            conn,
552            tenant.clone(),
553            self.search_extractor().clone(),
554            self.is_search_offloaded(),
555        )
556    }
557}
558
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::ResourceStorage;
    use crate::tenant::{TenantId, TenantPermissions};
    use serde_json::json;

    /// Builds an in-memory backend whose schema has been initialised.
    fn create_test_backend() -> SqliteBackend {
        let backend = SqliteBackend::in_memory().unwrap();
        backend.init_schema().unwrap();
        backend
    }

    /// Builds the full-access tenant shared by all tests.
    fn create_test_tenant() -> TenantContext {
        let tenant_id = TenantId::new("test-tenant");
        TenantContext::new(tenant_id, TenantPermissions::full_access())
    }

    /// Opens a transaction with default options, panicking if that fails.
    async fn begin(backend: &SqliteBackend, tenant: &TenantContext) -> SqliteTransaction {
        backend
            .begin_transaction(tenant, TransactionOptions::default())
            .await
            .unwrap()
    }

    /// Minimal Patient body with the fixed id `patient-1`.
    fn patient() -> Value {
        json!({
            "resourceType": "Patient",
            "id": "patient-1"
        })
    }

    #[tokio::test]
    async fn test_transaction_commit() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let mut tx = begin(&backend, &tenant).await;

        // Write a resource through the transaction, then commit it.
        let body = json!({
            "resourceType": "Patient",
            "id": "patient-1",
            "name": [{"family": "Test"}]
        });
        tx.create("Patient", body).await.unwrap();
        Box::new(tx).commit().await.unwrap();

        // The committed resource is visible through the backend.
        let found = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(found.is_some());
    }

    #[tokio::test]
    async fn test_transaction_rollback() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let mut tx = begin(&backend, &tenant).await;

        // Write a resource through the transaction, then roll it back.
        tx.create("Patient", patient()).await.unwrap();
        Box::new(tx).rollback().await.unwrap();

        // Nothing from the rolled-back transaction is visible.
        let found = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn test_transaction_read_own_writes() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();
        let mut tx = begin(&backend, &tenant).await;

        tx.create("Patient", patient()).await.unwrap();

        // The uncommitted write is readable from the same transaction.
        let seen = tx.read("Patient", "patient-1").await.unwrap();
        assert!(seen.is_some());

        Box::new(tx).rollback().await.unwrap();
    }

    #[tokio::test]
    async fn test_transaction_update() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Seed a resource outside of any transaction.
        let original = json!({
            "resourceType": "Patient",
            "name": [{"family": "Original"}]
        });
        let created = backend
            .create(&tenant, "Patient", original, FhirVersion::default())
            .await
            .unwrap();

        // Update it inside a transaction.
        let mut tx = begin(&backend, &tenant).await;
        let replacement = json!({
            "resourceType": "Patient",
            "name": [{"family": "Updated"}]
        });
        let updated = tx.update(&created, replacement).await.unwrap();
        assert_eq!(updated.version_id(), "2");

        Box::new(tx).commit().await.unwrap();

        // The new version is what the backend now returns.
        let stored = backend
            .read(&tenant, "Patient", created.id())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(stored.version_id(), "2");
    }

    #[tokio::test]
    async fn test_transaction_delete() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        // Seed a resource outside of any transaction.
        backend
            .create(&tenant, "Patient", patient(), FhirVersion::default())
            .await
            .unwrap();

        // Delete it inside a transaction and commit.
        let mut tx = begin(&backend, &tenant).await;
        tx.delete("Patient", "patient-1").await.unwrap();
        Box::new(tx).commit().await.unwrap();

        // Reading the deleted resource through the backend yields a Gone error.
        let outcome = backend.read(&tenant, "Patient", "patient-1").await;
        assert!(matches!(
            outcome,
            Err(StorageError::Resource(ResourceError::Gone { .. }))
        ));
    }

    #[tokio::test]
    async fn test_transaction_auto_rollback_on_drop() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let mut tx = begin(&backend, &tenant).await;
        tx.create("Patient", patient()).await.unwrap();

        // Drop without commit or rollback.
        drop(tx);

        // The write was rolled back automatically.
        let found = backend.read(&tenant, "Patient", "patient-1").await.unwrap();
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn test_transaction_is_active() {
        let backend = create_test_backend();
        let tenant = create_test_tenant();

        let tx = begin(&backend, &tenant).await;
        assert!(tx.is_active());

        // Commit consumes the transaction, so its state cannot be inspected afterwards.
        Box::new(tx).commit().await.unwrap();
    }
}