1use super::*;
2use ::gstd::{
3 errors::Error,
4 msg::{CreateProgramFuture, MessageFuture},
5};
6
7#[derive(Default)]
8pub struct GstdParams {
9 #[cfg(not(feature = "ethexe"))]
10 pub gas_limit: Option<GasUnit>,
11 pub value: Option<ValueUnit>,
12 pub wait_up_to: Option<BlockCount>,
13 #[cfg(not(feature = "ethexe"))]
14 pub reply_deposit: Option<GasUnit>,
15 #[cfg(not(feature = "ethexe"))]
16 pub reply_hook: Option<Box<dyn FnOnce() + Send + 'static>>,
17 pub redirect_on_exit: bool,
18}
19
20crate::params_for_pending_impl!(GstdEnv, GstdParams {
21 #[cfg(not(feature = "ethexe"))]
22 pub gas_limit: GasUnit,
23 pub value: ValueUnit,
24 pub wait_up_to: BlockCount,
25 #[cfg(not(feature = "ethexe"))]
26 pub reply_deposit: GasUnit,
27});
28
29impl GstdParams {
30 pub fn with_redirect_on_exit(self, redirect_on_exit: bool) -> Self {
31 Self {
32 redirect_on_exit,
33 ..self
34 }
35 }
36
37 #[cfg(not(feature = "ethexe"))]
38 pub fn with_reply_hook<F: FnOnce() + Send + 'static>(self, f: F) -> Self {
39 Self {
40 reply_hook: Some(Box::new(f)),
41 ..self
42 }
43 }
44}
45
46impl<T: CallCodec> PendingCall<T, GstdEnv> {
47 pub fn with_redirect_on_exit(self, redirect_on_exit: bool) -> Self {
57 self.with_params(|params| params.with_redirect_on_exit(redirect_on_exit))
58 }
59
60 #[cfg(not(feature = "ethexe"))]
61 pub fn with_reply_hook<F: FnOnce() + Send + 'static>(self, f: F) -> Self {
62 self.with_params(|params| params.with_reply_hook(f))
63 }
64}
65
66#[derive(Debug, Default, Clone)]
67pub struct GstdEnv;
68
69impl GearEnv for GstdEnv {
70 type Params = GstdParams;
71 type Error = Error;
72 #[cfg(target_arch = "wasm32")]
73 type MessageState = GstdFuture;
74 #[cfg(not(target_arch = "wasm32"))]
75 type MessageState = core::future::Ready<Result<Vec<u8>, Self::Error>>;
76}
77
78impl GstdEnv {
79 pub fn send_one_way(
80 &self,
81 destination: ActorId,
82 payload: impl AsRef<[u8]>,
83 params: GstdParams,
84 ) -> Result<MessageId, Error> {
85 let value = params.value.unwrap_or_default();
86 let payload_bytes = payload.as_ref();
87
88 #[cfg(not(feature = "ethexe"))]
89 let waiting_reply_to = if let Some(gas_limit) = params.gas_limit {
90 ::gcore::msg::send_with_gas(destination, payload_bytes, gas_limit, value)?
91 } else {
92 ::gcore::msg::send(destination, payload_bytes, value)?
93 };
94 #[cfg(feature = "ethexe")]
95 let waiting_reply_to = ::gcore::msg::send(destination, payload_bytes, value)?;
96
97 #[cfg(not(feature = "ethexe"))]
98 if let Some(reply_deposit) = params.reply_deposit {
99 ::gcore::exec::reply_deposit(waiting_reply_to, reply_deposit)?;
100 }
101
102 Ok(waiting_reply_to)
103 }
104}
105
106impl<T: CallCodec> PendingCall<T, GstdEnv> {
107 pub fn send_one_way(&mut self) -> Result<MessageId, Error> {
108 let (payload, params) = self.take_encoded_args_and_params();
109 self.env.send_one_way(self.destination, payload, params)
110 }
111}
112
113#[cfg(target_arch = "wasm32")]
114const _: () = {
115 use core::task::ready;
116
117 #[cfg(not(feature = "ethexe"))]
118 #[inline]
119 fn send_for_reply_future(
120 destination: ActorId,
121 payload: &[u8],
122 params: &mut GstdParams,
123 ) -> Result<MessageFuture, Error> {
124 let value = params.value.unwrap_or_default();
125 let reply_deposit = params.reply_deposit.unwrap_or_default();
126 let mut message_future = if let Some(gas_limit) = params.gas_limit {
128 ::gstd::msg::send_bytes_with_gas_for_reply(
129 destination,
130 payload,
131 gas_limit,
132 value,
133 reply_deposit,
134 )?
135 } else {
136 ::gstd::msg::send_bytes_for_reply(destination, payload, value, reply_deposit)?
137 };
138
139 message_future = message_future.up_to(params.wait_up_to)?;
140
141 if let Some(reply_hook) = params.reply_hook.take() {
142 message_future = message_future.handle_reply(reply_hook)?;
143 }
144 Ok(message_future)
145 }
146
147 #[cfg(feature = "ethexe")]
148 #[inline]
149 fn send_for_reply_future(
150 destination: ActorId,
151 payload: &[u8],
152 params: &mut GstdParams,
153 ) -> Result<MessageFuture, Error> {
154 let value = params.value.unwrap_or_default();
155 let mut message_future = ::gstd::msg::send_bytes_for_reply(destination, payload, value)?;
157
158 message_future = message_future.up_to(params.wait_up_to)?;
159
160 Ok(message_future)
161 }
162
163 #[inline]
164 fn send_for_reply(
165 destination: ActorId,
166 payload: Vec<u8>,
167 params: &mut GstdParams,
168 ) -> Result<GstdFuture, Error> {
169 let future = send_for_reply_future(destination, payload.as_ref(), params)?;
171 if params.redirect_on_exit {
172 let created_block = params.wait_up_to.map(|_| gstd::exec::block_height());
173 Ok(GstdFuture::MessageWithRedirect {
174 created_block,
175 future,
176 destination,
177 payload,
178 })
179 } else {
180 Ok(GstdFuture::Message { future })
181 }
182 }
183
184 impl<T: CallCodec> Future for PendingCall<T, GstdEnv> {
185 type Output = Result<T::Reply, <GstdEnv as GearEnv>::Error>;
186
187 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 if self.state.is_none() {
189 let args = self
190 .args
191 .as_ref()
192 .unwrap_or_else(|| panic!("{PENDING_CALL_INVALID_STATE}"));
193 let payload = T::encode_params_with_prefix(self.route, &args);
194 let destination = self.destination;
195 let params = self.params.get_or_insert_default();
196 let future = send_for_reply(destination, payload, params)?;
198 self.state = Some(future);
199 return Poll::Pending;
201 }
202 let this = self.as_mut().project();
203 let mut state = unsafe { this.state.as_pin_mut().unwrap_unchecked() };
205 let output = match state.as_mut().project() {
207 Projection::Message { future } => ready!(future.poll(cx)),
208 Projection::MessageWithRedirect { future, .. } => ready!(future.poll(cx)),
209 _ => panic!("{PENDING_CALL_INVALID_STATE}"),
210 };
211 match output {
212 Ok(payload) => {
214 let res =
215 T::decode_reply_with_prefix(self.route, payload).map_err(Error::Decode)?;
216 Poll::Ready(Ok(res))
217 }
218 Err(gstd::errors::Error::ErrorReply(
220 error_payload,
221 ErrorReplyReason::UnavailableActor(SimpleUnavailableActorError::ProgramExited),
222 )) => {
223 let params = this.params.get_or_insert_default();
224 if let Replace::MessageWithRedirect {
225 destination: _destination,
226 created_block,
227 payload,
228 ..
229 } = state.as_mut().project_replace(GstdFuture::Dummy)
230 && params.redirect_on_exit
231 && let Ok(new_target) = ActorId::try_from(error_payload.0.as_ref())
232 {
233 gstd::debug!("Redirecting message from {_destination} to {new_target}");
234
235 params.wait_up_to = params.wait_up_to.and_then(|wait_up_to| {
238 created_block.map(|created_block| {
239 let current_block = gstd::exec::block_height();
240 wait_up_to
241 .saturating_sub(current_block.saturating_sub(created_block))
242 })
243 });
244
245 let future = send_for_reply(new_target, payload, params)?;
247 _ = state.as_mut().project_replace(future);
249 Poll::Pending
251 } else {
252 Poll::Ready(Err(gstd::errors::Error::ErrorReply(
253 error_payload,
254 ErrorReplyReason::UnavailableActor(
255 SimpleUnavailableActorError::ProgramExited,
256 ),
257 )
258 .into()))
259 }
260 }
261 Err(err) => Poll::Ready(Err(err)),
263 }
264 }
265 }
266
267 impl<A, T: CallCodec> Future for PendingCtor<A, T, GstdEnv> {
268 type Output = Result<Actor<A, GstdEnv>, <GstdEnv as GearEnv>::Error>;
269
270 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271 if self.state.is_none() {
272 let params = self.params.take().unwrap_or_default();
273 let value = params.value.unwrap_or_default();
274 let salt = self.salt.take().unwrap();
275
276 let args = self
277 .args
278 .as_ref()
279 .unwrap_or_else(|| panic!("{PENDING_CALL_INVALID_STATE}"));
280 let payload = T::encode_params(args);
281 #[cfg(not(feature = "ethexe"))]
283 let future = if let Some(gas_limit) = params.gas_limit {
284 ::gstd::prog::create_program_bytes_with_gas_for_reply(
285 self.code_id,
286 salt,
287 payload,
288 gas_limit,
289 value,
290 params.reply_deposit.unwrap_or_default(),
291 )?
292 } else {
293 ::gstd::prog::create_program_bytes_for_reply(
294 self.code_id,
295 salt,
296 payload,
297 value,
298 params.reply_deposit.unwrap_or_default(),
299 )?
300 };
301 #[cfg(feature = "ethexe")]
302 let future = ::gstd::prog::create_program_bytes_for_reply(
303 self.code_id,
304 salt,
305 payload,
306 value,
307 )?;
308
309 self.state = Some(GstdFuture::CreateProgram { future });
311 return Poll::Pending;
313 }
314 let this = self.as_mut().project();
315 let mut state = unsafe { this.state.as_pin_mut().unwrap_unchecked() };
317 if let Projection::CreateProgram { future } = state.project() {
318 match ready!(future.poll(cx)) {
320 Ok((program_id, _payload)) => {
321 Poll::Ready(Ok(Actor::new(this.env.clone(), program_id)))
323 }
324 Err(err) => Poll::Ready(Err(err)),
325 }
326 } else {
327 panic!("{PENDING_CTOR_INVALID_STATE}");
328 }
329 }
330 }
331};
332
333pin_project_lite::pin_project! {
334 #[project = Projection]
335 #[project_replace = Replace]
336 pub enum GstdFuture {
337 CreateProgram { #[pin] future: CreateProgramFuture },
338 Message { #[pin] future: MessageFuture },
339 MessageWithRedirect {
340 #[pin]
341 future: MessageFuture,
342 destination: ActorId,
343 created_block: Option<BlockCount>,
344 payload: Vec<u8>, },
346 Dummy,
347 }
348}
349
350#[cfg(not(target_arch = "wasm32"))]
351const _: () = {
352 impl<T: CallCodec> PendingCall<T, GstdEnv>
353 where
354 T::Reply: Encode + Decode,
355 {
356 pub fn from_output(output: T::Reply) -> Self {
357 Self::from_result(Ok(output))
358 }
359
360 pub fn from_error(err: <GstdEnv as GearEnv>::Error) -> Self {
361 Self::from_result(Err(err))
362 }
363
364 pub fn from_result(res: Result<T::Reply, <GstdEnv as GearEnv>::Error>) -> Self {
365 PendingCall {
366 env: GstdEnv,
367 destination: ActorId::zero(),
368 route: "",
369 params: None,
370 args: None,
371 state: Some(future::ready(res.map(|v| v.encode()))),
372 }
373 }
374 }
375
376 impl<T: CallCodec<Reply = O>, O> From<O> for PendingCall<T, GstdEnv>
377 where
378 O: Encode + Decode,
379 {
380 fn from(value: O) -> Self {
381 PendingCall::from_output(value)
382 }
383 }
384
385 impl<T: CallCodec> Future for PendingCall<T, GstdEnv> {
386 type Output = Result<T::Reply, <GstdEnv as GearEnv>::Error>;
387
388 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
389 match self.state.take() {
390 Some(ready) => {
391 let res = ready.into_inner();
392 Poll::Ready(res.map(|v| T::Reply::decode(&mut v.as_ref()).unwrap()))
393 }
394 None => panic!("{PENDING_CALL_INVALID_STATE}"),
395 }
396 }
397 }
398
399 impl<A, T: CallCodec> Future for PendingCtor<A, T, GstdEnv> {
400 type Output = Result<Actor<A, GstdEnv>, <GstdEnv as GearEnv>::Error>;
401
402 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
403 match self.state.take() {
404 Some(_ready) => {
405 let program_id = self
406 .program_id
407 .take()
408 .unwrap_or_else(|| panic!("{PENDING_CTOR_INVALID_STATE}"));
409 let env = self.env.clone();
410 Poll::Ready(Ok(Actor::new(env, program_id)))
411 }
412 None => panic!("{PENDING_CTOR_INVALID_STATE}"),
413 }
414 }
415 }
416};