restate_sdk/context/mod.rs
1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14
15pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
16pub use run::{RunClosure, RunFuture, RunRetryPolicy};
17
18pub type HeaderMap = http::HeaderMap<String>;
19
20/// Service handler context.
21pub struct Context<'ctx> {
22 random_seed: u64,
23 #[cfg(feature = "rand")]
24 std_rng: rand::prelude::StdRng,
25 headers: HeaderMap,
26 inner: &'ctx ContextInternal,
27}
28
29impl<'ctx> Context<'ctx> {
30 /// Get request headers.
31 pub fn headers(&self) -> &HeaderMap {
32 &self.headers
33 }
34
35 /// Get request headers.
36 pub fn headers_mut(&mut self) -> &HeaderMap {
37 &mut self.headers
38 }
39}
40
41impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
42 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
43 Self {
44 random_seed: value.1.random_seed,
45 #[cfg(feature = "rand")]
46 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
47 headers: value.1.headers,
48 inner: value.0,
49 }
50 }
51}
52
53/// Object shared handler context.
54pub struct SharedObjectContext<'ctx> {
55 key: String,
56 random_seed: u64,
57 #[cfg(feature = "rand")]
58 std_rng: rand::prelude::StdRng,
59 headers: HeaderMap,
60 pub(crate) inner: &'ctx ContextInternal,
61}
62
63impl<'ctx> SharedObjectContext<'ctx> {
64 /// Get object key.
65 pub fn key(&self) -> &str {
66 &self.key
67 }
68
69 /// Get request headers.
70 pub fn headers(&self) -> &HeaderMap {
71 &self.headers
72 }
73
74 /// Get request headers.
75 pub fn headers_mut(&mut self) -> &HeaderMap {
76 &mut self.headers
77 }
78}
79
80impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
81 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
82 Self {
83 key: value.1.key,
84 random_seed: value.1.random_seed,
85 #[cfg(feature = "rand")]
86 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
87 headers: value.1.headers,
88 inner: value.0,
89 }
90 }
91}
92
93/// Object handler context.
94pub struct ObjectContext<'ctx> {
95 key: String,
96 random_seed: u64,
97 #[cfg(feature = "rand")]
98 std_rng: rand::prelude::StdRng,
99 headers: HeaderMap,
100 pub(crate) inner: &'ctx ContextInternal,
101}
102
103impl<'ctx> ObjectContext<'ctx> {
104 /// Get object key.
105 pub fn key(&self) -> &str {
106 &self.key
107 }
108
109 /// Get request headers.
110 pub fn headers(&self) -> &HeaderMap {
111 &self.headers
112 }
113
114 /// Get request headers.
115 pub fn headers_mut(&mut self) -> &HeaderMap {
116 &mut self.headers
117 }
118}
119
120impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
121 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
122 Self {
123 key: value.1.key,
124 random_seed: value.1.random_seed,
125 #[cfg(feature = "rand")]
126 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
127 headers: value.1.headers,
128 inner: value.0,
129 }
130 }
131}
132
133/// Workflow shared handler context.
134pub struct SharedWorkflowContext<'ctx> {
135 key: String,
136 random_seed: u64,
137 #[cfg(feature = "rand")]
138 std_rng: rand::prelude::StdRng,
139 headers: HeaderMap,
140 pub(crate) inner: &'ctx ContextInternal,
141}
142
143impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
144 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
145 Self {
146 key: value.1.key,
147 random_seed: value.1.random_seed,
148 #[cfg(feature = "rand")]
149 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
150 headers: value.1.headers,
151 inner: value.0,
152 }
153 }
154}
155
156impl<'ctx> SharedWorkflowContext<'ctx> {
157 /// Get workflow key.
158 pub fn key(&self) -> &str {
159 &self.key
160 }
161
162 /// Get request headers.
163 pub fn headers(&self) -> &HeaderMap {
164 &self.headers
165 }
166
167 /// Get request headers.
168 pub fn headers_mut(&mut self) -> &HeaderMap {
169 &mut self.headers
170 }
171}
172
173/// Workflow handler context.
174pub struct WorkflowContext<'ctx> {
175 key: String,
176 random_seed: u64,
177 #[cfg(feature = "rand")]
178 std_rng: rand::prelude::StdRng,
179 headers: HeaderMap,
180 pub(crate) inner: &'ctx ContextInternal,
181}
182
183impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
184 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
185 Self {
186 key: value.1.key,
187 random_seed: value.1.random_seed,
188 #[cfg(feature = "rand")]
189 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
190 headers: value.1.headers,
191 inner: value.0,
192 }
193 }
194}
195
196impl<'ctx> WorkflowContext<'ctx> {
197 /// Get workflow key.
198 pub fn key(&self) -> &str {
199 &self.key
200 }
201
202 /// Get request headers.
203 pub fn headers(&self) -> &HeaderMap {
204 &self.headers
205 }
206
207 /// Get request headers.
208 pub fn headers_mut(&mut self) -> &HeaderMap {
209 &mut self.headers
210 }
211}
212
213///
214/// # Scheduling & Timers
215/// The Restate SDK includes durable timers.
216/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
217/// These timers are resilient to failures and restarts.
218/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
219///
220/// ## Scheduling Async Tasks
221///
222/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
223///
224/// ## Durable sleep
225/// To sleep in a Restate application for ten seconds, do the following:
226///
227/// ```rust,no_run
228/// # use restate_sdk::prelude::*;
229/// # use std::convert::Infallible;
230/// # use std::time::Duration;
231/// #
232/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
233/// ctx.sleep(Duration::from_secs(10)).await?;
234/// # Ok(())
235/// # }
236/// ```
237///
238/// **Cost savings on FaaS**:
239/// Restate suspends the handler while it is sleeping, to free up resources.
240/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
241///
242/// **Sleeping in Virtual Objects**:
243/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
244///
245/// <details>
246/// <summary>Clock synchronization Restate Server vs. SDK</summary>
247///
248/// The Restate SDK calculates the wake-up time based on the delay you specify.
249/// The Restate Server then uses this calculated time to wake up the handler.
250/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
251/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
252/// </details>
253pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
254 /// Sleep using Restate
255 fn sleep(
256 &self,
257 duration: Duration,
258 ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
259 private::SealedContext::inner_context(self).sleep(duration)
260 }
261}
262
263impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
264
265/// # Service Communication
266///
267/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
268///
269/// ## Request-response calls
270///
271/// Request-response calls are requests where the client waits for the response.
272///
273/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
274///
275/// ```rust,no_run
276/// # #[path = "../../examples/services/mod.rs"]
277/// # mod services;
278/// # use services::my_virtual_object::MyVirtualObjectClient;
279/// # use services::my_service::MyServiceClient;
280/// # use services::my_workflow::MyWorkflowClient;
281/// # use restate_sdk::prelude::*;
282/// #
283/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
284/// // To a Service:
285/// let service_response = ctx
286/// .service_client::<MyServiceClient>()
287/// .my_handler(String::from("Hi!"))
288/// .call()
289/// .await?;
290///
291/// // To a Virtual Object:
292/// let object_response = ctx
293/// .object_client::<MyVirtualObjectClient>("Mary")
294/// .my_handler(String::from("Hi!"))
295/// .call()
296/// .await?;
297///
298/// // To a Workflow:
299/// let workflow_result = ctx
300/// .workflow_client::<MyWorkflowClient>("my-workflow-id")
301/// .run(String::from("Hi!"))
302/// .call()
303/// .await?;
304/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
305/// .interact_with_workflow()
306/// .call()
307/// .await?;
308/// # Ok(())
309/// # }
310/// ```
311///
312/// 1. **Create a service client**.
313/// - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
314/// - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
315/// 2. **Specify the handler** you want to call and supply the request.
316/// 3. **Await** the call to retrieve the response.
317///
318/// **No need for manual retry logic**:
319/// Restate proxies all the calls and logs them in the journal.
320/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
321///
322/// ## Sending messages
323///
324/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
325///
326/// ```rust,no_run
327/// # #[path = "../../examples/services/mod.rs"]
328/// # mod services;
329/// # use services::my_virtual_object::MyVirtualObjectClient;
330/// # use services::my_service::MyServiceClient;
331/// # use services::my_workflow::MyWorkflowClient;
332/// # use restate_sdk::prelude::*;
333/// #
334/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
335/// // To a Service:
336/// ctx.service_client::<MyServiceClient>()
337/// .my_handler(String::from("Hi!"))
338/// .send();
339///
340/// // To a Virtual Object:
341/// ctx.object_client::<MyVirtualObjectClient>("Mary")
342/// .my_handler(String::from("Hi!"))
343/// .send();
344///
345/// // To a Workflow:
346/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
347/// .run(String::from("Hi!"))
348/// .send();
349/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
350/// .interact_with_workflow()
351/// .send();
352/// # Ok(())
353/// # }
354/// ```
355///
356/// **No need for message queues**:
357/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
358/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
359///
360/// ## Delayed calls
361///
362/// A delayed call is a one-way call that gets executed after a specified delay.
363///
364/// To schedule a delayed call, send a message with a delay parameter, as follows:
365///
366/// ```rust,no_run
367/// # #[path = "../../examples/services/mod.rs"]
368/// # mod services;
369/// # use services::my_virtual_object::MyVirtualObjectClient;
370/// # use services::my_service::MyServiceClient;
371/// # use services::my_workflow::MyWorkflowClient;
372/// # use restate_sdk::prelude::*;
373/// # use std::time::Duration;
374/// #
375/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
376/// // To a Service:
377/// ctx.service_client::<MyServiceClient>()
378/// .my_handler(String::from("Hi!"))
379/// .send_after(Duration::from_millis(5000));
380///
381/// // To a Virtual Object:
382/// ctx.object_client::<MyVirtualObjectClient>("Mary")
383/// .my_handler(String::from("Hi!"))
384/// .send_after(Duration::from_millis(5000));
385///
386/// // To a Workflow:
387/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
388/// .run(String::from("Hi!"))
389/// .send_after(Duration::from_millis(5000));
390/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
391/// .interact_with_workflow()
392/// .send_after(Duration::from_millis(5000));
393/// # Ok(())
394/// # }
395/// ```
396///
397/// You can also use this functionality to schedule async tasks.
398/// Restate will make sure the task gets executed at the desired time.
399///
400/// ### Ordering guarantees in Virtual Objects
401/// Invocations to a Virtual Object are executed serially.
402/// Invocations will execute in the same order in which they arrive at Restate.
403/// For example, assume a handler calls the same Virtual Object twice:
404///
405/// ```rust,no_run
406/// # #[path = "../../examples/services/my_virtual_object.rs"]
407/// # mod my_virtual_object;
408/// # use my_virtual_object::MyVirtualObjectClient;
409/// # use restate_sdk::prelude::*;
410/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
411/// ctx.object_client::<MyVirtualObjectClient>("Mary")
412/// .my_handler(String::from("I'm call A!"))
413/// .send();
414/// ctx.object_client::<MyVirtualObjectClient>("Mary")
415/// .my_handler(String::from("I'm call B!"))
416/// .send();
417/// # Ok(())
418/// }
419/// ```
420///
421/// It is guaranteed that call A will execute before call B.
422/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls.
423///
424/// ### Deadlocks with Virtual Objects
425/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
426/// Some example cases:
427/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
428/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
429///
430/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
431///
432pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
433 /// Create a [`Request`].
434 fn request<Req, Res>(
435 &self,
436 request_target: RequestTarget,
437 req: Req,
438 ) -> Request<'ctx, Req, Res> {
439 Request::new(self.inner_context(), request_target, req)
440 }
441
442 /// Create an [`InvocationHandle`] from an invocation id.
443 fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
444 self.inner_context().invocation_handle(invocation_id)
445 }
446
447 /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
448 ///
449 /// ```rust,no_run
450 /// # use std::time::Duration;
451 /// # use restate_sdk::prelude::*;
452 ///
453 /// #[restate_sdk::service]
454 /// trait MyService {
455 /// async fn handle() -> HandlerResult<()>;
456 /// }
457 ///
458 /// # async fn handler(ctx: Context<'_>) {
459 /// let client = ctx.service_client::<MyServiceClient>();
460 ///
461 /// // Do request
462 /// let result = client.handle().call().await;
463 ///
464 /// // Just send the request, don't wait the response
465 /// client.handle().send();
466 ///
467 /// // Schedule the request to be executed later
468 /// client.handle().send_after(Duration::from_secs(60));
469 /// # }
470 /// ```
471 fn service_client<C>(&self) -> C
472 where
473 C: IntoServiceClient<'ctx>,
474 {
475 C::create_client(self.inner_context())
476 }
477
478 /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
479 ///
480 /// ```rust,no_run
481 /// # use std::time::Duration;
482 /// # use restate_sdk::prelude::*;
483 ///
484 /// #[restate_sdk::object]
485 /// trait MyObject {
486 /// async fn handle() -> HandlerResult<()>;
487 /// }
488 ///
489 /// # async fn handler(ctx: Context<'_>) {
490 /// let client = ctx.object_client::<MyObjectClient>("my-key");
491 ///
492 /// // Do request
493 /// let result = client.handle().call().await;
494 ///
495 /// // Just send the request, don't wait the response
496 /// client.handle().send();
497 ///
498 /// // Schedule the request to be executed later
499 /// client.handle().send_after(Duration::from_secs(60));
500 /// # }
501 /// ```
502 fn object_client<C>(&self, key: impl Into<String>) -> C
503 where
504 C: IntoObjectClient<'ctx>,
505 {
506 C::create_client(self.inner_context(), key.into())
507 }
508
509 /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
510 ///
511 /// ```rust,no_run
512 /// # use std::time::Duration;
513 /// # use restate_sdk::prelude::*;
514 ///
515 /// #[restate_sdk::workflow]
516 /// trait MyWorkflow {
517 /// async fn handle() -> HandlerResult<()>;
518 /// }
519 ///
520 /// # async fn handler(ctx: Context<'_>) {
521 /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
522 ///
523 /// // Do request
524 /// let result = client.handle().call().await;
525 ///
526 /// // Just send the request, don't wait the response
527 /// client.handle().send();
528 ///
529 /// // Schedule the request to be executed later
530 /// client.handle().send_after(Duration::from_secs(60));
531 /// # }
532 /// ```
533 fn workflow_client<C>(&self, key: impl Into<String>) -> C
534 where
535 C: IntoWorkflowClient<'ctx>,
536 {
537 C::create_client(self.inner_context(), key.into())
538 }
539}
540
541/// Trait used by codegen to create the service client.
542pub trait IntoServiceClient<'ctx>: Sized {
543 fn create_client(ctx: &'ctx ContextInternal) -> Self;
544}
545
546/// Trait used by codegen to use the object client.
547pub trait IntoObjectClient<'ctx>: Sized {
548 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
549}
550
551/// Trait used by codegen to use the workflow client.
552pub trait IntoWorkflowClient<'ctx>: Sized {
553 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
554}
555
556impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
557
558/// # Awakeables
559///
560/// Awakeables pause an invocation while waiting for another process to complete a task.
561/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
562/// This pattern is also known as the callback (task token) pattern.
563///
564/// ## Creating awakeables
565///
566/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
567/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
568/// For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
569/// You use `ctx.run` to avoid re-triggering the task on retries.
570/// 3. **Wait** until the other process has executed the task.
571/// The handler **receives the payload and resumes**.
572///
573/// ```rust,no_run
574/// # use restate_sdk::prelude::*;
575/// #
576/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
577/// /// 1. Create an awakeable
578/// let (id, promise) = ctx.awakeable::<String>();
579///
580/// /// 2. Trigger a task
581/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
582///
583/// /// 3. Wait for the promise to be resolved
584/// let payload = promise.await?;
585/// # Ok(())
586/// # }
587/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
588/// # Ok(())
589/// # }
590/// ```
591///
592///
593/// ## Completing awakeables
594///
595/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
596/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
597///
598/// - Resolving over HTTP with its ID and an optional payload:
599///
600/// ```text
601/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve
602/// -H 'content-type: application/json'
603/// -d '{"hello": "world"}'
604/// ```
605///
606/// - Rejecting over HTTP with its ID and a reason:
607///
608/// ```text
609/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject
610/// -H 'content-type: text/plain' \
611/// -d 'Very bad error!'
612/// ```
613///
614/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
615///
616/// ```rust,no_run
617/// # use restate_sdk::prelude::*;
618/// #
619/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
620/// // Resolve the awakeable
621/// ctx.resolve_awakeable(&id, "hello".to_string());
622///
623/// // Or reject the awakeable
624/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
625/// # Ok(())
626/// # }
627/// ```
628///
629/// For more info about serialization of the payload, see [crate::serde]
630///
631/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
632/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
633///
634/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
635pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
636 /// Create an awakeable
637 fn awakeable<T: Deserialize + 'static>(
638 &self,
639 ) -> (
640 String,
641 impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
642 ) {
643 self.inner_context().awakeable()
644 }
645
646 /// Resolve an awakeable
647 fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
648 self.inner_context().resolve_awakeable(key, t)
649 }
650
651 /// Resolve an awakeable
652 fn reject_awakeable(&self, key: &str, failure: TerminalError) {
653 self.inner_context().reject_awakeable(key, failure)
654 }
655}
656
657impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
658
659/// # Journaling Results
660///
661/// Restate uses an execution log for replay after failures and suspensions.
662/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
663/// The SDK offers some functionalities to help you with this:
664/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
665/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
666/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
667///
668pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
669 /// ## Journaled actions
670 /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
671 /// Restate replays the result instead of re-executing the operation on retries.
672 ///
673 /// Here is an example of a database request for which the string response is stored in Restate:
674 /// ```rust,no_run
675 /// # use restate_sdk::prelude::*;
676 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
677 /// let response = ctx.run(|| do_db_request()).await?;
678 /// # Ok(())
679 /// # }
680 /// # async fn do_db_request() -> Result<String, HandlerError>{
681 /// # Ok("Hello".to_string())
682 /// # }
683 /// ```
684 ///
685 /// You cannot use the Restate context within `ctx.run`.
686 /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
687 ///
688 /// For more info about serialization of the return values, see [crate::serde].
689 ///
690 /// You can configure the retry policy for the `ctx.run` block:
691 /// ```rust,no_run
692 /// # use std::time::Duration;
693 /// # use restate_sdk::prelude::*;
694 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
695 /// let my_run_retry_policy = RunRetryPolicy::default()
696 /// .initial_delay(Duration::from_millis(100))
697 /// .exponentiation_factor(2.0)
698 /// .max_delay(Duration::from_millis(1000))
699 /// .max_attempts(10)
700 /// .max_duration(Duration::from_secs(10));
701 /// ctx.run(|| write_to_other_system())
702 /// .retry_policy(my_run_retry_policy)
703 /// .await
704 /// .map_err(|e| {
705 /// // Handle the terminal error after retries exhausted
706 /// // For example, undo previous actions (see sagas guide) and
707 /// // propagate the error back to the caller
708 /// e
709 /// })?;
710 /// # Ok(())
711 /// # }
712 /// # async fn write_to_other_system() -> Result<String, HandlerError>{
713 /// # Ok("Hello".to_string())
714 /// # }
715 /// ```
716 ///
717 /// This way you can override the default retry behavior of your Restate service for specific operations.
718 /// Have a look at [`RunFuture::retry_policy`] for more information.
719 ///
720 /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
721 /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
722 ///
723 /// **Caution: Immediately await journaled actions:**
724 /// Always immediately await `ctx.run`, before doing any other context calls.
725 /// If not, you might bump into non-determinism errors during replay,
726 /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
727 ///
728 #[must_use]
729 fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
730 where
731 R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
732 F: Future<Output = HandlerResult<T>> + Send + 'ctx,
733 T: Serialize + Deserialize + 'static,
734 {
735 self.inner_context().run(run_closure)
736 }
737
738 /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
739 ///
740 /// This value is stable during the invocation lifecycle, thus across retries.
741 fn random_seed(&self) -> u64 {
742 private::SealedContext::random_seed(self)
743 }
744
745 /// ### Generating random numbers
746 ///
747 /// Return a [`rand::Rng`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
748 ///
749 /// For example, you can use this to generate a random number:
750 ///
751 /// ```rust,no_run
752 /// # use restate_sdk::prelude::*;
753 /// # use rand::Rng;
754 /// async fn rand_generate(mut ctx: Context<'_>) {
755 /// let x: u32 = ctx.rand().gen();
756 /// # }
757 /// ```
758 ///
759 /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
760 /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
761 #[cfg(feature = "rand")]
762 fn rand(&mut self) -> &mut rand::prelude::StdRng {
763 private::SealedContext::rand(self)
764 }
765 /// ### Generating UUIDs
766 ///
767 /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
768 ///
769 /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
770 ///
771 /// Do not use this in cryptographic contexts.
772 ///
773 /// ```rust,no_run
774 /// # use restate_sdk::prelude::*;
775 /// # use uuid::Uuid;
776 /// # async fn uuid_generate(mut ctx: Context<'_>) {
777 /// let uuid: Uuid = ctx.rand_uuid();
778 /// # }
779 /// ```
780 #[cfg(all(feature = "rand", feature = "uuid"))]
781 fn rand_uuid(&mut self) -> uuid::Uuid {
782 let rand = private::SealedContext::rand(self);
783 uuid::Uuid::from_u64_pair(rand::RngCore::next_u64(rand), rand::RngCore::next_u64(rand))
784 }
785}
786
787impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
788
789/// # Reading state
790/// You can store key-value state in Restate.
791/// Restate makes sure the state is consistent with the processing of the code execution.
792///
793/// **This feature is only available for Virtual Objects and Workflows:**
794/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
795/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
796///
797/// ```rust,no_run
798/// # use restate_sdk::prelude::*;
799/// #
800/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
801/// /// 1. Listing state keys
802/// /// List all the state keys that have entries in the state store for this object, via:
803/// let keys = ctx.get_keys().await?;
804///
805/// /// 2. Getting a state value
806/// let my_string = ctx
807/// .get::<String>("my-key")
808/// .await?
809/// .unwrap_or(String::from("my-default"));
810/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
811///
812/// /// 3. Setting a state value
813/// ctx.set("my-key", String::from("my-value"));
814///
815/// /// 4. Clearing a state value
816/// ctx.clear("my-key");
817///
818/// /// 5. Clearing all state values
819/// /// Deletes all the state in Restate for the object
820/// ctx.clear_all();
821/// # Ok(())
822/// # }
823/// ```
824///
825/// For more info about serialization, see [crate::serde]
826///
827/// ### Command-line introspection
828/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
829/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
830///
831pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
832 /// Get state
833 fn get<T: Deserialize + 'static>(
834 &self,
835 key: &str,
836 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
837 self.inner_context().get(key)
838 }
839
840 /// Get state keys
841 fn get_keys(&self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
842 self.inner_context().get_keys()
843 }
844}
845
846impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
847 for CTX
848{
849}
850
851/// # Writing State
852/// You can store key-value state in Restate.
853/// Restate makes sure the state is consistent with the processing of the code execution.
854///
855/// **This feature is only available for Virtual Objects and Workflows:**
856/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
857/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
858///
859/// ```rust,no_run
860/// # use restate_sdk::prelude::*;
861/// #
862/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
863/// /// 1. Listing state keys
864/// /// List all the state keys that have entries in the state store for this object, via:
865/// let keys = ctx.get_keys().await?;
866///
867/// /// 2. Getting a state value
868/// let my_string = ctx
869/// .get::<String>("my-key")
870/// .await?
871/// .unwrap_or(String::from("my-default"));
872/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
873///
874/// /// 3. Setting a state value
875/// ctx.set("my-key", String::from("my-value"));
876///
877/// /// 4. Clearing a state value
878/// ctx.clear("my-key");
879///
880/// /// 5. Clearing all state values
881/// /// Deletes all the state in Restate for the object
882/// ctx.clear_all();
883/// # Ok(())
884/// # }
885/// ```
886///
887/// For more info about serialization, see [crate::serde]
888///
889/// ## Command-line introspection
890/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
891/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
892///
893pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
894 /// Set state
895 fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
896 self.inner_context().set(key, t)
897 }
898
899 /// Clear state
900 fn clear(&self, key: &str) {
901 self.inner_context().clear(key)
902 }
903
904 /// Clear all state
905 fn clear_all(&self) {
906 self.inner_context().clear_all()
907 }
908}
909
910impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
911 for CTX
912{
913}
914
915/// Trait exposing Restate promise functionalities.
916///
917/// A promise is a durable, distributed version of a Rust oneshot channel.
918/// Restate keeps track of the promises across restarts/failures.
919///
920/// You can use this feature to implement interaction between different workflow handlers, e.g. to
921/// send a signal from a shared handler to the workflow handler.
922pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
923 /// Create a promise
924 fn promise<T: Deserialize + 'static>(
925 &'ctx self,
926 key: &str,
927 ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
928 self.inner_context().promise(key)
929 }
930
931 /// Peek a promise
932 fn peek_promise<T: Deserialize + 'static>(
933 &self,
934 key: &str,
935 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
936 self.inner_context().peek_promise(key)
937 }
938
939 /// Resolve a promise
940 fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
941 self.inner_context().resolve_promise(key, t)
942 }
943
944 /// Resolve a promise
945 fn reject_promise(&self, key: &str, failure: TerminalError) {
946 self.inner_context().reject_promise(key, failure)
947 }
948}
949
950impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
951 for CTX
952{
953}
954
955pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
956
957pub(crate) mod private {
958 use super::*;
959 pub trait SealedContext<'ctx> {
960 fn inner_context(&self) -> &'ctx ContextInternal;
961
962 fn random_seed(&self) -> u64;
963
964 #[cfg(feature = "rand")]
965 fn rand(&mut self) -> &mut rand::prelude::StdRng;
966 }
967
968 // Context capabilities
969 pub trait SealedCanReadState {}
970 pub trait SealedCanWriteState {}
971 pub trait SealedCanUsePromises {}
972
973 impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
974 fn inner_context(&self) -> &'ctx ContextInternal {
975 self.inner
976 }
977
978 fn random_seed(&self) -> u64 {
979 self.random_seed
980 }
981
982 #[cfg(feature = "rand")]
983 fn rand(&mut self) -> &mut rand::prelude::StdRng {
984 &mut self.std_rng
985 }
986 }
987
988 impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
989 fn inner_context(&self) -> &'ctx ContextInternal {
990 self.inner
991 }
992
993 fn random_seed(&self) -> u64 {
994 self.random_seed
995 }
996
997 #[cfg(feature = "rand")]
998 fn rand(&mut self) -> &mut rand::prelude::StdRng {
999 &mut self.std_rng
1000 }
1001 }
1002
1003 impl SealedCanReadState for SharedObjectContext<'_> {}
1004
1005 impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1006 fn inner_context(&self) -> &'ctx ContextInternal {
1007 self.inner
1008 }
1009
1010 fn random_seed(&self) -> u64 {
1011 self.random_seed
1012 }
1013
1014 #[cfg(feature = "rand")]
1015 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1016 &mut self.std_rng
1017 }
1018 }
1019
1020 impl SealedCanReadState for ObjectContext<'_> {}
1021 impl SealedCanWriteState for ObjectContext<'_> {}
1022
1023 impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1024 fn inner_context(&self) -> &'ctx ContextInternal {
1025 self.inner
1026 }
1027
1028 fn random_seed(&self) -> u64 {
1029 self.random_seed
1030 }
1031
1032 #[cfg(feature = "rand")]
1033 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1034 &mut self.std_rng
1035 }
1036 }
1037
1038 impl SealedCanReadState for SharedWorkflowContext<'_> {}
1039 impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1040
1041 impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1042 fn inner_context(&self) -> &'ctx ContextInternal {
1043 self.inner
1044 }
1045
1046 fn random_seed(&self) -> u64 {
1047 self.random_seed
1048 }
1049
1050 #[cfg(feature = "rand")]
1051 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1052 &mut self.std_rng
1053 }
1054 }
1055
1056 impl SealedCanReadState for WorkflowContext<'_> {}
1057 impl SealedCanWriteState for WorkflowContext<'_> {}
1058 impl SealedCanUsePromises for WorkflowContext<'_> {}
1059}