Skip to main content

pi/http/
client.rs

1//! Minimal streaming HTTP client for Pi.
2//!
3//! This is intentionally small and purpose-built for provider streaming (SSE).
4//! It avoids Node/Bun-style ambient APIs and is designed to pair with
5//! asupersync for TLS + cancel-correctness.
6
7use crate::error::{Error, Result};
8use crate::vcr::{RecordedRequest, VcrRecorder};
9use asupersync::http::h1::ParsedUrl;
10use asupersync::http::h1::http_client::Scheme;
11use asupersync::io::ext::AsyncWriteExt;
12use asupersync::io::{AsyncRead, AsyncWrite, ReadBuf};
13use asupersync::net::tcp::stream::TcpStream;
14use asupersync::tls::{TlsConnector, TlsConnectorBuilder};
15use futures::Stream;
16use futures::StreamExt;
17use futures::TryStreamExt;
18use futures::stream::{self, BoxStream};
19use std::pin::Pin;
20use std::sync::OnceLock;
21use std::task::{Context, Poll};
22
/// Base `User-Agent` value: a fixed product token followed by the package version
/// captured at compile time.
const DEFAULT_USER_AGENT: &str = concat!("pi_agent_rust/", env!("CARGO_PKG_VERSION"));
/// Environment variable whose value, when set, is appended to the user agent as
/// `Antigravity/<value>`.
const ANTIGRAVITY_VERSION_ENV: &str = "PI_AI_ANTIGRAVITY_VERSION";
/// Cap on the bytes buffered while searching for the end of the response head.
const MAX_HEADER_BYTES: usize = 64 * 1024;
/// Size of the scratch buffer used for one transport read; body chunks handed to
/// callers for length-delimited and chunked bodies are also capped at this size.
const READ_CHUNK_BYTES: usize = 16 * 1024;
/// Cap on the bytes held in the body decoding buffer.
const MAX_BUFFERED_BYTES: usize = 256 * 1024;
/// Cap on the body size accepted by `Response::text` (50 MiB).
const MAX_TEXT_BODY_BYTES: usize = 50 * 1024 * 1024;
/// Timeout, in seconds, used when `PI_HTTP_REQUEST_TIMEOUT_SECS` is unset or unparsable.
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 60;
30
31fn default_request_timeout_from_env() -> Option<std::time::Duration> {
32    static REQUEST_TIMEOUT: OnceLock<Option<std::time::Duration>> = OnceLock::new();
33    *REQUEST_TIMEOUT.get_or_init(|| {
34        let timeout_secs = std::env::var("PI_HTTP_REQUEST_TIMEOUT_SECS")
35            .ok()
36            .and_then(|raw| raw.trim().parse::<u64>().ok())
37            .unwrap_or(DEFAULT_REQUEST_TIMEOUT_SECS);
38        if timeout_secs == 0 {
39            None
40        } else {
41            Some(std::time::Duration::from_secs(timeout_secs))
42        }
43    })
44}
45
/// HTTP client handle holding the configuration shared by the requests it creates.
#[derive(Debug, Clone)]
pub struct Client {
    /// TLS connector built at construction time, or the stringified build error.
    /// The error is only reported when an HTTPS connection is attempted.
    tls: std::result::Result<TlsConnector, String>,
    /// Value sent in the `User-Agent` request header.
    user_agent: String,
    /// Optional recorder; when present, `send` routes the request through it.
    vcr: Option<VcrRecorder>,
}
52
53impl Client {
54    #[must_use]
55    pub fn new() -> Self {
56        let tls = TlsConnectorBuilder::new()
57            .with_native_roots()
58            .and_then(|builder| builder.alpn_protocols(vec![b"http/1.1".to_vec()]).build())
59            .map_err(|e| e.to_string());
60
61        let user_agent = std::env::var(ANTIGRAVITY_VERSION_ENV).map_or_else(
62            |_| DEFAULT_USER_AGENT.to_string(),
63            |v| format!("{DEFAULT_USER_AGENT} Antigravity/{v}"),
64        );
65
66        Self {
67            tls,
68            user_agent,
69            vcr: None,
70        }
71    }
72
73    pub fn post(&self, url: &str) -> RequestBuilder<'_> {
74        RequestBuilder::new(self, Method::Post, url)
75    }
76
77    pub fn get(&self, url: &str) -> RequestBuilder<'_> {
78        RequestBuilder::new(self, Method::Get, url)
79    }
80
81    #[must_use]
82    pub fn with_vcr(mut self, recorder: VcrRecorder) -> Self {
83        self.vcr = Some(recorder);
84        self
85    }
86
87    pub const fn vcr(&self) -> Option<&VcrRecorder> {
88        self.vcr.as_ref()
89    }
90}
91
/// `Client::default()` is equivalent to `Client::new()`.
impl Default for Client {
    fn default() -> Self {
        Self::new()
    }
}
97
/// HTTP request methods this client can issue.
#[derive(Debug, Clone, Copy)]
enum Method {
    Get,
    Post,
}

impl Method {
    /// Returns the method token exactly as it appears on the request line.
    const fn as_str(self) -> &'static str {
        match self {
            Method::Post => "POST",
            Method::Get => "GET",
        }
    }
}
112
/// Builder for a single request issued through a borrowed [`Client`].
pub struct RequestBuilder<'a> {
    /// Client supplying TLS configuration, user agent and the optional recorder.
    client: &'a Client,
    /// HTTP method of the request.
    method: Method,
    /// Target URL as given by the caller; it is parsed when the request is sent.
    url: String,
    /// Caller-supplied headers, in insertion order.
    headers: Vec<(String, String)>,
    /// Raw request body; empty when no body was set.
    body: Vec<u8>,
    /// Time limit applied by `send`; `None` disables it.
    timeout: Option<std::time::Duration>,
}
121
122impl<'a> RequestBuilder<'a> {
123    fn new(client: &'a Client, method: Method, url: &str) -> Self {
124        Self {
125            client,
126            method,
127            url: url.to_string(),
128            headers: Vec::new(),
129            body: Vec::new(),
130            timeout: default_request_timeout_from_env(),
131        }
132    }
133
134    #[must_use]
135    pub fn header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
136        self.headers.push((key.into(), value.into()));
137        self
138    }
139
140    #[must_use]
141    pub const fn timeout(mut self, duration: std::time::Duration) -> Self {
142        self.timeout = Some(duration);
143        self
144    }
145
146    /// Remove the timeout entirely. Use for requests that are expected to take
147    /// an arbitrarily long time (e.g. long-polling SSE streams).
148    #[must_use]
149    pub const fn no_timeout(mut self) -> Self {
150        self.timeout = None;
151        self
152    }
153
154    /// Set raw body bytes.
155    #[must_use]
156    pub fn body(mut self, body: Vec<u8>) -> Self {
157        self.body = body;
158        self
159    }
160
161    pub fn json<T: serde::Serialize>(mut self, payload: &T) -> Result<Self> {
162        self.headers
163            .push(("Content-Type".to_string(), "application/json".to_string()));
164        self.body = serde_json::to_vec(payload)?;
165        Ok(self)
166    }
167
168    pub async fn send(self) -> Result<Response> {
169        let RequestBuilder {
170            client,
171            method,
172            url,
173            headers,
174            body,
175            timeout,
176        } = self;
177
178        if let Some(recorder) = client.vcr() {
179            let recorded_request = build_recorded_request(method, &url, &headers, &body);
180            let recorded = recorder
181                .request_streaming_with(recorded_request, || async {
182                    let (status, response_headers, stream) =
183                        send_parts(client, method, &url, &headers, &body).await?;
184                    Ok((status, response_headers, stream))
185                })
186                .await?;
187            let status = recorded.status;
188            let response_headers = recorded.headers.clone();
189            let stream = recorded.into_byte_stream();
190            return Ok(Response {
191                status,
192                headers: response_headers,
193                stream,
194            });
195        }
196
197        let send_fut = send_parts(client, method, &url, &headers, &body);
198
199        let (status, response_headers, stream) = if let Some(duration) = timeout {
200            use asupersync::time::{sleep, wall_now};
201            use futures::future::{Either, FutureExt, select};
202
203            let now = asupersync::Cx::current()
204                .and_then(|cx| cx.timer_driver())
205                .map_or_else(wall_now, |timer| timer.now());
206            let sleep_fut = sleep(now, duration).fuse();
207            let send_fut = send_fut.fuse();
208            futures::pin_mut!(sleep_fut, send_fut);
209
210            match select(send_fut, sleep_fut).await {
211                Either::Left((res, _)) => res?,
212                Either::Right(_) => return Err(Error::api("Request timed out")),
213            }
214        } else {
215            send_fut.await?
216        };
217
218        Ok(Response {
219            status,
220            headers: response_headers,
221            stream,
222        })
223    }
224}
225
226async fn send_parts(
227    client: &Client,
228    method: Method,
229    url: &str,
230    headers: &[(String, String)],
231    body: &[u8],
232) -> Result<(
233    u16,
234    Vec<(String, String)>,
235    BoxStream<'static, std::io::Result<Vec<u8>>>,
236)> {
237    let parsed = ParsedUrl::parse(url).map_err(|e| Error::api(format!("Invalid URL: {e}")))?;
238    let mut transport = connect_transport(&parsed, client).await?;
239
240    let request_bytes = build_request_bytes(method, &parsed, &client.user_agent, headers, body);
241    transport.write_all(&request_bytes).await?;
242    if !body.is_empty() {
243        transport.write_all(body).await?;
244    }
245    transport.flush().await?;
246
247    let (status, response_headers, leftover) = Box::pin(read_response_head(&mut transport)).await?;
248    let body_kind = body_kind_from_headers(&response_headers);
249
250    let state = BodyStreamState::new(transport, body_kind, leftover);
251    let stream = stream::try_unfold(state, |mut state| async move {
252        match Box::pin(state.next_bytes()).await {
253            Ok(Some(chunk)) => Ok(Some((chunk, state))),
254            Ok(None) => {
255                state.shutdown_transport_best_effort().await;
256                Ok(None)
257            }
258            Err(err) => {
259                state.shutdown_transport_best_effort().await;
260                Err(err)
261            }
262        }
263    })
264    .boxed();
265
266    Ok((status, response_headers, stream))
267}
268
269fn build_recorded_request(
270    method: Method,
271    url: &str,
272    headers: &[(String, String)],
273    body: &[u8],
274) -> RecordedRequest {
275    let mut body_value = None;
276    let mut body_text = None;
277
278    if !body.is_empty() {
279        let is_json = headers.iter().any(|(name, value)| {
280            name.eq_ignore_ascii_case("content-type")
281                && value.to_ascii_lowercase().contains("application/json")
282        });
283
284        if is_json {
285            match serde_json::from_slice::<serde_json::Value>(body) {
286                Ok(value) => body_value = Some(value),
287                Err(_) => body_text = Some(String::from_utf8_lossy(body).to_string()),
288            }
289        } else {
290            body_text = Some(String::from_utf8_lossy(body).to_string());
291        }
292    }
293
294    RecordedRequest {
295        method: method.as_str().to_string(),
296        url: url.to_string(),
297        headers: headers.to_vec(),
298        body: body_value,
299        body_text,
300    }
301}
302
/// An HTTP response whose head has been read and whose body is still a stream.
pub struct Response {
    /// Numeric status code from the status line.
    status: u16,
    /// Response headers as `(name, value)` pairs.
    headers: Vec<(String, String)>,
    /// Body bytes, yielded as chunks; each item may carry an I/O error.
    stream: Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>>,
}
308
309impl Response {
310    #[must_use]
311    pub const fn status(&self) -> u16 {
312        self.status
313    }
314
315    #[must_use]
316    pub fn headers(&self) -> &[(String, String)] {
317        &self.headers
318    }
319
320    #[must_use]
321    pub fn bytes_stream(self) -> Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>> + Send>> {
322        self.stream
323    }
324
325    pub async fn text(self) -> Result<String> {
326        let bytes = self
327            .stream
328            .try_fold(Vec::new(), |mut acc, chunk| async move {
329                if acc.len().saturating_add(chunk.len()) > MAX_TEXT_BODY_BYTES {
330                    return Err(std::io::Error::other("response body too large"));
331                }
332                acc.extend_from_slice(&chunk);
333                Ok::<_, std::io::Error>(acc)
334            })
335            .await
336            .map_err(Error::from)?;
337
338        match String::from_utf8(bytes) {
339            Ok(s) => Ok(s),
340            Err(e) => Ok(String::from_utf8_lossy(e.as_bytes()).into_owned()),
341        }
342    }
343}
344
345async fn connect_transport(parsed: &ParsedUrl, client: &Client) -> Result<Transport> {
346    let addr = (parsed.host.clone(), parsed.port);
347    let tcp = TcpStream::connect(addr).await?;
348    match parsed.scheme {
349        Scheme::Http => Ok(Transport::Tcp(tcp)),
350        Scheme::Https => {
351            let tls = client
352                .tls
353                .as_ref()
354                .map_err(|e| Error::api(format!("TLS configuration error: {e}")))?;
355            let tls_stream = tls
356                .clone()
357                .connect(&parsed.host, tcp)
358                .await
359                .map_err(|e| Error::api(format!("TLS connect failed: {e}")))?;
360            Ok(Transport::Tls(Box::new(tls_stream)))
361        }
362    }
363}
364
/// Strip CR/LF from header names and values to prevent HTTP header injection.
///
/// Every other character, including other whitespace, is kept unchanged.
fn sanitize_header_value(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        if !matches!(ch, '\r' | '\n') {
            cleaned.push(ch);
        }
    }
    cleaned
}
369
370fn build_request_bytes(
371    method: Method,
372    parsed: &ParsedUrl,
373    user_agent: &str,
374    headers: &[(String, String)],
375    body: &[u8],
376) -> Vec<u8> {
377    let mut out = String::new();
378    let _ = std::fmt::Write::write_fmt(
379        &mut out,
380        format_args!("{} {} HTTP/1.1\r\n", method.as_str(), parsed.path),
381    );
382    let _ = std::fmt::Write::write_fmt(&mut out, format_args!("Host: {}\r\n", parsed.host));
383    let _ = std::fmt::Write::write_fmt(&mut out, format_args!("User-Agent: {user_agent}\r\n"));
384    let _ =
385        std::fmt::Write::write_fmt(&mut out, format_args!("Content-Length: {}\r\n", body.len()));
386
387    for (name, value) in headers {
388        let clean_name = sanitize_header_value(name);
389        let clean_value = sanitize_header_value(value);
390        let _ =
391            std::fmt::Write::write_fmt(&mut out, format_args!("{clean_name}: {clean_value}\r\n"));
392    }
393
394    out.push_str("\r\n");
395    out.into_bytes()
396}
397
398async fn read_response_head(
399    transport: &mut Transport,
400) -> Result<(u16, Vec<(String, String)>, Vec<u8>)> {
401    let mut buf = Vec::with_capacity(8192);
402    let mut scratch = [0u8; READ_CHUNK_BYTES];
403    let mut search_start = 0;
404
405    loop {
406        if buf.len() > MAX_HEADER_BYTES {
407            return Err(Error::api("HTTP response headers too large"));
408        }
409
410        let haystack = &buf[search_start..];
411        if let Some(pos) = find_headers_end(haystack) {
412            let absolute_pos = search_start + pos;
413            let head = &buf[..absolute_pos];
414            let leftover = buf[absolute_pos..].to_vec();
415            let (status, headers) = parse_response_head(head)?;
416            return Ok((status, headers, leftover));
417        }
418
419        let n = read_some(transport, &mut scratch).await?;
420        if n == 0 {
421            return Err(Error::api("HTTP connection closed before headers"));
422        }
423        let old_len = buf.len();
424        buf.extend_from_slice(&scratch[..n]);
425        search_start = old_len.saturating_sub(3);
426    }
427}
428
/// Returns the offset just past the first `\r\n\r\n` in `buf`, if present.
fn find_headers_end(buf: &[u8]) -> Option<usize> {
    const TERMINATOR: &[u8] = b"\r\n\r\n";
    let start = buf
        .windows(TERMINATOR.len())
        .position(|window| window == TERMINATOR)?;
    Some(start + TERMINATOR.len())
}
432
433fn parse_response_head(head: &[u8]) -> Result<(u16, Vec<(String, String)>)> {
434    let text =
435        std::str::from_utf8(head).map_err(|e| Error::api(format!("Invalid HTTP headers: {e}")))?;
436    let mut lines = text.split("\r\n");
437
438    let status_line = lines
439        .next()
440        .ok_or_else(|| Error::api("Missing HTTP status line"))?;
441    let mut parts = status_line.split_whitespace();
442    let _version = parts
443        .next()
444        .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
445    let status_str = parts
446        .next()
447        .ok_or_else(|| Error::api("Invalid HTTP status line"))?;
448    let status: u16 = status_str
449        .parse()
450        .map_err(|_| Error::api("Invalid HTTP status code"))?;
451
452    let mut headers = Vec::new();
453    for line in lines {
454        if line.is_empty() {
455            continue;
456        }
457        let (name, value) = line
458            .split_once(':')
459            .ok_or_else(|| Error::api("Invalid HTTP header line"))?;
460        headers.push((name.trim().to_string(), value.trim().to_string()));
461    }
462
463    Ok((status, headers))
464}
465
/// How the length of a response body is determined.
#[derive(Debug, Clone, Copy)]
enum BodyKind {
    /// The response declares a zero-length body.
    Empty,
    /// The body is exactly this many bytes long.
    ContentLength(usize),
    /// The body uses chunked transfer encoding.
    Chunked,
    /// The body runs until the connection is closed.
    Eof,
}

/// Chooses the body framing from the response headers.
///
/// Header names are matched case-insensitively, and when a header is repeated
/// the last occurrence decides. A `Transfer-Encoding` listing `chunked` takes
/// precedence over `Content-Length`. A missing or unparsable `Content-Length`
/// means the body is read until end of stream.
fn body_kind_from_headers(headers: &[(String, String)]) -> BodyKind {
    let mut declared_len: Option<usize> = None;
    let mut encodings: Option<String> = None;

    for (name, value) in headers {
        if name.eq_ignore_ascii_case("content-length") {
            declared_len = value.trim().parse().ok();
        } else if name.eq_ignore_ascii_case("transfer-encoding") {
            encodings = Some(value.to_ascii_lowercase());
        }
    }

    let is_chunked = encodings
        .as_deref()
        .is_some_and(|list| list.split(',').any(|token| token.trim() == "chunked"));
    if is_chunked {
        return BodyKind::Chunked;
    }

    match declared_len {
        None => BodyKind::Eof,
        Some(0) => BodyKind::Empty,
        Some(len) => BodyKind::ContentLength(len),
    }
}
499
/// Byte buffer with a read cursor, used while decoding the response body.
struct Buffer {
    /// Backing storage; bytes before `pos` have already been consumed.
    bytes: Vec<u8>,
    /// Index of the first unconsumed byte in `bytes`.
    pos: usize,
}
504
505impl Buffer {
506    const fn new(initial: Vec<u8>) -> Self {
507        Self {
508            bytes: initial,
509            pos: 0,
510        }
511    }
512
513    fn available(&self) -> &[u8] {
514        &self.bytes[self.pos..]
515    }
516
517    fn len(&self) -> usize {
518        self.available().len()
519    }
520
521    fn is_empty(&self) -> bool {
522        self.len() == 0
523    }
524
525    fn consume(&mut self, n: usize) {
526        self.pos = self.pos.saturating_add(n);
527        if self.pos == self.bytes.len() {
528            self.bytes.clear();
529            self.pos = 0;
530        } else if self.pos > 0 && self.pos >= self.bytes.len() / 2 {
531            self.bytes.drain(..self.pos);
532            self.pos = 0;
533        }
534    }
535
536    fn extend(&mut self, data: &[u8]) -> Result<()> {
537        if self.bytes.len().saturating_add(data.len()) > MAX_BUFFERED_BYTES {
538            return Err(Error::api("HTTP body buffer exceeded"));
539        }
540        self.bytes.extend_from_slice(data);
541        Ok(())
542    }
543
544    fn split_to_vec(&mut self, n: usize) -> Vec<u8> {
545        let n = n.min(self.len());
546        let out = self.available()[..n].to_vec();
547        self.consume(n);
548        out
549    }
550}
551
/// Position of the chunked-body decoder within the chunked framing.
enum ChunkedState {
    /// Expecting a chunk-size line.
    SizeLine,
    /// Inside chunk data, with `remaining` bytes of the current chunk still to deliver.
    Data { remaining: usize },
    /// Expecting the CRLF that follows a chunk's data.
    DataCrlf,
    /// After the zero-size chunk, skipping trailers up to the final blank line.
    Trailers,
    /// The body has been fully decoded.
    Done,
}
559
/// State carried by the response body stream between yielded chunks.
struct BodyStreamState {
    /// Connection the body is read from.
    transport: Transport,
    /// Framing used to delimit the body.
    kind: BodyKind,
    /// Bytes read from the transport but not yet handed out.
    buf: Buffer,
    /// Decoder position; only consulted for `BodyKind::Chunked`.
    chunked_state: ChunkedState,
    /// Body bytes still owed for `BodyKind::ContentLength`; zero for other kinds.
    remaining: usize,
    /// Set once a shutdown has been attempted, so it is attempted at most once.
    transport_closed: bool,
}
568
impl BodyStreamState {
    /// Builds the decoder state for a response body.
    ///
    /// `leftover` holds bytes already read past the response head and seeds the
    /// internal buffer. `remaining` starts at the declared length for
    /// `BodyKind::ContentLength` and at zero for every other kind.
    const fn new(transport: Transport, kind: BodyKind, leftover: Vec<u8>) -> Self {
        let remaining = match kind {
            BodyKind::ContentLength(n) => n,
            _ => 0,
        };
        Self {
            transport,
            kind,
            buf: Buffer::new(leftover),
            chunked_state: ChunkedState::SizeLine,
            remaining,
            transport_closed: false,
        }
    }

    /// Produces the next piece of body data, or `Ok(None)` at the end of the body,
    /// dispatching on the body framing.
    async fn next_bytes(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        match self.kind {
            BodyKind::Empty => Ok(None),
            BodyKind::Eof => Box::pin(self.next_eof()).await,
            BodyKind::ContentLength(_) => Box::pin(self.next_content_length()).await,
            BodyKind::Chunked => Box::pin(self.next_chunked()).await,
        }
    }

    /// Shuts the transport down once, ignoring any error from the shutdown.
    async fn shutdown_transport_best_effort(&mut self) {
        if self.transport_closed {
            return;
        }
        // Mark first so the shutdown is not retried, whatever its outcome.
        self.transport_closed = true;
        let _ = self.transport.shutdown().await;
    }

    /// Reads once from the transport into the buffer and returns the byte count;
    /// `0` means end of stream.
    ///
    /// A buffer-limit failure is reported as an `std::io::Error` carrying the
    /// original message.
    async fn read_more(&mut self) -> std::io::Result<usize> {
        let mut scratch = [0u8; READ_CHUNK_BYTES];
        let n = read_some(&mut self.transport, &mut scratch).await?;
        if n > 0 {
            if let Err(err) = self.buf.extend(&scratch[..n]) {
                return Err(std::io::Error::other(err.to_string()));
            }
        }
        Ok(n)
    }

    /// Yields body data for a body delimited by connection close.
    ///
    /// Buffered bytes are drained first; after that each read is handed out
    /// whole, and a zero-length read ends the body.
    async fn next_eof(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        if !self.buf.is_empty() {
            return Ok(Some(self.buf.split_to_vec(self.buf.len())));
        }

        let n = Box::pin(self.read_more()).await?;
        if n == 0 {
            return Ok(None);
        }
        Ok(Some(self.buf.split_to_vec(self.buf.len())))
    }

    /// Yields body data for a `Content-Length` body.
    ///
    /// Returns `Ok(None)` once the declared number of bytes has been delivered,
    /// and an `UnexpectedEof` error if the stream ends before that.
    async fn next_content_length(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        if self.remaining == 0 {
            return Ok(None);
        }

        if self.buf.is_empty() {
            let n = Box::pin(self.read_more()).await?;
            if n == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "unexpected EOF reading content-length body",
                ));
            }
        }

        // Never hand out more than the body still owes, even if extra bytes are buffered.
        let to_take = self.remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
        let out = self.buf.split_to_vec(to_take);
        self.remaining = self.remaining.saturating_sub(out.len());
        Ok(Some(out))
    }

    /// Yields body data for a chunked body by driving the `ChunkedState` machine.
    ///
    /// Loops through framing states until there is chunk data to return, the
    /// body ends (`Ok(None)`), or an error occurs. Malformed framing produces an
    /// `std::io::Error`; a stream that ends mid-body produces `UnexpectedEof`.
    async fn next_chunked(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        loop {
            match self.chunked_state {
                ChunkedState::SizeLine => {
                    if let Some(line_end) = find_crlf(self.buf.available()) {
                        let line = &self.buf.available()[..line_end];
                        let line_str = std::str::from_utf8(line).map_err(std::io::Error::other)?;
                        // Anything after ';' is a chunk extension and is ignored.
                        let size_part = line_str.split(';').next().unwrap_or("").trim();
                        if size_part.is_empty() {
                            return Err(std::io::Error::other("invalid chunk size"));
                        }
                        let chunk_size = usize::from_str_radix(size_part, 16)
                            .map_err(|_| std::io::Error::other("invalid chunk size"))?;
                        // Drop the size line together with its CRLF.
                        self.buf.consume(line_end + 2);
                        if chunk_size == 0 {
                            self.chunked_state = ChunkedState::Trailers;
                        } else {
                            self.chunked_state = ChunkedState::Data {
                                remaining: chunk_size,
                            };
                        }
                        continue;
                    }

                    // No complete size line buffered yet: read more and retry.
                    let n = Box::pin(self.read_more()).await?;
                    if n == 0 {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            "unexpected EOF reading chunk size",
                        ));
                    }
                }

                ChunkedState::Data { remaining } => {
                    if remaining == 0 {
                        self.chunked_state = ChunkedState::DataCrlf;
                        continue;
                    }

                    if self.buf.is_empty() {
                        let n = Box::pin(self.read_more()).await?;
                        if n == 0 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected EOF reading chunk data",
                            ));
                        }
                    }

                    // Hand out what is buffered, bounded by the chunk and the read size.
                    let to_take = remaining.min(self.buf.len()).min(READ_CHUNK_BYTES);
                    let out = self.buf.split_to_vec(to_take);
                    self.chunked_state = ChunkedState::Data {
                        remaining: remaining.saturating_sub(out.len()),
                    };
                    return Ok(Some(out));
                }

                ChunkedState::DataCrlf => {
                    if self.buf.len() < 2 {
                        let n = Box::pin(self.read_more()).await?;
                        if n == 0 {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::UnexpectedEof,
                                "unexpected EOF reading chunk CRLF",
                            ));
                        }
                        continue;
                    }

                    let bytes = self.buf.available();
                    if bytes[0] != b'\r' || bytes[1] != b'\n' {
                        return Err(std::io::Error::other("invalid chunk CRLF"));
                    }
                    self.buf.consume(2);
                    self.chunked_state = ChunkedState::SizeLine;
                }

                ChunkedState::Trailers => {
                    // Trailers are terminated by an empty line. When there are no trailers,
                    // the terminator is a single CRLF (`0\r\n\r\n` total, with the final
                    // `\r\n` remaining after consuming the size line).
                    let bytes = self.buf.available();
                    if bytes.len() >= 2 && bytes[0] == b'\r' && bytes[1] == b'\n' {
                        self.buf.consume(2);
                        self.chunked_state = ChunkedState::Done;
                        return Ok(None);
                    }
                    // Trailer fields present: skip them up to and including the blank line.
                    if let Some(end) = find_double_crlf(self.buf.available()) {
                        self.buf.consume(end);
                        self.chunked_state = ChunkedState::Done;
                        return Ok(None);
                    }

                    let n = Box::pin(self.read_more()).await?;
                    if n == 0 {
                        return Err(std::io::Error::new(
                            std::io::ErrorKind::UnexpectedEof,
                            "unexpected EOF reading trailers",
                        ));
                    }
                }

                ChunkedState::Done => return Ok(None),
            }
        }
    }
}
753
/// Returns the offset of the first `\r\n` in `buf`, if present.
fn find_crlf(buf: &[u8]) -> Option<usize> {
    buf.windows(2)
        .position(|pair| matches!(pair, [b'\r', b'\n']))
}
757
/// Returns the offset just past the first `\r\n\r\n` in `buf`, if present.
fn find_double_crlf(buf: &[u8]) -> Option<usize> {
    const BLANK_LINE: &[u8] = b"\r\n\r\n";
    let start = buf
        .windows(BLANK_LINE.len())
        .position(|window| window == BLANK_LINE)?;
    Some(start + BLANK_LINE.len())
}
761
762async fn read_some<R: AsyncRead + Unpin>(reader: &mut R, dst: &mut [u8]) -> std::io::Result<usize> {
763    futures::future::poll_fn(|cx| {
764        let mut read_buf = ReadBuf::new(dst);
765        match Pin::new(&mut *reader).poll_read(cx, &mut read_buf) {
766            Poll::Pending => Poll::Pending,
767            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
768            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
769        }
770    })
771    .await
772}
773
/// Connection used for one exchange: plain TCP or TLS over TCP.
#[derive(Debug)]
enum Transport {
    /// Unencrypted connection, used for `http` URLs.
    Tcp(TcpStream),
    /// TLS session over TCP, used for `https` URLs; boxed to keep the enum small.
    Tls(Box<asupersync::tls::TlsStream<TcpStream>>),
}

// Declared `Unpin` explicitly; the I/O impls below rely on it to reach the
// inner stream through `Pin<&mut Self>`.
impl Unpin for Transport {}
781
782impl AsyncRead for Transport {
783    fn poll_read(
784        mut self: Pin<&mut Self>,
785        cx: &mut Context<'_>,
786        buf: &mut ReadBuf<'_>,
787    ) -> Poll<std::io::Result<()>> {
788        match &mut *self {
789            Self::Tcp(stream) => Pin::new(stream).poll_read(cx, buf),
790            Self::Tls(stream) => Pin::new(&mut **stream).poll_read(cx, buf),
791        }
792    }
793}
794
795impl AsyncWrite for Transport {
796    fn poll_write(
797        mut self: Pin<&mut Self>,
798        cx: &mut Context<'_>,
799        buf: &[u8],
800    ) -> Poll<std::io::Result<usize>> {
801        match &mut *self {
802            Self::Tcp(stream) => Pin::new(stream).poll_write(cx, buf),
803            Self::Tls(stream) => Pin::new(&mut **stream).poll_write(cx, buf),
804        }
805    }
806
807    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
808        match &mut *self {
809            Self::Tcp(stream) => Pin::new(stream).poll_flush(cx),
810            Self::Tls(stream) => Pin::new(&mut **stream).poll_flush(cx),
811        }
812    }
813
814    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
815        match &mut *self {
816            Self::Tcp(stream) => Pin::new(stream).poll_shutdown(cx),
817            Self::Tls(stream) => Pin::new(&mut **stream).poll_shutdown(cx),
818        }
819    }
820}
821
822#[cfg(test)]
823mod tests {
824    use super::*;
825    use serde_json::json;
826
827    // ── Method ──────────────────────────────────────────────────────────
828    #[test]
829    fn method_as_str_get() {
830        assert_eq!(Method::Get.as_str(), "GET");
831    }
832
833    #[test]
834    fn method_as_str_post() {
835        assert_eq!(Method::Post.as_str(), "POST");
836    }
837
838    // ── find_headers_end ────────────────────────────────────────────────
839    #[test]
840    fn find_headers_end_present() {
841        let buf = b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello";
842        let pos = find_headers_end(buf).unwrap();
843        assert_eq!(&buf[pos..], b"hello");
844    }
845
846    #[test]
847    fn find_headers_end_absent() {
848        assert!(find_headers_end(b"HTTP/1.1 200 OK\r\nFoo: bar\r\n").is_none());
849    }
850
851    #[test]
852    fn find_headers_end_empty() {
853        assert!(find_headers_end(b"").is_none());
854    }
855
856    #[test]
857    fn find_headers_end_just_separator() {
858        let buf = b"\r\n\r\n";
859        assert_eq!(find_headers_end(buf), Some(4));
860    }
861
862    // ── find_crlf ──────────────────────────────────────────────────────
863    #[test]
864    fn find_crlf_present() {
865        assert_eq!(find_crlf(b"abc\r\ndef"), Some(3));
866    }
867
868    #[test]
869    fn find_crlf_absent() {
870        assert!(find_crlf(b"abcdef").is_none());
871    }
872
873    #[test]
874    fn find_crlf_at_start() {
875        assert_eq!(find_crlf(b"\r\ndata"), Some(0));
876    }
877
878    // ── find_double_crlf ───────────────────────────────────────────────
879    #[test]
880    fn find_double_crlf_present() {
881        let buf = b"headers\r\n\r\nbody";
882        assert_eq!(find_double_crlf(buf), Some(11));
883    }
884
885    #[test]
886    fn find_double_crlf_absent() {
887        assert!(find_double_crlf(b"headers\r\nbody").is_none());
888    }
889
890    // ── parse_response_head ────────────────────────────────────────────
891    #[test]
892    fn parse_response_head_200() {
893        let head = b"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
894        let (status, headers) = parse_response_head(head).unwrap();
895        assert_eq!(status, 200);
896        assert_eq!(headers.len(), 1);
897        assert_eq!(headers[0].0, "Content-Type");
898        assert_eq!(headers[0].1, "text/plain");
899    }
900
901    #[test]
902    fn parse_response_head_404() {
903        let head = b"HTTP/1.1 404 Not Found\r\n\r\n";
904        let (status, headers) = parse_response_head(head).unwrap();
905        assert_eq!(status, 404);
906        assert!(headers.is_empty());
907    }
908
909    #[test]
910    fn parse_response_head_multiple_headers() {
911        let head = b"HTTP/1.1 200 OK\r\nA: 1\r\nB: 2\r\nC: 3\r\n\r\n";
912        let (status, headers) = parse_response_head(head).unwrap();
913        assert_eq!(status, 200);
914        assert_eq!(headers.len(), 3);
915        assert_eq!(headers[0], ("A".to_string(), "1".to_string()));
916        assert_eq!(headers[1], ("B".to_string(), "2".to_string()));
917        assert_eq!(headers[2], ("C".to_string(), "3".to_string()));
918    }
919
920    #[test]
921    fn parse_response_head_header_value_with_colon() {
922        // Header value contains a colon (e.g., a URL)
923        let head = b"HTTP/1.1 200 OK\r\nLocation: http://example.com:8080/path\r\n\r\n";
924        let (status, headers) = parse_response_head(head).unwrap();
925        assert_eq!(status, 200);
926        assert_eq!(headers[0].0, "Location");
927        assert_eq!(headers[0].1, "http://example.com:8080/path");
928    }
929
930    #[test]
931    fn parse_response_head_invalid_status_code() {
932        let head = b"HTTP/1.1 abc OK\r\n\r\n";
933        assert!(parse_response_head(head).is_err());
934    }
935
936    #[test]
937    fn parse_response_head_missing_status() {
938        let head = b"HTTP/1.1\r\n\r\n";
939        assert!(parse_response_head(head).is_err());
940    }
941
942    #[test]
943    fn parse_response_head_empty() {
944        let head = b"";
945        assert!(parse_response_head(head).is_err());
946    }
947
948    // ── body_kind_from_headers ─────────────────────────────────────────
949    #[test]
950    fn body_kind_content_length() {
951        let headers = vec![("Content-Length".to_string(), "42".to_string())];
952        assert!(matches!(
953            body_kind_from_headers(&headers),
954            BodyKind::ContentLength(42)
955        ));
956    }
957
958    #[test]
959    fn body_kind_content_length_zero() {
960        let headers = vec![("Content-Length".to_string(), "0".to_string())];
961        assert!(matches!(body_kind_from_headers(&headers), BodyKind::Empty));
962    }
963
964    #[test]
965    fn body_kind_chunked() {
966        let headers = vec![("Transfer-Encoding".to_string(), "chunked".to_string())];
967        assert!(matches!(
968            body_kind_from_headers(&headers),
969            BodyKind::Chunked
970        ));
971    }
972
973    #[test]
974    fn body_kind_chunked_mixed() {
975        // Transfer-Encoding with multiple values
976        let headers = vec![("Transfer-Encoding".to_string(), "gzip, chunked".to_string())];
977        assert!(matches!(
978            body_kind_from_headers(&headers),
979            BodyKind::Chunked
980        ));
981    }
982
983    #[test]
984    fn body_kind_chunked_overrides_content_length() {
985        // When both present, chunked wins
986        let headers = vec![
987            ("Content-Length".to_string(), "100".to_string()),
988            ("Transfer-Encoding".to_string(), "chunked".to_string()),
989        ];
990        assert!(matches!(
991            body_kind_from_headers(&headers),
992            BodyKind::Chunked
993        ));
994    }
995
996    #[test]
997    fn body_kind_eof_no_headers() {
998        let headers: Vec<(String, String)> = Vec::new();
999        assert!(matches!(body_kind_from_headers(&headers), BodyKind::Eof));
1000    }
1001
1002    #[test]
1003    fn body_kind_case_insensitive() {
1004        let headers = vec![("content-length".to_string(), "10".to_string())];
1005        assert!(matches!(
1006            body_kind_from_headers(&headers),
1007            BodyKind::ContentLength(10)
1008        ));
1009    }
1010
1011    // ── build_request_bytes ────────────────────────────────────────────
1012    #[test]
1013    fn build_request_bytes_get() {
1014        let parsed = ParsedUrl::parse("http://example.com/api/test").unwrap();
1015        let bytes = build_request_bytes(Method::Get, &parsed, "test-agent", &[], &[]);
1016        let text = String::from_utf8(bytes).unwrap();
1017        assert!(text.starts_with("GET /api/test HTTP/1.1\r\n"));
1018        assert!(text.contains("Host: example.com\r\n"));
1019        assert!(text.contains("User-Agent: test-agent\r\n"));
1020        assert!(text.contains("Content-Length: 0\r\n"));
1021        assert!(text.ends_with("\r\n\r\n"));
1022    }
1023
1024    #[test]
1025    fn build_request_bytes_post_with_body() {
1026        let parsed = ParsedUrl::parse("https://api.example.com/v1/messages").unwrap();
1027        let body = b"hello world";
1028        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1029        let bytes = build_request_bytes(Method::Post, &parsed, "pi/0.1", &headers, body);
1030        let text = String::from_utf8(bytes).unwrap();
1031        assert!(text.starts_with("POST /v1/messages HTTP/1.1\r\n"));
1032        assert!(text.contains("Host: api.example.com\r\n"));
1033        assert!(text.contains("Content-Length: 11\r\n"));
1034        assert!(text.contains("Content-Type: application/json\r\n"));
1035    }
1036
1037    #[test]
1038    fn build_request_bytes_custom_headers() {
1039        let parsed = ParsedUrl::parse("http://localhost/test").unwrap();
1040        let headers = vec![
1041            ("Authorization".to_string(), "Bearer sk-test".to_string()),
1042            ("X-Custom".to_string(), "value".to_string()),
1043        ];
1044        let bytes = build_request_bytes(Method::Post, &parsed, "agent", &headers, &[]);
1045        let text = String::from_utf8(bytes).unwrap();
1046        assert!(text.contains("Authorization: Bearer sk-test\r\n"));
1047        assert!(text.contains("X-Custom: value\r\n"));
1048    }
1049
1050    // ── build_recorded_request ─────────────────────────────────────────
1051    #[test]
1052    fn build_recorded_request_empty_body() {
1053        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &[], &[]);
1054        assert_eq!(req.method, "POST");
1055        assert_eq!(req.url, "https://api.test.com/v1");
1056        assert!(req.body.is_none());
1057        assert!(req.body_text.is_none());
1058    }
1059
1060    #[test]
1061    fn build_recorded_request_json_body() {
1062        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1063        let body = serde_json::to_vec(&json!({"model": "test"})).unwrap();
1064        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, &body);
1065        assert!(req.body.is_some());
1066        assert_eq!(req.body.unwrap()["model"], "test");
1067        assert!(req.body_text.is_none());
1068    }
1069
1070    #[test]
1071    fn build_recorded_request_text_body() {
1072        let headers = vec![("Content-Type".to_string(), "text/plain".to_string())];
1073        let body = b"hello world";
1074        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1075        assert!(req.body.is_none());
1076        assert_eq!(req.body_text.as_deref(), Some("hello world"));
1077    }
1078
1079    #[test]
1080    fn build_recorded_request_invalid_json_body_falls_back_to_text() {
1081        let headers = vec![("Content-Type".to_string(), "application/json".to_string())];
1082        let body = b"not json {{{";
1083        let req = build_recorded_request(Method::Post, "https://api.test.com/v1", &headers, body);
1084        assert!(req.body.is_none());
1085        assert_eq!(req.body_text.as_deref(), Some("not json {{{"));
1086    }
1087
1088    #[test]
1089    fn build_recorded_request_preserves_headers() {
1090        let headers = vec![
1091            ("Authorization".to_string(), "Bearer key".to_string()),
1092            ("X-Trace".to_string(), "abc123".to_string()),
1093        ];
1094        let req = build_recorded_request(Method::Get, "https://test.com", &headers, &[]);
1095        assert_eq!(req.headers.len(), 2);
1096        assert_eq!(req.headers[0].0, "Authorization");
1097    }
1098
1099    // ── Buffer ─────────────────────────────────────────────────────────
1100    #[test]
1101    fn buffer_new_empty() {
1102        let buf = Buffer::new(Vec::new());
1103        assert!(buf.is_empty());
1104        assert_eq!(buf.len(), 0);
1105    }
1106
1107    #[test]
1108    fn buffer_new_with_data() {
1109        let buf = Buffer::new(vec![1, 2, 3]);
1110        assert!(!buf.is_empty());
1111        assert_eq!(buf.len(), 3);
1112        assert_eq!(buf.available(), &[1, 2, 3]);
1113    }
1114
1115    #[test]
1116    fn buffer_consume_partial() {
1117        let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1118        buf.consume(2);
1119        assert_eq!(buf.len(), 3);
1120        assert_eq!(buf.available(), &[3, 4, 5]);
1121    }
1122
1123    #[test]
1124    fn buffer_consume_all() {
1125        let mut buf = Buffer::new(vec![1, 2, 3]);
1126        buf.consume(3);
1127        assert!(buf.is_empty());
1128        assert_eq!(buf.len(), 0);
1129    }
1130
1131    #[test]
1132    fn buffer_consume_triggers_compact() {
1133        // When pos >= len/2, the buffer compacts
1134        let mut buf = Buffer::new(vec![0; 10]);
1135        buf.consume(6); // pos=6, len=10, 6 >= 5 → compact
1136        assert_eq!(buf.len(), 4);
1137        assert_eq!(buf.available().len(), 4);
1138    }
1139
1140    #[test]
1141    fn buffer_extend() {
1142        let mut buf = Buffer::new(vec![1, 2]);
1143        buf.extend(&[3, 4, 5]).unwrap();
1144        assert_eq!(buf.len(), 5);
1145        assert_eq!(buf.available(), &[1, 2, 3, 4, 5]);
1146    }
1147
1148    #[test]
1149    fn buffer_extend_overflow() {
1150        let mut buf = Buffer::new(Vec::new());
1151        let huge = vec![0u8; MAX_BUFFERED_BYTES + 1];
1152        assert!(buf.extend(&huge).is_err());
1153    }
1154
1155    #[test]
1156    fn buffer_split_to_vec() {
1157        let mut buf = Buffer::new(vec![1, 2, 3, 4, 5]);
1158        let out = buf.split_to_vec(3);
1159        assert_eq!(out, vec![1, 2, 3]);
1160        assert_eq!(buf.len(), 2);
1161        assert_eq!(buf.available(), &[4, 5]);
1162    }
1163
1164    #[test]
1165    fn buffer_split_to_vec_more_than_available() {
1166        let mut buf = Buffer::new(vec![1, 2]);
1167        let out = buf.split_to_vec(10);
1168        assert_eq!(out, vec![1, 2]);
1169        assert!(buf.is_empty());
1170    }
1171
1172    #[test]
1173    fn buffer_consume_then_extend() {
1174        let mut buf = Buffer::new(vec![1, 2, 3]);
1175        buf.consume(2);
1176        buf.extend(&[4, 5]).unwrap();
1177        // After consume(2), available = [3], then extend [4,5] → [3, 4, 5]
1178        assert_eq!(buf.available(), &[3, 4, 5]);
1179    }
1180
1181    #[test]
1182    fn buffer_consume_exactly_all_clears() {
1183        let mut buf = Buffer::new(vec![1, 2, 3]);
1184        buf.consume(3);
1185        // pos == bytes.len() triggers clear
1186        assert!(buf.is_empty());
1187        assert_eq!(buf.available(), &[] as &[u8]);
1188    }
1189
1190    // ── Client builder methods ─────────────────────────────────────────
1191    #[test]
1192    fn client_default() {
1193        let client = Client::default();
1194        assert!(client.vcr().is_none());
1195    }
1196
1197    #[test]
1198    fn client_with_vcr() {
1199        let recorder = VcrRecorder::new_with(
1200            "test",
1201            crate::vcr::VcrMode::Playback,
1202            std::path::Path::new("/tmp"),
1203        );
1204        let client = Client::new().with_vcr(recorder);
1205        assert!(client.vcr().is_some());
1206    }
1207
1208    // ── RequestBuilder ─────────────────────────────────────────────────
1209    #[test]
1210    fn request_builder_header_chaining() {
1211        let client = Client::new();
1212        let builder = client
1213            .post("https://api.example.com")
1214            .header("Authorization", "Bearer test")
1215            .header("X-Custom", "value");
1216        assert_eq!(builder.headers.len(), 2);
1217    }
1218
1219    #[test]
1220    fn request_builder_json() {
1221        let client = Client::new();
1222        let builder = client
1223            .post("https://api.example.com")
1224            .json(&json!({"key": "value"}))
1225            .unwrap();
1226        assert!(!builder.body.is_empty());
1227        // Should have auto-added Content-Type header
1228        assert!(
1229            builder
1230                .headers
1231                .iter()
1232                .any(|(k, v)| k == "Content-Type" && v == "application/json")
1233        );
1234    }
1235
1236    #[test]
1237    fn request_builder_body() {
1238        let client = Client::new();
1239        let builder = client
1240            .post("https://api.example.com")
1241            .body(b"raw bytes".to_vec());
1242        assert_eq!(builder.body, b"raw bytes");
1243    }
1244
1245    #[test]
1246    fn request_builder_default_timeout() {
1247        let client = Client::new();
1248        let builder = client.get("https://api.example.com");
1249        assert_eq!(
1250            builder.timeout,
1251            Some(std::time::Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS))
1252        );
1253    }
1254
1255    #[test]
1256    fn request_builder_timeout() {
1257        let client = Client::new();
1258        let builder = client
1259            .get("https://api.example.com")
1260            .timeout(std::time::Duration::from_secs(30));
1261        assert_eq!(builder.timeout, Some(std::time::Duration::from_secs(30)));
1262    }
1263
1264    #[test]
1265    fn request_builder_no_timeout() {
1266        let client = Client::new();
1267        let builder = client.get("https://api.example.com").no_timeout();
1268        assert_eq!(builder.timeout, None);
1269    }
1270
1271    // ── Response ───────────────────────────────────────────────────────
1272    #[test]
1273    fn response_accessors() {
1274        let response = Response {
1275            status: 200,
1276            headers: vec![("Content-Type".to_string(), "text/plain".to_string())],
1277            stream: Box::pin(futures::stream::empty()),
1278        };
1279        assert_eq!(response.status(), 200);
1280        assert_eq!(response.headers().len(), 1);
1281        assert_eq!(response.headers()[0].0, "Content-Type");
1282    }
1283
1284    #[test]
1285    fn response_text() {
1286        asupersync::test_utils::run_test(|| async {
1287            let chunks = vec![Ok(b"hello ".to_vec()), Ok(b"world".to_vec())];
1288            let response = Response {
1289                status: 200,
1290                headers: Vec::new(),
1291                stream: Box::pin(futures::stream::iter(chunks)),
1292            };
1293            let text = response.text().await.unwrap();
1294            assert_eq!(text, "hello world");
1295        });
1296    }
1297
1298    #[test]
1299    fn response_text_empty() {
1300        asupersync::test_utils::run_test(|| async {
1301            let response = Response {
1302                status: 200,
1303                headers: Vec::new(),
1304                stream: Box::pin(futures::stream::empty()),
1305            };
1306            let text = response.text().await.unwrap();
1307            assert_eq!(text, "");
1308        });
1309    }
1310
1311    #[test]
1312    fn response_bytes_stream() {
1313        asupersync::test_utils::run_test(|| async {
1314            let chunks = vec![Ok(b"data".to_vec())];
1315            let response = Response {
1316                status: 200,
1317                headers: Vec::new(),
1318                stream: Box::pin(futures::stream::iter(chunks)),
1319            };
1320            let mut stream = response.bytes_stream();
1321            let first = stream.next().await.unwrap().unwrap();
1322            assert_eq!(first, b"data");
1323            assert!(stream.next().await.is_none());
1324        });
1325    }
1326
1327    // ── Body stream via Response (in-memory) ──────────────────────────
1328    #[test]
1329    fn body_stream_content_length_via_response() {
1330        asupersync::test_utils::run_test(|| async {
1331            // Simulate a content-length response by providing exact chunks
1332            let body = b"Hello, World!";
1333            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(body.to_vec())];
1334            let response = Response {
1335                status: 200,
1336                headers: vec![("Content-Length".to_string(), "13".to_string())],
1337                stream: Box::pin(futures::stream::iter(chunks)),
1338            };
1339            let text = response.text().await.unwrap();
1340            assert_eq!(text, "Hello, World!");
1341        });
1342    }
1343
1344    #[test]
1345    fn body_stream_multiple_chunks_via_response() {
1346        asupersync::test_utils::run_test(|| async {
1347            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1348                Ok(b"chunk1".to_vec()),
1349                Ok(b"chunk2".to_vec()),
1350                Ok(b"chunk3".to_vec()),
1351            ];
1352            let response = Response {
1353                status: 200,
1354                headers: Vec::new(),
1355                stream: Box::pin(futures::stream::iter(chunks)),
1356            };
1357            let text = response.text().await.unwrap();
1358            assert_eq!(text, "chunk1chunk2chunk3");
1359        });
1360    }
1361
1362    #[test]
1363    fn body_stream_error_propagation() {
1364        asupersync::test_utils::run_test(|| async {
1365            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![
1366                Ok(b"data".to_vec()),
1367                Err(std::io::Error::new(
1368                    std::io::ErrorKind::ConnectionReset,
1369                    "connection reset",
1370                )),
1371            ];
1372            let response = Response {
1373                status: 200,
1374                headers: Vec::new(),
1375                stream: Box::pin(futures::stream::iter(chunks)),
1376            };
1377            let result = response.text().await;
1378            assert!(result.is_err());
1379        });
1380    }
1381
1382    // ── Edge cases ─────────────────────────────────────────────────────
1383    #[test]
1384    fn parse_response_head_trims_header_whitespace() {
1385        let head = b"HTTP/1.1 200 OK\r\n  X-Padded  :   value with spaces  \r\n\r\n";
1386        let (status, headers) = parse_response_head(head).unwrap();
1387        assert_eq!(status, 200);
1388        assert_eq!(headers[0].0, "X-Padded");
1389        assert_eq!(headers[0].1, "value with spaces");
1390    }
1391
1392    #[test]
1393    fn parse_response_head_status_codes() {
1394        for (code, line) in [
1395            (100, "HTTP/1.1 100 Continue"),
1396            (201, "HTTP/1.1 201 Created"),
1397            (301, "HTTP/1.1 301 Moved Permanently"),
1398            (400, "HTTP/1.1 400 Bad Request"),
1399            (429, "HTTP/1.1 429 Too Many Requests"),
1400            (500, "HTTP/1.1 500 Internal Server Error"),
1401            (503, "HTTP/1.1 503 Service Unavailable"),
1402        ] {
1403            let head = format!("{line}\r\n\r\n");
1404            let (status, _) = parse_response_head(head.as_bytes()).unwrap();
1405            assert_eq!(status, code, "Failed to parse status {code}");
1406        }
1407    }
1408
1409    #[test]
1410    fn body_kind_invalid_content_length_falls_to_eof() {
1411        let headers = vec![("Content-Length".to_string(), "not-a-number".to_string())];
1412        // parse fails, content_length stays None → Eof
1413        assert!(matches!(body_kind_from_headers(&headers), BodyKind::Eof));
1414    }
1415
1416    #[test]
1417    fn build_request_bytes_empty_path() {
1418        let parsed = ParsedUrl::parse("http://example.com").unwrap();
1419        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &[], &[]);
1420        let text = String::from_utf8(bytes).unwrap();
1421        // Should have "/" as path
1422        assert!(text.starts_with("GET /"));
1423    }
1424
1425    #[test]
1426    fn build_recorded_request_content_type_case_insensitive() {
1427        let headers = vec![("content-type".to_string(), "APPLICATION/JSON".to_string())];
1428        let body = serde_json::to_vec(&json!({"test": true})).unwrap();
1429        let req = build_recorded_request(Method::Post, "https://test.com", &headers, &body);
1430        // Should detect JSON despite case differences
1431        assert!(req.body.is_some());
1432    }
1433
1434    // ── CRLF header injection prevention ──────────────────────────────
1435    #[test]
1436    fn sanitize_header_value_strips_crlf() {
1437        assert_eq!(sanitize_header_value("normal value"), "normal value");
1438        assert_eq!(
1439            sanitize_header_value("injected\r\nEvil: header"),
1440            "injectedEvil: header"
1441        );
1442        assert_eq!(sanitize_header_value("bare\nnewline"), "barenewline");
1443        assert_eq!(sanitize_header_value("bare\rreturn"), "barereturn");
1444        assert_eq!(sanitize_header_value(""), "");
1445    }
1446
1447    #[test]
1448    fn build_request_bytes_strips_crlf_from_headers() {
1449        let parsed = ParsedUrl::parse("http://example.com/test").unwrap();
1450        let headers = vec![(
1451            "X-Injected\r\nEvil".to_string(),
1452            "value\r\nX-Bad: smuggled".to_string(),
1453        )];
1454        let bytes = build_request_bytes(Method::Get, &parsed, "agent", &headers, &[]);
1455        let text = String::from_utf8(bytes).unwrap();
1456        // CRLF should be stripped — no injected header line
1457        assert!(text.contains("X-InjectedEvil: valueX-Bad: smuggled\r\n"));
1458        // The smuggled header must NOT appear as a separate line
1459        assert!(!text.contains("\r\nX-Bad: smuggled\r\n"));
1460    }
1461
1462    // ── Response body size limit ──────────────────────────────────────
1463    #[test]
1464    fn response_text_rejects_oversized_body() {
1465        asupersync::test_utils::run_test(|| async {
1466            // Create a stream that would exceed MAX_TEXT_BODY_BYTES
1467            let big_chunk = vec![0u8; MAX_TEXT_BODY_BYTES + 1];
1468            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(big_chunk)];
1469            let response = Response {
1470                status: 200,
1471                headers: Vec::new(),
1472                stream: Box::pin(futures::stream::iter(chunks)),
1473            };
1474            let result = response.text().await;
1475            assert!(result.is_err());
1476            let err_msg = format!("{}", result.unwrap_err());
1477            assert!(
1478                err_msg.contains("too large"),
1479                "error should mention size: {err_msg}"
1480            );
1481        });
1482    }
1483
1484    #[test]
1485    fn response_text_accepts_body_at_limit() {
1486        asupersync::test_utils::run_test(|| async {
1487            let chunk = vec![b'a'; MAX_TEXT_BODY_BYTES];
1488            let chunks: Vec<std::io::Result<Vec<u8>>> = vec![Ok(chunk)];
1489            let response = Response {
1490                status: 200,
1491                headers: Vec::new(),
1492                stream: Box::pin(futures::stream::iter(chunks)),
1493            };
1494            let result = response.text().await;
1495            assert!(result.is_ok());
1496            assert_eq!(result.unwrap().len(), MAX_TEXT_BODY_BYTES);
1497        });
1498    }
1499
1500    // ── PI_AI_ANTIGRAVITY_VERSION env var ─────────────────────────────
1501
1502    #[test]
1503    fn antigravity_user_agent_format() {
1504        // Verify the format string used when PI_AI_ANTIGRAVITY_VERSION is set.
1505        let version = "1.2.3";
1506        let ua = format!("{DEFAULT_USER_AGENT} Antigravity/{version}");
1507        assert!(ua.starts_with("pi_agent_rust/"));
1508        assert!(ua.contains("Antigravity/1.2.3"));
1509
1510        // Verify default user agent contains crate version.
1511        assert!(DEFAULT_USER_AGENT.starts_with("pi_agent_rust/"));
1512    }
1513
1514    #[test]
1515    fn antigravity_user_agent_in_request_headers() {
1516        // Simulate the antigravity user agent being used in request building.
1517        let ua = format!("{DEFAULT_USER_AGENT} Antigravity/42.0");
1518        let parsed = ParsedUrl::parse("http://example.com/api").unwrap();
1519        let bytes = build_request_bytes(Method::Get, &parsed, &ua, &[], &[]);
1520        let text = String::from_utf8(bytes).unwrap();
1521        assert!(text.contains(&format!("User-Agent: {ua}\r\n")));
1522    }
1523}