restate_sdk/context/
mod.rs

1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14
15pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
16pub use run::{RunClosure, RunFuture, RunRetryPolicy};
17
/// Header map used by the handler contexts: an [`http::HeaderMap`] whose values are `String`s.
pub type HeaderMap = http::HeaderMap<String>;
19
/// Service handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through its [`From`] implementation.
pub struct Context<'ctx> {
    /// Seed copied from `InputMetadata::random_seed`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Headers moved out of `InputMetadata::headers`.
    headers: HeaderMap,
    /// Borrowed internal context.
    inner: &'ctx ContextInternal,
}
28
29impl<'ctx> Context<'ctx> {
30    /// Get request headers.
31    pub fn headers(&self) -> &HeaderMap {
32        &self.headers
33    }
34
35    /// Get request headers.
36    pub fn headers_mut(&mut self) -> &HeaderMap {
37        &mut self.headers
38    }
39}
40
41impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
42    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
43        Self {
44            random_seed: value.1.random_seed,
45            #[cfg(feature = "rand")]
46            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
47            headers: value.1.headers,
48            inner: value.0,
49        }
50    }
51}
52
/// Object shared handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through its [`From`] implementation.
pub struct SharedObjectContext<'ctx> {
    /// Object key moved out of `InputMetadata::key`.
    key: String,
    /// Seed copied from `InputMetadata::random_seed`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Headers moved out of `InputMetadata::headers`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
62
63impl<'ctx> SharedObjectContext<'ctx> {
64    /// Get object key.
65    pub fn key(&self) -> &str {
66        &self.key
67    }
68
69    /// Get request headers.
70    pub fn headers(&self) -> &HeaderMap {
71        &self.headers
72    }
73
74    /// Get request headers.
75    pub fn headers_mut(&mut self) -> &HeaderMap {
76        &mut self.headers
77    }
78}
79
80impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
81    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
82        Self {
83            key: value.1.key,
84            random_seed: value.1.random_seed,
85            #[cfg(feature = "rand")]
86            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
87            headers: value.1.headers,
88            inner: value.0,
89        }
90    }
91}
92
/// Object handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through its [`From`] implementation.
pub struct ObjectContext<'ctx> {
    /// Object key moved out of `InputMetadata::key`.
    key: String,
    /// Seed copied from `InputMetadata::random_seed`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Headers moved out of `InputMetadata::headers`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
102
103impl<'ctx> ObjectContext<'ctx> {
104    /// Get object key.
105    pub fn key(&self) -> &str {
106        &self.key
107    }
108
109    /// Get request headers.
110    pub fn headers(&self) -> &HeaderMap {
111        &self.headers
112    }
113
114    /// Get request headers.
115    pub fn headers_mut(&mut self) -> &HeaderMap {
116        &mut self.headers
117    }
118}
119
120impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
121    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
122        Self {
123            key: value.1.key,
124            random_seed: value.1.random_seed,
125            #[cfg(feature = "rand")]
126            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
127            headers: value.1.headers,
128            inner: value.0,
129        }
130    }
131}
132
/// Workflow shared handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through its [`From`] implementation.
pub struct SharedWorkflowContext<'ctx> {
    /// Workflow key moved out of `InputMetadata::key`.
    key: String,
    /// Seed copied from `InputMetadata::random_seed`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Headers moved out of `InputMetadata::headers`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
142
143impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
144    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
145        Self {
146            key: value.1.key,
147            random_seed: value.1.random_seed,
148            #[cfg(feature = "rand")]
149            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
150            headers: value.1.headers,
151            inner: value.0,
152        }
153    }
154}
155
156impl<'ctx> SharedWorkflowContext<'ctx> {
157    /// Get workflow key.
158    pub fn key(&self) -> &str {
159        &self.key
160    }
161
162    /// Get request headers.
163    pub fn headers(&self) -> &HeaderMap {
164        &self.headers
165    }
166
167    /// Get request headers.
168    pub fn headers_mut(&mut self) -> &HeaderMap {
169        &mut self.headers
170    }
171}
172
/// Workflow handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through its [`From`] implementation.
pub struct WorkflowContext<'ctx> {
    /// Workflow key moved out of `InputMetadata::key`.
    key: String,
    /// Seed copied from `InputMetadata::random_seed`.
    random_seed: u64,
    /// Random number generator seeded with `random_seed` (only with the `rand` feature).
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Headers moved out of `InputMetadata::headers`.
    headers: HeaderMap,
    /// Borrowed internal context.
    pub(crate) inner: &'ctx ContextInternal,
}
182
183impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
184    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
185        Self {
186            key: value.1.key,
187            random_seed: value.1.random_seed,
188            #[cfg(feature = "rand")]
189            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
190            headers: value.1.headers,
191            inner: value.0,
192        }
193    }
194}
195
196impl<'ctx> WorkflowContext<'ctx> {
197    /// Get workflow key.
198    pub fn key(&self) -> &str {
199        &self.key
200    }
201
202    /// Get request headers.
203    pub fn headers(&self) -> &HeaderMap {
204        &self.headers
205    }
206
207    /// Get request headers.
208    pub fn headers_mut(&mut self) -> &HeaderMap {
209        &mut self.headers
210    }
211}
212
213///
214/// # Scheduling & Timers
215/// The Restate SDK includes durable timers.
216/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
217/// These timers are resilient to failures and restarts.
218/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
219///
220/// ## Scheduling Async Tasks
221///
222/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
223///
224/// ## Durable sleep
225/// To sleep in a Restate application for ten seconds, do the following:
226///
227/// ```rust,no_run
228/// # use restate_sdk::prelude::*;
229/// # use std::convert::Infallible;
230/// # use std::time::Duration;
231/// #
232/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
233/// ctx.sleep(Duration::from_secs(10)).await?;
234/// #    Ok(())
235/// # }
236/// ```
237///
238/// **Cost savings on FaaS**:
239/// Restate suspends the handler while it is sleeping, to free up resources.
240/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
241///
242/// **Sleeping in Virtual Objects**:
243/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
244///
245/// <details>
246/// <summary>Clock synchronization Restate Server vs. SDK</summary>
247///
248/// The Restate SDK calculates the wake-up time based on the delay you specify.
249/// The Restate Server then uses this calculated time to wake up the handler.
250/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
/// So make sure that the system clocks of the Restate Server and the SDK use the same timezone and are synchronized.
252/// </details>
253pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
254    /// Sleep using Restate
255    fn sleep(
256        &self,
257        duration: Duration,
258    ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
259        private::SealedContext::inner_context(self).sleep(duration)
260    }
261}
262
263impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
264
265/// # Service Communication
266///
267/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
268///
269/// ## Request-response calls
270///
271/// Request-response calls are requests where the client waits for the response.
272///
273/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
274///
275/// ```rust,no_run
276/// # #[path = "../../examples/services/mod.rs"]
277/// # mod services;
278/// # use services::my_virtual_object::MyVirtualObjectClient;
279/// # use services::my_service::MyServiceClient;
280/// # use services::my_workflow::MyWorkflowClient;
281/// # use restate_sdk::prelude::*;
282/// #
283/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
284///     // To a Service:
285///     let service_response = ctx
286///         .service_client::<MyServiceClient>()
287///         .my_handler(String::from("Hi!"))
288///         .call()
289///         .await?;
290///
291///     // To a Virtual Object:
292///     let object_response = ctx
293///         .object_client::<MyVirtualObjectClient>("Mary")
294///         .my_handler(String::from("Hi!"))
295///         .call()
296///         .await?;
297///
298///     // To a Workflow:
299///     let workflow_result = ctx
300///         .workflow_client::<MyWorkflowClient>("my-workflow-id")
301///         .run(String::from("Hi!"))
302///         .call()
303///         .await?;
304///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
305///         .interact_with_workflow()
306///         .call()
307///         .await?;
308///  #    Ok(())
309///  # }
310/// ```
311///
312/// 1. **Create a service client**.
313///     - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
314///     - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
315/// 2. **Specify the handler** you want to call and supply the request.
316/// 3. **Await** the call to retrieve the response.
317///
318/// **No need for manual retry logic**:
319/// Restate proxies all the calls and logs them in the journal.
320/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
321///
322/// ## Sending messages
323///
324/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
325///
326/// ```rust,no_run
327/// # #[path = "../../examples/services/mod.rs"]
328/// # mod services;
329/// # use services::my_virtual_object::MyVirtualObjectClient;
330/// # use services::my_service::MyServiceClient;
331/// # use services::my_workflow::MyWorkflowClient;
332/// # use restate_sdk::prelude::*;
333/// #
334/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
335///     // To a Service:
336///     ctx.service_client::<MyServiceClient>()
337///         .my_handler(String::from("Hi!"))
338///         .send();
339///
340///     // To a Virtual Object:
341///     ctx.object_client::<MyVirtualObjectClient>("Mary")
342///         .my_handler(String::from("Hi!"))
343///         .send();
344///
345///     // To a Workflow:
346///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
347///         .run(String::from("Hi!"))
348///         .send();
349///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
350///         .interact_with_workflow()
351///         .send();
352///  #    Ok(())
353///  # }
354/// ```
355///
356/// **No need for message queues**:
357/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
358/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
359///
360/// ## Delayed calls
361///
362/// A delayed call is a one-way call that gets executed after a specified delay.
363///
364/// To schedule a delayed call, send a message with a delay parameter, as follows:
365///
366/// ```rust,no_run
367/// # #[path = "../../examples/services/mod.rs"]
368/// # mod services;
369/// # use services::my_virtual_object::MyVirtualObjectClient;
370/// # use services::my_service::MyServiceClient;
371/// # use services::my_workflow::MyWorkflowClient;
372/// # use restate_sdk::prelude::*;
373/// # use std::time::Duration;
374/// #
375/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
376///     // To a Service:
377///     ctx.service_client::<MyServiceClient>()
378///         .my_handler(String::from("Hi!"))
379///         .send_after(Duration::from_millis(5000));
380///
381///     // To a Virtual Object:
382///     ctx.object_client::<MyVirtualObjectClient>("Mary")
383///         .my_handler(String::from("Hi!"))
384///         .send_after(Duration::from_millis(5000));
385///
386///     // To a Workflow:
387///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
388///         .run(String::from("Hi!"))
389///         .send_after(Duration::from_millis(5000));
390///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
391///         .interact_with_workflow()
392///         .send_after(Duration::from_millis(5000));
393///  #    Ok(())
394///  # }
395/// ```
396///
397/// You can also use this functionality to schedule async tasks.
398/// Restate will make sure the task gets executed at the desired time.
399///
400/// ### Ordering guarantees in Virtual Objects
401/// Invocations to a Virtual Object are executed serially.
402/// Invocations will execute in the same order in which they arrive at Restate.
403/// For example, assume a handler calls the same Virtual Object twice:
404///
405/// ```rust,no_run
406/// # #[path = "../../examples/services/my_virtual_object.rs"]
407/// # mod my_virtual_object;
408/// # use my_virtual_object::MyVirtualObjectClient;
409/// # use restate_sdk::prelude::*;
410/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
411///     ctx.object_client::<MyVirtualObjectClient>("Mary")
412///         .my_handler(String::from("I'm call A!"))
413///         .send();
414///     ctx.object_client::<MyVirtualObjectClient>("Mary")
415///         .my_handler(String::from("I'm call B!"))
416///         .send();
/// #    Ok(())
/// # }
419/// ```
420///
421/// It is guaranteed that call A will execute before call B.
/// It is not guaranteed, though, that call B will be executed immediately after call A, as invocations coming from other handlers/sources could interleave these two calls.
423///
424/// ### Deadlocks with Virtual Objects
425/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
426/// Some example cases:
427/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
428/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
429///
430/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
431///
432pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
433    /// Create a [`Request`].
434    fn request<Req, Res>(
435        &self,
436        request_target: RequestTarget,
437        req: Req,
438    ) -> Request<'ctx, Req, Res> {
439        Request::new(self.inner_context(), request_target, req)
440    }
441
442    /// Create an [`InvocationHandle`] from an invocation id.
443    fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
444        self.inner_context().invocation_handle(invocation_id)
445    }
446
447    /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
448    ///
449    /// ```rust,no_run
450    /// # use std::time::Duration;
451    /// # use restate_sdk::prelude::*;
452    ///
453    /// #[restate_sdk::service]
454    /// trait MyService {
455    ///   async fn handle() -> HandlerResult<()>;
456    /// }
457    ///
458    /// # async fn handler(ctx: Context<'_>) {
459    /// let client = ctx.service_client::<MyServiceClient>();
460    ///
461    /// // Do request
462    /// let result = client.handle().call().await;
463    ///
464    /// // Just send the request, don't wait the response
465    /// client.handle().send();
466    ///
467    /// // Schedule the request to be executed later
468    /// client.handle().send_after(Duration::from_secs(60));
469    /// # }
470    /// ```
471    fn service_client<C>(&self) -> C
472    where
473        C: IntoServiceClient<'ctx>,
474    {
475        C::create_client(self.inner_context())
476    }
477
478    /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
479    ///
480    /// ```rust,no_run
481    /// # use std::time::Duration;
482    /// # use restate_sdk::prelude::*;
483    ///
484    /// #[restate_sdk::object]
485    /// trait MyObject {
486    ///   async fn handle() -> HandlerResult<()>;
487    /// }
488    ///
489    /// # async fn handler(ctx: Context<'_>) {
490    /// let client = ctx.object_client::<MyObjectClient>("my-key");
491    ///
492    /// // Do request
493    /// let result = client.handle().call().await;
494    ///
495    /// // Just send the request, don't wait the response
496    /// client.handle().send();
497    ///
498    /// // Schedule the request to be executed later
499    /// client.handle().send_after(Duration::from_secs(60));
500    /// # }
501    /// ```
502    fn object_client<C>(&self, key: impl Into<String>) -> C
503    where
504        C: IntoObjectClient<'ctx>,
505    {
506        C::create_client(self.inner_context(), key.into())
507    }
508
509    /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
510    ///
511    /// ```rust,no_run
512    /// # use std::time::Duration;
513    /// # use restate_sdk::prelude::*;
514    ///
515    /// #[restate_sdk::workflow]
516    /// trait MyWorkflow {
517    ///   async fn handle() -> HandlerResult<()>;
518    /// }
519    ///
520    /// # async fn handler(ctx: Context<'_>) {
521    /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
522    ///
523    /// // Do request
524    /// let result = client.handle().call().await;
525    ///
526    /// // Just send the request, don't wait the response
527    /// client.handle().send();
528    ///
529    /// // Schedule the request to be executed later
530    /// client.handle().send_after(Duration::from_secs(60));
531    /// # }
532    /// ```
533    fn workflow_client<C>(&self, key: impl Into<String>) -> C
534    where
535        C: IntoWorkflowClient<'ctx>,
536    {
537        C::create_client(self.inner_context(), key.into())
538    }
539}
540
/// Trait used by codegen to create the service client.
///
/// [`ContextClient::service_client`] calls [`IntoServiceClient::create_client`] to build the client.
pub trait IntoServiceClient<'ctx>: Sized {
    /// Create the client from the internal context.
    fn create_client(ctx: &'ctx ContextInternal) -> Self;
}
545
/// Trait used by codegen to create the object client.
///
/// [`ContextClient::object_client`] calls [`IntoObjectClient::create_client`] to build the client.
pub trait IntoObjectClient<'ctx>: Sized {
    /// Create the client from the internal context and the object `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
550
/// Trait used by codegen to create the workflow client.
///
/// [`ContextClient::workflow_client`] calls [`IntoWorkflowClient::create_client`] to build the client.
pub trait IntoWorkflowClient<'ctx>: Sized {
    /// Create the client from the internal context and the workflow `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
555
556impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
557
558/// # Awakeables
559///
560/// Awakeables pause an invocation while waiting for another process to complete a task.
561/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
562/// This pattern is also known as the callback (task token) pattern.
563///
564/// ## Creating awakeables
565///
566/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
567/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
568///    For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
569///    You use `ctx.run` to avoid re-triggering the task on retries.
570/// 3. **Wait** until the other process has executed the task.
571///    The handler **receives the payload and resumes**.
572///
573/// ```rust,no_run
574/// # use restate_sdk::prelude::*;
575/// #
576/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
577/// /// 1. Create an awakeable
578/// let (id, promise) = ctx.awakeable::<String>();
579///
580/// /// 2. Trigger a task
581/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
582///
583/// /// 3. Wait for the promise to be resolved
584/// let payload = promise.await?;
585/// # Ok(())
586/// # }
587/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
588/// #    Ok(())
589/// # }
590/// ```
591///
592///
593/// ## Completing awakeables
594///
595/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
596/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
597///
598/// - Resolving over HTTP with its ID and an optional payload:
599///
600/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve \
///     -H 'content-type: application/json' \
///     -d '{"hello": "world"}'
604/// ```
605///
606/// - Rejecting over HTTP with its ID and a reason:
607///
608/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject \
///     -H 'content-type: text/plain' \
///     -d 'Very bad error!'
612/// ```
613///
614/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
615///
616/// ```rust,no_run
617/// # use restate_sdk::prelude::*;
618/// #
619/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
620/// // Resolve the awakeable
621/// ctx.resolve_awakeable(&id, "hello".to_string());
622///
623/// // Or reject the awakeable
624/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
625/// # Ok(())
626/// # }
627/// ```
628///
629/// For more info about serialization of the payload, see [crate::serde]
630///
631/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
632/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
633///
634/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
635pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
636    /// Create an awakeable
637    fn awakeable<T: Deserialize + 'static>(
638        &self,
639    ) -> (
640        String,
641        impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
642    ) {
643        self.inner_context().awakeable()
644    }
645
646    /// Resolve an awakeable
647    fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
648        self.inner_context().resolve_awakeable(key, t)
649    }
650
651    /// Resolve an awakeable
652    fn reject_awakeable(&self, key: &str, failure: TerminalError) {
653        self.inner_context().reject_awakeable(key, failure)
654    }
655}
656
657impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
658
659/// # Journaling Results
660///
661/// Restate uses an execution log for replay after failures and suspensions.
662/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
663/// The SDK offers some functionalities to help you with this:
664/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
665/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating random numbers. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
667///
668pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
669    /// ## Journaled actions
670    /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
671    /// Restate replays the result instead of re-executing the operation on retries.
672    ///
673    /// Here is an example of a database request for which the string response is stored in Restate:
674    /// ```rust,no_run
675    /// # use restate_sdk::prelude::*;
676    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
677    /// let response = ctx.run(|| do_db_request()).await?;
678    /// # Ok(())
679    /// # }
680    /// # async fn do_db_request() -> Result<String, HandlerError>{
681    /// # Ok("Hello".to_string())
682    /// # }
683    /// ```
684    ///
685    /// You cannot use the Restate context within `ctx.run`.
686    /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
687    ///
688    /// For more info about serialization of the return values, see [crate::serde].
689    ///
690    /// You can configure the retry policy for the `ctx.run` block:
691    /// ```rust,no_run
692    /// # use std::time::Duration;
693    /// # use restate_sdk::prelude::*;
694    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
695    /// let my_run_retry_policy = RunRetryPolicy::default()
696    ///     .initial_delay(Duration::from_millis(100))
697    ///     .exponentiation_factor(2.0)
698    ///     .max_delay(Duration::from_millis(1000))
699    ///     .max_attempts(10)
700    ///     .max_duration(Duration::from_secs(10));
701    /// ctx.run(|| write_to_other_system())
702    ///     .retry_policy(my_run_retry_policy)
703    ///     .await
704    ///     .map_err(|e| {
705    ///         // Handle the terminal error after retries exhausted
706    ///         // For example, undo previous actions (see sagas guide) and
707    ///         // propagate the error back to the caller
708    ///         e
709    ///      })?;
710    /// # Ok(())
711    /// # }
712    /// # async fn write_to_other_system() -> Result<String, HandlerError>{
713    /// # Ok("Hello".to_string())
714    /// # }
715    /// ```
716    ///
717    /// This way you can override the default retry behavior of your Restate service for specific operations.
718    /// Have a look at [`RunFuture::retry_policy`] for more information.
719    ///
720    /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
721    /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
722    ///
723    /// **Caution: Immediately await journaled actions:**
724    /// Always immediately await `ctx.run`, before doing any other context calls.
725    /// If not, you might bump into non-determinism errors during replay,
726    /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
727    ///
728    #[must_use]
729    fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
730    where
731        R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
732        F: Future<Output = HandlerResult<T>> + Send + 'ctx,
733        T: Serialize + Deserialize + 'static,
734    {
735        self.inner_context().run(run_closure)
736    }
737
738    /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
739    ///
740    /// This value is stable during the invocation lifecycle, thus across retries.
741    fn random_seed(&self) -> u64 {
742        private::SealedContext::random_seed(self)
743    }
744
745    /// ### Generating random numbers
746    ///
747    /// Return a [`rand::Rng`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
748    ///
749    /// For example, you can use this to generate a random number:
750    ///
751    /// ```rust,no_run
752    /// # use restate_sdk::prelude::*;
753    /// # use rand::Rng;
754    /// async fn rand_generate(mut ctx: Context<'_>) {
755    /// let x: u32 = ctx.rand().gen();
756    /// # }
757    /// ```
758    ///
759    /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
760    /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
761    #[cfg(feature = "rand")]
762    fn rand(&mut self) -> &mut rand::prelude::StdRng {
763        private::SealedContext::rand(self)
764    }
765    /// ### Generating UUIDs
766    ///
767    /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
768    ///
769    /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
770    ///
771    /// Do not use this in cryptographic contexts.
772    ///
773    /// ```rust,no_run
774    /// # use restate_sdk::prelude::*;
775    /// # use uuid::Uuid;
776    /// # async fn uuid_generate(mut ctx: Context<'_>) {
777    /// let uuid: Uuid = ctx.rand_uuid();
778    /// # }
779    /// ```
780    #[cfg(all(feature = "rand", feature = "uuid"))]
781    fn rand_uuid(&mut self) -> uuid::Uuid {
782        let rand = private::SealedContext::rand(self);
783        uuid::Uuid::from_u64_pair(rand::RngCore::next_u64(rand), rand::RngCore::next_u64(rand))
784    }
785}
786
787impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
788
789/// # Reading state
790/// You can store key-value state in Restate.
791/// Restate makes sure the state is consistent with the processing of the code execution.
792///
793/// **This feature is only available for Virtual Objects and Workflows:**
794/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
795/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
796///
797/// ```rust,no_run
798/// # use restate_sdk::prelude::*;
799/// #
800/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
801///     /// 1. Listing state keys
802///     /// List all the state keys that have entries in the state store for this object, via:
803///     let keys = ctx.get_keys().await?;
804///
805///     /// 2. Getting a state value
806///     let my_string = ctx
807///         .get::<String>("my-key")
808///         .await?
809///         .unwrap_or(String::from("my-default"));
810///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
811///
812///     /// 3. Setting a state value
813///     ctx.set("my-key", String::from("my-value"));
814///
815///     /// 4. Clearing a state value
816///     ctx.clear("my-key");
817///
818///     /// 5. Clearing all state values
819///     /// Deletes all the state in Restate for the object
820///     ctx.clear_all();
821/// #    Ok(())
822/// # }
823/// ```
824///
825/// For more info about serialization, see [crate::serde]
826///
827/// ### Command-line introspection
828/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
829/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
830///
831pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
832    /// Get state
833    fn get<T: Deserialize + 'static>(
834        &self,
835        key: &str,
836    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
837        self.inner_context().get(key)
838    }
839
840    /// Get state keys
841    fn get_keys(&self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
842        self.inner_context().get_keys()
843    }
844}
845
846impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
847    for CTX
848{
849}
850
851/// # Writing State
852/// You can store key-value state in Restate.
853/// Restate makes sure the state is consistent with the processing of the code execution.
854///
855/// **This feature is only available for Virtual Objects and Workflows:**
856/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
857/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
858///
859/// ```rust,no_run
860/// # use restate_sdk::prelude::*;
861/// #
862/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
863///     /// 1. Listing state keys
864///     /// List all the state keys that have entries in the state store for this object, via:
865///     let keys = ctx.get_keys().await?;
866///
867///     /// 2. Getting a state value
868///     let my_string = ctx
869///         .get::<String>("my-key")
870///         .await?
871///         .unwrap_or(String::from("my-default"));
872///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
873///
874///     /// 3. Setting a state value
875///     ctx.set("my-key", String::from("my-value"));
876///
877///     /// 4. Clearing a state value
878///     ctx.clear("my-key");
879///
880///     /// 5. Clearing all state values
881///     /// Deletes all the state in Restate for the object
882///     ctx.clear_all();
883/// #    Ok(())
884/// # }
885/// ```
886///
887/// For more info about serialization, see [crate::serde]
888///
889/// ## Command-line introspection
890/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
891/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
892///
893pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
894    /// Set state
895    fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
896        self.inner_context().set(key, t)
897    }
898
899    /// Clear state
900    fn clear(&self, key: &str) {
901        self.inner_context().clear(key)
902    }
903
904    /// Clear all state
905    fn clear_all(&self) {
906        self.inner_context().clear_all()
907    }
908}
909
910impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
911    for CTX
912{
913}
914
915/// Trait exposing Restate promise functionalities.
916///
917/// A promise is a durable, distributed version of a Rust oneshot channel.
918/// Restate keeps track of the promises across restarts/failures.
919///
920/// You can use this feature to implement interaction between different workflow handlers, e.g. to
921/// send a signal from a shared handler to the workflow handler.
922pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
923    /// Create a promise
924    fn promise<T: Deserialize + 'static>(
925        &'ctx self,
926        key: &str,
927    ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
928        self.inner_context().promise(key)
929    }
930
931    /// Peek a promise
932    fn peek_promise<T: Deserialize + 'static>(
933        &self,
934        key: &str,
935    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
936        self.inner_context().peek_promise(key)
937    }
938
939    /// Resolve a promise
940    fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
941        self.inner_context().resolve_promise(key, t)
942    }
943
944    /// Resolve a promise
945    fn reject_promise(&self, key: &str, failure: TerminalError) {
946        self.inner_context().reject_promise(key, failure)
947    }
948}
949
950impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
951    for CTX
952{
953}
954
955pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
956
957pub(crate) mod private {
958    use super::*;
959    pub trait SealedContext<'ctx> {
960        fn inner_context(&self) -> &'ctx ContextInternal;
961
962        fn random_seed(&self) -> u64;
963
964        #[cfg(feature = "rand")]
965        fn rand(&mut self) -> &mut rand::prelude::StdRng;
966    }
967
968    // Context capabilities
969    pub trait SealedCanReadState {}
970    pub trait SealedCanWriteState {}
971    pub trait SealedCanUsePromises {}
972
973    impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
974        fn inner_context(&self) -> &'ctx ContextInternal {
975            self.inner
976        }
977
978        fn random_seed(&self) -> u64 {
979            self.random_seed
980        }
981
982        #[cfg(feature = "rand")]
983        fn rand(&mut self) -> &mut rand::prelude::StdRng {
984            &mut self.std_rng
985        }
986    }
987
988    impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
989        fn inner_context(&self) -> &'ctx ContextInternal {
990            self.inner
991        }
992
993        fn random_seed(&self) -> u64 {
994            self.random_seed
995        }
996
997        #[cfg(feature = "rand")]
998        fn rand(&mut self) -> &mut rand::prelude::StdRng {
999            &mut self.std_rng
1000        }
1001    }
1002
1003    impl SealedCanReadState for SharedObjectContext<'_> {}
1004
1005    impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1006        fn inner_context(&self) -> &'ctx ContextInternal {
1007            self.inner
1008        }
1009
1010        fn random_seed(&self) -> u64 {
1011            self.random_seed
1012        }
1013
1014        #[cfg(feature = "rand")]
1015        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1016            &mut self.std_rng
1017        }
1018    }
1019
1020    impl SealedCanReadState for ObjectContext<'_> {}
1021    impl SealedCanWriteState for ObjectContext<'_> {}
1022
1023    impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1024        fn inner_context(&self) -> &'ctx ContextInternal {
1025            self.inner
1026        }
1027
1028        fn random_seed(&self) -> u64 {
1029            self.random_seed
1030        }
1031
1032        #[cfg(feature = "rand")]
1033        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1034            &mut self.std_rng
1035        }
1036    }
1037
1038    impl SealedCanReadState for SharedWorkflowContext<'_> {}
1039    impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1040
1041    impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1042        fn inner_context(&self) -> &'ctx ContextInternal {
1043            self.inner
1044        }
1045
1046        fn random_seed(&self) -> u64 {
1047            self.random_seed
1048        }
1049
1050        #[cfg(feature = "rand")]
1051        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1052            &mut self.std_rng
1053        }
1054    }
1055
1056    impl SealedCanReadState for WorkflowContext<'_> {}
1057    impl SealedCanWriteState for WorkflowContext<'_> {}
1058    impl SealedCanUsePromises for WorkflowContext<'_> {}
1059}