1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use crate::errors::make_io_error;
use ara::ReadAt;
use async_trait::async_trait;
use futures::lock::Mutex;
use positioned_io::ReadAt as _;
use std::{fs::File as StdFile, io, sync::Arc};
use tokio::task::spawn_blocking;

pub struct File {
    file: Arc<StdFile>,
    len: u64,
    internal_buf: Mutex<Option<Vec<u8>>>,
}

#[derive(thiserror::Error, Debug)]
enum Error {
    #[error("dead file - a previous read went horribly wrong and it can no longer be read from")]
    DeadFile,
}

impl File {
    pub fn new(file: StdFile) -> io::Result<Self> {
        let len = file.metadata()?.len();
        let file = Arc::new(file);
        let internal_buf = Mutex::new(Some(Vec::new()));
        tracing::trace!("opened file, len = {}", len);
        Ok(Self {
            file,
            len,
            internal_buf,
        })
    }
}

#[async_trait(?Send)]
impl ReadAt for File {
    async fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
        let file = self.file.clone();

        let mut buf_slot = self.internal_buf.lock().await;
        let mut internal_buf = match buf_slot.take() {
            Some(buf) => buf,
            None => return Err(make_io_error(Error::DeadFile)),
        };

        internal_buf.clear();
        internal_buf.reserve(buf.len());
        unsafe {
            internal_buf.set_len(buf.len());
        }

        match spawn_blocking(move || {
            let res = file.read_at(offset, &mut internal_buf);
            (internal_buf, res)
        })
        .await
        {
            Ok((internal_buf, res)) => {
                if let Ok(n) = &res {
                    let n = *n;
                    let dst = &mut buf[..n];
                    let src = &internal_buf[..n];
                    dst.copy_from_slice(src);
                }
                *buf_slot = Some(internal_buf);
                res
            }
            Err(e) => Err(make_io_error(e)),
        }
    }

    fn len(&self) -> u64 {
        self.len
    }
}