data_pipeline_rs/
data_handler.rs1use anyhow::Result;
2
3use crate::{node::NodeRef, node_visitor::NodeVisitor, stats_producer::StatsProducer};
4
5pub trait DataObserver<T> {
6 fn observe(&mut self, data: &T);
7 fn get_stats(&self) -> Option<serde_json::Value> {
8 None
9 }
10}
11
12pub trait DataTransformer<T> {
13 fn transform(&mut self, data: T) -> Result<T>;
14 fn get_stats(&self) -> Option<serde_json::Value> {
15 None
16 }
17}
18
19pub trait DataFilter<T> {
20 fn should_forward(&mut self, data: &T) -> bool;
21 fn get_stats(&self) -> Option<serde_json::Value> {
22 None
23 }
24}
25
26impl<T, F> DataFilter<T> for F
29where
30 F: FnMut(&T) -> bool,
31{
32 fn should_forward(&mut self, packet_info: &T) -> bool {
33 (self)(packet_info)
34 }
35}
36
37impl<F, T> From<F> for SomeDataHandler<T>
38where
39 F: FnMut(&T) -> bool + Send + 'static,
40{
41 fn from(value: F) -> Self {
42 SomeDataHandler::Filter(Box::new(value))
43 }
44}
45
46pub trait DataConsumer<T> {
47 fn consume(&mut self, data: T);
48 fn get_stats(&self) -> Option<serde_json::Value> {
49 None
50 }
51}
52
53pub trait DataDemuxer<T> {
54 fn find_path(&mut self, data: &T) -> Option<&NodeRef<T>>;
55 fn visit(&mut self, visitor: &mut dyn NodeVisitor<T>);
57 fn get_stats(&self) -> Option<serde_json::Value> {
58 None
59 }
60}
61
62#[macro_export]
68macro_rules! impl_conversion_to_some_data_handler {
69 ($type:ty,$variant:ident) => {
70 impl<T> From<$type> for $crate::data_handler::SomeDataHandler<T> {
71 fn from(value: $type) -> Self {
72 $crate::data_handler::SomeDataHandler::$variant(Box::new(value))
73 }
74 }
75 };
76}
77
78pub enum SomeDataHandler<T> {
79 Observer(Box<dyn DataObserver<T> + Send>),
80 Transformer(Box<dyn DataTransformer<T> + Send>),
81 Filter(Box<dyn DataFilter<T> + Send>),
82 Consumer(Box<dyn DataConsumer<T> + Send>),
83 Demuxer(Box<dyn DataDemuxer<T> + Send>),
84}
85
86impl<T> StatsProducer for SomeDataHandler<T> {
87 fn get_stats(&self) -> Option<serde_json::Value> {
88 match self {
89 SomeDataHandler::Observer(ref o) => o.get_stats(),
90 SomeDataHandler::Transformer(ref t) => t.get_stats(),
91 SomeDataHandler::Filter(ref f) => f.get_stats(),
92 SomeDataHandler::Consumer(ref c) => c.get_stats(),
93 SomeDataHandler::Demuxer(ref d) => d.get_stats(),
94 }
95 }
96}