1mod compfn;
16
17use std::fmt::{Debug, Display, Formatter};
18use std::rc::Rc;
19
20use crate::bindings::golem::api::host::{get_oplog_index, set_oplog_index, OplogIndex};
21use crate::mark_atomic_operation;
22
23pub use compfn::*;
24
25pub trait Operation: Clone {
30 type In: Clone;
31 type Out: Clone;
32 type Err: Clone;
33
34 fn execute(&self, input: Self::In) -> Result<Self::Out, Self::Err>;
36
37 fn compensate(&self, input: Self::In, result: Self::Out) -> Result<(), Self::Err>;
39}
40
41pub fn operation<In: Clone, Out: Clone, Err: Clone>(
47 execute_fn: impl Fn(In) -> Result<Out, Err> + 'static,
48 compensate_fn: impl Fn(In, Out) -> Result<(), Err> + 'static,
49) -> impl Operation<In = In, Out = Out, Err = Err> {
50 FnOperation {
51 execute_fn: Rc::new(execute_fn),
52 compensate_fn: Rc::new(compensate_fn),
53 }
54}
55
56#[allow(clippy::type_complexity)]
57struct FnOperation<In, Out, Err> {
58 execute_fn: Rc<dyn Fn(In) -> Result<Out, Err>>,
59 compensate_fn: Rc<dyn Fn(In, Out) -> Result<(), Err>>,
60}
61
62impl<In, Out, Err> Clone for FnOperation<In, Out, Err> {
63 fn clone(&self) -> Self {
64 Self {
65 execute_fn: self.execute_fn.clone(),
66 compensate_fn: self.compensate_fn.clone(),
67 }
68 }
69}
70
71impl<In: Clone, Out: Clone, Err: Clone> Operation for FnOperation<In, Out, Err> {
72 type In = In;
73 type Out = Out;
74 type Err = Err;
75
76 fn execute(&self, input: In) -> Result<Out, Err> {
77 (self.execute_fn)(input)
78 }
79
80 fn compensate(&self, input: In, result: Out) -> Result<(), Err> {
81 (self.compensate_fn)(input, result)
82 }
83}
84
85pub type TransactionResult<Out, Err> = Result<Out, TransactionFailure<Err>>;
87
88#[derive(Debug)]
90pub enum TransactionFailure<Err> {
91 FailedAndRolledBackCompletely(Err),
93 FailedAndRolledBackPartially {
96 failure: Err,
97 compensation_failure: Err,
98 },
99}
100
101impl<Err: Display> Display for TransactionFailure<Err> {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 match self {
104 TransactionFailure::FailedAndRolledBackCompletely(err) => {
105 write!(f, "Transaction failed with {err} and rolled back completely.")
106 }
107 TransactionFailure::FailedAndRolledBackPartially {
108 failure,
109 compensation_failure,
110 } => write!(
111 f,
112 "Transaction failed with {failure} and rolled back partially; compensation failed with: {compensation_failure}."
113 ),
114 }
115 }
116}
117
118pub fn fallible_transaction<Out, Err: Clone + 'static>(
122 f: impl FnOnce(&mut FallibleTransaction<Err>) -> Result<Out, Err>,
123) -> TransactionResult<Out, Err> {
124 let mut transaction = FallibleTransaction::new();
125 match f(&mut transaction) {
126 Ok(output) => Ok(output),
127 Err(error) => Err(transaction.on_fail(error)),
128 }
129}
130
131pub fn infallible_transaction<Out>(f: impl FnOnce(&mut InfallibleTransaction) -> Out) -> Out {
135 let oplog_index = get_oplog_index();
136 let _atomic_region = mark_atomic_operation();
137 let mut transaction = InfallibleTransaction::new(oplog_index);
138 f(&mut transaction)
139}
140
141pub fn infallible_transaction_with_strong_rollback_guarantees<Out>(
145 _f: impl FnOnce(&mut InfallibleTransaction) -> Out,
146) -> Out {
147 unimplemented!()
148}
149
150pub fn transaction<Out, Err, F, T>(f: F) -> TransactionResult<Out, Err>
157where
158 T: Transaction<Err>,
159 F: FnOnce(&mut T) -> Result<Out, Err>,
160{
161 T::run(f)
162}
163
164#[allow(clippy::type_complexity)]
166struct CompensationAction<Err> {
167 action: Box<dyn Fn() -> Result<(), Err>>,
168}
169
170impl<Err> CompensationAction<Err> {
171 pub fn execute(&self) -> Result<(), Err> {
172 (self.action)()
173 }
174}
175
176pub struct FallibleTransaction<Err> {
183 compensations: Vec<CompensationAction<Err>>,
184}
185
186impl<Err: Clone + 'static> FallibleTransaction<Err> {
187 fn new() -> Self {
188 Self {
189 compensations: Vec::new(),
190 }
191 }
192
193 pub fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
194 &mut self,
195 operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
196 input: OpIn,
197 ) -> Result<OpOut, Err> {
198 let result = operation.execute(input.clone());
199 if let Ok(output) = &result {
200 let cloned_op = operation.clone();
201 let cloned_out = output.clone();
202 self.compensations.push(CompensationAction {
203 action: Box::new(move || cloned_op.compensate(input.clone(), cloned_out.clone())),
204 });
205 }
206 result
207 }
208
209 fn on_fail(&mut self, failure: Err) -> TransactionFailure<Err> {
210 for compensation_action in self.compensations.drain(..).rev() {
211 if let Err(compensation_failure) = compensation_action.execute() {
212 return TransactionFailure::FailedAndRolledBackPartially {
213 failure,
214 compensation_failure,
215 };
216 }
217 }
218 TransactionFailure::FailedAndRolledBackCompletely(failure)
219 }
220}
221
222pub struct InfallibleTransaction {
233 begin_oplog_index: OplogIndex,
234 compensations: Vec<CompensationAction<()>>,
235}
236
237impl InfallibleTransaction {
238 fn new(begin_oplog_index: OplogIndex) -> Self {
239 Self {
240 begin_oplog_index,
241 compensations: Vec::new(),
242 }
243 }
244
245 pub fn execute<
246 OpIn: Clone + 'static,
247 OpOut: Clone + 'static,
248 OpErr: Debug + Clone + 'static,
249 >(
250 &mut self,
251 operation: impl Operation<In = OpIn, Out = OpOut, Err = OpErr> + 'static,
252 input: OpIn,
253 ) -> OpOut {
254 match operation.execute(input.clone()) {
255 Ok(output) => {
256 let cloned_op = operation.clone();
257 let cloned_out = output.clone();
258 self.compensations.push(CompensationAction {
259 action: Box::new(move || {
260 cloned_op
261 .compensate(input.clone(), cloned_out.clone())
262 .expect("Compensation action failed");
263 Ok(())
264 }),
265 });
266 output
267 }
268 Err(_) => {
269 self.retry();
270 unreachable!()
271 }
272 }
273 }
274
275 pub fn retry(&mut self) {
277 for compensation_action in self.compensations.drain(..).rev() {
278 let _ = compensation_action.execute();
279 }
280 set_oplog_index(self.begin_oplog_index);
281 }
282}
283
284pub trait Transaction<Err> {
288 fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
289 &mut self,
290 operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
291 input: OpIn,
292 ) -> Result<OpOut, Err>;
293
294 fn fail(&mut self, error: Err) -> Result<(), Err>;
295
296 fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err>;
297}
298
299impl<Err: Clone + 'static> Transaction<Err> for FallibleTransaction<Err> {
300 fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
301 &mut self,
302 operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
303 input: OpIn,
304 ) -> Result<OpOut, Err> {
305 FallibleTransaction::execute(self, operation, input)
306 }
307
308 fn fail(&mut self, error: Err) -> Result<(), Err> {
309 Err(error)
310 }
311
312 fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
313 fallible_transaction(f)
314 }
315}
316
317impl<Err: Debug + Clone + 'static> Transaction<Err> for InfallibleTransaction {
318 fn execute<OpIn: Clone + 'static, OpOut: Clone + 'static>(
319 &mut self,
320 operation: impl Operation<In = OpIn, Out = OpOut, Err = Err> + 'static,
321 input: OpIn,
322 ) -> Result<OpOut, Err> {
323 Ok(InfallibleTransaction::execute(self, operation, input))
324 }
325
326 fn fail(&mut self, error: Err) -> Result<(), Err> {
327 InfallibleTransaction::retry(self);
328 Err(error)
329 }
330
331 fn run<Out>(f: impl FnOnce(&mut Self) -> Result<Out, Err>) -> TransactionResult<Out, Err> {
332 Ok(infallible_transaction(|tx| f(tx).unwrap()))
333 }
334}
335
336#[cfg(test)]
337mod tests {
338 use std::cell::RefCell;
339 use std::rc::Rc;
340
341 use crate::{fallible_transaction, infallible_transaction, operation};
342
343 #[test]
345 #[ignore]
346 fn tx_test_1() {
347 let log = Rc::new(RefCell::new(Vec::new()));
348
349 let log1 = log.clone();
350 let log2 = log.clone();
351 let log3 = log.clone();
352 let log4 = log.clone();
353
354 let op1 = operation(
355 move |input: String| {
356 log1.borrow_mut().push(format!("op1 execute {input}"));
357 Ok(())
358 },
359 move |input: String, _| {
360 log2.borrow_mut().push(format!("op1 rollback {input}"));
361 Ok(())
362 },
363 );
364
365 let op2 = operation(
366 move |_: ()| {
367 log3.clone().borrow_mut().push("op2 execute".to_string());
368 Err::<(), &str>("op2 error")
369 },
370 move |_: (), _| {
371 log4.clone().borrow_mut().push("op2 rollback".to_string());
372 Ok(())
373 },
374 );
375
376 let result = fallible_transaction(|tx| {
377 println!("First we execute op1");
378 tx.execute(op1, "hello".to_string())?;
379 println!("Then execute op2");
380 tx.execute(op2, ())?;
381 println!("Finally compute a result");
382 Ok(11)
383 });
384
385 println!("{log:?}");
386 println!("{result:?}");
387 }
388
389 #[test]
391 #[ignore]
392 fn tx_test_2() {
393 let log = Rc::new(RefCell::new(Vec::new()));
394
395 let log1 = log.clone();
396 let log2 = log.clone();
397 let log3 = log.clone();
398 let log4 = log.clone();
399
400 let op1 = operation(
401 move |input: String| {
402 log1.borrow_mut().push(format!("op1 execute {input}"));
403 Ok::<(), ()>(())
404 },
405 move |input: String, _| {
406 log2.borrow_mut().push(format!("op1 rollback {input}"));
407 Ok(())
408 },
409 );
410
411 let op2 = operation(
412 move |_: ()| {
413 log3.clone().borrow_mut().push("op2 execute".to_string());
414 Err::<(), &str>("op2 error")
415 },
416 move |_: (), r| {
417 log4.clone()
418 .borrow_mut()
419 .push(format!("op2 rollback {r:?}"));
420 Ok(())
421 },
422 );
423
424 let result = infallible_transaction(|tx| {
425 println!("First we execute op1");
426 tx.execute(op1, "hello".to_string());
427 println!("Then execute op2");
428 tx.execute(op2, ());
429 println!("Finally compute a result");
430 11
431 });
432
433 println!("{log:?}");
434 println!("{result:?}");
435 }
436}
437
438#[cfg(test)]
439#[cfg(feature = "macro")]
440mod macro_tests {
441 use golem_rust_macro::golem_operation;
442
443 use crate::{fallible_transaction, infallible_transaction};
444
445 mod golem_rust {
446 pub use crate::*;
447 }
448
449 #[golem_operation(compensation=test_compensation)]
450 fn test_operation(input1: u64, input2: f32) -> Result<bool, String> {
451 println!("Op input: {input1}, {input2}");
452 Ok(true)
453 }
454
455 fn test_compensation(_: bool, input1: u64, input2: f32) -> Result<(), String> {
456 println!("Compensation input: {input1}, {input2}");
457 Ok(())
458 }
459
460 #[golem_operation(compensation=test_compensation_2)]
461 fn test_operation_2(input1: u64, input2: f32) -> Result<bool, String> {
462 println!("Op input: {input1}, {input2}");
463 Ok(true)
464 }
465
466 fn test_compensation_2(result: bool) -> Result<(), String> {
467 println!("Compensation for operation result {result:?}");
468 Ok(())
469 }
470
471 #[golem_operation(compensation=test_compensation_3)]
472 fn test_operation_3(input: String) -> Result<(), String> {
473 println!("Op input: {input}");
474 Ok(())
475 }
476
477 fn test_compensation_3() -> Result<(), String> {
478 println!("Compensation for operation, not using any input");
479 Ok(())
480 }
481
482 #[golem_operation(compensation=test_compensation_4)]
483 fn test_operation_4(input: u64) -> Result<(), String> {
484 println!("Op input: {input}");
485 Ok(())
486 }
487
488 fn test_compensation_4(_: (), input: u64) -> Result<(), String> {
489 println!("Compensation for operation with single input {input}");
490 Ok(())
491 }
492
493 #[test]
495 #[ignore]
496 fn tx_test_1() {
497 let result = fallible_transaction(|tx| {
498 println!("Executing the annotated function as an operation directly");
499 tx.test_operation(1, 0.1)?;
500 tx.test_operation_2(1, 0.1)?;
501 tx.test_operation_3("test".to_string())?;
502 tx.test_operation_4(1)?;
503
504 Ok(11)
505 });
506
507 println!("{result:?}");
508 }
509
510 #[test]
512 #[ignore]
513 fn tx_test_2() {
514 let result = infallible_transaction(|tx| {
515 println!("Executing the annotated function as an operation directly");
516 let _ = tx.test_operation(1, 0.1);
517 11
518 });
519
520 println!("{result:?}");
521 }
522}