fuser_async/
async_readseek.rs

1use std::future::Future;
2
3use crate::FilesystemSSUS;
4
// Inspired by tokio's implementation of [`tokio::io::AsyncRead`] for [`tokio::fs::File`].
6#[derive(Debug)]
7enum State<E> {
8    Idle,
9    Busy(tokio::task::JoinHandle<Operation<E>>),
10}
11
12#[derive(Debug)]
13enum Operation<E> {
14    Read(Result<bytes::Bytes, E>),
15}
16struct FileHandleInner<E> {
17    state: State<E>,
18    pos: u64,
19}
20impl<E> Default for FileHandleInner<E> {
21    fn default() -> Self {
22        Self {
23            state: State::Idle,
24            pos: 0,
25        }
26    }
27}
/// Unwraps a [`std::task::Poll`]: evaluates to the inner value when it is `Ready`,
/// otherwise returns `Poll::Pending` from the enclosing function.
macro_rules! ready {
    ($poll:expr) => {
        match $poll {
            std::task::Poll::Pending => return std::task::Poll::Pending,
            std::task::Poll::Ready(value) => value,
        }
    };
}
38/// Owned handle on a FUSE filesystem file, implementing [`tokio::io::AsyncRead`].
39///
40/// The handle is released when the object is dropped
41pub struct FileHandle<F: FilesystemSSUS + Clone>
42where
43    F::Error: Send,
44{
45    fs: F,
46    fh: u64,
47    inode: u64,
48    inner: tokio::sync::Mutex<FileHandleInner<F::Error>>,
49}
50
51impl<F: FilesystemSSUS + Clone> FileHandle<F>
52where
53    F::Error: Send,
54{
55    /// Create a new owned handle (calling `open` with the provided flags) on an inode.
56    pub async fn new(fs: F, inode: u64, flags: i32) -> Result<FileHandle<F>, F::Error> {
57        let fh = fs.open(inode, flags).await?;
58        Ok(FileHandle {
59            fs,
60            fh,
61            inode,
62            inner: Default::default(),
63        })
64    }
65}
66
67impl<F: FilesystemSSUS + Clone> Drop for FileHandle<F>
68where
69    F::Error: Send,
70{
71    fn drop(&mut self) {
72        let fs = self.fs.clone();
73        let fh = self.fh;
74        let inode = self.inode;
75        tokio::spawn(async move { fs.release(inode, fh).await });
76    }
77}
78
79impl<F: FilesystemSSUS + Clone> tokio::io::AsyncRead for FileHandle<F>
80where
81    F::Error: Send + std::fmt::Display,
82{
83    fn poll_read(
84        self: std::pin::Pin<&mut Self>,
85        cx: &mut std::task::Context<'_>,
86        dst: &mut tokio::io::ReadBuf<'_>,
87    ) -> std::task::Poll<tokio::io::Result<()>> {
88        let me = self.get_mut();
89        let inner = me.inner.get_mut();
90        let capacity = dst.remaining();
91
92        loop {
93            match inner.state {
94                State::Idle => {
95                    let (inode, fh, fs) = (me.inode, me.fh, me.fs.clone());
96                    let offset = inner.pos;
97                    inner.state = State::Busy(tokio::task::spawn(async move {
98                        Operation::Read(fs.read(inode, fh, offset as i64, capacity as u32).await)
99                    }))
100                }
101                State::Busy(ref mut rx) => {
102                    let op = ready!(std::pin::Pin::new(rx).poll(cx))?;
103                    match op {
104                        Operation::Read(Ok(buf)) => {
105                            dst.put_slice(&buf);
106                            inner.pos += buf.len() as u64;
107                            inner.state = State::Idle;
108                            return std::task::Poll::Ready(Ok(()));
109                        }
110                        Operation::Read(Err(e)) => {
111                            inner.state = State::Idle;
112
113                            return std::task::Poll::Ready(Err(tokio::io::Error::new(
114                                tokio::io::ErrorKind::Other,
115                                e.to_string(),
116                            )));
117                        }
118                    }
119                }
120            }
121        }
122    }
123}
124
125impl<F: FilesystemSSUS + Clone> tokio::io::AsyncSeek for FileHandle<F>
126where
127    F::Error: Send,
128{
129    fn start_seek(
130        self: std::pin::Pin<&mut Self>,
131        position: std::io::SeekFrom,
132    ) -> tokio::io::Result<()> {
133        let me = self.get_mut();
134        let inner = me.inner.get_mut();
135        match inner.state {
136            State::Idle => {
137                inner.pos = match position {
138                    std::io::SeekFrom::Start(s) => s,
139                    std::io::SeekFrom::Current(s) => {
140                        if s >= 0 {
141                            inner.pos.checked_add(s as u64).unwrap_or_default()
142                        } else {
143                            inner.pos.checked_sub(s.unsigned_abs()).unwrap_or_default()
144                        }
145                    }
146                    _ => {
147                        return Err(tokio::io::Error::new(
148                            tokio::io::ErrorKind::Other,
149                            "Unsupported seek mode",
150                        ))
151                    }
152                };
153
154                Ok(())
155            }
156            State::Busy(_) => Err(tokio::io::Error::new(
157                tokio::io::ErrorKind::Other,
158                "other file operation is pending, call poll_complete before start_seek",
159            )),
160        }
161    }
162
163    fn poll_complete(
164        self: std::pin::Pin<&mut Self>,
165        _cx: &mut std::task::Context<'_>,
166    ) -> std::task::Poll<std::io::Result<u64>> {
167        let me = self.get_mut();
168        let inner = me.inner.get_mut();
169        std::task::Poll::Ready(Ok(inner.pos))
170    }
171}