// trillium_client/conn.rs

1use crate::{Pool, ResponseBody, h3::H3ClientState, util::encoding};
2use encoding_rs::Encoding;
3use std::{borrow::Cow, net::SocketAddr, sync::Arc, time::Duration};
4use trillium_http::{
5    Body, Buffer, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
6    ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
7};
8use trillium_server_common::{
9    ArcedConnector, Transport,
10    url::{Origin, Url},
11};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
24/// a client connection, representing both an outbound http request and a
25/// http response
26#[must_use]
27#[derive(fieldwork::Fieldwork)]
28pub struct Conn {
29    pub(crate) pool: Option<Pool<Origin, Box<dyn Transport>>>,
30    pub(crate) h2_pool: Option<Pool<Origin, H2Pooled>>,
31    pub(crate) h2_idle_timeout: Option<Duration>,
32    pub(crate) h2_idle_ping_threshold: Option<Duration>,
33    pub(crate) h2_idle_ping_timeout: Duration,
34    pub(crate) h3_client_state: Option<H3ClientState>,
35    pub(crate) protocol_session: ProtocolSession,
36    /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
37    /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
38    /// `into_webtransport` can install the router and hand the QUIC connection to the
39    /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
40    #[cfg(feature = "webtransport")]
41    pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
42    pub(crate) buffer: Buffer,
43    pub(crate) response_body_state: ReceivedBodyState,
44    pub(crate) config: ArcedConnector,
45    pub(crate) headers_finalized: bool,
46    pub(crate) max_head_length: usize,
47    pub(crate) state: TypeSet,
48    pub(crate) context: Arc<HttpContext>,
49
50    /// the transport for this conn
51    ///
52    /// This should only be used to call your own custom methods on the transport that do not read
53    /// or write any data. Calling any method that reads from or writes to the transport will
54    /// disrupt the HTTP protocol.
55    #[field(get, get_mut)]
56    pub(crate) transport: Option<Box<dyn Transport>>,
57
58    /// the url for this conn.
59    ///
60    /// ```
61    /// use trillium_client::{Client, Method};
62    /// use trillium_testing::client_config;
63    ///
64    /// let client = Client::from(client_config());
65    ///
66    /// let conn = client.get("http://localhost:9080");
67    ///
68    /// let url = conn.url(); //<-
69    ///
70    /// assert_eq!(url.host_str().unwrap(), "localhost");
71    /// ```
72    #[field(get, set, get_mut)]
73    pub(crate) url: Url,
74
75    /// the method for this conn.
76    ///
77    /// ```
78    /// use trillium_client::{Client, Method};
79    /// use trillium_testing::client_config;
80    ///
81    /// let client = Client::from(client_config());
82    /// let conn = client.get("http://localhost:9080");
83    ///
84    /// let method = conn.method(); //<-
85    ///
86    /// assert_eq!(method, Method::Get);
87    /// ```
88    #[field(get, set, copy)]
89    pub(crate) method: Method,
90
91    /// the request headers
92    #[field(get, get_mut)]
93    pub(crate) request_headers: Headers,
94
95    #[field(get, get_mut)]
96    /// the response headers
97    pub(crate) response_headers: Headers,
98
99    /// the status code for this conn.
100    ///
101    /// If the conn has not yet been sent, this will be None.
102    ///
103    /// ```
104    /// use trillium_client::{Client, Status};
105    /// use trillium_testing::{client_config, with_server};
106    ///
107    /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
108    ///     conn.with_status(418)
109    /// }
110    ///
111    /// with_server(handler, |url| async move {
112    ///     let client = Client::new(client_config());
113    ///     let conn = client.get(url).await?;
114    ///     assert_eq!(Status::ImATeapot, conn.status().unwrap());
115    ///     Ok(())
116    /// });
117    /// ```
118    #[field(get, copy)]
119    pub(crate) status: Option<Status>,
120
121    /// the request body
122    ///
123    /// ```
124    /// env_logger::init();
125    /// use trillium_client::Client;
126    /// use trillium_testing::{client_config, with_server};
127    ///
128    /// let handler = |mut conn: trillium::Conn| async move {
129    ///     let body = conn.request_body_string().await.unwrap();
130    ///     conn.ok(format!("request body was: {}", body))
131    /// };
132    ///
133    /// with_server(handler, |url| async move {
134    ///     let client = Client::from(client_config());
135    ///     let mut conn = client
136    ///         .post(url)
137    ///         .with_body("body") //<-
138    ///         .await?;
139    ///
140    ///     assert_eq!(
141    ///         conn.response_body().read_string().await?,
142    ///         "request body was: body"
143    ///     );
144    ///     Ok(())
145    /// });
146    /// ```
147    #[field(with = with_body, argument = body, set, into, take, option_set_some)]
148    pub(crate) request_body: Option<Body>,
149
150    /// the timeout for this conn
151    ///
152    /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
153    /// and [`Client::with_timeout`](crate::Client::with_timeout)
154    #[field(with, set, get, get_mut, take, copy, option_set_some)]
155    pub(crate) timeout: Option<Duration>,
156
157    /// the http version for this conn
158    ///
159    /// Pre-execution this is the version *hint* (prior knowledge), not the version that will
160    /// necessarily be on the wire — the default [`Version::Http1_1`] means "no hint, use
161    /// auto-discovery" rather than "force HTTP/1.1." Post-execution this reflects the version
162    /// the request was actually sent over.
163    ///
164    /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for
165    /// the full hint → behavior table.
166    #[field(get, set, with, copy)]
167    pub(crate) http_version: Version,
168
169    /// the :authority pseudo-header, populated during h3 header finalization
170    #[field(get)]
171    pub(crate) authority: Option<Cow<'static, str>>,
172    /// the :scheme pseudo-header, populated during h3 header finalization
173
174    #[field(get)]
175    pub(crate) scheme: Option<Cow<'static, str>>,
176
177    /// the :path pseudo-header, populated during h3 header finalization
178    #[field(get)]
179    pub(crate) path: Option<Cow<'static, str>>,
180
181    /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
182    ///
183    /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
184    /// target instead of deriving it from the url. For all other methods, this field is ignored.
185    #[field(with, set, get, option_set_some, into)]
186    pub(crate) request_target: Option<Cow<'static, str>>,
187
188    /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 §4 over h2,
189    /// RFC 9220 §3 over h3). Set internally by [`Conn::into_websocket`] to `"websocket"`;
190    /// triggers the h2/h3 exec paths to send HEADERS without `END_STREAM` and leave the stream
191    /// open as a bidirectional byte channel.
192    ///
193    /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
194    /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
195    #[field(get)]
196    pub(crate) protocol: Option<Cow<'static, str>>,
197
198    /// trailers sent with the request body, populated after the body has been fully sent.
199    ///
200    /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
201    /// the body has been fully sent. For H3, this is populated after `send_h3_request`; for H1,
202    /// after `send_body` with a chunked body.
203    #[field(get)]
204    pub(crate) request_trailers: Option<Headers>,
205
206    /// trailers received with the response body, populated after the response body has been fully
207    /// read.
208    ///
209    /// For H3, these are decoded from the trailing HEADERS frame. For H1, from chunked trailers
210    /// (once H1 trailer receive is implemented).
211    #[field(get)]
212    pub(crate) response_trailers: Option<Headers>,
213}
214
215/// default http user-agent header
216pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
217
218impl Conn {
219    /// chainable setter for [`inserting`](Headers::insert) a request header
220    ///
221    /// ```
222    /// use trillium_client::Client;
223    /// use trillium_testing::{client_config, with_server};
224    ///
225    /// let handler = |conn: trillium::Conn| async move {
226    ///     let header = conn
227    ///         .request_headers()
228    ///         .get_str("some-request-header")
229    ///         .unwrap_or_default();
230    ///     let response = format!("some-request-header was {}", header);
231    ///     conn.ok(response)
232    /// };
233    ///
234    /// with_server(handler, |url| async move {
235    ///     let client = Client::new(client_config());
236    ///     let mut conn = client
237    ///         .get(url)
238    ///         .with_request_header("some-request-header", "header-value") // <--
239    ///         .await?;
240    ///     assert_eq!(
241    ///         conn.response_body().read_string().await?,
242    ///         "some-request-header was header-value"
243    ///     );
244    ///     Ok(())
245    /// })
246    /// ```
247    pub fn with_request_header(
248        mut self,
249        name: impl Into<HeaderName<'static>>,
250        value: impl Into<HeaderValues>,
251    ) -> Self {
252        self.request_headers.insert(name, value);
253        self
254    }
255
256    /// chainable setter for `extending` request headers
257    ///
258    /// ```
259    /// use trillium_client::Client;
260    /// use trillium_testing::{client_config, with_server};
261    ///
262    /// let handler = |conn: trillium::Conn| async move {
263    ///     let header = conn
264    ///         .request_headers()
265    ///         .get_str("some-request-header")
266    ///         .unwrap_or_default();
267    ///     let response = format!("some-request-header was {}", header);
268    ///     conn.ok(response)
269    /// };
270    ///
271    /// with_server(handler, move |url| async move {
272    ///     let client = Client::new(client_config());
273    ///     let mut conn = client
274    ///         .get(url)
275    ///         .with_request_headers([
276    ///             ("some-request-header", "header-value"),
277    ///             ("some-other-req-header", "other-header-value"),
278    ///         ])
279    ///         .await?;
280    ///
281    ///     assert_eq!(
282    ///         conn.response_body().read_string().await?,
283    ///         "some-request-header was header-value"
284    ///     );
285    ///     Ok(())
286    /// })
287    /// ```
288    pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
289    where
290        I: IntoIterator<Item = (HN, HV)> + Send,
291        HN: Into<HeaderName<'static>>,
292        HV: Into<HeaderValues>,
293    {
294        self.request_headers.extend(headers);
295        self
296    }
297
298    /// Chainable method to remove a request header if present
299    pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
300        self.request_headers.remove(name);
301        self
302    }
303
304    /// chainable setter for json body. this requires the `serde_json` crate feature to be enabled.
305    #[cfg(feature = "serde_json")]
306    pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
307        use trillium_http::KnownHeaderName;
308
309        Ok(self
310            .with_body(serde_json::to_string(body)?)
311            .with_request_header(KnownHeaderName::ContentType, "application/json"))
312    }
313
314    /// chainable setter for json body. this requires the `sonic-rs` crate feature to be enabled.
315    #[cfg(feature = "sonic-rs")]
316    pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
317        use trillium_http::KnownHeaderName;
318
319        Ok(self
320            .with_body(sonic_rs::to_string(body)?)
321            .with_request_header(KnownHeaderName::ContentType, "application/json"))
322    }
323
324    pub(crate) fn response_encoding(&self) -> &'static Encoding {
325        encoding(&self.response_headers)
326    }
327
328    /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
329    /// conn.
330    /// ```
331    /// use trillium_client::Client;
332    /// use trillium_testing::{client_config, with_server};
333    ///
334    /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
335    ///
336    /// with_server(handler, |url| async move {
337    ///     let client = Client::from(client_config());
338    ///     let mut conn = client.get(url).await?;
339    ///
340    ///     let response_body = conn.response_body(); //<-
341    ///
342    ///     assert_eq!(19, response_body.content_length().unwrap());
343    ///     let string = response_body.read_string().await?;
344    ///     assert_eq!("hello from trillium", string);
345    ///     Ok(())
346    /// });
347    /// ```
348    #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
349    pub fn response_body(&mut self) -> ResponseBody<'_> {
350        ReceivedBody::new(
351            self.response_content_length(),
352            &mut self.buffer,
353            self.transport.as_mut().unwrap(),
354            &mut self.response_body_state,
355            None,
356            encoding(&self.response_headers),
357        )
358        .with_trailers(&mut self.response_trailers)
359        .with_protocol_session(self.protocol_session.clone())
360        .into()
361    }
362
363    /// Attempt to deserialize the response body. Note that this consumes the body content.
364    #[cfg(feature = "serde_json")]
365    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
366    where
367        T: serde::de::DeserializeOwned,
368    {
369        let body = self.response_body().read_string().await?;
370        Ok(serde_json::from_str(&body)?)
371    }
372
373    /// Attempt to deserialize the response body. Note that this consumes the body content.
374    #[cfg(feature = "sonic-rs")]
375    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
376    where
377        T: serde::de::DeserializeOwned,
378    {
379        let body = self.response_body().read_string().await?;
380        Ok(sonic_rs::from_str(&body)?)
381    }
382
383    /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
384    ///
385    /// ```
386    /// use trillium_client::{Client, Status};
387    /// use trillium_testing::{client_config, with_server};
388    ///
389    /// with_server(Status::NotFound, |url| async move {
390    ///     let client = Client::new(client_config());
391    ///     assert_eq!(
392    ///         client.get(url).await?.success().unwrap_err().to_string(),
393    ///         "expected a success (2xx) status code, but got 404 Not Found"
394    ///     );
395    ///     Ok(())
396    /// });
397    ///
398    /// with_server(Status::Ok, |url| async move {
399    ///     let client = Client::new(client_config());
400    ///     assert!(client.get(url).await?.success().is_ok());
401    ///     Ok(())
402    /// });
403    /// ```
404    pub fn success(self) -> Result<Self, UnexpectedStatusError> {
405        match self.status() {
406            Some(status) if status.is_success() => Ok(self),
407            _ => Err(self.into()),
408        }
409    }
410
411    /// Returns this conn to the connection pool if it is keepalive, and
412    /// closes it otherwise. This will happen asynchronously as a spawned
413    /// task when the conn is dropped, but calling it explicitly allows
414    /// you to block on it and control where it happens.
415    pub async fn recycle(mut self) {
416        if self.is_keep_alive() && self.transport.is_some() && self.pool.is_some() {
417            self.finish_reading_body().await;
418        }
419    }
420
421    /// attempts to retrieve the connected peer address
422    pub fn peer_addr(&self) -> Option<SocketAddr> {
423        self.transport
424            .as_ref()
425            .and_then(|t| t.peer_addr().ok().flatten())
426    }
427
428    /// add state to the client conn and return self
429    pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
430        self.insert_state(state);
431        self
432    }
433
434    /// add state to the client conn, returning any previously set state of this type
435    pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
436        self.state.insert(state)
437    }
438
439    /// borrow state
440    pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
441        self.state.get()
442    }
443
444    /// borrow state mutably
445    pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
446        self.state.get_mut()
447    }
448
449    /// take state
450    pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
451        self.state.take()
452    }
453}