//! `auths_infra_http/pairing_client.rs`

1use std::future::Future;
2use std::time::Duration;
3
4use auths_core::pairing::{
5    CreateSessionRequest, CreateSessionResponse, GetConfirmationResponse, GetSessionResponse,
6    SessionStatus, SubmitConfirmationRequest, SubmitResponseRequest,
7};
8use auths_core::ports::network::NetworkError;
9use auths_core::ports::pairing::PairingRelayClient;
10
11use crate::default_http_client;
12use crate::error::{map_reqwest_error, map_status_error};
13
/// Pause between successive HTTP requests when `wait_for_update` falls back to polling.
const POLL_INTERVAL: Duration = Duration::from_millis(2_000);
15
16/// HTTP-backed implementation of [`PairingRelayClient`].
17///
18/// Uses WebSocket for real-time session updates with HTTP polling as a fallback
19/// when WebSocket is unavailable.
20///
21/// Usage:
22/// ```ignore
23/// use auths_infra_http::HttpPairingRelayClient;
24///
25/// let relay = HttpPairingRelayClient::new();
26/// let response = relay.create_session("https://registry.example.com", &request).await?;
27/// ```
28pub struct HttpPairingRelayClient {
29    client: reqwest::Client,
30}
31
32impl HttpPairingRelayClient {
33    /// Creates a new client with a default reqwest client.
34    pub fn new() -> Self {
35        Self {
36            client: default_http_client(),
37        }
38    }
39}
40
41impl Default for HttpPairingRelayClient {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl PairingRelayClient for HttpPairingRelayClient {
48    fn create_session(
49        &self,
50        registry_url: &str,
51        request: &CreateSessionRequest,
52    ) -> impl Future<Output = Result<CreateSessionResponse, NetworkError>> + Send {
53        let url = format!("{}/v1/pairing/sessions", registry_url.trim_end_matches('/'));
54        let endpoint = registry_url.to_string();
55        // Serialize JSON at call time so the future owns the request bytes.
56        let req = self.client.post(&url).json(request);
57
58        async move {
59            let resp = req
60                .send()
61                .await
62                .map_err(|e| map_reqwest_error(e, &endpoint))?;
63            if !resp.status().is_success() {
64                return Err(map_status_error(resp.status().as_u16(), &url));
65            }
66            resp.json::<CreateSessionResponse>()
67                .await
68                .map_err(|e| NetworkError::InvalidResponse {
69                    detail: e.to_string(),
70                })
71        }
72    }
73
74    fn get_session(
75        &self,
76        registry_url: &str,
77        session_id: &str,
78    ) -> impl Future<Output = Result<GetSessionResponse, NetworkError>> + Send {
79        let url = format!(
80            "{}/v1/pairing/sessions/{}",
81            registry_url.trim_end_matches('/'),
82            session_id
83        );
84        let endpoint = registry_url.to_string();
85        let req = self.client.get(&url);
86
87        async move {
88            let resp = req
89                .send()
90                .await
91                .map_err(|e| map_reqwest_error(e, &endpoint))?;
92            if !resp.status().is_success() {
93                return Err(map_status_error(resp.status().as_u16(), &url));
94            }
95            resp.json::<GetSessionResponse>()
96                .await
97                .map_err(|e| NetworkError::InvalidResponse {
98                    detail: e.to_string(),
99                })
100        }
101    }
102
103    fn lookup_by_code(
104        &self,
105        registry_url: &str,
106        code: &str,
107    ) -> impl Future<Output = Result<GetSessionResponse, NetworkError>> + Send {
108        let url = format!(
109            "{}/v1/pairing/sessions/by-code/{}",
110            registry_url.trim_end_matches('/'),
111            code
112        );
113        let endpoint = registry_url.to_string();
114        let req = self.client.get(&url);
115
116        async move {
117            let resp = req
118                .send()
119                .await
120                .map_err(|e| map_reqwest_error(e, &endpoint))?;
121            if !resp.status().is_success() {
122                return Err(map_status_error(resp.status().as_u16(), &url));
123            }
124            resp.json::<GetSessionResponse>()
125                .await
126                .map_err(|e| NetworkError::InvalidResponse {
127                    detail: e.to_string(),
128                })
129        }
130    }
131
132    fn submit_response(
133        &self,
134        registry_url: &str,
135        session_id: &str,
136        response: &SubmitResponseRequest,
137    ) -> impl Future<Output = Result<(), NetworkError>> + Send {
138        let url = format!(
139            "{}/v1/pairing/sessions/{}/response",
140            registry_url.trim_end_matches('/'),
141            session_id
142        );
143        let endpoint = registry_url.to_string();
144        let req = self.client.post(&url).json(response);
145
146        async move {
147            let resp = req
148                .send()
149                .await
150                .map_err(|e| map_reqwest_error(e, &endpoint))?;
151            if !resp.status().is_success() {
152                return Err(map_status_error(resp.status().as_u16(), &url));
153            }
154            Ok(())
155        }
156    }
157
158    fn submit_confirmation(
159        &self,
160        registry_url: &str,
161        session_id: &str,
162        request: &SubmitConfirmationRequest,
163    ) -> impl Future<Output = Result<(), NetworkError>> + Send {
164        let url = format!(
165            "{}/v1/pairing/sessions/{}/confirm",
166            registry_url.trim_end_matches('/'),
167            session_id
168        );
169        let endpoint = registry_url.to_string();
170        let req = self.client.post(&url).json(request);
171
172        async move {
173            let resp = req
174                .send()
175                .await
176                .map_err(|e| map_reqwest_error(e, &endpoint))?;
177            if !resp.status().is_success() {
178                return Err(map_status_error(resp.status().as_u16(), &url));
179            }
180            Ok(())
181        }
182    }
183
184    fn get_confirmation(
185        &self,
186        registry_url: &str,
187        session_id: &str,
188    ) -> impl Future<Output = Result<GetConfirmationResponse, NetworkError>> + Send {
189        let url = format!(
190            "{}/v1/pairing/sessions/{}/confirmation",
191            registry_url.trim_end_matches('/'),
192            session_id
193        );
194        let endpoint = registry_url.to_string();
195        let req = self.client.get(&url);
196
197        async move {
198            let resp = req
199                .send()
200                .await
201                .map_err(|e| map_reqwest_error(e, &endpoint))?;
202            if !resp.status().is_success() {
203                return Err(map_status_error(resp.status().as_u16(), &url));
204            }
205            resp.json::<GetConfirmationResponse>().await.map_err(|e| {
206                NetworkError::InvalidResponse {
207                    detail: e.to_string(),
208                }
209            })
210        }
211    }
212
213    fn wait_for_update(
214        &self,
215        registry_url: &str,
216        session_id: &str,
217        timeout: Duration,
218    ) -> impl Future<Output = Result<Option<GetSessionResponse>, NetworkError>> + Send {
219        let session_url = format!(
220            "{}/v1/pairing/sessions/{}",
221            registry_url.trim_end_matches('/'),
222            session_id
223        );
224        let ws_url = format!(
225            "{}/v1/pairing/sessions/{}/ws",
226            registry_url
227                .replace("http://", "ws://")
228                .replace("https://", "wss://")
229                .trim_end_matches('/'),
230            session_id
231        );
232        let endpoint = registry_url.to_string();
233        // Clone the client so the future owns it without borrowing &self.
234        let client = self.client.clone();
235
236        async move {
237            let deadline = tokio::time::Instant::now() + timeout;
238
239            if let Ok((ws_stream, _)) = tokio_tungstenite::connect_async(&ws_url).await {
240                use futures_util::StreamExt;
241                let (_, mut read) = ws_stream.split();
242                loop {
243                    tokio::select! {
244                        _ = tokio::time::sleep_until(deadline) => return Ok(None),
245                        msg = read.next() => match msg {
246                            Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
247                                if text.contains("\"responded\"")
248                                    || text.contains("\"cancelled\"")
249                                    || text.contains("\"expired\"")
250                                {
251                                    let resp = client
252                                        .get(&session_url)
253                                        .send()
254                                        .await
255                                        .map_err(|e| map_reqwest_error(e, &endpoint))?;
256                                    return resp
257                                        .json::<GetSessionResponse>()
258                                        .await
259                                        .map(Some)
260                                        .map_err(|e| NetworkError::InvalidResponse {
261                                            detail: e.to_string(),
262                                        });
263                                }
264                            }
265                            None | Some(Err(_)) => break,
266                            _ => {}
267                        },
268                    }
269                }
270            }
271
272            // Fallback: HTTP polling
273            let start = std::time::Instant::now();
274            loop {
275                if start.elapsed() >= timeout {
276                    return Ok(None);
277                }
278                if let Ok(resp) = client.get(&session_url).send().await
279                    && resp.status().is_success()
280                    && let Ok(state) = resp.json::<GetSessionResponse>().await
281                {
282                    match state.status {
283                        SessionStatus::Responded
284                        | SessionStatus::Cancelled
285                        | SessionStatus::Expired => return Ok(Some(state)),
286                        _ => {}
287                    }
288                }
289                tokio::time::sleep(POLL_INTERVAL).await;
290            }
291        }
292    }
293}