remotia_core/pipeline/
component.rs1use std::time::Duration;
2
3use log::{debug, info};
4use tokio::{
5 task::JoinHandle, sync::mpsc::{UnboundedReceiver, UnboundedSender},
6};
7
8use crate::{traits::FrameProcessor};
9
/// Builds a `&String` of the form `"[<tag>] <msg>"` for log and panic messages.
///
/// `$self` must be an identifier bound to a value with a `tag: Option<String>` field;
/// a missing tag renders as an empty pair of brackets (`"[] <msg>"`).
macro_rules! tagged {
    ($self:ident, $msg:tt) => {{
        // Borrow the tag as `&str` so the untagged case falls back to "" without
        // allocating a throwaway `String` on every expansion.
        &format!("[{}] {}", $self.tag.as_deref().unwrap_or(""), $msg)
    }};
}
15
16pub struct Component<F> {
17 processors: Vec<Box<dyn FrameProcessor<F> + Send>>,
18
19 receiver: Option<UnboundedReceiver<F>>,
20 sender: Option<UnboundedSender<F>>,
21
22 tag: Option<String>
23}
24
25unsafe impl<F> Send for Component<F> {}
26
27impl<F: Default + Send + 'static> Component<F> {
28 pub fn new() -> Self {
29 Self {
30 processors: Vec::new(),
31 receiver: None,
32 sender: None,
33 tag: None
34 }
35 }
36
37 pub fn singleton<T: 'static + FrameProcessor<F> + Send>(processor: T) -> Self {
38 Self::new().append(processor)
39 }
40
41 pub fn append<T: 'static + FrameProcessor<F> + Send>(mut self, processor: T) -> Self {
42 self.processors.push(Box::new(processor));
43 self
44 }
45
46 pub fn tag(mut self, tag: &str) -> Self {
47 self.tag = Some(tag.to_string());
48 self
49 }
50
51 pub(crate) fn set_sender(&mut self, sender: UnboundedSender<F>) {
56 self.sender = Some(sender);
57 }
58
59 pub(crate) fn set_receiver(&mut self, receiver: UnboundedReceiver<F>) {
60 self.receiver = Some(receiver);
61 }
62
63 pub(crate) fn launch(mut self) -> JoinHandle<()> {
64 tokio::spawn(async move {
65 loop {
66 let mut frame_data = if self.receiver.is_some() {
67 Some(
68 self.receiver
69 .as_mut()
70 .unwrap()
71 .recv()
72 .await
73 .expect(tagged!(self, "Receive channel closed")),
74 )
75 } else {
76 debug!("No receiver registered, allocating an empty frame DTO");
77 Some(F::default())
78 };
79
80 for processor in &mut self.processors {
81 frame_data = processor.process(frame_data.unwrap()).await;
82
83 if frame_data.is_none() {
84 break;
85 }
86 }
87
88 if self.sender.is_some() {
89 if let Some(frame_data) = frame_data {
90 if self.sender.as_mut().unwrap().send(frame_data).is_err() {
91 panic!("{}", tagged!(self, "Error while sending frame data"));
92 }
93 }
94 }
95 }
96 })
97 }
98}
99
100
101impl<F: Default + Send + 'static> Default for Component<F> {
102 fn default() -> Self {
103 Self::new()
104 }
105}