restate_sdk/context/mod.rs
1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14
15pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
16pub use run::{RunClosure, RunFuture, RunRetryPolicy};
17
/// Request headers map: an [`http::HeaderMap`] whose values are `String`s.
pub type HeaderMap = http::HeaderMap<String>;
19
/// Service handler context.
///
/// Built from a borrowed [`ContextInternal`] plus the invocation's [`InputMetadata`]
/// through the `From` implementation for this type.
pub struct Context<'ctx> {
    /// Id of the current invocation, exposed by `invocation_id()`.
    invocation_id: String,
    /// Seed taken from the input metadata; also used to seed `std_rng`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, exposed by `headers()` / `headers_mut()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    inner: &'ctx ContextInternal,
}
29
30impl Context<'_> {
31 /// Get invocation id.
32 pub fn invocation_id(&self) -> &str {
33 &self.invocation_id
34 }
35
36 /// Get request headers.
37 pub fn headers(&self) -> &HeaderMap {
38 &self.headers
39 }
40
41 /// Get request headers.
42 pub fn headers_mut(&mut self) -> &HeaderMap {
43 &mut self.headers
44 }
45}
46
47impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
48 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
49 Self {
50 invocation_id: value.1.invocation_id,
51 random_seed: value.1.random_seed,
52 #[cfg(feature = "rand")]
53 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
54 headers: value.1.headers,
55 inner: value.0,
56 }
57 }
58}
59
/// Object shared handler context.
///
/// Built from a borrowed [`ContextInternal`] plus the invocation's [`InputMetadata`]
/// through the `From` implementation for this type.
pub struct SharedObjectContext<'ctx> {
    /// Id of the current invocation, exposed by `invocation_id()`.
    invocation_id: String,
    /// Object key, exposed by `key()`.
    key: String,
    /// Seed taken from the input metadata; also used to seed `std_rng`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, exposed by `headers()` / `headers_mut()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
70
71impl SharedObjectContext<'_> {
72 /// Get invocation id.
73 pub fn invocation_id(&self) -> &str {
74 &self.invocation_id
75 }
76
77 /// Get object key.
78 pub fn key(&self) -> &str {
79 &self.key
80 }
81
82 /// Get request headers.
83 pub fn headers(&self) -> &HeaderMap {
84 &self.headers
85 }
86
87 /// Get request headers.
88 pub fn headers_mut(&mut self) -> &HeaderMap {
89 &mut self.headers
90 }
91}
92
93impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
94 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
95 Self {
96 invocation_id: value.1.invocation_id,
97 key: value.1.key,
98 random_seed: value.1.random_seed,
99 #[cfg(feature = "rand")]
100 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
101 headers: value.1.headers,
102 inner: value.0,
103 }
104 }
105}
106
/// Object handler context.
///
/// Built from a borrowed [`ContextInternal`] plus the invocation's [`InputMetadata`]
/// through the `From` implementation for this type.
pub struct ObjectContext<'ctx> {
    /// Id of the current invocation, exposed by `invocation_id()`.
    invocation_id: String,
    /// Object key, exposed by `key()`.
    key: String,
    /// Seed taken from the input metadata; also used to seed `std_rng`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, exposed by `headers()` / `headers_mut()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
117
118impl ObjectContext<'_> {
119 /// Get invocation id.
120 pub fn invocation_id(&self) -> &str {
121 &self.invocation_id
122 }
123
124 /// Get object key.
125 pub fn key(&self) -> &str {
126 &self.key
127 }
128
129 /// Get request headers.
130 pub fn headers(&self) -> &HeaderMap {
131 &self.headers
132 }
133
134 /// Get request headers.
135 pub fn headers_mut(&mut self) -> &HeaderMap {
136 &mut self.headers
137 }
138}
139
140impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
141 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
142 Self {
143 invocation_id: value.1.invocation_id,
144 key: value.1.key,
145 random_seed: value.1.random_seed,
146 #[cfg(feature = "rand")]
147 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
148 headers: value.1.headers,
149 inner: value.0,
150 }
151 }
152}
153
/// Workflow shared handler context.
///
/// Built from a borrowed [`ContextInternal`] plus the invocation's [`InputMetadata`]
/// through the `From` implementation for this type.
pub struct SharedWorkflowContext<'ctx> {
    /// Id of the current invocation, exposed by `invocation_id()`.
    invocation_id: String,
    /// Workflow key, exposed by `key()`.
    key: String,
    /// Seed taken from the input metadata; also used to seed `std_rng`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, exposed by `headers()` / `headers_mut()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
164
165impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
166 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
167 Self {
168 invocation_id: value.1.invocation_id,
169 key: value.1.key,
170 random_seed: value.1.random_seed,
171 #[cfg(feature = "rand")]
172 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
173 headers: value.1.headers,
174 inner: value.0,
175 }
176 }
177}
178
179impl SharedWorkflowContext<'_> {
180 /// Get invocation id.
181 pub fn invocation_id(&self) -> &str {
182 &self.invocation_id
183 }
184
185 /// Get workflow key.
186 pub fn key(&self) -> &str {
187 &self.key
188 }
189
190 /// Get request headers.
191 pub fn headers(&self) -> &HeaderMap {
192 &self.headers
193 }
194
195 /// Get request headers.
196 pub fn headers_mut(&mut self) -> &HeaderMap {
197 &mut self.headers
198 }
199}
200
/// Workflow handler context.
///
/// Built from a borrowed [`ContextInternal`] plus the invocation's [`InputMetadata`]
/// through the `From` implementation for this type.
pub struct WorkflowContext<'ctx> {
    /// Id of the current invocation, exposed by `invocation_id()`.
    invocation_id: String,
    /// Workflow key, exposed by `key()`.
    key: String,
    /// Seed taken from the input metadata; also used to seed `std_rng`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, exposed by `headers()` / `headers_mut()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
211
212impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
213 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
214 Self {
215 invocation_id: value.1.invocation_id,
216 key: value.1.key,
217 random_seed: value.1.random_seed,
218 #[cfg(feature = "rand")]
219 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
220 headers: value.1.headers,
221 inner: value.0,
222 }
223 }
224}
225
226impl WorkflowContext<'_> {
227 /// Get invocation id.
228 pub fn invocation_id(&self) -> &str {
229 &self.invocation_id
230 }
231
232 /// Get workflow key.
233 pub fn key(&self) -> &str {
234 &self.key
235 }
236
237 /// Get request headers.
238 pub fn headers(&self) -> &HeaderMap {
239 &self.headers
240 }
241
242 /// Get request headers.
243 pub fn headers_mut(&mut self) -> &HeaderMap {
244 &mut self.headers
245 }
246}
247
248///
249/// # Scheduling & Timers
250/// The Restate SDK includes durable timers.
251/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
252/// These timers are resilient to failures and restarts.
253/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
254///
255/// ## Scheduling Async Tasks
256///
257/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
258///
259/// ## Durable sleep
260/// To sleep in a Restate application for ten seconds, do the following:
261///
262/// ```rust,no_run
263/// # use restate_sdk::prelude::*;
264/// # use std::convert::Infallible;
265/// # use std::time::Duration;
266/// #
267/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
268/// ctx.sleep(Duration::from_secs(10)).await?;
269/// # Ok(())
270/// # }
271/// ```
272///
273/// **Cost savings on FaaS**:
274/// Restate suspends the handler while it is sleeping, to free up resources.
275/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
276///
277/// **Sleeping in Virtual Objects**:
278/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
279///
280/// <details>
281/// <summary>Clock synchronization Restate Server vs. SDK</summary>
282///
283/// The Restate SDK calculates the wake-up time based on the delay you specify.
284/// The Restate Server then uses this calculated time to wake up the handler.
285/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
286/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
287/// </details>
288pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
289 /// Sleep using Restate
290 fn sleep(
291 &self,
292 duration: Duration,
293 ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
294 private::SealedContext::inner_context(self).sleep(duration)
295 }
296}
297
298impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
299
300/// # Service Communication
301///
302/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
303///
304/// ## Request-response calls
305///
306/// Request-response calls are requests where the client waits for the response.
307///
308/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
309///
310/// ```rust,no_run
311/// # #[path = "../../examples/services/mod.rs"]
312/// # mod services;
313/// # use services::my_virtual_object::MyVirtualObjectClient;
314/// # use services::my_service::MyServiceClient;
315/// # use services::my_workflow::MyWorkflowClient;
316/// # use restate_sdk::prelude::*;
317/// #
318/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
319/// // To a Service:
320/// let service_response = ctx
321/// .service_client::<MyServiceClient>()
322/// .my_handler(String::from("Hi!"))
323/// .call()
324/// .await?;
325///
326/// // To a Virtual Object:
327/// let object_response = ctx
328/// .object_client::<MyVirtualObjectClient>("Mary")
329/// .my_handler(String::from("Hi!"))
330/// .call()
331/// .await?;
332///
333/// // To a Workflow:
334/// let workflow_result = ctx
335/// .workflow_client::<MyWorkflowClient>("my-workflow-id")
336/// .run(String::from("Hi!"))
337/// .call()
338/// .await?;
339/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
340/// .interact_with_workflow()
341/// .call()
342/// .await?;
343/// # Ok(())
344/// # }
345/// ```
346///
347/// 1. **Create a service client**.
348/// - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
349/// - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
350/// 2. **Specify the handler** you want to call and supply the request.
351/// 3. **Await** the call to retrieve the response.
352///
353/// **No need for manual retry logic**:
354/// Restate proxies all the calls and logs them in the journal.
355/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
356///
357/// ## Sending messages
358///
359/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
360///
361/// ```rust,no_run
362/// # #[path = "../../examples/services/mod.rs"]
363/// # mod services;
364/// # use services::my_virtual_object::MyVirtualObjectClient;
365/// # use services::my_service::MyServiceClient;
366/// # use services::my_workflow::MyWorkflowClient;
367/// # use restate_sdk::prelude::*;
368/// #
369/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
370/// // To a Service:
371/// ctx.service_client::<MyServiceClient>()
372/// .my_handler(String::from("Hi!"))
373/// .send();
374///
375/// // To a Virtual Object:
376/// ctx.object_client::<MyVirtualObjectClient>("Mary")
377/// .my_handler(String::from("Hi!"))
378/// .send();
379///
380/// // To a Workflow:
381/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
382/// .run(String::from("Hi!"))
383/// .send();
384/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
385/// .interact_with_workflow()
386/// .send();
387/// # Ok(())
388/// # }
389/// ```
390///
391/// **No need for message queues**:
392/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
393/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
394///
395/// ## Delayed calls
396///
397/// A delayed call is a one-way call that gets executed after a specified delay.
398///
399/// To schedule a delayed call, send a message with a delay parameter, as follows:
400///
401/// ```rust,no_run
402/// # #[path = "../../examples/services/mod.rs"]
403/// # mod services;
404/// # use services::my_virtual_object::MyVirtualObjectClient;
405/// # use services::my_service::MyServiceClient;
406/// # use services::my_workflow::MyWorkflowClient;
407/// # use restate_sdk::prelude::*;
408/// # use std::time::Duration;
409/// #
410/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
411/// // To a Service:
412/// ctx.service_client::<MyServiceClient>()
413/// .my_handler(String::from("Hi!"))
414/// .send_after(Duration::from_millis(5000));
415///
416/// // To a Virtual Object:
417/// ctx.object_client::<MyVirtualObjectClient>("Mary")
418/// .my_handler(String::from("Hi!"))
419/// .send_after(Duration::from_millis(5000));
420///
421/// // To a Workflow:
422/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
423/// .run(String::from("Hi!"))
424/// .send_after(Duration::from_millis(5000));
425/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
426/// .interact_with_workflow()
427/// .send_after(Duration::from_millis(5000));
428/// # Ok(())
429/// # }
430/// ```
431///
432/// You can also use this functionality to schedule async tasks.
433/// Restate will make sure the task gets executed at the desired time.
434///
435/// ### Ordering guarantees in Virtual Objects
436/// Invocations to a Virtual Object are executed serially.
437/// Invocations will execute in the same order in which they arrive at Restate.
438/// For example, assume a handler calls the same Virtual Object twice:
439///
440/// ```rust,no_run
441/// # #[path = "../../examples/services/my_virtual_object.rs"]
442/// # mod my_virtual_object;
443/// # use my_virtual_object::MyVirtualObjectClient;
444/// # use restate_sdk::prelude::*;
445/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
446/// ctx.object_client::<MyVirtualObjectClient>("Mary")
447/// .my_handler(String::from("I'm call A!"))
448/// .send();
449/// ctx.object_client::<MyVirtualObjectClient>("Mary")
450/// .my_handler(String::from("I'm call B!"))
451/// .send();
452/// # Ok(())
/// # }
454/// ```
455///
456/// It is guaranteed that call A will execute before call B.
457/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls.
458///
459/// ### Deadlocks with Virtual Objects
460/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
461/// Some example cases:
462/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
463/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
464///
465/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
466///
467pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
468 /// Create a [`Request`].
469 fn request<Req, Res>(
470 &self,
471 request_target: RequestTarget,
472 req: Req,
473 ) -> Request<'ctx, Req, Res> {
474 Request::new(self.inner_context(), request_target, req)
475 }
476
477 /// Create an [`InvocationHandle`] from an invocation id.
478 fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
479 self.inner_context().invocation_handle(invocation_id)
480 }
481
482 /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
483 ///
484 /// ```rust,no_run
485 /// # use std::time::Duration;
486 /// # use restate_sdk::prelude::*;
487 ///
488 /// #[restate_sdk::service]
489 /// trait MyService {
490 /// async fn handle() -> HandlerResult<()>;
491 /// }
492 ///
493 /// # async fn handler(ctx: Context<'_>) {
494 /// let client = ctx.service_client::<MyServiceClient>();
495 ///
496 /// // Do request
497 /// let result = client.handle().call().await;
498 ///
499 /// // Just send the request, don't wait the response
500 /// client.handle().send();
501 ///
502 /// // Schedule the request to be executed later
503 /// client.handle().send_after(Duration::from_secs(60));
504 /// # }
505 /// ```
506 fn service_client<C>(&self) -> C
507 where
508 C: IntoServiceClient<'ctx>,
509 {
510 C::create_client(self.inner_context())
511 }
512
513 /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
514 ///
515 /// ```rust,no_run
516 /// # use std::time::Duration;
517 /// # use restate_sdk::prelude::*;
518 ///
519 /// #[restate_sdk::object]
520 /// trait MyObject {
521 /// async fn handle() -> HandlerResult<()>;
522 /// }
523 ///
524 /// # async fn handler(ctx: Context<'_>) {
525 /// let client = ctx.object_client::<MyObjectClient>("my-key");
526 ///
527 /// // Do request
528 /// let result = client.handle().call().await;
529 ///
530 /// // Just send the request, don't wait the response
531 /// client.handle().send();
532 ///
533 /// // Schedule the request to be executed later
534 /// client.handle().send_after(Duration::from_secs(60));
535 /// # }
536 /// ```
537 fn object_client<C>(&self, key: impl Into<String>) -> C
538 where
539 C: IntoObjectClient<'ctx>,
540 {
541 C::create_client(self.inner_context(), key.into())
542 }
543
544 /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
545 ///
546 /// ```rust,no_run
547 /// # use std::time::Duration;
548 /// # use restate_sdk::prelude::*;
549 ///
550 /// #[restate_sdk::workflow]
551 /// trait MyWorkflow {
552 /// async fn handle() -> HandlerResult<()>;
553 /// }
554 ///
555 /// # async fn handler(ctx: Context<'_>) {
556 /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
557 ///
558 /// // Do request
559 /// let result = client.handle().call().await;
560 ///
561 /// // Just send the request, don't wait the response
562 /// client.handle().send();
563 ///
564 /// // Schedule the request to be executed later
565 /// client.handle().send_after(Duration::from_secs(60));
566 /// # }
567 /// ```
568 fn workflow_client<C>(&self, key: impl Into<String>) -> C
569 where
570 C: IntoWorkflowClient<'ctx>,
571 {
572 C::create_client(self.inner_context(), key.into())
573 }
574}
575
/// Trait used by codegen to create the service client.
///
/// [`ContextClient::service_client`] calls `create_client` with the internal context.
pub trait IntoServiceClient<'ctx>: Sized {
    /// Build the client on top of the given internal context.
    fn create_client(ctx: &'ctx ContextInternal) -> Self;
}
580
/// Trait used by codegen to create the object client.
///
/// [`ContextClient::object_client`] calls `create_client` with the internal context and the key.
pub trait IntoObjectClient<'ctx>: Sized {
    /// Build the client on top of the given internal context, targeting `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
585
/// Trait used by codegen to create the workflow client.
///
/// [`ContextClient::workflow_client`] calls `create_client` with the internal context and the key.
pub trait IntoWorkflowClient<'ctx>: Sized {
    /// Build the client on top of the given internal context, targeting `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
590
591impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
592
593/// # Awakeables
594///
595/// Awakeables pause an invocation while waiting for another process to complete a task.
596/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
597/// This pattern is also known as the callback (task token) pattern.
598///
599/// ## Creating awakeables
600///
601/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
602/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
603/// For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
604/// You use `ctx.run` to avoid re-triggering the task on retries.
605/// 3. **Wait** until the other process has executed the task.
606/// The handler **receives the payload and resumes**.
607///
608/// ```rust,no_run
609/// # use restate_sdk::prelude::*;
610/// #
611/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
612/// /// 1. Create an awakeable
613/// let (id, promise) = ctx.awakeable::<String>();
614///
615/// /// 2. Trigger a task
616/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
617///
618/// /// 3. Wait for the promise to be resolved
619/// let payload = promise.await?;
620/// # Ok(())
621/// # }
622/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
623/// # Ok(())
624/// # }
625/// ```
626///
627///
628/// ## Completing awakeables
629///
/// The external process completes the awakeable either by resolving it with its ID and an optional payload, or by rejecting it
/// with its ID and a reason for the failure. Rejecting it throws [a terminal error][crate::errors] in the waiting handler.
632///
633/// - Resolving over HTTP with its ID and an optional payload:
634///
635/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve \
///     -H 'content-type: application/json' \
///     -d '{"hello": "world"}'
639/// ```
640///
641/// - Rejecting over HTTP with its ID and a reason:
642///
643/// ```text
644/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject
645/// -H 'content-type: text/plain' \
646/// -d 'Very bad error!'
647/// ```
648///
649/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
650///
651/// ```rust,no_run
652/// # use restate_sdk::prelude::*;
653/// #
654/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
655/// // Resolve the awakeable
656/// ctx.resolve_awakeable(&id, "hello".to_string());
657///
658/// // Or reject the awakeable
659/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
660/// # Ok(())
661/// # }
662/// ```
663///
664/// For more info about serialization of the payload, see [crate::serde]
665///
666/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
667/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
668///
669/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
670pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
671 /// Create an awakeable
672 fn awakeable<T: Deserialize + 'static>(
673 &self,
674 ) -> (
675 String,
676 impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
677 ) {
678 self.inner_context().awakeable()
679 }
680
681 /// Resolve an awakeable
682 fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
683 self.inner_context().resolve_awakeable(key, t)
684 }
685
686 /// Resolve an awakeable
687 fn reject_awakeable(&self, key: &str, failure: TerminalError) {
688 self.inner_context().reject_awakeable(key, failure)
689 }
690}
691
692impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
693
694/// # Journaling Results
695///
696/// Restate uses an execution log for replay after failures and suspensions.
697/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
698/// The SDK offers some functionalities to help you with this:
699/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
700/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
701/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
702///
703pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
704 /// ## Journaled actions
705 /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
706 /// Restate replays the result instead of re-executing the operation on retries.
707 ///
708 /// Here is an example of a database request for which the string response is stored in Restate:
709 /// ```rust,no_run
710 /// # use restate_sdk::prelude::*;
711 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
712 /// let response = ctx.run(|| do_db_request()).await?;
713 /// # Ok(())
714 /// # }
715 /// # async fn do_db_request() -> Result<String, HandlerError>{
716 /// # Ok("Hello".to_string())
717 /// # }
718 /// ```
719 ///
720 /// You cannot use the Restate context within `ctx.run`.
721 /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
722 ///
723 /// For more info about serialization of the return values, see [crate::serde].
724 ///
725 /// You can configure the retry policy for the `ctx.run` block:
726 /// ```rust,no_run
727 /// # use std::time::Duration;
728 /// # use restate_sdk::prelude::*;
729 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
730 /// let my_run_retry_policy = RunRetryPolicy::default()
731 /// .initial_delay(Duration::from_millis(100))
732 /// .exponentiation_factor(2.0)
733 /// .max_delay(Duration::from_millis(1000))
734 /// .max_attempts(10)
735 /// .max_duration(Duration::from_secs(10));
736 /// ctx.run(|| write_to_other_system())
737 /// .retry_policy(my_run_retry_policy)
738 /// .await
739 /// .map_err(|e| {
740 /// // Handle the terminal error after retries exhausted
741 /// // For example, undo previous actions (see sagas guide) and
742 /// // propagate the error back to the caller
743 /// e
744 /// })?;
745 /// # Ok(())
746 /// # }
747 /// # async fn write_to_other_system() -> Result<String, HandlerError>{
748 /// # Ok("Hello".to_string())
749 /// # }
750 /// ```
751 ///
752 /// This way you can override the default retry behavior of your Restate service for specific operations.
753 /// Have a look at [`RunFuture::retry_policy`] for more information.
754 ///
755 /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
756 /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
757 ///
758 /// **Caution: Immediately await journaled actions:**
759 /// Always immediately await `ctx.run`, before doing any other context calls.
760 /// If not, you might bump into non-determinism errors during replay,
761 /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
762 ///
763 #[must_use]
764 fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
765 where
766 R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
767 F: Future<Output = HandlerResult<T>> + Send + 'ctx,
768 T: Serialize + Deserialize + 'static,
769 {
770 self.inner_context().run(run_closure)
771 }
772
773 /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
774 ///
775 /// This value is stable during the invocation lifecycle, thus across retries.
776 fn random_seed(&self) -> u64 {
777 private::SealedContext::random_seed(self)
778 }
779
780 /// ### Generating random numbers
781 ///
782 /// Return a [`rand::Rng`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
783 ///
784 /// For example, you can use this to generate a random number:
785 ///
786 /// ```rust,no_run
787 /// # use restate_sdk::prelude::*;
788 /// # use rand::Rng;
789 /// async fn rand_generate(mut ctx: Context<'_>) {
790 /// let x: u32 = ctx.rand().random();
791 /// # }
792 /// ```
793 ///
794 /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
795 /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
796 #[cfg(feature = "rand")]
797 fn rand(&mut self) -> &mut rand::prelude::StdRng {
798 private::SealedContext::rand(self)
799 }
800 /// ### Generating UUIDs
801 ///
802 /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
803 ///
804 /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
805 ///
806 /// Do not use this in cryptographic contexts.
807 ///
808 /// ```rust,no_run
809 /// # use restate_sdk::prelude::*;
810 /// # use uuid::Uuid;
811 /// # async fn uuid_generate(mut ctx: Context<'_>) {
812 /// let uuid: Uuid = ctx.rand_uuid();
813 /// # }
814 /// ```
815 #[cfg(all(feature = "rand", feature = "uuid"))]
816 fn rand_uuid(&mut self) -> uuid::Uuid {
817 let rand = private::SealedContext::rand(self);
818 uuid::Uuid::from_u64_pair(rand::RngCore::next_u64(rand), rand::RngCore::next_u64(rand))
819 }
820}
821
822impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
823
824/// # Reading state
825/// You can store key-value state in Restate.
826/// Restate makes sure the state is consistent with the processing of the code execution.
827///
828/// **This feature is only available for Virtual Objects and Workflows:**
829/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
830/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
831///
832/// ```rust,no_run
833/// # use restate_sdk::prelude::*;
834/// #
835/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
836/// /// 1. Listing state keys
837/// /// List all the state keys that have entries in the state store for this object, via:
838/// let keys = ctx.get_keys().await?;
839///
840/// /// 2. Getting a state value
841/// let my_string = ctx
842/// .get::<String>("my-key")
843/// .await?
844/// .unwrap_or(String::from("my-default"));
845/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
846///
847/// /// 3. Setting a state value
848/// ctx.set("my-key", String::from("my-value"));
849///
850/// /// 4. Clearing a state value
851/// ctx.clear("my-key");
852///
853/// /// 5. Clearing all state values
854/// /// Deletes all the state in Restate for the object
855/// ctx.clear_all();
856/// # Ok(())
857/// # }
858/// ```
859///
860/// For more info about serialization, see [crate::serde]
861///
862/// ### Command-line introspection
863/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
/// Have a look at the [introspection docs](https://docs.restate.dev/operate/introspection#inspecting-application-state) for more information.
865///
866pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
867 /// Get state
868 fn get<T: Deserialize + 'static>(
869 &self,
870 key: &'ctx str,
871 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
872 self.inner_context().get(key)
873 }
874
875 /// Get state keys
876 fn get_keys(&'ctx self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
877 self.inner_context().get_keys()
878 }
879}
880
881impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
882 for CTX
883{
884}
885
886/// # Writing State
887/// You can store key-value state in Restate.
888/// Restate makes sure the state is consistent with the processing of the code execution.
889///
890/// **This feature is only available for Virtual Objects and Workflows:**
891/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
892/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
893///
894/// ```rust,no_run
895/// # use restate_sdk::prelude::*;
896/// #
897/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
898/// /// 1. Listing state keys
899/// /// List all the state keys that have entries in the state store for this object, via:
900/// let keys = ctx.get_keys().await?;
901///
902/// /// 2. Getting a state value
903/// let my_string = ctx
904/// .get::<String>("my-key")
905/// .await?
906/// .unwrap_or(String::from("my-default"));
907/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
908///
909/// /// 3. Setting a state value
910/// ctx.set("my-key", String::from("my-value"));
911///
912/// /// 4. Clearing a state value
913/// ctx.clear("my-key");
914///
915/// /// 5. Clearing all state values
916/// /// Deletes all the state in Restate for the object
917/// ctx.clear_all();
918/// # Ok(())
919/// # }
920/// ```
921///
922/// For more info about serialization, see [crate::serde]
923///
924/// ## Command-line introspection
925/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
/// Have a look at the [introspection docs](https://docs.restate.dev/operate/introspection#inspecting-application-state) for more information.
927///
928pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
929 /// Set state
930 fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
931 self.inner_context().set(key, t)
932 }
933
934 /// Clear state
935 fn clear(&self, key: &str) {
936 self.inner_context().clear(key)
937 }
938
939 /// Clear all state
940 fn clear_all(&self) {
941 self.inner_context().clear_all()
942 }
943}
944
945impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
946 for CTX
947{
948}
949
950/// Trait exposing Restate promise functionalities.
951///
952/// A promise is a durable, distributed version of a Rust oneshot channel.
953/// Restate keeps track of the promises across restarts/failures.
954///
955/// You can use this feature to implement interaction between different workflow handlers, e.g. to
956/// send a signal from a shared handler to the workflow handler.
957pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
958 /// Create a promise
959 fn promise<T: Deserialize + 'static>(
960 &'ctx self,
961 key: &'ctx str,
962 ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
963 self.inner_context().promise(key)
964 }
965
966 /// Peek a promise
967 fn peek_promise<T: Deserialize + 'static>(
968 &self,
969 key: &'ctx str,
970 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
971 self.inner_context().peek_promise(key)
972 }
973
974 /// Resolve a promise
975 fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
976 self.inner_context().resolve_promise(key, t)
977 }
978
979 /// Resolve a promise
980 fn reject_promise(&self, key: &str, failure: TerminalError) {
981 self.inner_context().reject_promise(key, failure)
982 }
983}
984
985impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
986 for CTX
987{
988}
989
/// A [`Future`] returned by context operations such as [`ContextPromises::promise`].
///
/// Besides [`Future`], implementors must also implement
/// `macro_support::SealedDurableFuture`; judging by its name, that supertrait is presumably
/// meant to keep implementations inside this crate (TODO confirm).
pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
991
992pub(crate) mod private {
993 use super::*;
994 pub trait SealedContext<'ctx> {
995 fn inner_context(&self) -> &'ctx ContextInternal;
996
997 fn random_seed(&self) -> u64;
998
999 #[cfg(feature = "rand")]
1000 fn rand(&mut self) -> &mut rand::prelude::StdRng;
1001 }
1002
1003 // Context capabilities
1004 pub trait SealedCanReadState {}
1005 pub trait SealedCanWriteState {}
1006 pub trait SealedCanUsePromises {}
1007
1008 impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
1009 fn inner_context(&self) -> &'ctx ContextInternal {
1010 self.inner
1011 }
1012
1013 fn random_seed(&self) -> u64 {
1014 self.random_seed
1015 }
1016
1017 #[cfg(feature = "rand")]
1018 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1019 &mut self.std_rng
1020 }
1021 }
1022
1023 impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
1024 fn inner_context(&self) -> &'ctx ContextInternal {
1025 self.inner
1026 }
1027
1028 fn random_seed(&self) -> u64 {
1029 self.random_seed
1030 }
1031
1032 #[cfg(feature = "rand")]
1033 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1034 &mut self.std_rng
1035 }
1036 }
1037
1038 impl SealedCanReadState for SharedObjectContext<'_> {}
1039
1040 impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1041 fn inner_context(&self) -> &'ctx ContextInternal {
1042 self.inner
1043 }
1044
1045 fn random_seed(&self) -> u64 {
1046 self.random_seed
1047 }
1048
1049 #[cfg(feature = "rand")]
1050 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1051 &mut self.std_rng
1052 }
1053 }
1054
1055 impl SealedCanReadState for ObjectContext<'_> {}
1056 impl SealedCanWriteState for ObjectContext<'_> {}
1057
1058 impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1059 fn inner_context(&self) -> &'ctx ContextInternal {
1060 self.inner
1061 }
1062
1063 fn random_seed(&self) -> u64 {
1064 self.random_seed
1065 }
1066
1067 #[cfg(feature = "rand")]
1068 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1069 &mut self.std_rng
1070 }
1071 }
1072
1073 impl SealedCanReadState for SharedWorkflowContext<'_> {}
1074 impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1075
1076 impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1077 fn inner_context(&self) -> &'ctx ContextInternal {
1078 self.inner
1079 }
1080
1081 fn random_seed(&self) -> u64 {
1082 self.random_seed
1083 }
1084
1085 #[cfg(feature = "rand")]
1086 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1087 &mut self.std_rng
1088 }
1089 }
1090
1091 impl SealedCanReadState for WorkflowContext<'_> {}
1092 impl SealedCanWriteState for WorkflowContext<'_> {}
1093 impl SealedCanUsePromises for WorkflowContext<'_> {}
1094}