cognis_core/compose/
each.rs1use std::marker::PhantomData;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::stream::{self, StreamExt};
8
9use crate::runnable::{Runnable, RunnableConfig};
10use crate::Result;
11
/// Adapter that lifts a single-item runnable to operate on a whole `Vec`.
///
/// `Each<R, I, O>` wraps an inner runnable `R` and, through its
/// `Runnable<Vec<I>, Vec<O>>` implementation, applies that runnable to every
/// element of the input vector.
pub struct Each<R, I, O> {
    /// The runnable applied to each individual element.
    inner: R,
    /// Marker tying the element input/output types to this wrapper; no `I` or
    /// `O` value is actually stored.
    _phantom: PhantomData<fn(I) -> O>,
}
20
21impl<R, I, O> Each<R, I, O>
22where
23 R: Runnable<I, O>,
24 I: Send + 'static,
25 O: Send + 'static,
26{
27 pub fn new(inner: R) -> Self {
29 Self {
30 inner,
31 _phantom: PhantomData,
32 }
33 }
34}
35
36#[async_trait]
37impl<R, I, O> Runnable<Vec<I>, Vec<O>> for Each<R, I, O>
38where
39 R: Runnable<I, O>,
40 I: Send + 'static,
41 O: Send + 'static,
42{
43 async fn invoke(&self, inputs: Vec<I>, config: RunnableConfig) -> Result<Vec<O>> {
44 let concurrency = config.max_concurrency.max(1);
45 let cfg = Arc::new(config);
46 stream::iter(inputs)
47 .map(|i| {
48 let cfg = cfg.clone();
49 async move {
50 self.inner
51 .invoke(i, RunnableConfig::clone_for_subcall(&cfg))
52 .await
53 }
54 })
55 .buffered(concurrency)
56 .collect::<Vec<_>>()
57 .await
58 .into_iter()
59 .collect()
60 }
61 fn name(&self) -> &str {
62 "Each"
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal per-element runnable used by the tests: returns its input plus one.
    struct Inc;

    #[async_trait]
    impl Runnable<u32, u32> for Inc {
        async fn invoke(&self, input: u32, _config: RunnableConfig) -> Result<u32> {
            Ok(input + 1)
        }
    }

    /// Every element is passed through the inner runnable and order is kept.
    #[tokio::test]
    async fn maps_each_element() {
        let each = Each::new(Inc);
        let config = RunnableConfig::default();

        let outputs = each.invoke(vec![1, 2, 3, 4], config).await.unwrap();

        assert_eq!(outputs, vec![2, 3, 4, 5]);
    }
}