1use std::{
2 any::{self, TypeId},
3 collections::HashMap,
4 future::Future,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use tokio::sync;
10
11pub use pipeit_derive::node;
12
13pub extern crate pipeit_derive;
14
15pub mod ext;
16pub mod handler;
17pub mod tag;
18pub mod cocurrency;
19#[derive(Clone)]
23struct DendencyMap(Arc<HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>>);
24
25#[derive(Default, Clone)]
29pub struct Shared {
30 inner: HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>,
31}
32
33impl Shared {
34 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn insert<T: Send + Sync + 'static>(mut self, resource: T) -> Self {
42 self.inner.insert(
43 TypeId::of::<T>(),
44 Arc::new(sync::RwLock::new(
45 Box::new(resource) as Box<dyn any::Any + Send + Sync>
46 )),
47 );
48 self
49 }
50}
51
52pub struct Res<T>(
55 sync::OwnedRwLockReadGuard<Box<dyn any::Any + Send + Sync>>,
56 std::marker::PhantomData<T>,
57);
58
59pub struct ResMut<T>(
62 sync::OwnedRwLockWriteGuard<Box<dyn any::Any + Send + Sync>>,
63 std::marker::PhantomData<T>,
64);
65
66impl<T: 'static> Deref for Res<T> {
67 type Target = T;
68 fn deref(&self) -> &Self::Target {
69 (**self.0)
72 .downcast_ref::<T>()
73 .expect("Resource type mismatch during Res deref")
74 }
75}
76
77impl<T: 'static> Deref for ResMut<T> {
78 type Target = T;
79 fn deref(&self) -> &Self::Target {
80 (**self.0)
81 .downcast_ref::<T>()
82 .expect("Resource type mismatch during ResMut deref")
83 }
84}
85
86impl<T: 'static> DerefMut for ResMut<T> {
87 fn deref_mut(&mut self) -> &mut Self::Target {
88 (**self.0)
90 .downcast_mut::<T>()
91 .expect("Resource type mismatch during ResMut deref_mut")
92 }
93}
94
95pub struct Context<Input>
97where
98 Input: ?Sized,
99{
100 shared: DendencyMap,
101 input: Arc<Input>,
102}
103
104impl<I> Context<I> {
105 pub fn new(input: I, shared: Shared) -> Self {
107 Self {
108 shared: DendencyMap(Arc::new(shared.inner)),
109 input: Arc::new(input),
110 }
111 }
112 pub fn empty(input: I) -> Self {
113 Self {
114 shared: DendencyMap(Arc::new(Shared::new().inner)),
115 input: Arc::new(input),
116 }
117 }
118 pub(crate) fn replace<NewInput>(self, input: NewInput) -> Context<NewInput> {
121 Context {
122 shared: self.shared,
123 input: Arc::new(input),
124 }
125 }
126
127 pub(crate) fn input(&self) -> Arc<I> {
129 self.input.clone()
130 }
131}
132impl<Input> Clone for Context<Input>
133where
134 Input: ?Sized,
135{
136 fn clone(&self) -> Self {
137 Self {
138 shared: self.shared.clone(),
139 input: self.input.clone(),
140 }
141 }
142}
143
144pub trait FromContext<Input>: Send {
147 fn from(ctx: Context<Input>) -> impl Future<Output = Self> + Send;
148}
149
150#[derive(Clone)]
152pub struct Input<T>(pub Arc<T>);
153
154impl<T> Deref for Input<T> {
155 type Target = T;
156 fn deref(&self) -> &Self::Target {
157 &self.0
158 }
159}
160
161impl<I: Send + Sync + 'static> FromContext<I> for Input<I> {
162 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
163 let input = ctx.input.clone();
164 async move { Input(input) }
165 }
166}
167
168impl<I, T> FromContext<I> for Res<T>
169where
170 I: Send + Sync + 'static,
171 T: Send + Sync + 'static,
172{
173 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
174 async move {
175 let shared = ctx.shared.0.clone();
176 let dep = shared
177 .get(&TypeId::of::<T>())
178 .expect("Dependency not found")
179 .clone();
180 let guard = dep.read_owned().await;
181 Res(guard, std::marker::PhantomData)
182 }
183 }
184}
185
186impl<I, T> FromContext<I> for ResMut<T>
187where
188 I: Send + Sync + 'static,
189 T: Send + Sync + 'static,
190{
191 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
192 async move {
193 let shared = ctx.shared.0.clone();
194 let dep = shared
195 .get(&TypeId::of::<T>())
196 .expect("Dependency not found")
197 .clone();
198 let guard = dep.write_owned().await;
199 ResMut(guard, std::marker::PhantomData)
200 }
201 }
202}
203
204pub trait Pipeline<I, O>: Send + Sync + 'static {
207 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send;
208}
209
210#[derive(Debug, Clone)]
212pub enum PipelineError {
213 Failure { msg: String, expected: String },
214 Fatal { msg: String },
215}
216
217pub type PResult<O, E = PipelineError> = Result<O, E>;
219
220pub struct Cond<F, P, I, O> {
222 predicate: F,
223 next: P,
224 _marker: std::marker::PhantomData<fn(I, O)>,
225}
226
227impl<F, P, I, O> Pipeline<I, PResult<O>> for Cond<F, P, I, O>
228where
229 F: Pipeline<I, bool>,
230 P: Pipeline<I, O>,
231 I: Clone + Send + Sync + 'static,
232 O: Send + 'static,
233 F: Send + Sync + 'static,
234 P: Send + Sync + 'static,
235{
236 fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
237 async move {
238 let matched = self.predicate.apply(ctx.clone()).await;
239 if matched {
240 Ok(self.next.apply(ctx).await)
241 } else {
242 Err(PipelineError::Failure {
243 msg: "Condition not met".to_string(),
244 expected: "true".to_string(),
245 })
246 }
247 }
248 }
249}
250
251pub fn cond<I, O, F, P, ArgsF, ArgsP>(
277 predicate: F,
278 next: P,
279) -> Cond<crate::ext::Pipe<F, ArgsF>, crate::ext::Pipe<P, ArgsP>, I, O>
280where
281 F: crate::handler::Handler<I, bool, ArgsF>,
282 P: crate::handler::Handler<I, O, ArgsP>,
283 I: Clone + Send + Sync + 'static,
284 O: Send + 'static,
285 ArgsF: Send + Sync + 'static,
286 ArgsP: Send + Sync + 'static,
287{
288 use crate::ext::HandlerExt;
289 Cond {
290 predicate: predicate.pipe(),
291 next: next.pipe(),
292 _marker: std::marker::PhantomData,
293 }
294}
295
296pub struct Choice<T>(pub T);
298
299macro_rules! impl_pipeline_for_tuple {
300 ($($P:ident),+) => {
301 impl<I, O, $($P),+ > Pipeline<I, PResult<O>> for ($($P,)+)
302 where
303 I: Clone + Send + Sync + 'static,
304 O: Send + 'static,
305 $( $P: Pipeline<I, PResult<O>> ),+
306 {
307 fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
308 #[allow(non_snake_case)]
309 let ($($P,)+) = self;
310 async move {
311 $(
312 match $P.apply(ctx.clone()).await {
313 Ok(res) => return Ok(res),
314 Err(PipelineError::Fatal { msg }) => return Err(PipelineError::Fatal { msg }),
315 Err(PipelineError::Failure { .. }) => {}
316 }
317 )*
318 Err(PipelineError::Fatal { msg: "All pipeline branches failed".to_string() })
319 }
320 }
321 }
322 };
323}
324
325impl_pipeline_for_tuple!(P1);
326impl_pipeline_for_tuple!(P1, P2);
327impl_pipeline_for_tuple!(P1, P2, P3);
328impl_pipeline_for_tuple!(P1, P2, P3, P4);
329impl_pipeline_for_tuple!(P1, P2, P3, P4, P5);
330impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6);
331impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7);
332impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7, P8);
333
334pub async fn identity<I: Clone>(input: Input<I>) -> I {
337 (*input).clone()
338}
339
340pub async fn identity_res<I: Clone>(input: Input<I>) -> PResult<I> {
343 Ok((*input).clone())
344}