remotia_core/pipeline/
mod.rs

1use std::fmt::Debug;
2
3use log::info;
4use tokio::{sync::mpsc::{self, UnboundedSender}, task::JoinHandle};
5
6use self::{component::Component, feeder::PipelineFeeder};
7
8pub mod component;
9pub mod feeder;
10pub mod registry;
11
12pub struct Pipeline<F> {
13    components: Vec<Component<F>>,
14    feeding_sender: Option<UnboundedSender<F>>,
15
16    tag: String,
17
18    bound: bool,
19
20    to_be_feedable: bool,
21}
22
23impl<F: Debug + Default + Send + 'static> Pipeline<F> {
24    pub fn new() -> Self {
25        Self {
26            components: Vec::new(),
27            feeding_sender: None,
28
29            tag: "".to_string(),
30
31            bound: false,
32
33            to_be_feedable: false,
34        }
35    }
36
37    pub fn singleton(component: Component<F>) -> Self {
38        Self::new().link(component)
39    }
40
41    pub fn link(mut self, component: Component<F>) -> Self {
42        self.components.push(component);
43        self
44    }
45
46    pub fn get_feeder(&mut self) -> PipelineFeeder<F> {
47        if self.to_be_feedable {
48            self.make_feedable();
49        }
50
51        let sender = self.feeding_sender.as_ref().unwrap().clone();
52        PipelineFeeder::new(sender)
53    }
54
55    pub fn run(mut self) -> Vec<JoinHandle<()>> {
56        info!("[{}] Launching threads...", self.tag);
57
58        if !self.bound {
59            self.bind();
60        }
61
62        if self.to_be_feedable {
63            self.make_feedable();
64        }
65
66        let mut handles = Vec::new();
67
68        for component in self.components {
69            let handle = component.launch();
70            handles.push(handle);
71        }
72
73        handles
74    }
75
76    fn bind(&mut self) {
77        info!("[{}] Binding channels...", self.tag);
78
79        for i in 0..self.components.len()-1 {
80            let (sender, receiver) = mpsc::unbounded_channel::<F>();
81
82            let src_component = self.components.get_mut(i).unwrap();
83            src_component.set_sender(sender);
84
85            let dst_component = self.components.get_mut(i + 1).unwrap();
86            dst_component.set_receiver(receiver);
87        }
88
89        self.bound = true;
90    }
91
92    fn make_feedable(&mut self) {
93        let head = self.components.get_mut(0).unwrap();
94
95        let (sender, receiver) = mpsc::unbounded_channel::<F>();
96        self.feeding_sender = Some(sender);
97
98        head.set_receiver(receiver);
99
100        self.to_be_feedable = false;
101    }
102
103    pub fn tag(mut self, tag: &str) -> Self {
104        self.tag = tag.to_string();
105        self
106    }
107
108    pub fn feedable(mut self) -> Self {
109        self.to_be_feedable = true;
110        self
111    }
112}
113
114impl<F: Default + Debug + Send + 'static> Default for Pipeline<F> {
115    fn default() -> Self {
116        Self::new()
117    }
118}