scatter_net/legacy/peer/methods/
put_blob.rs1use std::{
2 future::Future,
3 mem::replace,
4 pin::Pin,
5 task::Poll::{Pending, Ready},
6};
7
8use bytes::Bytes;
9use n0_future::{FutureExt, StreamExt};
10use ps_promise::PromiseRejection;
11
12use crate::{spawn_and_forget, Interaction, Packet, Peer, PutRequest, PutResponse};
13
14impl Peer {
15 pub fn put_blob(self, data: Bytes) -> PeerPutBlob {
16 PeerPutBlob {
17 state: PeerPutBlobState::BeginInteraction {
19 data,
20 future: Box::pin(self.begin_interaction()),
22 },
23 }
24 }
25}
26
27type BeginInteractionResult = Result<Interaction, crate::PeerBeginInteractionError>;
28type SendPacketResult = Result<(), crate::InteractionSendPacketError>;
29
30enum PeerPutBlobState {
31 BeginInteraction {
32 data: Bytes,
33 future: Pin<Box<dyn Future<Output = BeginInteractionResult> + Send + Sync>>,
34 },
35 SendPacket {
36 interaction: Interaction,
37 future: Pin<Box<dyn Future<Output = SendPacketResult> + Send + Sync>>,
38 },
39 AwaitResponse {
40 interaction: Interaction,
41 },
42 Done,
43 Failed,
44 Processing,
45}
46
47pub struct PeerPutBlob {
48 state: PeerPutBlobState,
49}
50
51impl Future for PeerPutBlob {
52 type Output = Result<PutResponse, PeerPutBlobError>;
53
54 fn poll(
55 self: std::pin::Pin<&mut Self>,
56 cx: &mut std::task::Context<'_>,
57 ) -> std::task::Poll<Self::Output> {
58 let this = self.get_mut();
59
60 loop {
61 match replace(&mut this.state, PeerPutBlobState::Processing) {
62 PeerPutBlobState::BeginInteraction { data, mut future } => {
64 match future.poll(cx) {
65 Ready(Ok(interaction)) => {
66 let request = PutRequest { data };
68 let packet = Packet::PutRequest(request);
69
70 let send_future = {
71 let interaction = interaction.clone();
72
73 async move { interaction.send_packet(packet).await }
74 };
75
76 this.state = PeerPutBlobState::SendPacket {
78 interaction,
79 future: Box::pin(send_future),
80 };
81
82 continue;
84 }
85 Ready(Err(err)) => {
86 this.state = PeerPutBlobState::Failed;
87
88 return Ready(Err(PeerPutBlobError::BeginInteraction(err)));
90 }
91 Pending => {
92 this.state = PeerPutBlobState::BeginInteraction { data, future };
93
94 return Pending;
96 }
97 }
98 }
99
100 PeerPutBlobState::SendPacket {
102 interaction,
103 mut future,
104 } => {
105 match future.poll(cx) {
106 Ready(Ok(())) => {
107 this.state = PeerPutBlobState::AwaitResponse { interaction };
109
110 continue;
112 }
113 Ready(Err(err)) => {
114 this.state = PeerPutBlobState::Failed;
115
116 return Ready(Err(PeerPutBlobError::SendPacket(err)));
118 }
119 Pending => {
120 this.state = PeerPutBlobState::SendPacket {
121 interaction,
122 future,
123 };
124
125 return Pending;
127 }
128 }
129 }
130
131 PeerPutBlobState::AwaitResponse { mut interaction } => {
133 let packet = match interaction.poll_next(cx) {
135 Pending => {
136 this.state = PeerPutBlobState::AwaitResponse { interaction };
137
138 return Pending;
139 }
140 Ready(None) => {
141 this.state = PeerPutBlobState::Failed;
142
143 return Ready(Err(PeerPutBlobError::DidNotRespond));
144 }
145 Ready(Some(Ok(packet))) => packet,
146 Ready(Some(Err(err))) => {
147 this.state = PeerPutBlobState::Failed;
148
149 return Ready(Err(PeerPutBlobError::ReadPacket(err)));
150 }
151 };
152
153 match packet {
155 Packet::PutResponse(response) => {
156 this.state = PeerPutBlobState::Done;
158
159 return Ready(Ok(response));
160 }
161 packet => {
162 let peer = interaction.get_peer().clone();
164
165 spawn_and_forget(async move {
167 packet.process(peer).await?;
168 Ok(())
169 });
170
171 this.state = PeerPutBlobState::Failed;
172
173 return Ready(Err(PeerPutBlobError::InvalidResponse));
174 }
175 }
176 }
177 PeerPutBlobState::Done | PeerPutBlobState::Failed => {
179 return Ready(Err(PeerPutBlobError::MultipleAwaits))
180 }
181 PeerPutBlobState::Processing => {
183 return Ready(Err(PeerPutBlobError::ProcessingState))
184 }
185 }
186 }
187 }
188}
189
190#[derive(thiserror::Error, Debug)]
191pub enum PeerPutBlobError {
192 #[error("This Promise was consumed more than once.")]
193 AlreadyConsumed,
194 #[error(transparent)]
195 BeginInteraction(#[from] crate::PeerBeginInteractionError),
196 #[error("Peer did not respond to the put request.")]
197 DidNotRespond,
198 #[error("Peer did not provide a valid response.")]
199 InvalidResponse,
200 #[error("This future was awaited multiple times, which isn't supported.")]
201 MultipleAwaits,
202 #[error("This is an internal exception which you shouldn't encounter.")]
203 ProcessingState,
204 #[error("Failed to read packet: {0}")]
205 ReadPacket(#[from] crate::InteractionReadPacketError),
206 #[error(transparent)]
207 SendPacket(#[from] crate::InteractionSendPacketError),
208}
209
210impl PromiseRejection for PeerPutBlobError {
211 fn already_consumed() -> Self {
212 Self::AlreadyConsumed
213 }
214}
215
216type Result<T = PutResponse, E = PeerPutBlobError> = std::result::Result<T, E>;