// tfserver/structures/traffic_proc.rs
use crate::codec::codec_trait::TfCodec;
2use crate::structures::transport::Transport;
3use async_trait::async_trait;
4use tokio_util::bytes::{BytesMut};
5use tokio_util::codec::{ Framed};
6
7#[async_trait]
8pub trait TrafficProcess: Send + Sync {
10 type Codec;
11 async fn initial_connect(&mut self, source: &mut Transport) -> bool;
13 async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, Self::Codec>)
15 -> bool;
16 async fn post_process_traffic(&mut self, data: Vec<u8>) -> Vec<u8>;
18 async fn pre_process_traffic(&mut self, data: BytesMut) -> BytesMut;
20 fn clone(&self) -> Box<dyn TrafficProcess<Codec = Self::Codec>>;
21}
22
23pub struct TrafficProcessorHolder<C>
24where
25 C: TfCodec,
26{
27 processors: Vec<Box<dyn TrafficProcess<Codec = C>>>,
28}
29
30impl<C> Clone for TrafficProcessorHolder<C>
31where
32 C: TfCodec,
33{
34 fn clone(&self) -> Self {
35 let mut processors = Vec::new();
36 self.processors
37 .iter()
38 .for_each(|p| processors.push(p.as_ref().clone()));
39
40 Self { processors }
41 }
42}
43
44impl<C> TrafficProcessorHolder<C>
45where
46 C: TfCodec,
47{
48 pub fn new() -> Self {
49 TrafficProcessorHolder { processors: vec![] }
50 }
51 pub fn register_processor(&mut self, processor: Box<dyn TrafficProcess<Codec = C>>) {
52 self.processors.push(processor);
53 }
54
55 pub async fn initial_connect(&mut self, source: &mut Transport) -> bool {
56 for processor in self.processors.iter_mut() {
57 if !processor.as_mut().initial_connect(source).await {
58 return false;
59 }
60 }
61 true
62 }
63
64 pub async fn initial_framed_connect(&mut self, source: &mut Framed<Transport, C>) -> bool {
65 for processor in self.processors.iter_mut() {
66 if !processor.as_mut().initial_framed_connect(source).await {
67 return false;
68 }
69 }
70 true
71 }
72
73 pub async fn post_process_traffic(&mut self, mut data: Vec<u8>) -> Vec<u8> {
74 for proc in self.processors.iter_mut() {
75 data = proc.post_process_traffic(data).await;
76 }
77 data
78 }
79
80 pub async fn pre_process_traffic(&mut self, mut data: BytesMut) -> BytesMut {
81 for proc in self.processors.iter_mut() {
82 data = proc.pre_process_traffic(data).await;
83 }
84 data
85 }
86}