1use std::task::Waker;
2use std::time::Duration;
3use std::{io, net::Shutdown};
4
5use std::net::SocketAddr;
6
7use crate::{
8 CmdResp, Description, FileMode, Handle, Interest, IntoRawDriver, OpenFlags, RawDriver,
9};
10
11pub trait RawDriverExt {
13 fn fd_user_define_open(&self, id: usize, buf: &[u8]) -> io::Result<Handle>;
14
15 fn fd_user_define_close(&self, id: usize, handle: Handle) -> io::Result<()>;
16 fn fd_user_define_clone(&self, handle: Handle) -> io::Result<Handle>;
17 fn file_open(&self, path: &str, mode: FileMode) -> io::Result<Handle>;
19
20 fn file_write(&self, waker: Waker, handle: Handle, buf: &[u8]) -> io::Result<usize>;
21
22 fn file_read(&self, waker: Waker, handle: Handle, buf: &mut [u8]) -> io::Result<usize>;
23
24 fn file_close(&self, handle: Handle) -> io::Result<()>;
26
27 fn timeout_open(&self, duration: Duration) -> io::Result<Handle>;
28
29 fn timeout(&self, waker: Waker, handle: Handle) -> io::Result<bool>;
30
31 fn timeout_close(&self, handle: Handle) -> io::Result<()>;
32
33 fn tcp_listener_bind(&self, laddrs: &[SocketAddr]) -> io::Result<Handle>;
35
36 fn tcp_listener_accept(&self, waker: Waker, handle: Handle)
38 -> io::Result<(Handle, SocketAddr)>;
39
40 fn tcp_listener_close(&self, handle: Handle) -> io::Result<()>;
42
43 fn tcp_stream_connect(&self, raddrs: &[SocketAddr]) -> io::Result<Handle>;
45
46 fn tcp_stream_write(&self, waker: Waker, handle: Handle, buf: &[u8]) -> io::Result<usize>;
48
49 fn tcp_stream_read(&self, waker: Waker, handle: Handle, buf: &mut [u8]) -> io::Result<usize>;
51
52 fn tcp_stream_close(&self, handle: Handle) -> io::Result<()>;
54
55 fn udp_socket_bind(&self, laddrs: &[SocketAddr]) -> io::Result<Handle>;
57
58 fn udp_socket_sendto(
60 &self,
61 waker: Waker,
62 handle: Handle,
63 buf: &[u8],
64 raddr: SocketAddr,
65 ) -> io::Result<usize>;
66
67 fn udp_socket_recv_from(
69 &self,
70 waker: Waker,
71 handle: Handle,
72 buf: &mut [u8],
73 ) -> io::Result<(usize, SocketAddr)>;
74
75 fn udp_socket_close(&self, handle: Handle) -> io::Result<()>;
77
78 fn poller_open(&self, duration: Option<Duration>) -> io::Result<Handle>;
80
81 fn poller_clone(&self, handle: Handle) -> io::Result<Handle>;
83
84 fn poller_register(
86 &self,
87 poller: Handle,
88 source: Handle,
89 interests: Interest,
90 ) -> io::Result<()>;
91
92 fn poller_reregister(
94 &self,
95 poller: Handle,
96 source: Handle,
97 interests: Interest,
98 ) -> io::Result<()>;
99
100 fn poller_deregister(&self, poller: Handle, source: Handle) -> io::Result<()>;
102
103 fn poller_poll_once(&self, handle: Handle, duration: Option<Duration>) -> io::Result<()>;
104
105 fn poller_close(&self, handle: Handle) -> io::Result<()>;
107
108 fn tcp_listener_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
109
110 fn tcp_stream_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
111
112 fn tcp_stream_remote_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
113
114 fn tcp_stream_shutdown(&self, handle: Handle, shutdown: Shutdown) -> io::Result<()>;
115
116 fn udp_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
117}
118
119#[derive(Clone)]
121pub struct RawDriverExtProxy<T: Clone> {
122 inner: T,
123}
124
125impl<T: RawDriverExt + Clone> RawDriverExtProxy<T> {
126 pub fn new(inner: T) -> Self {
127 Self { inner }
128 }
129}
130
131impl<T: RawDriverExt + Clone> RawDriver for RawDriverExtProxy<T> {
132 fn fd_open(
133 &self,
134 desc: crate::Description,
135 open_flags: crate::OpenFlags,
136 ) -> io::Result<Handle> {
137 match desc {
138 crate::Description::File => {
139 let (path, mode) = open_flags.try_into_open_file()?;
140
141 self.inner.file_open(path, mode)
142 }
143 crate::Description::TcpListener => {
144 let laddrs = open_flags.try_into_bind()?;
145
146 self.inner.tcp_listener_bind(laddrs)
147 }
148 crate::Description::TcpStream => {
149 let laddrs = open_flags.try_into_connect()?;
150
151 self.inner.tcp_stream_connect(laddrs)
152 }
153 crate::Description::UdpSocket => {
154 let laddrs = open_flags.try_into_bind()?;
155
156 self.inner.udp_socket_bind(laddrs)
157 }
158 crate::Description::Timeout => {
159 let duration = open_flags.try_into_duration()?;
160
161 self.inner.timeout_open(duration)
162 }
163 crate::Description::Poller => {
164 let duration = match open_flags {
165 OpenFlags::Duration(duration) => Some(duration),
166 _ => None,
167 };
168
169 self.inner.poller_open(duration)
170 }
171 crate::Description::External(id) => {
172 let buf = open_flags.try_into_user_defined()?;
173
174 self.inner.fd_user_define_open(id, buf)
175 }
176 }
177 }
178
179 fn fd_cntl(&self, handle: Handle, cmd: crate::Cmd) -> io::Result<crate::CmdResp> {
180 match cmd {
181 crate::Cmd::Read { waker, buf } => match handle.desc {
182 Description::File => self
183 .inner
184 .file_read(waker, handle, buf)
185 .map(|len| CmdResp::DataLen(len)),
186 Description::TcpStream => self
187 .inner
188 .tcp_stream_read(waker, handle, buf)
189 .map(|len| CmdResp::DataLen(len)),
190
191 _ => {
192 return Err(io::Error::new(
193 io::ErrorKind::InvalidInput,
194 format!("Expect File / TcpStream , but got {:?}", handle.desc),
195 ));
196 }
197 },
198 crate::Cmd::Write { waker, buf } => match handle.desc {
199 Description::File => self
200 .inner
201 .file_write(waker, handle, buf)
202 .map(|len| CmdResp::DataLen(len)),
203 Description::TcpStream => self
204 .inner
205 .tcp_stream_write(waker, handle, buf)
206 .map(|len| CmdResp::DataLen(len)),
207 _ => {
208 return Err(io::Error::new(
209 io::ErrorKind::InvalidInput,
210 format!("Expect File / TcpStream , but got {:?}", handle.desc),
211 ));
212 }
213 },
214 crate::Cmd::SendTo { waker, buf, raddr } => {
215 handle.expect(Description::UdpSocket)?;
216
217 self.inner
218 .udp_socket_sendto(waker, handle, buf, raddr)
219 .map(|len| CmdResp::DataLen(len))
220 }
221 crate::Cmd::RecvFrom { waker, buf } => {
222 handle.expect(Description::UdpSocket)?;
223
224 self.inner
225 .udp_socket_recv_from(waker, handle, buf)
226 .map(|(len, raddr)| CmdResp::RecvFrom(len, raddr))
227 }
228 crate::Cmd::Register { source, interests } => {
229 handle.expect(Description::Poller)?;
230
231 self.inner
232 .poller_register(handle, source, interests)
233 .map(|_| CmdResp::None)
234 }
235 crate::Cmd::ReRegister { source, interests } => {
236 handle.expect(Description::Poller)?;
237
238 self.inner
239 .poller_reregister(handle, source, interests)
240 .map(|_| CmdResp::None)
241 }
242 crate::Cmd::Accept(waker) => {
250 handle.expect(Description::TcpListener)?;
251
252 self.inner
253 .tcp_listener_accept(waker, handle)
254 .map(|(stream, raddr)| CmdResp::Incoming(stream, raddr))
255 }
256 crate::Cmd::PollOnce(duration) => {
257 handle.expect(Description::Poller)?;
258
259 self.inner
260 .poller_poll_once(handle, duration)
261 .map(|_| CmdResp::None)
262 }
263 crate::Cmd::TryClone => match handle.desc {
264 Description::Poller => self
265 .inner
266 .poller_clone(handle)
267 .map(|handle| CmdResp::Cloned(handle)),
268 _ => self
269 .inner
270 .fd_user_define_clone(handle)
271 .map(|handle| CmdResp::Cloned(handle)),
272 },
273 crate::Cmd::Timeout(waker) => {
274 handle.expect(Description::Timeout)?;
275
276 self.inner
277 .timeout(waker, handle)
278 .map(|next| CmdResp::Timeout(next))
279 }
280 crate::Cmd::LocalAddr => match handle.desc {
281 Description::TcpListener => self
282 .inner
283 .tcp_listener_local_addr(handle)
284 .map(|laddr| CmdResp::SockAddr(laddr)),
285 Description::TcpStream => self
286 .inner
287 .tcp_stream_local_addr(handle)
288 .map(|laddr| CmdResp::SockAddr(laddr)),
289 Description::UdpSocket => self
290 .inner
291 .udp_local_addr(handle)
292 .map(|laddr| CmdResp::SockAddr(laddr)),
293 _ => {
294 return Err(io::Error::new(
295 io::ErrorKind::InvalidInput,
296 format!("Expect File / TcpStream , but got {:?}", handle.desc),
297 ));
298 }
299 },
300 crate::Cmd::RemoteAddr => match handle.desc {
301 Description::TcpStream => self
302 .inner
303 .tcp_stream_remote_addr(handle)
304 .map(|laddr| CmdResp::SockAddr(laddr)),
305 _ => {
306 return Err(io::Error::new(
307 io::ErrorKind::InvalidInput,
308 format!("Expect File / TcpStream , but got {:?}", handle.desc),
309 ));
310 }
311 },
312 crate::Cmd::Shutdown(shutdown) => match handle.desc {
313 Description::TcpStream => self
314 .inner
315 .tcp_stream_shutdown(handle, shutdown)
316 .map(|_| CmdResp::None),
317 _ => {
318 return Err(io::Error::new(
319 io::ErrorKind::InvalidInput,
320 format!("Expect File / TcpStream , but got {:?}", handle.desc),
321 ));
322 }
323 },
324 }
325 }
326
327 fn fd_close(&self, handle: Handle) -> io::Result<()> {
328 match handle.desc {
329 Description::File => self.inner.file_close(handle),
330 Description::TcpListener => self.inner.tcp_listener_close(handle),
331 Description::TcpStream => self.inner.tcp_stream_close(handle),
332 Description::UdpSocket => self.inner.udp_socket_close(handle),
333 Description::Timeout => self.inner.timeout_close(handle),
334 Description::Poller => self.inner.poller_close(handle),
335 Description::External(id) => self.inner.fd_user_define_close(id, handle),
336 }
337 }
338}
339
340impl<T: RawDriverExt + Clone> IntoRawDriver for T {
341 type Driver = RawDriverExtProxy<T>;
342
343 fn into_raw_driver(self) -> Self::Driver {
344 RawDriverExtProxy::new(self)
345 }
346}