1use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{SinkExt, Stream, StreamExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
10#[derive(Debug, Clone)]
35pub struct HttpClient {
36 pub http_client: reqwest::Client,
38 pub address: String,
40 pub authorization: Option<Arc<String>>,
42 pub user_agent: Option<String>,
44 pub x_title: Option<String>,
46 pub http_referer: Option<String>,
48 pub x_github_authorization: Option<Arc<String>>,
50 pub x_openrouter_authorization: Option<Arc<String>>,
52 pub x_mcp_authorization:
54 Option<Arc<std::collections::HashMap<String, String>>>,
55 pub x_viewer_signature: Option<Arc<String>>,
57 pub x_viewer_address: Option<Arc<String>>,
59 pub x_commit_author_name: Option<Arc<String>>,
61 pub x_commit_author_email: Option<Arc<String>>,
63 pub agent_instance_hierarchy: Option<Arc<String>>,
65 pub mcp_session_id: Option<Arc<String>>,
69}
70
71impl HttpClient {
72 pub fn new(
92 http_client: reqwest::Client,
93 address: Option<impl Into<String>>,
94 authorization: Option<impl Into<String>>,
95 user_agent: Option<impl Into<String>>,
96 x_title: Option<impl Into<String>>,
97 http_referer: Option<impl Into<String>>,
98 x_github_authorization: Option<impl Into<String>>,
99 x_openrouter_authorization: Option<impl Into<String>>,
100 x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
101 x_viewer_signature: Option<impl Into<String>>,
102 x_viewer_address: Option<impl Into<String>>,
103 x_commit_author_name: Option<impl Into<String>>,
104 x_commit_author_email: Option<impl Into<String>>,
105 agent_instance_hierarchy: Option<impl Into<String>>,
106 mcp_session_id: Option<impl Into<String>>,
107 ) -> Self {
108 #[cfg(feature = "env")]
109 let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
110
111 Self {
112 http_client,
113 address: match address {
114 Some(base) => base.into(),
115 #[cfg(feature = "env")]
116 None => env("OBJECTIVEAI_ADDRESS").unwrap_or_else(|| {
117 "https://api.objectiveai.dev".to_string()
118 }),
119 #[cfg(not(feature = "env"))]
120 None => "https://api.objectiveai.dev".to_string(),
121 },
122 authorization: authorization.map(|k| Arc::new(k.into())).or_else(
123 || {
124 #[cfg(feature = "env")]
125 {
126 env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new)
127 }
128 #[cfg(not(feature = "env"))]
129 {
130 None
131 }
132 },
133 ),
134 user_agent: user_agent.map(Into::into).or_else(|| {
135 #[cfg(feature = "env")]
136 {
137 env("USER_AGENT")
138 }
139 #[cfg(not(feature = "env"))]
140 {
141 None
142 }
143 }),
144 x_title: x_title.map(Into::into).or_else(|| {
145 #[cfg(feature = "env")]
146 {
147 env("X_TITLE")
148 }
149 #[cfg(not(feature = "env"))]
150 {
151 None
152 }
153 }),
154 http_referer: http_referer.map(Into::into).or_else(|| {
155 #[cfg(feature = "env")]
156 {
157 env("HTTP_REFERER")
158 }
159 #[cfg(not(feature = "env"))]
160 {
161 None
162 }
163 }),
164 x_github_authorization: x_github_authorization
165 .map(|v| Arc::new(v.into()))
166 .or_else(|| {
167 #[cfg(feature = "env")]
168 {
169 env("GITHUB_AUTHORIZATION").map(Arc::new)
170 }
171 #[cfg(not(feature = "env"))]
172 {
173 None
174 }
175 }),
176 x_openrouter_authorization: x_openrouter_authorization
177 .map(|v| Arc::new(v.into()))
178 .or_else(|| {
179 #[cfg(feature = "env")]
180 {
181 env("OPENROUTER_AUTHORIZATION").map(Arc::new)
182 }
183 #[cfg(not(feature = "env"))]
184 {
185 None
186 }
187 }),
188 x_mcp_authorization: x_mcp_authorization.map(Arc::new).or_else(
189 || {
190 #[cfg(feature = "env")]
191 {
192 env("MCP_AUTHORIZATION")
193 .and_then(|v| serde_json::from_str(&v).ok())
194 .map(Arc::new)
195 }
196 #[cfg(not(feature = "env"))]
197 {
198 None
199 }
200 },
201 ),
202 x_viewer_signature: x_viewer_signature
203 .map(|v| Arc::new(v.into()))
204 .or_else(|| {
205 #[cfg(feature = "env")]
206 {
207 env("VIEWER_SIGNATURE").map(Arc::new)
208 }
209 #[cfg(not(feature = "env"))]
210 {
211 None
212 }
213 }),
214 x_viewer_address: x_viewer_address
215 .map(|v| Arc::new(v.into()))
216 .or_else(|| {
217 #[cfg(feature = "env")]
218 {
219 env("VIEWER_ADDRESS").map(Arc::new)
220 }
221 #[cfg(not(feature = "env"))]
222 {
223 None
224 }
225 }),
226 x_commit_author_name: x_commit_author_name
227 .map(|v| Arc::new(v.into()))
228 .or_else(|| {
229 #[cfg(feature = "env")]
230 {
231 env("COMMIT_AUTHOR_NAME").map(Arc::new)
232 }
233 #[cfg(not(feature = "env"))]
234 {
235 None
236 }
237 }),
238 x_commit_author_email: x_commit_author_email
239 .map(|v| Arc::new(v.into()))
240 .or_else(|| {
241 #[cfg(feature = "env")]
242 {
243 env("COMMIT_AUTHOR_EMAIL").map(Arc::new)
244 }
245 #[cfg(not(feature = "env"))]
246 {
247 None
248 }
249 }),
250 agent_instance_hierarchy: agent_instance_hierarchy.map(|v| Arc::new(v.into())).or_else(|| {
251 #[cfg(feature = "env")]
252 {
253 env("OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY").map(Arc::new)
254 }
255 #[cfg(not(feature = "env"))]
256 {
257 None
258 }
259 }),
260 mcp_session_id: mcp_session_id.map(|v| Arc::new(v.into())).or_else(
261 || {
262 #[cfg(feature = "env")]
263 {
264 env(crate::mcp::MCP_SESSION_ID_ENV).map(Arc::new)
265 }
266 #[cfg(not(feature = "env"))]
267 {
268 None
269 }
270 },
271 ),
272 }
273 }
274
275 fn request(
277 &self,
278 method: reqwest::Method,
279 path: &str,
280 body: Option<impl serde::Serialize>,
281 ) -> reqwest::RequestBuilder {
282 let url = format!(
283 "{}/{}",
284 self.address.trim_end_matches('/'),
285 path.trim_start_matches('/')
286 );
287 let mut request = self.http_client.request(method, &url);
288 if let Some(authorization) = &self.authorization {
289 let key = authorization
290 .strip_prefix("Bearer ")
291 .unwrap_or(authorization);
292 request =
293 request.header("authorization", format!("Bearer {}", key));
294 }
295 if let Some(user_agent) = &self.user_agent {
296 request = request.header("user-agent", user_agent);
297 }
298 if let Some(x_title) = &self.x_title {
299 request = request.header("x-title", x_title);
300 }
301 if let Some(http_referer) = &self.http_referer {
302 request = request.header("referer", http_referer);
303 request = request.header("http-referer", http_referer);
304 }
305 if let Some(token) = &self.x_github_authorization {
306 request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
307 }
308 if let Some(token) = &self.x_openrouter_authorization {
309 request =
310 request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
311 }
312 if let Some(headers) = &self.x_mcp_authorization {
313 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
314 request = request.header("X-MCP-AUTHORIZATION", json);
315 }
316 }
317 if let Some(sig) = &self.x_viewer_signature {
318 request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
319 }
320 if let Some(addr) = &self.x_viewer_address {
321 request = request.header("X-VIEWER-ADDRESS", addr.as_str());
322 }
323 if let Some(name) = &self.x_commit_author_name {
324 request = request.header("X-COMMIT-AUTHOR-NAME", name.as_str());
325 }
326 if let Some(email) = &self.x_commit_author_email {
327 request = request.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
328 }
329 if let Some(id) = &self.agent_instance_hierarchy {
330 request = request.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
331 }
332 if let Some(s) = &self.mcp_session_id {
333 request =
334 request.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
335 }
336 if let Some(body) = body {
337 request = request.json(&body);
338 }
339 request
340 }
341
342 pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
353 &self,
354 method: reqwest::Method,
355 path: impl AsRef<str>,
356 body: Option<impl serde::Serialize>,
357 ) -> Result<T, super::HttpError> {
358 let response = self
359 .http_client
360 .execute(
361 self.request(method, path.as_ref(), body)
362 .build()
363 .map_err(super::HttpError::RequestError)?,
364 )
365 .await
366 .map_err(super::HttpError::HttpError)?;
367 let code = response.status();
368 if code.is_success() {
369 let text =
370 response.text().await.map_err(super::HttpError::HttpError)?;
371 let mut de = serde_json::Deserializer::from_str(&text);
372 match serde_path_to_error::deserialize::<_, T>(&mut de) {
373 Ok(value) => Ok(value),
374 Err(e) => Err(super::HttpError::DeserializationError(e)),
375 }
376 } else {
377 match response.text().await {
378 Ok(text) => Err(super::HttpError::BadStatus {
379 code,
380 body: match serde_json::from_str::<serde_json::Value>(&text)
381 {
382 Ok(body) => body,
383 Err(_) => serde_json::Value::String(text),
384 },
385 }),
386 Err(_) => Err(super::HttpError::BadStatus {
387 code,
388 body: serde_json::Value::Null,
389 }),
390 }
391 }
392 }
393
394 pub async fn send_unary_no_response(
402 &self,
403 method: reqwest::Method,
404 path: impl AsRef<str>,
405 body: Option<impl serde::Serialize>,
406 ) -> Result<(), super::HttpError> {
407 let response = self
408 .http_client
409 .execute(
410 self.request(method, path.as_ref(), body)
411 .build()
412 .map_err(super::HttpError::RequestError)?,
413 )
414 .await
415 .map_err(super::HttpError::HttpError)?;
416 let code = response.status();
417 if code.is_success() {
418 Ok(())
419 } else {
420 match response.text().await {
421 Ok(text) => Err(super::HttpError::BadStatus {
422 code,
423 body: match serde_json::from_str::<serde_json::Value>(&text)
424 {
425 Ok(body) => body,
426 Err(_) => serde_json::Value::String(text),
427 },
428 }),
429 Err(_) => Err(super::HttpError::BadStatus {
430 code,
431 body: serde_json::Value::Null,
432 }),
433 }
434 }
435 }
436
437 pub async fn send_streaming<
453 T: serde::de::DeserializeOwned + Send + 'static,
454 P: AsRef<str> + Send,
455 B: serde::Serialize + Send,
456 >(
457 &self,
458 method: reqwest::Method,
459 path: P,
460 body: Option<B>,
461 ) -> Result<
462 impl Stream<Item = Result<T, super::HttpError>>
463 + Send
464 + 'static
465 + use<T, P, B>,
466 super::HttpError,
467 > {
468 Ok(
474 self.request(method, path.as_ref(), body)
475 .header("X-Transport", "sse")
476 .eventsource()?
477 .take_while(|result| {
478 let dominated = matches!(
479 result,
480 Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
481 );
482 async move { !dominated }
483 })
484 .then(|result| async {
485 match result {
486 Ok(Event::Open) => None,
487 Ok(Event::Message(MessageEvent { data, .. }))
488 if data.starts_with(":")
489 || data.is_empty() =>
490 {
491 None
492 }
493 Ok(Event::Message(MessageEvent { data, .. })) => {
494 let mut de =
495 serde_json::Deserializer::from_str(&data);
496 Some(
497 match serde_path_to_error::deserialize::<_, T>(
498 &mut de,
499 ) {
500 Ok(value) => Ok(value),
501 Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
502 Ok(err) => Err(super::HttpError::ApiError(err)),
503 Err(_) => Err(super::HttpError::DeserializationError(e)),
504 },
505 }
506 )
507 }
508 Err(reqwest_eventsource::Error::InvalidStatusCode(
509 code,
510 response,
511 )) => match response.text().await {
512 Ok(body) => {
513 Some(Err(super::HttpError::BadStatus {
514 code,
515 body: match serde_json::from_str::<
516 serde_json::Value,
517 >(
518 &body
519 ) {
520 Ok(body) => body,
521 Err(_) => {
522 serde_json::Value::String(body)
523 }
524 },
525 }))
526 }
527 Err(_) => Some(Err(super::HttpError::BadStatus {
528 code,
529 body: serde_json::Value::Null,
530 })),
531 },
532 Err(e) => Some(Err(super::HttpError::StreamError(e))),
533 }
534 })
535 .filter_map(|x| async { x }),
536 )
537 }
538
539 #[cfg(feature = "mcp")]
556 pub async fn send_streaming_ws<Chunk, B, H, P>(
557 &self,
558 method: reqwest::Method,
559 path: P,
560 body: B,
561 handler: H,
562 ) -> Result<
563 (
564 impl Stream<Item = Result<Chunk, super::HttpError>>
565 + Send
566 + Unpin
567 + 'static
568 + use<Chunk, B, H, P>,
569 super::Notifier,
570 ),
571 super::HttpError,
572 >
573 where
574 Chunk: serde::de::DeserializeOwned + Send + 'static,
575 B: serde::Serialize + Send + 'static,
576 H: super::McpHandler,
577 P: AsRef<str>,
578 {
579 use crate::client_objectiveai_mcp::{
580 client_response::Response as ClientResponse,
581 server_request::Request as ServerRequest,
582 };
583 use futures::stream::SplitStream;
584 use tokio::net::TcpStream;
585 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
586
587 let url = format!(
590 "{}/{}",
591 self.address.trim_end_matches('/'),
592 path.as_ref().trim_start_matches('/')
593 );
594 let ws_url = if let Some(rest) = url.strip_prefix("https://") {
595 format!("wss://{rest}")
596 } else if let Some(rest) = url.strip_prefix("http://") {
597 format!("ws://{rest}")
598 } else {
599 url.clone()
600 };
601 let _ = method; let mut req = tungstenite::handshake::client::Request::builder()
606 .method("GET")
607 .uri(&ws_url)
608 .header(
609 "Host",
610 reqwest::Url::parse(&url)
611 .ok()
612 .and_then(|u| u.host_str().map(str::to_owned))
613 .unwrap_or_default(),
614 )
615 .header("Upgrade", "websocket")
616 .header("Connection", "Upgrade")
617 .header(
618 "Sec-WebSocket-Key",
619 tungstenite::handshake::client::generate_key(),
620 )
621 .header("Sec-WebSocket-Version", "13")
622 .header("X-Transport", "ws");
623 if let Some(authorization) = &self.authorization {
624 let key = authorization
625 .strip_prefix("Bearer ")
626 .unwrap_or(authorization.as_str());
627 req = req.header("authorization", format!("Bearer {}", key));
628 }
629 if let Some(ua) = &self.user_agent {
630 req = req.header("user-agent", ua);
631 }
632 if let Some(x_title) = &self.x_title {
633 req = req.header("x-title", x_title);
634 }
635 if let Some(http_referer) = &self.http_referer {
636 req = req.header("referer", http_referer);
637 req = req.header("http-referer", http_referer);
638 }
639 if let Some(token) = &self.x_github_authorization {
640 req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
641 }
642 if let Some(token) = &self.x_openrouter_authorization {
643 req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
644 }
645 if let Some(headers) = &self.x_mcp_authorization {
646 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
647 req = req.header("X-MCP-AUTHORIZATION", json);
648 }
649 }
650 if let Some(sig) = &self.x_viewer_signature {
651 req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
652 }
653 if let Some(addr) = &self.x_viewer_address {
654 req = req.header("X-VIEWER-ADDRESS", addr.as_str());
655 }
656 if let Some(name) = &self.x_commit_author_name {
657 req = req.header("X-COMMIT-AUTHOR-NAME", name.as_str());
658 }
659 if let Some(email) = &self.x_commit_author_email {
660 req = req.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
661 }
662 if let Some(id) = &self.agent_instance_hierarchy {
663 req = req.header("X-OBJECTIVEAI-AGENT-INSTANCE-HIERARCHY", id.as_str());
664 }
665 if let Some(s) = &self.mcp_session_id {
666 req = req.header(crate::mcp::MCP_SESSION_ID_HEADER, s.as_str());
667 }
668 let req = req.body(()).map_err(|e| {
669 super::HttpError::WsConnect(tungstenite::Error::Http(
670 tungstenite::http::Response::builder()
671 .status(400)
672 .body(Some(e.to_string().into_bytes()))
673 .unwrap(),
674 ))
675 })?;
676
677 let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
678 let (mut sink, rx_stream): (
679 _,
680 SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
681 ) = ws_stream.split();
682
683 let body_frame = serde_json::to_string(&body)
685 .map_err(super::HttpError::NotifySerialize)?;
686 sink.send(tungstenite::Message::Text(body_frame.into()))
687 .await
688 .map_err(super::HttpError::NotifySend)?;
689
690 let sink: super::notifier::SharedSink =
692 Arc::new(tokio::sync::Mutex::new(sink));
693 let pending: super::notifier::PendingNotifies =
694 Arc::new(dashmap::DashMap::new());
695
696 let (chunk_tx, chunk_rx) = futures::channel::mpsc::unbounded::<
700 Result<Chunk, super::HttpError>,
701 >();
702
703 let demux_sink = sink.clone();
704 let demux_pending = pending.clone();
705 let handler = Arc::new(handler);
706 tokio::spawn(async move {
707 let mut rx_stream = rx_stream;
708 let mut chunk_tx = chunk_tx;
709 loop {
710 let msg = match rx_stream.next().await {
711 Some(m) => m,
712 None => break,
713 };
714 let text = match msg {
715 Ok(tungstenite::Message::Text(t)) => {
716 let s = t.to_string();
717 s
718 }
719 Ok(tungstenite::Message::Binary(_)) => {
720 continue;
721 }
722 Ok(
723 tungstenite::Message::Ping(_)
724 | tungstenite::Message::Pong(_),
725 ) => continue,
726 Ok(tungstenite::Message::Close(_)) => {
727 break;
728 }
729 Ok(tungstenite::Message::Frame(_)) => continue,
730 Err(_) => {
731 break;
732 }
733 };
734
735 if let Ok(response) =
740 serde_json::from_str::<ClientResponse>(&text)
741 {
742 let id = response.id().to_string();
743 if let Some((_, tx)) = demux_pending.remove(&id) {
744 let _ = tx.send(response);
745 }
746 continue;
747 }
748 if let Ok(request) =
749 serde_json::from_str::<ServerRequest>(&text)
750 {
751 let id = request.id.clone();
752 let handler = handler.clone();
753 let demux_sink = demux_sink.clone();
754 tokio::spawn(async move {
755 let id = id;
756 let response = handler.handle(request).await;
759 let frame = match serde_json::to_string(&response) {
760 Ok(s) => s,
761 Err(_) => {
762 return;
763 }
764 };
765 let mut guard = demux_sink.lock().await;
766 let send_result = guard
767 .send(tungstenite::Message::Text(frame.into()))
768 .await;
769 });
770 continue;
771 }
772
773 let mut de = serde_json::Deserializer::from_str(&text);
775 match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
776 Ok(chunk) => {
777 if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
778 break;
779 }
780 }
781 Err(e) => {
782 let err = match serde_json::from_str::<
785 error::ResponseError,
786 >(&text)
787 {
788 Ok(api_err) => super::HttpError::ApiError(api_err),
789 Err(_) => super::HttpError::DeserializationError(e),
790 };
791 let _ = chunk_tx.unbounded_send(Err(err));
792 break;
793 }
794 }
795 }
796 drop(demux_pending);
800 drop(chunk_tx);
801 });
802
803 let notifier = super::Notifier::new(sink, pending);
804 Ok((chunk_rx, notifier))
805 }
806}