1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
use super::{Body, Conn, Transport, TypeSet};
use crate::{ClientHandler, ConnExt, Error, Result, Version};
use smallvec::SmallVec;
#[cfg(feature = "hickory")]
use std::net::IpAddr;
use std::{
borrow::Cow,
fmt::{self, Debug, Formatter},
future::{Future, IntoFuture},
mem,
net::SocketAddr,
pin::Pin,
};
use trillium_http::{ProtocolSession, Upgrade};
use trillium_server_common::Destination;
/// A wrapper error for [`trillium_http::Error`] or, depending on json serializer feature, either
/// `sonic_rs::Error` or `serde_json::Error`. Only available when either the `sonic-rs` or
/// `serde_json` cargo features are enabled.
///
/// Every variant is `#[error(transparent)]`, so `Display` and `source` are forwarded to the
/// wrapped error, and every variant has a `#[from]` conversion so `?` can lift the wrapped error
/// into this type.
// NOTE(review): both JSON variants below are named `JsonError` and are gated on different
// features, so enabling `sonic-rs` and `serde_json` together would define the variant twice.
// Presumably the two features are mutually exclusive — confirm against the crate's feature setup.
#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
#[derive(thiserror::Error, Debug)]
pub enum ClientSerdeError {
/// A [`trillium_http::Error`]
#[error(transparent)]
HttpError(#[from] Error),
#[cfg(feature = "sonic-rs")]
/// A [`sonic_rs::Error`], present only with the `sonic-rs` feature
#[error(transparent)]
JsonError(#[from] sonic_rs::Error),
#[cfg(feature = "serde_json")]
/// A [`serde_json::Error`], present only with the `serde_json` feature
#[error(transparent)]
JsonError(#[from] serde_json::Error),
}
impl Conn {
/// Run one request/response cycle through the client handler.
///
/// Sequence: the handler's `run`, then the network round-trip (skipped when a handler halted
/// the conn), then the handler's `after_response`. An error from `run` or `after_response`
/// is returned immediately via `?`; an error from the network round-trip is stashed in
/// `self.error` and only returned if it is still there once `after_response` has finished.
pub(crate) async fn exec(&mut self) -> Result<()> {
    // Take an Arc clone so the handler is not borrowed from `self` while `self` is handed
    // to it mutably.
    let handler = self.client.arc_handler().clone();
    handler.run(self).await?;

    if self.halted {
        log::trace!("conn is halted, skipping network round-trip");
    } else if let Err(network_error) = self.exec_network().await {
        // Keep the failure on the conn instead of bailing out: `after_response` still runs,
        // giving recovery handlers (stale-if-error, retry-with-fallback) a chance to clear it.
        self.error = Some(network_error);
    }

    // Runs in reverse order whether or not the conn halted or the round-trip failed —
    // mirrors server-side `before_send`.
    handler.after_response(self).await?;

    // Whatever error survived `after_response` becomes the result of this cycle.
    match self.error.take() {
        Some(unrecovered) => Err(unrecovered),
        None => Ok(()),
    }
}
/// Perform the network round-trip for this conn, choosing the transport in priority order.
///
/// Each `try_*` step reports `true` once it has handled the request, which ends the search.
///
/// # Errors
///
/// Returns [`Error::UnsupportedVersion`] when the conn is pinned to HTTP/0.9, and propagates
/// any error raised by the individual protocol steps.
async fn exec_network(&mut self) -> Result<()> {
    if matches!(self.http_version, Some(Version::Http0_9)) {
        return Err(Error::UnsupportedVersion(Version::Http0_9));
    }

    // Phase 1 — reuse a live pooled connection, best protocol first. No DNS, no new connect.
    // A pooled h2 connection is reused in preference to establishing a new h3 connection: we
    // do not proactively migrate h2→h3, since a general-purpose client can't assume the
    // request locality that makes eager migration pay off (see the migration-policy backlog
    // item). A pooled h1 connection, by contrast, does not block establishing h3 below.
    // `||` short-circuits, so the h2 pool is only consulted when the h3 pool had nothing.
    if self.try_reuse_h3_pool().await? || self.try_exec_h2_pooled().await? {
        return Ok(());
    }

    // Phase 2/3 — establish a new connection, preferring h3 when the origin is known to speak
    // it (pinned, Alt-Svc, or SVCB). This runs before the h1 path, so h1→h3 is immediate.
    if self.try_establish_h3().await? {
        return Ok(());
    }

    // Prior-knowledge h2: caller asserted h2, skip h1/ALPN. Useful for TLS connectors
    // that don't expose `negotiated_alpn` (e.g. native-tls). No fallback — a non-h2
    // server here surfaces as a plain IO error.
    let h2_pinned = matches!(self.http_version, Some(Version::Http2));
    if h2_pinned {
        self.exec_h2_prior_knowledge().await
    } else {
        self.exec_h1_or_promote_h2().await
    }
}
pub(crate) fn body_len(&self) -> Option<u64> {
if let Some(ref body) = self.request_body {
body.len()
} else {
Some(0)
}
}
pub(crate) fn finalize_headers(&mut self) -> Result<()> {
match self.http_version() {
Version::Http1_0 | Version::Http1_1 => self.finalize_headers_h1(),
Version::Http2 => self.finalize_headers_h2(),
Version::Http3 if self.client.h3().is_some() => self.finalize_headers_h3(),
other => Err(Error::UnsupportedVersion(other)),
}
}
/// Build the [`Destination`] used to connect to this conn's origin over h1/h2.
///
/// Scheme, host, and port come from the URL via [`from_url`](Destination::from_url). When
/// [`origin_socket_addrs`](Self::origin_socket_addrs) yields any pre-resolved addresses they
/// replace the destination's own; an empty result (e.g. a bare-IP origin, which is never
/// resolved) leaves whatever `from_url` derived.
///
/// An explicit version pin narrows the advertised ALPN so the pin is honored over TLS: an h1
/// pin advertises only `http/1.1` (so a server that would otherwise pick `h2` falls back to
/// h1), an h2 pin advertises only `h2`. With no pin the connector's configured default ALPN
/// stays in place, so auto-discovery can still promote to h2 via ALPN.
///
/// # Errors
///
/// Propagates failures from `Destination::from_url` and from address resolution.
pub(crate) async fn origin_destination(&self) -> Result<Destination> {
    let mut destination = Destination::from_url(&self.url)?;

    let resolved = self.origin_socket_addrs().await?;
    if !resolved.is_empty() {
        destination.set_addrs(resolved);
    }

    // Work out which single protocol (if any) the version pin restricts us to.
    let pinned_alpn = match self.http_version {
        Some(Version::Http1_0 | Version::Http1_1) => Some(b"http/1.1".as_slice()),
        Some(Version::Http2) => Some(b"h2".as_slice()),
        _ => None,
    };
    if let Some(protocol) = pinned_alpn {
        destination.set_alpn([Cow::Borrowed(protocol)]);
    }

    Ok(destination)
}
/// Pre-resolved socket addresses for this conn's origin `host:port`, used by the protocols
/// that always connect to the origin itself (h1/h2).
///
/// The result is empty when the URL has no host, when DoH is not configured, or when the host
/// is an IP literal; the connector then falls back to its own (trivial, for an IP) resolution.
/// The port is the URL's explicit port or the scheme's known default, falling back to 443
/// when neither is available.
pub(crate) async fn origin_socket_addrs(&self) -> Result<SmallVec<[SocketAddr; 4]>> {
    match self.url.host_str() {
        Some(host) => {
            let port = self.url.port_or_known_default().unwrap_or(443);
            self.resolve_socket_addrs(host, port).await
        }
        None => Ok(SmallVec::new()),
    }
}
}
#[cfg(feature = "hickory")]
impl Conn {
/// Look up `host:port` with the configured DoH resolver.
///
/// Returns `Ok(None)` when no resolver is configured, so the caller falls back to the
/// connector's own resolution. An IP-literal host also yields `Ok(None)` without touching the
/// resolver — there is nothing to look up, and no SVCB/HTTPS records exist for a bare address.
///
/// This is the single place this conn touches DNS. The resolver reads and populates a shared,
/// TTL'd cache as a side effect, so repeated calls for the same host — across protocols, and
/// across the SVCB decision and the eventual connect — issue at most one set of queries.
///
/// # Errors
///
/// Fail-closed: once DoH is configured, a lookup the resolver can't answer fails the request
/// rather than falling back to the (possibly plaintext) system resolver.
pub(crate) async fn resolve(
    &self,
    host: &str,
    port: u16,
) -> Result<Option<crate::dns::Resolved>> {
    let is_ip_literal = host.parse::<IpAddr>().is_ok();
    if is_ip_literal {
        return Ok(None);
    }

    let Some(resolver) = &self.client.resolver else {
        return Ok(None);
    };

    let resolved = resolver
        .resolve(&self.client, host, port, self.timeout)
        .await?;
    Ok(Some(resolved))
}
pub(crate) async fn resolve_socket_addrs(
&self,
host: &str,
port: u16,
) -> Result<SmallVec<[SocketAddr; 4]>> {
Ok(self
.resolve(host, port)
.await?
.map(|resolved| resolved.socket_addrs(port))
.unwrap_or_default())
}
}
#[cfg(not(feature = "hickory"))]
impl Conn {
/// Stand-in used when the `hickory` feature is disabled: performs no lookup and always
/// reports an empty address list, so callers leave resolution to the connector.
pub(crate) async fn resolve_socket_addrs(
    &self,
    _host: &str,
    _port: u16,
) -> Result<SmallVec<[SocketAddr; 4]>> {
    let no_addrs = SmallVec::new();
    Ok(no_addrs)
}
}
impl Drop for Conn {
    /// Logs the drop, then takes any remaining response body off the conn and drops it.
    // NOTE(review): presumably dropping the taken body is what releases or pools the underlying
    // transport — confirm against `take_response_body`.
    fn drop(&mut self) {
        log::trace!("dropping client conn");
        let leftover_body = self.take_response_body();
        drop(leftover_body);
    }
}
impl From<Conn> for Body {
    /// Turn a finished conn into its response [`Body`].
    ///
    /// Preference order: an explicit body override, then the body received from the transport,
    /// then an empty default body.
    fn from(mut conn: Conn) -> Body {
        // body_override (e.g. cache hit, set via `set_response_body`) bypasses the transport;
        // transport pooling is left to `Drop`.
        if let Some(overridden) = conn.body_override.take() {
            return overridden;
        }

        let Some(received) = conn.take_received_body(true) else {
            return Body::default();
        };
        received.into()
    }
}
impl From<Conn> for Upgrade<Box<dyn Transport>> {
/// Convert a client conn into a [`trillium_http::Upgrade`] after response headers
/// arrive, handing off the open transport for direct `AsyncRead` / `AsyncWrite`
/// exchange with per-protocol framing applied.
///
/// The conn's headers, buffer, state, transport, and protocol bookkeeping are moved out
/// of the conn field by field; the conn itself is dropped when this function returns.
///
/// # Panics
///
/// Panics if the conn has no live transport (request not yet sent, or transport
/// already taken).
fn from(mut conn: Conn) -> Self {
// `Conn: Drop` rules out destructuring — pull each field with `mem::take` /
// `mem::replace`. New fields on `Conn` won't show up here automatically.
// Prefer an explicitly set path; otherwise rebuild `path?query` (or just `path` when
// there is no query string) from the URL.
let path = conn.path.take().unwrap_or_else(|| match conn.url.query() {
Some(q) => Cow::Owned(format!("{}?{q}", conn.url.path())),
None => Cow::Owned(conn.url.path().to_owned()),
});
// Security is judged purely from the URL scheme.
let secure = conn.url.scheme() == "https";
// The arguments below are positional and evaluated top to bottom; keep the order in
// sync with `Upgrade::from_parts`.
Upgrade::from_parts(
mem::take(&mut conn.response_headers),
mem::take(&mut conn.request_headers),
path,
conn.method,
conn.transport
.take()
.expect("client conn has no transport — request not yet sent"),
mem::take(&mut conn.buffer),
mem::take(&mut conn.state),
conn.context.clone(),
// NOTE(review): the meaning of this positional `None` is defined by
// `Upgrade::from_parts` — confirm against its signature.
None,
conn.authority.take(),
conn.scheme.take(),
// NOTE(review): `protocol_session` is swapped for `ProtocolSession::Http1` here, before
// `conn.http_version()` is evaluated two arguments later — confirm `http_version()` does
// not read the session.
mem::replace(&mut conn.protocol_session, ProtocolSession::Http1),
conn.protocol.take(),
conn.http_version(),
conn.status,
secure,
// Client-side inbound = response body.
mem::take(&mut conn.response_body_state),
// Carry through any pre-upgrade-decoded trailers so a downstream reader
// can observe them.
conn.response_trailers.take(),
)
}
}
impl IntoFuture for Conn {
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'static>>;
    type Output = Result<Conn>;

    /// Awaiting an owned conn drives it through the `&mut Conn` future and, on success,
    /// hands the conn back to the caller. On failure only the error is returned and the conn
    /// is dropped with the future.
    fn into_future(mut self) -> Self::IntoFuture {
        Box::pin(async move {
            match (&mut self).await {
                Ok(()) => Ok(self),
                Err(error) => Err(error),
            }
        })
    }
}
impl<'conn> IntoFuture for &'conn mut Conn {
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'conn>>;
type Output = Result<()>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
// Re-issuing handlers (FollowRedirects, retry, auth-refresh) queue a follow-up
// via `set_followup` in `after_response`; we recycle, swap, re-exec.
loop {
let result = if let Some(duration) = self.timeout {
self.client
.connector()
.runtime()
.timeout(duration, self.exec())
.await
.unwrap_or(Err(Error::TimedOut("Conn", duration)))
} else {
self.exec().await
};
// `halted` is handler-internal; don't leak it out to the caller.
self.halted = false;
if let Err(e) = result {
// Unrecovered error wins over any queued follow-up. Recovery handlers
// that want the follow-up to run must `take_error()` in `after_response`.
self.followup = None;
return Err(e);
}
let Some(next) = self.take_followup() else {
break;
};
if let Some(body) = self.take_response_body() {
body.recycle().await;
}
let _displaced = mem::replace(self, next);
}
Ok(())
})
}
}
impl Debug for Conn {
    /// Formats a selected set of conn fields as a `Conn { .. }` debug struct. The raw
    /// buffer is rendered as lossily-decoded UTF-8 text rather than as bytes.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let buffer_text = String::from_utf8_lossy(&self.buffer);

        // Field order below is the order fields appear in the output.
        let mut out = f.debug_struct("Conn");
        out.field("authority", &self.authority);
        out.field("buffer", &buffer_text);
        out.field("client", &self.client);
        out.field("protocol_session", &self.protocol_session);
        out.field("http_version", &self.http_version);
        out.field("method", &self.method);
        out.field("path", &self.path);
        out.field("request_body", &self.request_body);
        out.field("request_headers", &self.request_headers);
        out.field("request_target", &self.request_target);
        out.field("request_trailers", &self.request_trailers);
        out.field("response_body_state", &self.response_body_state);
        out.field("response_headers", &self.response_headers);
        out.field("response_trailers", &self.response_trailers);
        out.field("scheme", &self.scheme);
        out.field("state", &self.state);
        out.field("status", &self.status);
        out.field("url", &self.url);
        out.finish()
    }
}
impl AsRef<TypeSet> for Conn {
    /// Borrow the conn's state set.
    fn as_ref(&self) -> &TypeSet {
        let Self { state, .. } = self;
        state
    }
}
impl AsMut<TypeSet> for Conn {
    /// Mutably borrow the conn's state set.
    fn as_mut(&mut self) -> &mut TypeSet {
        let Self { state, .. } = self;
        state
    }
}