moonpool_sim/storage/
file.rs1use crate::sim::WeakSimWorld;
4use crate::sim::state::FileId;
5use async_trait::async_trait;
6use moonpool_core::StorageFile;
7use std::cell::Cell;
8use std::io::{self, SeekFrom};
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
12
13use super::futures::{SetLenFuture, SyncFuture};
14use super::sim_shutdown_error;
15
/// Progress of a seek requested through the `AsyncSeek` implementation.
///
/// `start_seek` stores the resolved target as [`SeekState::Seeking`];
/// `poll_complete` applies it and resets the state to [`SeekState::Idle`].
#[derive(Clone, Copy, Debug)]
enum SeekState {
    /// No seek is waiting to be applied.
    Idle,
    /// A seek has been started; the payload is the absolute byte offset to move to.
    Seeking(u64),
}
24
25#[derive(Debug)]
44pub struct SimStorageFile {
45 sim: WeakSimWorld,
46 file_id: FileId,
47 pending_read: Cell<Option<(u64, u64, usize)>>,
49 pending_write: Cell<Option<(u64, usize)>>,
51 seek_state: Cell<SeekState>,
53}
54
55impl SimStorageFile {
56 pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
58 Self {
59 sim,
60 file_id,
61 pending_read: Cell::new(None),
62 pending_write: Cell::new(None),
63 seek_state: Cell::new(SeekState::Idle),
64 }
65 }
66
67 pub fn file_id(&self) -> FileId {
69 self.file_id
70 }
71}
72
73#[async_trait(?Send)]
74impl StorageFile for SimStorageFile {
75 async fn sync_all(&self) -> io::Result<()> {
76 SyncFuture::new(self.sim.clone(), self.file_id).await
77 }
78
79 async fn sync_data(&self) -> io::Result<()> {
80 SyncFuture::new(self.sim.clone(), self.file_id).await
82 }
83
84 async fn size(&self) -> io::Result<u64> {
85 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
86 sim.get_file_size(self.file_id)
87 .map_err(|e| io::Error::other(e.to_string()))
88 }
89
90 async fn set_len(&self, size: u64) -> io::Result<()> {
91 SetLenFuture::new(self.sim.clone(), self.file_id, size).await
92 }
93}
94
95impl AsyncRead for SimStorageFile {
96 fn poll_read(
97 self: Pin<&mut Self>,
98 cx: &mut Context<'_>,
99 buf: &mut ReadBuf<'_>,
100 ) -> Poll<io::Result<()>> {
101 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
102
103 if let Some((op_seq, offset, len)) = self.pending_read.get() {
105 if sim.is_storage_op_complete(self.file_id, op_seq) {
107 self.pending_read.set(None);
109
110 let bytes_to_read = buf.remaining().min(len);
112 if bytes_to_read == 0 {
113 return Poll::Ready(Ok(()));
114 }
115
116 let mut temp_buf = vec![0u8; bytes_to_read];
118 let bytes_read = sim
119 .read_from_file(self.file_id, offset, &mut temp_buf)
120 .map_err(|e| io::Error::other(e.to_string()))?;
121
122 let new_position = offset + bytes_read as u64;
124 sim.set_file_position(self.file_id, new_position)
125 .map_err(|e| io::Error::other(e.to_string()))?;
126
127 buf.put_slice(&temp_buf[..bytes_read]);
129 return Poll::Ready(Ok(()));
130 }
131
132 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
134 return Poll::Pending;
135 }
136
137 let position = sim
141 .get_file_position(self.file_id)
142 .map_err(|e| io::Error::other(e.to_string()))?;
143
144 let file_size = sim
146 .get_file_size(self.file_id)
147 .map_err(|e| io::Error::other(e.to_string()))?;
148
149 if position >= file_size {
151 return Poll::Ready(Ok(())); }
153
154 let remaining_in_file = (file_size - position) as usize;
156 let len = buf.remaining().min(remaining_in_file);
157
158 if len == 0 {
159 return Poll::Ready(Ok(()));
160 }
161
162 let op_seq = sim
164 .schedule_read(self.file_id, position, len)
165 .map_err(|e| io::Error::other(e.to_string()))?;
166
167 self.pending_read.set(Some((op_seq, position, len)));
169
170 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
172
173 Poll::Pending
174 }
175}
176
177impl AsyncWrite for SimStorageFile {
178 fn poll_write(
179 self: Pin<&mut Self>,
180 cx: &mut Context<'_>,
181 buf: &[u8],
182 ) -> Poll<io::Result<usize>> {
183 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
184
185 if let Some((op_seq, bytes_written)) = self.pending_write.get() {
187 if sim.is_storage_op_complete(self.file_id, op_seq) {
189 self.pending_write.set(None);
191
192 let position = sim
194 .get_file_position(self.file_id)
195 .map_err(|e| io::Error::other(e.to_string()))?;
196 let new_position = position + bytes_written as u64;
197 sim.set_file_position(self.file_id, new_position)
198 .map_err(|e| io::Error::other(e.to_string()))?;
199
200 return Poll::Ready(Ok(bytes_written));
201 }
202
203 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
205 return Poll::Pending;
206 }
207
208 if buf.is_empty() {
211 return Poll::Ready(Ok(0));
212 }
213
214 let position = sim
216 .get_file_position(self.file_id)
217 .map_err(|e| io::Error::other(e.to_string()))?;
218
219 let op_seq = sim
221 .schedule_write(self.file_id, position, buf.to_vec())
222 .map_err(|e| io::Error::other(e.to_string()))?;
223
224 self.pending_write.set(Some((op_seq, buf.len())));
226
227 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
229
230 Poll::Pending
231 }
232
233 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
234 Poll::Ready(Ok(()))
236 }
237
238 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
239 Poll::Ready(Ok(()))
241 }
242}
243
244impl AsyncSeek for SimStorageFile {
245 fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
246 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
247
248 let current_position = sim
250 .get_file_position(self.file_id)
251 .map_err(|e| io::Error::other(e.to_string()))?;
252
253 let file_size = sim
254 .get_file_size(self.file_id)
255 .map_err(|e| io::Error::other(e.to_string()))?;
256
257 let target = match position {
258 SeekFrom::Start(pos) => pos,
259 SeekFrom::End(offset) => {
260 if offset >= 0 {
261 file_size.saturating_add(offset as u64)
262 } else {
263 file_size.saturating_sub((-offset) as u64)
264 }
265 }
266 SeekFrom::Current(offset) => {
267 if offset >= 0 {
268 current_position.saturating_add(offset as u64)
269 } else {
270 current_position.saturating_sub((-offset) as u64)
271 }
272 }
273 };
274
275 self.seek_state.set(SeekState::Seeking(target));
277 Ok(())
278 }
279
280 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
281 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
282
283 match self.seek_state.get() {
284 SeekState::Idle => {
285 let position = sim
287 .get_file_position(self.file_id)
288 .map_err(|e| io::Error::other(e.to_string()))?;
289 Poll::Ready(Ok(position))
290 }
291 SeekState::Seeking(target) => {
292 sim.set_file_position(self.file_id, target)
294 .map_err(|e| io::Error::other(e.to_string()))?;
295 self.seek_state.set(SeekState::Idle);
296 Poll::Ready(Ok(target))
297 }
298 }
299 }
300}