Skip to main content

moonpool_sim/storage/
file.rs

1//! Simulated storage file implementation.
2
3use crate::sim::WeakSimWorld;
4use crate::sim::state::FileId;
5use async_trait::async_trait;
6use moonpool_core::StorageFile;
7use std::cell::Cell;
8use std::io::{self, SeekFrom};
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
12
13use super::futures::{SetLenFuture, SyncFuture};
14use super::sim_shutdown_error;
15
/// Progress of a seek requested through `AsyncSeek`.
#[derive(Clone, Copy, Debug)]
enum SeekState {
    /// Nothing is outstanding: either no seek was requested or the last one
    /// has already been applied.
    Idle,
    /// `start_seek` recorded this absolute target offset and `poll_complete`
    /// has not applied it yet.
    Seeking(u64),
}
24
25/// Simulated storage file for deterministic testing.
26///
27/// This provides a simulation-aware file handle that integrates with
28/// the deterministic simulation engine for testing storage I/O patterns.
29///
30/// ## State Tracking
31///
32/// The file tracks pending operations using `Cell`-based state:
33/// - `pending_read`: Active read operation (op_seq, offset, len)
34/// - `pending_write`: Active write operation (op_seq, bytes_written)
35/// - `seek_state`: Current seek state
36///
37/// ## Polling Pattern
38///
39/// Operations follow the schedule → wait → complete pattern:
40/// 1. First poll: Schedule operation with SimWorld, store pending state
41/// 2. Subsequent polls: Check completion, return Pending until done
42/// 3. Final poll: Clear pending state, return result
43#[derive(Debug)]
44pub struct SimStorageFile {
45    sim: WeakSimWorld,
46    file_id: FileId,
47    /// Pending read operation: (op_seq, offset, len)
48    pending_read: Cell<Option<(u64, u64, usize)>>,
49    /// Pending write operation: (op_seq, bytes_written)
50    pending_write: Cell<Option<(u64, usize)>>,
51    /// Current seek state
52    seek_state: Cell<SeekState>,
53}
54
55impl SimStorageFile {
56    /// Create a new simulated storage file.
57    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
58        Self {
59            sim,
60            file_id,
61            pending_read: Cell::new(None),
62            pending_write: Cell::new(None),
63            seek_state: Cell::new(SeekState::Idle),
64        }
65    }
66
67    /// Get the file ID.
68    pub fn file_id(&self) -> FileId {
69        self.file_id
70    }
71}
72
73#[async_trait(?Send)]
74impl StorageFile for SimStorageFile {
75    async fn sync_all(&self) -> io::Result<()> {
76        SyncFuture::new(self.sim.clone(), self.file_id).await
77    }
78
79    async fn sync_data(&self) -> io::Result<()> {
80        // Simulation treats sync_all and sync_data identically
81        SyncFuture::new(self.sim.clone(), self.file_id).await
82    }
83
84    async fn size(&self) -> io::Result<u64> {
85        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
86        sim.file_size(self.file_id)
87            .map_err(|e| io::Error::other(e.to_string()))
88    }
89
90    async fn set_len(&self, size: u64) -> io::Result<()> {
91        SetLenFuture::new(self.sim.clone(), self.file_id, size).await
92    }
93}
94
95impl AsyncRead for SimStorageFile {
96    fn poll_read(
97        self: Pin<&mut Self>,
98        cx: &mut Context<'_>,
99        buf: &mut ReadBuf<'_>,
100    ) -> Poll<io::Result<()>> {
101        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
102
103        // Check for pending read operation
104        if let Some((op_seq, offset, len)) = self.pending_read.get() {
105            // Check if operation is complete
106            if sim.is_storage_op_complete(self.file_id, op_seq) {
107                // Clear pending state
108                self.pending_read.set(None);
109
110                // Calculate how many bytes to actually read
111                let bytes_to_read = buf.remaining().min(len);
112                if bytes_to_read == 0 {
113                    return Poll::Ready(Ok(()));
114                }
115
116                // Read from file at the stored offset
117                let mut temp_buf = vec![0u8; bytes_to_read];
118                let bytes_read = sim.read_from_file(self.file_id, offset, &mut temp_buf)?;
119
120                // Update file position
121                let new_position = offset + bytes_read as u64;
122                sim.set_file_position(self.file_id, new_position)?;
123
124                // Copy to output buffer
125                buf.put_slice(&temp_buf[..bytes_read]);
126                return Poll::Ready(Ok(()));
127            }
128
129            // Operation not complete, register waker and wait
130            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
131            return Poll::Pending;
132        }
133
134        // No pending read - start a new one
135
136        // Get current position
137        let position = sim.file_position(self.file_id)?;
138
139        // Get file size to check for EOF
140        let file_size = sim.file_size(self.file_id)?;
141
142        // Check for EOF
143        if position >= file_size {
144            return Poll::Ready(Ok(())); // EOF - 0 bytes read
145        }
146
147        // Calculate bytes to read (don't read past EOF)
148        let remaining_in_file = (file_size - position) as usize;
149        let len = buf.remaining().min(remaining_in_file);
150
151        if len == 0 {
152            return Poll::Ready(Ok(()));
153        }
154
155        // Schedule the read operation
156        let op_seq = sim.schedule_read(self.file_id, position, len)?;
157
158        // Store pending state
159        self.pending_read.set(Some((op_seq, position, len)));
160
161        // Register waker
162        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
163
164        Poll::Pending
165    }
166}
167
168impl AsyncWrite for SimStorageFile {
169    fn poll_write(
170        self: Pin<&mut Self>,
171        cx: &mut Context<'_>,
172        buf: &[u8],
173    ) -> Poll<io::Result<usize>> {
174        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
175
176        // Check for pending write operation
177        if let Some((op_seq, bytes_written)) = self.pending_write.get() {
178            // Check if operation is complete
179            if sim.is_storage_op_complete(self.file_id, op_seq) {
180                // Clear pending state
181                self.pending_write.set(None);
182
183                // Update file position
184                let position = sim.file_position(self.file_id)?;
185                let new_position = position + bytes_written as u64;
186                sim.set_file_position(self.file_id, new_position)?;
187
188                return Poll::Ready(Ok(bytes_written));
189            }
190
191            // Operation not complete, register waker and wait
192            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
193            return Poll::Pending;
194        }
195
196        // No pending write - start a new one
197
198        if buf.is_empty() {
199            return Poll::Ready(Ok(0));
200        }
201
202        // Get current position
203        let position = sim.file_position(self.file_id)?;
204
205        // Schedule the write operation
206        let op_seq = sim.schedule_write(self.file_id, position, buf.to_vec())?;
207
208        // Store pending state
209        self.pending_write.set(Some((op_seq, buf.len())));
210
211        // Register waker
212        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
213
214        Poll::Pending
215    }
216
217    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
218        // Flush is a no-op - durability comes from sync_all/sync_data
219        Poll::Ready(Ok(()))
220    }
221
222    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
223        // Shutdown is a no-op - file cleanup handled via Drop if needed
224        Poll::Ready(Ok(()))
225    }
226}
227
228impl AsyncSeek for SimStorageFile {
229    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
230        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
231
232        // Calculate target position
233        let current_position = sim.file_position(self.file_id)?;
234
235        let file_size = sim.file_size(self.file_id)?;
236
237        let target = match position {
238            SeekFrom::Start(pos) => pos,
239            SeekFrom::End(offset) => {
240                if offset >= 0 {
241                    file_size.saturating_add(offset as u64)
242                } else {
243                    file_size.saturating_sub((-offset) as u64)
244                }
245            }
246            SeekFrom::Current(offset) => {
247                if offset >= 0 {
248                    current_position.saturating_add(offset as u64)
249                } else {
250                    current_position.saturating_sub((-offset) as u64)
251                }
252            }
253        };
254
255        // Store the seek state
256        self.seek_state.set(SeekState::Seeking(target));
257        Ok(())
258    }
259
260    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
261        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
262
263        match self.seek_state.get() {
264            SeekState::Idle => {
265                // No seek in progress, return current position
266                let position = sim.file_position(self.file_id)?;
267                Poll::Ready(Ok(position))
268            }
269            SeekState::Seeking(target) => {
270                // Complete the seek by setting the position
271                sim.set_file_position(self.file_id, target)?;
272                self.seek_state.set(SeekState::Idle);
273                Poll::Ready(Ok(target))
274            }
275        }
276    }
277}