1use std::future::Future;
47use std::io;
48use std::os::unix::io::RawFd;
49use std::pin::Pin;
50use std::task::{Context, Poll};
51
52use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
53
54use crate::error::{Result, SaferRingError};
55use crate::ownership::OwnedBuffer;
56use crate::ring::Ring;
57
58const ADAPTER_BUFFER_SIZE: usize = 8192;
60
61type OperationFutureResult = Result<(usize, OwnedBuffer)>;
63
64type OwnedOperationFuture<'a> = Pin<Box<dyn Future<Output = OperationFutureResult> + 'a>>;
66
67pub struct AsyncReadAdapter<'ring> {
78 ring: &'ring Ring<'ring>,
79 fd: RawFd,
80 internal_buffer: Option<OwnedBuffer>,
82 cached_data: Vec<u8>,
84 cached_pos: usize,
86 read_future: Option<OwnedOperationFuture<'ring>>,
88}
89
90impl<'ring> AsyncReadAdapter<'ring> {
91 pub fn new(ring: &'ring Ring<'ring>, fd: RawFd) -> Self {
98 Self {
99 ring,
100 fd,
101 internal_buffer: Some(OwnedBuffer::new(ADAPTER_BUFFER_SIZE)),
102 cached_data: Vec::new(),
103 cached_pos: 0,
104 read_future: None,
105 }
106 }
107
108 pub fn ring(&self) -> &'ring Ring<'ring> {
110 self.ring
111 }
112
113 pub fn fd(&self) -> RawFd {
115 self.fd
116 }
117}
118
119impl<'ring> AsyncRead for AsyncReadAdapter<'ring> {
120 fn poll_read(
121 mut self: Pin<&mut Self>,
122 cx: &mut Context<'_>,
123 buf: &mut ReadBuf<'_>,
124 ) -> Poll<io::Result<()>> {
125 if self.cached_pos < self.cached_data.len() {
127 let available = self.cached_data.len() - self.cached_pos;
128 let to_copy = available.min(buf.remaining());
129
130 if to_copy > 0 {
131 let data = &self.cached_data[self.cached_pos..self.cached_pos + to_copy];
132 buf.put_slice(data);
133 self.cached_pos += to_copy;
134
135 if self.cached_pos >= self.cached_data.len() {
137 self.cached_data.clear();
138 self.cached_pos = 0;
139 }
140
141 return Poll::Ready(Ok(()));
142 }
143 }
144
145 if let Some(mut future) = self.read_future.take() {
147 match future.as_mut().poll(cx) {
148 Poll::Ready(Ok((bytes_read, returned_buffer))) => {
149 self.internal_buffer = Some(returned_buffer);
151
152 if bytes_read > 0 {
153 if let Some(buffer_guard) =
155 self.internal_buffer.as_ref().and_then(|b| b.try_access())
156 {
157 let data = &buffer_guard[..bytes_read];
158
159 let to_copy = data.len().min(buf.remaining());
161 buf.put_slice(&data[..to_copy]);
162
163 if to_copy < data.len() {
165 self.cached_data.extend_from_slice(&data[to_copy..]);
166 self.cached_pos = 0;
167 }
168 }
169 }
170
171 Poll::Ready(Ok(()))
172 }
173 Poll::Ready(Err(e)) => {
174 Poll::Ready(Err(io::Error::other(e.to_string())))
176 }
177 Poll::Pending => {
178 self.read_future = Some(future);
180 Poll::Pending
181 }
182 }
183 } else if let Some(buffer) = self.internal_buffer.take() {
184 let future = self.ring.read_owned(self.fd, buffer);
186 self.read_future = Some(Box::pin(future));
187
188 self.poll_read(cx, buf)
190 } else {
191 Poll::Ready(Err(io::Error::other(
193 "AsyncReadAdapter internal buffer unavailable",
194 )))
195 }
196 }
197}
198
199pub struct AsyncWriteAdapter<'ring> {
204 ring: &'ring Ring<'ring>,
205 fd: RawFd,
206 #[allow(dead_code)]
208 write_buffer: Vec<u8>,
209 internal_buffer: Option<OwnedBuffer>,
211 write_future: Option<OwnedOperationFuture<'ring>>,
213}
214
215impl<'ring> AsyncWriteAdapter<'ring> {
216 pub fn new(ring: &'ring Ring<'ring>, fd: RawFd) -> Self {
223 Self {
224 ring,
225 fd,
226 write_buffer: Vec::new(),
227 internal_buffer: Some(OwnedBuffer::new(ADAPTER_BUFFER_SIZE)),
228 write_future: None,
229 }
230 }
231
232 pub fn ring(&self) -> &'ring Ring<'ring> {
234 self.ring
235 }
236
237 pub fn fd(&self) -> RawFd {
239 self.fd
240 }
241}
242
243impl<'ring> AsyncWrite for AsyncWriteAdapter<'ring> {
244 fn poll_write(
245 mut self: Pin<&mut Self>,
246 cx: &mut Context<'_>,
247 buf: &[u8],
248 ) -> Poll<io::Result<usize>> {
249 if let Some(mut future) = self.write_future.take() {
251 match future.as_mut().poll(cx) {
252 Poll::Ready(Ok((_bytes_written, returned_buffer))) => {
253 self.internal_buffer = Some(returned_buffer);
255 }
257 Poll::Ready(Err(e)) => {
258 return Poll::Ready(Err(io::Error::other(e.to_string())));
260 }
261 Poll::Pending => {
262 self.write_future = Some(future);
264 return Poll::Pending;
265 }
266 }
267 }
268
269 if let Some(internal_buffer) = self.internal_buffer.take() {
271 if let Some(mut buffer_guard) = internal_buffer.try_access() {
273 let to_copy = buf.len().min(buffer_guard.len());
274 buffer_guard[..to_copy].copy_from_slice(&buf[..to_copy]);
275
276 drop(buffer_guard);
278
279 let future = self.ring.write_owned(self.fd, internal_buffer);
281 self.write_future = Some(Box::pin(future));
282
283 match self.poll_write(cx, &[]) {
285 Poll::Ready(Ok(_)) => Poll::Ready(Ok(to_copy)),
286 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
287 Poll::Pending => Poll::Ready(Ok(to_copy)), }
289 } else {
290 self.internal_buffer = Some(internal_buffer);
292 Poll::Pending
293 }
294 } else {
295 Poll::Ready(Err(io::Error::other(
297 "AsyncWriteAdapter internal buffer unavailable",
298 )))
299 }
300 }
301
302 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
303 if let Some(mut future) = self.write_future.take() {
305 match future.as_mut().poll(cx) {
306 Poll::Ready(Ok((_bytes_written, returned_buffer))) => {
307 self.internal_buffer = Some(returned_buffer);
309 Poll::Ready(Ok(()))
310 }
311 Poll::Ready(Err(e)) => {
312 Poll::Ready(Err(io::Error::other(e.to_string())))
314 }
315 Poll::Pending => {
316 self.write_future = Some(future);
318 Poll::Pending
319 }
320 }
321 } else {
322 Poll::Ready(Ok(()))
324 }
325 }
326
327 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
328 match self.as_mut().poll_flush(cx) {
330 Poll::Ready(Ok(())) => {
331 Poll::Ready(Ok(()))
333 }
334 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
335 Poll::Pending => Poll::Pending,
336 }
337 }
338}
339
340pub struct File<'ring> {
361 ring: &'ring Ring<'ring>,
362 fd: RawFd,
363 read_adapter: Option<AsyncReadAdapter<'ring>>,
364 write_adapter: Option<AsyncWriteAdapter<'ring>>,
365}
366
367impl<'ring> File<'ring> {
368 pub fn new(ring: &'ring Ring<'ring>, fd: RawFd) -> Self {
375 Self {
376 ring,
377 fd,
378 read_adapter: Some(AsyncReadAdapter::new(ring, fd)),
379 write_adapter: Some(AsyncWriteAdapter::new(ring, fd)),
380 }
381 }
382
383 pub async fn create(ring: &'ring Ring<'ring>, path: &str) -> Result<Self> {
387 use std::ffi::CString;
388
389 let path_cstr = CString::new(path).map_err(|_| {
390 SaferRingError::Io(io::Error::new(io::ErrorKind::InvalidInput, "Invalid path"))
391 })?;
392
393 let fd = unsafe {
395 libc::open(
396 path_cstr.as_ptr(),
397 libc::O_CREAT | libc::O_WRONLY | libc::O_TRUNC,
398 0o644,
399 )
400 };
401
402 if fd == -1 {
403 return Err(SaferRingError::Io(io::Error::last_os_error()));
404 }
405
406 Ok(Self::new(ring, fd))
407 }
408
409 pub async fn open(ring: &'ring Ring<'ring>, path: &str) -> Result<Self> {
413 use std::ffi::CString;
414
415 let path_cstr = CString::new(path).map_err(|_| {
416 SaferRingError::Io(io::Error::new(io::ErrorKind::InvalidInput, "Invalid path"))
417 })?;
418
419 let fd = unsafe { libc::open(path_cstr.as_ptr(), libc::O_RDWR, 0) };
421
422 if fd == -1 {
423 return Err(SaferRingError::Io(io::Error::last_os_error()));
424 }
425
426 Ok(Self::new(ring, fd))
427 }
428
429 pub async fn sync_all(&self) -> Result<()> {
431 Ok(())
433 }
434
435 pub fn fd(&self) -> RawFd {
437 self.fd
438 }
439
440 pub fn ring(&self) -> &'ring Ring<'ring> {
442 self.ring
443 }
444}
445
446impl<'ring> Drop for File<'ring> {
447 fn drop(&mut self) {
448 unsafe {
449 libc::close(self.fd);
450 }
451 }
452}
453
454impl<'ring> AsyncRead for File<'ring> {
455 fn poll_read(
456 mut self: Pin<&mut Self>,
457 _cx: &mut Context<'_>,
458 buf: &mut ReadBuf<'_>,
459 ) -> Poll<io::Result<()>> {
460 if let Some(adapter) = &mut self.read_adapter {
461 Pin::new(adapter).poll_read(_cx, buf)
462 } else {
463 Poll::Ready(Err(io::Error::other("Read adapter not available")))
464 }
465 }
466}
467
468impl<'ring> AsyncWrite for File<'ring> {
469 fn poll_write(
470 mut self: Pin<&mut Self>,
471 _cx: &mut Context<'_>,
472 buf: &[u8],
473 ) -> Poll<io::Result<usize>> {
474 if let Some(adapter) = &mut self.write_adapter {
475 Pin::new(adapter).poll_write(_cx, buf)
476 } else {
477 Poll::Ready(Err(io::Error::other("Write adapter not available")))
478 }
479 }
480
481 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
482 if let Some(adapter) = &mut self.write_adapter {
483 Pin::new(adapter).poll_flush(cx)
484 } else {
485 Poll::Ready(Ok(()))
486 }
487 }
488
489 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
490 if let Some(adapter) = &mut self.write_adapter {
491 Pin::new(adapter).poll_shutdown(cx)
492 } else {
493 Poll::Ready(Ok(()))
494 }
495 }
496}
497
498pub struct Socket<'ring> {
502 ring: &'ring Ring<'ring>,
503 fd: RawFd,
504 read_adapter: AsyncReadAdapter<'ring>,
505 write_adapter: AsyncWriteAdapter<'ring>,
506}
507
508impl<'ring> Socket<'ring> {
509 pub fn new(ring: &'ring Ring<'ring>, fd: RawFd) -> Self {
511 Self {
512 ring,
513 fd,
514 read_adapter: AsyncReadAdapter::new(ring, fd),
515 write_adapter: AsyncWriteAdapter::new(ring, fd),
516 }
517 }
518
519 pub fn fd(&self) -> RawFd {
521 self.fd
522 }
523
524 pub fn ring(&self) -> &'ring Ring<'ring> {
526 self.ring
527 }
528}
529
530impl<'ring> AsyncRead for Socket<'ring> {
531 fn poll_read(
532 mut self: Pin<&mut Self>,
533 _cx: &mut Context<'_>,
534 buf: &mut ReadBuf<'_>,
535 ) -> Poll<io::Result<()>> {
536 Pin::new(&mut self.read_adapter).poll_read(_cx, buf)
537 }
538}
539
540impl<'ring> AsyncWrite for Socket<'ring> {
541 fn poll_write(
542 mut self: Pin<&mut Self>,
543 _cx: &mut Context<'_>,
544 buf: &[u8],
545 ) -> Poll<io::Result<usize>> {
546 Pin::new(&mut self.write_adapter).poll_write(_cx, buf)
547 }
548
549 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
550 Pin::new(&mut self.write_adapter).poll_flush(cx)
551 }
552
553 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
554 Pin::new(&mut self.write_adapter).poll_shutdown(cx)
555 }
556}
557
558pub trait AsyncCompat<'ring> {
563 fn async_read(&'ring self, fd: RawFd) -> AsyncReadAdapter<'ring>;
565
566 fn async_write(&'ring self, fd: RawFd) -> AsyncWriteAdapter<'ring>;
568
569 fn file(&'ring self, fd: RawFd) -> File<'ring>;
571
572 fn socket(&'ring self, fd: RawFd) -> Socket<'ring>;
574}
575
576impl<'ring> AsyncCompat<'ring> for Ring<'ring> {
577 fn async_read(&'ring self, fd: RawFd) -> AsyncReadAdapter<'ring> {
578 AsyncReadAdapter::new(self, fd)
579 }
580
581 fn async_write(&'ring self, fd: RawFd) -> AsyncWriteAdapter<'ring> {
582 AsyncWriteAdapter::new(self, fd)
583 }
584
585 fn file(&'ring self, fd: RawFd) -> File<'ring> {
586 File::new(self, fd)
587 }
588
589 fn socket(&'ring self, fd: RawFd) -> Socket<'ring> {
590 Socket::new(self, fd)
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597
598 #[tokio::test]
599 async fn test_async_read_adapter_creation() {
600 #[cfg(target_os = "linux")]
601 {
602 use crate::Ring;
603
604 let _can_create_adapter: for<'r> fn(&'r Ring<'r>) -> AsyncReadAdapter<'r> =
607 |ring| ring.async_read(0);
608
609 }
611
612 #[cfg(not(target_os = "linux"))]
613 {
614 println!("Skipping AsyncRead adapter test on non-Linux platform");
615 }
616 }
617
618 #[tokio::test]
619 async fn test_async_write_adapter_creation() {
620 #[cfg(target_os = "linux")]
621 {
622 use crate::Ring;
623
624 let _can_create_adapter: for<'r> fn(&'r Ring<'r>) -> AsyncWriteAdapter<'r> =
627 |ring| ring.async_write(1);
628
629 }
631
632 #[cfg(not(target_os = "linux"))]
633 {
634 println!("Skipping AsyncWrite adapter test on non-Linux platform");
635 }
636 }
637
638 #[tokio::test]
639 async fn test_file_wrapper_creation() {
640 #[cfg(target_os = "linux")]
641 {
642 use crate::Ring;
643
644 let _can_create_file: for<'r> fn(&'r Ring<'r>) -> File<'r> = |ring| ring.file(0);
647
648 println!("✓ File wrapper method accessible");
649 }
650
651 #[cfg(not(target_os = "linux"))]
652 {
653 println!("Skipping File wrapper test on non-Linux platform");
654 }
655 }
656
657 #[tokio::test]
658 async fn test_socket_wrapper_creation() {
659 #[cfg(target_os = "linux")]
660 {
661 use crate::Ring;
662
663 let _can_create_socket: for<'r> fn(&'r Ring<'r>) -> Socket<'r> = |ring| ring.socket(0);
666
667 println!("✓ Socket wrapper method accessible");
668 }
669
670 #[cfg(not(target_os = "linux"))]
671 {
672 println!("Skipping Socket wrapper test on non-Linux platform");
673 }
674 }
675
676 #[test]
677 fn test_adapter_buffer_size() {
678 const _: () = assert!(ADAPTER_BUFFER_SIZE > 0, "Buffer size must be positive");
681 const _: () = assert!(
682 ADAPTER_BUFFER_SIZE <= 65536,
683 "Buffer size should be reasonable"
684 ); }
686}