restate_sdk/endpoint/builder.rs
1use crate::endpoint::{BoxedService, Endpoint, EndpointInner, Error};
2use crate::service::{Discoverable, Service};
3use futures::future::BoxFuture;
4use restate_sdk_shared_core::{IdentityVerifier, KeyError};
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9/// Various configuration options that can be provided when binding a service
10#[derive(Default, Debug, Clone)]
11pub struct ServiceOptions {
12 pub(crate) metadata: HashMap<String, String>,
13 pub(crate) inactivity_timeout: Option<Duration>,
14 pub(crate) abort_timeout: Option<Duration>,
15 pub(crate) idempotency_retention: Option<Duration>,
16 pub(crate) journal_retention: Option<Duration>,
17 pub(crate) enable_lazy_state: Option<bool>,
18 pub(crate) ingress_private: Option<bool>,
19 // Retry policy options
20 pub(crate) retry_policy_initial_interval: Option<Duration>,
21 pub(crate) retry_policy_exponentiation_factor: Option<f64>,
22 pub(crate) retry_policy_max_interval: Option<Duration>,
23 pub(crate) retry_policy_max_attempts: Option<u64>,
24 pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
25 pub(crate) handler_options: HashMap<String, HandlerOptions>,
26
27 _priv: (),
28}
29
30impl ServiceOptions {
31 pub fn new() -> Self {
32 Self::default()
33 }
34
35 /// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
36 /// termination by asking the invocation to suspend (which preserves intermediate progress).
37 ///
38 /// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
39 /// suspend.
40 ///
41 /// This overrides the default inactivity timeout configured in the restate-server for all
42 /// invocations to this service.
43 pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
44 self.inactivity_timeout = Some(timeout);
45 self
46 }
47
48 /// This timer guards against stalled service/handler invocations that are supposed to terminate. The
49 /// abort timeout is started after the inactivity_timeout has expired and the service/handler
50 /// invocation has been asked to gracefully terminate. Once the timer expires, it will abort the
51 /// service/handler invocation.
52 ///
53 /// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
54 /// terminate, then this value needs to be set accordingly.
55 ///
56 /// This overrides the default abort timeout configured in the restate-server for all invocations to
57 /// this service.
58 pub fn abort_timeout(mut self, timeout: Duration) -> Self {
59 self.abort_timeout = Some(timeout);
60 self
61 }
62
63 /// The retention duration of idempotent requests to this service.
64 pub fn idempotency_retention(mut self, retention: Duration) -> Self {
65 self.idempotency_retention = Some(retention);
66 self
67 }
68
69 /// The journal retention. When set, this applies to all requests to all handlers of this service.
70 ///
71 /// In case the request has an idempotency key, the idempotency_retention caps the journal retention
72 /// time.
73 pub fn journal_retention(mut self, retention: Duration) -> Self {
74 self.journal_retention = Some(retention);
75 self
76 }
77
78 /// When set to `true`, lazy state will be enabled for all invocations to this service. This is
79 /// relevant only for workflows and virtual objects.
80 pub fn enable_lazy_state(mut self, enable: bool) -> Self {
81 self.enable_lazy_state = Some(enable);
82 self
83 }
84
85 /// When set to `true` this service, with all its handlers, cannot be invoked from the restate-server
86 /// HTTP and Kafka ingress, but only from other services.
87 pub fn ingress_private(mut self, private: bool) -> Self {
88 self.ingress_private = Some(private);
89 self
90 }
91
92 /// Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.
93 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
94 self.metadata.insert(key.into(), value.into());
95 self
96 }
97
98 /// Initial delay before the first retry attempt.
99 ///
100 /// If unset, the server default is used.
101 pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
102 self.retry_policy_initial_interval = Some(interval);
103 self
104 }
105
106 /// Exponential backoff multiplier used to compute the next retry delay.
107 ///
108 /// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
109 /// capped by retry_policy_max_interval if set.
110 pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
111 self.retry_policy_exponentiation_factor = Some(factor);
112 self
113 }
114
115 /// Upper bound for the computed retry delay.
116 pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
117 self.retry_policy_max_interval = Some(interval);
118 self
119 }
120
121 /// Maximum number of attempts before giving up retrying.
122 ///
123 /// The initial call counts as the first attempt; retries increment the count by 1.
124 pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
125 self.retry_policy_max_attempts = Some(attempts);
126 self
127 }
128
129 /// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
130 ///
131 /// The invocation enters the paused state and can be manually resumed from the CLI or UI.
132 pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
133 self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
134 self
135 }
136
137 /// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
138 ///
139 /// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
140 pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
141 self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
142 self
143 }
144
145 /// Handler-specific options.
146 ///
147 /// *Note*: If you provide a handler name for a non-existing handler, binding the service will *panic!*.
148 pub fn handler(mut self, handler_name: impl Into<String>, options: HandlerOptions) -> Self {
149 self.handler_options.insert(handler_name.into(), options);
150 self
151 }
152}
153
154/// Various configuration options that can be provided when binding a service handler
155#[derive(Default, Debug, Clone)]
156pub struct HandlerOptions {
157 pub(crate) metadata: HashMap<String, String>,
158 pub(crate) inactivity_timeout: Option<Duration>,
159 pub(crate) abort_timeout: Option<Duration>,
160 pub(crate) idempotency_retention: Option<Duration>,
161 pub(crate) workflow_retention: Option<Duration>,
162 pub(crate) journal_retention: Option<Duration>,
163 pub(crate) ingress_private: Option<bool>,
164 pub(crate) enable_lazy_state: Option<bool>,
165 // Retry policy options
166 pub(crate) retry_policy_initial_interval: Option<Duration>,
167 pub(crate) retry_policy_exponentiation_factor: Option<f64>,
168 pub(crate) retry_policy_max_interval: Option<Duration>,
169 pub(crate) retry_policy_max_attempts: Option<u64>,
170 pub(crate) retry_policy_on_max_attempts: Option<crate::discovery::RetryPolicyOnMaxAttempts>,
171
172 _priv: (),
173}
174
175impl HandlerOptions {
176 pub fn new() -> Self {
177 Self::default()
178 }
179
180 /// Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.
181 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
182 self.metadata.insert(key.into(), value.into());
183 self
184 }
185
186 /// This timer guards against stalled invocations. Once it expires, Restate triggers a graceful
187 /// termination by asking the invocation to suspend (which preserves intermediate progress).
188 ///
189 /// The abort_timeout is used to abort the invocation, in case it doesn't react to the request to
190 /// suspend.
191 ///
192 /// This overrides the inactivity timeout set for the service and the default set in restate-server.
193 pub fn inactivity_timeout(mut self, timeout: Duration) -> Self {
194 self.inactivity_timeout = Some(timeout);
195 self
196 }
197
198 /// This timer guards against stalled invocations that are supposed to terminate. The abort timeout
199 /// is started after the inactivity_timeout has expired and the invocation has been asked to
200 /// gracefully terminate. Once the timer expires, it will abort the invocation.
201 ///
202 /// This timer potentially *interrupts* user code. If the user code needs longer to gracefully
203 /// terminate, then this value needs to be set accordingly.
204 ///
205 /// This overrides the abort timeout set for the service and the default set in restate-server.
206 pub fn abort_timeout(mut self, timeout: Duration) -> Self {
207 self.abort_timeout = Some(timeout);
208 self
209 }
210
211 /// The retention duration of idempotent requests to this service.
212 pub fn idempotency_retention(mut self, retention: Duration) -> Self {
213 self.idempotency_retention = Some(retention);
214 self
215 }
216
217 /// The retention duration for this workflow handler.
218 pub fn workflow_retention(mut self, retention: Duration) -> Self {
219 self.workflow_retention = Some(retention);
220 self
221 }
222
223 /// The journal retention for invocations to this handler.
224 ///
225 /// In case the request has an idempotency key, the idempotency_retention caps the journal retention
226 /// time.
227 pub fn journal_retention(mut self, retention: Duration) -> Self {
228 self.journal_retention = Some(retention);
229 self
230 }
231
232 /// When set to `true` this handler cannot be invoked from the restate-server HTTP and Kafka ingress,
233 /// but only from other services.
234 pub fn ingress_private(mut self, private: bool) -> Self {
235 self.ingress_private = Some(private);
236 self
237 }
238
239 /// When set to `true`, lazy state will be enabled for all invocations to this handler. This is
240 /// relevant only for workflows and virtual objects.
241 pub fn enable_lazy_state(mut self, enable: bool) -> Self {
242 self.enable_lazy_state = Some(enable);
243 self
244 }
245
246 /// Initial delay before the first retry attempt.
247 ///
248 /// If unset, the server default is used.
249 pub fn retry_policy_initial_interval(mut self, interval: Duration) -> Self {
250 self.retry_policy_initial_interval = Some(interval);
251 self
252 }
253
254 /// Exponential backoff multiplier used to compute the next retry delay.
255 ///
256 /// For attempt n, the next delay is roughly previousDelay * exponentiationFactor,
257 /// capped by retry_policy_max_interval if set.
258 pub fn retry_policy_exponentiation_factor(mut self, factor: f64) -> Self {
259 self.retry_policy_exponentiation_factor = Some(factor);
260 self
261 }
262
263 /// Upper bound for the computed retry delay.
264 pub fn retry_policy_max_interval(mut self, interval: Duration) -> Self {
265 self.retry_policy_max_interval = Some(interval);
266 self
267 }
268
269 /// Maximum number of attempts before giving up retrying.
270 ///
271 /// The initial call counts as the first attempt; retries increment the count by 1.
272 pub fn retry_policy_max_attempts(mut self, attempts: u64) -> Self {
273 self.retry_policy_max_attempts = Some(attempts);
274 self
275 }
276
277 /// Behavior when the configured retry_policy_max_attempts is reached: pause the invocation.
278 ///
279 /// The invocation enters the paused state and can be manually resumed from the CLI or UI.
280 pub fn retry_policy_pause_on_max_attempts(mut self) -> Self {
281 self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Pause);
282 self
283 }
284
285 /// Behavior when the configured retry_policy_max_attempts is reached: kill the invocation.
286 ///
287 /// The invocation will be marked as failed and will not be retried unless explicitly re-triggered.
288 pub fn retry_policy_kill_on_max_attempts(mut self) -> Self {
289 self.retry_policy_on_max_attempts = Some(crate::discovery::RetryPolicyOnMaxAttempts::Kill);
290 self
291 }
292}
293
294/// Builder for [`Endpoint`]
295#[derive(Default)]
296pub struct Builder {
297 svcs: HashMap<String, BoxedService>,
298 discovery_services: Vec<crate::discovery::Service>,
299 identity_verifier: IdentityVerifier,
300}
301
302impl Builder {
303 /// Create a new builder for [`Endpoint`].
304 pub fn new() -> Self {
305 Self::default()
306 }
307
308 /// Add a [`Service`] to this endpoint.
309 ///
310 /// When using the [`service`](macro@crate::service), [`object`](macro@crate::object) or [`workflow`](macro@crate::workflow) macros,
311 /// you need to pass the result of the `serve` method.
312 pub fn bind<
313 S: Service<Future = BoxFuture<'static, Result<(), Error>>>
314 + Discoverable
315 + Send
316 + Sync
317 + 'static,
318 >(
319 self,
320 s: S,
321 ) -> Self {
322 self.bind_with_options(s, ServiceOptions::default())
323 }
324
325 /// Like [`bind`], but providing options
326 pub fn bind_with_options<
327 S: Service<Future = BoxFuture<'static, Result<(), Error>>>
328 + Discoverable
329 + Send
330 + Sync
331 + 'static,
332 >(
333 mut self,
334 s: S,
335 service_options: ServiceOptions,
336 ) -> Self {
337 // Discover and apply options
338 let mut service_metadata = S::discover();
339 service_metadata.apply_options(service_options);
340
341 let boxed_service = BoxedService::new(s);
342 self.svcs
343 .insert(service_metadata.name.to_string(), boxed_service);
344 self.discovery_services.push(service_metadata);
345 self
346 }
347
348 /// Add identity key, e.g. `publickeyv1_ChjENKeMvCtRnqG2mrBK1HmPKufgFUc98K8B3ononQvp`.
349 pub fn identity_key(mut self, key: &str) -> Result<Self, KeyError> {
350 self.identity_verifier = self.identity_verifier.with_key(key)?;
351 Ok(self)
352 }
353
354 /// Build the [`Endpoint`].
355 pub fn build(self) -> Endpoint {
356 Endpoint(Arc::new(EndpointInner {
357 svcs: self.svcs,
358 discovery_services: self.discovery_services,
359 identity_verifier: self.identity_verifier,
360 }))
361 }
362}