1use super::handle::Handle;
2use super::submission_handler::SubmissionHandler;
3use futures::io::{AsyncRead, AsyncWrite, SeekFrom};
4
5use std::{fs::File, pin::Pin, task::Context, task::Poll};
6use std::{io, task};
7
8use std::net::TcpStream;
9
10#[cfg(unix)]
11use std::os::unix::net::UnixStream;
12#[cfg(unix)]
13use std::{
14 mem::ManuallyDrop,
15 os::unix::io::{AsRawFd, FromRawFd, RawFd},
16 os::unix::prelude::*,
17};
18
19use crate::syscore::Processor;
20use futures::AsyncSeek;
21
22macro_rules! impl_async_read {
27 ($name:ident) => {
28 impl AsyncRead for Handle<$name> {
29 fn poll_read(
30 self: Pin<&mut Self>,
31 cx: &mut Context,
32 buf: &mut [u8],
33 ) -> Poll<io::Result<usize>> {
34 Pin::new(&mut &*Pin::get_mut(self)).poll_read(cx, buf)
35 }
36 }
37 };
38}
39
40macro_rules! impl_async_write {
41 ($name:ident) => {
42 impl AsyncWrite for Handle<$name> {
43 fn poll_write(
44 self: Pin<&mut Self>,
45 cx: &mut Context,
46 buf: &[u8],
47 ) -> Poll<io::Result<usize>> {
48 Pin::new(&mut &*Pin::get_mut(self)).poll_write(cx, buf)
49 }
50
51 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
52 Pin::new(&mut &*Pin::get_mut(self)).poll_flush(cx)
53 }
54
55 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
56 Pin::new(&mut &*Pin::get_mut(self)).poll_close(cx)
57 }
58 }
59 };
60}
61
62#[cfg(not(all(feature = "iouring", target_os = "linux")))]
63impl_async_read!(File);
64#[cfg(not(all(feature = "iouring", target_os = "linux")))]
65impl_async_write!(File);
66
67impl_async_read!(TcpStream);
68impl_async_write!(TcpStream);
69
70#[cfg(unix)]
71impl_async_read!(UnixStream);
72#[cfg(unix)]
73impl_async_write!(UnixStream);
74
75#[cfg(not(all(feature = "iouring", target_os = "linux")))]
80impl AsyncRead for &Handle<File> {
81 fn poll_read(
82 self: Pin<&mut Self>,
83 cx: &mut task::Context<'_>,
84 buf: &mut [u8],
85 ) -> Poll<io::Result<usize>> {
86 let raw_fd = self.as_raw_fd();
87 let buf_len = buf.len();
88 let buf = buf.as_mut_ptr();
89
90 let completion_dispatcher = async move {
91 let file = unsafe { File::from_raw_fd(raw_fd) };
92
93 let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
94 let size = Processor::processor_read_file(&file, buf).await?;
95
96 let _ = ManuallyDrop::new(file);
97 Ok(size)
98 };
99
100 SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
101 }
102}
103
104#[cfg(not(all(feature = "iouring", target_os = "linux")))]
105impl AsyncWrite for &Handle<File> {
106 fn poll_write(
107 self: Pin<&mut Self>,
108 cx: &mut Context<'_>,
109 buf: &[u8],
110 ) -> Poll<io::Result<usize>> {
111 let raw_fd = self.as_raw_fd();
112 let buf_len = buf.len();
113 let buf = buf.as_ptr();
114
115 let completion_dispatcher = async move {
116 let file = unsafe { File::from_raw_fd(raw_fd) };
117
118 let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
119 let size = Processor::processor_write_file(&file, buf).await?;
120
121 let _ = ManuallyDrop::new(file);
122 Ok(size)
123 };
124
125 SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
126 }
127
128 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
129 Poll::Ready(Ok(()))
130 }
131
132 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
133 Poll::Ready(Ok(()))
134 }
135}
136
137#[cfg(not(all(feature = "iouring", target_os = "linux")))]
138impl AsyncSeek for Handle<File> {
139 fn poll_seek(
140 self: Pin<&mut Self>,
141 cx: &mut Context<'_>,
142 pos: SeekFrom,
143 ) -> Poll<io::Result<u64>> {
144 let raw_fd = self.as_raw_fd();
145
146 let completion_dispatcher = async move {
147 let file = unsafe { File::from_raw_fd(raw_fd) };
148 let newpos = Processor::processor_seek_file(&file, pos).await?;
149
150 let _ = ManuallyDrop::new(file);
151 Ok(newpos)
152 };
153
154 SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
155 .map(|e| e.map(|i| i as u64))
156 }
157}
158
159#[cfg(all(feature = "iouring", target_os = "linux"))]
166use crate::syscore::StoreFile;
167#[cfg(all(feature = "iouring", target_os = "linux"))]
168use futures::io::{IoSlice, IoSliceMut};
169#[cfg(all(feature = "iouring", target_os = "linux"))]
170use futures::*;
171#[cfg(all(feature = "iouring", target_os = "linux"))]
172use lever::prelude::*;
173#[cfg(all(feature = "iouring", target_os = "linux"))]
174use std::path::Path;
175#[cfg(all(feature = "iouring", target_os = "linux"))]
176use std::sync::Arc;
177
178#[cfg(all(feature = "iouring", target_os = "linux"))]
179impl Handle<File> {
180 pub async fn open(p: impl AsRef<Path>) -> io::Result<Handle<File>> {
181 let fd = Processor::processor_open_at(p).await?;
182 let io = unsafe { File::from_raw_fd(fd as _) };
183
184 Ok(Handle {
185 io_task: Some(io),
186 chan: None,
187 store_file: Some(StoreFile::new(fd as _)),
188 read: Arc::new(TTas::new(None)),
189 write: Arc::new(TTas::new(None)),
190 })
191 }
192}
193
194#[cfg(all(feature = "iouring", target_os = "linux"))]
195impl AsyncRead for Handle<File> {
196 fn poll_read(
197 mut self: Pin<&mut Self>,
198 cx: &mut task::Context<'_>,
199 buf: &mut [u8],
200 ) -> Poll<io::Result<usize>> {
201 let mut inner = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
202 let len = io::Read::read(&mut inner, buf)?;
203 self.consume(len);
204 Poll::Ready(Ok(len))
205 }
206
207 fn poll_read_vectored(
208 self: Pin<&mut Self>,
209 cx: &mut Context<'_>,
210 bufs: &mut [IoSliceMut<'_>],
211 ) -> Poll<io::Result<usize>> {
212 let store = &mut self.get_mut().store_file;
213
214 if let Some(store_file) = store.as_mut() {
215 let fd: RawFd = store_file.receive_fd();
216 let op_state = store_file.op_state();
217 let (_, pos) = store_file.bufpair();
218
219 let fut = Processor::processor_read_vectored(&fd, bufs);
220 futures::pin_mut!(fut);
221
222 loop {
223 match fut.as_mut().poll(cx)? {
224 Poll::Ready(n) => {
225 *pos += n;
226 break Poll::Ready(Ok(n));
227 }
228 _ => {}
229 }
230 }
231 } else {
232 Poll::Ready(Ok(0))
233 }
234 }
235}
236
237#[cfg(all(feature = "iouring", target_os = "linux"))]
238const NON_READ: &[u8] = &[];
239
240#[cfg(all(feature = "iouring", target_os = "linux"))]
241impl AsyncBufRead for Handle<File> {
242 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
243 let store = &mut self.get_mut().store_file;
244
245 if let Some(store_file) = store.as_mut() {
246 let fd: RawFd = store_file.receive_fd();
247 let op_state = store_file.op_state();
248 let (bufp, pos) = store_file.bufpair();
249
250 bufp.fill_buf(|buf| {
251 let fut = Processor::processor_read_file(&fd, buf, *pos);
252 futures::pin_mut!(fut);
253
254 loop {
255 match fut.as_mut().poll(cx)? {
256 Poll::Ready(n) => {
257 *pos += n;
258 break Poll::Ready(Ok(n));
259 }
260 _ => {}
261 }
262 }
263 })
264 } else {
265 Poll::Ready(Ok(NON_READ))
266 }
267 }
268
269 fn consume(self: Pin<&mut Self>, amt: usize) {
270 let store = self.get_mut().store_file.as_mut().unwrap();
271 store.buf().consume(amt);
272 }
273}
274
275#[cfg(all(feature = "iouring", target_os = "linux"))]
276impl AsyncWrite for Handle<File> {
277 fn poll_write(
278 self: Pin<&mut Self>,
279 cx: &mut Context<'_>,
280 bufslice: &[u8],
281 ) -> Poll<io::Result<usize>> {
282 let store = &mut self.get_mut().store_file;
283
284 if let Some(store_file) = store.as_mut() {
285 let fd: RawFd = store_file.receive_fd();
286 let op_state = store_file.op_state();
287 let (bufp, pos) = store_file.bufpair();
288
289 let data = futures::ready!(bufp.fill_buf(|mut buf| {
290 Poll::Ready(Ok(io::Write::write(&mut buf, bufslice).unwrap()))
291 }))
292 .unwrap();
293
294 let res = {
295 let fut = Processor::processor_write_file(&fd, data, *pos);
296 futures::pin_mut!(fut);
297
298 loop {
299 match fut.as_mut().poll(cx)? {
300 Poll::Ready(n) => {
301 *pos += n;
302 break Poll::Ready(Ok(n));
303 }
304 _ => {}
305 }
306 }
307 };
308
309 bufp.clear();
310
311 res
312 } else {
313 Poll::Ready(Ok(0))
314 }
315 }
316
317 fn poll_write_vectored(
318 self: Pin<&mut Self>,
319 cx: &mut Context<'_>,
320 bufs: &[IoSlice<'_>],
321 ) -> Poll<io::Result<usize>> {
322 let store = &mut self.get_mut().store_file;
323
324 if let Some(store_file) = store.as_mut() {
325 let fd: RawFd = store_file.receive_fd();
326 let op_state = store_file.op_state();
327 let (_, pos) = store_file.bufpair();
328
329 let fut = Processor::processor_write_vectored(&fd, bufs);
330 futures::pin_mut!(fut);
331
332 loop {
333 match fut.as_mut().poll(cx)? {
334 Poll::Ready(n) => {
335 *pos += n;
336 break Poll::Ready(Ok(n));
337 }
338 _ => {}
339 }
340 }
341 } else {
342 Poll::Ready(Ok(0))
343 }
344 }
345
346 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
347 futures::ready!(self.poll_write(cx, &[]))?;
348 Poll::Ready(Ok(()))
349 }
350
351 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
352 let store = &mut self.get_mut().store_file;
353
354 if let Some(store_file) = store.as_mut() {
355 let fd: RawFd = store_file.receive_fd();
356 let op_state = store_file.op_state();
357
358 let fut = Processor::processor_close_file(&fd);
359 futures::pin_mut!(fut);
360
361 loop {
362 match fut.as_mut().poll(cx)? {
363 Poll::Ready(_) => break Poll::Ready(Ok(())),
364 _ => {}
365 }
366 }
367 } else {
368 Poll::Ready(Ok(()))
369 }
370 }
371}
372
373#[cfg(all(feature = "iouring", target_os = "linux"))]
374impl AsyncSeek for Handle<File> {
375 fn poll_seek(
376 self: Pin<&mut Self>,
377 cx: &mut Context<'_>,
378 pos: SeekFrom,
379 ) -> Poll<io::Result<u64>> {
380 let store = &mut self.get_mut().store_file.as_mut().unwrap();
381
382 let (cursor, offset) = match pos {
383 io::SeekFrom::Start(n) => {
384 *store.pos() = n as usize;
385 return Poll::Ready(Ok(*store.pos() as u64));
386 }
387 io::SeekFrom::Current(n) => (*store.pos(), n),
388 io::SeekFrom::End(n) => {
389 let fut = store.poll_file_size();
390 futures::pin_mut!(fut);
391 (futures::ready!(fut.as_mut().poll(cx))?, n)
392 }
393 };
394 let valid_seek = if offset.is_negative() {
395 match cursor.checked_sub(offset.unsigned_abs() as usize) {
396 Some(valid_seek) => valid_seek,
397 None => {
398 let invalid = io::Error::from(io::ErrorKind::InvalidInput);
399 return Poll::Ready(Err(invalid));
400 }
401 }
402 } else {
403 match cursor.checked_add(offset as usize) {
404 Some(valid_seek) => valid_seek,
405 None => {
406 let overflow = io::Error::from_raw_os_error(libc::EOVERFLOW);
407 return Poll::Ready(Err(overflow));
408 }
409 }
410 };
411 *store.pos() = valid_seek;
412 Poll::Ready(Ok(*store.pos() as u64))
413 }
414}
415
416#[cfg(unix)]
421impl AsyncRead for &Handle<TcpStream> {
422 fn poll_read(
423 self: Pin<&mut Self>,
424 cx: &mut task::Context<'_>,
425 buf: &mut [u8],
426 ) -> Poll<io::Result<usize>> {
427 let raw_fd = self.as_raw_fd();
428 let buf_len = buf.len();
429 let buf = buf.as_mut_ptr();
430
431 let completion_dispatcher = async move {
432 let sock = unsafe { TcpStream::from_raw_fd(raw_fd) };
433
434 let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
435 let size = Processor::processor_recv(&sock, buf).await?;
436
437 let _ = ManuallyDrop::new(sock);
438 Ok(size)
439 };
440
441 SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
442 }
443}
444
445#[cfg(unix)]
446impl AsyncWrite for &Handle<TcpStream> {
447 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
448 let raw_fd = self.as_raw_fd();
449 let buf_len = buf.len();
450 let buf = buf.as_ptr();
451
452 let completion_dispatcher = async move {
453 let sock = unsafe { TcpStream::from_raw_fd(raw_fd) };
454
455 let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
456 let size = Processor::processor_send(&sock, buf).await?;
457
458 let _ = ManuallyDrop::new(sock);
459 Ok(size)
460 };
461
462 SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
463 }
464
465 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
466 Poll::Ready(Ok(()))
467 }
468
469 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
470 Poll::Ready(Ok(()))
471 }
472}
473
474#[cfg(unix)]
479impl<T: AsRawFd> AsRawFd for Handle<T> {
480 fn as_raw_fd(&self) -> RawFd {
481 self.io_task.as_ref().unwrap().as_raw_fd()
482 }
483}
484
485#[cfg(unix)]
486impl<T: IntoRawFd> IntoRawFd for Handle<T> {
487 fn into_raw_fd(self) -> RawFd {
488 self.into_inner().into_raw_fd()
489 }
490}
491
492#[cfg(unix)]
497impl AsyncRead for &Handle<UnixStream> {
498 fn poll_read(
499 self: Pin<&mut Self>,
500 cx: &mut Context,
501 buf: &mut [u8],
502 ) -> Poll<io::Result<usize>> {
503 let raw_fd = self.as_raw_fd();
504 let buf_len = buf.len();
505 let buf = buf.as_mut_ptr();
506
507 let completion_dispatcher = async move {
508 let sock = unsafe { UnixStream::from_raw_fd(raw_fd) };
509
510 let buf = unsafe { std::slice::from_raw_parts_mut(buf, buf_len) };
511 let size = Processor::processor_recv(&sock, buf).await?;
512
513 let _ = ManuallyDrop::new(sock);
514 Ok(size)
515 };
516
517 SubmissionHandler::<Self>::handle_read(self, cx, completion_dispatcher)
518 }
519}
520
521#[cfg(unix)]
522impl AsyncWrite for &Handle<UnixStream> {
523 fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
524 let raw_fd = self.as_raw_fd();
525 let buf_len = buf.len();
526 let buf = buf.as_ptr();
527
528 let completion_dispatcher = async move {
529 let sock = unsafe { UnixStream::from_raw_fd(raw_fd) };
530
531 let buf = unsafe { std::slice::from_raw_parts(buf, buf_len) };
532 let size = Processor::processor_send(&sock, buf).await?;
533
534 let _ = ManuallyDrop::new(sock);
535 Ok(size)
536 };
537
538 SubmissionHandler::<Self>::handle_write(self, cx, completion_dispatcher)
539 }
540
541 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
542 Poll::Ready(Ok(()))
543 }
544
545 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
546 Poll::Ready(Ok(()))
547 }
548}