Skip to main content

cognis_core/compose/
lambda.rs

//! Wrap a sync or async closure as a `Runnable<I, O>`.
2
3use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9
10use crate::runnable::{Runnable, RunnableConfig};
11use crate::Result;
12
13type LambdaFn<I, O> =
14    dyn Fn(I, RunnableConfig) -> Pin<Box<dyn Future<Output = Result<O>> + Send>> + Send + Sync;
15
16/// Wraps an async or sync closure so it can act as a `Runnable<I, O>`.
17pub struct Lambda<I, O> {
18    func: Arc<LambdaFn<I, O>>,
19    name: &'static str,
20    _phantom: PhantomData<fn(I) -> O>,
21}
22
23impl<I, O> Clone for Lambda<I, O> {
24    fn clone(&self) -> Self {
25        Self {
26            func: self.func.clone(),
27            name: self.name,
28            _phantom: PhantomData,
29        }
30    }
31}
32
33impl<I, O> Lambda<I, O>
34where
35    I: Send + 'static,
36    O: Send + 'static,
37{
38    /// Build from an async closure that ignores the config.
39    pub fn from_async<F, Fut>(f: F) -> Self
40    where
41        F: Fn(I) -> Fut + Send + Sync + 'static,
42        Fut: Future<Output = Result<O>> + Send + 'static,
43    {
44        Self {
45            func: Arc::new(move |i, _cfg| {
46                Box::pin(f(i)) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
47            }),
48            name: "Lambda",
49            _phantom: PhantomData,
50        }
51    }
52
53    /// Build from an async closure that uses the config.
54    pub fn from_async_with_config<F, Fut>(f: F) -> Self
55    where
56        F: Fn(I, RunnableConfig) -> Fut + Send + Sync + 'static,
57        Fut: Future<Output = Result<O>> + Send + 'static,
58    {
59        Self {
60            func: Arc::new(move |i, c| {
61                Box::pin(f(i, c)) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
62            }),
63            name: "Lambda",
64            _phantom: PhantomData,
65        }
66    }
67
68    /// Build from a sync closure.
69    pub fn from_sync<F>(f: F) -> Self
70    where
71        F: Fn(I) -> Result<O> + Send + Sync + 'static,
72    {
73        Self {
74            func: Arc::new(move |i, _cfg| {
75                let result = f(i);
76                Box::pin(async move { result }) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
77            }),
78            name: "Lambda",
79            _phantom: PhantomData,
80        }
81    }
82
83    /// Override the name reported via `Runnable::name`.
84    pub fn with_name(mut self, name: &'static str) -> Self {
85        self.name = name;
86        self
87    }
88}
89
90#[async_trait]
91impl<I, O> Runnable<I, O> for Lambda<I, O>
92where
93    I: Send + 'static,
94    O: Send + 'static,
95{
96    async fn invoke(&self, input: I, config: RunnableConfig) -> Result<O> {
97        (self.func)(input, config).await
98    }
99
100    fn name(&self) -> &str {
101        self.name
102    }
103}
104
105/// Convenience constructor — equivalent to [`Lambda::from_async`].
106pub fn lambda<F, Fut, I, O>(f: F) -> Lambda<I, O>
107where
108    F: Fn(I) -> Fut + Send + Sync + 'static,
109    Fut: Future<Output = Result<O>> + Send + 'static,
110    I: Send + 'static,
111    O: Send + 'static,
112{
113    Lambda::from_async(f)
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    /// An async closure wrapped via `lambda` produces its output on `invoke`.
    #[tokio::test]
    async fn from_async_runs() {
        let add_one = lambda(|n: u32| async move { Ok(n + 1) });
        let out = add_one.invoke(2, RunnableConfig::default()).await.unwrap();
        assert_eq!(out, 3);
    }

    /// A sync closure wrapped via `Lambda::from_sync` produces its output on `invoke`.
    #[tokio::test]
    async fn from_sync_runs() {
        let doubler = Lambda::<u32, u32>::from_sync(|n| Ok(n * 2));
        let out = doubler.invoke(5, RunnableConfig::default()).await.unwrap();
        assert_eq!(out, 10);
    }

    /// `with_name` changes what `Runnable::name` reports.
    #[tokio::test]
    async fn with_name_overrides() {
        let identity = lambda(|n: u32| async move { Ok(n) });
        let renamed = identity.with_name("my_lambda");
        assert_eq!(renamed.name(), "my_lambda");
    }
}