Skip to main content

entelix_runnable/
lambda.rs

//! Closure-backed `Runnable`. Lets users drop arbitrary async logic into a
//! pipeline without defining a new type.
3
4use std::sync::Arc;
5
6use entelix_core::{ExecutionContext, Result};
7use futures::future::BoxFuture;
8
9use crate::runnable::Runnable;
10
11type LambdaFn<I, O> = dyn Fn(I, ExecutionContext) -> BoxFuture<'static, Result<O>> + Send + Sync;
12
13/// `Runnable<I, O>` backed by a user-supplied async closure.
14///
15/// The closure receives the input and an owned `ExecutionContext` (cheaply
16/// `Clone`). It must return a future that is `Send + 'static` — common in
17/// practice when the closure body uses owned data and `tokio` primitives.
18pub struct RunnableLambda<I, O>
19where
20    I: Send + 'static,
21    O: Send + 'static,
22{
23    inner: Arc<LambdaFn<I, O>>,
24}
25
26impl<I, O> RunnableLambda<I, O>
27where
28    I: Send + 'static,
29    O: Send + 'static,
30{
31    /// Wrap an async closure into a `Runnable`.
32    ///
33    /// ```ignore
34    /// let double = RunnableLambda::new(|x: i32, _ctx| async move { Ok(x * 2) });
35    /// ```
36    pub fn new<F, Fut>(f: F) -> Self
37    where
38        F: Fn(I, ExecutionContext) -> Fut + Send + Sync + 'static,
39        Fut: core::future::Future<Output = Result<O>> + Send + 'static,
40    {
41        Self {
42            inner: Arc::new(move |input, ctx| Box::pin(f(input, ctx))),
43        }
44    }
45}
46
47impl<I, O> Clone for RunnableLambda<I, O>
48where
49    I: Send + 'static,
50    O: Send + 'static,
51{
52    fn clone(&self) -> Self {
53        Self {
54            inner: Arc::clone(&self.inner),
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl<I, O> Runnable<I, O> for RunnableLambda<I, O>
61where
62    I: Send + 'static,
63    O: Send + 'static,
64{
65    async fn invoke(&self, input: I, ctx: &ExecutionContext) -> Result<O> {
66        (self.inner)(input, ctx.clone()).await
67    }
68}