Skip to main content

moonpool_sim/storage/
file.rs

1//! Simulated storage file implementation.
2
3use crate::sim::WeakSimWorld;
4use crate::sim::state::FileId;
5use async_trait::async_trait;
6use moonpool_core::StorageFile;
7use std::cell::Cell;
8use std::io::{self, SeekFrom};
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
12
13use super::futures::{SetLenFuture, SyncFuture};
14use super::sim_shutdown_error;
15
16/// State for an in-progress seek operation.
17#[derive(Debug, Clone, Copy)]
18enum SeekState {
19    /// No seek in progress.
20    Idle,
21    /// Seek requested, waiting to complete.
22    Seeking(u64),
23}
24
25/// Simulated storage file for deterministic testing.
26///
27/// This provides a simulation-aware file handle that integrates with
28/// the deterministic simulation engine for testing storage I/O patterns.
29///
30/// ## State Tracking
31///
32/// The file tracks pending operations using `Cell`-based state:
33/// - `pending_read`: Active read operation (op_seq, offset, len)
34/// - `pending_write`: Active write operation (op_seq, bytes_written)
35/// - `seek_state`: Current seek state
36///
37/// ## Polling Pattern
38///
39/// Operations follow the schedule → wait → complete pattern:
40/// 1. First poll: Schedule operation with SimWorld, store pending state
41/// 2. Subsequent polls: Check completion, return Pending until done
42/// 3. Final poll: Clear pending state, return result
43#[derive(Debug)]
44pub struct SimStorageFile {
45    sim: WeakSimWorld,
46    file_id: FileId,
47    /// Pending read operation: (op_seq, offset, len)
48    pending_read: Cell<Option<(u64, u64, usize)>>,
49    /// Pending write operation: (op_seq, bytes_written)
50    pending_write: Cell<Option<(u64, usize)>>,
51    /// Current seek state
52    seek_state: Cell<SeekState>,
53}
54
55impl SimStorageFile {
56    /// Create a new simulated storage file.
57    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
58        Self {
59            sim,
60            file_id,
61            pending_read: Cell::new(None),
62            pending_write: Cell::new(None),
63            seek_state: Cell::new(SeekState::Idle),
64        }
65    }
66
67    /// Get the file ID.
68    pub fn file_id(&self) -> FileId {
69        self.file_id
70    }
71}
72
73#[async_trait(?Send)]
74impl StorageFile for SimStorageFile {
75    async fn sync_all(&self) -> io::Result<()> {
76        SyncFuture::new(self.sim.clone(), self.file_id).await
77    }
78
79    async fn sync_data(&self) -> io::Result<()> {
80        // Simulation treats sync_all and sync_data identically
81        SyncFuture::new(self.sim.clone(), self.file_id).await
82    }
83
84    async fn size(&self) -> io::Result<u64> {
85        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
86        sim.get_file_size(self.file_id)
87            .map_err(|e| io::Error::other(e.to_string()))
88    }
89
90    async fn set_len(&self, size: u64) -> io::Result<()> {
91        SetLenFuture::new(self.sim.clone(), self.file_id, size).await
92    }
93}
94
95impl AsyncRead for SimStorageFile {
96    fn poll_read(
97        self: Pin<&mut Self>,
98        cx: &mut Context<'_>,
99        buf: &mut ReadBuf<'_>,
100    ) -> Poll<io::Result<()>> {
101        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
102
103        // Check for pending read operation
104        if let Some((op_seq, offset, len)) = self.pending_read.get() {
105            // Check if operation is complete
106            if sim.is_storage_op_complete(self.file_id, op_seq) {
107                // Clear pending state
108                self.pending_read.set(None);
109
110                // Calculate how many bytes to actually read
111                let bytes_to_read = buf.remaining().min(len);
112                if bytes_to_read == 0 {
113                    return Poll::Ready(Ok(()));
114                }
115
116                // Read from file at the stored offset
117                let mut temp_buf = vec![0u8; bytes_to_read];
118                let bytes_read = sim
119                    .read_from_file(self.file_id, offset, &mut temp_buf)
120                    .map_err(|e| io::Error::other(e.to_string()))?;
121
122                // Update file position
123                let new_position = offset + bytes_read as u64;
124                sim.set_file_position(self.file_id, new_position)
125                    .map_err(|e| io::Error::other(e.to_string()))?;
126
127                // Copy to output buffer
128                buf.put_slice(&temp_buf[..bytes_read]);
129                return Poll::Ready(Ok(()));
130            }
131
132            // Operation not complete, register waker and wait
133            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
134            return Poll::Pending;
135        }
136
137        // No pending read - start a new one
138
139        // Get current position
140        let position = sim
141            .get_file_position(self.file_id)
142            .map_err(|e| io::Error::other(e.to_string()))?;
143
144        // Get file size to check for EOF
145        let file_size = sim
146            .get_file_size(self.file_id)
147            .map_err(|e| io::Error::other(e.to_string()))?;
148
149        // Check for EOF
150        if position >= file_size {
151            return Poll::Ready(Ok(())); // EOF - 0 bytes read
152        }
153
154        // Calculate bytes to read (don't read past EOF)
155        let remaining_in_file = (file_size - position) as usize;
156        let len = buf.remaining().min(remaining_in_file);
157
158        if len == 0 {
159            return Poll::Ready(Ok(()));
160        }
161
162        // Schedule the read operation
163        let op_seq = sim
164            .schedule_read(self.file_id, position, len)
165            .map_err(|e| io::Error::other(e.to_string()))?;
166
167        // Store pending state
168        self.pending_read.set(Some((op_seq, position, len)));
169
170        // Register waker
171        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
172
173        Poll::Pending
174    }
175}
176
177impl AsyncWrite for SimStorageFile {
178    fn poll_write(
179        self: Pin<&mut Self>,
180        cx: &mut Context<'_>,
181        buf: &[u8],
182    ) -> Poll<io::Result<usize>> {
183        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
184
185        // Check for pending write operation
186        if let Some((op_seq, bytes_written)) = self.pending_write.get() {
187            // Check if operation is complete
188            if sim.is_storage_op_complete(self.file_id, op_seq) {
189                // Clear pending state
190                self.pending_write.set(None);
191
192                // Update file position
193                let position = sim
194                    .get_file_position(self.file_id)
195                    .map_err(|e| io::Error::other(e.to_string()))?;
196                let new_position = position + bytes_written as u64;
197                sim.set_file_position(self.file_id, new_position)
198                    .map_err(|e| io::Error::other(e.to_string()))?;
199
200                return Poll::Ready(Ok(bytes_written));
201            }
202
203            // Operation not complete, register waker and wait
204            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
205            return Poll::Pending;
206        }
207
208        // No pending write - start a new one
209
210        if buf.is_empty() {
211            return Poll::Ready(Ok(0));
212        }
213
214        // Get current position
215        let position = sim
216            .get_file_position(self.file_id)
217            .map_err(|e| io::Error::other(e.to_string()))?;
218
219        // Schedule the write operation
220        let op_seq = sim
221            .schedule_write(self.file_id, position, buf.to_vec())
222            .map_err(|e| io::Error::other(e.to_string()))?;
223
224        // Store pending state
225        self.pending_write.set(Some((op_seq, buf.len())));
226
227        // Register waker
228        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
229
230        Poll::Pending
231    }
232
233    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
234        // Flush is a no-op - durability comes from sync_all/sync_data
235        Poll::Ready(Ok(()))
236    }
237
238    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
239        // Shutdown is a no-op - file cleanup handled via Drop if needed
240        Poll::Ready(Ok(()))
241    }
242}
243
244impl AsyncSeek for SimStorageFile {
245    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
246        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
247
248        // Calculate target position
249        let current_position = sim
250            .get_file_position(self.file_id)
251            .map_err(|e| io::Error::other(e.to_string()))?;
252
253        let file_size = sim
254            .get_file_size(self.file_id)
255            .map_err(|e| io::Error::other(e.to_string()))?;
256
257        let target = match position {
258            SeekFrom::Start(pos) => pos,
259            SeekFrom::End(offset) => {
260                if offset >= 0 {
261                    file_size.saturating_add(offset as u64)
262                } else {
263                    file_size.saturating_sub((-offset) as u64)
264                }
265            }
266            SeekFrom::Current(offset) => {
267                if offset >= 0 {
268                    current_position.saturating_add(offset as u64)
269                } else {
270                    current_position.saturating_sub((-offset) as u64)
271                }
272            }
273        };
274
275        // Store the seek state
276        self.seek_state.set(SeekState::Seeking(target));
277        Ok(())
278    }
279
280    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
281        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
282
283        match self.seek_state.get() {
284            SeekState::Idle => {
285                // No seek in progress, return current position
286                let position = sim
287                    .get_file_position(self.file_id)
288                    .map_err(|e| io::Error::other(e.to_string()))?;
289                Poll::Ready(Ok(position))
290            }
291            SeekState::Seeking(target) => {
292                // Complete the seek by setting the position
293                sim.set_file_position(self.file_id, target)
294                    .map_err(|e| io::Error::other(e.to_string()))?;
295                self.seek_state.set(SeekState::Idle);
296                Poll::Ready(Ok(target))
297            }
298        }
299    }
300}