rust_rsm/rsm/socket/
socketpool.rs

1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use crate::rsm::{SOCK_EVENT_READ, SOCK_EVENT_CLOSE, SOCK_EVENT_NEW};
7use crate::{rsm,rsm::rsm_component_t};
8use crate::{net_ext::RawFdType};
9use crate::common::{tsidallocator::TsIdAllocator,spin_lock::spin_lock_t};
10use super::*;
11use crate::common::errcode;
12
13
/// Hands the id `$idx` back to the allocator `$ids` (via `release_id`) and then
/// returns `$e` from the enclosing function.
///
/// Intended for error paths that run after an id has been allocated, so the
/// id is not left reserved when the function bails out.
#[macro_export]
macro_rules! return_error {
    ($e:expr, $ids:expr, $idx:expr) => {{
        $ids.release_id($idx);
        return $e;
    }};
}
21
22const MAX_SOCKET_NUM:usize=131072;
23pub(crate) struct socket_info_t {
24    pub(crate) s:Socket,
25    pub(crate) owner:rsm_component_t,
26}
27
28pub(crate) struct SocketPool {
29    sock_ids:TsIdAllocator,
30    sockets:[Option<socket_info_t>;MAX_SOCKET_NUM+1],
31    capacity:usize,
32    lock:spin_lock_t,
33    poll_instance:poll::Poll,
34}
35
36static mut gSocketPool:Option<Box<SocketPool>>=None;
37impl SocketPool {
38    pub fn new()->Box<Self> {
39        let ids = TsIdAllocator::new(1, MAX_SOCKET_NUM as i32);
40        let poll_inst = poll::Poll::new(MAX_SOCKET_NUM);
41        let pool_ptr =  unsafe {  
42            std::alloc::alloc_zeroed(std::alloc::Layout::from_size_align_unchecked(std::mem::size_of::<SocketPool>(), 1))
43            as * mut SocketPool
44        };
45        let mut pool = unsafe { Box::from_raw(pool_ptr) };
46        pool.sock_ids = ids;   
47        pool.capacity = MAX_SOCKET_NUM;
48        pool.lock = spin_lock_t::new();
49        pool.poll_instance=poll_inst;
50            
51
52        for i in 1..MAX_SOCKET_NUM+1 {
53            pool.sockets[i]=None;
54        }
55        println!("init socket pool success,capacity={}",MAX_SOCKET_NUM);
56        return pool
57    }
58
59    
60    pub fn new_socket(&mut self,sock_af:SOCKET_ADDRESS_FAMILY,sock_type:SOCKET_TYPE,proto:u8)->Result<i32,errcode::RESULT> {
61        const _DEBUG:bool=true;
62        let caller=match rsm::get_self_cid() {
63            None=> {
64                if _DEBUG {
65                    rsm::rsm_component_t::new_zero()
66                } else {
67                    return Err(errcode::ERROR_INVALID_STATE)
68                }
69                
70            },
71            Some(c)=>c,
72        };        
73       
74        let sid=self.sock_ids.allocate_id();
75        if sid==TsIdAllocator::INVALID_ID {
76            return Err(errcode::ERROR_OUTOF_MEM)
77        }
78        let mut sock = match Socket::new_socket(sid,sock_af,sock_type,proto) {
79            Ok(s)=>s,
80            Err(e)=>return_error!(Err(e),self.sock_ids,sid),
81        };
82        sock.set_non_blocking();
83        self.add_poll_registry(&sock);
84        let sck_info = socket_info_t {
85            s:sock,
86            owner:caller,
87        };
88        self.sockets[sid as usize]=Some(sck_info);
89
90        
91        return Ok(sid);
92    }
93
94    fn add_poll_registry(&mut self,sock:&Socket)->errcode::RESULT {
95        return self.poll_instance.register(sock.get_raw_fd(), sock.get_socket_id() as usize, 
96            SOCK_EVENT_READ | SOCK_EVENT_CLOSE,false)
97    }
98
99    fn del_poll_registry(&mut self,fd:RawFdType)->errcode::RESULT {
100        return self.poll_instance.deregister(fd)
101    }
102    //close socket,release socket index, let drop method close the underlying socket
103    pub fn close_socket(&mut self,sock_idx:i32,)->errcode::RESULT {
104        if !self.check_socket_caller(sock_idx) {
105            return errcode::ERROR_NO_PERMISSION
106        }
107        {
108            let fd = match self.get_socket_by_idx(sock_idx) {
109                None=>return errcode::ERROR_NOT_FOUND,
110                Some(s)=>s.get_raw_fd(),
111            };
112            self.del_poll_registry(fd);
113        }
114        let ret = self.sock_ids.release_id(sock_idx);
115        if ret !=errcode::RESULT_SUCCESS {
116            return ret
117        }
118        if self.sockets[sock_idx as usize].is_none() {
119            return errcode::ERROR_NOT_FOUND
120        }
121       
122        self.sockets[sock_idx as usize]=None;
123        errcode::RESULT_SUCCESS
124    }
125
126    fn check_socket_caller(&self,sock_idx:i32)->bool {
127        if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
128            return false
129        }
130        let caller=match rsm::get_self_cid() {
131            None=>return false,
132            Some(c)=>c,
133        };
134        if let Some(sinfo)=&self.sockets[sock_idx as usize] {
135            if sinfo.owner==caller {
136                return true
137            } else {
138                return false
139            }
140        }
141
142        return false
143
144    }
145
146    fn is_valid_sock_id(&self,sock_id:i32)->bool {
147        return sock_id>0 && sock_id as usize<=self.capacity && self.sockets[sock_id as usize].is_some()
148    }
149
150    pub(crate) fn is_tcp_server(&self,sock_id:i32)->bool {
151        if !self.is_valid_sock_id(sock_id) {
152            return false
153        }
154
155        return match &self.sockets[sock_id as usize] {
156            None=>false,
157            Some(info)=>info.s.is_tcp_server(),
158        };
159    }
160
161    pub(crate) fn get_sock_binding_info(&mut self,sock_idx:i32)->Option<&mut socket_info_t> {
162        if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
163            return None
164        }
165        match &mut self.sockets[sock_idx as usize] {
166            None=>return None,
167            Some(info)=>return Some(info),
168        }
169    }
170
171    fn get_socket_by_idx(&mut self,sock_idx:i32)->Option<&mut Socket> {
172        if sock_idx>self.sock_ids.capacity() || self.sockets[sock_idx as usize].is_none() {
173            return None
174        }
175        match &mut self.sockets[sock_idx as usize] {
176            None=>return None,
177            Some(info)=>return Some(&mut info.s),
178        }
179    }
180
181    pub(crate) fn allocate_socket_id(&mut self)->Result<i32,errcode::RESULT> {
182        let id = self.sock_ids.allocate_id();
183        if id==TsIdAllocator::INVALID_ID {
184            return Err(errcode::ERROR_OUTOF_MEM)
185        }
186        return Ok(id)
187    }
188    pub(crate) fn release_socket_id(&mut self,id:i32)->errcode::RESULT {
189        return self.sock_ids.release_id(id)
190    }
191
192    ///accept a new tcp connection, as insert into the event listener
193    pub(crate) fn accept(&mut self,server_idx:i32)->Result<i32,errcode::RESULT> {
194        let sid = self.sock_ids.allocate_id();
195        println!("[socket pool]New Tcp client connection,server_sock={},client_sock={}",server_idx,sid);
196        if sid==TsIdAllocator::INVALID_ID {
197            return Err(errcode::ERROR_OUTOF_MEM)
198        }
199        if server_idx as usize>self.capacity{            
200            return_error!(Err(errcode::ERROR_NOT_FOUND),self.sock_ids,sid)
201        }
202        let server = match &mut self.sockets[server_idx as usize] {
203            None=> {
204                return_error!(Err(errcode::ERROR_NOT_FOUND),self.sock_ids,sid)
205            },
206            Some(s)=>s,
207        };
208        
209        if !server.s.is_tcp_server()|| server.s.state!=SOCKET_STATE::SOCK_LISTENING {
210            //println!("[socket pool]socket state error,server_idx={},state={:?}",server_idx,server.s.state);
211            return_error!(Err(errcode::ERROR_INVALID_STATE),self.sock_ids,sid)
212        }
213
214        let client = match server.s.accept(sid) {
215            Ok(s)=>s,
216            Err(e)=>{
217                 println!("accept connection error,sock_id={},ret={}",sid,e);
218                 return_error!(Err(e),self.sock_ids,sid);
219            },
220        };
221        add_poll_registry(&client,false);
222        let dst= get_lb_task_id(&server.owner, sid,server.s.lb_policy);
223
224        println!("[socketpool]New Tcp client connection,server_sock={},client_sock={},peer_addr={},raw_fd={},dst_tid={}",
225            server_idx,sid,client.get_peer_addr(),client.get_raw_fd(),dst);
226        let sck_info = socket_info_t {
227            s:client,
228            owner:dst,
229        };
230
231        self.sockets[sid as usize]=Some(sck_info);
232        
233        Ok(sid)
234    }
235
236    pub fn poll(&mut self,wait_msec:u32)->Option<Vec<socket_event_t>> {
237        return self.poll_instance.poll(wait_msec)
238    }
239
240    pub fn get_used_count(&self)->i32 {
241        self.sock_ids.used_count()
242    }
243
244    pub fn capacity(&self)->usize {
245        self.capacity as usize
246    }
247
248}
249
250///get a loadbalanced component_id by the sock_id,
251fn get_lb_task_id(caller:&rsm::rsm_component_t,sock_id:i32,policy:SOCKET_LB_POLICY)->rsm_component_t {
252    let mut dst=caller.clone();
253    let attr=match rsm::rsm_sched::get_component_registry(caller.get_cid()) {
254        None=>return dst,
255        Some(a)=>a,
256    };
257    let inst = match policy {
258        SOCKET_LB_POLICY::SOCK_LB_ALL_INSTANCE=> {
259            sock_id as usize % attr.cattr.inst_num +1
260        },
261        SOCKET_LB_POLICY::SOCK_LB_CALLER_INSTANCE=> {
262            caller.inst_id
263        },
264        SOCKET_LB_POLICY::SOCK_LB_EXCLUDE_CALLER_INSTANCE=>{
265            if attr.cattr.inst_num<=1 {
266                return dst;
267            }
268            let mut vec_inst=Vec::new();
269            for i in 1..attr.cattr.inst_num+1 {
270                if i!=caller.inst_id {
271                    vec_inst.push(i);
272                }
273            }
274            let idx = sock_id as usize %vec_inst.len();
275            vec_inst[idx]            
276        },
277       
278    };
279    dst.inst_id = inst;
280    return dst;
281}
282///创建一个Socket
283pub(crate) fn new_socket(sock_af:SOCKET_ADDRESS_FAMILY,sock_type:SOCKET_TYPE,proto:u8)->Result<i32,errcode::RESULT> {
284    let pool = match unsafe {&mut gSocketPool} {
285        None=> return Err(errcode::ERROR_NOT_INITIALIZED),
286        Some(p)=>p,
287    };
288
289    return pool.new_socket(sock_af,sock_type, proto);
290}
291
292pub(crate) fn close_socket(idx:i32)->errcode::RESULT {
293    let pool = match unsafe {&mut gSocketPool} {
294        None=> return errcode::ERROR_NOT_INITIALIZED,
295        Some(p)=>p,
296    };
297
298    return pool.close_socket(idx);
299}
300
301fn add_poll_registry(sock:&Socket,post_event:bool)->errcode::RESULT {
302    let pool = match unsafe {&mut gSocketPool} {
303        None=> return errcode::ERROR_NOT_INITIALIZED,
304        Some(p)=>p,
305    };    
306    return pool.poll_instance.register(sock.get_raw_fd(), sock.get_socket_id() as usize, 
307        SOCK_EVENT_READ | SOCK_EVENT_CLOSE,post_event)
308}
309
310///init socket pool, and start socket pool thread
311pub(crate) fn init_socket_pool() {
312    os_sock_start();
313    unsafe {
314        gSocketPool = Some(SocketPool::new());
315    }
316    
317    let _ = std::thread::spawn(pool_thread_main);
318    //std::thread::Builder::new().stack_size(4*1024*1024).name("socket_pool".to_string()).spawn(pool_thread_main);
319}
320
321pub(crate) fn init_socketpool_data() {
322    os_sock_start();
323    unsafe {
324        gSocketPool = Some(SocketPool::new());
325    }   
326}
327
328
329///get underlying Socket instance by socket index
330pub(crate) fn get_socket_by_idx<'a>(sock_id:i32)->Option<&'a mut Socket> {
331    let pool = match unsafe {&mut gSocketPool} {
332        None=> return None,
333        Some(p)=>p,
334    };
335    
336    return pool.get_socket_by_idx(sock_id)
337}
338
339const MAX_POLL_MSEC:u32=500;
340///SocketPool main thread, poll the socket events, and send message to correspondant component
341fn pool_thread_main() {
342
343    let pool_inst = match unsafe { &mut gSocketPool} {
344        None=>{
345            println!("Init Socket pool failed,thread exit");
346            return;
347        },
348        Some(p)=>p,
349    };
350    register_oam();
351    loop {
352        let events=match pool_inst.poll(MAX_POLL_MSEC) {
353            None=>continue,
354            Some(ev)=>ev,
355        };
356        if events.len()>0 {
357            process_events(pool_inst,events);
358        }
359        
360    }
361
362}
363
364///process socket event
365/// the events for tcp listener are processed only by socketpool
366/// other socket events are sent to application
367fn process_events(pool:&mut SocketPool,events:Vec<socket_event_t>) {
368    //println!("process events,number={}",events.len());
369    for mut ev in events {
370        println!("[socket pool]process events,ev={},socket_id={}",ev.event,ev.socket_id);
371       
372        if pool.is_tcp_server(ev.socket_id) && (ev.event & SOCK_EVENT_READ)!=0{
373            let client_id = match pool.accept(ev.socket_id) {
374                Err(e)=>{
375                    println!("Accept Socket error,ret={},server_sock_id={}",e,ev.socket_id);
376                    continue;
377                },
378                Ok(idx)=>idx,
379            };
380            //continue;
381            ev.socket_id=client_id;
382            ev.event=SOCK_EVENT_NEW;            
383        }
384        
385        let sck = match pool.get_sock_binding_info(ev.socket_id) {
386            None=>continue,
387            Some(s)=>s,
388        };
389        ev.sock_type = sck.s.get_sock_type();
390        let dst =  sck.owner.clone();
391        //get_lb_task_id(&sck.owner, ev.sock_id, sck.s.get_lb_policy());   
392        let msg = match rsm::rsm_message_t::new::<rsm::rsm_socket_event_t>(rsm::RSM_MSG_ID_SOCKET,&ev) {
393            None=>continue,
394            Some(m)=>m,
395        };
396        if (ev.event & (SOCK_EVENT_CLOSE | rsm::SOCK_EVENT_ERR))!=0 {
397            pool.close_socket(ev.socket_id);
398        }
399        
400        rsm::send_asyn_msg(&dst, msg);
401
402    }
403}
404
405///OAM implementation
406const err_not_imp:&str="not implement";
407use rsm::oam;
408fn register_oam() {
409    let urls = ["/socket".to_string()];
410    rsm::oam::RegisterOamModule(&urls, socket_oam_callback);
411}
412
413fn socket_oam_callback(op:oam::E_RSM_OAM_OP,url:&String,param:&String)->oam::oam_cmd_resp_t {
414    let resp = oam::oam_cmd_resp_t::new(errcode::ERROR_NOT_SUPPORT, &err_not_imp.to_string());
415    match op {
416        oam::E_RSM_OAM_OP::CLI_OP_SHOW=>{
417            return read_socket_stats(url, param)
418        },
419        _=>(),
420    }
421
422    resp
423}
424
425fn read_socket_stats(url:&String,param:&String)->rsm::oam::oam_cmd_resp_t {
426    let mut resp = rsm::oam::oam_cmd_resp_t::new(errcode::ERROR_NOT_SUPPORT, &err_not_imp.to_string());
427    println!("recv oam call,url={},param={}",url,param);
428    let pool = match unsafe{&mut gSocketPool} {
429        None=>return resp,
430        Some(p)=>p,
431    };
432
433    let desc= format!("[socket pool,capacity={},used={}]",pool.capacity(), pool.get_used_count());
434    resp.Description = desc;
435    resp.RetCode=errcode::RESULT_SUCCESS;
436    resp
437}
438
/// Windows: starts Winsock by calling `WSAStartup` with version 2.2
/// (`0x0202`) and logs whether the call succeeded.
#[cfg(windows)]
fn os_sock_start() {
    let mut wsaData: WinSock::WSAData = unsafe { std::mem::zeroed() };
    let wVersion: u16 = (2u16 << 8) | 2u16;
    let ret = unsafe { WinSock::WSAStartup(wVersion, &mut wsaData as *mut WinSock::WSAData) };

    if ret == 0 {
        println!("init winsock success,version=0x{:x}",wVersion);
    } else {
        println!("init winsock err,ret={},os_err={}",ret,std::io::Error::last_os_error());
    }
}
454
/// Unix: intentionally does nothing; it exists so callers can invoke
/// `os_sock_start` on every platform.
#[cfg(unix)]
fn os_sock_start() {}