1use crate::error::ZeroFsError;
7use crate::file::File;
8use std::future::Future;
9use std::io::{self, SeekFrom};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::task::{Context, Poll, ready};
13use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
14
15type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
16
17enum State {
18 Idle,
19 Reading(BoxFut<Result<bytes::Bytes, ZeroFsError>>),
20 Writing {
22 fut: BoxFut<Result<(), ZeroFsError>>,
23 len: u64,
24 },
25 SeekingEnd {
27 fut: BoxFut<Result<u64, ZeroFsError>>,
28 delta: i64,
29 },
30}
31
32pub struct FileCursor {
41 file: Arc<File>,
42 pos: u64,
43 state: State,
44}
45
46impl FileCursor {
47 pub(crate) fn new(file: Arc<File>) -> Self {
48 Self {
49 file,
50 pos: 0,
51 state: State::Idle,
52 }
53 }
54
55 pub fn position(&self) -> u64 {
57 self.pos
58 }
59}
60
61impl AsyncRead for FileCursor {
62 fn poll_read(
63 mut self: Pin<&mut Self>,
64 cx: &mut Context<'_>,
65 buf: &mut ReadBuf<'_>,
66 ) -> Poll<io::Result<()>> {
67 loop {
68 match &mut self.state {
69 State::Reading(fut) => {
70 let data = ready!(fut.as_mut().poll(cx));
71 self.state = State::Idle;
72 let data = data.map_err(io::Error::from)?;
73 let n = data.len().min(buf.remaining());
74 buf.put_slice(&data[..n]);
75 self.pos += n as u64;
76 return Poll::Ready(Ok(()));
77 }
78 State::Idle => {
79 if buf.remaining() == 0 {
80 return Poll::Ready(Ok(()));
81 }
82 let chunk = self.file.max_read_chunk().max(1) as usize;
85 let len = buf.remaining().min(chunk) as u32;
86 let offset = self.pos;
87 let file = Arc::clone(&self.file);
88 self.state =
89 State::Reading(Box::pin(async move { file.read_at(offset, len).await }));
90 }
91 _ => return Poll::Ready(Err(busy())),
92 }
93 }
94 }
95}
96
97impl AsyncWrite for FileCursor {
98 fn poll_write(
99 mut self: Pin<&mut Self>,
100 cx: &mut Context<'_>,
101 buf: &[u8],
102 ) -> Poll<io::Result<usize>> {
103 loop {
104 match &mut self.state {
105 State::Writing { fut, len } => {
106 let res = ready!(fut.as_mut().poll(cx));
107 let len = *len;
108 self.state = State::Idle;
109 res.map_err(io::Error::from)?;
110 self.pos += len;
111 return Poll::Ready(Ok(len as usize));
112 }
113 State::Idle => {
114 if buf.is_empty() {
115 return Poll::Ready(Ok(0));
116 }
117 let offset = self.pos;
120 let data = buf.to_vec();
121 let len = data.len() as u64;
122 let file = Arc::clone(&self.file);
123 self.state = State::Writing {
124 fut: Box::pin(async move { file.write_at(offset, &data).await }),
125 len,
126 };
127 }
128 _ => return Poll::Ready(Err(busy())),
129 }
130 }
131 }
132
133 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
134 match &mut self.state {
138 State::Writing { fut, len } => {
139 let res = ready!(fut.as_mut().poll(cx));
140 let len = *len;
141 self.state = State::Idle;
142 res.map_err(io::Error::from)?;
143 self.pos += len;
144 Poll::Ready(Ok(()))
145 }
146 _ => Poll::Ready(Ok(())),
147 }
148 }
149
150 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
151 self.poll_flush(cx)
152 }
153}
154
155impl AsyncSeek for FileCursor {
156 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
157 if !matches!(self.state, State::Idle) {
158 return Err(busy());
159 }
160 match position {
161 SeekFrom::Start(n) => self.pos = n,
162 SeekFrom::Current(delta) => {
163 self.pos = self
164 .pos
165 .checked_add_signed(delta)
166 .ok_or_else(negative_seek)?;
167 }
168 SeekFrom::End(delta) => {
169 let file = Arc::clone(&self.file);
170 self.state = State::SeekingEnd {
171 fut: Box::pin(async move { file.metadata().await.map(|m| m.size) }),
172 delta,
173 };
174 }
175 }
176 Ok(())
177 }
178
179 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
180 if let State::SeekingEnd { fut, delta } = &mut self.state {
181 let size = ready!(fut.as_mut().poll(cx));
182 let delta = *delta;
183 self.state = State::Idle;
184 let size = size.map_err(io::Error::from)?;
185 self.pos = size.checked_add_signed(delta).ok_or_else(negative_seek)?;
186 }
187 Poll::Ready(Ok(self.pos))
188 }
189}
190
191fn busy() -> io::Error {
192 io::Error::other("zerofs FileCursor: another I/O operation is already in flight")
193}
194
195fn negative_seek() -> io::Error {
196 io::Error::new(
197 io::ErrorKind::InvalidInput,
198 "zerofs FileCursor: seek to a negative or out-of-range offset",
199 )
200}