moonpool_sim/storage/
file.rs1use crate::sim::WeakSimWorld;
4use crate::sim::state::FileId;
5use async_trait::async_trait;
6use moonpool_core::StorageFile;
7use std::cell::Cell;
8use std::io::{self, SeekFrom};
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
12
13use super::futures::{SetLenFuture, SyncFuture};
14use super::sim_shutdown_error;
15
16#[derive(Debug, Clone, Copy)]
18enum SeekState {
19 Idle,
21 Seeking(u64),
23}
24
25#[derive(Debug)]
44pub struct SimStorageFile {
45 sim: WeakSimWorld,
46 file_id: FileId,
47 pending_read: Cell<Option<(u64, u64, usize)>>,
49 pending_write: Cell<Option<(u64, usize)>>,
51 seek_state: Cell<SeekState>,
53}
54
55impl SimStorageFile {
56 pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
58 Self {
59 sim,
60 file_id,
61 pending_read: Cell::new(None),
62 pending_write: Cell::new(None),
63 seek_state: Cell::new(SeekState::Idle),
64 }
65 }
66
67 pub fn file_id(&self) -> FileId {
69 self.file_id
70 }
71}
72
73#[async_trait(?Send)]
74impl StorageFile for SimStorageFile {
75 async fn sync_all(&self) -> io::Result<()> {
76 SyncFuture::new(self.sim.clone(), self.file_id).await
77 }
78
79 async fn sync_data(&self) -> io::Result<()> {
80 SyncFuture::new(self.sim.clone(), self.file_id).await
82 }
83
84 async fn size(&self) -> io::Result<u64> {
85 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
86 sim.file_size(self.file_id)
87 .map_err(|e| io::Error::other(e.to_string()))
88 }
89
90 async fn set_len(&self, size: u64) -> io::Result<()> {
91 SetLenFuture::new(self.sim.clone(), self.file_id, size).await
92 }
93}
94
95impl AsyncRead for SimStorageFile {
96 fn poll_read(
97 self: Pin<&mut Self>,
98 cx: &mut Context<'_>,
99 buf: &mut ReadBuf<'_>,
100 ) -> Poll<io::Result<()>> {
101 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
102
103 if let Some((op_seq, offset, len)) = self.pending_read.get() {
105 if sim.is_storage_op_complete(self.file_id, op_seq) {
107 self.pending_read.set(None);
109
110 let bytes_to_read = buf.remaining().min(len);
112 if bytes_to_read == 0 {
113 return Poll::Ready(Ok(()));
114 }
115
116 let mut temp_buf = vec![0u8; bytes_to_read];
118 let bytes_read = sim.read_from_file(self.file_id, offset, &mut temp_buf)?;
119
120 let new_position = offset + bytes_read as u64;
122 sim.set_file_position(self.file_id, new_position)?;
123
124 buf.put_slice(&temp_buf[..bytes_read]);
126 return Poll::Ready(Ok(()));
127 }
128
129 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
131 return Poll::Pending;
132 }
133
134 let position = sim.file_position(self.file_id)?;
138
139 let file_size = sim.file_size(self.file_id)?;
141
142 if position >= file_size {
144 return Poll::Ready(Ok(())); }
146
147 let remaining_in_file = (file_size - position) as usize;
149 let len = buf.remaining().min(remaining_in_file);
150
151 if len == 0 {
152 return Poll::Ready(Ok(()));
153 }
154
155 let op_seq = sim.schedule_read(self.file_id, position, len)?;
157
158 self.pending_read.set(Some((op_seq, position, len)));
160
161 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
163
164 Poll::Pending
165 }
166}
167
168impl AsyncWrite for SimStorageFile {
169 fn poll_write(
170 self: Pin<&mut Self>,
171 cx: &mut Context<'_>,
172 buf: &[u8],
173 ) -> Poll<io::Result<usize>> {
174 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
175
176 if let Some((op_seq, bytes_written)) = self.pending_write.get() {
178 if sim.is_storage_op_complete(self.file_id, op_seq) {
180 self.pending_write.set(None);
182
183 let position = sim.file_position(self.file_id)?;
185 let new_position = position + bytes_written as u64;
186 sim.set_file_position(self.file_id, new_position)?;
187
188 return Poll::Ready(Ok(bytes_written));
189 }
190
191 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
193 return Poll::Pending;
194 }
195
196 if buf.is_empty() {
199 return Poll::Ready(Ok(0));
200 }
201
202 let position = sim.file_position(self.file_id)?;
204
205 let op_seq = sim.schedule_write(self.file_id, position, buf.to_vec())?;
207
208 self.pending_write.set(Some((op_seq, buf.len())));
210
211 sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
213
214 Poll::Pending
215 }
216
217 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
218 Poll::Ready(Ok(()))
220 }
221
222 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
223 Poll::Ready(Ok(()))
225 }
226}
227
228impl AsyncSeek for SimStorageFile {
229 fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
230 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
231
232 let current_position = sim.file_position(self.file_id)?;
234
235 let file_size = sim.file_size(self.file_id)?;
236
237 let target = match position {
238 SeekFrom::Start(pos) => pos,
239 SeekFrom::End(offset) => {
240 if offset >= 0 {
241 file_size.saturating_add(offset as u64)
242 } else {
243 file_size.saturating_sub((-offset) as u64)
244 }
245 }
246 SeekFrom::Current(offset) => {
247 if offset >= 0 {
248 current_position.saturating_add(offset as u64)
249 } else {
250 current_position.saturating_sub((-offset) as u64)
251 }
252 }
253 };
254
255 self.seek_state.set(SeekState::Seeking(target));
257 Ok(())
258 }
259
260 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
261 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
262
263 match self.seek_state.get() {
264 SeekState::Idle => {
265 let position = sim.file_position(self.file_id)?;
267 Poll::Ready(Ok(position))
268 }
269 SeekState::Seeking(target) => {
270 sim.set_file_position(self.file_id, target)?;
272 self.seek_state.set(SeekState::Idle);
273 Poll::Ready(Ok(target))
274 }
275 }
276 }
277}