async_wasi/snapshots/common/net/
async_tokio.rs

1use super::*;
2use crate::snapshots::{
3    common::{types as wasi_types, vfs},
4    env::Errno,
5};
6use socket2::{SockAddr, Socket};
7use std::{
8    ops::DerefMut,
9    os::unix::prelude::{AsRawFd, FromRawFd, RawFd},
10    sync::atomic::{AtomicBool, AtomicI8, AtomicU8},
11};
12use tokio::io::{
13    unix::{AsyncFd, AsyncFdReadyGuard, TryIoError},
14    AsyncReadExt, AsyncWriteExt, Interest,
15};
16
17#[derive(Debug)]
18pub(crate) enum AsyncWasiSocketInner {
19    PreOpen(Socket),
20    AsyncFd(AsyncFd<Socket>),
21}
22
23impl AsyncWasiSocketInner {
24    fn register(&mut self) -> io::Result<()> {
25        unsafe {
26            let inner = match self {
27                AsyncWasiSocketInner::PreOpen(s) => {
28                    let mut inner_socket = std::mem::zeroed();
29                    std::mem::swap(s, &mut inner_socket);
30                    inner_socket
31                }
32                AsyncWasiSocketInner::AsyncFd(_) => return Ok(()),
33            };
34            let mut new_self = Self::AsyncFd(AsyncFd::new(inner)?);
35            std::mem::swap(self, &mut new_self);
36            std::mem::forget(new_self);
37            Ok(())
38        }
39    }
40
41    fn bind(&mut self, addr: &SockAddr) -> io::Result<()> {
42        match self {
43            AsyncWasiSocketInner::PreOpen(s) => {
44                s.set_reuse_address(true)?;
45                s.bind(addr)
46            }
47            AsyncWasiSocketInner::AsyncFd(_) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
48        }
49    }
50
51    fn bind_device(&mut self, interface: Option<&[u8]>) -> io::Result<()> {
52        match self {
53            AsyncWasiSocketInner::PreOpen(s) => s.bind_device(interface),
54            AsyncWasiSocketInner::AsyncFd(s) => s.get_ref().bind_device(interface),
55        }
56    }
57
58    fn device(&self) -> io::Result<Option<Vec<u8>>> {
59        match self {
60            AsyncWasiSocketInner::PreOpen(s) => s.device(),
61            AsyncWasiSocketInner::AsyncFd(s) => s.get_ref().device(),
62        }
63    }
64
65    fn listen(&mut self, backlog: i32) -> io::Result<()> {
66        match self {
67            AsyncWasiSocketInner::PreOpen(s) => {
68                s.listen(backlog)?;
69            }
70            AsyncWasiSocketInner::AsyncFd(_) => {
71                return Err(io::Error::from_raw_os_error(libc::EINVAL))
72            }
73        }
74        self.register()
75    }
76
77    async fn accept(&mut self) -> io::Result<(Socket, SockAddr)> {
78        match self {
79            AsyncWasiSocketInner::PreOpen(s) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
80            AsyncWasiSocketInner::AsyncFd(s) => {
81                match tokio::time::timeout(
82                    std::time::Duration::from_millis(100),
83                    s.async_io(Interest::READABLE, |s| s.accept()),
84                )
85                .await
86                {
87                    Ok(r) => r,
88                    Err(_) => Err(io::Error::from(io::ErrorKind::WouldBlock)),
89                }
90            }
91        }
92    }
93
94    fn connect(&mut self, addr: &SockAddr) -> io::Result<()> {
95        let r = match self {
96            AsyncWasiSocketInner::PreOpen(s) => s.connect(addr),
97            AsyncWasiSocketInner::AsyncFd(_) => {
98                return Err(io::Error::from_raw_os_error(libc::EINVAL))
99            }
100        };
101
102        if let Err(e) = r {
103            let errno = Errno::from(&e);
104            if errno != Errno::__WASI_ERRNO_INPROGRESS {
105                Err(e)
106            } else {
107                self.register()?;
108                Err(io::Error::from_raw_os_error(libc::EINPROGRESS))
109            }
110        } else {
111            self.register()?;
112            Ok(())
113        }
114    }
115
116    fn get_ref(&self) -> io::Result<&Socket> {
117        match self {
118            AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
119            AsyncWasiSocketInner::AsyncFd(s) => Ok(s.get_ref()),
120        }
121    }
122
123    fn get_async_socket(&self) -> io::Result<&AsyncFd<Socket>> {
124        match self {
125            AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
126            AsyncWasiSocketInner::AsyncFd(s) => Ok(s),
127        }
128    }
129
130    fn mut_async_socket(&mut self) -> io::Result<&mut AsyncFd<Socket>> {
131        match self {
132            AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
133            AsyncWasiSocketInner::AsyncFd(s) => Ok(s),
134        }
135    }
136
137    pub(crate) async fn readable(&self) -> io::Result<AsyncFdReadyGuard<Socket>> {
138        match self {
139            AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
140            AsyncWasiSocketInner::AsyncFd(s) => Ok(s.readable().await?),
141        }
142    }
143
144    pub(crate) async fn writable(&self) -> io::Result<AsyncFdReadyGuard<Socket>> {
145        match self {
146            AsyncWasiSocketInner::PreOpen(_) => Err(io::Error::from_raw_os_error(libc::ENOTCONN)),
147            AsyncWasiSocketInner::AsyncFd(s) => Ok(s.writable().await?),
148        }
149    }
150}
151
152#[derive(Debug)]
153pub(crate) struct SocketWritable(pub(crate) AtomicI8);
154impl SocketWritable {
155    pub(crate) async fn writable(&self) {
156        let b = self.0.fetch_sub(1, std::sync::atomic::Ordering::Acquire);
157        tokio::time::timeout(Duration::from_secs(10), SocketWritableFuture(b)).await;
158    }
159
160    pub(crate) fn set_writable(&self) {
161        self.0.store(5, std::sync::atomic::Ordering::Release)
162    }
163}
164impl Default for SocketWritable {
165    fn default() -> Self {
166        Self(AtomicI8::new(5))
167    }
168}
169
170#[derive(Debug, Clone, Copy)]
171pub(crate) struct SocketWritableFuture(i8);
172
173impl Future for SocketWritableFuture {
174    type Output = ();
175
176    fn poll(
177        self: std::pin::Pin<&mut Self>,
178        cx: &mut std::task::Context<'_>,
179    ) -> std::task::Poll<Self::Output> {
180        log::trace!("SocketWritableFuture self.0={}", self.0);
181        if self.0 >= 0 {
182            std::task::Poll::Ready(())
183        } else {
184            std::task::Poll::Pending
185        }
186    }
187}
188
189#[derive(Debug)]
190pub struct AsyncWasiSocket {
191    pub(crate) inner: AsyncWasiSocketInner,
192    pub state: Box<WasiSocketState>,
193    pub(crate) writable: SocketWritable,
194}
195
196impl AsyncWasiSocket {
197    pub(crate) async fn readable(&self) -> std::io::Result<()> {
198        self.inner.readable().await.map(|x| ())
199    }
200
201    pub(crate) async fn writable(&self) -> std::io::Result<()> {
202        self.writable.writable().await;
203        self.inner.writable().await?;
204        Ok(())
205    }
206}
207
208#[inline]
209fn handle_timeout_result<T>(
210    result: Result<io::Result<T>, tokio::time::error::Elapsed>,
211) -> io::Result<T> {
212    if let Ok(r) = result {
213        r
214    } else {
215        Err(io::Error::from_raw_os_error(libc::EWOULDBLOCK))
216    }
217}
218
219impl AsyncWasiSocket {
220    pub fn fd_fdstat_get(&self) -> Result<FdStat, Errno> {
221        let mut filetype = match self.state.sock_type.1 {
222            SocketType::Datagram => FileType::SOCKET_DGRAM,
223            SocketType::Stream => FileType::SOCKET_STREAM,
224        };
225        let flags = if self.state.nonblocking {
226            FdFlags::NONBLOCK
227        } else {
228            FdFlags::empty()
229        };
230
231        Ok(FdStat {
232            filetype,
233            fs_rights_base: self.state.fs_rights.clone(),
234            fs_rights_inheriting: WASIRights::empty(),
235            flags,
236        })
237    }
238}
239
240impl AsyncWasiSocket {
241    pub fn from_tcplistener(
242        listener: std::net::TcpListener,
243        state: WasiSocketState,
244    ) -> io::Result<Self> {
245        let socket = Socket::from(listener);
246        socket.set_nonblocking(true)?;
247        Ok(Self {
248            inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
249            state: Box::new(state),
250            writable: Default::default(),
251        })
252    }
253
254    pub fn from_udpsocket(socket: std::net::UdpSocket, state: WasiSocketState) -> io::Result<Self> {
255        let socket = Socket::from(socket);
256        socket.set_nonblocking(true)?;
257        Ok(Self {
258            inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(socket)?),
259            state: Box::new(state),
260            writable: Default::default(),
261        })
262    }
263}
264
265impl AsyncWasiSocket {
266    pub fn open(mut state: WasiSocketState) -> io::Result<Self> {
267        use socket2::{Domain, Protocol, Type};
268        match state.sock_type.1 {
269            SocketType::Stream => {
270                state.fs_rights = WASIRights::SOCK_BIND
271                    | WASIRights::SOCK_CLOSE
272                    | WASIRights::SOCK_RECV
273                    | WASIRights::SOCK_SEND
274                    | WASIRights::SOCK_SHUTDOWN
275                    | WASIRights::POLL_FD_READWRITE;
276            }
277            SocketType::Datagram => {
278                state.fs_rights = WASIRights::SOCK_BIND
279                    | WASIRights::SOCK_CLOSE
280                    | WASIRights::SOCK_RECV_FROM
281                    | WASIRights::SOCK_SEND_TO
282                    | WASIRights::SOCK_SHUTDOWN
283                    | WASIRights::POLL_FD_READWRITE;
284            }
285        }
286        let inner = match state.sock_type {
287            (AddressFamily::Inet4, SocketType::Datagram) => {
288                Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?
289            }
290            (AddressFamily::Inet4, SocketType::Stream) => {
291                Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?
292            }
293            (AddressFamily::Inet6, SocketType::Datagram) => {
294                Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?
295            }
296            (AddressFamily::Inet6, SocketType::Stream) => {
297                Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?
298            }
299        };
300        inner.set_nonblocking(true)?;
301        if !state.bind_device.is_empty() {
302            inner.bind_device(Some(&state.bind_device))?;
303        }
304        Ok(AsyncWasiSocket {
305            inner: AsyncWasiSocketInner::PreOpen(inner),
306            state: Box::new(state),
307            writable: Default::default(),
308        })
309    }
310
311    pub fn bind(&mut self, addr: net::SocketAddr) -> io::Result<()> {
312        use socket2::SockAddr;
313        let sock_addr = SockAddr::from(addr);
314        self.inner.bind(&sock_addr)?;
315        if let SocketType::Datagram = self.state.sock_type.1 {
316            self.inner.register()?;
317        }
318        self.state.local_addr = Some(addr);
319        Ok(())
320    }
321
322    pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
323        if self.state.bind_device.is_empty() {
324            self.inner.device()
325        } else {
326            Ok(Some(self.state.bind_device.clone()))
327        }
328    }
329
330    pub fn bind_device(&mut self, interface: Option<&[u8]>) -> io::Result<()> {
331        self.inner.bind_device(interface)?;
332        self.state.bind_device = match interface {
333            Some(interface) => interface.to_vec(),
334            None => vec![],
335        };
336        Ok(())
337    }
338
339    pub fn listen(&mut self, backlog: u32) -> io::Result<()> {
340        self.inner.listen(backlog as i32)?;
341        self.state.backlog = backlog;
342        self.state.so_conn_state = ConnectState::Listening;
343        Ok(())
344    }
345
346    pub async fn accept(&mut self) -> io::Result<Self> {
347        let mut new_state = WasiSocketState {
348            nonblocking: self.state.nonblocking,
349            so_conn_state: ConnectState::Connected,
350            ..Default::default()
351        };
352
353        log::trace!("accept nonblocking={}", self.state.nonblocking);
354
355        let (cs, _) = if self.state.nonblocking {
356            let s = self
357                .inner
358                .get_async_socket()?
359                .async_io(Interest::READABLE, |s| s.accept());
360            tokio::time::timeout(std::time::Duration::from_millis(50), s)
361                .await
362                .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))?
363        } else {
364            self.inner
365                .get_async_socket()?
366                .async_io(Interest::READABLE, |s| s.accept())
367                .await
368        }?;
369
370        cs.set_nonblocking(true)?;
371        new_state.peer_addr = cs.peer_addr().ok().and_then(|addr| addr.as_socket());
372        new_state.local_addr = cs.local_addr().ok().and_then(|addr| addr.as_socket());
373
374        Ok(AsyncWasiSocket {
375            inner: AsyncWasiSocketInner::AsyncFd(AsyncFd::new(cs)?),
376            state: Box::new(new_state),
377            writable: Default::default(),
378        })
379    }
380
381    pub async fn connect(&mut self, addr: net::SocketAddr) -> io::Result<()> {
382        let address = SockAddr::from(addr);
383        self.state.so_conn_state = ConnectState::Connected;
384        self.state.peer_addr = Some(addr);
385
386        match (self.state.nonblocking, self.state.so_send_timeout) {
387            (true, None) => {
388                let r = self.inner.connect(&address);
389                if r.is_err() {
390                    self.state.so_conn_state = ConnectState::Connecting;
391                }
392                r?;
393                Ok(())
394            }
395            (false, None) => {
396                if let Err(e) = self.inner.connect(&address) {
397                    match e.raw_os_error() {
398                        Some(libc::EINPROGRESS) => {}
399                        _ => return Err(e),
400                    }
401                    let s = self.inner.writable().await?;
402                    let e = s.get_inner().take_error()?;
403                    if let Some(e) = e {
404                        return Err(e);
405                    }
406                }
407                Ok(())
408            }
409            (_, Some(timeout)) => {
410                if let Err(e) = self.inner.connect(&address) {
411                    match e.raw_os_error() {
412                        Some(libc::EINPROGRESS) => {}
413                        _ => return Err(e),
414                    }
415                    match tokio::time::timeout(timeout, self.inner.writable()).await {
416                        Ok(r) => {
417                            let s = r?;
418                            let e = s.get_inner().take_error()?;
419                            if let Some(e) = e {
420                                return Err(e);
421                            }
422                            Ok(())
423                        }
424                        Err(e) => Err(io::Error::from_raw_os_error(libc::EWOULDBLOCK)),
425                    }
426                } else {
427                    Ok(())
428                }
429            }
430        }
431    }
432
433    pub async fn recv<'a>(
434        &self,
435        bufs: &mut [io::IoSliceMut<'a>],
436        flags: libc::c_int,
437    ) -> io::Result<(usize, bool)> {
438        use socket2::MaybeUninitSlice;
439
440        let (n, f) = match (self.state.nonblocking, self.state.so_recv_timeout) {
441            (true, None) => {
442                let f = self
443                    .inner
444                    .get_async_socket()?
445                    .async_io(Interest::READABLE, |s| {
446                        let bufs = unsafe {
447                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
448                                as *mut [MaybeUninitSlice<'_>])
449                        };
450                        s.recv_vectored_with_flags(bufs, flags)
451                    });
452
453                tokio::time::timeout(std::time::Duration::from_millis(50), f)
454                    .await
455                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
456            }
457            (false, None) => {
458                self.inner
459                    .get_async_socket()?
460                    .async_io(Interest::READABLE, |s| {
461                        let bufs = unsafe {
462                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
463                                as *mut [MaybeUninitSlice<'_>])
464                        };
465                        s.recv_vectored_with_flags(bufs, flags)
466                    })
467                    .await?
468            }
469            (_, Some(timeout)) => {
470                let f = self
471                    .inner
472                    .get_async_socket()?
473                    .async_io(Interest::READABLE, |s| {
474                        let bufs = unsafe {
475                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
476                                as *mut [MaybeUninitSlice<'_>])
477                        };
478                        s.recv_vectored_with_flags(bufs, flags)
479                    });
480
481                tokio::time::timeout(timeout, f)
482                    .await
483                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
484            }
485        };
486
487        Ok((n, f.is_truncated()))
488    }
489
490    pub async fn recv_from<'a>(
491        &self,
492        bufs: &mut [io::IoSliceMut<'a>],
493        flags: libc::c_int,
494    ) -> io::Result<(usize, bool, Option<net::SocketAddr>)> {
495        use socket2::MaybeUninitSlice;
496
497        let (n, f, addr) = match (self.state.nonblocking, self.state.so_recv_timeout) {
498            (true, None) => {
499                let f = self
500                    .inner
501                    .get_async_socket()?
502                    .async_io(Interest::READABLE, |s| {
503                        let bufs = unsafe {
504                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
505                                as *mut [MaybeUninitSlice<'_>])
506                        };
507                        s.recv_from_vectored_with_flags(bufs, flags)
508                    });
509
510                tokio::time::timeout(std::time::Duration::from_millis(50), f)
511                    .await
512                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
513            }
514            (false, None) => {
515                let f = self
516                    .inner
517                    .get_async_socket()?
518                    .async_io(Interest::READABLE, |s| {
519                        let bufs = unsafe {
520                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
521                                as *mut [MaybeUninitSlice<'_>])
522                        };
523                        s.recv_from_vectored_with_flags(bufs, flags)
524                    });
525
526                f.await?
527            }
528            (_, Some(timeout)) => {
529                let f = self
530                    .inner
531                    .get_async_socket()?
532                    .async_io(Interest::READABLE, |s| {
533                        let bufs = unsafe {
534                            &mut *(bufs as *mut [io::IoSliceMut<'_>]
535                                as *mut [MaybeUninitSlice<'_>])
536                        };
537                        s.recv_from_vectored_with_flags(bufs, flags)
538                    });
539
540                tokio::time::timeout(timeout, f)
541                    .await
542                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
543            }
544        };
545        Ok((n, f.is_truncated(), addr.as_socket()))
546    }
547
548    pub async fn send<'a>(
549        &self,
550        bufs: &[io::IoSlice<'a>],
551        flags: libc::c_int,
552    ) -> io::Result<usize> {
553        let n = match (self.state.nonblocking, self.state.so_send_timeout) {
554            (true, None) => {
555                let f = self
556                    .inner
557                    .get_async_socket()?
558                    .async_io(Interest::WRITABLE, |s| {
559                        s.send_vectored_with_flags(bufs, flags)
560                    });
561
562                tokio::time::timeout(std::time::Duration::from_millis(50), f)
563                    .await
564                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
565            }
566            (false, None) => {
567                let f = self
568                    .inner
569                    .get_async_socket()?
570                    .async_io(Interest::WRITABLE, |s| {
571                        s.send_vectored_with_flags(bufs, flags)
572                    });
573
574                f.await?
575            }
576            (_, Some(timeout)) => {
577                let f = self
578                    .inner
579                    .get_async_socket()?
580                    .async_io(Interest::WRITABLE, |s| {
581                        s.send_vectored_with_flags(bufs, flags)
582                    });
583
584                tokio::time::timeout(timeout, f)
585                    .await
586                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
587            }
588        };
589
590        Ok(n)
591    }
592
593    pub async fn send_to<'a>(
594        &self,
595        bufs: &[io::IoSlice<'a>],
596        addr: net::SocketAddr,
597        flags: libc::c_int,
598    ) -> io::Result<usize> {
599        use socket2::{MaybeUninitSlice, SockAddr};
600        let address = SockAddr::from(addr);
601
602        let n = match (self.state.nonblocking, self.state.so_send_timeout) {
603            (true, None) => {
604                let f = self
605                    .inner
606                    .get_async_socket()?
607                    .async_io(Interest::WRITABLE, |s| {
608                        s.send_to_vectored_with_flags(bufs, &address, flags)
609                    });
610
611                tokio::time::timeout(std::time::Duration::from_millis(50), f)
612                    .await
613                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
614            }
615            (false, None) => {
616                let f = self
617                    .inner
618                    .get_async_socket()?
619                    .async_io(Interest::WRITABLE, |s| {
620                        s.send_to_vectored_with_flags(bufs, &address, flags)
621                    });
622
623                f.await?
624            }
625            (_, Some(timeout)) => {
626                let f = self
627                    .inner
628                    .get_async_socket()?
629                    .async_io(Interest::WRITABLE, |s| {
630                        s.send_to_vectored_with_flags(bufs, &address, flags)
631                    });
632
633                tokio::time::timeout(timeout, f)
634                    .await
635                    .map_err(|_| io::Error::from(io::ErrorKind::WouldBlock))??
636            }
637        };
638
639        Ok(n)
640    }
641
642    pub fn shutdown(&mut self, how: net::Shutdown) -> io::Result<()> {
643        self.inner.get_ref()?.shutdown(how)?;
644        self.state.shutdown.insert(how);
645        Ok(())
646    }
647
648    pub fn get_peer(&mut self) -> io::Result<net::SocketAddr> {
649        if let Some(addr) = self.state.peer_addr {
650            Ok(addr)
651        } else {
652            let addr = self.inner.get_ref()?.peer_addr()?.as_socket().unwrap();
653            self.state.peer_addr = Some(addr);
654            Ok(addr)
655        }
656    }
657
658    pub fn get_local(&mut self) -> io::Result<net::SocketAddr> {
659        if let Some(addr) = self.state.local_addr {
660            Ok(addr)
661        } else {
662            let addr = self.inner.get_ref()?.local_addr()?.as_socket().unwrap();
663            self.state.local_addr = Some(addr);
664            Ok(addr)
665        }
666    }
667
668    pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
669        self.state.nonblocking = nonblocking;
670        Ok(())
671    }
672
673    pub fn get_nonblocking(&self) -> bool {
674        self.state.nonblocking
675    }
676
677    pub fn get_so_type(&self) -> (AddressFamily, SocketType) {
678        self.state.sock_type
679    }
680
681    pub fn get_so_accept_conn(&self) -> io::Result<bool> {
682        self.inner.get_ref()?.is_listener()
683    }
684
685    pub fn sync_conn_state(&mut self) {
686        if self.state.so_conn_state == ConnectState::Connecting {
687            self.state.so_conn_state = ConnectState::Connected;
688        }
689    }
690
691    pub fn set_so_reuseaddr(&mut self, reuseaddr: bool) -> io::Result<()> {
692        self.state.so_reuseaddr = reuseaddr;
693        Ok(())
694    }
695
696    pub fn get_so_reuseaddr(&self) -> bool {
697        self.state.so_reuseaddr
698    }
699
700    pub fn set_so_recv_buf_size(&mut self, buf_size: usize) -> io::Result<()> {
701        self.state.so_recv_buf_size = buf_size;
702        Ok(())
703    }
704
705    pub fn get_so_recv_buf_size(&self) -> usize {
706        self.state.so_recv_buf_size
707    }
708
709    pub fn set_so_send_buf_size(&mut self, buf_size: usize) -> io::Result<()> {
710        self.state.so_send_buf_size = buf_size;
711        Ok(())
712    }
713
714    pub fn get_so_send_buf_size(&mut self) -> usize {
715        self.state.so_send_buf_size
716    }
717
718    pub fn set_so_recv_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
719        self.state.so_recv_timeout = timeout;
720        self.state.nonblocking = true;
721        Ok(())
722    }
723
724    pub fn get_so_recv_timeout(&mut self) -> Option<Duration> {
725        self.state.so_recv_timeout
726    }
727
728    pub fn set_so_send_timeout(&mut self, timeout: Option<Duration>) -> io::Result<()> {
729        self.state.so_send_timeout = timeout;
730        self.state.nonblocking = true;
731        Ok(())
732    }
733
734    pub fn get_so_send_timeout(&mut self) -> Option<Duration> {
735        self.state.so_send_timeout
736    }
737
738    pub fn get_so_error(&mut self) -> io::Result<Option<io::Error>> {
739        self.inner.get_ref()?.take_error()
740    }
741}