remotia_core/pipeline/
component.rs

1use std::time::Duration;
2
3use log::{debug, info};
4use tokio::{
5    task::JoinHandle, sync::mpsc::{UnboundedReceiver, UnboundedSender},
6};
7
8use crate::{traits::FrameProcessor};
9
/// Prefixes a message with the tag of the value it is invoked on.
///
/// `$self` must be an identifier naming a value with a `tag: Option<String>`
/// field. The expansion is a reference to a freshly formatted temporary
/// `String` of the form `"[<tag>] <msg>"`; when the tag is `None` the tag part
/// is empty, giving `"[] <msg>"`.
macro_rules! tagged {
    ($self:ident, $msg:tt) => {{
        // `as_deref` turns `Option<String>` into `Option<&str>`, so the
        // fallback is a plain `""` rather than a newly allocated empty `String`.
        &format!("[{}] {}", $self.tag.as_deref().unwrap_or(""), $msg)
    }};
}
15
16pub struct Component<F> {
17    processors: Vec<Box<dyn FrameProcessor<F> + Send>>,
18
19    receiver: Option<UnboundedReceiver<F>>,
20    sender: Option<UnboundedSender<F>>,
21
22    tag: Option<String>
23}
24
25unsafe impl<F> Send for Component<F> {}
26
27impl<F: Default + Send + 'static> Component<F> {
28    pub fn new() -> Self {
29        Self {
30            processors: Vec::new(),
31            receiver: None,
32            sender: None,
33            tag: None
34        }
35    }
36
37    pub fn singleton<T: 'static + FrameProcessor<F> + Send>(processor: T) -> Self {
38        Self::new().append(processor)
39    }
40
41    pub fn append<T: 'static + FrameProcessor<F> + Send>(mut self, processor: T) -> Self {
42        self.processors.push(Box::new(processor));
43        self
44    }
45
46    pub fn tag(mut self, tag: &str) -> Self {
47        self.tag = Some(tag.to_string());
48        self
49    }
50
51    //////////////////////
52    // Internal methods //
53    //////////////////////
54
55    pub(crate) fn set_sender(&mut self, sender: UnboundedSender<F>) {
56        self.sender = Some(sender);
57    }
58
59    pub(crate) fn set_receiver(&mut self, receiver: UnboundedReceiver<F>) {
60        self.receiver = Some(receiver);
61    }
62
63    pub(crate) fn launch(mut self) -> JoinHandle<()> {
64        tokio::spawn(async move {
65            loop {
66                let mut frame_data = if self.receiver.is_some() {
67                    Some(
68                        self.receiver
69                            .as_mut()
70                            .unwrap()
71                            .recv()
72                            .await
73                            .expect(tagged!(self, "Receive channel closed")),
74                    )
75                } else {
76                    debug!("No receiver registered, allocating an empty frame DTO");
77                    Some(F::default())
78                };
79
80                for processor in &mut self.processors {
81                    frame_data = processor.process(frame_data.unwrap()).await;
82
83                    if frame_data.is_none() {
84                        break;
85                    }
86                }
87
88                if self.sender.is_some() {
89                    if let Some(frame_data) = frame_data {
90                        if self.sender.as_mut().unwrap().send(frame_data).is_err() {
91                            panic!("{}", tagged!(self, "Error while sending frame data"));
92                        }
93                    }
94                }
95            }
96        })
97    }
98}
99
100
101impl<F: Default + Send + 'static> Default for Component<F> {
102    fn default() -> Self {
103        Self::new()
104    }
105}