sails_rs/client/
gstd_env.rs

1use super::*;
2use ::gstd::{
3    errors::Error,
4    msg::{CreateProgramFuture, MessageFuture},
5};
6
7#[derive(Default)]
8pub struct GstdParams {
9    #[cfg(not(feature = "ethexe"))]
10    pub gas_limit: Option<GasUnit>,
11    pub value: Option<ValueUnit>,
12    pub wait_up_to: Option<BlockCount>,
13    #[cfg(not(feature = "ethexe"))]
14    pub reply_deposit: Option<GasUnit>,
15    #[cfg(not(feature = "ethexe"))]
16    pub reply_hook: Option<Box<dyn FnOnce() + Send + 'static>>,
17    pub redirect_on_exit: bool,
18}
19
// Invokes the crate-level `params_for_pending_impl!` macro for the
// `GstdEnv` / `GstdParams` pair, listing the plain-valued parameters
// (the gas-related ones only exist without the `ethexe` feature).
// NOTE(review): presumably this generates `with_<field>` builder methods on
// the pending call/ctor types; the macro definition is not visible here — confirm.
crate::params_for_pending_impl!(GstdEnv, GstdParams {
    #[cfg(not(feature = "ethexe"))]
    pub gas_limit: GasUnit,
    pub value: ValueUnit,
    pub wait_up_to: BlockCount,
    #[cfg(not(feature = "ethexe"))]
    pub reply_deposit: GasUnit,
});
28
29impl GstdParams {
30    pub fn with_redirect_on_exit(self, redirect_on_exit: bool) -> Self {
31        Self {
32            redirect_on_exit,
33            ..self
34        }
35    }
36
37    #[cfg(not(feature = "ethexe"))]
38    pub fn with_reply_hook<F: FnOnce() + Send + 'static>(self, f: F) -> Self {
39        Self {
40            reply_hook: Some(Box::new(f)),
41            ..self
42        }
43    }
44}
45
46impl<T: CallCodec> PendingCall<T, GstdEnv> {
47    /// Set `redirect_on_exit` flag to `true``
48    ///
49    /// This flag is used to redirect a message to a new program when the target program exits
50    /// with an inheritor.
51    ///
52    /// WARNING: When this flag is set, the message future captures the payload and other arguments,
53    /// potentially resulting in multiple messages being sent. This can lead to increased gas consumption.
54    ///
55    /// This flag is set to `false`` by default.
56    pub fn with_redirect_on_exit(self, redirect_on_exit: bool) -> Self {
57        self.with_params(|params| params.with_redirect_on_exit(redirect_on_exit))
58    }
59
60    #[cfg(not(feature = "ethexe"))]
61    pub fn with_reply_hook<F: FnOnce() + Send + 'static>(self, f: F) -> Self {
62        self.with_params(|params| params.with_reply_hook(f))
63    }
64}
65
/// Stateless environment marker whose [`GearEnv`] implementation sends
/// messages through `gstd` / `gcore`.
#[derive(Clone, Debug, Default)]
pub struct GstdEnv;
68
69impl GearEnv for GstdEnv {
70    type Params = GstdParams;
71    type Error = Error;
72    #[cfg(target_arch = "wasm32")]
73    type MessageState = GstdFuture;
74    #[cfg(not(target_arch = "wasm32"))]
75    type MessageState = core::future::Ready<Result<Vec<u8>, Self::Error>>;
76}
77
78impl GstdEnv {
79    pub fn send_one_way(
80        &self,
81        destination: ActorId,
82        payload: impl AsRef<[u8]>,
83        params: GstdParams,
84    ) -> Result<MessageId, Error> {
85        let value = params.value.unwrap_or_default();
86        let payload_bytes = payload.as_ref();
87
88        #[cfg(not(feature = "ethexe"))]
89        let waiting_reply_to = if let Some(gas_limit) = params.gas_limit {
90            ::gcore::msg::send_with_gas(destination, payload_bytes, gas_limit, value)?
91        } else {
92            ::gcore::msg::send(destination, payload_bytes, value)?
93        };
94        #[cfg(feature = "ethexe")]
95        let waiting_reply_to = ::gcore::msg::send(destination, payload_bytes, value)?;
96
97        #[cfg(not(feature = "ethexe"))]
98        if let Some(reply_deposit) = params.reply_deposit {
99            ::gcore::exec::reply_deposit(waiting_reply_to, reply_deposit)?;
100        }
101
102        Ok(waiting_reply_to)
103    }
104}
105
106impl<T: CallCodec> PendingCall<T, GstdEnv> {
107    pub fn send_one_way(&mut self) -> Result<MessageId, Error> {
108        let (payload, params) = self.take_encoded_args_and_params();
109        self.env.send_one_way(self.destination, payload, params)
110    }
111}
112
113#[cfg(target_arch = "wasm32")]
114const _: () = {
115    use core::task::ready;
116
117    #[cfg(not(feature = "ethexe"))]
118    #[inline]
119    fn send_for_reply_future(
120        destination: ActorId,
121        payload: &[u8],
122        params: &mut GstdParams,
123    ) -> Result<MessageFuture, Error> {
124        let value = params.value.unwrap_or_default();
125        let reply_deposit = params.reply_deposit.unwrap_or_default();
126        // here can be a redirect target
127        let mut message_future = if let Some(gas_limit) = params.gas_limit {
128            ::gstd::msg::send_bytes_with_gas_for_reply(
129                destination,
130                payload,
131                gas_limit,
132                value,
133                reply_deposit,
134            )?
135        } else {
136            ::gstd::msg::send_bytes_for_reply(destination, payload, value, reply_deposit)?
137        };
138
139        message_future = message_future.up_to(params.wait_up_to)?;
140
141        if let Some(reply_hook) = params.reply_hook.take() {
142            message_future = message_future.handle_reply(reply_hook)?;
143        }
144        Ok(message_future)
145    }
146
147    #[cfg(feature = "ethexe")]
148    #[inline]
149    fn send_for_reply_future(
150        destination: ActorId,
151        payload: &[u8],
152        params: &mut GstdParams,
153    ) -> Result<MessageFuture, Error> {
154        let value = params.value.unwrap_or_default();
155        // here can be a redirect target
156        let mut message_future = ::gstd::msg::send_bytes_for_reply(destination, payload, value)?;
157
158        message_future = message_future.up_to(params.wait_up_to)?;
159
160        Ok(message_future)
161    }
162
163    #[inline]
164    fn send_for_reply(
165        destination: ActorId,
166        payload: Vec<u8>,
167        params: &mut GstdParams,
168    ) -> Result<GstdFuture, Error> {
169        // send message
170        let future = send_for_reply_future(destination, payload.as_ref(), params)?;
171        if params.redirect_on_exit {
172            let created_block = params.wait_up_to.map(|_| gstd::exec::block_height());
173            Ok(GstdFuture::MessageWithRedirect {
174                created_block,
175                future,
176                destination,
177                payload,
178            })
179        } else {
180            Ok(GstdFuture::Message { future })
181        }
182    }
183
184    impl<T: CallCodec> Future for PendingCall<T, GstdEnv> {
185        type Output = Result<T::Reply, <GstdEnv as GearEnv>::Error>;
186
187        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188            if self.state.is_none() {
189                let args = self
190                    .args
191                    .as_ref()
192                    .unwrap_or_else(|| panic!("{PENDING_CALL_INVALID_STATE}"));
193                let payload = T::encode_params_with_prefix(self.route, &args);
194                let destination = self.destination;
195                let params = self.params.get_or_insert_default();
196                // Send message
197                let future = send_for_reply(destination, payload, params)?;
198                self.state = Some(future);
199                // No need to poll the future
200                return Poll::Pending;
201            }
202            let this = self.as_mut().project();
203            // SAFETY: checked in the code above.
204            let mut state = unsafe { this.state.as_pin_mut().unwrap_unchecked() };
205            // Poll message future
206            let output = match state.as_mut().project() {
207                Projection::Message { future } => ready!(future.poll(cx)),
208                Projection::MessageWithRedirect { future, .. } => ready!(future.poll(cx)),
209                _ => panic!("{PENDING_CALL_INVALID_STATE}"),
210            };
211            match output {
212                // ok reply
213                Ok(payload) => {
214                    let res =
215                        T::decode_reply_with_prefix(self.route, payload).map_err(Error::Decode)?;
216                    Poll::Ready(Ok(res))
217                }
218                // reply with ProgramExited
219                Err(gstd::errors::Error::ErrorReply(
220                    error_payload,
221                    ErrorReplyReason::UnavailableActor(SimpleUnavailableActorError::ProgramExited),
222                )) => {
223                    let params = this.params.get_or_insert_default();
224                    if let Replace::MessageWithRedirect {
225                        destination: _destination,
226                        created_block,
227                        payload,
228                        ..
229                    } = state.as_mut().project_replace(GstdFuture::Dummy)
230                        && params.redirect_on_exit
231                        && let Ok(new_target) = ActorId::try_from(error_payload.0.as_ref())
232                    {
233                        gstd::debug!("Redirecting message from {_destination} to {new_target}");
234
235                        // Calculate updated `wait_up_to` if provided
236                        // wait_up_to = wait_up_to - (current_block - created_block)
237                        params.wait_up_to = params.wait_up_to.and_then(|wait_up_to| {
238                            created_block.map(|created_block| {
239                                let current_block = gstd::exec::block_height();
240                                wait_up_to
241                                    .saturating_sub(current_block.saturating_sub(created_block))
242                            })
243                        });
244
245                        // send message to new target
246                        let future = send_for_reply(new_target, payload, params)?;
247                        // Replace the future with a new one
248                        _ = state.as_mut().project_replace(future);
249                        // Return Pending to allow the new future to be polled
250                        Poll::Pending
251                    } else {
252                        Poll::Ready(Err(gstd::errors::Error::ErrorReply(
253                            error_payload,
254                            ErrorReplyReason::UnavailableActor(
255                                SimpleUnavailableActorError::ProgramExited,
256                            ),
257                        )
258                        .into()))
259                    }
260                }
261                // error reply
262                Err(err) => Poll::Ready(Err(err)),
263            }
264        }
265    }
266
267    impl<A, T: CallCodec> Future for PendingCtor<A, T, GstdEnv> {
268        type Output = Result<Actor<A, GstdEnv>, <GstdEnv as GearEnv>::Error>;
269
270        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
271            if self.state.is_none() {
272                let params = self.params.take().unwrap_or_default();
273                let value = params.value.unwrap_or_default();
274                let salt = self.salt.take().unwrap();
275
276                let args = self
277                    .args
278                    .as_ref()
279                    .unwrap_or_else(|| panic!("{PENDING_CALL_INVALID_STATE}"));
280                let payload = T::encode_params(args);
281                // Send message
282                #[cfg(not(feature = "ethexe"))]
283                let future = if let Some(gas_limit) = params.gas_limit {
284                    ::gstd::prog::create_program_bytes_with_gas_for_reply(
285                        self.code_id,
286                        salt,
287                        payload,
288                        gas_limit,
289                        value,
290                        params.reply_deposit.unwrap_or_default(),
291                    )?
292                } else {
293                    ::gstd::prog::create_program_bytes_for_reply(
294                        self.code_id,
295                        salt,
296                        payload,
297                        value,
298                        params.reply_deposit.unwrap_or_default(),
299                    )?
300                };
301                #[cfg(feature = "ethexe")]
302                let future = ::gstd::prog::create_program_bytes_for_reply(
303                    self.code_id,
304                    salt,
305                    payload,
306                    value,
307                )?;
308
309                // self.program_id = Some(program_future.program_id);
310                self.state = Some(GstdFuture::CreateProgram { future });
311                // No need to poll the future
312                return Poll::Pending;
313            }
314            let this = self.as_mut().project();
315            // SAFETY: checked in the code above.
316            let mut state = unsafe { this.state.as_pin_mut().unwrap_unchecked() };
317            if let Projection::CreateProgram { future } = state.project() {
318                // Poll create program future
319                match ready!(future.poll(cx)) {
320                    Ok((program_id, _payload)) => {
321                        // Do not decode payload here
322                        Poll::Ready(Ok(Actor::new(this.env.clone(), program_id)))
323                    }
324                    Err(err) => Poll::Ready(Err(err)),
325                }
326            } else {
327                panic!("{PENDING_CTOR_INVALID_STATE}");
328            }
329        }
330    }
331};
332
// State stored by a pending call/ctor while a `gstd` request is in flight.
// `pin_project!` generates the `Projection` (pinned access) and `Replace`
// (owned fields returned by `project_replace`) companion enums named below.
pin_project_lite::pin_project! {
    #[project = Projection]
    #[project_replace = Replace]
    pub enum GstdFuture {
        // Waiting for the reply to a create-program request.
        CreateProgram { #[pin] future: CreateProgramFuture },
        // Waiting for the reply to a message sent without redirect support.
        Message { #[pin] future: MessageFuture },
        // Waiting for a reply with `redirect_on_exit` enabled; keeps what is
        // needed to send the message again to another actor.
        MessageWithRedirect {
            #[pin]
            future: MessageFuture,
            // Actor the message was sent to.
            destination: ActorId,
            // Block height at send time; only recorded when `wait_up_to` is set.
            created_block: Option<BlockCount>,
            payload: Vec<u8>, // reuse encoded payload when redirecting
        },
        // Placeholder swapped in via `project_replace` while a `ProgramExited`
        // reply is being handled.
        Dummy,
    }
}
349
350#[cfg(not(target_arch = "wasm32"))]
351const _: () = {
352    impl<T: CallCodec> PendingCall<T, GstdEnv>
353    where
354        T::Reply: Encode + Decode,
355    {
356        pub fn from_output(output: T::Reply) -> Self {
357            Self::from_result(Ok(output))
358        }
359
360        pub fn from_error(err: <GstdEnv as GearEnv>::Error) -> Self {
361            Self::from_result(Err(err))
362        }
363
364        pub fn from_result(res: Result<T::Reply, <GstdEnv as GearEnv>::Error>) -> Self {
365            PendingCall {
366                env: GstdEnv,
367                destination: ActorId::zero(),
368                route: "",
369                params: None,
370                args: None,
371                state: Some(future::ready(res.map(|v| v.encode()))),
372            }
373        }
374    }
375
376    impl<T: CallCodec<Reply = O>, O> From<O> for PendingCall<T, GstdEnv>
377    where
378        O: Encode + Decode,
379    {
380        fn from(value: O) -> Self {
381            PendingCall::from_output(value)
382        }
383    }
384
385    impl<T: CallCodec> Future for PendingCall<T, GstdEnv> {
386        type Output = Result<T::Reply, <GstdEnv as GearEnv>::Error>;
387
388        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
389            match self.state.take() {
390                Some(ready) => {
391                    let res = ready.into_inner();
392                    Poll::Ready(res.map(|v| T::Reply::decode(&mut v.as_ref()).unwrap()))
393                }
394                None => panic!("{PENDING_CALL_INVALID_STATE}"),
395            }
396        }
397    }
398
399    impl<A, T: CallCodec> Future for PendingCtor<A, T, GstdEnv> {
400        type Output = Result<Actor<A, GstdEnv>, <GstdEnv as GearEnv>::Error>;
401
402        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
403            match self.state.take() {
404                Some(_ready) => {
405                    let program_id = self
406                        .program_id
407                        .take()
408                        .unwrap_or_else(|| panic!("{PENDING_CTOR_INVALID_STATE}"));
409                    let env = self.env.clone();
410                    Poll::Ready(Ok(Actor::new(env, program_id)))
411                }
412                None => panic!("{PENDING_CTOR_INVALID_STATE}"),
413            }
414        }
415    }
416};