1use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{SinkExt, Stream, StreamExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
10#[derive(Debug, Clone)]
33pub struct HttpClient {
34 pub http_client: reqwest::Client,
36 pub address: String,
38 pub authorization: Option<Arc<String>>,
40 pub user_agent: Option<String>,
42 pub x_title: Option<String>,
44 pub http_referer: Option<String>,
46 pub x_github_authorization: Option<Arc<String>>,
48 pub x_openrouter_authorization: Option<Arc<String>>,
50 pub x_mcp_authorization:
52 Option<Arc<std::collections::HashMap<String, String>>>,
53 pub x_viewer_signature: Option<Arc<String>>,
55 pub x_viewer_address: Option<Arc<String>>,
57 pub agent_instance_hierarchy: Option<Arc<String>>,
59 pub mcp_session_id: Option<Arc<String>>,
63}
64
65impl HttpClient {
66 pub fn new(
84 http_client: reqwest::Client,
85 address: Option<impl Into<String>>,
86 authorization: Option<impl Into<String>>,
87 user_agent: Option<impl Into<String>>,
88 x_title: Option<impl Into<String>>,
89 http_referer: Option<impl Into<String>>,
90 x_github_authorization: Option<impl Into<String>>,
91 x_openrouter_authorization: Option<impl Into<String>>,
92 x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
93 x_viewer_signature: Option<impl Into<String>>,
94 x_viewer_address: Option<impl Into<String>>,
95 agent_instance_hierarchy: Option<impl Into<String>>,
96 mcp_session_id: Option<impl Into<String>>,
97 ) -> Self {
98 #[cfg(feature = "env")]
99 let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
100
101 Self {
102 http_client,
103 address: match address {
104 Some(base) => base.into(),
105 #[cfg(feature = "env")]
106 None => env("OBJECTIVEAI_ADDRESS").unwrap_or_else(|| {
107 "https://api.objectiveai.dev".to_string()
108 }),
109 #[cfg(not(feature = "env"))]
110 None => "https://api.objectiveai.dev".to_string(),
111 },
112 authorization: authorization.map(|k| Arc::new(k.into())).or_else(
113 || {
114 #[cfg(feature = "env")]
115 {
116 env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new)
117 }
118 #[cfg(not(feature = "env"))]
119 {
120 None
121 }
122 },
123 ),
124 user_agent: user_agent.map(Into::into).or_else(|| {
125 #[cfg(feature = "env")]
126 {
127 env("USER_AGENT")
128 }
129 #[cfg(not(feature = "env"))]
130 {
131 None
132 }
133 }),
134 x_title: x_title.map(Into::into).or_else(|| {
135 #[cfg(feature = "env")]
136 {
137 env("X_TITLE")
138 }
139 #[cfg(not(feature = "env"))]
140 {
141 None
142 }
143 }),
144 http_referer: http_referer.map(Into::into).or_else(|| {
145 #[cfg(feature = "env")]
146 {
147 env("HTTP_REFERER")
148 }
149 #[cfg(not(feature = "env"))]
150 {
151 None
152 }
153 }),
154 x_github_authorization: x_github_authorization
155 .map(|v| Arc::new(v.into()))
156 .or_else(|| {
157 #[cfg(feature = "env")]
158 {
159 env("GITHUB_AUTHORIZATION").map(Arc::new)
160 }
161 #[cfg(not(feature = "env"))]
162 {
163 None
164 }
165 }),
166 x_openrouter_authorization: x_openrouter_authorization
167 .map(|v| Arc::new(v.into()))
168 .or_else(|| {
169 #[cfg(feature = "env")]
170 {
171 env("OPENROUTER_AUTHORIZATION").map(Arc::new)
172 }
173 #[cfg(not(feature = "env"))]
174 {
175 None
176 }
177 }),
178 x_mcp_authorization: x_mcp_authorization.map(Arc::new).or_else(
179 || {
180 #[cfg(feature = "env")]
181 {
182 env("MCP_AUTHORIZATION")
183 .and_then(|v| serde_json::from_str(&v).ok())
184 .map(Arc::new)
185 }
186 #[cfg(not(feature = "env"))]
187 {
188 None
189 }
190 },
191 ),
192 x_viewer_signature: x_viewer_signature
193 .map(|v| Arc::new(v.into()))
194 .or_else(|| {
195 #[cfg(feature = "env")]
196 {
197 env("VIEWER_SIGNATURE").map(Arc::new)
198 }
199 #[cfg(not(feature = "env"))]
200 {
201 None
202 }
203 }),
204 x_viewer_address: x_viewer_address
205 .map(|v| Arc::new(v.into()))
206 .or_else(|| {
207 #[cfg(feature = "env")]
208 {
209 env("VIEWER_ADDRESS").map(Arc::new)
210 }
211 #[cfg(not(feature = "env"))]
212 {
213 None
214 }
215 }),
216 agent_instance_hierarchy: agent_instance_hierarchy.map(|v| Arc::new(v.into())).or_else(|| {
217 #[cfg(feature = "env")]
218 {
219 env("OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY").map(Arc::new)
220 }
221 #[cfg(not(feature = "env"))]
222 {
223 None
224 }
225 }),
226 mcp_session_id: mcp_session_id.map(|v| Arc::new(v.into())).or_else(
227 || {
228 #[cfg(feature = "env")]
229 {
230 env(crate::mcp::MCP_SESSION_ID_ENV).map(Arc::new)
231 }
232 #[cfg(not(feature = "env"))]
233 {
234 None
235 }
236 },
237 ),
238 }
239 }
240
241 fn request(
243 &self,
244 method: reqwest::Method,
245 path: &str,
246 body: Option<impl serde::Serialize>,
247 ) -> reqwest::RequestBuilder {
248 let url = format!(
249 "{}/{}",
250 self.address.trim_end_matches('/'),
251 path.trim_start_matches('/')
252 );
253 let mut request = self.http_client.request(method, &url);
254 if let Some(authorization) = &self.authorization {
255 let key = authorization
256 .strip_prefix("Bearer ")
257 .unwrap_or(authorization);
258 request =
259 request.header("authorization", format!("Bearer {}", key));
260 }
261 if let Some(user_agent) = &self.user_agent {
262 request = request.header("user-agent", user_agent);
263 }
264 if let Some(x_title) = &self.x_title {
265 request = request.header("x-title", x_title);
266 }
267 if let Some(http_referer) = &self.http_referer {
268 request = request.header("referer", http_referer);
269 request = request.header("http-referer", http_referer);
270 }
271 if let Some(token) = &self.x_github_authorization {
272 request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
273 }
274 if let Some(token) = &self.x_openrouter_authorization {
275 request =
276 request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
277 }
278 if let Some(headers) = &self.x_mcp_authorization {
279 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
280 request = request.header("X-MCP-AUTHORIZATION", json);
281 }
282 }
283 if let Some(sig) = &self.x_viewer_signature {
284 request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
285 }
286 if let Some(addr) = &self.x_viewer_address {
287 request = request.header("X-VIEWER-ADDRESS", addr.as_str());
288 }
289 if let Some(id) = &self.agent_instance_hierarchy {
290 request = request.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
291 }
292 if let Some(s) = &self.mcp_session_id {
293 request =
294 request.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
295 }
296 if let Some(body) = body {
297 request = request.json(&body);
298 }
299 request
300 }
301
302 pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
313 &self,
314 method: reqwest::Method,
315 path: impl AsRef<str>,
316 body: Option<impl serde::Serialize>,
317 ) -> Result<T, super::HttpError> {
318 let response = self
319 .http_client
320 .execute(
321 self.request(method, path.as_ref(), body)
322 .build()
323 .map_err(super::HttpError::RequestError)?,
324 )
325 .await
326 .map_err(super::HttpError::HttpError)?;
327 let code = response.status();
328 if code.is_success() {
329 let text =
330 response.text().await.map_err(super::HttpError::HttpError)?;
331 let mut de = serde_json::Deserializer::from_str(&text);
332 match serde_path_to_error::deserialize::<_, T>(&mut de) {
333 Ok(value) => Ok(value),
334 Err(e) => Err(super::HttpError::DeserializationError(e)),
335 }
336 } else {
337 match response.text().await {
338 Ok(text) => Err(super::HttpError::BadStatus {
339 code,
340 body: match serde_json::from_str::<serde_json::Value>(&text)
341 {
342 Ok(body) => body,
343 Err(_) => serde_json::Value::String(text),
344 },
345 }),
346 Err(_) => Err(super::HttpError::BadStatus {
347 code,
348 body: serde_json::Value::Null,
349 }),
350 }
351 }
352 }
353
354 pub async fn send_unary_no_response(
362 &self,
363 method: reqwest::Method,
364 path: impl AsRef<str>,
365 body: Option<impl serde::Serialize>,
366 ) -> Result<(), super::HttpError> {
367 let response = self
368 .http_client
369 .execute(
370 self.request(method, path.as_ref(), body)
371 .build()
372 .map_err(super::HttpError::RequestError)?,
373 )
374 .await
375 .map_err(super::HttpError::HttpError)?;
376 let code = response.status();
377 if code.is_success() {
378 Ok(())
379 } else {
380 match response.text().await {
381 Ok(text) => Err(super::HttpError::BadStatus {
382 code,
383 body: match serde_json::from_str::<serde_json::Value>(&text)
384 {
385 Ok(body) => body,
386 Err(_) => serde_json::Value::String(text),
387 },
388 }),
389 Err(_) => Err(super::HttpError::BadStatus {
390 code,
391 body: serde_json::Value::Null,
392 }),
393 }
394 }
395 }
396
397 pub async fn send_streaming<
413 T: serde::de::DeserializeOwned + Send + 'static,
414 P: AsRef<str> + Send,
415 B: serde::Serialize + Send,
416 >(
417 &self,
418 method: reqwest::Method,
419 path: P,
420 body: Option<B>,
421 ) -> Result<
422 impl Stream<Item = Result<T, super::HttpError>>
423 + Send
424 + 'static
425 + use<T, P, B>,
426 super::HttpError,
427 > {
428 Ok(
434 self.request(method, path.as_ref(), body)
435 .header("X-Transport", "sse")
436 .eventsource()?
437 .take_while(|result| {
438 let dominated = matches!(
439 result,
440 Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
441 );
442 async move { !dominated }
443 })
444 .then(|result| async {
445 match result {
446 Ok(Event::Open) => None,
447 Ok(Event::Message(MessageEvent { data, .. }))
448 if data.starts_with(":")
449 || data.is_empty() =>
450 {
451 None
452 }
453 Ok(Event::Message(MessageEvent { data, .. })) => {
454 let mut de =
455 serde_json::Deserializer::from_str(&data);
456 Some(
457 match serde_path_to_error::deserialize::<_, T>(
458 &mut de,
459 ) {
460 Ok(value) => Ok(value),
461 Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
462 Ok(err) => Err(super::HttpError::ApiError(err)),
463 Err(_) => Err(super::HttpError::DeserializationError(e)),
464 },
465 }
466 )
467 }
468 Err(reqwest_eventsource::Error::InvalidStatusCode(
469 code,
470 response,
471 )) => match response.text().await {
472 Ok(body) => {
473 Some(Err(super::HttpError::BadStatus {
474 code,
475 body: match serde_json::from_str::<
476 serde_json::Value,
477 >(
478 &body
479 ) {
480 Ok(body) => body,
481 Err(_) => {
482 serde_json::Value::String(body)
483 }
484 },
485 }))
486 }
487 Err(_) => Some(Err(super::HttpError::BadStatus {
488 code,
489 body: serde_json::Value::Null,
490 })),
491 },
492 Err(e) => Some(Err(super::HttpError::StreamError(e))),
493 }
494 })
495 .filter_map(|x| async { x }),
496 )
497 }
498
499 #[cfg(feature = "mcp")]
516 pub async fn send_streaming_ws<Chunk, B, H, P>(
517 &self,
518 method: reqwest::Method,
519 path: P,
520 body: B,
521 handler: H,
522 ) -> Result<
523 (
524 impl Stream<Item = Result<Chunk, super::HttpError>>
525 + Send
526 + Unpin
527 + 'static
528 + use<Chunk, B, H, P>,
529 super::Notifier,
530 ),
531 super::HttpError,
532 >
533 where
534 Chunk: serde::de::DeserializeOwned + Send + 'static,
535 B: serde::Serialize + Send + 'static,
536 H: super::McpHandler,
537 P: AsRef<str>,
538 {
539 use crate::client_objectiveai_mcp::{
540 client_response::Response as ClientResponse,
541 server_request::Request as ServerRequest,
542 };
543 use futures::stream::SplitStream;
544 use tokio::net::TcpStream;
545 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
546
547 let url = format!(
550 "{}/{}",
551 self.address.trim_end_matches('/'),
552 path.as_ref().trim_start_matches('/')
553 );
554 let ws_url = if let Some(rest) = url.strip_prefix("https://") {
555 format!("wss://{rest}")
556 } else if let Some(rest) = url.strip_prefix("http://") {
557 format!("ws://{rest}")
558 } else {
559 url.clone()
560 };
561 let _ = method; let mut req = tungstenite::handshake::client::Request::builder()
566 .method("GET")
567 .uri(&ws_url)
568 .header(
569 "Host",
570 reqwest::Url::parse(&url)
571 .ok()
572 .and_then(|u| u.host_str().map(str::to_owned))
573 .unwrap_or_default(),
574 )
575 .header("Upgrade", "websocket")
576 .header("Connection", "Upgrade")
577 .header(
578 "Sec-WebSocket-Key",
579 tungstenite::handshake::client::generate_key(),
580 )
581 .header("Sec-WebSocket-Version", "13")
582 .header("X-Transport", "ws");
583 if let Some(authorization) = &self.authorization {
584 let key = authorization
585 .strip_prefix("Bearer ")
586 .unwrap_or(authorization.as_str());
587 req = req.header("authorization", format!("Bearer {}", key));
588 }
589 if let Some(ua) = &self.user_agent {
590 req = req.header("user-agent", ua);
591 }
592 if let Some(x_title) = &self.x_title {
593 req = req.header("x-title", x_title);
594 }
595 if let Some(http_referer) = &self.http_referer {
596 req = req.header("referer", http_referer);
597 req = req.header("http-referer", http_referer);
598 }
599 if let Some(token) = &self.x_github_authorization {
600 req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
601 }
602 if let Some(token) = &self.x_openrouter_authorization {
603 req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
604 }
605 if let Some(headers) = &self.x_mcp_authorization {
606 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
607 req = req.header("X-MCP-AUTHORIZATION", json);
608 }
609 }
610 if let Some(sig) = &self.x_viewer_signature {
611 req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
612 }
613 if let Some(addr) = &self.x_viewer_address {
614 req = req.header("X-VIEWER-ADDRESS", addr.as_str());
615 }
616 if let Some(id) = &self.agent_instance_hierarchy {
617 req = req.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
618 }
619 if let Some(s) = &self.mcp_session_id {
620 req = req.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
621 }
622 let req = req.body(()).map_err(|e| {
623 super::HttpError::WsConnect(tungstenite::Error::Http(
624 tungstenite::http::Response::builder()
625 .status(400)
626 .body(Some(e.to_string().into_bytes()))
627 .unwrap(),
628 ))
629 })?;
630
631 let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
632 let (mut sink, rx_stream): (
633 _,
634 SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
635 ) = ws_stream.split();
636
637 let body_frame = serde_json::to_string(&body)
639 .map_err(super::HttpError::NotifySerialize)?;
640 sink.send(tungstenite::Message::Text(body_frame.into()))
641 .await
642 .map_err(super::HttpError::NotifySend)?;
643
644 let sink: super::notifier::SharedSink =
646 Arc::new(tokio::sync::Mutex::new(sink));
647 let pending: super::notifier::PendingNotifies =
648 Arc::new(dashmap::DashMap::new());
649
650 let (chunk_tx, chunk_rx) = futures::channel::mpsc::unbounded::<
654 Result<Chunk, super::HttpError>,
655 >();
656
657 let demux_sink = sink.clone();
658 let demux_pending = pending.clone();
659 let handler = Arc::new(handler);
660 tokio::spawn(async move {
661 let mut rx_stream = rx_stream;
662 let mut chunk_tx = chunk_tx;
663 loop {
664 let msg = match rx_stream.next().await {
665 Some(m) => m,
666 None => break,
667 };
668 let text = match msg {
669 Ok(tungstenite::Message::Text(t)) => {
670 let s = t.to_string();
671 s
672 }
673 Ok(tungstenite::Message::Binary(_)) => {
674 continue;
675 }
676 Ok(
677 tungstenite::Message::Ping(_)
678 | tungstenite::Message::Pong(_),
679 ) => continue,
680 Ok(tungstenite::Message::Close(_)) => {
681 break;
682 }
683 Ok(tungstenite::Message::Frame(_)) => continue,
684 Err(_) => {
685 break;
686 }
687 };
688
689 if let Ok(response) =
694 serde_json::from_str::<ClientResponse>(&text)
695 {
696 let id = response.id().to_string();
697 if let Some((_, tx)) = demux_pending.remove(&id) {
698 let _ = tx.send(response);
699 }
700 continue;
701 }
702 if let Ok(request) =
703 serde_json::from_str::<ServerRequest>(&text)
704 {
705 let id = request.id.clone();
706 let handler = handler.clone();
707 let demux_sink = demux_sink.clone();
708 tokio::spawn(async move {
709 let id = id;
710 let response = handler.handle(request).await;
713 let frame = match serde_json::to_string(&response) {
714 Ok(s) => s,
715 Err(_) => {
716 return;
717 }
718 };
719 let mut guard = demux_sink.lock().await;
720 let send_result = guard
721 .send(tungstenite::Message::Text(frame.into()))
722 .await;
723 });
724 continue;
725 }
726
727 let mut de = serde_json::Deserializer::from_str(&text);
729 match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
730 Ok(chunk) => {
731 if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
732 break;
733 }
734 }
735 Err(e) => {
736 let err = match serde_json::from_str::<
739 error::ResponseError,
740 >(&text)
741 {
742 Ok(api_err) => super::HttpError::ApiError(api_err),
743 Err(_) => super::HttpError::DeserializationError(e),
744 };
745 let _ = chunk_tx.unbounded_send(Err(err));
746 break;
747 }
748 }
749 }
750 drop(demux_pending);
754 drop(chunk_tx);
755 });
756
757 let notifier = super::Notifier::new(sink, pending);
758 Ok((chunk_rx, notifier))
759 }
760}