restate_sdk/context/mod.rs
1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14mod select_any;
15
16pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
17pub use run::{RunClosure, RunFuture, RunRetryPolicy};
18pub use select_any::DurableFuturesUnordered;
19
20pub type HeaderMap = http::HeaderMap<String>;
21
22/// Service handler context.
23pub struct Context<'ctx> {
24 invocation_id: String,
25 random_seed: u64,
26 #[cfg(feature = "rand")]
27 std_rng: rand::prelude::StdRng,
28 headers: HeaderMap,
29 inner: &'ctx ContextInternal,
30}
31
32impl Context<'_> {
33 /// Get invocation id.
34 pub fn invocation_id(&self) -> &str {
35 &self.invocation_id
36 }
37
38 /// Get request headers.
39 pub fn headers(&self) -> &HeaderMap {
40 &self.headers
41 }
42
43 /// Get request headers.
44 pub fn headers_mut(&mut self) -> &HeaderMap {
45 &mut self.headers
46 }
47}
48
49impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
50 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
51 Self {
52 invocation_id: value.1.invocation_id,
53 random_seed: value.1.random_seed,
54 #[cfg(feature = "rand")]
55 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
56 headers: value.1.headers,
57 inner: value.0,
58 }
59 }
60}
61
62/// Object shared handler context.
63pub struct SharedObjectContext<'ctx> {
64 invocation_id: String,
65 key: String,
66 random_seed: u64,
67 #[cfg(feature = "rand")]
68 std_rng: rand::prelude::StdRng,
69 headers: HeaderMap,
70 pub(crate) inner: &'ctx ContextInternal,
71}
72
73impl SharedObjectContext<'_> {
74 /// Get invocation id.
75 pub fn invocation_id(&self) -> &str {
76 &self.invocation_id
77 }
78
79 /// Get object key.
80 pub fn key(&self) -> &str {
81 &self.key
82 }
83
84 /// Get request headers.
85 pub fn headers(&self) -> &HeaderMap {
86 &self.headers
87 }
88
89 /// Get request headers.
90 pub fn headers_mut(&mut self) -> &HeaderMap {
91 &mut self.headers
92 }
93}
94
95impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
96 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
97 Self {
98 invocation_id: value.1.invocation_id,
99 key: value.1.key,
100 random_seed: value.1.random_seed,
101 #[cfg(feature = "rand")]
102 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
103 headers: value.1.headers,
104 inner: value.0,
105 }
106 }
107}
108
109/// Object handler context.
110pub struct ObjectContext<'ctx> {
111 invocation_id: String,
112 key: String,
113 random_seed: u64,
114 #[cfg(feature = "rand")]
115 std_rng: rand::prelude::StdRng,
116 headers: HeaderMap,
117 pub(crate) inner: &'ctx ContextInternal,
118}
119
120impl ObjectContext<'_> {
121 /// Get invocation id.
122 pub fn invocation_id(&self) -> &str {
123 &self.invocation_id
124 }
125
126 /// Get object key.
127 pub fn key(&self) -> &str {
128 &self.key
129 }
130
131 /// Get request headers.
132 pub fn headers(&self) -> &HeaderMap {
133 &self.headers
134 }
135
136 /// Get request headers.
137 pub fn headers_mut(&mut self) -> &HeaderMap {
138 &mut self.headers
139 }
140}
141
142impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
143 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
144 Self {
145 invocation_id: value.1.invocation_id,
146 key: value.1.key,
147 random_seed: value.1.random_seed,
148 #[cfg(feature = "rand")]
149 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
150 headers: value.1.headers,
151 inner: value.0,
152 }
153 }
154}
155
156/// Workflow shared handler context.
157pub struct SharedWorkflowContext<'ctx> {
158 invocation_id: String,
159 key: String,
160 random_seed: u64,
161 #[cfg(feature = "rand")]
162 std_rng: rand::prelude::StdRng,
163 headers: HeaderMap,
164 pub(crate) inner: &'ctx ContextInternal,
165}
166
167impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
168 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
169 Self {
170 invocation_id: value.1.invocation_id,
171 key: value.1.key,
172 random_seed: value.1.random_seed,
173 #[cfg(feature = "rand")]
174 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
175 headers: value.1.headers,
176 inner: value.0,
177 }
178 }
179}
180
181impl SharedWorkflowContext<'_> {
182 /// Get invocation id.
183 pub fn invocation_id(&self) -> &str {
184 &self.invocation_id
185 }
186
187 /// Get workflow key.
188 pub fn key(&self) -> &str {
189 &self.key
190 }
191
192 /// Get request headers.
193 pub fn headers(&self) -> &HeaderMap {
194 &self.headers
195 }
196
197 /// Get request headers.
198 pub fn headers_mut(&mut self) -> &HeaderMap {
199 &mut self.headers
200 }
201}
202
203/// Workflow handler context.
204pub struct WorkflowContext<'ctx> {
205 invocation_id: String,
206 key: String,
207 random_seed: u64,
208 #[cfg(feature = "rand")]
209 std_rng: rand::prelude::StdRng,
210 headers: HeaderMap,
211 pub(crate) inner: &'ctx ContextInternal,
212}
213
214impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
215 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
216 Self {
217 invocation_id: value.1.invocation_id,
218 key: value.1.key,
219 random_seed: value.1.random_seed,
220 #[cfg(feature = "rand")]
221 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
222 headers: value.1.headers,
223 inner: value.0,
224 }
225 }
226}
227
228impl WorkflowContext<'_> {
229 /// Get invocation id.
230 pub fn invocation_id(&self) -> &str {
231 &self.invocation_id
232 }
233
234 /// Get workflow key.
235 pub fn key(&self) -> &str {
236 &self.key
237 }
238
239 /// Get request headers.
240 pub fn headers(&self) -> &HeaderMap {
241 &self.headers
242 }
243
244 /// Get request headers.
245 pub fn headers_mut(&mut self) -> &HeaderMap {
246 &mut self.headers
247 }
248}
249
250///
251/// # Scheduling & Timers
252/// The Restate SDK includes durable timers.
253/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
254/// These timers are resilient to failures and restarts.
255/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
256///
257/// ## Scheduling Async Tasks
258///
259/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
260///
261/// ## Durable sleep
262/// To sleep in a Restate application for ten seconds, do the following:
263///
264/// ```rust,no_run
265/// # use restate_sdk::prelude::*;
266/// # use std::convert::Infallible;
267/// # use std::time::Duration;
268/// #
269/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
270/// ctx.sleep(Duration::from_secs(10)).await?;
271/// # Ok(())
272/// # }
273/// ```
274///
275/// **Cost savings on FaaS**:
276/// Restate suspends the handler while it is sleeping, to free up resources.
277/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
278///
279/// **Sleeping in Virtual Objects**:
280/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
281///
282/// <details>
283/// <summary>Clock synchronization Restate Server vs. SDK</summary>
284///
285/// The Restate SDK calculates the wake-up time based on the delay you specify.
286/// The Restate Server then uses this calculated time to wake up the handler.
287/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
288/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
289/// </details>
290pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
291 /// Sleep using Restate
292 fn sleep(
293 &self,
294 duration: Duration,
295 ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
296 private::SealedContext::inner_context(self).sleep(duration)
297 }
298}
299
300impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
301
302/// # Service Communication
303///
304/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
305///
306/// ## Request-response calls
307///
308/// Request-response calls are requests where the client waits for the response.
309///
310/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
311///
312/// ```rust,no_run
313/// # #[path = "../../examples/services/mod.rs"]
314/// # mod services;
315/// # use services::my_virtual_object::MyVirtualObjectClient;
316/// # use services::my_service::MyServiceClient;
317/// # use services::my_workflow::MyWorkflowClient;
318/// # use restate_sdk::prelude::*;
319/// #
320/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
321/// // To a Service:
322/// let service_response = ctx
323/// .service_client::<MyServiceClient>()
324/// .my_handler(String::from("Hi!"))
325/// .call()
326/// .await?;
327///
328/// // To a Virtual Object:
329/// let object_response = ctx
330/// .object_client::<MyVirtualObjectClient>("Mary")
331/// .my_handler(String::from("Hi!"))
332/// .call()
333/// .await?;
334///
335/// // To a Workflow:
336/// let workflow_result = ctx
337/// .workflow_client::<MyWorkflowClient>("my-workflow-id")
338/// .run(String::from("Hi!"))
339/// .call()
340/// .await?;
341/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
342/// .interact_with_workflow()
343/// .call()
344/// .await?;
345/// # Ok(())
346/// # }
347/// ```
348///
349/// 1. **Create a service client**.
350/// - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
351/// - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
352/// 2. **Specify the handler** you want to call and supply the request.
353/// 3. **Await** the call to retrieve the response.
354///
355/// **No need for manual retry logic**:
356/// Restate proxies all the calls and logs them in the journal.
357/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
358///
359/// ## Sending messages
360///
361/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
362///
363/// ```rust,no_run
364/// # #[path = "../../examples/services/mod.rs"]
365/// # mod services;
366/// # use services::my_virtual_object::MyVirtualObjectClient;
367/// # use services::my_service::MyServiceClient;
368/// # use services::my_workflow::MyWorkflowClient;
369/// # use restate_sdk::prelude::*;
370/// #
371/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
372/// // To a Service:
373/// ctx.service_client::<MyServiceClient>()
374/// .my_handler(String::from("Hi!"))
375/// .send();
376///
377/// // To a Virtual Object:
378/// ctx.object_client::<MyVirtualObjectClient>("Mary")
379/// .my_handler(String::from("Hi!"))
380/// .send();
381///
382/// // To a Workflow:
383/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
384/// .run(String::from("Hi!"))
385/// .send();
386/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
387/// .interact_with_workflow()
388/// .send();
389/// # Ok(())
390/// # }
391/// ```
392///
393/// **No need for message queues**:
394/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
395/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
396///
397/// ## Delayed calls
398///
399/// A delayed call is a one-way call that gets executed after a specified delay.
400///
401/// To schedule a delayed call, send a message with a delay parameter, as follows:
402///
403/// ```rust,no_run
404/// # #[path = "../../examples/services/mod.rs"]
405/// # mod services;
406/// # use services::my_virtual_object::MyVirtualObjectClient;
407/// # use services::my_service::MyServiceClient;
408/// # use services::my_workflow::MyWorkflowClient;
409/// # use restate_sdk::prelude::*;
410/// # use std::time::Duration;
411/// #
412/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
413/// // To a Service:
414/// ctx.service_client::<MyServiceClient>()
415/// .my_handler(String::from("Hi!"))
416/// .send_after(Duration::from_millis(5000));
417///
418/// // To a Virtual Object:
419/// ctx.object_client::<MyVirtualObjectClient>("Mary")
420/// .my_handler(String::from("Hi!"))
421/// .send_after(Duration::from_millis(5000));
422///
423/// // To a Workflow:
424/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
425/// .run(String::from("Hi!"))
426/// .send_after(Duration::from_millis(5000));
427/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
428/// .interact_with_workflow()
429/// .send_after(Duration::from_millis(5000));
430/// # Ok(())
431/// # }
432/// ```
433///
434/// You can also use this functionality to schedule async tasks.
435/// Restate will make sure the task gets executed at the desired time.
436///
437/// ### Ordering guarantees in Virtual Objects
438/// Invocations to a Virtual Object are executed serially.
439/// Invocations will execute in the same order in which they arrive at Restate.
440/// For example, assume a handler calls the same Virtual Object twice:
441///
442/// ```rust,no_run
443/// # #[path = "../../examples/services/my_virtual_object.rs"]
444/// # mod my_virtual_object;
445/// # use my_virtual_object::MyVirtualObjectClient;
446/// # use restate_sdk::prelude::*;
447/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
448/// ctx.object_client::<MyVirtualObjectClient>("Mary")
449/// .my_handler(String::from("I'm call A!"))
450/// .send();
451/// ctx.object_client::<MyVirtualObjectClient>("Mary")
452/// .my_handler(String::from("I'm call B!"))
453/// .send();
454/// # Ok(())
455/// }
456/// ```
457///
458/// It is guaranteed that call A will execute before call B.
459/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls.
460///
461/// ### Deadlocks with Virtual Objects
462/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
463/// Some example cases:
464/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
465/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
466///
467/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
468///
469pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
470 /// Create a [`Request`].
471 fn request<Req, Res>(
472 &self,
473 request_target: RequestTarget,
474 req: Req,
475 ) -> Request<'ctx, Req, Res> {
476 Request::new(self.inner_context(), request_target, req)
477 }
478
479 /// Create an [`InvocationHandle`] from an invocation id.
480 fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
481 self.inner_context().invocation_handle(invocation_id)
482 }
483
484 /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
485 ///
486 /// ```rust,no_run
487 /// # use std::time::Duration;
488 /// # use restate_sdk::prelude::*;
489 ///
490 /// #[restate_sdk::service]
491 /// trait MyService {
492 /// async fn handle() -> HandlerResult<()>;
493 /// }
494 ///
495 /// # async fn handler(ctx: Context<'_>) {
496 /// let client = ctx.service_client::<MyServiceClient>();
497 ///
498 /// // Do request
499 /// let result = client.handle().call().await;
500 ///
501 /// // Just send the request, don't wait the response
502 /// client.handle().send();
503 ///
504 /// // Schedule the request to be executed later
505 /// client.handle().send_after(Duration::from_secs(60));
506 /// # }
507 /// ```
508 fn service_client<C>(&self) -> C
509 where
510 C: IntoServiceClient<'ctx>,
511 {
512 C::create_client(self.inner_context())
513 }
514
515 /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
516 ///
517 /// ```rust,no_run
518 /// # use std::time::Duration;
519 /// # use restate_sdk::prelude::*;
520 ///
521 /// #[restate_sdk::object]
522 /// trait MyObject {
523 /// async fn handle() -> HandlerResult<()>;
524 /// }
525 ///
526 /// # async fn handler(ctx: Context<'_>) {
527 /// let client = ctx.object_client::<MyObjectClient>("my-key");
528 ///
529 /// // Do request
530 /// let result = client.handle().call().await;
531 ///
532 /// // Just send the request, don't wait the response
533 /// client.handle().send();
534 ///
535 /// // Schedule the request to be executed later
536 /// client.handle().send_after(Duration::from_secs(60));
537 /// # }
538 /// ```
539 fn object_client<C>(&self, key: impl Into<String>) -> C
540 where
541 C: IntoObjectClient<'ctx>,
542 {
543 C::create_client(self.inner_context(), key.into())
544 }
545
546 /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
547 ///
548 /// ```rust,no_run
549 /// # use std::time::Duration;
550 /// # use restate_sdk::prelude::*;
551 ///
552 /// #[restate_sdk::workflow]
553 /// trait MyWorkflow {
554 /// async fn handle() -> HandlerResult<()>;
555 /// }
556 ///
557 /// # async fn handler(ctx: Context<'_>) {
558 /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
559 ///
560 /// // Do request
561 /// let result = client.handle().call().await;
562 ///
563 /// // Just send the request, don't wait the response
564 /// client.handle().send();
565 ///
566 /// // Schedule the request to be executed later
567 /// client.handle().send_after(Duration::from_secs(60));
568 /// # }
569 /// ```
570 fn workflow_client<C>(&self, key: impl Into<String>) -> C
571 where
572 C: IntoWorkflowClient<'ctx>,
573 {
574 C::create_client(self.inner_context(), key.into())
575 }
576}
577
578/// Trait used by codegen to create the service client.
579pub trait IntoServiceClient<'ctx>: Sized {
580 fn create_client(ctx: &'ctx ContextInternal) -> Self;
581}
582
583/// Trait used by codegen to use the object client.
584pub trait IntoObjectClient<'ctx>: Sized {
585 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
586}
587
588/// Trait used by codegen to use the workflow client.
589pub trait IntoWorkflowClient<'ctx>: Sized {
590 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
591}
592
593impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
594
595/// # Awakeables
596///
597/// Awakeables pause an invocation while waiting for another process to complete a task.
598/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
599/// This pattern is also known as the callback (task token) pattern.
600///
601/// ## Creating awakeables
602///
603/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
604/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
605/// For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
606/// You use `ctx.run` to avoid re-triggering the task on retries.
607/// 3. **Wait** until the other process has executed the task.
608/// The handler **receives the payload and resumes**.
609///
610/// ```rust,no_run
611/// # use restate_sdk::prelude::*;
612/// #
613/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
614/// /// 1. Create an awakeable
615/// let (id, promise) = ctx.awakeable::<String>();
616///
617/// /// 2. Trigger a task
618/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
619///
620/// /// 3. Wait for the promise to be resolved
621/// let payload = promise.await?;
622/// # Ok(())
623/// # }
624/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
625/// # Ok(())
626/// # }
627/// ```
628///
629///
630/// ## Completing awakeables
631///
632/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
633/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
634///
635/// - Resolving over HTTP with its ID and an optional payload:
636///
637/// ```text
638/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve
639/// -H 'content-type: application/json'
640/// -d '{"hello": "world"}'
641/// ```
642///
643/// - Rejecting over HTTP with its ID and a reason:
644///
645/// ```text
646/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject
647/// -H 'content-type: text/plain' \
648/// -d 'Very bad error!'
649/// ```
650///
651/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
652///
653/// ```rust,no_run
654/// # use restate_sdk::prelude::*;
655/// #
656/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
657/// // Resolve the awakeable
658/// ctx.resolve_awakeable(&id, "hello".to_string());
659///
660/// // Or reject the awakeable
661/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
662/// # Ok(())
663/// # }
664/// ```
665///
666/// For more info about serialization of the payload, see [crate::serde]
667///
668/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
669/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
670///
671/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
672pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
673 /// Create an awakeable
674 fn awakeable<T: Deserialize + 'static>(
675 &self,
676 ) -> (
677 String,
678 impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
679 ) {
680 self.inner_context().awakeable()
681 }
682
683 /// Resolve an awakeable
684 fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
685 self.inner_context().resolve_awakeable(key, t)
686 }
687
688 /// Resolve an awakeable
689 fn reject_awakeable(&self, key: &str, failure: TerminalError) {
690 self.inner_context().reject_awakeable(key, failure)
691 }
692}
693
694impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
695
696/// # Journaling Results
697///
698/// Restate uses an execution log for replay after failures and suspensions.
699/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
700/// The SDK offers some functionalities to help you with this:
701/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
702/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
703/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
704///
705pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
706 /// ## Journaled actions
707 /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
708 /// Restate replays the result instead of re-executing the operation on retries.
709 ///
710 /// Here is an example of a database request for which the string response is stored in Restate:
711 /// ```rust,no_run
712 /// # use restate_sdk::prelude::*;
713 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
714 /// let response = ctx.run(|| do_db_request()).await?;
715 /// # Ok(())
716 /// # }
717 /// # async fn do_db_request() -> Result<String, HandlerError>{
718 /// # Ok("Hello".to_string())
719 /// # }
720 /// ```
721 ///
722 /// You cannot use the Restate context within `ctx.run`.
723 /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
724 ///
725 /// For more info about serialization of the return values, see [crate::serde].
726 ///
727 /// You can configure the retry policy for the `ctx.run` block:
728 /// ```rust,no_run
729 /// # use std::time::Duration;
730 /// # use restate_sdk::prelude::*;
731 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
732 /// let my_run_retry_policy = RunRetryPolicy::default()
733 /// .initial_delay(Duration::from_millis(100))
734 /// .exponentiation_factor(2.0)
735 /// .max_delay(Duration::from_millis(1000))
736 /// .max_attempts(10)
737 /// .max_duration(Duration::from_secs(10));
738 /// ctx.run(|| write_to_other_system())
739 /// .retry_policy(my_run_retry_policy)
740 /// .await
741 /// .map_err(|e| {
742 /// // Handle the terminal error after retries exhausted
743 /// // For example, undo previous actions (see sagas guide) and
744 /// // propagate the error back to the caller
745 /// e
746 /// })?;
747 /// # Ok(())
748 /// # }
749 /// # async fn write_to_other_system() -> Result<String, HandlerError>{
750 /// # Ok("Hello".to_string())
751 /// # }
752 /// ```
753 ///
754 /// This way you can override the default retry behavior of your Restate service for specific operations.
755 /// Have a look at [`RunFuture::retry_policy`] for more information.
756 ///
757 /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
758 /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
759 ///
760 /// **Caution: Immediately await journaled actions:**
761 /// Always immediately await `ctx.run`, before doing any other context calls.
762 /// If not, you might bump into non-determinism errors during replay,
763 /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
764 ///
765 #[must_use]
766 fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
767 where
768 R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
769 F: Future<Output = HandlerResult<T>> + Send + 'ctx,
770 T: Serialize + Deserialize + 'static,
771 {
772 self.inner_context().run(run_closure)
773 }
774
775 /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
776 ///
777 /// This value is stable during the invocation lifecycle, thus across retries.
778 fn random_seed(&self) -> u64 {
779 private::SealedContext::random_seed(self)
780 }
781
782 /// ### Generating random numbers
783 ///
784 /// Return a [`rand::RngExt`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
785 ///
786 /// For example, you can use this to generate a random number:
787 ///
788 /// ```rust,no_run
789 /// # use restate_sdk::prelude::*;
790 /// # use rand::RngExt;
791 /// async fn rand_generate(mut ctx: Context<'_>) {
792 /// let x: u32 = ctx.rand().random();
793 /// # }
794 /// ```
795 ///
796 /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
797 /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
798 #[cfg(feature = "rand")]
799 fn rand(&mut self) -> &mut rand::prelude::StdRng {
800 private::SealedContext::rand(self)
801 }
802 /// ### Generating UUIDs
803 ///
804 /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
805 ///
806 /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
807 ///
808 /// Do not use this in cryptographic contexts.
809 ///
810 /// ```rust,no_run
811 /// # use restate_sdk::prelude::*;
812 /// # use uuid::Uuid;
813 /// # async fn uuid_generate(mut ctx: Context<'_>) {
814 /// let uuid: Uuid = ctx.rand_uuid();
815 /// # }
816 /// ```
817 #[cfg(all(feature = "rand", feature = "uuid"))]
818 fn rand_uuid(&mut self) -> uuid::Uuid {
819 let rand = private::SealedContext::rand(self);
820 uuid::Uuid::from_u64_pair(rand::Rng::next_u64(rand), rand::Rng::next_u64(rand))
821 }
822}
823
824impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
825
826/// # Reading state
827/// You can store key-value state in Restate.
828/// Restate makes sure the state is consistent with the processing of the code execution.
829///
830/// **This feature is only available for Virtual Objects and Workflows:**
831/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
832/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
833///
834/// ```rust,no_run
835/// # use restate_sdk::prelude::*;
836/// #
837/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
838/// /// 1. Listing state keys
839/// /// List all the state keys that have entries in the state store for this object, via:
840/// let keys = ctx.get_keys().await?;
841///
842/// /// 2. Getting a state value
843/// let my_string = ctx
844/// .get::<String>("my-key")
845/// .await?
846/// .unwrap_or(String::from("my-default"));
847/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
848///
849/// /// 3. Setting a state value
850/// ctx.set("my-key", String::from("my-value"));
851///
852/// /// 4. Clearing a state value
853/// ctx.clear("my-key");
854///
855/// /// 5. Clearing all state values
856/// /// Deletes all the state in Restate for the object
857/// ctx.clear_all();
858/// # Ok(())
859/// # }
860/// ```
861///
862/// For more info about serialization, see [crate::serde]
863///
864/// ### Command-line introspection
865/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
866/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
867///
868pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
869 /// Get state
870 fn get<T: Deserialize + 'static>(
871 &self,
872 key: &'ctx str,
873 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
874 self.inner_context().get(key)
875 }
876
877 /// Get state keys
878 fn get_keys(&'ctx self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
879 self.inner_context().get_keys()
880 }
881}
882
883impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
884 for CTX
885{
886}
887
888/// # Writing State
889/// You can store key-value state in Restate.
890/// Restate makes sure the state is consistent with the processing of the code execution.
891///
892/// **This feature is only available for Virtual Objects and Workflows:**
893/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
894/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
895///
896/// ```rust,no_run
897/// # use restate_sdk::prelude::*;
898/// #
899/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
900/// /// 1. Listing state keys
901/// /// List all the state keys that have entries in the state store for this object, via:
902/// let keys = ctx.get_keys().await?;
903///
904/// /// 2. Getting a state value
905/// let my_string = ctx
906/// .get::<String>("my-key")
907/// .await?
908/// .unwrap_or(String::from("my-default"));
909/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
910///
911/// /// 3. Setting a state value
912/// ctx.set("my-key", String::from("my-value"));
913///
914/// /// 4. Clearing a state value
915/// ctx.clear("my-key");
916///
917/// /// 5. Clearing all state values
918/// /// Deletes all the state in Restate for the object
919/// ctx.clear_all();
920/// # Ok(())
921/// # }
922/// ```
923///
924/// For more info about serialization, see [crate::serde]
925///
926/// ## Command-line introspection
927/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
928/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
929///
930pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
931 /// Set state
932 fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
933 self.inner_context().set(key, t)
934 }
935
936 /// Clear state
937 fn clear(&self, key: &str) {
938 self.inner_context().clear(key)
939 }
940
941 /// Clear all state
942 fn clear_all(&self) {
943 self.inner_context().clear_all()
944 }
945}
946
947impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
948 for CTX
949{
950}
951
952/// Trait exposing Restate promise functionalities.
953///
954/// A promise is a durable, distributed version of a Rust oneshot channel.
955/// Restate keeps track of the promises across restarts/failures.
956///
957/// You can use this feature to implement interaction between different workflow handlers, e.g. to
958/// send a signal from a shared handler to the workflow handler.
959pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
960 /// Create a promise
961 fn promise<T: Deserialize + 'static>(
962 &'ctx self,
963 key: &'ctx str,
964 ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
965 self.inner_context().promise(key)
966 }
967
968 /// Peek a promise
969 fn peek_promise<T: Deserialize + 'static>(
970 &self,
971 key: &'ctx str,
972 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
973 self.inner_context().peek_promise(key)
974 }
975
976 /// Resolve a promise
977 fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
978 self.inner_context().resolve_promise(key, t)
979 }
980
981 /// Resolve a promise
982 fn reject_promise(&self, key: &str, failure: TerminalError) {
983 self.inner_context().reject_promise(key, failure)
984 }
985}
986
987impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
988 for CTX
989{
990}
991
992pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
993
994pub(crate) mod private {
995 use super::*;
996 pub trait SealedContext<'ctx> {
997 fn inner_context(&self) -> &'ctx ContextInternal;
998
999 fn random_seed(&self) -> u64;
1000
1001 #[cfg(feature = "rand")]
1002 fn rand(&mut self) -> &mut rand::prelude::StdRng;
1003 }
1004
1005 // Context capabilities
1006 pub trait SealedCanReadState {}
1007 pub trait SealedCanWriteState {}
1008 pub trait SealedCanUsePromises {}
1009
1010 impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
1011 fn inner_context(&self) -> &'ctx ContextInternal {
1012 self.inner
1013 }
1014
1015 fn random_seed(&self) -> u64 {
1016 self.random_seed
1017 }
1018
1019 #[cfg(feature = "rand")]
1020 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1021 &mut self.std_rng
1022 }
1023 }
1024
1025 impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
1026 fn inner_context(&self) -> &'ctx ContextInternal {
1027 self.inner
1028 }
1029
1030 fn random_seed(&self) -> u64 {
1031 self.random_seed
1032 }
1033
1034 #[cfg(feature = "rand")]
1035 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1036 &mut self.std_rng
1037 }
1038 }
1039
1040 impl SealedCanReadState for SharedObjectContext<'_> {}
1041
1042 impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1043 fn inner_context(&self) -> &'ctx ContextInternal {
1044 self.inner
1045 }
1046
1047 fn random_seed(&self) -> u64 {
1048 self.random_seed
1049 }
1050
1051 #[cfg(feature = "rand")]
1052 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1053 &mut self.std_rng
1054 }
1055 }
1056
1057 impl SealedCanReadState for ObjectContext<'_> {}
1058 impl SealedCanWriteState for ObjectContext<'_> {}
1059
1060 impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1061 fn inner_context(&self) -> &'ctx ContextInternal {
1062 self.inner
1063 }
1064
1065 fn random_seed(&self) -> u64 {
1066 self.random_seed
1067 }
1068
1069 #[cfg(feature = "rand")]
1070 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1071 &mut self.std_rng
1072 }
1073 }
1074
1075 impl SealedCanReadState for SharedWorkflowContext<'_> {}
1076 impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1077
1078 impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1079 fn inner_context(&self) -> &'ctx ContextInternal {
1080 self.inner
1081 }
1082
1083 fn random_seed(&self) -> u64 {
1084 self.random_seed
1085 }
1086
1087 #[cfg(feature = "rand")]
1088 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1089 &mut self.std_rng
1090 }
1091 }
1092
1093 impl SealedCanReadState for WorkflowContext<'_> {}
1094 impl SealedCanWriteState for WorkflowContext<'_> {}
1095 impl SealedCanUsePromises for WorkflowContext<'_> {}
1096}