1use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
// ---- Values that are the same on Linux and macOS/iOS ----------------------

/// Address family handed to `ffi::socket` by `listen_inner` and stored in
/// `SockaddrIn::sin_family`.
const AF_INET: c_int = 2;
/// Socket type handed to `ffi::socket` by `listen_inner`.
const SOCK_STREAM: c_int = 1;
/// Option level used by `Socket::set_nodelay`.
const IPPROTO_TCP: c_int = 6;
/// Option name used by `Socket::set_nodelay`.
const TCP_NODELAY: c_int = 1;
/// `fcntl` command with which `set_fd_nonblocking` reads the current flags.
const F_GETFL: c_int = 3;
/// `fcntl` command with which `set_fd_nonblocking` stores the new flags.
const F_SETFL: c_int = 4;

// ---- Linux values ---------------------------------------------------------

/// Option level for the socket-wide options below.
#[cfg(target_os = "linux")]
const SOL_SOCKET: c_int = 1;
/// Option name set on every listener by `listen_inner`.
#[cfg(target_os = "linux")]
const SO_REUSEADDR: c_int = 2;
/// Option name set by `listen_inner` only when `reuseport` is requested.
#[cfg(target_os = "linux")]
const SO_REUSEPORT: c_int = 15;
/// Status flag OR-ed into the descriptor flags by `set_fd_nonblocking`.
#[cfg(target_os = "linux")]
const O_NONBLOCK: c_int = 0o4000;

// ---- macOS / iOS values ---------------------------------------------------

/// Option level for the socket-wide options below.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SOL_SOCKET: c_int = 0xffff;
/// Option name set on every listener by `listen_inner`.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEADDR: c_int = 0x4;
/// Option name set by `listen_inner` only when `reuseport` is requested.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEPORT: c_int = 0x200;
/// Status flag OR-ed into the descriptor flags by `set_fd_nonblocking`.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const O_NONBLOCK: c_int = 0x4;
102
/// IPv4 socket address in the C layout handed to `ffi::bind` and
/// `ffi::getsockname`.
///
/// Linux starts the structure with a 16-bit family field, while macOS/iOS
/// split the same two bytes into a length byte followed by an 8-bit family.
/// The remaining fields are identical, so the total size is the same on both.
#[repr(C)]
struct SockaddrIn {
    /// Address family (Linux layout).
    #[cfg(target_os = "linux")]
    sin_family: u16,
    /// Size of this structure in bytes (macOS/iOS layout only).
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    sin_len: u8,
    /// Address family (macOS/iOS layout).
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    sin_family: u8,
    /// Port number, stored big-endian.
    sin_port: u16,
    /// The four address octets, kept in the order they were supplied.
    sin_addr: u32,
    /// Trailing padding; written as zeros by the constructors.
    sin_zero: [u8; 8],
}
123
124impl SockaddrIn {
125 fn new(ip: [u8; 4], port: u16) -> Self {
126 #[cfg(target_os = "linux")]
127 return SockaddrIn {
128 sin_family: AF_INET as u16,
129 sin_port: port.to_be(),
130 sin_addr: u32::from_ne_bytes(ip),
131 sin_zero: [0; 8],
132 };
133 #[cfg(any(target_os = "macos", target_os = "ios"))]
134 return SockaddrIn {
135 sin_len: size_of::<SockaddrIn>() as u8,
136 sin_family: AF_INET as u8,
137 sin_port: port.to_be(),
138 sin_addr: u32::from_ne_bytes(ip),
139 sin_zero: [0; 8],
140 };
141 }
142
143 fn zeroed() -> Self {
144 unsafe { core::mem::zeroed() }
145 }
146}
147
/// Owning handle for a socket file descriptor.
///
/// The wrapped descriptor is closed when the handle is dropped.
pub struct Socket {
    /// Descriptor this handle is responsible for.
    fd: c_int,
}
154
155impl Socket {
156 #[inline]
158 pub fn raw(&self) -> i32 {
159 self.fd
160 }
161
162 #[inline]
168 pub unsafe fn from_raw_fd(fd: i32) -> Socket {
169 Socket { fd }
170 }
171
172 pub fn accept(&self) -> io::Result<Socket> {
175 let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
176 if fd < 0 {
177 return Err(io::Error::last_os_error());
178 }
179 Ok(Socket { fd })
180 }
181
182 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
185 loop {
186 let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
187 if n < 0 {
188 let e = io::Error::last_os_error();
189 if e.kind() == io::ErrorKind::Interrupted {
190 continue;
191 }
192 return Err(e);
193 }
194 return Ok(n as usize);
195 }
196 }
197
198 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
201 loop {
202 let n = unsafe { ffi::write(self.fd, buf.as_ptr() as *const c_void, buf.len()) };
203 if n < 0 {
204 let e = io::Error::last_os_error();
205 if e.kind() == io::ErrorKind::Interrupted {
206 continue;
207 }
208 return Err(e);
209 }
210 return Ok(n as usize);
211 }
212 }
213
214 pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
216 while !buf.is_empty() {
217 let n = self.write(buf)?;
218 if n == 0 {
219 return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
220 }
221 buf = &buf[n..];
222 }
223 Ok(())
224 }
225
226 pub fn set_nonblocking(&self) -> io::Result<()> {
228 set_fd_nonblocking(self.fd)
229 }
230
231 pub fn set_nodelay(&self) -> io::Result<()> {
233 let one: c_int = 1;
234 let r = unsafe {
235 ffi::setsockopt(
236 self.fd,
237 IPPROTO_TCP,
238 TCP_NODELAY,
239 &one as *const c_int as *const c_void,
240 size_of::<c_int>() as u32,
241 )
242 };
243 if r < 0 {
244 return Err(io::Error::last_os_error());
245 }
246 Ok(())
247 }
248
249 pub fn local_port(&self) -> io::Result<u16> {
251 let mut addr = SockaddrIn::zeroed();
252 let mut len = size_of::<SockaddrIn>() as u32;
253 let r = unsafe {
254 ffi::getsockname(
255 self.fd,
256 &mut addr as *mut SockaddrIn as *mut c_void,
257 &mut len,
258 )
259 };
260 if r < 0 {
261 return Err(io::Error::last_os_error());
262 }
263 Ok(u16::from_be(addr.sin_port))
264 }
265}
266
267impl Drop for Socket {
268 fn drop(&mut self) {
269 unsafe {
270 ffi::close(self.fd);
271 }
272 }
273}
274
275fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
277 let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
278 if flags < 0 {
279 return Err(io::Error::last_os_error());
280 }
281 if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
282 return Err(io::Error::last_os_error());
283 }
284 Ok(())
285}
286
287fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
288 let r = unsafe {
289 ffi::setsockopt(
290 fd,
291 level,
292 name,
293 &val as *const c_int as *const c_void,
294 size_of::<c_int>() as u32,
295 )
296 };
297 if r < 0 {
298 return Err(io::Error::last_os_error());
299 }
300 Ok(())
301}
302
303fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
304 let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
305 if fd < 0 {
306 return Err(io::Error::last_os_error());
307 }
308 let sock = Socket { fd }; setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
311 if reuseport {
312 setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
315 }
316
317 let addr = SockaddrIn::new(ip, port);
318 let r = unsafe {
319 ffi::bind(
320 fd,
321 &addr as *const SockaddrIn as *const c_void,
322 size_of::<SockaddrIn>() as u32,
323 )
324 };
325 if r < 0 {
326 return Err(io::Error::last_os_error());
327 }
328 if unsafe { ffi::listen(fd, backlog) } < 0 {
329 return Err(io::Error::last_os_error());
330 }
331 Ok(sock)
332}
333
334pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
337 listen_inner(ip, port, backlog, false)
338}
339
340pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
343 listen_inner(ip, port, backlog, true)
344}
345
/// Pair of pipe descriptors used to send a wake-up signal.
///
/// `wake` writes a byte to `write_fd`; `drain` empties `read_fd`. Both
/// descriptors are closed when the value is dropped.
pub struct Waker {
    /// Read end of the pipe, exposed through `Waker::read_fd`.
    read_fd: c_int,
    /// Write end of the pipe.
    write_fd: c_int,
}
353
354pub fn waker() -> io::Result<Waker> {
356 let mut fds = [0 as c_int; 2];
357 if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
358 return Err(io::Error::last_os_error());
359 }
360 let w = Waker {
361 read_fd: fds[0],
362 write_fd: fds[1],
363 };
364 set_fd_nonblocking(w.read_fd)?;
365 set_fd_nonblocking(w.write_fd)?;
366 Ok(w)
367}
368
369impl Waker {
370 #[inline]
372 pub fn read_fd(&self) -> i32 {
373 self.read_fd
374 }
375
376 pub fn wake(&self) -> io::Result<()> {
378 let byte = [1u8];
379 loop {
380 let n = unsafe { ffi::write(self.write_fd, byte.as_ptr() as *const c_void, 1) };
381 if n < 0 {
382 let e = io::Error::last_os_error();
383 match e.kind() {
384 io::ErrorKind::Interrupted => continue,
385 io::ErrorKind::WouldBlock => return Ok(()),
386 _ => return Err(e),
387 }
388 }
389 return Ok(());
390 }
391 }
392
393 pub fn drain(&self) {
395 let mut buf = [0u8; 64];
396 loop {
397 let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
398 if n <= 0 {
399 break; }
401 }
402 }
403}
404
405impl Drop for Waker {
406 fn drop(&mut self) {
407 unsafe {
408 ffi::close(self.read_fd);
409 ffi::close(self.write_fd);
410 }
411 }
412}
413
414unsafe impl Send for Waker {}
416unsafe impl Sync for Waker {}
417
/// Readiness report for a single file descriptor.
#[derive(Clone, Copy, Debug)]
pub struct Event {
    /// Descriptor the report refers to.
    pub fd: i32,
    /// Set when the descriptor was reported readable.
    pub readable: bool,
    /// Set when the descriptor was reported writable.
    pub writable: bool,
    /// Hang-up indicator. NOTE(review): presumably set when the peer closed
    /// the connection — confirm against the poller implementations.
    pub hup: bool,
}
429
/// Fixed capacity of 1024 entries.
/// NOTE(review): presumably the size of the event buffer a poller fills per
/// wait — confirm against the `poller_*` modules, which are not visible here.
const WAIT_CAPACITY: usize = 1 << 10;
432
433
434
435#[cfg(test)]
436mod tests;