Skip to main content

tfserver/structures/
traffic_proc.rs

1use async_trait::async_trait;
2use tokio::io;
3use tokio_util::bytes::{Bytes, BytesMut};
4use tokio_util::codec::{Decoder, Encoder, Framed};
5use crate::structures::transport::Transport;
6
7#[async_trait]
8///A traffic processor trait, that applied to all streams. Processes all stream. If you need setup by each specific stream, use codecs instead
9pub trait TrafficProcess: Send + Sync {
10    type Codec;
11    ///The routine that defines if we can connect stream or not
12    async fn initial_connect(&mut self, source: &mut Transport) -> bool;
13     ///The routine that defines if we can connect stream or not, but when framed was setted up
14    async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, Self::Codec>) -> bool;
15    ///Process every traffic that is handled by server
16    async fn post_process_traffic(&mut self, data: Vec<u8>) -> Vec<u8>;
17     ///Process every traffic that is handled by server
18    async fn pre_process_traffic(&mut self, data: BytesMut) -> BytesMut;
19    fn clone(&self) -> Box<dyn TrafficProcess<Codec = Self::Codec>>;
20}
21
22pub struct TrafficProcessorHolder<C> where C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static
23{
24    processors: Vec<Box<dyn TrafficProcess<Codec = C>>>,
25}
26
27impl<C> Clone for TrafficProcessorHolder<C>  where C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static{
28    fn clone(&self) -> Self {
29        let mut processors = Vec::new();
30        self.processors
31            .iter()
32            .for_each(|p| processors.push(p.as_ref().clone()));
33
34        Self { processors }
35    }
36}
37
38impl<C> TrafficProcessorHolder<C> where
39C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static {
40    pub fn new() -> Self {
41        TrafficProcessorHolder { processors: vec![] }
42    }
43    pub fn register_processor(&mut self, processor: Box<dyn TrafficProcess<Codec = C>>) {
44        self.processors.push(processor);
45    }
46
47    pub async fn initial_connect(&mut self, source: &mut Transport) -> bool{
48        for processor in self.processors.iter_mut() {
49            if !processor.as_mut().initial_connect(source).await{
50                return false;
51            }
52        }
53        true
54    }
55
56    pub async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, C>) -> bool {
57        for processor in self.processors.iter_mut() {
58            if !processor.as_mut().initial_framed_connect(source).await{
59                return false;
60            }
61        }
62        true
63    }
64
65    pub async fn post_process_traffic(&mut self, mut data: Vec<u8>) -> Vec<u8> {
66        for proc in self.processors.iter_mut() {
67            data = proc.post_process_traffic(data).await;
68        }
69        data
70    }
71
72    pub async fn pre_process_traffic(&mut self, mut data: BytesMut) -> BytesMut {
73        for proc in self.processors.iter_mut() {
74            data = proc.pre_process_traffic(data).await;
75        }
76        data
77    }
78}