use std::sync::Arc;
use ts_control::{
TkaSyncError, TkaSyncOfferRequest, TkaSyncSendRequest, tka_bootstrap, tka_sync_offer,
tka_sync_send,
};
use ts_tka::{Aum, AumHash, Authority, MemAumStore, SyncOffer, VerifiedAumChain};
/// TKA state returned by `sync_tka` and accepted back as its `current` argument.
pub(crate) struct SyncedTka {
/// Authority rebuilt from the verified AUM chain held in `store`.
pub authority: Arc<Authority>,
/// AUM store that `sync_tka` inserts received AUMs into and rebuilds `authority` from.
pub store: MemAumStore,
/// Hash of the AUM the linear chain is read from when rebuilding `authority`.
/// On bootstrap this is the hash of the genesis AUM.
pub oldest: AumHash,
}
/// Errors that `sync_tka` propagates to its caller.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TkaSyncDriverError {
/// A `TkaSyncError` converted via `?`; presumably raised by the control RPC
/// helpers imported from `ts_control` — confirm against their signatures.
#[error("TKA sync RPC failed: {0}")]
Rpc(#[from] TkaSyncError),
/// A `ts_tka::TkaError`: AUM or offer decoding, chain lookup, or chain verification failed.
#[error("TKA chain verification failed: {0}")]
Chain(#[from] ts_tka::TkaError),
}
/// Decodes every serialized AUM in `marshaled` with `Aum::from_cbor`, preserving order.
///
/// # Errors
///
/// Returns the error of the first entry that fails to decode; later entries are not examined.
fn decode_aums(marshaled: &[Vec<u8>]) -> Result<Vec<Aum>, ts_tka::TkaError> {
    let mut decoded = Vec::with_capacity(marshaled.len());
    for bytes in marshaled {
        decoded.push(Aum::from_cbor(bytes)?);
    }
    Ok(decoded)
}
/// Builds a fresh [`Authority`] from the AUMs currently held in `store`.
///
/// The linear chain starting at `oldest` is read from the store, verified, and turned
/// into an authority.
///
/// # Errors
///
/// Propagates the error if the chain cannot be read from the store or fails verification.
fn rebuild_authority(store: &MemAumStore, oldest: AumHash) -> Result<Authority, ts_tka::TkaError> {
    // Keep the chain in a local so it outlives the verified view built from it.
    let aums = store.linear_chain_from(oldest)?;
    let authority = Authority::from_verified_chain(VerifiedAumChain::verify(&aums)?);
    Ok(authority)
}
pub(crate) async fn sync_tka(
config: &ts_control::Config,
keys: &ts_keys::NodeState,
current: Option<SyncedTka>,
) -> Result<Option<SyncedTka>, TkaSyncDriverError> {
let control_url = &config.server_url;
let allow_http_key_fetch = config.allow_http_key_fetch;
let (mut store, oldest, mut authority) = match current {
Some(s) => (s.store, s.oldest, (*s.authority).clone()),
None => {
let resp = tka_bootstrap(
control_url,
keys,
String::new(), allow_http_key_fetch,
)
.await?;
if resp.genesis_aum.is_empty() {
return Ok(None);
}
let genesis = Aum::from_cbor(&resp.genesis_aum)?;
let oldest = genesis.hash();
let mut store = MemAumStore::new();
store.insert(genesis);
let authority = rebuild_authority(&store, oldest)?;
(store, oldest, authority)
}
};
let local_offer = authority.sync_offer(&store, oldest)?;
let offer_req = TkaSyncOfferRequest {
version: Default::default(), node_key: keys.node_keys.public,
head: local_offer.head.to_base32(),
ancestors: local_offer
.ancestors
.iter()
.map(|a| a.to_base32())
.collect(),
};
let offer_resp = tka_sync_offer(control_url, keys, offer_req, allow_http_key_fetch).await?;
let control_offer = parse_offer(&offer_resp.head, &offer_resp.ancestors)?;
let received = decode_aums(&offer_resp.missing_aums)?;
for aum in &received {
store.insert(aum.clone());
}
let to_send = authority
.missing_aums(&store, &control_offer, oldest)
.unwrap_or_default();
authority = rebuild_authority(&store, oldest)?;
let send_req = TkaSyncSendRequest {
version: Default::default(),
node_key: keys.node_keys.public,
head: authority.head().to_base32(),
missing_aums: to_send.iter().map(Aum::serialize).collect(),
interactive: false,
};
if let Err(e) = tka_sync_send(control_url, keys, send_req, allow_http_key_fetch).await {
tracing::warn!(error = ?e, "TKA sync/send failed (local Authority already advanced)");
}
Ok(Some(SyncedTka {
authority: Arc::new(authority),
store,
oldest,
}))
}
/// Converts the base32 `head` and `ancestors` strings of an offer into a [`SyncOffer`].
///
/// # Errors
///
/// Returns `TkaError::Decode` if `head` or any entry of `ancestors` is rejected by
/// `AumHash::from_base32`. The head is checked first, then ancestors in order.
fn parse_offer(head: &str, ancestors: &[String]) -> Result<SyncOffer, ts_tka::TkaError> {
    let head = match AumHash::from_base32(head) {
        Some(hash) => hash,
        None => return Err(ts_tka::TkaError::Decode("bad base32 head")),
    };

    let mut parsed = Vec::with_capacity(ancestors.len());
    for encoded in ancestors {
        match AumHash::from_base32(encoded) {
            Some(hash) => parsed.push(hash),
            None => return Err(ts_tka::TkaError::Decode("bad base32 ancestor")),
        }
    }

    Ok(SyncOffer {
        head,
        ancestors: parsed,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A base32-encoded head and ancestor list parse back to the hashes they came from.
    #[test]
    fn parse_offer_roundtrips_base32() {
        let head = AumHash([0x11; 32]);
        let ancestors = vec![AumHash([0x22; 32]), AumHash([0x33; 32])];
        let encoded: Vec<String> = ancestors.iter().map(|hash| hash.to_base32()).collect();

        let parsed = parse_offer(&head.to_base32(), &encoded).expect("parse");

        assert_eq!(parsed.head, head);
        assert_eq!(parsed.ancestors, ancestors);
    }

    /// Invalid base32 is rejected both in the head and in an ancestor entry.
    #[test]
    fn parse_offer_rejects_bad_base32() {
        let bad_head = parse_offer("not valid base32!", &[]);
        assert!(bad_head.is_err());

        let valid_head = AumHash([1u8; 32]).to_base32();
        let bad_ancestors = ["@@@@".to_string()];
        assert!(parse_offer(&valid_head, &bad_ancestors).is_err());
    }

    /// A serialized AUM decodes to one with the same hash; a batch containing garbage fails.
    #[test]
    fn decode_aums_roundtrips_and_rejects_garbage() {
        let original = Aum {
            message_kind: ts_tka::AumKind::NoOp,
            prev_aum_hash: None,
            key: None,
            key_id: Vec::new(),
            state: None,
            votes: None,
            meta: Vec::new(),
            signatures: Vec::new(),
        };
        let encoded = original.serialize();

        let decoded = decode_aums(std::slice::from_ref(&encoded)).expect("decode");
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].hash(), original.hash());

        let garbage: Vec<u8> = vec![0xff, 0x00, 0x13];
        assert!(decode_aums(&[encoded, garbage]).is_err());
    }
}