nash_native_client/
http_extension.rs

1//! Client implementation of Nash API over http
2
3use std::any::type_name;
4use std::time::Duration;
5
6use async_recursion::async_recursion;
7use rand::Rng;
8use reqwest::header::AUTHORIZATION;
9use tracing::{error, trace_span, Instrument};
10
11use nash_protocol::errors::{ProtocolError, Result};
12use nash_protocol::protocol::{NashProtocol, NashProtocolPipeline, ResponseOrError, State};
13
14use crate::types::Environment;
15use crate::ws_client::{Client, InnerClient};
16
17pub(crate) struct HttpClientState {
18    client: reqwest::Client,
19    api_url: String,
20    auth_token: Option<String>,
21}
22
23impl InnerClient {
24    /// Init internal http client
25    pub(crate) async fn setup_http(
26        state: &mut State,
27        env: Environment,
28        timeout: Duration,
29    ) -> Result<HttpClientState> {
30        let client = reqwest::Client::builder()
31            .timeout(timeout)
32            .build()
33            .map_err(|_| ProtocolError("Could not initialize reqwest client"))?;
34        let api_url = format!("https://{}/api/graphql", env.url());
35        let auth_token = state
36            .signer
37            .as_ref()
38            .map(|s| format!("Token {}", s.api_keys.session_id));
39        Ok(HttpClientState {
40            client,
41            api_url,
42            auth_token,
43        })
44    }
45
46    /// Execute a serialized NashProtocol request via http
47    async fn request_http(&self, request: serde_json::Value) -> Result<serde_json::Value> {
48        // Do simple request/response...
49        let mut request = self
50            .http_state
51            .client
52            .post(&self.http_state.api_url)
53            .json(&request);
54        if let Some(auth_token) = &self.http_state.auth_token {
55            request = request.header(AUTHORIZATION, auth_token)
56        }
57        let response = request.send().await;
58        response
59            .map_err(|e| {
60                if e.is_timeout() {
61                    ProtocolError("Request timeout")
62                } else {
63                    ProtocolError::coerce_static_from_str(&format!("Failed HTTP request: {}", e))
64                }
65            })?
66            .json()
67            .await
68            .map_err(|e| {
69                ProtocolError::coerce_static_from_str(&format!(
70                    "Could not parse response as JSON: {}",
71                    e
72                ))
73            })
74    }
75
76    /// Execute a NashProtocol request. Query will be created, executed over network, response will
77    /// be passed to the protocol's state update hook, and response will be returned. Used by the even
78    /// more generic `run_http(..)`.
79    async fn execute_protocol_http<T: NashProtocol + Sync>(
80        &self,
81        request: T,
82    ) -> Result<ResponseOrError<T::Response>> {
83        let query = request.graphql(self.state.clone()).await?;
84        let json_payload = self.request_http(query).await?;
85        let protocol_response = request
86            .response_from_json(json_payload, self.state.clone())
87            .await?;
88        match protocol_response{
89            ResponseOrError::Response(ref response) => {
90                request
91                    .process_response(&response.data, self.state.clone())
92                    .await?;
93            }
94            ResponseOrError::Error(ref error_response) => {
95                request
96                    .process_error(error_response, self.state.clone())
97                    .await?;
98            }
99        }
100        Ok(protocol_response)
101    }
102
103    #[async_recursion]
104    pub async fn run_http<T: NashProtocolPipeline + Clone>(
105        &self,
106        request: T,
107    ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
108        async {
109            let response = {
110                if let Some(_permit) = request.acquire_permit(self.state.clone()).await {
111                    self.run_helper_http(request).await
112                } else {
113                    self.run_helper_http(request).await
114                }
115            };
116            if let Err(ref e) = response {
117                error!(error = %e, "request error");
118            }
119            response
120        }
121        .instrument(trace_span!(
122                "RUN (http)",
123                request = type_name::<T>(),
124                id = %rand::thread_rng().gen::<u32>()))
125        .await
126    }
127
128    pub async fn run_http_with_permit<T: NashProtocolPipeline + Clone>(
129        &self,
130        request: T,
131        _permit: tokio::sync::OwnedSemaphorePermit,
132    ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
133        async {
134            let response = self.run_helper_http(request).await;
135            if let Err(ref e) = response {
136                error!(error = %e, "request error");
137            }
138            response
139        }
140        .instrument(trace_span!(
141                "RUN (http)",
142                request = type_name::<T>(),
143                id = %rand::thread_rng().gen::<u32>()))
144        .await
145    }
146
147    /// Does the main work of running a pipeline via http
148    async fn run_helper_http<T: NashProtocolPipeline + Clone>(
149        &self,
150        request: T,
151    ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
152        // First run any dependencies of the request/pipeline
153        let before_actions = request.run_before(self.state.clone()).await?;
154        if let Some(actions) = before_actions {
155            for action in actions {
156                self.run_http(action).await?;
157            }
158        }
159        // Now run the pipeline
160        let mut protocol_state = request.init_state(self.state.clone()).await;
161        // While pipeline contains more actions for client to take, execute them
162        loop {
163            if let Some(protocol_request) = request
164                .next_step(&protocol_state, self.state.clone())
165                .await?
166            {
167                let protocol_response = self.execute_protocol_http(protocol_request).await?;
168                // If error, end pipeline early and return GraphQL/network error data
169                if protocol_response.is_error() {
170                    Self::manage_client_error(
171                        self.state.clone(),
172                        protocol_response.error().unwrap(),
173                    )
174                    .await;
175
176                    return Ok(ResponseOrError::Error(
177                        protocol_response
178                            .consume_error()
179                            .expect("Destructure error after check. Impossible to fail."),
180                    ));
181                }
182                // Otherwise update the pipeline and continue
183                request
184                    .process_step(
185                        protocol_response
186                            .consume_response()
187                            .expect("Destructure response after check. Impossible to fail."),
188                        &mut protocol_state,
189                    )
190                    .await;
191            } else {
192                // If no more actions left, then done
193                break;
194            }
195        }
196        // Get things to run after the request/pipeline
197        let after_actions = request.run_after(self.state.clone()).await?;
198        // Now run anything specified for after the pipeline
199        if let Some(actions) = after_actions {
200            for action in actions {
201                self.run_http(action).await?;
202            }
203        }
204        // Return the pipeline output
205        request.output(protocol_state)
206    }
207}
208
209impl Client {
210    /// Main entry point to execute Nash API requests via HTTP. Capable of running anything that implements `NashProtocolPipeline`.
211    /// All `NashProtocol` requests automatically do. Other more complex multi-stage interactions like `SignAllStates`
212    /// implement the trait manually. This will optionally run before and after hooks if those are defined for the pipeline
213    /// or request (e.g., get asset nonces if they don't exist)
214    #[inline]
215    pub async fn run_http<T: NashProtocolPipeline + Clone>(
216        &self,
217        request: T,
218    ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
219        self.inner.run_http(request).await
220    }
221}