1use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
/// Hand-written declarations for the C library entry points and kernel
/// structures this file calls directly (no `libc` crate dependency).
///
/// Struct layouts here must match the platform C headers exactly; do not
/// reorder or retype fields.
mod ffi {
    use core::ffi::{c_int, c_void};

    // Functions shared by every supported target. All return -1 (or a negative
    // value) on failure with `errno` set; callers read it via
    // `io::Error::last_os_error()`.
    unsafe extern "C" {
        /// `socket(2)`: returns a new descriptor.
        pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int;
        /// `setsockopt(2)`; the C `socklen_t` length is declared here as `u32`.
        pub fn setsockopt(
            fd: c_int,
            level: c_int,
            optname: c_int,
            optval: *const c_void,
            optlen: u32,
        ) -> c_int;
        /// `bind(2)`; `addr` points at a `SockaddrIn` in this file.
        pub fn bind(fd: c_int, addr: *const c_void, addrlen: u32) -> c_int;
        /// `listen(2)`.
        pub fn listen(fd: c_int, backlog: c_int) -> c_int;
        /// `accept(2)`; both out-pointers may be null when the peer address is not wanted.
        pub fn accept(fd: c_int, addr: *mut c_void, addrlen: *mut u32) -> c_int;
        /// `getsockname(2)`; `addrlen` is in/out (buffer size in, address size out).
        pub fn getsockname(fd: c_int, addr: *mut c_void, addrlen: *mut u32) -> c_int;
        /// `read(2)`: returns bytes read, 0 at EOF, negative on error.
        pub fn read(fd: c_int, buf: *mut c_void, count: usize) -> isize;
        /// `write(2)`: returns bytes written, negative on error.
        pub fn write(fd: c_int, buf: *const c_void, count: usize) -> isize;
        /// `close(2)`.
        pub fn close(fd: c_int) -> c_int;
        /// `fcntl(2)`, variadic in C; callers in this file pass one `c_int` third argument.
        pub fn fcntl(fd: c_int, cmd: c_int, ...) -> c_int;
        /// `pipe(2)`: `fds` must point at two writable `c_int`s (read end, then write end).
        pub fn pipe(fds: *mut c_int) -> c_int;
    }

    /// `struct timespec` as passed to `kevent`.
    ///
    /// NOTE(review): `isize` fields assume `time_t` and `long` are pointer-sized,
    /// which holds on 64-bit Apple targets — confirm before supporting 32-bit ones.
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    #[repr(C)]
    pub struct Timespec {
        pub tv_sec: isize,
        pub tv_nsec: isize,
    }

    /// `struct kevent` (the classic form, not `kevent64_s`).
    ///
    /// `udata` stands in for the C `void *` user-data pointer; this file always sets it to 0.
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    #[repr(C)]
    pub struct Kevent {
        pub ident: usize,
        pub filter: i16,
        pub flags: u16,
        pub fflags: u32,
        pub data: isize,
        pub udata: usize,
    }

    // kqueue entry points (Apple targets only).
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    unsafe extern "C" {
        /// `kqueue(2)`: creates a new kernel event queue.
        pub fn kqueue() -> c_int;
        /// `kevent(2)`: applies `changelist`, then collects up to `nevents`
        /// events; a null `timeout` waits indefinitely.
        pub fn kevent(
            kq: c_int,
            changelist: *const Kevent,
            nchanges: c_int,
            eventlist: *mut Kevent,
            nevents: c_int,
            timeout: *const Timespec,
        ) -> c_int;
    }

    /// `struct epoll_event`; `data` carries the registered fd in this file.
    ///
    /// Packed only on x86_64 — presumably mirroring the packed `epoll_event`
    /// declared in `<sys/epoll.h>` for that architecture (confirm when porting).
    /// Because it may be packed, read fields by value, never by reference.
    #[cfg(target_os = "linux")]
    #[repr(C)]
    #[cfg_attr(target_arch = "x86_64", repr(packed))]
    pub struct EpollEvent {
        pub events: u32,
        pub data: u64,
    }

    // epoll entry points (Linux only).
    #[cfg(target_os = "linux")]
    unsafe extern "C" {
        /// `epoll_create1(2)`.
        pub fn epoll_create1(flags: c_int) -> c_int;
        /// `epoll_ctl(2)`; `event` may be null for `EPOLL_CTL_DEL`.
        pub fn epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut EpollEvent) -> c_int;
        /// `epoll_wait(2)`: a negative `timeout` blocks indefinitely.
        pub fn epoll_wait(
            epfd: c_int,
            events: *mut EpollEvent,
            maxevents: c_int,
            timeout: c_int,
        ) -> c_int;
    }

}
145
// Constants declared once, without `cfg`, i.e. assumed identical on Linux and
// Apple targets.
/// `AF_INET`: IPv4 address family (used by `listen_inner` and `SockaddrIn::new`).
const AF_INET: c_int = 2;
/// `SOCK_STREAM`: stream socket type passed to `socket(2)`.
const SOCK_STREAM: c_int = 1;
/// `IPPROTO_TCP`: option level used by `Socket::set_nodelay`.
const IPPROTO_TCP: c_int = 6;
/// `TCP_NODELAY`: option name used by `Socket::set_nodelay`.
const TCP_NODELAY: c_int = 1;
/// `fcntl` command that reads the file status flags.
const F_GETFL: c_int = 3;
/// `fcntl` command that writes the file status flags.
const F_SETFL: c_int = 4;

// Linux values.
// NOTE(review): these match the common Linux ABI (x86, ARM); some
// architectures (e.g. MIPS, SPARC, Alpha) use different numbers for several of
// these — confirm before building for them.
/// Option level for generic socket options.
#[cfg(target_os = "linux")]
const SOL_SOCKET: c_int = 1;
/// Allow rebinding an address still in use (set on every listener).
#[cfg(target_os = "linux")]
const SO_REUSEADDR: c_int = 2;
/// Allow several sockets to bind the same port (see `tcp_listen_reuseport`).
#[cfg(target_os = "linux")]
const SO_REUSEPORT: c_int = 15;
/// File status flag OR-ed in by `set_fd_nonblocking`.
#[cfg(target_os = "linux")]
const O_NONBLOCK: c_int = 0x800;

// Apple (BSD-derived) values for the same names.
/// Option level for generic socket options.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SOL_SOCKET: c_int = 0xffff;
/// Allow rebinding an address still in use (set on every listener).
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEADDR: c_int = 0x0004;
/// Allow several sockets to bind the same port (see `tcp_listen_reuseport`).
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEPORT: c_int = 0x0200;
/// File status flag OR-ed in by `set_fd_nonblocking`.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const O_NONBLOCK: c_int = 0x0004;
172
/// `struct sockaddr_in`, Linux layout: 16-bit family, then port and address,
/// padded with `sin_zero` to 16 bytes in total.
#[cfg(target_os = "linux")]
#[repr(C)]
struct SockaddrIn {
    sin_family: u16,
    /// Port in network (big-endian) byte order, as written by `SockaddrIn::new`.
    sin_port: u16,
    /// IPv4 address whose in-memory bytes are the address octets in order.
    sin_addr: u32,
    sin_zero: [u8; 8],
}

/// `struct sockaddr_in`, BSD/Apple layout: a leading length byte and an 8-bit
/// family replace Linux's 16-bit family; the total size is still 16 bytes.
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[repr(C)]
struct SockaddrIn {
    /// Size of this structure in bytes, filled in by `SockaddrIn::new`.
    sin_len: u8,
    sin_family: u8,
    /// Port in network (big-endian) byte order.
    sin_port: u16,
    /// IPv4 address whose in-memory bytes are the address octets in order.
    sin_addr: u32,
    sin_zero: [u8; 8],
}
193
194impl SockaddrIn {
195 fn new(ip: [u8; 4], port: u16) -> Self {
196 #[cfg(target_os = "linux")]
197 return SockaddrIn {
198 sin_family: AF_INET as u16,
199 sin_port: port.to_be(),
200 sin_addr: u32::from_ne_bytes(ip),
201 sin_zero: [0; 8],
202 };
203 #[cfg(any(target_os = "macos", target_os = "ios"))]
204 return SockaddrIn {
205 sin_len: size_of::<SockaddrIn>() as u8,
206 sin_family: AF_INET as u8,
207 sin_port: port.to_be(),
208 sin_addr: u32::from_ne_bytes(ip),
209 sin_zero: [0; 8],
210 };
211 }
212
213 fn zeroed() -> Self {
214 unsafe { core::mem::zeroed() }
215 }
216}
217
218pub struct Socket {
222 fd: c_int,
223}
224
225impl Socket {
226 #[inline]
228 pub fn raw(&self) -> i32 {
229 self.fd
230 }
231
232 #[inline]
238 pub unsafe fn from_raw_fd(fd: i32) -> Socket {
239 Socket { fd }
240 }
241
242 pub fn accept(&self) -> io::Result<Socket> {
245 let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
246 if fd < 0 {
247 return Err(io::Error::last_os_error());
248 }
249 Ok(Socket { fd })
250 }
251
252 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
255 loop {
256 let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
257 if n < 0 {
258 let e = io::Error::last_os_error();
259 if e.kind() == io::ErrorKind::Interrupted {
260 continue;
261 }
262 return Err(e);
263 }
264 return Ok(n as usize);
265 }
266 }
267
268 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
271 loop {
272 let n = unsafe { ffi::write(self.fd, buf.as_ptr() as *const c_void, buf.len()) };
273 if n < 0 {
274 let e = io::Error::last_os_error();
275 if e.kind() == io::ErrorKind::Interrupted {
276 continue;
277 }
278 return Err(e);
279 }
280 return Ok(n as usize);
281 }
282 }
283
284 pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
286 while !buf.is_empty() {
287 let n = self.write(buf)?;
288 if n == 0 {
289 return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
290 }
291 buf = &buf[n..];
292 }
293 Ok(())
294 }
295
296 pub fn set_nonblocking(&self) -> io::Result<()> {
298 set_fd_nonblocking(self.fd)
299 }
300
301 pub fn set_nodelay(&self) -> io::Result<()> {
303 let one: c_int = 1;
304 let r = unsafe {
305 ffi::setsockopt(
306 self.fd,
307 IPPROTO_TCP,
308 TCP_NODELAY,
309 &one as *const c_int as *const c_void,
310 size_of::<c_int>() as u32,
311 )
312 };
313 if r < 0 {
314 return Err(io::Error::last_os_error());
315 }
316 Ok(())
317 }
318
319 pub fn local_port(&self) -> io::Result<u16> {
321 let mut addr = SockaddrIn::zeroed();
322 let mut len = size_of::<SockaddrIn>() as u32;
323 let r = unsafe {
324 ffi::getsockname(
325 self.fd,
326 &mut addr as *mut SockaddrIn as *mut c_void,
327 &mut len,
328 )
329 };
330 if r < 0 {
331 return Err(io::Error::last_os_error());
332 }
333 Ok(u16::from_be(addr.sin_port))
334 }
335}
336
337impl Drop for Socket {
338 fn drop(&mut self) {
339 unsafe {
340 ffi::close(self.fd);
341 }
342 }
343}
344
345fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
347 let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
348 if flags < 0 {
349 return Err(io::Error::last_os_error());
350 }
351 if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
352 return Err(io::Error::last_os_error());
353 }
354 Ok(())
355}
356
357fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
358 let r = unsafe {
359 ffi::setsockopt(
360 fd,
361 level,
362 name,
363 &val as *const c_int as *const c_void,
364 size_of::<c_int>() as u32,
365 )
366 };
367 if r < 0 {
368 return Err(io::Error::last_os_error());
369 }
370 Ok(())
371}
372
373fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
374 let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
375 if fd < 0 {
376 return Err(io::Error::last_os_error());
377 }
378 let sock = Socket { fd }; setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
381 if reuseport {
382 setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
385 }
386
387 let addr = SockaddrIn::new(ip, port);
388 let r = unsafe {
389 ffi::bind(
390 fd,
391 &addr as *const SockaddrIn as *const c_void,
392 size_of::<SockaddrIn>() as u32,
393 )
394 };
395 if r < 0 {
396 return Err(io::Error::last_os_error());
397 }
398 if unsafe { ffi::listen(fd, backlog) } < 0 {
399 return Err(io::Error::last_os_error());
400 }
401 Ok(sock)
402}
403
404pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
407 listen_inner(ip, port, backlog, false)
408}
409
410pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
413 listen_inner(ip, port, backlog, true)
414}
415
/// Self-pipe used to interrupt a `Poller::wait`, e.g. from another thread.
///
/// `read_fd` is what gets registered with the poller; `Waker::wake` writes to
/// `write_fd`. Both descriptors are closed on drop.
pub struct Waker {
    // Read end of the pipe; becomes readable after `wake`.
    read_fd: c_int,
    // Write end of the pipe.
    write_fd: c_int,
}
423
424pub fn waker() -> io::Result<Waker> {
426 let mut fds = [0 as c_int; 2];
427 if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
428 return Err(io::Error::last_os_error());
429 }
430 let w = Waker {
431 read_fd: fds[0],
432 write_fd: fds[1],
433 };
434 set_fd_nonblocking(w.read_fd)?;
435 set_fd_nonblocking(w.write_fd)?;
436 Ok(w)
437}
438
439impl Waker {
440 #[inline]
442 pub fn read_fd(&self) -> i32 {
443 self.read_fd
444 }
445
446 pub fn wake(&self) -> io::Result<()> {
448 let byte = [1u8];
449 loop {
450 let n = unsafe { ffi::write(self.write_fd, byte.as_ptr() as *const c_void, 1) };
451 if n < 0 {
452 let e = io::Error::last_os_error();
453 match e.kind() {
454 io::ErrorKind::Interrupted => continue,
455 io::ErrorKind::WouldBlock => return Ok(()),
456 _ => return Err(e),
457 }
458 }
459 return Ok(());
460 }
461 }
462
463 pub fn drain(&self) {
465 let mut buf = [0u8; 64];
466 loop {
467 let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
468 if n <= 0 {
469 break; }
471 }
472 }
473}
474
475impl Drop for Waker {
476 fn drop(&mut self) {
477 unsafe {
478 ffi::close(self.read_fd);
479 ffi::close(self.write_fd);
480 }
481 }
482}
483
484unsafe impl Send for Waker {}
486unsafe impl Sync for Waker {}
487
/// One readiness notification produced by `Poller::wait`.
///
/// With the kqueue backend, read and write readiness arrive as separate
/// kernel events, so one descriptor can yield two `Event`s from a single
/// `wait` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Event {
    /// Descriptor the notification refers to.
    pub fd: i32,
    /// Readable. On epoll this is also set for `EPOLLHUP`/`EPOLLERR`, so a read
    /// will observe EOF or the error.
    pub readable: bool,
    /// Writable.
    pub writable: bool,
    /// Peer hang-up or error: kqueue `EV_EOF`; epoll `EPOLLHUP`, `EPOLLRDHUP` or `EPOLLERR`.
    pub hup: bool,
}
499
/// Maximum number of kernel events collected by a single `Poller::wait` call.
const WAIT_CAPACITY: usize = 1 << 10;
502
/// kqueue filter and flag values used by the kqueue `Poller` (values from
/// `<sys/event.h>`).
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod kq {
    /// Filter: the descriptor has data to read.
    pub const EVFILT_READ: i16 = -1;
    /// Filter: the descriptor can be written.
    pub const EVFILT_WRITE: i16 = -2;
    /// Change flag: add the filter registration.
    pub const EV_ADD: u16 = 0x0001;
    /// Change flag: remove the filter registration.
    pub const EV_DELETE: u16 = 0x0002;
    /// Change flag: let the filter report events.
    pub const EV_ENABLE: u16 = 0x0004;
    /// Change flag: keep the registration but suppress its events.
    pub const EV_DISABLE: u16 = 0x0008;
    /// Returned flag checked by `Poller::wait`; mapped to `Event::hup`.
    pub const EV_EOF: u16 = 0x8000;
}
513
/// kqueue-backed readiness poller.
///
/// Read and write interest are separate kqueue filters, so one descriptor can
/// produce two [`Event`]s from a single [`Poller::wait`].
#[cfg(any(target_os = "macos", target_os = "ios"))]
pub struct Poller {
    kq: c_int,
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
impl Poller {
    /// Creates a new kqueue; the descriptor is closed when the poller is dropped.
    pub fn new() -> io::Result<Self> {
        // SAFETY: kqueue(2) takes no arguments.
        let kq = unsafe { ffi::kqueue() };
        if kq < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(Poller { kq })
    }

    /// Submits one change (`flags`) for the `filter` registration of `fd`,
    /// without collecting any events.
    fn change(&self, fd: i32, filter: i16, flags: u16) -> io::Result<()> {
        let kev = ffi::Kevent {
            ident: fd as usize,
            filter,
            flags,
            fflags: 0,
            data: 0,
            udata: 0,
        };
        // SAFETY: one valid change record; no event buffer is passed (nevents = 0).
        let r = unsafe { ffi::kevent(self.kq, &kev, 1, ptr::null_mut(), 0, ptr::null()) };
        if r < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(())
    }

    /// Registers both the read and write filters for `fd`, enabling only the
    /// requested ones. This keeps a later [`Poller::modify`] a pure enable/disable.
    pub fn add(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
        let r = if read { kq::EV_ENABLE } else { kq::EV_DISABLE };
        let w = if write { kq::EV_ENABLE } else { kq::EV_DISABLE };
        self.change(fd, kq::EVFILT_READ, kq::EV_ADD | r)?;
        self.change(fd, kq::EVFILT_WRITE, kq::EV_ADD | w)?;
        Ok(())
    }

    /// Enables or disables the read and write filters of an `fd` previously
    /// registered with [`Poller::add`].
    pub fn modify(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
        self.change(
            fd,
            kq::EVFILT_READ,
            if read { kq::EV_ENABLE } else { kq::EV_DISABLE },
        )?;
        self.change(
            fd,
            kq::EVFILT_WRITE,
            if write { kq::EV_ENABLE } else { kq::EV_DISABLE },
        )?;
        Ok(())
    }

    /// Removes both filters for `fd`.
    ///
    /// Best-effort: errors are ignored and `Ok(())` is always returned.
    /// Presumably this is deliberate, e.g. for an fd that was already closed,
    /// since kqueue(2) drops a descriptor's registrations on close.
    pub fn delete(&self, fd: i32) -> io::Result<()> {
        let _ = self.change(fd, kq::EVFILT_READ, kq::EV_DELETE);
        let _ = self.change(fd, kq::EVFILT_WRITE, kq::EV_DELETE);
        Ok(())
    }

    /// Waits for readiness and replaces the contents of `out` with the events
    /// received (at most `WAIT_CAPACITY`). Returns how many there are.
    ///
    /// `timeout_ms` of `None` or any negative value blocks indefinitely, as on
    /// the epoll backend. A signal interruption returns `Ok(0)`.
    pub fn wait(&self, out: &mut Vec<Event>, timeout_ms: Option<i32>) -> io::Result<usize> {
        out.clear();
        let mut raw: Vec<ffi::Kevent> = Vec::with_capacity(WAIT_CAPACITY);
        // A negative Timespec would make kevent(2) fail with EINVAL, so negative
        // timeouts are mapped to a null pointer (wait forever) instead.
        let ts;
        let ts_ptr = match timeout_ms {
            Some(ms) if ms >= 0 => {
                ts = ffi::Timespec {
                    tv_sec: (ms / 1000) as isize,
                    tv_nsec: ((ms % 1000) * 1_000_000) as isize,
                };
                &ts as *const ffi::Timespec
            }
            _ => ptr::null(),
        };
        // SAFETY: `raw` has room for WAIT_CAPACITY events, and `ts_ptr` is null
        // or points at `ts`, which outlives the call.
        let n = unsafe {
            ffi::kevent(
                self.kq,
                ptr::null(),
                0,
                raw.as_mut_ptr(),
                WAIT_CAPACITY as c_int,
                ts_ptr,
            )
        };
        if n < 0 {
            let e = io::Error::last_os_error();
            if e.kind() == io::ErrorKind::Interrupted {
                return Ok(0);
            }
            return Err(e);
        }
        // SAFETY: kevent(2) initialised the first `n` entries (n <= WAIT_CAPACITY).
        unsafe { raw.set_len(n as usize) };
        for kev in &raw {
            out.push(Event {
                fd: kev.ident as i32,
                readable: kev.filter == kq::EVFILT_READ,
                writable: kev.filter == kq::EVFILT_WRITE,
                hup: kev.flags & kq::EV_EOF != 0,
            });
        }
        Ok(out.len())
    }
}
621
/// epoll constants (values from `<sys/epoll.h>`) used by the epoll `Poller`.
#[cfg(target_os = "linux")]
mod ep {
    /// `epoll_create1` flag: set close-on-exec on the new epoll descriptor.
    pub const EPOLL_CLOEXEC: super::c_int = 0x80000;
    /// `epoll_ctl` op: register a descriptor.
    pub const EPOLL_CTL_ADD: super::c_int = 1;
    /// `epoll_ctl` op: deregister a descriptor.
    pub const EPOLL_CTL_DEL: super::c_int = 2;
    /// `epoll_ctl` op: change a registration's interest mask.
    pub const EPOLL_CTL_MOD: super::c_int = 3;
    /// Readable.
    pub const EPOLLIN: u32 = 0x001;
    /// Writable.
    pub const EPOLLOUT: u32 = 0x004;
    /// Error condition (always reported by the kernel, even if not requested).
    pub const EPOLLERR: u32 = 0x008;
    /// Hang-up (always reported by the kernel, even if not requested).
    pub const EPOLLHUP: u32 = 0x010;
    /// Peer closed its writing half; always requested by `Poller::mask`.
    pub const EPOLLRDHUP: u32 = 0x2000;
}
634
635#[cfg(target_os = "linux")]
636pub struct Poller {
637 epfd: c_int,
638}
639
640#[cfg(target_os = "linux")]
641impl Poller {
642 pub fn new() -> io::Result<Self> {
643 let epfd = unsafe { ffi::epoll_create1(ep::EPOLL_CLOEXEC) };
644 if epfd < 0 {
645 return Err(io::Error::last_os_error());
646 }
647 Ok(Poller { epfd })
648 }
649
650 fn mask(read: bool, write: bool) -> u32 {
651 let mut m = ep::EPOLLRDHUP;
652 if read {
653 m |= ep::EPOLLIN;
654 }
655 if write {
656 m |= ep::EPOLLOUT;
657 }
658 m
659 }
660
661 fn ctl(&self, op: c_int, fd: i32, read: bool, write: bool) -> io::Result<()> {
662 let mut ev = ffi::EpollEvent {
663 events: Self::mask(read, write),
664 data: fd as u64,
665 };
666 let r = unsafe { ffi::epoll_ctl(self.epfd, op, fd, &mut ev) };
667 if r < 0 {
668 return Err(io::Error::last_os_error());
669 }
670 Ok(())
671 }
672
673 pub fn add(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
674 self.ctl(ep::EPOLL_CTL_ADD, fd, read, write)
675 }
676
677 pub fn modify(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
678 self.ctl(ep::EPOLL_CTL_MOD, fd, read, write)
679 }
680
681 pub fn delete(&self, fd: i32) -> io::Result<()> {
682 let r = unsafe { ffi::epoll_ctl(self.epfd, ep::EPOLL_CTL_DEL, fd, ptr::null_mut()) };
683 if r < 0 {
684 return Err(io::Error::last_os_error());
685 }
686 Ok(())
687 }
688
689 pub fn wait(&self, out: &mut Vec<Event>, timeout_ms: Option<i32>) -> io::Result<usize> {
690 out.clear();
691 let mut raw: Vec<ffi::EpollEvent> = Vec::with_capacity(WAIT_CAPACITY);
692 let n = unsafe {
693 ffi::epoll_wait(
694 self.epfd,
695 raw.as_mut_ptr(),
696 WAIT_CAPACITY as c_int,
697 timeout_ms.unwrap_or(-1),
698 )
699 };
700 if n < 0 {
701 let e = io::Error::last_os_error();
702 if e.kind() == io::ErrorKind::Interrupted {
703 return Ok(0);
704 }
705 return Err(e);
706 }
707 unsafe { raw.set_len(n as usize) };
708 for ev in &raw {
709 let flags = ev.events; let fd = ev.data as i32;
711 let hup = flags & (ep::EPOLLHUP | ep::EPOLLERR | ep::EPOLLRDHUP) != 0;
712 out.push(Event {
713 fd,
714 readable: flags & (ep::EPOLLIN | ep::EPOLLHUP | ep::EPOLLERR) != 0,
715 writable: flags & ep::EPOLLOUT != 0,
716 hup,
717 });
718 }
719 Ok(out.len())
720 }
721}
722
723#[cfg(any(target_os = "linux", target_os = "macos", target_os = "ios"))]
724impl Drop for Poller {
725 fn drop(&mut self) {
726 #[cfg(target_os = "linux")]
727 let fd = self.epfd;
728 #[cfg(any(target_os = "macos", target_os = "ios"))]
729 let fd = self.kq;
730 unsafe {
731 ffi::close(fd);
732 }
733 }
734}
735
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};

    /// A byte sent by a std client is read and echoed back through `Socket`.
    #[test]
    fn listen_accept_roundtrip() {
        let listener = tcp_listen([127, 0, 0, 1], 0, 16).unwrap();
        let port = listener.local_port().unwrap();
        assert_ne!(port, 0);

        let server = std::thread::spawn(move || {
            let conn = listener.accept().unwrap();
            let mut b = [0u8; 1];
            assert_eq!(conn.read(&mut b).unwrap(), 1);
            conn.write_all(&b).unwrap();
        });

        let mut client = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();
        client.write_all(b"Z").unwrap();
        let mut got = [0u8; 1];
        assert_eq!(client.read(&mut got).unwrap(), 1);
        assert_eq!(&got, b"Z");

        server.join().unwrap();
    }

    /// A pending connection makes a registered listener readable.
    #[test]
    fn poller_signals_listener_readable() {
        let listener = tcp_listen([127, 0, 0, 1], 0, 16).unwrap();
        listener.set_nonblocking().unwrap();
        let port = listener.local_port().unwrap();

        let poller = Poller::new().unwrap();
        poller.add(listener.raw(), true, false).unwrap();

        let _client = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();

        let mut events = Vec::new();
        let n = poller.wait(&mut events, Some(2000)).unwrap();
        assert!(n >= 1, "expected a readiness event");
        assert!(events.iter().any(|e| e.fd == listener.raw() && e.readable));

        // The connection that triggered readiness can be accepted without blocking.
        listener.accept().unwrap();
    }

    /// `wake` from another thread unblocks `wait` on the waker's read end.
    #[test]
    fn waker_wakes_poller() {
        let w = std::sync::Arc::new(waker().unwrap());
        let poller = Poller::new().unwrap();
        poller.add(w.read_fd(), true, false).unwrap();

        let w2 = w.clone();
        std::thread::spawn(move || w2.wake().unwrap());

        let mut events = Vec::new();
        let n = poller.wait(&mut events, Some(2000)).unwrap();
        assert!(n >= 1, "waker should have woken the poller");
        assert!(events.iter().any(|e| e.fd == w.read_fd() && e.readable));
        w.drain();
    }

    /// Two `SO_REUSEPORT` listeners can bind the same port.
    #[test]
    fn reuseport_allows_shared_port() {
        let l1 = tcp_listen_reuseport([127, 0, 0, 1], 0, 16).unwrap();
        let port = l1.local_port().unwrap();
        let l2 = tcp_listen_reuseport([127, 0, 0, 1], port, 16).unwrap();
        assert_eq!(l2.local_port().unwrap(), port);
    }

    /// When the timeout expires with nothing ready, `wait` returns 0 and
    /// clears stale entries from the output vector.
    #[test]
    fn poller_wait_times_out_without_events() {
        let poller = Poller::new().unwrap();
        let mut events = vec![Event { fd: -1, readable: true, writable: false, hup: false }];
        let n = poller.wait(&mut events, Some(10)).unwrap();
        assert_eq!(n, 0);
        assert!(events.is_empty(), "wait must clear stale events");
    }

    /// After `drain`, a previously woken waker no longer reports readable.
    #[test]
    fn waker_drain_clears_readiness() {
        let w = waker().unwrap();
        let poller = Poller::new().unwrap();
        poller.add(w.read_fd(), true, false).unwrap();

        w.wake().unwrap();
        w.wake().unwrap();
        w.drain();

        let mut events = Vec::new();
        assert_eq!(poller.wait(&mut events, Some(0)).unwrap(), 0);
    }
}