Skip to main content

objectiveai_sdk/http/
client.rs

1//! HTTP client implementation for ObjectiveAI API.
2
3use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{Stream, StreamExt, SinkExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
10/// HTTP client for making requests to the ObjectiveAI API.
11///
12/// Handles authentication, request building, and response parsing for both
13/// unary and streaming endpoints.
14///
15/// # Example
16///
17/// ```ignore
18/// let client = HttpClient::new(
19///     reqwest::Client::new(),
20///     None, // Use default address
21///     Some("your-api-key"),
22///     None, // user_agent
23///     None, // x_title
24///     None, // http_referer
25///     None, // x_github_authorization
26///     None, // x_openrouter_authorization
27///     None, // x_mcp_authorization
28///     None, // x_viewer_signature
29///     None, // x_viewer_address
30///     None, // x_commit_author_name
31///     None, // x_commit_author_email
32/// );
33/// ```
34#[derive(Debug, Clone)]
35pub struct HttpClient {
36    /// The underlying reqwest HTTP client.
37    pub http_client: reqwest::Client,
38    /// Base URL for API requests. Defaults to `https://api.objectiveai.dev`.
39    pub address: String,
40    /// API key for authentication. Sent as `Bearer` token in `Authorization` header.
41    pub authorization: Option<Arc<String>>,
42    /// Value for the `User-Agent` header.
43    pub user_agent: Option<String>,
44    /// Value for the `X-Title` header.
45    pub x_title: Option<String>,
46    /// Value for both `Referer` and `HTTP-Referer` headers.
47    pub http_referer: Option<String>,
48    /// Value for the `X-GITHUB-AUTHORIZATION` header.
49    pub x_github_authorization: Option<Arc<String>>,
50    /// Value for the `X-OPENROUTER-AUTHORIZATION` header.
51    pub x_openrouter_authorization: Option<Arc<String>>,
52    /// Values for the `X-MCP-AUTHORIZATION` header (JSON-encoded).
53    pub x_mcp_authorization: Option<Arc<std::collections::HashMap<String, String>>>,
54    /// Value for the `X-VIEWER-SIGNATURE` header.
55    pub x_viewer_signature: Option<Arc<String>>,
56    /// Value for the `X-VIEWER-ADDRESS` header.
57    pub x_viewer_address: Option<Arc<String>>,
58    /// Value for the `X-COMMIT-AUTHOR-NAME` header.
59    pub x_commit_author_name: Option<Arc<String>>,
60    /// Value for the `X-COMMIT-AUTHOR-EMAIL` header.
61    pub x_commit_author_email: Option<Arc<String>>,
62    /// Value for the `X-OBJECTIVEAI-AGENT-ID` header.
63    pub agent_id: Option<Arc<String>>,
64}
65
66impl HttpClient {
67    /// Creates a new HTTP client.
68    ///
69    /// # Arguments
70    ///
71    /// * `http_client` - The reqwest client to use for requests
72    /// * `address` - Base URL for API requests (defaults to `https://api.objectiveai.dev`)
73    /// * `authorization` - API key for authentication
74    /// * `user_agent` - Optional User-Agent header value
75    /// * `x_title` - Optional X-Title header value
76    /// * `http_referer` - Optional Referer header value
77    /// * `x_github_authorization` - Optional X-GITHUB-AUTHORIZATION header value
78    /// * `x_openrouter_authorization` - Optional X-OPENROUTER-AUTHORIZATION header value
79    /// * `x_mcp_authorization` - Optional X-MCP-AUTHORIZATION header value (HashMap)
80    /// * `x_viewer_signature` - Optional X-VIEWER-SIGNATURE header value
81    /// * `x_viewer_address` - Optional X-VIEWER-ADDRESS header value
82    /// * `x_commit_author_name` - Optional X-COMMIT-AUTHOR-NAME header value
83    /// * `x_commit_author_email` - Optional X-COMMIT-AUTHOR-EMAIL header value
84    /// * `agent_id` - Optional X-OBJECTIVEAI-AGENT-ID header value
85    pub fn new(
86        http_client: reqwest::Client,
87        address: Option<impl Into<String>>,
88        authorization: Option<impl Into<String>>,
89        user_agent: Option<impl Into<String>>,
90        x_title: Option<impl Into<String>>,
91        http_referer: Option<impl Into<String>>,
92        x_github_authorization: Option<impl Into<String>>,
93        x_openrouter_authorization: Option<impl Into<String>>,
94        x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
95        x_viewer_signature: Option<impl Into<String>>,
96        x_viewer_address: Option<impl Into<String>>,
97        x_commit_author_name: Option<impl Into<String>>,
98        x_commit_author_email: Option<impl Into<String>>,
99        agent_id: Option<impl Into<String>>,
100    ) -> Self {
101        #[cfg(feature = "env")]
102        let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
103
104        Self {
105            http_client,
106            address: match address {
107                Some(base) => base.into(),
108                #[cfg(feature = "env")]
109                None => env("OBJECTIVEAI_ADDRESS")
110                    .unwrap_or_else(|| "https://api.objectiveai.dev".to_string()),
111                #[cfg(not(feature = "env"))]
112                None => "https://api.objectiveai.dev".to_string(),
113            },
114            authorization: authorization.map(|k| Arc::new(k.into()))
115                .or_else(|| { #[cfg(feature = "env")] { env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
116            user_agent: user_agent.map(Into::into)
117                .or_else(|| { #[cfg(feature = "env")] { env("USER_AGENT") } #[cfg(not(feature = "env"))] { None } }),
118            x_title: x_title.map(Into::into)
119                .or_else(|| { #[cfg(feature = "env")] { env("X_TITLE") } #[cfg(not(feature = "env"))] { None } }),
120            http_referer: http_referer.map(Into::into)
121                .or_else(|| { #[cfg(feature = "env")] { env("HTTP_REFERER") } #[cfg(not(feature = "env"))] { None } }),
122            x_github_authorization: x_github_authorization.map(|v| Arc::new(v.into()))
123                .or_else(|| { #[cfg(feature = "env")] { env("GITHUB_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
124            x_openrouter_authorization: x_openrouter_authorization.map(|v| Arc::new(v.into()))
125                .or_else(|| { #[cfg(feature = "env")] { env("OPENROUTER_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
126            x_mcp_authorization: x_mcp_authorization.map(Arc::new)
127                .or_else(|| { #[cfg(feature = "env")] { env("MCP_AUTHORIZATION").and_then(|v| serde_json::from_str(&v).ok()).map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
128            x_viewer_signature: x_viewer_signature.map(|v| Arc::new(v.into()))
129                .or_else(|| { #[cfg(feature = "env")] { env("VIEWER_SIGNATURE").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
130            x_viewer_address: x_viewer_address.map(|v| Arc::new(v.into()))
131                .or_else(|| { #[cfg(feature = "env")] { env("VIEWER_ADDRESS").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
132            x_commit_author_name: x_commit_author_name.map(|v| Arc::new(v.into()))
133                .or_else(|| { #[cfg(feature = "env")] { env("COMMIT_AUTHOR_NAME").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
134            x_commit_author_email: x_commit_author_email.map(|v| Arc::new(v.into()))
135                .or_else(|| { #[cfg(feature = "env")] { env("COMMIT_AUTHOR_EMAIL").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
136            agent_id: agent_id.map(|v| Arc::new(v.into()))
137                .or_else(|| { #[cfg(feature = "env")] { env("OBJECTIVEAI_AGENT_ID").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
138        }
139    }
140
141    /// Builds a request with authentication and custom headers.
142    fn request(
143        &self,
144        method: reqwest::Method,
145        path: &str,
146        body: Option<impl serde::Serialize>,
147    ) -> reqwest::RequestBuilder {
148        let url = format!(
149            "{}/{}",
150            self.address.trim_end_matches('/'),
151            path.trim_start_matches('/')
152        );
153        let mut request = self.http_client.request(method, &url);
154        if let Some(authorization) = &self.authorization {
155            let key = authorization.strip_prefix("Bearer ").unwrap_or(authorization);
156            request =
157                request.header("authorization", format!("Bearer {}", key));
158        }
159        if let Some(user_agent) = &self.user_agent {
160            request = request.header("user-agent", user_agent);
161        }
162        if let Some(x_title) = &self.x_title {
163            request = request.header("x-title", x_title);
164        }
165        if let Some(http_referer) = &self.http_referer {
166            request = request.header("referer", http_referer);
167            request = request.header("http-referer", http_referer);
168        }
169        if let Some(token) = &self.x_github_authorization {
170            request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
171        }
172        if let Some(token) = &self.x_openrouter_authorization {
173            request = request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
174        }
175        if let Some(headers) = &self.x_mcp_authorization {
176            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
177                request = request.header("X-MCP-AUTHORIZATION", json);
178            }
179        }
180        if let Some(sig) = &self.x_viewer_signature {
181            request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
182        }
183        if let Some(addr) = &self.x_viewer_address {
184            request = request.header("X-VIEWER-ADDRESS", addr.as_str());
185        }
186        if let Some(name) = &self.x_commit_author_name {
187            request = request.header("X-COMMIT-AUTHOR-NAME", name.as_str());
188        }
189        if let Some(email) = &self.x_commit_author_email {
190            request = request.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
191        }
192        if let Some(id) = &self.agent_id {
193            request = request.header("X-OBJECTIVEAI-AGENT-ID", id.as_str());
194        }
195        if let Some(body) = body {
196            request = request.json(&body);
197        }
198        request
199    }
200
201    /// Sends a unary (request-response) API call and deserializes the response.
202    ///
203    /// # Type Parameters
204    ///
205    /// * `T` - The expected response type to deserialize into
206    ///
207    /// # Errors
208    ///
209    /// Returns [`super::HttpError`] if the request fails, returns a non-success status,
210    /// or the response cannot be deserialized.
211    pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
212        &self,
213        method: reqwest::Method,
214        path: impl AsRef<str>,
215        body: Option<impl serde::Serialize>,
216    ) -> Result<T, super::HttpError> {
217        let response = self
218            .http_client
219            .execute(
220                self.request(method, path.as_ref(), body)
221                    .build()
222                    .map_err(super::HttpError::RequestError)?,
223            )
224            .await
225            .map_err(super::HttpError::HttpError)?;
226        let code = response.status();
227        if code.is_success() {
228            let text =
229                response.text().await.map_err(super::HttpError::HttpError)?;
230            let mut de = serde_json::Deserializer::from_str(&text);
231            match serde_path_to_error::deserialize::<_, T>(&mut de) {
232                Ok(value) => Ok(value),
233                Err(e) => Err(super::HttpError::DeserializationError(e)),
234            }
235        } else {
236            match response.text().await {
237                Ok(text) => Err(super::HttpError::BadStatus {
238                    code,
239                    body: match serde_json::from_str::<serde_json::Value>(&text)
240                    {
241                        Ok(body) => body,
242                        Err(_) => serde_json::Value::String(text),
243                    },
244                }),
245                Err(_) => Err(super::HttpError::BadStatus {
246                    code,
247                    body: serde_json::Value::Null,
248                }),
249            }
250        }
251    }
252
253    /// Sends a unary API call that expects no response body.
254    ///
255    /// Useful for DELETE or other operations that only return a status code.
256    ///
257    /// # Errors
258    ///
259    /// Returns [`super::HttpError`] if the request fails or returns a non-success status.
260    pub async fn send_unary_no_response(
261        &self,
262        method: reqwest::Method,
263        path: impl AsRef<str>,
264        body: Option<impl serde::Serialize>,
265    ) -> Result<(), super::HttpError> {
266        let response = self
267            .http_client
268            .execute(
269                self.request(method, path.as_ref(), body)
270                    .build()
271                    .map_err(super::HttpError::RequestError)?,
272            )
273            .await
274            .map_err(super::HttpError::HttpError)?;
275        let code = response.status();
276        if code.is_success() {
277            Ok(())
278        } else {
279            match response.text().await {
280                Ok(text) => Err(super::HttpError::BadStatus {
281                    code,
282                    body: match serde_json::from_str::<serde_json::Value>(&text)
283                    {
284                        Ok(body) => body,
285                        Err(_) => serde_json::Value::String(text),
286                    },
287                }),
288                Err(_) => Err(super::HttpError::BadStatus {
289                    code,
290                    body: serde_json::Value::Null,
291                }),
292            }
293        }
294    }
295
296    /// Sends a streaming API call using Server-Sent Events (SSE).
297    ///
298    /// Returns a stream of deserialized chunks. The stream automatically handles:
299    /// - SSE `[DONE]` messages (filtered out)
300    /// - Comment lines starting with `:` (filtered out)
301    /// - Empty data lines (filtered out)
302    /// - API errors embedded in stream data
303    ///
304    /// # Type Parameters
305    ///
306    /// * `T` - The expected chunk type to deserialize each SSE message into
307    ///
308    /// # Errors
309    ///
310    /// Returns [`super::HttpError`] if the stream cannot be established.
311    pub async fn send_streaming<
312        T: serde::de::DeserializeOwned + Send + 'static,
313        P: AsRef<str> + Send,
314        B: serde::Serialize + Send,
315    >(
316        &self,
317        method: reqwest::Method,
318        path: P,
319        body: Option<B>,
320    ) -> Result<
321        impl Stream<Item = Result<T, super::HttpError>>
322        + Send
323        + 'static
324        + use<T, P, B>,
325        super::HttpError,
326    > {
327        // Stop the stream at [DONE] to prevent reqwest_eventsource from
328        // auto-reconnecting. Uses take_while on the raw SSE events, then
329        // maps/filters the remaining events into typed chunks.
330        // Stamps X-Transport: sse so the API's transport dispatcher
331        // routes this to the SSE branch (the API default is WS).
332        Ok(
333            self.request(method, path.as_ref(), body)
334                .header("X-Transport", "sse")
335                .eventsource()?
336                .take_while(|result| {
337                    let dominated = matches!(
338                        result,
339                        Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
340                    );
341                    async move { !dominated }
342                })
343                .then(|result| async {
344                    match result {
345                        Ok(Event::Open) => None,
346                        Ok(Event::Message(MessageEvent { data, .. }))
347                            if data.starts_with(":")
348                                || data.is_empty() =>
349                        {
350                            None
351                        }
352                        Ok(Event::Message(MessageEvent { data, .. })) => {
353                            let mut de =
354                                serde_json::Deserializer::from_str(&data);
355                            Some(
356                                match serde_path_to_error::deserialize::<_, T>(
357                                    &mut de,
358                                ) {
359                                    Ok(value) => Ok(value),
360                                    Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
361                                        Ok(err) => Err(super::HttpError::ApiError(err)),
362                                        Err(_) => Err(super::HttpError::DeserializationError(e)),
363                                    },
364                                }
365                            )
366                        }
367                        Err(reqwest_eventsource::Error::InvalidStatusCode(
368                            code,
369                            response,
370                        )) => match response.text().await {
371                            Ok(body) => {
372                                Some(Err(super::HttpError::BadStatus {
373                                    code,
374                                    body: match serde_json::from_str::<
375                                        serde_json::Value,
376                                    >(
377                                        &body
378                                    ) {
379                                        Ok(body) => body,
380                                        Err(_) => {
381                                            serde_json::Value::String(body)
382                                        }
383                                    },
384                                }))
385                            }
386                            Err(_) => Some(Err(super::HttpError::BadStatus {
387                                code,
388                                body: serde_json::Value::Null,
389                            })),
390                        },
391                        Err(e) => Some(Err(super::HttpError::StreamError(e))),
392                    }
393                })
394                .filter_map(|x| async { x }),
395        )
396    }
397
398    /// WebSocket variant of [`Self::send_streaming`]. Opens a WS to
399    /// the configured `address`, sends `body` as the first text
400    /// frame, then demultiplexes inbound frames into:
401    ///
402    /// - Chunk frames (yielded on the returned [`Stream`]).
403    /// - [`client_response::Response`](crate::client_objectiveai_mcp::client_response::Response)
404    ///   frames (routed to the [`super::Notifier`]'s pending-id map).
405    /// - [`server_request::Request`](crate::client_objectiveai_mcp::server_request::Request)
406    ///   frames (dispatched to `handler`; the result is written back
407    ///   as a `server_response::Response` echoing the request id).
408    ///
409    /// Both the returned `Stream` and the returned [`super::Notifier`]
410    /// share the underlying WebSocket: dropping both stops the demux
411    /// task and closes the connection cleanly. Dropping only one
412    /// keeps the WS alive — useful when a caller wants to send
413    /// notifies after the chunk stream has finished, or vice-versa.
414    pub async fn send_streaming_ws<Chunk, B, H, P>(
415        &self,
416        method: reqwest::Method,
417        path: P,
418        body: B,
419        handler: H,
420    ) -> Result<
421        (
422            impl Stream<Item = Result<Chunk, super::HttpError>>
423                + Send
424                + Unpin
425                + 'static
426                + use<Chunk, B, H, P>,
427            super::Notifier,
428        ),
429        super::HttpError,
430    >
431    where
432        Chunk: serde::de::DeserializeOwned + Send + 'static,
433        B: serde::Serialize + Send + 'static,
434        H: super::McpHandler,
435        P: AsRef<str>,
436    {
437        use crate::client_objectiveai_mcp::{
438            client_response::Response as ClientResponse,
439            server_request::Request as ServerRequest,
440        };
441        use futures::stream::SplitStream;
442        use tokio::net::TcpStream;
443        use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
444
445        // Translate the configured `address` (http(s)://...) into a
446        // ws(s):// URL. Path is appended directly.
447        let url = format!(
448            "{}/{}",
449            self.address.trim_end_matches('/'),
450            path.as_ref().trim_start_matches('/')
451        );
452        let ws_url = if let Some(rest) = url.strip_prefix("https://") {
453            format!("wss://{rest}")
454        } else if let Some(rest) = url.strip_prefix("http://") {
455            format!("ws://{rest}")
456        } else {
457            url.clone()
458        };
459        let _ = method; // axum's WS route is `any(...)`, method is ignored on the wire.
460
461        // Build the upgrade request manually so we can apply the
462        // same auth + custom headers `request()` does for HTTP.
463        let mut req = tungstenite::handshake::client::Request::builder()
464            .method("GET")
465            .uri(&ws_url)
466            .header("Host", reqwest::Url::parse(&url).ok().and_then(|u| u.host_str().map(str::to_owned)).unwrap_or_default())
467            .header("Upgrade", "websocket")
468            .header("Connection", "Upgrade")
469            .header(
470                "Sec-WebSocket-Key",
471                tungstenite::handshake::client::generate_key(),
472            )
473            .header("Sec-WebSocket-Version", "13")
474            .header("X-Transport", "ws");
475        if let Some(authorization) = &self.authorization {
476            let key = authorization
477                .strip_prefix("Bearer ")
478                .unwrap_or(authorization.as_str());
479            req = req.header("authorization", format!("Bearer {}", key));
480        }
481        if let Some(ua) = &self.user_agent {
482            req = req.header("user-agent", ua);
483        }
484        if let Some(x_title) = &self.x_title {
485            req = req.header("x-title", x_title);
486        }
487        if let Some(http_referer) = &self.http_referer {
488            req = req.header("referer", http_referer);
489            req = req.header("http-referer", http_referer);
490        }
491        if let Some(token) = &self.x_github_authorization {
492            req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
493        }
494        if let Some(token) = &self.x_openrouter_authorization {
495            req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
496        }
497        if let Some(headers) = &self.x_mcp_authorization {
498            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
499                req = req.header("X-MCP-AUTHORIZATION", json);
500            }
501        }
502        if let Some(sig) = &self.x_viewer_signature {
503            req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
504        }
505        if let Some(addr) = &self.x_viewer_address {
506            req = req.header("X-VIEWER-ADDRESS", addr.as_str());
507        }
508        if let Some(name) = &self.x_commit_author_name {
509            req = req.header("X-COMMIT-AUTHOR-NAME", name.as_str());
510        }
511        if let Some(email) = &self.x_commit_author_email {
512            req = req.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
513        }
514        if let Some(id) = &self.agent_id {
515            req = req.header("X-OBJECTIVEAI-AGENT-ID", id.as_str());
516        }
517        let req = req
518            .body(())
519            .map_err(|e| super::HttpError::WsConnect(tungstenite::Error::Http(
520                tungstenite::http::Response::builder()
521                    .status(400)
522                    .body(Some(e.to_string().into_bytes()))
523                    .unwrap(),
524            )))?;
525
526        let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
527        let (mut sink, rx_stream): (
528            _,
529            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
530        ) = ws_stream.split();
531
532        // Send the body as the first text frame.
533        let body_frame = serde_json::to_string(&body)
534            .map_err(super::HttpError::NotifySerialize)?;
535        sink.send(tungstenite::Message::Text(body_frame.into()))
536            .await
537            .map_err(super::HttpError::NotifySend)?;
538
539        // Build the per-connection state shared with Notifier + demux.
540        let sink: super::notifier::SharedSink = Arc::new(tokio::sync::Mutex::new(sink));
541        let pending: super::notifier::PendingNotifies =
542            Arc::new(dashmap::DashMap::new());
543
544        // mpsc the demux task pushes chunks (or terminal errors) into.
545        // Use futures::channel::mpsc so the rx side is `impl Stream`
546        // without pulling in tokio-stream.
547        let (chunk_tx, chunk_rx) =
548            futures::channel::mpsc::unbounded::<Result<Chunk, super::HttpError>>();
549
550        let demux_sink = sink.clone();
551        let demux_pending = pending.clone();
552        let handler = Arc::new(handler);
553        tokio::spawn(async move {
554            let mut rx_stream = rx_stream;
555            let mut chunk_tx = chunk_tx;
556            loop {
557                let msg = match rx_stream.next().await {
558                    Some(m) => m,
559                    None => {
560                        break;
561                    }
562                };
563                let text = match msg {
564                    Ok(tungstenite::Message::Text(t)) => {
565                        let s = t.to_string();
566                        s
567                    }
568                    Ok(tungstenite::Message::Binary(_)) => {
569                        continue;
570                    }
571                    Ok(
572                        tungstenite::Message::Ping(_) | tungstenite::Message::Pong(_),
573                    ) => continue,
574                    Ok(tungstenite::Message::Close(_)) => {
575                        break;
576                    }
577                    Ok(tungstenite::Message::Frame(_)) => continue,
578                    Err(_) => {
579                        break;
580                    }
581                };
582
583                // Classification: try client_response, then
584                // server_request, then chunk. Order matters — chunks
585                // tend to have many fields; the envelopes have a
586                // distinctive `id` + tagged `type`.
587                if let Ok(response) = serde_json::from_str::<ClientResponse>(&text) {
588                    let id = response.id().to_string();
589                    if let Some((_, tx)) = demux_pending.remove(&id) {
590                        let _ = tx.send(response);
591                    }
592                    continue;
593                }
594                if let Ok(request) = serde_json::from_str::<ServerRequest>(&text) {
595                    let id = request.id.clone();
596                    let method = request
597                        .body
598                        .as_ref()
599                        .and_then(|v| v.get("method"))
600                        .and_then(|v| v.as_str())
601                        .unwrap_or("")
602                        .to_string();
603                    let handler = handler.clone();
604                    let demux_sink = demux_sink.clone();
605                    tokio::spawn(async move {
606                        let id = id;
607                        // Handler returns the full response (incl.
608                        // matching id); we just frame + write it.
609                        let response = handler.handle(request).await;
610                        let frame = match serde_json::to_string(&response) {
611                            Ok(s) => s,
612                            Err(_) => {
613                                return;
614                            }
615                        };
616                        let mut guard = demux_sink.lock().await;
617                        let send_result = guard
618                            .send(tungstenite::Message::Text(frame.into()))
619                            .await;
620                    });
621                    continue;
622                }
623
624                // Chunk path.
625                let mut de = serde_json::Deserializer::from_str(&text);
626                match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
627                    Ok(chunk) => {
628                        if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
629                            break;
630                        }
631                    }
632                    Err(e) => {
633                        // Try to parse as a ResponseError before
634                        // surfacing the deserialization failure.
635                        let err = match serde_json::from_str::<error::ResponseError>(&text) {
636                            Ok(api_err) => super::HttpError::ApiError(api_err),
637                            Err(_) => super::HttpError::DeserializationError(e),
638                        };
639                        let _ = chunk_tx.unbounded_send(Err(err));
640                        break;
641                    }
642                }
643            }
644            // Make sure any awaiting Notifier futures unblock when
645            // we exit — dropping the dashmap fires every oneshot
646            // Sender's drop, which causes the rx side to error.
647            drop(demux_pending);
648            drop(chunk_tx);
649        });
650
651        let notifier = super::Notifier::new(sink, pending);
652        Ok((chunk_rx, notifier))
653    }
654}