pipe_it/
ext.rs

1use crate::{Context, handler::Handler, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4
5/// Extension trait for any type that implements the Handler trait.
6/// Provides a way to convert a Handler into a Pipeline and combinators.
7pub trait HandlerExt<I, O, Args>: Handler<I, O, Args> {
8    fn pipe(self) -> Pipe<Self, Args>
9    where
10        Self: Sized,
11    {
12        Pipe {
13            p: self,
14            _marker: PhantomData,
15        }
16    }
17
18    /// Connects two pipelines together. Output of the first becomes the input of the second.
19    ///
20    /// # Example
21    ///
22    /// ```rust
23    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
24    ///
25    /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
26    /// async fn to_string(n: Input<i32>) -> String { n.to_string() }
27    /// 
28    /// # #[tokio::main]
29    /// # async fn main() {
30    /// let pipe = add_one.pipe().connect(to_string);
31    /// let result = pipe.apply(Context::empty(1)).await;
32    /// assert_eq!(result, "2");
33    /// # }
34    /// ```
35    fn connect<O2, G, Args2>(self, g: G) -> Connect<Pipe<Self, Args>, Pipe<G, Args2>, I, O, O2>
36    where
37        G: Handler<O, O2, Args2>,
38        Self: Sized,
39    {
40        Connect {
41            f: self.pipe(),
42            g: g.pipe(),
43            _marker: PhantomData,
44        }
45    }
46
47    /// Pulls back the domain of the pipeline.
48    /// This allows a pipeline defined on `I` to be used for input `I2` given a mapping `I2 -> I`.
49    /// The mapping function is now a full Handler (async, supports DI).
50    ///
51    /// # Example
52    ///
53    /// ```rust
54    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
55    ///
56    /// #[derive(Clone)]
57    /// struct User { age: i32 }
58    /// 
59    /// // Pre-processing step: extract age from User.
60    /// // Supports DI and ownership extraction.
61    /// async fn get_age(user: Input<User>) -> i32 { 
62    ///     // We can try_unwrap to get ownership if needed (optimization)
63    ///     user.try_unwrap().map(|u| u.age).unwrap_or_else(|u| u.age)
64    /// }
65    ///
66    /// async fn check_adult(age: Input<i32>) -> bool { *age >= 18 }
67    ///
68    /// # #[tokio::main]
69    /// # async fn main() {
70    /// // Input: User -> (get_age) -> i32 -> (check_adult) -> bool
71    /// let pipe = check_adult.pipe().pullback(get_age);
72    /// let result = pipe.apply(Context::empty(User { age: 20 })).await;
73    /// assert_eq!(result, true);
74    /// # }
75    /// ```
76    fn pullback<I2, H, Args2>(self, handler: H) -> Pullback<Pipe<Self, Args>, Pipe<H, Args2>, I2, I, O>
77    where
78        H: Handler<I2, I, Args2>,
79        Self: Sized,
80    {
81        Pullback {
82            p: self.pipe(),
83            map: handler.pipe(),
84            _marker: PhantomData,
85        }
86    }
87
88    /// Lifts the domain requirement to anything that can be converted into the original input.
89    /// Uses `From` / `Into` trait.
90    ///
91    /// # Example
92    ///
93    /// ```rust
94    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
95    ///
96    /// async fn process_string(s: Input<String>) -> usize { s.len() }
97    ///
98    /// # #[tokio::main]
99    /// # async fn main() {
100    /// // Accepts &str because String implements From<&str>
101    /// let pipe = process_string.pipe().lift::<&str>();
102    /// let result = pipe.apply(Context::empty("hello")).await;
103    /// assert_eq!(result, 5);
104    /// # }
105    /// ```
106    fn lift<I2>(self) -> Pullback<Pipe<Self, Args>, Pipe<LiftHandler<I, I2>, (crate::Input<I2>,)>, I2, I, O>
107    where
108        I: From<I2> + Send + Sync + 'static,
109        I2: Clone + Send + Sync + 'static,
110        Self: Sized,
111    {
112        self.pullback(LiftHandler(PhantomData))
113    }
114
115    /// Extends the output of the pipeline by applying a transformation.
116    ///
117    /// # Example
118    ///
119    /// ```rust
120    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
121    ///
122    /// async fn compute(n: Input<i32>) -> i32 { *n * 2 }
123    ///
124    /// # #[tokio::main]
125    /// # async fn main() {
126    /// // Changes output from i32 to String
127    /// let pipe = compute.pipe().extend(|n| format!("Result: {}", n));
128    /// let result = pipe.apply(Context::empty(10)).await;
129    /// assert_eq!(result, "Result: 20");
130    /// # }
131    /// ```
132    fn extend<O2, F>(self, map: F) -> Extend<Pipe<Self, Args>, F, I, O, O2>
133    where
134        F: Fn(O) -> O2 + Send + Sync + 'static,
135        Self: Sized,
136    {
137        Extend {
138            p: self.pipe(),
139            map,
140            _marker: PhantomData,
141        }
142    }
143
144    /// Repeats the pipeline operation n times.
145    /// Input and output types must be the same.
146    ///
147    /// # Example
148    ///
149    /// ```rust
150    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
151    ///
152    /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
153    ///
154    /// # #[tokio::main]
155    /// # async fn main() {
156    /// // Input: 0 -> 1 -> 2 -> 3
157    /// let pipe = add_one.pipe().repeat(3);
158    /// let result = pipe.apply(Context::empty(0)).await;
159    /// assert_eq!(result, 3);
160    /// # }
161    /// ```
162    fn repeat(self, times: usize) -> Repeat<Pipe<Self, Args>, I>
163    where
164        Self: Handler<I, I, Args> + Sized,
165        I: Clone + Send + Sync + 'static,
166    {
167        Repeat {
168            p: self.pipe(),
169            times,
170            _marker: PhantomData,
171        }
172    }
173
174    /// Caches the results of the pipeline using an LRU strategy.
175    /// Requires the input to implement `Hash + Eq + Clone` and output to implement `Clone`.
176    ///
177    /// # Example
178    ///
179    /// ```rust
180    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
181    /// use std::sync::atomic::{AtomicUsize, Ordering};
182    /// use std::sync::Arc;
183    ///
184    /// static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);
185    ///
186    /// async fn heavy_calc(n: Input<i32>) -> i32 { 
187    ///     CALL_COUNT.fetch_add(1, Ordering::SeqCst);
188    ///     *n * *n 
189    /// }
190    ///
191    /// # #[tokio::main]
192    /// # async fn main() {
193    /// let pipe = heavy_calc.pipe().cache(100);
194    /// let result1 = pipe.apply(Context::empty(10)).await;
195    /// assert_eq!(result1, 100);
196    /// let result2 = pipe.apply(Context::empty(10)).await; // Should hit cache
197    /// assert_eq!(result2, 100);
198    /// assert_eq!(CALL_COUNT.load(Ordering::SeqCst), 1);
199    /// # }
200    /// ```
201    fn cache(self, capacity: usize) -> Cache<Pipe<Self, Args>, I, O>
202    where
203        Self: Sized,
204        I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
205        O: Clone + Send + Sync + 'static,
206    {
207        Cache {
208            p: self.pipe(),
209            cache: quick_cache::sync::Cache::new(capacity),
210        }
211    }
212
213    /// Maps the pipeline over a collection of inputs concurrently.
214    /// Input for the new pipeline will be `Vec<I>` and output will be `Vec<O>`.
215    ///
216    /// # Example
217    ///
218    /// ```rust
219    /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
220    ///
221    /// async fn process_item(n: Input<i32>) -> String { n.to_string() }
222    ///
223    /// # #[tokio::main]
224    /// # async fn main() {
225    /// // Input: Vec<i32> -> Output: Vec<String>
226    /// let pipe = process_item.pipe().map();
227    /// let result = pipe.apply(Context::empty(vec![1, 2, 3])).await;
228    /// assert_eq!(result, vec!["1", "2", "3"]);
229    /// # }
230    /// ```
231    fn map(self) -> crate::cocurrency::Map<Pipe<Self, Args>, I, O>
232    where
233        Self: Sized,
234    {
235        crate::cocurrency::Map {
236            p: self.pipe(),
237            _marker: PhantomData,
238        }
239    }
240}
241
242impl<F, I, O, Args> HandlerExt<I, O, Args> for F where F: Handler<I, O, Args> {}
243
244/// Wrapper struct to adapt a Handler into a Pipeline.
245pub struct Pipe<P, Args> {
246    p: P,
247    _marker: PhantomData<Args>,
248}
249
250impl<P, Args, I, O> Pipeline<I, O> for Pipe<P, Args>
251where
252    P: Handler<I, O, Args>,
253    I: Clone + Send + Sync + 'static,
254    O: Send + 'static,
255    Args: Send + Sync + 'static,
256{
257    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
258        self.p.call(ctx)
259    }
260}
261
262// PipelineExt removed to avoid ambiguity with HandlerExt
263// pub trait PipelineExt<I, O>: Pipeline<I, O> { ... }
264// impl<P, I, O> PipelineExt<I, O> for P where P: Pipeline<I, O> {}
265
266// --- Connect ---
267
268pub struct Connect<F, G, I, O1, O2> {
269    f: F,
270    g: G,
271    _marker: PhantomData<(I, O1, O2)>,
272}
273
274impl<F, G, I, O1, O2> Pipeline<I, O2> for Connect<F, G, I, O1, O2>
275where
276    F: Pipeline<I, O1>,
277    G: Pipeline<O1, O2>,
278    I: Clone + Send + Sync + 'static,
279    O1: Send + Sync + 'static,
280    O2: Send + Sync + 'static,
281{
282    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
283        async move {
284            // Optimized to release input ownership for the second stage
285            let (input, shared) = ctx.into_parts();
286            let ctx_f = Context::from_parts(input, shared.clone());
287            let res = self.f.apply(ctx_f).await;
288            self.g.apply(Context::from_parts(std::sync::Arc::new(res), shared)).await
289        }
290    }
291}
292
293// --- Pullback ---
294
295pub struct Pullback<P, H, I2, I, O> {
296    p: P,
297    map: H,
298    _marker: PhantomData<(I2, I, O)>,
299}
300
301impl<P, H, I2, I, O> Pipeline<I2, O> for Pullback<P, H, I2, I, O>
302where
303    P: Pipeline<I, O>,
304    H: Pipeline<I2, I>,
305    I2: Clone + Send + Sync + 'static,
306    I: Send + Sync + 'static,
307    O: Send + Sync + 'static,
308{
309    fn apply(&self, ctx: Context<I2>) -> impl Future<Output = O> + Send {
310        async move {
311            // Effectively H.connect(P)
312            let (input, shared) = ctx.into_parts();
313            // Run Mapper (H) on I2
314            let ctx_h = Context::from_parts(input, shared.clone());
315            let mapped_input = self.map.apply(ctx_h).await;
316            // Run P on mapped input I
317            self.p.apply(Context::from_parts(std::sync::Arc::new(mapped_input), shared)).await
318        }
319    }
320}
321
322/// Helper handler for Lift operation
323pub struct LiftHandler<I, I2>(PhantomData<(I, I2)>);
324
325impl<I, I2> Handler<I2, I, (crate::Input<I2>,)> for LiftHandler<I, I2>
326where
327    I: From<I2> + Send + Sync + 'static,
328    I2: Clone + Send + Sync + 'static,
329{
330    fn call(&self, ctx: Context<I2>) -> impl Future<Output = I> + Send {
331        async move {
332            let (input, _) = ctx.into_parts();
333            let val = match crate::Input::<I2>(input).try_unwrap() {
334                Ok(v) => v,
335                Err(arc) => (*arc).clone(),
336            };
337            val.into()
338        }
339    }
340}
341
342// --- Extend ---
343
344pub struct Extend<P, F, I, O1, O2> {
345    p: P,
346    map: F,
347    _marker: PhantomData<(I, O1, O2)>,
348}
349
350impl<P, F, I, O1, O2> Pipeline<I, O2> for Extend<P, F, I, O1, O2>
351where
352    P: Pipeline<I, O1>,
353    F: Fn(O1) -> O2 + Send + Sync + 'static,
354    I: Clone + Send + Sync + 'static,
355    O1: Send + Sync + 'static,
356    O2: Send + Sync + 'static,
357{
358    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
359        async move {
360            let res = self.p.apply(ctx).await;
361            (self.map)(res)
362        }
363    }
364}
365
366// --- Repeat ---
367
368pub struct Repeat<P, I> {
369    p: P,
370    times: usize,
371    _marker: PhantomData<I>,
372}
373
374impl<P, I> Pipeline<I, I> for Repeat<P, I>
375where
376    P: Pipeline<I, I>,
377    I: Clone + Send + Sync + 'static,
378{
379    fn apply(&self, ctx: Context<I>) -> impl Future<Output = I> + Send {
380        async move {
381            let (input_arc, shared) = ctx.into_parts();
382            let mut val = match std::sync::Arc::try_unwrap(input_arc) {
383                Ok(v) => v,
384                Err(arc) => (*arc).clone(),
385            };
386
387            for _ in 0..self.times {
388                let iter_ctx = Context::from_parts(std::sync::Arc::new(val), shared.clone());
389                val = self.p.apply(iter_ctx).await;
390            }
391            val
392        }
393    }
394}
395
396// --- Cache ---
397
398pub struct Cache<P, I, O> {
399    p: P,
400    cache: quick_cache::sync::Cache<I, O>,
401}
402
403impl<P, I, O> Pipeline<I, O> for Cache<P, I, O>
404where
405    P: Pipeline<I, O>,
406    I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
407    O: Clone + Send + Sync + 'static,
408{
409    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
410        async move {
411            let input_arc = ctx.input();
412            if let Some(val) = self.cache.get(&*input_arc) {
413                return val;
414            }
415
416            let res = self.p.apply(ctx).await;
417            self.cache.insert((*input_arc).clone(), res.clone());
418            res
419        }
420    }
421}
422
423impl<P, I, O> Handler<I, O, ()> for P
424where
425    P: Pipeline<I, O>,
426    I: Clone + Send + Sync + 'static,
427    O: Send + 'static,
428{
429    fn call(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
430        self.apply(ctx)
431    }
432}