golem_rust/transaction/
mod.rs

1// Copyright 2024-2025 Golem Cloud
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compfn;
16
17use std::fmt::{Debug, Display, Formatter};
18use std::rc::Rc;
19
20use crate::bindings::golem::api::host::{get_oplog_index, set_oplog_index, OplogIndex};
21use crate::mark_atomic_operation;
22
23pub use compfn::*;
24
/// Represents an atomic operation of the transaction which has a rollback action.
///
/// Implement this trait and use it within a `transaction` block.
/// Operations can also be constructed from closures using `operation`.
///
/// The implementing type as well as its input, output and error types must be `Clone`.
pub trait Operation: Clone {
    /// Input value handed to both `execute` and `compensate`.
    type In: Clone;
    /// Value produced by a successful `execute`; it is passed back to `compensate`.
    type Out: Clone;
    /// Error type shared by `execute` and `compensate`.
    type Err: Clone;

    /// Executes the operation, which may fail with a domain error.
    fn execute(&self, input: Self::In) -> Result<Self::Out, Self::Err>;

    /// Executes a compensation action for the operation.
    ///
    /// The transaction types in this module call it with a clone of the input that
    /// was given to `execute` and a clone of the output `execute` returned.
    fn compensate(&self, input: Self::In, result: Self::Out) -> Result<(), Self::Err>;
}
40
41/// Constructs an `Operation` from two closures: one for executing the operation,
42/// and one for rolling it back. The rollback operation always sees the input and
43/// the output of the operation.
44///
45/// This operation can run the compensation in both fallible and infallible transactions.
46pub fn operation<In: Clone, Out: Clone, Err: Clone>(
47    execute_fn: impl Fn(In) -> Result<Out, Err> + 'static,
48    compensate_fn: impl Fn(In, Out) -> Result<(), Err> + 'static,
49) -> impl Operation<In = In, Out = Out, Err = Err> {
50    FnOperation {
51        execute_fn: Rc::new(execute_fn),
52        compensate_fn: Rc::new(compensate_fn),
53    }
54}
55
56#[allow(clippy::type_complexity)]
57struct FnOperation<In, Out, Err> {
58    execute_fn: Rc<dyn Fn(In) -> Result<Out, Err>>,
59    compensate_fn: Rc<dyn Fn(In, Out) -> Result<(), Err>>,
60}
61
62impl<In, Out, Err> Clone for FnOperation<In, Out, Err> {
63    fn clone(&self) -> Self {
64        Self {
65            execute_fn: self.execute_fn.clone(),
66            compensate_fn: self.compensate_fn.clone(),
67        }
68    }
69}
70
71impl<In: Clone, Out: Clone, Err: Clone> Operation for FnOperation<In, Out, Err> {
72    type In = In;
73    type Out = Out;
74    type Err = Err;
75
76    fn execute(&self, input: In) -> Result<Out, Err> {
77        (self.execute_fn)(input)
78    }
79
80    fn compensate(&self, input: In, result: Out) -> Result<(), Err> {
81        (self.compensate_fn)(input, result)
82    }
83}
84
/// Outcome of running a transaction: the produced value, or a description of how
/// the transaction failed.
pub type TransactionResult<Out, Err> = Result<Out, TransactionFailure<Err>>;

/// Describes how a transaction failed.
#[derive(Debug)]
pub enum TransactionFailure<Err> {
    /// An operation failed with the contained error and every compensation action
    /// succeeded, so the transaction was fully rolled back.
    FailedAndRolledBackCompletely(Err),
    /// An operation failed and the rollback stopped part-way because one of the
    /// compensation actions failed as well.
    FailedAndRolledBackPartially {
        /// The error that made the transaction fail.
        failure: Err,
        /// The error returned by the failing compensation action.
        compensation_failure: Err,
    },
}

impl<Err: Display> Display for TransactionFailure<Err> {
    /// Renders a one-sentence, human-readable description of the failure.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::FailedAndRolledBackPartially {
                failure: cause,
                compensation_failure: rollback_cause,
            } => write!(
                f,
                "Transaction failed with {cause} and rolled back partially; compensation failed with: {rollback_cause}."
            ),
            Self::FailedAndRolledBackCompletely(cause) => {
                write!(f, "Transaction failed with {cause} and rolled back completely.")
            }
        }
    }
}
117
118/// Fallible transaction execution. If any operation fails, all the already executed
119/// successful operation's compensation actions are executed in reverse order and the transaction
120/// returns with a failure.
121pub fn fallible_transaction<Out, Err: Clone + 'static>(
122    f: impl FnOnce(&mut FallibleTransaction<Err>) -> Result<Out, Err>,
123) -> TransactionResult<Out, Err> {
124    let mut transaction = FallibleTransaction::new();
125    match f(&mut transaction) {
126        Ok(output) => Ok(output),
127        Err(error) => Err(transaction.on_fail(error)),
128    }
129}
130
131/// Retry the transaction in case of failure. If any operation returns with a failure, all
132/// the already executed successful operation's compensation actions are executed in reverse order
133/// and the transaction gets retried, using Golem's active retry policy.
134pub fn infallible_transaction<Out>(f: impl FnOnce(&mut InfallibleTransaction) -> Out) -> Out {
135    let oplog_index = get_oplog_index();
136    let _atomic_region = mark_atomic_operation();
137    let mut transaction = InfallibleTransaction::new(oplog_index);
138    f(&mut transaction)
139}
140
/// Same as `infallible_transaction`, but with strong rollback guarantees. The compensation actions
/// are guaranteed to be always executed before the transaction gets retried, even if it
/// fails due to a panic or an external executor failure.
///
/// # Panics
///
/// This function is not implemented yet: it ignores `_f` and always panics via
/// `unimplemented!()`.
pub fn infallible_transaction_with_strong_rollback_guarantees<Out>(
    _f: impl FnOnce(&mut InfallibleTransaction) -> Out,
) -> Out {
    unimplemented!()
}
149
150/// A generic interface for defining transactions, where the transaction mode is
151/// determined by the function's parameter (it can be `FallibleTransaction` or `InfallibleTransaction`).
152///
153/// This makes switching between different transaction guarantees easier, but is more constrained
154/// than using the specific transaction functions where for retried transactions errors does
155/// not have to be handled.
156pub fn transaction<Out, Err, F, T>(f: F) -> TransactionResult<Out, Err>
157where
158    T: Transaction<Err>,
159    F: FnOnce(&mut T) -> Result<Out, Err>,
160{
161    T::run(f)
162}
163
/// Helper wrapping a boxed closure that performs the compensation of one operation.
/// The transactions in this module build the closure so that it captures the
/// operation together with its input and result.
#[allow(clippy::type_complexity)]
struct CompensationAction<Err> {
    action: Box<dyn Fn() -> Result<(), Err>>,
}

impl<Err> CompensationAction<Err> {
    /// Invokes the stored closure and returns its result.
    pub fn execute(&self) -> Result<(), Err> {
        let run = &*self.action;
        run()
    }
}
175
176/// FallibleTransaction is a sequence of operations that are executed in a way that if any of the
177/// operations fails all the already performed operation's compensation actions got executed in
178/// reverse order.
179///
180/// In case of fatal errors (panic) and external executor failures it does not perform the
181/// compensation actions and the whole transaction gets retried.
182pub struct FallibleTransaction<Err> {
183    compensations: Vec<CompensationAction<Err>>,
184}
185
186impl<Err: Clone + 'static> FallibleTransaction<Err> {
187    fn new() -> Self {
188        Self {
189            compensations: Vec::new(),
190        }
191    }
192
193    pub fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
194        &mut self,
195        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
196        input: OpIn,
197    ) -> Result<OpOut, Err> {
198        let result = operation.execute(input.clone());
199        if let Ok(output) = &result {
200            let cloned_op = operation.clone();
201            let cloned_out = output.clone();
202            self.compensations.push(CompensationAction {
203                action: Box::new(move || cloned_op.compensate(input.clone(), cloned_out.clone())),
204            });
205        }
206        result
207    }
208
209    fn on_fail(&mut self, failure: Err) -> TransactionFailure<Err> {
210        for compensation_action in self.compensations.drain(..).rev() {
211            if let Err(compensation_failure) = compensation_action.execute() {
212                return TransactionFailure::FailedAndRolledBackPartially {
213                    failure,
214                    compensation_failure,
215                };
216            }
217        }
218        TransactionFailure::FailedAndRolledBackCompletely(failure)
219    }
220}
221
222/// InfallibleTransaction is a sequence of operations that are executed in a way that if any of the
223/// operations or the underlying Golem executor fails, the whole transaction is going to
224/// be retried.
225///
226/// In addition to that, **user level failures** (represented by the `Result::Err` value
227/// of an operation) lead to performing the compensation actions of each already performed operation
228/// in reverse order.
229///
230/// Fatal errors (panic) and external executor failures are currently cannot perform the
231/// rollback actions.
232pub struct InfallibleTransaction {
233    begin_oplog_index: OplogIndex,
234    compensations: Vec<CompensationAction<()>>,
235}
236
237impl InfallibleTransaction {
238    fn new(begin_oplog_index: OplogIndex) -> Self {
239        Self {
240            begin_oplog_index,
241            compensations: Vec::new(),
242        }
243    }
244
245    pub fn execute<
246        OpIn: Clone + 'static,
247        OpOut: Clone + 'static,
248        OpErr: Debug + Clone + 'static,
249    >(
250        &mut self,
251        operation: impl Operation<In = OpIn, Out = OpOut, Err = OpErr> + 'static,
252        input: OpIn,
253    ) -> OpOut {
254        match operation.execute(input.clone()) {
255            Ok(output) => {
256                let cloned_op = operation.clone();
257                let cloned_out = output.clone();
258                self.compensations.push(CompensationAction {
259                    action: Box::new(move || {
260                        cloned_op
261                            .compensate(input.clone(), cloned_out.clone())
262                            .expect("Compensation action failed");
263                        Ok(())
264                    }),
265                });
266                output
267            }
268            Err(_) => {
269                self.retry();
270                unreachable!()
271            }
272        }
273    }
274
275    /// Stop executing the transaction and retry from the beginning, after executing the compensation actions
276    pub fn retry(&mut self) {
277        for compensation_action in self.compensations.drain(..).rev() {
278            let _ = compensation_action.execute();
279        }
280        set_oplog_index(self.begin_oplog_index);
281    }
282}
283
/// A unified interface for the different types of transactions. Using it can make the code
/// easier to switch between different transactional guarantees but is more constrained in
/// terms of error types.
///
/// `Err` is the single error type shared by every operation executed through this
/// interface.
pub trait Transaction<Err> {
    /// Executes `operation` with `input` as part of this transaction and returns the
    /// operation's output, or its error.
    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
        &mut self,
        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
        input: OpIn,
    ) -> Result<OpOut, Err>;

    /// Signals a user-level failure of the transaction with `error`.
    ///
    /// Both implementations in this module return `Err(error)`; the
    /// `InfallibleTransaction` one calls `retry` first.
    fn fail(&mut self, error: Err) -> Result<(), Err>;

    /// Runs the closure `f` as a transaction of the implementing type.
    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err>;
}
298
/// `Transaction` implementation that forwards to the inherent `FallibleTransaction`
/// API and to `fallible_transaction`.
impl<Err: Clone + 'static> Transaction<Err> for FallibleTransaction<Err> {
    /// Delegates to the inherent `FallibleTransaction::execute`.
    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
        &mut self,
        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
        input: OpIn,
    ) -> Result<OpOut, Err> {
        FallibleTransaction::execute(self, operation, input)
    }

    /// Returns `Err(error)` without touching the transaction; the rollback is done by
    /// `fallible_transaction` once the error leaves the transaction closure.
    fn fail(&mut self, error: Err) -> Result<(), Err> {
        Err(error)
    }

    /// Runs `f` through `fallible_transaction`.
    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
        fallible_transaction(f)
    }
}
316
317impl<Err: Debug + Clone + 'static> Transaction<Err> for InfallibleTransaction {
318    fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
319        &mut self,
320        operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
321        input: OpIn,
322    ) -> Result<OpOut, Err> {
323        Ok(InfallibleTransaction::execute(self, operation, input))
324    }
325
326    fn fail(&mut self, error: Err) -> Result<(), Err> {
327        InfallibleTransaction::retry(self);
328        Err(error)
329    }
330
331    fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
332        Ok(infallible_transaction(|tx| f(tx).unwrap()))
333    }
334}
335
/// Compile-only checks for the closure-based transaction API. Both tests are
/// `#[ignore]`d and contain no assertions; they only print the collected log and result.
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;
    use test_r::test;

    use crate::{fallible_transaction, infallible_transaction, operation};

    // Not a real test, just verifying that the code compiles.
    // Builds two closure-based operations and runs them in a fallible transaction.
    #[test]
    #[ignore]
    fn tx_test_1() {
        // Shared log; every closure below gets its own handle to append to it.
        let log = Rc::new(RefCell::new(Vec::new()));

        let log1 = log.clone();
        let log2 = log.clone();
        let log3 = log.clone();
        let log4 = log.clone();

        // op1 always succeeds and logs both its execution and its rollback.
        let op1 = operation(
            move |input: String| {
                log1.borrow_mut().push(format!("op1 execute {input}"));
                Ok(())
            },
            move |input: String, _| {
                log2.borrow_mut().push(format!("op1 rollback {input}"));
                Ok(())
            },
        );

        // op2's execute closure always returns an error.
        let op2 = operation(
            move |_: ()| {
                log3.clone().borrow_mut().push("op2 execute".to_string());
                Err::<(), &str>("op2 error")
            },
            move |_: (), _| {
                log4.clone().borrow_mut().push("op2 rollback".to_string());
                Ok(())
            },
        );

        let result = fallible_transaction(|tx| {
            println!("First we execute op1");
            tx.execute(op1, "hello".to_string())?;
            println!("Then execute op2");
            // op2 fails, so `?` leaves the closure here with its error.
            tx.execute(op2, ())?;
            println!("Finally compute a result");
            Ok(11)
        });

        println!("{log:?}");
        println!("{result:?}");
    }

    // Not a real test, just verifying that the code compiles.
    // Same scenario as above, but using an infallible transaction.
    #[test]
    #[ignore]
    fn tx_test_2() {
        // Shared log; every closure below gets its own handle to append to it.
        let log = Rc::new(RefCell::new(Vec::new()));

        let log1 = log.clone();
        let log2 = log.clone();
        let log3 = log.clone();
        let log4 = log.clone();

        // op1 uses `()` as its error type; operations in an infallible transaction
        // do not have to share an error type.
        let op1 = operation(
            move |input: String| {
                log1.borrow_mut().push(format!("op1 execute {input}"));
                Ok::<(), ()>(())
            },
            move |input: String, _| {
                log2.borrow_mut().push(format!("op1 rollback {input}"));
                Ok(())
            },
        );

        // op2's execute closure always returns an error.
        let op2 = operation(
            move |_: ()| {
                log3.clone().borrow_mut().push("op2 execute".to_string());
                Err::<(), &str>("op2 error")
            },
            move |_: (), r| {
                log4.clone()
                    .borrow_mut()
                    .push(format!("op2 rollback {r:?}"));
                Ok(())
            },
        );

        let result = infallible_transaction(|tx| {
            println!("First we execute op1");
            tx.execute(op1, "hello".to_string());
            println!("Then execute op2");
            tx.execute(op2, ());
            println!("Finally compute a result");
            11
        });

        println!("{log:?}");
        println!("{result:?}");
    }
}
438
/// Compile-only checks for the `golem_operation` attribute macro; only built when the
/// `macro` feature is enabled. Both tests are `#[ignore]`d and contain no assertions.
#[cfg(test)]
#[cfg(feature = "macro")]
mod macro_tests {
    use golem_rust_macro::golem_operation;

    use crate::{fallible_transaction, infallible_transaction};

    // Re-exports this crate under the name `golem_rust`.
    // NOTE(review): presumably needed because the macro expansion refers to
    // `golem_rust::...` paths — confirm against the macro implementation.
    mod golem_rust {
        pub use crate::*;
    }

    // Compensation taking the operation's result followed by all of its inputs.
    #[golem_operation(compensation=test_compensation)]
    fn test_operation(input1: u64, input2: f32) -> Result<bool, String> {
        println!("Op input: {input1}, {input2}");
        Ok(true)
    }

    fn test_compensation(_: bool, input1: u64, input2: f32) -> Result<(), String> {
        println!("Compensation input: {input1}, {input2}");
        Ok(())
    }

    // Compensation taking only the operation's result.
    #[golem_operation(compensation=test_compensation_2)]
    fn test_operation_2(input1: u64, input2: f32) -> Result<bool, String> {
        println!("Op input: {input1}, {input2}");
        Ok(true)
    }

    fn test_compensation_2(result: bool) -> Result<(), String> {
        println!("Compensation for operation result {result:?}");
        Ok(())
    }

    // Compensation taking no parameters at all.
    #[golem_operation(compensation=test_compensation_3)]
    fn test_operation_3(input: String) -> Result<(), String> {
        println!("Op input: {input}");
        Ok(())
    }

    fn test_compensation_3() -> Result<(), String> {
        println!("Compensation for operation, not using any input");
        Ok(())
    }

    // Single-input operation whose compensation takes the result and that input.
    #[golem_operation(compensation=test_compensation_4)]
    fn test_operation_4(input: u64) -> Result<(), String> {
        println!("Op input: {input}");
        Ok(())
    }

    fn test_compensation_4(_: (), input: u64) -> Result<(), String> {
        println!("Compensation for operation with single input {input}");
        Ok(())
    }

    // Not a real test, just verifying that the code compiles.
    // Calls each annotated function as a method on the fallible transaction.
    #[test]
    #[ignore]
    fn tx_test_1() {
        let result = fallible_transaction(|tx| {
            println!("Executing the annotated function as an operation directly");
            tx.test_operation(1, 0.1)?;
            tx.test_operation_2(1, 0.1)?;
            tx.test_operation_3("test".to_string())?;
            tx.test_operation_4(1)?;

            Ok(11)
        });

        println!("{result:?}");
    }

    // Not a real test, just verifying that the code compiles.
    // Calls an annotated function as a method on the infallible transaction.
    #[test]
    #[ignore]
    fn tx_test_2() {
        let result = infallible_transaction(|tx| {
            println!("Executing the annotated function as an operation directly");
            let _ = tx.test_operation(1, 0.1);
            11
        });

        println!("{result:?}");
    }
}