restate_sdk/endpoint/
builder.rs

1use crate::endpoint::{BoxedService, Endpoint, EndpointInner, Error};
2use crate::service::{Discoverable, Service};
3use futures::future::BoxFuture;
4use restate_sdk_shared_core::{IdentityVerifier, KeyError};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9/// Various configuration options that can be provided when binding a service
10#[derive(Default, Debug, Clone)]
11pub struct ServiceOptions {
12    pub(crate) metadata: HashMap<String, String>,
13    pub(crate) inactivity_timeout: Option<Duration>,
14    pub(crate) abort_timeout: Option<Duration>,
15    pub(crate) idempotency_retention: Option<Duration>,
16    pub(crate) journal_retention: Option<Duration>,
17    pub(crate) enable_lazy_state: Option<bool>,
18    pub(crate) ingress_private: Option<bool>,
19    // Retry policy options
20    pub(crate) retry_policy_initial_interval: Option<Duration>,
21    pub(crate) retry_policy_exponentiation_factor: Option<f64>,
22    pub(crate) retry_policy_max_interval: Option<Duration>,
23    pub(crate) retry_policy_max_attempts: Option<u64>,
24    pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
25    pub(crate) handler_options: HashMap<String, HandlerOptions>,
26
27    _priv: (),
28}
29
30impl ServiceOptions {
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
36    /// termination by asking the invocation to suspend (which preserves intermediate progress).
37    ///
38    /// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
39    /// suspend.
40    ///
41    /// This overrides the default inactivity timeout configured in the restate-server for all
42    /// invocations to this service.
43    pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
44        self.inactivity_timeout = Some(timeout);
45        self
46    }
47
48    /// This timer guards against stalled service/handler invocations that are supposed to terminate. The
49    /// abort timeout is started after the inactivity_timeout has expired and the service/handler
50    /// invocation has been asked to gracefully terminate. Once the timer expires, it will abort the
51    /// service/handler invocation.
52    ///
53    /// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
54    /// terminate, then this value needs to be set accordingly.
55    ///
56    /// This overrides the default abort timeout configured in the restate-server for all invocations to
57    /// this service.
58    pub fn abort_timeout(mut self, timeout: Duration) -> Self {
59        self.abort_timeout = Some(timeout);
60        self
61    }
62
63    /// The retention duration of idempotent requests to this service.
64    pub fn idempotency_retention(mut self, retention: Duration) -> Self {
65        self.idempotency_retention = Some(retention);
66        self
67    }
68
69    /// The journal retention. When set, this applies to all requests to all handlers of this service.
70    ///
71    /// In case the request has an idempotency key, the idempotency_retention caps the journal retention
72    /// time.
73    pub fn journal_retention(mut self, retention: Duration) -> Self {
74        self.journal_retention = Some(retention);
75        self
76    }
77
78    /// When set to `true`, lazy state will be enabled for all invocations to this service. This is
79    /// relevant only for workflows and virtual objects.
80    pub fn enable_lazy_state(mut self, enable: bool) -> Self {
81        self.enable_lazy_state = Some(enable);
82        self
83    }
84
85    /// When set to `true` this service, with all its handlers, cannot be invoked from the restate-server
86    /// HTTP and Kafka ingress, but only from other services.
87    pub fn ingress_private(mut self, private: bool) -> Self {
88        self.ingress_private = Some(private);
89        self
90    }
91
92    /// Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.
93    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
94        self.metadata.insert(key.into(), value.into());
95        self
96    }
97
98    /// Initial delay before the first retry attempt.
99    ///
100    /// If unset, the server default is used.
101    pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
102        self.retry_policy_initial_interval = Some(interval);
103        self
104    }
105
106    /// Exponential backoff multiplier used to compute the next retry delay.
107    ///
108    /// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
109    /// capped by retry_policy_max_interval if set.
110    pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
111        self.retry_policy_exponentiation_factor = Some(factor);
112        self
113    }
114
115    /// Upper bound for the computed retry delay.
116    pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
117        self.retry_policy_max_interval = Some(interval);
118        self
119    }
120
121    /// Maximum number of attempts before giving up retrying.
122    ///
123    /// The initial call counts as the first attempt; retries increment the count by 1.
124    pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
125        self.retry_policy_max_attempts = Some(attempts);
126        self
127    }
128
129    /// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
130    ///
131    /// The invocation enters the paused state and can be manually resumed from the CLI or UI.
132    pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
133        self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
134        self
135    }
136
137    /// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
138    ///
139    /// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
140    pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
141        self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
142        self
143    }
144
145    /// Handler-specific options.
146    ///
147    /// *Note*: If you provide a handler name for a non-existing handler, binding the service will *panic!*.
148    pub fn handler(mut self, handler_name: impl Into<String>, options: HandlerOptions) -> Self {
149        self.handler_options.insert(handler_name.into(), options);
150        self
151    }
152}
153
154/// Various configuration options that can be provided when binding a service handler
155#[derive(Default, Debug, Clone)]
156pub struct HandlerOptions {
157    pub(crate) metadata: HashMap<String, String>,
158    pub(crate) inactivity_timeout: Option<Duration>,
159    pub(crate) abort_timeout: Option<Duration>,
160    pub(crate) idempotency_retention: Option<Duration>,
161    pub(crate) workflow_retention: Option<Duration>,
162    pub(crate) journal_retention: Option<Duration>,
163    pub(crate) ingress_private: Option<bool>,
164    pub(crate) enable_lazy_state: Option<bool>,
165    // Retry policy options
166    pub(crate) retry_policy_initial_interval: Option<Duration>,
167    pub(crate) retry_policy_exponentiation_factor: Option<f64>,
168    pub(crate) retry_policy_max_interval: Option<Duration>,
169    pub(crate) retry_policy_max_attempts: Option<u64>,
170    pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
171
172    _priv: (),
173}
174
175impl HandlerOptions {
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    /// Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.
181    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
182        self.metadata.insert(key.into(), value.into());
183        self
184    }
185
186    /// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
187    /// termination by asking the invocation to suspend (which preserves intermediate progress).
188    ///
189    /// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
190    /// suspend.
191    ///
192    /// This overrides the inactivity timeout set for the service and the default set in restate-server.
193    pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
194        self.inactivity_timeout = Some(timeout);
195        self
196    }
197
198    /// This timer guards against stalled invocations that are supposed to terminate. The abort timeout
199    /// is started after the inactivity_timeout has expired and the invocation has been asked to
200    /// gracefully terminate. Once the timer expires, it will abort the invocation.
201    ///
202    /// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
203    /// terminate, then this value needs to be set accordingly.
204    ///
205    /// This overrides the abort timeout set for the service and the default set in restate-server.
206    pub fn abort_timeout(mut self, timeout: Duration) -> Self {
207        self.abort_timeout = Some(timeout);
208        self
209    }
210
211    /// The retention duration of idempotent requests to this service.
212    pub fn idempotency_retention(mut self, retention: Duration) -> Self {
213        self.idempotency_retention = Some(retention);
214        self
215    }
216
217    /// The retention duration for this workflow handler.
218    pub fn workflow_retention(mut self, retention: Duration) -> Self {
219        self.workflow_retention = Some(retention);
220        self
221    }
222
223    /// The journal retention for invocations to this handler.
224    ///
225    /// In case the request has an idempotency key, the idempotency_retention caps the journal retention
226    /// time.
227    pub fn journal_retention(mut self, retention: Duration) -> Self {
228        self.journal_retention = Some(retention);
229        self
230    }
231
232    /// When set to `true` this handler cannot be invoked from the restate-server HTTP and Kafka ingress,
233    /// but only from other services.
234    pub fn ingress_private(mut self, private: bool) -> Self {
235        self.ingress_private = Some(private);
236        self
237    }
238
239    /// When set to `true`, lazy state will be enabled for all invocations to this handler. This is
240    /// relevant only for workflows and virtual objects.
241    pub fn enable_lazy_state(mut self, enable: bool) -> Self {
242        self.enable_lazy_state = Some(enable);
243        self
244    }
245
246    /// Initial delay before the first retry attempt.
247    ///
248    /// If unset, the server default is used.
249    pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
250        self.retry_policy_initial_interval = Some(interval);
251        self
252    }
253
254    /// Exponential backoff multiplier used to compute the next retry delay.
255    ///
256    /// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
257    /// capped by retry_policy_max_interval if set.
258    pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
259        self.retry_policy_exponentiation_factor = Some(factor);
260        self
261    }
262
263    /// Upper bound for the computed retry delay.
264    pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
265        self.retry_policy_max_interval = Some(interval);
266        self
267    }
268
269    /// Maximum number of attempts before giving up retrying.
270    ///
271    /// The initial call counts as the first attempt; retries increment the count by 1.
272    pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
273        self.retry_policy_max_attempts = Some(attempts);
274        self
275    }
276
277    /// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
278    ///
279    /// The invocation enters the paused state and can be manually resumed from the CLI or UI.
280    pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
281        self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
282        self
283    }
284
285    /// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
286    ///
287    /// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
288    pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
289        self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
290        self
291    }
292}
293
294/// Builder for [`Endpoint`]
295#[derive(Default)]
296pub struct Builder {
297    svcs: HashMap<String, BoxedService>,
298    discovery_services: Vec<crate::discovery::Service>,
299    identity_verifier: IdentityVerifier,
300}
301
302impl Builder {
303    /// Create a new builder for [`Endpoint`].
304    pub fn new() -> Self {
305        Self::default()
306    }
307
308    /// Add a [`Service`] to this endpoint.
309    ///
310    /// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
311    /// you need to pass the result of the `serve` method.
312    pub fn bind<
313        S: Service<Future = BoxFuture<'static, Result<(), Error>>>
314            + Discoverable
315            + Send
316            + Sync
317            + 'static,
318    >(
319        self,
320        s: S,
321    ) -> Self {
322        self.bind_with_options(s, ServiceOptions::default())
323    }
324
325    /// Like [`bind`], but providing options
326    pub fn bind_with_options<
327        S: Service<Future = BoxFuture<'static, Result<(), Error>>>
328            + Discoverable
329            + Send
330            + Sync
331            + 'static,
332    >(
333        mut self,
334        s: S,
335        service_options: ServiceOptions,
336    ) -> Self {
337        // Discover and apply options
338        let mut service_metadata = S::discover();
339        service_metadata.apply_options(service_options);
340
341        let boxed_service = BoxedService::new(s);
342        self.svcs
343            .insert(service_metadata.name.to_string(), boxed_service);
344        self.discovery_services.push(service_metadata);
345        self
346    }
347
348    /// Add identity key, e.g. `publickeyv1_ChjENKeMvCtRnqG2mrBK1HmPKufgFUc98K8B3ononQvp`.
349    pub fn identity_key(mut self, key: &str) -> Result<Self, KeyError> {
350        self.identity_verifier = self.identity_verifier.with_key(key)?;
351        Ok(self)
352    }
353
354    /// Build the [`Endpoint`].
355    pub fn build(self) -> Endpoint {
356        Endpoint(Arc::new(EndpointInner {
357            svcs: self.svcs,
358            discovery_services: self.discovery_services,
359            identity_verifier: self.identity_verifier,
360        }))
361    }
362}