restate_sdk/context/mod.rs
1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9mod request;
10mod run;
11pub use request::{Request, RequestTarget};
12pub use run::{RunClosure, RunFuture, RunRetryPolicy};
13
14pub type HeaderMap = http::HeaderMap<String>;
15
16/// Service handler context.
17pub struct Context<'ctx> {
18 random_seed: u64,
19 #[cfg(feature = "rand")]
20 std_rng: rand::prelude::StdRng,
21 headers: HeaderMap,
22 inner: &'ctx ContextInternal,
23}
24
25impl<'ctx> Context<'ctx> {
26 /// Get request headers.
27 pub fn headers(&self) -> &HeaderMap {
28 &self.headers
29 }
30
31 /// Get request headers.
32 pub fn headers_mut(&mut self) -> &HeaderMap {
33 &mut self.headers
34 }
35}
36
37impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
38 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
39 Self {
40 random_seed: value.1.random_seed,
41 #[cfg(feature = "rand")]
42 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
43 headers: value.1.headers,
44 inner: value.0,
45 }
46 }
47}
48
49/// Object shared handler context.
50pub struct SharedObjectContext<'ctx> {
51 key: String,
52 random_seed: u64,
53 #[cfg(feature = "rand")]
54 std_rng: rand::prelude::StdRng,
55 headers: HeaderMap,
56 pub(crate) inner: &'ctx ContextInternal,
57}
58
59impl<'ctx> SharedObjectContext<'ctx> {
60 /// Get object key.
61 pub fn key(&self) -> &str {
62 &self.key
63 }
64
65 /// Get request headers.
66 pub fn headers(&self) -> &HeaderMap {
67 &self.headers
68 }
69
70 /// Get request headers.
71 pub fn headers_mut(&mut self) -> &HeaderMap {
72 &mut self.headers
73 }
74}
75
76impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
77 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
78 Self {
79 key: value.1.key,
80 random_seed: value.1.random_seed,
81 #[cfg(feature = "rand")]
82 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
83 headers: value.1.headers,
84 inner: value.0,
85 }
86 }
87}
88
89/// Object handler context.
90pub struct ObjectContext<'ctx> {
91 key: String,
92 random_seed: u64,
93 #[cfg(feature = "rand")]
94 std_rng: rand::prelude::StdRng,
95 headers: HeaderMap,
96 pub(crate) inner: &'ctx ContextInternal,
97}
98
99impl<'ctx> ObjectContext<'ctx> {
100 /// Get object key.
101 pub fn key(&self) -> &str {
102 &self.key
103 }
104
105 /// Get request headers.
106 pub fn headers(&self) -> &HeaderMap {
107 &self.headers
108 }
109
110 /// Get request headers.
111 pub fn headers_mut(&mut self) -> &HeaderMap {
112 &mut self.headers
113 }
114}
115
116impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
117 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
118 Self {
119 key: value.1.key,
120 random_seed: value.1.random_seed,
121 #[cfg(feature = "rand")]
122 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
123 headers: value.1.headers,
124 inner: value.0,
125 }
126 }
127}
128
129/// Workflow shared handler context.
130pub struct SharedWorkflowContext<'ctx> {
131 key: String,
132 random_seed: u64,
133 #[cfg(feature = "rand")]
134 std_rng: rand::prelude::StdRng,
135 headers: HeaderMap,
136 pub(crate) inner: &'ctx ContextInternal,
137}
138
139impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
140 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
141 Self {
142 key: value.1.key,
143 random_seed: value.1.random_seed,
144 #[cfg(feature = "rand")]
145 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
146 headers: value.1.headers,
147 inner: value.0,
148 }
149 }
150}
151
152impl<'ctx> SharedWorkflowContext<'ctx> {
153 /// Get workflow key.
154 pub fn key(&self) -> &str {
155 &self.key
156 }
157
158 /// Get request headers.
159 pub fn headers(&self) -> &HeaderMap {
160 &self.headers
161 }
162
163 /// Get request headers.
164 pub fn headers_mut(&mut self) -> &HeaderMap {
165 &mut self.headers
166 }
167}
168
169/// Workflow handler context.
170pub struct WorkflowContext<'ctx> {
171 key: String,
172 random_seed: u64,
173 #[cfg(feature = "rand")]
174 std_rng: rand::prelude::StdRng,
175 headers: HeaderMap,
176 pub(crate) inner: &'ctx ContextInternal,
177}
178
179impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
180 fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
181 Self {
182 key: value.1.key,
183 random_seed: value.1.random_seed,
184 #[cfg(feature = "rand")]
185 std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
186 headers: value.1.headers,
187 inner: value.0,
188 }
189 }
190}
191
192impl<'ctx> WorkflowContext<'ctx> {
193 /// Get workflow key.
194 pub fn key(&self) -> &str {
195 &self.key
196 }
197
198 /// Get request headers.
199 pub fn headers(&self) -> &HeaderMap {
200 &self.headers
201 }
202
203 /// Get request headers.
204 pub fn headers_mut(&mut self) -> &HeaderMap {
205 &mut self.headers
206 }
207}
208
209///
210/// # Scheduling & Timers
211/// The Restate SDK includes durable timers.
212/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
213/// These timers are resilient to failures and restarts.
214/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
215///
216/// ## Scheduling Async Tasks
217///
218/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls][crate::context::ContextClient#delayed-calls].
219///
220///
221/// ## Durable sleep
222/// To sleep in a Restate application for ten seconds, do the following:
223///
224/// ```rust,no_run
225/// # use restate_sdk::prelude::*;
226/// # use std::convert::Infallible;
227/// # use std::time::Duration;
228/// #
229/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
230/// ctx.sleep(Duration::from_secs(10)).await?;
231/// # Ok(())
232/// # }
233/// ```
234///
235/// **Cost savings on FaaS**:
236/// Restate suspends the handler while it is sleeping, to free up resources.
237/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
238///
239/// **Sleeping in Virtual Objects**:
240/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
241///
242/// <details>
243/// <summary>Clock synchronization Restate Server vs. SDK</summary>
244///
245/// The Restate SDK calculates the wake-up time based on the delay you specify.
246/// The Restate Server then uses this calculated time to wake up the handler.
247/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
248/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
249/// </details>
250pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
251 /// Sleep using Restate
252 fn sleep(&self, duration: Duration) -> impl Future<Output = Result<(), TerminalError>> + 'ctx {
253 private::SealedContext::inner_context(self).sleep(duration)
254 }
255}
256
257impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
258
259/// # Service Communication
260///
261/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
262///
263/// ## Request-response calls
264///
265/// Request-response calls are requests where the client waits for the response.
266///
267/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
268///
269/// ```rust,no_run
270/// # #[path = "../../examples/services/mod.rs"]
271/// # mod services;
272/// # use services::my_virtual_object::MyVirtualObjectClient;
273/// # use services::my_service::MyServiceClient;
274/// # use services::my_workflow::MyWorkflowClient;
275/// # use restate_sdk::prelude::*;
276/// #
277/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
278/// // To a Service:
279/// let service_response = ctx
280/// .service_client::<MyServiceClient>()
281/// .my_handler(String::from("Hi!"))
282/// .call()
283/// .await?;
284///
285/// // To a Virtual Object:
286/// let object_response = ctx
287/// .object_client::<MyVirtualObjectClient>("Mary")
288/// .my_handler(String::from("Hi!"))
289/// .call()
290/// .await?;
291///
292/// // To a Workflow:
293/// let workflow_result = ctx
294/// .workflow_client::<MyWorkflowClient>("my-workflow-id")
295/// .run(String::from("Hi!"))
296/// .call()
297/// .await?;
298/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
299/// .interact_with_workflow()
300/// .call()
301/// .await?;
302/// # Ok(())
303/// # }
304/// ```
305///
306/// 1. **Create a service client**.
307/// - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
308/// - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
309/// 2. **Specify the handler** you want to call and supply the request.
310/// 3. **Await** the call to retrieve the response.
311///
312/// **No need for manual retry logic**:
313/// Restate proxies all the calls and logs them in the journal.
314/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
315///
316/// ## Sending messages
317///
318/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
319///
320/// ```rust,no_run
321/// # #[path = "../../examples/services/mod.rs"]
322/// # mod services;
323/// # use services::my_virtual_object::MyVirtualObjectClient;
324/// # use services::my_service::MyServiceClient;
325/// # use services::my_workflow::MyWorkflowClient;
326/// # use restate_sdk::prelude::*;
327/// #
328/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
329/// // To a Service:
330/// ctx.service_client::<MyServiceClient>()
331/// .my_handler(String::from("Hi!"))
332/// .send();
333///
334/// // To a Virtual Object:
335/// ctx.object_client::<MyVirtualObjectClient>("Mary")
336/// .my_handler(String::from("Hi!"))
337/// .send();
338///
339/// // To a Workflow:
340/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
341/// .run(String::from("Hi!"))
342/// .send();
343/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
344/// .interact_with_workflow()
345/// .send();
346/// # Ok(())
347/// # }
348/// ```
349///
350/// **No need for message queues**:
351/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
352/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
353///
354/// ## Delayed calls
355///
356/// A delayed call is a one-way call that gets executed after a specified delay.
357///
358/// To schedule a delayed call, send a message with a delay parameter, as follows:
359///
360/// ```rust,no_run
361/// # #[path = "../../examples/services/mod.rs"]
362/// # mod services;
363/// # use services::my_virtual_object::MyVirtualObjectClient;
364/// # use services::my_service::MyServiceClient;
365/// # use services::my_workflow::MyWorkflowClient;
366/// # use restate_sdk::prelude::*;
367/// # use std::time::Duration;
368/// #
369/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
370/// // To a Service:
371/// ctx.service_client::<MyServiceClient>()
372/// .my_handler(String::from("Hi!"))
373/// .send_with_delay(Duration::from_millis(5000));
374///
375/// // To a Virtual Object:
376/// ctx.object_client::<MyVirtualObjectClient>("Mary")
377/// .my_handler(String::from("Hi!"))
378/// .send_with_delay(Duration::from_millis(5000));
379///
380/// // To a Workflow:
381/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
382/// .run(String::from("Hi!"))
383/// .send_with_delay(Duration::from_millis(5000));
384/// ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
385/// .interact_with_workflow()
386/// .send_with_delay(Duration::from_millis(5000));
387/// # Ok(())
388/// # }
389/// ```
390///
391/// You can also use this functionality to schedule async tasks.
392/// Restate will make sure the task gets executed at the desired time.
393///
394/// ### Ordering guarantees in Virtual Objects
395/// Invocations to a Virtual Object are executed serially.
396/// Invocations will execute in the same order in which they arrive at Restate.
397/// For example, assume a handler calls the same Virtual Object twice:
398///
399/// ```rust,no_run
400/// # #[path = "../../examples/services/my_virtual_object.rs"]
401/// # mod my_virtual_object;
402/// # use my_virtual_object::MyVirtualObjectClient;
403/// # use restate_sdk::prelude::*;
404/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
405/// ctx.object_client::<MyVirtualObjectClient>("Mary")
406/// .my_handler(String::from("I'm call A!"))
407/// .send();
408/// ctx.object_client::<MyVirtualObjectClient>("Mary")
409/// .my_handler(String::from("I'm call B!"))
410/// .send();
411/// # Ok(())
412/// }
413/// ```
414///
415/// It is guaranteed that call A will execute before call B.
416/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls.
417///
418/// ### Deadlocks with Virtual Objects
419/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
420/// Some example cases:
421/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
422/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
423///
424/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
425///
426pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
427 /// Create a [`Request`].
428 fn request<Req, Res>(
429 &self,
430 request_target: RequestTarget,
431 req: Req,
432 ) -> Request<'ctx, Req, Res> {
433 Request::new(self.inner_context(), request_target, req)
434 }
435
436 /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
437 ///
438 /// ```rust,no_run
439 /// # use std::time::Duration;
440 /// # use restate_sdk::prelude::*;
441 ///
442 /// #[restate_sdk::service]
443 /// trait MyService {
444 /// async fn handle() -> HandlerResult<()>;
445 /// }
446 ///
447 /// # async fn handler(ctx: Context<'_>) {
448 /// let client = ctx.service_client::<MyServiceClient>();
449 ///
450 /// // Do request
451 /// let result = client.handle().call().await;
452 ///
453 /// // Just send the request, don't wait the response
454 /// client.handle().send();
455 ///
456 /// // Schedule the request to be executed later
457 /// client.handle().send_with_delay(Duration::from_secs(60));
458 /// # }
459 /// ```
460 fn service_client<C>(&self) -> C
461 where
462 C: IntoServiceClient<'ctx>,
463 {
464 C::create_client(self.inner_context())
465 }
466
467 /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
468 ///
469 /// ```rust,no_run
470 /// # use std::time::Duration;
471 /// # use restate_sdk::prelude::*;
472 ///
473 /// #[restate_sdk::object]
474 /// trait MyObject {
475 /// async fn handle() -> HandlerResult<()>;
476 /// }
477 ///
478 /// # async fn handler(ctx: Context<'_>) {
479 /// let client = ctx.object_client::<MyObjectClient>("my-key");
480 ///
481 /// // Do request
482 /// let result = client.handle().call().await;
483 ///
484 /// // Just send the request, don't wait the response
485 /// client.handle().send();
486 ///
487 /// // Schedule the request to be executed later
488 /// client.handle().send_with_delay(Duration::from_secs(60));
489 /// # }
490 /// ```
491 fn object_client<C>(&self, key: impl Into<String>) -> C
492 where
493 C: IntoObjectClient<'ctx>,
494 {
495 C::create_client(self.inner_context(), key.into())
496 }
497
498 /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
499 ///
500 /// ```rust,no_run
501 /// # use std::time::Duration;
502 /// # use restate_sdk::prelude::*;
503 ///
504 /// #[restate_sdk::workflow]
505 /// trait MyWorkflow {
506 /// async fn handle() -> HandlerResult<()>;
507 /// }
508 ///
509 /// # async fn handler(ctx: Context<'_>) {
510 /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
511 ///
512 /// // Do request
513 /// let result = client.handle().call().await;
514 ///
515 /// // Just send the request, don't wait the response
516 /// client.handle().send();
517 ///
518 /// // Schedule the request to be executed later
519 /// client.handle().send_with_delay(Duration::from_secs(60));
520 /// # }
521 /// ```
522 fn workflow_client<C>(&self, key: impl Into<String>) -> C
523 where
524 C: IntoWorkflowClient<'ctx>,
525 {
526 C::create_client(self.inner_context(), key.into())
527 }
528}
529
530/// Trait used by codegen to create the service client.
531pub trait IntoServiceClient<'ctx>: Sized {
532 fn create_client(ctx: &'ctx ContextInternal) -> Self;
533}
534
535/// Trait used by codegen to use the object client.
536pub trait IntoObjectClient<'ctx>: Sized {
537 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
538}
539
540/// Trait used by codegen to use the workflow client.
541pub trait IntoWorkflowClient<'ctx>: Sized {
542 fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
543}
544
545impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
546
547/// # Awakeables
548///
549/// Awakeables pause an invocation while waiting for another process to complete a task.
550/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
551/// This pattern is also known as the callback (task token) pattern.
552///
553/// ## Creating awakeables
554///
555/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
556/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
557/// For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
558/// You use `ctx.run` to avoid re-triggering the task on retries.
559/// 3. **Wait** until the other process has executed the task.
560/// The handler **receives the payload and resumes**.
561///
562/// ```rust,no_run
563/// # use restate_sdk::prelude::*;
564/// #
565/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
566/// /// 1. Create an awakeable
567/// let (id, promise) = ctx.awakeable::<String>();
568///
569/// /// 2. Trigger a task
570/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
571///
572/// /// 3. Wait for the promise to be resolved
573/// let payload = promise.await?;
574/// # Ok(())
575/// # }
576/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
577/// # Ok(())
578/// # }
579/// ```
580///
581///
582/// ## Completing awakeables
583///
584/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
585/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
586///
587/// - Resolving over HTTP with its ID and an optional payload:
588///
589/// ```text
590/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve
591/// -H 'content-type: application/json'
592/// -d '{"hello": "world"}'
593/// ```
594///
595/// - Rejecting over HTTP with its ID and a reason:
596///
597/// ```text
598/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject
599/// -H 'content-type: text/plain' \
600/// -d 'Very bad error!'
601/// ```
602///
603/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
604///
605/// ```rust,no_run
606/// # use restate_sdk::prelude::*;
607/// #
608/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
609/// // Resolve the awakeable
610/// ctx.resolve_awakeable(&id, "hello".to_string());
611///
612/// // Or reject the awakeable
613/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
614/// # Ok(())
615/// # }
616/// ```
617///
618/// For more info about serialization of the payload, see [crate::serde]
619///
620/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
621/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
622///
623/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
624pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
625 /// Create an awakeable
626 fn awakeable<T: Deserialize + 'static>(
627 &self,
628 ) -> (
629 String,
630 impl Future<Output = Result<T, TerminalError>> + Send + Sync + 'ctx,
631 ) {
632 self.inner_context().awakeable()
633 }
634
635 /// Resolve an awakeable
636 fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
637 self.inner_context().resolve_awakeable(key, t)
638 }
639
640 /// Resolve an awakeable
641 fn reject_awakeable(&self, key: &str, failure: TerminalError) {
642 self.inner_context().reject_awakeable(key, failure)
643 }
644}
645
646impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
647
648/// # Journaling Results
649///
650/// Restate uses an execution log for replay after failures and suspensions.
651/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
652/// The SDK offers some functionalities to help you with this:
653/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
654/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
655/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
656///
657pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
658 /// ## Journaled actions
659 /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
660 /// Restate replays the result instead of re-executing the operation on retries.
661 ///
662 /// Here is an example of a database request for which the string response is stored in Restate:
663 /// ```rust,no_run
664 /// # use restate_sdk::prelude::*;
665 /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
666 /// let response = ctx.run(|| do_db_request()).await?;
667 /// # Ok(())
668 /// # }
669 /// # async fn do_db_request() -> Result<String, HandlerError>{
670 /// # Ok("Hello".to_string())
671 /// # }
672 /// ```
673 ///
674 /// You cannot use the Restate context within `ctx.run`.
675 /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
676 ///
677 /// For more info about serialization of the return values, see [crate::serde].
678 ///
679 /// **Caution: Immediately await journaled actions:**
680 /// Always immediately await `ctx.run`, before doing any other context calls.
681 /// If not, you might bump into non-determinism errors during replay,
682 /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
683 ///
684 #[must_use]
685 fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
686 where
687 R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
688 F: Future<Output = HandlerResult<T>> + Send + 'ctx,
689 T: Serialize + Deserialize + 'static,
690 {
691 self.inner_context().run(run_closure)
692 }
693
694 /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
695 ///
696 /// This value is stable during the invocation lifecycle, thus across retries.
697 fn random_seed(&self) -> u64 {
698 private::SealedContext::random_seed(self)
699 }
700
701 /// ### Generating random numbers
702 ///
703 /// Return a [`rand::Rng`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
704 ///
705 /// For example, you can use this to generate a random number:
706 ///
707 /// ```rust,no_run
708 /// # use restate_sdk::prelude::*;
709 /// # use rand::Rng;
710 /// async fn rand_generate(mut ctx: Context<'_>) {
711 /// let x: u32 = ctx.rand().gen();
712 /// # }
713 /// ```
714 ///
715 /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
716 /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
717 #[cfg(feature = "rand")]
718 fn rand(&mut self) -> &mut rand::prelude::StdRng {
719 private::SealedContext::rand(self)
720 }
721 /// ### Generating UUIDs
722 ///
723 /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
724 ///
725 /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
726 ///
727 /// Do not use this in cryptographic contexts.
728 ///
729 /// ```rust,no_run
730 /// # use restate_sdk::prelude::*;
731 /// # use uuid::Uuid;
732 /// # async fn uuid_generate(mut ctx: Context<'_>) {
733 /// let uuid: Uuid = ctx.rand_uuid();
734 /// # }
735 /// ```
736 #[cfg(all(feature = "rand", feature = "uuid"))]
737 fn rand_uuid(&mut self) -> uuid::Uuid {
738 let rand = private::SealedContext::rand(self);
739 uuid::Uuid::from_u64_pair(rand::RngCore::next_u64(rand), rand::RngCore::next_u64(rand))
740 }
741}
742
743impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
744
745/// # Reading state
746/// You can store key-value state in Restate.
747/// Restate makes sure the state is consistent with the processing of the code execution.
748///
749/// **This feature is only available for Virtual Objects and Workflows:**
750/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
751/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
752///
753/// ```rust,no_run
754/// # use restate_sdk::prelude::*;
755/// #
756/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
757/// /// 1. Listing state keys
758/// /// List all the state keys that have entries in the state store for this object, via:
759/// let keys = ctx.get_keys().await?;
760///
761/// /// 2. Getting a state value
762/// let my_string = ctx
763/// .get::<String>("my-key")
764/// .await?
765/// .unwrap_or(String::from("my-default"));
766/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
767///
768/// /// 3. Setting a state value
769/// ctx.set("my-key", String::from("my-value"));
770///
771/// /// 4. Clearing a state value
772/// ctx.clear("my-key");
773///
774/// /// 5. Clearing all state values
775/// /// Deletes all the state in Restate for the object
776/// ctx.clear_all();
777/// # Ok(())
778/// # }
779/// ```
780///
781/// For more info about serialization, see [crate::serde]
782///
783/// ### Command-line introspection
784/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
785/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
786///
787pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
788 /// Get state
789 fn get<T: Deserialize + 'static>(
790 &self,
791 key: &str,
792 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
793 self.inner_context().get(key)
794 }
795
796 /// Get state keys
797 fn get_keys(&self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
798 self.inner_context().get_keys()
799 }
800}
801
802impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
803 for CTX
804{
805}
806
807/// # Writing State
808/// You can store key-value state in Restate.
809/// Restate makes sure the state is consistent with the processing of the code execution.
810///
811/// **This feature is only available for Virtual Objects and Workflows:**
812/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
813/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
814///
815/// ```rust,no_run
816/// # use restate_sdk::prelude::*;
817/// #
818/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
819/// /// 1. Listing state keys
820/// /// List all the state keys that have entries in the state store for this object, via:
821/// let keys = ctx.get_keys().await?;
822///
823/// /// 2. Getting a state value
824/// let my_string = ctx
825/// .get::<String>("my-key")
826/// .await?
827/// .unwrap_or(String::from("my-default"));
828/// let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
829///
830/// /// 3. Setting a state value
831/// ctx.set("my-key", String::from("my-value"));
832///
833/// /// 4. Clearing a state value
834/// ctx.clear("my-key");
835///
836/// /// 5. Clearing all state values
837/// /// Deletes all the state in Restate for the object
838/// ctx.clear_all();
839/// # Ok(())
840/// # }
841/// ```
842///
843/// For more info about serialization, see [crate::serde]
844///
845/// ## Command-line introspection
846/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
847/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
848///
849pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
850 /// Set state
851 fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
852 self.inner_context().set(key, t)
853 }
854
855 /// Clear state
856 fn clear(&self, key: &str) {
857 self.inner_context().clear(key)
858 }
859
860 /// Clear all state
861 fn clear_all(&self) {
862 self.inner_context().clear_all()
863 }
864}
865
866impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
867 for CTX
868{
869}
870
871/// Trait exposing Restate promise functionalities.
872///
873/// A promise is a durable, distributed version of a Rust oneshot channel.
874/// Restate keeps track of the promises across restarts/failures.
875///
876/// You can use this feature to implement interaction between different workflow handlers, e.g. to
877/// send a signal from a shared handler to the workflow handler.
878pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
879 /// Create a promise
880 fn promise<T: Deserialize + 'static>(
881 &'ctx self,
882 key: &str,
883 ) -> impl Future<Output = Result<T, TerminalError>> + 'ctx {
884 self.inner_context().promise(key)
885 }
886
887 /// Peek a promise
888 fn peek_promise<T: Deserialize + 'static>(
889 &self,
890 key: &str,
891 ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
892 self.inner_context().peek_promise(key)
893 }
894
895 /// Resolve a promise
896 fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
897 self.inner_context().resolve_promise(key, t)
898 }
899
900 /// Resolve a promise
901 fn reject_promise(&self, key: &str, failure: TerminalError) {
902 self.inner_context().reject_promise(key, failure)
903 }
904}
905
906impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
907 for CTX
908{
909}
910
911mod private {
912 use super::*;
913
914 pub trait SealedContext<'ctx> {
915 fn inner_context(&self) -> &'ctx ContextInternal;
916
917 fn random_seed(&self) -> u64;
918
919 #[cfg(feature = "rand")]
920 fn rand(&mut self) -> &mut rand::prelude::StdRng;
921 }
922
923 // Context capabilities
924 pub trait SealedCanReadState {}
925 pub trait SealedCanWriteState {}
926 pub trait SealedCanUsePromises {}
927
928 impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
929 fn inner_context(&self) -> &'ctx ContextInternal {
930 self.inner
931 }
932
933 fn random_seed(&self) -> u64 {
934 self.random_seed
935 }
936
937 #[cfg(feature = "rand")]
938 fn rand(&mut self) -> &mut rand::prelude::StdRng {
939 &mut self.std_rng
940 }
941 }
942
943 impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
944 fn inner_context(&self) -> &'ctx ContextInternal {
945 self.inner
946 }
947
948 fn random_seed(&self) -> u64 {
949 self.random_seed
950 }
951
952 #[cfg(feature = "rand")]
953 fn rand(&mut self) -> &mut rand::prelude::StdRng {
954 &mut self.std_rng
955 }
956 }
957
958 impl SealedCanReadState for SharedObjectContext<'_> {}
959
960 impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
961 fn inner_context(&self) -> &'ctx ContextInternal {
962 self.inner
963 }
964
965 fn random_seed(&self) -> u64 {
966 self.random_seed
967 }
968
969 #[cfg(feature = "rand")]
970 fn rand(&mut self) -> &mut rand::prelude::StdRng {
971 &mut self.std_rng
972 }
973 }
974
975 impl SealedCanReadState for ObjectContext<'_> {}
976 impl SealedCanWriteState for ObjectContext<'_> {}
977
978 impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
979 fn inner_context(&self) -> &'ctx ContextInternal {
980 self.inner
981 }
982
983 fn random_seed(&self) -> u64 {
984 self.random_seed
985 }
986
987 #[cfg(feature = "rand")]
988 fn rand(&mut self) -> &mut rand::prelude::StdRng {
989 &mut self.std_rng
990 }
991 }
992
993 impl SealedCanReadState for SharedWorkflowContext<'_> {}
994 impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
995
996 impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
997 fn inner_context(&self) -> &'ctx ContextInternal {
998 self.inner
999 }
1000
1001 fn random_seed(&self) -> u64 {
1002 self.random_seed
1003 }
1004
1005 #[cfg(feature = "rand")]
1006 fn rand(&mut self) -> &mut rand::prelude::StdRng {
1007 &mut self.std_rng
1008 }
1009 }
1010
1011 impl SealedCanReadState for WorkflowContext<'_> {}
1012 impl SealedCanWriteState for WorkflowContext<'_> {}
1013 impl SealedCanUsePromises for WorkflowContext<'_> {}
1014}