pipe_it/
lib.rs

1use std::{
2    any::{self, TypeId},
3    collections::HashMap,
4    future::Future,
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use tokio::sync;
10
11pub use pipeit_derive::node;
12
13pub extern crate pipeit_derive;
14
15pub mod ext;
16pub mod handler;
17pub mod tag;
18pub mod cocurrency;
19// store dynamic data
20/// A map storing shared dependencies, keyed by their TypeId.
21/// Wrapped in an Arc for thread-safe sharing and using Arc<RwLock> to support owned guards.
22#[derive(Clone)]
23struct DendencyMap(Arc<HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>>);
24
25/// A collection of shared resources that can be injected into pipeline functions.
26///
27/// This acts as a builder to conveniently register resources before creating a Context.
28#[derive(Default, Clone)]
29pub struct Shared {
30    inner: HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>,
31}
32
33impl Shared {
34    /// Creates a new, empty shared resource collection.
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// Adds a new resource to the collection.
40    /// The resource will be wrapped in an Arc<RwLock> to allow concurrent access.
41    pub fn insert<T: Send + Sync + 'static>(mut self, resource: T) -> Self {
42        self.inner.insert(
43            TypeId::of::<T>(),
44            Arc::new(sync::RwLock::new(
45                Box::new(resource) as Box<dyn any::Any + Send + Sync>
46            )),
47        );
48        self
49    }
50}
51
52/// A read-only resource container that mirrors Bevy's Res.
53/// It holds an owned read lock on a shared dependency and provides strong typing via Deref.
54pub struct Res<T>(
55    sync::OwnedRwLockReadGuard<Box<dyn any::Any + Send + Sync>>,
56    std::marker::PhantomData<T>,
57);
58
59/// A mutable resource container that mirrors Bevy's ResMut.
60/// It holds an owned write lock on a shared dependency and provides strong typing via Deref/DerefMut.
61pub struct ResMut<T>(
62    sync::OwnedRwLockWriteGuard<Box<dyn any::Any + Send + Sync>>,
63    std::marker::PhantomData<T>,
64);
65
66impl<T: 'static> Deref for Res<T> {
67    type Target = T;
68    fn deref(&self) -> &Self::Target {
69        // Here we perform the downcast from dyn Any to the strong type T.
70        // This is exactly how Bevy provides strong typing from a generic container.
71        (**self.0)
72            .downcast_ref::<T>()
73            .expect("Resource type mismatch during Res deref")
74    }
75}
76
77impl<T: 'static> Deref for ResMut<T> {
78    type Target = T;
79    fn deref(&self) -> &Self::Target {
80        (**self.0)
81            .downcast_ref::<T>()
82            .expect("Resource type mismatch during ResMut deref")
83    }
84}
85
86impl<T: 'static> DerefMut for ResMut<T> {
87    fn deref_mut(&mut self) -> &mut Self::Target {
88        // Performs mutable downcast to provide &mut T.
89        (**self.0)
90            .downcast_mut::<T>()
91            .expect("Resource type mismatch during ResMut deref_mut")
92    }
93}
94
95/// The execution context for a pipeline, carrying both shared dependencies and the specific input.
96pub struct Context<Input>
97where
98    Input: ?Sized,
99{
100    shared: DendencyMap,
101    input: Arc<Input>,
102}
103
104impl<I> Context<I> {
105    /// Creates a new context with the provided input and shared dependency collection.
106    pub fn new(input: I, shared: Shared) -> Self {
107        Self {
108            shared: DendencyMap(Arc::new(shared.inner)),
109            input: Arc::new(input),
110        }
111    }
112    pub fn empty(input: I) -> Self {
113        Self {
114            shared: DendencyMap(Arc::new(Shared::new().inner)),
115            input: Arc::new(input),
116        }
117    }
118    /// Replaces the input of the current context while keeping the same shared dependencies.
119    /// Returns a new Context with the updated input type.
120    pub(crate) fn replace<NewInput>(self, input: NewInput) -> Context<NewInput> {
121        Context {
122            shared: self.shared,
123            input: Arc::new(input),
124        }
125    }
126
127    /// Returns a reference to the current input wrapped in an Arc.
128    pub(crate) fn input(&self) -> Arc<I> {
129        self.input.clone()
130    }
131}
132impl<Input> Clone for Context<Input>
133where
134    Input: ?Sized,
135{
136    fn clone(&self) -> Self {
137        Self {
138            shared: self.shared.clone(),
139            input: self.input.clone(),
140        }
141    }
142}
143
144/// A trait for types that can be extracted from a Context.
145/// Similar to Axum's FromRequest or Bevy's SystemParam.
146pub trait FromContext<Input>: Send {
147    fn from(ctx: Context<Input>) -> impl Future<Output = Self> + Send;
148}
149
/// A wrapper for the primary input data of the pipeline.
///
/// The value is held behind an `Arc`, so the wrapper dereferences to `T` and can
/// be cloned without copying the value.
pub struct Input<T>(pub Arc<T>);

// Written by hand rather than derived: `#[derive(Clone)]` would add a `T: Clone`
// bound even though only the `Arc` handle is duplicated.
impl<T> Clone for Input<T> {
    /// Returns another handle to the same shared input.
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<T> Deref for Input<T> {
    type Target = T;

    /// Borrows the wrapped input value.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
160
161impl<I: Send + Sync + 'static> FromContext<I> for Input<I> {
162    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
163        let input = ctx.input.clone();
164        async move { Input(input) }
165    }
166}
167
168impl<I, T> FromContext<I> for Res<T>
169where
170    I: Send + Sync + 'static,
171    T: Send + Sync + 'static,
172{
173    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
174        async move {
175            let shared = ctx.shared.0.clone();
176            let dep = shared
177                .get(&TypeId::of::<T>())
178                .expect("Dependency not found")
179                .clone();
180            let guard = dep.read_owned().await;
181            Res(guard, std::marker::PhantomData)
182        }
183    }
184}
185
186impl<I, T> FromContext<I> for ResMut<T>
187where
188    I: Send + Sync + 'static,
189    T: Send + Sync + 'static,
190{
191    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
192        async move {
193            let shared = ctx.shared.0.clone();
194            let dep = shared
195                .get(&TypeId::of::<T>())
196                .expect("Dependency not found")
197                .clone();
198            let guard = dep.write_owned().await;
199            ResMut(guard, std::marker::PhantomData)
200        }
201    }
202}
203
204/// Represents a pipeline unit that can be applied to a Context.
205/// Implementations are automatically provided for functions that match the signature.
206pub trait Pipeline<I, O>: Send + Sync + 'static {
207    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send;
208}
209
/// Errors occurring during pipeline execution.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// propagated with `?` into `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone)]
pub enum PipelineError {
    /// A branch did not apply. Tuple pipelines treat this as "try the next
    /// branch"; `Cond` returns it when its predicate is false.
    Failure { msg: String, expected: String },
    /// An error that stops evaluation: tuple pipelines return it immediately
    /// instead of trying further branches.
    Fatal { msg: String },
}

impl std::fmt::Display for PipelineError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Failure { msg, expected } => {
                write!(f, "pipeline failure: {msg} (expected: {expected})")
            }
            Self::Fatal { msg } => write!(f, "fatal pipeline error: {msg}"),
        }
    }
}

impl std::error::Error for PipelineError {}

/// Standard `Result` type for pipeline operations; the error defaults to [`PipelineError`].
pub type PResult<O, E = PipelineError> = Result<O, E>;
219
220/// A pipeline that executes only if a predicate on the input is met.
221pub struct Cond<F, P, I, O> {
222    predicate: F,
223    next: P,
224    _marker: std::marker::PhantomData<fn(I, O)>,
225}
226
227impl<F, P, I, O> Pipeline<I, PResult<O>> for Cond<F, P, I, O>
228where
229    F: Pipeline<I, bool>,
230    P: Pipeline<I, O>,
231    I: Clone + Send + Sync + 'static,
232    O: Send + 'static,
233    F: Send + Sync + 'static,
234    P: Send + Sync + 'static,
235{
236    fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
237        async move {
238            let matched = self.predicate.apply(ctx.clone()).await;
239            if matched {
240                Ok(self.next.apply(ctx).await)
241            } else {
242                Err(PipelineError::Failure {
243                    msg: "Condition not met".to_string(),
244                    expected: "true".to_string(),
245                })
246            }
247        }
248    }
249}
250
251/// Creates a conditional pipeline.
252/// If the predicate returns true, the next pipeline is executed.
253/// If false, it returns a PipelineError::Failure.
254///
255/// # Example
256///
257/// ```rust
258/// use pipeline_core::{cond, Context, Pipeline, Input, ext::HandlerExt};
259///
260/// async fn is_even(n: Input<i32>) -> bool { *n % 2 == 0 }
261/// async fn process(n: Input<i32>) -> String { "Even".to_string() }
262///
263/// # #[tokio::main]
264/// # async fn main() {
265/// let pipe = cond(is_even, process);
266/// 
267/// // Success case
268/// let result = pipe.apply(Context::empty(2)).await;
269/// assert_eq!(result.unwrap(), "Even");
270/// 
271/// // Failure case
272/// let result = pipe.apply(Context::empty(1)).await;
273/// assert!(result.is_err());
274/// # }
275/// ```
276pub fn cond<I, O, F, P, ArgsF, ArgsP>(
277    predicate: F,
278    next: P,
279) -> Cond<crate::ext::Pipe<F, ArgsF>, crate::ext::Pipe<P, ArgsP>, I, O>
280where
281    F: crate::handler::Handler<I, bool, ArgsF>,
282    P: crate::handler::Handler<I, O, ArgsP>,
283    I: Clone + Send + Sync + 'static,
284    O: Send + 'static,
285    ArgsF: Send + Sync + 'static,
286    ArgsP: Send + Sync + 'static,
287{
288    use crate::ext::HandlerExt;
289    Cond {
290        predicate: predicate.pipe(),
291        next: next.pipe(),
292        _marker: std::marker::PhantomData,
293    }
294}
295
/// Newtype wrapper intended as a marker for choice-based pipeline execution.
///
/// It carries a single public value; nothing in this file constructs it or
/// implements traits for it.
pub struct Choice<T>(pub T);
298
299macro_rules! impl_pipeline_for_tuple {
300    ($($P:ident),+) => {
301        impl<I, O, $($P),+ > Pipeline<I, PResult<O>> for ($($P,)+)
302        where
303            I: Clone + Send + Sync + 'static,
304            O: Send + 'static,
305            $( $P: Pipeline<I, PResult<O>> ),+
306        {
307            fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
308                #[allow(non_snake_case)]
309                let ($($P,)+) = self;
310                async move {
311                    $(
312                        match $P.apply(ctx.clone()).await {
313                            Ok(res) => return Ok(res),
314                            Err(PipelineError::Fatal { msg }) => return Err(PipelineError::Fatal { msg }),
315                            Err(PipelineError::Failure { .. }) => {}
316                        }
317                    )*
318                    Err(PipelineError::Fatal { msg: "All pipeline branches failed".to_string() })
319                }
320            }
321        }
322    };
323}
324
325impl_pipeline_for_tuple!(P1);
326impl_pipeline_for_tuple!(P1, P2);
327impl_pipeline_for_tuple!(P1, P2, P3);
328impl_pipeline_for_tuple!(P1, P2, P3, P4);
329impl_pipeline_for_tuple!(P1, P2, P3, P4, P5);
330impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6);
331impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7);
332impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7, P8);
333
334/// An identity pipeline that simply returns the current input.
335/// This acts as a neutral element in pipeline composition.
336pub async fn identity<I: Clone>(input: Input<I>) -> I {
337    (*input).clone()
338}
339
340/// An identity pipeline that returns the current input wrapped in a successful Result.
341/// Useful as a terminal fallback in a choice-based pipeline (tuple).
342pub async fn identity_res<I: Clone>(input: Input<I>) -> PResult<I> {
343    Ok((*input).clone())
344}