use std::collections::HashMap;
use std::sync::atomic::{AtomicI32, Ordering::*};
use std::sync::Mutex;
#[cfg(not(windows))]
use std::os::unix::io::RawFd;
#[cfg(windows)]
type RawFd = i32;
use super::g::G;
/// Address of a `G`, kept as a plain integer so it can live inside the
/// global registry. `netpoll_arm` stores `gp as usize` here and
/// `netpoll_wait` casts the value back to `*mut G`.
struct GRaw(usize);
// `usize` is already `Send`; the explicit impl presumably records that the
// wrapped address is meant to cross threads (TODO confirm intent).
unsafe impl Send for GRaw {}

/// Global table mapping an armed descriptor to the waiter address and the
/// interest mask it was armed with. Starts as `None`; the map is created on
/// first access through `with_reg`.
static REG: Mutex<Option<HashMap<RawFd, (GRaw, u32)>>> = Mutex::new(None);

/// Runs `f` with exclusive access to the registry map and returns its result.
///
/// The map is created on first use. The mutex stays locked for the whole
/// duration of `f`.
///
/// # Panics
/// Panics if the registry mutex is poisoned.
fn with_reg<F, R>(f: F) -> R
where
    F: FnOnce(&mut HashMap<RawFd, (GRaw, u32)>) -> R,
{
    let mut slot = REG.lock().unwrap();
    let table = slot.get_or_insert_with(HashMap::new);
    f(table)
}
/// Interest bit for readability; the platform `poll_add` maps it to
/// `EPOLLIN` (Linux) or `EVFILT_READ` (macOS).
pub(crate) const POLL_READ: u32 = 1;
/// Interest bit for writability; the platform `poll_add` maps it to
/// `EPOLLOUT` (Linux) or `EVFILT_WRITE` (macOS).
pub(crate) const POLL_WRITE: u32 = 2;
/// Descriptor of the poller created by `netpoll_init`; `-1` means no poller
/// has been installed.
static POLL_FD: AtomicI32 = AtomicI32::new(-1);
/// Sets up the platform poller if it has not been set up yet.
///
/// On non-Windows targets a poll descriptor is created with
/// `create_poll_fd` and published into `POLL_FD` with a compare-exchange;
/// if another caller published one first, the freshly created descriptor is
/// closed again. On Windows the work is delegated to `iocp_win_init`.
///
/// # Panics
/// Panics if `create_poll_fd` returns a negative descriptor.
pub(crate) fn netpoll_init() {
    #[cfg(not(windows))]
    {
        let needs_setup = POLL_FD.load(Relaxed) < 0;
        if needs_setup {
            let fresh = create_poll_fd();
            assert!(fresh >= 0, "netpoll_init: could not create poll fd");
            let lost_race = POLL_FD
                .compare_exchange(-1, fresh, AcqRel, Relaxed)
                .is_err();
            if lost_race {
                // SAFETY: `fresh` was created above and never published, so
                // nothing else can be using it.
                unsafe { libc::close(fresh) };
            }
        }
    }
    #[cfg(windows)]
    iocp_win_init();
}
/// Records `gp` as the waiter for `fd` and registers `fd` with the poller
/// for the interest bits in `mode` (`POLL_READ` / `POLL_WRITE`).
///
/// Does nothing when no poll descriptor has been installed (`POLL_FD` is
/// negative). The registry entry is written before the descriptor is handed
/// to the poller. On Windows only the registry step is compiled in.
///
/// # Safety
/// `gp` is stored as an address and later returned by `netpoll_wait`;
/// callers presumably must keep it valid until then or until
/// `netpoll_unarm` (TODO confirm against callers). The platform `poll_add`
/// is itself an `unsafe fn`.
pub(crate) unsafe fn netpoll_arm(fd: RawFd, mode: u32, gp: *mut G) {
    let poller = POLL_FD.load(Acquire);
    if poller < 0 {
        return;
    }
    let waiter = (GRaw(gp as usize), mode);
    with_reg(move |table| {
        table.insert(fd, waiter);
    });
    #[cfg(not(windows))]
    unsafe {
        poll_add(poller, fd, mode)
    };
}
/// Forgets the waiter recorded for `fd` and, on non-Windows targets with an
/// installed poller, removes `fd` from the poller as well.
///
/// The registry entry is dropped first; the poller deregistration follows
/// outside the registry lock.
pub(crate) fn netpoll_unarm(fd: RawFd) {
    with_reg(|table| {
        table.remove(&fd);
    });
    #[cfg(not(windows))]
    {
        let poller = POLL_FD.load(Acquire);
        if poller < 0 {
            return;
        }
        unsafe { poll_del(poller, fd) };
    }
}
/// Waits up to `timeout_ms` milliseconds for armed descriptors to become
/// ready and returns the `G` pointers that were waiting on them.
///
/// On Windows this forwards to `iocp_win_wait`. Elsewhere it asks the
/// platform `poll_wait` for ready descriptors, then, under the registry
/// lock, removes each one that still has a registry entry, collects its
/// waiter and deregisters the descriptor from the poller. Ready descriptors
/// without a registry entry are ignored. Returns an empty vector when no
/// poller is installed.
///
/// # Safety
/// Calls the `unsafe` platform back ends; the returned pointers are the raw
/// addresses that were passed to `netpoll_arm` and are not validated here.
pub(crate) unsafe fn netpoll_wait(timeout_ms: i32) -> Vec<*mut G> {
    #[cfg(windows)]
    return unsafe { iocp_win_wait(timeout_ms) };
    #[cfg(not(windows))]
    {
        let poller = POLL_FD.load(Acquire);
        if poller < 0 {
            return Vec::new();
        }
        let fired = unsafe { poll_wait(poller, timeout_ms) };
        with_reg(|table| {
            let mut runnable = Vec::with_capacity(fired.len());
            for &fd in &fired {
                // A descriptor can be reported without an entry (for
                // example after `netpoll_unarm`); skip those.
                let Some((waiter, _)) = table.remove(&fd) else {
                    continue;
                };
                runnable.push(waiter.0 as *mut G);
                unsafe { poll_del(poller, fd) };
            }
            runnable
        })
    }
}
/// Creates the epoll instance used as the poller on Linux.
///
/// Returns the value of `epoll_create1(EPOLL_CLOEXEC)` unchanged, so a
/// negative result signals failure.
#[cfg(target_os = "linux")]
fn create_poll_fd() -> RawFd {
    let flags = libc::EPOLL_CLOEXEC;
    // SAFETY: `epoll_create1` takes no pointers; it only returns a new
    // descriptor or -1.
    unsafe { libc::epoll_create1(flags) }
}
/// Registers `fd` with the epoll instance `epfd` as a one-shot watch for the
/// interest bits in `mode`.
///
/// `EPOLLONESHOT` is always set; `POLL_READ` adds `EPOLLIN` and `POLL_WRITE`
/// adds `EPOLLOUT`. The descriptor number is stored in the event's `u64`
/// payload so `poll_wait` can recover it. If `EPOLL_CTL_ADD` fails for any
/// reason, `EPOLL_CTL_MOD` is attempted with the same event; the result of
/// that second call is ignored.
///
/// # Safety
/// Issues raw `epoll_ctl` calls with the given descriptors.
#[cfg(target_os = "linux")]
unsafe fn poll_add(epfd: RawFd, fd: RawFd, mode: u32) {
    let mut interest = libc::EPOLLONESHOT as u32;
    if mode & POLL_READ != 0 {
        interest |= libc::EPOLLIN as u32;
    }
    if mode & POLL_WRITE != 0 {
        interest |= libc::EPOLLOUT as u32;
    }
    let mut event = libc::epoll_event {
        events: interest,
        u64: fd as u64,
    };
    let added = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut event) };
    if added < 0 {
        // Fall back to re-arming a watch that may already exist.
        unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_MOD, fd, &mut event) };
    }
}
/// Removes `fd` from the epoll instance `epfd`.
///
/// The result of `EPOLL_CTL_DEL` is ignored, so calling this for a
/// descriptor that is not registered has no visible effect here.
///
/// # Safety
/// Issues a raw `epoll_ctl` call with the given descriptors.
#[cfg(target_os = "linux")]
unsafe fn poll_del(epfd: RawFd, fd: RawFd) {
    let no_event = std::ptr::null_mut::<libc::epoll_event>();
    unsafe {
        libc::epoll_ctl(epfd, libc::EPOLL_CTL_DEL, fd, no_event);
    }
}
/// Waits on the epoll instance `epfd` for up to `timeout_ms` milliseconds
/// and returns the descriptors that were reported ready.
///
/// At most 128 events are collected per call. A zero or negative return
/// from `epoll_wait` (timeout or error) yields an empty vector. Descriptor
/// numbers are read back from the `u64` payload of each event.
///
/// # Safety
/// Issues a raw `epoll_wait` call on `epfd`.
#[cfg(target_os = "linux")]
unsafe fn poll_wait(epfd: RawFd, timeout_ms: i32) -> Vec<RawFd> {
    const MAX_EVENTS: usize = 128;
    let mut buf = [libc::epoll_event { events: 0, u64: 0 }; MAX_EVENTS];
    let count =
        unsafe { libc::epoll_wait(epfd, buf.as_mut_ptr(), MAX_EVENTS as i32, timeout_ms) };
    if count <= 0 {
        return Vec::new();
    }
    buf[..count as usize]
        .iter()
        .map(|ev| ev.u64 as RawFd)
        .collect()
}
/// Creates the kqueue used as the poller on macOS.
///
/// Returns the value of `kqueue()` unchanged, so a negative result signals
/// failure.
#[cfg(target_os = "macos")]
fn create_poll_fd() -> RawFd {
    // SAFETY: `kqueue` takes no arguments; it only returns a new descriptor
    // or -1.
    let queue = unsafe { libc::kqueue() };
    queue
}
/// Registers one-shot kqueue filters on `kq` for `fd` according to `mode`.
///
/// `POLL_READ` adds an `EVFILT_READ` filter and `POLL_WRITE` an
/// `EVFILT_WRITE` filter, each with `EV_ADD | EV_ONESHOT`. All requested
/// filters are submitted in a single `kevent` call whose result is ignored.
/// No system call is made when `mode` selects neither bit.
///
/// # Safety
/// Issues a raw `kevent` call with the given descriptors.
#[cfg(target_os = "macos")]
unsafe fn poll_add(kq: RawFd, fd: RawFd, mode: u32) {
    let oneshot = |filter: i16| libc::kevent {
        ident: fd as libc::uintptr_t,
        filter,
        flags: libc::EV_ADD | libc::EV_ONESHOT,
        fflags: 0,
        data: 0,
        udata: std::ptr::null_mut(),
    };
    let mut changes: [libc::kevent; 2] = unsafe { std::mem::zeroed() };
    let mut count = 0usize;
    // Read first, then write, packed into the front of `changes`.
    for (bit, filter) in [
        (POLL_READ, libc::EVFILT_READ),
        (POLL_WRITE, libc::EVFILT_WRITE),
    ] {
        if mode & bit != 0 {
            changes[count] = oneshot(filter);
            count += 1;
        }
    }
    if count == 0 {
        return;
    }
    unsafe {
        libc::kevent(
            kq,
            changes.as_ptr(),
            count as libc::c_int,
            std::ptr::null_mut(),
            0,
            std::ptr::null(),
        );
    }
}
/// Removes both the read and the write filter for `fd` from the kqueue `kq`.
///
/// Each filter is deleted with its own `kevent` call. When several changes
/// are submitted together with an empty event list, `kevent` reports the
/// first failing change by returning -1 and does not go on to the remaining
/// ones. A missing read filter (never registered, or already consumed as a
/// one-shot) would therefore leave the write filter registered. Errors from
/// either call are ignored: deleting a filter that does not exist is
/// expected here.
///
/// # Safety
/// Issues raw `kevent` calls with the given descriptors.
#[cfg(target_os = "macos")]
unsafe fn poll_del(kq: RawFd, fd: RawFd) {
    for filter in [libc::EVFILT_READ, libc::EVFILT_WRITE] {
        let change = libc::kevent {
            ident: fd as libc::uintptr_t,
            filter,
            flags: libc::EV_DELETE,
            fflags: 0,
            data: 0,
            udata: std::ptr::null_mut(),
        };
        // SAFETY: `change` is a fully initialised kevent that outlives the
        // call; no event list is requested.
        unsafe {
            libc::kevent(
                kq,
                &change,
                1,
                std::ptr::null_mut(),
                0,
                std::ptr::null(),
            );
        }
    }
}
/// Waits on the kqueue `kq` and returns the identifiers of the events that
/// fired, cast to descriptors.
///
/// A negative `timeout_ms` passes a null timeout to `kevent`; otherwise the
/// value is split into seconds and nanoseconds. At most 128 events are
/// collected per call. A zero or negative return from `kevent` yields an
/// empty vector. The same descriptor can appear more than once if several
/// of its filters fired.
///
/// # Safety
/// Issues a raw `kevent` call on `kq`.
#[cfg(target_os = "macos")]
unsafe fn poll_wait(kq: RawFd, timeout_ms: i32) -> Vec<RawFd> {
    const MAX_EVENTS: usize = 128;
    let timeout = (timeout_ms >= 0).then(|| libc::timespec {
        tv_sec: (timeout_ms / 1000) as libc::time_t,
        tv_nsec: ((timeout_ms % 1000) * 1_000_000) as libc::c_long,
    });
    // `timeout` stays alive until the end of the function, so the pointer
    // remains valid for the system call below.
    let timeout_ptr = match timeout.as_ref() {
        Some(ts) => ts as *const libc::timespec,
        None => std::ptr::null(),
    };
    let mut fired: [libc::kevent; MAX_EVENTS] = unsafe { std::mem::zeroed() };
    let count = unsafe {
        libc::kevent(
            kq,
            std::ptr::null(),
            0,
            fired.as_mut_ptr(),
            MAX_EVENTS as libc::c_int,
            timeout_ptr,
        )
    };
    if count <= 0 {
        return Vec::new();
    }
    fired[..count as usize]
        .iter()
        .map(|ev| ev.ident as RawFd)
        .collect()
}
#[cfg(windows)]
use std::sync::OnceLock;
/// Completion-port handle stored as an integer; written by `iocp_win_init`
/// and read back through `netpoll_iocp_handle`.
#[cfg(windows)]
static WIN_IOCP: OnceLock<usize> = OnceLock::new();
/// Raw handle type used by the Win32 declarations in this file.
#[cfg(windows)]
type WinHandle = *mut std::ffi::c_void;
/// Per-operation record whose address is handed to the completion port as
/// the overlapped pointer; `iocp_win_wait` casts the dequeued overlapped
/// pointer back to `*mut IocpOp` and fills in the result fields.
#[cfg(windows)]
#[repr(C)]
pub(crate) struct IocpOp {
/// Must stay the first field: `iocp_win_wait` reinterprets a pointer to this
/// member as a pointer to the whole `IocpOp`.
pub overlapped: WinOverlapped,
/// Waiter returned by `iocp_win_wait` when this operation completes.
pub gp: *mut G,
/// Filled by `iocp_win_wait` from the dequeued entry's byte count.
pub bytes_transferred: u32,
/// Filled by `iocp_win_wait` from the dequeued entry's `internal` word,
/// truncated to 32 bits.
pub ntstatus: u32,
}
// Declared `Send` despite the raw `gp` pointer; presumably operations are
// created on one thread and completed on another (TODO confirm).
#[cfg(windows)]
unsafe impl Send for IocpOp {}
/// `repr(C)` overlapped record embedded at the start of `IocpOp`.
/// NOTE(review): the field set appears to mirror the Win32 `OVERLAPPED`
/// structure — confirm against the SDK headers before changing it.
#[cfg(windows)]
#[repr(C)]
pub(crate) struct WinOverlapped {
pub internal: usize, pub internal_high: usize, pub offset: u32, pub offset_high: u32, pub h_event: WinHandle, }
/// One dequeued completion as written by `GetQueuedCompletionStatusEx`;
/// `iocp_win_wait` reads `overlapped`, `internal` and `bytes_transferred`
/// from each entry.
#[cfg(windows)]
#[repr(C)]
struct OverlappedEntry {
completion_key: usize, overlapped: *mut WinOverlapped, internal: usize, bytes_transferred: u32, }
/// Opaque output buffer passed to `WSAStartup`; its contents are never read
/// in this file.
/// NOTE(review): 400 bytes matches the 32-bit `WSADATA` layout; on 64-bit
/// Windows `WSADATA` is 408 bytes and pointer-aligned — confirm that callers
/// hand `WSAStartup` a buffer that is large enough.
#[cfg(windows)]
#[repr(C)]
struct WsaData([u8; 400]);
// Winsock entry point used by `iocp_win_init`.
#[cfg(windows)]
#[link(name = "ws2_32")]
unsafe extern "system" {
/// Starts Winsock for the calling process; `iocp_win_init` requests version
/// `0x0202` and treats any non-zero return value as failure.
fn WSAStartup(w_version_required: u16, lp_wsa_data: *mut WsaData) -> i32;
}
// Completion-port functions used by the Windows back end.
#[cfg(windows)]
#[link(name = "kernel32")]
unsafe extern "system" {
/// Used in two ways in this file: `iocp_win_init` creates a new port by
/// passing an all-ones handle and a null existing port, and
/// `netpoll_iocp_associate` binds a socket to the existing port. A null
/// return is treated as failure by both callers.
fn CreateIoCompletionPort(
file_handle: WinHandle,
existing_completion_port: WinHandle,
completion_key: usize,
number_of_concurrent_threads: u32,
) -> WinHandle;
/// Dequeues up to `ul_count` completions into `lp_completion_port_entries`
/// and stores the number dequeued in `ul_num_entries_removed`;
/// `iocp_win_wait` treats a zero return value as "nothing to report".
fn GetQueuedCompletionStatusEx(
completion_port: WinHandle,
lp_completion_port_entries: *mut OverlappedEntry,
ul_count: u32,
ul_num_entries_removed: *mut u32,
dw_milliseconds: u32,
f_alertable: i32,
) -> i32;
}
/// Starts Winsock and creates the process-wide I/O completion port, once.
///
/// Initialisation runs inside `OnceLock::get_or_init`, so concurrent callers
/// cannot each create a port and leak the loser's handle; later calls return
/// immediately. If initialisation panics the cell stays empty and a later
/// call retries.
///
/// # Panics
/// Panics if `WSAStartup` returns non-zero or `CreateIoCompletionPort`
/// returns a null handle.
#[cfg(windows)]
pub(crate) fn iocp_win_init() {
    WIN_IOCP.get_or_init(|| {
        // `WSADATA` is 400 bytes on 32-bit Windows but 408 bytes and
        // pointer-aligned on 64-bit Windows. Use an 8-byte-aligned scratch
        // buffer that is comfortably larger than either layout so
        // `WSAStartup` can never write past the end.
        let mut wsa_buf = [0u64; 64];
        // SAFETY: the buffer is writable, suitably aligned and larger than
        // `WSADATA`; its contents are not read afterwards.
        let rc = unsafe { WSAStartup(0x0202u16, wsa_buf.as_mut_ptr().cast::<WsaData>()) };
        assert_eq!(rc, 0, "netpoll_init: WSAStartup failed (error {rc})");
        // An all-ones handle asks for a new port that is not yet associated
        // with any file handle.
        let invalid: WinHandle = usize::MAX as WinHandle;
        // SAFETY: plain FFI call with no borrowed memory.
        let port = unsafe { CreateIoCompletionPort(invalid, std::ptr::null_mut(), 0, 0) };
        assert!(!port.is_null(), "netpoll_init: CreateIoCompletionPort failed");
        port as usize
    });
}
/// Returns the completion-port handle stored in `WIN_IOCP`, or a null
/// handle when the port has not been created yet.
#[cfg(windows)]
#[inline]
pub(crate) fn netpoll_iocp_handle() -> WinHandle {
    match WIN_IOCP.get() {
        Some(&raw) => raw as WinHandle,
        None => std::ptr::null_mut(),
    }
}
/// Binds `socket` to the process-wide completion port, using the socket
/// value itself as the completion key.
///
/// Returns `false` when the port has not been created or when
/// `CreateIoCompletionPort` returns a null handle; `true` otherwise.
#[cfg(windows)]
pub(crate) fn netpoll_iocp_associate(socket: usize) -> bool {
    let port = netpoll_iocp_handle();
    if port.is_null() {
        false
    } else {
        let bound = unsafe { CreateIoCompletionPort(socket as WinHandle, port, socket, 0) };
        !bound.is_null()
    }
}
/// Dequeues up to 64 completions from the process-wide port and returns the
/// `gp` pointer of every completed `IocpOp`.
///
/// A negative `timeout_ms` is passed to the system call as `u32::MAX`;
/// other values are passed through as milliseconds. For each dequeued entry
/// with a non-null overlapped pointer, the entry's byte count and the low
/// 32 bits of its `internal` word are written into the `IocpOp` before its
/// `gp` is collected. Returns an empty vector when the port does not exist,
/// when the system call reports failure, or when nothing was dequeued.
///
/// # Safety
/// Every non-null overlapped pointer delivered by the port is dereferenced
/// as an `IocpOp`; whoever submits operations to the port presumably must
/// ensure that is what they point to and that they are still alive.
#[cfg(windows)]
unsafe fn iocp_win_wait(timeout_ms: i32) -> Vec<*mut G> {
    const MAX_ENTRIES: usize = 64;
    let port = netpoll_iocp_handle();
    if port.is_null() {
        return Vec::new();
    }
    let wait_ms: u32 = match timeout_ms {
        t if t < 0 => u32::MAX,
        t => t as u32,
    };
    let mut batch: [OverlappedEntry; MAX_ENTRIES] = unsafe { std::mem::zeroed() };
    let mut dequeued: u32 = 0;
    let status = unsafe {
        GetQueuedCompletionStatusEx(
            port,
            batch.as_mut_ptr(),
            MAX_ENTRIES as u32,
            &mut dequeued,
            wait_ms,
            0,
        )
    };
    if status == 0 || dequeued == 0 {
        return Vec::new();
    }
    let mut woken = Vec::with_capacity(dequeued as usize);
    for entry in &batch[..dequeued as usize] {
        // `overlapped` is the first field of the `repr(C)` `IocpOp`, so the
        // two pointers share an address.
        let op = entry.overlapped as *mut IocpOp;
        if op.is_null() {
            continue;
        }
        unsafe {
            (*op).bytes_transferred = entry.bytes_transferred;
            (*op).ntstatus = entry.internal as u32;
            woken.push((*op).gp);
        }
    }
    woken
}