1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use crate::rsm::{SOCK_EVENT_READ, SOCK_EVENT_CLOSE, SOCK_EVENT_NEW};
7use crate::{rsm,rsm::rsm_component_t};
8use crate::{net_ext::RawFdType};
9use crate::common::{tsidallocator::TsIdAllocator,spin_lock::spin_lock_t};
10use super::*;
11use crate::common::errcode;
12
13
/// Gives a previously allocated id back to its allocator and then returns
/// from the enclosing function.
///
/// * `$e`   - the value handed to `return`.
/// * `$ids` - any expression with a `release_id(id)` method.
/// * `$idx` - the id to release.
///
/// The value returned by `release_id` is discarded.
#[macro_export]
macro_rules! return_error {
    ($e:expr, $ids:expr, $idx:expr) => {{
        $ids.release_id($idx);
        return $e;
    }};
}
21
/// Number of socket ids the pool can hand out (128 Ki = 131072).
const MAX_SOCKET_NUM: usize = 128 * 1024;
23pub(crate) struct socket_info_t {
24 pub(crate) s:Socket,
25 pub(crate) owner:rsm_component_t,
26}
27
28pub(crate) struct SocketPool {
29 sock_ids:TsIdAllocator,
30 sockets:[Option<socket_info_t>;MAX_SOCKET_NUM+1],
31 capacity:usize,
32 lock:spin_lock_t,
33 poll_instance:poll::Poll,
34}
35
36static mut gSocketPool:Option<Box<SocketPool>>=None;
37impl SocketPool {
38 pub fn new()->Box<Self> {
39 let ids = TsIdAllocator::new(1, MAX_SOCKET_NUM as i32);
40 let poll_inst = poll::Poll::new(MAX_SOCKET_NUM);
41 let pool_ptr = unsafe {
42 std::alloc::alloc_zeroed(std::alloc::Layout::from_size_align_unchecked(std::mem::size_of::<SocketPool>(), 1))
43 as * mut SocketPool
44 };
45 let mut pool = unsafe { Box::from_raw(pool_ptr) };
46 pool.sock_ids = ids;
47 pool.capacity = MAX_SOCKET_NUM;
48 pool.lock = spin_lock_t::new();
49 pool.poll_instance=poll_inst;
50
51
52 for i in 1..MAX_SOCKET_NUM+1 {
53 pool.sockets[i]=None;
54 }
55 println!("init socket pool success,capacity={}",MAX_SOCKET_NUM);
56 return pool
57 }
58
59
60 pub fn new_socket(&mut self,sock_af:SOCKET_ADDRESS_FAMILY,sock_type:SOCKET_TYPE,proto:u8)->Result<i32,errcode::RESULT> {
61 const _DEBUG:bool=true;
62 let caller=match rsm::get_self_cid() {
63 None=> {
64 if _DEBUG {
65 rsm::rsm_component_t::new_zero()
66 } else {
67 return Err(errcode::ERROR_INVALID_STATE)
68 }
69
70 },
71 Some(c)=>c,
72 };
73
74 let sid=self.sock_ids.allocate_id();
75 if sid==TsIdAllocator::INVALID_ID {
76 return Err(errcode::ERROR_OUTOF_MEM)
77 }
78 let mut sock = match Socket::new_socket(sid,sock_af,sock_type,proto) {
79 Ok(s)=>s,
80 Err(e)=>return_error!(Err(e),self.sock_ids,sid),
81 };
82 sock.set_non_blocking();
83 self.add_poll_registry(&sock);
84 let sck_info = socket_info_t {
85 s:sock,
86 owner:caller,
87 };
88 self.sockets[sid as usize]=Some(sck_info);
89
90
91 return Ok(sid);
92 }
93
94 fn add_poll_registry(&mut self,sock:&Socket)->errcode::RESULT {
95 return self.poll_instance.register(sock.get_raw_fd(), sock.get_socket_id() as usize,
96 SOCK_EVENT_READ | SOCK_EVENT_CLOSE,false)
97 }
98
99 fn del_poll_registry(&mut self,fd:RawFdType)->errcode::RESULT {
100 return self.poll_instance.deregister(fd)
101 }
102 pub fn close_socket(&mut self,sock_idx:i32,)->errcode::RESULT {
104 if !self.check_socket_caller(sock_idx) {
105 return errcode::ERROR_NO_PERMISSION
106 }
107 {
108 let fd = match self.get_socket_by_idx(sock_idx) {
109 None=>return errcode::ERROR_NOT_FOUND,
110 Some(s)=>s.get_raw_fd(),
111 };
112 self.del_poll_registry(fd);
113 }
114 let ret = self.sock_ids.release_id(sock_idx);
115 if ret !=errcode::RESULT_SUCCESS {
116 return ret
117 }
118 if self.sockets[sock_idx as usize].is_none() {
119 return errcode::ERROR_NOT_FOUND
120 }
121
122 self.sockets[sock_idx as usize]=None;
123 errcode::RESULT_SUCCESS
124 }
125
126 fn check_socket_caller(&self,sock_idx:i32)->bool {
127 if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
128 return false
129 }
130 let caller=match rsm::get_self_cid() {
131 None=>return false,
132 Some(c)=>c,
133 };
134 if let Some(sinfo)=&self.sockets[sock_idx as usize] {
135 if sinfo.owner==caller {
136 return true
137 } else {
138 return false
139 }
140 }
141
142 return false
143
144 }
145
146 fn is_valid_sock_id(&self,sock_id:i32)->bool {
147 return sock_id>0 && sock_id as usize<=self.capacity && self.sockets[sock_id as usize].is_some()
148 }
149
150 pub(crate) fn is_tcp_server(&self,sock_id:i32)->bool {
151 if !self.is_valid_sock_id(sock_id) {
152 return false
153 }
154
155 return match &self.sockets[sock_id as usize] {
156 None=>false,
157 Some(info)=>info.s.is_tcp_server(),
158 };
159 }
160
161 pub(crate) fn get_sock_binding_info(&mut self,sock_idx:i32)->Option<&mut socket_info_t> {
162 if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
163 return None
164 }
165 match &mut self.sockets[sock_idx as usize] {
166 None=>return None,
167 Some(info)=>return Some(info),
168 }
169 }
170
171 fn get_socket_by_idx(&mut self,sock_idx:i32)->Option<&mut Socket> {
172 if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
173 return None
174 }
175 match &mut self.sockets[sock_idx as usize] {
176 None=>return None,
177 Some(info)=>return Some(&mut info.s),
178 }
179 }
180
181 pub(crate) fn allocate_socket_id(&mut self)->Result<i32,errcode::RESULT> {
182 let id = self.sock_ids.allocate_id();
183 if id==TsIdAllocator::INVALID_ID {
184 return Err(errcode::ERROR_OUTOF_MEM)
185 }
186 return Ok(id)
187 }
188 pub(crate) fn release_socket_id(&mut self,id:i32)->errcode::RESULT {
189 return self.sock_ids.release_id(id)
190 }
191
192 pub(crate) fn accept(&mut self,server_idx:i32)->Result<i32,errcode::RESULT> {
194 let sid = self.sock_ids.allocate_id();
195 println!("[socket pool]New Tcp client connection,server_sock={},client_sock={}",server_idx,sid);
196 if sid==TsIdAllocator::INVALID_ID {
197 return Err(errcode::ERROR_OUTOF_MEM)
198 }
199 if server_idx as usize>self.capacity{
200 return_error!(Err(errcode::ERROR_NOT_FOUND),self.sock_ids,sid)
201 }
202 let server = match &mut self.sockets[server_idx as usize] {
203 None=> {
204 return_error!(Err(errcode::ERROR_NOT_FOUND),self.sock_ids,sid)
205 },
206 Some(s)=>s,
207 };
208
209 if !server.s.is_tcp_server()|| server.s.state!=SOCKET_STATE::SOCK_LISTENING {
210 return_error!(Err(errcode::ERROR_INVALID_STATE),self.sock_ids,sid)
212 }
213
214 let client = match server.s.accept(sid) {
215 Ok(s)=>s,
216 Err(e)=>{
217 println!("accept connection error,sock_id={},ret={}",sid,e);
218 return_error!(Err(e),self.sock_ids,sid);
219 },
220 };
221 add_poll_registry(&client,false);
222 let dst= get_lb_task_id(&server.owner, sid,server.s.lb_policy);
223
224 println!("[socketpool]New Tcp client connection,server_sock={},client_sock={},peer_addr={},raw_fd={},dst_tid={}",
225 server_idx,sid,client.get_peer_addr(),client.get_raw_fd(),dst);
226 let sck_info = socket_info_t {
227 s:client,
228 owner:dst,
229 };
230
231 self.sockets[sid as usize]=Some(sck_info);
232
233 Ok(sid)
234 }
235
236 pub fn poll(&mut self,wait_msec:u32)->Option<Vec<socket_event_t>> {
237 return self.poll_instance.poll(wait_msec)
238 }
239
240 pub fn get_used_count(&self)->i32 {
241 self.sock_ids.used_count()
242 }
243
244 pub fn capacity(&self)->usize {
245 self.capacity as usize
246 }
247
248}
249
250fn get_lb_task_id(caller:&rsm::rsm_component_t,sock_id:i32,policy:SOCKET_LB_POLICY)->rsm_component_t {
252 let mut dst=caller.clone();
253 let attr=match rsm::rsm_sched::get_component_registry(caller.get_cid()) {
254 None=>return dst,
255 Some(a)=>a,
256 };
257 let inst = match policy {
258 SOCKET_LB_POLICY::SOCK_LB_ALL_INSTANCE=> {
259 sock_id as usize % attr.cattr.inst_num +1
260 },
261 SOCKET_LB_POLICY::SOCK_LB_CALLER_INSTANCE=> {
262 caller.inst_id
263 },
264 SOCKET_LB_POLICY::SOCK_LB_EXCLUDE_CALLER_INSTANCE=>{
265 if attr.cattr.inst_num<=1 {
266 return dst;
267 }
268 let mut vec_inst=Vec::new();
269 for i in 1..attr.cattr.inst_num+1 {
270 if i!=caller.inst_id {
271 vec_inst.push(i);
272 }
273 }
274 let idx = sock_id as usize %vec_inst.len();
275 vec_inst[idx]
276 },
277
278 };
279 dst.inst_id = inst;
280 return dst;
281}
282pub(crate) fn new_socket(sock_af:SOCKET_ADDRESS_FAMILY,sock_type:SOCKET_TYPE,proto:u8)->Result<i32,errcode::RESULT> {
284 let pool = match unsafe {&mut gSocketPool} {
285 None=> return Err(errcode::ERROR_NOT_INITIALIZED),
286 Some(p)=>p,
287 };
288
289 return pool.new_socket(sock_af,sock_type, proto);
290}
291
292pub(crate) fn close_socket(idx:i32)->errcode::RESULT {
293 let pool = match unsafe {&mut gSocketPool} {
294 None=> return errcode::ERROR_NOT_INITIALIZED,
295 Some(p)=>p,
296 };
297
298 return pool.close_socket(idx);
299}
300
301fn add_poll_registry(sock:&Socket,post_event:bool)->errcode::RESULT {
302 let pool = match unsafe {&mut gSocketPool} {
303 None=> return errcode::ERROR_NOT_INITIALIZED,
304 Some(p)=>p,
305 };
306 return pool.poll_instance.register(sock.get_raw_fd(), sock.get_socket_id() as usize,
307 SOCK_EVENT_READ | SOCK_EVENT_CLOSE,post_event)
308}
309
310pub(crate) fn init_socket_pool() {
312 os_sock_start();
313 unsafe {
314 gSocketPool = Some(SocketPool::new());
315 }
316
317 let _ = std::thread::spawn(pool_thread_main);
318 }
320
321pub(crate) fn init_socketpool_data() {
322 os_sock_start();
323 unsafe {
324 gSocketPool = Some(SocketPool::new());
325 }
326}
327
328
329pub(crate) fn get_socket_by_idx<'a>(sock_id:i32)->Option<&'a mut Socket> {
331 let pool = match unsafe {&mut gSocketPool} {
332 None=> return None,
333 Some(p)=>p,
334 };
335
336 return pool.get_socket_by_idx(sock_id)
337}
338
339const MAX_POLL_MSEC:u32=500;
340fn pool_thread_main() {
342
343 let pool_inst = match unsafe { &mut gSocketPool} {
344 None=>{
345 println!("Init Socket pool failed,thread exit");
346 return;
347 },
348 Some(p)=>p,
349 };
350 register_oam();
351 loop {
352 let events=match pool_inst.poll(MAX_POLL_MSEC) {
353 None=>continue,
354 Some(ev)=>ev,
355 };
356 if events.len()>0 {
357 process_events(pool_inst,events);
358 }
359
360 }
361
362}
363
364fn process_events(pool:&mut SocketPool,events:Vec<socket_event_t>) {
368 for mut ev in events {
370 println!("[socket pool]process events,ev={},socket_id={}",ev.event,ev.socket_id);
371
372 if pool.is_tcp_server(ev.socket_id) && (ev.event & SOCK_EVENT_READ)!=0{
373 let client_id = match pool.accept(ev.socket_id) {
374 Err(e)=>{
375 println!("Accept Socket error,ret={},server_sock_id={}",e,ev.socket_id);
376 continue;
377 },
378 Ok(idx)=>idx,
379 };
380 ev.socket_id=client_id;
382 ev.event=SOCK_EVENT_NEW;
383 }
384
385 let sck = match pool.get_sock_binding_info(ev.socket_id) {
386 None=>continue,
387 Some(s)=>s,
388 };
389 ev.sock_type = sck.s.get_sock_type();
390 let dst = sck.owner.clone();
391 let msg = match rsm::rsm_message_t::new::<rsm::rsm_socket_event_t>(rsm::RSM_MSG_ID_SOCKET,&ev) {
393 None=>continue,
394 Some(m)=>m,
395 };
396 if (ev.event & (SOCK_EVENT_CLOSE | rsm::SOCK_EVENT_ERR))!=0 {
397 pool.close_socket(ev.socket_id);
398 }
399
400 rsm::send_asyn_msg(&dst, msg);
401
402 }
403}
404
/// Description text used for OAM responses created with `ERROR_NOT_SUPPORT`.
const err_not_imp: &'static str = "not implement";
407use rsm::oam;
408fn register_oam() {
409 let urls = ["/socket".to_string()];
410 rsm::oam::RegisterOamModule(&urls, socket_oam_callback);
411}
412
413fn socket_oam_callback(op:oam::E_RSM_OAM_OP,url:&String,param:&String)->oam::oam_cmd_resp_t {
414 let resp = oam::oam_cmd_resp_t::new(errcode::ERROR_NOT_SUPPORT, &err_not_imp.to_string());
415 match op {
416 oam::E_RSM_OAM_OP::CLI_OP_SHOW=>{
417 return read_socket_stats(url, param)
418 },
419 _=>(),
420 }
421
422 resp
423}
424
425fn read_socket_stats(url:&String,param:&String)->rsm::oam::oam_cmd_resp_t {
426 let mut resp = rsm::oam::oam_cmd_resp_t::new(errcode::ERROR_NOT_SUPPORT, &err_not_imp.to_string());
427 println!("recv oam call,url={},param={}",url,param);
428 let pool = match unsafe{&mut gSocketPool} {
429 None=>return resp,
430 Some(p)=>p,
431 };
432
433 let desc= format!("[socket pool,capacity={},used={}]",pool.capacity(), pool.get_used_count());
434 resp.Description = desc;
435 resp.RetCode=errcode::RESULT_SUCCESS;
436 resp
437}
438
/// Starts Winsock, requesting version 2.2, and logs the outcome.
///
/// A failure is only logged; the caller is not informed.
#[cfg(windows)]
fn os_sock_start() {
    // SAFETY: the structure is only an out-parameter that `WSAStartup` fills
    // in. NOTE(review): assumes an all-zero `WSAData` is a valid value -
    // confirm against the binding's definition.
    let mut wsaData = unsafe { std::mem::zeroed::<WinSock::WSAData>() };
    // Requested version 2.2: 2 in the high byte, 2 in the low byte (0x0202).
    let wVersion: u16 = (2 << 8) | 2;
    let ret = unsafe {
        WinSock::WSAStartup(wVersion, &mut wsaData as *mut WinSock::WSAData)
    };

    if ret != 0 {
        // `WSAStartup` returns the error code itself and does not set the
        // thread's last error, so describe `ret` rather than `last_os_error()`.
        println!("init winsock err,ret={},os_err={}",ret,std::io::Error::from_raw_os_error(ret));
    } else {
        println!("init winsock success,version=0x{:x}",wVersion);
    }
}
454
/// Unix counterpart of the Windows socket start-up hook; intentionally does
/// nothing.
#[cfg(unix)]
fn os_sock_start() {}