binary_cookies/tokio/
cursor.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::Arc,
5    task::{ready, Poll},
6};
7
8use positioned_io::{RandomAccessFile, ReadAt};
9use tokio::{io::AsyncRead, task::JoinHandle};
10
11pub trait CookieCursor {
12    type Cursor<'a>: AsyncRead + Unpin + 'a
13    where
14        Self: 'a;
15
16    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
17}
18
19impl CookieCursor for &[u8] {
20    type Cursor<'a>
21        = &'a [u8]
22    where
23        Self: 'a;
24
25    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
26        &self[offset as usize..]
27    }
28}
29
30impl CookieCursor for Vec<u8> {
31    type Cursor<'a>
32        = &'a [u8]
33    where
34        Self: 'a;
35
36    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
37        &self[offset as usize..]
38    }
39}
40
41impl CookieCursor for Arc<RandomAccessFile> {
42    type Cursor<'a>
43        = AsyncCursor
44    where
45        Self: 'a;
46
47    fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
48        AsyncCursor {
49            file: Self::clone(self),
50            file_offset: offset,
51            state: State::Idle(Some(Buf {
52                buf: vec![0; 512],
53                valid_len: 0,
54                buf_offset: 0,
55            })),
56        }
57    }
58}
59
60#[derive(Debug)]
61pub struct AsyncCursor {
62    file: Arc<RandomAccessFile>,
63    file_offset: u64,
64    state: State,
65}
66
67#[derive(Clone)]
68#[derive(Debug)]
69#[derive(Default)]
70#[derive(PartialEq, Eq, PartialOrd, Ord)]
71struct Buf {
72    buf: Vec<u8>,
73    valid_len: usize,
74    buf_offset: usize,
75}
76
77#[derive(Debug)]
78enum State {
79    Idle(Option<Buf>),
80    Busy(JoinHandle<Result<Buf, std::io::Error>>),
81}
82
83impl AsyncRead for AsyncCursor {
84    fn poll_read(
85        mut self: std::pin::Pin<&mut Self>,
86        cx: &mut std::task::Context<'_>,
87        buf: &mut tokio::io::ReadBuf<'_>,
88    ) -> std::task::Poll<std::io::Result<()>> {
89        loop {
90            match &mut self.state {
91                State::Idle(buf_cell) => {
92                    #[expect(clippy::unwrap_used, reason = "It must be `Some`")]
93                    let mut buffer = buf_cell.take().unwrap();
94
95                    if buffer.buf_offset < buffer.valid_len {
96                        let read_len = buf
97                            .remaining()
98                            .min(buffer.valid_len - buffer.buf_offset);
99                        buf.put_slice(&buffer.buf[buffer.buf_offset..][..read_len]);
100                        buffer.buf_offset += read_len;
101
102                        *buf_cell = Some(buffer);
103                        return Poll::Ready(Ok(()));
104                    }
105
106                    let f = Arc::clone(&self.file);
107                    let file_offset = self.file_offset;
108
109                    let jh = tokio::task::spawn_blocking(move || -> Result<_, std::io::Error> {
110                        let readed = f.read_at(file_offset, &mut buffer.buf)?;
111                        buffer.valid_len = readed;
112                        buffer.buf_offset = 0;
113                        Ok(buffer)
114                    });
115                    self.state = State::Busy(jh);
116                },
117                State::Busy(jh) => match ready!(Pin::new(jh).poll(cx))? {
118                    Ok(buffer) => {
119                        self.file_offset += buffer.valid_len as u64;
120                        self.state = State::Idle(Some(buffer));
121                        continue;
122                    },
123                    Err(e) => return Poll::Ready(Err(e)),
124                },
125            }
126        }
127    }
128}