1use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
76const AF_INET: c_int = 2;
79const SOCK_STREAM: c_int = 1;
80const IPPROTO_TCP: c_int = 6;
81const TCP_NODELAY: c_int = 1;
82const F_GETFL: c_int = 3;
83const F_SETFL: c_int = 4;
84
85#[cfg(target_os = "linux")]
86const SOL_SOCKET: c_int = 1;
87#[cfg(target_os = "linux")]
88const SO_REUSEADDR: c_int = 2;
89#[cfg(target_os = "linux")]
90const SO_REUSEPORT: c_int = 15;
91#[cfg(target_os = "linux")]
92const O_NONBLOCK: c_int = 0x800;
93
94#[cfg(any(target_os = "macos", target_os = "ios"))]
95const SOL_SOCKET: c_int = 0xffff;
96#[cfg(any(target_os = "macos", target_os = "ios"))]
97const SO_REUSEADDR: c_int = 0x0004;
98#[cfg(any(target_os = "macos", target_os = "ios"))]
99const SO_REUSEPORT: c_int = 0x0200;
100#[cfg(any(target_os = "macos", target_os = "ios"))]
101const O_NONBLOCK: c_int = 0x0004;
102
103#[cfg(target_os = "linux")]
106#[repr(C)]
107struct SockaddrIn {
108 sin_family: u16,
109 sin_port: u16,
110 sin_addr: u32,
111 sin_zero: [u8; 8],
112}
113
114#[allow(clippy::struct_field_names)]
117#[cfg(any(target_os = "macos", target_os = "ios"))]
118#[repr(C)]
119struct SockaddrIn {
120 sin_len: u8,
121 sin_family: u8,
122 sin_port: u16,
123 sin_addr: u32,
124 sin_zero: [u8; 8],
125}
126
127impl SockaddrIn {
128 fn new(ip: [u8; 4], port: u16) -> Self {
129 #[cfg(target_os = "linux")]
130 return SockaddrIn {
131 sin_family: AF_INET as u16,
132 sin_port: port.to_be(),
133 sin_addr: u32::from_ne_bytes(ip),
134 sin_zero: [0; 8],
135 };
136 #[cfg(any(target_os = "macos", target_os = "ios"))]
137 return SockaddrIn {
138 sin_len: size_of::<SockaddrIn>() as u8,
139 sin_family: AF_INET as u8,
140 sin_port: port.to_be(),
141 sin_addr: u32::from_ne_bytes(ip),
142 sin_zero: [0; 8],
143 };
144 }
145
146 fn zeroed() -> Self {
147 unsafe { core::mem::zeroed() }
148 }
149}
150
151pub struct Socket {
155 fd: c_int,
156}
157
158impl Socket {
159 #[inline]
161 pub fn raw(&self) -> i32 {
162 self.fd
163 }
164
165 #[inline]
171 pub unsafe fn from_raw_fd(fd: i32) -> Socket {
172 Socket { fd }
173 }
174
175 pub fn accept(&self) -> io::Result<Socket> {
178 let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
179 if fd < 0 {
180 return Err(io::Error::last_os_error());
181 }
182 Ok(Socket { fd })
183 }
184
185 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
188 loop {
189 let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
190 if n < 0 {
191 let e = io::Error::last_os_error();
192 if e.kind() == io::ErrorKind::Interrupted {
193 continue;
194 }
195 return Err(e);
196 }
197 return Ok(n as usize);
198 }
199 }
200
201 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
204 loop {
205 let n = unsafe { ffi::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) };
206 if n < 0 {
207 let e = io::Error::last_os_error();
208 if e.kind() == io::ErrorKind::Interrupted {
209 continue;
210 }
211 return Err(e);
212 }
213 return Ok(n as usize);
214 }
215 }
216
217 pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
219 while !buf.is_empty() {
220 let n = self.write(buf)?;
221 if n == 0 {
222 return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
223 }
224 buf = &buf[n..];
225 }
226 Ok(())
227 }
228
229 pub fn set_nonblocking(&self) -> io::Result<()> {
231 set_fd_nonblocking(self.fd)
232 }
233
234 pub fn set_nodelay(&self) -> io::Result<()> {
236 let one: c_int = 1;
237 let r = unsafe {
238 ffi::setsockopt(
239 self.fd,
240 IPPROTO_TCP,
241 TCP_NODELAY,
242 (&raw const one).cast::<c_void>(),
243 size_of::<c_int>() as u32,
244 )
245 };
246 if r < 0 {
247 return Err(io::Error::last_os_error());
248 }
249 Ok(())
250 }
251
252 pub fn local_port(&self) -> io::Result<u16> {
254 let mut addr = SockaddrIn::zeroed();
255 let mut len = size_of::<SockaddrIn>() as u32;
256 let r = unsafe {
257 ffi::getsockname(
258 self.fd,
259 (&raw mut addr).cast::<c_void>(),
260 &raw mut len,
261 )
262 };
263 if r < 0 {
264 return Err(io::Error::last_os_error());
265 }
266 Ok(u16::from_be(addr.sin_port))
267 }
268
269 pub fn peer_addr(&self) -> io::Result<(std::net::Ipv4Addr, u16)> {
274 let mut addr = SockaddrIn::zeroed();
275 let mut len = size_of::<SockaddrIn>() as u32;
276 let r = unsafe {
277 ffi::getpeername(
278 self.fd,
279 (&raw mut addr).cast::<c_void>(),
280 &raw mut len,
281 )
282 };
283 if r < 0 {
284 return Err(io::Error::last_os_error());
285 }
286 let octets = addr.sin_addr.to_ne_bytes();
291 Ok((std::net::Ipv4Addr::from(octets), u16::from_be(addr.sin_port)))
292 }
293}
294
295impl Drop for Socket {
296 fn drop(&mut self) {
297 unsafe {
298 ffi::close(self.fd);
299 }
300 }
301}
302
303fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
305 let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
306 if flags < 0 {
307 return Err(io::Error::last_os_error());
308 }
309 if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
310 return Err(io::Error::last_os_error());
311 }
312 Ok(())
313}
314
315fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
316 let r = unsafe {
317 ffi::setsockopt(
318 fd,
319 level,
320 name,
321 (&raw const val).cast::<c_void>(),
322 size_of::<c_int>() as u32,
323 )
324 };
325 if r < 0 {
326 return Err(io::Error::last_os_error());
327 }
328 Ok(())
329}
330
331fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
332 let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
333 if fd < 0 {
334 return Err(io::Error::last_os_error());
335 }
336 let sock = Socket { fd }; setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
339 if reuseport {
340 setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
343 }
344
345 let addr = SockaddrIn::new(ip, port);
346 let r = unsafe {
347 ffi::bind(
348 fd,
349 (&raw const addr).cast::<c_void>(),
350 size_of::<SockaddrIn>() as u32,
351 )
352 };
353 if r < 0 {
354 return Err(io::Error::last_os_error());
355 }
356 if unsafe { ffi::listen(fd, backlog) } < 0 {
357 return Err(io::Error::last_os_error());
358 }
359 Ok(sock)
360}
361
362pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
365 listen_inner(ip, port, backlog, false)
366}
367
368pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
371 listen_inner(ip, port, backlog, true)
372}
373
374pub struct Waker {
378 read_fd: c_int,
379 write_fd: c_int,
380}
381
382pub fn waker() -> io::Result<Waker> {
384 let mut fds = [0 as c_int; 2];
385 if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
386 return Err(io::Error::last_os_error());
387 }
388 let w = Waker {
389 read_fd: fds[0],
390 write_fd: fds[1],
391 };
392 set_fd_nonblocking(w.read_fd)?;
393 set_fd_nonblocking(w.write_fd)?;
394 Ok(w)
395}
396
397impl Waker {
398 #[inline]
400 pub fn read_fd(&self) -> i32 {
401 self.read_fd
402 }
403
404 pub fn wake(&self) -> io::Result<()> {
406 let byte = [1u8];
407 loop {
408 let n = unsafe { ffi::write(self.write_fd, byte.as_ptr().cast::<c_void>(), 1) };
409 if n < 0 {
410 let e = io::Error::last_os_error();
411 match e.kind() {
412 io::ErrorKind::Interrupted => continue,
413 io::ErrorKind::WouldBlock => return Ok(()),
414 _ => return Err(e),
415 }
416 }
417 return Ok(());
418 }
419 }
420
421 pub fn drain(&self) {
423 let mut buf = [0u8; 64];
424 loop {
425 let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
426 if n <= 0 {
427 break; }
429 }
430 }
431}
432
433impl Drop for Waker {
434 fn drop(&mut self) {
435 unsafe {
436 ffi::close(self.read_fd);
437 ffi::close(self.write_fd);
438 }
439 }
440}
441
442unsafe impl Send for Waker {}
444unsafe impl Sync for Waker {}
445
446#[derive(Debug, Clone, Copy)]
450pub struct Event {
451 pub fd: i32,
452 pub readable: bool,
453 pub writable: bool,
454 pub hup: bool,
456}
457
458const WAIT_CAPACITY: usize = 1024;
460
461
462
463#[cfg(test)]
464mod tests;