data_pipeline_rs/
data_handler.rs

1use anyhow::Result;
2
3use crate::{node::NodeRef, node_visitor::NodeVisitor, stats_producer::StatsProducer};
4
5pub trait DataObserver<T> {
6    fn observe(&mut self, data: &T);
7    fn get_stats(&self) -> Option<serde_json::Value> {
8        None
9    }
10}
11
12pub trait DataTransformer<T> {
13    fn transform(&mut self, data: T) -> Result<T>;
14    fn get_stats(&self) -> Option<serde_json::Value> {
15        None
16    }
17}
18
19pub trait DataFilter<T> {
20    fn should_forward(&mut self, data: &T) -> bool;
21    fn get_stats(&self) -> Option<serde_json::Value> {
22        None
23    }
24}
25
26// impl<T, F> StatsProducer for F where F: FnMut(&T) -> bool {}
27
28impl<T, F> DataFilter<T> for F
29where
30    F: FnMut(&T) -> bool,
31{
32    fn should_forward(&mut self, packet_info: &T) -> bool {
33        (self)(packet_info)
34    }
35}
36
37impl<F, T> From<F> for SomeDataHandler<T>
38where
39    F: FnMut(&T) -> bool + Send + 'static,
40{
41    fn from(value: F) -> Self {
42        SomeDataHandler::Filter(Box::new(value))
43    }
44}
45
46pub trait DataConsumer<T> {
47    fn consume(&mut self, data: T);
48    fn get_stats(&self) -> Option<serde_json::Value> {
49        None
50    }
51}
52
53pub trait DataDemuxer<T> {
54    fn find_path(&mut self, data: &T) -> Option<&NodeRef<T>>;
55    // DataDemuxer has to have its own visitor logic since it handles its own paths
56    fn visit(&mut self, visitor: &mut dyn NodeVisitor<T>);
57    fn get_stats(&self) -> Option<serde_json::Value> {
58        None
59    }
60}
61
// Note: Ideally we'd have blanket impls to convert from any of the above traits into
// SomeDataHandler, but unfortunately I don't think that can be done without causing conflicting
// implementation errors.  This macro helps with the conversion at least.
// Example:
// impl_conversion_to_some_data_handler!(MyType, Observer);
/// Generates a `From<$type>` impl that boxes the value into the given
/// `SomeDataHandler` variant.
///
/// Two forms are accepted:
///
/// * `impl_conversion_to_some_data_handler!(MyType, Observer);` implements
///   `From<MyType>` for `SomeDataHandler<T>` for *every* `T`. The generated impl puts
///   no bound on `T`, so it only compiles when `MyType` implements the variant's
///   handler trait for all `T`.
/// * `impl_conversion_to_some_data_handler!(MyType, Observer, MyData);` implements
///   `From<MyType>` for `SomeDataHandler<MyData>` only. Use this when `MyType`
///   implements the handler trait for one concrete data type.
///
/// In both forms `$variant` must name a `SomeDataHandler` variant, and `$type` must be
/// `Send + 'static` so that it can be stored in the variant's boxed trait object.
#[macro_export]
macro_rules! impl_conversion_to_some_data_handler {
    // Handler type that works for any data type `T`.
    ($type:ty, $variant:ident) => {
        impl<T> From<$type> for $crate::data_handler::SomeDataHandler<T> {
            fn from(value: $type) -> Self {
                $crate::data_handler::SomeDataHandler::$variant(Box::new(value))
            }
        }
    };
    // Handler type tied to one concrete data type.
    ($type:ty, $variant:ident, $data:ty) => {
        impl From<$type> for $crate::data_handler::SomeDataHandler<$data> {
            fn from(value: $type) -> Self {
                $crate::data_handler::SomeDataHandler::$variant(Box::new(value))
            }
        }
    };
}
77
78pub enum SomeDataHandler<T> {
79    Observer(Box<dyn DataObserver<T> + Send>),
80    Transformer(Box<dyn DataTransformer<T> + Send>),
81    Filter(Box<dyn DataFilter<T> + Send>),
82    Consumer(Box<dyn DataConsumer<T> + Send>),
83    Demuxer(Box<dyn DataDemuxer<T> + Send>),
84}
85
86impl<T> StatsProducer for SomeDataHandler<T> {
87    fn get_stats(&self) -> Option<serde_json::Value> {
88        match self {
89            SomeDataHandler::Observer(ref o) => o.get_stats(),
90            SomeDataHandler::Transformer(ref t) => t.get_stats(),
91            SomeDataHandler::Filter(ref f) => f.get_stats(),
92            SomeDataHandler::Consumer(ref c) => c.get_stats(),
93            SomeDataHandler::Demuxer(ref d) => d.get_stats(),
94        }
95    }
96}