Skip to main content

trillium_client/
conn.rs

1use crate::{
2    Client, ResponseBody,
3    response_body::{CleanupContext, OverrideBody},
4    util::encoding,
5};
6use std::{borrow::Cow, mem, net::SocketAddr, sync::Arc, time::Duration};
7use trillium_http::{
8    Body, Buffer, Error, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
9    ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
10};
11use trillium_server_common::{Transport, url::Url};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
24/// a client connection, representing both an outbound http request and a
25/// http response
26#[must_use]
27#[derive(fieldwork::Fieldwork)]
28pub struct Conn {
29    pub(crate) protocol_session: ProtocolSession,
30    /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
31    /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
32    /// `into_webtransport` can install the router and hand the QUIC connection to the
33    /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
34    #[cfg(feature = "webtransport")]
35    pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
36    pub(crate) buffer: Buffer,
37    pub(crate) response_body_state: ReceivedBodyState,
38    pub(crate) headers_finalized: bool,
39    pub(crate) max_head_length: usize,
40    pub(crate) state: TypeSet,
41    pub(crate) context: Arc<HttpContext>,
42
43    /// the transport for this conn
44    ///
45    /// This should only be used to call your own custom methods on the transport that do not read
46    /// or write any data. Calling any method that reads from or writes to the transport will
47    /// disrupt the HTTP protocol.
48    #[field(get, get_mut)]
49    pub(crate) transport: Option<Box<dyn Transport>>,
50
51    /// the url for this conn.
52    ///
53    /// ```
54    /// use trillium_client::{Client, Method};
55    /// use trillium_testing::client_config;
56    ///
57    /// let client = Client::from(client_config());
58    ///
59    /// let conn = client.get("http://localhost:9080");
60    ///
61    /// let url = conn.url(); //<-
62    ///
63    /// assert_eq!(url.host_str().unwrap(), "localhost");
64    /// ```
65    #[field(get, set, get_mut)]
66    pub(crate) url: Url,
67
68    /// the method for this conn.
69    ///
70    /// ```
71    /// use trillium_client::{Client, Method};
72    /// use trillium_testing::client_config;
73    ///
74    /// let client = Client::from(client_config());
75    /// let conn = client.get("http://localhost:9080");
76    ///
77    /// let method = conn.method(); //<-
78    ///
79    /// assert_eq!(method, Method::Get);
80    /// ```
81    #[field(get, set, copy)]
82    pub(crate) method: Method,
83
84    /// the request headers
85    #[field(get, get_mut)]
86    pub(crate) request_headers: Headers,
87
88    #[field(get)]
89    /// the response headers
90    pub(crate) response_headers: Headers,
91
92    /// the status code for this conn.
93    ///
94    /// If the conn has not yet been sent, this will be None.
95    ///
96    /// ```
97    /// use trillium_client::{Client, Status};
98    /// use trillium_testing::{client_config, with_server};
99    ///
100    /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
101    ///     conn.with_status(418)
102    /// }
103    ///
104    /// with_server(handler, |url| async move {
105    ///     let client = Client::new(client_config());
106    ///     let conn = client.get(url).await?;
107    ///     assert_eq!(Status::ImATeapot, conn.status().unwrap());
108    ///     Ok(())
109    /// });
110    /// ```
111    #[field(get, copy)]
112    pub(crate) status: Option<Status>,
113
114    /// the request body
115    ///
116    /// ```
117    /// env_logger::init();
118    /// use trillium_client::Client;
119    /// use trillium_testing::{client_config, with_server};
120    ///
121    /// let handler = |mut conn: trillium::Conn| async move {
122    ///     let body = conn.request_body_string().await.unwrap();
123    ///     conn.ok(format!("request body was: {}", body))
124    /// };
125    ///
126    /// with_server(handler, |url| async move {
127    ///     let client = Client::from(client_config());
128    ///     let mut conn = client
129    ///         .post(url)
130    ///         .with_body("body") //<-
131    ///         .await?;
132    ///
133    ///     assert_eq!(
134    ///         conn.response_body().read_string().await?,
135    ///         "request body was: body"
136    ///     );
137    ///     Ok(())
138    /// });
139    /// ```
140    #[field(get, with = with_body, argument = body, set, into, take, option_set_some)]
141    pub(crate) request_body: Option<Body>,
142
143    /// the timeout for this conn
144    ///
145    /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
146    /// and [`Client::with_timeout`](crate::Client::with_timeout)
147    #[field(with, set, get, get_mut, take, copy, option_set_some)]
148    pub(crate) timeout: Option<Duration>,
149
150    /// whether this conn is halted.
151    ///
152    /// When set to `true` before execution, the network round-trip is skipped — the conn is
153    /// returned to the caller with whatever response state has been populated synthetically
154    /// (status, headers, body). Used by client middleware to short-circuit on cache hits,
155    /// mocked responses, or open circuit-breakers. Cleared on egress so the user's conn handle
156    /// never observes residual halt state after the awaited conn returns.
157    ///
158    /// Driven via [`ConnExt`](crate::ConnExt) — `halt` / `set_halted` / `is_halted`.
159    pub(crate) halted: bool,
160
161    /// transport-level error from the round-trip, if any.
162    ///
163    /// When the network call fails (connect refused, TLS handshake error, malformed HTTP frame,
164    /// timeout, etc.) the framework stashes the error here and runs the handler chain's
165    /// [`after_response`](crate::ClientHandler::after_response) anyway. A handler that recovers
166    /// (stale-if-error cache, retry-with-fallback) calls
167    /// [`ConnExt::take_error`](crate::ConnExt::take_error) to clear the error
168    /// and populates response state synthetically; if the error is still present after all
169    /// handlers finish, it propagates as `Err` from the awaited conn.
170    pub(crate) error: Option<Error>,
171
172    /// An override response body installed by middleware via
173    /// [`ConnExt::set_response_body`](crate::ConnExt::set_response_body) or
174    /// [`ConnExt::with_response_body`](crate::ConnExt::with_response_body). When
175    /// set, [`Conn::response_body`] returns a [`ResponseBody`] backed by this body instead of
176    /// the transport.
177    pub(crate) body_override: Option<Body>,
178
179    /// the http version for this conn
180    ///
181    /// Pre-execution this is the version *hint* (prior knowledge), not the version that will
182    /// necessarily be on the wire — the default [`Version::Http1_1`] means "no hint, use
183    /// auto-discovery" rather than "force HTTP/1.1." Post-execution this reflects the version
184    /// the request was actually sent over.
185    ///
186    /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for
187    /// the full hint → behavior table.
188    #[field(get, set, with, copy)]
189    pub(crate) http_version: Version,
190
191    /// the :authority pseudo-header, populated during h2 or h3 header finalization
192    #[field(get)]
193    pub(crate) authority: Option<Cow<'static, str>>,
194    /// the :scheme pseudo-header, populated during h2 or h3 header finalization
195
196    #[field(get)]
197    pub(crate) scheme: Option<Cow<'static, str>>,
198
199    /// the :path pseudo-header, populated during h2 or h3 header finalization
200    #[field(get)]
201    pub(crate) path: Option<Cow<'static, str>>,
202
203    /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
204    ///
205    /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
206    /// target instead of deriving it from the url. For all other methods, this field is ignored.
207    #[field(with, set, get, option_set_some, into)]
208    pub(crate) request_target: Option<Cow<'static, str>>,
209
210    /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 over h2,
211    /// RFC 9220 over h3). Triggers the h2/h3 exec paths to send HEADERS without `END_STREAM`
212    /// and leave the stream open as a bidirectional byte channel.
213    ///
214    /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
215    /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
216    #[field(get)]
217    pub(crate) protocol: Option<Cow<'static, str>>,
218
219    /// trailers sent with the request body, populated after the body has been fully sent.
220    ///
221    /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
222    /// the body has been fully sent.
223    #[field(get)]
224    pub(crate) request_trailers: Option<Headers>,
225
226    /// trailers received with the response body, populated after the response body has been fully
227    /// read.
228    #[field(get)]
229    pub(crate) response_trailers: Option<Headers>,
230
231    /// the [`Client`] that built this conn.
232    #[field(get)]
233    pub(crate) client: Client,
234
235    /// A queued follow-up conn installed by middleware via
236    /// [`ConnExt::set_followup`](crate::ConnExt::set_followup).
237    ///
238    /// When `Some` after the handler chain's `after_response` has fully unwound, the
239    /// [`IntoFuture`][std::future::IntoFuture] loop picks it up: the current conn's response
240    /// body is recycled, then the follow-up is swapped in and runs another full
241    /// `(run → network → after_response)` cycle. Used by re-issuing handlers
242    /// (`FollowRedirects`, retry, auth-refresh) instead of recursing into a nested `.await`.
243    pub(crate) followup: Option<Box<Conn>>,
244
245    /// Whether this conn is armed for an upgrade. When set, the protocol drivers
246    /// transmit only request headers and leave the outbound direction open. Armed via
247    /// [`ConnExt::upgrade`](crate::ConnExt::upgrade).
248    pub(crate) upgrade: bool,
249}
250
/// default http user-agent header
///
/// Assembled at compile time as `trillium-client/` followed by this crate's version, taken
/// from the `CARGO_PKG_VERSION` environment variable that cargo sets during the build.
pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
253
254impl Conn {
255    /// chainable setter for [`inserting`](Headers::insert) a request header
256    ///
257    /// ```
258    /// use trillium_client::Client;
259    /// use trillium_testing::{client_config, with_server};
260    ///
261    /// let handler = |conn: trillium::Conn| async move {
262    ///     let header = conn
263    ///         .request_headers()
264    ///         .get_str("some-request-header")
265    ///         .unwrap_or_default();
266    ///     let response = format!("some-request-header was {}", header);
267    ///     conn.ok(response)
268    /// };
269    ///
270    /// with_server(handler, |url| async move {
271    ///     let client = Client::new(client_config());
272    ///     let mut conn = client
273    ///         .get(url)
274    ///         .with_request_header("some-request-header", "header-value") // <--
275    ///         .await?;
276    ///     assert_eq!(
277    ///         conn.response_body().read_string().await?,
278    ///         "some-request-header was header-value"
279    ///     );
280    ///     Ok(())
281    /// })
282    /// ```
283    pub fn with_request_header(
284        mut self,
285        name: impl Into<HeaderName<'static>>,
286        value: impl Into<HeaderValues>,
287    ) -> Self {
288        self.request_headers.insert(name, value);
289        self
290    }
291
292    /// chainable setter for `extending` request headers
293    ///
294    /// ```
295    /// use trillium_client::Client;
296    /// use trillium_testing::{client_config, with_server};
297    ///
298    /// let handler = |conn: trillium::Conn| async move {
299    ///     let header = conn
300    ///         .request_headers()
301    ///         .get_str("some-request-header")
302    ///         .unwrap_or_default();
303    ///     let response = format!("some-request-header was {}", header);
304    ///     conn.ok(response)
305    /// };
306    ///
307    /// with_server(handler, move |url| async move {
308    ///     let client = Client::new(client_config());
309    ///     let mut conn = client
310    ///         .get(url)
311    ///         .with_request_headers([
312    ///             ("some-request-header", "header-value"),
313    ///             ("some-other-req-header", "other-header-value"),
314    ///         ])
315    ///         .await?;
316    ///
317    ///     assert_eq!(
318    ///         conn.response_body().read_string().await?,
319    ///         "some-request-header was header-value"
320    ///     );
321    ///     Ok(())
322    /// })
323    /// ```
324    pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
325    where
326        I: IntoIterator<Item = (HN, HV)> + Send,
327        HN: Into<HeaderName<'static>>,
328        HV: Into<HeaderValues>,
329    {
330        self.request_headers.extend(headers);
331        self
332    }
333
334    /// Chainable method to remove a request header if present
335    pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
336        self.request_headers.remove(name);
337        self
338    }
339
340    /// chainable setter for json body. this requires the `serde_json` crate feature to be enabled.
341    #[cfg(feature = "serde_json")]
342    pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
343        use trillium_http::KnownHeaderName;
344
345        Ok(self
346            .with_body(serde_json::to_string(body)?)
347            .with_request_header(KnownHeaderName::ContentType, "application/json"))
348    }
349
350    /// chainable setter for json body. this requires the `sonic-rs` crate feature to be enabled.
351    #[cfg(feature = "sonic-rs")]
352    pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
353        use trillium_http::KnownHeaderName;
354
355        Ok(self
356            .with_body(sonic_rs::to_string(body)?)
357            .with_request_header(KnownHeaderName::ContentType, "application/json"))
358    }
359
360    /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
361    /// conn.
362    /// ```
363    /// use trillium_client::Client;
364    /// use trillium_testing::{client_config, with_server};
365    ///
366    /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
367    ///
368    /// with_server(handler, |url| async move {
369    ///     let client = Client::from(client_config());
370    ///     let mut conn = client.get(url).await?;
371    ///
372    ///     let response_body = conn.response_body(); //<-
373    ///
374    ///     assert_eq!(19, response_body.content_length().unwrap());
375    ///     let string = response_body.read_string().await?;
376    ///     assert_eq!("hello from trillium", string);
377    ///     Ok(())
378    /// });
379    /// ```
380    #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
381    pub fn response_body(&mut self) -> ResponseBody<'_> {
382        let content_length = self.response_content_length();
383        let encoding = encoding(&self.response_headers);
384        if let Some(body) = self.body_override.as_mut() {
385            OverrideBody::new(body, encoding, self.context.config()).into()
386        } else {
387            ReceivedBody::new(
388                content_length,
389                &mut self.buffer,
390                self.transport.as_mut().unwrap(),
391                &mut self.response_body_state,
392                None,
393                encoding,
394            )
395            .with_trailers(&mut self.response_trailers)
396            .with_protocol_session(self.protocol_session.clone())
397            .into()
398        }
399    }
400
401    /// Attempt to deserialize the response body. Note that this consumes the body content.
402    #[cfg(feature = "serde_json")]
403    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
404    where
405        T: serde::de::DeserializeOwned,
406    {
407        let body = self.response_body().read_string().await?;
408        Ok(serde_json::from_str(&body)?)
409    }
410
411    /// Attempt to deserialize the response body. Note that this consumes the body content.
412    #[cfg(feature = "sonic-rs")]
413    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
414    where
415        T: serde::de::DeserializeOwned,
416    {
417        let body = self.response_body().read_string().await?;
418        Ok(sonic_rs::from_str(&body)?)
419    }
420
421    /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
422    ///
423    /// ```
424    /// use trillium_client::{Client, Status};
425    /// use trillium_testing::{client_config, with_server};
426    ///
427    /// with_server(Status::NotFound, |url| async move {
428    ///     let client = Client::new(client_config());
429    ///     assert_eq!(
430    ///         client.get(url).await?.success().unwrap_err().to_string(),
431    ///         "expected a success (2xx) status code, but got 404 Not Found"
432    ///     );
433    ///     Ok(())
434    /// });
435    ///
436    /// with_server(Status::Ok, |url| async move {
437    ///     let client = Client::new(client_config());
438    ///     assert!(client.get(url).await?.success().is_ok());
439    ///     Ok(())
440    /// });
441    /// ```
442    pub fn success(self) -> Result<Self, UnexpectedStatusError> {
443        match self.status() {
444            Some(status) if status.is_success() => Ok(self),
445            _ => Err(self.into()),
446        }
447    }
448
449    /// Detach the response body as an owned, `'static` value.
450    ///
451    /// Returns `None` if there is no body to take — neither an override has been installed nor
452    /// a transport-backed body is available. Subsequent calls return `None`. Callers who want
453    /// to wrap-and-replace the body (e.g. tee through a cache) compose this with
454    /// [`ConnExt::set_response_body`][crate::ConnExt::set_response_body]; the conn's
455    /// body slot is empty between the two calls.
456    ///
457    /// For a transport-backed body, this moves the transport into the returned
458    /// `ResponseBody<'static>`. Drop on that value drains-and-pools (keepalive) or closes
459    /// (otherwise) the transport via a spawned task; [`ResponseBody::recycle`] is the
460    /// `await`-able variant. For an override body, the inner [`Body`] is moved out and any
461    /// leftover transport on the conn is recycled immediately.
462    #[must_use]
463    pub fn take_response_body(&mut self) -> Option<ResponseBody<'static>> {
464        let encoding = encoding(&self.response_headers);
465        if let Some(body) = self.body_override.take() {
466            return Some(OverrideBody::new(body, encoding, self.context.config()).into());
467        }
468
469        let cleanup = self.build_cleanup_context();
470        let received = self.take_received_body(false)?;
471        Some(ResponseBody::received_owned(received, cleanup))
472    }
473
474    /// Build a [`CleanupContext`] capturing the runtime and (if keepalive + pool configured)
475    /// the pool + origin to insert into. Single source of truth for "what should happen to
476    /// this conn's transport when its body is released" — both the on_completion callback
477    /// wired into the body and the [`ResponseBody::recycle`] / `Drop` paths consume clones
478    /// of this same context, so the user-driven and Drop-driven release paths agree.
479    fn build_cleanup_context(&self) -> CleanupContext {
480        // Only pool a transport whose response head we actually received (`status.is_some()`): a
481        // conn abandoned before the response — a timeout or transport error mid-request — has an
482        // empty `response_headers`, which `is_keep_alive` would read as persistent and recycle a
483        // half-spent connection into the pool, poisoning the next request that reuses it.
484        let h1_pool_origin = if self.status.is_some()
485            && self.is_keep_alive()
486            && let Some(pool) = self.client.pool().cloned()
487        {
488            Some((pool, self.url.origin()))
489        } else {
490            None
491        };
492
493        CleanupContext {
494            runtime: self.client.connector().runtime(),
495            h1_pool_origin,
496        }
497    }
498
499    /// Detach the transport-backed receive side of this conn as an owned `ReceivedBody`.
500    ///
501    /// Returns `None` when no transport is attached.
502    ///
503    /// `cleanup: true` wires a spawn-on-End callback inside the body for callers that hand
504    /// the body off without awaiting it (`From<Conn> for Body`). `cleanup: false` is for
505    /// callers that drive the body to End themselves and release the transport inline in
506    /// their own poll loop — `take_response_body` does this so callers get a "transport is
507    /// settled when read_to_end returns Ok(0)" guarantee instead of racing a spawned task.
508    pub(crate) fn take_received_body(
509        &mut self,
510        cleanup: bool,
511    ) -> Option<ReceivedBody<'static, Box<dyn Transport>>> {
512        let _ = self.finalize_headers();
513        let transport = self.transport.take()?;
514
515        let on_completion = cleanup.then(|| {
516            let cleanup = self.build_cleanup_context();
517            Box::new(move |transport| cleanup.handoff(transport))
518                as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
519        });
520
521        Some(
522            ReceivedBody::new(
523                self.response_content_length(),
524                mem::take(&mut self.buffer),
525                transport,
526                self.response_body_state,
527                on_completion,
528                encoding(&self.response_headers),
529            )
530            .with_protocol_session(self.protocol_session.clone()),
531        )
532    }
533
534    /// Returns this conn to the connection pool if it is keepalive, and
535    /// closes it otherwise. This will happen asynchronously as a spawned
536    /// task when the conn is dropped, but calling it explicitly allows
537    /// you to block on it and control where it happens.
538    pub async fn recycle(mut self) {
539        if let Some(rb) = self.take_response_body() {
540            rb.recycle().await;
541        }
542    }
543
544    /// attempts to retrieve the connected peer address
545    pub fn peer_addr(&self) -> Option<SocketAddr> {
546        self.transport
547            .as_ref()
548            .and_then(|t| t.peer_addr().ok().flatten())
549    }
550
551    /// add state to the client conn and return self
552    pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
553        self.insert_state(state);
554        self
555    }
556
557    /// add state to the client conn, returning any previously set state of this type
558    pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
559        self.state.insert(state)
560    }
561
562    /// borrow state
563    pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
564        self.state.get()
565    }
566
567    /// borrow state mutably
568    pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
569        self.state.get_mut()
570    }
571
572    /// take state
573    pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
574        self.state.take()
575    }
576}