nuclei/
async_io.rs

1use super::handle::Handle;
2use super::submission_handler::SubmissionHandler;
3use futures::io::{AsyncRead, AsyncWrite, SeekFrom};
4
5use std::{fs::File, pin::Pin, task::Context, task::Poll};
6use std::{io, task};
7
8use std::net::TcpStream;
9
10#[cfg(unix)]
11use std::os::unix::net::UnixStream;
12#[cfg(unix)]
13use std::{
14    mem::ManuallyDrop,
15    os::unix::io::{AsRawFd, FromRawFd, RawFd},
16    os::unix::prelude::*,
17};
18
19use crate::syscore::Processor;
20use futures::AsyncSeek;
21
22//
23// Proxy operations for Future registration via AsyncRead, AsyncWrite and others.
24// Linux, windows etc. specific
25
26macro_rules! impl_async_read {
27    ($name:ident) => {
28        impl AsyncRead for Handle<$name> {
29            fn poll_read(
30                self: Pin<&mut Self>,
31                cx: &mut Context,
32                buf: &mut [u8],
33            ) -> Poll<io::Result<usize>> {
34                Pin::new(&mut &*Pin::get_mut(self)).poll_read(cx, buf)
35            }
36        }
37    };
38}
39
40macro_rules! impl_async_write {
41    ($name:ident) => {
42        impl AsyncWrite for Handle<$name> {
43            fn poll_write(
44                self: Pin<&mut Self>,
45                cx: &mut Context,
46                buf: &[u8],
47            ) -> Poll<io::Result<usize>> {
48                Pin::new(&mut &*Pin::get_mut(self)).poll_write(cx, buf)
49            }
50
51            fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
52                Pin::new(&mut &*Pin::get_mut(self)).poll_flush(cx)
53            }
54
55            fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
56                Pin::new(&mut &*Pin::get_mut(self)).poll_close(cx)
57            }
58        }
59    };
60}
61
62#[cfg(not(all(feature = "iouring", target_os = "linux")))]
63impl_async_read!(File);
64#[cfg(not(all(feature = "iouring", target_os = "linux")))]
65impl_async_write!(File);
66
67impl_async_read!(TcpStream);
68impl_async_write!(TcpStream);
69
70#[cfg(unix)]
71impl_async_read!(UnixStream);
72#[cfg(unix)]
73impl_async_write!(UnixStream);
74
75///////////////////////////////////
76///// Non proactive File
77///////////////////////////////////
78
79#[cfg(not(all(feature = "iouring", target_os = "linux")))]
80impl AsyncRead for &Handle<File> {
81    fn poll_read(
82        self: Pin<&mut Self>,
83        cx: &mut task::Context<'_>,
84        buf: &mut [u8],
85    ) -> Poll<io::Result<usize>> {
86        let raw_fd = self.as_raw_fd();
87        let buf_len = buf.len();
88        let buf = buf.as_mut_ptr();
89
90        let completion_dispatcher = async move {
91            let file = unsafe { File::from_raw_fd(raw_fd) };
92
93            let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
94            let size = Processor::processor_read_file(&file, buf).await?;
95
96            let _ = ManuallyDrop::new(file);
97            Ok(size)
98        };
99
100        SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
101    }
102}
103
104#[cfg(not(all(feature = "iouring", target_os = "linux")))]
105impl AsyncWrite for &Handle<File> {
106    fn poll_write(
107        self: Pin<&mut Self>,
108        cx: &mut Context<'_>,
109        buf: &[u8],
110    ) -> Poll<io::Result<usize>> {
111        let raw_fd = self.as_raw_fd();
112        let buf_len = buf.len();
113        let buf = buf.as_ptr();
114
115        let completion_dispatcher = async move {
116            let file = unsafe { File::from_raw_fd(raw_fd) };
117
118            let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
119            let size = Processor::processor_write_file(&file, buf).await?;
120
121            let _ = ManuallyDrop::new(file);
122            Ok(size)
123        };
124
125        SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
126    }
127
128    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
129        Poll::Ready(Ok(()))
130    }
131
132    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
133        Poll::Ready(Ok(()))
134    }
135}
136
137#[cfg(not(all(feature = "iouring", target_os = "linux")))]
138impl AsyncSeek for Handle<File> {
139    fn poll_seek(
140        self: Pin<&mut Self>,
141        cx: &mut Context<'_>,
142        pos: SeekFrom,
143    ) -> Poll<io::Result<u64>> {
144        let raw_fd = self.as_raw_fd();
145
146        let completion_dispatcher = async move {
147            let file = unsafe { File::from_raw_fd(raw_fd) };
148            let newpos = Processor::processor_seek_file(&file, pos).await?;
149
150            let _ = ManuallyDrop::new(file);
151            Ok(newpos)
152        };
153
154        SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
155            .map(|e| e.map(|i| i as u64))
156    }
157}
158
159///////////////////////////////////
160///// IO URING / Proactive / Linux
161///////////////////////////////////
162
163// Import stream for io_uring,
164
165#[cfg(all(feature = "iouring", target_os = "linux"))]
166use crate::syscore::StoreFile;
167#[cfg(all(feature = "iouring", target_os = "linux"))]
168use futures::io::{IoSlice, IoSliceMut};
169#[cfg(all(feature = "iouring", target_os = "linux"))]
170use futures::*;
171#[cfg(all(feature = "iouring", target_os = "linux"))]
172use lever::prelude::*;
173#[cfg(all(feature = "iouring", target_os = "linux"))]
174use std::path::Path;
175#[cfg(all(feature = "iouring", target_os = "linux"))]
176use std::sync::Arc;
177
178#[cfg(all(feature = "iouring", target_os = "linux"))]
179impl Handle<File> {
180    pub async fn open(p: impl AsRef<Path>) -> io::Result<Handle<File>> {
181        let fd = Processor::processor_open_at(p).await?;
182        let io = unsafe { File::from_raw_fd(fd as _) };
183
184        Ok(Handle {
185            io_task: Some(io),
186            chan: None,
187            store_file: Some(StoreFile::new(fd as _)),
188            read: Arc::new(TTas::new(None)),
189            write: Arc::new(TTas::new(None)),
190        })
191    }
192}
193
194#[cfg(all(feature = "iouring", target_os = "linux"))]
195impl AsyncRead for Handle<File> {
196    fn poll_read(
197        mut self: Pin<&mut Self>,
198        cx: &mut task::Context<'_>,
199        buf: &mut [u8],
200    ) -> Poll<io::Result<usize>> {
201        let mut inner = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
202        let len = io::Read::read(&mut inner, buf)?;
203        self.consume(len);
204        Poll::Ready(Ok(len))
205    }
206
207    fn poll_read_vectored(
208        self: Pin<&mut Self>,
209        cx: &mut Context<'_>,
210        bufs: &mut [IoSliceMut<'_>],
211    ) -> Poll<io::Result<usize>> {
212        let store = &mut self.get_mut().store_file;
213
214        if let Some(store_file) = store.as_mut() {
215            let fd: RawFd = store_file.receive_fd();
216            let op_state = store_file.op_state();
217            let (_, pos) = store_file.bufpair();
218
219            let fut = Processor::processor_read_vectored(&fd, bufs);
220            futures::pin_mut!(fut);
221
222            loop {
223                match fut.as_mut().poll(cx)? {
224                    Poll::Ready(n) => {
225                        *pos += n;
226                        break Poll::Ready(Ok(n));
227                    }
228                    _ => {}
229                }
230            }
231        } else {
232            Poll::Ready(Ok(0))
233        }
234    }
235}
236
237#[cfg(all(feature = "iouring", target_os = "linux"))]
238const NON_READ: &[u8] = &[];
239
240#[cfg(all(feature = "iouring", target_os = "linux"))]
241impl AsyncBufRead for Handle<File> {
242    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
243        let store = &mut self.get_mut().store_file;
244
245        if let Some(store_file) = store.as_mut() {
246            let fd: RawFd = store_file.receive_fd();
247            let op_state = store_file.op_state();
248            let (bufp, pos) = store_file.bufpair();
249
250            bufp.fill_buf(|buf| {
251                let fut = Processor::processor_read_file(&fd, buf, *pos);
252                futures::pin_mut!(fut);
253
254                loop {
255                    match fut.as_mut().poll(cx)? {
256                        Poll::Ready(n) => {
257                            *pos += n;
258                            break Poll::Ready(Ok(n));
259                        }
260                        _ => {}
261                    }
262                }
263            })
264        } else {
265            Poll::Ready(Ok(NON_READ))
266        }
267    }
268
269    fn consume(self: Pin<&mut Self>, amt: usize) {
270        let store = self.get_mut().store_file.as_mut().unwrap();
271        store.buf().consume(amt);
272    }
273}
274
275#[cfg(all(feature = "iouring", target_os = "linux"))]
276impl AsyncWrite for Handle<File> {
277    fn poll_write(
278        self: Pin<&mut Self>,
279        cx: &mut Context<'_>,
280        bufslice: &[u8],
281    ) -> Poll<io::Result<usize>> {
282        let store = &mut self.get_mut().store_file;
283
284        if let Some(store_file) = store.as_mut() {
285            let fd: RawFd = store_file.receive_fd();
286            let op_state = store_file.op_state();
287            let (bufp, pos) = store_file.bufpair();
288
289            let data = futures::ready!(bufp.fill_buf(|mut buf| {
290                Poll::Ready(Ok(io::Write::write(&mut buf, bufslice).unwrap()))
291            }))
292            .unwrap();
293
294            let res = {
295                let fut = Processor::processor_write_file(&fd, data, *pos);
296                futures::pin_mut!(fut);
297
298                loop {
299                    match fut.as_mut().poll(cx)? {
300                        Poll::Ready(n) => {
301                            *pos += n;
302                            break Poll::Ready(Ok(n));
303                        }
304                        _ => {}
305                    }
306                }
307            };
308
309            bufp.clear();
310
311            res
312        } else {
313            Poll::Ready(Ok(0))
314        }
315    }
316
317    fn poll_write_vectored(
318        self: Pin<&mut Self>,
319        cx: &mut Context<'_>,
320        bufs: &[IoSlice<'_>],
321    ) -> Poll<io::Result<usize>> {
322        let store = &mut self.get_mut().store_file;
323
324        if let Some(store_file) = store.as_mut() {
325            let fd: RawFd = store_file.receive_fd();
326            let op_state = store_file.op_state();
327            let (_, pos) = store_file.bufpair();
328
329            let fut = Processor::processor_write_vectored(&fd, bufs);
330            futures::pin_mut!(fut);
331
332            loop {
333                match fut.as_mut().poll(cx)? {
334                    Poll::Ready(n) => {
335                        *pos += n;
336                        break Poll::Ready(Ok(n));
337                    }
338                    _ => {}
339                }
340            }
341        } else {
342            Poll::Ready(Ok(0))
343        }
344    }
345
346    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
347        futures::ready!(self.poll_write(cx, &[]))?;
348        Poll::Ready(Ok(()))
349    }
350
351    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
352        let store = &mut self.get_mut().store_file;
353
354        if let Some(store_file) = store.as_mut() {
355            let fd: RawFd = store_file.receive_fd();
356            let op_state = store_file.op_state();
357
358            let fut = Processor::processor_close_file(&fd);
359            futures::pin_mut!(fut);
360
361            loop {
362                match fut.as_mut().poll(cx)? {
363                    Poll::Ready(_) => break Poll::Ready(Ok(())),
364                    _ => {}
365                }
366            }
367        } else {
368            Poll::Ready(Ok(()))
369        }
370    }
371}
372
373#[cfg(all(feature = "iouring", target_os = "linux"))]
374impl AsyncSeek for Handle<File> {
375    fn poll_seek(
376        self: Pin<&mut Self>,
377        cx: &mut Context<'_>,
378        pos: SeekFrom,
379    ) -> Poll<io::Result<u64>> {
380        let store = &mut self.get_mut().store_file.as_mut().unwrap();
381
382        let (cursor, offset) = match pos {
383            io::SeekFrom::Start(n) => {
384                *store.pos() = n as usize;
385                return Poll::Ready(Ok(*store.pos() as u64));
386            }
387            io::SeekFrom::Current(n) => (*store.pos(), n),
388            io::SeekFrom::End(n) => {
389                let fut = store.poll_file_size();
390                futures::pin_mut!(fut);
391                (futures::ready!(fut.as_mut().poll(cx))?, n)
392            }
393        };
394        let valid_seek = if offset.is_negative() {
395            match cursor.checked_sub(offset.unsigned_abs() as usize) {
396                Some(valid_seek) => valid_seek,
397                None => {
398                    let invalid = io::Error::from(io::ErrorKind::InvalidInput);
399                    return Poll::Ready(Err(invalid));
400                }
401            }
402        } else {
403            match cursor.checked_add(offset as usize) {
404                Some(valid_seek) => valid_seek,
405                None => {
406                    let overflow = io::Error::from_raw_os_error(libc::EOVERFLOW);
407                    return Poll::Ready(Err(overflow));
408                }
409            }
410        };
411        *store.pos() = valid_seek;
412        Poll::Ready(Ok(*store.pos() as u64))
413    }
414}
415
416///////////////////////////////////
417///// TcpStream
418///////////////////////////////////
419
420#[cfg(unix)]
421impl AsyncRead for &Handle<TcpStream> {
422    fn poll_read(
423        self: Pin<&mut Self>,
424        cx: &mut task::Context<'_>,
425        buf: &mut [u8],
426    ) -> Poll<io::Result<usize>> {
427        let raw_fd = self.as_raw_fd();
428        let buf_len = buf.len();
429        let buf = buf.as_mut_ptr();
430
431        let completion_dispatcher = async move {
432            let sock = unsafe { TcpStream::from_raw_fd(raw_fd) };
433
434            let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
435            let size = Processor::processor_recv(&sock, buf).await?;
436
437            let _ = ManuallyDrop::new(sock);
438            Ok(size)
439        };
440
441        SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
442    }
443}
444
445#[cfg(unix)]
446impl AsyncWrite for &Handle<TcpStream> {
447    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
448        let raw_fd = self.as_raw_fd();
449        let buf_len = buf.len();
450        let buf = buf.as_ptr();
451
452        let completion_dispatcher = async move {
453            let sock = unsafe { TcpStream::from_raw_fd(raw_fd) };
454
455            let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
456            let size = Processor::processor_send(&sock, buf).await?;
457
458            let _ = ManuallyDrop::new(sock);
459            Ok(size)
460        };
461
462        SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
463    }
464
465    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
466        Poll::Ready(Ok(()))
467    }
468
469    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
470        Poll::Ready(Ok(()))
471    }
472}
473
474///////////////////////////////////
475///// AsRawFd impls
476///////////////////////////////////
477
478#[cfg(unix)]
479impl<T: AsRawFd> AsRawFd for Handle<T> {
480    fn as_raw_fd(&self) -> RawFd {
481        self.io_task.as_ref().unwrap().as_raw_fd()
482    }
483}
484
485#[cfg(unix)]
486impl<T: IntoRawFd> IntoRawFd for Handle<T> {
487    fn into_raw_fd(self) -> RawFd {
488        self.into_inner().into_raw_fd()
489    }
490}
491
492///////////////////////////////////
493///// UnixStream
494///////////////////////////////////
495
496#[cfg(unix)]
497impl AsyncRead for &Handle<UnixStream> {
498    fn poll_read(
499        self: Pin<&mut Self>,
500        cx: &mut Context,
501        buf: &mut [u8],
502    ) -> Poll<io::Result<usize>> {
503        let raw_fd = self.as_raw_fd();
504        let buf_len = buf.len();
505        let buf = buf.as_mut_ptr();
506
507        let completion_dispatcher = async move {
508            let sock = unsafe { UnixStream::from_raw_fd(raw_fd) };
509
510            let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
511            let size = Processor::processor_recv(&sock, buf).await?;
512
513            let _ = ManuallyDrop::new(sock);
514            Ok(size)
515        };
516
517        SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
518    }
519}
520
521#[cfg(unix)]
522impl AsyncWrite for &Handle<UnixStream> {
523    fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
524        let raw_fd = self.as_raw_fd();
525        let buf_len = buf.len();
526        let buf = buf.as_ptr();
527
528        let completion_dispatcher = async move {
529            let sock = unsafe { UnixStream::from_raw_fd(raw_fd) };
530
531            let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
532            let size = Processor::processor_send(&sock, buf).await?;
533
534            let _ = ManuallyDrop::new(sock);
535            Ok(size)
536        };
537
538        SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
539    }
540
541    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
542        Poll::Ready(Ok(()))
543    }
544
545    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
546        Poll::Ready(Ok(()))
547    }
548}