1use core::time::Duration;
2
3use rusl::error::Errno;
4use rusl::network::get_inet_sock_name;
5use rusl::platform::{
6 AddressFamily, NonNegativeI32, PollEvents, SocketAddressInet, SocketAddressUnix, SocketFlags,
7 SocketOptions, SocketType,
8};
9use rusl::string::unix_str::UnixStr;
10
11use crate::error::Result;
12use crate::io::{Read, Write};
13use crate::sock::{
14 blocking_read_nonblock_sock, blocking_write_nonblock_sock, sock_nonblock_op_poll_if_not_ready,
15};
16use crate::unix::fd::{AsRawFd, OwnedFd, RawFd};
17
18#[cfg(test)]
19mod test;
20
21#[derive(Debug)]
22pub struct UnixStream(OwnedFd);
23
24impl UnixStream {
25 #[inline]
30 pub fn connect(path: &UnixStr) -> Result<Self> {
31 Self::do_connect(path, None)
32 }
33
34 fn do_connect(path: &UnixStr, timeout: Option<Duration>) -> Result<Self> {
35 let fd = rusl::network::socket(
36 AddressFamily::AF_UNIX,
37 SocketOptions::new(
38 SocketType::SOCK_STREAM,
39 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
40 ),
41 0,
42 )?;
43 let addr = SocketAddressUnix::try_from_unix(path)?;
44 if let Err(e) = sock_nonblock_op_poll_if_not_ready(
45 fd,
46 Errno::EAGAIN,
47 PollEvents::POLLOUT,
48 timeout,
49 |sock| rusl::network::connect_unix(sock, &addr),
50 ) {
51 let _ = rusl::unistd::close(fd);
52 return Err(e);
53 }
54 Ok(Self(OwnedFd(fd)))
55 }
56
57 pub fn try_connect(path: &UnixStr) -> Result<Option<Self>> {
62 let fd = rusl::network::socket(
63 AddressFamily::AF_UNIX,
64 SocketOptions::new(
65 SocketType::SOCK_STREAM,
66 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
67 ),
68 0,
69 )?;
70 let addr = SocketAddressUnix::try_from_unix(path)?;
71 match rusl::network::connect_unix(fd, &addr) {
72 Ok(()) => {}
73 Err(e) if e.code == Some(Errno::EAGAIN) => {
74 let _ = rusl::unistd::close(fd);
75 return Ok(None);
76 }
77 Err(e) => {
78 let _ = rusl::unistd::close(fd);
79 return Err(e.into());
80 }
81 }
82 Ok(Some(Self(OwnedFd(fd))))
83 }
84}
85
86impl AsRawFd for UnixStream {
87 fn as_raw_fd(&self) -> RawFd {
88 self.0 .0
89 }
90}
91
92impl Read for UnixStream {
93 #[inline]
94 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
95 blocking_read_nonblock_sock(self.0 .0, buf, None)
96 }
97}
98
99impl Write for UnixStream {
100 #[inline]
101 fn write(&mut self, buf: &[u8]) -> Result<usize> {
102 blocking_write_nonblock_sock(self.0 .0, buf, None)
103 }
104
105 #[inline]
106 fn flush(&mut self) -> Result<()> {
107 Ok(())
108 }
109}
110
111pub struct UnixListener(OwnedFd);
112
113impl UnixListener {
114 pub fn bind(path: &UnixStr) -> Result<Self> {
119 let fd = rusl::network::socket(
120 AddressFamily::AF_UNIX,
121 SocketOptions::new(
122 SocketType::SOCK_STREAM,
123 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
124 ),
125 0,
126 )?;
127 let addr = SocketAddressUnix::try_from_unix(path)?;
128 if let Err(e) = rusl::network::bind_unix(fd, &addr) {
129 let _ = rusl::unistd::close(fd);
130 return Err(e.into());
131 }
132 if let Err(e) = rusl::network::listen(fd, NonNegativeI32::MAX) {
133 let _ = rusl::unistd::close(fd);
134 return Err(e.into());
135 }
136 rusl::network::listen(fd, NonNegativeI32::MAX)?;
137 Ok(Self(OwnedFd(fd)))
138 }
139
140 #[inline]
144 pub fn accept(&mut self) -> Result<UnixStream> {
145 self.do_accept(None)
146 }
147
148 #[inline]
152 pub fn accept_with_timeout(&mut self, timeout: Duration) -> Result<UnixStream> {
153 self.do_accept(Some(timeout))
154 }
155
156 #[inline]
161 pub fn try_accept(&mut self) -> Result<Option<UnixStream>> {
162 let fd = match rusl::network::accept_unix(
163 self.0 .0,
164 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
165 ) {
166 Ok((fd, _addr)) => fd,
167 Err(e) if e.code == Some(Errno::EAGAIN) => return Ok(None),
168 Err(e) => return Err(e.into()),
169 };
170 Ok(Some(UnixStream(OwnedFd(fd))))
171 }
172
173 fn do_accept(&mut self, timeout: Option<Duration>) -> Result<UnixStream> {
174 let (fd, _addr) = sock_nonblock_op_poll_if_not_ready(
175 self.0 .0,
176 Errno::EAGAIN,
177 PollEvents::POLLIN,
178 timeout,
179 |sock| {
180 rusl::network::accept_unix(
181 sock,
182 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
183 )
184 },
185 )?;
186
187 Ok(UnixStream(OwnedFd(fd)))
188 }
189}
190
191#[derive(Debug, Clone, Copy)]
192pub struct SocketAddress {
193 ip: Ip,
194 port: u16,
195}
196
197impl SocketAddress {
198 #[must_use]
199 pub fn new(ip: Ip, port: u16) -> Self {
200 Self { ip, port }
201 }
202}
203
/// The IP part of a [`SocketAddress`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum Ip {
    /// An IPv4 address given as its four bytes.
    V4([u8; 4]),
}
210
/// A TCP stream socket, wrapping the [`OwnedFd`] of the underlying socket.
#[derive(Debug)]
pub struct TcpStream(OwnedFd);
213
/// Outcome of a single, non-waiting TCP connection attempt.
#[derive(Debug)]
pub enum TcpTryConnect {
    /// The connect call succeeded; the stream is ready to use.
    Connected(TcpStream),
    /// The connect call reported `EINPROGRESS`; the attempt can be continued through the
    /// contained [`TcpStreamInProgress`].
    InProgress(TcpStreamInProgress),
}
219
220#[derive(Debug)]
221pub struct TcpStreamInProgress(OwnedFd, SocketAddressInet);
222
223impl TcpStreamInProgress {
224 pub fn try_connect(self) -> Result<TcpTryConnect> {
228 match rusl::network::connect_inet(self.0 .0, &self.1) {
229 Ok(()) => {}
230 Err(e) if matches!(e.code, Some(Errno::EINPROGRESS)) => {
231 return Ok(TcpTryConnect::InProgress(self));
232 }
233 Err(e) => {
234 return Err(e.into());
235 }
236 }
237 let Self(o, _s) = self;
238 Ok(TcpTryConnect::Connected(TcpStream(o)))
239 }
240
241 pub fn connect_blocking(self) -> Result<TcpStream> {
245 sock_nonblock_op_poll_if_not_ready(
246 self.0 .0,
247 Errno::EINPROGRESS,
248 PollEvents::POLLOUT,
249 None,
250 |sock| rusl::network::connect_inet(sock, &self.1),
251 )?;
252 let Self(o, _addr) = self;
253 Ok(TcpStream(o))
254 }
255}
256
257impl TcpStream {
258 pub fn connect(addr: &SocketAddress) -> Result<Self> {
263 Self::do_connect(addr, None)
264 }
265
266 pub fn connect_with_timeout(addr: &SocketAddress, timeout: Duration) -> Result<Self> {
271 Self::do_connect(addr, Some(timeout))
272 }
273
274 pub fn try_connect(addr: &SocketAddress) -> Result<TcpTryConnect> {
279 let fd = rusl::network::socket(
280 AddressFamily::AF_INET,
281 SocketOptions::new(
282 SocketType::SOCK_STREAM,
283 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
284 ),
285 6,
286 )?;
287 let addr = match addr.ip {
288 Ip::V4(bytes) => SocketAddressInet::new(bytes, addr.port),
289 };
290 match rusl::network::connect_inet(fd, &addr) {
291 Ok(()) => {}
292 Err(e) if matches!(e.code, Some(Errno::EINPROGRESS)) => {
293 return Ok(TcpTryConnect::InProgress(TcpStreamInProgress(
294 OwnedFd(fd),
295 addr,
296 )));
297 }
298 Err(e) => {
299 let _ = rusl::unistd::close(fd);
300 return Err(e.into());
301 }
302 }
303 Ok(TcpTryConnect::Connected(Self(OwnedFd(fd))))
304 }
305
306 #[expect(clippy::trivially_copy_pass_by_ref)]
307 fn do_connect(addr: &SocketAddress, timeout: Option<Duration>) -> Result<Self> {
308 let fd = rusl::network::socket(
309 AddressFamily::AF_INET,
310 SocketOptions::new(
311 SocketType::SOCK_STREAM,
312 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
313 ),
314 6,
315 )?;
316 let addr = match addr.ip {
317 Ip::V4(bytes) => SocketAddressInet::new(bytes, addr.port),
318 };
319 if let Err(e) = sock_nonblock_op_poll_if_not_ready(
320 fd,
321 Errno::EINPROGRESS,
322 PollEvents::POLLOUT,
323 timeout,
324 |sock| rusl::network::connect_inet(sock, &addr),
325 ) {
326 let _ = rusl::unistd::close(fd);
327 return Err(e);
328 }
329 Ok(Self(OwnedFd(fd)))
330 }
331
332 #[inline]
336 pub fn read_with_timeout(&mut self, buf: &mut [u8], timeout: Duration) -> Result<usize> {
337 blocking_read_nonblock_sock(self.0 .0, buf, Some(timeout))
338 }
339}
340
341impl AsRawFd for TcpStream {
342 #[inline]
343 fn as_raw_fd(&self) -> RawFd {
344 self.0 .0
345 }
346}
347
348impl Read for TcpStream {
349 #[inline]
350 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
351 blocking_read_nonblock_sock(self.0 .0, buf, None)
352 }
353}
354
355impl Write for TcpStream {
356 #[inline]
357 fn write(&mut self, buf: &[u8]) -> Result<usize> {
358 blocking_write_nonblock_sock(self.0 .0, buf, None)
359 }
360
361 #[inline]
362 fn flush(&mut self) -> Result<()> {
363 Ok(())
364 }
365}
366
367#[derive(Debug)]
368pub struct TcpListener(OwnedFd);
369
370impl TcpListener {
371 pub fn bind(addr: &SocketAddress) -> Result<Self> {
375 let fd = rusl::network::socket(
376 AddressFamily::AF_INET,
377 SocketOptions::new(
378 SocketType::SOCK_STREAM,
379 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
380 ),
381 6,
382 )?;
383 let addr = match addr.ip {
384 Ip::V4(bytes) => SocketAddressInet::new(bytes, addr.port),
385 };
386 if let Err(e) = rusl::network::bind_inet(fd, &addr) {
387 let _ = rusl::unistd::close(fd);
388 return Err(e.into());
389 }
390 rusl::network::listen(fd, NonNegativeI32::MAX)?;
391 Ok(Self(OwnedFd(fd)))
392 }
393 pub fn local_addr(&self) -> Result<SocketAddress> {
397 let name = get_inet_sock_name(self.0 .0)?;
398 let (ip, port) = name.ipv4_addr();
399 Ok(SocketAddress::new(Ip::V4(ip), port))
400 }
401
402 pub fn accept(&mut self) -> Result<TcpStream> {
406 self.do_accept(None)
407 }
408
409 pub fn accept_with_timeout(&mut self, timeout: Duration) -> Result<TcpStream> {
413 self.do_accept(Some(timeout))
414 }
415
416 pub fn try_accept(&mut self) -> Result<Option<TcpStream>> {
421 let fd = match rusl::network::accept_inet(
422 self.0 .0,
423 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
424 ) {
425 Ok((fd, _addr)) => fd,
426 Err(e) if e.code == Some(Errno::EAGAIN) => return Ok(None),
427 Err(e) => return Err(e.into()),
428 };
429 Ok(Some(TcpStream(OwnedFd(fd))))
430 }
431
432 fn do_accept(&self, timeout: Option<Duration>) -> Result<TcpStream> {
433 let (fd, _addr) = sock_nonblock_op_poll_if_not_ready(
434 self.0 .0,
435 Errno::EAGAIN,
436 PollEvents::POLLIN,
437 timeout,
438 |sock| {
439 rusl::network::accept_inet(
440 sock,
441 SocketFlags::SOCK_NONBLOCK | SocketFlags::SOCK_CLOEXEC,
442 )
443 },
444 )?;
445
446 Ok(TcpStream(OwnedFd(fd)))
447 }
448}