hala_io/
driver_ext.rs

1use std::task::Waker;
2use std::time::Duration;
3use std::{io, net::Shutdown};
4
5use std::net::SocketAddr;
6
7use crate::{
8    CmdResp, Description, FileMode, Handle, Interest, IntoRawDriver, OpenFlags, RawDriver,
9};
10
11/// Easier to implement version of `RawDriver` trait
12pub trait RawDriverExt {
13    fn fd_user_define_open(&self, id: usize, buf: &[u8]) -> io::Result<Handle>;
14
15    fn fd_user_define_close(&self, id: usize, handle: Handle) -> io::Result<()>;
16    fn fd_user_define_clone(&self, handle: Handle) -> io::Result<Handle>;
17    /// Create new file
18    fn file_open(&self, path: &str, mode: FileMode) -> io::Result<Handle>;
19
20    fn file_write(&self, waker: Waker, handle: Handle, buf: &[u8]) -> io::Result<usize>;
21
22    fn file_read(&self, waker: Waker, handle: Handle, buf: &mut [u8]) -> io::Result<usize>;
23
24    /// Close file handle
25    fn file_close(&self, handle: Handle) -> io::Result<()>;
26
27    fn timeout_open(&self, duration: Duration) -> io::Result<Handle>;
28
29    fn timeout(&self, waker: Waker, handle: Handle) -> io::Result<bool>;
30
31    fn timeout_close(&self, handle: Handle) -> io::Result<()>;
32
33    /// Create new `TcpListener` socket and bound to `laddrs`
34    fn tcp_listener_bind(&self, laddrs: &[SocketAddr]) -> io::Result<Handle>;
35
36    /// Accept one incoming `TcpStream` socket, may returns WOULD_BLOCK
37    fn tcp_listener_accept(&self, waker: Waker, handle: Handle)
38        -> io::Result<(Handle, SocketAddr)>;
39
40    /// Close `TcpListener` socket.
41    fn tcp_listener_close(&self, handle: Handle) -> io::Result<()>;
42
43    /// Create new `TcpStream` socket and try connect to remote peer.
44    fn tcp_stream_connect(&self, raddrs: &[SocketAddr]) -> io::Result<Handle>;
45
46    /// Write data to underly `TcpStream`
47    fn tcp_stream_write(&self, waker: Waker, handle: Handle, buf: &[u8]) -> io::Result<usize>;
48
49    /// Read data from underly `TcpStream`
50    fn tcp_stream_read(&self, waker: Waker, handle: Handle, buf: &mut [u8]) -> io::Result<usize>;
51
52    /// Close `TcpStream` socket.
53    fn tcp_stream_close(&self, handle: Handle) -> io::Result<()>;
54
55    /// Create a new `UdpSocket` and bind to `laddrs`
56    fn udp_socket_bind(&self, laddrs: &[SocketAddr]) -> io::Result<Handle>;
57
58    /// Send one datagram to `raddr` peer
59    fn udp_socket_sendto(
60        &self,
61        waker: Waker,
62        handle: Handle,
63        buf: &[u8],
64        raddr: SocketAddr,
65    ) -> io::Result<usize>;
66
67    /// Recv one datagram from peer.
68    fn udp_socket_recv_from(
69        &self,
70        waker: Waker,
71        handle: Handle,
72        buf: &mut [u8],
73    ) -> io::Result<(usize, SocketAddr)>;
74
75    /// Close `UdpSocket`
76    fn udp_socket_close(&self, handle: Handle) -> io::Result<()>;
77
78    /// Create new readiness io event poller.
79    fn poller_open(&self, duration: Option<Duration>) -> io::Result<Handle>;
80
81    /// Clone pller handle.
82    fn poller_clone(&self, handle: Handle) -> io::Result<Handle>;
83
84    /// Register interests events of one source.
85    fn poller_register(
86        &self,
87        poller: Handle,
88        source: Handle,
89        interests: Interest,
90    ) -> io::Result<()>;
91
92    /// Re-register interests events of one source.
93    fn poller_reregister(
94        &self,
95        poller: Handle,
96        source: Handle,
97        interests: Interest,
98    ) -> io::Result<()>;
99
100    /// Deregister interests events of one source.
101    fn poller_deregister(&self, poller: Handle, source: Handle) -> io::Result<()>;
102
103    fn poller_poll_once(&self, handle: Handle, duration: Option<Duration>) -> io::Result<()>;
104
105    /// Close poller
106    fn poller_close(&self, handle: Handle) -> io::Result<()>;
107
108    fn tcp_listener_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
109
110    fn tcp_stream_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
111
112    fn tcp_stream_remote_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
113
114    fn tcp_stream_shutdown(&self, handle: Handle, shutdown: Shutdown) -> io::Result<()>;
115
116    fn udp_local_addr(&self, handle: Handle) -> io::Result<SocketAddr>;
117}
118
119/// Adapter `RawDriverExt` trait to `RawDriver` trait
120#[derive(Clone)]
121pub struct RawDriverExtProxy<T: Clone> {
122    inner: T,
123}
124
125impl<T: RawDriverExt + Clone> RawDriverExtProxy<T> {
126    pub fn new(inner: T) -> Self {
127        Self { inner }
128    }
129}
130
131impl<T: RawDriverExt + Clone> RawDriver for RawDriverExtProxy<T> {
132    fn fd_open(
133        &self,
134        desc: crate::Description,
135        open_flags: crate::OpenFlags,
136    ) -> io::Result<Handle> {
137        match desc {
138            crate::Description::File => {
139                let (path, mode) = open_flags.try_into_open_file()?;
140
141                self.inner.file_open(path, mode)
142            }
143            crate::Description::TcpListener => {
144                let laddrs = open_flags.try_into_bind()?;
145
146                self.inner.tcp_listener_bind(laddrs)
147            }
148            crate::Description::TcpStream => {
149                let laddrs = open_flags.try_into_connect()?;
150
151                self.inner.tcp_stream_connect(laddrs)
152            }
153            crate::Description::UdpSocket => {
154                let laddrs = open_flags.try_into_bind()?;
155
156                self.inner.udp_socket_bind(laddrs)
157            }
158            crate::Description::Timeout => {
159                let duration = open_flags.try_into_duration()?;
160
161                self.inner.timeout_open(duration)
162            }
163            crate::Description::Poller => {
164                let duration = match open_flags {
165                    OpenFlags::Duration(duration) => Some(duration),
166                    _ => None,
167                };
168
169                self.inner.poller_open(duration)
170            }
171            crate::Description::External(id) => {
172                let buf = open_flags.try_into_user_defined()?;
173
174                self.inner.fd_user_define_open(id, buf)
175            }
176        }
177    }
178
179    fn fd_cntl(&self, handle: Handle, cmd: crate::Cmd) -> io::Result<crate::CmdResp> {
180        match cmd {
181            crate::Cmd::Read { waker, buf } => match handle.desc {
182                Description::File => self
183                    .inner
184                    .file_read(waker, handle, buf)
185                    .map(|len| CmdResp::DataLen(len)),
186                Description::TcpStream => self
187                    .inner
188                    .tcp_stream_read(waker, handle, buf)
189                    .map(|len| CmdResp::DataLen(len)),
190
191                _ => {
192                    return Err(io::Error::new(
193                        io::ErrorKind::InvalidInput,
194                        format!("Expect File / TcpStream , but got {:?}", handle.desc),
195                    ));
196                }
197            },
198            crate::Cmd::Write { waker, buf } => match handle.desc {
199                Description::File => self
200                    .inner
201                    .file_write(waker, handle, buf)
202                    .map(|len| CmdResp::DataLen(len)),
203                Description::TcpStream => self
204                    .inner
205                    .tcp_stream_write(waker, handle, buf)
206                    .map(|len| CmdResp::DataLen(len)),
207                _ => {
208                    return Err(io::Error::new(
209                        io::ErrorKind::InvalidInput,
210                        format!("Expect File / TcpStream , but got {:?}", handle.desc),
211                    ));
212                }
213            },
214            crate::Cmd::SendTo { waker, buf, raddr } => {
215                handle.expect(Description::UdpSocket)?;
216
217                self.inner
218                    .udp_socket_sendto(waker, handle, buf, raddr)
219                    .map(|len| CmdResp::DataLen(len))
220            }
221            crate::Cmd::RecvFrom { waker, buf } => {
222                handle.expect(Description::UdpSocket)?;
223
224                self.inner
225                    .udp_socket_recv_from(waker, handle, buf)
226                    .map(|(len, raddr)| CmdResp::RecvFrom(len, raddr))
227            }
228            crate::Cmd::Register { source, interests } => {
229                handle.expect(Description::Poller)?;
230
231                self.inner
232                    .poller_register(handle, source, interests)
233                    .map(|_| CmdResp::None)
234            }
235            crate::Cmd::ReRegister { source, interests } => {
236                handle.expect(Description::Poller)?;
237
238                self.inner
239                    .poller_reregister(handle, source, interests)
240                    .map(|_| CmdResp::None)
241            }
242            // crate::Cmd::Deregister(source) => {
243            //     handle.expect(Description::Poller)?;
244
245            //     self.inner
246            //         .poller_deregister(handle, source)
247            //         .map(|_| CmdResp::None)
248            // }
249            crate::Cmd::Accept(waker) => {
250                handle.expect(Description::TcpListener)?;
251
252                self.inner
253                    .tcp_listener_accept(waker, handle)
254                    .map(|(stream, raddr)| CmdResp::Incoming(stream, raddr))
255            }
256            crate::Cmd::PollOnce(duration) => {
257                handle.expect(Description::Poller)?;
258
259                self.inner
260                    .poller_poll_once(handle, duration)
261                    .map(|_| CmdResp::None)
262            }
263            crate::Cmd::TryClone => match handle.desc {
264                Description::Poller => self
265                    .inner
266                    .poller_clone(handle)
267                    .map(|handle| CmdResp::Cloned(handle)),
268                _ => self
269                    .inner
270                    .fd_user_define_clone(handle)
271                    .map(|handle| CmdResp::Cloned(handle)),
272            },
273            crate::Cmd::Timeout(waker) => {
274                handle.expect(Description::Timeout)?;
275
276                self.inner
277                    .timeout(waker, handle)
278                    .map(|next| CmdResp::Timeout(next))
279            }
280            crate::Cmd::LocalAddr => match handle.desc {
281                Description::TcpListener => self
282                    .inner
283                    .tcp_listener_local_addr(handle)
284                    .map(|laddr| CmdResp::SockAddr(laddr)),
285                Description::TcpStream => self
286                    .inner
287                    .tcp_stream_local_addr(handle)
288                    .map(|laddr| CmdResp::SockAddr(laddr)),
289                Description::UdpSocket => self
290                    .inner
291                    .udp_local_addr(handle)
292                    .map(|laddr| CmdResp::SockAddr(laddr)),
293                _ => {
294                    return Err(io::Error::new(
295                        io::ErrorKind::InvalidInput,
296                        format!("Expect File / TcpStream , but got {:?}", handle.desc),
297                    ));
298                }
299            },
300            crate::Cmd::RemoteAddr => match handle.desc {
301                Description::TcpStream => self
302                    .inner
303                    .tcp_stream_remote_addr(handle)
304                    .map(|laddr| CmdResp::SockAddr(laddr)),
305                _ => {
306                    return Err(io::Error::new(
307                        io::ErrorKind::InvalidInput,
308                        format!("Expect File / TcpStream , but got {:?}", handle.desc),
309                    ));
310                }
311            },
312            crate::Cmd::Shutdown(shutdown) => match handle.desc {
313                Description::TcpStream => self
314                    .inner
315                    .tcp_stream_shutdown(handle, shutdown)
316                    .map(|_| CmdResp::None),
317                _ => {
318                    return Err(io::Error::new(
319                        io::ErrorKind::InvalidInput,
320                        format!("Expect File / TcpStream , but got {:?}", handle.desc),
321                    ));
322                }
323            },
324        }
325    }
326
327    fn fd_close(&self, handle: Handle) -> io::Result<()> {
328        match handle.desc {
329            Description::File => self.inner.file_close(handle),
330            Description::TcpListener => self.inner.tcp_listener_close(handle),
331            Description::TcpStream => self.inner.tcp_stream_close(handle),
332            Description::UdpSocket => self.inner.udp_socket_close(handle),
333            Description::Timeout => self.inner.timeout_close(handle),
334            Description::Poller => self.inner.poller_close(handle),
335            Description::External(id) => self.inner.fd_user_define_close(id, handle),
336        }
337    }
338}
339
340impl<T: RawDriverExt + Clone> IntoRawDriver for T {
341    type Driver = RawDriverExtProxy<T>;
342
343    fn into_raw_driver(self) -> Self::Driver {
344        RawDriverExtProxy::new(self)
345    }
346}