helios_persistence/core/transaction.rs
1//! Transaction traits for ACID operations.
2//!
3//! This module defines traits for transactional storage operations,
4//! including support for FHIR transaction and batch bundles.
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10use crate::error::{StorageResult, TransactionError};
11use crate::tenant::TenantContext;
12use crate::types::StoredResource;
13
14use super::storage::ResourceStorage;
15
16/// Transaction isolation levels.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
18pub enum IsolationLevel {
19 /// Read committed - sees only committed data.
20 #[default]
21 ReadCommitted,
22 /// Repeatable read - consistent reads within transaction.
23 RepeatableRead,
24 /// Serializable - full isolation (may reduce concurrency).
25 Serializable,
26 /// Snapshot - point-in-time consistent view.
27 Snapshot,
28}
29
30impl std::fmt::Display for IsolationLevel {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 IsolationLevel::ReadCommitted => write!(f, "read-committed"),
34 IsolationLevel::RepeatableRead => write!(f, "repeatable-read"),
35 IsolationLevel::Serializable => write!(f, "serializable"),
36 IsolationLevel::Snapshot => write!(f, "snapshot"),
37 }
38 }
39}
40
41/// Locking strategy for concurrent access.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
43pub enum LockingStrategy {
44 /// Optimistic locking using version numbers (If-Match).
45 #[default]
46 Optimistic,
47 /// Pessimistic locking with row-level locks.
48 Pessimistic,
49 /// No locking (for read-only transactions).
50 None,
51}
52
53/// Options for starting a transaction.
54#[derive(Debug, Clone, Default)]
55pub struct TransactionOptions {
56 /// The isolation level for the transaction.
57 pub isolation_level: IsolationLevel,
58 /// The locking strategy to use.
59 pub locking_strategy: LockingStrategy,
60 /// Timeout in milliseconds (0 = no timeout).
61 pub timeout_ms: u64,
62 /// Whether this is a read-only transaction.
63 pub read_only: bool,
64}
65
66impl TransactionOptions {
67 /// Creates new options with defaults.
68 pub fn new() -> Self {
69 Self::default()
70 }
71
72 /// Sets the isolation level.
73 pub fn isolation_level(mut self, level: IsolationLevel) -> Self {
74 self.isolation_level = level;
75 self
76 }
77
78 /// Sets the locking strategy.
79 pub fn locking_strategy(mut self, strategy: LockingStrategy) -> Self {
80 self.locking_strategy = strategy;
81 self
82 }
83
84 /// Sets the timeout.
85 pub fn timeout_ms(mut self, timeout: u64) -> Self {
86 self.timeout_ms = timeout;
87 self
88 }
89
90 /// Marks this as a read-only transaction.
91 pub fn read_only(mut self) -> Self {
92 self.read_only = true;
93 self.locking_strategy = LockingStrategy::None;
94 self
95 }
96}
97
98/// A database transaction.
99///
100/// This trait represents an active transaction that can perform CRUD operations
101/// atomically. Changes are only persisted when `commit()` is called.
102///
103/// # Example
104///
105/// ```ignore
106/// use helios_persistence::core::{TransactionProvider, Transaction};
107///
108/// async fn transfer_care<S: TransactionProvider>(
109/// storage: &S,
110/// tenant: &TenantContext,
111/// ) -> Result<(), StorageError> {
112/// let mut tx = storage.begin_transaction(tenant, TransactionOptions::new()).await?;
113///
114/// // Read patient
115/// let patient = tx.read("Patient", "123").await?
116/// .ok_or(StorageError::Resource(ResourceError::NotFound { ... }))?;
117///
118/// // Update patient
119/// let mut content = patient.content().clone();
120/// content["generalPractitioner"] = json!([{"reference": "Practitioner/456"}]);
121/// tx.update(&patient, content).await?;
122///
123/// // Create an encounter
124/// tx.create("Encounter", json!({
125/// "resourceType": "Encounter",
126/// "subject": {"reference": "Patient/123"}
127/// })).await?;
128///
129/// // Commit all changes
130/// tx.commit().await?;
131///
132/// Ok(())
133/// }
134/// ```
135#[async_trait]
136pub trait Transaction: Send + Sync {
137 /// Creates a new resource within this transaction.
138 async fn create(
139 &mut self,
140 resource_type: &str,
141 resource: Value,
142 ) -> StorageResult<StoredResource>;
143
144 /// Reads a resource within this transaction.
145 ///
146 /// This sees uncommitted changes made within this transaction.
147 async fn read(
148 &mut self,
149 resource_type: &str,
150 id: &str,
151 ) -> StorageResult<Option<StoredResource>>;
152
153 /// Updates a resource within this transaction.
154 async fn update(
155 &mut self,
156 current: &StoredResource,
157 resource: Value,
158 ) -> StorageResult<StoredResource>;
159
160 /// Deletes a resource within this transaction.
161 async fn delete(&mut self, resource_type: &str, id: &str) -> StorageResult<()>;
162
163 /// Commits the transaction, persisting all changes.
164 ///
165 /// After calling this, the transaction is consumed and cannot be used again.
166 async fn commit(self: Box<Self>) -> StorageResult<()>;
167
168 /// Rolls back the transaction, discarding all changes.
169 ///
170 /// After calling this, the transaction is consumed and cannot be used again.
171 async fn rollback(self: Box<Self>) -> StorageResult<()>;
172
173 /// Returns the tenant context for this transaction.
174 fn tenant(&self) -> &TenantContext;
175
176 /// Returns whether this transaction is still active.
177 fn is_active(&self) -> bool;
178}
179
180/// Provider for transaction support.
181///
182/// Backends that support ACID transactions implement this trait.
183#[async_trait]
184pub trait TransactionProvider: ResourceStorage {
185 /// The transaction type returned by this provider.
186 type Transaction: Transaction;
187
188 /// Begins a new transaction.
189 ///
190 /// # Arguments
191 ///
192 /// * `tenant` - The tenant context for operations in this transaction
193 /// * `options` - Transaction options (isolation level, timeout, etc.)
194 ///
195 /// # Returns
196 ///
197 /// An active transaction that must be committed or rolled back.
198 ///
199 /// # Errors
200 ///
201 /// * `StorageError::Transaction(UnsupportedIsolationLevel)` - If isolation level not supported
202 /// * `StorageError::Backend` - If connection cannot be acquired
203 async fn begin_transaction(
204 &self,
205 tenant: &TenantContext,
206 options: TransactionOptions,
207 ) -> StorageResult<Self::Transaction>;
208
209 /// Executes a function within a transaction.
210 ///
211 /// This is a convenience method that handles commit/rollback automatically.
212 /// If the function returns Ok, the transaction is committed.
213 /// If the function returns Err or panics, the transaction is rolled back.
214 ///
215 /// # Example
216 ///
217 /// ```ignore
218 /// storage.with_transaction(&tenant, TransactionOptions::new(), |tx| async move {
219 /// let patient = tx.read("Patient", "123").await?;
220 /// // ... more operations
221 /// Ok(())
222 /// }).await?;
223 /// ```
224 async fn with_transaction<F, Fut, R>(
225 &self,
226 tenant: &TenantContext,
227 options: TransactionOptions,
228 f: F,
229 ) -> StorageResult<R>
230 where
231 F: FnOnce(Self::Transaction) -> Fut + Send,
232 Fut: std::future::Future<Output = StorageResult<R>> + Send,
233 R: Send,
234 {
235 let tx = self.begin_transaction(tenant, options).await?;
236 f(tx).await
237 }
238}
239
240/// Entry in a FHIR transaction or batch bundle.
241#[derive(Debug, Clone, Serialize, Deserialize, Default)]
242pub struct BundleEntry {
243 /// The HTTP method for this entry.
244 #[serde(default)]
245 pub method: BundleMethod,
246 /// The resource URL (relative or absolute).
247 #[serde(default)]
248 pub url: String,
249 /// The resource content (for POST, PUT, PATCH).
250 #[serde(default)]
251 pub resource: Option<Value>,
252 /// If-Match header value for conditional operations.
253 #[serde(default)]
254 pub if_match: Option<String>,
255 /// If-None-Match header value for conditional creates.
256 #[serde(default)]
257 pub if_none_match: Option<String>,
258 /// If-None-Exist header for conditional creates.
259 #[serde(default)]
260 pub if_none_exist: Option<String>,
261 /// The fullUrl for this entry, used for reference resolution.
262 /// Typically a urn:uuid: for new resources in transactions.
263 #[serde(default, skip_serializing_if = "Option::is_none")]
264 pub full_url: Option<String>,
265}
266
267/// HTTP method for bundle entries.
268#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
269#[serde(rename_all = "UPPERCASE")]
270pub enum BundleMethod {
271 /// GET - Read operation.
272 #[default]
273 Get,
274 /// POST - Create operation.
275 Post,
276 /// PUT - Update or create operation.
277 Put,
278 /// PATCH - Partial update operation.
279 Patch,
280 /// DELETE - Delete operation.
281 Delete,
282}
283
284impl std::fmt::Display for BundleMethod {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 match self {
287 BundleMethod::Get => write!(f, "GET"),
288 BundleMethod::Post => write!(f, "POST"),
289 BundleMethod::Put => write!(f, "PUT"),
290 BundleMethod::Patch => write!(f, "PATCH"),
291 BundleMethod::Delete => write!(f, "DELETE"),
292 }
293 }
294}
295
296/// Result of a bundle entry execution.
297#[derive(Debug, Clone, Serialize, Deserialize)]
298pub struct BundleEntryResult {
299 /// HTTP status code.
300 pub status: u16,
301 /// Location header (for creates).
302 pub location: Option<String>,
303 /// ETag header.
304 pub etag: Option<String>,
305 /// Last-Modified header.
306 pub last_modified: Option<String>,
307 /// Response resource (for reads, creates, updates).
308 pub resource: Option<Value>,
309 /// OperationOutcome for errors.
310 pub outcome: Option<Value>,
311}
312
313impl BundleEntryResult {
314 /// Creates a successful result for a create operation.
315 pub fn created(resource: StoredResource) -> Self {
316 Self {
317 status: 201,
318 location: Some(resource.versioned_url()),
319 etag: Some(resource.etag().to_string()),
320 last_modified: Some(resource.last_modified().to_rfc3339()),
321 resource: Some(resource.into_content()),
322 outcome: None,
323 }
324 }
325
326 /// Creates a successful result for a read operation.
327 pub fn ok(resource: StoredResource) -> Self {
328 Self {
329 status: 200,
330 location: None,
331 etag: Some(resource.etag().to_string()),
332 last_modified: Some(resource.last_modified().to_rfc3339()),
333 resource: Some(resource.into_content()),
334 outcome: None,
335 }
336 }
337
338 /// Creates a result for a delete operation.
339 pub fn deleted() -> Self {
340 Self {
341 status: 204,
342 location: None,
343 etag: None,
344 last_modified: None,
345 resource: None,
346 outcome: None,
347 }
348 }
349
350 /// Creates an error result.
351 pub fn error(status: u16, outcome: Value) -> Self {
352 Self {
353 status,
354 location: None,
355 etag: None,
356 last_modified: None,
357 resource: None,
358 outcome: Some(outcome),
359 }
360 }
361}
362
363/// Result of processing a transaction or batch bundle.
364#[derive(Debug, Clone)]
365pub struct BundleResult {
366 /// The bundle type.
367 pub bundle_type: BundleType,
368 /// Results for each entry.
369 pub entries: Vec<BundleEntryResult>,
370}
371
372/// Type of bundle operation.
373#[derive(Debug, Clone, Copy, PartialEq, Eq)]
374pub enum BundleType {
375 /// Transaction - all-or-nothing semantics.
376 Transaction,
377 /// Batch - independent operations.
378 Batch,
379}
380
381/// Provider for FHIR bundle operations.
382#[async_trait]
383pub trait BundleProvider: ResourceStorage {
384 /// Processes a transaction bundle (all-or-nothing).
385 ///
386 /// All entries are processed atomically. If any entry fails,
387 /// all changes are rolled back.
388 ///
389 /// # Arguments
390 ///
391 /// * `tenant` - The tenant context
392 /// * `entries` - The bundle entries to process
393 ///
394 /// # Returns
395 ///
396 /// Results for each entry. On failure, all entries will have error status.
397 async fn process_transaction(
398 &self,
399 tenant: &TenantContext,
400 entries: Vec<BundleEntry>,
401 ) -> Result<BundleResult, TransactionError>;
402
403 /// Processes a batch bundle (independent operations).
404 ///
405 /// Each entry is processed independently. Failures in one entry
406 /// do not affect other entries.
407 ///
408 /// # Arguments
409 ///
410 /// * `tenant` - The tenant context
411 /// * `entries` - The bundle entries to process
412 ///
413 /// # Returns
414 ///
415 /// Results for each entry. Some may succeed while others fail.
416 async fn process_batch(
417 &self,
418 tenant: &TenantContext,
419 entries: Vec<BundleEntry>,
420 ) -> StorageResult<BundleResult>;
421}
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426 use helios_fhir::FhirVersion;
427
428 #[test]
429 fn test_isolation_level_display() {
430 assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read-committed");
431 assert_eq!(IsolationLevel::Serializable.to_string(), "serializable");
432 }
433
434 #[test]
435 fn test_transaction_options_builder() {
436 let opts = TransactionOptions::new()
437 .isolation_level(IsolationLevel::Serializable)
438 .timeout_ms(5000);
439
440 assert_eq!(opts.isolation_level, IsolationLevel::Serializable);
441 assert_eq!(opts.timeout_ms, 5000);
442 }
443
444 #[test]
445 fn test_transaction_options_read_only() {
446 let opts = TransactionOptions::new().read_only();
447
448 assert!(opts.read_only);
449 assert_eq!(opts.locking_strategy, LockingStrategy::None);
450 }
451
452 #[test]
453 fn test_bundle_method_display() {
454 assert_eq!(BundleMethod::Get.to_string(), "GET");
455 assert_eq!(BundleMethod::Post.to_string(), "POST");
456 assert_eq!(BundleMethod::Delete.to_string(), "DELETE");
457 }
458
459 #[test]
460 fn test_bundle_entry_result_created() {
461 let resource = StoredResource::new(
462 "Patient",
463 "123",
464 crate::tenant::TenantId::new("t1"),
465 serde_json::json!({}),
466 FhirVersion::default(),
467 );
468
469 let result = BundleEntryResult::created(resource);
470 assert_eq!(result.status, 201);
471 assert!(result.location.is_some());
472 assert!(result.etag.is_some());
473 }
474
475 #[test]
476 fn test_bundle_entry_result_error() {
477 let outcome = serde_json::json!({
478 "resourceType": "OperationOutcome",
479 "issue": [{"severity": "error", "code": "not-found"}]
480 });
481
482 let result = BundleEntryResult::error(404, outcome);
483 assert_eq!(result.status, 404);
484 assert!(result.outcome.is_some());
485 assert!(result.resource.is_none());
486 }
487}