zeph_core/pipeline/
builder.rs1use super::PipelineError;
5use super::step::Step;
6
7pub trait Runnable: Send + Sync {
8 type Input: Send;
9 type Output: Send;
10
11 fn run(
12 &self,
13 input: Self::Input,
14 ) -> impl std::future::Future<Output = Result<Self::Output, PipelineError>> + Send;
15}
16
17pub struct Start<S>(S);
18
19impl<S: Step> Runnable for Start<S> {
20 type Input = S::Input;
21 type Output = S::Output;
22
23 async fn run(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
24 self.0.run(input).await
25 }
26}
27
28pub struct Chain<Prev, Current> {
29 prev: Prev,
30 current: Current,
31}
32
33impl<Prev, Current> Runnable for Chain<Prev, Current>
34where
35 Prev: Runnable,
36 Current: Step<Input = Prev::Output>,
37{
38 type Input = Prev::Input;
39 type Output = Current::Output;
40
41 async fn run(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
42 let intermediate = self.prev.run(input).await?;
43 self.current.run(intermediate).await
44 }
45}
46
47pub struct Pipeline<S> {
48 steps: S,
49}
50
51impl Pipeline<()> {
52 #[must_use]
53 pub fn start<S: Step>(step: S) -> Pipeline<Start<S>> {
54 Pipeline { steps: Start(step) }
55 }
56}
57
58impl<S> Pipeline<S> {
59 #[must_use]
60 pub fn step<T: Step>(self, step: T) -> Pipeline<Chain<S, T>> {
61 Pipeline {
62 steps: Chain {
63 prev: self.steps,
64 current: step,
65 },
66 }
67 }
68}
69
70impl<S: Runnable> Pipeline<S> {
71 pub async fn run(&self, input: S::Input) -> Result<S::Output, PipelineError> {
75 self.steps.run(input).await
76 }
77}