tun_easytier/async/
unix_device.rs1use core::pin::Pin;
16use core::task::{Context, Poll};
17use futures_core::ready;
18use std::io::{IoSlice, Read, Write};
19use tokio::io::unix::AsyncFd;
20use tokio::io::Interest;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tokio_util::codec::Framed;
23
24use super::TunPacketCodec;
25use crate::device::AbstractDevice;
26use crate::platform::Device;
27
28pub struct AsyncDevice {
30 inner: AsyncFd<Device>,
31}
32
33impl core::ops::Deref for AsyncDevice {
35 type Target = Device;
36
37 fn deref(&self) -> &Self::Target {
38 self.inner.get_ref()
39 }
40}
41
42impl core::ops::DerefMut for AsyncDevice {
44 fn deref_mut(&mut self) -> &mut Self::Target {
45 self.inner.get_mut()
46 }
47}
48
49impl AsyncDevice {
50 pub fn new(device: Device) -> std::io::Result<AsyncDevice> {
52 device.set_nonblock()?;
53 Ok(AsyncDevice {
54 inner: AsyncFd::new(device)?,
55 })
56 }
57
58 pub fn into_framed(self) -> Framed<Self, TunPacketCodec> {
60 let mtu = self.mtu().unwrap_or(crate::DEFAULT_MTU);
61 let codec = TunPacketCodec::new(mtu as usize);
62 Framed::with_capacity(self, codec, mtu as usize)
64 }
65
66 pub async fn recv(&self, buf: &mut [u8]) -> std::io::Result<usize> {
68 let guard = self.inner.readable().await?;
69 guard
70 .get_ref()
71 .async_io(Interest::READABLE, |inner| inner.recv(buf))
72 .await
73 }
74
75 pub async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
77 let guard = self.inner.writable().await?;
78 guard
79 .get_ref()
80 .async_io(Interest::WRITABLE, |inner| inner.send(buf))
81 .await
82 }
83}
84
85impl AsyncRead for AsyncDevice {
86 fn poll_read(
87 mut self: Pin<&mut Self>,
88 cx: &mut Context<'_>,
89 buf: &mut ReadBuf,
90 ) -> Poll<std::io::Result<()>> {
91 loop {
92 let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?;
93 let rbuf = buf.initialize_unfilled();
94 match guard.try_io(|inner| inner.get_mut().read(rbuf)) {
95 Ok(res) => return Poll::Ready(res.map(|n| buf.advance(n))),
96 Err(_wb) => continue,
97 }
98 }
99 }
100}
101
102impl AsyncWrite for AsyncDevice {
103 fn poll_write(
104 mut self: Pin<&mut Self>,
105 cx: &mut Context<'_>,
106 buf: &[u8],
107 ) -> Poll<std::io::Result<usize>> {
108 loop {
109 let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
110 match guard.try_io(|inner| inner.get_mut().write(buf)) {
111 Ok(res) => return Poll::Ready(res),
112 Err(_wb) => continue,
113 }
114 }
115 }
116
117 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
118 loop {
119 let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
120 match guard.try_io(|inner| inner.get_mut().flush()) {
121 Ok(res) => return Poll::Ready(res),
122 Err(_wb) => continue,
123 }
124 }
125 }
126
127 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
128 Poll::Ready(Ok(()))
129 }
130
131 fn poll_write_vectored(
132 mut self: Pin<&mut Self>,
133 cx: &mut Context<'_>,
134 bufs: &[IoSlice<'_>],
135 ) -> Poll<std::io::Result<usize>> {
136 loop {
137 let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
138 match guard.try_io(|inner| inner.get_mut().write_vectored(bufs)) {
139 Ok(res) => return Poll::Ready(res),
140 Err(_wb) => continue,
141 }
142 }
143 }
144
145 fn is_write_vectored(&self) -> bool {
146 true
147 }
148}