Skip to main content

moonpool_sim/storage/
futures.rs

1//! Future types for storage async operations.
2//!
3//! These futures handle the schedule → wait → complete pattern for
4//! storage operations that don't fit into the standard AsyncRead/AsyncWrite
5//! traits.
6
7use crate::sim::WeakSimWorld;
8use crate::sim::state::FileId;
9use std::cell::Cell;
10use std::future::Future;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15use super::sim_shutdown_error;
16
17/// Future for sync_all and sync_data operations.
18///
19/// Follows the schedule → wait → complete pattern:
20/// 1. First poll: Schedule sync with SimWorld, store op_seq
21/// 2. Subsequent polls: Check completion, return Pending until done
22/// 3. Final poll: Clear state, return Ok(())
23pub struct SyncFuture {
24    sim: WeakSimWorld,
25    file_id: FileId,
26    /// Pending operation sequence number
27    pending_op: Cell<Option<u64>>,
28}
29
30impl SyncFuture {
31    /// Create a new sync future.
32    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
33        Self {
34            sim,
35            file_id,
36            pending_op: Cell::new(None),
37        }
38    }
39}
40
41impl Future for SyncFuture {
42    type Output = io::Result<()>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
46
47        // Check for pending operation
48        if let Some(op_seq) = self.pending_op.get() {
49            // Check if operation is complete
50            if sim.is_storage_op_complete(self.file_id, op_seq) {
51                // Clear pending state
52                self.pending_op.set(None);
53
54                // Check if sync failed due to fault injection
55                if sim.take_sync_failure(self.file_id, op_seq) {
56                    return Poll::Ready(Err(io::Error::other("sync failed (simulated I/O error)")));
57                }
58
59                return Poll::Ready(Ok(()));
60            }
61
62            // Operation not complete, register waker and wait
63            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
64            return Poll::Pending;
65        }
66
67        // No pending operation - start a new one
68        let op_seq = sim.schedule_sync(self.file_id)?;
69
70        // Store pending state
71        self.pending_op.set(Some(op_seq));
72
73        // Register waker
74        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
75
76        Poll::Pending
77    }
78}
79
80/// Future for set_len operations.
81///
82/// Follows the schedule → wait → complete pattern:
83/// 1. First poll: Schedule set_len with SimWorld, store op_seq
84/// 2. Subsequent polls: Check completion, return Pending until done
85/// 3. Final poll: Clear state, return Ok(())
86pub struct SetLenFuture {
87    sim: WeakSimWorld,
88    file_id: FileId,
89    /// The new length to set the file to.
90    new_len: u64,
91    /// Pending operation sequence number
92    pending_op: Cell<Option<u64>>,
93}
94
95impl SetLenFuture {
96    /// Create a new set_len future.
97    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId, new_len: u64) -> Self {
98        Self {
99            sim,
100            file_id,
101            new_len,
102            pending_op: Cell::new(None),
103        }
104    }
105}
106
107impl Future for SetLenFuture {
108    type Output = io::Result<()>;
109
110    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
112
113        // Check for pending operation
114        if let Some(op_seq) = self.pending_op.get() {
115            // Check if operation is complete
116            if sim.is_storage_op_complete(self.file_id, op_seq) {
117                // Clear pending state
118                self.pending_op.set(None);
119                return Poll::Ready(Ok(()));
120            }
121
122            // Operation not complete, register waker and wait
123            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
124            return Poll::Pending;
125        }
126
127        // No pending operation - start a new one
128        let op_seq = sim.schedule_set_len(self.file_id, self.new_len)?;
129
130        // Store pending state
131        self.pending_op.set(Some(op_seq));
132
133        // Register waker
134        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
135
136        Poll::Pending
137    }
138}