pipe_it/
ext.rs

1use crate::{Context, handler::Handler, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4
5/// Extension trait for any type that implements the Handler trait.
6/// Provides a way to convert a Handler into a Pipeline and combinators.
7pub trait HandlerExt<I, O, Args>: Handler<I, O, Args> {
8    fn pipe(self) -> Pipe<Self, Args>
9    where
10        Self: Sized,
11    {
12        Pipe {
13            p: self,
14            _marker: PhantomData,
15        }
16    }
17
18    /// Connects two pipelines together. Output of the first becomes the input of the second.
19    ///
20    /// # Example
21    ///
22    /// ```rust
23    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
24    ///
25    /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
26    /// async fn to_string(n: Input<i32>) -> String { n.to_string() }
27    /// 
28    /// # #[tokio::main]
29    /// # async fn main() {
30    /// let pipe = add_one.pipe().connect(to_string);
31    /// let result = pipe.apply(Context::empty(1)).await;
32    /// assert_eq!(result, "2");
33    /// # }
34    /// ```
35    fn connect<O2, G, Args2>(self, g: G) -> Connect<Pipe<Self, Args>, Pipe<G, Args2>, I, O, O2>
36    where
37        G: Handler<O, O2, Args2>,
38        Self: Sized,
39    {
40        Connect {
41            f: self.pipe(),
42            g: g.pipe(),
43            _marker: PhantomData,
44        }
45    }
46
47    /// Pulls back the domain of the pipeline.
48    /// This allows a pipeline defined on `I` to be used for input `I2` given a mapping `I2 -> I`.
49    ///
50    /// # Example
51    ///
52    /// ```rust
53    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
54    ///
55    /// #[derive(Clone)]
56    /// struct User { age: i32 }
57    /// async fn check_adult(age: Input<i32>) -> bool { *age >= 18 }
58    ///
59    /// # #[tokio::main]
60    /// # async fn main() {
61    /// //Create a pipeline that accepts `User` but processes `i32`
62    /// let pipe = check_adult.pipe().pullback(|u: User| u.age);
63    /// let result = pipe.apply(Context::empty(User { age: 20 })).await;
64    /// assert_eq!(result, true);
65    /// # }
66    /// ```
67    fn pullback<I2, F>(self, map: F) -> Pullback<Pipe<Self, Args>, F, I2, I, O>
68    where
69        F: Fn(I2) -> I + Send + Sync + 'static,
70        Self: Sized,
71    {
72        Pullback {
73            p: self.pipe(),
74            map,
75            _marker: PhantomData,
76        }
77    }
78
79    /// Lifts the domain requirement to anything that can be converted into the original input.
80    /// Uses `From` / `Into` trait.
81    ///
82    /// # Example
83    ///
84    /// ```rust
85    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
86    ///
87    /// async fn process_string(s: Input<String>) -> usize { s.len() }
88    ///
89    /// # #[tokio::main]
90    /// # async fn main() {
91    /// // Accepts &str because String implements From<&str>
92    /// let pipe = process_string.pipe().lift::<&str>();
93    /// let result = pipe.apply(Context::empty("hello")).await;
94    /// assert_eq!(result, 5);
95    /// # }
96    /// ```
97    fn lift<I2>(self) -> Pullback<Pipe<Self, Args>, fn(I2) -> I, I2, I, O>
98    where
99        I: From<I2> + Send + Sync + 'static,
100        I2: Clone + Send + Sync + 'static,
101        Self: Sized,
102    {
103        self.pullback(|i| i.into())
104    }
105
106    /// Extends the output of the pipeline by applying a transformation.
107    ///
108    /// # Example
109    ///
110    /// ```rust
111    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
112    ///
113    /// async fn compute(n: Input<i32>) -> i32 { *n * 2 }
114    ///
115    /// # #[tokio::main]
116    /// # async fn main() {
117    /// // Changes output from i32 to String
118    /// let pipe = compute.pipe().extend(|n| format!("Result: {}", n));
119    /// let result = pipe.apply(Context::empty(10)).await;
120    /// assert_eq!(result, "Result: 20");
121    /// # }
122    /// ```
123    fn extend<O2, F>(self, map: F) -> Extend<Pipe<Self, Args>, F, I, O, O2>
124    where
125        F: Fn(O) -> O2 + Send + Sync + 'static,
126        Self: Sized,
127    {
128        Extend {
129            p: self.pipe(),
130            map,
131            _marker: PhantomData,
132        }
133    }
134
135    /// Repeats the pipeline operation n times.
136    /// Input and output types must be the same.
137    ///
138    /// # Example
139    ///
140    /// ```rust
141    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
142    ///
143    /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
144    ///
145    /// # #[tokio::main]
146    /// # async fn main() {
147    /// // Input: 0 -> 1 -> 2 -> 3
148    /// let pipe = add_one.pipe().repeat(3);
149    /// let result = pipe.apply(Context::empty(0)).await;
150    /// assert_eq!(result, 3);
151    /// # }
152    /// ```
153    fn repeat(self, times: usize) -> Repeat<Pipe<Self, Args>, I>
154    where
155        Self: Handler<I, I, Args> + Sized,
156        I: Clone + Send + Sync + 'static,
157    {
158        Repeat {
159            p: self.pipe(),
160            times,
161            _marker: PhantomData,
162        }
163    }
164
165    /// Caches the results of the pipeline using an LRU strategy.
166    /// Requires the input to implement `Hash + Eq + Clone` and output to implement `Clone`.
167    ///
168    /// # Example
169    ///
170    /// ```rust
171    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
172    /// use std::sync::atomic::{AtomicUsize, Ordering};
173    /// use std::sync::Arc;
174    ///
175    /// static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);
176    ///
177    /// async fn heavy_calc(n: Input<i32>) -> i32 { 
178    ///     CALL_COUNT.fetch_add(1, Ordering::SeqCst);
179    ///     *n * *n 
180    /// }
181    ///
182    /// # #[tokio::main]
183    /// # async fn main() {
184    /// let pipe = heavy_calc.pipe().cache(100);
185    /// let result1 = pipe.apply(Context::empty(10)).await;
186    /// assert_eq!(result1, 100);
187    /// let result2 = pipe.apply(Context::empty(10)).await; // Should hit cache
188    /// assert_eq!(result2, 100);
189    /// assert_eq!(CALL_COUNT.load(Ordering::SeqCst), 1);
190    /// # }
191    /// ```
192    fn cache(self, capacity: usize) -> Cache<Pipe<Self, Args>, I, O>
193    where
194        Self: Sized,
195        I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
196        O: Clone + Send + Sync + 'static,
197    {
198        Cache {
199            p: self.pipe(),
200            cache: quick_cache::sync::Cache::new(capacity),
201        }
202    }
203
204    /// Maps the pipeline over a collection of inputs concurrently.
205    /// Input for the new pipeline will be `Vec<I>` and output will be `Vec<O>`.
206    ///
207    /// # Example
208    ///
209    /// ```rust
210    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
211    ///
212    /// async fn process_item(n: Input<i32>) -> String { n.to_string() }
213    ///
214    /// # #[tokio::main]
215    /// # async fn main() {
216    /// // Input: Vec<i32> -> Output: Vec<String>
217    /// let pipe = process_item.pipe().map();
218    /// let result = pipe.apply(Context::empty(vec![1, 2, 3])).await;
219    /// assert_eq!(result, vec!["1", "2", "3"]);
220    /// # }
221    /// ```
222    fn map(self) -> crate::cocurrency::Map<Pipe<Self, Args>, I, O>
223    where
224        Self: Sized,
225    {
226        crate::cocurrency::Map {
227            p: self.pipe(),
228            _marker: PhantomData,
229        }
230    }
231}
232
233impl<F, I, O, Args> HandlerExt<I, O, Args> for F where F: Handler<I, O, Args> {}
234
235/// Wrapper struct to adapt a Handler into a Pipeline.
236pub struct Pipe<P, Args> {
237    p: P,
238    _marker: PhantomData<Args>,
239}
240
241impl<P, Args, I, O> Pipeline<I, O> for Pipe<P, Args>
242where
243    P: Handler<I, O, Args>,
244    I: Clone + Send + Sync + 'static,
245    O: Send + 'static,
246    Args: Send + Sync + 'static,
247{
248    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
249        self.p.call(ctx)
250    }
251}
252
253// PipelineExt removed to avoid ambiguity with HandlerExt
254// pub trait PipelineExt<I, O>: Pipeline<I, O> { ... }
255// impl<P, I, O> PipelineExt<I, O> for P where P: Pipeline<I, O> {}
256
257// --- Connect ---
258
259pub struct Connect<F, G, I, O1, O2> {
260    f: F,
261    g: G,
262    _marker: PhantomData<(I, O1, O2)>,
263}
264
265impl<F, G, I, O1, O2> Pipeline<I, O2> for Connect<F, G, I, O1, O2>
266where
267    F: Pipeline<I, O1>,
268    G: Pipeline<O1, O2>,
269    I: Clone + Send + Sync + 'static,
270    O1: Send + Sync + 'static,
271    O2: Send + Sync + 'static,
272{
273    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
274        async move {
275            let res = self.f.apply(ctx.clone()).await;
276            self.g.apply(ctx.replace(res)).await
277        }
278    }
279}
280
281// --- Pullback ---
282
283pub struct Pullback<P, F, I2, I, O> {
284    p: P,
285    map: F,
286    _marker: PhantomData<(I2, I, O)>,
287}
288
289impl<P, F, I2, I, O> Pipeline<I2, O> for Pullback<P, F, I2, I, O>
290where
291    P: Pipeline<I, O>,
292    F: Fn(I2) -> I + Send + Sync + 'static,
293    I2: Clone + Send + Sync + 'static,
294    I: Send + Sync + 'static,
295    O: Send + Sync + 'static,
296{
297    fn apply(&self, ctx: Context<I2>) -> impl Future<Output = O> + Send {
298        async move {
299            let mapped_input = (self.map)((*ctx.input()).clone());
300            self.p.apply(ctx.replace(mapped_input)).await
301        }
302    }
303}
304
305// --- Extend ---
306
307pub struct Extend<P, F, I, O1, O2> {
308    p: P,
309    map: F,
310    _marker: PhantomData<(I, O1, O2)>,
311}
312
313impl<P, F, I, O1, O2> Pipeline<I, O2> for Extend<P, F, I, O1, O2>
314where
315    P: Pipeline<I, O1>,
316    F: Fn(O1) -> O2 + Send + Sync + 'static,
317    I: Clone + Send + Sync + 'static,
318    O1: Send + Sync + 'static,
319    O2: Send + Sync + 'static,
320{
321    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
322        async move {
323            let res = self.p.apply(ctx).await;
324            (self.map)(res)
325        }
326    }
327}
328
329// --- Repeat ---
330
331pub struct Repeat<P, I> {
332    p: P,
333    times: usize,
334    _marker: PhantomData<I>,
335}
336
337impl<P, I> Pipeline<I, I> for Repeat<P, I>
338where
339    P: Pipeline<I, I>,
340    I: Clone + Send + Sync + 'static,
341{
342    fn apply(&self, ctx: Context<I>) -> impl Future<Output = I> + Send {
343        async move {
344            let mut current_input = (*ctx.input()).clone();
345            let mut current_ctx = ctx;
346            for _ in 0..self.times {
347                current_input = self.p.apply(current_ctx.clone()).await;
348                current_ctx = current_ctx.replace(current_input.clone());
349            }
350            current_input
351        }
352    }
353}
354
355// --- Cache ---
356
357pub struct Cache<P, I, O> {
358    p: P,
359    cache: quick_cache::sync::Cache<I, O>,
360}
361
362impl<P, I, O> Pipeline<I, O> for Cache<P, I, O>
363where
364    P: Pipeline<I, O>,
365    I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
366    O: Clone + Send + Sync + 'static,
367{
368    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
369        async move {
370            let key = (*ctx.input()).clone();
371            if let Some(val) = self.cache.get(&key) {
372                return val;
373            }
374
375            let res = self.p.apply(ctx).await;
376            self.cache.insert(key, res.clone());
377            res
378        }
379    }
380}
381
382impl<P, I, O> Handler<I, O, ()> for P
383where
384    P: Pipeline<I, O>,
385    I: Clone + Send + Sync + 'static,
386    O: Send + 'static,
387{
388    fn call(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
389        self.apply(ctx)
390    }
391}