vortex_serde/io/
tokio.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#![cfg(feature = "tokio")]

use std::future::Future;
use std::io;
use std::os::unix::prelude::FileExt;

use bytes::BytesMut;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::{VortexError, VortexUnwrap as _};

use crate::io::{VortexRead, VortexReadAt, VortexWrite};
use crate::layouts::AsyncRuntime;

/// Newtype wrapper around an I/O value `IO`.
///
/// This file implements `VortexRead` for the wrapper when `IO` is a tokio
/// `AsyncRead + Unpin`, and `VortexWrite` when `IO` is a tokio
/// `AsyncWrite + Unpin`. The wrapped value is public and reachable as `.0`.
pub struct TokioAdapter<IO>(pub IO);

impl<R: AsyncRead + Unpin> VortexRead for TokioAdapter<R> {
    async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
        self.0.read_exact(buffer.as_mut()).await?;
        Ok(buffer)
    }
}

impl<W: AsyncWrite + Unpin> VortexWrite for TokioAdapter<W> {
    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
        self.0.write_all(buffer.as_slice()).await?;
        Ok(buffer)
    }

    async fn flush(&mut self) -> io::Result<()> {
        self.0.flush().await
    }

    async fn shutdown(&mut self) -> io::Result<()> {
        self.0.shutdown().await
    }
}

impl VortexRead for File {
    async fn read_into(&mut self, mut buffer: BytesMut) -> io::Result<BytesMut> {
        self.read_exact(buffer.as_mut()).await?;
        Ok(buffer)
    }
}

impl VortexReadAt for File {
    async fn read_at_into(&self, pos: u64, mut buffer: BytesMut) -> io::Result<BytesMut> {
        let std_file = self.try_clone().await?.into_std().await;
        std_file.read_exact_at(buffer.as_mut(), pos)?;
        Ok(buffer)
    }

    async fn size(&self) -> u64 {
        self.metadata()
            .await
            .map_err(|err| VortexError::IOError(err).with_context("Failed to get file metadata"))
            .vortex_unwrap()
            .len()
    }
}

impl VortexWrite for File {
    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
        AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
        Ok(buffer)
    }

    async fn flush(&mut self) -> io::Result<()> {
        AsyncWriteExt::flush(self).await
    }

    async fn shutdown(&mut self) -> io::Result<()> {
        AsyncWriteExt::shutdown(self).await
    }
}

impl AsyncRuntime for Runtime {
    fn block_on<F: Future>(&self, fut: F) -> F::Output {
        self.block_on(fut)
    }
}