1use std::{
2 any::{self, TypeId},
3 collections::HashMap,
4 future::Future,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use tokio::sync;
10
11pub use pipeit_derive::node;
12
13pub extern crate pipeit_derive;
14
15pub mod cocurrency;
16pub mod ext;
17pub mod handler;
18pub mod tag;
19#[derive(Clone)]
23pub(crate) struct DendencyMap(Arc<HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>>);
24
25#[derive(Default, Clone)]
29pub struct Shared {
30 inner: HashMap<TypeId, Arc<sync::RwLock<Box<dyn any::Any + Send + Sync>>>>,
31}
32
33impl Shared {
34 pub fn new() -> Self {
36 Self::default()
37 }
38
39 pub fn insert<T: Send + Sync + 'static>(mut self, resource: T) -> Self {
42 self.inner.insert(
43 TypeId::of::<T>(),
44 Arc::new(sync::RwLock::new(
45 Box::new(resource) as Box<dyn any::Any + Send + Sync>
46 )),
47 );
48 self
49 }
50}
51
52pub struct Res<T>(
55 sync::OwnedRwLockReadGuard<Box<dyn any::Any + Send + Sync>>,
56 std::marker::PhantomData<T>,
57);
58
59pub struct ResMut<T>(
62 sync::OwnedRwLockWriteGuard<Box<dyn any::Any + Send + Sync>>,
63 std::marker::PhantomData<T>,
64);
65
66impl<T: 'static> Deref for Res<T> {
67 type Target = T;
68 fn deref(&self) -> &Self::Target {
69 (**self.0)
72 .downcast_ref::<T>()
73 .expect("Resource type mismatch during Res deref")
74 }
75}
76
77impl<T: 'static> Deref for ResMut<T> {
78 type Target = T;
79 fn deref(&self) -> &Self::Target {
80 (**self.0)
81 .downcast_ref::<T>()
82 .expect("Resource type mismatch during ResMut deref")
83 }
84}
85
86impl<T: 'static> DerefMut for ResMut<T> {
87 fn deref_mut(&mut self) -> &mut Self::Target {
88 (**self.0)
90 .downcast_mut::<T>()
91 .expect("Resource type mismatch during ResMut deref_mut")
92 }
93}
94
95pub struct Context<Input>
97where
98 Input: ?Sized,
99{
100 shared: DendencyMap,
101 input: Arc<Input>,
102}
103
104impl<I> Context<I> {
105 pub fn new(input: I, shared: Shared) -> Self {
107 Self {
108 shared: DendencyMap(Arc::new(shared.inner)),
109 input: Arc::new(input),
110 }
111 }
112 pub fn empty(input: I) -> Self {
113 Self {
114 shared: DendencyMap(Arc::new(Shared::new().inner)),
115 input: Arc::new(input),
116 }
117 }
118 pub(crate) fn replace<NewInput>(self, input: NewInput) -> Context<NewInput> {
121 Context {
122 shared: self.shared,
123 input: Arc::new(input),
124 }
125 }
126
127 pub(crate) fn input(&self) -> Arc<I> {
129 self.input.clone()
130 }
131
132 pub(crate) fn into_parts(self) -> (Arc<I>, DendencyMap) {
135 (self.input, self.shared)
136 }
137
138 pub(crate) fn from_parts(input: Arc<I>, shared: DendencyMap) -> Self {
140 Self {
141 shared,
142 input,
143 }
144 }
145}
146impl<Input> Clone for Context<Input>
147where
148 Input: ?Sized,
149{
150 fn clone(&self) -> Self {
151 Self {
152 shared: self.shared.clone(),
153 input: self.input.clone(),
154 }
155 }
156}
157
158pub trait FromContext<Input>: Send {
161 fn from(ctx: Context<Input>) -> impl Future<Output = Self> + Send;
162}
163
/// Handle to the current pipeline input, shared through an [`Arc`].
///
/// Dereferences to `T`. Use [`Input::try_unwrap`] to take the value by ownership when
/// this is the last handle.
pub struct Input<T>(pub Arc<T>);

// Implemented by hand rather than with `#[derive(Clone)]`: the derive adds a `T: Clone`
// bound, but cloning an `Input` only clones the `Arc`, which works for every `T`.
impl<T> Clone for Input<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<T> Input<T> {
    /// Returns the inner value if this is the only remaining handle to it.
    ///
    /// # Errors
    ///
    /// Gives the `Arc` back unchanged when other handles to the same value still exist.
    pub fn try_unwrap(self) -> Result<T, Arc<T>> {
        Arc::try_unwrap(self.0)
    }
}

impl<T> Deref for Input<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
182
183impl<I: Send + Sync + 'static> FromContext<I> for Input<I> {
184 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
185 let input = ctx.input.clone();
186 async move { Input(input) }
187 }
188}
189
190impl<I, T> FromContext<I> for Res<T>
191where
192 I: Send + Sync + 'static,
193 T: Send + Sync + 'static,
194{
195 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
196 async move {
197 let shared = ctx.shared.0.clone();
198 let dep = shared
199 .get(&TypeId::of::<T>())
200 .expect("Dependency not found")
201 .clone();
202 let guard = dep.read_owned().await;
203 Res(guard, std::marker::PhantomData)
204 }
205 }
206}
207
208impl<I, T> FromContext<I> for ResMut<T>
209where
210 I: Send + Sync + 'static,
211 T: Send + Sync + 'static,
212{
213 fn from(ctx: Context<I>) -> impl Future<Output = Self> + Send {
214 async move {
215 let shared = ctx.shared.0.clone();
216 let dep = shared
217 .get(&TypeId::of::<T>())
218 .expect("Dependency not found")
219 .clone();
220 let guard = dep.write_owned().await;
221 ResMut(guard, std::marker::PhantomData)
222 }
223 }
224}
225
226pub trait Pipeline<I, O>: Send + Sync + 'static {
229 fn apply(&self, ctx: Context<I>) -> impl Future<Output = O> + Send;
230}
231
/// Error produced by a pipeline stage.
///
/// The tuple-of-pipelines implementation in this file treats the two variants
/// differently: a `Failure` lets the next branch run, a `Fatal` stops immediately.
#[derive(Debug, Clone)]
pub enum PipelineError {
    /// The stage did not succeed, but an alternative branch may still be attempted.
    Failure {
        /// Description of what went wrong.
        msg: String,
        /// Description of what the stage expected instead.
        expected: String,
    },
    /// The stage failed in a way that aborts the remaining branches.
    Fatal {
        /// Description of what went wrong.
        msg: String,
    },
}
238
239pub type PResult<O, E = PipelineError> = Result<O, E>;
241
/// Pipeline stage that runs `next` only when `predicate` resolves to `true`.
///
/// `I` and `O` are recorded through a `PhantomData<fn(I, O)>` marker, so the struct
/// stores no value of either type.
pub struct Cond<F, P, I, O> {
    /// Stage deciding whether `next` should run.
    predicate: F,
    /// Stage executed when the predicate holds.
    next: P,
    /// Ties the input/output types to the struct without storing them.
    _marker: std::marker::PhantomData<fn(I, O)>,
}
248
249impl<F, P, I, O> Pipeline<I, PResult<O>> for Cond<F, P, I, O>
250where
251 F: Pipeline<I, bool>,
252 P: Pipeline<I, O>,
253 I: Clone + Send + Sync + 'static,
254 O: Send + 'static,
255 F: Send + Sync + 'static,
256 P: Send + Sync + 'static,
257{
258 fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
259 async move {
260 let matched = self.predicate.apply(ctx.clone()).await;
261 if matched {
262 Ok(self.next.apply(ctx).await)
263 } else {
264 Err(PipelineError::Failure {
265 msg: "Condition not met".to_string(),
266 expected: "true".to_string(),
267 })
268 }
269 }
270 }
271}
272
273pub fn cond<I, O, F, P, ArgsF, ArgsP>(
299 predicate: F,
300 next: P,
301) -> Cond<crate::ext::Pipe<F, ArgsF>, crate::ext::Pipe<P, ArgsP>, I, O>
302where
303 F: crate::handler::Handler<I, bool, ArgsF>,
304 P: crate::handler::Handler<I, O, ArgsP>,
305 I: Clone + Send + Sync + 'static,
306 O: Send + 'static,
307 ArgsF: Send + Sync + 'static,
308 ArgsP: Send + Sync + 'static,
309{
310 use crate::ext::HandlerExt;
311 Cond {
312 predicate: predicate.pipe(),
313 next: next.pipe(),
314 _marker: std::marker::PhantomData,
315 }
316}
317
/// Newtype wrapper with a public inner value.
///
/// NOTE(review): nothing in this part of the file attaches behaviour to it; presumably
/// it is consumed elsewhere in the crate — confirm.
pub struct Choice<T>(pub T);
320
321macro_rules! impl_pipeline_for_tuple {
322 ($($P:ident),+) => {
323 impl<I, O, $($P),+ > Pipeline<I, PResult<O>> for ($($P,)+)
324 where
325 I: Clone + Send + Sync + 'static,
326 O: Send + 'static,
327 $( $P: Pipeline<I, PResult<O>> ),+
328 {
329 fn apply(&self, ctx: Context<I>) -> impl Future<Output = PResult<O>> + Send {
330 #[allow(non_snake_case)]
331 let ($($P,)+) = self;
332 async move {
333 $(
334 match $P.apply(ctx.clone()).await {
335 Ok(res) => return Ok(res),
336 Err(PipelineError::Fatal { msg }) => return Err(PipelineError::Fatal { msg }),
337 Err(PipelineError::Failure { .. }) => {}
338 }
339 )*
340 Err(PipelineError::Fatal { msg: "All pipeline branches failed".to_string() })
341 }
342 }
343 }
344 };
345}
346
347impl_pipeline_for_tuple!(P1);
348impl_pipeline_for_tuple!(P1, P2);
349impl_pipeline_for_tuple!(P1, P2, P3);
350impl_pipeline_for_tuple!(P1, P2, P3, P4);
351impl_pipeline_for_tuple!(P1, P2, P3, P4, P5);
352impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6);
353impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7);
354impl_pipeline_for_tuple!(P1, P2, P3, P4, P5, P6, P7, P8);
355
356pub async fn identity<I: Clone>(input: Input<I>) -> I {
359 (*input).clone()
360}
361
362pub async fn identity_res<I: Clone>(input: Input<I>) -> PResult<I> {
365 Ok((*input).clone())
366}