Skip to main content

objectiveai_sdk/http/
client.rs

1//! HTTP client implementation for ObjectiveAI API.
2
3use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{SinkExt, Stream, StreamExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
/// HTTP client for making requests to the ObjectiveAI API.
///
/// Handles authentication, request building, and response parsing for both
/// unary and streaming endpoints.
///
/// All fields are public. Optional fields that are `None` simply cause the
/// corresponding header to be omitted from outgoing requests.
///
/// # Example
///
/// `HttpClient::new` takes thirteen positional arguments. Most optional
/// arguments are generic (`Option<impl Into<String>>`), so a bare `None`
/// needs a type hint such as `None::<String>`.
///
/// ```ignore
/// let client = HttpClient::new(
///     reqwest::Client::new(),
///     None::<String>, // address: use the default
///     Some("your-api-key"),
///     None::<String>, // user_agent
///     None::<String>, // x_title
///     None::<String>, // http_referer
///     None::<String>, // x_github_authorization
///     None::<String>, // x_openrouter_authorization
///     None,           // x_mcp_authorization (HashMap<String, String>)
///     None::<String>, // x_viewer_signature
///     None::<String>, // x_viewer_address
///     None::<String>, // agent_instance_hierarchy
///     None::<String>, // mcp_session_id
/// );
/// ```
#[derive(Debug, Clone)]
pub struct HttpClient {
    /// The underlying reqwest HTTP client.
    pub http_client: reqwest::Client,
    /// Base URL for API requests. Defaults to `https://api.objectiveai.dev`.
    /// A trailing `/` is tolerated: it is trimmed before the path is appended.
    pub address: String,
    /// API key for authentication. Sent as `Bearer` token in `Authorization` header.
    /// A leading `Bearer ` already present in the value is stripped first, so
    /// the prefix is never doubled.
    pub authorization: Option<Arc<String>>,
    /// Value for the `User-Agent` header.
    pub user_agent: Option<String>,
    /// Value for the `X-Title` header.
    pub x_title: Option<String>,
    /// Value for both `Referer` and `HTTP-Referer` headers.
    pub http_referer: Option<String>,
    /// Value for the `X-GITHUB-AUTHORIZATION` header.
    pub x_github_authorization: Option<Arc<String>>,
    /// Value for the `X-OPENROUTER-AUTHORIZATION` header.
    pub x_openrouter_authorization: Option<Arc<String>>,
    /// Values for the `X-MCP-AUTHORIZATION` header (JSON-encoded).
    /// The whole map is serialized as a single JSON object per request.
    pub x_mcp_authorization:
        Option<Arc<std::collections::HashMap<String, String>>>,
    /// Value for the `X-VIEWER-SIGNATURE` header.
    pub x_viewer_signature: Option<Arc<String>>,
    /// Value for the `X-VIEWER-ADDRESS` header.
    pub x_viewer_address: Option<Arc<String>>,
    /// Value for the `X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY` header.
    pub agent_instance_hierarchy: Option<Arc<String>>,
    /// Value for the `Mcp-Session-Id` header — propagated through to
    /// `objectiveai-mcp` so server-side tool invocations see a stable
    /// per-session id. See `objectiveai_sdk::mcp::MCP_SESSION_ID_HEADER`.
    pub mcp_session_id: Option<Arc<String>>,
}
64
65impl HttpClient {
66    /// Creates a new HTTP client.
67    ///
68    /// # Arguments
69    ///
70    /// * `http_client` - The reqwest client to use for requests
71    /// * `address` - Base URL for API requests (defaults to `https://api.objectiveai.dev`)
72    /// * `authorization` - API key for authentication
73    /// * `user_agent` - Optional User-Agent header value
74    /// * `x_title` - Optional X-Title header value
75    /// * `http_referer` - Optional Referer header value
76    /// * `x_github_authorization` - Optional X-GITHUB-AUTHORIZATION header value
77    /// * `x_openrouter_authorization` - Optional X-OPENROUTER-AUTHORIZATION header value
78    /// * `x_mcp_authorization` - Optional X-MCP-AUTHORIZATION header value (HashMap)
79    /// * `x_viewer_signature` - Optional X-VIEWER-SIGNATURE header value
80    /// * `x_viewer_address` - Optional X-VIEWER-ADDRESS header value
81    /// * `agent_instance_hierarchy` - Optional X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY header value
82    /// * `mcp_session_id` - Optional Mcp-Session-Id header value
83    pub fn new(
84        http_client: reqwest::Client,
85        address: Option<impl Into<String>>,
86        authorization: Option<impl Into<String>>,
87        user_agent: Option<impl Into<String>>,
88        x_title: Option<impl Into<String>>,
89        http_referer: Option<impl Into<String>>,
90        x_github_authorization: Option<impl Into<String>>,
91        x_openrouter_authorization: Option<impl Into<String>>,
92        x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
93        x_viewer_signature: Option<impl Into<String>>,
94        x_viewer_address: Option<impl Into<String>>,
95        agent_instance_hierarchy: Option<impl Into<String>>,
96        mcp_session_id: Option<impl Into<String>>,
97    ) -> Self {
98        #[cfg(feature = "env")]
99        let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
100
101        Self {
102            http_client,
103            address: match address {
104                Some(base) => base.into(),
105                #[cfg(feature = "env")]
106                None => env("OBJECTIVEAI_ADDRESS").unwrap_or_else(|| {
107                    "https://api.objectiveai.dev".to_string()
108                }),
109                #[cfg(not(feature = "env"))]
110                None => "https://api.objectiveai.dev".to_string(),
111            },
112            authorization: authorization.map(|k| Arc::new(k.into())).or_else(
113                || {
114                    #[cfg(feature = "env")]
115                    {
116                        env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new)
117                    }
118                    #[cfg(not(feature = "env"))]
119                    {
120                        None
121                    }
122                },
123            ),
124            user_agent: user_agent.map(Into::into).or_else(|| {
125                #[cfg(feature = "env")]
126                {
127                    env("USER_AGENT")
128                }
129                #[cfg(not(feature = "env"))]
130                {
131                    None
132                }
133            }),
134            x_title: x_title.map(Into::into).or_else(|| {
135                #[cfg(feature = "env")]
136                {
137                    env("X_TITLE")
138                }
139                #[cfg(not(feature = "env"))]
140                {
141                    None
142                }
143            }),
144            http_referer: http_referer.map(Into::into).or_else(|| {
145                #[cfg(feature = "env")]
146                {
147                    env("HTTP_REFERER")
148                }
149                #[cfg(not(feature = "env"))]
150                {
151                    None
152                }
153            }),
154            x_github_authorization: x_github_authorization
155                .map(|v| Arc::new(v.into()))
156                .or_else(|| {
157                    #[cfg(feature = "env")]
158                    {
159                        env("GITHUB_AUTHORIZATION").map(Arc::new)
160                    }
161                    #[cfg(not(feature = "env"))]
162                    {
163                        None
164                    }
165                }),
166            x_openrouter_authorization: x_openrouter_authorization
167                .map(|v| Arc::new(v.into()))
168                .or_else(|| {
169                    #[cfg(feature = "env")]
170                    {
171                        env("OPENROUTER_AUTHORIZATION").map(Arc::new)
172                    }
173                    #[cfg(not(feature = "env"))]
174                    {
175                        None
176                    }
177                }),
178            x_mcp_authorization: x_mcp_authorization.map(Arc::new).or_else(
179                || {
180                    #[cfg(feature = "env")]
181                    {
182                        env("MCP_AUTHORIZATION")
183                            .and_then(|v| serde_json::from_str(&v).ok())
184                            .map(Arc::new)
185                    }
186                    #[cfg(not(feature = "env"))]
187                    {
188                        None
189                    }
190                },
191            ),
192            x_viewer_signature: x_viewer_signature
193                .map(|v| Arc::new(v.into()))
194                .or_else(|| {
195                    #[cfg(feature = "env")]
196                    {
197                        env("VIEWER_SIGNATURE").map(Arc::new)
198                    }
199                    #[cfg(not(feature = "env"))]
200                    {
201                        None
202                    }
203                }),
204            x_viewer_address: x_viewer_address
205                .map(|v| Arc::new(v.into()))
206                .or_else(|| {
207                    #[cfg(feature = "env")]
208                    {
209                        env("VIEWER_ADDRESS").map(Arc::new)
210                    }
211                    #[cfg(not(feature = "env"))]
212                    {
213                        None
214                    }
215                }),
216            agent_instance_hierarchy: agent_instance_hierarchy.map(|v| Arc::new(v.into())).or_else(|| {
217                #[cfg(feature = "env")]
218                {
219                    env("OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY").map(Arc::new)
220                }
221                #[cfg(not(feature = "env"))]
222                {
223                    None
224                }
225            }),
226            mcp_session_id: mcp_session_id.map(|v| Arc::new(v.into())).or_else(
227                || {
228                    #[cfg(feature = "env")]
229                    {
230                        env(crate::mcp::MCP_SESSION_ID_ENV).map(Arc::new)
231                    }
232                    #[cfg(not(feature = "env"))]
233                    {
234                        None
235                    }
236                },
237            ),
238        }
239    }
240
241    /// Builds a request with authentication and custom headers.
242    fn request(
243        &self,
244        method: reqwest::Method,
245        path: &str,
246        body: Option<impl serde::Serialize>,
247    ) -> reqwest::RequestBuilder {
248        let url = format!(
249            "{}/{}",
250            self.address.trim_end_matches('/'),
251            path.trim_start_matches('/')
252        );
253        let mut request = self.http_client.request(method, &url);
254        if let Some(authorization) = &self.authorization {
255            let key = authorization
256                .strip_prefix("Bearer ")
257                .unwrap_or(authorization);
258            request =
259                request.header("authorization", format!("Bearer {}", key));
260        }
261        if let Some(user_agent) = &self.user_agent {
262            request = request.header("user-agent", user_agent);
263        }
264        if let Some(x_title) = &self.x_title {
265            request = request.header("x-title", x_title);
266        }
267        if let Some(http_referer) = &self.http_referer {
268            request = request.header("referer", http_referer);
269            request = request.header("http-referer", http_referer);
270        }
271        if let Some(token) = &self.x_github_authorization {
272            request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
273        }
274        if let Some(token) = &self.x_openrouter_authorization {
275            request =
276                request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
277        }
278        if let Some(headers) = &self.x_mcp_authorization {
279            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
280                request = request.header("X-MCP-AUTHORIZATION", json);
281            }
282        }
283        if let Some(sig) = &self.x_viewer_signature {
284            request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
285        }
286        if let Some(addr) = &self.x_viewer_address {
287            request = request.header("X-VIEWER-ADDRESS", addr.as_str());
288        }
289        if let Some(id) = &self.agent_instance_hierarchy {
290            request = request.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
291        }
292        if let Some(s) = &self.mcp_session_id {
293            request =
294                request.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
295        }
296        if let Some(body) = body {
297            request = request.json(&body);
298        }
299        request
300    }
301
302    /// Sends a unary (request-response) API call and deserializes the response.
303    ///
304    /// # Type Parameters
305    ///
306    /// * `T` - The expected response type to deserialize into
307    ///
308    /// # Errors
309    ///
310    /// Returns [`super::HttpError`] if the request fails, returns a non-success status,
311    /// or the response cannot be deserialized.
312    pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
313        &self,
314        method: reqwest::Method,
315        path: impl AsRef<str>,
316        body: Option<impl serde::Serialize>,
317    ) -> Result<T, super::HttpError> {
318        let response = self
319            .http_client
320            .execute(
321                self.request(method, path.as_ref(), body)
322                    .build()
323                    .map_err(super::HttpError::RequestError)?,
324            )
325            .await
326            .map_err(super::HttpError::HttpError)?;
327        let code = response.status();
328        if code.is_success() {
329            let text =
330                response.text().await.map_err(super::HttpError::HttpError)?;
331            let mut de = serde_json::Deserializer::from_str(&text);
332            match serde_path_to_error::deserialize::<_, T>(&mut de) {
333                Ok(value) => Ok(value),
334                Err(e) => Err(super::HttpError::DeserializationError(e)),
335            }
336        } else {
337            match response.text().await {
338                Ok(text) => Err(super::HttpError::BadStatus {
339                    code,
340                    body: match serde_json::from_str::<serde_json::Value>(&text)
341                    {
342                        Ok(body) => body,
343                        Err(_) => serde_json::Value::String(text),
344                    },
345                }),
346                Err(_) => Err(super::HttpError::BadStatus {
347                    code,
348                    body: serde_json::Value::Null,
349                }),
350            }
351        }
352    }
353
354    /// Sends a unary API call that expects no response body.
355    ///
356    /// Useful for DELETE or other operations that only return a status code.
357    ///
358    /// # Errors
359    ///
360    /// Returns [`super::HttpError`] if the request fails or returns a non-success status.
361    pub async fn send_unary_no_response(
362        &self,
363        method: reqwest::Method,
364        path: impl AsRef<str>,
365        body: Option<impl serde::Serialize>,
366    ) -> Result<(), super::HttpError> {
367        let response = self
368            .http_client
369            .execute(
370                self.request(method, path.as_ref(), body)
371                    .build()
372                    .map_err(super::HttpError::RequestError)?,
373            )
374            .await
375            .map_err(super::HttpError::HttpError)?;
376        let code = response.status();
377        if code.is_success() {
378            Ok(())
379        } else {
380            match response.text().await {
381                Ok(text) => Err(super::HttpError::BadStatus {
382                    code,
383                    body: match serde_json::from_str::<serde_json::Value>(&text)
384                    {
385                        Ok(body) => body,
386                        Err(_) => serde_json::Value::String(text),
387                    },
388                }),
389                Err(_) => Err(super::HttpError::BadStatus {
390                    code,
391                    body: serde_json::Value::Null,
392                }),
393            }
394        }
395    }
396
397    /// Sends a streaming API call using Server-Sent Events (SSE).
398    ///
399    /// Returns a stream of deserialized chunks. The stream automatically handles:
400    /// - SSE `[DONE]` messages (filtered out)
401    /// - Comment lines starting with `:` (filtered out)
402    /// - Empty data lines (filtered out)
403    /// - API errors embedded in stream data
404    ///
405    /// # Type Parameters
406    ///
407    /// * `T` - The expected chunk type to deserialize each SSE message into
408    ///
409    /// # Errors
410    ///
411    /// Returns [`super::HttpError`] if the stream cannot be established.
412    pub async fn send_streaming<
413        T: serde::de::DeserializeOwned + Send + 'static,
414        P: AsRef<str> + Send,
415        B: serde::Serialize + Send,
416    >(
417        &self,
418        method: reqwest::Method,
419        path: P,
420        body: Option<B>,
421    ) -> Result<
422        impl Stream<Item = Result<T, super::HttpError>>
423        + Send
424        + 'static
425        + use<T, P, B>,
426        super::HttpError,
427    > {
428        // Stop the stream at [DONE] to prevent reqwest_eventsource from
429        // auto-reconnecting. Uses take_while on the raw SSE events, then
430        // maps/filters the remaining events into typed chunks.
431        // Stamps X-Transport: sse so the API's transport dispatcher
432        // routes this to the SSE branch (the API default is WS).
433        Ok(
434            self.request(method, path.as_ref(), body)
435                .header("X-Transport", "sse")
436                .eventsource()?
437                .take_while(|result| {
438                    let dominated = matches!(
439                        result,
440                        Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
441                    );
442                    async move { !dominated }
443                })
444                .then(|result| async {
445                    match result {
446                        Ok(Event::Open) => None,
447                        Ok(Event::Message(MessageEvent { data, .. }))
448                            if data.starts_with(":")
449                                || data.is_empty() =>
450                        {
451                            None
452                        }
453                        Ok(Event::Message(MessageEvent { data, .. })) => {
454                            let mut de =
455                                serde_json::Deserializer::from_str(&data);
456                            Some(
457                                match serde_path_to_error::deserialize::<_, T>(
458                                    &mut de,
459                                ) {
460                                    Ok(value) => Ok(value),
461                                    Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
462                                        Ok(err) => Err(super::HttpError::ApiError(err)),
463                                        Err(_) => Err(super::HttpError::DeserializationError(e)),
464                                    },
465                                }
466                            )
467                        }
468                        Err(reqwest_eventsource::Error::InvalidStatusCode(
469                            code,
470                            response,
471                        )) => match response.text().await {
472                            Ok(body) => {
473                                Some(Err(super::HttpError::BadStatus {
474                                    code,
475                                    body: match serde_json::from_str::<
476                                        serde_json::Value,
477                                    >(
478                                        &body
479                                    ) {
480                                        Ok(body) => body,
481                                        Err(_) => {
482                                            serde_json::Value::String(body)
483                                        }
484                                    },
485                                }))
486                            }
487                            Err(_) => Some(Err(super::HttpError::BadStatus {
488                                code,
489                                body: serde_json::Value::Null,
490                            })),
491                        },
492                        Err(e) => Some(Err(super::HttpError::StreamError(e))),
493                    }
494                })
495                .filter_map(|x| async { x }),
496        )
497    }
498
499    /// WebSocket variant of [`Self::send_streaming`]. Opens a WS to
500    /// the configured `address`, sends `body` as the first text
501    /// frame, then demultiplexes inbound frames into:
502    ///
503    /// - Chunk frames (yielded on the returned [`Stream`]).
504    /// - [`client_response::Response`](crate::client_objectiveai_mcp::client_response::Response)
505    ///   frames (routed to the [`super::Notifier`]'s pending-id map).
506    /// - [`server_request::Request`](crate::client_objectiveai_mcp::server_request::Request)
507    ///   frames (dispatched to `handler`; the result is written back
508    ///   as a `server_response::Response` echoing the request id).
509    ///
510    /// Both the returned `Stream` and the returned [`super::Notifier`]
511    /// share the underlying WebSocket: dropping both stops the demux
512    /// task and closes the connection cleanly. Dropping only one
513    /// keeps the WS alive — useful when a caller wants to send
514    /// notifies after the chunk stream has finished, or vice-versa.
515    #[cfg(feature = "mcp")]
516    pub async fn send_streaming_ws<Chunk, B, H, P>(
517        &self,
518        method: reqwest::Method,
519        path: P,
520        body: B,
521        handler: H,
522    ) -> Result<
523        (
524            impl Stream<Item = Result<Chunk, super::HttpError>>
525            + Send
526            + Unpin
527            + 'static
528            + use<Chunk, B, H, P>,
529            super::Notifier,
530        ),
531        super::HttpError,
532    >
533    where
534        Chunk: serde::de::DeserializeOwned + Send + 'static,
535        B: serde::Serialize + Send + 'static,
536        H: super::McpHandler,
537        P: AsRef<str>,
538    {
539        use crate::client_objectiveai_mcp::{
540            client_response::Response as ClientResponse,
541            server_request::Request as ServerRequest,
542        };
543        use futures::stream::SplitStream;
544        use tokio::net::TcpStream;
545        use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
546
547        // Translate the configured `address` (http(s)://...) into a
548        // ws(s):// URL. Path is appended directly.
549        let url = format!(
550            "{}/{}",
551            self.address.trim_end_matches('/'),
552            path.as_ref().trim_start_matches('/')
553        );
554        let ws_url = if let Some(rest) = url.strip_prefix("https://") {
555            format!("wss://{rest}")
556        } else if let Some(rest) = url.strip_prefix("http://") {
557            format!("ws://{rest}")
558        } else {
559            url.clone()
560        };
561        let _ = method; // axum's WS route is `any(...)`, method is ignored on the wire.
562
563        // Build the upgrade request manually so we can apply the
564        // same auth + custom headers `request()` does for HTTP.
565        let mut req = tungstenite::handshake::client::Request::builder()
566            .method("GET")
567            .uri(&ws_url)
568            .header(
569                "Host",
570                reqwest::Url::parse(&url)
571                    .ok()
572                    .and_then(|u| u.host_str().map(str::to_owned))
573                    .unwrap_or_default(),
574            )
575            .header("Upgrade", "websocket")
576            .header("Connection", "Upgrade")
577            .header(
578                "Sec-WebSocket-Key",
579                tungstenite::handshake::client::generate_key(),
580            )
581            .header("Sec-WebSocket-Version", "13")
582            .header("X-Transport", "ws");
583        if let Some(authorization) = &self.authorization {
584            let key = authorization
585                .strip_prefix("Bearer ")
586                .unwrap_or(authorization.as_str());
587            req = req.header("authorization", format!("Bearer {}", key));
588        }
589        if let Some(ua) = &self.user_agent {
590            req = req.header("user-agent", ua);
591        }
592        if let Some(x_title) = &self.x_title {
593            req = req.header("x-title", x_title);
594        }
595        if let Some(http_referer) = &self.http_referer {
596            req = req.header("referer", http_referer);
597            req = req.header("http-referer", http_referer);
598        }
599        if let Some(token) = &self.x_github_authorization {
600            req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
601        }
602        if let Some(token) = &self.x_openrouter_authorization {
603            req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
604        }
605        if let Some(headers) = &self.x_mcp_authorization {
606            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
607                req = req.header("X-MCP-AUTHORIZATION", json);
608            }
609        }
610        if let Some(sig) = &self.x_viewer_signature {
611            req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
612        }
613        if let Some(addr) = &self.x_viewer_address {
614            req = req.header("X-VIEWER-ADDRESS", addr.as_str());
615        }
616        if let Some(id) = &self.agent_instance_hierarchy {
617            req = req.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
618        }
619        if let Some(s) = &self.mcp_session_id {
620            req = req.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
621        }
622        let req = req.body(()).map_err(|e| {
623            super::HttpError::WsConnect(tungstenite::Error::Http(
624                tungstenite::http::Response::builder()
625                    .status(400)
626                    .body(Some(e.to_string().into_bytes()))
627                    .unwrap(),
628            ))
629        })?;
630
631        let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
632        let (mut sink, rx_stream): (
633            _,
634            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
635        ) = ws_stream.split();
636
637        // Send the body as the first text frame.
638        let body_frame = serde_json::to_string(&body)
639            .map_err(super::HttpError::NotifySerialize)?;
640        sink.send(tungstenite::Message::Text(body_frame.into()))
641            .await
642            .map_err(super::HttpError::NotifySend)?;
643
644        // Build the per-connection state shared with Notifier + demux.
645        let sink: super::notifier::SharedSink =
646            Arc::new(tokio::sync::Mutex::new(sink));
647        let pending: super::notifier::PendingNotifies =
648            Arc::new(dashmap::DashMap::new());
649
650        // mpsc the demux task pushes chunks (or terminal errors) into.
651        // Use futures::channel::mpsc so the rx side is `impl Stream`
652        // without pulling in tokio-stream.
653        let (chunk_tx, chunk_rx) = futures::channel::mpsc::unbounded::<
654            Result<Chunk, super::HttpError>,
655        >();
656
657        let demux_sink = sink.clone();
658        let demux_pending = pending.clone();
659        let handler = Arc::new(handler);
660        tokio::spawn(async move {
661            let mut rx_stream = rx_stream;
662            let mut chunk_tx = chunk_tx;
663            loop {
664                let msg = match rx_stream.next().await {
665                    Some(m) => m,
666                    None => break,
667                };
668                let text = match msg {
669                    Ok(tungstenite::Message::Text(t)) => {
670                        let s = t.to_string();
671                        s
672                    }
673                    Ok(tungstenite::Message::Binary(_)) => {
674                        continue;
675                    }
676                    Ok(
677                        tungstenite::Message::Ping(_)
678                        | tungstenite::Message::Pong(_),
679                    ) => continue,
680                    Ok(tungstenite::Message::Close(_)) => {
681                        break;
682                    }
683                    Ok(tungstenite::Message::Frame(_)) => continue,
684                    Err(_) => {
685                        break;
686                    }
687                };
688
689                // Classification: try client_response, then
690                // server_request, then chunk. Order matters — chunks
691                // tend to have many fields; the envelopes have a
692                // distinctive `id` + tagged `type`.
693                if let Ok(response) =
694                    serde_json::from_str::<ClientResponse>(&text)
695                {
696                    let id = response.id().to_string();
697                    if let Some((_, tx)) = demux_pending.remove(&id) {
698                        let _ = tx.send(response);
699                    }
700                    continue;
701                }
702                if let Ok(request) =
703                    serde_json::from_str::<ServerRequest>(&text)
704                {
705                    let id = request.id.clone();
706                    let handler = handler.clone();
707                    let demux_sink = demux_sink.clone();
708                    tokio::spawn(async move {
709                        let id = id;
710                        // Handler returns the full response (incl.
711                        // matching id); we just frame + write it.
712                        let response = handler.handle(request).await;
713                        let frame = match serde_json::to_string(&response) {
714                            Ok(s) => s,
715                            Err(_) => {
716                                return;
717                            }
718                        };
719                        let mut guard = demux_sink.lock().await;
720                        let send_result = guard
721                            .send(tungstenite::Message::Text(frame.into()))
722                            .await;
723                    });
724                    continue;
725                }
726
727                // Chunk path.
728                let mut de = serde_json::Deserializer::from_str(&text);
729                match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
730                    Ok(chunk) => {
731                        if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
732                            break;
733                        }
734                    }
735                    Err(e) => {
736                        // Try to parse as a ResponseError before
737                        // surfacing the deserialization failure.
738                        let err = match serde_json::from_str::<
739                            error::ResponseError,
740                        >(&text)
741                        {
742                            Ok(api_err) => super::HttpError::ApiError(api_err),
743                            Err(_) => super::HttpError::DeserializationError(e),
744                        };
745                        let _ = chunk_tx.unbounded_send(Err(err));
746                        break;
747                    }
748                }
749            }
750            // Make sure any awaiting Notifier futures unblock when
751            // we exit — dropping the dashmap fires every oneshot
752            // Sender's drop, which causes the rx side to error.
753            drop(demux_pending);
754            drop(chunk_tx);
755        });
756
757        let notifier = super::Notifier::new(sink, pending);
758        Ok((chunk_rx, notifier))
759    }
760}