use std::sync::Arc;
use std::time::Duration;
use crate::compose::{Each, Pipe};
use crate::runnable::Runnable;
use crate::wrappers::{Cache, Fallback, MemoryCache, Retry, RetryPolicy, Timeout};
pub trait RunnableExt<I, O>: Runnable<I, O> + Sized
where
I: Send + 'static,
O: Send + 'static,
{
fn pipe<R2, O2>(self, next: R2) -> Pipe<Self, R2, I, O, O2>
where
R2: Runnable<O, O2>,
O2: Send + 'static,
{
Pipe::new(self, next)
}
fn with_retry(self, policy: RetryPolicy) -> Retry<Self, I, O>
where
I: Clone,
{
Retry::new(self, policy)
}
fn with_max_retries(self, attempts: u32) -> Retry<Self, I, O>
where
I: Clone,
{
Retry::new(self, RetryPolicy::new(attempts))
}
fn with_timeout(self, duration: Duration) -> Timeout<Self, I, O> {
Timeout::new(self, duration)
}
fn with_fallback<F>(self, fallback: F) -> Fallback<Self, F, I, O>
where
F: Runnable<I, O>,
I: Clone,
{
Fallback::new(self, fallback)
}
fn with_memory_cache<K, F>(self, key_fn: F) -> Cache<Self, I, O, K, MemoryCache<K, O>>
where
K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
O: Clone + Send + Sync + 'static,
F: Fn(&I) -> K + Send + Sync + 'static,
{
Cache::new(self, Arc::new(MemoryCache::new()), key_fn)
}
fn each(self) -> Each<Self, I, O> {
Each::new(self)
}
}
impl<R, I, O> RunnableExt<I, O> for R
where
R: Runnable<I, O>,
I: Send + 'static,
O: Send + 'static,
{
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runnable::RunnableConfig;
use crate::Result;
use async_trait::async_trait;
struct Inc;
#[async_trait]
impl Runnable<u32, u32> for Inc {
async fn invoke(&self, input: u32, _: RunnableConfig) -> Result<u32> {
Ok(input + 1)
}
}
struct Double;
#[async_trait]
impl Runnable<u32, u32> for Double {
async fn invoke(&self, input: u32, _: RunnableConfig) -> Result<u32> {
Ok(input * 2)
}
}
#[tokio::test]
async fn pipe_method_works() {
let chain = Inc.pipe(Double).pipe(Inc);
let out = chain.invoke(3, RunnableConfig::default()).await.unwrap();
assert_eq!(out, ((3 + 1) * 2) + 1);
}
#[tokio::test]
async fn each_works() {
let mapper = Inc.each();
let out = mapper
.invoke(vec![1, 2, 3], RunnableConfig::default())
.await
.unwrap();
assert_eq!(out, vec![2, 3, 4]);
}
}