restate_sdk/context/run.rs
1use crate::errors::HandlerResult;
2use crate::serde::{Deserialize, Serialize};
3use std::future::Future;
4use std::time::Duration;
5
6/// Run closure trait
7pub trait RunClosure {
8 type Output: Deserialize + Serialize + 'static;
9 type Fut: Future<Output = HandlerResult<Self::Output>>;
10
11 fn run(self) -> Self::Fut;
12}
13
14impl<F, O, Fut> RunClosure for F
15where
16 F: FnOnce() -> Fut,
17 Fut: Future<Output = HandlerResult<O>>,
18 O: Deserialize + Serialize + 'static,
19{
20 type Output = O;
21 type Fut = Fut;
22
23 fn run(self) -> Self::Fut {
24 self()
25 }
26}
27
28/// Future created using [`ContextSideEffects::run`](super::ContextSideEffects::run).
29pub trait RunFuture<O>: Future<Output = O> {
30 /// Provide a custom retry policy for this `run` operation.
31 ///
32 /// If unspecified, the `run` will be retried using the [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server),
33 /// which by default retries indefinitely.
34 fn retry_policy(self, retry_policy: RunRetryPolicy) -> Self;
35
36 /// Define a name for this `run` operation.
37 ///
38 /// This is used mainly for observability.
39 fn name(self, name: impl Into<String>) -> Self;
40}
41
42/// This struct represents the policy to execute retries for run closures.
43#[derive(Debug, Clone)]
44pub struct RunRetryPolicy {
45 pub(crate) initial_delay: Duration,
46 pub(crate) factor: f32,
47 pub(crate) max_delay: Option<Duration>,
48 pub(crate) max_attempts: Option<u32>,
49 pub(crate) max_duration: Option<Duration>,
50}
51
52impl Default for RunRetryPolicy {
53 fn default() -> Self {
54 Self {
55 initial_delay: Duration::from_millis(100),
56 factor: 2.0,
57 max_delay: Some(Duration::from_secs(2)),
58 max_attempts: None,
59 max_duration: Some(Duration::from_secs(50)),
60 }
61 }
62}
63
64impl RunRetryPolicy {
65 /// Create a new retry policy.
66 pub fn new() -> Self {
67 Self {
68 initial_delay: Duration::from_millis(100),
69 factor: 1.0,
70 max_delay: None,
71 max_attempts: None,
72 max_duration: None,
73 }
74 }
75
76 /// Initial retry delay for the first retry attempt.
77 pub fn initial_delay(mut self, initial_interval: Duration) -> Self {
78 self.initial_delay = initial_interval;
79 self
80 }
81
82 /// Exponentiation factor to use when computing the next retry delay.
83 pub fn exponentiation_factor(mut self, factor: f32) -> Self {
84 self.factor = factor;
85 self
86 }
87
88 /// Maximum delay between retries.
89 pub fn max_delay(mut self, max_interval: Duration) -> Self {
90 self.max_delay = Some(max_interval);
91 self
92 }
93
94 /// Gives up retrying when either at least the given number of attempts, including the initial attempt,
95 /// is reached, or `max_duration` (if set) is reached first.
96 ///
97 /// **Note:** The number of actual retries may be higher than the provided value.
98 /// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
99 ///
100 /// Infinite retries if this field and `max_duration` are unset.
101 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
102 self.max_attempts = Some(max_attempts);
103 self
104 }
105
106 /// Gives up retrying when either the retry loop lasted at least for this given max duration,
107 /// or `max_attempts` (if set) is reached first.
108 ///
109 /// **Note:** The real retry loop duration may be higher than the given duration.
110 /// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
111 ///
112 /// Infinite retries if this field and `max_attempts` are unset.
113 pub fn max_duration(mut self, max_duration: Duration) -> Self {
114 self.max_duration = Some(max_duration);
115 self
116 }
117}