data_pipeline_rs/
data_handler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use anyhow::Result;

use crate::{node::NodeRef, node_visitor::NodeVisitor, stats_producer::StatsProducer};

/// A handler that looks at data of type `T` without taking ownership of it.
///
/// `observe` receives only a shared reference to the data; the observer itself is borrowed
/// mutably, so it can update its own state on every call.
pub trait DataObserver<T> {
    /// Called with a shared reference to one piece of data.
    fn observe(&mut self, data: &T);
    /// Returns this observer's statistics as a JSON value, if it has any to report.
    ///
    /// The default implementation returns `None`.
    fn get_stats(&self) -> Option<serde_json::Value> {
        None
    }
}

/// A handler that takes ownership of data of type `T` and hands back a `T`, or fails.
pub trait DataTransformer<T> {
    /// Consumes `data` and returns the resulting value, or an error.
    ///
    /// `Result` here is `anyhow::Result` (see the import at the top of this file).
    /// NOTE(review): what the caller does with an `Err` is not visible in this file.
    fn transform(&mut self, data: T) -> Result<T>;
    /// Returns this transformer's statistics as a JSON value, if it has any to report.
    ///
    /// The default implementation returns `None`.
    fn get_stats(&self) -> Option<serde_json::Value> {
        None
    }
}

/// A handler that makes a yes/no decision about data of type `T`.
pub trait DataFilter<T> {
    /// Inspects `data` by shared reference and returns a decision.
    ///
    /// NOTE(review): going by the name, `true` presumably means the data should be passed on and
    /// `false` that it should be dropped; the code acting on the result is not in this file.
    fn should_forward(&mut self, data: &T) -> bool;
    /// Returns this filter's statistics as a JSON value, if it has any to report.
    ///
    /// The default implementation returns `None`.
    fn get_stats(&self) -> Option<serde_json::Value> {
        None
    }
}

// impl<T, F> StatsProducer for F where F: FnMut(&T) -> bool {}
//
// impl<T, F> DataFilter<T> for F
// where
//     F: FnMut(&T) -> bool,
// {
//     fn should_forward(&mut self, packet_info: &T) -> bool {
//         (self)(packet_info)
//     }
// }

/// A handler that takes ownership of data of type `T` and returns nothing.
pub trait DataConsumer<T> {
    /// Consumes `data`. The value is moved in and nothing is handed back to the caller.
    fn consume(&mut self, data: T);
    /// Returns this consumer's statistics as a JSON value, if it has any to report.
    ///
    /// The default implementation returns `None`.
    fn get_stats(&self) -> Option<serde_json::Value> {
        None
    }
}

/// A handler that selects a `NodeRef<T>` for a piece of data and manages its own set of paths.
pub trait DataDemuxer<T> {
    /// Inspects `data` by shared reference and returns a borrowed `NodeRef<T>`, or `None`.
    ///
    /// NOTE(review): presumably the returned node is where `data` should be sent next and `None`
    /// means no path matched; confirm against the caller, which is not in this file.
    fn find_path(&mut self, data: &T) -> Option<&NodeRef<T>>;
    // DataDemuxer has to have its own visitor logic since it handles its own paths
    /// Hands `visitor` to the demuxer so it can apply it to the paths it owns.
    fn visit(&mut self, visitor: &mut dyn NodeVisitor<T>);
    /// Returns this demuxer's statistics as a JSON value, if it has any to report.
    ///
    /// The default implementation returns `None`.
    fn get_stats(&self) -> Option<serde_json::Value> {
        None
    }
}

// Note: Ideally we'd have blanket impls to convert from any of the above traits into
// SomeDataHandler, but unfortunately I don't think that can be done without causing conflicting
// implementation errors.  This macro helps with the conversion at least.
/// Implements `From<$type>` for `SomeDataHandler` by boxing the value into the given variant.
///
/// Two forms are accepted (a trailing comma is allowed in both):
///
/// * `(Type, Variant)` generates `impl<T> From<Type> for SomeDataHandler<T>`. This only
///   compiles when `Type` implements the variant's handler trait for *every* `T`.
/// * `(Type, Variant, DataType)` generates `impl From<Type> for SomeDataHandler<DataType>`.
///   Use this when `Type` implements the handler trait for one concrete data type only.
///
/// In both forms `Type` must be `Send`, because every `SomeDataHandler` variant stores a
/// `Box<dyn … + Send>`.
///
/// # Examples
///
/// ```ignore
/// impl_conversion_to_some_data_handler!(MyType, Observer);
/// impl_conversion_to_some_data_handler!(MyPacketLogger, Observer, Packet);
/// ```
#[macro_export]
macro_rules! impl_conversion_to_some_data_handler {
    // Handler usable with any data type: the impl is generic over `T`.
    ($type:ty, $variant:ident $(,)?) => {
        impl<T> From<$type> for $crate::data_handler::SomeDataHandler<T> {
            fn from(value: $type) -> Self {
                $crate::data_handler::SomeDataHandler::$variant(Box::new(value))
            }
        }
    };
    // Handler tied to a single data type: the impl targets that type only.
    ($type:ty, $variant:ident, $data:ty $(,)?) => {
        impl From<$type> for $crate::data_handler::SomeDataHandler<$data> {
            fn from(value: $type) -> Self {
                $crate::data_handler::SomeDataHandler::$variant(Box::new(value))
            }
        }
    };
}

/// A single handler of any of the kinds declared in this module, stored as a boxed trait object.
///
/// Every variant additionally requires the boxed handler to be `Send`.
pub enum SomeDataHandler<T> {
    /// Holds a boxed [`DataObserver`].
    Observer(Box<dyn DataObserver<T> + Send>),
    /// Holds a boxed [`DataTransformer`].
    Transformer(Box<dyn DataTransformer<T> + Send>),
    /// Holds a boxed [`DataFilter`].
    Filter(Box<dyn DataFilter<T> + Send>),
    /// Holds a boxed [`DataConsumer`].
    Consumer(Box<dyn DataConsumer<T> + Send>),
    /// Holds a boxed [`DataDemuxer`].
    Demuxer(Box<dyn DataDemuxer<T> + Send>),
}

/// Stats for a `SomeDataHandler` are whatever the wrapped handler reports.
impl<T> StatsProducer for SomeDataHandler<T> {
    /// Delegates to the `get_stats` method of the handler held by the active variant and
    /// returns its result unchanged.
    fn get_stats(&self) -> Option<serde_json::Value> {
        // Matching on `&Self` binds each payload by reference, so no explicit `ref` is needed.
        match self {
            Self::Observer(observer) => observer.get_stats(),
            Self::Transformer(transformer) => transformer.get_stats(),
            Self::Filter(filter) => filter.get_stats(),
            Self::Consumer(consumer) => consumer.get_stats(),
            Self::Demuxer(demuxer) => demuxer.get_stats(),
        }
    }
}