pipe_it/ext.rs
1use crate::{Context, handler::Handler, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4
5/// Extension trait for any type that implements the Handler trait.
6/// Provides a way to convert a Handler into a Pipeline and combinators.
7pub trait HandlerExt<I, O, Args>: Handler<I, O, Args> {
8 fn pipe(self) -> Pipe<Self, Args>
9 where
10 Self: Sized,
11 {
12 Pipe {
13 p: self,
14 _marker: PhantomData,
15 }
16 }
17
18 /// Connects two pipelines together. Output of the first becomes the input of the second.
19 ///
20 /// # Example
21 ///
22 /// ```rust
23 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
24 ///
25 /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
26 /// async fn to_string(n: Input<i32>) -> String { n.to_string() }
27 ///
28 /// # #[tokio::main]
29 /// # async fn main() {
30 /// let pipe = add_one.pipe().connect(to_string);
31 /// let result = pipe.apply(Context::empty(1)).await;
32 /// assert_eq!(result, "2");
33 /// # }
34 /// ```
35 fn connect<O2, G, Args2>(self, g: G) -> Connect<Pipe<Self, Args>, Pipe<G, Args2>, I, O, O2>
36 where
37 G: Handler<O, O2, Args2>,
38 Self: Sized,
39 {
40 Connect {
41 f: self.pipe(),
42 g: g.pipe(),
43 _marker: PhantomData,
44 }
45 }
46
47 /// Pulls back the domain of the pipeline.
48 /// This allows a pipeline defined on `I` to be used for input `I2` given a mapping `I2 -> I`.
49 /// The mapping function is now a full Handler (async, supports DI).
50 ///
51 /// # Example
52 ///
53 /// ```rust
54 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
55 ///
56 /// #[derive(Clone)]
57 /// struct User { age: i32 }
58 ///
59 /// // Pre-processing step: extract age from User.
60 /// // Supports DI and ownership extraction.
61 /// async fn get_age(user: Input<User>) -> i32 {
62 /// // We can try_unwrap to get ownership if needed (optimization)
63 /// user.try_unwrap().map(|u| u.age).unwrap_or_else(|u| u.age)
64 /// }
65 ///
66 /// async fn check_adult(age: Input<i32>) -> bool { *age >= 18 }
67 ///
68 /// # #[tokio::main]
69 /// # async fn main() {
70 /// // Input: User -> (get_age) -> i32 -> (check_adult) -> bool
71 /// let pipe = check_adult.pipe().pullback(get_age);
72 /// let result = pipe.apply(Context::empty(User { age: 20 })).await;
73 /// assert_eq!(result, true);
74 /// # }
75 /// ```
76 fn pullback<I2, H, Args2>(self, handler: H) -> Pullback<Pipe<Self, Args>, Pipe<H, Args2>, I2, I, O>
77 where
78 H: Handler<I2, I, Args2>,
79 Self: Sized,
80 {
81 Pullback {
82 p: self.pipe(),
83 map: handler.pipe(),
84 _marker: PhantomData,
85 }
86 }
87
88 /// Lifts the domain requirement to anything that can be converted into the original input.
89 /// Uses `From` / `Into` trait.
90 ///
91 /// # Example
92 ///
93 /// ```rust
94 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
95 ///
96 /// async fn process_string(s: Input<String>) -> usize { s.len() }
97 ///
98 /// # #[tokio::main]
99 /// # async fn main() {
100 /// // Accepts &str because String implements From<&str>
101 /// let pipe = process_string.pipe().lift::<&str>();
102 /// let result = pipe.apply(Context::empty("hello")).await;
103 /// assert_eq!(result, 5);
104 /// # }
105 /// ```
106 fn lift<I2>(self) -> Pullback<Pipe<Self, Args>, Pipe<LiftHandler<I, I2>, (crate::Input<I2>,)>, I2, I, O>
107 where
108 I: From<I2> + Send + Sync + 'static,
109 I2: Clone + Send + Sync + 'static,
110 Self: Sized,
111 {
112 self.pullback(LiftHandler(PhantomData))
113 }
114
115 /// Extends the output of the pipeline by applying a transformation.
116 ///
117 /// # Example
118 ///
119 /// ```rust
120 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
121 ///
122 /// async fn compute(n: Input<i32>) -> i32 { *n * 2 }
123 ///
124 /// # #[tokio::main]
125 /// # async fn main() {
126 /// // Changes output from i32 to String
127 /// let pipe = compute.pipe().extend(|n| format!("Result: {}", n));
128 /// let result = pipe.apply(Context::empty(10)).await;
129 /// assert_eq!(result, "Result: 20");
130 /// # }
131 /// ```
132 fn extend<O2, F>(self, map: F) -> Extend<Pipe<Self, Args>, F, I, O, O2>
133 where
134 F: Fn(O) -> O2 + Send + Sync + 'static,
135 Self: Sized,
136 {
137 Extend {
138 p: self.pipe(),
139 map,
140 _marker: PhantomData,
141 }
142 }
143
144 /// Repeats the pipeline operation n times.
145 /// Input and output types must be the same.
146 ///
147 /// # Example
148 ///
149 /// ```rust
150 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
151 ///
152 /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
153 ///
154 /// # #[tokio::main]
155 /// # async fn main() {
156 /// // Input: 0 -> 1 -> 2 -> 3
157 /// let pipe = add_one.pipe().repeat(3);
158 /// let result = pipe.apply(Context::empty(0)).await;
159 /// assert_eq!(result, 3);
160 /// # }
161 /// ```
162 fn repeat(self, times: usize) -> Repeat<Pipe<Self, Args>, I>
163 where
164 Self: Handler<I, I, Args> + Sized,
165 I: Clone + Send + Sync + 'static,
166 {
167 Repeat {
168 p: self.pipe(),
169 times,
170 _marker: PhantomData,
171 }
172 }
173
174 /// Caches the results of the pipeline using an LRU strategy.
175 /// Requires the input to implement `Hash + Eq + Clone` and output to implement `Clone`.
176 ///
177 /// # Example
178 ///
179 /// ```rust
180 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
181 /// use std::sync::atomic::{AtomicUsize, Ordering};
182 /// use std::sync::Arc;
183 ///
184 /// static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);
185 ///
186 /// async fn heavy_calc(n: Input<i32>) -> i32 {
187 /// CALL_COUNT.fetch_add(1, Ordering::SeqCst);
188 /// *n * *n
189 /// }
190 ///
191 /// # #[tokio::main]
192 /// # async fn main() {
193 /// let pipe = heavy_calc.pipe().cache(100);
194 /// let result1 = pipe.apply(Context::empty(10)).await;
195 /// assert_eq!(result1, 100);
196 /// let result2 = pipe.apply(Context::empty(10)).await; // Should hit cache
197 /// assert_eq!(result2, 100);
198 /// assert_eq!(CALL_COUNT.load(Ordering::SeqCst), 1);
199 /// # }
200 /// ```
201 fn cache(self, capacity: usize) -> Cache<Pipe<Self, Args>, I, O>
202 where
203 Self: Sized,
204 I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
205 O: Clone + Send + Sync + 'static,
206 {
207 Cache {
208 p: self.pipe(),
209 cache: quick_cache::sync::Cache::new(capacity),
210 }
211 }
212
213 /// Maps the pipeline over a collection of inputs concurrently.
214 /// Input for the new pipeline will be `Vec<I>` and output will be `Vec<O>`.
215 ///
216 /// # Example
217 ///
218 /// ```rust
219 /// use pipe_it::{Context, Pipeline, Input, ext::HandlerExt};
220 ///
221 /// async fn process_item(n: Input<i32>) -> String { n.to_string() }
222 ///
223 /// # #[tokio::main]
224 /// # async fn main() {
225 /// // Input: Vec<i32> -> Output: Vec<String>
226 /// let pipe = process_item.pipe().map();
227 /// let result = pipe.apply(Context::empty(vec![1, 2, 3])).await;
228 /// assert_eq!(result, vec!["1", "2", "3"]);
229 /// # }
230 /// ```
231 fn map(self) -> crate::cocurrency::Map<Pipe<Self, Args>, I, O>
232 where
233 Self: Sized,
234 {
235 crate::cocurrency::Map {
236 p: self.pipe(),
237 _marker: PhantomData,
238 }
239 }
240}
241
242impl<F, I, O, Args> HandlerExt<I, O, Args> for F where F: Handler<I, O, Args> {}
243
244/// Wrapper struct to adapt a Handler into a Pipeline.
245pub struct Pipe<P, Args> {
246 p: P,
247 _marker: PhantomData<Args>,
248}
249
250impl<P, Args, I, O> Pipeline<I, O> for Pipe<P, Args>
251where
252 P: Handler<I, O, Args>,
253 I: Clone + Send + Sync + 'static,
254 O: Send + 'static,
255 Args: Send + Sync + 'static,
256{
257 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
258 self.p.call(ctx)
259 }
260}
261
262// PipelineExt removed to avoid ambiguity with HandlerExt
263// pub trait PipelineExt<I, O>: Pipeline<I, O> { ... }
264// impl<P, I, O> PipelineExt<I, O> for P where P: Pipeline<I, O> {}
265
266// --- Connect ---
267
268pub struct Connect<F, G, I, O1, O2> {
269 f: F,
270 g: G,
271 _marker: PhantomData<(I, O1, O2)>,
272}
273
274impl<F, G, I, O1, O2> Pipeline<I, O2> for Connect<F, G, I, O1, O2>
275where
276 F: Pipeline<I, O1>,
277 G: Pipeline<O1, O2>,
278 I: Clone + Send + Sync + 'static,
279 O1: Send + Sync + 'static,
280 O2: Send + Sync + 'static,
281{
282 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
283 async move {
284 // Optimized to release input ownership for the second stage
285 let (input, shared) = ctx.into_parts();
286 let ctx_f = Context::from_parts(input, shared.clone());
287 let res = self.f.apply(ctx_f).await;
288 self.g.apply(Context::from_parts(std::sync::Arc::new(res), shared)).await
289 }
290 }
291}
292
293// --- Pullback ---
294
295pub struct Pullback<P, H, I2, I, O> {
296 p: P,
297 map: H,
298 _marker: PhantomData<(I2, I, O)>,
299}
300
301impl<P, H, I2, I, O> Pipeline<I2, O> for Pullback<P, H, I2, I, O>
302where
303 P: Pipeline<I, O>,
304 H: Pipeline<I2, I>,
305 I2: Clone + Send + Sync + 'static,
306 I: Send + Sync + 'static,
307 O: Send + Sync + 'static,
308{
309 fn apply(&self, ctx: Context<I2>) -> impl Future<Output = O> + Send {
310 async move {
311 // Effectively H.connect(P)
312 let (input, shared) = ctx.into_parts();
313 // Run Mapper (H) on I2
314 let ctx_h = Context::from_parts(input, shared.clone());
315 let mapped_input = self.map.apply(ctx_h).await;
316 // Run P on mapped input I
317 self.p.apply(Context::from_parts(std::sync::Arc::new(mapped_input), shared)).await
318 }
319 }
320}
321
322/// Helper handler for Lift operation
323pub struct LiftHandler<I, I2>(PhantomData<(I, I2)>);
324
325impl<I, I2> Handler<I2, I, (crate::Input<I2>,)> for LiftHandler<I, I2>
326where
327 I: From<I2> + Send + Sync + 'static,
328 I2: Clone + Send + Sync + 'static,
329{
330 fn call(&self, ctx: Context<I2>) -> impl Future<Output = I> + Send {
331 async move {
332 let (input, _) = ctx.into_parts();
333 let val = match crate::Input::<I2>(input).try_unwrap() {
334 Ok(v) => v,
335 Err(arc) => (*arc).clone(),
336 };
337 val.into()
338 }
339 }
340}
341
342// --- Extend ---
343
344pub struct Extend<P, F, I, O1, O2> {
345 p: P,
346 map: F,
347 _marker: PhantomData<(I, O1, O2)>,
348}
349
350impl<P, F, I, O1, O2> Pipeline<I, O2> for Extend<P, F, I, O1, O2>
351where
352 P: Pipeline<I, O1>,
353 F: Fn(O1) -> O2 + Send + Sync + 'static,
354 I: Clone + Send + Sync + 'static,
355 O1: Send + Sync + 'static,
356 O2: Send + Sync + 'static,
357{
358 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
359 async move {
360 let res = self.p.apply(ctx).await;
361 (self.map)(res)
362 }
363 }
364}
365
366// --- Repeat ---
367
368pub struct Repeat<P, I> {
369 p: P,
370 times: usize,
371 _marker: PhantomData<I>,
372}
373
374impl<P, I> Pipeline<I, I> for Repeat<P, I>
375where
376 P: Pipeline<I, I>,
377 I: Clone + Send + Sync + 'static,
378{
379 fn apply(&self, ctx: Context<I>) -> impl Future<Output = I> + Send {
380 async move {
381 let (input_arc, shared) = ctx.into_parts();
382 let mut val = match std::sync::Arc::try_unwrap(input_arc) {
383 Ok(v) => v,
384 Err(arc) => (*arc).clone(),
385 };
386
387 for _ in 0..self.times {
388 let iter_ctx = Context::from_parts(std::sync::Arc::new(val), shared.clone());
389 val = self.p.apply(iter_ctx).await;
390 }
391 val
392 }
393 }
394}
395
396// --- Cache ---
397
398pub struct Cache<P, I, O> {
399 p: P,
400 cache: quick_cache::sync::Cache<I, O>,
401}
402
403impl<P, I, O> Pipeline<I, O> for Cache<P, I, O>
404where
405 P: Pipeline<I, O>,
406 I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
407 O: Clone + Send + Sync + 'static,
408{
409 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
410 async move {
411 let input_arc = ctx.input();
412 if let Some(val) = self.cache.get(&*input_arc) {
413 return val;
414 }
415
416 let res = self.p.apply(ctx).await;
417 self.cache.insert((*input_arc).clone(), res.clone());
418 res
419 }
420 }
421}
422
423impl<P, I, O> Handler<I, O, ()> for P
424where
425 P: Pipeline<I, O>,
426 I: Clone + Send + Sync + 'static,
427 O: Send + 'static,
428{
429 fn call(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
430 self.apply(ctx)
431 }
432}