Skip to main content

zerofs_client/
io.rs

1//! `tokio-io` feature: a [`FileCursor`] adapter that turns the positioned
2//! [`File`] API into a stateful `AsyncRead + AsyncWrite + AsyncSeek` stream, so
3//! `tokio::io::copy` and friends work against a ZeroFS file. Rust-only; never
4//! crosses the FFI boundary.
5
6use crate::error::ZeroFsError;
7use crate::file::File;
8use std::future::Future;
9use std::io::{self, SeekFrom};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::task::{Context, Poll, ready};
13use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
14
15type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;
16
17enum State {
18    Idle,
19    Reading(BoxFut<Result<bytes::Bytes, ZeroFsError>>),
20    /// A write in flight, plus how many bytes it covers (added to `pos` on success).
21    Writing {
22        fut: BoxFut<Result<(), ZeroFsError>>,
23        len: u64,
24    },
25    /// A `SeekFrom::End` in flight: resolves the size, then offsets by `delta`.
26    SeekingEnd {
27        fut: BoxFut<Result<u64, ZeroFsError>>,
28        delta: i64,
29    },
30}
31
32/// An owned cursor over a [`File`] implementing `AsyncRead + AsyncWrite +
33/// AsyncSeek`, built entirely on [`File::read_at`]/[`File::write_at`]. Multiple
34/// cursors over one `File` are safe; each carries its own position and the
35/// underlying I/O is positioned, so there is no shared seek pointer.
36///
37/// One operation is in flight at a time per cursor (it is driven by `&mut self`
38/// like every async-io adapter); starting a seek while a read or write is
39/// pending is rejected with `io::ErrorKind::Other`.
40pub struct FileCursor {
41    file: Arc<File>,
42    pos: u64,
43    state: State,
44}
45
46impl FileCursor {
47    pub(crate) fn new(file: Arc<File>) -> Self {
48        Self {
49            file,
50            pos: 0,
51            state: State::Idle,
52        }
53    }
54
55    /// The cursor's current absolute byte offset.
56    pub fn position(&self) -> u64 {
57        self.pos
58    }
59}
60
61impl AsyncRead for FileCursor {
62    fn poll_read(
63        mut self: Pin<&mut Self>,
64        cx: &mut Context<'_>,
65        buf: &mut ReadBuf<'_>,
66    ) -> Poll<io::Result<()>> {
67        loop {
68            match &mut self.state {
69                State::Reading(fut) => {
70                    let data = ready!(fut.as_mut().poll(cx));
71                    self.state = State::Idle;
72                    let data = data.map_err(io::Error::from)?;
73                    let n = data.len().min(buf.remaining());
74                    buf.put_slice(&data[..n]);
75                    self.pos += n as u64;
76                    return Poll::Ready(Ok(()));
77                }
78                State::Idle => {
79                    if buf.remaining() == 0 {
80                        return Poll::Ready(Ok(()));
81                    }
82                    // Cap one read at the negotiated chunk so each poll is a
83                    // single round trip; a partial fill is normal for AsyncRead.
84                    let chunk = self.file.max_read_chunk().max(1) as usize;
85                    let len = buf.remaining().min(chunk) as u32;
86                    let offset = self.pos;
87                    let file = Arc::clone(&self.file);
88                    self.state =
89                        State::Reading(Box::pin(async move { file.read_at(offset, len).await }));
90                }
91                _ => return Poll::Ready(Err(busy())),
92            }
93        }
94    }
95}
96
97impl AsyncWrite for FileCursor {
98    fn poll_write(
99        mut self: Pin<&mut Self>,
100        cx: &mut Context<'_>,
101        buf: &[u8],
102    ) -> Poll<io::Result<usize>> {
103        loop {
104            match &mut self.state {
105                State::Writing { fut, len } => {
106                    let res = ready!(fut.as_mut().poll(cx));
107                    let len = *len;
108                    self.state = State::Idle;
109                    res.map_err(io::Error::from)?;
110                    self.pos += len;
111                    return Poll::Ready(Ok(len as usize));
112                }
113                State::Idle => {
114                    if buf.is_empty() {
115                        return Poll::Ready(Ok(0));
116                    }
117                    // write_at writes all of `data` (chunked internally), so one
118                    // poll consumes the whole buffer.
119                    let offset = self.pos;
120                    let data = buf.to_vec();
121                    let len = data.len() as u64;
122                    let file = Arc::clone(&self.file);
123                    self.state = State::Writing {
124                        fut: Box::pin(async move { file.write_at(offset, &data).await }),
125                        len,
126                    };
127                }
128                _ => return Poll::Ready(Err(busy())),
129            }
130        }
131    }
132
133    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
134        // Drive an in-flight write to completion; there is no client-side write
135        // buffer beyond that (each write_at awaits the server), so once idle
136        // everything sent is acknowledged.
137        match &mut self.state {
138            State::Writing { fut, len } => {
139                let res = ready!(fut.as_mut().poll(cx));
140                let len = *len;
141                self.state = State::Idle;
142                res.map_err(io::Error::from)?;
143                self.pos += len;
144                Poll::Ready(Ok(()))
145            }
146            _ => Poll::Ready(Ok(())),
147        }
148    }
149
150    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
151        self.poll_flush(cx)
152    }
153}
154
155impl AsyncSeek for FileCursor {
156    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
157        if !matches!(self.state, State::Idle) {
158            return Err(busy());
159        }
160        match position {
161            SeekFrom::Start(n) => self.pos = n,
162            SeekFrom::Current(delta) => {
163                self.pos = self
164                    .pos
165                    .checked_add_signed(delta)
166                    .ok_or_else(negative_seek)?;
167            }
168            SeekFrom::End(delta) => {
169                let file = Arc::clone(&self.file);
170                self.state = State::SeekingEnd {
171                    fut: Box::pin(async move { file.metadata().await.map(|m| m.size) }),
172                    delta,
173                };
174            }
175        }
176        Ok(())
177    }
178
179    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
180        if let State::SeekingEnd { fut, delta } = &mut self.state {
181            let size = ready!(fut.as_mut().poll(cx));
182            let delta = *delta;
183            self.state = State::Idle;
184            let size = size.map_err(io::Error::from)?;
185            self.pos = size.checked_add_signed(delta).ok_or_else(negative_seek)?;
186        }
187        Poll::Ready(Ok(self.pos))
188    }
189}
190
191fn busy() -> io::Error {
192    io::Error::other("zerofs FileCursor: another I/O operation is already in flight")
193}
194
195fn negative_seek() -> io::Error {
196    io::Error::new(
197        io::ErrorKind::InvalidInput,
198        "zerofs FileCursor: seek to a negative or out-of-range offset",
199    )
200}