ts_runtime/taildrop_send.rs
//! Taildrop file *sender* — the client half of Tailscale's peer-to-peer file transfer.
//!
//! Where [`crate::taildrop`] + `peerapi` implement the *receiving* half (a peer pushes a
//! file to this node and it lands in the on-disk store), this module implements the *sending* half:
//! pushing a local file to a peer over the overlay peerAPI as `PUT /v0/put/<name>`, faithfully
//! mirroring Go's wire sender.
//!
//! ## Wire contract
//!
//! We emit `PUT http://<peer>/v0/put/<escaped name>` with a `Content-Length: <size>` header
//! and the file bytes as the body, then expect HTTP `200` on success. The receiver returns
//! `409 Conflict` when a transfer for that name is already in progress, and `403` when the sender
//! lacks the file-send capability. We always send **from offset 0** — the Range/resume GET that Go
//! uses as an optimization to skip already-received bytes is deliberately omitted; a fresh full PUT
//! is always correct. The name is percent-escaped in the style of Go `url.PathEscape` (see
//! `path_escape`), the encoder counterpart to the receiver's `percent_decode`. Our encoder is
//! slightly stricter than Go's: it also escapes `$ & + : = @`, which Go leaves verbatim in a path
//! segment. Both forms decode to the same name.
//!
//! ## Anti-leak
//!
//! ALL traffic rides the overlay netstack `channel` (`channel.tcp_connect`), so it travels the
//! encrypted WireGuard tunnel to the peer — **never a host socket**. IPv4-only: the local bind and
//! the destination are tailnet IPv4 addresses. This is the same discipline the DoH client
//! (`peerapi_doh`) follows, mirrored here.
24
25use std::{
26 net::{Ipv4Addr, SocketAddr},
27 time::Duration,
28};
29
30use netstack::{CreateSocket, netcore::Channel};
31use tokio::{
32 io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
33 time::timeout,
34};
35
/// Upper bound on dialing the peer's peerAPI across the overlay.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on receiving the peer's HTTP response head, measured from the moment the body has
/// been flushed. The body send as a whole has no deadline (files can be large); stalls during the
/// send are caught per write by [`WRITE_IDLE_TIMEOUT`].
const RESPONSE_TIMEOUT: Duration = Duration::from_secs(30);
/// Upper bound on one individual write: the request head, a single body chunk, or the final flush.
/// Without it a peer that accepts the connection and then never drains its receive window would
/// park `write_all` forever, and [`RESPONSE_TIMEOUT`] would never even start. This is *not* a total
/// body-send deadline: a slow but live peer streaming a large file keeps making progress and is
/// fine; only a single stalled write is cut off.
const WRITE_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
/// Maximum number of response-head bytes we are willing to buffer. Only the status line is ever
/// inspected (the response body is ignored), so a small cap is enough and keeps memory bounded
/// against a hostile peer.
const MAX_RESP_HEADERS: usize = 8 * 1024;
50
/// Errors from the Taildrop file *sender* ([`send_file`]). Payload-free except
/// [`TaildropSendError::UnexpectedStatus`] so the type stays cheap to construct, copy and compare
/// (`==` works, so callers and tests can use `assert_eq!` instead of `matches!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaildropSendError {
    /// Dialing the peer's peerAPI over the overlay failed.
    Connect,
    /// An I/O step failed: reading the caller's source, writing to or reading from the overlay
    /// stream, or receiving a truncated, oversized or unparseable response head.
    Io,
    /// The file name failed [`crate::taildrop::validate_base_name`] (the receiver would reject it).
    InvalidName,
    /// The receiver returned `403` — this sender lacks the file-send capability.
    Forbidden,
    /// The receiver returned `409` — a transfer for this name is already in progress.
    Conflict,
    /// The receiver returned an HTTP status we do not handle. Carries the status code.
    UnexpectedStatus(u16),
    /// The dial, a single stalled write or flush, or the response read exceeded its timeout.
    Timeout,
}
70
71impl core::fmt::Display for TaildropSendError {
72 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
73 match self {
74 TaildropSendError::Connect => write!(f, "failed to dial peer over the overlay"),
75 TaildropSendError::Io => write!(f, "taildrop send I/O error"),
76 TaildropSendError::InvalidName => write!(f, "invalid taildrop file name"),
77 TaildropSendError::Forbidden => {
78 write!(
79 f,
80 "peer rejected transfer: file-send capability denied (403)"
81 )
82 }
83 TaildropSendError::Conflict => {
84 write!(f, "a transfer for this file is already in progress (409)")
85 }
86 TaildropSendError::UnexpectedStatus(code) => {
87 write!(f, "peer returned unexpected status {code}")
88 }
89 TaildropSendError::Timeout => write!(f, "taildrop send timed out"),
90 }
91 }
92}
93
94impl std::error::Error for TaildropSendError {}
95
96/// Percent-escape a path segment exactly like Go `url.PathEscape`: unreserved bytes
97/// (`A-Z a-z 0-9 - _ . ~`) pass through verbatim; every other byte is encoded as `%XX` with
98/// uppercase hex. This is the encoder counterpart to the receiver's `percent_decode` (in
99/// `peerapi`), so `percent_decode(path_escape(x)) == x` holds. Hand-rolled, no new deps.
100pub(crate) fn path_escape(name: &str) -> String {
101 let mut out = String::with_capacity(name.len());
102 for &b in name.as_bytes() {
103 let unreserved = b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~');
104 if unreserved {
105 out.push(b as char);
106 } else {
107 out.push('%');
108 out.push(hex_upper(b >> 4));
109 out.push(hex_upper(b & 0x0F));
110 }
111 }
112 out
113}
114
/// Convert a 4-bit value (`0..=15`) to its uppercase ASCII hex digit: `0..=9` become `'0'..='9'`
/// and `10..=15` become `'A'..='F'`.
fn hex_upper(nibble: u8) -> char {
    if nibble < 10 {
        char::from(b'0' + nibble)
    } else {
        char::from(b'A' + (nibble - 10))
    }
}
122
123/// Classify an HTTP status code into a send result. Pure, so it is unit-testable without a live
124/// stream: `2xx` is success; `403`/`409` map to their dedicated errors; anything else is
125/// [`TaildropSendError::UnexpectedStatus`].
126fn classify_status(code: u16) -> Result<(), TaildropSendError> {
127 match code {
128 200..=299 => Ok(()),
129 403 => Err(TaildropSendError::Forbidden),
130 409 => Err(TaildropSendError::Conflict),
131 _ => Err(TaildropSendError::UnexpectedStatus(code)),
132 }
133}
134
/// Parse the 3-digit status code out of an HTTP/1.1 status line (`HTTP/1.1 <code> <reason>\r\n`).
///
/// Pure and unit-testable. Only the *first line* of `head` is examined, so a malformed status line
/// can never be "rescued" by digits that happen to follow a space in a later header line. The code
/// must be exactly three ASCII digits directly after the first space, followed by either a space
/// (then the reason phrase) or the end of the line; `HTTP/1.1 2000 OK` is rejected rather than
/// silently read as `200`.
///
/// Returns `None` when `head` does not start with `HTTP/` or the first line does not have that
/// shape.
fn parse_status_line(head: &[u8]) -> Option<u16> {
    if !head.starts_with(b"HTTP/") {
        return None;
    }
    // Confine parsing to the status line: everything before the first CR or LF (or the whole
    // buffer when no line terminator is present).
    let line_len = head
        .iter()
        .position(|&b| b == b'\r' || b == b'\n')
        .unwrap_or(head.len());
    let line = &head[..line_len];

    // `space < line.len()`, so the slice start below is always in bounds.
    let space = line.iter().position(|&b| b == b' ')?;
    let after_version = &line[space + 1..];

    let digits = after_version.get(..3)?;
    if !digits.iter().all(u8::is_ascii_digit) {
        return None;
    }
    // Exactly three digits: the next byte, if any, must be the space before the reason phrase.
    if !matches!(after_version.get(3).copied(), None | Some(b' ')) {
        return None;
    }

    Some(
        digits
            .iter()
            .fold(0u16, |code, &digit| code * 10 + u16::from(digit - b'0')),
    )
}
152
153/// Send the contents of `reader` (`content_length` bytes) to peer `dst` as a Taildrop
154/// `PUT /v0/put/<name>` over the overlay netstack `channel`, binding locally to `self_ipv4`.
155///
156/// The name is validated up front ([`crate::taildrop::validate_base_name`]) — the receiver would
157/// reject an unsafe name anyway, so we fail fast — then percent-escaped into the request path. The
158/// body is streamed to EOF (we trust the caller's declared `content_length`, like Go's
159/// `DeclaredSize`, and do not enforce that the read byte count matches it). Only the response status
160/// line is inspected (`classify_status`); the body is ignored.
161///
162/// Anti-leak: the connection is dialed over `channel`, so it rides the encrypted overlay — never a
163/// host socket.
164pub async fn send_file<R>(
165 channel: &Channel,
166 self_ipv4: Ipv4Addr,
167 dst: SocketAddr,
168 name: &str,
169 content_length: u64,
170 mut reader: R,
171) -> Result<(), TaildropSendError>
172where
173 R: AsyncRead + Unpin,
174{
175 // Fail fast on an unsafe name: the receiver validates it identically and would reject the PUT.
176 crate::taildrop::validate_base_name(name).ok_or(TaildropSendError::InvalidName)?;
177
178 let local = SocketAddr::new(self_ipv4.into(), 0);
179 tracing::debug!(%dst, name, content_length, "taildrop send: dialing peer over overlay");
180 let mut stream = timeout(CONNECT_TIMEOUT, channel.tcp_connect(local, dst))
181 .await
182 .map_err(|_| TaildropSendError::Timeout)?
183 .map_err(|_| TaildropSendError::Connect)?;
184
185 // Request head: PUT the percent-escaped name with the declared length. `Connection: close` so
186 // the peer closes after answering and our response read terminates on EOF.
187 let head = format!(
188 "PUT /v0/put/{} HTTP/1.1\r\nHost: {dst}\r\nContent-Length: {content_length}\r\nConnection: close\r\n\r\n",
189 path_escape(name),
190 );
191 write_all_bounded(&mut stream, head.as_bytes()).await?;
192
193 // Stream the body to EOF.
194 let mut buf = [0u8; 64 * 1024];
195 loop {
196 let n = reader
197 .read(&mut buf)
198 .await
199 .map_err(|_| TaildropSendError::Io)?;
200 if n == 0 {
201 break;
202 }
203 write_all_bounded(&mut stream, &buf[..n]).await?;
204 }
205 timeout(WRITE_IDLE_TIMEOUT, stream.flush())
206 .await
207 .map_err(|_| TaildropSendError::Timeout)?
208 .map_err(|_| TaildropSendError::Io)?;
209
210 // Read just the response head (the body is irrelevant), bounded by both size and time.
211 let code = timeout(RESPONSE_TIMEOUT, read_response_status(&mut stream))
212 .await
213 .map_err(|_| TaildropSendError::Timeout)??;
214
215 match classify_status(code) {
216 Ok(()) => Ok(()),
217 Err(e) => {
218 tracing::warn!(%dst, name, status = code, "taildrop send: peer rejected transfer");
219 Err(e)
220 }
221 }
222}
223
224/// `write_all` one buffer to `stream`, bounded by [`WRITE_IDLE_TIMEOUT`] so a peer that accepts the
225/// connection but never drains its receive window cannot block the send indefinitely. A timeout is a
226/// [`TaildropSendError::Timeout`]; any other write failure is [`TaildropSendError::Io`].
227async fn write_all_bounded<S>(stream: &mut S, data: &[u8]) -> Result<(), TaildropSendError>
228where
229 S: AsyncWriteExt + Unpin,
230{
231 timeout(WRITE_IDLE_TIMEOUT, stream.write_all(data))
232 .await
233 .map_err(|_| TaildropSendError::Timeout)?
234 .map_err(|_| TaildropSendError::Io)
235}
236
237/// Read an HTTP/1.1 response head from `stream` until the `\r\n\r\n` terminator, then parse out the
238/// status code. Caps the buffered headers at [`MAX_RESP_HEADERS`]; an oversized or unparseable head,
239/// or an early EOF, is an [`TaildropSendError::Io`]. The `find_header_end` gate guarantees the buffer
240/// passed to [`parse_status_line`] is always a fully `\r\n\r\n`-terminated head.
241async fn read_response_status<S>(stream: &mut S) -> Result<u16, TaildropSendError>
242where
243 S: AsyncRead + Unpin,
244{
245 let mut buf = Vec::with_capacity(1024);
246 let mut tmp = [0u8; 1024];
247 loop {
248 if crate::peerapi_doh::find_header_end(&buf).is_some() {
249 break;
250 }
251 if buf.len() > MAX_RESP_HEADERS {
252 return Err(TaildropSendError::Io);
253 }
254 let n = stream
255 .read(&mut tmp)
256 .await
257 .map_err(|_| TaildropSendError::Io)?;
258 if n == 0 {
259 return Err(TaildropSendError::Io);
260 }
261 buf.extend_from_slice(&tmp[..n]);
262 }
263 parse_status_line(&buf).ok_or(TaildropSendError::Io)
264}
265
/// Unit tests for the pure helpers (escaping, status classification, status-line parsing) and the
/// error type's `Display` output. Nothing here touches the network.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn path_escape_leaves_unreserved_verbatim() {
        for plain in ["photo.jpg", "AZaz09-_.~"] {
            assert_eq!(
                path_escape(plain),
                plain,
                "all unreserved bytes pass through"
            );
        }
    }

    #[test]
    fn path_escape_encodes_reserved() {
        // The last case covers uppercase hex on a non-ASCII, multi-byte character.
        let cases = [
            ("my file.txt", "my%20file.txt"),
            ("a/b", "a%2Fb"),
            ("é", "%C3%A9"),
        ];
        for (raw, escaped) in cases {
            assert_eq!(path_escape(raw), escaped);
        }
    }

    #[test]
    fn classify_status_maps_codes() {
        for success in [200, 204] {
            assert!(classify_status(success).is_ok());
        }
        assert!(matches!(
            classify_status(403),
            Err(TaildropSendError::Forbidden)
        ));
        assert!(matches!(
            classify_status(409),
            Err(TaildropSendError::Conflict)
        ));
        assert!(matches!(
            classify_status(500),
            Err(TaildropSendError::UnexpectedStatus(500))
        ));
    }

    #[test]
    fn parse_status_line_extracts_code() {
        let cases: [(&[u8], Option<u16>); 5] = [
            (b"HTTP/1.1 200 OK\r\nX: 1\r\n\r\n", Some(200)),
            (b"HTTP/1.1 409 Conflict\r\n", Some(409)),
            (b"not http at all", None),
            (b"HTTP/1.1 XX OK\r\n", None),
            (b"", None),
        ];
        for (head, expected) in cases {
            assert_eq!(parse_status_line(head), expected);
        }
    }

    #[test]
    fn send_error_display_is_non_empty() {
        let every_variant = [
            TaildropSendError::Connect,
            TaildropSendError::Io,
            TaildropSendError::InvalidName,
            TaildropSendError::Forbidden,
            TaildropSendError::Conflict,
            TaildropSendError::UnexpectedStatus(418),
            TaildropSendError::Timeout,
        ];
        for error in every_variant {
            assert!(!error.to_string().is_empty());
        }
    }
}