Skip to main content

tfserver/structures/
traffic_proc.rs

1use crate::codec::codec_trait::TfCodec;
2use crate::structures::transport::Transport;
3use async_trait::async_trait;
4use tokio_util::bytes::{BytesMut};
5use tokio_util::codec::{ Framed};
6
7#[async_trait]
8///A traffic processor trait, that applied to all streams. Processes all stream. If you need setup by each specific stream, use codecs instead
9pub trait TrafficProcess: Send + Sync {
10    type Codec;
11    ///The routine that defines if we can connect stream or not
12    async fn initial_connect(&mut self, source: &mut Transport) -> bool;
13    ///The routine that defines if we can connect stream or not, but when framed was setted up
14    async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, Self::Codec>)
15    -> bool;
16    ///Process every traffic that is handled by server
17    async fn post_process_traffic(&mut self, data: Vec<u8>) -> Vec<u8>;
18    ///Process every traffic that is handled by server
19    async fn pre_process_traffic(&mut self, data: BytesMut) -> BytesMut;
20    fn clone(&self) -> Box<dyn TrafficProcess<Codec = Self::Codec>>;
21}
22
23pub struct TrafficProcessorHolder<C>
24where
25    C: TfCodec,
26{
27    processors: Vec<Box<dyn TrafficProcess<Codec = C>>>,
28}
29
30impl<C> Clone for TrafficProcessorHolder<C>
31where
32    C: TfCodec,
33{
34    fn clone(&self) -> Self {
35        let mut processors = Vec::new();
36        self.processors
37            .iter()
38            .for_each(|p| processors.push(p.as_ref().clone()));
39
40        Self { processors }
41    }
42}
43
44impl<C> TrafficProcessorHolder<C>
45where
46    C: TfCodec,
47{
48    pub fn new() -> Self {
49        TrafficProcessorHolder { processors: vec![] }
50    }
51    pub fn register_processor(&mut self, processor: Box<dyn TrafficProcess<Codec = C>>) {
52        self.processors.push(processor);
53    }
54
55    pub async fn initial_connect(&mut self, source: &mut Transport) -> bool {
56        for processor in self.processors.iter_mut() {
57            if !processor.as_mut().initial_connect(source).await {
58                return false;
59            }
60        }
61        true
62    }
63
64    pub async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, C>) -> bool {
65        for processor in self.processors.iter_mut() {
66            if !processor.as_mut().initial_framed_connect(source).await {
67                return false;
68            }
69        }
70        true
71    }
72
73    pub async fn post_process_traffic(&mut self, mut data: Vec<u8>) -> Vec<u8> {
74        for proc in self.processors.iter_mut() {
75            data = proc.post_process_traffic(data).await;
76        }
77        data
78    }
79
80    pub async fn pre_process_traffic(&mut self, mut data: BytesMut) -> BytesMut {
81        for proc in self.processors.iter_mut() {
82            data = proc.pre_process_traffic(data).await;
83        }
84        data
85    }
86}