golem_rust/transaction/
mod.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compfn;
16
17use std::fmt::{Debug, Display, Formatter};
18use std::rc::Rc;
19
20use crate::bindings::golem::api::host::{get_oplog_index, set_oplog_index, OplogIndex};
21use crate::mark_atomic_operation;
22
23pub use compfn::*;
24
25/// Represents an atomic operation of the transaction which has a rollback action.
26///
27/// Implement this trait and use it within a `transaction` block.
28/// Operations can also be constructed from closures using `operation`.
29pub trait Operation: Clone {
30    type In: Clone;
31    type Out: Clone;
32    type Err: Clone;
33
34    /// Executes the operation which may fail with a domain error
35    fn execute(&self, input: Self::In) -> Result<Self::Out, Self::Err>;
36
37    /// Executes a compensation action for the operation.
38    fn compensate(&self, input: Self::In, result: Self::Out) -> Result<(), Self::Err>;
39}
40
41/// Constructs an `Operation` from two closures: one for executing the operation,
42/// and one for rolling it back. The rollback operation always sees the input and
43/// the output of the operation.
44///
45/// This operation can run the compensation in both fallible and infallible transactions.
46pub fn operation<In: Clone, Out: Clone, Err: Clone>(
47    execute_fn: impl Fn(In) -> Result<Out, Err> + 'static,
48    compensate_fn: impl Fn(In, Out) -> Result<(), Err> + 'static,
49) -> impl Operation<In = In, Out = Out, Err = Err> {
50    FnOperation {
51        execute_fn: Rc::new(execute_fn),
52        compensate_fn: Rc::new(compensate_fn),
53    }
54}
55
56#[allow(clippy::type_complexity)]
57struct FnOperation<In, Out, Err> {
58    execute_fn: Rc<dyn Fn(In) -> Result<Out, Err>>,
59    compensate_fn: Rc<dyn Fn(In, Out) -> Result<(), Err>>,
60}
61
62impl<In, Out, Err> Clone for FnOperation<In, Out, Err> {
63    fn clone(&self) -> Self {
64        Self {
65            execute_fn: self.execute_fn.clone(),
66            compensate_fn: self.compensate_fn.clone(),
67        }
68    }
69}
70
71impl<In: Clone, Out: Clone, Err: Clone> Operation for FnOperation<In, Out, Err> {
72    type In = In;
73    type Out = Out;
74    type Err = Err;
75
76    fn execute(&self, input: In) -> Result<Out, Err> {
77        (self.execute_fn)(input)
78    }
79
80    fn compensate(&self, input: In, result: Out) -> Result<(), Err> {
81        (self.compensate_fn)(input, result)
82    }
83}
84
85/// The result of a transaction execution.
86pub type TransactionResult<Out, Err> = Result<Out, TransactionFailure<Err>>;
87
88/// The result of a transaction execution that failed.
89#[derive(Debug)]
90pub enum TransactionFailure<Err> {
91    /// One of the operations failed with an error, and the transaction was fully rolled back.
92    FailedAndRolledBackCompletely(Err),
93    /// One of the operations failed with an error, and the transaction was partially rolled back
94    /// because the compensation action of one of the operations also failed.
95    FailedAndRolledBackPartially {
96        failure: Err,
97        compensation_failure: Err,
98    },
99}
100
101impl<Err: Display> Display for TransactionFailure<Err> {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        match self {
104            TransactionFailure::FailedAndRolledBackCompletely(err) => {
105                write!(f, "Transaction failed with {err} and rolled back completely.")
106            }
107            TransactionFailure::FailedAndRolledBackPartially {
108                failure,
109                compensation_failure,
110            } => write!(
111                f,
112                "Transaction failed with {failure} and rolled back partially; compensation failed with: {compensation_failure}."
113            ),
114        }
115    }
116}
117
118/// Fallible transaction execution. If any operation fails, all the already executed
119/// successful operation's compensation actions are executed in reverse order and the transaction
120/// returns with a failure.
121pub fn fallible_transaction<Out, Err: Clone + 'static>(
122    f: impl FnOnce(&mut FallibleTransaction<Err>) -> Result<Out, Err>,
123) -> TransactionResult<Out, Err> {
124    let mut transaction = FallibleTransaction::new();
125    match f(&mut transaction) {
126        Ok(output) => Ok(output),
127        Err(error) => Err(transaction.on_fail(error)),
128    }
129}
130
131/// Retry the transaction in case of failure. If any operation returns with a failure, all
132/// the already executed successful operation's compensation actions are executed in reverse order
133/// and the transaction gets retried, using Golem's active retry policy.
134pub fn infallible_transaction<Out>(f: impl FnOnce(&mut InfallibleTransaction) -> Out) -> Out {
135    let oplog_index = get_oplog_index();
136    let _atomic_region = mark_atomic_operation();
137    let mut transaction = InfallibleTransaction::new(oplog_index);
138    f(&mut transaction)
139}
140
141/// Same as `infallible_transaction`, but with strong rollback guarantees. The compensation actions
142/// are guaranteed to be always executed before the transaction gets retried, even if it
143/// fails due to a panic or an external executor failure.
144pub fn infallible_transaction_with_strong_rollback_guarantees<Out>(
145    _f: impl FnOnce(&mut InfallibleTransaction) -> Out,
146) -> Out {
147    unimplemented!()
148}
149
150/// A generic interface for defining transactions, where the transaction mode is
151/// determined by the function's parameter (it can be `FallibleTransaction` or `InfallibleTransaction`).
152///
153/// This makes switching between different transaction guarantees easier, but is more constrained
154/// than using the specific transaction functions where for retried transactions errors does
155/// not have to be handled.
156pub fn transaction<Out, Err, F, T>(f: F) -> TransactionResult<Out, Err>
157where
158    T: Transaction<Err>,
159    F: FnOnce(&mut T) -> Result<Out, Err>,
160{
161    T::run(f)
162}
163
164/// Helper struct for coupling compensation action and the result of the operation.
165#[allow(clippy::type_complexity)]
166struct CompensationAction<Err> {
167    action: Box<dyn Fn() -> Result<(), Err>>,
168}
169
170impl<Err> CompensationAction<Err> {
171    pub fn execute(&self) -> Result<(), Err> {
172        (self.action)()
173    }
174}
175
176/// FallibleTransaction is a sequence of operations that are executed in a way that if any of the
177/// operations fails all the already performed operation's compensation actions got executed in
178/// reverse order.
179///
180/// In case of fatal errors (panic) and external executor failures it does not perform the
181/// compensation actions and the whole transaction gets retried.
182pub struct FallibleTransaction<Err> {
183    compensations: Vec<CompensationAction<Err>>,
184}
185
186impl<Err: Clone + 'static> FallibleTransaction<Err> {
187    fn new() -> Self {
188        Self {
189            compensations: Vec::new(),
190        }
191    }
192
193    pub fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
194        &mut self,
195        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
196        input: OpIn,
197    ) -> Result<OpOut, Err> {
198        let result = operation.execute(input.clone());
199        if let Ok(output) = &result {
200            let cloned_op = operation.clone();
201            let cloned_out = output.clone();
202            self.compensations.push(CompensationAction {
203                action: Box::new(move || cloned_op.compensate(input.clone(), cloned_out.clone())),
204            });
205        }
206        result
207    }
208
209    fn on_fail(&mut self, failure: Err) -> TransactionFailure<Err> {
210        for compensation_action in self.compensations.drain(..).rev() {
211            if let Err(compensation_failure) = compensation_action.execute() {
212                return TransactionFailure::FailedAndRolledBackPartially {
213                    failure,
214                    compensation_failure,
215                };
216            }
217        }
218        TransactionFailure::FailedAndRolledBackCompletely(failure)
219    }
220}
221
222/// InfallibleTransaction is a sequence of operations that are executed in a way that if any of the
223/// operations or the underlying Golem executor fails, the whole transaction is going to
224/// be retried.
225///
226/// In addition to that, **user level failures** (represented by the `Result::Err` value
227/// of an operation) lead to performing the compensation actions of each already performed operation
228/// in reverse order.
229///
230/// Fatal errors (panic) and external executor failures are currently cannot perform the
231/// rollback actions.
232pub struct InfallibleTransaction {
233    begin_oplog_index: OplogIndex,
234    compensations: Vec<CompensationAction<()>>,
235}
236
237impl InfallibleTransaction {
238    fn new(begin_oplog_index: OplogIndex) -> Self {
239        Self {
240            begin_oplog_index,
241            compensations: Vec::new(),
242        }
243    }
244
245    pub fn execute<
246        OpIn: Clone + 'static,
247        OpOut: Clone + 'static,
248        OpErr: Debug + Clone + 'static,
249    >(
250        &mut self,
251        operation: impl Operation<In = OpIn, Out = OpOut, Err = OpErr> + 'static,
252        input: OpIn,
253    ) -> OpOut {
254        match operation.execute(input.clone()) {
255            Ok(output) => {
256                let cloned_op = operation.clone();
257                let cloned_out = output.clone();
258                self.compensations.push(CompensationAction {
259                    action: Box::new(move || {
260                        cloned_op
261                            .compensate(input.clone(), cloned_out.clone())
262                            .expect("Compensation action failed");
263                        Ok(())
264                    }),
265                });
266                output
267            }
268            Err(_) => {
269                self.retry();
270                unreachable!()
271            }
272        }
273    }
274
275    /// Stop executing the transaction and retry from the beginning, after executing the compensation actions
276    pub fn retry(&mut self) {
277        for compensation_action in self.compensations.drain(..).rev() {
278            let _ = compensation_action.execute();
279        }
280        set_oplog_index(self.begin_oplog_index);
281    }
282}
283
284/// A unified interface for the different types of transactions. Using it can make the code
285/// easier to switch between different transactional guarantees but is more constrained in
286/// terms of error types.
287pub trait Transaction<Err> {
288    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
289        &mut self,
290        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
291        input: OpIn,
292    ) -> Result<OpOut, Err>;
293
294    fn fail(&mut self, error: Err) -> Result<(), Err>;
295
296    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err>;
297}
298
299impl<Err: Clone + 'static> Transaction<Err> for FallibleTransaction<Err> {
300    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
301        &mut self,
302        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
303        input: OpIn,
304    ) -> Result<OpOut, Err> {
305        FallibleTransaction::execute(self, operation, input)
306    }
307
308    fn fail(&mut self, error: Err) -> Result<(), Err> {
309        Err(error)
310    }
311
312    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
313        fallible_transaction(f)
314    }
315}
316
317impl<Err: Debug + Clone + 'static> Transaction<Err> for InfallibleTransaction {
318    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
319        &mut self,
320        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
321        input: OpIn,
322    ) -> Result<OpOut, Err> {
323        Ok(InfallibleTransaction::execute(self, operation, input))
324    }
325
326    fn fail(&mut self, error: Err) -> Result<(), Err> {
327        InfallibleTransaction::retry(self);
328        Err(error)
329    }
330
331    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
332        Ok(infallible_transaction(|tx| f(tx).unwrap()))
333    }
334}
335
336#[cfg(test)]
337mod tests {
338    use std::cell::RefCell;
339    use std::rc::Rc;
340
341    use crate::{fallible_transaction, infallible_transaction, operation};
342
343    // Not a real test, just verifying that the code compiles
344    #[test]
345    #[ignore]
346    fn tx_test_1() {
347        let log = Rc::new(RefCell::new(Vec::new()));
348
349        let log1 = log.clone();
350        let log2 = log.clone();
351        let log3 = log.clone();
352        let log4 = log.clone();
353
354        let op1 = operation(
355            move |input: String| {
356                log1.borrow_mut().push(format!("op1 execute {input}"));
357                Ok(())
358            },
359            move |input: String, _| {
360                log2.borrow_mut().push(format!("op1 rollback {input}"));
361                Ok(())
362            },
363        );
364
365        let op2 = operation(
366            move |_: ()| {
367                log3.clone().borrow_mut().push("op2 execute".to_string());
368                Err::<(), &str>("op2 error")
369            },
370            move |_: (), _| {
371                log4.clone().borrow_mut().push("op2 rollback".to_string());
372                Ok(())
373            },
374        );
375
376        let result = fallible_transaction(|tx| {
377            println!("First we execute op1");
378            tx.execute(op1, "hello".to_string())?;
379            println!("Then execute op2");
380            tx.execute(op2, ())?;
381            println!("Finally compute a result");
382            Ok(11)
383        });
384
385        println!("{log:?}");
386        println!("{result:?}");
387    }
388
389    // Not a real test, just verifying that the code compiles
390    #[test]
391    #[ignore]
392    fn tx_test_2() {
393        let log = Rc::new(RefCell::new(Vec::new()));
394
395        let log1 = log.clone();
396        let log2 = log.clone();
397        let log3 = log.clone();
398        let log4 = log.clone();
399
400        let op1 = operation(
401            move |input: String| {
402                log1.borrow_mut().push(format!("op1 execute {input}"));
403                Ok::<(), ()>(())
404            },
405            move |input: String, _| {
406                log2.borrow_mut().push(format!("op1 rollback {input}"));
407                Ok(())
408            },
409        );
410
411        let op2 = operation(
412            move |_: ()| {
413                log3.clone().borrow_mut().push("op2 execute".to_string());
414                Err::<(), &str>("op2 error")
415            },
416            move |_: (), r| {
417                log4.clone()
418                    .borrow_mut()
419                    .push(format!("op2 rollback {r:?}"));
420                Ok(())
421            },
422        );
423
424        let result = infallible_transaction(|tx| {
425            println!("First we execute op1");
426            tx.execute(op1, "hello".to_string());
427            println!("Then execute op2");
428            tx.execute(op2, ());
429            println!("Finally compute a result");
430            11
431        });
432
433        println!("{log:?}");
434        println!("{result:?}");
435    }
436}
437
438#[cfg(test)]
439#[cfg(feature = "macro")]
440mod macro_tests {
441    use golem_rust_macro::golem_operation;
442
443    use crate::{fallible_transaction, infallible_transaction};
444
445    mod golem_rust {
446        pub use crate::*;
447    }
448
449    #[golem_operation(compensation=test_compensation)]
450    fn test_operation(input1: u64, input2: f32) -> Result<bool, String> {
451        println!("Op input: {input1}, {input2}");
452        Ok(true)
453    }
454
455    fn test_compensation(_: bool, input1: u64, input2: f32) -> Result<(), String> {
456        println!("Compensation input: {input1}, {input2}");
457        Ok(())
458    }
459
460    #[golem_operation(compensation=test_compensation_2)]
461    fn test_operation_2(input1: u64, input2: f32) -> Result<bool, String> {
462        println!("Op input: {input1}, {input2}");
463        Ok(true)
464    }
465
466    fn test_compensation_2(result: bool) -> Result<(), String> {
467        println!("Compensation for operation result {result:?}");
468        Ok(())
469    }
470
471    #[golem_operation(compensation=test_compensation_3)]
472    fn test_operation_3(input: String) -> Result<(), String> {
473        println!("Op input: {input}");
474        Ok(())
475    }
476
477    fn test_compensation_3() -> Result<(), String> {
478        println!("Compensation for operation, not using any input");
479        Ok(())
480    }
481
482    #[golem_operation(compensation=test_compensation_4)]
483    fn test_operation_4(input: u64) -> Result<(), String> {
484        println!("Op input: {input}");
485        Ok(())
486    }
487
488    fn test_compensation_4(_: (), input: u64) -> Result<(), String> {
489        println!("Compensation for operation with single input {input}");
490        Ok(())
491    }
492
493    // Not a real test, just verifying that the code compiles
494    #[test]
495    #[ignore]
496    fn tx_test_1() {
497        let result = fallible_transaction(|tx| {
498            println!("Executing the annotated function as an operation directly");
499            tx.test_operation(1, 0.1)?;
500            tx.test_operation_2(1, 0.1)?;
501            tx.test_operation_3("test".to_string())?;
502            tx.test_operation_4(1)?;
503
504            Ok(11)
505        });
506
507        println!("{result:?}");
508    }
509
510    // Not a real test, just verifying that the code compiles
511    #[test]
512    #[ignore]
513    fn tx_test_2() {
514        let result = infallible_transaction(|tx| {
515            println!("Executing the annotated function as an operation directly");
516            let _ = tx.test_operation(1, 0.1);
517            11
518        });
519
520        println!("{result:?}");
521    }
522}