Skip to main content

pipe_it/
lib.rs

1use std::{
2    any::{self, TypeId},
3    collections::HashMap,
4    future::Future,
5    ops::{Deref, DerefMut},
6    sync::Arc,
7};
8
9use tokio::sync;
10
11pub use pipeit_derive::node;
12
13pub extern crate pipeit_derive;
14
15pub mod cocurrency;
16pub mod ext;
17pub mod handler;
18pub mod tag;
19// store dynamic data
20/// A map storing shared dependencies, keyed by their TypeId.
21/// Wrapped in an Arc for thread-safe sharing and using Arc<RwLock> to support owned guards.
22#[derive(Clone)]
23pub(crate) struct DendencyMap(Arc<HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>>);
24
25/// A collection of shared resources that can be injected into pipeline functions.
26///
27/// This acts as a builder to conveniently register resources before creating a Context.
28#[derive(Default, Clone)]
29pub struct Shared {
30    inner: HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>,
31}
32
33impl Shared {
34    /// Creates a new, empty shared resource collection.
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    /// Adds a new resource to the collection.
40    /// The resource will be wrapped in an Arc<RwLock> to allow concurrent access.
41    pub fn insert<T: Send + Sync + 'static>(mut self, resource: T) -> Self {
42        self.inner.insert(
43            TypeId::of::<T>(),
44            Arc::new(sync::RwLock::new(
45                Box::new(resource) as Box<dyn any::Any + Send + Sync>
46            )),
47        );
48        self
49    }
50}
51
52/// A read-only resource container that mirrors Bevy's Res.
53/// It holds an owned read lock on a shared dependency and provides strong typing via Deref.
54pub struct Res<T>(
55    sync::OwnedRwLockReadGuard<Box<dyn any::Any + Send + Sync>>,
56    std::marker::PhantomData<T>,
57);
58
59/// A mutable resource container that mirrors Bevy's ResMut.
60/// It holds an owned write lock on a shared dependency and provides strong typing via Deref/DerefMut.
61pub struct ResMut<T>(
62    sync::OwnedRwLockWriteGuard<Box<dyn any::Any + Send + Sync>>,
63    std::marker::PhantomData<T>,
64);
65
66impl<T: 'static> Deref for Res<T> {
67    type Target = T;
68    fn deref(&self) -> &Self::Target {
69        // Here we perform the downcast from dyn Any to the strong type T.
70        // This is exactly how Bevy provides strong typing from a generic container.
71        (**self.0)
72            .downcast_ref::<T>()
73            .expect("Resource type mismatch during Res deref")
74    }
75}
76
77impl<T: 'static> Deref for ResMut<T> {
78    type Target = T;
79    fn deref(&self) -> &Self::Target {
80        (**self.0)
81            .downcast_ref::<T>()
82            .expect("Resource type mismatch during ResMut deref")
83    }
84}
85
86impl<T: 'static> DerefMut for ResMut<T> {
87    fn deref_mut(&mut self) -> &mut Self::Target {
88        // Performs mutable downcast to provide &mut T.
89        (**self.0)
90            .downcast_mut::<T>()
91            .expect("Resource type mismatch during ResMut deref_mut")
92    }
93}
94
95/// The execution context for a pipeline, carrying both shared dependencies and the specific input.
96pub struct Context<Input>
97where
98    Input: ?Sized,
99{
100    shared: DendencyMap,
101    input: Arc<Input>,
102}
103
104impl<I> Context<I> {
105    /// Creates a new context with the provided input and shared dependency collection.
106    pub fn new(input: I, shared: Shared) -> Self {
107        Self {
108            shared: DendencyMap(Arc::new(shared.inner)),
109            input: Arc::new(input),
110        }
111    }
112    pub fn empty(input: I) -> Self {
113        Self {
114            shared: DendencyMap(Arc::new(Shared::new().inner)),
115            input: Arc::new(input),
116        }
117    }
118    /// Replaces the input of the current context while keeping the same shared dependencies.
119    /// Returns a new Context with the updated input type.
120    pub(crate) fn replace<NewInput>(self, input: NewInput) -> Context<NewInput> {
121        Context {
122            shared: self.shared,
123            input: Arc::new(input),
124        }
125    }
126
127    /// Returns a reference to the current input wrapped in an Arc.
128    pub(crate) fn input(&self) -> Arc<I> {
129        self.input.clone()
130    }
131
132    /// Consumes the Context and returns the input Arc and the shared dependencies.
133    /// This allows avoiding the clone of the input if the Context is no longer needed.
134    pub(crate) fn into_parts(self) -> (Arc<I>, DendencyMap) {
135        (self.input, self.shared)
136    }
137
138    /// Reconstructs a Context from its parts.
139    pub(crate) fn from_parts(input: Arc<I>, shared: DendencyMap) -> Self {
140        Self {
141            shared,
142            input,
143        }
144    }
145}
146impl<Input> Clone for Context<Input>
147where
148    Input: ?Sized,
149{
150    fn clone(&self) -> Self {
151        Self {
152            shared: self.shared.clone(),
153            input: self.input.clone(),
154        }
155    }
156}
157
158/// A trait for types that can be extracted from a Context.
159/// Similar to Axum's FromRequest or Bevy's SystemParam.
160pub trait FromContext<Input>: Send {
161    fn from(ctx: Context<Input>) -> impl Future<Output = Self> + Send;
162}
163
/// A wrapper for the primary input data of the pipeline.
///
/// The value lives behind an `Arc`, so cloning an `Input` only bumps the reference
/// count and never copies the underlying `T`.
pub struct Input<T>(pub Arc<T>);

// Hand-written rather than derived: `#[derive(Clone)]` would add a `T: Clone` bound even
// though only the `Arc` handle is cloned, making `Input<T>` un-cloneable for non-`Clone`
// inputs.
impl<T> Clone for Input<T> {
    fn clone(&self) -> Self {
        Input(Arc::clone(&self.0))
    }
}

impl<T> Input<T> {
    /// Returns the inner value if this is the only strong reference to it.
    ///
    /// # Errors
    ///
    /// If other strong references exist, the original `Arc` is handed back unchanged
    /// in the `Err` variant.
    pub fn try_unwrap(self) -> Result<T, Arc<T>> {
        Arc::try_unwrap(self.0)
    }
}

impl<T> Deref for Input<T> {
    type Target = T;

    /// Borrows the wrapped input value.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
182
183impl<I: Send + Sync + 'static> FromContext<I> for Input<I> {
184    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
185        let input = ctx.input.clone();
186        async move { Input(input) }
187    }
188}
189
190impl<I, T> FromContext<I> for Res<T>
191where
192    I: Send + Sync + 'static,
193    T: Send + Sync + 'static,
194{
195    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
196        async move {
197            let shared = ctx.shared.0.clone();
198            let dep = shared
199                .get(&TypeId::of::<T>())
200                .expect("Dependency not found")
201                .clone();
202            let guard = dep.read_owned().await;
203            Res(guard, std::marker::PhantomData)
204        }
205    }
206}
207
208impl<I, T> FromContext<I> for ResMut<T>
209where
210    I: Send + Sync + 'static,
211    T: Send + Sync + 'static,
212{
213    fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
214        async move {
215            let shared = ctx.shared.0.clone();
216            let dep = shared
217                .get(&TypeId::of::<T>())
218                .expect("Dependency not found")
219                .clone();
220            let guard = dep.write_owned().await;
221            ResMut(guard, std::marker::PhantomData)
222        }
223    }
224}
225
226/// Represents a pipeline unit that can be applied to a Context.
227/// Implementations are automatically provided for functions that match the signature.
228pub trait Pipeline<I, O>: Send + Sync + 'static {
229    fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send;
230}
231
/// The ways a pipeline step can report that it did not produce a value.
#[derive(Debug, Clone)]
pub enum PipelineError {
    /// The step did not apply to this input. Choice (tuple) pipelines move on to the
    /// next branch when they see this variant.
    Failure {
        /// Description of what went wrong.
        msg: String,
        /// Description of what the step expected instead.
        expected: String,
    },
    /// The step failed in a way that choice (tuple) pipelines propagate immediately
    /// instead of trying further branches.
    Fatal {
        /// Description of what went wrong.
        msg: String,
    },
}

/// Result alias used throughout pipelines; the error type defaults to [`PipelineError`].
pub type PResult<O, E = PipelineError> = Result<O, E>;
241
/// A pipeline that runs `next` only when `predicate` accepts the input.
pub struct Cond<F, P, I, O> {
    /// Decides whether `next` should run.
    predicate: F,
    /// The pipeline executed when the predicate holds.
    next: P,
    /// Ties the input/output types to the struct without storing values of them.
    _marker: std::marker::PhantomData<fn(I, O)>,
}
248
249impl<F, P, I, O> Pipeline<I, PResult<O>> for Cond<F, P, I, O>
250where
251    F: Pipeline<I, bool>,
252    P: Pipeline<I, O>,
253    I: Clone + Send + Sync + 'static,
254    O: Send + 'static,
255    F: Send + Sync + 'static,
256    P: Send + Sync + 'static,
257{
258    fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
259        async move {
260            let matched = self.predicate.apply(ctx.clone()).await;
261            if matched {
262                Ok(self.next.apply(ctx).await)
263            } else {
264                Err(PipelineError::Failure {
265                    msg: "Condition not met".to_string(),
266                    expected: "true".to_string(),
267                })
268            }
269        }
270    }
271}
272
273/// Creates a conditional pipeline.
274/// If the predicate returns true, the next pipeline is executed.
275/// If false, it returns a PipelineError::Failure.
276///
277/// # Example
278///
279/// ```rust
280/// use pipe_it::{cond, Context, Pipeline, Input, ext::HandlerExt};
281///
282/// async fn is_even(n: Input<i32>) -> bool { *n % 2 == 0 }
283/// async fn process(n: Input<i32>) -> String { "Even".to_string() }
284///
285/// # #[tokio::main]
286/// # async fn main() {
287/// let pipe = cond(is_even, process);
288///
289/// // Success case
290/// let result = pipe.apply(Context::empty(2)).await;
291/// assert_eq!(result.unwrap(), "Even");
292///
293/// // Failure case
294/// let result = pipe.apply(Context::empty(1)).await;
295/// assert!(result.is_err());
296/// # }
297/// ```
298pub fn cond<I, O, F, P, ArgsF, ArgsP>(
299    predicate: F,
300    next: P,
301) -> Cond<crate::ext::Pipe<F, ArgsF>, crate::ext::Pipe<P, ArgsP>, I, O>
302where
303    F: crate::handler::Handler<I, bool, ArgsF>,
304    P: crate::handler::Handler<I, O, ArgsP>,
305    I: Clone + Send + Sync + 'static,
306    O: Send + 'static,
307    ArgsF: Send + Sync + 'static,
308    ArgsP: Send + Sync + 'static,
309{
310    use crate::ext::HandlerExt;
311    Cond {
312        predicate: predicate.pipe(),
313        next: next.pipe(),
314        _marker: std::marker::PhantomData,
315    }
316}
317
/// Marker newtype around `T` for choice-based pipeline execution.
pub struct Choice<T>(pub T);
320
321macro_rules! impl_pipeline_for_tuple {
322    ($($P:ident),+) => {
323        impl<I, O, $($P),+ > Pipeline<I, PResult<O>> for ($($P,)+)
324        where
325            I: Clone + Send + Sync + 'static,
326            O: Send + 'static,
327            $( $P: Pipeline<I, PResult<O>> ),+
328        {
329            fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
330                #[allow(non_snake_case)]
331                let ($($P,)+) = self;
332                async move {
333                    $(
334                        match $P.apply(ctx.clone()).await {
335                            Ok(res) => return Ok(res),
336                            Err(PipelineError::Fatal { msg }) => return Err(PipelineError::Fatal { msg }),
337                            Err(PipelineError::Failure { .. }) => {}
338                        }
339                    )*
340                    Err(PipelineError::Fatal { msg: "All pipeline branches failed".to_string() })
341                }
342            }
343        }
344    };
345}
346
347impl_pipeline_for_tuple!(P1);
348impl_pipeline_for_tuple!(P1, P2);
349impl_pipeline_for_tuple!(P1, P2, P3);
350impl_pipeline_for_tuple!(P1, P2, P3, P4);
351impl_pipeline_for_tuple!(P1, P2, P3, P4, P5);
352impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6);
353impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7);
354impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7, P8);
355
356/// An identity pipeline that simply returns the current input.
357/// This acts as a neutral element in pipeline composition.
358pub async fn identity<I: Clone>(input: Input<I>) -> I {
359    (*input).clone()
360}
361
362/// An identity pipeline that returns the current input wrapped in a successful Result.
363/// Useful as a terminal fallback in a choice-based pipeline (tuple).
364pub async fn identity_res<I: Clone>(input: Input<I>) -> PResult<I> {
365    Ok((*input).clone())
366}