data_pipeline_rs/
pipeline_builder.rs1use std::marker::PhantomData;
2
3use crate::{
4 data_handler::{DataDemuxer, SomeDataHandler},
5 node::{Node, NodeRef},
6};
7
/// Marker trait for the compile-time states a [`PipelineBuilder`] can be in.
///
/// It has no methods; it only restricts which types may be used as the
/// builder's state parameter.
pub trait PipelineState {}

/// Builder state in which further nodes may still be attached.
pub struct Open;

impl PipelineState for Open {}

/// Builder state produced by `demux`; no attach methods are defined for it.
pub struct Terminated;

impl PipelineState for Terminated {}
14
15#[derive(Default)]
16pub struct PipelineBuilder<S: PipelineState, T> {
17 nodes: Vec<NodeRef<T>>,
18 _state: PhantomData<S>,
19}
20
21impl<T> Default for PipelineBuilder<Open, T> {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27impl<T> PipelineBuilder<Open, T> {
28 pub fn new() -> Self {
29 PipelineBuilder::<Open, T> {
30 nodes: Vec::new(),
31 _state: PhantomData,
32 }
33 }
34}
35
36impl<S: PipelineState, T> PipelineBuilder<S, T> {
37 pub fn build(mut self) -> NodeRef<T> {
38 self.nodes.remove(0)
39 }
40}
41
42impl<T> PipelineBuilder<Open, T> {
43 pub fn attach<U: Into<NodeRef<T>>>(mut self, node: U) -> Self {
44 let node_ref = node.into();
45 if let Some(last) = self.nodes.last() {
46 last.set_next(node_ref.clone());
47 node_ref.set_prev(last.clone());
48 }
49 self.nodes.push(node_ref);
50 self
51 }
52
53 pub fn attach_handler<N: Into<String>, U: Into<SomeDataHandler<T>>>(
54 self,
55 name: N,
56 handler: U,
57 ) -> Self {
58 let node = Node::new(name, handler.into());
59 self.attach(node)
60 }
61
62 pub fn demux<U: Into<String>>(
63 mut self,
64 name: U,
65 demuxer: impl DataDemuxer<T> + Send + 'static,
66 ) -> PipelineBuilder<Terminated, T> {
67 let new_node = NodeRef::new(Node::new(name, SomeDataHandler::Demuxer(Box::new(demuxer))));
68 if let Some(last) = self.nodes.last() {
69 last.set_next(new_node.clone());
70 new_node.set_prev(last.clone());
71 }
72 self.nodes.push(new_node);
73 PipelineBuilder::<Terminated, T> {
78 nodes: self.nodes,
79 _state: PhantomData::<Terminated>,
80 }
81 }
82}