// tfserver/structures/traffic_proc.rs
use async_trait::async_trait;
2use tokio::io;
3use tokio_util::bytes::{Bytes, BytesMut};
4use tokio_util::codec::{Decoder, Encoder, Framed};
5use crate::structures::transport::Transport;
6
7#[async_trait]
8pub trait TrafficProcess: Send + Sync {
10 type Codec;
11 async fn initial_connect(&mut self, source: &mut Transport) -> bool;
13 async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, Self::Codec>) -> bool;
15 async fn post_process_traffic(&mut self, data: Vec<u8>) -> Vec<u8>;
17 async fn pre_process_traffic(&mut self, data: BytesMut) -> BytesMut;
19 fn clone(&self) -> Box<dyn TrafficProcess<Codec = Self::Codec>>;
20}
21
22pub struct TrafficProcessorHolder<C> where C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static
23{
24 processors: Vec<Box<dyn TrafficProcess<Codec = C>>>,
25}
26
27impl<C> Clone for TrafficProcessorHolder<C> where C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static{
28 fn clone(&self) -> Self {
29 let mut processors = Vec::new();
30 self.processors
31 .iter()
32 .for_each(|p| processors.push(p.as_ref().clone()));
33
34 Self { processors }
35 }
36}
37
38impl<C> TrafficProcessorHolder<C> where
39C: Encoder<Bytes> + Decoder<Item = BytesMut, Error = io::Error> + Clone + Send + 'static {
40 pub fn new() -> Self {
41 TrafficProcessorHolder { processors: vec![] }
42 }
43 pub fn register_processor(&mut self, processor: Box<dyn TrafficProcess<Codec = C>>) {
44 self.processors.push(processor);
45 }
46
47 pub async fn initial_connect(&mut self, source: &mut Transport) -> bool{
48 for processor in self.processors.iter_mut() {
49 if !processor.as_mut().initial_connect(source).await{
50 return false;
51 }
52 }
53 true
54 }
55
56 pub async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, C>) -> bool {
57 for processor in self.processors.iter_mut() {
58 if !processor.as_mut().initial_framed_connect(source).await{
59 return false;
60 }
61 }
62 true
63 }
64
65 pub async fn post_process_traffic(&mut self, mut data: Vec<u8>) -> Vec<u8> {
66 for proc in self.processors.iter_mut() {
67 data = proc.post_process_traffic(data).await;
68 }
69 data
70 }
71
72 pub async fn pre_process_traffic(&mut self, mut data: BytesMut) -> BytesMut {
73 for proc in self.processors.iter_mut() {
74 data = proc.pre_process_traffic(data).await;
75 }
76 data
77 }
78}