1#![allow(non_camel_case_types)]
2#![allow(non_snake_case)]
3#![allow(non_upper_case_globals)]
4#![allow(dead_code)]
5
6use crate::common::{errcode, spin_lock};
7use crate::net_ext::{RawFdType};
8use super::*;
9use crate::rsm::{self};
10use core::ffi::c_void;
11use crate::rsm::{SOCK_EVENT_READ, SOCK_EVENT_WRITE, SOCK_EVENT_CLOSE,SOCK_EVENT_ERR};
12
13#[cfg(windows)]
14use crate::net_ext::{windows::rawsocket};
15#[cfg(unix)]
16use crate::net_ext::{unix::rawsocket};
17#[cfg(windows)]
18use windows_sys::Win32::System::IO;
19#[cfg(windows)]
20use windows_sys::Win32::Foundation::{ERROR_SUCCESS};
21
22#[cfg(unix)]
23use libc;
24#[cfg(windows)]
25use windows_sys::Win32::System::IO::OVERLAPPED;
26
27pub struct Poll {
28 poll_fd:RawFdType,
29 capacity:usize,
30 count:usize,
31 lock:spin_lock::spin_lock_t,
32}
33
34impl Poll {
35 pub fn new(capacity:usize)->Self {
36 let h = create_poll_inst(capacity as i32);
37 return Self { poll_fd: h,
38 capacity:capacity,
39 count:0,
40 lock:spin_lock::spin_lock_t::new(),
41 }
42 }
43
44 pub fn register(&mut self,fd:RawFdType,key:usize,event:rsm::SOCKET_EVENT,post_event:bool)->errcode::RESULT {
45 self.lock.lock();
46 let ret = poll_add_socket(self.poll_fd, fd, key, event);
47 if ret==errcode::RESULT_SUCCESS {
48 self.count+=1;
49 println!("register socket event success,socket_id={},event={},ret={},current_count={}",key,event,ret,self.count);
50 } else {
51 println!("register socket event failed,socket_id={},event={},ret={},current_count={}",key,event,ret,self.count);
52 }
53
54 self.lock.unlock();
55 if post_event {
56 self.post_event(fd, key, rsm::SOCK_EVENT_READ);
57 }
58 return ret
59 }
60 #[cfg(windows)]
61 fn post_event(&self,fd:RawFdType,key:usize,event:rsm::SOCKET_EVENT) {
62 let os_ev=rsmev_to_osev(event);
63 let mut ol = unsafe { std::mem::zeroed::<OVERLAPPED>() };
64 ol.hEvent = fd as isize;
65 unsafe {
66 IO::PostQueuedCompletionStatus(self.poll_fd as isize, os_ev,
67 key, &ol as *const _);
68 }
69 }
70
71 #[cfg(unix)]
72 fn post_event(&self,_fd:RawFdType,_key:usize,_event:rsm::SOCKET_EVENT) {
73 }
74
75 pub fn deregister(&mut self,fd:RawFdType)->errcode::RESULT {
76 self.lock.lock();
77 let ret = poll_del_socket(self.poll_fd, fd);
78 if ret==errcode::RESULT_SUCCESS {
79 self.count-=1;
80 }
81 self.lock.unlock();
82 return ret
83 }
84
85 pub fn poll(&mut self,wait_msec:u32)->Option<Vec<socket_event_t>> {
86 return poll_wait(self.poll_fd, wait_msec)
87 }
88
89 pub fn capacity(&self)->usize {
90 self.capacity
91 }
92
93 pub fn used(&self)->usize {
94 self.count
95 }
96
97}
98
99impl Drop for Poll {
100 fn drop(&mut self) {
101 #[cfg(unix)]
102 rawsocket::close_fd(self.poll_fd);
103 #[cfg(windows)]
104 unsafe { windows_sys::Win32::Foundation::CloseHandle(self.poll_fd as isize) };
105 }
106}
107
/// Creates a new I/O completion port and returns its handle.
///
/// `_capacity` is unused on windows. When creation fails the zero handle
/// returned by the OS is passed through unchanged, and the failure is logged
/// instead of being reported as a success.
#[cfg(windows)]
fn create_poll_inst(_capacity: i32) -> RawFdType {
    use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;

    // SAFETY: plain FFI call; INVALID_HANDLE_VALUE with no existing port asks
    // for a brand-new completion port.
    let h = unsafe { IO::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) };
    if h == 0 {
        // CreateIoCompletionPort signals failure with a NULL handle.
        println!(
            "[poll]create IOCP failed,os_err={}",
            std::io::Error::last_os_error()
        );
    } else {
        println!("[poll]create IOCP success,port={}", h);
    }
    h as RawFdType
}
117
118#[cfg(unix)]
119fn create_poll_inst(capacity:i32)->RawFdType {
120 return unsafe {libc::epoll_create(capacity)}
121}
122
/// Enables level-triggered, persistent socket notifications for `fd` on the
/// completion port `inst`, tagging them with `key`.
///
/// Returns `RESULT_SUCCESS` when both the call and the per-registration result
/// are `ERROR_SUCCESS`, otherwise `ERROR_OS_CALL_FAILED`.
#[cfg(windows)]
fn poll_add_socket(inst: RawFdType, fd: RawFdType, key: usize, events: u32) -> errcode::RESULT {
    let mut registration = unsafe { std::mem::zeroed::<WinSock::SOCK_NOTIFY_REGISTRATION>() };
    registration.socket = fd as usize;
    registration.completionKey = key as *mut c_void;
    registration.eventFilter = rsmev_to_osev(events) as u16;
    registration.operation = WinSock::SOCK_NOTIFY_OP_ENABLE as u8;
    registration.triggerFlags =
        (WinSock::SOCK_NOTIFY_TRIGGER_LEVEL | WinSock::SOCK_NOTIFY_TRIGGER_PERSISTENT) as u8;

    // One registration is submitted; no completion entries are requested here.
    let status = unsafe {
        WinSock::ProcessSocketNotifications(
            inst as isize,
            1,
            &mut registration as *mut WinSock::SOCK_NOTIFY_REGISTRATION,
            0,
            0,
            std::ptr::null_mut(),
            std::ptr::null_mut(),
        )
    };

    let registered = status == ERROR_SUCCESS && registration.registrationResult == ERROR_SUCCESS;
    if !registered {
        println!("[poll]add socket event error,iocp={},fd={},events={},ret={},reg_result={},key={},os_err={}",
            inst,
            fd,
            registration.eventFilter,
            status,
            registration.registrationResult,
            key,
            unsafe { WinSock::WSAGetLastError() });
        return errcode::ERROR_OS_CALL_FAILED;
    }

    println!("[poll]add socket to IOCP success,IOCP={},sock_fd={},ret={},", inst, fd, status);
    errcode::RESULT_SUCCESS
}
156
157#[cfg(unix)]
158fn poll_add_socket(inst:RawFdType,fd:RawFdType,key:usize,events:u32)->errcode::RESULT {
159 let mut ev = unsafe { std::mem::zeroed::<libc::epoll_event>() };
160 ev.events = rsmev_to_osev(events) | libc::EPOLLET as u32;
161 ev.u64 = key as u64;
162 let ret = unsafe { libc::epoll_ctl(inst,libc::EPOLL_CTL_ADD, fd,
163 &mut ev as *mut libc::epoll_event) };
164 if ret!=0 {
165 return errcode::ERROR_OS_CALL_FAILED
166 }
167 errcode::RESULT_SUCCESS
168}
169
/// Windows counterpart of socket removal: no removal request is issued here,
/// the function reports success unconditionally.
#[cfg(windows)]
fn poll_del_socket(_inst: RawFdType, _fd: RawFdType) -> errcode::RESULT {
    return errcode::RESULT_SUCCESS;
}
174
175#[cfg(unix)]
176fn poll_del_socket(inst:RawFdType,fd:RawFdType)->errcode::RESULT {
177 let mut ev = unsafe { std::mem::zeroed::<libc::epoll_event>() };
178 ev.events = (libc::EPOLLIN | libc::EPOLLRDHUP) as u32;
179 unsafe { libc::epoll_ctl(inst,libc::EPOLL_CTL_DEL, fd,
180 &mut ev as *mut libc::epoll_event) };
181 errcode::RESULT_SUCCESS
182 }
183
/// Upper bound on the number of events fetched by a single `poll_wait` call.
const MAX_WAIT_EVENTS_NUM: usize = 128;
/// Waits up to `wait_msec` milliseconds for socket notifications on the
/// completion port `inst` and converts them into `socket_event_t` values.
///
/// Returns `None` when the call reports an error or delivers no entries.
/// At most `MAX_WAIT_EVENTS_NUM` events are returned per call.
#[cfg(windows)]
fn poll_wait(inst: RawFdType, wait_msec: u32) -> Option<Vec<socket_event_t>> {
    use windows_sys::Win32::System::IO::OVERLAPPED_ENTRY;

    let mut len = 0u32;
    // Zero-initialised buffer instead of `Vec::set_len` over uninitialised memory.
    // SAFETY: OVERLAPPED_ENTRY is plain C data (integers and a raw pointer), so
    // the all-zero bit pattern is a valid value.
    let mut entries: [OVERLAPPED_ENTRY; MAX_WAIT_EVENTS_NUM] = unsafe { std::mem::zeroed() };

    // SAFETY: `entries` provides room for MAX_WAIT_EVENTS_NUM entries and `len`
    // is a valid out-parameter for the number of entries written.
    let ret = unsafe {
        WinSock::ProcessSocketNotifications(
            inst as isize,
            0,
            std::ptr::null_mut(),
            wait_msec,
            MAX_WAIT_EVENTS_NUM as u32,
            entries.as_mut_ptr(),
            &mut len as *mut u32,
        )
    };
    if len == 0 || ret != 0 {
        return None;
    }

    // Never look past the buffer, whatever count was reported.
    let ready = &entries[..(len as usize).min(MAX_WAIT_EVENTS_NUM)];
    {
        // NOTE(review): this reads through the OVERLAPPED pointer carried by the
        // first entry; that is only sound if whoever queued the entry keeps the
        // OVERLAPPED alive until it is dequeued — confirm.
        let handle = if ready[0].lpOverlapped.is_null() {
            0
        } else {
            unsafe { (*ready[0].lpOverlapped).hEvent }
        };
        println!("[poll]poll event success,ret={},event_count={},ret_ev={},handle={}",ret,len,
            ready[0].dwNumberOfBytesTransferred,handle);
    }

    // The notification mask travels in dwNumberOfBytesTransferred, the
    // registration key in lpCompletionKey.
    let evs: Vec<socket_event_t> = ready
        .iter()
        .map(|ol| socket_event_t {
            socket_id: ol.lpCompletionKey as i32,
            sock_type: SOCKET_TYPE::PROTO_DGRAM,
            event: osev_to_rsmev(ol.dwNumberOfBytesTransferred),
        })
        .collect();
    Some(evs)
}
230
231#[cfg(unix)]
232fn poll_wait(fd:RawFdType,wait_msec:u32)->Option<Vec<socket_event_t>> {
233
234 let mut events:Vec<libc::epoll_event> = Vec::with_capacity(MAX_WAIT_EVENTS_NUM);
235 unsafe {
236 events.set_len(MAX_WAIT_EVENTS_NUM)
238 }
239 let ret = unsafe { libc::epoll_wait(fd,events.as_mut_ptr(),MAX_WAIT_EVENTS_NUM as i32,wait_msec as i32) };
240 if ret<=0 {
241 return None
242 }
243 let mut evs:Vec<socket_event_t>=Vec::new();
244 for i in 0..ret as usize {
245 let ev = &events[i];
246 let sock_ev = socket_event_t {
247 socket_id:ev.u64 as i32,
248 sock_type:SOCKET_TYPE::PROTO_DGRAM,
249 event:osev_to_rsmev(ev.events as i32),
250 };
251 evs.push(sock_ev);
252 }
253
254 return Some(evs)
255 }
256
257
/// Translates an RSM event mask into the matching socket-notification
/// registration mask (read, write and close bits only).
#[cfg(windows)]
fn rsmev_to_osev(ev: u32) -> u32 {
    // (RSM bit, OS registration bit) pairs.
    let table = [
        (rsm::SOCK_EVENT_READ, WinSock::SOCK_NOTIFY_REGISTER_EVENT_IN),
        (rsm::SOCK_EVENT_WRITE, WinSock::SOCK_NOTIFY_REGISTER_EVENT_OUT),
        (rsm::SOCK_EVENT_CLOSE, WinSock::SOCK_NOTIFY_REGISTER_EVENT_HANGUP),
    ];
    table
        .iter()
        .filter(|(rsm_bit, _)| ev & *rsm_bit != 0)
        .fold(0u32, |mask, (_, os_bit)| mask | *os_bit)
}
275
276 #[cfg(unix)]
277 fn rsmev_to_osev(ev:u32)->u32 {
278
279 let mut os_ev = 0i32;
280 if ev & SOCK_EVENT_READ != 0 {
281 os_ev |= libc::EPOLLIN;
282 }
283 if ev & SOCK_EVENT_WRITE != 0 {
284 os_ev |= libc::EPOLLOUT;
285 }
286
287 if ev & SOCK_EVENT_CLOSE != 0 {
288 os_ev |= libc::EPOLLRDHUP;
289 }
290 return os_ev as u32
291}
292
/// Translates a socket-notification event mask into the RSM event mask
/// (read, write, close and error bits).
#[cfg(windows)]
fn osev_to_rsmev(ev: u32) -> rsm::SOCKET_EVENT {
    // (OS notification bit, RSM bit) pairs.
    let table = [
        (WinSock::SOCK_NOTIFY_EVENT_IN, SOCK_EVENT_READ),
        (WinSock::SOCK_NOTIFY_EVENT_OUT, SOCK_EVENT_WRITE),
        (WinSock::SOCK_NOTIFY_EVENT_HANGUP, SOCK_EVENT_CLOSE),
        (WinSock::SOCK_NOTIFY_EVENT_ERR, SOCK_EVENT_ERR),
    ];
    table
        .iter()
        .filter(|(os_bit, _)| ev & *os_bit != 0)
        .fold(0u32, |mask, (_, rsm_bit)| mask | *rsm_bit)
}
311
312 #[cfg(unix)]
313 fn osev_to_rsmev(ev:i32)->rsm::SOCKET_EVENT {
314 let mut rsm_ev = 0u32;
315 if ev & libc::EPOLLIN != 0 {
316 rsm_ev |= SOCK_EVENT_READ;
317 }
318 if ev & libc::EPOLLOUT != 0 {
319 rsm_ev |= SOCK_EVENT_WRITE;
320 }
321
322 if ev & libc::EPOLLRDHUP!= 0 {
323 rsm_ev |= SOCK_EVENT_CLOSE;
324 }
325 if ev & libc::EPOLLERR!=0 {
326 rsm_ev |= SOCK_EVENT_ERR
327 }
328
329 return rsm_ev
330}
331
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use super::*;
    use super::socketpool;

    /// Registers a UDP socket for read/close events and expects success.
    #[test]
    fn test_rsm_poll() {
        socketpool::init_socketpool_data();

        let mut p = Poll::new(128);
        let addr1 = SocketAddr::new("0.0.0.0".parse().unwrap(), 14010);
        // Bound to a named `_lis` so the listener stays alive for the whole test.
        let _lis = match TcpListener::new(&addr1, 1024, SOCKET_LB_POLICY::SOCK_LB_ALL_INSTANCE) {
            Ok(s) => s,
            Err(e) => panic!("Create TCP Listener failed,ret={},local_addr={}", e, addr1),
        };
        let s1 = match socket(
            SOCKET_ADDRESS_FAMILY::SOCKET_INET,
            SOCKET_TYPE::PROTO_DGRAM,
            net_ext::IpProtos::Ip_Proto_UDP,
        ) {
            Ok(s) => s,
            // Fail loudly: comparing the error with RESULT_SUCCESS could pass
            // and silently skip the rest of the test.
            Err(e) => panic!("Create UDP socket failed,ret={:?}", e),
        };
        let ret = p.register(s1, 20000, rsm::SOCK_EVENT_READ | rsm::SOCK_EVENT_CLOSE, false);
        println!("Register event listener,ret={},socket={}", ret, s1);
        assert_eq!(ret, errcode::RESULT_SUCCESS);
    }
}