fuser_async/
async_readseek.rs1use std::future::Future;
2
3use crate::FilesystemSSUS;
4
5#[derive(Debug)]
7enum State<E> {
8 Idle,
9 Busy(tokio::task::JoinHandle<Operation<E>>),
10}
11
12#[derive(Debug)]
13enum Operation<E> {
14 Read(Result<bytes::Bytes, E>),
15}
16struct FileHandleInner<E> {
17 state: State<E>,
18 pos: u64,
19}
20impl<E> Default for FileHandleInner<E> {
21 fn default() -> Self {
22 Self {
23 state: State::Idle,
24 pos: 0,
25 }
26 }
27}
28macro_rules! ready {
29 ($e:expr) => {
30 match $e {
31 std::task::Poll::Ready(t) => t,
32 std::task::Poll::Pending => {
33 return std::task::Poll::Pending;
34 }
35 }
36 };
37}
38pub struct FileHandle<F: FilesystemSSUS + Clone>
42where
43 F::Error: Send,
44{
45 fs: F,
46 fh: u64,
47 inode: u64,
48 inner: tokio::sync::Mutex<FileHandleInner<F::Error>>,
49}
50
51impl<F: FilesystemSSUS + Clone> FileHandle<F>
52where
53 F::Error: Send,
54{
55 pub async fn new(fs: F, inode: u64, flags: i32) -> Result<FileHandle<F>, F::Error> {
57 let fh = fs.open(inode, flags).await?;
58 Ok(FileHandle {
59 fs,
60 fh,
61 inode,
62 inner: Default::default(),
63 })
64 }
65}
66
67impl<F: FilesystemSSUS + Clone> Drop for FileHandle<F>
68where
69 F::Error: Send,
70{
71 fn drop(&mut self) {
72 let fs = self.fs.clone();
73 let fh = self.fh;
74 let inode = self.inode;
75 tokio::spawn(async move { fs.release(inode, fh).await });
76 }
77}
78
79impl<F: FilesystemSSUS + Clone> tokio::io::AsyncRead for FileHandle<F>
80where
81 F::Error: Send + std::fmt::Display,
82{
83 fn poll_read(
84 self: std::pin::Pin<&mut Self>,
85 cx: &mut std::task::Context<'_>,
86 dst: &mut tokio::io::ReadBuf<'_>,
87 ) -> std::task::Poll<tokio::io::Result<()>> {
88 let me = self.get_mut();
89 let inner = me.inner.get_mut();
90 let capacity = dst.remaining();
91
92 loop {
93 match inner.state {
94 State::Idle => {
95 let (inode, fh, fs) = (me.inode, me.fh, me.fs.clone());
96 let offset = inner.pos;
97 inner.state = State::Busy(tokio::task::spawn(async move {
98 Operation::Read(fs.read(inode, fh, offset as i64, capacity as u32).await)
99 }))
100 }
101 State::Busy(ref mut rx) => {
102 let op = ready!(std::pin::Pin::new(rx).poll(cx))?;
103 match op {
104 Operation::Read(Ok(buf)) => {
105 dst.put_slice(&buf);
106 inner.pos += buf.len() as u64;
107 inner.state = State::Idle;
108 return std::task::Poll::Ready(Ok(()));
109 }
110 Operation::Read(Err(e)) => {
111 inner.state = State::Idle;
112
113 return std::task::Poll::Ready(Err(tokio::io::Error::new(
114 tokio::io::ErrorKind::Other,
115 e.to_string(),
116 )));
117 }
118 }
119 }
120 }
121 }
122 }
123}
124
125impl<F: FilesystemSSUS + Clone> tokio::io::AsyncSeek for FileHandle<F>
126where
127 F::Error: Send,
128{
129 fn start_seek(
130 self: std::pin::Pin<&mut Self>,
131 position: std::io::SeekFrom,
132 ) -> tokio::io::Result<()> {
133 let me = self.get_mut();
134 let inner = me.inner.get_mut();
135 match inner.state {
136 State::Idle => {
137 inner.pos = match position {
138 std::io::SeekFrom::Start(s) => s,
139 std::io::SeekFrom::Current(s) => {
140 if s >= 0 {
141 inner.pos.checked_add(s as u64).unwrap_or_default()
142 } else {
143 inner.pos.checked_sub(s.unsigned_abs()).unwrap_or_default()
144 }
145 }
146 _ => {
147 return Err(tokio::io::Error::new(
148 tokio::io::ErrorKind::Other,
149 "Unsupported seek mode",
150 ))
151 }
152 };
153
154 Ok(())
155 }
156 State::Busy(_) => Err(tokio::io::Error::new(
157 tokio::io::ErrorKind::Other,
158 "other file operation is pending, call poll_complete before start_seek",
159 )),
160 }
161 }
162
163 fn poll_complete(
164 self: std::pin::Pin<&mut Self>,
165 _cx: &mut std::task::Context<'_>,
166 ) -> std::task::Poll<std::io::Result<u64>> {
167 let me = self.get_mut();
168 let inner = me.inner.get_mut();
169 std::task::Poll::Ready(Ok(inner.pos))
170 }
171}