restate_sdk/context/
run.rs

1use crate::errors::HandlerResult;
2use crate::serde::{Deserialize, Serialize};
3use std::future::Future;
4use std::time::Duration;
5
6/// Run closure trait
7pub trait RunClosure {
8    type Output: Deserialize + Serialize + 'static;
9    type Fut: Future<Output = HandlerResult<Self::Output>>;
10
11    fn run(self) -> Self::Fut;
12}
13
14impl<F, O, Fut> RunClosure for F
15where
16    F: FnOnce() -> Fut,
17    Fut: Future<Output = HandlerResult<O>>,
18    O: Deserialize + Serialize + 'static,
19{
20    type Output = O;
21    type Fut = Fut;
22
23    fn run(self) -> Self::Fut {
24        self()
25    }
26}
27
28/// Future created using [`ContextSideEffects::run`](super::ContextSideEffects::run).
29pub trait RunFuture<O>: Future<Output = O> {
30    /// Provide a custom retry policy for this `run` operation.
31    ///
32    /// If unspecified, the `run` will be retried using the [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server),
33    /// which by default retries indefinitely.
34    fn retry_policy(self, retry_policy: RunRetryPolicy) -> Self;
35
36    /// Define a name for this `run` operation.
37    ///
38    /// This is used mainly for observability.
39    fn name(self, name: impl Into<String>) -> Self;
40}
41
/// Retry policy applied when executing run closures.
///
/// Values are configured through the builder-style methods on this type.
#[derive(Clone, Debug)]
pub struct RunRetryPolicy {
    /// Delay used for the first retry attempt.
    pub(crate) initial_delay: Duration,
    /// Exponentiation factor used when computing the next retry delay.
    pub(crate) factor: f32,
    /// Upper bound for the delay between retries; `None` when unset.
    pub(crate) max_delay: Option<Duration>,
    /// Number of attempts after which retrying is given up; `None` when unset.
    pub(crate) max_attempts: Option<u32>,
    /// Duration of the retry loop after which retrying is given up; `None` when unset.
    pub(crate) max_duration: Option<Duration>,
}
51
52impl Default for RunRetryPolicy {
53    fn default() -> Self {
54        Self {
55            initial_delay: Duration::from_millis(100),
56            factor: 2.0,
57            max_delay: Some(Duration::from_secs(2)),
58            max_attempts: None,
59            max_duration: Some(Duration::from_secs(50)),
60        }
61    }
62}
63
64impl RunRetryPolicy {
65    /// Create a new retry policy.
66    pub fn new() -> Self {
67        Self {
68            initial_delay: Duration::from_millis(100),
69            factor: 1.0,
70            max_delay: None,
71            max_attempts: None,
72            max_duration: None,
73        }
74    }
75
76    /// Initial retry delay for the first retry attempt.
77    pub fn initial_delay(mut self, initial_interval: Duration) -> Self {
78        self.initial_delay = initial_interval;
79        self
80    }
81
82    /// Exponentiation factor to use when computing the next retry delay.
83    pub fn exponentiation_factor(mut self, factor: f32) -> Self {
84        self.factor = factor;
85        self
86    }
87
88    /// Maximum delay between retries.
89    pub fn max_delay(mut self, max_interval: Duration) -> Self {
90        self.max_delay = Some(max_interval);
91        self
92    }
93
94    /// Gives up retrying when either at least the given number of attempts is reached,
95    /// or `max_duration` (if set) is reached first.
96    ///
97    /// **Note:** The number of actual retries may be higher than the provided value.
98    /// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
99    ///
100    /// Infinite retries if this field and `max_duration` are unset.
101    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
102        self.max_attempts = Some(max_attempts);
103        self
104    }
105
106    /// Gives up retrying when either the retry loop lasted at least for this given max duration,
107    /// or `max_attempts` (if set) is reached first.
108    ///
109    /// **Note:** The real retry loop duration may be higher than the given duration.
110    /// This is due to the nature of the run operation, which executes the closure on the service and sends the result afterward to Restate.
111    ///
112    /// Infinite retries if this field and `max_attempts` are unset.
113    pub fn max_duration(mut self, max_duration: Duration) -> Self {
114        self.max_duration = Some(max_duration);
115        self
116    }
117}