1use crate::error;
4use eventsource_stream::Event as MessageEvent;
5use futures::{Stream, StreamExt, SinkExt};
6use reqwest_eventsource::{Event, RequestBuilderExt};
7use std::sync::Arc;
8use tokio_tungstenite::tungstenite;
9
10#[derive(Debug, Clone)]
35pub struct HttpClient {
36 pub http_client: reqwest::Client,
38 pub address: String,
40 pub authorization: Option<Arc<String>>,
42 pub user_agent: Option<String>,
44 pub x_title: Option<String>,
46 pub http_referer: Option<String>,
48 pub x_github_authorization: Option<Arc<String>>,
50 pub x_openrouter_authorization: Option<Arc<String>>,
52 pub x_mcp_authorization: Option<Arc<std::collections::HashMap<String, String>>>,
54 pub x_viewer_signature: Option<Arc<String>>,
56 pub x_viewer_address: Option<Arc<String>>,
58 pub x_commit_author_name: Option<Arc<String>>,
60 pub x_commit_author_email: Option<Arc<String>>,
62 pub agent_id: Option<Arc<String>>,
64}
65
66impl HttpClient {
67 pub fn new(
86 http_client: reqwest::Client,
87 address: Option<impl Into<String>>,
88 authorization: Option<impl Into<String>>,
89 user_agent: Option<impl Into<String>>,
90 x_title: Option<impl Into<String>>,
91 http_referer: Option<impl Into<String>>,
92 x_github_authorization: Option<impl Into<String>>,
93 x_openrouter_authorization: Option<impl Into<String>>,
94 x_mcp_authorization: Option<std::collections::HashMap<String, String>>,
95 x_viewer_signature: Option<impl Into<String>>,
96 x_viewer_address: Option<impl Into<String>>,
97 x_commit_author_name: Option<impl Into<String>>,
98 x_commit_author_email: Option<impl Into<String>>,
99 agent_id: Option<impl Into<String>>,
100 ) -> Self {
101 #[cfg(feature = "env")]
102 let env = |name: &str| -> Option<String> { std::env::var(name).ok() };
103
104 Self {
105 http_client,
106 address: match address {
107 Some(base) => base.into(),
108 #[cfg(feature = "env")]
109 None => env("OBJECTIVEAI_ADDRESS")
110 .unwrap_or_else(|| "https://api.objectiveai.dev".to_string()),
111 #[cfg(not(feature = "env"))]
112 None => "https://api.objectiveai.dev".to_string(),
113 },
114 authorization: authorization.map(|k| Arc::new(k.into()))
115 .or_else(|| { #[cfg(feature = "env")] { env("OBJECTIVEAI_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
116 user_agent: user_agent.map(Into::into)
117 .or_else(|| { #[cfg(feature = "env")] { env("USER_AGENT") } #[cfg(not(feature = "env"))] { None } }),
118 x_title: x_title.map(Into::into)
119 .or_else(|| { #[cfg(feature = "env")] { env("X_TITLE") } #[cfg(not(feature = "env"))] { None } }),
120 http_referer: http_referer.map(Into::into)
121 .or_else(|| { #[cfg(feature = "env")] { env("HTTP_REFERER") } #[cfg(not(feature = "env"))] { None } }),
122 x_github_authorization: x_github_authorization.map(|v| Arc::new(v.into()))
123 .or_else(|| { #[cfg(feature = "env")] { env("GITHUB_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
124 x_openrouter_authorization: x_openrouter_authorization.map(|v| Arc::new(v.into()))
125 .or_else(|| { #[cfg(feature = "env")] { env("OPENROUTER_AUTHORIZATION").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
126 x_mcp_authorization: x_mcp_authorization.map(Arc::new)
127 .or_else(|| { #[cfg(feature = "env")] { env("MCP_AUTHORIZATION").and_then(|v| serde_json::from_str(&v).ok()).map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
128 x_viewer_signature: x_viewer_signature.map(|v| Arc::new(v.into()))
129 .or_else(|| { #[cfg(feature = "env")] { env("VIEWER_SIGNATURE").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
130 x_viewer_address: x_viewer_address.map(|v| Arc::new(v.into()))
131 .or_else(|| { #[cfg(feature = "env")] { env("VIEWER_ADDRESS").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
132 x_commit_author_name: x_commit_author_name.map(|v| Arc::new(v.into()))
133 .or_else(|| { #[cfg(feature = "env")] { env("COMMIT_AUTHOR_NAME").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
134 x_commit_author_email: x_commit_author_email.map(|v| Arc::new(v.into()))
135 .or_else(|| { #[cfg(feature = "env")] { env("COMMIT_AUTHOR_EMAIL").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
136 agent_id: agent_id.map(|v| Arc::new(v.into()))
137 .or_else(|| { #[cfg(feature = "env")] { env("OBJECTIVEAI_AGENT_ID").map(Arc::new) } #[cfg(not(feature = "env"))] { None } }),
138 }
139 }
140
141 fn request(
143 &self,
144 method: reqwest::Method,
145 path: &str,
146 body: Option<impl serde::Serialize>,
147 ) -> reqwest::RequestBuilder {
148 let url = format!(
149 "{}/{}",
150 self.address.trim_end_matches('/'),
151 path.trim_start_matches('/')
152 );
153 let mut request = self.http_client.request(method, &url);
154 if let Some(authorization) = &self.authorization {
155 let key = authorization.strip_prefix("Bearer ").unwrap_or(authorization);
156 request =
157 request.header("authorization", format!("Bearer {}", key));
158 }
159 if let Some(user_agent) = &self.user_agent {
160 request = request.header("user-agent", user_agent);
161 }
162 if let Some(x_title) = &self.x_title {
163 request = request.header("x-title", x_title);
164 }
165 if let Some(http_referer) = &self.http_referer {
166 request = request.header("referer", http_referer);
167 request = request.header("http-referer", http_referer);
168 }
169 if let Some(token) = &self.x_github_authorization {
170 request = request.header("X-GITHUB-AUTHORIZATION", token.as_str());
171 }
172 if let Some(token) = &self.x_openrouter_authorization {
173 request = request.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
174 }
175 if let Some(headers) = &self.x_mcp_authorization {
176 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
177 request = request.header("X-MCP-AUTHORIZATION", json);
178 }
179 }
180 if let Some(sig) = &self.x_viewer_signature {
181 request = request.header("X-VIEWER-SIGNATURE", sig.as_str());
182 }
183 if let Some(addr) = &self.x_viewer_address {
184 request = request.header("X-VIEWER-ADDRESS", addr.as_str());
185 }
186 if let Some(name) = &self.x_commit_author_name {
187 request = request.header("X-COMMIT-AUTHOR-NAME", name.as_str());
188 }
189 if let Some(email) = &self.x_commit_author_email {
190 request = request.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
191 }
192 if let Some(id) = &self.agent_id {
193 request = request.header("X-OBJECTIVEAI-AGENT-ID", id.as_str());
194 }
195 if let Some(body) = body {
196 request = request.json(&body);
197 }
198 request
199 }
200
201 pub async fn send_unary<T: serde::de::DeserializeOwned + Send + 'static>(
212 &self,
213 method: reqwest::Method,
214 path: impl AsRef<str>,
215 body: Option<impl serde::Serialize>,
216 ) -> Result<T, super::HttpError> {
217 let response = self
218 .http_client
219 .execute(
220 self.request(method, path.as_ref(), body)
221 .build()
222 .map_err(super::HttpError::RequestError)?,
223 )
224 .await
225 .map_err(super::HttpError::HttpError)?;
226 let code = response.status();
227 if code.is_success() {
228 let text =
229 response.text().await.map_err(super::HttpError::HttpError)?;
230 let mut de = serde_json::Deserializer::from_str(&text);
231 match serde_path_to_error::deserialize::<_, T>(&mut de) {
232 Ok(value) => Ok(value),
233 Err(e) => Err(super::HttpError::DeserializationError(e)),
234 }
235 } else {
236 match response.text().await {
237 Ok(text) => Err(super::HttpError::BadStatus {
238 code,
239 body: match serde_json::from_str::<serde_json::Value>(&text)
240 {
241 Ok(body) => body,
242 Err(_) => serde_json::Value::String(text),
243 },
244 }),
245 Err(_) => Err(super::HttpError::BadStatus {
246 code,
247 body: serde_json::Value::Null,
248 }),
249 }
250 }
251 }
252
253 pub async fn send_unary_no_response(
261 &self,
262 method: reqwest::Method,
263 path: impl AsRef<str>,
264 body: Option<impl serde::Serialize>,
265 ) -> Result<(), super::HttpError> {
266 let response = self
267 .http_client
268 .execute(
269 self.request(method, path.as_ref(), body)
270 .build()
271 .map_err(super::HttpError::RequestError)?,
272 )
273 .await
274 .map_err(super::HttpError::HttpError)?;
275 let code = response.status();
276 if code.is_success() {
277 Ok(())
278 } else {
279 match response.text().await {
280 Ok(text) => Err(super::HttpError::BadStatus {
281 code,
282 body: match serde_json::from_str::<serde_json::Value>(&text)
283 {
284 Ok(body) => body,
285 Err(_) => serde_json::Value::String(text),
286 },
287 }),
288 Err(_) => Err(super::HttpError::BadStatus {
289 code,
290 body: serde_json::Value::Null,
291 }),
292 }
293 }
294 }
295
296 pub async fn send_streaming<
312 T: serde::de::DeserializeOwned + Send + 'static,
313 P: AsRef<str> + Send,
314 B: serde::Serialize + Send,
315 >(
316 &self,
317 method: reqwest::Method,
318 path: P,
319 body: Option<B>,
320 ) -> Result<
321 impl Stream<Item = Result<T, super::HttpError>>
322 + Send
323 + 'static
324 + use<T, P, B>,
325 super::HttpError,
326 > {
327 Ok(
333 self.request(method, path.as_ref(), body)
334 .header("X-Transport", "sse")
335 .eventsource()?
336 .take_while(|result| {
337 let dominated = matches!(
338 result,
339 Ok(Event::Message(MessageEvent { data, .. })) if data == "[DONE]"
340 );
341 async move { !dominated }
342 })
343 .then(|result| async {
344 match result {
345 Ok(Event::Open) => None,
346 Ok(Event::Message(MessageEvent { data, .. }))
347 if data.starts_with(":")
348 || data.is_empty() =>
349 {
350 None
351 }
352 Ok(Event::Message(MessageEvent { data, .. })) => {
353 let mut de =
354 serde_json::Deserializer::from_str(&data);
355 Some(
356 match serde_path_to_error::deserialize::<_, T>(
357 &mut de,
358 ) {
359 Ok(value) => Ok(value),
360 Err(e) => match serde_json::from_str::<error::ResponseError>(&data) {
361 Ok(err) => Err(super::HttpError::ApiError(err)),
362 Err(_) => Err(super::HttpError::DeserializationError(e)),
363 },
364 }
365 )
366 }
367 Err(reqwest_eventsource::Error::InvalidStatusCode(
368 code,
369 response,
370 )) => match response.text().await {
371 Ok(body) => {
372 Some(Err(super::HttpError::BadStatus {
373 code,
374 body: match serde_json::from_str::<
375 serde_json::Value,
376 >(
377 &body
378 ) {
379 Ok(body) => body,
380 Err(_) => {
381 serde_json::Value::String(body)
382 }
383 },
384 }))
385 }
386 Err(_) => Some(Err(super::HttpError::BadStatus {
387 code,
388 body: serde_json::Value::Null,
389 })),
390 },
391 Err(e) => Some(Err(super::HttpError::StreamError(e))),
392 }
393 })
394 .filter_map(|x| async { x }),
395 )
396 }
397
398 pub async fn send_streaming_ws<Chunk, B, H, P>(
415 &self,
416 method: reqwest::Method,
417 path: P,
418 body: B,
419 handler: H,
420 ) -> Result<
421 (
422 impl Stream<Item = Result<Chunk, super::HttpError>>
423 + Send
424 + Unpin
425 + 'static
426 + use<Chunk, B, H, P>,
427 super::Notifier,
428 ),
429 super::HttpError,
430 >
431 where
432 Chunk: serde::de::DeserializeOwned + Send + 'static,
433 B: serde::Serialize + Send + 'static,
434 H: super::McpHandler,
435 P: AsRef<str>,
436 {
437 use crate::client_objectiveai_mcp::{
438 client_response::Response as ClientResponse,
439 server_request::Request as ServerRequest,
440 };
441 use futures::stream::SplitStream;
442 use tokio::net::TcpStream;
443 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
444
445 let url = format!(
448 "{}/{}",
449 self.address.trim_end_matches('/'),
450 path.as_ref().trim_start_matches('/')
451 );
452 let ws_url = if let Some(rest) = url.strip_prefix("https://") {
453 format!("wss://{rest}")
454 } else if let Some(rest) = url.strip_prefix("http://") {
455 format!("ws://{rest}")
456 } else {
457 url.clone()
458 };
459 let _ = method; let mut req = tungstenite::handshake::client::Request::builder()
464 .method("GET")
465 .uri(&ws_url)
466 .header("Host", reqwest::Url::parse(&url).ok().and_then(|u| u.host_str().map(str::to_owned)).unwrap_or_default())
467 .header("Upgrade", "websocket")
468 .header("Connection", "Upgrade")
469 .header(
470 "Sec-WebSocket-Key",
471 tungstenite::handshake::client::generate_key(),
472 )
473 .header("Sec-WebSocket-Version", "13")
474 .header("X-Transport", "ws");
475 if let Some(authorization) = &self.authorization {
476 let key = authorization
477 .strip_prefix("Bearer ")
478 .unwrap_or(authorization.as_str());
479 req = req.header("authorization", format!("Bearer {}", key));
480 }
481 if let Some(ua) = &self.user_agent {
482 req = req.header("user-agent", ua);
483 }
484 if let Some(x_title) = &self.x_title {
485 req = req.header("x-title", x_title);
486 }
487 if let Some(http_referer) = &self.http_referer {
488 req = req.header("referer", http_referer);
489 req = req.header("http-referer", http_referer);
490 }
491 if let Some(token) = &self.x_github_authorization {
492 req = req.header("X-GITHUB-AUTHORIZATION", token.as_str());
493 }
494 if let Some(token) = &self.x_openrouter_authorization {
495 req = req.header("X-OPENROUTER-AUTHORIZATION", token.as_str());
496 }
497 if let Some(headers) = &self.x_mcp_authorization {
498 if let Ok(json) = serde_json::to_string(headers.as_ref()) {
499 req = req.header("X-MCP-AUTHORIZATION", json);
500 }
501 }
502 if let Some(sig) = &self.x_viewer_signature {
503 req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
504 }
505 if let Some(addr) = &self.x_viewer_address {
506 req = req.header("X-VIEWER-ADDRESS", addr.as_str());
507 }
508 if let Some(name) = &self.x_commit_author_name {
509 req = req.header("X-COMMIT-AUTHOR-NAME", name.as_str());
510 }
511 if let Some(email) = &self.x_commit_author_email {
512 req = req.header("X-COMMIT-AUTHOR-EMAIL", email.as_str());
513 }
514 if let Some(id) = &self.agent_id {
515 req = req.header("X-OBJECTIVEAI-AGENT-ID", id.as_str());
516 }
517 let req = req
518 .body(())
519 .map_err(|e| super::HttpError::WsConnect(tungstenite::Error::Http(
520 tungstenite::http::Response::builder()
521 .status(400)
522 .body(Some(e.to_string().into_bytes()))
523 .unwrap(),
524 )))?;
525
526 let (ws_stream, _resp) = tokio_tungstenite::connect_async(req).await?;
527 let (mut sink, rx_stream): (
528 _,
529 SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
530 ) = ws_stream.split();
531
532 let body_frame = serde_json::to_string(&body)
534 .map_err(super::HttpError::NotifySerialize)?;
535 sink.send(tungstenite::Message::Text(body_frame.into()))
536 .await
537 .map_err(super::HttpError::NotifySend)?;
538
539 let sink: super::notifier::SharedSink = Arc::new(tokio::sync::Mutex::new(sink));
541 let pending: super::notifier::PendingNotifies =
542 Arc::new(dashmap::DashMap::new());
543
544 let (chunk_tx, chunk_rx) =
548 futures::channel::mpsc::unbounded::<Result<Chunk, super::HttpError>>();
549
550 let demux_sink = sink.clone();
551 let demux_pending = pending.clone();
552 let handler = Arc::new(handler);
553 tokio::spawn(async move {
554 let mut rx_stream = rx_stream;
555 let mut chunk_tx = chunk_tx;
556 loop {
557 let msg = match rx_stream.next().await {
558 Some(m) => m,
559 None => {
560 break;
561 }
562 };
563 let text = match msg {
564 Ok(tungstenite::Message::Text(t)) => {
565 let s = t.to_string();
566 s
567 }
568 Ok(tungstenite::Message::Binary(_)) => {
569 continue;
570 }
571 Ok(
572 tungstenite::Message::Ping(_) | tungstenite::Message::Pong(_),
573 ) => continue,
574 Ok(tungstenite::Message::Close(_)) => {
575 break;
576 }
577 Ok(tungstenite::Message::Frame(_)) => continue,
578 Err(_) => {
579 break;
580 }
581 };
582
583 if let Ok(response) = serde_json::from_str::<ClientResponse>(&text) {
588 let id = response.id().to_string();
589 if let Some((_, tx)) = demux_pending.remove(&id) {
590 let _ = tx.send(response);
591 }
592 continue;
593 }
594 if let Ok(request) = serde_json::from_str::<ServerRequest>(&text) {
595 let id = request.id.clone();
596 let method = request
597 .body
598 .as_ref()
599 .and_then(|v| v.get("method"))
600 .and_then(|v| v.as_str())
601 .unwrap_or("")
602 .to_string();
603 let handler = handler.clone();
604 let demux_sink = demux_sink.clone();
605 tokio::spawn(async move {
606 let id = id;
607 let response = handler.handle(request).await;
610 let frame = match serde_json::to_string(&response) {
611 Ok(s) => s,
612 Err(_) => {
613 return;
614 }
615 };
616 let mut guard = demux_sink.lock().await;
617 let send_result = guard
618 .send(tungstenite::Message::Text(frame.into()))
619 .await;
620 });
621 continue;
622 }
623
624 let mut de = serde_json::Deserializer::from_str(&text);
626 match serde_path_to_error::deserialize::<_, Chunk>(&mut de) {
627 Ok(chunk) => {
628 if chunk_tx.unbounded_send(Ok(chunk)).is_err() {
629 break;
630 }
631 }
632 Err(e) => {
633 let err = match serde_json::from_str::<error::ResponseError>(&text) {
636 Ok(api_err) => super::HttpError::ApiError(api_err),
637 Err(_) => super::HttpError::DeserializationError(e),
638 };
639 let _ = chunk_tx.unbounded_send(Err(err));
640 break;
641 }
642 }
643 }
644 drop(demux_pending);
648 drop(chunk_tx);
649 });
650
651 let notifier = super::Notifier::new(sink, pending);
652 Ok((chunk_rx, notifier))
653 }
654}