pipe_it/ext.rs
1use crate::{Context, handler::Handler, Pipeline};
2use std::future::Future;
3use std::marker::PhantomData;
4
5/// Extension trait for any type that implements the Handler trait.
6/// Provides a way to convert a Handler into a Pipeline and combinators.
7pub trait HandlerExt<I, O, Args>: Handler<I, O, Args> {
8 fn pipe(self) -> Pipe<Self, Args>
9 where
10 Self: Sized,
11 {
12 Pipe {
13 p: self,
14 _marker: PhantomData,
15 }
16 }
17
18 /// Connects two pipelines together. Output of the first becomes the input of the second.
19 ///
20 /// # Example
21 ///
22 /// ```rust
23 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
24 ///
25 /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
26 /// async fn to_string(n: Input<i32>) -> String { n.to_string() }
27 ///
28 /// # #[tokio::main]
29 /// # async fn main() {
30 /// let pipe = add_one.pipe().connect(to_string);
31 /// let result = pipe.apply(Context::empty(1)).await;
32 /// assert_eq!(result, "2");
33 /// # }
34 /// ```
35 fn connect<O2, G, Args2>(self, g: G) -> Connect<Pipe<Self, Args>, Pipe<G, Args2>, I, O, O2>
36 where
37 G: Handler<O, O2, Args2>,
38 Self: Sized,
39 {
40 Connect {
41 f: self.pipe(),
42 g: g.pipe(),
43 _marker: PhantomData,
44 }
45 }
46
47 /// Pulls back the domain of the pipeline.
48 /// This allows a pipeline defined on `I` to be used for input `I2` given a mapping `I2 -> I`.
49 ///
50 /// # Example
51 ///
52 /// ```rust
53 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
54 ///
55 /// #[derive(Clone)]
56 /// struct User { age: i32 }
57 /// async fn check_adult(age: Input<i32>) -> bool { *age >= 18 }
58 ///
59 /// # #[tokio::main]
60 /// # async fn main() {
61 /// //Create a pipeline that accepts `User` but processes `i32`
62 /// let pipe = check_adult.pipe().pullback(|u: User| u.age);
63 /// let result = pipe.apply(Context::empty(User { age: 20 })).await;
64 /// assert_eq!(result, true);
65 /// # }
66 /// ```
67 fn pullback<I2, F>(self, map: F) -> Pullback<Pipe<Self, Args>, F, I2, I, O>
68 where
69 F: Fn(I2) -> I + Send + Sync + 'static,
70 Self: Sized,
71 {
72 Pullback {
73 p: self.pipe(),
74 map,
75 _marker: PhantomData,
76 }
77 }
78
79 /// Lifts the domain requirement to anything that can be converted into the original input.
80 /// Uses `From` / `Into` trait.
81 ///
82 /// # Example
83 ///
84 /// ```rust
85 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
86 ///
87 /// async fn process_string(s: Input<String>) -> usize { s.len() }
88 ///
89 /// # #[tokio::main]
90 /// # async fn main() {
91 /// // Accepts &str because String implements From<&str>
92 /// let pipe = process_string.pipe().lift::<&str>();
93 /// let result = pipe.apply(Context::empty("hello")).await;
94 /// assert_eq!(result, 5);
95 /// # }
96 /// ```
97 fn lift<I2>(self) -> Pullback<Pipe<Self, Args>, fn(I2) -> I, I2, I, O>
98 where
99 I: From<I2> + Send + Sync + 'static,
100 I2: Clone + Send + Sync + 'static,
101 Self: Sized,
102 {
103 self.pullback(|i| i.into())
104 }
105
106 /// Extends the output of the pipeline by applying a transformation.
107 ///
108 /// # Example
109 ///
110 /// ```rust
111 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
112 ///
113 /// async fn compute(n: Input<i32>) -> i32 { *n * 2 }
114 ///
115 /// # #[tokio::main]
116 /// # async fn main() {
117 /// // Changes output from i32 to String
118 /// let pipe = compute.pipe().extend(|n| format!("Result: {}", n));
119 /// let result = pipe.apply(Context::empty(10)).await;
120 /// assert_eq!(result, "Result: 20");
121 /// # }
122 /// ```
123 fn extend<O2, F>(self, map: F) -> Extend<Pipe<Self, Args>, F, I, O, O2>
124 where
125 F: Fn(O) -> O2 + Send + Sync + 'static,
126 Self: Sized,
127 {
128 Extend {
129 p: self.pipe(),
130 map,
131 _marker: PhantomData,
132 }
133 }
134
135 /// Repeats the pipeline operation n times.
136 /// Input and output types must be the same.
137 ///
138 /// # Example
139 ///
140 /// ```rust
141 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
142 ///
143 /// async fn add_one(n: Input<i32>) -> i32 { *n + 1 }
144 ///
145 /// # #[tokio::main]
146 /// # async fn main() {
147 /// // Input: 0 -> 1 -> 2 -> 3
148 /// let pipe = add_one.pipe().repeat(3);
149 /// let result = pipe.apply(Context::empty(0)).await;
150 /// assert_eq!(result, 3);
151 /// # }
152 /// ```
153 fn repeat(self, times: usize) -> Repeat<Pipe<Self, Args>, I>
154 where
155 Self: Handler<I, I, Args> + Sized,
156 I: Clone + Send + Sync + 'static,
157 {
158 Repeat {
159 p: self.pipe(),
160 times,
161 _marker: PhantomData,
162 }
163 }
164
165 /// Caches the results of the pipeline using an LRU strategy.
166 /// Requires the input to implement `Hash + Eq + Clone` and output to implement `Clone`.
167 ///
168 /// # Example
169 ///
170 /// ```rust
171 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
172 /// use std::sync::atomic::{AtomicUsize, Ordering};
173 /// use std::sync::Arc;
174 ///
175 /// static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);
176 ///
177 /// async fn heavy_calc(n: Input<i32>) -> i32 {
178 /// CALL_COUNT.fetch_add(1, Ordering::SeqCst);
179 /// *n * *n
180 /// }
181 ///
182 /// # #[tokio::main]
183 /// # async fn main() {
184 /// let pipe = heavy_calc.pipe().cache(100);
185 /// let result1 = pipe.apply(Context::empty(10)).await;
186 /// assert_eq!(result1, 100);
187 /// let result2 = pipe.apply(Context::empty(10)).await; // Should hit cache
188 /// assert_eq!(result2, 100);
189 /// assert_eq!(CALL_COUNT.load(Ordering::SeqCst), 1);
190 /// # }
191 /// ```
192 fn cache(self, capacity: usize) -> Cache<Pipe<Self, Args>, I, O>
193 where
194 Self: Sized,
195 I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
196 O: Clone + Send + Sync + 'static,
197 {
198 Cache {
199 p: self.pipe(),
200 cache: quick_cache::sync::Cache::new(capacity),
201 }
202 }
203
204 /// Maps the pipeline over a collection of inputs concurrently.
205 /// Input for the new pipeline will be `Vec<I>` and output will be `Vec<O>`.
206 ///
207 /// # Example
208 ///
209 /// ```rust
210 /// use pipeline_core::{Context, Pipeline, Input, ext::HandlerExt};
211 ///
212 /// async fn process_item(n: Input<i32>) -> String { n.to_string() }
213 ///
214 /// # #[tokio::main]
215 /// # async fn main() {
216 /// // Input: Vec<i32> -> Output: Vec<String>
217 /// let pipe = process_item.pipe().map();
218 /// let result = pipe.apply(Context::empty(vec![1, 2, 3])).await;
219 /// assert_eq!(result, vec!["1", "2", "3"]);
220 /// # }
221 /// ```
222 fn map(self) -> crate::cocurrency::Map<Pipe<Self, Args>, I, O>
223 where
224 Self: Sized,
225 {
226 crate::cocurrency::Map {
227 p: self.pipe(),
228 _marker: PhantomData,
229 }
230 }
231}
232
233impl<F, I, O, Args> HandlerExt<I, O, Args> for F where F: Handler<I, O, Args> {}
234
235/// Wrapper struct to adapt a Handler into a Pipeline.
236pub struct Pipe<P, Args> {
237 p: P,
238 _marker: PhantomData<Args>,
239}
240
241impl<P, Args, I, O> Pipeline<I, O> for Pipe<P, Args>
242where
243 P: Handler<I, O, Args>,
244 I: Clone + Send + Sync + 'static,
245 O: Send + 'static,
246 Args: Send + Sync + 'static,
247{
248 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
249 self.p.call(ctx)
250 }
251}
252
253// PipelineExt removed to avoid ambiguity with HandlerExt
254// pub trait PipelineExt<I, O>: Pipeline<I, O> { ... }
255// impl<P, I, O> PipelineExt<I, O> for P where P: Pipeline<I, O> {}
256
257// --- Connect ---
258
259pub struct Connect<F, G, I, O1, O2> {
260 f: F,
261 g: G,
262 _marker: PhantomData<(I, O1, O2)>,
263}
264
265impl<F, G, I, O1, O2> Pipeline<I, O2> for Connect<F, G, I, O1, O2>
266where
267 F: Pipeline<I, O1>,
268 G: Pipeline<O1, O2>,
269 I: Clone + Send + Sync + 'static,
270 O1: Send + Sync + 'static,
271 O2: Send + Sync + 'static,
272{
273 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
274 async move {
275 let res = self.f.apply(ctx.clone()).await;
276 self.g.apply(ctx.replace(res)).await
277 }
278 }
279}
280
281// --- Pullback ---
282
283pub struct Pullback<P, F, I2, I, O> {
284 p: P,
285 map: F,
286 _marker: PhantomData<(I2, I, O)>,
287}
288
289impl<P, F, I2, I, O> Pipeline<I2, O> for Pullback<P, F, I2, I, O>
290where
291 P: Pipeline<I, O>,
292 F: Fn(I2) -> I + Send + Sync + 'static,
293 I2: Clone + Send + Sync + 'static,
294 I: Send + Sync + 'static,
295 O: Send + Sync + 'static,
296{
297 fn apply(&self, ctx: Context<I2>) -> impl Future<Output = O> + Send {
298 async move {
299 let mapped_input = (self.map)((*ctx.input()).clone());
300 self.p.apply(ctx.replace(mapped_input)).await
301 }
302 }
303}
304
305// --- Extend ---
306
307pub struct Extend<P, F, I, O1, O2> {
308 p: P,
309 map: F,
310 _marker: PhantomData<(I, O1, O2)>,
311}
312
313impl<P, F, I, O1, O2> Pipeline<I, O2> for Extend<P, F, I, O1, O2>
314where
315 P: Pipeline<I, O1>,
316 F: Fn(O1) -> O2 + Send + Sync + 'static,
317 I: Clone + Send + Sync + 'static,
318 O1: Send + Sync + 'static,
319 O2: Send + Sync + 'static,
320{
321 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O2> + Send {
322 async move {
323 let res = self.p.apply(ctx).await;
324 (self.map)(res)
325 }
326 }
327}
328
329// --- Repeat ---
330
331pub struct Repeat<P, I> {
332 p: P,
333 times: usize,
334 _marker: PhantomData<I>,
335}
336
337impl<P, I> Pipeline<I, I> for Repeat<P, I>
338where
339 P: Pipeline<I, I>,
340 I: Clone + Send + Sync + 'static,
341{
342 fn apply(&self, ctx: Context<I>) -> impl Future<Output = I> + Send {
343 async move {
344 let mut current_input = (*ctx.input()).clone();
345 let mut current_ctx = ctx;
346 for _ in 0..self.times {
347 current_input = self.p.apply(current_ctx.clone()).await;
348 current_ctx = current_ctx.replace(current_input.clone());
349 }
350 current_input
351 }
352 }
353}
354
355// --- Cache ---
356
357pub struct Cache<P, I, O> {
358 p: P,
359 cache: quick_cache::sync::Cache<I, O>,
360}
361
362impl<P, I, O> Pipeline<I, O> for Cache<P, I, O>
363where
364 P: Pipeline<I, O>,
365 I: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
366 O: Clone + Send + Sync + 'static,
367{
368 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
369 async move {
370 let key = (*ctx.input()).clone();
371 if let Some(val) = self.cache.get(&key) {
372 return val;
373 }
374
375 let res = self.p.apply(ctx).await;
376 self.cache.insert(key, res.clone());
377 res
378 }
379 }
380}
381
382impl<P, I, O> Handler<I, O, ()> for P
383where
384 P: Pipeline<I, O>,
385 I: Clone + Send + Sync + 'static,
386 O: Send + 'static,
387{
388 fn call(&self, ctx: Context<I>) -> impl Future<Output = O> + Send {
389 self.apply(ctx)
390 }
391}