1use super::*;
2use crate::snapshots::{
3 common::{types as wasi_types, vfs},
4 env::Errno,
5};
6use socket2::{SockAddr, Socket};
7use std::{
8 ops::DerefMut,
9 os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
10 sync::atomic::{AtomicBool, AtomicI8, AtomicU8},
11};
12use tokio::io::{
13 unix::{AsyncFd, AsyncFdReadyGuard, TryIoError},
14 AsyncReadExt, AsyncWriteExt, Interest,
15};
16
/// The OS socket behind an [`AsyncWasiSocket`], in one of two states.
///
/// `register` moves a value from `PreOpen` to `AsyncFd`; the methods on this
/// type accept or reject calls depending on the current variant.
#[derive(Debug)]
pub(crate) enum AsyncWasiSocketInner {
    /// A plain `socket2::Socket` that has not been wrapped in an `AsyncFd` yet.
    PreOpen(Socket),
    /// A socket wrapped in a tokio `AsyncFd`, usable for readiness-based I/O.
    AsyncFd(AsyncFd<Socket>),
}
22
23impl AsyncWasiSocketInner {
24 fn register(&mut self) -> io::Result<()> {
25 unsafe {
26 let inner = match self {
27 AsyncWasiSocketInner::PreOpen(s) => {
28 let mut inner_socket = std::mem::zeroed();
29 std::mem::swap(s, &mut inner_socket);
30 inner_socket
31 }
32 AsyncWasiSocketInner::AsyncFd(_) => return Ok(()),
33 };
34 let mut new_self = Self::AsyncFd(AsyncFd::new(inner)?);
35 std::mem::swap(self, &mut new_self);
36 std::mem::forget(new_self);
37 Ok(())
38 }
39 }
40
41 fn bind(&mut self, addr: &SockAddr) -> io::Result<()> {
42 match self {
43 AsyncWasiSocketInner::PreOpen(s) => {
44 s.set_reuse_address(true)?;
45 s.bind(addr)
46 }
47 AsyncWasiSocketInner::AsyncFd(_) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
48 }
49 }
50
51 fn bind_device(&mut self, interface: Option<&[u8]>) -> io::Result<()> {
52 match self {
53 AsyncWasiSocketInner::PreOpen(s) => s.bind_device(interface),
54 AsyncWasiSocketInner::AsyncFd(s) => s.get_ref().bind_device(interface),
55 }
56 }
57
58 fn device(&self) -> io::Result<Option<Vec<u8>>> {
59 match self {
60 AsyncWasiSocketInner::PreOpen(s) => s.device(),
61 AsyncWasiSocketInner::AsyncFd(s) => s.get_ref().device(),
62 }
63 }
64
65 fn listen(&mut self, backlog: i32) -> io::Result<()> {
66 match self {
67 AsyncWasiSocketInner::PreOpen(s) => {
68 s.listen(backlog)?;
69 }
70 AsyncWasiSocketInner::AsyncFd(_) => {
71 return Err(io::Error::from_raw_os_error(libc::EINVAL))
72 }
73 }
74 self.register()
75 }
76
77 async fn accept(&mut self) -> io::Result<(Socket, SockAddr)> {
78 match self {
79 AsyncWasiSocketInner::PreOpen(s) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
80 AsyncWasiSocketInner::AsyncFd(s) => {
81 match tokio::time::timeout(
82 std::time::Duration::from_millis(100),
83 s.async_io(Interest::READABLE, |s| s.accept()),
84 )
85 .await
86 {
87 Ok(r) => r,
88 Err(_) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
89 }
90 }
91 }
92 }
93
94 fn connect(&mut self, addr: &SockAddr) -> io::Result<()> {
95 let r = match self {
96 AsyncWasiSocketInner::PreOpen(s) => s.connect(addr),
97 AsyncWasiSocketInner::AsyncFd(_) => {
98 return Err(io::Error::from_raw_os_error(libc::EINVAL))
99 }
100 };
101
102 if let Err(e) = r {
103 let errno = Errno::from(&e);
104 if errno != Errno::__WASI_ERRNO_INPROGRESS {
105 Err(e)
106 } else {
107 self.register()?;
108 Err(io::Error::from_raw_os_error(libc::EINPROGRESS))
109 }
110 } else {
111 self.register()?;
112 Ok(())
113 }
114 }
115
116 fn get_ref(&self) -> io::Result<&Socket> {
117 match self {
118 AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
119 AsyncWasiSocketInner::AsyncFd(s) => Ok(s.get_ref()),
120 }
121 }
122
123 fn get_async_socket(&self) -> io::Result<&AsyncFd<Socket>> {
124 match self {
125 AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
126 AsyncWasiSocketInner::AsyncFd(s) => Ok(s),
127 }
128 }
129
130 fn mut_async_socket(&mut self) -> io::Result<&mut AsyncFd<Socket>> {
131 match self {
132 AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
133 AsyncWasiSocketInner::AsyncFd(s) => Ok(s),
134 }
135 }
136
137 pub(crate) async fn readable(&self) -> io::Result<AsyncFdReadyGuard<Socket>> {
138 match self {
139 AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
140 AsyncWasiSocketInner::AsyncFd(s) => Ok(s.readable().await?),
141 }
142 }
143
144 pub(crate) async fn writable(&self) -> io::Result<AsyncFdReadyGuard<Socket>> {
145 match self {
146 AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
147 AsyncWasiSocketInner::AsyncFd(s) => Ok(s.writable().await?),
148 }
149 }
150}
151
152#[derive(Debug)]
153pub(crate) struct SocketWritable(pub(crate) AtomicI8);
154impl SocketWritable {
155 pub(crate) async fn writable(&self) {
156 let b = self.0.fetch_sub(1, std::sync::atomic::Ordering::Acquire);
157 tokio::time::timeout(Duration::from_secs(10), SocketWritableFuture(b)).await;
158 }
159
160 pub(crate) fn set_writable(&self) {
161 self.0.store(5, std::sync::atomic::Ordering::Release)
162 }
163}
164impl Default for SocketWritable {
165 fn default() -> Self {
166 Self(AtomicI8::new(5))
167 }
168}
169
170#[derive(Debug, Clone, Copy)]
171pub(crate) struct SocketWritableFuture(i8);
172
173impl Future for SocketWritableFuture {
174 type Output = ();
175
176 fn poll(
177 self: std::pin::Pin<&mut Self>,
178 cx: &mut std::task::Context<'_>,
179 ) -> std::task::Poll<Self::Output> {
180 log::trace!("SocketWritableFuture self.0={}", self.0);
181 if self.0 >= 0 {
182 std::task::Poll::Ready(())
183 } else {
184 std::task::Poll::Pending
185 }
186 }
187}
188
189#[derive(Debug)]
190pub struct AsyncWasiSocket {
191 pub(crate) inner: AsyncWasiSocketInner,
192 pub state: Box<WasiSocketState>,
193 pub(crate) writable: SocketWritable,
194}
195
196impl AsyncWasiSocket {
197 pub(crate) async fn readable(&self) -> std::io::Result<()> {
198 self.inner.readable().await.map(|x| ())
199 }
200
201 pub(crate) async fn writable(&self) -> std::io::Result<()> {
202 self.writable.writable().await;
203 self.inner.writable().await?;
204 Ok(())
205 }
206}
207
208#[inline]
209fn handle_timeout_result<T>(
210 result: Result<io::Result<T>, tokio::time::error::Elapsed>,
211) -> io::Result<T> {
212 if let Ok(r) = result {
213 r
214 } else {
215 Err(io::Error::from_raw_os_error(libc::EWOULDBLOCK))
216 }
217}
218
219impl AsyncWasiSocket {
220 pub fn fd_fdstat_get(&self) -> Result<FdStat, Errno> {
221 let mut filetype = match self.state.sock_type.1 {
222 SocketType::Datagram => FileType::SOCKET_DGRAM,
223 SocketType::Stream => FileType::SOCKET_STREAM,
224 };
225 let flags = if self.state.nonblocking {
226 FdFlags::NONBLOCK
227 } else {
228 FdFlags::empty()
229 };
230
231 Ok(FdStat {
232 filetype,
233 fs_rights_base: self.state.fs_rights.clone(),
234 fs_rights_inheriting: WASIRights::empty(),
235 flags,
236 })
237 }
238}
239
240impl AsyncWasiSocket {
241 pub fn from_tcplistener(
242 listener: std::net::TcpListener,
243 state: WasiSocketState,
244 ) -> io::Result<Self> {
245 let socket = Socket::from(listener);
246 socket.set_nonblocking(true)?;
247 Ok(Self {
248 inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
249 state: Box::new(state),
250 writable: Default::default(),
251 })
252 }
253
254 pub fn from_udpsocket(socket: std::net::UdpSocket, state: WasiSocketState) -> io::Result<Self> {
255 let socket = Socket::from(socket);
256 socket.set_nonblocking(true)?;
257 Ok(Self {
258 inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
259 state: Box::new(state),
260 writable: Default::default(),
261 })
262 }
263}
264
265impl AsyncWasiSocket {
266 pub fn open(mut state: WasiSocketState) -> io::Result<Self> {
267 use socket2::{Domain, Protocol, Type};
268 match state.sock_type.1 {
269 SocketType::Stream => {
270 state.fs_rights = WASIRights::SOCK_BIND
271 | WASIRights::SOCK_CLOSE
272 | WASIRights::SOCK_RECV
273 | WASIRights::SOCK_SEND
274 | WASIRights::SOCK_SHUTDOWN
275 | WASIRights::POLL_FD_READWRITE;
276 }
277 SocketType::Datagram => {
278 state.fs_rights = WASIRights::SOCK_BIND
279 | WASIRights::SOCK_CLOSE
280 | WASIRights::SOCK_RECV_FROM
281 | WASIRights::SOCK_SEND_TO
282 | WASIRights::SOCK_SHUTDOWN
283 | WASIRights::POLL_FD_READWRITE;
284 }
285 }
286 let inner = match state.sock_type {
287 (AddressFamily::Inet4, SocketType::Datagram) => {
288 Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?
289 }
290 (AddressFamily::Inet4, SocketType::Stream) => {
291 Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?
292 }
293 (AddressFamily::Inet6, SocketType::Datagram) => {
294 Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?
295 }
296 (AddressFamily::Inet6, SocketType::Stream) => {
297 Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?
298 }
299 };
300 inner.set_nonblocking(true)?;
301 if !state.bind_device.is_empty() {
302 inner.bind_device(Some(&state.bind_device))?;
303 }
304 Ok(AsyncWasiSocket {
305 inner: AsyncWasiSocketInner::PreOpen(inner),
306 state: Box::new(state),
307 writable: Default::default(),
308 })
309 }
310
311 pub fn bind(&mut self, addr: net::SocketAddr) -> io::Result<()> {
312 use socket2::SockAddr;
313 let sock_addr = SockAddr::from(addr);
314 self.inner.bind(&sock_addr)?;
315 if let SocketType::Datagram = self.state.sock_type.1 {
316 self.inner.register()?;
317 }
318 self.state.local_addr = Some(addr);
319 Ok(())
320 }
321
322 pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
323 if self.state.bind_device.is_empty() {
324 self.inner.device()
325 } else {
326 Ok(Some(self.state.bind_device.clone()))
327 }
328 }
329
330 pub fn bind_device(&mut self, interface: Option<&[u8]>) -> io::Result<()> {
331 self.inner.bind_device(interface)?;
332 self.state.bind_device = match interface {
333 Some(interface) => interface.to_vec(),
334 None => vec![],
335 };
336 Ok(())
337 }
338
339 pub fn listen(&mut self, backlog: u32) -> io::Result<()> {
340 self.inner.listen(backlog as i32)?;
341 self.state.backlog = backlog;
342 self.state.so_conn_state = ConnectState::Listening;
343 Ok(())
344 }
345
346 pub async fn accept(&mut self) -> io::Result<Self> {
347 let mut new_state = WasiSocketState {
348 nonblocking: self.state.nonblocking,
349 so_conn_state: ConnectState::Connected,
350 ..Default::default()
351 };
352
353 log::trace!("accept nonblocking={}", self.state.nonblocking);
354
355 let (cs, _) = if self.state.nonblocking {
356 let s = self
357 .inner
358 .get_async_socket()?
359 .async_io(Interest::READABLE, |s| s.accept());
360 tokio::time::timeout(std::time::Duration::from_millis(50), s)
361 .await
362 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))?
363 } else {
364 self.inner
365 .get_async_socket()?
366 .async_io(Interest::READABLE, |s| s.accept())
367 .await
368 }?;
369
370 cs.set_nonblocking(true)?;
371 new_state.peer_addr = cs.peer_addr().ok().and_then(|addr| addr.as_socket());
372 new_state.local_addr = cs.local_addr().ok().and_then(|addr| addr.as_socket());
373
374 Ok(AsyncWasiSocket {
375 inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(cs)?),
376 state: Box::new(new_state),
377 writable: Default::default(),
378 })
379 }
380
381 pub async fn connect(&mut self, addr: net::SocketAddr) -> io::Result<()> {
382 let address = SockAddr::from(addr);
383 self.state.so_conn_state = ConnectState::Connected;
384 self.state.peer_addr = Some(addr);
385
386 match (self.state.nonblocking, self.state.so_send_timeout) {
387 (true, None) => {
388 let r = self.inner.connect(&address);
389 if r.is_err() {
390 self.state.so_conn_state = ConnectState::Connecting;
391 }
392 r?;
393 Ok(())
394 }
395 (false, None) => {
396 if let Err(e) = self.inner.connect(&address) {
397 match e.raw_os_error() {
398 Some(libc::EINPROGRESS) => {}
399 _ => return Err(e),
400 }
401 let s = self.inner.writable().await?;
402 let e = s.get_inner().take_error()?;
403 if let Some(e) = e {
404 return Err(e);
405 }
406 }
407 Ok(())
408 }
409 (_, Some(timeout)) => {
410 if let Err(e) = self.inner.connect(&address) {
411 match e.raw_os_error() {
412 Some(libc::EINPROGRESS) => {}
413 _ => return Err(e),
414 }
415 match tokio::time::timeout(timeout, self.inner.writable()).await {
416 Ok(r) => {
417 let s = r?;
418 let e = s.get_inner().take_error()?;
419 if let Some(e) = e {
420 return Err(e);
421 }
422 Ok(())
423 }
424 Err(e) => Err(io::Error::from_raw_os_error(libc::EWOULDBLOCK)),
425 }
426 } else {
427 Ok(())
428 }
429 }
430 }
431 }
432
433 pub async fn recv<'a>(
434 &self,
435 bufs: &mut [io::IoSliceMut<'a>],
436 flags: libc::c_int,
437 ) -> io::Result<(usize, bool)> {
438 use socket2::MaybeUninitSlice;
439
440 let (n, f) = match (self.state.nonblocking, self.state.so_recv_timeout) {
441 (true, None) => {
442 let f = self
443 .inner
444 .get_async_socket()?
445 .async_io(Interest::READABLE, |s| {
446 let bufs = unsafe {
447 &mut *(bufs as *mut [io::IoSliceMut<'_>]
448 as *mut [MaybeUninitSlice<'_>])
449 };
450 s.recv_vectored_with_flags(bufs, flags)
451 });
452
453 tokio::time::timeout(std::time::Duration::from_millis(50), f)
454 .await
455 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
456 }
457 (false, None) => {
458 self.inner
459 .get_async_socket()?
460 .async_io(Interest::READABLE, |s| {
461 let bufs = unsafe {
462 &mut *(bufs as *mut [io::IoSliceMut<'_>]
463 as *mut [MaybeUninitSlice<'_>])
464 };
465 s.recv_vectored_with_flags(bufs, flags)
466 })
467 .await?
468 }
469 (_, Some(timeout)) => {
470 let f = self
471 .inner
472 .get_async_socket()?
473 .async_io(Interest::READABLE, |s| {
474 let bufs = unsafe {
475 &mut *(bufs as *mut [io::IoSliceMut<'_>]
476 as *mut [MaybeUninitSlice<'_>])
477 };
478 s.recv_vectored_with_flags(bufs, flags)
479 });
480
481 tokio::time::timeout(timeout, f)
482 .await
483 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
484 }
485 };
486
487 Ok((n, f.is_truncated()))
488 }
489
490 pub async fn recv_from<'a>(
491 &self,
492 bufs: &mut [io::IoSliceMut<'a>],
493 flags: libc::c_int,
494 ) -> io::Result<(usize, bool, Option<net::SocketAddr>)> {
495 use socket2::MaybeUninitSlice;
496
497 let (n, f, addr) = match (self.state.nonblocking, self.state.so_recv_timeout) {
498 (true, None) => {
499 let f = self
500 .inner
501 .get_async_socket()?
502 .async_io(Interest::READABLE, |s| {
503 let bufs = unsafe {
504 &mut *(bufs as *mut [io::IoSliceMut<'_>]
505 as *mut [MaybeUninitSlice<'_>])
506 };
507 s.recv_from_vectored_with_flags(bufs, flags)
508 });
509
510 tokio::time::timeout(std::time::Duration::from_millis(50), f)
511 .await
512 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
513 }
514 (false, None) => {
515 let f = self
516 .inner
517 .get_async_socket()?
518 .async_io(Interest::READABLE, |s| {
519 let bufs = unsafe {
520 &mut *(bufs as *mut [io::IoSliceMut<'_>]
521 as *mut [MaybeUninitSlice<'_>])
522 };
523 s.recv_from_vectored_with_flags(bufs, flags)
524 });
525
526 f.await?
527 }
528 (_, Some(timeout)) => {
529 let f = self
530 .inner
531 .get_async_socket()?
532 .async_io(Interest::READABLE, |s| {
533 let bufs = unsafe {
534 &mut *(bufs as *mut [io::IoSliceMut<'_>]
535 as *mut [MaybeUninitSlice<'_>])
536 };
537 s.recv_from_vectored_with_flags(bufs, flags)
538 });
539
540 tokio::time::timeout(timeout, f)
541 .await
542 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
543 }
544 };
545 Ok((n, f.is_truncated(), addr.as_socket()))
546 }
547
548 pub async fn send<'a>(
549 &self,
550 bufs: &[io::IoSlice<'a>],
551 flags: libc::c_int,
552 ) -> io::Result<usize> {
553 let n = match (self.state.nonblocking, self.state.so_send_timeout) {
554 (true, None) => {
555 let f = self
556 .inner
557 .get_async_socket()?
558 .async_io(Interest::WRITABLE, |s| {
559 s.send_vectored_with_flags(bufs, flags)
560 });
561
562 tokio::time::timeout(std::time::Duration::from_millis(50), f)
563 .await
564 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
565 }
566 (false, None) => {
567 let f = self
568 .inner
569 .get_async_socket()?
570 .async_io(Interest::WRITABLE, |s| {
571 s.send_vectored_with_flags(bufs, flags)
572 });
573
574 f.await?
575 }
576 (_, Some(timeout)) => {
577 let f = self
578 .inner
579 .get_async_socket()?
580 .async_io(Interest::WRITABLE, |s| {
581 s.send_vectored_with_flags(bufs, flags)
582 });
583
584 tokio::time::timeout(timeout, f)
585 .await
586 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
587 }
588 };
589
590 Ok(n)
591 }
592
593 pub async fn send_to<'a>(
594 &self,
595 bufs: &[io::IoSlice<'a>],
596 addr: net::SocketAddr,
597 flags: libc::c_int,
598 ) -> io::Result<usize> {
599 use socket2::{MaybeUninitSlice, SockAddr};
600 let address = SockAddr::from(addr);
601
602 let n = match (self.state.nonblocking, self.state.so_send_timeout) {
603 (true, None) => {
604 let f = self
605 .inner
606 .get_async_socket()?
607 .async_io(Interest::WRITABLE, |s| {
608 s.send_to_vectored_with_flags(bufs, &address, flags)
609 });
610
611 tokio::time::timeout(std::time::Duration::from_millis(50), f)
612 .await
613 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
614 }
615 (false, None) => {
616 let f = self
617 .inner
618 .get_async_socket()?
619 .async_io(Interest::WRITABLE, |s| {
620 s.send_to_vectored_with_flags(bufs, &address, flags)
621 });
622
623 f.await?
624 }
625 (_, Some(timeout)) => {
626 let f = self
627 .inner
628 .get_async_socket()?
629 .async_io(Interest::WRITABLE, |s| {
630 s.send_to_vectored_with_flags(bufs, &address, flags)
631 });
632
633 tokio::time::timeout(timeout, f)
634 .await
635 .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
636 }
637 };
638
639 Ok(n)
640 }
641
642 pub fn shutdown(&mut self, how: net::Shutdown) -> io::Result<()> {
643 self.inner.get_ref()?.shutdown(how)?;
644 self.state.shutdown.insert(how);
645 Ok(())
646 }
647
648 pub fn get_peer(&mut self) -> io::Result<net::SocketAddr> {
649 if let Some(addr) = self.state.peer_addr {
650 Ok(addr)
651 } else {
652 let addr = self.inner.get_ref()?.peer_addr()?.as_socket().unwrap();
653 self.state.peer_addr = Some(addr);
654 Ok(addr)
655 }
656 }
657
658 pub fn get_local(&mut self) -> io::Result<net::SocketAddr> {
659 if let Some(addr) = self.state.local_addr {
660 Ok(addr)
661 } else {
662 let addr = self.inner.get_ref()?.local_addr()?.as_socket().unwrap();
663 self.state.local_addr = Some(addr);
664 Ok(addr)
665 }
666 }
667
668 pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
669 self.state.nonblocking = nonblocking;
670 Ok(())
671 }
672
673 pub fn get_nonblocking(&self) -> bool {
674 self.state.nonblocking
675 }
676
677 pub fn get_so_type(&self) -> (AddressFamily, SocketType) {
678 self.state.sock_type
679 }
680
681 pub fn get_so_accept_conn(&self) -> io::Result<bool> {
682 self.inner.get_ref()?.is_listener()
683 }
684
685 pub fn sync_conn_state(&mut self) {
686 if self.state.so_conn_state == ConnectState::Connecting {
687 self.state.so_conn_state = ConnectState::Connected;
688 }
689 }
690
691 pub fn set_so_reuseaddr(&mut self, reuseaddr: bool) -> io::Result<()> {
692 self.state.so_reuseaddr = reuseaddr;
693 Ok(())
694 }
695
696 pub fn get_so_reuseaddr(&self) -> bool {
697 self.state.so_reuseaddr
698 }
699
700 pub fn set_so_recv_buf_size(&mut self, buf_size: usize) -> io::Result<()> {
701 self.state.so_recv_buf_size = buf_size;
702 Ok(())
703 }
704
705 pub fn get_so_recv_buf_size(&self) -> usize {
706 self.state.so_recv_buf_size
707 }
708
709 pub fn set_so_send_buf_size(&mut self, buf_size: usize) -> io::Result<()> {
710 self.state.so_send_buf_size = buf_size;
711 Ok(())
712 }
713
714 pub fn get_so_send_buf_size(&mut self) -> usize {
715 self.state.so_send_buf_size
716 }
717
718 pub fn set_so_recv_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
719 self.state.so_recv_timeout = timeout;
720 self.state.nonblocking = true;
721 Ok(())
722 }
723
724 pub fn get_so_recv_timeout(&mut self) -> Option<Duration> {
725 self.state.so_recv_timeout
726 }
727
728 pub fn set_so_send_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
729 self.state.so_send_timeout = timeout;
730 self.state.nonblocking = true;
731 Ok(())
732 }
733
734 pub fn get_so_send_timeout(&mut self) -> Option<Duration> {
735 self.state.so_send_timeout
736 }
737
738 pub fn get_so_error(&mut self) -> io::Result<Option<io::Error>> {
739 self.inner.get_ref()?.take_error()
740 }
741}