Skip to main content

restate_sdk/context/
mod.rs

1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14
15pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
16pub use run::{RunClosure, RunFuture, RunRetryPolicy};
17
/// Map of request headers: an [`http::HeaderMap`] whose values are stored as [`String`]s.
pub type HeaderMap = http::HeaderMap<String>;
19
/// Service handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through the [`From`] implementation
/// for this type.
pub struct Context<'ctx> {
    /// Id of the current invocation, returned by `invocation_id()`.
    invocation_id: String,
    /// Random seed copied from the invocation's `InputMetadata`.
    random_seed: u64,
    /// Random number generator seeded from `random_seed`; only present with the `rand` feature.
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, returned by `headers()`.
    headers: HeaderMap,
    /// Borrowed internal context.
    inner: &'ctx ContextInternal,
}
29
30impl Context<'_> {
31    /// Get invocation id.
32    pub fn invocation_id(&self) -> &str {
33        &self.invocation_id
34    }
35
36    /// Get request headers.
37    pub fn headers(&self) -> &HeaderMap {
38        &self.headers
39    }
40
41    /// Get request headers.
42    pub fn headers_mut(&mut self) -> &HeaderMap {
43        &mut self.headers
44    }
45}
46
47impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
48    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
49        Self {
50            invocation_id: value.1.invocation_id,
51            random_seed: value.1.random_seed,
52            #[cfg(feature = "rand")]
53            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
54            headers: value.1.headers,
55            inner: value.0,
56        }
57    }
58}
59
/// Object shared handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through the [`From`] implementation
/// for this type.
pub struct SharedObjectContext<'ctx> {
    /// Id of the current invocation, returned by `invocation_id()`.
    invocation_id: String,
    /// Object key, returned by `key()`.
    key: String,
    /// Random seed copied from the invocation's `InputMetadata`.
    random_seed: u64,
    /// Random number generator seeded from `random_seed`; only present with the `rand` feature.
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, returned by `headers()`.
    headers: HeaderMap,
    /// Borrowed internal context, visible to the rest of the crate.
    pub(crate) inner: &'ctx ContextInternal,
}
70
71impl SharedObjectContext<'_> {
72    /// Get invocation id.
73    pub fn invocation_id(&self) -> &str {
74        &self.invocation_id
75    }
76
77    /// Get object key.
78    pub fn key(&self) -> &str {
79        &self.key
80    }
81
82    /// Get request headers.
83    pub fn headers(&self) -> &HeaderMap {
84        &self.headers
85    }
86
87    /// Get request headers.
88    pub fn headers_mut(&mut self) -> &HeaderMap {
89        &mut self.headers
90    }
91}
92
93impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
94    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
95        Self {
96            invocation_id: value.1.invocation_id,
97            key: value.1.key,
98            random_seed: value.1.random_seed,
99            #[cfg(feature = "rand")]
100            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
101            headers: value.1.headers,
102            inner: value.0,
103        }
104    }
105}
106
/// Object handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through the [`From`] implementation
/// for this type.
pub struct ObjectContext<'ctx> {
    /// Id of the current invocation, returned by `invocation_id()`.
    invocation_id: String,
    /// Object key, returned by `key()`.
    key: String,
    /// Random seed copied from the invocation's `InputMetadata`.
    random_seed: u64,
    /// Random number generator seeded from `random_seed`; only present with the `rand` feature.
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, returned by `headers()`.
    headers: HeaderMap,
    /// Borrowed internal context, visible to the rest of the crate.
    pub(crate) inner: &'ctx ContextInternal,
}
117
118impl ObjectContext<'_> {
119    /// Get invocation id.
120    pub fn invocation_id(&self) -> &str {
121        &self.invocation_id
122    }
123
124    /// Get object key.
125    pub fn key(&self) -> &str {
126        &self.key
127    }
128
129    /// Get request headers.
130    pub fn headers(&self) -> &HeaderMap {
131        &self.headers
132    }
133
134    /// Get request headers.
135    pub fn headers_mut(&mut self) -> &HeaderMap {
136        &mut self.headers
137    }
138}
139
140impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
141    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
142        Self {
143            invocation_id: value.1.invocation_id,
144            key: value.1.key,
145            random_seed: value.1.random_seed,
146            #[cfg(feature = "rand")]
147            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
148            headers: value.1.headers,
149            inner: value.0,
150        }
151    }
152}
153
/// Workflow shared handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through the [`From`] implementation
/// for this type.
pub struct SharedWorkflowContext<'ctx> {
    /// Id of the current invocation, returned by `invocation_id()`.
    invocation_id: String,
    /// Workflow key, returned by `key()`.
    key: String,
    /// Random seed copied from the invocation's `InputMetadata`.
    random_seed: u64,
    /// Random number generator seeded from `random_seed`; only present with the `rand` feature.
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, returned by `headers()`.
    headers: HeaderMap,
    /// Borrowed internal context, visible to the rest of the crate.
    pub(crate) inner: &'ctx ContextInternal,
}
164
165impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
166    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
167        Self {
168            invocation_id: value.1.invocation_id,
169            key: value.1.key,
170            random_seed: value.1.random_seed,
171            #[cfg(feature = "rand")]
172            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
173            headers: value.1.headers,
174            inner: value.0,
175        }
176    }
177}
178
179impl SharedWorkflowContext<'_> {
180    /// Get invocation id.
181    pub fn invocation_id(&self) -> &str {
182        &self.invocation_id
183    }
184
185    /// Get workflow key.
186    pub fn key(&self) -> &str {
187        &self.key
188    }
189
190    /// Get request headers.
191    pub fn headers(&self) -> &HeaderMap {
192        &self.headers
193    }
194
195    /// Get request headers.
196    pub fn headers_mut(&mut self) -> &HeaderMap {
197        &mut self.headers
198    }
199}
200
/// Workflow handler context.
///
/// Built from a `(&ContextInternal, InputMetadata)` pair through the [`From`] implementation
/// for this type.
pub struct WorkflowContext<'ctx> {
    /// Id of the current invocation, returned by `invocation_id()`.
    invocation_id: String,
    /// Workflow key, returned by `key()`.
    key: String,
    /// Random seed copied from the invocation's `InputMetadata`.
    random_seed: u64,
    /// Random number generator seeded from `random_seed`; only present with the `rand` feature.
    #[cfg(feature = "rand")]
    std_rng: rand::prelude::StdRng,
    /// Request headers, returned by `headers()`.
    headers: HeaderMap,
    /// Borrowed internal context, visible to the rest of the crate.
    pub(crate) inner: &'ctx ContextInternal,
}
211
212impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
213    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
214        Self {
215            invocation_id: value.1.invocation_id,
216            key: value.1.key,
217            random_seed: value.1.random_seed,
218            #[cfg(feature = "rand")]
219            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
220            headers: value.1.headers,
221            inner: value.0,
222        }
223    }
224}
225
226impl WorkflowContext<'_> {
227    /// Get invocation id.
228    pub fn invocation_id(&self) -> &str {
229        &self.invocation_id
230    }
231
232    /// Get workflow key.
233    pub fn key(&self) -> &str {
234        &self.key
235    }
236
237    /// Get request headers.
238    pub fn headers(&self) -> &HeaderMap {
239        &self.headers
240    }
241
242    /// Get request headers.
243    pub fn headers_mut(&mut self) -> &HeaderMap {
244        &mut self.headers
245    }
246}
247
248///
249/// # Scheduling & Timers
250/// The Restate SDK includes durable timers.
251/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
252/// These timers are resilient to failures and restarts.
253/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
254///
255/// ## Scheduling Async Tasks
256///
257/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
258///
259/// ## Durable sleep
260/// To sleep in a Restate application for ten seconds, do the following:
261///
262/// ```rust,no_run
263/// # use restate_sdk::prelude::*;
264/// # use std::convert::Infallible;
265/// # use std::time::Duration;
266/// #
267/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
268/// ctx.sleep(Duration::from_secs(10)).await?;
269/// #    Ok(())
270/// # }
271/// ```
272///
273/// **Cost savings on FaaS**:
274/// Restate suspends the handler while it is sleeping, to free up resources.
275/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
276///
277/// **Sleeping in Virtual Objects**:
278/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
279///
280/// <details>
281/// <summary>Clock synchronization Restate Server vs. SDK</summary>
282///
283/// The Restate SDK calculates the wake-up time based on the delay you specify.
284/// The Restate Server then uses this calculated time to wake up the handler.
285/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
286/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
287/// </details>
288pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
289    /// Sleep using Restate
290    fn sleep(
291        &self,
292        duration: Duration,
293    ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
294        private::SealedContext::inner_context(self).sleep(duration)
295    }
296}
297
298impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
299
300/// # Service Communication
301///
302/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
303///
304/// ## Request-response calls
305///
306/// Request-response calls are requests where the client waits for the response.
307///
308/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
309///
310/// ```rust,no_run
311/// # #[path = "../../examples/services/mod.rs"]
312/// # mod services;
313/// # use services::my_virtual_object::MyVirtualObjectClient;
314/// # use services::my_service::MyServiceClient;
315/// # use services::my_workflow::MyWorkflowClient;
316/// # use restate_sdk::prelude::*;
317/// #
318/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
319///     // To a Service:
320///     let service_response = ctx
321///         .service_client::<MyServiceClient>()
322///         .my_handler(String::from("Hi!"))
323///         .call()
324///         .await?;
325///
326///     // To a Virtual Object:
327///     let object_response = ctx
328///         .object_client::<MyVirtualObjectClient>("Mary")
329///         .my_handler(String::from("Hi!"))
330///         .call()
331///         .await?;
332///
333///     // To a Workflow:
334///     let workflow_result = ctx
335///         .workflow_client::<MyWorkflowClient>("my-workflow-id")
336///         .run(String::from("Hi!"))
337///         .call()
338///         .await?;
339///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
340///         .interact_with_workflow()
341///         .call()
342///         .await?;
343///  #    Ok(())
344///  # }
345/// ```
346///
347/// 1. **Create a service client**.
348///     - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
349///     - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
350/// 2. **Specify the handler** you want to call and supply the request.
351/// 3. **Await** the call to retrieve the response.
352///
353/// **No need for manual retry logic**:
354/// Restate proxies all the calls and logs them in the journal.
355/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
356///
357/// ## Sending messages
358///
359/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
360///
361/// ```rust,no_run
362/// # #[path = "../../examples/services/mod.rs"]
363/// # mod services;
364/// # use services::my_virtual_object::MyVirtualObjectClient;
365/// # use services::my_service::MyServiceClient;
366/// # use services::my_workflow::MyWorkflowClient;
367/// # use restate_sdk::prelude::*;
368/// #
369/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
370///     // To a Service:
371///     ctx.service_client::<MyServiceClient>()
372///         .my_handler(String::from("Hi!"))
373///         .send();
374///
375///     // To a Virtual Object:
376///     ctx.object_client::<MyVirtualObjectClient>("Mary")
377///         .my_handler(String::from("Hi!"))
378///         .send();
379///
380///     // To a Workflow:
381///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
382///         .run(String::from("Hi!"))
383///         .send();
384///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
385///         .interact_with_workflow()
386///         .send();
387///  #    Ok(())
388///  # }
389/// ```
390///
391/// **No need for message queues**:
392/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
393/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
394///
395/// ## Delayed calls
396///
397/// A delayed call is a one-way call that gets executed after a specified delay.
398///
399/// To schedule a delayed call, send a message with a delay parameter, as follows:
400///
401/// ```rust,no_run
402/// # #[path = "../../examples/services/mod.rs"]
403/// # mod services;
404/// # use services::my_virtual_object::MyVirtualObjectClient;
405/// # use services::my_service::MyServiceClient;
406/// # use services::my_workflow::MyWorkflowClient;
407/// # use restate_sdk::prelude::*;
408/// # use std::time::Duration;
409/// #
410/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
411///     // To a Service:
412///     ctx.service_client::<MyServiceClient>()
413///         .my_handler(String::from("Hi!"))
414///         .send_after(Duration::from_millis(5000));
415///
416///     // To a Virtual Object:
417///     ctx.object_client::<MyVirtualObjectClient>("Mary")
418///         .my_handler(String::from("Hi!"))
419///         .send_after(Duration::from_millis(5000));
420///
421///     // To a Workflow:
422///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
423///         .run(String::from("Hi!"))
424///         .send_after(Duration::from_millis(5000));
425///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
426///         .interact_with_workflow()
427///         .send_after(Duration::from_millis(5000));
428///  #    Ok(())
429///  # }
430/// ```
431///
432/// You can also use this functionality to schedule async tasks.
433/// Restate will make sure the task gets executed at the desired time.
434///
435/// ### Ordering guarantees in Virtual Objects
436/// Invocations to a Virtual Object are executed serially.
437/// Invocations will execute in the same order in which they arrive at Restate.
438/// For example, assume a handler calls the same Virtual Object twice:
439///
440/// ```rust,no_run
441/// # #[path = "../../examples/services/my_virtual_object.rs"]
442/// # mod my_virtual_object;
443/// # use my_virtual_object::MyVirtualObjectClient;
444/// # use restate_sdk::prelude::*;
445/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
446///     ctx.object_client::<MyVirtualObjectClient>("Mary")
447///         .my_handler(String::from("I'm call A!"))
448///         .send();
449///     ctx.object_client::<MyVirtualObjectClient>("Mary")
450///         .my_handler(String::from("I'm call B!"))
451///         .send();
452/// #    Ok(())
/// # }
454/// ```
455///
456/// It is guaranteed that call A will execute before call B.
/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources could interleave these two calls.
458///
459/// ### Deadlocks with Virtual Objects
460/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
461/// Some example cases:
/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using the same keys.
463/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
464///
465/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
466///
467pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
468    /// Create a [`Request`].
469    fn request<Req, Res>(
470        &self,
471        request_target: RequestTarget,
472        req: Req,
473    ) -> Request<'ctx, Req, Res> {
474        Request::new(self.inner_context(), request_target, req)
475    }
476
477    /// Create an [`InvocationHandle`] from an invocation id.
478    fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
479        self.inner_context().invocation_handle(invocation_id)
480    }
481
482    /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
483    ///
484    /// ```rust,no_run
485    /// # use std::time::Duration;
486    /// # use restate_sdk::prelude::*;
487    ///
488    /// #[restate_sdk::service]
489    /// trait MyService {
490    ///   async fn handle() -> HandlerResult<()>;
491    /// }
492    ///
493    /// # async fn handler(ctx: Context<'_>) {
494    /// let client = ctx.service_client::<MyServiceClient>();
495    ///
496    /// // Do request
497    /// let result = client.handle().call().await;
498    ///
499    /// // Just send the request, don't wait the response
500    /// client.handle().send();
501    ///
502    /// // Schedule the request to be executed later
503    /// client.handle().send_after(Duration::from_secs(60));
504    /// # }
505    /// ```
506    fn service_client<C>(&self) -> C
507    where
508        C: IntoServiceClient<'ctx>,
509    {
510        C::create_client(self.inner_context())
511    }
512
513    /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
514    ///
515    /// ```rust,no_run
516    /// # use std::time::Duration;
517    /// # use restate_sdk::prelude::*;
518    ///
519    /// #[restate_sdk::object]
520    /// trait MyObject {
521    ///   async fn handle() -> HandlerResult<()>;
522    /// }
523    ///
524    /// # async fn handler(ctx: Context<'_>) {
525    /// let client = ctx.object_client::<MyObjectClient>("my-key");
526    ///
527    /// // Do request
528    /// let result = client.handle().call().await;
529    ///
530    /// // Just send the request, don't wait the response
531    /// client.handle().send();
532    ///
533    /// // Schedule the request to be executed later
534    /// client.handle().send_after(Duration::from_secs(60));
535    /// # }
536    /// ```
537    fn object_client<C>(&self, key: impl Into<String>) -> C
538    where
539        C: IntoObjectClient<'ctx>,
540    {
541        C::create_client(self.inner_context(), key.into())
542    }
543
544    /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
545    ///
546    /// ```rust,no_run
547    /// # use std::time::Duration;
548    /// # use restate_sdk::prelude::*;
549    ///
550    /// #[restate_sdk::workflow]
551    /// trait MyWorkflow {
552    ///   async fn handle() -> HandlerResult<()>;
553    /// }
554    ///
555    /// # async fn handler(ctx: Context<'_>) {
556    /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
557    ///
558    /// // Do request
559    /// let result = client.handle().call().await;
560    ///
561    /// // Just send the request, don't wait the response
562    /// client.handle().send();
563    ///
564    /// // Schedule the request to be executed later
565    /// client.handle().send_after(Duration::from_secs(60));
566    /// # }
567    /// ```
568    fn workflow_client<C>(&self, key: impl Into<String>) -> C
569    where
570        C: IntoWorkflowClient<'ctx>,
571    {
572        C::create_client(self.inner_context(), key.into())
573    }
574}
575
/// Trait used by codegen to create the service client.
pub trait IntoServiceClient<'ctx>: Sized {
    /// Build the client from the internal context.
    fn create_client(ctx: &'ctx ContextInternal) -> Self;
}
580
/// Trait used by codegen to create the object client.
pub trait IntoObjectClient<'ctx>: Sized {
    /// Build the client from the internal context and the object `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
585
/// Trait used by codegen to create the workflow client.
pub trait IntoWorkflowClient<'ctx>: Sized {
    /// Build the client from the internal context and the workflow `key`.
    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
}
590
591impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
592
593/// # Awakeables
594///
595/// Awakeables pause an invocation while waiting for another process to complete a task.
596/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
597/// This pattern is also known as the callback (task token) pattern.
598///
599/// ## Creating awakeables
600///
601/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
602/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
603///    For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
604///    You use `ctx.run` to avoid re-triggering the task on retries.
605/// 3. **Wait** until the other process has executed the task.
606///    The handler **receives the payload and resumes**.
607///
608/// ```rust,no_run
609/// # use restate_sdk::prelude::*;
610/// #
611/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
612/// /// 1. Create an awakeable
613/// let (id, promise) = ctx.awakeable::<String>();
614///
615/// /// 2. Trigger a task
616/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
617///
618/// /// 3. Wait for the promise to be resolved
619/// let payload = promise.await?;
620/// # Ok(())
621/// # }
622/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
623/// #    Ok(())
624/// # }
625/// ```
626///
627///
628/// ## Completing awakeables
629///
630/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
631/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
632///
633/// - Resolving over HTTP with its ID and an optional payload:
634///
635/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve \
///     -H 'content-type: application/json' \
///     -d '{"hello": "world"}'
639/// ```
640///
641/// - Rejecting over HTTP with its ID and a reason:
642///
643/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject \
645///     -H 'content-type: text/plain' \
646///     -d 'Very bad error!'
647/// ```
648///
649/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
650///
651/// ```rust,no_run
652/// # use restate_sdk::prelude::*;
653/// #
654/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
655/// // Resolve the awakeable
656/// ctx.resolve_awakeable(&id, "hello".to_string());
657///
658/// // Or reject the awakeable
659/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
660/// # Ok(())
661/// # }
662/// ```
663///
664/// For more info about serialization of the payload, see [crate::serde]
665///
666/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
667/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
668///
669/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
670pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
671    /// Create an awakeable
672    fn awakeable<T: Deserialize + 'static>(
673        &self,
674    ) -> (
675        String,
676        impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
677    ) {
678        self.inner_context().awakeable()
679    }
680
681    /// Resolve an awakeable
682    fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
683        self.inner_context().resolve_awakeable(key, t)
684    }
685
686    /// Resolve an awakeable
687    fn reject_awakeable(&self, key: &str, failure: TerminalError) {
688        self.inner_context().reject_awakeable(key, failure)
689    }
690}
691
692impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
693
694/// # Journaling Results
695///
696/// Restate uses an execution log for replay after failures and suspensions.
697/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
698/// The SDK offers some functionalities to help you with this:
699/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
700/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
701/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
702///
703pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
704    /// ## Journaled actions
705    /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
706    /// Restate replays the result instead of re-executing the operation on retries.
707    ///
708    /// Here is an example of a database request for which the string response is stored in Restate:
709    /// ```rust,no_run
710    /// # use restate_sdk::prelude::*;
711    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
712    /// let response = ctx.run(|| do_db_request()).await?;
713    /// # Ok(())
714    /// # }
715    /// # async fn do_db_request() -> Result<String, HandlerError>{
716    /// # Ok("Hello".to_string())
717    /// # }
718    /// ```
719    ///
720    /// You cannot use the Restate context within `ctx.run`.
721    /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
722    ///
723    /// For more info about serialization of the return values, see [crate::serde].
724    ///
725    /// You can configure the retry policy for the `ctx.run` block:
726    /// ```rust,no_run
727    /// # use std::time::Duration;
728    /// # use restate_sdk::prelude::*;
729    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
730    /// let my_run_retry_policy = RunRetryPolicy::default()
731    ///     .initial_delay(Duration::from_millis(100))
732    ///     .exponentiation_factor(2.0)
733    ///     .max_delay(Duration::from_millis(1000))
734    ///     .max_attempts(10)
735    ///     .max_duration(Duration::from_secs(10));
736    /// ctx.run(|| write_to_other_system())
737    ///     .retry_policy(my_run_retry_policy)
738    ///     .await
739    ///     .map_err(|e| {
740    ///         // Handle the terminal error after retries exhausted
741    ///         // For example, undo previous actions (see sagas guide) and
742    ///         // propagate the error back to the caller
743    ///         e
744    ///      })?;
745    /// # Ok(())
746    /// # }
747    /// # async fn write_to_other_system() -> Result<String, HandlerError>{
748    /// # Ok("Hello".to_string())
749    /// # }
750    /// ```
751    ///
752    /// This way you can override the default retry behavior of your Restate service for specific operations.
753    /// Have a look at [`RunFuture::retry_policy`] for more information.
754    ///
755    /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
756    /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
757    ///
758    /// **Caution: Immediately await journaled actions:**
759    /// Always immediately await `ctx.run`, before doing any other context calls.
760    /// If not, you might bump into non-determinism errors during replay,
761    /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
762    ///
763    #[must_use]
764    fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
765    where
766        R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
767        F: Future<Output = HandlerResult<T>> + Send + 'ctx,
768        T: Serialize + Deserialize + 'static,
769    {
770        self.inner_context().run(run_closure)
771    }
772
773    /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
774    ///
775    /// This value is stable during the invocation lifecycle, thus across retries.
776    fn random_seed(&self) -> u64 {
777        private::SealedContext::random_seed(self)
778    }
779
780    /// ### Generating random numbers
781    ///
782    /// Return a [`rand::RngExt`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
783    ///
784    /// For example, you can use this to generate a random number:
785    ///
786    /// ```rust,no_run
787    /// # use restate_sdk::prelude::*;
788    /// # use rand::RngExt;
789    /// async fn rand_generate(mut ctx: Context<'_>) {
790    /// let x: u32 = ctx.rand().random();
791    /// # }
792    /// ```
793    ///
794    /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
795    /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
796    #[cfg(feature = "rand")]
797    fn rand(&mut self) -> &mut rand::prelude::StdRng {
798        private::SealedContext::rand(self)
799    }
800    /// ### Generating UUIDs
801    ///
802    /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
803    ///
804    /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
805    ///
806    /// Do not use this in cryptographic contexts.
807    ///
808    /// ```rust,no_run
809    /// # use restate_sdk::prelude::*;
810    /// # use uuid::Uuid;
811    /// # async fn uuid_generate(mut ctx: Context<'_>) {
812    /// let uuid: Uuid = ctx.rand_uuid();
813    /// # }
814    /// ```
815    #[cfg(all(feature = "rand", feature = "uuid"))]
816    fn rand_uuid(&mut self) -> uuid::Uuid {
817        let rand = private::SealedContext::rand(self);
818        uuid::Uuid::from_u64_pair(rand::Rng::next_u64(rand), rand::Rng::next_u64(rand))
819    }
820}
821
822impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
823
824/// # Reading state
825/// You can store key-value state in Restate.
826/// Restate makes sure the state is consistent with the processing of the code execution.
827///
828/// **This feature is only available for Virtual Objects and Workflows:**
829/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
830/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
831///
832/// ```rust,no_run
833/// # use restate_sdk::prelude::*;
834/// #
835/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
836///     /// 1. Listing state keys
837///     /// List all the state keys that have entries in the state store for this object, via:
838///     let keys = ctx.get_keys().await?;
839///
840///     /// 2. Getting a state value
841///     let my_string = ctx
842///         .get::<String>("my-key")
843///         .await?
844///         .unwrap_or(String::from("my-default"));
845///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
846///
847///     /// 3. Setting a state value
848///     ctx.set("my-key", String::from("my-value"));
849///
850///     /// 4. Clearing a state value
851///     ctx.clear("my-key");
852///
853///     /// 5. Clearing all state values
854///     /// Deletes all the state in Restate for the object
855///     ctx.clear_all();
856/// #    Ok(())
857/// # }
858/// ```
859///
860/// For more info about serialization, see [crate::serde]
861///
862/// ### Command-line introspection
863/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
864/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
865///
866pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
867    /// Get state
868    fn get<T: Deserialize + 'static>(
869        &self,
870        key: &'ctx str,
871    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
872        self.inner_context().get(key)
873    }
874
875    /// Get state keys
876    fn get_keys(&'ctx self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
877        self.inner_context().get_keys()
878    }
879}
880
881impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
882    for CTX
883{
884}
885
886/// # Writing State
887/// You can store key-value state in Restate.
888/// Restate makes sure the state is consistent with the processing of the code execution.
889///
890/// **This feature is only available for Virtual Objects and Workflows:**
891/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
892/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
893///
894/// ```rust,no_run
895/// # use restate_sdk::prelude::*;
896/// #
897/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
898///     /// 1. Listing state keys
899///     /// List all the state keys that have entries in the state store for this object, via:
900///     let keys = ctx.get_keys().await?;
901///
902///     /// 2. Getting a state value
903///     let my_string = ctx
904///         .get::<String>("my-key")
905///         .await?
906///         .unwrap_or(String::from("my-default"));
907///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
908///
909///     /// 3. Setting a state value
910///     ctx.set("my-key", String::from("my-value"));
911///
912///     /// 4. Clearing a state value
913///     ctx.clear("my-key");
914///
915///     /// 5. Clearing all state values
916///     /// Deletes all the state in Restate for the object
917///     ctx.clear_all();
918/// #    Ok(())
919/// # }
920/// ```
921///
922/// For more info about serialization, see [crate::serde]
923///
924/// ## Command-line introspection
925/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
926/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
927///
928pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
929    /// Set state
930    fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
931        self.inner_context().set(key, t)
932    }
933
934    /// Clear state
935    fn clear(&self, key: &str) {
936        self.inner_context().clear(key)
937    }
938
939    /// Clear all state
940    fn clear_all(&self) {
941        self.inner_context().clear_all()
942    }
943}
944
945impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
946    for CTX
947{
948}
949
950/// Trait exposing Restate promise functionalities.
951///
952/// A promise is a durable, distributed version of a Rust oneshot channel.
953/// Restate keeps track of the promises across restarts/failures.
954///
955/// You can use this feature to implement interaction between different workflow handlers, e.g. to
956/// send a signal from a shared handler to the workflow handler.
957pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
958    /// Create a promise
959    fn promise<T: Deserialize + 'static>(
960        &'ctx self,
961        key: &'ctx str,
962    ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
963        self.inner_context().promise(key)
964    }
965
966    /// Peek a promise
967    fn peek_promise<T: Deserialize + 'static>(
968        &self,
969        key: &'ctx str,
970    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
971        self.inner_context().peek_promise(key)
972    }
973
974    /// Resolve a promise
975    fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
976        self.inner_context().resolve_promise(key, t)
977    }
978
979    /// Resolve a promise
980    fn reject_promise(&self, key: &str, failure: TerminalError) {
981        self.inner_context().reject_promise(key, failure)
982    }
983}
984
985impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
986    for CTX
987{
988}
989
990pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
991
992pub(crate) mod private {
993    use super::*;
994    pub trait SealedContext<'ctx> {
995        fn inner_context(&self) -> &'ctx ContextInternal;
996
997        fn random_seed(&self) -> u64;
998
999        #[cfg(feature = "rand")]
1000        fn rand(&mut self) -> &mut rand::prelude::StdRng;
1001    }
1002
1003    // Context capabilities
1004    pub trait SealedCanReadState {}
1005    pub trait SealedCanWriteState {}
1006    pub trait SealedCanUsePromises {}
1007
1008    impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
1009        fn inner_context(&self) -> &'ctx ContextInternal {
1010            self.inner
1011        }
1012
1013        fn random_seed(&self) -> u64 {
1014            self.random_seed
1015        }
1016
1017        #[cfg(feature = "rand")]
1018        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1019            &mut self.std_rng
1020        }
1021    }
1022
1023    impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
1024        fn inner_context(&self) -> &'ctx ContextInternal {
1025            self.inner
1026        }
1027
1028        fn random_seed(&self) -> u64 {
1029            self.random_seed
1030        }
1031
1032        #[cfg(feature = "rand")]
1033        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1034            &mut self.std_rng
1035        }
1036    }
1037
1038    impl SealedCanReadState for SharedObjectContext<'_> {}
1039
1040    impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1041        fn inner_context(&self) -> &'ctx ContextInternal {
1042            self.inner
1043        }
1044
1045        fn random_seed(&self) -> u64 {
1046            self.random_seed
1047        }
1048
1049        #[cfg(feature = "rand")]
1050        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1051            &mut self.std_rng
1052        }
1053    }
1054
1055    impl SealedCanReadState for ObjectContext<'_> {}
1056    impl SealedCanWriteState for ObjectContext<'_> {}
1057
1058    impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1059        fn inner_context(&self) -> &'ctx ContextInternal {
1060            self.inner
1061        }
1062
1063        fn random_seed(&self) -> u64 {
1064            self.random_seed
1065        }
1066
1067        #[cfg(feature = "rand")]
1068        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1069            &mut self.std_rng
1070        }
1071    }
1072
1073    impl SealedCanReadState for SharedWorkflowContext<'_> {}
1074    impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1075
1076    impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1077        fn inner_context(&self) -> &'ctx ContextInternal {
1078            self.inner
1079        }
1080
1081        fn random_seed(&self) -> u64 {
1082            self.random_seed
1083        }
1084
1085        #[cfg(feature = "rand")]
1086        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1087            &mut self.std_rng
1088        }
1089    }
1090
1091    impl SealedCanReadState for WorkflowContext<'_> {}
1092    impl SealedCanWriteState for WorkflowContext<'_> {}
1093    impl SealedCanUsePromises for WorkflowContext<'_> {}
1094}