Skip to main content

restate_sdk/context/
mod.rs

1//! Types exposing Restate functionalities to service handlers.
2
3use crate::endpoint::{ContextInternal, InputMetadata};
4use crate::errors::{HandlerResult, TerminalError};
5use crate::serde::{Deserialize, Serialize};
6use std::future::Future;
7use std::time::Duration;
8
9#[doc(hidden)]
10pub mod macro_support;
11mod request;
12mod run;
13mod select;
14mod select_any;
15
16pub use request::{CallFuture, InvocationHandle, Request, RequestTarget};
17pub use run::{RunClosure, RunFuture, RunRetryPolicy};
18pub use select_any::DurableFuturesUnordered;
19
/// Header map exposed by the handler contexts: an [`http::HeaderMap`] whose values are `String`s.
pub type HeaderMap = http::HeaderMap<String>;
21
22/// Service handler context.
23pub struct Context<'ctx> {
24    invocation_id: String,
25    random_seed: u64,
26    #[cfg(feature = "rand")]
27    std_rng: rand::prelude::StdRng,
28    headers: HeaderMap,
29    inner: &'ctx ContextInternal,
30}
31
32impl Context<'_> {
33    /// Get invocation id.
34    pub fn invocation_id(&self) -> &str {
35        &self.invocation_id
36    }
37
38    /// Get request headers.
39    pub fn headers(&self) -> &HeaderMap {
40        &self.headers
41    }
42
43    /// Get request headers.
44    pub fn headers_mut(&mut self) -> &HeaderMap {
45        &mut self.headers
46    }
47}
48
49impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for Context<'ctx> {
50    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
51        Self {
52            invocation_id: value.1.invocation_id,
53            random_seed: value.1.random_seed,
54            #[cfg(feature = "rand")]
55            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
56            headers: value.1.headers,
57            inner: value.0,
58        }
59    }
60}
61
62/// Object shared handler context.
63pub struct SharedObjectContext<'ctx> {
64    invocation_id: String,
65    key: String,
66    random_seed: u64,
67    #[cfg(feature = "rand")]
68    std_rng: rand::prelude::StdRng,
69    headers: HeaderMap,
70    pub(crate) inner: &'ctx ContextInternal,
71}
72
73impl SharedObjectContext<'_> {
74    /// Get invocation id.
75    pub fn invocation_id(&self) -> &str {
76        &self.invocation_id
77    }
78
79    /// Get object key.
80    pub fn key(&self) -> &str {
81        &self.key
82    }
83
84    /// Get request headers.
85    pub fn headers(&self) -> &HeaderMap {
86        &self.headers
87    }
88
89    /// Get request headers.
90    pub fn headers_mut(&mut self) -> &HeaderMap {
91        &mut self.headers
92    }
93}
94
95impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedObjectContext<'ctx> {
96    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
97        Self {
98            invocation_id: value.1.invocation_id,
99            key: value.1.key,
100            random_seed: value.1.random_seed,
101            #[cfg(feature = "rand")]
102            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
103            headers: value.1.headers,
104            inner: value.0,
105        }
106    }
107}
108
109/// Object handler context.
110pub struct ObjectContext<'ctx> {
111    invocation_id: String,
112    key: String,
113    random_seed: u64,
114    #[cfg(feature = "rand")]
115    std_rng: rand::prelude::StdRng,
116    headers: HeaderMap,
117    pub(crate) inner: &'ctx ContextInternal,
118}
119
120impl ObjectContext<'_> {
121    /// Get invocation id.
122    pub fn invocation_id(&self) -> &str {
123        &self.invocation_id
124    }
125
126    /// Get object key.
127    pub fn key(&self) -> &str {
128        &self.key
129    }
130
131    /// Get request headers.
132    pub fn headers(&self) -> &HeaderMap {
133        &self.headers
134    }
135
136    /// Get request headers.
137    pub fn headers_mut(&mut self) -> &HeaderMap {
138        &mut self.headers
139    }
140}
141
142impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for ObjectContext<'ctx> {
143    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
144        Self {
145            invocation_id: value.1.invocation_id,
146            key: value.1.key,
147            random_seed: value.1.random_seed,
148            #[cfg(feature = "rand")]
149            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
150            headers: value.1.headers,
151            inner: value.0,
152        }
153    }
154}
155
156/// Workflow shared handler context.
157pub struct SharedWorkflowContext<'ctx> {
158    invocation_id: String,
159    key: String,
160    random_seed: u64,
161    #[cfg(feature = "rand")]
162    std_rng: rand::prelude::StdRng,
163    headers: HeaderMap,
164    pub(crate) inner: &'ctx ContextInternal,
165}
166
167impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for SharedWorkflowContext<'ctx> {
168    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
169        Self {
170            invocation_id: value.1.invocation_id,
171            key: value.1.key,
172            random_seed: value.1.random_seed,
173            #[cfg(feature = "rand")]
174            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
175            headers: value.1.headers,
176            inner: value.0,
177        }
178    }
179}
180
181impl SharedWorkflowContext<'_> {
182    /// Get invocation id.
183    pub fn invocation_id(&self) -> &str {
184        &self.invocation_id
185    }
186
187    /// Get workflow key.
188    pub fn key(&self) -> &str {
189        &self.key
190    }
191
192    /// Get request headers.
193    pub fn headers(&self) -> &HeaderMap {
194        &self.headers
195    }
196
197    /// Get request headers.
198    pub fn headers_mut(&mut self) -> &HeaderMap {
199        &mut self.headers
200    }
201}
202
203/// Workflow handler context.
204pub struct WorkflowContext<'ctx> {
205    invocation_id: String,
206    key: String,
207    random_seed: u64,
208    #[cfg(feature = "rand")]
209    std_rng: rand::prelude::StdRng,
210    headers: HeaderMap,
211    pub(crate) inner: &'ctx ContextInternal,
212}
213
214impl<'ctx> From<(&'ctx ContextInternal, InputMetadata)> for WorkflowContext<'ctx> {
215    fn from(value: (&'ctx ContextInternal, InputMetadata)) -> Self {
216        Self {
217            invocation_id: value.1.invocation_id,
218            key: value.1.key,
219            random_seed: value.1.random_seed,
220            #[cfg(feature = "rand")]
221            std_rng: rand::prelude::SeedableRng::seed_from_u64(value.1.random_seed),
222            headers: value.1.headers,
223            inner: value.0,
224        }
225    }
226}
227
228impl WorkflowContext<'_> {
229    /// Get invocation id.
230    pub fn invocation_id(&self) -> &str {
231        &self.invocation_id
232    }
233
234    /// Get workflow key.
235    pub fn key(&self) -> &str {
236        &self.key
237    }
238
239    /// Get request headers.
240    pub fn headers(&self) -> &HeaderMap {
241        &self.headers
242    }
243
244    /// Get request headers.
245    pub fn headers_mut(&mut self) -> &HeaderMap {
246        &mut self.headers
247    }
248}
249
250///
251/// # Scheduling & Timers
252/// The Restate SDK includes durable timers.
253/// You can use these to let handlers sleep for a specified time, or to schedule a handler to be called at a later time.
254/// These timers are resilient to failures and restarts.
255/// Restate stores and keeps track of the timers and triggers them on time, even across failures and restarts.
256///
257/// ## Scheduling Async Tasks
258///
259/// To schedule a handler to be called at a later time, have a look at the documentation on [delayed calls](Request::send_after).
260///
261/// ## Durable sleep
262/// To sleep in a Restate application for ten seconds, do the following:
263///
264/// ```rust,no_run
265/// # use restate_sdk::prelude::*;
266/// # use std::convert::Infallible;
267/// # use std::time::Duration;
268/// #
269/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
270/// ctx.sleep(Duration::from_secs(10)).await?;
271/// #    Ok(())
272/// # }
273/// ```
274///
275/// **Cost savings on FaaS**:
276/// Restate suspends the handler while it is sleeping, to free up resources.
277/// This is beneficial for AWS Lambda deployments, since you don't pay for the time the handler is sleeping.
278///
279/// **Sleeping in Virtual Objects**:
280/// Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while sleeping.
281///
282/// <details>
283/// <summary>Clock synchronization Restate Server vs. SDK</summary>
284///
285/// The Restate SDK calculates the wake-up time based on the delay you specify.
286/// The Restate Server then uses this calculated time to wake up the handler.
287/// If the Restate Server and the SDK have different system clocks, the sleep duration might not be accurate.
288/// So make sure that the system clock of the Restate Server and the SDK have the same timezone and are synchronized.
289/// </details>
290pub trait ContextTimers<'ctx>: private::SealedContext<'ctx> {
291    /// Sleep using Restate
292    fn sleep(
293        &self,
294        duration: Duration,
295    ) -> impl DurableFuture<Output = Result<(), TerminalError>> + 'ctx {
296        private::SealedContext::inner_context(self).sleep(duration)
297    }
298}
299
300impl<'ctx, CTX: private::SealedContext<'ctx>> ContextTimers<'ctx> for CTX {}
301
302/// # Service Communication
303///
304/// A handler can call another handler and wait for the response (request-response), or it can send a message without waiting for the response.
305///
306/// ## Request-response calls
307///
308/// Request-response calls are requests where the client waits for the response.
309///
310/// You can do request-response calls to Services, Virtual Objects, and Workflows, in the following way:
311///
312/// ```rust,no_run
313/// # #[path = "../../examples/services/mod.rs"]
314/// # mod services;
315/// # use services::my_virtual_object::MyVirtualObjectClient;
316/// # use services::my_service::MyServiceClient;
317/// # use services::my_workflow::MyWorkflowClient;
318/// # use restate_sdk::prelude::*;
319/// #
320/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
321///     // To a Service:
322///     let service_response = ctx
323///         .service_client::<MyServiceClient>()
324///         .my_handler(String::from("Hi!"))
325///         .call()
326///         .await?;
327///
328///     // To a Virtual Object:
329///     let object_response = ctx
330///         .object_client::<MyVirtualObjectClient>("Mary")
331///         .my_handler(String::from("Hi!"))
332///         .call()
333///         .await?;
334///
335///     // To a Workflow:
336///     let workflow_result = ctx
337///         .workflow_client::<MyWorkflowClient>("my-workflow-id")
338///         .run(String::from("Hi!"))
339///         .call()
340///         .await?;
341///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
342///         .interact_with_workflow()
343///         .call()
344///         .await?;
345///  #    Ok(())
346///  # }
347/// ```
348///
349/// 1. **Create a service client**.
350///     - For Virtual Objects, you also need to supply the key of the Virtual Object you want to call, here `"Mary"`.
351///     - For Workflows, you need to supply a workflow ID that is unique per workflow execution.
352/// 2. **Specify the handler** you want to call and supply the request.
353/// 3. **Await** the call to retrieve the response.
354///
355/// **No need for manual retry logic**:
356/// Restate proxies all the calls and logs them in the journal.
357/// In case of failures, Restate takes care of retries, so you don't need to implement this yourself here.
358///
359/// ## Sending messages
360///
361/// Handlers can send messages (a.k.a. one-way calls, or fire-and-forget calls), as follows:
362///
363/// ```rust,no_run
364/// # #[path = "../../examples/services/mod.rs"]
365/// # mod services;
366/// # use services::my_virtual_object::MyVirtualObjectClient;
367/// # use services::my_service::MyServiceClient;
368/// # use services::my_workflow::MyWorkflowClient;
369/// # use restate_sdk::prelude::*;
370/// #
371/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
372///     // To a Service:
373///     ctx.service_client::<MyServiceClient>()
374///         .my_handler(String::from("Hi!"))
375///         .send();
376///
377///     // To a Virtual Object:
378///     ctx.object_client::<MyVirtualObjectClient>("Mary")
379///         .my_handler(String::from("Hi!"))
380///         .send();
381///
382///     // To a Workflow:
383///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
384///         .run(String::from("Hi!"))
385///         .send();
386///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
387///         .interact_with_workflow()
388///         .send();
389///  #    Ok(())
390///  # }
391/// ```
392///
393/// **No need for message queues**:
394/// Without Restate, you would usually put a message queue in between the two services, to guarantee the message delivery.
395/// Restate eliminates the need for a message queue because Restate durably logs the request and makes sure it gets executed.
396///
397/// ## Delayed calls
398///
399/// A delayed call is a one-way call that gets executed after a specified delay.
400///
401/// To schedule a delayed call, send a message with a delay parameter, as follows:
402///
403/// ```rust,no_run
404/// # #[path = "../../examples/services/mod.rs"]
405/// # mod services;
406/// # use services::my_virtual_object::MyVirtualObjectClient;
407/// # use services::my_service::MyServiceClient;
408/// # use services::my_workflow::MyWorkflowClient;
409/// # use restate_sdk::prelude::*;
410/// # use std::time::Duration;
411/// #
412/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
413///     // To a Service:
414///     ctx.service_client::<MyServiceClient>()
415///         .my_handler(String::from("Hi!"))
416///         .send_after(Duration::from_millis(5000));
417///
418///     // To a Virtual Object:
419///     ctx.object_client::<MyVirtualObjectClient>("Mary")
420///         .my_handler(String::from("Hi!"))
421///         .send_after(Duration::from_millis(5000));
422///
423///     // To a Workflow:
424///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
425///         .run(String::from("Hi!"))
426///         .send_after(Duration::from_millis(5000));
427///     ctx.workflow_client::<MyWorkflowClient>("my-workflow-id")
428///         .interact_with_workflow()
429///         .send_after(Duration::from_millis(5000));
430///  #    Ok(())
431///  # }
432/// ```
433///
434/// You can also use this functionality to schedule async tasks.
435/// Restate will make sure the task gets executed at the desired time.
436///
437/// ### Ordering guarantees in Virtual Objects
438/// Invocations to a Virtual Object are executed serially.
439/// Invocations will execute in the same order in which they arrive at Restate.
440/// For example, assume a handler calls the same Virtual Object twice:
441///
442/// ```rust,no_run
443/// # #[path = "../../examples/services/my_virtual_object.rs"]
444/// # mod my_virtual_object;
445/// # use my_virtual_object::MyVirtualObjectClient;
446/// # use restate_sdk::prelude::*;
447/// # async fn greet(ctx: Context<'_>, greeting: String) -> Result<(), HandlerError> {
448///     ctx.object_client::<MyVirtualObjectClient>("Mary")
449///         .my_handler(String::from("I'm call A!"))
450///         .send();
451///     ctx.object_client::<MyVirtualObjectClient>("Mary")
452///         .my_handler(String::from("I'm call B!"))
453///         .send();
454/// #    Ok(())
/// # }
456/// ```
457///
458/// It is guaranteed that call A will execute before call B.
459/// It is not guaranteed though that call B will be executed immediately after call A, as invocations coming from other handlers/sources, could interleave these two calls.
460///
461/// ### Deadlocks with Virtual Objects
462/// Request-response calls to Virtual Objects can lead to deadlocks, in which the Virtual Object remains locked and can't process any more requests.
463/// Some example cases:
464/// - Cross deadlock between Virtual Object A and B: A calls B, and B calls A, both using same keys.
465/// - Cyclical deadlock: A calls B, and B calls C, and C calls A again.
466///
467/// In this situation, you can use the CLI to unblock the Virtual Object manually by [cancelling invocations](https://docs.restate.dev/operate/invocation#cancelling-invocations).
468///
469pub trait ContextClient<'ctx>: private::SealedContext<'ctx> {
470    /// Create a [`Request`].
471    fn request<Req, Res>(
472        &self,
473        request_target: RequestTarget,
474        req: Req,
475    ) -> Request<'ctx, Req, Res> {
476        Request::new(self.inner_context(), request_target, req)
477    }
478
479    /// Create an [`InvocationHandle`] from an invocation id.
480    fn invocation_handle(&self, invocation_id: String) -> impl InvocationHandle + 'ctx {
481        self.inner_context().invocation_handle(invocation_id)
482    }
483
484    /// Create a service client. The service client is generated by the [`restate_sdk_macros::service`] macro with the same name of the trait suffixed with `Client`.
485    ///
486    /// ```rust,no_run
487    /// # use std::time::Duration;
488    /// # use restate_sdk::prelude::*;
489    ///
490    /// #[restate_sdk::service]
491    /// trait MyService {
492    ///   async fn handle() -> HandlerResult<()>;
493    /// }
494    ///
495    /// # async fn handler(ctx: Context<'_>) {
496    /// let client = ctx.service_client::<MyServiceClient>();
497    ///
498    /// // Do request
499    /// let result = client.handle().call().await;
500    ///
501    /// // Just send the request, don't wait the response
502    /// client.handle().send();
503    ///
504    /// // Schedule the request to be executed later
505    /// client.handle().send_after(Duration::from_secs(60));
506    /// # }
507    /// ```
508    fn service_client<C>(&self) -> C
509    where
510        C: IntoServiceClient<'ctx>,
511    {
512        C::create_client(self.inner_context())
513    }
514
515    /// Create an object client. The object client is generated by the [`restate_sdk_macros::object`] macro with the same name of the trait suffixed with `Client`.
516    ///
517    /// ```rust,no_run
518    /// # use std::time::Duration;
519    /// # use restate_sdk::prelude::*;
520    ///
521    /// #[restate_sdk::object]
522    /// trait MyObject {
523    ///   async fn handle() -> HandlerResult<()>;
524    /// }
525    ///
526    /// # async fn handler(ctx: Context<'_>) {
527    /// let client = ctx.object_client::<MyObjectClient>("my-key");
528    ///
529    /// // Do request
530    /// let result = client.handle().call().await;
531    ///
532    /// // Just send the request, don't wait the response
533    /// client.handle().send();
534    ///
535    /// // Schedule the request to be executed later
536    /// client.handle().send_after(Duration::from_secs(60));
537    /// # }
538    /// ```
539    fn object_client<C>(&self, key: impl Into<String>) -> C
540    where
541        C: IntoObjectClient<'ctx>,
542    {
543        C::create_client(self.inner_context(), key.into())
544    }
545
546    /// Create an workflow client. The workflow client is generated by the [`restate_sdk_macros::workflow`] macro with the same name of the trait suffixed with `Client`.
547    ///
548    /// ```rust,no_run
549    /// # use std::time::Duration;
550    /// # use restate_sdk::prelude::*;
551    ///
552    /// #[restate_sdk::workflow]
553    /// trait MyWorkflow {
554    ///   async fn handle() -> HandlerResult<()>;
555    /// }
556    ///
557    /// # async fn handler(ctx: Context<'_>) {
558    /// let client = ctx.workflow_client::<MyWorkflowClient>("my-key");
559    ///
560    /// // Do request
561    /// let result = client.handle().call().await;
562    ///
563    /// // Just send the request, don't wait the response
564    /// client.handle().send();
565    ///
566    /// // Schedule the request to be executed later
567    /// client.handle().send_after(Duration::from_secs(60));
568    /// # }
569    /// ```
570    fn workflow_client<C>(&self, key: impl Into<String>) -> C
571    where
572        C: IntoWorkflowClient<'ctx>,
573    {
574        C::create_client(self.inner_context(), key.into())
575    }
576}
577
578/// Trait used by codegen to create the service client.
579pub trait IntoServiceClient<'ctx>: Sized {
580    fn create_client(ctx: &'ctx ContextInternal) -> Self;
581}
582
583/// Trait used by codegen to use the object client.
584pub trait IntoObjectClient<'ctx>: Sized {
585    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
586}
587
588/// Trait used by codegen to use the workflow client.
589pub trait IntoWorkflowClient<'ctx>: Sized {
590    fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self;
591}
592
593impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {}
594
595/// # Awakeables
596///
597/// Awakeables pause an invocation while waiting for another process to complete a task.
598/// You can use this pattern to let a handler execute a task somewhere else and retrieve the result.
599/// This pattern is also known as the callback (task token) pattern.
600///
601/// ## Creating awakeables
602///
603/// 1. **Create an awakeable**. This contains a String identifier and a Promise.
604/// 2. **Trigger a task/process** and attach the awakeable ID (e.g., over Kafka, via HTTP, ...).
605///    For example, send an HTTP request to a service that executes the task, and attach the ID to the payload.
606///    You use `ctx.run` to avoid re-triggering the task on retries.
607/// 3. **Wait** until the other process has executed the task.
608///    The handler **receives the payload and resumes**.
609///
610/// ```rust,no_run
611/// # use restate_sdk::prelude::*;
612/// #
613/// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
614/// /// 1. Create an awakeable
615/// let (id, promise) = ctx.awakeable::<String>();
616///
617/// /// 2. Trigger a task
618/// ctx.run(|| trigger_task_and_deliver_id(id.clone())).await?;
619///
620/// /// 3. Wait for the promise to be resolved
621/// let payload = promise.await?;
622/// # Ok(())
623/// # }
624/// # async fn trigger_task_and_deliver_id(awakeable_id: String) -> Result<(), HandlerError>{
625/// #    Ok(())
626/// # }
627/// ```
628///
629///
630/// ## Completing awakeables
631///
632/// The external process completes the awakeable by either resolving it with an optional payload or by rejecting it
633/// with its ID and a reason for the failure. This throws [a terminal error][crate::errors] in the waiting handler.
634///
635/// - Resolving over HTTP with its ID and an optional payload:
636///
637/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/resolve \
///     -H 'content-type: application/json' \
///     -d '{"hello": "world"}'
641/// ```
642///
643/// - Rejecting over HTTP with its ID and a reason:
644///
645/// ```text
/// curl localhost:8080/restate/awakeables/prom_1PePOqp/reject \
647///     -H 'content-type: text/plain' \
648///     -d 'Very bad error!'
649/// ```
650///
651/// - Resolving via the SDK with its ID and an optional payload, or rejecting with its ID and a reason:
652///
653/// ```rust,no_run
654/// # use restate_sdk::prelude::*;
655/// #
656/// # async fn handle(ctx: Context<'_>, id: String) -> Result<(), HandlerError> {
657/// // Resolve the awakeable
658/// ctx.resolve_awakeable(&id, "hello".to_string());
659///
660/// // Or reject the awakeable
661/// ctx.reject_awakeable(&id, TerminalError::new("my error reason"));
662/// # Ok(())
663/// # }
664/// ```
665///
666/// For more info about serialization of the payload, see [crate::serde]
667///
668/// When running on Function-as-a-Service platforms, such as AWS Lambda, Restate suspends the handler while waiting for the awakeable to be completed.
669/// Since you only pay for the time that the handler is actually running, you don't pay while waiting for the external process to return.
670///
671/// **Be aware**: Virtual Objects only process a single invocation at a time, so the Virtual Object will be blocked while waiting on the awakeable to be resolved.
672pub trait ContextAwakeables<'ctx>: private::SealedContext<'ctx> {
673    /// Create an awakeable
674    fn awakeable<T: Deserialize + 'static>(
675        &self,
676    ) -> (
677        String,
678        impl DurableFuture<Output = Result<T, TerminalError>> + Send + 'ctx,
679    ) {
680        self.inner_context().awakeable()
681    }
682
683    /// Resolve an awakeable
684    fn resolve_awakeable<T: Serialize + 'static>(&self, key: &str, t: T) {
685        self.inner_context().resolve_awakeable(key, t)
686    }
687
688    /// Resolve an awakeable
689    fn reject_awakeable(&self, key: &str, failure: TerminalError) {
690        self.inner_context().reject_awakeable(key, failure)
691    }
692}
693
694impl<'ctx, CTX: private::SealedContext<'ctx>> ContextAwakeables<'ctx> for CTX {}
695
696/// # Journaling Results
697///
698/// Restate uses an execution log for replay after failures and suspensions.
699/// This means that non-deterministic results (e.g. database responses, UUID generation) need to be stored in the execution log.
700/// The SDK offers some functionalities to help you with this:
701/// 1. **[Journaled actions][crate::context::ContextSideEffects::run]**: Run any block of code and store the result in Restate. Restate replays the result instead of re-executing the block on retries.
702/// 2. **[UUID generator][crate::context::ContextSideEffects::rand_uuid]**: Built-in helpers for generating stable UUIDs. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
703/// 3. **[Random generator][crate::context::ContextSideEffects::rand]**: Built-in helpers for generating randoms. Restate seeds the random number generator with the invocation ID, so it always returns the same value on retries.
704///
705pub trait ContextSideEffects<'ctx>: private::SealedContext<'ctx> {
706    /// ## Journaled actions
707    /// You can store the result of a (non-deterministic) operation in the Restate execution log (e.g. database requests, HTTP calls, etc).
708    /// Restate replays the result instead of re-executing the operation on retries.
709    ///
710    /// Here is an example of a database request for which the string response is stored in Restate:
711    /// ```rust,no_run
712    /// # use restate_sdk::prelude::*;
713    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
714    /// let response = ctx.run(|| do_db_request()).await?;
715    /// # Ok(())
716    /// # }
717    /// # async fn do_db_request() -> Result<String, HandlerError>{
718    /// # Ok("Hello".to_string())
719    /// # }
720    /// ```
721    ///
722    /// You cannot use the Restate context within `ctx.run`.
723    /// This includes actions such as getting state, calling another service, and nesting other journaled actions.
724    ///
725    /// For more info about serialization of the return values, see [crate::serde].
726    ///
727    /// You can configure the retry policy for the `ctx.run` block:
728    /// ```rust,no_run
729    /// # use std::time::Duration;
730    /// # use restate_sdk::prelude::*;
731    /// # async fn handle(ctx: Context<'_>) -> Result<(), HandlerError> {
732    /// let my_run_retry_policy = RunRetryPolicy::default()
733    ///     .initial_delay(Duration::from_millis(100))
734    ///     .exponentiation_factor(2.0)
735    ///     .max_delay(Duration::from_millis(1000))
736    ///     .max_attempts(10)
737    ///     .max_duration(Duration::from_secs(10));
738    /// ctx.run(|| write_to_other_system())
739    ///     .retry_policy(my_run_retry_policy)
740    ///     .await
741    ///     .map_err(|e| {
742    ///         // Handle the terminal error after retries exhausted
743    ///         // For example, undo previous actions (see sagas guide) and
744    ///         // propagate the error back to the caller
745    ///         e
746    ///      })?;
747    /// # Ok(())
748    /// # }
749    /// # async fn write_to_other_system() -> Result<String, HandlerError>{
750    /// # Ok("Hello".to_string())
751    /// # }
752    /// ```
753    ///
754    /// This way you can override the default retry behavior of your Restate service for specific operations.
755    /// Have a look at [`RunFuture::retry_policy`] for more information.
756    ///
757    /// If you set a maximum number of attempts, then the `ctx.run` block will fail with a [TerminalError] once the retries are exhausted.
758    /// Have a look at the [Sagas guide](https://docs.restate.dev/guides/sagas) to learn how to undo previous actions of the handler to keep the system in a consistent state.
759    ///
760    /// **Caution: Immediately await journaled actions:**
761    /// Always immediately await `ctx.run`, before doing any other context calls.
762    /// If not, you might bump into non-determinism errors during replay,
763    /// because the journaled result can get interleaved with the other context calls in the journal in a non-deterministic way.
764    ///
765    #[must_use]
766    fn run<R, F, T>(&self, run_closure: R) -> impl RunFuture<Result<T, TerminalError>> + 'ctx
767    where
768        R: RunClosure<Fut = F, Output = T> + Send + 'ctx,
769        F: Future<Output = HandlerResult<T>> + Send + 'ctx,
770        T: Serialize + Deserialize + 'static,
771    {
772        self.inner_context().run(run_closure)
773    }
774
775    /// Return a random seed inherently predictable, based on the invocation id, which is not secret.
776    ///
777    /// This value is stable during the invocation lifecycle, thus across retries.
778    fn random_seed(&self) -> u64 {
779        private::SealedContext::random_seed(self)
780    }
781
782    /// ### Generating random numbers
783    ///
784    /// Return a [`rand::RngExt`] instance inherently predictable, seeded with [`ContextSideEffects::random_seed`].
785    ///
786    /// For example, you can use this to generate a random number:
787    ///
788    /// ```rust,no_run
789    /// # use restate_sdk::prelude::*;
790    /// # use rand::RngExt;
791    /// async fn rand_generate(mut ctx: Context<'_>) {
792    /// let x: u32 = ctx.rand().random();
793    /// # }
794    /// ```
795    ///
796    /// This instance is useful to generate identifiers, idempotency keys, and for uniform sampling from a set of options.
797    /// If a cryptographically secure value is needed, please generate that externally using [`ContextSideEffects::run`].
798    #[cfg(feature = "rand")]
799    fn rand(&mut self) -> &mut rand::prelude::StdRng {
800        private::SealedContext::rand(self)
801    }
802    /// ### Generating UUIDs
803    ///
804    /// Returns a random [`uuid::Uuid`], generated using [`ContextSideEffects::rand`].
805    ///
806    /// You can use these UUIDs to generate stable idempotency keys, to deduplicate operations. For example, you can use this to let a payment service avoid duplicate payments during retries.
807    ///
808    /// Do not use this in cryptographic contexts.
809    ///
810    /// ```rust,no_run
811    /// # use restate_sdk::prelude::*;
812    /// # use uuid::Uuid;
813    /// # async fn uuid_generate(mut ctx: Context<'_>) {
814    /// let uuid: Uuid = ctx.rand_uuid();
815    /// # }
816    /// ```
817    #[cfg(all(feature = "rand", feature = "uuid"))]
818    fn rand_uuid(&mut self) -> uuid::Uuid {
819        let rand = private::SealedContext::rand(self);
820        uuid::Uuid::from_u64_pair(rand::Rng::next_u64(rand), rand::Rng::next_u64(rand))
821    }
822}
823
824impl<'ctx, CTX: private::SealedContext<'ctx>> ContextSideEffects<'ctx> for CTX {}
825
826/// # Reading state
827/// You can store key-value state in Restate.
828/// Restate makes sure the state is consistent with the processing of the code execution.
829///
830/// **This feature is only available for Virtual Objects and Workflows:**
831/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
832/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
833///
834/// ```rust,no_run
835/// # use restate_sdk::prelude::*;
836/// #
837/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
838///     /// 1. Listing state keys
839///     /// List all the state keys that have entries in the state store for this object, via:
840///     let keys = ctx.get_keys().await?;
841///
842///     /// 2. Getting a state value
843///     let my_string = ctx
844///         .get::<String>("my-key")
845///         .await?
846///         .unwrap_or(String::from("my-default"));
847///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
848///
849///     /// 3. Setting a state value
850///     ctx.set("my-key", String::from("my-value"));
851///
852///     /// 4. Clearing a state value
853///     ctx.clear("my-key");
854///
855///     /// 5. Clearing all state values
856///     /// Deletes all the state in Restate for the object
857///     ctx.clear_all();
858/// #    Ok(())
859/// # }
860/// ```
861///
862/// For more info about serialization, see [crate::serde]
863///
864/// ### Command-line introspection
865/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
866/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
867///
868pub trait ContextReadState<'ctx>: private::SealedContext<'ctx> {
869    /// Get state
870    fn get<T: Deserialize + 'static>(
871        &self,
872        key: &'ctx str,
873    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
874        self.inner_context().get(key)
875    }
876
877    /// Get state keys
878    fn get_keys(&'ctx self) -> impl Future<Output = Result<Vec<String>, TerminalError>> + 'ctx {
879        self.inner_context().get_keys()
880    }
881}
882
883impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanReadState> ContextReadState<'ctx>
884    for CTX
885{
886}
887
888/// # Writing State
889/// You can store key-value state in Restate.
890/// Restate makes sure the state is consistent with the processing of the code execution.
891///
892/// **This feature is only available for Virtual Objects and Workflows:**
893/// - For **Virtual Objects**, the state is isolated per Virtual Object and lives forever (across invocations for that object).
894/// - For **Workflows**, you can think of it as if every workflow execution is a new object. So the state is isolated to a single workflow execution. The state can only be mutated by the `run` handler of the workflow. The other handlers can only read the state.
895///
896/// ```rust,no_run
897/// # use restate_sdk::prelude::*;
898/// #
899/// # async fn my_handler(ctx: ObjectContext<'_>) -> Result<(), HandlerError> {
900///     /// 1. Listing state keys
901///     /// List all the state keys that have entries in the state store for this object, via:
902///     let keys = ctx.get_keys().await?;
903///
904///     /// 2. Getting a state value
905///     let my_string = ctx
906///         .get::<String>("my-key")
907///         .await?
908///         .unwrap_or(String::from("my-default"));
909///     let my_number = ctx.get::<f64>("my-number-key").await?.unwrap_or_default();
910///
911///     /// 3. Setting a state value
912///     ctx.set("my-key", String::from("my-value"));
913///
914///     /// 4. Clearing a state value
915///     ctx.clear("my-key");
916///
917///     /// 5. Clearing all state values
918///     /// Deletes all the state in Restate for the object
919///     ctx.clear_all();
920/// #    Ok(())
921/// # }
922/// ```
923///
924/// For more info about serialization, see [crate::serde]
925///
926/// ## Command-line introspection
927/// You can inspect and edit the K/V state stored in Restate via `psql` and the CLI.
928/// Have a look at the [introspection docs](https://docs.restate.dev//operate/introspection#inspecting-application-state) for more information.
929///
930pub trait ContextWriteState<'ctx>: private::SealedContext<'ctx> {
931    /// Set state
932    fn set<T: Serialize + 'static>(&self, key: &str, t: T) {
933        self.inner_context().set(key, t)
934    }
935
936    /// Clear state
937    fn clear(&self, key: &str) {
938        self.inner_context().clear(key)
939    }
940
941    /// Clear all state
942    fn clear_all(&self) {
943        self.inner_context().clear_all()
944    }
945}
946
947impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanWriteState> ContextWriteState<'ctx>
948    for CTX
949{
950}
951
952/// Trait exposing Restate promise functionalities.
953///
954/// A promise is a durable, distributed version of a Rust oneshot channel.
955/// Restate keeps track of the promises across restarts/failures.
956///
957/// You can use this feature to implement interaction between different workflow handlers, e.g. to
958/// send a signal from a shared handler to the workflow handler.
959pub trait ContextPromises<'ctx>: private::SealedContext<'ctx> {
960    /// Create a promise
961    fn promise<T: Deserialize + 'static>(
962        &'ctx self,
963        key: &'ctx str,
964    ) -> impl DurableFuture<Output = Result<T, TerminalError>> + 'ctx {
965        self.inner_context().promise(key)
966    }
967
968    /// Peek a promise
969    fn peek_promise<T: Deserialize + 'static>(
970        &self,
971        key: &'ctx str,
972    ) -> impl Future<Output = Result<Option<T>, TerminalError>> + 'ctx {
973        self.inner_context().peek_promise(key)
974    }
975
976    /// Resolve a promise
977    fn resolve_promise<T: Serialize + 'static>(&self, key: &str, t: T) {
978        self.inner_context().resolve_promise(key, t)
979    }
980
981    /// Resolve a promise
982    fn reject_promise(&self, key: &str, failure: TerminalError) {
983        self.inner_context().reject_promise(key, failure)
984    }
985}
986
987impl<'ctx, CTX: private::SealedContext<'ctx> + private::SealedCanUsePromises> ContextPromises<'ctx>
988    for CTX
989{
990}
991
992pub trait DurableFuture: Future + macro_support::SealedDurableFuture {}
993
994pub(crate) mod private {
995    use super::*;
996    pub trait SealedContext<'ctx> {
997        fn inner_context(&self) -> &'ctx ContextInternal;
998
999        fn random_seed(&self) -> u64;
1000
1001        #[cfg(feature = "rand")]
1002        fn rand(&mut self) -> &mut rand::prelude::StdRng;
1003    }
1004
1005    // Context capabilities
1006    pub trait SealedCanReadState {}
1007    pub trait SealedCanWriteState {}
1008    pub trait SealedCanUsePromises {}
1009
1010    impl<'ctx> SealedContext<'ctx> for Context<'ctx> {
1011        fn inner_context(&self) -> &'ctx ContextInternal {
1012            self.inner
1013        }
1014
1015        fn random_seed(&self) -> u64 {
1016            self.random_seed
1017        }
1018
1019        #[cfg(feature = "rand")]
1020        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1021            &mut self.std_rng
1022        }
1023    }
1024
1025    impl<'ctx> SealedContext<'ctx> for SharedObjectContext<'ctx> {
1026        fn inner_context(&self) -> &'ctx ContextInternal {
1027            self.inner
1028        }
1029
1030        fn random_seed(&self) -> u64 {
1031            self.random_seed
1032        }
1033
1034        #[cfg(feature = "rand")]
1035        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1036            &mut self.std_rng
1037        }
1038    }
1039
1040    impl SealedCanReadState for SharedObjectContext<'_> {}
1041
1042    impl<'ctx> SealedContext<'ctx> for ObjectContext<'ctx> {
1043        fn inner_context(&self) -> &'ctx ContextInternal {
1044            self.inner
1045        }
1046
1047        fn random_seed(&self) -> u64 {
1048            self.random_seed
1049        }
1050
1051        #[cfg(feature = "rand")]
1052        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1053            &mut self.std_rng
1054        }
1055    }
1056
1057    impl SealedCanReadState for ObjectContext<'_> {}
1058    impl SealedCanWriteState for ObjectContext<'_> {}
1059
1060    impl<'ctx> SealedContext<'ctx> for SharedWorkflowContext<'ctx> {
1061        fn inner_context(&self) -> &'ctx ContextInternal {
1062            self.inner
1063        }
1064
1065        fn random_seed(&self) -> u64 {
1066            self.random_seed
1067        }
1068
1069        #[cfg(feature = "rand")]
1070        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1071            &mut self.std_rng
1072        }
1073    }
1074
1075    impl SealedCanReadState for SharedWorkflowContext<'_> {}
1076    impl SealedCanUsePromises for SharedWorkflowContext<'_> {}
1077
1078    impl<'ctx> SealedContext<'ctx> for WorkflowContext<'ctx> {
1079        fn inner_context(&self) -> &'ctx ContextInternal {
1080            self.inner
1081        }
1082
1083        fn random_seed(&self) -> u64 {
1084            self.random_seed
1085        }
1086
1087        #[cfg(feature = "rand")]
1088        fn rand(&mut self) -> &mut rand::prelude::StdRng {
1089            &mut self.std_rng
1090        }
1091    }
1092
1093    impl SealedCanReadState for WorkflowContext<'_> {}
1094    impl SealedCanWriteState for WorkflowContext<'_> {}
1095    impl SealedCanUsePromises for WorkflowContext<'_> {}
1096}