Skip to main content

zeph_core/pipeline/
builder.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::PipelineError;
5use super::step::Step;
6
7pub trait Runnable: Send + Sync {
8    type Input: Send;
9    type Output: Send;
10
11    fn run(
12        &self,
13        input: Self::Input,
14    ) -> impl std::future::Future<Output = Result<Self::Output, PipelineError>> + Send;
15}
16
17pub struct Start<S>(S);
18
19impl<S: Step> Runnable for Start<S> {
20    type Input = S::Input;
21    type Output = S::Output;
22
23    async fn run(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
24        self.0.run(input).await
25    }
26}
27
28pub struct Chain<Prev, Current> {
29    prev: Prev,
30    current: Current,
31}
32
33impl<Prev, Current> Runnable for Chain<Prev, Current>
34where
35    Prev: Runnable,
36    Current: Step<Input = Prev::Output>,
37{
38    type Input = Prev::Input;
39    type Output = Current::Output;
40
41    async fn run(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
42        let intermediate = self.prev.run(input).await?;
43        self.current.run(intermediate).await
44    }
45}
46
47pub struct Pipeline<S> {
48    steps: S,
49}
50
51impl Pipeline<()> {
52    #[must_use]
53    pub fn start<S: Step>(step: S) -> Pipeline<Start<S>> {
54        Pipeline { steps: Start(step) }
55    }
56}
57
58impl<S> Pipeline<S> {
59    #[must_use]
60    pub fn step<T: Step>(self, step: T) -> Pipeline<Chain<S, T>> {
61        Pipeline {
62            steps: Chain {
63                prev: self.steps,
64                current: step,
65            },
66        }
67    }
68}
69
70impl<S: Runnable> Pipeline<S> {
71    /// # Errors
72    ///
73    /// Returns `PipelineError` if any step in the pipeline fails.
74    pub async fn run(&self, input: S::Input) -> Result<S::Output, PipelineError> {
75        self.steps.run(input).await
76    }
77}