binary_cookies/tokio/
cursor.rs1use std::{
2 future::Future,
3 pin::Pin,
4 sync::Arc,
5 task::{ready, Poll},
6};
7
8use positioned_io::{RandomAccessFile, ReadAt};
9use tokio::{io::AsyncRead, task::JoinHandle};
10
11pub trait CookieCursor {
12 type Cursor<'a>: AsyncRead + Unpin + 'a
13 where
14 Self: 'a;
15
16 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_>;
17}
18
19impl CookieCursor for &[u8] {
20 type Cursor<'a>
21 = &'a [u8]
22 where
23 Self: 'a;
24
25 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
26 &self[offset as usize..]
27 }
28}
29
30impl CookieCursor for Vec<u8> {
31 type Cursor<'a>
32 = &'a [u8]
33 where
34 Self: 'a;
35
36 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
37 &self[offset as usize..]
38 }
39}
40
41impl CookieCursor for Arc<RandomAccessFile> {
42 type Cursor<'a>
43 = AsyncCursor
44 where
45 Self: 'a;
46
47 fn cursor_at(&self, offset: u64) -> Self::Cursor<'_> {
48 AsyncCursor {
49 file: Self::clone(self),
50 file_offset: offset,
51 state: State::Idle(Some(Buf {
52 buf: vec![0; 512],
53 valid_len: 0,
54 buf_offset: 0,
55 })),
56 }
57 }
58}
59
60#[derive(Debug)]
61pub struct AsyncCursor {
62 file: Arc<RandomAccessFile>,
63 file_offset: u64,
64 state: State,
65}
66
67#[derive(Clone)]
68#[derive(Debug)]
69#[derive(Default)]
70#[derive(PartialEq, Eq, PartialOrd, Ord)]
71struct Buf {
72 buf: Vec<u8>,
73 valid_len: usize,
74 buf_offset: usize,
75}
76
77#[derive(Debug)]
78enum State {
79 Idle(Option<Buf>),
80 Busy(JoinHandle<Result<Buf, std::io::Error>>),
81}
82
83impl AsyncRead for AsyncCursor {
84 fn poll_read(
85 mut self: std::pin::Pin<&mut Self>,
86 cx: &mut std::task::Context<'_>,
87 buf: &mut tokio::io::ReadBuf<'_>,
88 ) -> std::task::Poll<std::io::Result<()>> {
89 loop {
90 match &mut self.state {
91 State::Idle(buf_cell) => {
92 #[expect(clippy::unwrap_used, reason = "It must be `Some`")]
93 let mut buffer = buf_cell.take().unwrap();
94
95 if buffer.buf_offset < buffer.valid_len {
96 let read_len = buf
97 .remaining()
98 .min(buffer.valid_len - buffer.buf_offset);
99 buf.put_slice(&buffer.buf[buffer.buf_offset..][..read_len]);
100 buffer.buf_offset += read_len;
101
102 *buf_cell = Some(buffer);
103 return Poll::Ready(Ok(()));
104 }
105
106 let f = Arc::clone(&self.file);
107 let file_offset = self.file_offset;
108
109 let jh = tokio::task::spawn_blocking(move || -> Result<_, std::io::Error> {
110 let readed = f.read_at(file_offset, &mut buffer.buf)?;
111 buffer.valid_len = readed;
112 buffer.buf_offset = 0;
113 Ok(buffer)
114 });
115 self.state = State::Busy(jh);
116 },
117 State::Busy(jh) => match ready!(Pin::new(jh).poll(cx))? {
118 Ok(buffer) => {
119 self.file_offset += buffer.valid_len as u64;
120 self.state = State::Idle(Some(buffer));
121 continue;
122 },
123 Err(e) => return Poll::Ready(Err(e)),
124 },
125 }
126 }
127 }
128}