cognis_core/compose/
lambda.rs1use std::future::Future;
4use std::marker::PhantomData;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9
10use crate::runnable::{Runnable, RunnableConfig};
11use crate::Result;
12
13type LambdaFn<I, O> =
14 dyn Fn(I, RunnableConfig) -> Pin<Box<dyn Future<Output = Result<O>> + Send>> + Send + Sync;
15
16pub struct Lambda<I, O> {
18 func: Arc<LambdaFn<I, O>>,
19 name: &'static str,
20 _phantom: PhantomData<fn(I) -> O>,
21}
22
23impl<I, O> Clone for Lambda<I, O> {
24 fn clone(&self) -> Self {
25 Self {
26 func: self.func.clone(),
27 name: self.name,
28 _phantom: PhantomData,
29 }
30 }
31}
32
33impl<I, O> Lambda<I, O>
34where
35 I: Send + 'static,
36 O: Send + 'static,
37{
38 pub fn from_async<F, Fut>(f: F) -> Self
40 where
41 F: Fn(I) -> Fut + Send + Sync + 'static,
42 Fut: Future<Output = Result<O>> + Send + 'static,
43 {
44 Self {
45 func: Arc::new(move |i, _cfg| {
46 Box::pin(f(i)) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
47 }),
48 name: "Lambda",
49 _phantom: PhantomData,
50 }
51 }
52
53 pub fn from_async_with_config<F, Fut>(f: F) -> Self
55 where
56 F: Fn(I, RunnableConfig) -> Fut + Send + Sync + 'static,
57 Fut: Future<Output = Result<O>> + Send + 'static,
58 {
59 Self {
60 func: Arc::new(move |i, c| {
61 Box::pin(f(i, c)) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
62 }),
63 name: "Lambda",
64 _phantom: PhantomData,
65 }
66 }
67
68 pub fn from_sync<F>(f: F) -> Self
70 where
71 F: Fn(I) -> Result<O> + Send + Sync + 'static,
72 {
73 Self {
74 func: Arc::new(move |i, _cfg| {
75 let result = f(i);
76 Box::pin(async move { result }) as Pin<Box<dyn Future<Output = Result<O>> + Send>>
77 }),
78 name: "Lambda",
79 _phantom: PhantomData,
80 }
81 }
82
83 pub fn with_name(mut self, name: &'static str) -> Self {
85 self.name = name;
86 self
87 }
88}
89
90#[async_trait]
91impl<I, O> Runnable<I, O> for Lambda<I, O>
92where
93 I: Send + 'static,
94 O: Send + 'static,
95{
96 async fn invoke(&self, input: I, config: RunnableConfig) -> Result<O> {
97 (self.func)(input, config).await
98 }
99
100 fn name(&self) -> &str {
101 self.name
102 }
103}
104
105pub fn lambda<F, Fut, I, O>(f: F) -> Lambda<I, O>
107where
108 F: Fn(I) -> Fut + Send + Sync + 'static,
109 Fut: Future<Output = Result<O>> + Send + 'static,
110 I: Send + 'static,
111 O: Send + 'static,
112{
113 Lambda::from_async(f)
114}
115
#[cfg(test)]
mod tests {
    use super::*;

    /// An async closure wrapped via `lambda` yields its `Ok` value on `invoke`.
    #[tokio::test]
    async fn from_async_runs() {
        let add_one = lambda(|n: u32| async move { Ok(n + 1) });
        let out = add_one.invoke(2, RunnableConfig::default()).await.unwrap();
        assert_eq!(out, 3);
    }

    /// A synchronous closure wrapped via `from_sync` yields its `Ok` value on `invoke`.
    #[tokio::test]
    async fn from_sync_runs() {
        let doubler = Lambda::<u32, u32>::from_sync(|n| Ok(n * 2));
        let out = doubler.invoke(5, RunnableConfig::default()).await.unwrap();
        assert_eq!(out, 10);
    }

    /// `with_name` replaces the name reported by `name()`.
    #[tokio::test]
    async fn with_name_overrides() {
        let identity = lambda(|n: u32| async move { Ok(n) });
        let renamed = identity.with_name("my_lambda");
        assert_eq!(renamed.name(), "my_lambda");
    }
}