rust_rsm/rsm/socket/
poll.rs

1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use crate::common::{errcode, spin_lock};
7use crate::net_ext::{RawFdType};
8use super::*;
9use crate::rsm::{self};
10use core::ffi::c_void;
11use crate::rsm::{SOCK_EVENT_READ, SOCK_EVENT_WRITE, SOCK_EVENT_CLOSE,SOCK_EVENT_ERR};
12
13#[cfg(windows)]
14use crate::net_ext::{windows::rawsocket};
15#[cfg(unix)]
16use crate::net_ext::{unix::rawsocket};
17#[cfg(windows)]
18use windows_sys::Win32::System::IO;
19#[cfg(windows)]
20use windows_sys::Win32::Foundation::{ERROR_SUCCESS};
21
22#[cfg(unix)]
23use libc;
24#[cfg(windows)]
25use windows_sys::Win32::System::IO::OVERLAPPED;
26
27pub struct Poll {
28    poll_fd:RawFdType,
29    capacity:usize,
30    count:usize,
31    lock:spin_lock::spin_lock_t,
32}
33
34impl Poll {
35    pub fn new(capacity:usize)->Self {
36        let h = create_poll_inst(capacity as i32);
37        return Self { poll_fd: h,
38         capacity:capacity,
39         count:0,
40         lock:spin_lock::spin_lock_t::new(),
41         }
42    }
43
44    pub fn register(&mut self,fd:RawFdType,key:usize,event:rsm::SOCKET_EVENT,post_event:bool)->errcode::RESULT {
45        self.lock.lock();
46        let ret = poll_add_socket(self.poll_fd, fd, key, event);
47        if ret==errcode::RESULT_SUCCESS {
48            self.count+=1;
49            println!("register socket event success,socket_id={},event={},ret={},current_count={}",key,event,ret,self.count);
50        } else {
51            println!("register socket event failed,socket_id={},event={},ret={},current_count={}",key,event,ret,self.count);
52        }
53       
54        self.lock.unlock();
55        if post_event {
56            self.post_event(fd, key, rsm::SOCK_EVENT_READ);
57        }
58        return ret
59    }
60    #[cfg(windows)]
61    fn post_event(&self,fd:RawFdType,key:usize,event:rsm::SOCKET_EVENT) {
62        let os_ev=rsmev_to_osev(event);
63        let mut ol = unsafe { std::mem::zeroed::<OVERLAPPED>() };
64        ol.hEvent =  fd as isize;
65        unsafe {
66        IO::PostQueuedCompletionStatus(self.poll_fd as isize, os_ev,
67                 key, &ol as *const _);
68        }
69    }
70
71    #[cfg(unix)]
72    fn post_event(&self,_fd:RawFdType,_key:usize,_event:rsm::SOCKET_EVENT) {
73    }
74
75    pub fn deregister(&mut self,fd:RawFdType)->errcode::RESULT {
76        self.lock.lock();
77        let ret = poll_del_socket(self.poll_fd, fd);
78        if ret==errcode::RESULT_SUCCESS {
79            self.count-=1;
80        }
81        self.lock.unlock();
82        return ret
83    }
84
85    pub fn poll(&mut self,wait_msec:u32)->Option<Vec<socket_event_t>> {
86        return poll_wait(self.poll_fd, wait_msec)
87    }
88
89    pub fn capacity(&self)->usize {
90        self.capacity
91    }
92
93    pub fn used(&self)->usize {
94        self.count
95    }
96
97}
98
99impl Drop for Poll {
100    fn drop(&mut self) {
101        #[cfg(unix)]
102        rawsocket::close_fd(self.poll_fd);
103        #[cfg(windows)]
104        unsafe { windows_sys::Win32::Foundation::CloseHandle(self.poll_fd as isize) };
105    }
106}
107
/// Creates a new I/O completion port that is not yet associated with any handle.
///
/// `_capacity` is unused on windows. The raw port handle is returned as-is;
/// a null handle (0) means creation failed, in which case the OS error is
/// logged instead of the success message.
#[cfg(windows)]
fn create_poll_inst(_capacity: i32) -> RawFdType {
    use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;

    // SAFETY: passing INVALID_HANDLE_VALUE with no existing port asks the OS to
    // create a fresh completion port; no pointers are involved.
    let port = unsafe { IO::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) };
    if port == 0 {
        // Read the OS error immediately, before any other call can overwrite it.
        println!("[poll]create IOCP failed,os_err={}", std::io::Error::last_os_error());
    } else {
        println!("[poll]create IOCP success,port={}", port);
    }
    port as RawFdType
}
117
118#[cfg(unix)]
119fn create_poll_inst(capacity:i32)->RawFdType {
120   return unsafe {libc::epoll_create(capacity)}
121}
122
/// Enables socket notifications for `fd` on the completion port `inst`.
///
/// Notifications are tagged with `key` as the completion key and use
/// level-triggered, persistent delivery. Returns `RESULT_SUCCESS` only when
/// both the call itself and the per-registration result report
/// `ERROR_SUCCESS`; otherwise returns `ERROR_OS_CALL_FAILED`.
#[cfg(windows)]
fn poll_add_socket(inst: RawFdType, fd: RawFdType, key: usize, events: u32) -> errcode::RESULT {
    // SAFETY: the registration record is filled field by field below, starting
    // from an all-zero value.
    let mut registration = unsafe { std::mem::zeroed::<WinSock::SOCK_NOTIFY_REGISTRATION>() };
    registration.socket = fd as usize;
    registration.completionKey = key as *mut c_void;
    registration.eventFilter = rsmev_to_osev(events) as u16;
    registration.operation = WinSock::SOCK_NOTIFY_OP_ENABLE as u8;
    registration.triggerFlags =
        (WinSock::SOCK_NOTIFY_TRIGGER_LEVEL | WinSock::SOCK_NOTIFY_TRIGGER_PERSISTENT) as u8;

    // Registration-only call: one registration record in, no completion
    // entries requested, zero timeout.
    let ret = unsafe {
        WinSock::ProcessSocketNotifications(
            inst as isize,
            1,
            &mut registration as *mut WinSock::SOCK_NOTIFY_REGISTRATION,
            0,
            0,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
        )
    };

    let failed = ret != ERROR_SUCCESS || registration.registrationResult != ERROR_SUCCESS;
    if failed {
        println!("[poll]add socket event error,iocp={},fd={},events={},ret={},reg_result={},key={},os_err={}",
            inst,fd,registration.eventFilter,ret,registration.registrationResult,key,
            unsafe {WinSock::WSAGetLastError()});
        return errcode::ERROR_OS_CALL_FAILED;
    }

    println!("[poll]add socket to IOCP success,IOCP={},sock_fd={},ret={},",inst,fd,ret);
    errcode::RESULT_SUCCESS
}
156
157#[cfg(unix)]
158fn poll_add_socket(inst:RawFdType,fd:RawFdType,key:usize,events:u32)->errcode::RESULT {
159    let mut ev = unsafe { std::mem::zeroed::<libc::epoll_event>() };
160    ev.events = rsmev_to_osev(events) |  libc::EPOLLET as u32;
161    ev.u64 = key as u64;
162    let ret = unsafe { libc::epoll_ctl(inst,libc::EPOLL_CTL_ADD, fd,
163        &mut ev as *mut libc::epoll_event) };
164    if ret!=0 {
165        return errcode::ERROR_OS_CALL_FAILED
166    }
167    errcode::RESULT_SUCCESS
168}
169
/// Windows counterpart of socket removal.
///
/// Performs no OS call and always reports `RESULT_SUCCESS`; both arguments
/// are ignored.
#[cfg(windows)]
fn poll_del_socket(_inst: RawFdType, _fd: RawFdType) -> errcode::RESULT {
    errcode::RESULT_SUCCESS
}
174
175#[cfg(unix)]
176fn poll_del_socket(inst:RawFdType,fd:RawFdType)->errcode::RESULT {
177    let mut ev = unsafe { std::mem::zeroed::<libc::epoll_event>() };
178    ev.events = (libc::EPOLLIN | libc::EPOLLRDHUP) as u32;
179    unsafe { libc::epoll_ctl(inst,libc::EPOLL_CTL_DEL, fd,
180        &mut ev as *mut libc::epoll_event) };    
181    errcode::RESULT_SUCCESS
182 }
183
184 const MAX_WAIT_EVENTS_NUM:usize=128;
185 #[cfg(windows)]
186fn poll_wait(inst:RawFdType,wait_msec:u32)->Option<Vec<socket_event_t>> {
187    use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;
188
189    let mut len=0u32;
190    //let mut sock_reg:Vec<WinSock::SOCK_NOTIFY_REGISTRATION>=Vec::with_capacity(MAX_WAIT_EVENTS_NUM);
191    let mut lpoverlapped:Vec<OVERLAPPED_ENTRY> = Vec::with_capacity(MAX_WAIT_EVENTS_NUM);
192
193    unsafe { 
194        lpoverlapped.set_len(MAX_WAIT_EVENTS_NUM)
195    }
196    let ret = unsafe {
197        WinSock::ProcessSocketNotifications(inst as isize, 0, 
198            std::ptr::null_mut(), wait_msec, MAX_WAIT_EVENTS_NUM as u32, 
199            lpoverlapped.as_mut_ptr(), &mut len as *mut u32)      
200        /*IO::GetQueuedCompletionStatusEx(inst as isize, lpoverlapped.as_mut_ptr(), 
201        MAX_WAIT_EVENTS_NUM as u32,&mut len as *mut u32,
202        wait_msec,1)*/
203    
204    };
205    if len==0 || ret!=0 {
206    //(ret!=ERROR_SUCCESS && ret !=WAIT_TIMEOUT) 
207        //println!("pool event failed,ret={},os_error={}",ret,std::io::Error::last_os_error());
208        return None
209    }
210    {
211    let handle=if lpoverlapped[0].lpOverlapped.is_null() {0} else {
212        unsafe { (*lpoverlapped[0].lpOverlapped).hEvent }
213    };
214    println!("[poll]poll event success,ret={},event_count={},ret_ev={},handle={}",ret,len,
215        lpoverlapped[0].dwNumberOfBytesTransferred,handle);
216    }
217    let mut evs:Vec<socket_event_t>=Vec::new();
218    for i in 0..len as usize {
219        let ol=&lpoverlapped[i];
220        let os_ev = ol.dwNumberOfBytesTransferred;
221        let sock_ev = socket_event_t {
222            socket_id:ol.lpCompletionKey as i32,
223            sock_type:SOCKET_TYPE::PROTO_DGRAM,
224            event:osev_to_rsmev(os_ev), 
225        };
226        evs.push(sock_ev);
227    }
228    return Some(evs)
229 }
230
231#[cfg(unix)]
232fn poll_wait(fd:RawFdType,wait_msec:u32)->Option<Vec<socket_event_t>> {
233
234    let mut events:Vec<libc::epoll_event> = Vec::with_capacity(MAX_WAIT_EVENTS_NUM);
235    unsafe { 
236        //keys.set_len(128);
237        events.set_len(MAX_WAIT_EVENTS_NUM)
238    }
239    let ret = unsafe { libc::epoll_wait(fd,events.as_mut_ptr(),MAX_WAIT_EVENTS_NUM as i32,wait_msec as i32) };
240    if ret<=0 {
241        return None
242    }
243    let mut evs:Vec<socket_event_t>=Vec::new();
244    for i in 0..ret as usize {
245        let ev = &events[i];
246        let sock_ev = socket_event_t {
247            socket_id:ev.u64 as i32,
248            sock_type:SOCKET_TYPE::PROTO_DGRAM,
249            event:osev_to_rsmev(ev.events as i32),
250        };
251        evs.push(sock_ev);
252    }
253
254    return Some(evs)
255 }
256
257
258 #[cfg(windows)]
259 fn rsmev_to_osev(ev:u32)->u32 {
260    let mut os_ev = 0u32;
261    if ev & rsm::SOCK_EVENT_READ != 0 {
262        os_ev |= WinSock::SOCK_NOTIFY_REGISTER_EVENT_IN;
263    }    
264
265    if ev & rsm::SOCK_EVENT_WRITE != 0 {
266        os_ev |= WinSock::SOCK_NOTIFY_REGISTER_EVENT_OUT;
267    }
268
269    if ev & rsm::SOCK_EVENT_CLOSE != 0 {
270        os_ev |= WinSock::SOCK_NOTIFY_REGISTER_EVENT_HANGUP;
271    }
272
273    return os_ev
274  }
275 
276 #[cfg(unix)]
277 fn rsmev_to_osev(ev:u32)->u32 {
278    
279    let mut os_ev = 0i32;
280    if ev & SOCK_EVENT_READ != 0 {
281        os_ev |= libc::EPOLLIN;
282    }
283    if ev & SOCK_EVENT_WRITE != 0 {
284        os_ev |= libc::EPOLLOUT;
285    }  
286
287    if ev & SOCK_EVENT_CLOSE != 0 {
288        os_ev |= libc::EPOLLRDHUP;
289    }
290    return os_ev as u32
291}
292
293 #[cfg(windows)]
294 fn osev_to_rsmev(ev:u32)->rsm::SOCKET_EVENT {
295
296    let mut rsm_ev = 0u32;
297    if ev & WinSock::SOCK_NOTIFY_EVENT_IN !=0 {
298        rsm_ev |= SOCK_EVENT_READ
299    }
300    if ev & WinSock::SOCK_NOTIFY_EVENT_OUT !=0 {
301        rsm_ev |= SOCK_EVENT_WRITE
302    }
303    if ev & WinSock::SOCK_NOTIFY_EVENT_HANGUP !=0 {
304        rsm_ev |= SOCK_EVENT_CLOSE
305    }
306    if ev & WinSock::SOCK_NOTIFY_EVENT_ERR !=0 {
307        rsm_ev |= SOCK_EVENT_ERR
308    }
309    return rsm_ev
310  }
311 
312 #[cfg(unix)]
313 fn osev_to_rsmev(ev:i32)->rsm::SOCKET_EVENT {
314    let mut rsm_ev = 0u32;
315    if ev & libc::EPOLLIN != 0 {
316        rsm_ev |= SOCK_EVENT_READ;
317    }
318    if ev & libc::EPOLLOUT != 0 {
319        rsm_ev |= SOCK_EVENT_WRITE;
320    }  
321
322    if ev &  libc::EPOLLRDHUP!= 0 {
323        rsm_ev |= SOCK_EVENT_CLOSE;
324    }
325    if ev & libc::EPOLLERR!=0 {
326        rsm_ev |= SOCK_EVENT_ERR
327    }
328
329    return rsm_ev
330}
331
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use super::*;
    use super::socketpool;

    /// Registers a UDP socket with a fresh poller and expects success.
    #[test]
    fn test_rsm_poll() {
        socketpool::init_socketpool_data();

        let mut p = Poll::new(128);
        let addr1 = SocketAddr::new("0.0.0.0".parse().unwrap(), 14010);
        // The listener is not polled here; the binding only keeps it alive
        // until the end of the test.
        let _lis = match TcpListener::new(&addr1, 1024, SOCKET_LB_POLICY::SOCK_LB_ALL_INSTANCE) {
            Ok(s) => s,
            Err(e) => panic!("Create TCP Listener failed,ret={},local_addr={}", e, addr1),
        };
        let s1 = match socket(
            SOCKET_ADDRESS_FAMILY::SOCKET_INET,
            SOCKET_TYPE::PROTO_DGRAM,
            net_ext::IpProtos::Ip_Proto_UDP,
        ) {
            Ok(s) => s,
            // Fail unconditionally: an `Err` must never let the test pass.
            Err(e) => panic!("Create UDP socket failed,ret={}", e),
        };

        let ret = p.register(s1, 20000 as usize, rsm::SOCK_EVENT_READ | rsm::SOCK_EVENT_CLOSE, false);
        println!("Register event listener,ret={},socket={}",ret,s1);
        assert_eq!(ret, errcode::RESULT_SUCCESS);
    }
}
362}