Skip to main content

ts_control/tokio/
tka_sync.rs

//! Control RPCs for Tailnet-Lock (TKA) over the Noise (ts2021) transport: chain sync via
//! `GET /machine/tka/sync/offer` and `GET /machine/tka/sync/send`, plus the initial
//! `GET /machine/tka/bootstrap` fetch of the genesis AUM (Go `tkaFetchBootstrap`).
//!
//! Mirrors Go's `tkaDoSyncOffer` / `tkaDoSyncSend` (`ipn/ipnlocal/tailnet-lock.go`, v1.100.0): the
//! node sends its [`TkaSyncOfferRequest`] (head + ancestor sample), control replies with the AUMs
//! the node is missing + control's own offer; the node then sends control the AUMs *it* is missing
//! in a [`TkaSyncSendRequest`]. All three RPCs are HTTP `GET`s carrying a JSON body (yes — a GET
//! with a body, matching upstream), and every response is read behind a 10 MiB limit.
9//!
10//! Transport only: these functions speak the [`ts_control_serde`] wire types (base32 head strings,
11//! base64'd raw-CBOR AUM bytes). Converting to/from the domain `ts_tka::{Aum, AumHash, SyncOffer}`
12//! and driving the offer→Inform→send handshake is the runtime layer's job — keeping `ts_control`
13//! free of a `ts_tka` dependency (it is the wire crate, `ts_tka` is the chain-logic crate).
14
15use core::time::Duration;
16use std::fmt;
17
18use bytes::Bytes;
19use ts_capabilityversion::CapabilityVersion;
20use ts_control_serde::{
21    TkaBootstrapRequest, TkaBootstrapResponse, TkaSyncOfferRequest, TkaSyncOfferResponse,
22    TkaSyncSendRequest, TkaSyncSendResponse,
23};
24use ts_http_util::{BytesBody, ClientExt, Http2, ResponseExt, StatusCode};
25use ts_keys::NodePublicKey;
26use url::Url;
27
28use crate::tokio::connect::ConnectionError;
29
/// Header name for the load-balancer hint that `/machine/*` RPCs carry (value: the node key).
const LOAD_BALANCER_HEADER_KEY: &str = "Ts-Lb";

/// Ceiling on one TKA-sync RPC end to end: dialing a fresh Noise connection, sending the `GET`,
/// and reading the reply. If control hangs, the call is abandoned and reported as a transient
/// [`TkaSyncError::NetworkError`] instead of holding a half-open connection. Same 30s bound as
/// the id-token RPC.
pub(crate) const TKA_SYNC_TIMEOUT: Duration = Duration::from_millis(30_000);

/// Largest TKA-sync response body accepted: 10 MiB (Go reads these through a 10 MiB
/// `io.LimitedReader`). Real AUM batches are small; the cap keeps a hostile or buggy control
/// plane from streaming an unbounded body into memory.
pub(crate) const MAX_TKA_SYNC_RESPONSE: usize = 10 << 20;
41
/// Coarse categories of internal TKA-sync failure, kept deliberately small for the public API.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TkaSyncInternalErrorKind {
    /// Building or parsing the request URL failed.
    Url,
    /// JSON encoding of the request, or decoding of the response body, failed.
    SerDe,
    /// A non-2xx HTTP status, or an HTTP/transport error that was not deemed transient.
    Http,
    /// The response body was not valid UTF-8.
    Utf8,
    /// The response body was larger than `MAX_TKA_SYNC_RESPONSE`.
    TooLarge,
}

impl fmt::Display for TkaSyncInternalErrorKind {
    /// Human-readable, fixed description of the failure category.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Url => "URL parsing error",
            Self::SerDe => "serialization/deserialization error",
            Self::Http => "unsuccessful HTTP request",
            Self::Utf8 => "invalid UTF8",
            Self::TooLarge => "response body too large",
        })
    }
}
68
69/// Errors from a TKA-sync RPC.
70#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
71pub enum TkaSyncError {
72    /// A transient network error; the request may succeed on retry.
73    #[error("network error during TKA sync")]
74    NetworkError,
75    /// Control does not support TKA sync at this endpoint (404/501) — the tailnet has no lock, or the
76    /// control plane is too old. Callers treat this as "no Authority obtained" and stay inert; it is
77    /// **not** an error to surface up the netmap stream.
78    #[error("control does not support TKA sync")]
79    Unsupported,
80    /// An internal failure (URL/serde/HTTP/UTF-8/size). Detail kept coarse for the public surface.
81    #[error("error during TKA sync: {0}")]
82    Internal(TkaSyncInternalErrorKind),
83}
84
85impl From<url::ParseError> for TkaSyncError {
86    fn from(error: url::ParseError) -> Self {
87        tracing::error!(%error, "bad URL building TKA-sync request");
88        TkaSyncError::Internal(TkaSyncInternalErrorKind::Url)
89    }
90}
91
92impl From<serde_json::Error> for TkaSyncError {
93    fn from(error: serde_json::Error) -> Self {
94        tracing::error!(%error, "serde error in TKA-sync request");
95        TkaSyncError::Internal(TkaSyncInternalErrorKind::SerDe)
96    }
97}
98
99impl From<core::str::Utf8Error> for TkaSyncError {
100    fn from(error: core::str::Utf8Error) -> Self {
101        tracing::error!(%error, "invalid utf8 in TKA-sync response");
102        TkaSyncError::Internal(TkaSyncInternalErrorKind::Utf8)
103    }
104}
105
106impl From<ts_http_util::Error> for TkaSyncError {
107    fn from(error: ts_http_util::Error) -> Self {
108        tracing::error!(%error, "http error in TKA-sync request");
109        if crate::http_error_is_recoverable(error) {
110            TkaSyncError::NetworkError
111        } else {
112            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
113        }
114    }
115}
116
117impl From<ConnectionError> for TkaSyncError {
118    fn from(error: ConnectionError) -> Self {
119        use crate::tokio::connect::InternalErrorKind as Conn;
120        match error {
121            ConnectionError::NetworkError => TkaSyncError::NetworkError,
122            ConnectionError::Internal(k) => TkaSyncError::Internal(match k {
123                Conn::Url => TkaSyncInternalErrorKind::Url,
124                Conn::SerDe => TkaSyncInternalErrorKind::SerDe,
125                Conn::Http
126                | Conn::MessageFormat
127                | Conn::Io
128                | Conn::ChallengeLength
129                | Conn::NoiseHandshake => TkaSyncInternalErrorKind::Http,
130            }),
131        }
132    }
133}
134
135/// Send a TKA `sync/offer` to control: our chain `offer`, returning control's response (its own
136/// offer + the AUMs we are missing). Opens a fresh Noise channel, bounded by `TKA_SYNC_TIMEOUT`.
137pub async fn tka_sync_offer(
138    control_url: &Url,
139    node_keystate: &ts_keys::NodeState,
140    offer: TkaSyncOfferRequest,
141    allow_http_key_fetch: bool,
142) -> Result<TkaSyncOfferResponse, TkaSyncError> {
143    let run = async {
144        let http2_conn = crate::tokio::connect(
145            control_url,
146            &node_keystate.machine_keys,
147            allow_http_key_fetch,
148        )
149        .await?;
150        tka_sync_offer_with(control_url, node_keystate, offer, &http2_conn).await
151    };
152    match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
153        Ok(result) => result,
154        Err(_elapsed) => {
155            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/offer timed out");
156            Err(TkaSyncError::NetworkError)
157        }
158    }
159}
160
161/// The offer RPC over an already-established Noise channel (factored out so the connect + send is
162/// timeout-wrappable and the send is testable against a mock `Http2`).
163pub(crate) async fn tka_sync_offer_with(
164    control_url: &Url,
165    node_keystate: &ts_keys::NodeState,
166    mut offer: TkaSyncOfferRequest,
167    http2_conn: &Http2<BytesBody>,
168) -> Result<TkaSyncOfferResponse, TkaSyncError> {
169    let node_public_key = node_keystate.node_keys.public;
170    // The request always carries this node's identity + the current capability version, regardless
171    // of how the caller built the offer body.
172    offer.node_key = node_public_key;
173    offer.version = CapabilityVersion::CURRENT;
174
175    let body = serde_json::to_string(&offer)?;
176    let url = control_url.join("machine/tka/sync/offer")?;
177    tracing::debug!(url = %url.as_str(), "TKA sync/offer to control");
178
179    let response = http2_conn
180        .get_with_body(
181            &url,
182            [lb_header(&node_public_key)],
183            Bytes::from(body).into(),
184        )
185        .await?;
186    let status = response.status();
187    let body = response
188        .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
189        .await?;
190    parse_offer_response(status, &body)
191}
192
193/// Send a TKA `sync/send` to control: our (post-Inform) `send` request with the AUMs control is
194/// missing, returning control's resulting head. Fresh Noise channel, bounded by `TKA_SYNC_TIMEOUT`.
195pub async fn tka_sync_send(
196    control_url: &Url,
197    node_keystate: &ts_keys::NodeState,
198    send: TkaSyncSendRequest,
199    allow_http_key_fetch: bool,
200) -> Result<TkaSyncSendResponse, TkaSyncError> {
201    let run = async {
202        let http2_conn = crate::tokio::connect(
203            control_url,
204            &node_keystate.machine_keys,
205            allow_http_key_fetch,
206        )
207        .await?;
208        tka_sync_send_with(control_url, node_keystate, send, &http2_conn).await
209    };
210    match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
211        Ok(result) => result,
212        Err(_elapsed) => {
213            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/send timed out");
214            Err(TkaSyncError::NetworkError)
215        }
216    }
217}
218
219/// The send RPC over an already-established Noise channel.
220pub(crate) async fn tka_sync_send_with(
221    control_url: &Url,
222    node_keystate: &ts_keys::NodeState,
223    mut send: TkaSyncSendRequest,
224    http2_conn: &Http2<BytesBody>,
225) -> Result<TkaSyncSendResponse, TkaSyncError> {
226    let node_public_key = node_keystate.node_keys.public;
227    send.node_key = node_public_key;
228    send.version = CapabilityVersion::CURRENT;
229
230    let body = serde_json::to_string(&send)?;
231    let url = control_url.join("machine/tka/sync/send")?;
232    tracing::debug!(url = %url.as_str(), "TKA sync/send to control");
233
234    let response = http2_conn
235        .get_with_body(
236            &url,
237            [lb_header(&node_public_key)],
238            Bytes::from(body).into(),
239        )
240        .await?;
241    let status = response.status();
242    let body = response
243        .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
244        .await?;
245    parse_send_response(status, &body)
246}
247
248/// Fetch the TKA bootstrap (genesis AUM) from control: the entry point that gives a node with no
249/// chain yet the initial AUM to build its `Authority` from, before the offer/send catch-up
250/// (Go `tkaFetchBootstrap`). `head` is the node's current known head (empty when it has none).
251/// Fresh Noise channel, bounded by `TKA_SYNC_TIMEOUT`.
252pub async fn tka_bootstrap(
253    control_url: &Url,
254    node_keystate: &ts_keys::NodeState,
255    head: alloc::string::String,
256    allow_http_key_fetch: bool,
257) -> Result<TkaBootstrapResponse, TkaSyncError> {
258    let run = async {
259        let http2_conn = crate::tokio::connect(
260            control_url,
261            &node_keystate.machine_keys,
262            allow_http_key_fetch,
263        )
264        .await?;
265        tka_bootstrap_with(control_url, node_keystate, head, &http2_conn).await
266    };
267    match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
268        Ok(result) => result,
269        Err(_elapsed) => {
270            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA bootstrap timed out");
271            Err(TkaSyncError::NetworkError)
272        }
273    }
274}
275
276/// The bootstrap RPC over an already-established Noise channel.
277pub(crate) async fn tka_bootstrap_with(
278    control_url: &Url,
279    node_keystate: &ts_keys::NodeState,
280    head: alloc::string::String,
281    http2_conn: &Http2<BytesBody>,
282) -> Result<TkaBootstrapResponse, TkaSyncError> {
283    let node_public_key = node_keystate.node_keys.public;
284    let req = TkaBootstrapRequest {
285        version: CapabilityVersion::CURRENT,
286        node_key: node_public_key,
287        head,
288    };
289    let body = serde_json::to_string(&req)?;
290    let url = control_url.join("machine/tka/bootstrap")?;
291    tracing::debug!(url = %url.as_str(), "TKA bootstrap to control");
292
293    let response = http2_conn
294        .get_with_body(
295            &url,
296            [lb_header(&node_public_key)],
297            Bytes::from(body).into(),
298        )
299        .await?;
300    let status = response.status();
301    let body = response
302        .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
303        .await?;
304    parse_bootstrap_response(status, &body)
305}
306
307/// The `Ts-Lb` load-balancer header (the node public key), as every other `/machine/*` RPC sets.
308pub(crate) fn lb_header(
309    node_public_key: &NodePublicKey,
310) -> (ts_http_util::HeaderName, ts_http_util::HeaderValue) {
311    (
312        LOAD_BALANCER_HEADER_KEY.parse().unwrap(),
313        node_public_key.to_string().parse().unwrap(),
314    )
315}
316
317/// Map a non-2xx status to the right error: 404/501 ⇒ [`TkaSyncError::Unsupported`] (control has no
318/// TKA endpoint — stay inert), anything else ⇒ a coarse HTTP internal error. Pure (no I/O), so the
319/// status/body branch logic is unit-testable without a live stream.
320pub(crate) fn classify_status(status: StatusCode, body: &[u8]) -> Option<TkaSyncError> {
321    if status.is_success() {
322        return None;
323    }
324    if status == StatusCode::NOT_FOUND || status == StatusCode::NOT_IMPLEMENTED {
325        tracing::info!(%status, "control has no TKA-sync endpoint; staying inert");
326        return Some(TkaSyncError::Unsupported);
327    }
328    let mut truncated = body.to_vec();
329    truncated.truncate(512);
330    let preview = core::str::from_utf8(&truncated).unwrap_or("<invalid utf8>");
331    tracing::error!(body = %preview, %status, "TKA-sync request failed");
332    Some(TkaSyncError::Internal(TkaSyncInternalErrorKind::Http))
333}
334
335fn parse_offer_response(
336    status: StatusCode,
337    body: &[u8],
338) -> Result<TkaSyncOfferResponse, TkaSyncError> {
339    if let Some(err) = classify_status(status, body) {
340        return Err(err);
341    }
342    // Defense-in-depth: the network read now uses `collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)`, so
343    // an over-cap body is already rejected (as `BodyTooLarge`) before reaching here. This length
344    // guard still covers the pure-`&[u8]` parse path (e.g. unit tests, or any future non-limited
345    // caller) and keeps the typed `TooLarge` outcome for it.
346    if body.len() > MAX_TKA_SYNC_RESPONSE {
347        return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
348    }
349    let body = core::str::from_utf8(body)?;
350    Ok(serde_json::from_str(body)?)
351}
352
353fn parse_send_response(
354    status: StatusCode,
355    body: &[u8],
356) -> Result<TkaSyncSendResponse, TkaSyncError> {
357    if let Some(err) = classify_status(status, body) {
358        return Err(err);
359    }
360    if body.len() > MAX_TKA_SYNC_RESPONSE {
361        return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
362    }
363    let body = core::str::from_utf8(body)?;
364    Ok(serde_json::from_str(body)?)
365}
366
367fn parse_bootstrap_response(
368    status: StatusCode,
369    body: &[u8],
370) -> Result<TkaBootstrapResponse, TkaSyncError> {
371    if let Some(err) = classify_status(status, body) {
372        return Err(err);
373    }
374    if body.len() > MAX_TKA_SYNC_RESPONSE {
375        return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
376    }
377    let body = core::str::from_utf8(body)?;
378    Ok(serde_json::from_str(body)?)
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_offer_response_ok() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":["MFRGGZDF"],"MissingAUMs":["AQID"]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.head, "AEBAGBAF");
        assert_eq!(resp.ancestors, alloc::vec!["MFRGGZDF".to_string()]);
        assert_eq!(resp.missing_aums, alloc::vec![alloc::vec![1u8, 2, 3]]);
    }

    #[test]
    fn parse_offer_response_empty_missing_is_up_to_date() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":[]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert!(resp.missing_aums.is_empty());
    }

    #[test]
    fn unsupported_status_maps_to_unsupported() {
        assert_eq!(
            parse_offer_response(StatusCode::NOT_FOUND, b"nope").unwrap_err(),
            TkaSyncError::Unsupported
        );
        assert_eq!(
            parse_send_response(StatusCode::NOT_IMPLEMENTED, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }

    #[test]
    fn other_non_2xx_is_http_internal() {
        assert_eq!(
            parse_offer_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    #[test]
    fn other_non_2xx_is_http_internal_for_send_and_bootstrap() {
        assert_eq!(
            parse_send_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
        assert_eq!(
            parse_bootstrap_response(StatusCode::INTERNAL_SERVER_ERROR, b"").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    #[test]
    fn success_status_is_not_classified_as_error() {
        assert_eq!(classify_status(StatusCode::OK, b"ignored"), None);
    }

    #[test]
    fn malformed_body_is_serde_error() {
        let err = parse_offer_response(StatusCode::OK, b"not json").unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::SerDe));
    }

    #[test]
    fn invalid_utf8_body_is_utf8_error() {
        let err = parse_send_response(StatusCode::OK, &[0xffu8, 0xfe]).unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::Utf8));
    }

    #[test]
    fn oversized_body_is_too_large_on_every_parse_path() {
        // Exercises the defense-in-depth length guard on the pure `&[u8]` parse path.
        let body = alloc::vec![b' '; MAX_TKA_SYNC_RESPONSE + 1];
        let too_large = TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge);
        assert_eq!(parse_offer_response(StatusCode::OK, &body).unwrap_err(), too_large);
        assert_eq!(parse_send_response(StatusCode::OK, &body).unwrap_err(), too_large);
        assert_eq!(parse_bootstrap_response(StatusCode::OK, &body).unwrap_err(), too_large);
    }

    #[test]
    fn internal_error_display_includes_kind() {
        assert_eq!(
            TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge).to_string(),
            "error during TKA sync: response body too large"
        );
    }

    #[test]
    fn parse_send_response_ok() {
        let resp = parse_send_response(StatusCode::OK, br#"{"Head":"MFRGGZDF"}"#).expect("parse");
        assert_eq!(resp.head, "MFRGGZDF");
    }

    #[test]
    fn parse_bootstrap_response_ok() {
        // GenesisAUM "AQID" = bytes {1,2,3}.
        let json = br#"{"GenesisAUM":"AQID","DisablementSecret":"/w=="}"#;
        let resp = parse_bootstrap_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.genesis_aum, alloc::vec![1u8, 2, 3]);
        assert_eq!(resp.disablement_secret, alloc::vec![0xffu8]);
    }

    #[test]
    fn parse_bootstrap_response_empty_when_no_genesis() {
        let resp = parse_bootstrap_response(StatusCode::OK, b"{}").expect("parse");
        assert!(
            resp.genesis_aum.is_empty(),
            "no genesis ⇒ empty (TKA not enabled)"
        );
    }

    #[test]
    fn parse_bootstrap_unsupported_status() {
        assert_eq!(
            parse_bootstrap_response(StatusCode::NOT_FOUND, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }
}