auths_infra_http/
pairing_client.rs1use std::future::Future;
2use std::time::Duration;
3
4use auths_core::pairing::{
5 CreateSessionRequest, CreateSessionResponse, GetConfirmationResponse, GetSessionResponse,
6 SessionStatus, SubmitConfirmationRequest, SubmitResponseRequest,
7};
8use auths_core::ports::network::NetworkError;
9use auths_core::ports::pairing::PairingRelayClient;
10
11use crate::default_http_client;
12use crate::error::{map_reqwest_error, map_status_error};
13
14const POLL_INTERVAL: Duration = Duration::from_secs(2);
15
16pub struct HttpPairingRelayClient {
29 client: reqwest::Client,
30}
31
32impl HttpPairingRelayClient {
33 pub fn new() -> Self {
35 Self {
36 client: default_http_client(),
37 }
38 }
39}
40
41impl Default for HttpPairingRelayClient {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl PairingRelayClient for HttpPairingRelayClient {
48 fn create_session(
49 &self,
50 registry_url: &str,
51 request: &CreateSessionRequest,
52 ) -> impl Future<Output = Result<CreateSessionResponse, NetworkError>> + Send {
53 let url = format!("{}/v1/pairing/sessions", registry_url.trim_end_matches('/'));
54 let endpoint = registry_url.to_string();
55 let req = self.client.post(&url).json(request);
57
58 async move {
59 let resp = req
60 .send()
61 .await
62 .map_err(|e| map_reqwest_error(e, &endpoint))?;
63 if !resp.status().is_success() {
64 return Err(map_status_error(resp.status().as_u16(), &url));
65 }
66 resp.json::<CreateSessionResponse>()
67 .await
68 .map_err(|e| NetworkError::InvalidResponse {
69 detail: e.to_string(),
70 })
71 }
72 }
73
74 fn get_session(
75 &self,
76 registry_url: &str,
77 session_id: &str,
78 ) -> impl Future<Output = Result<GetSessionResponse, NetworkError>> + Send {
79 let url = format!(
80 "{}/v1/pairing/sessions/{}",
81 registry_url.trim_end_matches('/'),
82 session_id
83 );
84 let endpoint = registry_url.to_string();
85 let req = self.client.get(&url);
86
87 async move {
88 let resp = req
89 .send()
90 .await
91 .map_err(|e| map_reqwest_error(e, &endpoint))?;
92 if !resp.status().is_success() {
93 return Err(map_status_error(resp.status().as_u16(), &url));
94 }
95 resp.json::<GetSessionResponse>()
96 .await
97 .map_err(|e| NetworkError::InvalidResponse {
98 detail: e.to_string(),
99 })
100 }
101 }
102
103 fn lookup_by_code(
104 &self,
105 registry_url: &str,
106 code: &str,
107 ) -> impl Future<Output = Result<GetSessionResponse, NetworkError>> + Send {
108 let url = format!(
109 "{}/v1/pairing/sessions/by-code/{}",
110 registry_url.trim_end_matches('/'),
111 code
112 );
113 let endpoint = registry_url.to_string();
114 let req = self.client.get(&url);
115
116 async move {
117 let resp = req
118 .send()
119 .await
120 .map_err(|e| map_reqwest_error(e, &endpoint))?;
121 if !resp.status().is_success() {
122 return Err(map_status_error(resp.status().as_u16(), &url));
123 }
124 resp.json::<GetSessionResponse>()
125 .await
126 .map_err(|e| NetworkError::InvalidResponse {
127 detail: e.to_string(),
128 })
129 }
130 }
131
132 fn submit_response(
133 &self,
134 registry_url: &str,
135 session_id: &str,
136 response: &SubmitResponseRequest,
137 ) -> impl Future<Output = Result<(), NetworkError>> + Send {
138 let url = format!(
139 "{}/v1/pairing/sessions/{}/response",
140 registry_url.trim_end_matches('/'),
141 session_id
142 );
143 let endpoint = registry_url.to_string();
144 let req = self.client.post(&url).json(response);
145
146 async move {
147 let resp = req
148 .send()
149 .await
150 .map_err(|e| map_reqwest_error(e, &endpoint))?;
151 if !resp.status().is_success() {
152 return Err(map_status_error(resp.status().as_u16(), &url));
153 }
154 Ok(())
155 }
156 }
157
158 fn submit_confirmation(
159 &self,
160 registry_url: &str,
161 session_id: &str,
162 request: &SubmitConfirmationRequest,
163 ) -> impl Future<Output = Result<(), NetworkError>> + Send {
164 let url = format!(
165 "{}/v1/pairing/sessions/{}/confirm",
166 registry_url.trim_end_matches('/'),
167 session_id
168 );
169 let endpoint = registry_url.to_string();
170 let req = self.client.post(&url).json(request);
171
172 async move {
173 let resp = req
174 .send()
175 .await
176 .map_err(|e| map_reqwest_error(e, &endpoint))?;
177 if !resp.status().is_success() {
178 return Err(map_status_error(resp.status().as_u16(), &url));
179 }
180 Ok(())
181 }
182 }
183
184 fn get_confirmation(
185 &self,
186 registry_url: &str,
187 session_id: &str,
188 ) -> impl Future<Output = Result<GetConfirmationResponse, NetworkError>> + Send {
189 let url = format!(
190 "{}/v1/pairing/sessions/{}/confirmation",
191 registry_url.trim_end_matches('/'),
192 session_id
193 );
194 let endpoint = registry_url.to_string();
195 let req = self.client.get(&url);
196
197 async move {
198 let resp = req
199 .send()
200 .await
201 .map_err(|e| map_reqwest_error(e, &endpoint))?;
202 if !resp.status().is_success() {
203 return Err(map_status_error(resp.status().as_u16(), &url));
204 }
205 resp.json::<GetConfirmationResponse>().await.map_err(|e| {
206 NetworkError::InvalidResponse {
207 detail: e.to_string(),
208 }
209 })
210 }
211 }
212
213 fn wait_for_update(
214 &self,
215 registry_url: &str,
216 session_id: &str,
217 timeout: Duration,
218 ) -> impl Future<Output = Result<Option<GetSessionResponse>, NetworkError>> + Send {
219 let session_url = format!(
220 "{}/v1/pairing/sessions/{}",
221 registry_url.trim_end_matches('/'),
222 session_id
223 );
224 let ws_url = format!(
225 "{}/v1/pairing/sessions/{}/ws",
226 registry_url
227 .replace("http://", "ws://")
228 .replace("https://", "wss://")
229 .trim_end_matches('/'),
230 session_id
231 );
232 let endpoint = registry_url.to_string();
233 let client = self.client.clone();
235
236 async move {
237 let deadline = tokio::time::Instant::now() + timeout;
238
239 if let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(&ws_url).await {
240 use futures_util::StreamExt;
241 let (_, mut read) = ws_stream.split();
242 loop {
243 tokio::select! {
244 _ = tokio::time::sleep_until(deadline) => return Ok(None),
245 msg = read.next() => match msg {
246 Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
247 if text.contains("\"responded\"")
248 || text.contains("\"cancelled\"")
249 || text.contains("\"expired\"")
250 {
251 let resp = client
252 .get(&session_url)
253 .send()
254 .await
255 .map_err(|e| map_reqwest_error(e, &endpoint))?;
256 return resp
257 .json::<GetSessionResponse>()
258 .await
259 .map(Some)
260 .map_err(|e| NetworkError::InvalidResponse {
261 detail: e.to_string(),
262 });
263 }
264 }
265 None | Some(Err(_)) => break,
266 _ => {}
267 },
268 }
269 }
270 }
271
272 let start = std::time::Instant::now();
274 loop {
275 if start.elapsed() >= timeout {
276 return Ok(None);
277 }
278 if let Ok(resp) = client.get(&session_url).send().await
279 && resp.status().is_success()
280 && let Ok(state) = resp.json::<GetSessionResponse>().await
281 {
282 match state.status {
283 SessionStatus::Responded
284 | SessionStatus::Cancelled
285 | SessionStatus::Expired => return Ok(Some(state)),
286 _ => {}
287 }
288 }
289 tokio::time::sleep(POLL_INTERVAL).await;
290 }
291 }
292 }
293}