Skip to main content

pi/http/
client.rs

1//! Minimal streaming HTTP client for Pi.
2//!
3//! This is intentionally small and purpose-built for provider streaming (SSE).
4//! It avoids Node/Bun-style ambient APIs and is designed to pair with
5//! asupersync for TLS + cancel-correctness.
6
7use crate::error::{Error, Result};
8use crate::vcr::{RecordedRequest, VcrRecorder};
9use asupersync::http::h1::ParsedUrl;
10use asupersync::http::h1::http_client::Scheme;
11use asupersync::io::ext::AsyncWriteExt;
12use asupersync::io::{AsyncRead, AsyncWrite, ReadBuf};
13use asupersync::net::tcp::stream::TcpStream;
14use asupersync::tls::{TlsConnector, TlsConnectorBuilder};
15use futures::Stream;
16use futures::StreamExt;
17use futures::TryStreamExt;
18use futures::stream::{self, BoxStream};
19use std::pin::Pin;
20#[cfg(not(test))]
21use std::sync::OnceLock;
22use std::task::{Context, Poll};
23
/// Default `User-Agent` value: a fixed product name plus the crate version
/// Cargo reports at build time.
const DEFAULT_USER_AGENT: &str = concat!("pi_agent_rust/", env!("CARGO_PKG_VERSION"));
/// Environment variable whose value, when set, is appended to the default
/// user agent as `Antigravity/<value>` by `Client::new`.
const ANTIGRAVITY_VERSION_ENV: &str = "PI_AI_ANTIGRAVITY_VERSION";
/// Upper bound on the bytes buffered while looking for the end of the
/// response head.
const MAX_HEADER_BYTES: usize = 64 * 1024;
/// Size of the scratch buffer used for each transport read.
const READ_CHUNK_BYTES: usize = 16 * 1024;
/// Upper bound on the bytes held in the body `Buffer`.
const MAX_BUFFERED_BYTES: usize = 256 * 1024;
/// Upper bound on the body size `Response::text` is willing to accumulate.
const MAX_TEXT_BODY_BYTES: usize = 50 * 1024 * 1024;
/// Maximum number of outbound request headers to prevent DoS.
const MAX_REQUEST_HEADERS: usize = 100;

/// Maximum number of consecutive `Ok(0)` returns from `poll_write` before we
/// give up and surface `ErrorKind::WriteZero`. TLS transports can temporarily
/// return 0 when internal buffers are full; a short backoff usually unblocks
/// the next write.
const WRITE_ZERO_MAX_RETRIES: usize = 10;

/// Initial backoff duration when a write returns `Ok(0)`.
const WRITE_ZERO_BACKOFF: std::time::Duration = std::time::Duration::from_millis(10);
/// Request timeout, in seconds, used when `PI_HTTP_REQUEST_TIMEOUT_SECS` is
/// unset or cannot be parsed. Not compiled into test builds.
#[cfg(not(test))]
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 60;
43
44fn default_request_timeout_from_env() -> Option<std::time::Duration> {
45    #[cfg(test)]
46    {
47        // Disable timeouts in unit tests to prevent `asupersync`'s virtual timer
48        // from instantly fast-forwarding and failing mock server requests.
49        None
50    }
51
52    #[cfg(not(test))]
53    {
54        static REQUEST_TIMEOUT: OnceLock<Option<std::time::Duration>> = OnceLock::new();
55        *REQUEST_TIMEOUT.get_or_init(|| {
56            let timeout_secs = std::env::var("PI_HTTP_REQUEST_TIMEOUT_SECS")
57                .ok()
58                .and_then(|raw| raw.trim().parse::<u64>().ok())
59                .unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS);
60            if timeout_secs == 0 {
61                None
62            } else {
63                Some(std::time::Duration::from_secs(timeout_secs))
64            }
65        })
66    }
67}
68
/// HTTP client handle: TLS connector, default user agent, and an optional
/// VCR recorder.
#[derive(Debug, Clone)]
pub struct Client {
    /// TLS connector, or the stringified error from building it. The error is
    /// only reported when an `https` connection is attempted.
    tls: std::result::Result<TlsConnector, String>,
    /// `User-Agent` sent unless the request supplies its own.
    user_agent: String,
    /// When set, `RequestBuilder::send` routes the request through this recorder.
    vcr: Option<VcrRecorder>,
}
75
76impl Client {
77    #[must_use]
78    pub fn new() -> Self {
79        let tls = TlsConnectorBuilder::new()
80            .with_native_roots()
81            .and_then(|builder| builder.alpn_protocols(vec![b"http/1.1".to_vec()]).build())
82            .map_err(|e| e.to_string());
83
84        let user_agent = std::env::var(ANTIGRAVITY_VERSION_ENV).map_or_else(
85            |_| DEFAULT_USER_AGENT.to_string(),
86            |v| format!("{DEFAULT_USER_AGENT} Antigravity/{v}"),
87        );
88
89        Self {
90            tls,
91            user_agent,
92            vcr: None,
93        }
94    }
95
96    pub fn post(&self, url: &str) -> RequestBuilder<'_> {
97        RequestBuilder::new(self, Method::Post, url)
98    }
99
100    pub fn get(&self, url: &str) -> RequestBuilder<'_> {
101        RequestBuilder::new(self, Method::Get, url)
102    }
103
104    #[must_use]
105    pub fn with_vcr(mut self, recorder: VcrRecorder) -> Self {
106        self.vcr = Some(recorder);
107        self
108    }
109
110    pub const fn vcr(&self) -> Option<&VcrRecorder> {
111        self.vcr.as_ref()
112    }
113}
114
/// `Client::default()` is the same as `Client::new()`.
impl Default for Client {
    fn default() -> Self {
        Self::new()
    }
}
120
/// HTTP request methods this client can send.
#[derive(Debug, Clone, Copy)]
enum Method {
    Get,
    Post,
}

impl Method {
    /// The method token as written on the request line.
    const fn as_str(self) -> &'static str {
        match self {
            Self::Get => "GET",
            Self::Post => "POST",
        }
    }
}
135
/// A request under construction, borrowed from the `Client` that will send it.
pub struct RequestBuilder<'a> {
    /// Client supplying the TLS connector, user agent, and VCR recorder.
    client: &'a Client,
    /// HTTP method for the request line.
    method: Method,
    /// Target URL as given by the caller; parsed when the request is sent.
    url: String,
    /// Caller-supplied headers, at most one entry per case-insensitive name.
    headers: Vec<(String, String)>,
    /// Request body bytes; empty when no body was set.
    body: Vec<u8>,
    /// Timeout for the request, or `None` for no timeout.
    timeout: Option<std::time::Duration>,
}
144
impl<'a> RequestBuilder<'a> {
    /// Create a builder with no headers, an empty body, and the default
    /// timeout from `default_request_timeout_from_env`.
    fn new(client: &'a Client, method: Method, url: &str) -> Self {
        Self {
            client,
            method,
            url: url.to_string(),
            headers: Vec::new(),
            body: Vec::new(),
            timeout: default_request_timeout_from_env(),
        }
    }

    /// Set a header, replacing any existing header whose name matches
    /// case-insensitively.
    ///
    /// A new header is dropped without error once `MAX_REQUEST_HEADERS`
    /// entries are present; replacing an existing header always succeeds.
    #[must_use]
    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let key = key.into();
        let value = value.into();
        if let Some((existing_key, existing_value)) = self
            .headers
            .iter_mut()
            .find(|(existing_key, _)| existing_key.eq_ignore_ascii_case(&key))
        {
            *existing_key = key;
            *existing_value = value;
        } else if self.headers.len() < MAX_REQUEST_HEADERS {
            self.headers.push((key, value));
        }
        // Silently drop headers beyond the limit to prevent DoS
        self
    }

    /// Override the request timeout.
    #[must_use]
    pub const fn timeout(mut self, duration: std::time::Duration) -> Self {
        self.timeout = Some(duration);
        self
    }

    /// Remove the timeout entirely. Use for requests that are expected to take
    /// an arbitrarily long time (e.g. long-polling SSE streams).
    #[must_use]
    pub const fn no_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Set raw body bytes.
    #[must_use]
    pub fn body(mut self, body: Vec<u8>) -> Self {
        self.body = body;
        self
    }

    /// Serialize `payload` as the JSON body and set
    /// `Content-Type: application/json`.
    ///
    /// # Errors
    ///
    /// Returns an error when `serde_json` cannot serialize `payload`.
    pub fn json<T: serde::Serialize>(mut self, payload: &T) -> Result<Self> {
        self = self.header("Content-Type", "application/json");
        self.body = serde_json::to_vec(payload)?;
        Ok(self)
    }

    /// Send the request and return the response head plus a lazy body stream.
    ///
    /// When the client has a VCR recorder the request goes through it, and no
    /// timeout is applied on that path. Otherwise, if a timeout is configured,
    /// sending the request and reading the response head race against a sleep
    /// of that duration; the start time and duration are also stored on the
    /// `Response` for the body stream to use.
    ///
    /// # Errors
    ///
    /// Returns an error when sending fails, or `"Request timed out"` when the
    /// sleep finishes first.
    pub async fn send(self) -> Result<Response> {
        let RequestBuilder {
            client,
            method,
            url,
            headers,
            body,
            timeout,
        } = self;

        if let Some(recorder) = client.vcr() {
            let recorded_request = build_recorded_request(method, &url, &headers, &body);
            let recorded = recorder
                .request_streaming_with(recorded_request, || async {
                    let (status, response_headers, stream) =
                        send_parts(client, method, &url, &headers, &body).await?;
                    Ok((status, response_headers, stream))
                })
                .await?;
            let status = recorded.status;
            let response_headers = recorded.headers.clone();
            let stream = recorded.into_byte_stream();
            return Ok(Response {
                status,
                headers: response_headers,
                stream,
                timeout_info: None,
            });
        }

        let send_fut = send_parts(client, method, &url, &headers, &body);

        let (status, response_headers, stream, timeout_info) = if let Some(duration) = timeout {
            use asupersync::time::{sleep, wall_now};
            use futures::future::{Either, FutureExt, select};

            // Use the current context's timer driver when there is one,
            // otherwise fall back to `wall_now`.
            let asupersync_now = asupersync::Cx::current()
                .and_then(|cx| cx.timer_driver())
                .map_or_else(wall_now, |timer| timer.now());

            let sleep_fut = sleep(asupersync_now, duration).fuse();
            let send_fut = send_fut.fuse();
            futures::pin_mut!(sleep_fut, send_fut);

            let (status, response_headers, stream) = match select(send_fut, sleep_fut).await {
                Either::Left((res, _)) => res?,
                Either::Right(_) => return Err(Error::api("Request timed out")),
            };
            (
                status,
                response_headers,
                stream,
                Some((asupersync_now, duration)),
            )
        } else {
            let (status, response_headers, stream) = send_fut.await?;
            (status, response_headers, stream, None)
        };

        Ok(Response {
            status,
            headers: response_headers,
            stream,
            timeout_info,
        })
    }
}
269
270/// Like `write_all`, but retries on `Ok(0)` with exponential backoff instead
271/// of immediately failing with `ErrorKind::WriteZero`.
272///
273/// TLS transports (and, less commonly, TCP under memory pressure) can return
274/// `Ok(0)` from `write()` when internal buffers are temporarily full.  The
275/// standard `write_all` implementation treats this as an unrecoverable error,
276/// which causes spurious "IO error: write zero" failures — especially for
277/// large request bodies such as resumed session contexts.
278async fn write_all_with_retry<W: AsyncWrite + Unpin>(
279    writer: &mut W,
280    mut buf: &[u8],
281) -> std::io::Result<()> {
282    use asupersync::time::{sleep, wall_now};
283
284    let mut consecutive_zeros: usize = 0;
285    let mut backoff = WRITE_ZERO_BACKOFF;
286
287    while !buf.is_empty() {
288        let n = futures::future::poll_fn(|cx| Pin::new(&mut *writer).poll_write(cx, buf)).await?;
289
290        if n == 0 {
291            consecutive_zeros += 1;
292            if consecutive_zeros > WRITE_ZERO_MAX_RETRIES {
293                return Err(std::io::Error::new(
294                    std::io::ErrorKind::WriteZero,
295                    format!(
296                        "transport returned Ok(0) {} consecutive times ({} bytes remaining)",
297                        consecutive_zeros,
298                        buf.len(),
299                    ),
300                ));
301            }
302            tracing::debug!(
303                attempt = consecutive_zeros,
304                remaining = buf.len(),
305                backoff_ms = backoff.as_millis(),
306                "write returned Ok(0), backing off before retry"
307            );
308
309            // Flushing the writer is crucial when TLS buffers are full, otherwise
310            // we will sleep and retry without any progress being made. If flush
311            // itself fails, surface that real transport error immediately rather
312            // than misreporting the retry loop as a generic write-zero failure.
313            futures::future::poll_fn(|cx| Pin::new(&mut *writer).poll_flush(cx)).await?;
314
315            let now = asupersync::Cx::current()
316                .and_then(|cx| cx.timer_driver())
317                .map_or_else(wall_now, |timer| timer.now());
318            sleep(now, backoff).await;
319
320            // Exponential backoff: 10ms, 20ms, 40ms, …
321            backoff = backoff.saturating_mul(2);
322        } else {
323            // Successful partial write — advance the buffer and reset retry state.
324            buf = &buf[n..];
325            consecutive_zeros = 0;
326            backoff = WRITE_ZERO_BACKOFF;
327        }
328    }
329    Ok(())
330}
331
332async fn send_parts(
333    client: &Client,
334    method: Method,
335    url: &str,
336    headers: &[(String, String)],
337    body: &[u8],
338) -> Result<(
339    u16,
340    Vec<(String, String)>,
341    BoxStream<'static, std::io::Result<Vec<u8>>>,
342)> {
343    let parsed = ParsedUrl::parse(url).map_err(|e| Error::api(format!("Invalid URL: {e}")))?;
344    let mut transport = connect_transport(&parsed, client).await?;
345
346    let request_bytes = build_request_bytes(method, &parsed, &client.user_agent, headers, body);
347    write_all_with_retry(&mut transport, &request_bytes).await?;
348    if !body.is_empty() {
349        write_all_with_retry(&mut transport, body).await?;
350    }
351    transport.flush().await?;
352
353    let (status, response_headers, leftover) = Box::pin(read_response_head(&mut transport)).await?;
354    let body_kind = body_kind_from_response(status, &response_headers)?;
355
356    let state = BodyStreamState::new(transport, body_kind, leftover);
357    let stream = stream::try_unfold(state, |mut state| async move {
358        match Box::pin(state.next_bytes()).await {
359            Ok(Some(chunk)) => Ok(Some((chunk, state))),
360            Ok(None) => {
361                state.shutdown_transport_best_effort().await;
362                Ok(None)
363            }
364            Err(err) => {
365                state.shutdown_transport_best_effort().await;
366                Err(err)
367            }
368        }
369    })
370    .boxed();
371
372    Ok((status, response_headers, stream))
373}
374
375fn build_recorded_request(
376    method: Method,
377    url: &str,
378    headers: &[(String, String)],
379    body: &[u8],
380) -> RecordedRequest {
381    let mut body_value = None;
382    let mut body_text = None;
383
384    if !body.is_empty() {
385        let is_json = headers.iter().any(|(name, value)| {
386            name.eq_ignore_ascii_case("content-type")
387                && value.to_ascii_lowercase().contains("application/json")
388        });
389
390        if is_json {
391            match serde_json::from_slice::<serde_json::Value>(body) {
392                Ok(value) => body_value = Some(value),
393                Err(_) => body_text = Some(String::from_utf8_lossy(body).to_string()),
394            }
395        } else {
396            body_text = Some(String::from_utf8_lossy(body).to_string());
397        }
398    }
399
400    RecordedRequest {
401        method: method.as_str().to_string(),
402        url: url.to_string(),
403        headers: headers.to_vec(),
404        body: body_value,
405        body_text,
406    }
407}
408
/// An HTTP response whose body has not been consumed yet.
pub struct Response {
    /// Numeric status code from the status line.
    status: u16,
    /// Response headers in the order received.
    headers: Vec<(String, String)>,
    /// Body bytes, produced lazily.
    stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
    /// Request start time and timeout duration, used to apply a timeout while
    /// reading the body. `None` when the request had no timeout or was served
    /// through the VCR recorder.
    timeout_info: Option<(asupersync::Time, std::time::Duration)>,
}
415
416fn wrap_stream_with_idle_timeout(
417    stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
418    timeout_info: Option<(asupersync::Time, std::time::Duration)>,
419) -> Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>> {
420    let Some((start_time, timeout)) = timeout_info else {
421        return stream;
422    };
423
424    Box::pin(futures::stream::unfold(
425        (stream, start_time, timeout),
426        |(mut stream, mut last_activity, timeout)| async move {
427            use asupersync::time::{sleep, wall_now};
428            use futures::future::{Either, FutureExt, select};
429
430            let asupersync_now = asupersync::Cx::current()
431                .and_then(|cx| cx.timer_driver())
432                .map_or_else(wall_now, |timer| timer.now());
433
434            let elapsed =
435                std::time::Duration::from_nanos(asupersync_now.duration_since(last_activity));
436            if elapsed >= timeout {
437                return Some((
438                    Err(std::io::Error::other(
439                        "Request timed out reading body stream",
440                    )),
441                    (stream, last_activity, timeout),
442                ));
443            }
444
445            let remaining = timeout.checked_sub(elapsed).unwrap_or_default();
446            let sleep_fut = sleep(asupersync_now, remaining).fuse();
447            let next_fut = stream.next().fuse();
448            futures::pin_mut!(sleep_fut, next_fut);
449
450            match select(next_fut, sleep_fut).await {
451                Either::Left((Some(res), _)) => {
452                    let now = asupersync::Cx::current()
453                        .and_then(|cx| cx.timer_driver())
454                        .map_or_else(wall_now, |timer| timer.now());
455                    last_activity = now;
456                    Some((res, (stream, last_activity, timeout)))
457                }
458                Either::Left((None, _)) => None,
459                Either::Right(_) => Some((
460                    Err(std::io::Error::other(
461                        "Request timed out reading body stream",
462                    )),
463                    (stream, last_activity, timeout),
464                )),
465            }
466        },
467    ))
468}
469
470impl Response {
471    #[must_use]
472    pub const fn status(&self) -> u16 {
473        self.status
474    }
475
476    #[must_use]
477    pub fn headers(&self) -> &[(String, String)] {
478        &self.headers
479    }
480
481    #[must_use]
482    pub fn bytes_stream(self) -> Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>> {
483        wrap_stream_with_idle_timeout(self.stream, self.timeout_info)
484    }
485
486    pub async fn text(self) -> Result<String> {
487        let stream = wrap_stream_with_idle_timeout(self.stream, self.timeout_info);
488        let bytes = stream
489            .try_fold(Vec::new(), |mut acc, chunk| async move {
490                if acc.len().saturating_add(chunk.len()) > MAX_TEXT_BODY_BYTES {
491                    return Err(std::io::Error::other("response body too large"));
492                }
493                acc.extend_from_slice(&chunk);
494                Ok::<_, std::io::Error>(acc)
495            })
496            .await
497            .map_err(|err| {
498                if err.kind() == std::io::ErrorKind::Other
499                    && err
500                        .to_string()
501                        .contains("Request timed out reading body stream")
502                {
503                    Error::api("Request timed out reading body")
504                } else {
505                    Error::from(err)
506                }
507            })?;
508
509        match String::from_utf8(bytes) {
510            Ok(s) => Ok(s),
511            Err(e) => Ok(String::from_utf8_lossy(e.as_bytes()).into_owned()),
512        }
513    }
514}
515
516async fn connect_transport(parsed: &ParsedUrl, client: &Client) -> Result<Transport> {
517    let addr = (parsed.host.clone(), parsed.port);
518    let tcp = TcpStream::connect(addr).await?;
519    match parsed.scheme {
520        Scheme::Http => Ok(Transport::Tcp(tcp)),
521        Scheme::Https => {
522            let tls = client
523                .tls
524                .as_ref()
525                .map_err(|e| Error::api(format!("TLS configuration error: {e}")))?;
526            let tls_stream = tls
527                .clone()
528                .connect(&parsed.host, tcp)
529                .await
530                .map_err(|e| Error::api(format!("TLS connect failed: {e}")))?;
531            Ok(Transport::Tls(Box::new(tls_stream)))
532        }
533    }
534}
535
/// Strip CR, LF, and NUL from a header value to prevent HTTP header injection.
///
/// RFC 9110 calls field values containing any of these three characters
/// invalid and dangerous. Every other character is kept unchanged.
fn sanitize_header_value(value: &str) -> String {
    value
        .chars()
        .filter(|c| !matches!(c, '\r' | '\n' | '\0'))
        .collect()
}
540
/// Keep only the RFC 9110 token characters of a header name: ASCII letters,
/// ASCII digits, and the punctuation ``!#$%&'*+-.^_`|~``. Everything else,
/// including all non-ASCII bytes, is removed.
fn sanitize_header_name(name: &str) -> String {
    const TOKEN_PUNCTUATION: &[u8] = b"!#$%&'*+-.^_`|~";

    name.bytes()
        .filter(|byte| byte.is_ascii_alphanumeric() || TOKEN_PUNCTUATION.contains(byte))
        .map(char::from)
        .collect()
}
567
/// Look up a header by case-insensitive name. When the name appears more
/// than once, the value of the last matching entry is returned.
fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .rev()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| value.as_str())
}
577
578fn build_request_bytes(
579    method: Method,
580    parsed: &ParsedUrl,
581    user_agent: &str,
582    headers: &[(String, String)],
583    body: &[u8],
584) -> Vec<u8> {
585    let mut out = String::new();
586    let effective_user_agent =
587        sanitize_header_value(header_value(headers, "user-agent").unwrap_or(user_agent));
588    let host_header = host_header_value(parsed);
589    let _ = std::fmt::Write::write_fmt(
590        &mut out,
591        format_args!("{} {} HTTP/1.1\r\n", method.as_str(), parsed.path),
592    );
593    let _ = std::fmt::Write::write_fmt(&mut out, format_args!("Host: {host_header}\r\n"));
594    let _ = std::fmt::Write::write_fmt(
595        &mut out,
596        format_args!("User-Agent: {effective_user_agent}\r\n"),
597    );
598    let _ =
599        std::fmt::Write::write_fmt(&mut out, format_args!("Content-Length: {}\r\n", body.len()));
600
601    for (name, value) in headers {
602        let clean_name = sanitize_header_name(name);
603        if clean_name.is_empty()
604            || clean_name.eq_ignore_ascii_case("host")
605            || clean_name.eq_ignore_ascii_case("user-agent")
606            || clean_name.eq_ignore_ascii_case("content-length")
607            // This client only emits fixed-length request bodies, so
608            // caller-supplied transfer codings would lie about the wire format.
609            || clean_name.eq_ignore_ascii_case("transfer-encoding")
610        {
611            continue;
612        }
613        let clean_value = sanitize_header_value(value);
614        let _ =
615            std::fmt::Write::write_fmt(&mut out, format_args!("{clean_name}: {clean_value}\r\n"));
616    }
617
618    out.push_str("\r\n");
619    out.into_bytes()
620}
621
622fn host_header_value(parsed: &ParsedUrl) -> String {
623    let host = if parsed.host.contains(':') && !parsed.host.starts_with('[') {
624        format!("[{}]", parsed.host)
625    } else {
626        parsed.host.clone()
627    };
628
629    let default_port = match parsed.scheme {
630        Scheme::Http => 80,
631        Scheme::Https => 443,
632    };
633
634    if parsed.port == default_port {
635        host
636    } else {
637        format!("{host}:{}", parsed.port)
638    }
639}
640
641async fn read_response_head(
642    transport: &mut Transport,
643) -> Result<(u16, Vec<(String, String)>, Vec<u8>)> {
644    let mut buf = Vec::with_capacity(8192);
645    let mut scratch = [0u8; READ_CHUNK_BYTES];
646    let mut search_start = 0;
647
648    loop {
649        if buf.len() > MAX_HEADER_BYTES {
650            return Err(Error::api("HTTP response headers too large"));
651        }
652
653        let haystack = &buf[search_start..];
654        if let Some(pos) = find_headers_end(haystack) {
655            let absolute_pos = search_start + pos;
656            let head = &buf[..absolute_pos];
657            let leftover = buf[absolute_pos..].to_vec();
658            let (status, headers) = parse_response_head(head)?;
659            return Ok((status, headers, leftover));
660        }
661
662        let n = read_some(transport, &mut scratch).await?;
663        if n == 0 {
664            return Err(Error::api("HTTP connection closed before headers"));
665        }
666        let old_len = buf.len();
667        buf.extend_from_slice(&scratch[..n]);
668        search_start = old_len.saturating_sub(3);
669    }
670}
671
/// Find the end of an HTTP head in `buf`.
///
/// Returns the offset just past the first blank line, where a blank line is
/// either `\r\n\r\n` or `\n\n`. Returns `None` when neither occurs.
fn find_headers_end(buf: &[u8]) -> Option<usize> {
    (0..buf.len()).find_map(|start| {
        let tail = &buf[start..];
        if tail.starts_with(b"\r\n\r\n") {
            Some(start + 4)
        } else if tail.starts_with(b"\n\n") {
            Some(start + 2)
        } else {
            None
        }
    })
}
683
684fn parse_response_head(head: &[u8]) -> Result<(u16, Vec<(String, String)>)> {
685    let text =
686        std::str::from_utf8(head).map_err(|e| Error::api(format!("Invalid HTTP headers: {e}")))?;
687    let mut lines = text.lines();
688
689    let status_line = lines
690        .next()
691        .ok_or_else(|| Error::api("Missing HTTP status line"))?;
692    let mut parts = status_line.split_whitespace();
693    let _version = parts
694        .next()
695        .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
696    let status_str = parts
697        .next()
698        .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
699    let status: u16 = status_str
700        .parse()
701        .map_err(|_| Error::api("Invalid HTTP status code"))?;
702
703    let mut headers = Vec::new();
704    for line in lines {
705        if line.is_empty() {
706            continue;
707        }
708        let (name, value) = line
709            .split_once(':')
710            .ok_or_else(|| Error::api("Invalid HTTP header line"))?;
711        headers.push((name.trim().to_string(), value.trim().to_string()));
712    }
713
714    Ok((status, headers))
715}
716
/// How the response body is delimited on the wire.
#[derive(Debug, Clone, Copy)]
enum BodyKind {
    /// No body at all.
    Empty,
    /// Exactly this many body bytes follow the head.
    ContentLength(usize),
    /// The body uses `Transfer-Encoding: chunked`.
    Chunked,
    /// The body runs until the connection is closed.
    Eof,
}
724
725fn body_kind_from_response(status: u16, headers: &[(String, String)]) -> Result<BodyKind> {
726    if matches!(status, 100..=199 | 204 | 205 | 304) {
727        return Ok(BodyKind::Empty);
728    }
729    body_kind_from_headers(headers)
730}
731
732fn body_kind_from_headers(headers: &[(String, String)]) -> Result<BodyKind> {
733    let mut content_length = None;
734    let mut transfer_encodings = Vec::new();
735    let mut saw_transfer_encoding = false;
736
737    for (name, value) in headers {
738        let name_lc = name.to_ascii_lowercase();
739        if name_lc == "content-length" {
740            for part in value.split(',') {
741                let parsed = part
742                    .trim()
743                    .parse::<usize>()
744                    .map_err(|_| Error::api("Invalid HTTP Content-Length header"))?;
745                if let Some(existing) = content_length {
746                    if existing != parsed {
747                        return Err(Error::api("Conflicting HTTP Content-Length headers"));
748                    }
749                } else {
750                    content_length = Some(parsed);
751                }
752            }
753        } else if name_lc == "transfer-encoding" {
754            saw_transfer_encoding = true;
755            transfer_encodings.extend(
756                value
757                    .split(',')
758                    .map(str::trim)
759                    .filter(|value| !value.is_empty())
760                    .map(str::to_ascii_lowercase),
761            );
762        }
763    }
764
765    if saw_transfer_encoding {
766        let Some(last) = transfer_encodings.last() else {
767            return Err(Error::api("Invalid HTTP Transfer-Encoding header"));
768        };
769        if last != "chunked" {
770            return Err(Error::api("Unsupported HTTP Transfer-Encoding header"));
771        }
772        if transfer_encodings.len() != 1 {
773            return Err(Error::api("Unsupported HTTP Transfer-Encoding header"));
774        }
775        return Ok(BodyKind::Chunked);
776    }
777
778    Ok(match content_length {
779        Some(0) => BodyKind::Empty,
780        Some(n) => BodyKind::ContentLength(n),
781        None => BodyKind::Eof,
782    })
783}
784
/// Byte buffer with a read cursor.
struct Buffer {
    /// Backing storage; bytes before `pos` have already been consumed.
    bytes: Vec<u8>,
    /// Index of the first unread byte in `bytes`.
    pos: usize,
}
789
790impl Buffer {
791    const fn new(initial: Vec<u8>) -> Self {
792        Self {
793            bytes: initial,
794            pos: 0,
795        }
796    }
797
798    fn available(&self) -> &[u8] {
799        &self.bytes[self.pos..]
800    }
801
802    fn len(&self) -> usize {
803        self.available().len()
804    }
805
806    fn is_empty(&self) -> bool {
807        self.len() == 0
808    }
809
810    fn consume(&mut self, n: usize) {
811        self.pos = self.pos.saturating_add(n).min(self.bytes.len());
812        if self.pos == self.bytes.len() {
813            self.bytes.clear();
814            self.pos = 0;
815        } else if self.pos > 0 && self.pos >= self.bytes.len() / 2 {
816            self.bytes.drain(..self.pos);
817            self.pos = 0;
818        }
819    }
820
821    fn extend(&mut self, data: &[u8]) -> Result<()> {
822        if self.bytes.len().saturating_add(data.len()) > MAX_BUFFERED_BYTES {
823            return Err(Error::api("HTTP body buffer exceeded"));
824        }
825        self.bytes.extend_from_slice(data);
826        Ok(())
827    }
828
829    fn split_to_vec(&mut self, n: usize) -> Vec<u8> {
830        let n = n.min(self.len());
831        let out = self.available()[..n].to_vec();
832        self.consume(n);
833        out
834    }
835}
836
/// Position within a `Transfer-Encoding: chunked` body.
enum ChunkedState {
    /// Expecting a chunk-size line; this is the initial state.
    SizeLine,
    /// Inside a chunk's data, with `remaining` bytes still to read.
    Data { remaining: usize },
    /// NOTE(review): presumably expecting the line terminator that follows a
    /// chunk's data — confirm against `next_chunked`.
    DataCrlf,
    /// Entered after a chunk of size zero.
    Trailers,
    /// NOTE(review): presumably the body has been fully read — confirm
    /// against `next_chunked`.
    Done,
}
844
/// State carried between polls of the response body stream.
struct BodyStreamState {
    /// Connection the body is read from.
    transport: Transport,
    /// How the body is delimited.
    kind: BodyKind,
    /// Bytes read from the transport but not yet handed to the caller.
    buf: Buffer,
    /// Chunked-decoding position; starts at `SizeLine`.
    chunked_state: ChunkedState,
    /// Body bytes still owed for a `ContentLength` body; initialised to 0 for
    /// the other kinds.
    remaining: usize,
    /// Set once a shutdown of the transport has been attempted.
    transport_closed: bool,
}
853
854impl BodyStreamState {
855    const fn new(transport: Transport, kind: BodyKind, leftover: Vec<u8>) -> Self {
856        let remaining = match kind {
857            BodyKind::ContentLength(n) => n,
858            _ => 0,
859        };
860        Self {
861            transport,
862            kind,
863            buf: Buffer::new(leftover),
864            chunked_state: ChunkedState::SizeLine,
865            remaining,
866            transport_closed: false,
867        }
868    }
869
    /// Produce the next piece of the body, or `Ok(None)` once it is complete,
    /// by dispatching on the body's framing kind.
    async fn next_bytes(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        match self.kind {
            BodyKind::Empty => Ok(None),
            BodyKind::Eof => Box::pin(self.next_eof()).await,
            BodyKind::ContentLength(_) => Box::pin(self.next_content_length()).await,
            BodyKind::Chunked => Box::pin(self.next_chunked()).await,
        }
    }
878
879    async fn shutdown_transport_best_effort(&mut self) {
880        if self.transport_closed {
881            return;
882        }
883        self.transport_closed = true;
884        let _ = self.transport.shutdown().await;
885    }
886
887    async fn read_more(&mut self) -> std::io::Result<usize> {
888        let mut scratch = [0u8; READ_CHUNK_BYTES];
889        let n = read_some(&mut self.transport, &mut scratch).await?;
890        if n > 0 {
891            if let Err(err) = self.buf.extend(&scratch[..n]) {
892                return Err(std::io::Error::other(err.to_string()));
893            }
894        }
895        Ok(n)
896    }
897
898    async fn next_eof(&mut self) -> std::io::Result<Option<Vec<u8>>> {
899        if !self.buf.is_empty() {
900            return Ok(Some(self.buf.split_to_vec(self.buf.len())));
901        }
902
903        let n = Box::pin(self.read_more()).await?;
904        if n == 0 {
905            return Ok(None);
906        }
907        Ok(Some(self.buf.split_to_vec(self.buf.len())))
908    }
909
910    async fn next_content_length(&mut self) -> std::io::Result<Option<Vec<u8>>> {
911        if self.remaining == 0 {
912            return Ok(None);
913        }
914
915        if self.buf.is_empty() {
916            let n = Box::pin(self.read_more()).await?;
917            if n == 0 {
918                return Err(std::io::Error::new(
919                    std::io::ErrorKind::UnexpectedEof,
920                    "unexpected EOF reading content-length body",
921                ));
922            }
923        }
924
925        let to_take = self.remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
926        let out = self.buf.split_to_vec(to_take);
927        self.remaining = self.remaining.saturating_sub(out.len());
928        Ok(Some(out))
929    }
930
931    #[allow(clippy::too_many_lines)]
932    async fn next_chunked(&mut self) -> std::io::Result<Option<Vec<u8>>> {
933        loop {
934            match self.chunked_state {
935                ChunkedState::SizeLine => {
936                    if let Some((line_end, len)) = find_crlf(self.buf.available()) {
937                        let line = &self.buf.available()[..line_end];
938                        let line_str = std::str::from_utf8(line).map_err(std::io::Error::other)?;
939                        let size_part = line_str.split(';').next().unwrap_or("").trim();
940                        if size_part.is_empty() {
941                            return Err(std::io::Error::other("invalid chunk size"));
942                        }
943                        let chunk_size = usize::from_str_radix(size_part, 16)
944                            .map_err(|_| std::io::Error::other("invalid chunk size"))?;
945                        self.buf.consume(line_end + len);
946                        if chunk_size == 0 {
947                            self.chunked_state = ChunkedState::Trailers;
948                        } else {
949                            self.chunked_state = ChunkedState::Data {
950                                remaining: chunk_size,
951                            };
952                        }
953                        continue;
954                    }
955
956                    let n = Box::pin(self.read_more()).await?;
957                    if n == 0 {
958                        return Err(std::io::Error::new(
959                            std::io::ErrorKind::UnexpectedEof,
960                            "unexpected EOF reading chunk size",
961                        ));
962                    }
963                }
964
965                ChunkedState::Data { remaining } => {
966                    if remaining == 0 {
967                        self.chunked_state = ChunkedState::DataCrlf;
968                        continue;
969                    }
970
971                    if self.buf.is_empty() {
972                        let n = Box::pin(self.read_more()).await?;
973                        if n == 0 {
974                            return Err(std::io::Error::new(
975                                std::io::ErrorKind::UnexpectedEof,
976                                "unexpected EOF reading chunk data",
977                            ));
978                        }
979                    }
980
981                    let to_take = remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
982                    let out = self.buf.split_to_vec(to_take);
983                    self.chunked_state = ChunkedState::Data {
984                        remaining: remaining.saturating_sub(out.len()),
985                    };
986                    return Ok(Some(out));
987                }
988
989                ChunkedState::DataCrlf => {
990                    if self.buf.len() < 2 {
991                        let n = Box::pin(self.read_more()).await?;
992                        if n == 0 && self.buf.is_empty() {
993                            return Err(std::io::Error::new(
994                                std::io::ErrorKind::UnexpectedEof,
995                                "unexpected EOF reading chunk CRLF",
996                            ));
997                        }
998                        // Continue to let starts_with handle single byte \n or full \r\n
999                    }
1000
1001                    let bytes = self.buf.available();
1002                    if bytes.starts_with(b"\r\n") {
1003                        self.buf.consume(2);
1004                        self.chunked_state = ChunkedState::SizeLine;
1005                    } else if bytes.starts_with(b"\n") {
1006                        self.buf.consume(1);
1007                        self.chunked_state = ChunkedState::SizeLine;
1008                    } else if bytes.len() >= 2 {
1009                        return Err(std::io::Error::other("invalid chunk CRLF"));
1010                    } else {
1011                        // wait for more data
1012                        let n = Box::pin(self.read_more()).await?;
1013                        if n == 0 {
1014                            return Err(std::io::Error::new(
1015                                std::io::ErrorKind::UnexpectedEof,
1016                                "unexpected EOF reading chunk CRLF",
1017                            ));
1018                        }
1019                    }
1020                }
1021
1022                ChunkedState::Trailers => {
1023                    // Trailers are terminated by an empty line. When there are no trailers,
1024                    // the terminator is a single CRLF (`0\r\n\r\n` total, with the final
1025                    // `\r\n` remaining after consuming the size line).
1026                    let bytes = self.buf.available();
1027                    if bytes.starts_with(b"\r\n") {
1028                        self.buf.consume(2);
1029                        self.chunked_state = ChunkedState::Done;
1030                        return Ok(None);
1031                    }
1032                    if bytes.starts_with(b"\n") {
1033                        self.buf.consume(1);
1034                        self.chunked_state = ChunkedState::Done;
1035                        return Ok(None);
1036                    }
1037                    if let Some(end) = find_headers_end(self.buf.available()) {
1038                        self.buf.consume(end);
1039                        self.chunked_state = ChunkedState::Done;
1040                        return Ok(None);
1041                    }
1042
1043                    let n = Box::pin(self.read_more()).await?;
1044                    if n == 0 {
1045                        return Err(std::io::Error::new(
1046                            std::io::ErrorKind::UnexpectedEof,
1047                            "unexpected EOF reading trailers",
1048                        ));
1049                    }
1050                }
1051
1052                ChunkedState::Done => return Ok(None),
1053            }
1054        }
1055    }
1056}
1057
/// Locates the first line terminator in `buf`.
///
/// Returns `Some((index, len))`, where `index` is the offset at which the
/// terminator starts and `len` is its width: `2` for `\r\n`, `1` for a bare
/// `\n`. Returns `None` when `buf` contains no `\n`.
fn find_crlf(buf: &[u8]) -> Option<(usize, usize)> {
    // The earliest terminator always ends at the first `\n`; it is two bytes
    // wide exactly when that `\n` is immediately preceded by `\r`.
    let lf = buf.iter().position(|&byte| byte == b'\n')?;
    let terminator = match lf.checked_sub(1) {
        Some(prev) if buf[prev] == b'\r' => (prev, 2),
        _ => (lf, 1),
    };
    Some(terminator)
}
1069
1070async fn read_some<R: AsyncRead + Unpin>(reader: &mut R, dst: &mut [u8]) -> std::io::Result<usize> {
1071    futures::future::poll_fn(|cx| {
1072        let mut read_buf = ReadBuf::new(dst);
1073        match Pin::new(&mut *reader).poll_read(cx, &mut read_buf) {
1074            Poll::Pending => Poll::Pending,
1075            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
1076            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1077        }
1078    })
1079    .await
1080}
1081
/// Connection the client reads from and writes to: either a plain TCP stream
/// or a TLS stream layered over TCP.
#[derive(Debug)]
enum Transport {
    /// Plain TCP stream.
    Tcp(TcpStream),
    /// TLS stream over TCP. Boxed — presumably to keep the enum itself small
    /// (TODO confirm).
    Tls(Box<asupersync::tls::TlsStream<TcpStream>>),
}
1087
// Declared explicitly: the `AsyncRead`/`AsyncWrite` impls for `Transport`
// reach the variants through `&mut *self` on a `Pin<&mut Self>`, which is only
// possible for `Unpin` types.
impl Unpin for Transport {}
1089
1090impl AsyncRead for Transport {
1091    fn poll_read(
1092        mut self: Pin<&mut Self>,
1093        cx: &mut Context<'_>,
1094        buf: &mut ReadBuf<'_>,
1095    ) -> Poll<std::io::Result<()>> {
1096        match &mut *self {
1097            Self::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
1098            Self::Tls(stream) => Pin::new(&mut **stream).poll_read(cx, buf),
1099        }
1100    }
1101}
1102
1103impl AsyncWrite for Transport {
1104    fn poll_write(
1105        mut self: Pin<&mut Self>,
1106        cx: &mut Context<'_>,
1107        buf: &[u8],
1108    ) -> Poll<std::io::Result<usize>> {
1109        match &mut *self {
1110            Self::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
1111            Self::Tls(stream) => Pin::new(&mut **stream).poll_write(cx, buf),
1112        }
1113    }
1114
1115    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1116        match &mut *self {
1117            Self::Tcp(stream) => Pin::new(stream).poll_flush(cx),
1118            Self::Tls(stream) => Pin::new(&mut **stream).poll_flush(cx),
1119        }
1120    }
1121
1122    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1123        match &mut *self {
1124            Self::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
1125            Self::Tls(stream) => Pin::new(&mut **stream).poll_shutdown(cx),
1126        }
1127    }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132    use super::*;
1133    use serde_json::json;
1134    use std::collections::VecDeque;
1135
1136    // ── Method ──────────────────────────────────────────────────────────
1137    #[test]
1138    fn method_as_str_get() {
1139        assert_eq!(Method::Get.as_str(), "GET");
1140    }
1141
1142    #[test]
1143    fn method_as_str_post() {
1144        assert_eq!(Method::Post.as_str(), "POST");
1145    }
1146
1147    // ── find_headers_end ────────────────────────────────────────────────
1148    #[test]
1149    fn find_headers_end_present() {
1150        let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
1151        let pos = find_headers_end(buf).unwrap();
1152        assert_eq!(&buf[pos..], b"hello");
1153    }
1154
1155    #[test]
1156    fn find_headers_end_absent() {
1157        assert!(find_headers_end(b"HTTP/1.1 200 OK\r\nFoo: bar\r\n").is_none());
1158    }
1159
1160    #[test]
1161    fn find_headers_end_empty() {
1162        assert!(find_headers_end(b"").is_none());
1163    }
1164
1165    #[test]
1166    fn find_headers_end_just_separator() {
1167        let buf = b"\r\n\r\n";
1168        assert_eq!(find_headers_end(buf), Some(4));
1169    }
1170
1171    // ── find_crlf ──────────────────────────────────────────────────────
1172    #[test]
1173    fn find_crlf_present() {
1174        assert_eq!(find_crlf(b"abc\r\ndef"), Some((3, 2)));
1175    }
1176
1177    #[test]
1178    fn find_crlf_present_lf() {
1179        assert_eq!(find_crlf(b"abc\ndef"), Some((3, 1)));
1180    }
1181
1182    #[test]
1183    fn find_crlf_absent() {
1184        assert!(find_crlf(b"abcdef").is_none());
1185    }
1186
1187    #[test]
1188    fn find_crlf_at_start() {
1189        assert_eq!(find_crlf(b"\r\ndata"), Some((0, 2)));
1190    }
1191
1192    #[test]
1193    fn find_crlf_at_start_lf() {
1194        assert_eq!(find_crlf(b"\ndata"), Some((0, 1)));
1195    }
1196
1197    // ── parse_response_head ────────────────────────────────────────────
1198    #[test]
1199    fn parse_response_head_200() {
1200        let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
1201        let (status, headers) = parse_response_head(head).unwrap();
1202        assert_eq!(status, 200);
1203        assert_eq!(headers.len(), 1);
1204        assert_eq!(headers[0].0, "Content-Type");
1205        assert_eq!(headers[0].1, "text/plain");
1206    }
1207
1208    #[test]
1209    fn parse_response_head_404() {
1210        let head = b"HTTP/1.1 404 Not Found\r\n\r\n";
1211        let (status, headers) = parse_response_head(head).unwrap();
1212        assert_eq!(status, 404);
1213        assert!(headers.is_empty());
1214    }
1215
1216    #[test]
1217    fn parse_response_head_multiple_headers() {
1218        let head = b"HTTP/1.1 200 OK\r\nA: 1\r\nB: 2\r\nC: 3\r\n\r\n";
1219        let (status, headers) = parse_response_head(head).unwrap();
1220        assert_eq!(status, 200);
1221        assert_eq!(headers.len(), 3);
1222        assert_eq!(headers[0], ("A".to_string(), "1".to_string()));
1223        assert_eq!(headers[1], ("B".to_string(), "2".to_string()));
1224        assert_eq!(headers[2], ("C".to_string(), "3".to_string()));
1225    }
1226
1227    #[test]
1228    fn parse_response_head_header_value_with_colon() {
1229        // Header value contains a colon (e.g., a URL)
1230        let head = b"HTTP/1.1 200 OK\r\nLocation: http://example.com:8080/path\r\n\r\n";
1231        let (status, headers) = parse_response_head(head).unwrap();
1232        assert_eq!(status, 200);
1233        assert_eq!(headers[0].0, "Location");
1234        assert_eq!(headers[0].1, "http://example.com:8080/path");
1235    }
1236
1237    #[test]
1238    fn parse_response_head_invalid_status_code() {
1239        let head = b"HTTP/1.1 abc OK\r\n\r\n";
1240        assert!(parse_response_head(head).is_err());
1241    }
1242
1243    #[test]
1244    fn parse_response_head_missing_status() {
1245        let head = b"HTTP/1.1\r\n\r\n";
1246        assert!(parse_response_head(head).is_err());
1247    }
1248
1249    #[test]
1250    fn parse_response_head_empty() {
1251        let head = b"";
1252        assert!(parse_response_head(head).is_err());
1253    }
1254
1255    // ── body_kind_from_headers ─────────────────────────────────────────
1256    #[test]
1257    fn body_kind_content_length() {
1258        let headers = vec![("Content-Length".to_string(), "42".to_string())];
1259        assert!(matches!(
1260            body_kind_from_headers(&headers).unwrap(),
1261            BodyKind::ContentLength(42)
1262        ));
1263    }
1264
1265    #[test]
1266    fn body_kind_content_length_zero() {
1267        let headers = vec![("Content-Length".to_string(), "0".to_string())];
1268        assert!(matches!(
1269            body_kind_from_headers(&headers).unwrap(),
1270            BodyKind::Empty
1271        ));
1272    }
1273
1274    #[test]
1275    fn body_kind_chunked() {
1276        let headers = vec![("Transfer-Encoding".to_string(), "chunked".to_string())];
1277        assert!(matches!(
1278            body_kind_from_headers(&headers).unwrap(),
1279            BodyKind::Chunked
1280        ));
1281    }
1282
1283    #[test]
1284    fn body_kind_rejects_chunked_with_additional_transfer_codings() {
1285        let headers = vec![("Transfer-Encoding".to_string(), "gzip, chunked".to_string())];
1286        assert!(body_kind_from_headers(&headers).is_err());
1287    }
1288
1289    #[test]
1290    fn body_kind_rejects_repeated_transfer_encoding_headers_with_extra_codings() {
1291        let headers = vec![
1292            ("Transfer-Encoding".to_string(), "gzip".to_string()),
1293            ("Transfer-Encoding".to_string(), "chunked".to_string()),
1294        ];
1295        assert!(body_kind_from_headers(&headers).is_err());
1296    }
1297
1298    #[test]
1299    fn body_kind_rejects_repeated_chunked_transfer_encoding() {
1300        let headers = vec![
1301            ("Transfer-Encoding".to_string(), "chunked".to_string()),
1302            ("Transfer-Encoding".to_string(), "chunked".to_string()),
1303        ];
1304        assert!(body_kind_from_headers(&headers).is_err());
1305    }
1306
1307    #[test]
1308    fn body_kind_rejects_transfer_encoding_when_chunked_is_not_final() {
1309        let headers = vec![
1310            ("Transfer-Encoding".to_string(), "chunked".to_string()),
1311            ("Transfer-Encoding".to_string(), "gzip".to_string()),
1312        ];
1313        assert!(body_kind_from_headers(&headers).is_err());
1314    }
1315
1316    #[test]
1317    fn body_kind_rejects_non_chunked_transfer_encoding() {
1318        let headers = vec![("Transfer-Encoding".to_string(), "gzip".to_string())];
1319        assert!(body_kind_from_headers(&headers).is_err());
1320    }
1321
1322    #[test]
1323    fn body_kind_chunked_overrides_content_length() {
1324        // When both present, chunked wins
1325        let headers = vec![
1326            ("Content-Length".to_string(), "100".to_string()),
1327            ("Transfer-Encoding".to_string(), "chunked".to_string()),
1328        ];
1329        assert!(matches!(
1330            body_kind_from_headers(&headers).unwrap(),
1331            BodyKind::Chunked
1332        ));
1333    }
1334
1335    #[test]
1336    fn body_kind_eof_no_headers() {
1337        let headers: Vec<(String, String)> = Vec::new();
1338        assert!(matches!(
1339            body_kind_from_headers(&headers).unwrap(),
1340            BodyKind::Eof
1341        ));
1342    }
1343
1344    #[test]
1345    fn body_kind_case_insensitive() {
1346        let headers = vec![("content-length".to_string(), "10".to_string())];
1347        assert!(matches!(
1348            body_kind_from_headers(&headers).unwrap(),
1349            BodyKind::ContentLength(10)
1350        ));
1351    }
1352
1353    #[test]
1354    fn body_kind_response_204_without_headers_is_empty() {
1355        let headers: Vec<(String, String)> = Vec::new();
1356        assert!(matches!(
1357            body_kind_from_response(204, &headers).unwrap(),
1358            BodyKind::Empty
1359        ));
1360    }
1361
1362    #[test]
1363    fn body_kind_response_304_ignores_content_length() {
1364        let headers = vec![("Content-Length".to_string(), "7".to_string())];
1365        assert!(matches!(
1366            body_kind_from_response(304, &headers).unwrap(),
1367            BodyKind::Empty
1368        ));
1369    }
1370
1371    #[test]
1372    fn body_kind_response_205_without_headers_is_empty() {
1373        let headers: Vec<(String, String)> = Vec::new();
1374        assert!(matches!(
1375            body_kind_from_response(205, &headers).unwrap(),
1376            BodyKind::Empty
1377        ));
1378    }
1379
1380    // ── build_request_bytes ────────────────────────────────────────────
1381    #[test]
1382    fn build_request_bytes_get() {
1383        let parsed = ParsedUrl::parse("http://example.com/api/test").unwrap();
1384        let bytes = build_request_bytes(Method::Get, &parsed, "test-agent", &[], &[]);
1385        let text = String::from_utf8(bytes).unwrap();
1386        assert!(text.starts_with("GET /api/test HTTP/1.1\r\n"));
1387        assert!(text.contains("Host: example.com\r\n"));
1388        assert!(text.contains("User-Agent: test-agent\r\n"));
1389        assert!(text.contains("Content-Length: 0\r\n"));
1390        assert!(text.ends_with("\r\n\r\n"));
1391    }
1392
1393    #[test]
1394    fn build_request_bytes_post_with_body() {
1395        let parsed = ParsedUrl::parse("https://api.example.com/v1/messages").unwrap();
1396        let body = b"hello world";
1397        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1398        let bytes = build_request_bytes(Method::Post, &parsed, "pi/0.1", &headers, body);
1399        let text = String::from_utf8(bytes).unwrap();
1400        assert!(text.starts_with("POST /v1/messages HTTP/1.1\r\n"));
1401        assert!(text.contains("Host: api.example.com\r\n"));
1402        assert!(text.contains("Content-Length: 11\r\n"));
1403        assert!(text.contains("Content-Type: application/json\r\n"));
1404    }
1405
1406    #[test]
1407    fn build_request_bytes_custom_headers() {
1408        let parsed = ParsedUrl::parse("http://localhost/test").unwrap();
1409        let headers = vec![
1410            ("Authorization".to_string(), "Bearer sk-test".to_string()),
1411            ("X-Custom".to_string(), "value".to_string()),
1412        ];
1413        let bytes = build_request_bytes(Method::Post, &parsed, "agent", &headers, &[]);
1414        let text = String::from_utf8(bytes).unwrap();
1415        assert!(text.contains("Authorization: Bearer sk-test\r\n"));
1416        assert!(text.contains("X-Custom: value\r\n"));
1417    }
1418
1419    #[test]
1420    fn build_request_bytes_reserved_headers_are_canonicalized() {
1421        let parsed = ParsedUrl::parse("https://api.example.com/v1/messages").unwrap();
1422        let headers = vec![
1423            ("Host".to_string(), "spoofed.example.com".to_string()),
1424            ("User-Agent".to_string(), "custom-agent".to_string()),
1425            ("Content-Length".to_string(), "999".to_string()),
1426            ("X-Test".to_string(), "1".to_string()),
1427        ];
1428        let body = b"hello";
1429        let bytes = build_request_bytes(Method::Post, &parsed, "default-agent", &headers, body);
1430        let text = String::from_utf8(bytes).unwrap();
1431
1432        assert_eq!(text.matches("Host: ").count(), 1);
1433        assert!(text.contains("Host: api.example.com\r\n"));
1434        assert!(!text.contains("Host: spoofed.example.com\r\n"));
1435
1436        assert_eq!(text.matches("User-Agent: ").count(), 1);
1437        assert!(text.contains("User-Agent: custom-agent\r\n"));
1438        assert!(!text.contains("User-Agent: default-agent\r\n"));
1439
1440        assert_eq!(text.matches("Content-Length: ").count(), 1);
1441        assert!(text.contains("Content-Length: 5\r\n"));
1442        assert!(!text.contains("Content-Length: 999\r\n"));
1443
1444        assert!(text.contains("X-Test: 1\r\n"));
1445    }
1446
1447    #[test]
1448    fn build_request_bytes_non_default_port_includes_port_in_host_header() {
1449        let parsed = ParsedUrl::parse("http://example.com:8080/api/test").unwrap();
1450        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &[], &[]);
1451        let text = String::from_utf8(bytes).unwrap();
1452
1453        assert!(text.contains("Host: example.com:8080\r\n"));
1454    }
1455
1456    #[test]
1457    fn build_request_bytes_sanitizes_overridden_user_agent() {
1458        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
1459        let headers = vec![(
1460            "User-Agent".to_string(),
1461            "custom-agent\r\nX-Injected: nope".to_string(),
1462        )];
1463        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
1464        let text = String::from_utf8(bytes).unwrap();
1465
1466        assert!(text.contains("User-Agent: custom-agentX-Injected: nope\r\n"));
1467        assert_eq!(text.matches("User-Agent: ").count(), 1);
1468        assert!(!text.contains("\r\nX-Injected: nope\r\n"));
1469    }
1470
1471    // ── build_recorded_request ─────────────────────────────────────────
1472    #[test]
1473    fn build_recorded_request_empty_body() {
1474        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &[], &[]);
1475        assert_eq!(req.method, "POST");
1476        assert_eq!(req.url, "https://api.test.com/v1");
1477        assert!(req.body.is_none());
1478        assert!(req.body_text.is_none());
1479    }
1480
1481    #[test]
1482    fn build_recorded_request_json_body() {
1483        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1484        let body = serde_json::to_vec(&json!({"model": "test"})).unwrap();
1485        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, &body);
1486        assert!(req.body.is_some());
1487        assert_eq!(req.body.unwrap()["model"], "test");
1488        assert!(req.body_text.is_none());
1489    }
1490
1491    #[test]
1492    fn build_recorded_request_text_body() {
1493        let headers = vec![("Content-Type".to_string(), "text/plain".to_string())];
1494        let body = b"hello world";
1495        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1496        assert!(req.body.is_none());
1497        assert_eq!(req.body_text.as_deref(), Some("hello world"));
1498    }
1499
1500    #[test]
1501    fn build_recorded_request_invalid_json_body_falls_back_to_text() {
1502        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1503        let body = b"not json {{{";
1504        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1505        assert!(req.body.is_none());
1506        assert_eq!(req.body_text.as_deref(), Some("not json {{{"));
1507    }
1508
1509    #[test]
1510    fn build_recorded_request_preserves_headers() {
1511        let headers = vec![
1512            ("Authorization".to_string(), "Bearer key".to_string()),
1513            ("X-Trace".to_string(), "abc123".to_string()),
1514        ];
1515        let req = build_recorded_request(Method::Get, "https://test.com", &headers, &[]);
1516        assert_eq!(req.headers.len(), 2);
1517        assert_eq!(req.headers[0].0, "Authorization");
1518    }
1519
1520    // ── Buffer ─────────────────────────────────────────────────────────
1521    #[test]
1522    fn buffer_new_empty() {
1523        let buf = Buffer::new(Vec::new());
1524        assert!(buf.is_empty());
1525        assert_eq!(buf.len(), 0);
1526    }
1527
1528    #[test]
1529    fn buffer_new_with_data() {
1530        let buf = Buffer::new(vec![1, 2, 3]);
1531        assert!(!buf.is_empty());
1532        assert_eq!(buf.len(), 3);
1533        assert_eq!(buf.available(), &[1, 2, 3]);
1534    }
1535
1536    #[test]
1537    fn buffer_consume_partial() {
1538        let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1539        buf.consume(2);
1540        assert_eq!(buf.len(), 3);
1541        assert_eq!(buf.available(), &[3, 4, 5]);
1542    }
1543
1544    #[test]
1545    fn buffer_consume_all() {
1546        let mut buf = Buffer::new(vec![1, 2, 3]);
1547        buf.consume(3);
1548        assert!(buf.is_empty());
1549        assert_eq!(buf.len(), 0);
1550    }
1551
1552    #[test]
1553    fn buffer_consume_triggers_compact() {
1554        // When pos >= len/2, the buffer compacts
1555        let mut buf = Buffer::new(vec![0; 10]);
1556        buf.consume(6); // pos=6, len=10, 6 >= 5 → compact
1557        assert_eq!(buf.len(), 4);
1558        assert_eq!(buf.available().len(), 4);
1559    }
1560
1561    #[test]
1562    fn buffer_extend() {
1563        let mut buf = Buffer::new(vec![1, 2]);
1564        buf.extend(&[3, 4, 5]).unwrap();
1565        assert_eq!(buf.len(), 5);
1566        assert_eq!(buf.available(), &[1, 2, 3, 4, 5]);
1567    }
1568
1569    #[test]
1570    fn buffer_extend_overflow() {
1571        let mut buf = Buffer::new(Vec::new());
1572        let huge = vec![0u8; MAX_BUFFERED_BYTES + 1];
1573        assert!(buf.extend(&huge).is_err());
1574    }
1575
1576    #[test]
1577    fn buffer_split_to_vec() {
1578        let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1579        let out = buf.split_to_vec(3);
1580        assert_eq!(out, vec![1, 2, 3]);
1581        assert_eq!(buf.len(), 2);
1582        assert_eq!(buf.available(), &[4, 5]);
1583    }
1584
1585    #[test]
1586    fn buffer_split_to_vec_more_than_available() {
1587        let mut buf = Buffer::new(vec![1, 2]);
1588        let out = buf.split_to_vec(10);
1589        assert_eq!(out, vec![1, 2]);
1590        assert!(buf.is_empty());
1591    }
1592
1593    #[test]
1594    fn buffer_consume_then_extend() {
1595        let mut buf = Buffer::new(vec![1, 2, 3]);
1596        buf.consume(2);
1597        buf.extend(&[4, 5]).unwrap();
1598        // After consume(2), available = [3], then extend [4,5] → [3, 4, 5]
1599        assert_eq!(buf.available(), &[3, 4, 5]);
1600    }
1601
1602    #[test]
1603    fn buffer_consume_exactly_all_clears() {
1604        let mut buf = Buffer::new(vec![1, 2, 3]);
1605        buf.consume(3);
1606        // pos == bytes.len() triggers clear
1607        assert!(buf.is_empty());
1608        assert_eq!(buf.available(), &[] as &[u8]);
1609    }
1610
1611    // ── Client builder methods ─────────────────────────────────────────
1612    #[test]
1613    fn client_default() {
1614        let client = Client::default();
1615        assert!(client.vcr().is_none());
1616    }
1617
1618    #[test]
1619    fn client_with_vcr() {
1620        let recorder = VcrRecorder::new_with(
1621            "test",
1622            crate::vcr::VcrMode::Playback,
1623            std::path::Path::new("/tmp"),
1624        );
1625        let client = Client::new().with_vcr(recorder);
1626        assert!(client.vcr().is_some());
1627    }
1628
1629    // ── RequestBuilder ─────────────────────────────────────────────────
1630    #[test]
1631    fn request_builder_header_chaining() {
1632        let client = Client::new();
1633        let builder = client
1634            .post("https://api.example.com")
1635            .header("Authorization", "Bearer test")
1636            .header("X-Custom", "value");
1637        assert_eq!(builder.headers.len(), 2);
1638    }
1639
1640    #[test]
1641    fn request_builder_header_replaces_case_insensitive_duplicate_names() {
1642        let client = Client::new();
1643        let builder = client
1644            .post("https://api.example.com")
1645            .header("Authorization", "Bearer first")
1646            .header("authorization", "Bearer second");
1647
1648        assert_eq!(builder.headers.len(), 1);
1649        assert!(builder.headers[0].0.eq_ignore_ascii_case("authorization"));
1650        assert_eq!(builder.headers[0].1, "Bearer second");
1651    }
1652
1653    #[test]
1654    fn request_builder_header_bounds_prevent_dos() {
1655        // Test that header count is bounded to prevent DoS attacks.
1656        let client = Client::new();
1657        let mut builder = client.post("https://api.example.com");
1658
1659        // Add headers up to the limit
1660        for i in 0..MAX_REQUEST_HEADERS {
1661            builder = builder.header(format!("X-Header-{i}"), "value");
1662        }
1663        assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1664
1665        // Additional headers should be silently dropped
1666        builder = builder
1667            .header("X-Over-Limit-1", "dropped")
1668            .header("X-Over-Limit-2", "also-dropped");
1669        assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1670
1671        // But replacing existing headers should still work
1672        builder = builder.header("X-Header-0", "replaced-value");
1673        assert_eq!(builder.headers.len(), MAX_REQUEST_HEADERS);
1674        assert_eq!(builder.headers[0].1, "replaced-value");
1675    }
1676
1677    #[test]
1678    fn request_builder_json() {
1679        let client = Client::new();
1680        let builder = client
1681            .post("https://api.example.com")
1682            .json(&json!({"key": "value"}))
1683            .unwrap();
1684        assert!(!builder.body.is_empty());
1685        // Should have auto-added Content-Type header
1686        assert!(
1687            builder
1688                .headers
1689                .iter()
1690                .any(|(k, v)| k == "Content-Type" && v == "application/json")
1691        );
1692    }
1693
1694    #[test]
1695    fn request_builder_body() {
1696        let client = Client::new();
1697        let builder = client
1698            .post("https://api.example.com")
1699            .body(b"raw bytes".to_vec());
1700        assert_eq!(builder.body, b"raw bytes");
1701    }
1702
1703    #[test]
1704    fn request_builder_default_timeout() {
1705        let client = Client::new();
1706        let builder = client.get("https://api.example.com");
1707        // During tests, default timeout is disabled to avoid virtual timer issues.
1708        assert_eq!(builder.timeout, None);
1709    }
1710
1711    #[test]
1712    fn request_builder_timeout() {
1713        let client = Client::new();
1714        let builder = client
1715            .get("https://api.example.com")
1716            .timeout(std::time::Duration::from_secs(30));
1717        assert_eq!(builder.timeout, Some(std::time::Duration::from_secs(30)));
1718    }
1719
1720    #[test]
1721    fn request_builder_no_timeout() {
1722        let client = Client::new();
1723        let builder = client.get("https://api.example.com").no_timeout();
1724        assert_eq!(builder.timeout, None);
1725    }
1726
1727    struct MockRetryWriter {
1728        writes: VecDeque<std::io::Result<usize>>,
1729        flushes: VecDeque<std::io::Result<()>>,
1730        written: Vec<u8>,
1731    }
1732
1733    impl MockRetryWriter {
1734        fn new(
1735            writes: impl IntoIterator<Item = std::io::Result<usize>>,
1736            flushes: impl IntoIterator<Item = std::io::Result<()>>,
1737        ) -> Self {
1738            Self {
1739                writes: writes.into_iter().collect(),
1740                flushes: flushes.into_iter().collect(),
1741                written: Vec::new(),
1742            }
1743        }
1744    }
1745
1746    impl AsyncWrite for MockRetryWriter {
1747        fn poll_write(
1748            mut self: Pin<&mut Self>,
1749            _cx: &mut Context<'_>,
1750            buf: &[u8],
1751        ) -> Poll<std::io::Result<usize>> {
1752            let result = self.writes.pop_front().unwrap_or(Ok(buf.len()));
1753            if let Ok(written) = result {
1754                self.written
1755                    .extend_from_slice(&buf[..written.min(buf.len())]);
1756            }
1757            Poll::Ready(result)
1758        }
1759
1760        fn poll_flush(
1761            mut self: Pin<&mut Self>,
1762            _cx: &mut Context<'_>,
1763        ) -> Poll<std::io::Result<()>> {
1764            Poll::Ready(self.flushes.pop_front().unwrap_or(Ok(())))
1765        }
1766
1767        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
1768            Poll::Ready(Ok(()))
1769        }
1770    }
1771
1772    #[test]
1773    fn write_all_with_retry_propagates_flush_error_after_zero_write() {
1774        asupersync::test_utils::run_test(|| async {
1775            let mut writer = MockRetryWriter::new(
1776                [Ok(0)],
1777                [Err(std::io::Error::new(
1778                    std::io::ErrorKind::BrokenPipe,
1779                    "flush failed",
1780                ))],
1781            );
1782
1783            let err = write_all_with_retry(&mut writer, b"hello")
1784                .await
1785                .expect_err("flush failure should not be swallowed");
1786            assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
1787            assert_eq!(err.to_string(), "flush failed");
1788            assert!(writer.written.is_empty());
1789        });
1790    }
1791
1792    #[test]
1793    fn write_all_with_retry_recovers_after_zero_write_when_flush_succeeds() {
1794        asupersync::test_utils::run_test(|| async {
1795            let mut writer = MockRetryWriter::new([Ok(0), Ok(2), Ok(3)], [Ok(())]);
1796
1797            write_all_with_retry(&mut writer, b"hello")
1798                .await
1799                .expect("retry helper should recover after transient zero write");
1800            assert_eq!(writer.written, b"hello");
1801        });
1802    }
1803
1804    // ── Response ───────────────────────────────────────────────────────
1805    #[test]
1806    fn response_accessors() {
1807        let response = Response {
1808            status: 200,
1809            headers: vec![("Content-Type".to_string(), "text/plain".to_string())],
1810            stream: Box::pin(futures::stream::empty()),
1811            timeout_info: None,
1812        };
1813        assert_eq!(response.status(), 200);
1814        assert_eq!(response.headers().len(), 1);
1815        assert_eq!(response.headers()[0].0, "Content-Type");
1816    }
1817
1818    #[test]
1819    fn response_text() {
1820        asupersync::test_utils::run_test(|| async {
1821            let chunks = vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
1822            let response = Response {
1823                status: 200,
1824                headers: Vec::new(),
1825                stream: Box::pin(futures::stream::iter(chunks)),
1826                timeout_info: None,
1827            };
1828            let text = response.text().await.unwrap();
1829            assert_eq!(text, "hello world");
1830        });
1831    }
1832
1833    #[test]
1834    fn response_text_empty() {
1835        asupersync::test_utils::run_test(|| async {
1836            let response = Response {
1837                status: 200,
1838                headers: Vec::new(),
1839                stream: Box::pin(futures::stream::empty()),
1840                timeout_info: None,
1841            };
1842            let text = response.text().await.unwrap();
1843            assert_eq!(text, "");
1844        });
1845    }
1846
1847    #[test]
1848    fn response_bytes_stream() {
1849        asupersync::test_utils::run_test(|| async {
1850            let chunks = vec![Ok(b"data".to_vec())];
1851            let response = Response {
1852                status: 200,
1853                headers: Vec::new(),
1854                stream: Box::pin(futures::stream::iter(chunks)),
1855                timeout_info: None,
1856            };
1857            let mut stream = response.bytes_stream();
1858            let first = stream.next().await.unwrap().unwrap();
1859            assert_eq!(first, b"data");
1860            assert!(stream.next().await.is_none());
1861        });
1862    }
1863
1864    // ── Body stream via Response (in-memory) ──────────────────────────
1865    #[test]
1866    fn body_stream_content_length_via_response() {
1867        asupersync::test_utils::run_test(|| async {
1868            // Simulate a content-length response by providing exact chunks
1869            let body = b"Hello, World!";
1870            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(body.to_vec())];
1871            let response = Response {
1872                status: 200,
1873                headers: vec![("Content-Length".to_string(), "13".to_string())],
1874                stream: Box::pin(futures::stream::iter(chunks)),
1875                timeout_info: None,
1876            };
1877            let text = response.text().await.unwrap();
1878            assert_eq!(text, "Hello, World!");
1879        });
1880    }
1881
1882    #[test]
1883    fn body_stream_multiple_chunks_via_response() {
1884        asupersync::test_utils::run_test(|| async {
1885            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1886                Ok(b"chunk1".to_vec()),
1887                Ok(b"chunk2".to_vec()),
1888                Ok(b"chunk3".to_vec()),
1889            ];
1890            let response = Response {
1891                status: 200,
1892                headers: Vec::new(),
1893                stream: Box::pin(futures::stream::iter(chunks)),
1894                timeout_info: None,
1895            };
1896            let text = response.text().await.unwrap();
1897            assert_eq!(text, "chunk1chunk2chunk3");
1898        });
1899    }
1900
1901    #[test]
1902    fn body_stream_error_propagation() {
1903        asupersync::test_utils::run_test(|| async {
1904            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1905                Ok(b"data".to_vec()),
1906                Err(std::io::Error::new(
1907                    std::io::ErrorKind::ConnectionReset,
1908                    "connection reset",
1909                )),
1910            ];
1911            let response = Response {
1912                status: 200,
1913                headers: Vec::new(),
1914                stream: Box::pin(futures::stream::iter(chunks)),
1915                timeout_info: None,
1916            };
1917            let result = response.text().await;
1918            assert!(result.is_err());
1919        });
1920    }
1921
1922    // ── Edge cases ─────────────────────────────────────────────────────
1923    #[test]
1924    fn parse_response_head_trims_header_whitespace() {
1925        let head = b"HTTP/1.1 200 OK\r\n  X-Padded  :   value with spaces  \r\n\r\n";
1926        let (status, headers) = parse_response_head(head).unwrap();
1927        assert_eq!(status, 200);
1928        assert_eq!(headers[0].0, "X-Padded");
1929        assert_eq!(headers[0].1, "value with spaces");
1930    }
1931
1932    #[test]
1933    fn parse_response_head_status_codes() {
1934        for (code, line) in [
1935            (100, "HTTP/1.1 100 Continue"),
1936            (201, "HTTP/1.1 201 Created"),
1937            (301, "HTTP/1.1 301 Moved Permanently"),
1938            (400, "HTTP/1.1 400 Bad Request"),
1939            (429, "HTTP/1.1 429 Too Many Requests"),
1940            (500, "HTTP/1.1 500 Internal Server Error"),
1941            (503, "HTTP/1.1 503 Service Unavailable"),
1942        ] {
1943            let head = format!("{line}\r\n\r\n");
1944            let (status, _) = parse_response_head(head.as_bytes()).unwrap();
1945            assert_eq!(status, code, "Failed to parse status {code}");
1946        }
1947    }
1948
1949    #[test]
1950    fn body_kind_invalid_content_length_is_error() {
1951        let headers = vec![("Content-Length".to_string(), "not-a-number".to_string())];
1952        assert!(body_kind_from_headers(&headers).is_err());
1953    }
1954
1955    #[test]
1956    fn body_kind_conflicting_content_length_headers_is_error() {
1957        let headers = vec![
1958            ("Content-Length".to_string(), "5".to_string()),
1959            ("content-length".to_string(), "7".to_string()),
1960        ];
1961        assert!(body_kind_from_headers(&headers).is_err());
1962    }
1963
1964    #[test]
1965    fn body_kind_coalesced_identical_content_length_is_accepted() {
1966        let headers = vec![("Content-Length".to_string(), "5, 5".to_string())];
1967        assert!(matches!(
1968            body_kind_from_headers(&headers).unwrap(),
1969            BodyKind::ContentLength(5)
1970        ));
1971    }
1972
1973    #[test]
1974    fn body_kind_coalesced_conflicting_content_length_is_error() {
1975        let headers = vec![("Content-Length".to_string(), "5, 7".to_string())];
1976        assert!(body_kind_from_headers(&headers).is_err());
1977    }
1978
1979    #[test]
1980    fn build_request_bytes_empty_path() {
1981        let parsed = ParsedUrl::parse("http://example.com").unwrap();
1982        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &[], &[]);
1983        let text = String::from_utf8(bytes).unwrap();
1984        // Should have "/" as path
1985        assert!(text.starts_with("GET /"));
1986    }
1987
1988    #[test]
1989    fn build_recorded_request_content_type_case_insensitive() {
1990        let headers = vec![("content-type".to_string(), "APPLICATION/JSON".to_string())];
1991        let body = serde_json::to_vec(&json!({"test": true})).unwrap();
1992        let req = build_recorded_request(Method::Post, "https://test.com", &headers, &body);
1993        // Should detect JSON despite case differences
1994        assert!(req.body.is_some());
1995    }
1996
1997    // ── CRLF header injection prevention ──────────────────────────────
1998    #[test]
1999    fn sanitize_header_value_strips_crlf() {
2000        assert_eq!(sanitize_header_value("normal value"), "normal value");
2001        assert_eq!(
2002            sanitize_header_value("injected\r\nEvil: header"),
2003            "injectedEvil: header"
2004        );
2005        assert_eq!(sanitize_header_value("bare\nnewline"), "barenewline");
2006        assert_eq!(sanitize_header_value("bare\rreturn"), "barereturn");
2007        assert_eq!(sanitize_header_value(""), "");
2008    }
2009
2010    #[test]
2011    fn build_request_bytes_strips_crlf_from_headers() {
2012        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2013        let headers = vec![(
2014            "X-Injected\r\nEvil".to_string(),
2015            "value\r\nX-Bad: smuggled".to_string(),
2016        )];
2017        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2018        let text = String::from_utf8(bytes).unwrap();
2019        // CRLF should be stripped — no injected header line
2020        assert!(text.contains("X-InjectedEvil: valueX-Bad: smuggled\r\n"));
2021        // The smuggled header must NOT appear as a separate line
2022        assert!(!text.contains("\r\nX-Bad: smuggled\r\n"));
2023    }
2024
2025    #[test]
2026    fn build_request_bytes_strips_invalid_chars_from_header_names() {
2027        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2028        let headers = vec![("X:Injected Header".to_string(), "value".to_string())];
2029        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2030        let text = String::from_utf8(bytes).unwrap();
2031
2032        assert!(text.contains("XInjectedHeader: value\r\n"));
2033        assert!(!text.contains("X:Injected Header: value\r\n"));
2034    }
2035
2036    #[test]
2037    fn build_request_bytes_drops_headers_that_normalize_to_reserved_names() {
2038        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2039        let headers = vec![
2040            ("Host:".to_string(), "evil.example".to_string()),
2041            ("Content-Length ".to_string(), "999".to_string()),
2042            ("User-Agent:".to_string(), "spoofed".to_string()),
2043        ];
2044        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
2045        let text = String::from_utf8(bytes).unwrap();
2046
2047        assert!(text.contains("Host: example.com\r\n"));
2048        assert!(text.contains("User-Agent: agent\r\n"));
2049        assert!(text.contains("Content-Length: 0\r\n"));
2050        assert!(!text.contains("Host: evil.example\r\n"));
2051        assert!(!text.contains("Content-Length: 999\r\n"));
2052        assert!(!text.contains("User-Agent: spoofed\r\n"));
2053    }
2054
2055    #[test]
2056    fn build_request_bytes_drops_transfer_encoding_header() {
2057        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
2058        let headers = vec![("Transfer-Encoding".to_string(), "chunked".to_string())];
2059        let body = b"hello";
2060        let bytes = build_request_bytes(Method::Post, &parsed, "agent", &headers, body);
2061        let text = String::from_utf8(bytes).unwrap();
2062
2063        assert!(text.contains("Content-Length: 5\r\n"));
2064        assert!(!text.contains("Transfer-Encoding: chunked\r\n"));
2065    }
2066
2067    // ── Response body size limit ──────────────────────────────────────
2068    #[test]
2069    fn response_text_rejects_oversized_body() {
2070        asupersync::test_utils::run_test(|| async {
2071            // Create a stream that would exceed MAX_TEXT_BODY_BYTES
2072            let big_chunk = vec![0u8; MAX_TEXT_BODY_BYTES + 1];
2073            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(big_chunk)];
2074            let response = Response {
2075                status: 200,
2076                headers: Vec::new(),
2077                stream: Box::pin(futures::stream::iter(chunks)),
2078                timeout_info: None,
2079            };
2080            let result = response.text().await;
2081            assert!(result.is_err());
2082            let err_msg = format!("{}", result.unwrap_err());
2083            assert!(
2084                err_msg.contains("too large"),
2085                "error should mention size: {err_msg}"
2086            );
2087        });
2088    }
2089
2090    #[test]
2091    fn response_text_accepts_body_at_limit() {
2092        asupersync::test_utils::run_test(|| async {
2093            let chunk = vec![b'a'; MAX_TEXT_BODY_BYTES];
2094            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(chunk)];
2095            let response = Response {
2096                status: 200,
2097                headers: Vec::new(),
2098                stream: Box::pin(futures::stream::iter(chunks)),
2099                timeout_info: None,
2100            };
2101            let result = response.text().await;
2102            assert!(result.is_ok());
2103            assert_eq!(result.unwrap().len(), MAX_TEXT_BODY_BYTES);
2104        });
2105    }
2106
2107    // ── PI_AI_ANTIGRAVITY_VERSION env var ─────────────────────────────
2108
2109    #[test]
2110    fn antigravity_user_agent_format() {
2111        // Verify the format string used when PI_AI_ANTIGRAVITY_VERSION is set.
2112        let version = "1.2.3";
2113        let ua = format!("{DEFAULT_USER_AGENT} Antigravity/{version}");
2114        assert!(ua.starts_with("pi_agent_rust/"));
2115        assert!(ua.contains("Antigravity/1.2.3"));
2116
2117        // Verify default user agent contains crate version.
2118        assert!(DEFAULT_USER_AGENT.starts_with("pi_agent_rust/"));
2119    }
2120
2121    #[test]
2122    fn antigravity_user_agent_in_request_headers() {
2123        // Simulate the antigravity user agent being used in request building.
2124        let ua = format!("{DEFAULT_USER_AGENT} Antigravity/42.0");
2125        let parsed = ParsedUrl::parse("http://example.com/api").unwrap();
2126        let bytes = build_request_bytes(Method::Get, &parsed, &ua, &[], &[]);
2127        let text = String::from_utf8(bytes).unwrap();
2128        assert!(text.contains(&format!("User-Agent: {ua}\r\n")));
2129    }
2130}