tcplane/server/
impl.rs

1use crate::*;
2
3impl Default for Server {
4    fn default() -> Self {
5        Self {
6            cfg: Arc::new(RwLock::new(ServerConfig::default())),
7            func_list: Arc::new(RwLock::new(vec![])),
8            tmp: Arc::new(RwLock::new(Tmp::default())),
9        }
10    }
11}
12
13impl Server {
14    pub fn new() -> Self {
15        Self::default()
16    }
17
18    pub async fn host<T>(&mut self, host: T) -> &mut Self
19    where
20        T: Into<String>,
21    {
22        self.get_cfg().write().await.set_host(host.into());
23        self
24    }
25
26    pub async fn port(&mut self, port: usize) -> &mut Self {
27        self.get_cfg().write().await.set_port(port);
28        self
29    }
30
31    pub async fn log_dir<T>(&mut self, log_dir: T) -> &mut Self
32    where
33        T: Into<String> + Clone,
34    {
35        self.get_cfg()
36            .write()
37            .await
38            .set_log_dir(log_dir.clone().into());
39        self.get_tmp()
40            .write()
41            .await
42            .log
43            .set_path(log_dir.clone().into());
44        self
45    }
46
47    pub async fn log_size(&mut self, log_size: usize) -> &mut Self {
48        self.get_cfg().write().await.set_log_size(log_size);
49        self.get_tmp()
50            .write()
51            .await
52            .log
53            .set_limit_file_size(log_size);
54        self
55    }
56
57    pub async fn enable_log(&self) -> &Self {
58        self.get_cfg()
59            .write()
60            .await
61            .set_log_size(DEFAULT_LOG_FILE_SIZE);
62        self.get_tmp()
63            .write()
64            .await
65            .get_mut_log()
66            .set_limit_file_size(DEFAULT_LOG_FILE_SIZE);
67        self
68    }
69
70    pub async fn disable_log(&self) -> &Self {
71        self.get_cfg()
72            .write()
73            .await
74            .set_log_size(DISABLE_LOG_FILE_SIZE);
75        self.get_tmp()
76            .write()
77            .await
78            .get_mut_log()
79            .set_limit_file_size(DISABLE_LOG_FILE_SIZE);
80        self
81    }
82
83    pub async fn print(&mut self, print: bool) -> &mut Self {
84        self.get_cfg().write().await.set_inner_print(print);
85        self
86    }
87
88    pub async fn enable_print(&mut self) -> &mut Self {
89        self.print(true).await;
90        self
91    }
92
93    pub async fn disable_print(&mut self) -> &mut Self {
94        self.print(false).await;
95        self
96    }
97
98    pub async fn open_print(&mut self, print: bool) -> &mut Self {
99        self.get_cfg().write().await.set_inner_print(print);
100        self
101    }
102
103    pub async fn buffer(&mut self, buffer_size: usize) -> &mut Self {
104        self.get_cfg().write().await.set_buffer_size(buffer_size);
105        self
106    }
107
108    pub async fn inner_print(&self, print: bool) -> &Self {
109        self.get_cfg().write().await.set_inner_print(print);
110        self
111    }
112
113    pub async fn inner_log(&self, print: bool) -> &Self {
114        self.get_cfg().write().await.set_inner_log(print);
115        self
116    }
117
118    pub async fn enable_inner_print(&self) -> &Self {
119        self.inner_print(true).await;
120        self
121    }
122
123    pub async fn disable_inner_print(&self) -> &Self {
124        self.inner_print(false).await;
125        self
126    }
127
128    pub async fn enable_inner_log(&self) -> &Self {
129        self.inner_log(true).await;
130        self
131    }
132
133    pub async fn disable_inner_log(&self) -> &Self {
134        self.inner_log(false).await;
135        self
136    }
137
138    pub async fn func<F, Fut>(&mut self, func: F) -> &mut Self
139    where
140        F: AsyncFuncWithoutPin<Fut>,
141        Fut: Future<Output = ()> + Send + Sync + 'static,
142    {
143        self.func_list
144            .write()
145            .await
146            .push(Box::new(move |ctx| Box::pin(func(ctx))));
147        self
148    }
149
150    pub(super) async fn handle_stream(cfg: &ServerConfig, stream_lock: ArcRwLockStream) -> Vec<u8> {
151        let buffer_size: usize = cfg.get_buffer_size().clone().max(SPLIT_REQUEST_BYTES.len());
152        let mut buffer: Vec<u8> = Vec::new();
153        let mut tmp_buf: Vec<u8> = vec![0u8; buffer_size];
154        let mut stream: RwLockWriteGuard<'_, TcpStream> = stream_lock.get_write_lock().await;
155        loop {
156            match stream.read(&mut tmp_buf).await {
157                Ok(n) => {
158                    let old_len: usize = tmp_buf.len();
159                    tmp_buf = remove_trailing_zeros(&mut tmp_buf);
160                    let new_len: usize = tmp_buf.len();
161                    if n == 0 {
162                        break;
163                    }
164                    if old_len != new_len || tmp_buf.ends_with(SPLIT_REQUEST_BYTES) {
165                        buffer.extend_from_slice(&tmp_buf[..n - SPLIT_REQUEST_BYTES.len()]);
166                        break;
167                    }
168                    buffer.extend_from_slice(&tmp_buf[..n]);
169                }
170                _ => {
171                    break;
172                }
173            }
174        }
175        buffer
176    }
177
178    pub async fn listen(&mut self) -> &mut Self {
179        self.init().await;
180        let cfg: ServerConfig = self.get_cfg().read().await.clone();
181        let host: String = cfg.get_host().to_owned();
182        let port: usize = *cfg.get_port();
183        let addr: String = format!("{}{}{}", host, COLON_SPACE_SYMBOL, port);
184        let tcp_listener: TcpListener = TcpListener::bind(&addr)
185            .await
186            .map_err(|e| ServerError::TcpBindError(e.to_string()))
187            .unwrap();
188        while let Ok((stream, _)) = tcp_listener.accept().await {
189            let tmp_arc_lock: ArcRwLockTmp = Arc::clone(&self.tmp);
190            let stream_lock: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
191            let func_list_arc_lock: ArcRwlockVecBoxFunc = Arc::clone(&self.get_func_list());
192            let cfg_arc_lock: ArcRwLockServerConfig = Arc::clone(&self.get_cfg());
193            let handle_request = move || async move {
194                let cfg: ServerConfig = cfg_arc_lock.read().await.clone();
195                let request: Vec<u8> = Self::handle_stream(&cfg, stream_lock.clone()).await;
196                let log: Log = tmp_arc_lock.read().await.get_log().clone();
197                let mut ctx: InnerContext = InnerContext::new();
198                ctx.set_stream(Some(stream_lock.clone()))
199                    .set_request(request)
200                    .set_log(log);
201                let ctx: Context = Context::from_inner_context(ctx);
202                for func in func_list_arc_lock.read().await.iter() {
203                    func(ctx.clone()).await;
204                }
205            };
206            tokio::spawn(handle_request());
207        }
208        self
209    }
210
211    async fn init_panic_hook(&self) {
212        let tmp: Tmp = self.get_tmp().read().await.clone();
213        let cfg: ServerConfig = self.get_cfg().read().await.clone();
214        let enable_inner_print: bool = *cfg.get_inner_print();
215        let enable_inner_log: bool = *cfg.get_inner_log() && tmp.get_log().is_enable();
216        set_hook(Box::new(move |err| {
217            let err_string: String = err.to_string();
218            if enable_inner_print {
219                println_error!(err_string);
220            }
221            if enable_inner_log {
222                handle_error(&tmp, &err_string);
223            }
224        }));
225    }
226
227    async fn init(&self) {
228        self.init_panic_hook().await;
229    }
230}