use core::time::Duration;
use std::fmt;
use bytes::Bytes;
use ts_capabilityversion::CapabilityVersion;
use ts_control_serde::{
TkaBootstrapRequest, TkaBootstrapResponse, TkaSyncOfferRequest, TkaSyncOfferResponse,
TkaSyncSendRequest, TkaSyncSendResponse,
};
use ts_http_util::{BytesBody, ClientExt, Http2, ResponseExt, StatusCode};
use ts_keys::NodePublicKey;
use url::Url;
use crate::tokio::connect::ConnectionError;
/// Name of the HTTP header attached to every TKA request; `lb_header` pairs it with the
/// node's public key rendered as a string.
const LOAD_BALANCER_HEADER_KEY: &str = "Ts-Lb";
/// Upper bound on one public TKA call, covering both the connect step and the request itself.
const TKA_SYNC_TIMEOUT: Duration = Duration::from_secs(30);
/// Largest successful response body (10 MiB) that the `parse_*_response` helpers will decode.
const MAX_TKA_SYNC_RESPONSE: usize = 10 * 1024 * 1024;
/// Coarse category of a non-network failure during a TKA exchange.
///
/// The underlying error is logged where it occurs; only this kind is carried in the
/// returned error value.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum TkaSyncInternalErrorKind {
    /// A request URL could not be built or parsed.
    Url,
    /// A request could not be serialized, or a response could not be deserialized.
    SerDe,
    /// The HTTP exchange failed in a way not classified as a network error, or control
    /// answered with an unexpected non-success status.
    Http,
    /// A response body was not valid UTF-8.
    Utf8,
    /// A response body exceeded the size limit.
    TooLarge,
}

impl fmt::Display for TkaSyncInternalErrorKind {
    /// Writes a short, fixed, human-readable description of the kind.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match *self {
            Self::Url => "URL parsing error",
            Self::SerDe => "serialization/deserialization error",
            Self::Http => "unsuccessful HTTP request",
            Self::Utf8 => "invalid UTF8",
            Self::TooLarge => "response body too large",
        };
        f.write_str(description)
    }
}
/// Error returned by the TKA sync/offer, sync/send and bootstrap calls in this module.
///
/// The `From` conversions in this file log the underlying cause and keep only the
/// coarse classification here, which is why this type can be `Clone + Eq`.
#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
pub enum TkaSyncError {
/// Network-level failure: a connection error reported as such, an HTTP error that
/// `crate::http_error_is_recoverable` accepts, or a call exceeding `TKA_SYNC_TIMEOUT`.
#[error("network error during TKA sync")]
NetworkError,
/// Control answered 404 Not Found or 501 Not Implemented for the TKA endpoint.
#[error("control does not support TKA sync")]
Unsupported,
/// Any other failure, reduced to its kind.
#[error("error during TKA sync: {0}")]
Internal(TkaSyncInternalErrorKind),
}
/// Maps a URL parse/join failure to the `Url` internal kind, logging the cause.
impl From<url::ParseError> for TkaSyncError {
    fn from(error: url::ParseError) -> Self {
        // The detailed error is only logged; the returned value keeps just the kind.
        tracing::error!(%error, "bad URL building TKA-sync request");
        let kind = TkaSyncInternalErrorKind::Url;
        Self::Internal(kind)
    }
}
/// Maps a JSON (de)serialization failure to the `SerDe` internal kind, logging the cause.
impl From<serde_json::Error> for TkaSyncError {
    fn from(error: serde_json::Error) -> Self {
        // The detailed error is only logged; the returned value keeps just the kind.
        tracing::error!(%error, "serde error in TKA-sync request");
        let kind = TkaSyncInternalErrorKind::SerDe;
        Self::Internal(kind)
    }
}
/// Maps a UTF-8 decoding failure to the `Utf8` internal kind, logging the cause.
impl From<core::str::Utf8Error> for TkaSyncError {
    fn from(error: core::str::Utf8Error) -> Self {
        // The detailed error is only logged; the returned value keeps just the kind.
        tracing::error!(%error, "invalid utf8 in TKA-sync response");
        let kind = TkaSyncInternalErrorKind::Utf8;
        Self::Internal(kind)
    }
}
impl From<ts_http_util::Error> for TkaSyncError {
fn from(error: ts_http_util::Error) -> Self {
tracing::error!(%error, "http error in TKA-sync request");
if crate::http_error_is_recoverable(error) {
TkaSyncError::NetworkError
} else {
TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
}
}
}
/// Translates a control-connection error into this module's error type.
///
/// Network errors map one-to-one. Internal kinds are collapsed: `Url` and `SerDe` keep
/// their meaning, and every other connection-level kind is reported as `Http`.
impl From<ConnectionError> for TkaSyncError {
    fn from(error: ConnectionError) -> Self {
        use crate::tokio::connect::InternalErrorKind as ConnKind;

        let connect_kind = match error {
            ConnectionError::NetworkError => return Self::NetworkError,
            ConnectionError::Internal(kind) => kind,
        };

        // No wildcard arm on purpose: a new connection error kind should fail to compile
        // here until it is classified.
        let kind = match connect_kind {
            ConnKind::Url => TkaSyncInternalErrorKind::Url,
            ConnKind::SerDe => TkaSyncInternalErrorKind::SerDe,
            ConnKind::Http
            | ConnKind::MessageFormat
            | ConnKind::Io
            | ConnKind::ChallengeLength
            | ConnKind::NoiseHandshake => TkaSyncInternalErrorKind::Http,
        };
        Self::Internal(kind)
    }
}
/// Connects to control and performs a TKA sync/offer exchange.
///
/// The connect step and the request share a single `TKA_SYNC_TIMEOUT` budget.
/// `allow_http_key_fetch` is forwarded unchanged to `crate::tokio::connect`.
///
/// # Errors
///
/// Returns [`TkaSyncError::NetworkError`] if the budget elapses (the timeout is logged),
/// otherwise whatever the connect step or [`tka_sync_offer_with`] reports.
pub async fn tka_sync_offer(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    offer: TkaSyncOfferRequest,
    allow_http_key_fetch: bool,
) -> Result<TkaSyncOfferResponse, TkaSyncError> {
    let exchange = async {
        let conn = crate::tokio::connect(
            control_url,
            &node_keystate.machine_keys,
            allow_http_key_fetch,
        )
        .await?;
        tka_sync_offer_with(control_url, node_keystate, offer, &conn).await
    };

    tokio::time::timeout(TKA_SYNC_TIMEOUT, exchange)
        .await
        .unwrap_or_else(|_elapsed| {
            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/offer timed out");
            Err(TkaSyncError::NetworkError)
        })
}
/// Performs a TKA sync/offer exchange over an already-established control connection.
///
/// The `node_key` and `version` fields of `offer` are overwritten with this node's public
/// key and the current capability version before the request is serialized as JSON and
/// sent to `machine/tka/sync/offer` (resolved against `control_url`).
///
/// # Errors
///
/// Returns a [`TkaSyncError`] if serialization, URL construction, the HTTP exchange or
/// response parsing fails.
pub(crate) async fn tka_sync_offer_with(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    mut offer: TkaSyncOfferRequest,
    http2_conn: &Http2<BytesBody>,
) -> Result<TkaSyncOfferResponse, TkaSyncError> {
    let node_key = node_keystate.node_keys.public;
    offer.node_key = node_key;
    offer.version = CapabilityVersion::CURRENT;

    let payload = Bytes::from(serde_json::to_string(&offer)?);
    let endpoint = control_url.join("machine/tka/sync/offer")?;
    tracing::debug!(url = %endpoint.as_str(), "TKA sync/offer to control");

    // The endpoint is queried with a GET request that carries a JSON body.
    let headers = [lb_header(&node_key)];
    let reply = http2_conn
        .get_with_body(&endpoint, headers, payload.into())
        .await?;

    // Capture the status first: collecting the body consumes the response.
    let status = reply.status();
    // NOTE(review): the full body is buffered here before the size cap in
    // `parse_offer_response` is applied — confirm `collect_bytes` bounds it.
    let raw = reply.collect_bytes().await?;
    parse_offer_response(status, &raw)
}
/// Connects to control and performs a TKA sync/send exchange.
///
/// The connect step and the request share a single `TKA_SYNC_TIMEOUT` budget.
/// `allow_http_key_fetch` is forwarded unchanged to `crate::tokio::connect`.
///
/// # Errors
///
/// Returns [`TkaSyncError::NetworkError`] if the budget elapses (the timeout is logged),
/// otherwise whatever the connect step or [`tka_sync_send_with`] reports.
pub async fn tka_sync_send(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    send: TkaSyncSendRequest,
    allow_http_key_fetch: bool,
) -> Result<TkaSyncSendResponse, TkaSyncError> {
    let exchange = async {
        let conn = crate::tokio::connect(
            control_url,
            &node_keystate.machine_keys,
            allow_http_key_fetch,
        )
        .await?;
        tka_sync_send_with(control_url, node_keystate, send, &conn).await
    };

    tokio::time::timeout(TKA_SYNC_TIMEOUT, exchange)
        .await
        .unwrap_or_else(|_elapsed| {
            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA sync/send timed out");
            Err(TkaSyncError::NetworkError)
        })
}
/// Performs a TKA sync/send exchange over an already-established control connection.
///
/// The `node_key` and `version` fields of `send` are overwritten with this node's public
/// key and the current capability version before the request is serialized as JSON and
/// sent to `machine/tka/sync/send` (resolved against `control_url`).
///
/// # Errors
///
/// Returns a [`TkaSyncError`] if serialization, URL construction, the HTTP exchange or
/// response parsing fails.
pub(crate) async fn tka_sync_send_with(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    mut send: TkaSyncSendRequest,
    http2_conn: &Http2<BytesBody>,
) -> Result<TkaSyncSendResponse, TkaSyncError> {
    let node_key = node_keystate.node_keys.public;
    send.node_key = node_key;
    send.version = CapabilityVersion::CURRENT;

    let payload = Bytes::from(serde_json::to_string(&send)?);
    let endpoint = control_url.join("machine/tka/sync/send")?;
    tracing::debug!(url = %endpoint.as_str(), "TKA sync/send to control");

    // The endpoint is queried with a GET request that carries a JSON body.
    let headers = [lb_header(&node_key)];
    let reply = http2_conn
        .get_with_body(&endpoint, headers, payload.into())
        .await?;

    // Capture the status first: collecting the body consumes the response.
    let status = reply.status();
    // NOTE(review): the full body is buffered here before the size cap in
    // `parse_send_response` is applied — confirm `collect_bytes` bounds it.
    let raw = reply.collect_bytes().await?;
    parse_send_response(status, &raw)
}
/// Connects to control and performs a TKA bootstrap exchange for the given `head`.
///
/// The connect step and the request share a single `TKA_SYNC_TIMEOUT` budget.
/// `allow_http_key_fetch` is forwarded unchanged to `crate::tokio::connect`.
///
/// # Errors
///
/// Returns [`TkaSyncError::NetworkError`] if the budget elapses (the timeout is logged),
/// otherwise whatever the connect step or [`tka_bootstrap_with`] reports.
pub async fn tka_bootstrap(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    head: alloc::string::String,
    allow_http_key_fetch: bool,
) -> Result<TkaBootstrapResponse, TkaSyncError> {
    let exchange = async {
        let conn = crate::tokio::connect(
            control_url,
            &node_keystate.machine_keys,
            allow_http_key_fetch,
        )
        .await?;
        tka_bootstrap_with(control_url, node_keystate, head, &conn).await
    };

    tokio::time::timeout(TKA_SYNC_TIMEOUT, exchange)
        .await
        .unwrap_or_else(|_elapsed| {
            tracing::error!(timeout = ?TKA_SYNC_TIMEOUT, "TKA bootstrap timed out");
            Err(TkaSyncError::NetworkError)
        })
}
/// Performs a TKA bootstrap exchange over an already-established control connection.
///
/// Builds a [`TkaBootstrapRequest`] from the current capability version, this node's
/// public key and `head`, serializes it as JSON and sends it to `machine/tka/bootstrap`
/// (resolved against `control_url`).
///
/// # Errors
///
/// Returns a [`TkaSyncError`] if serialization, URL construction, the HTTP exchange or
/// response parsing fails.
pub(crate) async fn tka_bootstrap_with(
    control_url: &Url,
    node_keystate: &ts_keys::NodeState,
    head: alloc::string::String,
    http2_conn: &Http2<BytesBody>,
) -> Result<TkaBootstrapResponse, TkaSyncError> {
    let node_key = node_keystate.node_keys.public;
    let request = TkaBootstrapRequest {
        version: CapabilityVersion::CURRENT,
        node_key,
        head,
    };

    let payload = Bytes::from(serde_json::to_string(&request)?);
    let endpoint = control_url.join("machine/tka/bootstrap")?;
    tracing::debug!(url = %endpoint.as_str(), "TKA bootstrap to control");

    // The endpoint is queried with a GET request that carries a JSON body.
    let headers = [lb_header(&node_key)];
    let reply = http2_conn
        .get_with_body(&endpoint, headers, payload.into())
        .await?;

    // Capture the status first: collecting the body consumes the response.
    let status = reply.status();
    // NOTE(review): the full body is buffered here before the size cap in
    // `parse_bootstrap_response` is applied — confirm `collect_bytes` bounds it.
    let raw = reply.collect_bytes().await?;
    parse_bootstrap_response(status, &raw)
}
/// Builds the `Ts-Lb` header pair whose value is the node's public key in string form.
///
/// # Panics
///
/// Panics if either the constant header name or the key's string form fails to parse as
/// a header name/value. NOTE(review): presumably the key always renders as header-safe
/// text — confirm against `NodePublicKey`'s `Display` impl.
fn lb_header(
    node_public_key: &NodePublicKey,
) -> (ts_http_util::HeaderName, ts_http_util::HeaderValue) {
    let name: ts_http_util::HeaderName = LOAD_BALANCER_HEADER_KEY.parse().unwrap();
    let value: ts_http_util::HeaderValue = node_public_key.to_string().parse().unwrap();
    (name, value)
}
/// Maps a response status to an error, or `None` when the status is a success.
///
/// * 2xx → `None`; the caller goes on to decode the body.
/// * 404 / 501 → [`TkaSyncError::Unsupported`], logged at info level.
/// * anything else → `Internal(Http)`, logged at error level together with a preview of
///   at most 512 bytes of the body.
///
/// The preview borrows from `body` instead of copying it, so an arbitrarily large error
/// body costs nothing extra. If the 512-byte cut lands inside a multi-byte UTF-8
/// character, the valid prefix is logged rather than discarding the whole preview; a
/// body that is invalid for any other reason is logged as `<invalid utf8>`.
fn classify_status(status: StatusCode, body: &[u8]) -> Option<TkaSyncError> {
    /// Maximum number of body bytes included in the error log.
    const PREVIEW_LIMIT: usize = 512;

    if status.is_success() {
        return None;
    }
    if status == StatusCode::NOT_FOUND || status == StatusCode::NOT_IMPLEMENTED {
        tracing::info!(%status, "control has no TKA-sync endpoint; staying inert");
        return Some(TkaSyncError::Unsupported);
    }

    let head = &body[..body.len().min(PREVIEW_LIMIT)];
    let preview = match core::str::from_utf8(head) {
        Ok(text) => text,
        // `error_len() == None` means the input ended in the middle of a character. When
        // the body really was cut, that is an artifact of truncation, so keep the
        // prefix that decoded cleanly.
        Err(err) if body.len() > PREVIEW_LIMIT && err.error_len().is_none() => {
            core::str::from_utf8(&head[..err.valid_up_to()]).unwrap_or("<invalid utf8>")
        }
        Err(_) => "<invalid utf8>",
    };
    tracing::error!(body = %preview, %status, "TKA-sync request failed");
    Some(TkaSyncError::Internal(TkaSyncInternalErrorKind::Http))
}
/// Decodes a sync/offer response from its status and raw body.
///
/// The status is classified first; a successful response is then rejected as `TooLarge`
/// if it exceeds `MAX_TKA_SYNC_RESPONSE`, and otherwise decoded as UTF-8 JSON.
///
/// # Errors
///
/// Returns the status-derived error, or an `Internal` error of kind `TooLarge`, `Utf8`
/// or `SerDe`.
fn parse_offer_response(
    status: StatusCode,
    body: &[u8],
) -> Result<TkaSyncOfferResponse, TkaSyncError> {
    match classify_status(status, body) {
        Some(failure) => Err(failure),
        None if body.len() > MAX_TKA_SYNC_RESPONSE => {
            Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge))
        }
        None => {
            let text = core::str::from_utf8(body)?;
            serde_json::from_str(text).map_err(TkaSyncError::from)
        }
    }
}
/// Decodes a sync/send response from its status and raw body.
///
/// The status is classified first; a successful response is then rejected as `TooLarge`
/// if it exceeds `MAX_TKA_SYNC_RESPONSE`, and otherwise decoded as UTF-8 JSON.
///
/// # Errors
///
/// Returns the status-derived error, or an `Internal` error of kind `TooLarge`, `Utf8`
/// or `SerDe`.
fn parse_send_response(
    status: StatusCode,
    body: &[u8],
) -> Result<TkaSyncSendResponse, TkaSyncError> {
    match classify_status(status, body) {
        Some(failure) => Err(failure),
        None if body.len() > MAX_TKA_SYNC_RESPONSE => {
            Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge))
        }
        None => {
            let text = core::str::from_utf8(body)?;
            serde_json::from_str(text).map_err(TkaSyncError::from)
        }
    }
}
/// Decodes a bootstrap response from its status and raw body.
///
/// The status is classified first; a successful response is then rejected as `TooLarge`
/// if it exceeds `MAX_TKA_SYNC_RESPONSE`, and otherwise decoded as UTF-8 JSON.
///
/// # Errors
///
/// Returns the status-derived error, or an `Internal` error of kind `TooLarge`, `Utf8`
/// or `SerDe`.
fn parse_bootstrap_response(
    status: StatusCode,
    body: &[u8],
) -> Result<TkaBootstrapResponse, TkaSyncError> {
    match classify_status(status, body) {
        Some(failure) => Err(failure),
        None if body.len() > MAX_TKA_SYNC_RESPONSE => {
            Err(TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge))
        }
        None => {
            let text = core::str::from_utf8(body)?;
            serde_json::from_str(text).map_err(TkaSyncError::from)
        }
    }
}
/// Unit tests for the response parsers; none of them touch the network.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_offer_response_ok() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":["MFRGGZDF"],"MissingAUMs":["AQID"]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.head, "AEBAGBAF");
        assert_eq!(resp.ancestors, alloc::vec!["MFRGGZDF".to_string()]);
        assert_eq!(resp.missing_aums, alloc::vec![alloc::vec![1u8, 2, 3]]);
    }

    #[test]
    fn parse_offer_response_empty_missing_is_up_to_date() {
        let json = br#"{"Head":"AEBAGBAF","Ancestors":[]}"#;
        let resp = parse_offer_response(StatusCode::OK, json).expect("parse");
        assert!(resp.missing_aums.is_empty());
    }

    #[test]
    fn unsupported_status_maps_to_unsupported() {
        assert_eq!(
            parse_offer_response(StatusCode::NOT_FOUND, b"nope").unwrap_err(),
            TkaSyncError::Unsupported
        );
        assert_eq!(
            parse_send_response(StatusCode::NOT_IMPLEMENTED, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }

    #[test]
    fn other_non_2xx_is_http_internal() {
        assert_eq!(
            parse_offer_response(StatusCode::INTERNAL_SERVER_ERROR, b"boom").unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    /// An error body longer than the log preview, whose 512-byte cut lands inside a
    /// multi-byte character, must still classify cleanly as an HTTP failure.
    #[test]
    fn long_multibyte_error_body_is_http_internal() {
        // One ASCII byte shifts every two-byte 'é' onto an odd offset, so byte 512 falls
        // in the middle of a character.
        let mut body = alloc::string::String::from("a");
        body.push_str(&"é".repeat(400));
        assert_eq!(
            parse_offer_response(StatusCode::INTERNAL_SERVER_ERROR, body.as_bytes())
                .unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::Http)
        );
    }

    #[test]
    fn malformed_body_is_serde_error() {
        let err = parse_offer_response(StatusCode::OK, b"not json").unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::SerDe));
    }

    #[test]
    fn invalid_utf8_body_is_utf8_error() {
        let err = parse_offer_response(StatusCode::OK, b"\xff\xfe").unwrap_err();
        assert_eq!(err, TkaSyncError::Internal(TkaSyncInternalErrorKind::Utf8));
    }

    #[test]
    fn oversized_body_is_too_large() {
        let body = alloc::vec![b' '; MAX_TKA_SYNC_RESPONSE + 1];
        assert_eq!(
            parse_offer_response(StatusCode::OK, &body).unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge)
        );
        assert_eq!(
            parse_send_response(StatusCode::OK, &body).unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge)
        );
        assert_eq!(
            parse_bootstrap_response(StatusCode::OK, &body).unwrap_err(),
            TkaSyncError::Internal(TkaSyncInternalErrorKind::TooLarge)
        );
    }

    #[test]
    fn parse_send_response_ok() {
        let resp =
            parse_send_response(StatusCode::OK, br#"{"Head":"MFRGGZDF"}"#).expect("parse");
        assert_eq!(resp.head, "MFRGGZDF");
    }

    #[test]
    fn parse_bootstrap_response_ok() {
        let json = br#"{"GenesisAUM":"AQID","DisablementSecret":"/w=="}"#;
        let resp = parse_bootstrap_response(StatusCode::OK, json).expect("parse");
        assert_eq!(resp.genesis_aum, alloc::vec![1u8, 2, 3]);
        assert_eq!(resp.disablement_secret, alloc::vec![0xffu8]);
    }

    #[test]
    fn parse_bootstrap_response_empty_when_no_genesis() {
        let resp = parse_bootstrap_response(StatusCode::OK, b"{}").expect("parse");
        assert!(
            resp.genesis_aum.is_empty(),
            "no genesis ⇒ empty (TKA not enabled)"
        );
    }

    #[test]
    fn parse_bootstrap_unsupported_status() {
        assert_eq!(
            parse_bootstrap_response(StatusCode::NOT_FOUND, b"").unwrap_err(),
            TkaSyncError::Unsupported
        );
    }
}