use std::future::Future;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::context::DurableContext;
use crate::error::DurableError;
use crate::types::{
BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
ParallelOptions, StepOptions,
};
/// Abstraction over the operations exposed by a durable execution context.
///
/// Code that should work with any context type can be written against
/// `C: DurableContextOps` instead of naming [`DurableContext`] directly.
///
/// The asynchronous operations return `impl Future<..> + Send` and most
/// methods are generic, so this trait is meant for static dispatch through
/// generics rather than as a `dyn` trait object.
///
/// Unless stated otherwise, a `DurableError` in the outermost `Result` is the
/// error channel of the context itself, distinct from any user-level error
/// type `E` carried inside.
pub trait DurableContextOps {
    /// Step operation identified by `name`, driven by the closure `f`.
    ///
    /// The returned future yields a nested result: the outer layer carries a
    /// `DurableError`, the inner `Result<T, E>` has the same shape as the
    /// output of the future produced by `f`.
    ///
    /// Both `T` and `E` must be serializable and deserializable.
    // NOTE(review): presumably so the outcome can be persisted and replayed —
    // confirm against the implementing type.
    fn step<T, E, F, Fut>(
        &mut self,
        name: &str,
        f: F,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static;

    /// Same shape as [`step`](Self::step), with an additional [`StepOptions`]
    /// value supplied by the caller.
    fn step_with_options<T, E, F, Fut>(
        &mut self,
        name: &str,
        options: StepOptions,
        f: F,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static;

    /// Wait operation identified by `name`.
    ///
    /// `duration_secs` is a signed 32-bit value.
    // NOTE(review): the parameter name suggests seconds; how negative or zero
    // values are treated is not visible here — confirm with the implementation.
    fn wait(
        &mut self,
        name: &str,
        duration_secs: i32,
    ) -> impl Future<Output = Result<(), DurableError>> + Send;

    /// Creates a callback identified by `name`, configured by `options`, and
    /// yields a [`CallbackHandle`] on success.
    ///
    /// The handle can later be passed to
    /// [`callback_result`](Self::callback_result).
    fn create_callback(
        &mut self,
        name: &str,
        options: CallbackOptions,
    ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send;

    /// Invoke operation identified by `name`, targeting `function_name` with
    /// a serializable `payload`.
    ///
    /// On success the future yields a value of the caller-chosen
    /// deserializable type `T`.
    fn invoke<T, P>(
        &mut self,
        name: &str,
        function_name: &str,
        payload: &P,
    ) -> impl Future<Output = Result<T, DurableError>> + Send
    where
        T: DeserializeOwned + Send,
        P: Serialize + Sync;

    /// Parallel operation identified by `name` over a list of `branches`.
    ///
    /// Every branch is a closure that receives an owned [`DurableContext`]
    /// and produces a future of `Result<T, DurableError>`. The combined
    /// outcome is reported as a [`BatchResult<T>`].
    // NOTE(review): scheduling/concurrency behaviour is governed by
    // `ParallelOptions` and the implementation; not visible from here.
    fn parallel<T, F, Fut>(
        &mut self,
        name: &str,
        branches: Vec<F>,
        options: ParallelOptions,
    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce(DurableContext) -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static;

    /// Child-context operation identified by `name`.
    ///
    /// The closure `f` receives an owned [`DurableContext`] and its result is
    /// what the returned future yields. Unlike most other operations, the
    /// bounds here do not require `'static`.
    fn child_context<T, F, Fut>(
        &mut self,
        name: &str,
        f: F,
    ) -> impl Future<Output = Result<T, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send,
        F: FnOnce(DurableContext) -> Fut + Send,
        Fut: Future<Output = Result<T, DurableError>> + Send;

    /// Map operation identified by `name` over `items`.
    ///
    /// `f` takes one item together with an owned [`DurableContext`]. It is
    /// `FnOnce + Clone`, which allows an implementation to clone it once per
    /// item. Outcomes are collected into a [`BatchResult<T>`].
    fn map<T, I, F, Fut>(
        &mut self,
        name: &str,
        items: Vec<I>,
        options: MapOptions,
        f: F,
    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        I: Send + 'static,
        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
        Fut: Future<Output = Result<T, E>> + Send + 'static;

    /// Step operation with an attached compensation closure.
    ///
    /// `forward_fn` has the same shape as the closure of
    /// [`step`](Self::step). `compensate_fn` accepts a value of the forward
    /// step's success type `T` and yields `Result<(), DurableError>`.
    // NOTE(review): presumably the compensation is recorded for a later
    // `run_compensations` call rather than run immediately — confirm.
    fn step_with_compensation<T, E, F, Fut, G, GFut>(
        &mut self,
        name: &str,
        forward_fn: F,
        compensate_fn: G,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
        G: FnOnce(T) -> GFut + Send + 'static,
        GFut: Future<Output = Result<(), DurableError>> + Send + 'static;

    /// Same shape as
    /// [`step_with_compensation`](Self::step_with_compensation), with an
    /// additional [`StepOptions`] value supplied by the caller.
    fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
        &mut self,
        name: &str,
        options: StepOptions,
        forward_fn: F,
        compensate_fn: G,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
        G: FnOnce(T) -> GFut + Send + 'static,
        GFut: Future<Output = Result<(), DurableError>> + Send + 'static;

    /// Runs compensations and reports the outcome as a
    /// [`CompensationResult`].
    fn run_compensations(
        &mut self,
    ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send;

    /// Synchronously obtains the result associated with `handle`,
    /// deserialized into the caller-chosen type `T`.
    fn callback_result<T>(&self, handle: &CallbackHandle) -> Result<T, DurableError>
    where
        T: DeserializeOwned;

    /// Returns the context's current [`ExecutionMode`].
    fn execution_mode(&self) -> ExecutionMode;

    /// Returns the context's replay flag.
    fn is_replaying(&self) -> bool;

    /// Returns the ARN string held by the context.
    fn arn(&self) -> &str;

    /// Returns the checkpoint token string held by the context.
    fn checkpoint_token(&self) -> &str;

    /// Emits a log message.
    fn log(&self, message: &str);

    /// Emits a log message accompanied by a JSON value.
    fn log_with_data(&self, message: &str, data: &serde_json::Value);

    /// Debug-level counterpart of [`log`](Self::log).
    fn log_debug(&self, message: &str);

    /// Warn-level counterpart of [`log`](Self::log).
    fn log_warn(&self, message: &str);

    /// Error-level counterpart of [`log`](Self::log).
    fn log_error(&self, message: &str);

    /// Debug-level counterpart of [`log_with_data`](Self::log_with_data).
    fn log_debug_with_data(&self, message: &str, data: &serde_json::Value);

    /// Warn-level counterpart of [`log_with_data`](Self::log_with_data).
    fn log_warn_with_data(&self, message: &str, data: &serde_json::Value);

    /// Error-level counterpart of [`log_with_data`](Self::log_with_data).
    fn log_error_with_data(&self, message: &str, data: &serde_json::Value);

    /// Switches the context into batch mode.
    // NOTE(review): what exactly gets batched is defined by the implementing
    // type; see also `flush_batch`.
    fn enable_batch_mode(&mut self);

    /// Flushes the current batch; the future resolves to `Ok(())` on success.
    fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send;
}
// Pure delegation layer: every trait method hands its arguments, unchanged and
// in the same order, to the associated function of the same name on
// `DurableContext` (`Self` below).
//
// The calls are deliberately written in path form (`Self::name(self, ..)`)
// rather than method-call form (`self.name(..)`). In path form an inherent
// associated function takes precedence over a trait method with the same
// name, so these wrappers do not call themselves.
impl DurableContextOps for DurableContext {
    fn step<T, E, F, Fut>(
        &mut self,
        name: &str,
        f: F,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
    {
        Self::step(self, name, f)
    }

    fn step_with_options<T, E, F, Fut>(
        &mut self,
        name: &str,
        options: StepOptions,
        f: F,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
    {
        Self::step_with_options(self, name, options, f)
    }

    fn wait(
        &mut self,
        name: &str,
        duration_secs: i32,
    ) -> impl Future<Output = Result<(), DurableError>> + Send {
        Self::wait(self, name, duration_secs)
    }

    fn create_callback(
        &mut self,
        name: &str,
        options: CallbackOptions,
    ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
        Self::create_callback(self, name, options)
    }

    fn invoke<T, P>(
        &mut self,
        name: &str,
        function_name: &str,
        payload: &P,
    ) -> impl Future<Output = Result<T, DurableError>> + Send
    where
        T: DeserializeOwned + Send,
        P: Serialize + Sync,
    {
        Self::invoke(self, name, function_name, payload)
    }

    fn parallel<T, F, Fut>(
        &mut self,
        name: &str,
        branches: Vec<F>,
        options: ParallelOptions,
    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce(DurableContext) -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        Self::parallel(self, name, branches, options)
    }

    fn child_context<T, F, Fut>(
        &mut self,
        name: &str,
        f: F,
    ) -> impl Future<Output = Result<T, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send,
        F: FnOnce(DurableContext) -> Fut + Send,
        Fut: Future<Output = Result<T, DurableError>> + Send,
    {
        Self::child_context(self, name, f)
    }

    fn map<T, I, F, Fut>(
        &mut self,
        name: &str,
        items: Vec<I>,
        options: MapOptions,
        f: F,
    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        I: Send + 'static,
        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
    {
        Self::map(self, name, items, options, f)
    }

    fn step_with_compensation<T, E, F, Fut, G, GFut>(
        &mut self,
        name: &str,
        forward_fn: F,
        compensate_fn: G,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
        G: FnOnce(T) -> GFut + Send + 'static,
        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
    {
        Self::step_with_compensation(self, name, forward_fn, compensate_fn)
    }

    fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
        &mut self,
        name: &str,
        options: StepOptions,
        forward_fn: F,
        compensate_fn: G,
    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
    where
        T: Serialize + DeserializeOwned + Send + 'static,
        E: Serialize + DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<T, E>> + Send + 'static,
        G: FnOnce(T) -> GFut + Send + 'static,
        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
    {
        Self::step_with_compensation_opts(self, name, options, forward_fn, compensate_fn)
    }

    fn run_compensations(
        &mut self,
    ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
        Self::run_compensations(self)
    }

    fn callback_result<T: DeserializeOwned>(
        &self,
        handle: &CallbackHandle,
    ) -> Result<T, DurableError> {
        Self::callback_result(self, handle)
    }

    fn execution_mode(&self) -> ExecutionMode {
        Self::execution_mode(self)
    }

    fn is_replaying(&self) -> bool {
        Self::is_replaying(self)
    }

    fn arn(&self) -> &str {
        Self::arn(self)
    }

    fn checkpoint_token(&self) -> &str {
        Self::checkpoint_token(self)
    }

    // The logging and batch-mode wrappers keep a trailing `;` so the wrapper
    // returns `()` regardless of what the callee returns.

    fn log(&self, message: &str) {
        Self::log(self, message);
    }

    fn log_with_data(&self, message: &str, data: &serde_json::Value) {
        Self::log_with_data(self, message, data);
    }

    fn log_debug(&self, message: &str) {
        Self::log_debug(self, message);
    }

    fn log_warn(&self, message: &str) {
        Self::log_warn(self, message);
    }

    fn log_error(&self, message: &str) {
        Self::log_error(self, message);
    }

    fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
        Self::log_debug_with_data(self, message, data);
    }

    fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
        Self::log_warn_with_data(self, message, data);
    }

    fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
        Self::log_error_with_data(self, message, data);
    }

    fn enable_batch_mode(&mut self) {
        Self::enable_batch_mode(self);
    }

    fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
        Self::flush_batch(self)
    }
}