Skip to main content

moonpool_sim/storage/
futures.rs

1//! Future types for storage async operations.
2//!
3//! These futures handle the schedule → wait → complete pattern for
4//! storage operations that don't fit into the standard AsyncRead/AsyncWrite
5//! traits.
6
7use crate::sim::WeakSimWorld;
8use crate::sim::state::FileId;
9use std::cell::Cell;
10use std::future::Future;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15use super::sim_shutdown_error;
16
17/// Future for sync_all and sync_data operations.
18///
19/// Follows the schedule → wait → complete pattern:
20/// 1. First poll: Schedule sync with SimWorld, store op_seq
21/// 2. Subsequent polls: Check completion, return Pending until done
22/// 3. Final poll: Clear state, return Ok(())
23pub struct SyncFuture {
24    sim: WeakSimWorld,
25    file_id: FileId,
26    /// Pending operation sequence number
27    pending_op: Cell<Option<u64>>,
28}
29
30impl SyncFuture {
31    /// Create a new sync future.
32    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId) -> Self {
33        Self {
34            sim,
35            file_id,
36            pending_op: Cell::new(None),
37        }
38    }
39}
40
41impl Future for SyncFuture {
42    type Output = io::Result<()>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
46
47        // Check for pending operation
48        if let Some(op_seq) = self.pending_op.get() {
49            // Check if operation is complete
50            if sim.is_storage_op_complete(self.file_id, op_seq) {
51                // Clear pending state
52                self.pending_op.set(None);
53
54                // Check if sync failed due to fault injection
55                if sim.take_sync_failure(self.file_id, op_seq) {
56                    return Poll::Ready(Err(io::Error::other("sync failed (simulated I/O error)")));
57                }
58
59                return Poll::Ready(Ok(()));
60            }
61
62            // Operation not complete, register waker and wait
63            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
64            return Poll::Pending;
65        }
66
67        // No pending operation - start a new one
68        let op_seq = sim
69            .schedule_sync(self.file_id)
70            .map_err(|e| io::Error::other(e.to_string()))?;
71
72        // Store pending state
73        self.pending_op.set(Some(op_seq));
74
75        // Register waker
76        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
77
78        Poll::Pending
79    }
80}
81
82/// Future for set_len operations.
83///
84/// Follows the schedule → wait → complete pattern:
85/// 1. First poll: Schedule set_len with SimWorld, store op_seq
86/// 2. Subsequent polls: Check completion, return Pending until done
87/// 3. Final poll: Clear state, return Ok(())
88pub struct SetLenFuture {
89    sim: WeakSimWorld,
90    file_id: FileId,
91    /// The new length to set the file to.
92    new_len: u64,
93    /// Pending operation sequence number
94    pending_op: Cell<Option<u64>>,
95}
96
97impl SetLenFuture {
98    /// Create a new set_len future.
99    pub(crate) fn new(sim: WeakSimWorld, file_id: FileId, new_len: u64) -> Self {
100        Self {
101            sim,
102            file_id,
103            new_len,
104            pending_op: Cell::new(None),
105        }
106    }
107}
108
109impl Future for SetLenFuture {
110    type Output = io::Result<()>;
111
112    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
113        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
114
115        // Check for pending operation
116        if let Some(op_seq) = self.pending_op.get() {
117            // Check if operation is complete
118            if sim.is_storage_op_complete(self.file_id, op_seq) {
119                // Clear pending state
120                self.pending_op.set(None);
121                return Poll::Ready(Ok(()));
122            }
123
124            // Operation not complete, register waker and wait
125            sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
126            return Poll::Pending;
127        }
128
129        // No pending operation - start a new one
130        let op_seq = sim
131            .schedule_set_len(self.file_id, self.new_len)
132            .map_err(|e| io::Error::other(e.to_string()))?;
133
134        // Store pending state
135        self.pending_op.set(Some(op_seq));
136
137        // Register waker
138        sim.register_storage_waker(self.file_id, op_seq, cx.waker().clone());
139
140        Poll::Pending
141    }
142}