data_pipeline_rs/
pipeline_builder.rs

1use std::marker::PhantomData;
2
3use crate::{
4    data_handler::{DataDemuxer, SomeDataHandler},
5    node::{Node, NodeRef},
6};
7
8pub trait PipelineState {}
9pub struct Open;
10pub struct Terminated;
11
12impl PipelineState for Open {}
13impl PipelineState for Terminated {}
14
15#[derive(Default)]
16pub struct PipelineBuilder<S: PipelineState, T> {
17    nodes: Vec<NodeRef<T>>,
18    _state: PhantomData<S>,
19}
20
21impl<T> Default for PipelineBuilder<Open, T> {
22    fn default() -> Self {
23        Self::new()
24    }
25}
26
27impl<T> PipelineBuilder<Open, T> {
28    pub fn new() -> Self {
29        PipelineBuilder::<Open, T> {
30            nodes: Vec::new(),
31            _state: PhantomData,
32        }
33    }
34}
35
36impl<S: PipelineState, T> PipelineBuilder<S, T> {
37    pub fn build(mut self) -> NodeRef<T> {
38        self.nodes.remove(0)
39    }
40}
41
42impl<T> PipelineBuilder<Open, T> {
43    pub fn attach<U: Into<NodeRef<T>>>(mut self, node: U) -> Self {
44        let node_ref = node.into();
45        if let Some(last) = self.nodes.last() {
46            last.set_next(node_ref.clone());
47            node_ref.set_prev(last.clone());
48        }
49        self.nodes.push(node_ref);
50        self
51    }
52
53    pub fn attach_handler<N: Into<String>, U: Into<SomeDataHandler<T>>>(
54        self,
55        name: N,
56        handler: U,
57    ) -> Self {
58        let node = Node::new(name, handler.into());
59        self.attach(node)
60    }
61
62    pub fn demux<U: Into<String>>(
63        mut self,
64        name: U,
65        demuxer: impl DataDemuxer<T> + Send + 'static,
66    ) -> PipelineBuilder<Terminated, T> {
67        let new_node = NodeRef::new(Node::new(name, SomeDataHandler::Demuxer(Box::new(demuxer))));
68        if let Some(last) = self.nodes.last() {
69            last.set_next(new_node.clone());
70            new_node.set_prev(last.clone());
71        }
72        self.nodes.push(new_node);
73        // Demuxers can't be attached to like normal nodes because they handle their own downstream
74        // paths, so although a pipeline will almost always continue after a demuxer, from a
75        // builder perspective the demuxer needs to be built with its own sub-pipelines for its
76        // downstream paths.
77        PipelineBuilder::<Terminated, T> {
78            nodes: self.nodes,
79            _state: PhantomData::<Terminated>,
80        }
81    }
82}