trillium_client/conn.rs
1use crate::{Pool, ResponseBody, h3::H3ClientState, util::encoding};
2use encoding_rs::Encoding;
3use std::{borrow::Cow, net::SocketAddr, sync::Arc, time::Duration};
4use trillium_http::{
5 Body, Buffer, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
6 ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
7};
8use trillium_server_common::{
9 ArcedConnector, Transport,
10 url::{Origin, Url},
11};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
24/// a client connection, representing both an outbound http request and a
25/// http response
26#[must_use]
27#[derive(fieldwork::Fieldwork)]
28pub struct Conn {
29 pub(crate) pool: Option<Pool<Origin, Box<dyn Transport>>>,
30 pub(crate) h2_pool: Option<Pool<Origin, H2Pooled>>,
31 pub(crate) h2_idle_timeout: Option<Duration>,
32 pub(crate) h2_idle_ping_threshold: Option<Duration>,
33 pub(crate) h2_idle_ping_timeout: Duration,
34 pub(crate) h3_client_state: Option<H3ClientState>,
35 pub(crate) protocol_session: ProtocolSession,
36 /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
37 /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
38 /// `into_webtransport` can install the router and hand the QUIC connection to the
39 /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
40 #[cfg(feature = "webtransport")]
41 pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
42 pub(crate) buffer: Buffer,
43 pub(crate) response_body_state: ReceivedBodyState,
44 pub(crate) config: ArcedConnector,
45 pub(crate) headers_finalized: bool,
46 pub(crate) max_head_length: usize,
47 pub(crate) state: TypeSet,
48 pub(crate) context: Arc<HttpContext>,
49
50 /// the transport for this conn
51 ///
52 /// This should only be used to call your own custom methods on the transport that do not read
53 /// or write any data. Calling any method that reads from or writes to the transport will
54 /// disrupt the HTTP protocol.
55 #[field(get, get_mut)]
56 pub(crate) transport: Option<Box<dyn Transport>>,
57
58 /// the url for this conn.
59 ///
60 /// ```
61 /// use trillium_client::{Client, Method};
62 /// use trillium_testing::client_config;
63 ///
64 /// let client = Client::from(client_config());
65 ///
66 /// let conn = client.get("http://localhost:9080");
67 ///
68 /// let url = conn.url(); //<-
69 ///
70 /// assert_eq!(url.host_str().unwrap(), "localhost");
71 /// ```
72 #[field(get, set, get_mut)]
73 pub(crate) url: Url,
74
75 /// the method for this conn.
76 ///
77 /// ```
78 /// use trillium_client::{Client, Method};
79 /// use trillium_testing::client_config;
80 ///
81 /// let client = Client::from(client_config());
82 /// let conn = client.get("http://localhost:9080");
83 ///
84 /// let method = conn.method(); //<-
85 ///
86 /// assert_eq!(method, Method::Get);
87 /// ```
88 #[field(get, set, copy)]
89 pub(crate) method: Method,
90
91 /// the request headers
92 #[field(get, get_mut)]
93 pub(crate) request_headers: Headers,
94
95 #[field(get, get_mut)]
96 /// the response headers
97 pub(crate) response_headers: Headers,
98
99 /// the status code for this conn.
100 ///
101 /// If the conn has not yet been sent, this will be None.
102 ///
103 /// ```
104 /// use trillium_client::{Client, Status};
105 /// use trillium_testing::{client_config, with_server};
106 ///
107 /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
108 /// conn.with_status(418)
109 /// }
110 ///
111 /// with_server(handler, |url| async move {
112 /// let client = Client::new(client_config());
113 /// let conn = client.get(url).await?;
114 /// assert_eq!(Status::ImATeapot, conn.status().unwrap());
115 /// Ok(())
116 /// });
117 /// ```
118 #[field(get, copy)]
119 pub(crate) status: Option<Status>,
120
121 /// the request body
122 ///
123 /// ```
124 /// env_logger::init();
125 /// use trillium_client::Client;
126 /// use trillium_testing::{client_config, with_server};
127 ///
128 /// let handler = |mut conn: trillium::Conn| async move {
129 /// let body = conn.request_body_string().await.unwrap();
130 /// conn.ok(format!("request body was: {}", body))
131 /// };
132 ///
133 /// with_server(handler, |url| async move {
134 /// let client = Client::from(client_config());
135 /// let mut conn = client
136 /// .post(url)
137 /// .with_body("body") //<-
138 /// .await?;
139 ///
140 /// assert_eq!(
141 /// conn.response_body().read_string().await?,
142 /// "request body was: body"
143 /// );
144 /// Ok(())
145 /// });
146 /// ```
147 #[field(with = with_body, argument = body, set, into, take, option_set_some)]
148 pub(crate) request_body: Option<Body>,
149
150 /// the timeout for this conn
151 ///
152 /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
153 /// and [`Client::with_timeout`](crate::Client::with_timeout)
154 #[field(with, set, get, get_mut, take, copy, option_set_some)]
155 pub(crate) timeout: Option<Duration>,
156
157 /// the http version for this conn
158 ///
159 /// Pre-execution this is the version *hint* (prior knowledge), not the version that will
160 /// necessarily be on the wire — the default [`Version::Http1_1`] means "no hint, use
161 /// auto-discovery" rather than "force HTTP/1.1." Post-execution this reflects the version
162 /// the request was actually sent over.
163 ///
164 /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for
165 /// the full hint → behavior table.
166 #[field(get, set, with, copy)]
167 pub(crate) http_version: Version,
168
169 /// the :authority pseudo-header, populated during h3 header finalization
170 #[field(get)]
171 pub(crate) authority: Option<Cow<'static, str>>,
172 /// the :scheme pseudo-header, populated during h3 header finalization
173
174 #[field(get)]
175 pub(crate) scheme: Option<Cow<'static, str>>,
176
177 /// the :path pseudo-header, populated during h3 header finalization
178 #[field(get)]
179 pub(crate) path: Option<Cow<'static, str>>,
180
181 /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
182 ///
183 /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
184 /// target instead of deriving it from the url. For all other methods, this field is ignored.
185 #[field(with, set, get, option_set_some, into)]
186 pub(crate) request_target: Option<Cow<'static, str>>,
187
188 /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 §4 over h2,
189 /// RFC 9220 §3 over h3). Set internally by [`Conn::into_websocket`] to `"websocket"`;
190 /// triggers the h2/h3 exec paths to send HEADERS without `END_STREAM` and leave the stream
191 /// open as a bidirectional byte channel.
192 ///
193 /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
194 /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
195 #[field(get)]
196 pub(crate) protocol: Option<Cow<'static, str>>,
197
198 /// trailers sent with the request body, populated after the body has been fully sent.
199 ///
200 /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
201 /// the body has been fully sent. For H3, this is populated after `send_h3_request`; for H1,
202 /// after `send_body` with a chunked body.
203 #[field(get)]
204 pub(crate) request_trailers: Option<Headers>,
205
206 /// trailers received with the response body, populated after the response body has been fully
207 /// read.
208 ///
209 /// For H3, these are decoded from the trailing HEADERS frame. For H1, from chunked trailers
210 /// (once H1 trailer receive is implemented).
211 #[field(get)]
212 pub(crate) response_trailers: Option<Headers>,
213}
214
215/// default http user-agent header
216pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
217
218impl Conn {
219 /// chainable setter for [`inserting`](Headers::insert) a request header
220 ///
221 /// ```
222 /// use trillium_client::Client;
223 /// use trillium_testing::{client_config, with_server};
224 ///
225 /// let handler = |conn: trillium::Conn| async move {
226 /// let header = conn
227 /// .request_headers()
228 /// .get_str("some-request-header")
229 /// .unwrap_or_default();
230 /// let response = format!("some-request-header was {}", header);
231 /// conn.ok(response)
232 /// };
233 ///
234 /// with_server(handler, |url| async move {
235 /// let client = Client::new(client_config());
236 /// let mut conn = client
237 /// .get(url)
238 /// .with_request_header("some-request-header", "header-value") // <--
239 /// .await?;
240 /// assert_eq!(
241 /// conn.response_body().read_string().await?,
242 /// "some-request-header was header-value"
243 /// );
244 /// Ok(())
245 /// })
246 /// ```
247 pub fn with_request_header(
248 mut self,
249 name: impl Into<HeaderName<'static>>,
250 value: impl Into<HeaderValues>,
251 ) -> Self {
252 self.request_headers.insert(name, value);
253 self
254 }
255
256 /// chainable setter for `extending` request headers
257 ///
258 /// ```
259 /// use trillium_client::Client;
260 /// use trillium_testing::{client_config, with_server};
261 ///
262 /// let handler = |conn: trillium::Conn| async move {
263 /// let header = conn
264 /// .request_headers()
265 /// .get_str("some-request-header")
266 /// .unwrap_or_default();
267 /// let response = format!("some-request-header was {}", header);
268 /// conn.ok(response)
269 /// };
270 ///
271 /// with_server(handler, move |url| async move {
272 /// let client = Client::new(client_config());
273 /// let mut conn = client
274 /// .get(url)
275 /// .with_request_headers([
276 /// ("some-request-header", "header-value"),
277 /// ("some-other-req-header", "other-header-value"),
278 /// ])
279 /// .await?;
280 ///
281 /// assert_eq!(
282 /// conn.response_body().read_string().await?,
283 /// "some-request-header was header-value"
284 /// );
285 /// Ok(())
286 /// })
287 /// ```
288 pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
289 where
290 I: IntoIterator<Item = (HN, HV)> + Send,
291 HN: Into<HeaderName<'static>>,
292 HV: Into<HeaderValues>,
293 {
294 self.request_headers.extend(headers);
295 self
296 }
297
298 /// Chainable method to remove a request header if present
299 pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
300 self.request_headers.remove(name);
301 self
302 }
303
304 /// chainable setter for json body. this requires the `serde_json` crate feature to be enabled.
305 #[cfg(feature = "serde_json")]
306 pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
307 use trillium_http::KnownHeaderName;
308
309 Ok(self
310 .with_body(serde_json::to_string(body)?)
311 .with_request_header(KnownHeaderName::ContentType, "application/json"))
312 }
313
314 /// chainable setter for json body. this requires the `sonic-rs` crate feature to be enabled.
315 #[cfg(feature = "sonic-rs")]
316 pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
317 use trillium_http::KnownHeaderName;
318
319 Ok(self
320 .with_body(sonic_rs::to_string(body)?)
321 .with_request_header(KnownHeaderName::ContentType, "application/json"))
322 }
323
324 pub(crate) fn response_encoding(&self) -> &'static Encoding {
325 encoding(&self.response_headers)
326 }
327
328 /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
329 /// conn.
330 /// ```
331 /// use trillium_client::Client;
332 /// use trillium_testing::{client_config, with_server};
333 ///
334 /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
335 ///
336 /// with_server(handler, |url| async move {
337 /// let client = Client::from(client_config());
338 /// let mut conn = client.get(url).await?;
339 ///
340 /// let response_body = conn.response_body(); //<-
341 ///
342 /// assert_eq!(19, response_body.content_length().unwrap());
343 /// let string = response_body.read_string().await?;
344 /// assert_eq!("hello from trillium", string);
345 /// Ok(())
346 /// });
347 /// ```
348 #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
349 pub fn response_body(&mut self) -> ResponseBody<'_> {
350 ReceivedBody::new(
351 self.response_content_length(),
352 &mut self.buffer,
353 self.transport.as_mut().unwrap(),
354 &mut self.response_body_state,
355 None,
356 encoding(&self.response_headers),
357 )
358 .with_trailers(&mut self.response_trailers)
359 .with_protocol_session(self.protocol_session.clone())
360 .into()
361 }
362
363 /// Attempt to deserialize the response body. Note that this consumes the body content.
364 #[cfg(feature = "serde_json")]
365 pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
366 where
367 T: serde::de::DeserializeOwned,
368 {
369 let body = self.response_body().read_string().await?;
370 Ok(serde_json::from_str(&body)?)
371 }
372
373 /// Attempt to deserialize the response body. Note that this consumes the body content.
374 #[cfg(feature = "sonic-rs")]
375 pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
376 where
377 T: serde::de::DeserializeOwned,
378 {
379 let body = self.response_body().read_string().await?;
380 Ok(sonic_rs::from_str(&body)?)
381 }
382
383 /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
384 ///
385 /// ```
386 /// use trillium_client::{Client, Status};
387 /// use trillium_testing::{client_config, with_server};
388 ///
389 /// with_server(Status::NotFound, |url| async move {
390 /// let client = Client::new(client_config());
391 /// assert_eq!(
392 /// client.get(url).await?.success().unwrap_err().to_string(),
393 /// "expected a success (2xx) status code, but got 404 Not Found"
394 /// );
395 /// Ok(())
396 /// });
397 ///
398 /// with_server(Status::Ok, |url| async move {
399 /// let client = Client::new(client_config());
400 /// assert!(client.get(url).await?.success().is_ok());
401 /// Ok(())
402 /// });
403 /// ```
404 pub fn success(self) -> Result<Self, UnexpectedStatusError> {
405 match self.status() {
406 Some(status) if status.is_success() => Ok(self),
407 _ => Err(self.into()),
408 }
409 }
410
411 /// Returns this conn to the connection pool if it is keepalive, and
412 /// closes it otherwise. This will happen asynchronously as a spawned
413 /// task when the conn is dropped, but calling it explicitly allows
414 /// you to block on it and control where it happens.
415 pub async fn recycle(mut self) {
416 if self.is_keep_alive() && self.transport.is_some() && self.pool.is_some() {
417 self.finish_reading_body().await;
418 }
419 }
420
421 /// attempts to retrieve the connected peer address
422 pub fn peer_addr(&self) -> Option<SocketAddr> {
423 self.transport
424 .as_ref()
425 .and_then(|t| t.peer_addr().ok().flatten())
426 }
427
428 /// add state to the client conn and return self
429 pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
430 self.insert_state(state);
431 self
432 }
433
434 /// add state to the client conn, returning any previously set state of this type
435 pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
436 self.state.insert(state)
437 }
438
439 /// borrow state
440 pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
441 self.state.get()
442 }
443
444 /// borrow state mutably
445 pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
446 self.state.get_mut()
447 }
448
449 /// take state
450 pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
451 self.state.take()
452 }
453}