1use core::time::Duration;
16use std::fmt;
17
18use bytes::Bytes;
19use ts_capabilityversion::CapabilityVersion;
20use ts_control_serde::{
21 TkaBootstrapRequest, TkaBootstrapResponse, TkaSyncOfferRequest, TkaSyncOfferResponse,
22 TkaSyncSendRequest, TkaSyncSendResponse,
23};
24use ts_http_util::{BytesBody, ClientExt, Http2, ResponseExt, StatusCode};
25use ts_keys::NodePublicKey;
26use url::Url;
27
28use crate::tokio::connect::ConnectionError;
29
/// Name of the HTTP header that [`lb_header`] attaches to every TKA request;
/// its value is the node's public key.
const LOAD_BALANCER_HEADER_KEY: &str = "Ts-Lb";

/// Deadline applied by the public entry points to one complete exchange
/// (connecting to control, sending the request and reading the response).
pub(crate) const TKA_SYNC_TIMEOUT: Duration = Duration::from_secs(30);

/// Largest response body, in bytes, that is read and parsed (10 MiB).
pub(crate) const MAX_TKA_SYNC_RESPONSE: usize = 10 << 20;
41
/// Coarse category of a TKA-sync failure that is neither a network error nor
/// an "unsupported" reply; carried by [`TkaSyncError::Internal`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TkaSyncInternalErrorKind {
    /// A request URL could not be built.
    Url,
    /// JSON serialization of a request or deserialization of a response failed.
    SerDe,
    /// The HTTP exchange failed: a non-recoverable transport error, a
    /// connection-setup failure, or a non-success status other than 404/501.
    Http,
    /// The response body was not valid UTF-8.
    Utf8,
    /// The response body exceeded [`MAX_TKA_SYNC_RESPONSE`].
    TooLarge,
}

impl fmt::Display for TkaSyncInternalErrorKind {
    /// Writes a short, fixed, human-readable description of the category.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match *self {
            Self::Url => "URL parsing error",
            Self::SerDe => "serialization/deserialization error",
            Self::Http => "unsuccessful HTTP request",
            Self::Utf8 => "invalid UTF8",
            Self::TooLarge => "response body too large",
        };
        f.write_str(description)
    }
}
68
69#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
71pub enum TkaSyncError {
72 #[error("network error during TKA sync")]
74 NetworkError,
75 #[error("control does not support TKA sync")]
79 Unsupported,
80 #[error("error during TKA sync: {0}")]
82 Internal(TkaSyncInternalErrorKind),
83}
84
85impl From<url::ParseError> for TkaSyncError {
86 fn from(error: url::ParseError) -> Self {
87 tracing::error!(%error, "bad URL building TKA-sync request");
88 TkaSyncError::Internal(TkaSyncInternalErrorKind::Url)
89 }
90}
91
92impl From<serde_json::Error> for TkaSyncError {
93 fn from(error: serde_json::Error) -> Self {
94 tracing::error!(%error, "serde error in TKA-sync request");
95 TkaSyncError::Internal(TkaSyncInternalErrorKind::SerDe)
96 }
97}
98
99impl From<core::str::Utf8Error> for TkaSyncError {
100 fn from(error: core::str::Utf8Error) -> Self {
101 tracing::error!(%error, "invalid utf8 in TKA-sync response");
102 TkaSyncError::Internal(TkaSyncInternalErrorKind::Utf8)
103 }
104}
105
106impl From<ts_http_util::Error> for TkaSyncError {
107 fn from(error: ts_http_util::Error) -> Self {
108 tracing::error!(%error, "http error in TKA-sync request");
109 if crate::http_error_is_recoverable(error) {
110 TkaSyncError::NetworkError
111 } else {
112 TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
113 }
114 }
115}
116
117impl From<ConnectionError> for TkaSyncError {
118 fn from(error: ConnectionError) -> Self {
119 use crate::tokio::connect::InternalErrorKind as Conn;
120 match error {
121 ConnectionError::NetworkError => TkaSyncError::NetworkError,
122 ConnectionError::Internal(k) => TkaSyncError::Internal(match k {
123 Conn::Url => TkaSyncInternalErrorKind::Url,
124 Conn::SerDe => TkaSyncInternalErrorKind::SerDe,
125 Conn::Http
126 | Conn::MessageFormat
127 | Conn::Io
128 | Conn::ChallengeLength
129 | Conn::NoiseHandshake => TkaSyncInternalErrorKind::Http,
130 }),
131 }
132 }
133}
134
135pub async fn tka_sync_offer(
138 control_url: &Url,
139 node_keystate: &ts_keys::NodeState,
140 offer: TkaSyncOfferRequest,
141 allow_http_key_fetch: bool,
142) -> Result<TkaSyncOfferResponse, TkaSyncError> {
143 let run = async {
144 let http2_conn = crate::tokio::connect(
145 control_url,
146 &node_keystate.machine_keys,
147 allow_http_key_fetch,
148 )
149 .await?;
150 tka_sync_offer_with(control_url, node_keystate, offer, &http2_conn).await
151 };
152 match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
153 Ok(result) => result,
154 Err(_elapsed) => {
155 tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/offer timed out");
156 Err(TkaSyncError::NetworkError)
157 }
158 }
159}
160
161pub(crate) async fn tka_sync_offer_with(
164 control_url: &Url,
165 node_keystate: &ts_keys::NodeState,
166 mut offer: TkaSyncOfferRequest,
167 http2_conn: &Http2<BytesBody>,
168) -> Result<TkaSyncOfferResponse, TkaSyncError> {
169 let node_public_key = node_keystate.node_keys.public;
170 offer.node_key = node_public_key;
173 offer.version = CapabilityVersion::CURRENT;
174
175 let body = serde_json::to_string(&offer)?;
176 let url = control_url.join("machine/tka/sync/offer")?;
177 tracing::debug!(url = %url.as_str(), "TKA sync/offer to control");
178
179 let response = http2_conn
180 .get_with_body(
181 &url,
182 [lb_header(&node_public_key)],
183 Bytes::from(body).into(),
184 )
185 .await?;
186 let status = response.status();
187 let body = response
188 .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
189 .await?;
190 parse_offer_response(status, &body)
191}
192
193pub async fn tka_sync_send(
196 control_url: &Url,
197 node_keystate: &ts_keys::NodeState,
198 send: TkaSyncSendRequest,
199 allow_http_key_fetch: bool,
200) -> Result<TkaSyncSendResponse, TkaSyncError> {
201 let run = async {
202 let http2_conn = crate::tokio::connect(
203 control_url,
204 &node_keystate.machine_keys,
205 allow_http_key_fetch,
206 )
207 .await?;
208 tka_sync_send_with(control_url, node_keystate, send, &http2_conn).await
209 };
210 match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
211 Ok(result) => result,
212 Err(_elapsed) => {
213 tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/send timed out");
214 Err(TkaSyncError::NetworkError)
215 }
216 }
217}
218
219pub(crate) async fn tka_sync_send_with(
221 control_url: &Url,
222 node_keystate: &ts_keys::NodeState,
223 mut send: TkaSyncSendRequest,
224 http2_conn: &Http2<BytesBody>,
225) -> Result<TkaSyncSendResponse, TkaSyncError> {
226 let node_public_key = node_keystate.node_keys.public;
227 send.node_key = node_public_key;
228 send.version = CapabilityVersion::CURRENT;
229
230 let body = serde_json::to_string(&send)?;
231 let url = control_url.join("machine/tka/sync/send")?;
232 tracing::debug!(url = %url.as_str(), "TKA sync/send to control");
233
234 let response = http2_conn
235 .get_with_body(
236 &url,
237 [lb_header(&node_public_key)],
238 Bytes::from(body).into(),
239 )
240 .await?;
241 let status = response.status();
242 let body = response
243 .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
244 .await?;
245 parse_send_response(status, &body)
246}
247
248pub async fn tka_bootstrap(
253 control_url: &Url,
254 node_keystate: &ts_keys::NodeState,
255 head: alloc::string::String,
256 allow_http_key_fetch: bool,
257) -> Result<TkaBootstrapResponse, TkaSyncError> {
258 let run = async {
259 let http2_conn = crate::tokio::connect(
260 control_url,
261 &node_keystate.machine_keys,
262 allow_http_key_fetch,
263 )
264 .await?;
265 tka_bootstrap_with(control_url, node_keystate, head, &http2_conn).await
266 };
267 match tokio::time::timeout(TKA_SYNC_TIMEOUT, run).await {
268 Ok(result) => result,
269 Err(_elapsed) => {
270 tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA bootstrap timed out");
271 Err(TkaSyncError::NetworkError)
272 }
273 }
274}
275
276pub(crate) async fn tka_bootstrap_with(
278 control_url: &Url,
279 node_keystate: &ts_keys::NodeState,
280 head: alloc::string::String,
281 http2_conn: &Http2<BytesBody>,
282) -> Result<TkaBootstrapResponse, TkaSyncError> {
283 let node_public_key = node_keystate.node_keys.public;
284 let req = TkaBootstrapRequest {
285 version: CapabilityVersion::CURRENT,
286 node_key: node_public_key,
287 head,
288 };
289 let body = serde_json::to_string(&req)?;
290 let url = control_url.join("machine/tka/bootstrap")?;
291 tracing::debug!(url = %url.as_str(), "TKA bootstrap to control");
292
293 let response = http2_conn
294 .get_with_body(
295 &url,
296 [lb_header(&node_public_key)],
297 Bytes::from(body).into(),
298 )
299 .await?;
300 let status = response.status();
301 let body = response
302 .collect_bytes_limited(MAX_TKA_SYNC_RESPONSE)
303 .await?;
304 parse_bootstrap_response(status, &body)
305}
306
307pub(crate) fn lb_header(
309 node_public_key: &NodePublicKey,
310) -> (ts_http_util::HeaderName, ts_http_util::HeaderValue) {
311 (
312 LOAD_BALANCER_HEADER_KEY.parse().unwrap(),
313 node_public_key.to_string().parse().unwrap(),
314 )
315}
316
317pub(crate) fn classify_status(status: StatusCode, body: &[u8]) -> Option<TkaSyncError> {
321 if status.is_success() {
322 return None;
323 }
324 if status == StatusCode::NOT_FOUND || status == StatusCode::NOT_IMPLEMENTED {
325 tracing::info!(%status, "control has no TKA-sync endpoint; staying inert");
326 return Some(TkaSyncError::Unsupported);
327 }
328 let mut truncated = body.to_vec();
329 truncated.truncate(512);
330 let preview = core::str::from_utf8(&truncated).unwrap_or("<invalid utf8>");
331 tracing::error!(body = %preview, %status, "TKA-sync request failed");
332 Some(TkaSyncError::Internal(TkaSyncInternalErrorKind::Http))
333}
334
335fn parse_offer_response(
336 status: StatusCode,
337 body: &[u8],
338) -> Result<TkaSyncOfferResponse, TkaSyncError> {
339 if let Some(err) = classify_status(status, body) {
340 return Err(err);
341 }
342 if body.len() > MAX_TKA_SYNC_RESPONSE {
347 return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
348 }
349 let body = core::str::from_utf8(body)?;
350 Ok(serde_json::from_str(body)?)
351}
352
353fn parse_send_response(
354 status: StatusCode,
355 body: &[u8],
356) -> Result<TkaSyncSendResponse, TkaSyncError> {
357 if let Some(err) = classify_status(status, body) {
358 return Err(err);
359 }
360 if body.len() > MAX_TKA_SYNC_RESPONSE {
361 return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
362 }
363 let body = core::str::from_utf8(body)?;
364 Ok(serde_json::from_str(body)?)
365}
366
367fn parse_bootstrap_response(
368 status: StatusCode,
369 body: &[u8],
370) -> Result<TkaBootstrapResponse, TkaSyncError> {
371 if let Some(err) = classify_status(status, body) {
372 return Err(err);
373 }
374 if body.len() > MAX_TKA_SYNC_RESPONSE {
375 return Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge));
376 }
377 let body = core::str::from_utf8(body)?;
378 Ok(serde_json::from_str(body)?)
379}
380
/// Unit tests for status classification and response parsing. They exercise
/// only the pure helpers; nothing here opens a connection.
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated offer response decodes into all three fields.
    #[test]
    fn parse_offer_response_ok() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":["MFRGGZDF"],"MissingAUMs":["AQID"]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.head, "AEBAGBAF");
        assert_eq!(resp.ancestors, alloc::vec!["MFRGGZDF".to_string()]);
        assert_eq!(resp.missing_aums, alloc::vec![alloc::vec![1u8, 2, 3]]);
    }

    /// An absent `MissingAUMs` key decodes as an empty list.
    #[test]
    fn parse_offer_response_empty_missing_is_up_to_date() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":[]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert!(resp.missing_aums.is_empty());
    }

    /// 404 and 501 are reported as `Unsupported`, whatever the body.
    #[test]
    fn unsupported_status_maps_to_unsupported() {
        assert_eq!(
            parse_offer_response(StatusCode::NOT_FOUND, b"nope").unwrap_err(),
            TkaSyncError::Unsupported
        );
        assert_eq!(
            parse_send_response(StatusCode::NOT_IMPLEMENTED, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }

    /// Any other non-success status is an internal HTTP error.
    #[test]
    fn other_non_2xx_is_http_internal() {
        assert_eq!(
            parse_offer_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    /// The non-2xx mapping is the same for the send and bootstrap parsers.
    #[test]
    fn other_non_2xx_is_http_internal_for_send_and_bootstrap() {
        assert_eq!(
            parse_send_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
        assert_eq!(
            parse_bootstrap_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    /// A success status lets the body through to parsing.
    #[test]
    fn classify_status_is_none_on_success() {
        assert_eq!(classify_status(StatusCode::OK, b""), None);
        assert_eq!(classify_status(StatusCode::OK, b"anything"), None);
    }

    /// A success status with a non-JSON body is a serde error.
    #[test]
    fn malformed_body_is_serde_error() {
        let err = parse_offer_response(StatusCode::OK, b"not json").unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::SerDe));
    }

    /// A success status with bytes that are not UTF-8 is reported as a UTF-8
    /// error, before JSON decoding is attempted.
    #[test]
    fn invalid_utf8_body_is_utf8_error() {
        let err = parse_offer_response(StatusCode::OK, &[0xff, 0xfe, 0xfd]).unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::Utf8));
    }

    /// A send response decodes its `Head` field.
    #[test]
    fn parse_send_response_ok() {
        let resp = parse_send_response(StatusCode::OK, br#"{"Head":"MFRGGZDF"}"#).expect("parse");
        assert_eq!(resp.head, "MFRGGZDF");
    }

    /// Both byte fields of a bootstrap response are decoded from base64.
    #[test]
    fn parse_bootstrap_response_ok() {
        let json = br#"{"GenesisAUM":"AQID","DisablementSecret":"/w=="}"#;
        let resp = parse_bootstrap_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.genesis_aum, alloc::vec![1u8, 2, 3]);
        assert_eq!(resp.disablement_secret, alloc::vec![0xffu8]);
    }

    /// An empty JSON object yields an empty genesis AUM.
    #[test]
    fn parse_bootstrap_response_empty_when_no_genesis() {
        let resp = parse_bootstrap_response(StatusCode::OK, b"{}").expect("parse");
        assert!(
            resp.genesis_aum.is_empty(),
            "no genesis ⇒ empty (TKA not enabled)"
        );
    }

    /// The bootstrap parser also reports 404 as `Unsupported`.
    #[test]
    fn parse_bootstrap_unsupported_status() {
        assert_eq!(
            parse_bootstrap_response(StatusCode::NOT_FOUND, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }
}