nash_native_client/
http_extension.rs1use std::any::type_name;
4use std::time::Duration;
5
6use async_recursion::async_recursion;
7use rand::Rng;
8use reqwest::header::AUTHORIZATION;
9use tracing::{error, trace_span, Instrument};
10
11use nash_protocol::errors::{ProtocolError, Result};
12use nash_protocol::protocol::{NashProtocol, NashProtocolPipeline, ResponseOrError, State};
13
14use crate::types::Environment;
15use crate::ws_client::{Client, InnerClient};
16
17pub(crate) struct HttpClientState {
18 client: reqwest::Client,
19 api_url: String,
20 auth_token: Option<String>,
21}
22
23impl InnerClient {
24 pub(crate) async fn setup_http(
26 state: &mut State,
27 env: Environment,
28 timeout: Duration,
29 ) -> Result<HttpClientState> {
30 let client = reqwest::Client::builder()
31 .timeout(timeout)
32 .build()
33 .map_err(|_| ProtocolError("Could not initialize reqwest client"))?;
34 let api_url = format!("https://{}/api/graphql", env.url());
35 let auth_token = state
36 .signer
37 .as_ref()
38 .map(|s| format!("Token {}", s.api_keys.session_id));
39 Ok(HttpClientState {
40 client,
41 api_url,
42 auth_token,
43 })
44 }
45
46 async fn request_http(&self, request: serde_json::Value) -> Result<serde_json::Value> {
48 let mut request = self
50 .http_state
51 .client
52 .post(&self.http_state.api_url)
53 .json(&request);
54 if let Some(auth_token) = &self.http_state.auth_token {
55 request = request.header(AUTHORIZATION, auth_token)
56 }
57 let response = request.send().await;
58 response
59 .map_err(|e| {
60 if e.is_timeout() {
61 ProtocolError("Request timeout")
62 } else {
63 ProtocolError::coerce_static_from_str(&format!("Failed HTTP request: {}", e))
64 }
65 })?
66 .json()
67 .await
68 .map_err(|e| {
69 ProtocolError::coerce_static_from_str(&format!(
70 "Could not parse response as JSON: {}",
71 e
72 ))
73 })
74 }
75
76 async fn execute_protocol_http<T: NashProtocol + Sync>(
80 &self,
81 request: T,
82 ) -> Result<ResponseOrError<T::Response>> {
83 let query = request.graphql(self.state.clone()).await?;
84 let json_payload = self.request_http(query).await?;
85 let protocol_response = request
86 .response_from_json(json_payload, self.state.clone())
87 .await?;
88 match protocol_response{
89 ResponseOrError::Response(ref response) => {
90 request
91 .process_response(&response.data, self.state.clone())
92 .await?;
93 }
94 ResponseOrError::Error(ref error_response) => {
95 request
96 .process_error(error_response, self.state.clone())
97 .await?;
98 }
99 }
100 Ok(protocol_response)
101 }
102
103 #[async_recursion]
104 pub async fn run_http<T: NashProtocolPipeline + Clone>(
105 &self,
106 request: T,
107 ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
108 async {
109 let response = {
110 if let Some(_permit) = request.acquire_permit(self.state.clone()).await {
111 self.run_helper_http(request).await
112 } else {
113 self.run_helper_http(request).await
114 }
115 };
116 if let Err(ref e) = response {
117 error!(error = %e, "request error");
118 }
119 response
120 }
121 .instrument(trace_span!(
122 "RUN (http)",
123 request = type_name::<T>(),
124 id = %rand::thread_rng().gen::<u32>()))
125 .await
126 }
127
128 pub async fn run_http_with_permit<T: NashProtocolPipeline + Clone>(
129 &self,
130 request: T,
131 _permit: tokio::sync::OwnedSemaphorePermit,
132 ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
133 async {
134 let response = self.run_helper_http(request).await;
135 if let Err(ref e) = response {
136 error!(error = %e, "request error");
137 }
138 response
139 }
140 .instrument(trace_span!(
141 "RUN (http)",
142 request = type_name::<T>(),
143 id = %rand::thread_rng().gen::<u32>()))
144 .await
145 }
146
147 async fn run_helper_http<T: NashProtocolPipeline + Clone>(
149 &self,
150 request: T,
151 ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
152 let before_actions = request.run_before(self.state.clone()).await?;
154 if let Some(actions) = before_actions {
155 for action in actions {
156 self.run_http(action).await?;
157 }
158 }
159 let mut protocol_state = request.init_state(self.state.clone()).await;
161 loop {
163 if let Some(protocol_request) = request
164 .next_step(&protocol_state, self.state.clone())
165 .await?
166 {
167 let protocol_response = self.execute_protocol_http(protocol_request).await?;
168 if protocol_response.is_error() {
170 Self::manage_client_error(
171 self.state.clone(),
172 protocol_response.error().unwrap(),
173 )
174 .await;
175
176 return Ok(ResponseOrError::Error(
177 protocol_response
178 .consume_error()
179 .expect("Destructure error after check. Impossible to fail."),
180 ));
181 }
182 request
184 .process_step(
185 protocol_response
186 .consume_response()
187 .expect("Destructure response after check. Impossible to fail."),
188 &mut protocol_state,
189 )
190 .await;
191 } else {
192 break;
194 }
195 }
196 let after_actions = request.run_after(self.state.clone()).await?;
198 if let Some(actions) = after_actions {
200 for action in actions {
201 self.run_http(action).await?;
202 }
203 }
204 request.output(protocol_state)
206 }
207}
208
209impl Client {
210 #[inline]
215 pub async fn run_http<T: NashProtocolPipeline + Clone>(
216 &self,
217 request: T,
218 ) -> Result<ResponseOrError<<T::ActionType as NashProtocol>::Response>> {
219 self.inner.run_http(request).await
220 }
221}