Skip to main content

objectiveai_sdk/http/
client.rs

1//! HTTP client implementation for ObjectiveAI API.
2
3use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{SinkExt, Stream, StreamExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
10/// HTTP client for making requests to the ObjectiveAI API.
11///
12/// Handles authentication, request building, and response parsing for both
13/// unary and streaming endpoints.
14///
15/// # Example
16///
17/// ```ignore
18/// let client = HttpClient::new(
19///     reqwest::Client::new(),
20///     None, // Use default address
21///     Some("your-api-key"),
22///     None, // user_agent
23///     None, // x_title
24///     None, // http_referer
25///     None, // x_github_authorization
26///     None, // x_openrouter_authorization
27///     None, // x_mcp_authorization
28///     None, // x_viewer_signature
29///     None, // x_viewer_address
30///     None, // x_commit_author_name
31///     None, // x_commit_author_email
32/// );
33/// ```
34#[derive(Debug, Clone)]
35pub struct HttpClient {
36    /// The underlying reqwest HTTP client.
37    pub http_client: reqwest::Client,
38    /// Base URL for API requests. Defaults to `https://api.objectiveai.dev`.
39    pub address: String,
40    /// API key for authentication. Sent as `Bearer` token in `Authorization` header.
41    pub authorization: Option<Arc<String>>,
42    /// Value for the `User-Agent` header.
43    pub user_agent: Option<String>,
44    /// Value for the `X-Title` header.
45    pub x_title: Option<String>,
46    /// Value for both `Referer` and `HTTP-Referer` headers.
47    pub http_referer: Option<String>,
48    /// Value for the `X-GITHUB-AUTHORIZATION` header.
49    pub x_github_authorization: Option<Arc<String>>,
50    /// Value for the `X-OPENROUTER-AUTHORIZATION` header.
51    pub x_openrouter_authorization: Option<Arc<String>>,
52    /// Values for the `X-MCP-AUTHORIZATION` header (JSON-encoded).
53    pub x_mcp_authorization:
54        Option<Arc<std::collections::HashMap<String, String>>>,
55    /// Value for the `X-VIEWER-SIGNATURE` header.
56    pub x_viewer_signature: Option<Arc<String>>,
57    /// Value for the `X-VIEWER-ADDRESS` header.
58    pub x_viewer_address: Option<Arc<String>>,
59    /// Value for the `X-COMMIT-AUTHOR-NAME` header.
60    pub x_commit_author_name: Option<Arc<String>>,
61    /// Value for the `X-COMMIT-AUTHOR-EMAIL` header.
62    pub x_commit_author_email: Option<Arc<String>>,
63    /// Value for the `X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY` header.
64    pub agent_instance_hierarchy: Option<Arc<String>>,
65    /// Value for the `Mcp-Session-Id` header — propagated through to
66    /// `objectiveai-mcp` so server-side tool invocations see a stable
67    /// per-session id. See `objectiveai_sdk::mcp::MCP_SESSION_ID_HEADER`.
68    pub mcp_session_id: Option<Arc<String>>,
69}
70
71impl HttpClient {
72    /// Creates a new HTTP client.
73    ///
74    /// # Arguments
75    ///
76    /// * `http_client` - The reqwest client to use for requests
77    /// * `address` - Base URL for API requests (defaults to `https://api.objectiveai.dev`)
78    /// * `authorization` - API key for authentication
79    /// * `user_agent` - Optional User-Agent header value
80    /// * `x_title` - Optional X-Title header value
81    /// * `http_referer` - Optional Referer header value
82    /// * `x_github_authorization` - Optional X-GITHUB-AUTHORIZATION header value
83    /// * `x_openrouter_authorization` - Optional X-OPENROUTER-AUTHORIZATION header value
84    /// * `x_mcp_authorization` - Optional X-MCP-AUTHORIZATION header value (HashMap)
85    /// * `x_viewer_signature` - Optional X-VIEWER-SIGNATURE header value
86    /// * `x_viewer_address` - Optional X-VIEWER-ADDRESS header value
87    /// * `x_commit_author_name` - Optional X-COMMIT-AUTHOR-NAME header value
88    /// * `x_commit_author_email` - Optional X-COMMIT-AUTHOR-EMAIL header value
89    /// * `agent_instance_hierarchy` - Optional X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY header value
90    /// * `mcp_session_id` - Optional Mcp-Session-Id header value
91    pub fn new(
92        http_client: reqwest::Client,
93        address: Option<impl Into<String>>,
94        authorization: Option<impl Into<String>>,
95        user_agent: Option<impl Into<String>>,
96        x_title: Option<impl Into<String>>,
97        http_referer: Option<impl Into<String>>,
98        x_github_authorization: Option<impl Into<String>>,
99        x_openrouter_authorization: Option<impl Into<String>>,
100        x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
101        x_viewer_signature: Option<impl Into<String>>,
102        x_viewer_address: Option<impl Into<String>>,
103        x_commit_author_name: Option<impl Into<String>>,
104        x_commit_author_email: Option<impl Into<String>>,
105        agent_instance_hierarchy: Option<impl Into<String>>,
106        mcp_session_id: Option<impl Into<String>>,
107    ) -> Self {
108        #[cfg(feature = "env")]
109        let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
110
111        Self {
112            http_client,
113            address: match address {
114                Some(base) => base.into(),
115                #[cfg(feature = "env")]
116                None => env("OBJECTIVEAI_ADDRESS").unwrap_or_else(|| {
117                    "https://api.objectiveai.dev".to_string()
118                }),
119                #[cfg(not(feature = "env"))]
120                None => "https://api.objectiveai.dev".to_string(),
121            },
122            authorization: authorization.map(|k| Arc::new(k.into())).or_else(
123                || {
124                    #[cfg(feature = "env")]
125                    {
126                        env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new)
127                    }
128                    #[cfg(not(feature = "env"))]
129                    {
130                        None
131                    }
132                },
133            ),
134            user_agent: user_agent.map(Into::into).or_else(|| {
135                #[cfg(feature = "env")]
136                {
137                    env("USER_AGENT")
138                }
139                #[cfg(not(feature = "env"))]
140                {
141                    None
142                }
143            }),
144            x_title: x_title.map(Into::into).or_else(|| {
145                #[cfg(feature = "env")]
146                {
147                    env("X_TITLE")
148                }
149                #[cfg(not(feature = "env"))]
150                {
151                    None
152                }
153            }),
154            http_referer: http_referer.map(Into::into).or_else(|| {
155                #[cfg(feature = "env")]
156                {
157                    env("HTTP_REFERER")
158                }
159                #[cfg(not(feature = "env"))]
160                {
161                    None
162                }
163            }),
164            x_github_authorization: x_github_authorization
165                .map(|v| Arc::new(v.into()))
166                .or_else(|| {
167                    #[cfg(feature = "env")]
168                    {
169                        env("GITHUB_AUTHORIZATION").map(Arc::new)
170                    }
171                    #[cfg(not(feature = "env"))]
172                    {
173                        None
174                    }
175                }),
176            x_openrouter_authorization: x_openrouter_authorization
177                .map(|v| Arc::new(v.into()))
178                .or_else(|| {
179                    #[cfg(feature = "env")]
180                    {
181                        env("OPENROUTER_AUTHORIZATION").map(Arc::new)
182                    }
183                    #[cfg(not(feature = "env"))]
184                    {
185                        None
186                    }
187                }),
188            x_mcp_authorization: x_mcp_authorization.map(Arc::new).or_else(
189                || {
190                    #[cfg(feature = "env")]
191                    {
192                        env("MCP_AUTHORIZATION")
193                            .and_then(|v| serde_json::from_str(&v).ok())
194                            .map(Arc::new)
195                    }
196                    #[cfg(not(feature = "env"))]
197                    {
198                        None
199                    }
200                },
201            ),
202            x_viewer_signature: x_viewer_signature
203                .map(|v| Arc::new(v.into()))
204                .or_else(|| {
205                    #[cfg(feature = "env")]
206                    {
207                        env("VIEWER_SIGNATURE").map(Arc::new)
208                    }
209                    #[cfg(not(feature = "env"))]
210                    {
211                        None
212                    }
213                }),
214            x_viewer_address: x_viewer_address
215                .map(|v| Arc::new(v.into()))
216                .or_else(|| {
217                    #[cfg(feature = "env")]
218                    {
219                        env("VIEWER_ADDRESS").map(Arc::new)
220                    }
221                    #[cfg(not(feature = "env"))]
222                    {
223                        None
224                    }
225                }),
226            x_commit_author_name: x_commit_author_name
227                .map(|v| Arc::new(v.into()))
228                .or_else(|| {
229                    #[cfg(feature = "env")]
230                    {
231                        env("COMMIT_AUTHOR_NAME").map(Arc::new)
232                    }
233                    #[cfg(not(feature = "env"))]
234                    {
235                        None
236                    }
237                }),
238            x_commit_author_email: x_commit_author_email
239                .map(|v| Arc::new(v.into()))
240                .or_else(|| {
241                    #[cfg(feature = "env")]
242                    {
243                        env("COMMIT_AUTHOR_EMAIL").map(Arc::new)
244                    }
245                    #[cfg(not(feature = "env"))]
246                    {
247                        None
248                    }
249                }),
250            agent_instance_hierarchy: agent_instance_hierarchy.map(|v| Arc::new(v.into())).or_else(|| {
251                #[cfg(feature = "env")]
252                {
253                    env("OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY").map(Arc::new)
254                }
255                #[cfg(not(feature = "env"))]
256                {
257                    None
258                }
259            }),
260            mcp_session_id: mcp_session_id.map(|v| Arc::new(v.into())).or_else(
261                || {
262                    #[cfg(feature = "env")]
263                    {
264                        env(crate::mcp::MCP_SESSION_ID_ENV).map(Arc::new)
265                    }
266                    #[cfg(not(feature = "env"))]
267                    {
268                        None
269                    }
270                },
271            ),
272        }
273    }
274
275    /// Builds a request with authentication and custom headers.
276    fn request(
277        &self,
278        method: reqwest::Method,
279        path: &str,
280        body: Option<impl serde::Serialize>,
281    ) -> reqwest::RequestBuilder {
282        let url = format!(
283            "{}/{}",
284            self.address.trim_end_matches('/'),
285            path.trim_start_matches('/')
286        );
287        let mut request = self.http_client.request(method, &url);
288        if let Some(authorization) = &self.authorization {
289            let key = authorization
290                .strip_prefix("Bearer ")
291                .unwrap_or(authorization);
292            request =
293                request.header("authorization", format!("Bearer {}", key));
294        }
295        if let Some(user_agent) = &self.user_agent {
296            request = request.header("user-agent", user_agent);
297        }
298        if let Some(x_title) = &self.x_title {
299            request = request.header("x-title", x_title);
300        }
301        if let Some(http_referer) = &self.http_referer {
302            request = request.header("referer", http_referer);
303            request = request.header("http-referer", http_referer);
304        }
305        if let Some(token) = &self.x_github_authorization {
306            request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
307        }
308        if let Some(token) = &self.x_openrouter_authorization {
309            request =
310                request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
311        }
312        if let Some(headers) = &self.x_mcp_authorization {
313            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
314                request = request.header("X-MCP-AUTHORIZATION", json);
315            }
316        }
317        if let Some(sig) = &self.x_viewer_signature {
318            request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
319        }
320        if let Some(addr) = &self.x_viewer_address {
321            request = request.header("X-VIEWER-ADDRESS", addr.as_str());
322        }
323        if let Some(name) = &self.x_commit_author_name {
324            request = request.header("X-COMMIT-AUTHOR-NAME", name.as_str());
325        }
326        if let Some(email) = &self.x_commit_author_email {
327            request = request.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
328        }
329        if let Some(id) = &self.agent_instance_hierarchy {
330            request = request.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
331        }
332        if let Some(s) = &self.mcp_session_id {
333            request =
334                request.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
335        }
336        if let Some(body) = body {
337            request = request.json(&body);
338        }
339        request
340    }
341
342    /// Sends a unary (request-response) API call and deserializes the response.
343    ///
344    /// # Type Parameters
345    ///
346    /// * `T` - The expected response type to deserialize into
347    ///
348    /// # Errors
349    ///
350    /// Returns [`super::HttpError`] if the request fails, returns a non-success status,
351    /// or the response cannot be deserialized.
352    pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
353        &self,
354        method: reqwest::Method,
355        path: impl AsRef<str>,
356        body: Option<impl serde::Serialize>,
357    ) -> Result<T, super::HttpError> {
358        let response = self
359            .http_client
360            .execute(
361                self.request(method, path.as_ref(), body)
362                    .build()
363                    .map_err(super::HttpError::RequestError)?,
364            )
365            .await
366            .map_err(super::HttpError::HttpError)?;
367        let code = response.status();
368        if code.is_success() {
369            let text =
370                response.text().await.map_err(super::HttpError::HttpError)?;
371            let mut de = serde_json::Deserializer::from_str(&text);
372            match serde_path_to_error::deserialize::<_, T>(&mut de) {
373                Ok(value) => Ok(value),
374                Err(e) => Err(super::HttpError::DeserializationError(e)),
375            }
376        } else {
377            match response.text().await {
378                Ok(text) => Err(super::HttpError::BadStatus {
379                    code,
380                    body: match serde_json::from_str::<serde_json::Value>(&text)
381                    {
382                        Ok(body) => body,
383                        Err(_) => serde_json::Value::String(text),
384                    },
385                }),
386                Err(_) => Err(super::HttpError::BadStatus {
387                    code,
388                    body: serde_json::Value::Null,
389                }),
390            }
391        }
392    }
393
394    /// Sends a unary API call that expects no response body.
395    ///
396    /// Useful for DELETE or other operations that only return a status code.
397    ///
398    /// # Errors
399    ///
400    /// Returns [`super::HttpError`] if the request fails or returns a non-success status.
401    pub async fn send_unary_no_response(
402        &self,
403        method: reqwest::Method,
404        path: impl AsRef<str>,
405        body: Option<impl serde::Serialize>,
406    ) -> Result<(), super::HttpError> {
407        let response = self
408            .http_client
409            .execute(
410                self.request(method, path.as_ref(), body)
411                    .build()
412                    .map_err(super::HttpError::RequestError)?,
413            )
414            .await
415            .map_err(super::HttpError::HttpError)?;
416        let code = response.status();
417        if code.is_success() {
418            Ok(())
419        } else {
420            match response.text().await {
421                Ok(text) => Err(super::HttpError::BadStatus {
422                    code,
423                    body: match serde_json::from_str::<serde_json::Value>(&text)
424                    {
425                        Ok(body) => body,
426                        Err(_) => serde_json::Value::String(text),
427                    },
428                }),
429                Err(_) => Err(super::HttpError::BadStatus {
430                    code,
431                    body: serde_json::Value::Null,
432                }),
433            }
434        }
435    }
436
437    /// Sends a streaming API call using Server-Sent Events (SSE).
438    ///
439    /// Returns a stream of deserialized chunks. The stream automatically handles:
440    /// - SSE `[DONE]` messages (filtered out)
441    /// - Comment lines starting with `:` (filtered out)
442    /// - Empty data lines (filtered out)
443    /// - API errors embedded in stream data
444    ///
445    /// # Type Parameters
446    ///
447    /// * `T` - The expected chunk type to deserialize each SSE message into
448    ///
449    /// # Errors
450    ///
451    /// Returns [`super::HttpError`] if the stream cannot be established.
452    pub async fn send_streaming<
453        T: serde::de::DeserializeOwned + Send + 'static,
454        P: AsRef<str> + Send,
455        B: serde::Serialize + Send,
456    >(
457        &self,
458        method: reqwest::Method,
459        path: P,
460        body: Option<B>,
461    ) -> Result<
462        impl Stream<Item = Result<T, super::HttpError>>
463        + Send
464        + 'static
465        + use<T, P, B>,
466        super::HttpError,
467    > {
468        // Stop the stream at [DONE] to prevent reqwest_eventsource from
469        // auto-reconnecting. Uses take_while on the raw SSE events, then
470        // maps/filters the remaining events into typed chunks.
471        // Stamps X-Transport: sse so the API's transport dispatcher
472        // routes this to the SSE branch (the API default is WS).
473        Ok(
474            self.request(method, path.as_ref(), body)
475                .header("X-Transport", "sse")
476                .eventsource()?
477                .take_while(|result| {
478                    let dominated = matches!(
479                        result,
480                        Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
481                    );
482                    async move { !dominated }
483                })
484                .then(|result| async {
485                    match result {
486                        Ok(Event::Open) => None,
487                        Ok(Event::Message(MessageEvent { data, .. }))
488                            if data.starts_with(":")
489                                || data.is_empty() =>
490                        {
491                            None
492                        }
493                        Ok(Event::Message(MessageEvent { data, .. })) => {
494                            let mut de =
495                                serde_json::Deserializer::from_str(&data);
496                            Some(
497                                match serde_path_to_error::deserialize::<_, T>(
498                                    &mut de,
499                                ) {
500                                    Ok(value) => Ok(value),
501                                    Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
502                                        Ok(err) => Err(super::HttpError::ApiError(err)),
503                                        Err(_) => Err(super::HttpError::DeserializationError(e)),
504                                    },
505                                }
506                            )
507                        }
508                        Err(reqwest_eventsource::Error::InvalidStatusCode(
509                            code,
510                            response,
511                        )) => match response.text().await {
512                            Ok(body) => {
513                                Some(Err(super::HttpError::BadStatus {
514                                    code,
515                                    body: match serde_json::from_str::<
516                                        serde_json::Value,
517                                    >(
518                                        &body
519                                    ) {
520                                        Ok(body) => body,
521                                        Err(_) => {
522                                            serde_json::Value::String(body)
523                                        }
524                                    },
525                                }))
526                            }
527                            Err(_) => Some(Err(super::HttpError::BadStatus {
528                                code,
529                                body: serde_json::Value::Null,
530                            })),
531                        },
532                        Err(e) => Some(Err(super::HttpError::StreamError(e))),
533                    }
534                })
535                .filter_map(|x| async { x }),
536        )
537    }
538
539    /// WebSocket variant of [`Self::send_streaming`]. Opens a WS to
540    /// the configured `address`, sends `body` as the first text
541    /// frame, then demultiplexes inbound frames into:
542    ///
543    /// - Chunk frames (yielded on the returned [`Stream`]).
544    /// - [`client_response::Response`](crate::client_objectiveai_mcp::client_response::Response)
545    ///   frames (routed to the [`super::Notifier`]'s pending-id map).
546    /// - [`server_request::Request`](crate::client_objectiveai_mcp::server_request::Request)
547    ///   frames (dispatched to `handler`; the result is written back
548    ///   as a `server_response::Response` echoing the request id).
549    ///
550    /// Both the returned `Stream` and the returned [`super::Notifier`]
551    /// share the underlying WebSocket: dropping both stops the demux
552    /// task and closes the connection cleanly. Dropping only one
553    /// keeps the WS alive — useful when a caller wants to send
554    /// notifies after the chunk stream has finished, or vice-versa.
555    #[cfg(feature = "mcp")]
556    pub async fn send_streaming_ws<Chunk, B, H, P>(
557        &self,
558        method: reqwest::Method,
559        path: P,
560        body: B,
561        handler: H,
562    ) -> Result<
563        (
564            impl Stream<Item = Result<Chunk, super::HttpError>>
565            + Send
566            + Unpin
567            + 'static
568            + use<Chunk, B, H, P>,
569            super::Notifier,
570        ),
571        super::HttpError,
572    >
573    where
574        Chunk: serde::de::DeserializeOwned + Send + 'static,
575        B: serde::Serialize + Send + 'static,
576        H: super::McpHandler,
577        P: AsRef<str>,
578    {
579        use crate::client_objectiveai_mcp::{
580            client_response::Response as ClientResponse,
581            server_request::Request as ServerRequest,
582        };
583        use futures::stream::SplitStream;
584        use tokio::net::TcpStream;
585        use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
586
587        // Translate the configured `address` (http(s)://...) into a
588        // ws(s):// URL. Path is appended directly.
589        let url = format!(
590            "{}/{}",
591            self.address.trim_end_matches('/'),
592            path.as_ref().trim_start_matches('/')
593        );
594        let ws_url = if let Some(rest) = url.strip_prefix("https://") {
595            format!("wss://{rest}")
596        } else if let Some(rest) = url.strip_prefix("http://") {
597            format!("ws://{rest}")
598        } else {
599            url.clone()
600        };
601        let _ = method; // axum's WS route is `any(...)`, method is ignored on the wire.
602
603        // Build the upgrade request manually so we can apply the
604        // same auth + custom headers `request()` does for HTTP.
605        let mut req = tungstenite::handshake::client::Request::builder()
606            .method("GET")
607            .uri(&ws_url)
608            .header(
609                "Host",
610                reqwest::Url::parse(&url)
611                    .ok()
612                    .and_then(|u| u.host_str().map(str::to_owned))
613                    .unwrap_or_default(),
614            )
615            .header("Upgrade", "websocket")
616            .header("Connection", "Upgrade")
617            .header(
618                "Sec-WebSocket-Key",
619                tungstenite::handshake::client::generate_key(),
620            )
621            .header("Sec-WebSocket-Version", "13")
622            .header("X-Transport", "ws");
623        if let Some(authorization) = &self.authorization {
624            let key = authorization
625                .strip_prefix("Bearer ")
626                .unwrap_or(authorization.as_str());
627            req = req.header("authorization", format!("Bearer {}", key));
628        }
629        if let Some(ua) = &self.user_agent {
630            req = req.header("user-agent", ua);
631        }
632        if let Some(x_title) = &self.x_title {
633            req = req.header("x-title", x_title);
634        }
635        if let Some(http_referer) = &self.http_referer {
636            req = req.header("referer", http_referer);
637            req = req.header("http-referer", http_referer);
638        }
639        if let Some(token) = &self.x_github_authorization {
640            req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
641        }
642        if let Some(token) = &self.x_openrouter_authorization {
643            req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
644        }
645        if let Some(headers) = &self.x_mcp_authorization {
646            if let Ok(json) = serde_json::to_string(headers.as_ref()) {
647                req = req.header("X-MCP-AUTHORIZATION", json);
648            }
649        }
650        if let Some(sig) = &self.x_viewer_signature {
651            req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
652        }
653        if let Some(addr) = &self.x_viewer_address {
654            req = req.header("X-VIEWER-ADDRESS", addr.as_str());
655        }
656        if let Some(name) = &self.x_commit_author_name {
657            req = req.header("X-COMMIT-AUTHOR-NAME", name.as_str());
658        }
659        if let Some(email) = &self.x_commit_author_email {
660            req = req.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
661        }
662        if let Some(id) = &self.agent_instance_hierarchy {
663            req = req.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
664        }
665        if let Some(s) = &self.mcp_session_id {
666            req = req.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
667        }
668        let req = req.body(()).map_err(|e| {
669            super::HttpError::WsConnect(tungstenite::Error::Http(
670                tungstenite::http::Response::builder()
671                    .status(400)
672                    .body(Some(e.to_string().into_bytes()))
673                    .unwrap(),
674            ))
675        })?;
676
677        let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
678        let (mut sink, rx_stream): (
679            _,
680            SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
681        ) = ws_stream.split();
682
683        // Send the body as the first text frame.
684        let body_frame = serde_json::to_string(&body)
685            .map_err(super::HttpError::NotifySerialize)?;
686        sink.send(tungstenite::Message::Text(body_frame.into()))
687            .await
688            .map_err(super::HttpError::NotifySend)?;
689
690        // Build the per-connection state shared with Notifier + demux.
691        let sink: super::notifier::SharedSink =
692            Arc::new(tokio::sync::Mutex::new(sink));
693        let pending: super::notifier::PendingNotifies =
694            Arc::new(dashmap::DashMap::new());
695
696        // mpsc the demux task pushes chunks (or terminal errors) into.
697        // Use futures::channel::mpsc so the rx side is `impl Stream`
698        // without pulling in tokio-stream.
699        let (chunk_tx, chunk_rx) = futures::channel::mpsc::unbounded::<
700            Result<Chunk, super::HttpError>,
701        >();
702
703        let demux_sink = sink.clone();
704        let demux_pending = pending.clone();
705        let handler = Arc::new(handler);
706        tokio::spawn(async move {
707            let mut rx_stream = rx_stream;
708            let mut chunk_tx = chunk_tx;
709            loop {
710                let msg = match rx_stream.next().await {
711                    Some(m) => m,
712                    None => break,
713                };
714                let text = match msg {
715                    Ok(tungstenite::Message::Text(t)) => {
716                        let s = t.to_string();
717                        s
718                    }
719                    Ok(tungstenite::Message::Binary(_)) => {
720                        continue;
721                    }
722                    Ok(
723                        tungstenite::Message::Ping(_)
724                        | tungstenite::Message::Pong(_),
725                    ) => continue,
726                    Ok(tungstenite::Message::Close(_)) => {
727                        break;
728                    }
729                    Ok(tungstenite::Message::Frame(_)) => continue,
730                    Err(_) => {
731                        break;
732                    }
733                };
734
735                // Classification: try client_response, then
736                // server_request, then chunk. Order matters — chunks
737                // tend to have many fields; the envelopes have a
738                // distinctive `id` + tagged `type`.
739                if let Ok(response) =
740                    serde_json::from_str::<ClientResponse>(&text)
741                {
742                    let id = response.id().to_string();
743                    if let Some((_, tx)) = demux_pending.remove(&id) {
744                        let _ = tx.send(response);
745                    }
746                    continue;
747                }
748                if let Ok(request) =
749                    serde_json::from_str::<ServerRequest>(&text)
750                {
751                    let id = request.id.clone();
752                    let handler = handler.clone();
753                    let demux_sink = demux_sink.clone();
754                    tokio::spawn(async move {
755                        let id = id;
756                        // Handler returns the full response (incl.
757                        // matching id); we just frame + write it.
758                        let response = handler.handle(request).await;
759                        let frame = match serde_json::to_string(&response) {
760                            Ok(s) => s,
761                            Err(_) => {
762                                return;
763                            }
764                        };
765                        let mut guard = demux_sink.lock().await;
766                        let send_result = guard
767                            .send(tungstenite::Message::Text(frame.into()))
768                            .await;
769                    });
770                    continue;
771                }
772
773                // Chunk path.
774                let mut de = serde_json::Deserializer::from_str(&text);
775                match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
776                    Ok(chunk) => {
777                        if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
778                            break;
779                        }
780                    }
781                    Err(e) => {
782                        // Try to parse as a ResponseError before
783                        // surfacing the deserialization failure.
784                        let err = match serde_json::from_str::<
785                            error::ResponseError,
786                        >(&text)
787                        {
788                            Ok(api_err) => super::HttpError::ApiError(api_err),
789                            Err(_) => super::HttpError::DeserializationError(e),
790                        };
791                        let _ = chunk_tx.unbounded_send(Err(err));
792                        break;
793                    }
794                }
795            }
796            // Make sure any awaiting Notifier futures unblock when
797            // we exit — dropping the dashmap fires every oneshot
798            // Sender's drop, which causes the rx side to error.
799            drop(demux_pending);
800            drop(chunk_tx);
801        });
802
803        let notifier = super::Notifier::new(sink, pending);
804        Ok((chunk_rx, notifier))
805    }
806}