1use crate::error::{Error, Result};
8use crate::vcr::{RecordedRequest, VcrRecorder};
9use asupersync::http::h1::ParsedUrl;
10use asupersync::http::h1::http_client::Scheme;
11use asupersync::io::ext::AsyncWriteExt;
12use asupersync::io::{AsyncRead, AsyncWrite, ReadBuf};
13use asupersync::net::tcp::stream::TcpStream;
14use asupersync::tls::{TlsConnector, TlsConnectorBuilder};
15use futures::Stream;
16use futures::StreamExt;
17use futures::TryStreamExt;
18use futures::stream::{self, BoxStream};
19use std::pin::Pin;
20#[cfg(not(test))]
21use std::sync::OnceLock;
22use std::task::{Context, Poll};
23
/// Default `User-Agent` value: `pi_agent_rust/<crate version>`, assembled at compile time.
const DEFAULT_USER_AGENT: &str = concat!("pi_agent_rust/", env!("CARGO_PKG_VERSION"));
/// Environment variable read by `Client::new`; when set, its value is appended to the
/// default user agent as ` Antigravity/<value>`.
const ANTIGRAVITY_VERSION_ENV: &str = "PI_AI_ANTIGRAVITY_VERSION";
/// Once more than this many bytes are buffered without finding the end of the response
/// head, `read_response_head` gives up with an error.
const MAX_HEADER_BYTES: usize = 64 * 1024;
/// Size of the scratch buffer used for a single transport read; body chunks handed to the
/// caller for `Content-Length` and chunked bodies are also capped at this size.
const READ_CHUNK_BYTES: usize = 16 * 1024;
/// Upper bound enforced by `Buffer::extend` on the bytes held in the body buffer.
const MAX_BUFFERED_BYTES: usize = 256 * 1024;
/// Largest body `Response::text` will accumulate before failing.
const MAX_TEXT_BODY_BYTES: usize = 50 * 1024 * 1024;
/// Maximum number of distinct headers `RequestBuilder::header` will store; further new
/// names are silently ignored.
const MAX_REQUEST_HEADERS: usize = 100;

/// Number of consecutive zero-byte writes `write_all_with_retry` tolerates before failing
/// with `WriteZero`.
const WRITE_ZERO_MAX_RETRIES: usize = 10;

/// Initial delay between zero-byte write retries; doubled after every consecutive stall.
const WRITE_ZERO_BACKOFF: std::time::Duration = std::time::Duration::from_millis(10);
/// Request timeout (in seconds) used outside of tests when `PI_HTTP_REQUEST_TIMEOUT_SECS`
/// is unset or unparsable.
#[cfg(not(test))]
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 60;
43
44fn default_request_timeout_from_env() -> Option<std::time::Duration> {
45 #[cfg(test)]
46 {
47 None
50 }
51
52 #[cfg(not(test))]
53 {
54 static REQUEST_TIMEOUT: OnceLock<Option<std::time::Duration>> = OnceLock::new();
55 *REQUEST_TIMEOUT.get_or_init(|| {
56 let timeout_secs = std::env::var("PI_HTTP_REQUEST_TIMEOUT_SECS")
57 .ok()
58 .and_then(|raw| raw.trim().parse::<u64>().ok())
59 .unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS);
60 if timeout_secs == 0 {
61 None
62 } else {
63 Some(std::time::Duration::from_secs(timeout_secs))
64 }
65 })
66 }
67}
68
69#[derive(Debug, Clone)]
70pub struct Client {
71 tls: std::result::Result<TlsConnector, String>,
72 user_agent: String,
73 vcr: Option<VcrRecorder>,
74}
75
76impl Client {
77 #[must_use]
78 pub fn new() -> Self {
79 let tls = TlsConnectorBuilder::new()
80 .with_native_roots()
81 .and_then(|builder| builder.alpn_protocols(vec![b"http/1.1".to_vec()]).build())
82 .map_err(|e| e.to_string());
83
84 let user_agent = std::env::var(ANTIGRAVITY_VERSION_ENV).map_or_else(
85 |_| DEFAULT_USER_AGENT.to_string(),
86 |v| format!("{DEFAULT_USER_AGENT} Antigravity/{v}"),
87 );
88
89 Self {
90 tls,
91 user_agent,
92 vcr: None,
93 }
94 }
95
96 pub fn post(&self, url: &str) -> RequestBuilder<'_> {
97 RequestBuilder::new(self, Method::Post, url)
98 }
99
100 pub fn get(&self, url: &str) -> RequestBuilder<'_> {
101 RequestBuilder::new(self, Method::Get, url)
102 }
103
104 #[must_use]
105 pub fn with_vcr(mut self, recorder: VcrRecorder) -> Self {
106 self.vcr = Some(recorder);
107 self
108 }
109
110 pub const fn vcr(&self) -> Option<&VcrRecorder> {
111 self.vcr.as_ref()
112 }
113}
114
115impl Default for Client {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
/// HTTP request methods this client can issue.
#[derive(Debug, Clone, Copy)]
enum Method {
    Get,
    Post,
}

impl Method {
    /// Returns the method token exactly as it is written on the request line.
    const fn as_str(self) -> &'static str {
        match self {
            Self::Post => "POST",
            Self::Get => "GET",
        }
    }
}
135
/// A request under construction, created by `Client::get` / `Client::post` and consumed by
/// `send`.
pub struct RequestBuilder<'a> {
    /// Client supplying the TLS configuration, user agent and optional VCR recorder.
    client: &'a Client,
    /// HTTP method of the request.
    method: Method,
    /// Target URL, stored as given; it is parsed when the request is sent.
    url: String,
    /// Caller-supplied headers in insertion order.
    headers: Vec<(String, String)>,
    /// Request body bytes; empty when no body was set.
    body: Vec<u8>,
    /// Timeout for the request, or `None` for no timeout.
    timeout: Option<std::time::Duration>,
}
144
145impl<'a> RequestBuilder<'a> {
146 fn new(client: &'a Client, method: Method, url: &str) -> Self {
147 Self {
148 client,
149 method,
150 url: url.to_string(),
151 headers: Vec::new(),
152 body: Vec::new(),
153 timeout: default_request_timeout_from_env(),
154 }
155 }
156
157 #[must_use]
158 pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
159 let key = key.into();
160 let value = value.into();
161 if let Some((existing_key, existing_value)) = self
162 .headers
163 .iter_mut()
164 .find(|(existing_key, _)| existing_key.eq_ignore_ascii_case(&key))
165 {
166 *existing_key = key;
167 *existing_value = value;
168 } else if self.headers.len() < MAX_REQUEST_HEADERS {
169 self.headers.push((key, value));
170 }
171 self
173 }
174
175 #[must_use]
176 pub const fn timeout(mut self, duration: std::time::Duration) -> Self {
177 self.timeout = Some(duration);
178 self
179 }
180
181 #[must_use]
184 pub const fn no_timeout(mut self) -> Self {
185 self.timeout = None;
186 self
187 }
188
189 #[must_use]
191 pub fn body(mut self, body: Vec<u8>) -> Self {
192 self.body = body;
193 self
194 }
195
196 pub fn json<T: serde::Serialize>(mut self, payload: &T) -> Result<Self> {
197 self = self.header("Content-Type", "application/json");
198 self.body = serde_json::to_vec(payload)?;
199 Ok(self)
200 }
201
202 pub async fn send(self) -> Result<Response> {
203 let RequestBuilder {
204 client,
205 method,
206 url,
207 headers,
208 body,
209 timeout,
210 } = self;
211
212 if let Some(recorder) = client.vcr() {
213 let recorded_request = build_recorded_request(method, &url, &headers, &body);
214 let recorded = recorder
215 .request_streaming_with(recorded_request, || async {
216 let (status, response_headers, stream) =
217 send_parts(client, method, &url, &headers, &body).await?;
218 Ok((status, response_headers, stream))
219 })
220 .await?;
221 let status = recorded.status;
222 let response_headers = recorded.headers.clone();
223 let stream = recorded.into_byte_stream();
224 return Ok(Response {
225 status,
226 headers: response_headers,
227 stream,
228 timeout_info: None,
229 });
230 }
231
232 let send_fut = send_parts(client, method, &url, &headers, &body);
233
234 let (status, response_headers, stream, timeout_info) = if let Some(duration) = timeout {
235 use asupersync::time::{sleep, wall_now};
236 use futures::future::{Either, FutureExt, select};
237
238 let asupersync_now = asupersync::Cx::current()
239 .and_then(|cx| cx.timer_driver())
240 .map_or_else(wall_now, |timer| timer.now());
241
242 let sleep_fut = sleep(asupersync_now, duration).fuse();
243 let send_fut = send_fut.fuse();
244 futures::pin_mut!(sleep_fut, send_fut);
245
246 let (status, response_headers, stream) = match select(send_fut, sleep_fut).await {
247 Either::Left((res, _)) => res?,
248 Either::Right(_) => return Err(Error::api("Request timed out")),
249 };
250 (
251 status,
252 response_headers,
253 stream,
254 Some((asupersync_now, duration)),
255 )
256 } else {
257 let (status, response_headers, stream) = send_fut.await?;
258 (status, response_headers, stream, None)
259 };
260
261 Ok(Response {
262 status,
263 headers: response_headers,
264 stream,
265 timeout_info,
266 })
267 }
268}
269
270async fn write_all_with_retry<W: AsyncWrite + Unpin>(
279 writer: &mut W,
280 mut buf: &[u8],
281) -> std::io::Result<()> {
282 use asupersync::time::{sleep, wall_now};
283
284 let mut consecutive_zeros: usize = 0;
285 let mut backoff = WRITE_ZERO_BACKOFF;
286
287 while !buf.is_empty() {
288 let n = futures::future::poll_fn(|cx| Pin::new(&mut *writer).poll_write(cx, buf)).await?;
289
290 if n == 0 {
291 consecutive_zeros += 1;
292 if consecutive_zeros > WRITE_ZERO_MAX_RETRIES {
293 return Err(std::io::Error::new(
294 std::io::ErrorKind::WriteZero,
295 format!(
296 "transport returned Ok(0) {} consecutive times ({} bytes remaining)",
297 consecutive_zeros,
298 buf.len(),
299 ),
300 ));
301 }
302 tracing::debug!(
303 attempt = consecutive_zeros,
304 remaining = buf.len(),
305 backoff_ms = backoff.as_millis(),
306 "write returned Ok(0), backing off before retry"
307 );
308
309 futures::future::poll_fn(|cx| Pin::new(&mut *writer).poll_flush(cx)).await?;
314
315 let now = asupersync::Cx::current()
316 .and_then(|cx| cx.timer_driver())
317 .map_or_else(wall_now, |timer| timer.now());
318 sleep(now, backoff).await;
319
320 backoff = backoff.saturating_mul(2);
322 } else {
323 buf = &buf[n..];
325 consecutive_zeros = 0;
326 backoff = WRITE_ZERO_BACKOFF;
327 }
328 }
329 Ok(())
330}
331
332async fn send_parts(
333 client: &Client,
334 method: Method,
335 url: &str,
336 headers: &[(String, String)],
337 body: &[u8],
338) -> Result<(
339 u16,
340 Vec<(String, String)>,
341 BoxStream<'static, std::io::Result<Vec<u8>>>,
342)> {
343 let parsed = ParsedUrl::parse(url).map_err(|e| Error::api(format!("Invalid URL: {e}")))?;
344 let mut transport = connect_transport(&parsed, client).await?;
345
346 let request_bytes = build_request_bytes(method, &parsed, &client.user_agent, headers, body);
347 write_all_with_retry(&mut transport, &request_bytes).await?;
348 if !body.is_empty() {
349 write_all_with_retry(&mut transport, body).await?;
350 }
351 transport.flush().await?;
352
353 let (status, response_headers, leftover) = Box::pin(read_response_head(&mut transport)).await?;
354 let body_kind = body_kind_from_response(status, &response_headers)?;
355
356 let state = BodyStreamState::new(transport, body_kind, leftover);
357 let stream = stream::try_unfold(state, |mut state| async move {
358 match Box::pin(state.next_bytes()).await {
359 Ok(Some(chunk)) => Ok(Some((chunk, state))),
360 Ok(None) => {
361 state.shutdown_transport_best_effort().await;
362 Ok(None)
363 }
364 Err(err) => {
365 state.shutdown_transport_best_effort().await;
366 Err(err)
367 }
368 }
369 })
370 .boxed();
371
372 Ok((status, response_headers, stream))
373}
374
375fn build_recorded_request(
376 method: Method,
377 url: &str,
378 headers: &[(String, String)],
379 body: &[u8],
380) -> RecordedRequest {
381 let mut body_value = None;
382 let mut body_text = None;
383
384 if !body.is_empty() {
385 let is_json = headers.iter().any(|(name, value)| {
386 name.eq_ignore_ascii_case("content-type")
387 && value.to_ascii_lowercase().contains("application/json")
388 });
389
390 if is_json {
391 match serde_json::from_slice::<serde_json::Value>(body) {
392 Ok(value) => body_value = Some(value),
393 Err(_) => body_text = Some(String::from_utf8_lossy(body).to_string()),
394 }
395 } else {
396 body_text = Some(String::from_utf8_lossy(body).to_string());
397 }
398 }
399
400 RecordedRequest {
401 method: method.as_str().to_string(),
402 url: url.to_string(),
403 headers: headers.to_vec(),
404 body: body_value,
405 body_text,
406 }
407}
408
/// A received HTTP response: status, headers and a not-yet-consumed body stream.
pub struct Response {
    /// HTTP status code from the status line.
    status: u16,
    /// Response headers in the order they were received.
    headers: Vec<(String, String)>,
    /// Body bytes, yielded in chunks.
    stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
    /// `(request start time, timeout)` when the request was sent with a timeout; `None`
    /// for VCR-backed responses and for requests without a timeout. Used to guard reads
    /// of the body stream.
    timeout_info: Option<(asupersync::Time, std::time::Duration)>,
}
415
416fn wrap_stream_with_idle_timeout(
417 stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
418 timeout_info: Option<(asupersync::Time, std::time::Duration)>,
419) -> Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>> {
420 let Some((start_time, timeout)) = timeout_info else {
421 return stream;
422 };
423
424 Box::pin(futures::stream::unfold(
425 (stream, start_time, timeout),
426 |(mut stream, mut last_activity, timeout)| async move {
427 use asupersync::time::{sleep, wall_now};
428 use futures::future::{Either, FutureExt, select};
429
430 let asupersync_now = asupersync::Cx::current()
431 .and_then(|cx| cx.timer_driver())
432 .map_or_else(wall_now, |timer| timer.now());
433
434 let elapsed =
435 std::time::Duration::from_nanos(asupersync_now.duration_since(last_activity));
436 if elapsed >= timeout {
437 return Some((
438 Err(std::io::Error::other(
439 "Request timed out reading body stream",
440 )),
441 (stream, last_activity, timeout),
442 ));
443 }
444
445 let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
446 let sleep_fut = sleep(asupersync_now, remaining).fuse();
447 let next_fut = stream.next().fuse();
448 futures::pin_mut!(sleep_fut, next_fut);
449
450 match select(next_fut, sleep_fut).await {
451 Either::Left((Some(res), _)) => {
452 let now = asupersync::Cx::current()
453 .and_then(|cx| cx.timer_driver())
454 .map_or_else(wall_now, |timer| timer.now());
455 last_activity = now;
456 Some((res, (stream, last_activity, timeout)))
457 }
458 Either::Left((None, _)) => None,
459 Either::Right(_) => Some((
460 Err(std::io::Error::other(
461 "Request timed out reading body stream",
462 )),
463 (stream, last_activity, timeout),
464 )),
465 }
466 },
467 ))
468}
469
470impl Response {
471 #[must_use]
472 pub const fn status(&self) -> u16 {
473 self.status
474 }
475
476 #[must_use]
477 pub fn headers(&self) -> &[(String, String)] {
478 &self.headers
479 }
480
481 #[must_use]
482 pub fn bytes_stream(self) -> Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>> {
483 wrap_stream_with_idle_timeout(self.stream, self.timeout_info)
484 }
485
486 pub async fn text(self) -> Result<String> {
487 let stream = wrap_stream_with_idle_timeout(self.stream, self.timeout_info);
488 let bytes = stream
489 .try_fold(Vec::new(), |mut acc, chunk| async move {
490 if acc.len().saturating_add(chunk.len()) > MAX_TEXT_BODY_BYTES {
491 return Err(std::io::Error::other("response body too large"));
492 }
493 acc.extend_from_slice(&chunk);
494 Ok::<_, std::io::Error>(acc)
495 })
496 .await
497 .map_err(|err| {
498 if err.kind() == std::io::ErrorKind::Other
499 && err
500 .to_string()
501 .contains("Request timed out reading body stream")
502 {
503 Error::api("Request timed out reading body")
504 } else {
505 Error::from(err)
506 }
507 })?;
508
509 match String::from_utf8(bytes) {
510 Ok(s) => Ok(s),
511 Err(e) => Ok(String::from_utf8_lossy(e.as_bytes()).into_owned()),
512 }
513 }
514}
515
516async fn connect_transport(parsed: &ParsedUrl, client: &Client) -> Result<Transport> {
517 let addr = (parsed.host.clone(), parsed.port);
518 let tcp = TcpStream::connect(addr).await?;
519 match parsed.scheme {
520 Scheme::Http => Ok(Transport::Tcp(tcp)),
521 Scheme::Https => {
522 let tls = client
523 .tls
524 .as_ref()
525 .map_err(|e| Error::api(format!("TLS configuration error: {e}")))?;
526 let tls_stream = tls
527 .clone()
528 .connect(&parsed.host, tcp)
529 .await
530 .map_err(|e| Error::api(format!("TLS connect failed: {e}")))?;
531 Ok(Transport::Tls(Box::new(tls_stream)))
532 }
533 }
534}
535
/// Returns `value` with every carriage return and line feed removed, so a header value
/// can never terminate its own line and inject additional header lines.
fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        if !matches!(ch, '\r' | '\n') {
            cleaned.push(ch);
        }
    }
    cleaned
}
540
/// Returns `name` reduced to the bytes allowed in an HTTP header name: ASCII letters and
/// digits plus ``!#$%&'*+-.^_`|~``. Every other byte, including each byte of a non-ASCII
/// character, is dropped.
fn sanitize_header_name(name: &str) -> String {
    const TOKEN_PUNCTUATION: &[u8] = b"!#$%&'*+-.^_`|~";

    let mut cleaned = String::with_capacity(name.len());
    for byte in name.bytes() {
        if byte.is_ascii_alphanumeric() || TOKEN_PUNCTUATION.contains(&byte) {
            cleaned.push(char::from(byte));
        }
    }
    cleaned
}
567
/// Looks up `name` ASCII case-insensitively and returns the value of the last matching
/// header, or `None` when no header has that name.
fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .rev()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.as_str())
}
577
578fn build_request_bytes(
579 method: Method,
580 parsed: &ParsedUrl,
581 user_agent: &str,
582 headers: &[(String, String)],
583 body: &[u8],
584) -> Vec<u8> {
585 let mut out = String::new();
586 let effective_user_agent =
587 sanitize_header_value(header_value(headers, "user-agent").unwrap_or(user_agent));
588 let host_header = host_header_value(parsed);
589 let _ = std::fmt::Write::write_fmt(
590 &mut out,
591 format_args!("{} {} HTTP/1.1\r\n", method.as_str(), parsed.path),
592 );
593 let _ = std::fmt::Write::write_fmt(&mut out, format_args!("Host: {host_header}\r\n"));
594 let _ = std::fmt::Write::write_fmt(
595 &mut out,
596 format_args!("User-Agent: {effective_user_agent}\r\n"),
597 );
598 let _ =
599 std::fmt::Write::write_fmt(&mut out, format_args!("Content-Length: {}\r\n", body.len()));
600
601 for (name, value) in headers {
602 let clean_name = sanitize_header_name(name);
603 if clean_name.is_empty()
604 || clean_name.eq_ignore_ascii_case("host")
605 || clean_name.eq_ignore_ascii_case("user-agent")
606 || clean_name.eq_ignore_ascii_case("content-length")
607 || clean_name.eq_ignore_ascii_case("transfer-encoding")
610 {
611 continue;
612 }
613 let clean_value = sanitize_header_value(value);
614 let _ =
615 std::fmt::Write::write_fmt(&mut out, format_args!("{clean_name}: {clean_value}\r\n"));
616 }
617
618 out.push_str("\r\n");
619 out.into_bytes()
620}
621
622fn host_header_value(parsed: &ParsedUrl) -> String {
623 let host = if parsed.host.contains(':') && !parsed.host.starts_with('[') {
624 format!("[{}]", parsed.host)
625 } else {
626 parsed.host.clone()
627 };
628
629 let default_port = match parsed.scheme {
630 Scheme::Http => 80,
631 Scheme::Https => 443,
632 };
633
634 if parsed.port == default_port {
635 host
636 } else {
637 format!("{host}:{}", parsed.port)
638 }
639}
640
641async fn read_response_head(
642 transport: &mut Transport,
643) -> Result<(u16, Vec<(String, String)>, Vec<u8>)> {
644 let mut buf = Vec::with_capacity(8192);
645 let mut scratch = [0u8; READ_CHUNK_BYTES];
646 let mut search_start = 0;
647
648 loop {
649 if buf.len() > MAX_HEADER_BYTES {
650 return Err(Error::api("HTTP response headers too large"));
651 }
652
653 let haystack = &buf[search_start..];
654 if let Some(pos) = find_headers_end(haystack) {
655 let absolute_pos = search_start + pos;
656 let head = &buf[..absolute_pos];
657 let leftover = buf[absolute_pos..].to_vec();
658 let (status, headers) = parse_response_head(head)?;
659 return Ok((status, headers, leftover));
660 }
661
662 let n = read_some(transport, &mut scratch).await?;
663 if n == 0 {
664 return Err(Error::api("HTTP connection closed before headers"));
665 }
666 let old_len = buf.len();
667 buf.extend_from_slice(&scratch[..n]);
668 search_start = old_len.saturating_sub(3);
669 }
670}
671
/// Finds the end of a header block in `buf`.
///
/// Returns the index just past the first `\r\n\r\n` or `\n\n` terminator, whichever starts
/// earlier, or `None` when `buf` contains neither.
fn find_headers_end(buf: &[u8]) -> Option<usize> {
    (0..buf.len().saturating_sub(1)).find_map(|start| {
        let tail = &buf[start..];
        if tail.starts_with(b"\r\n\r\n") {
            Some(start + 4)
        } else if tail.starts_with(b"\n\n") {
            Some(start + 2)
        } else {
            None
        }
    })
}
683
684fn parse_response_head(head: &[u8]) -> Result<(u16, Vec<(String, String)>)> {
685 let text =
686 std::str::from_utf8(head).map_err(|e| Error::api(format!("Invalid HTTP headers: {e}")))?;
687 let mut lines = text.lines();
688
689 let status_line = lines
690 .next()
691 .ok_or_else(|| Error::api("Missing HTTP status line"))?;
692 let mut parts = status_line.split_whitespace();
693 let _version = parts
694 .next()
695 .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
696 let status_str = parts
697 .next()
698 .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
699 let status: u16 = status_str
700 .parse()
701 .map_err(|_| Error::api("Invalid HTTP status code"))?;
702
703 let mut headers = Vec::new();
704 for line in lines {
705 if line.is_empty() {
706 continue;
707 }
708 let (name, value) = line
709 .split_once(':')
710 .ok_or_else(|| Error::api("Invalid HTTP header line"))?;
711 headers.push((name.trim().to_string(), value.trim().to_string()));
712 }
713
714 Ok((status, headers))
715}
716
/// How the response body is delimited on the wire.
#[derive(Debug, Clone, Copy)]
enum BodyKind {
    /// The response has no body.
    Empty,
    /// The body is exactly this many bytes long.
    ContentLength(usize),
    /// The body uses chunked transfer encoding.
    Chunked,
    /// The body runs until the connection is closed.
    Eof,
}
724
725fn body_kind_from_response(status: u16, headers: &[(String, String)]) -> Result<BodyKind> {
726 if matches!(status, 100..=199 | 204 | 205 | 304) {
727 return Ok(BodyKind::Empty);
728 }
729 body_kind_from_headers(headers)
730}
731
732fn body_kind_from_headers(headers: &[(String, String)]) -> Result<BodyKind> {
733 let mut content_length = None;
734 let mut transfer_encodings = Vec::new();
735 let mut saw_transfer_encoding = false;
736
737 for (name, value) in headers {
738 let name_lc = name.to_ascii_lowercase();
739 if name_lc == "content-length" {
740 for part in value.split(',') {
741 let parsed = part
742 .trim()
743 .parse::<usize>()
744 .map_err(|_| Error::api("Invalid HTTP Content-Length header"))?;
745 if let Some(existing) = content_length {
746 if existing != parsed {
747 return Err(Error::api("Conflicting HTTP Content-Length headers"));
748 }
749 } else {
750 content_length = Some(parsed);
751 }
752 }
753 } else if name_lc == "transfer-encoding" {
754 saw_transfer_encoding = true;
755 transfer_encodings.extend(
756 value
757 .split(',')
758 .map(str::trim)
759 .filter(|value| !value.is_empty())
760 .map(str::to_ascii_lowercase),
761 );
762 }
763 }
764
765 if saw_transfer_encoding {
766 let Some(last) = transfer_encodings.last() else {
767 return Err(Error::api("Invalid HTTP Transfer-Encoding header"));
768 };
769 if last != "chunked" {
770 return Err(Error::api("Unsupported HTTP Transfer-Encoding header"));
771 }
772 if transfer_encodings.len() != 1 {
773 return Err(Error::api("Unsupported HTTP Transfer-Encoding header"));
774 }
775 return Ok(BodyKind::Chunked);
776 }
777
778 Ok(match content_length {
779 Some(0) => BodyKind::Empty,
780 Some(n) => BodyKind::ContentLength(n),
781 None => BodyKind::Eof,
782 })
783}
784
785struct Buffer {
786 bytes: Vec<u8>,
787 pos: usize,
788}
789
790impl Buffer {
791 const fn new(initial: Vec<u8>) -> Self {
792 Self {
793 bytes: initial,
794 pos: 0,
795 }
796 }
797
798 fn available(&self) -> &[u8] {
799 &self.bytes[self.pos..]
800 }
801
802 fn len(&self) -> usize {
803 self.available().len()
804 }
805
806 fn is_empty(&self) -> bool {
807 self.len() == 0
808 }
809
810 fn consume(&mut self, n: usize) {
811 self.pos = self.pos.saturating_add(n).min(self.bytes.len());
812 if self.pos == self.bytes.len() {
813 self.bytes.clear();
814 self.pos = 0;
815 } else if self.pos > 0 && self.pos >= self.bytes.len() / 2 {
816 self.bytes.drain(..self.pos);
817 self.pos = 0;
818 }
819 }
820
821 fn extend(&mut self, data: &[u8]) -> Result<()> {
822 if self.bytes.len().saturating_add(data.len()) > MAX_BUFFERED_BYTES {
823 return Err(Error::api("HTTP body buffer exceeded"));
824 }
825 self.bytes.extend_from_slice(data);
826 Ok(())
827 }
828
829 fn split_to_vec(&mut self, n: usize) -> Vec<u8> {
830 let n = n.min(self.len());
831 let out = self.available()[..n].to_vec();
832 self.consume(n);
833 out
834 }
835}
836
/// Position of the chunked-body decoder within the chunked transfer encoding.
enum ChunkedState {
    /// Expecting a chunk-size line.
    SizeLine,
    /// Inside chunk data, with `remaining` bytes of the current chunk still to deliver.
    Data { remaining: usize },
    /// Expecting the line terminator that follows a chunk's data.
    DataCrlf,
    /// After the zero-size chunk: skipping trailers up to the terminating blank line.
    Trailers,
    /// The body has been fully decoded.
    Done,
}
844
/// State carried by the response body stream: the connection, the framing mode and the
/// decoder's progress.
struct BodyStreamState {
    /// Connection the body is read from.
    transport: Transport,
    /// How the body is delimited.
    kind: BodyKind,
    /// Bytes read from the transport but not yet handed to the caller.
    buf: Buffer,
    /// Decoder position; only consulted for `BodyKind::Chunked`.
    chunked_state: ChunkedState,
    /// Body bytes still owed to the caller; only meaningful for `BodyKind::ContentLength`.
    remaining: usize,
    /// Set once a shutdown of the transport has been attempted.
    transport_closed: bool,
}

impl BodyStreamState {
    /// Creates the state for a body of the given `kind`. `leftover` holds the bytes that
    /// were read past the response head and therefore already belong to the body.
    const fn new(transport: Transport, kind: BodyKind, leftover: Vec<u8>) -> Self {
        let remaining = match kind {
            BodyKind::ContentLength(n) => n,
            _ => 0,
        };
        Self {
            transport,
            kind,
            buf: Buffer::new(leftover),
            chunked_state: ChunkedState::SizeLine,
            remaining,
            transport_closed: false,
        }
    }

    /// Returns the next piece of body data, or `Ok(None)` once the body is complete.
    async fn next_bytes(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        match self.kind {
            BodyKind::Empty => Ok(None),
            BodyKind::Eof => Box::pin(self.next_eof()).await,
            BodyKind::ContentLength(_) => Box::pin(self.next_content_length()).await,
            BodyKind::Chunked => Box::pin(self.next_chunked()).await,
        }
    }

    /// Shuts the transport down at most once, ignoring any error from the shutdown.
    async fn shutdown_transport_best_effort(&mut self) {
        if self.transport_closed {
            return;
        }
        self.transport_closed = true;
        let _ = self.transport.shutdown().await;
    }

    /// Performs one read of at most `READ_CHUNK_BYTES` from the transport and appends the
    /// data to `buf`. Returns the number of bytes read; callers treat `0` as end of
    /// stream. A buffer-limit violation is reported as an I/O error.
    async fn read_more(&mut self) -> std::io::Result<usize> {
        let mut scratch = [0u8; READ_CHUNK_BYTES];
        let n = read_some(&mut self.transport, &mut scratch).await?;
        if n > 0 {
            if let Err(err) = self.buf.extend(&scratch[..n]) {
                return Err(std::io::Error::other(err.to_string()));
            }
        }
        Ok(n)
    }

    /// Body delimited by connection close: hands out whatever is buffered, otherwise
    /// reads more, and finishes when a read returns zero bytes.
    async fn next_eof(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        if !self.buf.is_empty() {
            return Ok(Some(self.buf.split_to_vec(self.buf.len())));
        }

        let n = Box::pin(self.read_more()).await?;
        if n == 0 {
            return Ok(None);
        }
        Ok(Some(self.buf.split_to_vec(self.buf.len())))
    }

    /// Fixed-length body: hands out at most `READ_CHUNK_BYTES` per call and never more
    /// than the bytes still owed. End of stream before the declared length is an error.
    async fn next_content_length(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        if self.remaining == 0 {
            return Ok(None);
        }

        if self.buf.is_empty() {
            let n = Box::pin(self.read_more()).await?;
            if n == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "unexpected EOF reading content-length body",
                ));
            }
        }

        let to_take = self.remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
        let out = self.buf.split_to_vec(to_take);
        self.remaining = self.remaining.saturating_sub(out.len());
        Ok(Some(out))
    }

    /// Chunked body: drives the `ChunkedState` machine until it can hand out a piece of
    /// chunk data (`Ok(Some(..))`) or the terminating chunk and trailers have been
    /// consumed (`Ok(None)`). Both `\r\n` and bare `\n` are accepted as line terminators.
    /// End of stream in the middle of the encoding is an `UnexpectedEof` error.
    #[allow(clippy::too_many_lines)]
    async fn next_chunked(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        loop {
            match self.chunked_state {
                ChunkedState::SizeLine => {
                    if let Some((line_end, len)) = find_crlf(self.buf.available()) {
                        let line = &self.buf.available()[..line_end];
                        let line_str = std::str::from_utf8(line).map_err(std::io::Error::other)?;
                        // Anything after `;` is a chunk extension and is ignored.
                        let size_part = line_str.split(';').next().unwrap_or("").trim();
                        if size_part.is_empty() {
                            return Err(std::io::Error::other("invalid chunk size"));
                        }
                        let chunk_size = usize::from_str_radix(size_part, 16)
                            .map_err(|_| std::io::Error::other("invalid chunk size"))?;
                        self.buf.consume(line_end + len);
                        // A zero-size chunk ends the data; only trailers may follow.
                        if chunk_size == 0 {
                            self.chunked_state = ChunkedState::Trailers;
                        } else {
                            self.chunked_state = ChunkedState::Data {
                                remaining: chunk_size,
                            };
                        }
                        continue;
                    }

                    // No complete size line buffered yet.
                    let n = Box::pin(self.read_more()).await?;
                    if n == 0 {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            "unexpected EOF reading chunk size",
                        ));
                    }
                }

                ChunkedState::Data { remaining } => {
                    if remaining == 0 {
                        self.chunked_state = ChunkedState::DataCrlf;
                        continue;
                    }

                    if self.buf.is_empty() {
                        let n = Box::pin(self.read_more()).await?;
                        if n == 0 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected EOF reading chunk data",
                            ));
                        }
                    }

                    // Hand out what is buffered, bounded by the chunk and the piece size.
                    let to_take = remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
                    let out = self.buf.split_to_vec(to_take);
                    self.chunked_state = ChunkedState::Data {
                        remaining: remaining.saturating_sub(out.len()),
                    };
                    return Ok(Some(out));
                }

                ChunkedState::DataCrlf => {
                    if self.buf.len() < 2 {
                        let n = Box::pin(self.read_more()).await?;
                        if n == 0 && self.buf.is_empty() {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected EOF reading chunk CRLF",
                            ));
                        }
                    }

                    let bytes = self.buf.available();
                    if bytes.starts_with(b"\r\n") {
                        self.buf.consume(2);
                        self.chunked_state = ChunkedState::SizeLine;
                    } else if bytes.starts_with(b"\n") {
                        self.buf.consume(1);
                        self.chunked_state = ChunkedState::SizeLine;
                    } else if bytes.len() >= 2 {
                        return Err(std::io::Error::other("invalid chunk CRLF"));
                    } else {
                        // Fewer than two bytes and no terminator yet: more data is needed
                        // before the terminator can be accepted or rejected.
                        let n = Box::pin(self.read_more()).await?;
                        if n == 0 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected EOF reading chunk CRLF",
                            ));
                        }
                    }
                }

                ChunkedState::Trailers => {
                    let bytes = self.buf.available();
                    // An immediate blank line means there are no trailers.
                    if bytes.starts_with(b"\r\n") {
                        self.buf.consume(2);
                        self.chunked_state = ChunkedState::Done;
                        return Ok(None);
                    }
                    if bytes.starts_with(b"\n") {
                        self.buf.consume(1);
                        self.chunked_state = ChunkedState::Done;
                        return Ok(None);
                    }
                    // Otherwise discard the trailer lines up to and including the blank
                    // line that ends them.
                    if let Some(end) = find_headers_end(self.buf.available()) {
                        self.buf.consume(end);
                        self.chunked_state = ChunkedState::Done;
                        return Ok(None);
                    }

                    let n = Box::pin(self.read_more()).await?;
                    if n == 0 {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            "unexpected EOF reading trailers",
                        ));
                    }
                }

                ChunkedState::Done => return Ok(None),
            }
        }
    }
}
1057
/// Finds the first line terminator in `buf`.
///
/// Returns `(index, length)` of the terminator: length 2 for `\r\n`, length 1 for a bare
/// `\n`. Returns `None` when `buf` contains no `\n`.
fn find_crlf(buf: &[u8]) -> Option<(usize, usize)> {
    let newline = buf.iter().position(|&byte| byte == b'\n')?;
    // A carriage return directly before the newline belongs to the terminator.
    if newline > 0 && buf[newline - 1] == b'\r' {
        Some((newline - 1, 2))
    } else {
        Some((newline, 1))
    }
}
1069
1070async fn read_some<R: AsyncRead + Unpin>(reader: &mut R, dst: &mut [u8]) -> std::io::Result<usize> {
1071 futures::future::poll_fn(|cx| {
1072 let mut read_buf = ReadBuf::new(dst);
1073 match Pin::new(&mut *reader).poll_read(cx, &mut read_buf) {
1074 Poll::Pending => Poll::Pending,
1075 Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
1076 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1077 }
1078 })
1079 .await
1080}
1081
1082#[derive(Debug)]
1083enum Transport {
1084 Tcp(TcpStream),
1085 Tls(Box<asupersync::tls::TlsStream<TcpStream>>),
1086}
1087
1088impl Unpin for Transport {}
1089
1090impl AsyncRead for Transport {
1091 fn poll_read(
1092 mut self: Pin<&mut Self>,
1093 cx: &mut Context<'_>,
1094 buf: &mut ReadBuf<'_>,
1095 ) -> Poll<std::io::Result<()>> {
1096 match &mut *self {
1097 Self::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
1098 Self::Tls(stream) => Pin::new(&mut **stream).poll_read(cx, buf),
1099 }
1100 }
1101}
1102
1103impl AsyncWrite for Transport {
1104 fn poll_write(
1105 mut self: Pin<&mut Self>,
1106 cx: &mut Context<'_>,
1107 buf: &[u8],
1108 ) -> Poll<std::io::Result<usize>> {
1109 match &mut *self {
1110 Self::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
1111 Self::Tls(stream) => Pin::new(&mut **stream).poll_write(cx, buf),
1112 }
1113 }
1114
1115 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1116 match &mut *self {
1117 Self::Tcp(stream) => Pin::new(stream).poll_flush(cx),
1118 Self::Tls(stream) => Pin::new(&mut **stream).poll_flush(cx),
1119 }
1120 }
1121
1122 fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1123 match &mut *self {
1124 Self::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
1125 Self::Tls(stream) => Pin::new(&mut **stream).poll_shutdown(cx),
1126 }
1127 }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132 use super::*;
1133 use serde_json::json;
1134 use std::collections::VecDeque;
1135
1136 #[test]
1138 fn method_as_str_get() {
1139 assert_eq!(Method::Get.as_str(), "GET");
1140 }
1141
1142 #[test]
1143 fn method_as_str_post() {
1144 assert_eq!(Method::Post.as_str(), "POST");
1145 }
1146
1147 #[test]
1149 fn find_headers_end_present() {
1150 let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
1151 let pos = find_headers_end(buf).unwrap();
1152 assert_eq!(&buf[pos..], b"hello");
1153 }
1154
1155 #[test]
1156 fn find_headers_end_absent() {
1157 assert!(find_headers_end(b"HTTP/1.1 200 OK\r\nFoo: bar\r\n").is_none());
1158 }
1159
1160 #[test]
1161 fn find_headers_end_empty() {
1162 assert!(find_headers_end(b"").is_none());
1163 }
1164
1165 #[test]
1166 fn find_headers_end_just_separator() {
1167 let buf = b"\r\n\r\n";
1168 assert_eq!(find_headers_end(buf), Some(4));
1169 }
1170
1171 #[test]
1173 fn find_crlf_present() {
1174 assert_eq!(find_crlf(b"abc\r\ndef"), Some((3, 2)));
1175 }
1176
1177 #[test]
1178 fn find_crlf_present_lf() {
1179 assert_eq!(find_crlf(b"abc\ndef"), Some((3, 1)));
1180 }
1181
1182 #[test]
1183 fn find_crlf_absent() {
1184 assert!(find_crlf(b"abcdef").is_none());
1185 }
1186
1187 #[test]
1188 fn find_crlf_at_start() {
1189 assert_eq!(find_crlf(b"\r\ndata"), Some((0, 2)));
1190 }
1191
1192 #[test]
1193 fn find_crlf_at_start_lf() {
1194 assert_eq!(find_crlf(b"\ndata"), Some((0, 1)));
1195 }
1196
1197 #[test]
1199 fn parse_response_head_200() {
1200 let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
1201 let (status, headers) = parse_response_head(head).unwrap();
1202 assert_eq!(status, 200);
1203 assert_eq!(headers.len(), 1);
1204 assert_eq!(headers[0].0, "Content-Type");
1205 assert_eq!(headers[0].1, "text/plain");
1206 }
1207
1208 #[test]
1209 fn parse_response_head_404() {
1210 let head = b"HTTP/1.1 404 Not Found\r\n\r\n";
1211 let (status, headers) = parse_response_head(head).unwrap();
1212 assert_eq!(status, 404);
1213 assert!(headers.is_empty());
1214 }
1215
1216 #[test]
1217 fn parse_response_head_multiple_headers() {
1218 let head = b"HTTP/1.1 200 OK\r\nA: 1\r\nB: 2\r\nC: 3\r\n\r\n";
1219 let (status, headers) = parse_response_head(head).unwrap();
1220 assert_eq!(status, 200);
1221 assert_eq!(headers.len(), 3);
1222 assert_eq!(headers[0], ("A".to_string(), "1".to_string()));
1223 assert_eq!(headers[1], ("B".to_string(), "2".to_string()));
1224 assert_eq!(headers[2], ("C".to_string(), "3".to_string()));
1225 }
1226
1227 #[test]
1228 fn parse_response_head_header_value_with_colon() {
1229 let head = b"HTTP/1.1 200 OK\r\nLocation: http://example.com:8080/path\r\n\r\n";
1231 let (status, headers) = parse_response_head(head).unwrap();
1232 assert_eq!(status, 200);
1233 assert_eq!(headers[0].0, "Location");
1234 assert_eq!(headers[0].1, "http://example.com:8080/path");
1235 }
1236
1237 #[test]
1238 fn parse_response_head_invalid_status_code() {
1239 let head = b"HTTP/1.1 abc OK\r\n\r\n";
1240 assert!(parse_response_head(head).is_err());
1241 }
1242
1243 #[test]
1244 fn parse_response_head_missing_status() {
1245 let head = b"HTTP/1.1\r\n\r\n";
1246 assert!(parse_response_head(head).is_err());
1247 }
1248
1249 #[test]
1250 fn parse_response_head_empty() {
1251 let head = b"";
1252 assert!(parse_response_head(head).is_err());
1253 }
1254
1255 #[test]
1257 fn body_kind_content_length() {
1258 let headers = vec![("Content-Length".to_string(), "42".to_string())];
1259 assert!(matches!(
1260 body_kind_from_headers(&headers).unwrap(),
1261 BodyKind::ContentLength(42)
1262 ));
1263 }
1264
1265 #[test]
1266 fn body_kind_content_length_zero() {
1267 let headers = vec![("Content-Length".to_string(), "0".to_string())];
1268 assert!(matches!(
1269 body_kind_from_headers(&headers).unwrap(),
1270 BodyKind::Empty
1271 ));
1272 }
1273
1274 #[test]
1275 fn body_kind_chunked() {
1276 let headers = vec![("Transfer-Encoding".to_string(), "chunked".to_string())];
1277 assert!(matches!(
1278 body_kind_from_headers(&headers).unwrap(),
1279 BodyKind::Chunked
1280 ));
1281 }
1282
1283 #[test]
1284 fn body_kind_rejects_chunked_with_additional_transfer_codings() {
1285 let headers = vec![("Transfer-Encoding".to_string(), "gzip, chunked".to_string())];
1286 assert!(body_kind_from_headers(&headers).is_err());
1287 }
1288
1289 #[test]
1290 fn body_kind_rejects_repeated_transfer_encoding_headers_with_extra_codings() {
1291 let headers = vec![
1292 ("Transfer-Encoding".to_string(), "gzip".to_string()),
1293 ("Transfer-Encoding".to_string(), "chunked".to_string()),
1294 ];
1295 assert!(body_kind_from_headers(&headers).is_err());
1296 }
1297
1298 #[test]
1299 fn body_kind_rejects_repeated_chunked_transfer_encoding() {
1300 let headers = vec![
1301 ("Transfer-Encoding".to_string(), "chunked".to_string()),
1302 ("Transfer-Encoding".to_string(), "chunked".to_string()),
1303 ];
1304 assert!(body_kind_from_headers(&headers).is_err());
1305 }
1306
1307 #[test]
1308 fn body_kind_rejects_transfer_encoding_when_chunked_is_not_final() {
1309 let headers = vec![
1310 ("Transfer-Encoding".to_string(), "chunked".to_string()),
1311 ("Transfer-Encoding".to_string(), "gzip".to_string()),
1312 ];
1313 assert!(body_kind_from_headers(&headers).is_err());
1314 }
1315
1316 #[test]
1317 fn body_kind_rejects_non_chunked_transfer_encoding() {
1318 let headers = vec![("Transfer-Encoding".to_string(), "gzip".to_string())];
1319 assert!(body_kind_from_headers(&headers).is_err());
1320 }
1321
1322 #[test]
1323 fn body_kind_chunked_overrides_content_length() {
1324 let headers = vec![
1326 ("Content-Length".to_string(), "100".to_string()),
1327 ("Transfer-Encoding".to_string(), "chunked".to_string()),
1328 ];
1329 assert!(matches!(
1330 body_kind_from_headers(&headers).unwrap(),
1331 BodyKind::Chunked
1332 ));
1333 }
1334
1335 #[test]
1336 fn body_kind_eof_no_headers() {
1337 let headers: Vec<(String, String)> = Vec::new();
1338 assert!(matches!(
1339 body_kind_from_headers(&headers).unwrap(),
1340 BodyKind::Eof
1341 ));
1342 }
1343
1344 #[test]
1345 fn body_kind_case_insensitive() {
1346 let headers = vec![("content-length".to_string(), "10".to_string())];
1347 assert!(matches!(
1348 body_kind_from_headers(&headers).unwrap(),
1349 BodyKind::ContentLength(10)
1350 ));
1351 }
1352
1353 #[test]
1354 fn body_kind_response_204_without_headers_is_empty() {
1355 let headers: Vec<(String, String)> = Vec::new();
1356 assert!(matches!(
1357 body_kind_from_response(204, &headers).unwrap(),
1358 BodyKind::Empty
1359 ));
1360 }
1361
1362 #[test]
1363 fn body_kind_response_304_ignores_content_length() {
1364 let headers = vec![("Content-Length".to_string(), "7".to_string())];
1365 assert!(matches!(
1366 body_kind_from_response(304, &headers).unwrap(),
1367 BodyKind::Empty
1368 ));
1369 }
1370
1371 #[test]
1372 fn body_kind_response_205_without_headers_is_empty() {
1373 let headers: Vec<(String, String)> = Vec::new();
1374 assert!(matches!(
1375 body_kind_from_response(205, &headers).unwrap(),
1376 BodyKind::Empty
1377 ));
1378 }
1379
1380 #[test]
1382 fn build_request_bytes_get() {
1383 let parsed = ParsedUrl::parse("http://example.com/api/test").unwrap();
1384 let bytes = build_request_bytes(Method::Get, &parsed, "test-agent", &[], &[]);
1385 let text = String::from_utf8(bytes).unwrap();
1386 assert!(text.starts_with("GET /api/test HTTP/1.1\r\n"));
1387 assert!(text.contains("Host: example.com\r\n"));
1388 assert!(text.contains("User-Agent: test-agent\r\n"));
1389 assert!(text.contains("Content-Length: 0\r\n"));
1390 assert!(text.ends_with("\r\n\r\n"));
1391 }
1392
1393 #[test]
1394 fn build_request_bytes_post_with_body() {
1395 let parsed = ParsedUrl::parse("https://api.example.com/v1/messages").unwrap();
1396 let body = b"hello world";
1397 let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1398 let bytes = build_request_bytes(Method::Post, &parsed, "pi/0.1", &headers, body);
1399 let text = String::from_utf8(bytes).unwrap();
1400 assert!(text.starts_with("POST /v1/messages HTTP/1.1\r\n"));
1401 assert!(text.contains("Host: api.example.com\r\n"));
1402 assert!(text.contains("Content-Length: 11\r\n"));
1403 assert!(text.contains("Content-Type: application/json\r\n"));
1404 }
1405
1406 #[test]
1407 fn build_request_bytes_custom_headers() {
1408 let parsed = ParsedUrl::parse("http://localhost/test").unwrap();
1409 let headers = vec![
1410 ("Authorization".to_string(), "Bearer sk-test".to_string()),
1411 ("X-Custom".to_string(), "value".to_string()),
1412 ];
1413 let bytes = build_request_bytes(Method::Post, &parsed, "agent", &headers, &[]);
1414 let text = String::from_utf8(bytes).unwrap();
1415 assert!(text.contains("Authorization: Bearer sk-test\r\n"));
1416 assert!(text.contains("X-Custom: value\r\n"));
1417 }
1418
1419 #[test]
1420 fn build_request_bytes_reserved_headers_are_canonicalized() {
1421 let parsed = ParsedUrl::parse("https://api.example.com/v1/messages").unwrap();
1422 let headers = vec![
1423 ("Host".to_string(), "spoofed.example.com".to_string()),
1424 ("User-Agent".to_string(), "custom-agent".to_string()),
1425 ("Content-Length".to_string(), "999".to_string()),
1426 ("X-Test".to_string(), "1".to_string()),
1427 ];
1428 let body = b"hello";
1429 let bytes = build_request_bytes(Method::Post, &parsed, "default-agent", &headers, body);
1430 let text = String::from_utf8(bytes).unwrap();
1431
1432 assert_eq!(text.matches("Host: ").count(), 1);
1433 assert!(text.contains("Host: api.example.com\r\n"));
1434 assert!(!text.contains("Host: spoofed.example.com\r\n"));
1435
1436 assert_eq!(text.matches("User-Agent: ").count(), 1);
1437 assert!(text.contains("User-Agent: custom-agent\r\n"));
1438 assert!(!text.contains("User-Agent: default-agent\r\n"));
1439
1440 assert_eq!(text.matches("Content-Length: ").count(), 1);
1441 assert!(text.contains("Content-Length: 5\r\n"));
1442 assert!(!text.contains("Content-Length: 999\r\n"));
1443
1444 assert!(text.contains("X-Test: 1\r\n"));
1445 }
1446
1447 #[test]
1448 fn build_request_bytes_non_default_port_includes_port_in_host_header() {
1449 let parsed = ParsedUrl::parse("http://example.com:8080/api/test").unwrap();
1450 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &[], &[]);
1451 let text = String::from_utf8(bytes).unwrap();
1452
1453 assert!(text.contains("Host: example.com:8080\r\n"));
1454 }
1455
1456 #[test]
1457 fn build_request_bytes_sanitizes_overridden_user_agent() {
1458 let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
1459 let headers = vec![(
1460 "User-Agent".to_string(),
1461 "custom-agent\r\nX-Injected: nope".to_string(),
1462 )];
1463 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
1464 let text = String::from_utf8(bytes).unwrap();
1465
1466 assert!(text.contains("User-Agent: custom-agentX-Injected: nope\r\n"));
1467 assert_eq!(text.matches("User-Agent: ").count(), 1);
1468 assert!(!text.contains("\r\nX-Injected: nope\r\n"));
1469 }
1470
1471 #[test]
1473 fn build_recorded_request_empty_body() {
1474 let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &[], &[]);
1475 assert_eq!(req.method, "POST");
1476 assert_eq!(req.url, "https://api.test.com/v1");
1477 assert!(req.body.is_none());
1478 assert!(req.body_text.is_none());
1479 }
1480
1481 #[test]
1482 fn build_recorded_request_json_body() {
1483 let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1484 let body = serde_json::to_vec(&json!({"model": "test"})).unwrap();
1485 let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, &body);
1486 assert!(req.body.is_some());
1487 assert_eq!(req.body.unwrap()["model"], "test");
1488 assert!(req.body_text.is_none());
1489 }
1490
1491 #[test]
1492 fn build_recorded_request_text_body() {
1493 let headers = vec![("Content-Type".to_string(), "text/plain".to_string())];
1494 let body = b"hello world";
1495 let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1496 assert!(req.body.is_none());
1497 assert_eq!(req.body_text.as_deref(), Some("hello world"));
1498 }
1499
1500 #[test]
1501 fn build_recorded_request_invalid_json_body_falls_back_to_text() {
1502 let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1503 let body = b"not json {{{";
1504 let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1505 assert!(req.body.is_none());
1506 assert_eq!(req.body_text.as_deref(), Some("not json {{{"));
1507 }
1508
1509 #[test]
1510 fn build_recorded_request_preserves_headers() {
1511 let headers = vec![
1512 ("Authorization".to_string(), "Bearer key".to_string()),
1513 ("X-Trace".to_string(), "abc123".to_string()),
1514 ];
1515 let req = build_recorded_request(Method::Get, "https://test.com", &headers, &[]);
1516 assert_eq!(req.headers.len(), 2);
1517 assert_eq!(req.headers[0].0, "Authorization");
1518 }
1519
1520 #[test]
1522 fn buffer_new_empty() {
1523 let buf = Buffer::new(Vec::new());
1524 assert!(buf.is_empty());
1525 assert_eq!(buf.len(), 0);
1526 }
1527
1528 #[test]
1529 fn buffer_new_with_data() {
1530 let buf = Buffer::new(vec![1, 2, 3]);
1531 assert!(!buf.is_empty());
1532 assert_eq!(buf.len(), 3);
1533 assert_eq!(buf.available(), &[1, 2, 3]);
1534 }
1535
1536 #[test]
1537 fn buffer_consume_partial() {
1538 let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1539 buf.consume(2);
1540 assert_eq!(buf.len(), 3);
1541 assert_eq!(buf.available(), &[3, 4, 5]);
1542 }
1543
1544 #[test]
1545 fn buffer_consume_all() {
1546 let mut buf = Buffer::new(vec![1, 2, 3]);
1547 buf.consume(3);
1548 assert!(buf.is_empty());
1549 assert_eq!(buf.len(), 0);
1550 }
1551
1552 #[test]
1553 fn buffer_consume_triggers_compact() {
1554 let mut buf = Buffer::new(vec![0; 10]);
1556 buf.consume(6); assert_eq!(buf.len(), 4);
1558 assert_eq!(buf.available().len(), 4);
1559 }
1560
1561 #[test]
1562 fn buffer_extend() {
1563 let mut buf = Buffer::new(vec![1, 2]);
1564 buf.extend(&[3, 4, 5]).unwrap();
1565 assert_eq!(buf.len(), 5);
1566 assert_eq!(buf.available(), &[1, 2, 3, 4, 5]);
1567 }
1568
1569 #[test]
1570 fn buffer_extend_overflow() {
1571 let mut buf = Buffer::new(Vec::new());
1572 let huge = vec![0u8; MAX_BUFFERED_BYTES + 1];
1573 assert!(buf.extend(&huge).is_err());
1574 }
1575
1576 #[test]
1577 fn buffer_split_to_vec() {
1578 let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1579 let out = buf.split_to_vec(3);
1580 assert_eq!(out, vec![1, 2, 3]);
1581 assert_eq!(buf.len(), 2);
1582 assert_eq!(buf.available(), &[4, 5]);
1583 }
1584
1585 #[test]
1586 fn buffer_split_to_vec_more_than_available() {
1587 let mut buf = Buffer::new(vec![1, 2]);
1588 let out = buf.split_to_vec(10);
1589 assert_eq!(out, vec![1, 2]);
1590 assert!(buf.is_empty());
1591 }
1592
1593 #[test]
1594 fn buffer_consume_then_extend() {
1595 let mut buf = Buffer::new(vec![1, 2, 3]);
1596 buf.consume(2);
1597 buf.extend(&[4, 5]).unwrap();
1598 assert_eq!(buf.available(), &[3, 4, 5]);
1600 }
1601
1602 #[test]
1603 fn buffer_consume_exactly_all_clears() {
1604 let mut buf = Buffer::new(vec![1, 2, 3]);
1605 buf.consume(3);
1606 assert!(buf.is_empty());
1608 assert_eq!(buf.available(), &[] as &[u8]);
1609 }
1610
1611 #[test]
1613 fn client_default() {
1614 let client = Client::default();
1615 assert!(client.vcr().is_none());
1616 }
1617
1618 #[test]
1619 fn client_with_vcr() {
1620 let recorder = VcrRecorder::new_with(
1621 "test",
1622 crate::vcr::VcrMode::Playback,
1623 std::path::Path::new("/tmp"),
1624 );
1625 let client = Client::new().with_vcr(recorder);
1626 assert!(client.vcr().is_some());
1627 }
1628
1629 #[test]
1631 fn request_builder_header_chaining() {
1632 let client = Client::new();
1633 let builder = client
1634 .post("https://api.example.com")
1635 .header("Authorization", "Bearer test")
1636 .header("X-Custom", "value");
1637 assert_eq!(builder.headers.len(), 2);
1638 }
1639
1640 #[test]
1641 fn request_builder_header_replaces_case_insensitive_duplicate_names() {
1642 let client = Client::new();
1643 let builder = client
1644 .post("https://api.example.com")
1645 .header("Authorization", "Bearer first")
1646 .header("authorization", "Bearer second");
1647
1648 assert_eq!(builder.headers.len(), 1);
1649 assert!(builder.headers[0].0.eq_ignore_ascii_case("authorization"));
1650 assert_eq!(builder.headers[0].1, "Bearer second");
1651 }
1652
1653 #[test]
1654 fn request_builder_header_bounds_prevent_dos() {
1655 let client = Client::new();
1657 let mut builder = client.post("https://api.example.com");
1658
1659 for i in 0..MAX_REQUEST_HEADERS {
1661 builder = builder.header(format!("X-Header-{i}"), "value");
1662 }
1663 assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1664
1665 builder = builder
1667 .header("X-Over-Limit-1", "dropped")
1668 .header("X-Over-Limit-2", "also-dropped");
1669 assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1670
1671 builder = builder.header("X-Header-0", "replaced-value");
1673 assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1674 assert_eq!(builder.headers[0].1, "replaced-value");
1675 }
1676
1677 #[test]
1678 fn request_builder_json() {
1679 let client = Client::new();
1680 let builder = client
1681 .post("https://api.example.com")
1682 .json(&json!({"key": "value"}))
1683 .unwrap();
1684 assert!(!builder.body.is_empty());
1685 assert!(
1687 builder
1688 .headers
1689 .iter()
1690 .any(|(k, v)| k == "Content-Type" && v == "application/json")
1691 );
1692 }
1693
1694 #[test]
1695 fn request_builder_body() {
1696 let client = Client::new();
1697 let builder = client
1698 .post("https://api.example.com")
1699 .body(b"raw bytes".to_vec());
1700 assert_eq!(builder.body, b"raw bytes");
1701 }
1702
1703 #[test]
1704 fn request_builder_default_timeout() {
1705 let client = Client::new();
1706 let builder = client.get("https://api.example.com");
1707 assert_eq!(builder.timeout, None);
1709 }
1710
1711 #[test]
1712 fn request_builder_timeout() {
1713 let client = Client::new();
1714 let builder = client
1715 .get("https://api.example.com")
1716 .timeout(std::time::Duration::from_secs(30));
1717 assert_eq!(builder.timeout, Some(std::time::Duration::from_secs(30)));
1718 }
1719
1720 #[test]
1721 fn request_builder_no_timeout() {
1722 let client = Client::new();
1723 let builder = client.get("https://api.example.com").no_timeout();
1724 assert_eq!(builder.timeout, None);
1725 }
1726
1727 struct MockRetryWriter {
1728 writes: VecDeque<std::io::Result<usize>>,
1729 flushes: VecDeque<std::io::Result<()>>,
1730 written: Vec<u8>,
1731 }
1732
1733 impl MockRetryWriter {
1734 fn new(
1735 writes: impl IntoIterator<Item = std::io::Result<usize>>,
1736 flushes: impl IntoIterator<Item = std::io::Result<()>>,
1737 ) -> Self {
1738 Self {
1739 writes: writes.into_iter().collect(),
1740 flushes: flushes.into_iter().collect(),
1741 written: Vec::new(),
1742 }
1743 }
1744 }
1745
1746 impl AsyncWrite for MockRetryWriter {
1747 fn poll_write(
1748 mut self: Pin<&mut Self>,
1749 _cx: &mut Context<'_>,
1750 buf: &[u8],
1751 ) -> Poll<std::io::Result<usize>> {
1752 let result = self.writes.pop_front().unwrap_or(Ok(buf.len()));
1753 if let Ok(written) = result {
1754 self.written
1755 .extend_from_slice(&buf[..written.min(buf.len())]);
1756 }
1757 Poll::Ready(result)
1758 }
1759
1760 fn poll_flush(
1761 mut self: Pin<&mut Self>,
1762 _cx: &mut Context<'_>,
1763 ) -> Poll<std::io::Result<()>> {
1764 Poll::Ready(self.flushes.pop_front().unwrap_or(Ok(())))
1765 }
1766
1767 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1768 Poll::Ready(Ok(()))
1769 }
1770 }
1771
1772 #[test]
1773 fn write_all_with_retry_propagates_flush_error_after_zero_write() {
1774 asupersync::test_utils::run_test(|| async {
1775 let mut writer = MockRetryWriter::new(
1776 [Ok(0)],
1777 [Err(std::io::Error::new(
1778 std::io::ErrorKind::BrokenPipe,
1779 "flush failed",
1780 ))],
1781 );
1782
1783 let err = write_all_with_retry(&mut writer, b"hello")
1784 .await
1785 .expect_err("flush failure should not be swallowed");
1786 assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1787 assert_eq!(err.to_string(), "flush failed");
1788 assert!(writer.written.is_empty());
1789 });
1790 }
1791
1792 #[test]
1793 fn write_all_with_retry_recovers_after_zero_write_when_flush_succeeds() {
1794 asupersync::test_utils::run_test(|| async {
1795 let mut writer = MockRetryWriter::new([Ok(0), Ok(2), Ok(3)], [Ok(())]);
1796
1797 write_all_with_retry(&mut writer, b"hello")
1798 .await
1799 .expect("retry helper should recover after transient zero write");
1800 assert_eq!(writer.written, b"hello");
1801 });
1802 }
1803
1804 #[test]
1806 fn response_accessors() {
1807 let response = Response {
1808 status: 200,
1809 headers: vec![("Content-Type".to_string(), "text/plain".to_string())],
1810 stream: Box::pin(futures::stream::empty()),
1811 timeout_info: None,
1812 };
1813 assert_eq!(response.status(), 200);
1814 assert_eq!(response.headers().len(), 1);
1815 assert_eq!(response.headers()[0].0, "Content-Type");
1816 }
1817
1818 #[test]
1819 fn response_text() {
1820 asupersync::test_utils::run_test(|| async {
1821 let chunks = vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
1822 let response = Response {
1823 status: 200,
1824 headers: Vec::new(),
1825 stream: Box::pin(futures::stream::iter(chunks)),
1826 timeout_info: None,
1827 };
1828 let text = response.text().await.unwrap();
1829 assert_eq!(text, "hello world");
1830 });
1831 }
1832
1833 #[test]
1834 fn response_text_empty() {
1835 asupersync::test_utils::run_test(|| async {
1836 let response = Response {
1837 status: 200,
1838 headers: Vec::new(),
1839 stream: Box::pin(futures::stream::empty()),
1840 timeout_info: None,
1841 };
1842 let text = response.text().await.unwrap();
1843 assert_eq!(text, "");
1844 });
1845 }
1846
1847 #[test]
1848 fn response_bytes_stream() {
1849 asupersync::test_utils::run_test(|| async {
1850 let chunks = vec![Ok(b"data".to_vec())];
1851 let response = Response {
1852 status: 200,
1853 headers: Vec::new(),
1854 stream: Box::pin(futures::stream::iter(chunks)),
1855 timeout_info: None,
1856 };
1857 let mut stream = response.bytes_stream();
1858 let first = stream.next().await.unwrap().unwrap();
1859 assert_eq!(first, b"data");
1860 assert!(stream.next().await.is_none());
1861 });
1862 }
1863
1864 #[test]
1866 fn body_stream_content_length_via_response() {
1867 asupersync::test_utils::run_test(|| async {
1868 let body = b"Hello, World!";
1870 let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(body.to_vec())];
1871 let response = Response {
1872 status: 200,
1873 headers: vec![("Content-Length".to_string(), "13".to_string())],
1874 stream: Box::pin(futures::stream::iter(chunks)),
1875 timeout_info: None,
1876 };
1877 let text = response.text().await.unwrap();
1878 assert_eq!(text, "Hello, World!");
1879 });
1880 }
1881
1882 #[test]
1883 fn body_stream_multiple_chunks_via_response() {
1884 asupersync::test_utils::run_test(|| async {
1885 let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1886 Ok(b"chunk1".to_vec()),
1887 Ok(b"chunk2".to_vec()),
1888 Ok(b"chunk3".to_vec()),
1889 ];
1890 let response = Response {
1891 status: 200,
1892 headers: Vec::new(),
1893 stream: Box::pin(futures::stream::iter(chunks)),
1894 timeout_info: None,
1895 };
1896 let text = response.text().await.unwrap();
1897 assert_eq!(text, "chunk1chunk2chunk3");
1898 });
1899 }
1900
1901 #[test]
1902 fn body_stream_error_propagation() {
1903 asupersync::test_utils::run_test(|| async {
1904 let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1905 Ok(b"data".to_vec()),
1906 Err(std::io::Error::new(
1907 std::io::ErrorKind::ConnectionReset,
1908 "connection reset",
1909 )),
1910 ];
1911 let response = Response {
1912 status: 200,
1913 headers: Vec::new(),
1914 stream: Box::pin(futures::stream::iter(chunks)),
1915 timeout_info: None,
1916 };
1917 let result = response.text().await;
1918 assert!(result.is_err());
1919 });
1920 }
1921
1922 #[test]
1924 fn parse_response_head_trims_header_whitespace() {
1925 let head = b"HTTP/1.1 200 OK\r\n X-Padded : value with spaces \r\n\r\n";
1926 let (status, headers) = parse_response_head(head).unwrap();
1927 assert_eq!(status, 200);
1928 assert_eq!(headers[0].0, "X-Padded");
1929 assert_eq!(headers[0].1, "value with spaces");
1930 }
1931
1932 #[test]
1933 fn parse_response_head_status_codes() {
1934 for (code, line) in [
1935 (100, "HTTP/1.1 100 Continue"),
1936 (201, "HTTP/1.1 201 Created"),
1937 (301, "HTTP/1.1 301 Moved Permanently"),
1938 (400, "HTTP/1.1 400 Bad Request"),
1939 (429, "HTTP/1.1 429 Too Many Requests"),
1940 (500, "HTTP/1.1 500 Internal Server Error"),
1941 (503, "HTTP/1.1 503 Service Unavailable"),
1942 ] {
1943 let head = format!("{line}\r\n\r\n");
1944 let (status, _) = parse_response_head(head.as_bytes()).unwrap();
1945 assert_eq!(status, code, "Failed to parse status {code}");
1946 }
1947 }
1948
1949 #[test]
1950 fn body_kind_invalid_content_length_is_error() {
1951 let headers = vec![("Content-Length".to_string(), "not-a-number".to_string())];
1952 assert!(body_kind_from_headers(&headers).is_err());
1953 }
1954
1955 #[test]
1956 fn body_kind_conflicting_content_length_headers_is_error() {
1957 let headers = vec![
1958 ("Content-Length".to_string(), "5".to_string()),
1959 ("content-length".to_string(), "7".to_string()),
1960 ];
1961 assert!(body_kind_from_headers(&headers).is_err());
1962 }
1963
1964 #[test]
1965 fn body_kind_coalesced_identical_content_length_is_accepted() {
1966 let headers = vec![("Content-Length".to_string(), "5, 5".to_string())];
1967 assert!(matches!(
1968 body_kind_from_headers(&headers).unwrap(),
1969 BodyKind::ContentLength(5)
1970 ));
1971 }
1972
1973 #[test]
1974 fn body_kind_coalesced_conflicting_content_length_is_error() {
1975 let headers = vec![("Content-Length".to_string(), "5, 7".to_string())];
1976 assert!(body_kind_from_headers(&headers).is_err());
1977 }
1978
1979 #[test]
1980 fn build_request_bytes_empty_path() {
1981 let parsed = ParsedUrl::parse("http://example.com").unwrap();
1982 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &[], &[]);
1983 let text = String::from_utf8(bytes).unwrap();
1984 assert!(text.starts_with("GET /"));
1986 }
1987
1988 #[test]
1989 fn build_recorded_request_content_type_case_insensitive() {
1990 let headers = vec![("content-type".to_string(), "APPLICATION/JSON".to_string())];
1991 let body = serde_json::to_vec(&json!({"test": true})).unwrap();
1992 let req = build_recorded_request(Method::Post, "https://test.com", &headers, &body);
1993 assert!(req.body.is_some());
1995 }
1996
1997 #[test]
1999 fn sanitize_header_value_strips_crlf() {
2000 assert_eq!(sanitize_header_value("normal value"), "normal value");
2001 assert_eq!(
2002 sanitize_header_value("injected\r\nEvil: header"),
2003 "injectedEvil: header"
2004 );
2005 assert_eq!(sanitize_header_value("bare\nnewline"), "barenewline");
2006 assert_eq!(sanitize_header_value("bare\rreturn"), "barereturn");
2007 assert_eq!(sanitize_header_value(""), "");
2008 }
2009
2010 #[test]
2011 fn build_request_bytes_strips_crlf_from_headers() {
2012 let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2013 let headers = vec![(
2014 "X-Injected\r\nEvil".to_string(),
2015 "value\r\nX-Bad: smuggled".to_string(),
2016 )];
2017 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2018 let text = String::from_utf8(bytes).unwrap();
2019 assert!(text.contains("X-InjectedEvil: valueX-Bad: smuggled\r\n"));
2021 assert!(!text.contains("\r\nX-Bad: smuggled\r\n"));
2023 }
2024
2025 #[test]
2026 fn build_request_bytes_strips_invalid_chars_from_header_names() {
2027 let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2028 let headers = vec![("X:Injected Header".to_string(), "value".to_string())];
2029 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2030 let text = String::from_utf8(bytes).unwrap();
2031
2032 assert!(text.contains("XInjectedHeader: value\r\n"));
2033 assert!(!text.contains("X:Injected Header: value\r\n"));
2034 }
2035
2036 #[test]
2037 fn build_request_bytes_drops_headers_that_normalize_to_reserved_names() {
2038 let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2039 let headers = vec![
2040 ("Host:".to_string(), "evil.example".to_string()),
2041 ("Content-Length ".to_string(), "999".to_string()),
2042 ("User-Agent:".to_string(), "spoofed".to_string()),
2043 ];
2044 let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2045 let text = String::from_utf8(bytes).unwrap();
2046
2047 assert!(text.contains("Host: example.com\r\n"));
2048 assert!(text.contains("User-Agent: agent\r\n"));
2049 assert!(text.contains("Content-Length: 0\r\n"));
2050 assert!(!text.contains("Host: evil.example\r\n"));
2051 assert!(!text.contains("Content-Length: 999\r\n"));
2052 assert!(!text.contains("User-Agent: spoofed\r\n"));
2053 }
2054
2055 #[test]
2056 fn build_request_bytes_drops_transfer_encoding_header() {
2057 let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2058 let headers = vec![("Transfer-Encoding".to_string(), "chunked".to_string())];
2059 let body = b"hello";
2060 let bytes = build_request_bytes(Method::Post, &parsed, "agent", &headers, body);
2061 let text = String::from_utf8(bytes).unwrap();
2062
2063 assert!(text.contains("Content-Length: 5\r\n"));
2064 assert!(!text.contains("Transfer-Encoding: chunked\r\n"));
2065 }
2066
2067 #[test]
2069 fn response_text_rejects_oversized_body() {
2070 asupersync::test_utils::run_test(|| async {
2071 let big_chunk = vec![0u8; MAX_TEXT_BODY_BYTES + 1];
2073 let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(big_chunk)];
2074 let response = Response {
2075 status: 200,
2076 headers: Vec::new(),
2077 stream: Box::pin(futures::stream::iter(chunks)),
2078 timeout_info: None,
2079 };
2080 let result = response.text().await;
2081 assert!(result.is_err());
2082 let err_msg = format!("{}", result.unwrap_err());
2083 assert!(
2084 err_msg.contains("too large"),
2085 "error should mention size: {err_msg}"
2086 );
2087 });
2088 }
2089
2090 #[test]
2091 fn response_text_accepts_body_at_limit() {
2092 asupersync::test_utils::run_test(|| async {
2093 let chunk = vec![b'a'; MAX_TEXT_BODY_BYTES];
2094 let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(chunk)];
2095 let response = Response {
2096 status: 200,
2097 headers: Vec::new(),
2098 stream: Box::pin(futures::stream::iter(chunks)),
2099 timeout_info: None,
2100 };
2101 let result = response.text().await;
2102 assert!(result.is_ok());
2103 assert_eq!(result.unwrap().len(), MAX_TEXT_BODY_BYTES);
2104 });
2105 }
2106
2107 #[test]
2110 fn antigravity_user_agent_format() {
2111 let version = "1.2.3";
2113 let ua = format!("{DEFAULT_USER_AGENT} Antigravity/{version}");
2114 assert!(ua.starts_with("pi_agent_rust/"));
2115 assert!(ua.contains("Antigravity/1.2.3"));
2116
2117 assert!(DEFAULT_USER_AGENT.starts_with("pi_agent_rust/"));
2119 }
2120
2121 #[test]
2122 fn antigravity_user_agent_in_request_headers() {
2123 let ua = format!("{DEFAULT_USER_AGENT} Antigravity/42.0");
2125 let parsed = ParsedUrl::parse("http://example.com/api").unwrap();
2126 let bytes = build_request_bytes(Method::Get, &parsed, &ua, &[], &[]);
2127 let text = String::from_utf8(bytes).unwrap();
2128 assert!(text.contains(&format!("User-Agent: {ua}\r\n")));
2129 }
2130}