scatter_net/legacy/peer/methods/
put_blob.rs

1use std::{
2    future::Future,
3    mem::replace,
4    pin::Pin,
5    task::Poll::{Pending, Ready},
6};
7
8use bytes::Bytes;
9use n0_future::{FutureExt, StreamExt};
10use ps_promise::PromiseRejection;
11
12use crate::{spawn_and_forget, Interaction, Packet, Peer, PutRequest, PutResponse};
13
14impl Peer {
15    pub fn put_blob(self, data: Bytes) -> PeerPutBlob {
16        PeerPutBlob {
17            // Start in the BeginInteraction state
18            state: PeerPutBlobState::BeginInteraction {
19                data,
20                // Start the interaction process immediately
21                future: Box::pin(self.begin_interaction()),
22            },
23        }
24    }
25}
26
27type BeginInteractionResult = Result<Interaction, crate::PeerBeginInteractionError>;
28type SendPacketResult = Result<(), crate::InteractionSendPacketError>;
29
30enum PeerPutBlobState {
31    BeginInteraction {
32        data: Bytes,
33        future: Pin<Box<dyn Future<Output = BeginInteractionResult> + Send + Sync>>,
34    },
35    SendPacket {
36        interaction: Interaction,
37        future: Pin<Box<dyn Future<Output = SendPacketResult> + Send + Sync>>,
38    },
39    AwaitResponse {
40        interaction: Interaction,
41    },
42    Done,
43    Failed,
44    Processing,
45}
46
47pub struct PeerPutBlob {
48    state: PeerPutBlobState,
49}
50
51impl Future for PeerPutBlob {
52    type Output = Result<PutResponse, PeerPutBlobError>;
53
54    fn poll(
55        self: std::pin::Pin<&mut Self>,
56        cx: &mut std::task::Context<'_>,
57    ) -> std::task::Poll<Self::Output> {
58        let this = self.get_mut();
59
60        loop {
61            match replace(&mut this.state, PeerPutBlobState::Processing) {
62                // --- State 1: Begin Interaction ---
63                PeerPutBlobState::BeginInteraction { data, mut future } => {
64                    match future.poll(cx) {
65                        Ready(Ok(interaction)) => {
66                            // Success! Prepare the request packet
67                            let request = PutRequest { data };
68                            let packet = Packet::PutRequest(request);
69
70                            let send_future = {
71                                let interaction = interaction.clone();
72
73                                async move { interaction.send_packet(packet).await }
74                            };
75
76                            // Transition to the next state
77                            this.state = PeerPutBlobState::SendPacket {
78                                interaction,
79                                future: Box::pin(send_future),
80                            };
81
82                            // Continue the loop to poll the SendPacket state immediately
83                            continue;
84                        }
85                        Ready(Err(err)) => {
86                            this.state = PeerPutBlobState::Failed;
87
88                            // Drop all state and return the Error
89                            return Ready(Err(PeerPutBlobError::BeginInteraction(err)));
90                        }
91                        Pending => {
92                            this.state = PeerPutBlobState::BeginInteraction { data, future };
93
94                            // Interaction not ready, keep waiting...
95                            return Pending;
96                        }
97                    }
98                }
99
100                // --- State 2: Send Packet ---
101                PeerPutBlobState::SendPacket {
102                    interaction,
103                    mut future,
104                } => {
105                    match future.poll(cx) {
106                        Ready(Ok(())) => {
107                            // Packet sent successfully!
108                            this.state = PeerPutBlobState::AwaitResponse { interaction };
109
110                            // Continue the loop to poll the AwaitResponse state immediately
111                            continue;
112                        }
113                        Ready(Err(err)) => {
114                            this.state = PeerPutBlobState::Failed;
115
116                            // Drop state and return the Error.
117                            return Ready(Err(PeerPutBlobError::SendPacket(err)));
118                        }
119                        Pending => {
120                            this.state = PeerPutBlobState::SendPacket {
121                                interaction,
122                                future,
123                            };
124
125                            // Packet not delivered, keep waiting...
126                            return Pending;
127                        }
128                    }
129                }
130
131                // --- State 3: Await Response ---
132                PeerPutBlobState::AwaitResponse { mut interaction } => {
133                    // Poll the interaction stream for the next packet
134                    let packet = match interaction.poll_next(cx) {
135                        Pending => {
136                            this.state = PeerPutBlobState::AwaitResponse { interaction };
137
138                            return Pending;
139                        }
140                        Ready(None) => {
141                            this.state = PeerPutBlobState::Failed;
142
143                            return Ready(Err(PeerPutBlobError::DidNotRespond));
144                        }
145                        Ready(Some(Ok(packet))) => packet,
146                        Ready(Some(Err(err))) => {
147                            this.state = PeerPutBlobState::Failed;
148
149                            return Ready(Err(PeerPutBlobError::ReadPacket(err)));
150                        }
151                    };
152
153                    // Process the received packet
154                    match packet {
155                        Packet::PutResponse(response) => {
156                            // Got the expected response!
157                            this.state = PeerPutBlobState::Done;
158
159                            return Ready(Ok(response));
160                        }
161                        packet => {
162                            // Got an unexpected packet type
163                            let peer = interaction.get_peer().clone();
164
165                            // Spawn a task to process the unexpected packet anyway
166                            spawn_and_forget(async move {
167                                packet.process(peer).await?;
168                                Ok(())
169                            });
170
171                            this.state = PeerPutBlobState::Failed;
172
173                            return Ready(Err(PeerPutBlobError::InvalidResponse));
174                        }
175                    }
176                }
177                // --- Terminal States ---
178                PeerPutBlobState::Done | PeerPutBlobState::Failed => {
179                    return Ready(Err(PeerPutBlobError::MultipleAwaits))
180                }
181                // --- Invalid State ---
182                PeerPutBlobState::Processing => {
183                    return Ready(Err(PeerPutBlobError::ProcessingState))
184                }
185            }
186        }
187    }
188}
189
190#[derive(thiserror::Error, Debug)]
191pub enum PeerPutBlobError {
192    #[error("This Promise was consumed more than once.")]
193    AlreadyConsumed,
194    #[error(transparent)]
195    BeginInteraction(#[from] crate::PeerBeginInteractionError),
196    #[error("Peer did not respond to the put request.")]
197    DidNotRespond,
198    #[error("Peer did not provide a valid response.")]
199    InvalidResponse,
200    #[error("This future was awaited multiple times, which isn't supported.")]
201    MultipleAwaits,
202    #[error("This is an internal exception which you shouldn't encounter.")]
203    ProcessingState,
204    #[error("Failed to read packet: {0}")]
205    ReadPacket(#[from] crate::InteractionReadPacketError),
206    #[error(transparent)]
207    SendPacket(#[from] crate::InteractionSendPacketError),
208}
209
210impl PromiseRejection for PeerPutBlobError {
211    fn already_consumed() -> Self {
212        Self::AlreadyConsumed
213    }
214}
215
216type Result<T = PutResponse, E = PeerPutBlobError> = std::result::Result<T, E>;