Skip to main content

hiver_runtime/
io.rs

1//! I/O operations module
2//! I/O操作模块
3//!
4//! # Overview / 概述
5//!
6//! This module provides async I/O primitives for TCP and UDP networking.
7//! 本模块提供用于TCP和UDP网络的异步I/O原语。
8//!
9//! # Features / 功能
10//!
11//! - Async TCP stream with connect/read/write / 带有connect/read/write的异步TCP流
12//! - Async TCP listener for accepting connections / 用于接受连接的异步TCP监听器
13//! - Zero-copy ready operations / 零拷贝就绪操作
14//!
15//! # Example / 示例
16//!
17//! ```rust,no_run,ignore
18//! use hiver_runtime::io::TcpStream;
19//!
20//! async fn echo_client() -> std::io::Result<()> {
21//!     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
22//!
23//!     stream.write_all(b"Hello, World!").await?;
24//!
25//!     let mut buf = [0u8; 1024];
26//!     let n = stream.read(&mut buf).await?;
27//!
28//!     println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
29//!     Ok(())
30//! }
31//! ```
32
33#![allow(private_interfaces)]
34
35use std::future::Future;
36use std::io;
37use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr};
38use std::os::fd::{AsRawFd, FromRawFd, RawFd};
39use std::pin::Pin;
40use std::task::{Context, Poll};
41
/// Placeholder socket address (`0.0.0.0:0`) for call sites that have no decoded address to report.
const DUMMY_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
43
44/// A TCP stream between a local and a remote socket
45/// 本地套接字和远程套接字之间的TCP流
46///
47/// Provides async read/write operations with the underlying driver.
48/// 使用底层驱动提供异步读/写操作。
49pub struct TcpStream {
50    /// The raw file descriptor / 原始文件描述符
51    fd: std::os::fd::OwnedFd,
52}
53
54impl TcpStream {
55    /// Create a new TcpStream from a raw file descriptor
56    /// 从原始文件描述符创建新的TcpStream
57    ///
58    /// # Safety / 安全性
59    ///
60    /// The fd must be valid and owned by the caller.
61    /// fd必须有效且由调用者拥有。
62    pub(crate) unsafe fn from_raw_fd(fd: RawFd) -> io::Result<Self> {
63        // Set non-blocking mode
64        // 设置非阻塞模式
65        #[cfg(unix)]
66        unsafe {
67            let flags = libc::fcntl(fd, libc::F_GETFL);
68            if flags < 0 {
69                return Err(io::Error::last_os_error());
70            }
71            if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
72                return Err(io::Error::last_os_error());
73            }
74        }
75
76        Ok(Self {
77            // SAFETY: Caller guarantees ownership
78            // 安全性:调用者保证所有权
79            fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) },
80        })
81    }
82
83    /// Connect to a remote address
84    /// 连接到远程地址
85    ///
86    /// # Example / 示例
87    ///
88    /// ```rust,no_run,ignore
89    /// use hiver_runtime::io::TcpStream;
90    ///
91    /// async fn connect() -> std::io::Result<()> {
92    ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
93    ///     Ok(())
94    /// }
95    /// ```
96    pub fn connect(addr: &str) -> ConnectFuture {
97        let Ok(addr) = addr.parse::<SocketAddr>() else {
98            // Try to resolve as hostname
99            // For now, return error - DNS resolution will be added later
100            return ConnectFuture::Error(io::Error::new(
101                io::ErrorKind::InvalidInput,
102                "Invalid address format, use IP:PORT",
103            ));
104        };
105
106        ConnectFuture::Connecting(Box::new(ConnectingState {
107            addr,
108            fd: None,
109            started: false,
110        }))
111    }
112
113    /// Read some bytes from the stream
114    /// 从流中读取一些字节
115    ///
116    /// Returns the number of bytes read. May return 0 if the stream is closed.
117    /// 返回读取的字节数。如果流已关闭,可能返回0。
118    pub fn read<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> ReadFuture<'a, 'b> {
119        ReadFuture {
120            stream: Some(self),
121            buf,
122            pos: 0,
123        }
124    }
125
126    /// Write all bytes to the stream
127    /// 将所有字节写入流
128    ///
129    /// This will keep writing until all bytes have been written or an error occurs.
130    /// 将持续写入,直到所有字节都已写入或发生错误。
131    pub fn write_all<'a, 'b>(&'a mut self, buf: &'b [u8]) -> WriteAllFuture<'a, 'b> {
132        WriteAllFuture {
133            stream: Some(self),
134            buf,
135            pos: 0,
136        }
137    }
138
139    /// Split the stream into read and write halves
140    /// 将流拆分为读写两半
141    ///
142    /// Note: This is a placeholder. The actual implementation will use
143    /// Arc-based splitting like Tokio for true split read/write.
144    /// 注意:这是占位符。实际实现将使用类似Tokio的基于Arc的拆分来实现真正的读写分离。
145    ///
146    /// # Note / 注意
147    ///
148    /// This is a simplified split implementation. Both halves reference the same
149    /// underlying socket. The caller must coordinate read/write operations.
150    /// 这是简化的 split 实现。两个半部引用同一个底层 socket。
151    /// 调用者必须协调读/写操作。
152    pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
153        // SAFETY: Both halves reference the same stream via raw pointer.
154        // This is safe because TCP sockets support full-duplex I/O —
155        // reads and writes can proceed concurrently at the kernel level.
156        // The caller must not perform conflicting operations on the same half.
157        // 安全:两个半部通过裸指针引用同一个流。
158        // 这是安全的,因为 TCP socket 支持全双工 I/O —
159        // 读和写可以在内核级别并发进行。
160        // 调用者不得在同一个半部上执行冲突操作。
161        unsafe {
162            let ptr = self as *mut TcpStream;
163            (ReadHalf { _stream: &mut *ptr }, WriteHalf { _stream: &mut *ptr })
164        }
165    }
166
167    /// Shutdown the stream
168    /// 关闭流
169    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
170        #[cfg(unix)]
171        unsafe {
172            let how = match how {
173                Shutdown::Read => libc::SHUT_RD,
174                Shutdown::Write => libc::SHUT_WR,
175                Shutdown::Both => libc::SHUT_RDWR,
176            };
177            if libc::shutdown(self.as_raw_fd(), how) < 0 {
178                return Err(io::Error::last_os_error());
179            }
180        }
181        #[cfg(not(unix))]
182        {
183            let _ = how;
184            return Err(io::Error::new(
185                io::ErrorKind::Unsupported,
186                "Shutdown not supported on this platform",
187            ));
188        }
189        Ok(())
190    }
191}
192
193impl AsRawFd for TcpStream {
194    fn as_raw_fd(&self) -> RawFd {
195        self.fd.as_raw_fd()
196    }
197}
198
199/// Future for connecting to a remote address
200/// 连接到远程地址的future
201pub enum ConnectFuture {
202    /// Error state / 错误状态
203    Error(io::Error),
204    /// Connecting state / 连接中状态
205    /// Boxed to reduce enum size / 使用Box减小枚举大小
206    Connecting(Box<ConnectingState>),
207    /// Done state / 完成状态
208    Done,
209}
210
211struct ConnectingState {
212    addr: SocketAddr,
213    fd: Option<RawFd>,
214    started: bool,
215}
216
217impl Future for ConnectFuture {
218    type Output = io::Result<TcpStream>;
219
220    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221        match &mut *self {
222            ConnectFuture::Error(e) => {
223                let e = std::mem::replace(e, io::Error::other(""));
224                Poll::Ready(Err(e))
225            },
226            ConnectFuture::Done => panic!("ConnectFuture polled after completion"),
227            ConnectFuture::Connecting(state) => {
228                if !state.started {
229                    state.started = true;
230
231                    // Create socket and start connect
232                    // 创建套接字并启动connect
233                    let fd: RawFd = create_socket(state.addr.is_ipv4());
234
235                    if fd < 0 {
236                        return Poll::Ready(Err(io::Error::last_os_error()));
237                    }
238
239                    // Start connect (socket is already non-blocking from create_socket)
240                    // 启动 connect(create_socket 已设置非阻塞)
241                    let result = do_connect(fd, state.addr);
242
243                    if result < 0 {
244                        let err = io::Error::last_os_error();
245                        if err.kind() != io::ErrorKind::WouldBlock {
246                            unsafe { libc::close(fd) };
247                            return Poll::Ready(Err(err));
248                        }
249                        // Async connect in progress — store fd for later polling
250                        // 异步 connect 进行中 — 存储 fd 用于后续轮询
251                        state.fd = Some(fd);
252                        return Poll::Pending;
253                    }
254
255                    // Connected immediately
256                    // 立即连接
257                    state.fd = Some(fd);
258                }
259
260                // Check if async connect has completed using poll()
261                // 使用 poll() 检查异步连接是否完成
262                if let Some(fd) = state.fd {
263                    let mut pfd = libc::pollfd {
264                        fd,
265                        events: libc::POLLOUT,
266                        revents: 0,
267                    };
268                    let ready = unsafe { libc::poll(&mut pfd, 1, 0) };
269
270                    if ready < 0 {
271                        let fd = state.fd.take().unwrap();
272                        unsafe { libc::close(fd) };
273                        return Poll::Ready(Err(io::Error::last_os_error()));
274                    }
275
276                    if ready == 0 {
277                        // Not ready yet — register waker for future notification
278                        // 尚未就绪 — 注册 waker 以便未来通知
279                        cx.waker().wake_by_ref();
280                        return Poll::Pending;
281                    }
282
283                    // Socket is writable — check for connection error via SO_ERROR
284                    // Socket 可写 — 通过 SO_ERROR 检查连接错误
285                    let mut err_val: libc::c_int = 0;
286                    let mut err_len: libc::socklen_t =
287                        size_of::<libc::c_int>() as libc::socklen_t;
288                    unsafe {
289                        libc::getsockopt(
290                            fd,
291                            libc::SOL_SOCKET,
292                            libc::SO_ERROR,
293                            &mut err_val as *mut _ as *mut _,
294                            &mut err_len,
295                        );
296                    }
297                    if err_val != 0 {
298                        let fd = state.fd.take().unwrap();
299                        unsafe { libc::close(fd) };
300                        return Poll::Ready(Err(io::Error::from_raw_os_error(err_val)));
301                    }
302
303                    // Connected successfully
304                    // 连接成功
305                    let fd = state.fd.take().unwrap();
306                    let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
307                        Ok(s) => s,
308                        Err(e) => return Poll::Ready(Err(e)),
309                    };
310                    *self = ConnectFuture::Done;
311                    Poll::Ready(Ok(stream))
312                } else {
313                    Poll::Pending
314                }
315            },
316        }
317    }
318}
319
320/// Helper to create a non-blocking socket
321/// 创建非阻塞套接字的辅助函数
322#[cfg(unix)]
323fn create_socket(ipv4: bool) -> RawFd {
324    unsafe {
325        let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
326
327        #[cfg(target_os = "linux")]
328        let fd = libc::socket(domain, libc::SOCK_STREAM | libc::SOCK_CLOEXEC, 0);
329
330        #[cfg(not(target_os = "linux"))]
331        let fd = libc::socket(domain, libc::SOCK_STREAM, 0);
332
333        if fd < 0 {
334            return fd;
335        }
336
337        #[cfg(not(target_os = "linux"))]
338        {
339            // Set close-on-exec for macOS/BSD
340            if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
341                libc::close(fd);
342                return -1;
343            }
344        }
345
346        // Set non-blocking
347        let flags = libc::fcntl(fd, libc::F_GETFL);
348        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
349            libc::close(fd);
350            return -1;
351        }
352
353        fd
354    }
355}
356
357/// Helper to start a connect
358/// 启动connect的辅助函数
359#[cfg(unix)]
360fn do_connect(fd: RawFd, addr: SocketAddr) -> i32 {
361    unsafe {
362        if addr.is_ipv4() {
363            if let SocketAddr::V4(v4) = addr {
364                #[cfg(target_os = "linux")]
365                let sockaddr = libc::sockaddr_in {
366                    sin_family: libc::AF_INET as u16,
367                    sin_port: v4.port().to_be(),
368                    sin_addr: libc::in_addr {
369                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
370                    },
371                    sin_zero: [0; 8],
372                };
373
374                #[cfg(not(target_os = "linux"))]
375                let sockaddr = libc::sockaddr_in {
376                    sin_len: size_of::<libc::sockaddr_in>() as u8,
377                    sin_family: libc::AF_INET as u8,
378                    sin_port: v4.port().to_be(),
379                    sin_addr: libc::in_addr {
380                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
381                    },
382                    sin_zero: [0; 8],
383                };
384
385                libc::connect(
386                    fd,
387                    &sockaddr as *const _ as *const libc::sockaddr,
388                    size_of::<libc::sockaddr_in>() as libc::socklen_t,
389                )
390            } else {
391                -1
392            }
393        } else {
394            if let SocketAddr::V6(v6) = addr {
395                #[cfg(target_os = "linux")]
396                let sockaddr = libc::sockaddr_in6 {
397                    sin6_family: libc::AF_INET6 as u16,
398                    sin6_port: v6.port().to_be(),
399                    sin6_flowinfo: v6.flowinfo(),
400                    sin6_addr: libc::in6_addr {
401                        s6_addr: v6.ip().octets(),
402                    },
403                    sin6_scope_id: v6.scope_id(),
404                };
405
406                #[cfg(not(target_os = "linux"))]
407                let sockaddr = libc::sockaddr_in6 {
408                    sin6_len: size_of::<libc::sockaddr_in6>() as u8,
409                    sin6_family: libc::AF_INET6 as u8,
410                    sin6_port: v6.port().to_be(),
411                    sin6_flowinfo: v6.flowinfo(),
412                    sin6_addr: libc::in6_addr {
413                        s6_addr: v6.ip().octets(),
414                    },
415                    sin6_scope_id: v6.scope_id(),
416                };
417
418                libc::connect(
419                    fd,
420                    &sockaddr as *const _ as *const libc::sockaddr,
421                    size_of::<libc::sockaddr_in6>() as libc::socklen_t,
422                )
423            } else {
424                -1
425            }
426        }
427    }
428}
429
430/// Future for reading from a TcpStream
431/// 从TcpStream读取的future
432pub struct ReadFuture<'a, 'b> {
433    stream: Option<&'a mut TcpStream>,
434    buf: &'b mut [u8],
435    pos: usize,
436}
437
438impl Future for ReadFuture<'_, '_> {
439    type Output = io::Result<usize>;
440
441    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
442        // Extract all needed values upfront to avoid borrow issues
443        // 提前提取所有需要的值以避免借用问题
444        let stream_fd;
445        let buf_ptr;
446        let buf_len;
447
448        {
449            let stream = self.stream.as_mut().unwrap();
450            stream_fd = stream.as_raw_fd();
451            let pos = self.pos;
452            buf_ptr = self.buf[pos..].as_mut_ptr();
453            buf_len = self.buf[pos..].len();
454        }
455
456        #[cfg(unix)]
457        {
458            let result = unsafe { libc::read(stream_fd, buf_ptr as *mut _, buf_len) };
459
460            if result < 0 {
461                let err = io::Error::last_os_error();
462                if err.kind() == io::ErrorKind::WouldBlock {
463                    // Would block - should register with driver
464                    // 会阻塞 - 应该向驱动注册
465                    // For now, just return Pending
466                    // 目前只返回Pending
467                    return Poll::Pending;
468                }
469                return Poll::Ready(Err(err));
470            }
471
472            let n = result as usize;
473            if n == 0 {
474                return Poll::Ready(Ok(0)); // EOF
475            }
476
477            self.pos += n;
478            Poll::Ready(Ok(n))
479        }
480
481        #[cfg(not(unix))]
482        {
483            let _ = (stream_fd, buf_ptr, buf_len);
484            Poll::Ready(Err(io::Error::new(
485                io::ErrorKind::Unsupported,
486                "TCP read not yet implemented on this platform",
487            )))
488        }
489    }
490}
491
492/// Future for writing all bytes to a TcpStream
493/// 向TcpStream写入所有字节的future
494pub struct WriteAllFuture<'a, 'b> {
495    stream: Option<&'a mut TcpStream>,
496    buf: &'b [u8],
497    pos: usize,
498}
499
500impl Future for WriteAllFuture<'_, '_> {
501    type Output = io::Result<()>;
502
503    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
504        while self.pos < self.buf.len() {
505            let stream = self.stream.as_mut().unwrap();
506
507            #[cfg(unix)]
508            {
509                let result = unsafe {
510                    libc::write(
511                        stream.as_raw_fd(),
512                        self.buf[self.pos..].as_ptr() as *const _,
513                        self.buf[self.pos..].len(),
514                    )
515                };
516
517                if result < 0 {
518                    let err = io::Error::last_os_error();
519                    if err.kind() == io::ErrorKind::WouldBlock {
520                        return Poll::Pending;
521                    }
522                    return Poll::Ready(Err(err));
523                }
524
525                let n = result as usize;
526                if n == 0 {
527                    return Poll::Ready(Err(io::Error::new(
528                        io::ErrorKind::WriteZero,
529                        "write zero byte",
530                    )));
531                }
532
533                self.pos += n;
534            }
535
536            #[cfg(not(unix))]
537            {
538                let _ = stream;
539                return Poll::Ready(Err(io::Error::new(
540                    io::ErrorKind::Unsupported,
541                    "TCP write not yet implemented on this platform",
542                )));
543            }
544        }
545
546        Poll::Ready(Ok(()))
547    }
548}
549
/// Read half of a [`TcpStream`], as produced by [`TcpStream::split`].
pub struct ReadHalf<'a> {
    /// Mutable borrow of the parent stream.
    _stream: &'a mut TcpStream,
}
555
/// Write half of a [`TcpStream`], as produced by [`TcpStream::split`].
pub struct WriteHalf<'a> {
    /// Mutable borrow of the parent stream.
    _stream: &'a mut TcpStream,
}
561
562/// A TCP socket listener
563/// TCP套接字监听器
564///
565/// Listens for incoming connections on a specific address.
566/// 在特定地址上监听传入的连接。
567pub struct TcpListener {
568    /// The raw file descriptor / 原始文件描述符
569    fd: std::os::fd::OwnedFd,
570}
571
572impl TcpListener {
573    /// Create a new TCP listener bound to the specified address
574    /// 创建绑定到指定地址的新TCP监听器
575    ///
576    /// # Example / 示例
577    ///
578    /// ```rust,no_run,ignore
579    /// use hiver_runtime::io::TcpListener;
580    ///
581    /// async fn listen() -> std::io::Result<()> {
582    ///     let listener = TcpListener::bind("127.0.0.1:8080").await?;
583    ///     println!("Listening on 127.0.0.1:8080");
584    ///     Ok(())
585    /// }
586    /// ```
587    pub fn bind(addr: &str) -> BindFuture {
588        let Ok(addr) = addr.parse::<SocketAddr>() else {
589            return BindFuture::Error(io::Error::new(
590                io::ErrorKind::InvalidInput,
591                "Invalid address format, use IP:PORT",
592            ));
593        };
594
595        BindFuture::Binding(BindingState { addr })
596    }
597
598    /// Accept a new connection
599    /// 接受新连接
600    pub fn accept(&mut self) -> AcceptFuture<'_> {
601        AcceptFuture { listener: self }
602    }
603
604    /// Get the local address
605    /// 获取本地地址
606    pub fn local_addr(&self) -> io::Result<SocketAddr> {
607        #[cfg(unix)]
608        unsafe {
609            let mut addr: libc::sockaddr_storage = std::mem::zeroed();
610            let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
611
612            if libc::getsockname(
613                self.as_raw_fd(),
614                &mut addr as *mut _ as *mut libc::sockaddr,
615                &mut len,
616            ) < 0
617            {
618                return Err(io::Error::last_os_error());
619            }
620
621            // Convert to SocketAddr (simplified)
622            // 转换为SocketAddr(简化版)
623            Ok(DUMMY_ADDR)
624        }
625
626        #[cfg(not(unix))]
627        {
628            Err(io::Error::new(
629                io::ErrorKind::Unsupported,
630                "local_addr not supported on this platform",
631            ))
632        }
633    }
634}
635
636impl AsRawFd for TcpListener {
637    fn as_raw_fd(&self) -> RawFd {
638        self.fd.as_raw_fd()
639    }
640}
641
642/// Future for binding a TCP listener
643/// 绑定TCP监听器的future
644pub enum BindFuture {
645    /// Error state / 错误状态
646    Error(io::Error),
647    /// Binding state / 绑定中状态
648    Binding(BindingState),
649    /// Done state / 完成状态
650    Done,
651}
652
653struct BindingState {
654    addr: SocketAddr,
655}
656
657impl Future for BindFuture {
658    type Output = io::Result<TcpListener>;
659
660    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
661        match &mut *self {
662            BindFuture::Error(e) => {
663                let e = std::mem::replace(e, io::Error::other(""));
664                Poll::Ready(Err(e))
665            },
666            BindFuture::Done => panic!("BindFuture polled after completion"),
667            BindFuture::Binding(state) => {
668                // Create and bind socket
669                // 创建并绑定套接字
670                let fd = create_socket(state.addr.is_ipv4());
671
672                if fd < 0 {
673                    return Poll::Ready(Err(io::Error::last_os_error()));
674                }
675
676                // Set reuse address
677                // 设置地址重用
678                #[cfg(unix)]
679                unsafe {
680                    let opt: i32 = 1;
681                    if libc::setsockopt(
682                        fd,
683                        libc::SOL_SOCKET,
684                        libc::SO_REUSEADDR,
685                        &opt as *const _ as *const _,
686                        size_of::<i32>() as libc::socklen_t,
687                    ) < 0
688                    {
689                        libc::close(fd);
690                        return Poll::Ready(Err(io::Error::last_os_error()));
691                    }
692
693                    // Bind
694                    // 绑定
695                    let result = do_bind(fd, state.addr);
696                    if result < 0 {
697                        let err = io::Error::last_os_error();
698                        libc::close(fd);
699                        return Poll::Ready(Err(err));
700                    }
701
702                    // Listen
703                    // 监听
704                    if libc::listen(fd, 128) < 0 {
705                        let err = io::Error::last_os_error();
706                        libc::close(fd);
707                        return Poll::Ready(Err(err));
708                    }
709
710                    let listener = TcpListener {
711                        // SAFETY: fd is valid and owned
712                        fd: std::os::fd::OwnedFd::from_raw_fd(fd),
713                    };
714
715                    *self = BindFuture::Done;
716                    Poll::Ready(Ok(listener))
717                }
718
719                #[cfg(not(unix))]
720                {
721                    Poll::Ready(Err(io::Error::new(
722                        io::ErrorKind::Unsupported,
723                        "TCP bind not yet implemented on this platform",
724                    )))
725                }
726            },
727        }
728    }
729}
730
731/// Helper to bind a socket to an address
732/// 将套接字绑定到地址的辅助函数
733#[cfg(unix)]
734fn do_bind(fd: RawFd, addr: SocketAddr) -> i32 {
735    unsafe {
736        if addr.is_ipv4() {
737            if let SocketAddr::V4(v4) = addr {
738                #[cfg(target_os = "linux")]
739                let sockaddr = libc::sockaddr_in {
740                    sin_family: libc::AF_INET as u16,
741                    sin_port: v4.port().to_be(),
742                    sin_addr: libc::in_addr {
743                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
744                    },
745                    sin_zero: [0; 8],
746                };
747
748                #[cfg(not(target_os = "linux"))]
749                let sockaddr = libc::sockaddr_in {
750                    sin_len: size_of::<libc::sockaddr_in>() as u8,
751                    sin_family: libc::AF_INET as u8,
752                    sin_port: v4.port().to_be(),
753                    sin_addr: libc::in_addr {
754                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
755                    },
756                    sin_zero: [0; 8],
757                };
758
759                libc::bind(
760                    fd,
761                    &sockaddr as *const _ as *const libc::sockaddr,
762                    size_of::<libc::sockaddr_in>() as libc::socklen_t,
763                )
764            } else {
765                -1
766            }
767        } else {
768            if let SocketAddr::V6(v6) = addr {
769                #[cfg(target_os = "linux")]
770                let sockaddr = libc::sockaddr_in6 {
771                    sin6_family: libc::AF_INET6 as u16,
772                    sin6_port: v6.port().to_be(),
773                    sin6_flowinfo: v6.flowinfo(),
774                    sin6_addr: libc::in6_addr {
775                        s6_addr: v6.ip().octets(),
776                    },
777                    sin6_scope_id: v6.scope_id(),
778                };
779
780                #[cfg(not(target_os = "linux"))]
781                let sockaddr = libc::sockaddr_in6 {
782                    sin6_len: size_of::<libc::sockaddr_in6>() as u8,
783                    sin6_family: libc::AF_INET6 as u8,
784                    sin6_port: v6.port().to_be(),
785                    sin6_flowinfo: v6.flowinfo(),
786                    sin6_addr: libc::in6_addr {
787                        s6_addr: v6.ip().octets(),
788                    },
789                    sin6_scope_id: v6.scope_id(),
790                };
791
792                libc::bind(
793                    fd,
794                    &sockaddr as *const _ as *const libc::sockaddr,
795                    size_of::<libc::sockaddr_in6>() as libc::socklen_t,
796                )
797            } else {
798                -1
799            }
800        }
801    }
802}
803
804/// Future for accepting a connection
805/// 接受连接的future
806pub struct AcceptFuture<'a> {
807    listener: &'a mut TcpListener,
808}
809
810impl Future for AcceptFuture<'_> {
811    type Output = io::Result<(TcpStream, SocketAddr)>;
812
813    #[allow(unused_mut)]
814    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
815        #[cfg(unix)]
816        {
817            let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
818            let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
819
820            let fd = unsafe {
821                #[cfg(target_os = "linux")]
822                {
823                    libc::accept4(
824                        self.listener.as_raw_fd(),
825                        &mut addr as *mut _ as *mut libc::sockaddr,
826                        &mut len,
827                        libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
828                    )
829                }
830
831                #[cfg(not(target_os = "linux"))]
832                {
833                    // Use regular accept on macOS/BSD, then set flags
834                    let fd = libc::accept(
835                        self.listener.as_raw_fd(),
836                        &mut addr as *mut _ as *mut libc::sockaddr,
837                        &mut len,
838                    );
839
840                    if fd >= 0 {
841                        // Set close-on-exec
842                        if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
843                            libc::close(fd);
844                            return Poll::Ready(Err(io::Error::last_os_error()));
845                        }
846
847                        // Set non-blocking
848                        let flags = libc::fcntl(fd, libc::F_GETFL);
849                        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
850                            libc::close(fd);
851                            return Poll::Ready(Err(io::Error::last_os_error()));
852                        }
853                    }
854
855                    fd
856                }
857            };
858
859            if fd < 0 {
860                let err = io::Error::last_os_error();
861                if err.kind() == io::ErrorKind::WouldBlock {
862                    return Poll::Pending;
863                }
864                return Poll::Ready(Err(err));
865            }
866
867            let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
868                Ok(s) => s,
869                Err(e) => return Poll::Ready(Err(e)),
870            };
871
872            // Parse peer address (simplified)
873            // 解析对端地址(简化版)
874            let peer_addr = match self.listener.local_addr() {
875                Ok(_) => DUMMY_ADDR,
876                Err(_) => return Poll::Ready(Err(io::Error::last_os_error())),
877            };
878
879            Poll::Ready(Ok((stream, peer_addr)))
880        }
881
882        #[cfg(not(unix))]
883        {
884            Poll::Ready(Err(io::Error::new(
885                io::ErrorKind::Unsupported,
886                "TCP accept not yet implemented on this platform",
887            )))
888        }
889    }
890}
891
/// A UDP socket.
///
/// Provides async UDP send and receive operations.
///
/// # Example
///
/// ```rust,no_run,ignore
/// use hiver_runtime::io::UdpSocket;
///
/// async fn echo_server() -> std::io::Result<()> {
///     // `recv_from` and `send_to` take `&mut self`, so the binding is `mut`.
///     let mut socket = UdpSocket::bind("127.0.0.1:8080").await?;
///     println!("UDP server listening on 127.0.0.1:8080");
///
///     let mut buf = [0u8; 1024];
///     loop {
///         let (n, peer) = socket.recv_from(&mut buf).await?;
///         // `send_to` takes the address by value.
///         socket.send_to(&buf[..n], peer).await?;
///     }
/// }
/// ```
pub struct UdpSocket {
    /// Owned socket descriptor; closed when the socket is dropped.
    fd: std::os::fd::OwnedFd,
}

impl UdpSocket {
    /// Bind a new UDP socket to `addr`, given as `IP:PORT`.
    ///
    /// Anything that does not parse as a [`SocketAddr`] produces a future
    /// that fails with [`io::ErrorKind::InvalidInput`].
    ///
    /// # Example
    ///
    /// ```rust,no_run,ignore
    /// use hiver_runtime::io::UdpSocket;
    ///
    /// async fn bind_server() -> std::io::Result<()> {
    ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
    ///     Ok(())
    /// }
    /// ```
    pub fn bind(addr: &str) -> BindUdpFuture {
        let Ok(addr) = addr.parse::<SocketAddr>() else {
            return BindUdpFuture::Error(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid address format, use IP:PORT",
            ));
        };

        BindUdpFuture::Binding(BindingUdpState { addr })
    }

    /// Receive a datagram into `buf`.
    ///
    /// Returns a future that borrows both the socket and the buffer.
    /// Presumably it resolves to the byte count and the sender's address;
    /// confirm against `RecvFromFuture`.
    pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFromFuture<'a, 'b> {
        RecvFromFuture {
            stream: Some(self),
            buf,
        }
    }

    /// Send `buf` as a datagram to `addr`.
    ///
    /// Returns a future that borrows both the socket and the buffer.
    /// Presumably it resolves to the number of bytes sent; confirm against
    /// `SendToFuture`.
    pub fn send_to<'a, 'b>(&'a mut self, buf: &'b [u8], addr: SocketAddr) -> SendToFuture<'a, 'b> {
        SendToFuture {
            stream: Some(self),
            buf,
            addr,
        }
    }

    /// Connect the socket to a remote address.
    ///
    /// Intended to restrict incoming datagrams to those sent from `addr`.
    // NOTE(review): the future stores the raw descriptor and does not borrow
    // `self`, so nothing here stops it from outliving the socket — confirm
    // that `ConnectUdpFuture` cannot be polled after the socket is dropped.
    pub fn connect(&mut self, addr: SocketAddr) -> ConnectUdpFuture {
        ConnectUdpFuture {
            fd: self.fd.as_raw_fd(),
            addr,
            done: false,
        }
    }
}

impl AsRawFd for UdpSocket {
    /// Borrow the underlying descriptor without giving up ownership.
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}
987
988/// Future for binding a UDP socket
989/// 绑定UDP套接字的future
990pub enum BindUdpFuture {
991    /// Error state / 错误状态
992    Error(io::Error),
993    /// Binding state / 绑定中状态
994    Binding(BindingUdpState),
995    /// Done state / 完成状态
996    Done,
997}
998
999struct BindingUdpState {
1000    addr: SocketAddr,
1001}
1002
1003impl Future for BindUdpFuture {
1004    type Output = io::Result<UdpSocket>;
1005
1006    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1007        match &mut *self {
1008            BindUdpFuture::Error(e) => {
1009                let e = std::mem::replace(e, io::Error::other(""));
1010                Poll::Ready(Err(e))
1011            },
1012            BindUdpFuture::Done => panic!("BindUdpFuture polled after completion"),
1013            BindUdpFuture::Binding(state) => {
1014                // Create and bind UDP socket
1015                // 创建并绑定UDP套接字
1016                let fd = create_udp_socket(state.addr.is_ipv4());
1017
1018                if fd < 0 {
1019                    return Poll::Ready(Err(io::Error::last_os_error()));
1020                }
1021
1022                // Bind
1023                // 绑定
1024                let result = do_bind_udp(fd, state.addr);
1025                if result < 0 {
1026                    let err = io::Error::last_os_error();
1027                    unsafe { libc::close(fd) };
1028                    return Poll::Ready(Err(err));
1029                }
1030
1031                let socket = UdpSocket {
1032                    // SAFETY: fd is valid and owned
1033                    fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) },
1034                };
1035
1036                *self = BindUdpFuture::Done;
1037                Poll::Ready(Ok(socket))
1038            },
1039        }
1040    }
1041}
1042
1043/// Helper to create a UDP socket
1044/// 创建UDP套接字的辅助函数
1045#[cfg(unix)]
1046fn create_udp_socket(ipv4: bool) -> RawFd {
1047    unsafe {
1048        let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
1049
1050        #[cfg(target_os = "linux")]
1051        let fd =
1052            libc::socket(domain, libc::SOCK_DGRAM | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, 0);
1053
1054        #[cfg(not(target_os = "linux"))]
1055        let fd = libc::socket(domain, libc::SOCK_DGRAM, 0);
1056
1057        if fd < 0 {
1058            return fd;
1059        }
1060
1061        #[cfg(not(target_os = "linux"))]
1062        {
1063            // Set close-on-exec for macOS/BSD
1064            if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
1065                libc::close(fd);
1066                return -1;
1067            }
1068
1069            // Set non-blocking
1070            let flags = libc::fcntl(fd, libc::F_GETFL);
1071            if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
1072                libc::close(fd);
1073                return -1;
1074            }
1075        }
1076
1077        fd
1078    }
1079}
1080
1081/// Helper to bind a UDP socket to an address
1082/// 将UDP套接字绑定到地址的辅助函数
1083#[cfg(unix)]
1084fn do_bind_udp(fd: RawFd, addr: SocketAddr) -> i32 {
1085    unsafe {
1086        if addr.is_ipv4() {
1087            if let SocketAddr::V4(v4) = addr {
1088                #[cfg(target_os = "linux")]
1089                let sockaddr = libc::sockaddr_in {
1090                    sin_family: libc::AF_INET as u16,
1091                    sin_port: v4.port().to_be(),
1092                    sin_addr: libc::in_addr {
1093                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
1094                    },
1095                    sin_zero: [0; 8],
1096                };
1097
1098                #[cfg(not(target_os = "linux"))]
1099                let sockaddr = libc::sockaddr_in {
1100                    sin_len: size_of::<libc::sockaddr_in>() as u8,
1101                    sin_family: libc::AF_INET as u8,
1102                    sin_port: v4.port().to_be(),
1103                    sin_addr: libc::in_addr {
1104                        s_addr: u32::from_ne_bytes(v4.ip().octets()),
1105                    },
1106                    sin_zero: [0; 8],
1107                };
1108
1109                libc::bind(
1110                    fd,
1111                    &sockaddr as *const _ as *const libc::sockaddr,
1112                    size_of::<libc::sockaddr_in>() as libc::socklen_t,
1113                )
1114            } else {
1115                -1
1116            }
1117        } else {
1118            if let SocketAddr::V6(v6) = addr {
1119                #[cfg(target_os = "linux")]
1120                let sockaddr = libc::sockaddr_in6 {
1121                    sin6_family: libc::AF_INET6 as u16,
1122                    sin6_port: v6.port().to_be(),
1123                    sin6_flowinfo: v6.flowinfo(),
1124                    sin6_addr: libc::in6_addr {
1125                        s6_addr: v6.ip().octets(),
1126                    },
1127                    sin6_scope_id: v6.scope_id(),
1128                };
1129
1130                #[cfg(not(target_os = "linux"))]
1131                let sockaddr = libc::sockaddr_in6 {
1132                    sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1133                    sin6_family: libc::AF_INET6 as u8,
1134                    sin6_port: v6.port().to_be(),
1135                    sin6_flowinfo: v6.flowinfo(),
1136                    sin6_addr: libc::in6_addr {
1137                        s6_addr: v6.ip().octets(),
1138                    },
1139                    sin6_scope_id: v6.scope_id(),
1140                };
1141
1142                libc::bind(
1143                    fd,
1144                    &sockaddr as *const _ as *const libc::sockaddr,
1145                    size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1146                )
1147            } else {
1148                -1
1149            }
1150        }
1151    }
1152}
1153
1154/// Future for receiving from a UDP socket
1155/// 从UDP套接字接收的future
1156pub struct RecvFromFuture<'a, 'b> {
1157    stream: Option<&'a mut UdpSocket>,
1158    buf: &'b mut [u8],
1159}
1160
1161impl Future for RecvFromFuture<'_, '_> {
1162    type Output = io::Result<(usize, SocketAddr)>;
1163
1164    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1165        // Extract all needed values upfront to avoid borrow issues
1166        // 提前提取所有需要的值以避免借用问题
1167        let stream_fd;
1168        let buf_ptr;
1169        let buf_len;
1170
1171        {
1172            let stream = self.stream.as_mut().unwrap();
1173            stream_fd = stream.as_raw_fd();
1174            buf_ptr = self.buf.as_mut_ptr();
1175            buf_len = self.buf.len();
1176        }
1177
1178        #[cfg(unix)]
1179        {
1180            let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
1181            let mut addr_len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
1182
1183            let result = unsafe {
1184                libc::recvfrom(
1185                    stream_fd,
1186                    buf_ptr as *mut _,
1187                    buf_len,
1188                    0,
1189                    &mut addr as *mut _ as *mut libc::sockaddr,
1190                    &mut addr_len,
1191                )
1192            };
1193
1194            if result < 0 {
1195                let err = io::Error::last_os_error();
1196                if err.kind() == io::ErrorKind::WouldBlock {
1197                    return Poll::Pending;
1198                }
1199                return Poll::Ready(Err(err));
1200            }
1201
1202            let n = result as usize;
1203
1204            // Parse peer address (simplified)
1205            // 解析对端地址(简化版)
1206            let peer_addr = SocketAddr::V4(std::net::SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
1207
1208            Poll::Ready(Ok((n, peer_addr)))
1209        }
1210
1211        #[cfg(not(unix))]
1212        {
1213            let _ = (stream_fd, buf_ptr, buf_len);
1214            Poll::Ready(Err(io::Error::new(
1215                io::ErrorKind::Unsupported,
1216                "UDP recv_from not yet implemented on this platform",
1217            )))
1218        }
1219    }
1220}
1221
1222/// Future for sending to a UDP socket
1223/// 向UDP套接字发送的future
1224pub struct SendToFuture<'a, 'b> {
1225    stream: Option<&'a mut UdpSocket>,
1226    buf: &'b [u8],
1227    addr: SocketAddr,
1228}
1229
1230impl Future for SendToFuture<'_, '_> {
1231    type Output = io::Result<usize>;
1232
1233    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1234        let stream = self.stream.as_mut().unwrap();
1235        let stream_fd = stream.as_raw_fd();
1236
1237        #[cfg(unix)]
1238        {
1239            let result = match self.addr {
1240                SocketAddr::V4(v4) => {
1241                    let sockaddr = libc::sockaddr_in {
1242                        #[cfg(target_os = "macos")]
1243                        sin_len: size_of::<libc::sockaddr_in>() as u8,
1244                        sin_family: libc::AF_INET as _,
1245                        sin_port: v4.port().to_be(),
1246                        sin_addr: libc::in_addr {
1247                            s_addr: u32::from(*v4.ip()).to_be(),
1248                        },
1249                        sin_zero: [0; 8],
1250                    };
1251                    unsafe {
1252                        libc::sendto(
1253                            stream_fd,
1254                            self.buf.as_ptr() as *const _,
1255                            self.buf.len(),
1256                            0,
1257                            &sockaddr as *const _ as *const _,
1258                            size_of::<libc::sockaddr_in>() as libc::socklen_t,
1259                        )
1260                    }
1261                },
1262                SocketAddr::V6(v6) => {
1263                    let sockaddr = libc::sockaddr_in6 {
1264                        sin6_family: libc::AF_INET6 as _,
1265                        sin6_port: v6.port().to_be(),
1266                        sin6_flowinfo: v6.flowinfo().to_be(),
1267                        sin6_addr: libc::in6_addr {
1268                            s6_addr: v6.ip().octets(),
1269                        },
1270                        sin6_scope_id: v6.scope_id(),
1271                        #[cfg(target_os = "macos")]
1272                        sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1273                    };
1274                    unsafe {
1275                        libc::sendto(
1276                            stream_fd,
1277                            self.buf.as_ptr() as *const _,
1278                            self.buf.len(),
1279                            0,
1280                            &sockaddr as *const _ as *const _,
1281                            size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1282                        )
1283                    }
1284                },
1285            };
1286
1287            if result < 0 {
1288                let err = io::Error::last_os_error();
1289                if err.kind() == io::ErrorKind::WouldBlock {
1290                    return Poll::Pending;
1291                }
1292                return Poll::Ready(Err(err));
1293            }
1294
1295            let n = result as usize;
1296            Poll::Ready(Ok(n))
1297        }
1298
1299        #[cfg(not(unix))]
1300        {
1301            let _ = stream_fd;
1302            Poll::Ready(Err(io::Error::new(
1303                io::ErrorKind::Unsupported,
1304                "UDP send_to not yet implemented on this platform",
1305            )))
1306        }
1307    }
1308}
1309
1310/// Future for connecting a UDP socket
1311/// 连接UDP套接字的future
1312pub struct ConnectUdpFuture {
1313    fd: RawFd,
1314    addr: SocketAddr,
1315    done: bool,
1316}
1317
1318impl Future for ConnectUdpFuture {
1319    type Output = io::Result<()>;
1320
1321    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1322        assert!(!self.done, "ConnectUdpFuture polled after completion");
1323
1324        // Perform the connect operation
1325        // 执行connect操作
1326        #[cfg(unix)]
1327        {
1328            let result = unsafe {
1329                match self.addr {
1330                    SocketAddr::V4(v4) => {
1331                        #[cfg(target_os = "linux")]
1332                        let sockaddr = libc::sockaddr_in {
1333                            sin_family: libc::AF_INET as u16,
1334                            sin_port: v4.port().to_be(),
1335                            sin_addr: libc::in_addr {
1336                                s_addr: u32::from_ne_bytes(v4.ip().octets()),
1337                            },
1338                            sin_zero: [0; 8],
1339                        };
1340
1341                        #[cfg(not(target_os = "linux"))]
1342                        let sockaddr = libc::sockaddr_in {
1343                            sin_len: size_of::<libc::sockaddr_in>() as u8,
1344                            sin_family: libc::AF_INET as u8,
1345                            sin_port: v4.port().to_be(),
1346                            sin_addr: libc::in_addr {
1347                                s_addr: u32::from_ne_bytes(v4.ip().octets()),
1348                            },
1349                            sin_zero: [0; 8],
1350                        };
1351
1352                        libc::connect(
1353                            self.fd,
1354                            &sockaddr as *const _ as *const libc::sockaddr,
1355                            size_of::<libc::sockaddr_in>() as libc::socklen_t,
1356                        )
1357                    },
1358                    SocketAddr::V6(v6) => {
1359                        #[cfg(target_os = "linux")]
1360                        let sockaddr = libc::sockaddr_in6 {
1361                            sin6_family: libc::AF_INET6 as u16,
1362                            sin6_port: v6.port().to_be(),
1363                            sin6_flowinfo: v6.flowinfo(),
1364                            sin6_addr: libc::in6_addr {
1365                                s6_addr: v6.ip().octets(),
1366                            },
1367                            sin6_scope_id: v6.scope_id(),
1368                        };
1369
1370                        #[cfg(not(target_os = "linux"))]
1371                        let sockaddr = libc::sockaddr_in6 {
1372                            sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1373                            sin6_family: libc::AF_INET6 as u8,
1374                            sin6_port: v6.port().to_be(),
1375                            sin6_flowinfo: v6.flowinfo(),
1376                            sin6_addr: libc::in6_addr {
1377                                s6_addr: v6.ip().octets(),
1378                            },
1379                            sin6_scope_id: v6.scope_id(),
1380                        };
1381
1382                        libc::connect(
1383                            self.fd,
1384                            &sockaddr as *const _ as *const libc::sockaddr,
1385                            size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1386                        )
1387                    },
1388                }
1389            };
1390
1391            if result < 0 {
1392                return Poll::Ready(Err(io::Error::last_os_error()));
1393            }
1394        }
1395
1396        #[cfg(not(unix))]
1397        {
1398            let _ = (self.fd, self.addr);
1399            return Poll::Ready(Err(io::Error::new(
1400                io::ErrorKind::Unsupported,
1401                "UDP connect not yet implemented on this platform",
1402            )));
1403        }
1404
1405        self.done = true;
1406        Poll::Ready(Ok(()))
1407    }
1408}
1409
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrapping an invalid descriptor must be reported as an error.
    #[test]
    fn test_tcp_stream_create() {
        // -1 is not an open descriptor, so the constructor is expected to
        // fail rather than produce a stream.
        let stream = unsafe { TcpStream::from_raw_fd(-1) };
        assert!(stream.is_err());
    }

    /// An unparsable listener address yields a future already in its error state.
    #[test]
    fn test_tcp_listener_bind_invalid() {
        let bind = TcpListener::bind("invalid_address");
        assert!(matches!(bind, BindFuture::Error(_)), "Expected Error future");
    }

    /// An unparsable connect address yields a future already in its error state.
    #[test]
    fn test_connect_invalid_addr() {
        let connect = TcpStream::connect("not_an_address");
        assert!(
            matches!(connect, ConnectFuture::Error(_)),
            "Expected Error future for invalid address"
        );
    }
}