use std::ffi::c_char;
use std::ffi::c_int;
use std::ffi::c_void;
use super::UV_EAGAIN;
use super::UV_EALREADY;
use super::UV_EBADF;
use super::UV_EINVAL;
use super::UV_ENOTCONN;
use super::UV_EPIPE;
use super::UV_HANDLE_ACTIVE;
use super::UV_HANDLE_CLOSING;
use super::get_inner;
use super::tcp::WritePending;
use super::tcp::uv_tcp_t;
use super::tty::uv_tty_t;
use super::uv_handle_t;
use super::uv_handle_type;
use super::uv_loop_t;
#[repr(C)]
pub struct uv_stream_t {
pub r#type: uv_handle_type,
pub loop_: *mut uv_loop_t,
pub data: *mut c_void,
pub flags: u32,
}
#[repr(C)]
pub struct uv_write_t {
pub r#type: i32, pub data: *mut c_void,
pub handle: *mut uv_stream_t,
}
#[repr(C)]
pub struct uv_connect_t {
pub r#type: i32,
pub data: *mut c_void,
pub handle: *mut uv_stream_t,
}
#[repr(C)]
pub struct uv_shutdown_t {
pub r#type: i32,
pub data: *mut c_void,
pub handle: *mut uv_stream_t,
}
#[repr(C)]
pub struct uv_buf_t {
pub base: *mut c_char,
pub len: usize,
}
pub type uv_write_cb = unsafe extern "C" fn(*mut uv_write_t, i32);
pub type uv_alloc_cb =
unsafe extern "C" fn(*mut uv_handle_t, usize, *mut uv_buf_t);
pub type uv_read_cb =
unsafe extern "C" fn(*mut uv_stream_t, isize, *const uv_buf_t);
pub type uv_connection_cb = unsafe extern "C" fn(*mut uv_stream_t, i32);
pub type uv_connect_cb = unsafe extern "C" fn(*mut uv_connect_t, i32);
pub type uv_shutdown_cb = unsafe extern "C" fn(*mut uv_shutdown_t, i32);
pub type UvStream = uv_stream_t;
pub type UvWrite = uv_write_t;
pub type UvBuf = uv_buf_t;
pub type UvConnect = uv_connect_t;
pub type UvShutdown = uv_shutdown_t;
pub(crate) unsafe fn maybe_clear_tcp_active(tcp: *mut uv_tcp_t) {
unsafe {
if !(*tcp).internal_reading
&& (*tcp).internal_connection_cb.is_none()
&& (*tcp).internal_connect.is_none()
&& (*tcp).internal_write_queue.is_empty()
&& (*tcp).internal_shutdown.is_none()
{
(*tcp).flags &= !UV_HANDLE_ACTIVE;
}
}
}
pub unsafe fn uv_read_start(
stream: *mut uv_stream_t,
alloc_cb: Option<uv_alloc_cb>,
read_cb: Option<uv_read_cb>,
) -> c_int {
unsafe {
if (*stream).r#type == uv_handle_type::UV_TTY {
return super::tty::read_start_tty(
stream as *mut uv_tty_t,
alloc_cb,
read_cb,
);
}
if (*stream).r#type == uv_handle_type::UV_NAMED_PIPE {
return super::pipe::read_start_pipe(
stream as *mut super::pipe::uv_pipe_t,
alloc_cb,
read_cb,
);
}
if alloc_cb.is_none() || read_cb.is_none() {
return UV_EINVAL;
}
if (*stream).flags & UV_HANDLE_CLOSING != 0 {
return UV_EINVAL;
}
let tcp = stream as *mut uv_tcp_t;
let tcp_ref = &mut *tcp;
tcp_ref.internal_alloc_cb = alloc_cb;
tcp_ref.internal_read_cb = read_cb;
tcp_ref.internal_reading = true;
tcp_ref.flags |= UV_HANDLE_ACTIVE;
let inner = get_inner(tcp_ref.loop_);
let mut handles = inner.tcp_handles.borrow_mut();
if !handles.iter().any(|&h| std::ptr::eq(h, tcp)) {
handles.push(tcp);
}
}
0
}
pub unsafe fn uv_read_stop(stream: *mut uv_stream_t) -> c_int {
unsafe {
if (*stream).r#type == uv_handle_type::UV_TTY {
return super::tty::read_stop_tty(stream as *mut uv_tty_t);
}
if (*stream).r#type == uv_handle_type::UV_NAMED_PIPE {
return super::pipe::read_stop_pipe(
stream as *mut super::pipe::uv_pipe_t,
);
}
let tcp = stream as *mut uv_tcp_t;
let tcp_ref = &mut *tcp;
tcp_ref.internal_reading = false;
tcp_ref.internal_alloc_cb = None;
tcp_ref.internal_read_cb = None;
maybe_clear_tcp_active(tcp);
}
0
}
#[cfg(unix)]
pub unsafe fn uv_stream_set_blocking(
stream: *mut uv_stream_t,
blocking: c_int,
) -> c_int {
unsafe {
let fd = if (*stream).r#type == uv_handle_type::UV_TTY {
(*(stream as *mut uv_tty_t)).internal_fd
} else {
match (*(stream as *mut uv_tcp_t)).internal_fd {
Some(fd) => fd,
None => return super::UV_EBADF,
}
};
let flags = libc::fcntl(fd, libc::F_GETFL);
if flags == -1 {
return -(std::io::Error::last_os_error()
.raw_os_error()
.unwrap_or(libc::EINVAL));
}
let new_flags = if blocking != 0 {
flags & !libc::O_NONBLOCK
} else {
flags | libc::O_NONBLOCK
};
if new_flags != flags && libc::fcntl(fd, libc::F_SETFL, new_flags) == -1 {
return -(std::io::Error::last_os_error()
.raw_os_error()
.unwrap_or(libc::EINVAL));
}
0
}
}
#[cfg(windows)]
pub unsafe fn uv_stream_set_blocking(
_stream: *mut uv_stream_t,
_blocking: c_int,
) -> c_int {
0
}
pub unsafe fn uv_try_write(handle: *mut uv_stream_t, data: &[u8]) -> i32 {
unsafe {
if (*handle).r#type == uv_handle_type::UV_TTY {
return super::tty::try_write_tty(handle, data);
}
}
let tcp_ref = unsafe { &mut *(handle as *mut uv_tcp_t) };
if tcp_ref.internal_connect.is_some()
|| !tcp_ref.internal_write_queue.is_empty()
{
return UV_EAGAIN;
}
let stream = match tcp_ref.internal_stream.as_ref() {
Some(s) => s,
None => return UV_EBADF,
};
match stream.try_write(data) {
Ok(n) => n as i32,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => UV_EAGAIN,
Err(_) => UV_EPIPE,
}
}
pub unsafe fn uv_write(
req: *mut uv_write_t,
handle: *mut uv_stream_t,
bufs: *const uv_buf_t,
nbufs: u32,
cb: Option<uv_write_cb>,
) -> c_int {
unsafe {
if (*handle).r#type == uv_handle_type::UV_TTY {
return super::tty::write_tty(req, handle, bufs, nbufs, cb);
}
if (*handle).r#type == uv_handle_type::UV_NAMED_PIPE {
return write_pipe(
req,
handle as *mut super::pipe::uv_pipe_t,
bufs,
nbufs,
cb,
);
}
let tcp = handle as *mut uv_tcp_t;
(*req).handle = handle;
if (*tcp).internal_stream.is_none() {
return UV_EBADF;
}
let write_data = collect_bufs(bufs, nbufs);
let mut offset = 0;
if (*tcp).internal_write_queue.is_empty()
&& let Some(ref stream) = (*tcp).internal_stream
{
while offset < write_data.len() {
match stream.try_write(&write_data[offset..]) {
Ok(n) => offset += n,
Err(_) => break,
}
}
}
(*tcp).internal_write_queue.push_back(WritePending {
req,
data: write_data,
offset,
cb,
status: None,
});
0
}
}
unsafe fn collect_bufs(bufs: *const uv_buf_t, nbufs: u32) -> Vec<u8> {
unsafe {
let mut total = 0usize;
for i in 0..nbufs as usize {
let buf = &*bufs.add(i);
if !buf.base.is_null() {
total += buf.len;
}
}
let mut data = Vec::with_capacity(total);
for i in 0..nbufs as usize {
let buf = &*bufs.add(i);
if !buf.base.is_null() && buf.len > 0 {
data.extend_from_slice(std::slice::from_raw_parts(
buf.base as *const u8,
buf.len,
));
}
}
data
}
}
pub unsafe fn uv_shutdown(
req: *mut uv_shutdown_t,
stream: *mut uv_stream_t,
cb: Option<uv_shutdown_cb>,
) -> c_int {
unsafe {
if (*stream).r#type == uv_handle_type::UV_TTY {
return super::tty::shutdown_tty(req, stream, cb);
}
if (*stream).flags & UV_HANDLE_CLOSING != 0 {
return UV_ENOTCONN;
}
(*req).handle = stream;
if (*stream).r#type == uv_handle_type::UV_NAMED_PIPE {
let pipe = stream as *mut super::pipe::uv_pipe_t;
#[cfg(unix)]
if (*pipe).internal_stream.is_none() && (*pipe).internal_fd.is_none() {
return UV_ENOTCONN;
}
(*pipe).internal_shutdown = Some(super::tcp::ShutdownPending { req, cb });
let inner = get_inner((*pipe).loop_);
let mut handles = inner.pipe_handles.borrow_mut();
if !handles.iter().any(|&h| std::ptr::eq(h, pipe)) {
handles.push(pipe);
}
(*pipe).flags |= UV_HANDLE_ACTIVE;
return 0;
}
let tcp = stream as *mut uv_tcp_t;
if (*tcp).internal_stream.is_none() {
return UV_ENOTCONN;
}
if (*tcp).internal_shutdown.is_some() {
return UV_EALREADY;
}
(*tcp).internal_shutdown = Some(super::tcp::ShutdownPending { req, cb });
let inner = get_inner((*tcp).loop_);
let mut handles = inner.tcp_handles.borrow_mut();
if !handles.iter().any(|&h| std::ptr::eq(h, tcp)) {
handles.push(tcp);
}
(*tcp).flags |= UV_HANDLE_ACTIVE;
}
0
}
pub(crate) unsafe fn complete_shutdown(
tcp: *mut uv_tcp_t,
cx: &mut std::task::Context<'_>,
) {
use std::pin::Pin;
use tokio::io::AsyncWrite;
let pending = unsafe { (*tcp).internal_shutdown.take() };
let Some(pending) = pending else { return };
let status =
if let Some(ref mut stream) = unsafe { &mut *tcp }.internal_stream {
match Pin::new(stream).poll_shutdown(cx) {
std::task::Poll::Ready(Ok(())) => 0,
std::task::Poll::Ready(Err(_)) => UV_ENOTCONN,
std::task::Poll::Pending => {
unsafe { (*tcp).internal_shutdown = Some(pending) };
return;
}
}
} else {
UV_ENOTCONN
};
if let Some(cb) = pending.cb {
unsafe { cb(pending.req, status) };
}
unsafe {
maybe_clear_tcp_active(tcp);
}
}
pub fn new_write() -> UvWrite {
uv_write_t {
r#type: 0,
data: std::ptr::null_mut(),
handle: std::ptr::null_mut(),
}
}
pub fn new_connect() -> UvConnect {
uv_connect_t {
r#type: 0,
data: std::ptr::null_mut(),
handle: std::ptr::null_mut(),
}
}
pub fn new_shutdown() -> UvShutdown {
uv_shutdown_t {
r#type: 0,
data: std::ptr::null_mut(),
handle: std::ptr::null_mut(),
}
}
unsafe fn write_pipe(
req: *mut uv_write_t,
pipe: *mut super::pipe::uv_pipe_t,
bufs: *const uv_buf_t,
nbufs: u32,
cb: Option<uv_write_cb>,
) -> c_int {
use super::tcp::WritePending;
unsafe {
(*req).handle = pipe as *mut uv_stream_t;
let write_data = collect_bufs(bufs, nbufs);
let mut offset = 0;
#[cfg(unix)]
if (*pipe).internal_write_queue.is_empty() {
if let Some(ref stream) = (*pipe).internal_stream {
while offset < write_data.len() {
match stream.try_write(&write_data[offset..]) {
Ok(n) => {
offset += n;
}
Err(ref _e) => {
break;
}
}
}
} else if let Some(fd) = (*pipe).internal_fd {
while offset < write_data.len() {
let n = libc::write(
fd,
write_data[offset..].as_ptr() as *const std::ffi::c_void,
write_data.len() - offset,
);
if n >= 0 {
offset += n as usize;
} else {
break;
}
}
}
}
#[cfg(windows)]
if (*pipe).internal_write_queue.is_empty()
&& let Some(handle) = (*pipe).internal_handle
{
use std::io::Write;
use std::os::windows::io::FromRawHandle;
let mut file = std::fs::File::from_raw_handle(handle);
while offset < write_data.len() {
match file.write(&write_data[offset..]) {
Ok(n) => offset += n,
Err(_) => break,
}
}
let _ = std::os::windows::io::IntoRawHandle::into_raw_handle(file);
}
let status = if offset >= write_data.len() {
Some(0) } else {
None };
(*pipe).internal_write_queue.push_back(WritePending {
req,
data: write_data,
offset,
cb,
status,
});
#[cfg(unix)]
if (*pipe).internal_async_fd.is_none()
&& (*pipe).internal_stream.is_none()
&& (*pipe).internal_connect.is_none()
&& (*pipe).internal_fd.is_some()
{
let fd = (*pipe).internal_fd.unwrap();
if let Ok(afd) =
tokio::io::unix::AsyncFd::new(super::pipe::RawFdWrapper(fd))
{
(*pipe).internal_async_fd = Some(afd);
}
}
let inner = get_inner((*pipe).loop_);
if let Ok(mut handles) = inner.pipe_handles.try_borrow_mut()
&& !handles.iter().any(|&h| std::ptr::eq(h, pipe))
{
handles.push(pipe);
}
(*pipe).flags |= UV_HANDLE_ACTIVE;
}
0
}