tcplane/server/
impl.rs

1use crate::*;
2
3impl Default for Server {
4    fn default() -> Self {
5        Self {
6            cfg: Arc::new(RwLock::new(ServerConfig::default())),
7            func_list: Arc::new(RwLock::new(vec![])),
8            tmp: Arc::new(RwLock::new(Tmp::default())),
9        }
10    }
11}
12
13impl Server {
14    pub fn new() -> Self {
15        Self::default()
16    }
17
18    pub async fn host<T>(&mut self, host: T) -> &mut Self
19    where
20        T: Into<String>,
21    {
22        self.get_cfg().write().await.set_host(host.into());
23        self
24    }
25
26    pub async fn port(&mut self, port: usize) -> &mut Self {
27        self.get_cfg().write().await.set_port(port);
28        self
29    }
30
31    pub async fn log_dir<T>(&mut self, log_dir: T) -> &mut Self
32    where
33        T: Into<String> + Clone,
34    {
35        self.get_cfg()
36            .write()
37            .await
38            .set_log_dir(log_dir.clone().into());
39        self.get_tmp()
40            .write()
41            .await
42            .log
43            .set_path(log_dir.clone().into());
44        self
45    }
46
47    pub async fn log_size(&mut self, log_size: usize) -> &mut Self {
48        self.get_cfg().write().await.set_log_size(log_size);
49        self.get_tmp()
50            .write()
51            .await
52            .log
53            .set_limit_file_size(log_size);
54        self
55    }
56
57    pub async fn enable_log(&self) -> &Self {
58        self.get_cfg()
59            .write()
60            .await
61            .set_log_size(DEFAULT_LOG_FILE_SIZE);
62        self.get_tmp()
63            .write()
64            .await
65            .get_mut_log()
66            .set_limit_file_size(DEFAULT_LOG_FILE_SIZE);
67        self
68    }
69
70    pub async fn disable_log(&self) -> &Self {
71        self.get_cfg()
72            .write()
73            .await
74            .set_log_size(DISABLE_LOG_FILE_SIZE);
75        self.get_tmp()
76            .write()
77            .await
78            .get_mut_log()
79            .set_limit_file_size(DISABLE_LOG_FILE_SIZE);
80        self
81    }
82
83    pub async fn print(&mut self, print: bool) -> &mut Self {
84        self.get_cfg().write().await.set_inner_print(print);
85        self
86    }
87
88    pub async fn enable_print(&mut self) -> &mut Self {
89        self.print(true).await;
90        self
91    }
92
93    pub async fn disable_print(&mut self) -> &mut Self {
94        self.print(false).await;
95        self
96    }
97
98    pub async fn open_print(&mut self, print: bool) -> &mut Self {
99        self.get_cfg().write().await.set_inner_print(print);
100        self
101    }
102
103    pub async fn buffer(&mut self, buffer_size: usize) -> &mut Self {
104        self.get_cfg().write().await.set_buffer_size(buffer_size);
105        self
106    }
107
108    pub async fn inner_print(&self, print: bool) -> &Self {
109        self.get_cfg().write().await.set_inner_print(print);
110        self
111    }
112
113    pub async fn inner_log(&self, print: bool) -> &Self {
114        self.get_cfg().write().await.set_inner_log(print);
115        self
116    }
117
118    pub async fn enable_inner_print(&self) -> &Self {
119        self.inner_print(true).await;
120        self
121    }
122
123    pub async fn disable_inner_print(&self) -> &Self {
124        self.inner_print(false).await;
125        self
126    }
127
128    pub async fn enable_inner_log(&self) -> &Self {
129        self.inner_log(true).await;
130        self
131    }
132
133    pub async fn disable_inner_log(&self) -> &Self {
134        self.inner_log(false).await;
135        self
136    }
137
138    pub async fn func<F, Fut>(&mut self, func: F) -> &mut Self
139    where
140        F: AsyncFuncWithoutPin<Fut>,
141        Fut: Future<Output = ()> + Send + Sync + 'static,
142    {
143        self.func_list
144            .write()
145            .await
146            .push(Box::new(move |controller_data| {
147                Box::pin(func(controller_data))
148            }));
149        self
150    }
151
152    pub(super) async fn handle_stream(cfg: &ServerConfig, stream_lock: ArcRwLockStream) -> Vec<u8> {
153        let buffer_size: usize = cfg.get_buffer_size().clone().max(SPLIT_REQUEST_BYTES.len());
154        let mut buffer: Vec<u8> = Vec::new();
155        let mut tmp_buf: Vec<u8> = vec![0u8; buffer_size];
156        let mut stream: RwLockWriteGuard<'_, TcpStream> = stream_lock.get_write_lock().await;
157        loop {
158            match stream.read(&mut tmp_buf).await {
159                Ok(n) => {
160                    let old_len: usize = tmp_buf.len();
161                    tmp_buf = remove_trailing_zeros(&mut tmp_buf);
162                    let new_len: usize = tmp_buf.len();
163                    if n == 0 {
164                        break;
165                    }
166                    if old_len != new_len || tmp_buf.ends_with(SPLIT_REQUEST_BYTES) {
167                        buffer.extend_from_slice(&tmp_buf[..n - SPLIT_REQUEST_BYTES.len()]);
168                        break;
169                    }
170                    buffer.extend_from_slice(&tmp_buf[..n]);
171                }
172                _ => {
173                    break;
174                }
175            }
176        }
177        buffer
178    }
179
180    pub async fn listen(&mut self) -> &mut Self {
181        self.init().await;
182        let cfg: ServerConfig = self.get_cfg().read().await.clone();
183        let host: String = cfg.get_host().to_owned();
184        let port: usize = *cfg.get_port();
185        let addr: String = format!("{}{}{}", host, COLON_SPACE_SYMBOL, port);
186        let tcp_listener: TcpListener = TcpListener::bind(&addr)
187            .await
188            .map_err(|e| ServerError::TcpBindError(e.to_string()))
189            .unwrap();
190        while let Ok((stream, _)) = tcp_listener.accept().await {
191            let tmp_arc_lock: ArcRwLockTmp = Arc::clone(&self.tmp);
192            let stream_lock: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
193            let func_list_arc_lock: ArcRwlockVecBoxFunc = Arc::clone(&self.get_func_list());
194            let cfg_arc_lock: ArcRwLockServerConfig = Arc::clone(&self.get_cfg());
195            let handle_request = move || async move {
196                let cfg: ServerConfig = cfg_arc_lock.read().await.clone();
197                let request: Vec<u8> = Self::handle_stream(&cfg, stream_lock.clone()).await;
198                let log: Log = tmp_arc_lock.read().await.get_log().clone();
199                let mut controller_data: InnerControllerData = InnerControllerData::new();
200                controller_data
201                    .set_stream(Some(stream_lock.clone()))
202                    .set_request(request)
203                    .set_log(log);
204                let controller_data: ControllerData =
205                    ControllerData::from_controller_data(controller_data);
206                for func in func_list_arc_lock.read().await.iter() {
207                    func(controller_data.clone()).await;
208                }
209            };
210            tokio::spawn(handle_request());
211        }
212        self
213    }
214
215    async fn init_panic_hook(&self) {
216        let tmp: Tmp = self.get_tmp().read().await.clone();
217        let cfg: ServerConfig = self.get_cfg().read().await.clone();
218        let enable_inner_print: bool = *cfg.get_inner_print();
219        let enable_inner_log: bool = *cfg.get_inner_log() && tmp.get_log().is_enable();
220        set_hook(Box::new(move |err| {
221            let err_string: String = err.to_string();
222            if enable_inner_print {
223                println_error!(err_string);
224            }
225            if enable_inner_log {
226                handle_error(&tmp, &err_string);
227            }
228        }));
229    }
230
231    async fn init(&self) {
232        self.init_panic_hook().await;
233    }
234}