1use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
76const AF_INET: c_int = 2;
79const AF_UNIX: c_int = 1;
80const SOCK_STREAM: c_int = 1;
81const IPPROTO_TCP: c_int = 6;
82const TCP_NODELAY: c_int = 1;
83const F_GETFL: c_int = 3;
84const F_SETFL: c_int = 4;
85
86#[cfg(target_os = "linux")]
87const SOL_SOCKET: c_int = 1;
88#[cfg(target_os = "linux")]
89const SO_REUSEADDR: c_int = 2;
90#[cfg(target_os = "linux")]
91const SO_REUSEPORT: c_int = 15;
92#[cfg(target_os = "linux")]
93const O_NONBLOCK: c_int = 0x800;
94
95#[cfg(any(target_os = "macos", target_os = "ios"))]
96const SOL_SOCKET: c_int = 0xffff;
97#[cfg(any(target_os = "macos", target_os = "ios"))]
98const SO_REUSEADDR: c_int = 0x0004;
99#[cfg(any(target_os = "macos", target_os = "ios"))]
100const SO_REUSEPORT: c_int = 0x0200;
101#[cfg(any(target_os = "macos", target_os = "ios"))]
102const O_NONBLOCK: c_int = 0x0004;
103
104#[cfg(target_os = "linux")]
107#[repr(C)]
108struct SockaddrIn {
109 sin_family: u16,
110 sin_port: u16,
111 sin_addr: u32,
112 sin_zero: [u8; 8],
113}
114
115#[allow(clippy::struct_field_names)]
118#[cfg(any(target_os = "macos", target_os = "ios"))]
119#[repr(C)]
120struct SockaddrIn {
121 sin_len: u8,
122 sin_family: u8,
123 sin_port: u16,
124 sin_addr: u32,
125 sin_zero: [u8; 8],
126}
127
128impl SockaddrIn {
129 fn new(ip: [u8; 4], port: u16) -> Self {
130 #[cfg(target_os = "linux")]
131 return SockaddrIn {
132 sin_family: AF_INET as u16,
133 sin_port: port.to_be(),
134 sin_addr: u32::from_ne_bytes(ip),
135 sin_zero: [0; 8],
136 };
137 #[cfg(any(target_os = "macos", target_os = "ios"))]
138 return SockaddrIn {
139 sin_len: size_of::<SockaddrIn>() as u8,
140 sin_family: AF_INET as u8,
141 sin_port: port.to_be(),
142 sin_addr: u32::from_ne_bytes(ip),
143 sin_zero: [0; 8],
144 };
145 }
146
147 fn zeroed() -> Self {
148 unsafe { core::mem::zeroed() }
149 }
150}
151
152pub struct Socket {
156 fd: c_int,
157}
158
159impl Socket {
160 #[inline]
162 pub fn raw(&self) -> i32 {
163 self.fd
164 }
165
166 #[inline]
172 pub unsafe fn from_raw_fd(fd: i32) -> Socket {
173 Socket { fd }
174 }
175
176 pub fn accept(&self) -> io::Result<Socket> {
179 let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
180 if fd < 0 {
181 return Err(io::Error::last_os_error());
182 }
183 Ok(Socket { fd })
184 }
185
186 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
189 loop {
190 let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
191 if n < 0 {
192 let e = io::Error::last_os_error();
193 if e.kind() == io::ErrorKind::Interrupted {
194 continue;
195 }
196 return Err(e);
197 }
198 return Ok(n as usize);
199 }
200 }
201
202 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
205 loop {
206 let n = unsafe { ffi::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) };
207 if n < 0 {
208 let e = io::Error::last_os_error();
209 if e.kind() == io::ErrorKind::Interrupted {
210 continue;
211 }
212 return Err(e);
213 }
214 return Ok(n as usize);
215 }
216 }
217
218 pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
220 while !buf.is_empty() {
221 let n = self.write(buf)?;
222 if n == 0 {
223 return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
224 }
225 buf = &buf[n..];
226 }
227 Ok(())
228 }
229
230 pub fn set_nonblocking(&self) -> io::Result<()> {
232 set_fd_nonblocking(self.fd)
233 }
234
235 pub fn set_nodelay(&self) -> io::Result<()> {
237 let one: c_int = 1;
238 let r = unsafe {
239 ffi::setsockopt(
240 self.fd,
241 IPPROTO_TCP,
242 TCP_NODELAY,
243 (&raw const one).cast::<c_void>(),
244 size_of::<c_int>() as u32,
245 )
246 };
247 if r < 0 {
248 return Err(io::Error::last_os_error());
249 }
250 Ok(())
251 }
252
253 pub fn local_port(&self) -> io::Result<u16> {
255 let mut addr = SockaddrIn::zeroed();
256 let mut len = size_of::<SockaddrIn>() as u32;
257 let r = unsafe {
258 ffi::getsockname(
259 self.fd,
260 (&raw mut addr).cast::<c_void>(),
261 &raw mut len,
262 )
263 };
264 if r < 0 {
265 return Err(io::Error::last_os_error());
266 }
267 Ok(u16::from_be(addr.sin_port))
268 }
269
270 pub fn peer_addr(&self) -> io::Result<(std::net::Ipv4Addr, u16)> {
275 let mut addr = SockaddrIn::zeroed();
276 let mut len = size_of::<SockaddrIn>() as u32;
277 let r = unsafe {
278 ffi::getpeername(
279 self.fd,
280 (&raw mut addr).cast::<c_void>(),
281 &raw mut len,
282 )
283 };
284 if r < 0 {
285 return Err(io::Error::last_os_error());
286 }
287 let octets = addr.sin_addr.to_ne_bytes();
292 Ok((std::net::Ipv4Addr::from(octets), u16::from_be(addr.sin_port)))
293 }
294}
295
296impl Drop for Socket {
297 fn drop(&mut self) {
298 unsafe {
299 ffi::close(self.fd);
300 }
301 }
302}
303
304fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
306 let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
307 if flags < 0 {
308 return Err(io::Error::last_os_error());
309 }
310 if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
311 return Err(io::Error::last_os_error());
312 }
313 Ok(())
314}
315
316fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
317 let r = unsafe {
318 ffi::setsockopt(
319 fd,
320 level,
321 name,
322 (&raw const val).cast::<c_void>(),
323 size_of::<c_int>() as u32,
324 )
325 };
326 if r < 0 {
327 return Err(io::Error::last_os_error());
328 }
329 Ok(())
330}
331
332fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
333 let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
334 if fd < 0 {
335 return Err(io::Error::last_os_error());
336 }
337 let sock = Socket { fd }; setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
340 if reuseport {
341 setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
344 }
345
346 let addr = SockaddrIn::new(ip, port);
347 let r = unsafe {
348 ffi::bind(
349 fd,
350 (&raw const addr).cast::<c_void>(),
351 size_of::<SockaddrIn>() as u32,
352 )
353 };
354 if r < 0 {
355 return Err(io::Error::last_os_error());
356 }
357 if unsafe { ffi::listen(fd, backlog) } < 0 {
358 return Err(io::Error::last_os_error());
359 }
360 Ok(sock)
361}
362
363pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
366 listen_inner(ip, port, backlog, false)
367}
368
369pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
372 listen_inner(ip, port, backlog, true)
373}
374
375#[repr(C)]
379struct SockaddrUn {
380 #[cfg(target_os = "linux")]
381 sun_family: u16,
382 #[cfg(not(target_os = "linux"))]
383 sun_len: u8,
384 #[cfg(not(target_os = "linux"))]
385 sun_family: u8,
386 sun_path: [u8; 108],
387}
388
389impl SockaddrUn {
390 fn new(path: &[u8]) -> io::Result<(Self, u32)> {
391 if path.is_empty() || path.len() >= 108 {
392 return Err(io::Error::new(
393 io::ErrorKind::InvalidInput,
394 "unix socket path must be 1..=107 bytes",
395 ));
396 }
397 let mut sun_path = [0u8; 108];
398 sun_path[..path.len()].copy_from_slice(path);
399 let sa = SockaddrUn {
402 #[cfg(target_os = "linux")]
403 sun_family: AF_UNIX as u16,
404 #[cfg(not(target_os = "linux"))]
405 sun_len: size_of::<SockaddrUn>() as u8,
406 #[cfg(not(target_os = "linux"))]
407 sun_family: AF_UNIX as u8,
408 sun_path,
409 };
410 Ok((sa, size_of::<SockaddrUn>() as u32))
411 }
412}
413
414pub fn unix_listen(path: &[u8], backlog: i32) -> io::Result<Socket> {
419 if let Ok(c) = std::ffi::CString::new(path) {
422 unsafe {
423 ffi::unlink(c.as_ptr());
424 }
425 }
426
427 let fd = unsafe { ffi::socket(AF_UNIX, SOCK_STREAM, 0) };
428 if fd < 0 {
429 return Err(io::Error::last_os_error());
430 }
431 let sock = Socket { fd };
432
433 let (addr, len) = SockaddrUn::new(path)?;
434 let r = unsafe {
435 ffi::bind(
436 fd,
437 (&raw const addr).cast::<c_void>(),
438 len,
439 )
440 };
441 if r < 0 {
442 return Err(io::Error::last_os_error());
443 }
444 if unsafe { ffi::listen(fd, backlog) } < 0 {
445 return Err(io::Error::last_os_error());
446 }
447 if let Ok(c) = std::ffi::CString::new(path) {
450 unsafe { ffi::chmod(c.as_ptr(), 0o777) };
451 }
452 Ok(sock)
453}
454
455pub const SIGTERM: c_int = 15;
457pub const SIGINT: c_int = 2;
459
460pub fn install_signal_handler(signum: c_int, handler: extern "C" fn(c_int)) {
465 unsafe {
468 ffi::signal(signum, handler);
469 }
470}
471
472pub struct Waker {
476 read_fd: c_int,
477 write_fd: c_int,
478}
479
480pub fn waker() -> io::Result<Waker> {
482 let mut fds = [0 as c_int; 2];
483 if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
484 return Err(io::Error::last_os_error());
485 }
486 let w = Waker {
487 read_fd: fds[0],
488 write_fd: fds[1],
489 };
490 set_fd_nonblocking(w.read_fd)?;
491 set_fd_nonblocking(w.write_fd)?;
492 Ok(w)
493}
494
495impl Waker {
496 #[inline]
498 pub fn read_fd(&self) -> i32 {
499 self.read_fd
500 }
501
502 pub fn wake(&self) -> io::Result<()> {
504 let byte = [1u8];
505 loop {
506 let n = unsafe { ffi::write(self.write_fd, byte.as_ptr().cast::<c_void>(), 1) };
507 if n < 0 {
508 let e = io::Error::last_os_error();
509 match e.kind() {
510 io::ErrorKind::Interrupted => continue,
511 io::ErrorKind::WouldBlock => return Ok(()),
512 _ => return Err(e),
513 }
514 }
515 return Ok(());
516 }
517 }
518
519 pub fn drain(&self) {
521 let mut buf = [0u8; 64];
522 loop {
523 let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
524 if n <= 0 {
525 break; }
527 }
528 }
529}
530
531impl Drop for Waker {
532 fn drop(&mut self) {
533 unsafe {
534 ffi::close(self.read_fd);
535 ffi::close(self.write_fd);
536 }
537 }
538}
539
540unsafe impl Send for Waker {}
542unsafe impl Sync for Waker {}
543
544#[derive(Debug, Clone, Copy)]
548pub struct Event {
549 pub fd: i32,
550 pub readable: bool,
551 pub writable: bool,
552 pub hup: bool,
554}
555
556const WAIT_CAPACITY: usize = 1024;
558
559
560
561#[cfg(test)]
562mod tests;