Skip to main content

helios_persistence/core/
transaction.rs

//! Transaction traits for ACID operations.
//!
//! This module defines traits for transactional storage operations,
//! including support for FHIR transaction and batch bundles.
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10use crate::error::{StorageResult, TransactionError};
11use crate::tenant::TenantContext;
12use crate::types::StoredResource;
13
14use super::storage::ResourceStorage;
15
/// Transaction isolation levels.
///
/// [`IsolationLevel::ReadCommitted`] is the [`Default`] level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read committed - sees only committed data.
    #[default]
    ReadCommitted,
    /// Repeatable read - consistent reads within transaction.
    RepeatableRead,
    /// Serializable - full isolation (may reduce concurrency).
    Serializable,
    /// Snapshot - point-in-time consistent view.
    Snapshot,
}
29
30impl std::fmt::Display for IsolationLevel {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            IsolationLevel::ReadCommitted => write!(f, "read-committed"),
34            IsolationLevel::RepeatableRead => write!(f, "repeatable-read"),
35            IsolationLevel::Serializable => write!(f, "serializable"),
36            IsolationLevel::Snapshot => write!(f, "snapshot"),
37        }
38    }
39}
40
/// Locking strategy for concurrent access.
///
/// [`LockingStrategy::Optimistic`] is the [`Default`] strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LockingStrategy {
    /// Optimistic locking using version numbers (If-Match).
    #[default]
    Optimistic,
    /// Pessimistic locking with row-level locks.
    Pessimistic,
    /// No locking (for read-only transactions).
    None,
}
52
53/// Options for starting a transaction.
54#[derive(Debug, Clone, Default)]
55pub struct TransactionOptions {
56    /// The isolation level for the transaction.
57    pub isolation_level: IsolationLevel,
58    /// The locking strategy to use.
59    pub locking_strategy: LockingStrategy,
60    /// Timeout in milliseconds (0 = no timeout).
61    pub timeout_ms: u64,
62    /// Whether this is a read-only transaction.
63    pub read_only: bool,
64}
65
66impl TransactionOptions {
67    /// Creates new options with defaults.
68    pub fn new() -> Self {
69        Self::default()
70    }
71
72    /// Sets the isolation level.
73    pub fn isolation_level(mut self, level: IsolationLevel) -> Self {
74        self.isolation_level = level;
75        self
76    }
77
78    /// Sets the locking strategy.
79    pub fn locking_strategy(mut self, strategy: LockingStrategy) -> Self {
80        self.locking_strategy = strategy;
81        self
82    }
83
84    /// Sets the timeout.
85    pub fn timeout_ms(mut self, timeout: u64) -> Self {
86        self.timeout_ms = timeout;
87        self
88    }
89
90    /// Marks this as a read-only transaction.
91    pub fn read_only(mut self) -> Self {
92        self.read_only = true;
93        self.locking_strategy = LockingStrategy::None;
94        self
95    }
96}
97
98/// A database transaction.
99///
100/// This trait represents an active transaction that can perform CRUD operations
101/// atomically. Changes are only persisted when `commit()` is called.
102///
103/// # Example
104///
105/// ```ignore
106/// use helios_persistence::core::{TransactionProvider, Transaction};
107///
108/// async fn transfer_care<S: TransactionProvider>(
109///     storage: &S,
110///     tenant: &TenantContext,
111/// ) -> Result<(), StorageError> {
112///     let mut tx = storage.begin_transaction(tenant, TransactionOptions::new()).await?;
113///
114///     // Read patient
115///     let patient = tx.read("Patient", "123").await?
116///         .ok_or(StorageError::Resource(ResourceError::NotFound { ... }))?;
117///
118///     // Update patient
119///     let mut content = patient.content().clone();
120///     content["generalPractitioner"] = json!([{"reference": "Practitioner/456"}]);
121///     tx.update(&patient, content).await?;
122///
123///     // Create an encounter
124///     tx.create("Encounter", json!({
125///         "resourceType": "Encounter",
126///         "subject": {"reference": "Patient/123"}
127///     })).await?;
128///
129///     // Commit all changes
130///     tx.commit().await?;
131///
132///     Ok(())
133/// }
134/// ```
135#[async_trait]
136pub trait Transaction: Send + Sync {
137    /// Creates a new resource within this transaction.
138    async fn create(
139        &mut self,
140        resource_type: &str,
141        resource: Value,
142    ) -> StorageResult<StoredResource>;
143
144    /// Reads a resource within this transaction.
145    ///
146    /// This sees uncommitted changes made within this transaction.
147    async fn read(
148        &mut self,
149        resource_type: &str,
150        id: &str,
151    ) -> StorageResult<Option<StoredResource>>;
152
153    /// Updates a resource within this transaction.
154    async fn update(
155        &mut self,
156        current: &StoredResource,
157        resource: Value,
158    ) -> StorageResult<StoredResource>;
159
160    /// Deletes a resource within this transaction.
161    async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()>;
162
163    /// Commits the transaction, persisting all changes.
164    ///
165    /// After calling this, the transaction is consumed and cannot be used again.
166    async fn commit(self: Box<Self>) -> StorageResult<()>;
167
168    /// Rolls back the transaction, discarding all changes.
169    ///
170    /// After calling this, the transaction is consumed and cannot be used again.
171    async fn rollback(self: Box<Self>) -> StorageResult<()>;
172
173    /// Returns the tenant context for this transaction.
174    fn tenant(&self) -> &TenantContext;
175
176    /// Returns whether this transaction is still active.
177    fn is_active(&self) -> bool;
178}
179
180/// Provider for transaction support.
181///
182/// Backends that support ACID transactions implement this trait.
183#[async_trait]
184pub trait TransactionProvider: ResourceStorage {
185    /// The transaction type returned by this provider.
186    type Transaction: Transaction;
187
188    /// Begins a new transaction.
189    ///
190    /// # Arguments
191    ///
192    /// * `tenant` - The tenant context for operations in this transaction
193    /// * `options` - Transaction options (isolation level, timeout, etc.)
194    ///
195    /// # Returns
196    ///
197    /// An active transaction that must be committed or rolled back.
198    ///
199    /// # Errors
200    ///
201    /// * `StorageError::Transaction(UnsupportedIsolationLevel)` - If isolation level not supported
202    /// * `StorageError::Backend` - If connection cannot be acquired
203    async fn begin_transaction(
204        &self,
205        tenant: &TenantContext,
206        options: TransactionOptions,
207    ) -> StorageResult<Self::Transaction>;
208
209    /// Executes a function within a transaction.
210    ///
211    /// This is a convenience method that handles commit/rollback automatically.
212    /// If the function returns Ok, the transaction is committed.
213    /// If the function returns Err or panics, the transaction is rolled back.
214    ///
215    /// # Example
216    ///
217    /// ```ignore
218    /// storage.with_transaction(&tenant, TransactionOptions::new(), |tx| async move {
219    ///     let patient = tx.read("Patient", "123").await?;
220    ///     // ... more operations
221    ///     Ok(())
222    /// }).await?;
223    /// ```
224    async fn with_transaction<F, Fut, R>(
225        &self,
226        tenant: &TenantContext,
227        options: TransactionOptions,
228        f: F,
229    ) -> StorageResult<R>
230    where
231        F: FnOnce(Self::Transaction) -> Fut + Send,
232        Fut: std::future::Future<Output = StorageResult<R>> + Send,
233        R: Send,
234    {
235        let tx = self.begin_transaction(tenant, options).await?;
236        f(tx).await
237    }
238}
239
240/// Entry in a FHIR transaction or batch bundle.
241#[derive(Debug, Clone, Serialize, Deserialize, Default)]
242pub struct BundleEntry {
243    /// The HTTP method for this entry.
244    #[serde(default)]
245    pub method: BundleMethod,
246    /// The resource URL (relative or absolute).
247    #[serde(default)]
248    pub url: String,
249    /// The resource content (for POST, PUT, PATCH).
250    #[serde(default)]
251    pub resource: Option<Value>,
252    /// If-Match header value for conditional operations.
253    #[serde(default)]
254    pub if_match: Option<String>,
255    /// If-None-Match header value for conditional creates.
256    #[serde(default)]
257    pub if_none_match: Option<String>,
258    /// If-None-Exist header for conditional creates.
259    #[serde(default)]
260    pub if_none_exist: Option<String>,
261    /// The fullUrl for this entry, used for reference resolution.
262    /// Typically a urn:uuid: for new resources in transactions.
263    #[serde(default, skip_serializing_if = "Option::is_none")]
264    pub full_url: Option<String>,
265}
266
267/// HTTP method for bundle entries.
268#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
269#[serde(rename_all = "UPPERCASE")]
270pub enum BundleMethod {
271    /// GET - Read operation.
272    #[default]
273    Get,
274    /// POST - Create operation.
275    Post,
276    /// PUT - Update or create operation.
277    Put,
278    /// PATCH - Partial update operation.
279    Patch,
280    /// DELETE - Delete operation.
281    Delete,
282}
283
284impl std::fmt::Display for BundleMethod {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        match self {
287            BundleMethod::Get => write!(f, "GET"),
288            BundleMethod::Post => write!(f, "POST"),
289            BundleMethod::Put => write!(f, "PUT"),
290            BundleMethod::Patch => write!(f, "PATCH"),
291            BundleMethod::Delete => write!(f, "DELETE"),
292        }
293    }
294}
295
296/// Result of a bundle entry execution.
297#[derive(Debug, Clone, Serialize, Deserialize)]
298pub struct BundleEntryResult {
299    /// HTTP status code.
300    pub status: u16,
301    /// Location header (for creates).
302    pub location: Option<String>,
303    /// ETag header.
304    pub etag: Option<String>,
305    /// Last-Modified header.
306    pub last_modified: Option<String>,
307    /// Response resource (for reads, creates, updates).
308    pub resource: Option<Value>,
309    /// OperationOutcome for errors.
310    pub outcome: Option<Value>,
311}
312
313impl BundleEntryResult {
314    /// Creates a successful result for a create operation.
315    pub fn created(resource: StoredResource) -> Self {
316        Self {
317            status: 201,
318            location: Some(resource.versioned_url()),
319            etag: Some(resource.etag().to_string()),
320            last_modified: Some(resource.last_modified().to_rfc3339()),
321            resource: Some(resource.into_content()),
322            outcome: None,
323        }
324    }
325
326    /// Creates a successful result for a read operation.
327    pub fn ok(resource: StoredResource) -> Self {
328        Self {
329            status: 200,
330            location: None,
331            etag: Some(resource.etag().to_string()),
332            last_modified: Some(resource.last_modified().to_rfc3339()),
333            resource: Some(resource.into_content()),
334            outcome: None,
335        }
336    }
337
338    /// Creates a result for a delete operation.
339    pub fn deleted() -> Self {
340        Self {
341            status: 204,
342            location: None,
343            etag: None,
344            last_modified: None,
345            resource: None,
346            outcome: None,
347        }
348    }
349
350    /// Creates an error result.
351    pub fn error(status: u16, outcome: Value) -> Self {
352        Self {
353            status,
354            location: None,
355            etag: None,
356            last_modified: None,
357            resource: None,
358            outcome: Some(outcome),
359        }
360    }
361}
362
363/// Result of processing a transaction or batch bundle.
364#[derive(Debug, Clone)]
365pub struct BundleResult {
366    /// The bundle type.
367    pub bundle_type: BundleType,
368    /// Results for each entry.
369    pub entries: Vec<BundleEntryResult>,
370}
371
/// Type of bundle operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BundleType {
    /// Transaction - all-or-nothing semantics.
    Transaction,
    /// Batch - independent operations.
    Batch,
}
380
381/// Provider for FHIR bundle operations.
382#[async_trait]
383pub trait BundleProvider: ResourceStorage {
384    /// Processes a transaction bundle (all-or-nothing).
385    ///
386    /// All entries are processed atomically. If any entry fails,
387    /// all changes are rolled back.
388    ///
389    /// # Arguments
390    ///
391    /// * `tenant` - The tenant context
392    /// * `entries` - The bundle entries to process
393    ///
394    /// # Returns
395    ///
396    /// Results for each entry. On failure, all entries will have error status.
397    async fn process_transaction(
398        &self,
399        tenant: &TenantContext,
400        entries: Vec<BundleEntry>,
401    ) -> Result<BundleResult, TransactionError>;
402
403    /// Processes a batch bundle (independent operations).
404    ///
405    /// Each entry is processed independently. Failures in one entry
406    /// do not affect other entries.
407    ///
408    /// # Arguments
409    ///
410    /// * `tenant` - The tenant context
411    /// * `entries` - The bundle entries to process
412    ///
413    /// # Returns
414    ///
415    /// Results for each entry. Some may succeed while others fail.
416    async fn process_batch(
417        &self,
418        tenant: &TenantContext,
419        entries: Vec<BundleEntry>,
420    ) -> StorageResult<BundleResult>;
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use helios_fhir::FhirVersion;

    /// Builds the minimal stored Patient shared by the result-constructor tests.
    fn sample_resource() -> StoredResource {
        StoredResource::new(
            "Patient",
            "123",
            crate::tenant::TenantId::new("t1"),
            serde_json::json!({}),
            FhirVersion::default(),
        )
    }

    #[test]
    fn test_isolation_level_display() {
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read-committed");
        assert_eq!(IsolationLevel::RepeatableRead.to_string(), "repeatable-read");
        assert_eq!(IsolationLevel::Serializable.to_string(), "serializable");
        assert_eq!(IsolationLevel::Snapshot.to_string(), "snapshot");
    }

    #[test]
    fn test_enum_defaults() {
        assert_eq!(IsolationLevel::default(), IsolationLevel::ReadCommitted);
        assert_eq!(LockingStrategy::default(), LockingStrategy::Optimistic);
        assert_eq!(BundleMethod::default(), BundleMethod::Get);
    }

    #[test]
    fn test_transaction_options_defaults() {
        let opts = TransactionOptions::new();

        assert_eq!(opts.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(opts.locking_strategy, LockingStrategy::Optimistic);
        assert_eq!(opts.timeout_ms, 0);
        assert!(!opts.read_only);
    }

    #[test]
    fn test_transaction_options_builder() {
        let opts = TransactionOptions::new()
            .isolation_level(IsolationLevel::Serializable)
            .timeout_ms(5000);

        assert_eq!(opts.isolation_level, IsolationLevel::Serializable);
        assert_eq!(opts.timeout_ms, 5000);
    }

    #[test]
    fn test_transaction_options_locking_strategy() {
        let opts = TransactionOptions::new().locking_strategy(LockingStrategy::Pessimistic);

        assert_eq!(opts.locking_strategy, LockingStrategy::Pessimistic);
    }

    #[test]
    fn test_transaction_options_read_only() {
        let opts = TransactionOptions::new().read_only();

        assert!(opts.read_only);
        assert_eq!(opts.locking_strategy, LockingStrategy::None);
    }

    #[test]
    fn test_bundle_method_display() {
        assert_eq!(BundleMethod::Get.to_string(), "GET");
        assert_eq!(BundleMethod::Post.to_string(), "POST");
        assert_eq!(BundleMethod::Put.to_string(), "PUT");
        assert_eq!(BundleMethod::Patch.to_string(), "PATCH");
        assert_eq!(BundleMethod::Delete.to_string(), "DELETE");
    }

    #[test]
    fn test_bundle_entry_result_created() {
        let result = BundleEntryResult::created(sample_resource());
        assert_eq!(result.status, 201);
        assert!(result.location.is_some());
        assert!(result.etag.is_some());
    }

    #[test]
    fn test_bundle_entry_result_ok() {
        let result = BundleEntryResult::ok(sample_resource());
        assert_eq!(result.status, 200);
        assert!(result.location.is_none());
        assert!(result.etag.is_some());
        assert!(result.last_modified.is_some());
        assert!(result.resource.is_some());
        assert!(result.outcome.is_none());
    }

    #[test]
    fn test_bundle_entry_result_deleted() {
        let result = BundleEntryResult::deleted();
        assert_eq!(result.status, 204);
        assert!(result.location.is_none());
        assert!(result.etag.is_none());
        assert!(result.last_modified.is_none());
        assert!(result.resource.is_none());
        assert!(result.outcome.is_none());
    }

    #[test]
    fn test_bundle_entry_result_error() {
        let outcome = serde_json::json!({
            "resourceType": "OperationOutcome",
            "issue": [{"severity": "error", "code": "not-found"}]
        });

        let result = BundleEntryResult::error(404, outcome);
        assert_eq!(result.status, 404);
        assert!(result.outcome.is_some());
        assert!(result.resource.is_none());
    }
}