remotia_core/pipeline/
mod.rs1use std::fmt::Debug;
2
3use log::info;
4use tokio::{sync::mpsc::{self, UnboundedSender}, task::JoinHandle};
5
6use self::{component::Component, feeder::PipelineFeeder};
7
8pub mod component;
9pub mod feeder;
10pub mod registry;
11
12pub struct Pipeline<F> {
13 components: Vec<Component<F>>,
14 feeding_sender: Option<UnboundedSender<F>>,
15
16 tag: String,
17
18 bound: bool,
19
20 to_be_feedable: bool,
21}
22
23impl<F: Debug + Default + Send + 'static> Pipeline<F> {
24 pub fn new() -> Self {
25 Self {
26 components: Vec::new(),
27 feeding_sender: None,
28
29 tag: "".to_string(),
30
31 bound: false,
32
33 to_be_feedable: false,
34 }
35 }
36
37 pub fn singleton(component: Component<F>) -> Self {
38 Self::new().link(component)
39 }
40
41 pub fn link(mut self, component: Component<F>) -> Self {
42 self.components.push(component);
43 self
44 }
45
46 pub fn get_feeder(&mut self) -> PipelineFeeder<F> {
47 if self.to_be_feedable {
48 self.make_feedable();
49 }
50
51 let sender = self.feeding_sender.as_ref().unwrap().clone();
52 PipelineFeeder::new(sender)
53 }
54
55 pub fn run(mut self) -> Vec<JoinHandle<()>> {
56 info!("[{}] Launching threads...", self.tag);
57
58 if !self.bound {
59 self.bind();
60 }
61
62 if self.to_be_feedable {
63 self.make_feedable();
64 }
65
66 let mut handles = Vec::new();
67
68 for component in self.components {
69 let handle = component.launch();
70 handles.push(handle);
71 }
72
73 handles
74 }
75
76 fn bind(&mut self) {
77 info!("[{}] Binding channels...", self.tag);
78
79 for i in 0..self.components.len()-1 {
80 let (sender, receiver) = mpsc::unbounded_channel::<F>();
81
82 let src_component = self.components.get_mut(i).unwrap();
83 src_component.set_sender(sender);
84
85 let dst_component = self.components.get_mut(i + 1).unwrap();
86 dst_component.set_receiver(receiver);
87 }
88
89 self.bound = true;
90 }
91
92 fn make_feedable(&mut self) {
93 let head = self.components.get_mut(0).unwrap();
94
95 let (sender, receiver) = mpsc::unbounded_channel::<F>();
96 self.feeding_sender = Some(sender);
97
98 head.set_receiver(receiver);
99
100 self.to_be_feedable = false;
101 }
102
103 pub fn tag(mut self, tag: &str) -> Self {
104 self.tag = tag.to_string();
105 self
106 }
107
108 pub fn feedable(mut self) -> Self {
109 self.to_be_feedable = true;
110 self
111 }
112}
113
114impl<F: Default + Debug + Send + 'static> Default for Pipeline<F> {
115 fn default() -> Self {
116 Self::new()
117 }
118}