use std::sync::Arc;
use ts_control::{
TkaSyncError, TkaSyncOfferRequest, TkaSyncSendRequest, tka_bootstrap, tka_sync_offer,
tka_sync_send,
};
use ts_tka::{Aum, AumHash, Authority, MemAumStore, SyncOffer, VerifiedAumChain};
/// TKA state carried between calls to `sync_tka`.
///
/// `sync_tka` accepts a previous value as its starting point and returns an
/// updated value on success.
pub(crate) struct SyncedTka {
/// Authority rebuilt from the verified AUM chain held in `store`.
pub authority: Arc<Authority>,
/// In-memory AUM store the authority was rebuilt from.
pub store: MemAumStore,
/// Hash of the AUM every chain walk starts from; when the state was created
/// by bootstrap this is the hash of the genesis AUM.
pub oldest: AumHash,
}
/// One row of the TKA log, describing a single AUM.
///
/// Rows are produced by `tka_log_entries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TkaLogEntry {
/// The AUM's 32-byte hash.
pub aum_hash: [u8; 32],
/// The AUM's message kind rendered as a string (for example `"add-key"`
/// or `"checkpoint"`).
pub change: String,
/// The `key_id` of each signature attached to the AUM, in signature order.
pub signer_key_ids: Vec<Vec<u8>>,
/// The serialized AUM, as produced by `Aum::serialize`.
pub raw: Vec<u8>,
}
/// Renders the AUM chain that starts at `oldest` as log rows, newest first.
///
/// The chain is obtained from `store.linear_chain_from(oldest)` and walked in
/// reverse, so the first row is the last AUM of the chain and `oldest` itself
/// comes last. At most `limit` rows are produced; a `limit` of `0` yields an
/// empty vector.
///
/// If the store cannot produce a chain from `oldest`, the error is discarded
/// and an empty vector is returned.
pub(crate) fn tka_log_entries(
    store: &MemAumStore,
    oldest: AumHash,
    limit: usize,
) -> Vec<TkaLogEntry> {
    // An unwalkable store is reported as "no entries" rather than as an error.
    let Ok(chain) = store.linear_chain_from(oldest) else {
        return Vec::new();
    };

    let mut entries = Vec::new();
    for aum in chain.iter().rev().take(limit) {
        let aum_hash = aum.hash().0;
        let change = aum.message_kind.as_str().to_string();
        let signer_key_ids = aum
            .signatures
            .iter()
            .map(|signature| signature.key_id.clone())
            .collect();
        let raw = aum.serialize();
        entries.push(TkaLogEntry {
            aum_hash,
            change,
            signer_key_ids,
            raw,
        });
    }
    entries
}
/// Errors returned by `sync_tka`.
///
/// Both variants are built through `From`, so `?` converts the underlying
/// error types automatically.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TkaSyncDriverError {
/// A TKA sync RPC failed; wraps the underlying `TkaSyncError`.
#[error("TKA sync RPC failed: {0}")]
Rpc(#[from] TkaSyncError),
/// Wraps a `ts_tka::TkaError`. Besides chain verification, this is also the
/// error type returned by AUM decoding and offer parsing in this file.
#[error("TKA chain verification failed: {0}")]
Chain(#[from] ts_tka::TkaError),
}
/// Decodes each CBOR-encoded buffer in `marshaled` into an [`Aum`].
///
/// Buffers are decoded in order and the output preserves that order.
///
/// # Errors
///
/// Returns the error of the first buffer that `Aum::from_cbor` rejects; no
/// partial result is returned.
fn decode_aums(marshaled: &[Vec<u8>]) -> Result<Vec<Aum>, ts_tka::TkaError> {
    let mut aums = Vec::with_capacity(marshaled.len());
    for bytes in marshaled {
        aums.push(Aum::from_cbor(bytes)?);
    }
    Ok(aums)
}
/// Builds an [`Authority`] from the AUM chain in `store` that starts at `oldest`.
///
/// The chain returned by `store.linear_chain_from(oldest)` is passed through
/// `VerifiedAumChain::verify` before the authority is constructed from it.
///
/// # Errors
///
/// Returns the underlying error if the chain cannot be walked from `oldest`
/// or if verification rejects it.
fn rebuild_authority(store: &MemAumStore, oldest: AumHash) -> Result<Authority, ts_tka::TkaError> {
    let aums = store.linear_chain_from(oldest)?;
    let authority = Authority::from_verified_chain(VerifiedAumChain::verify(&aums)?);
    Ok(authority)
}
/// Runs one TKA sync round against the control server.
///
/// The starting state is `current` when it is `Some`. Otherwise a fresh store
/// is seeded with the genesis AUM returned by `tka_bootstrap`. The round then:
///
/// 1. sends the local sync offer (head and ancestors) with `tka_sync_offer`;
/// 2. decodes the AUMs returned in the response and inserts them into the store;
/// 3. computes the AUMs to send to control, relative to control's offer;
/// 4. rebuilds (and thereby re-verifies) the [`Authority`] from the updated store;
/// 5. reports the new head and the AUMs from step 3 with `tka_sync_send`.
///
/// Steps 2–4 must stay in this order: the AUMs to send are computed with the
/// pre-rebuild authority against the already-updated store.
///
/// # Returns
///
/// * `Ok(None)` when bootstrap returns an empty genesis AUM (presumably TKA is
///   not enabled for this tailnet; confirm against `tka_bootstrap`).
/// * `Ok(Some(_))` with the rebuilt authority, the updated store and the
///   `oldest` hash otherwise.
///
/// # Errors
///
/// Returns an error if the bootstrap or offer RPC fails, if the local offer
/// cannot be built, if control's offer or any AUM fails to decode, or if the
/// rebuilt chain fails verification. `current` is taken by value, so on `Err`
/// the previous state is not handed back to the caller.
///
/// Two failures are deliberately non-fatal and only logged:
///
/// * failing to compute the AUMs to send, in which case none are sent;
/// * a failing `tka_sync_send`, since the local authority has already advanced.
pub(crate) async fn sync_tka(
    config: &ts_control::Config,
    keys: &ts_keys::NodeState,
    current: Option<SyncedTka>,
) -> Result<Option<SyncedTka>, TkaSyncDriverError> {
    let control_url = &config.server_url;
    let allow_http_key_fetch = config.allow_http_key_fetch;

    let (mut store, oldest, mut authority) = match current {
        Some(s) => (s.store, s.oldest, (*s.authority).clone()),
        None => {
            // NOTE(review): the empty string is the third bootstrap argument;
            // presumably it means "no known head". Confirm against `tka_bootstrap`.
            let resp =
                tka_bootstrap(control_url, keys, String::new(), allow_http_key_fetch).await?;
            if resp.genesis_aum.is_empty() {
                return Ok(None);
            }
            let genesis = Aum::from_cbor(&resp.genesis_aum)?;
            let oldest = genesis.hash();
            let mut store = MemAumStore::new();
            store.insert(genesis);
            let authority = rebuild_authority(&store, oldest)?;
            (store, oldest, authority)
        }
    };

    let local_offer = authority.sync_offer(&store, oldest)?;
    let offer_req = TkaSyncOfferRequest {
        version: Default::default(),
        node_key: keys.node_keys.public,
        head: local_offer.head.to_base32(),
        ancestors: local_offer
            .ancestors
            .iter()
            .map(|a| a.to_base32())
            .collect(),
    };
    let offer_resp = tka_sync_offer(control_url, keys, offer_req, allow_http_key_fetch).await?;
    let control_offer = parse_offer(&offer_resp.head, &offer_resp.ancestors)?;

    // Every AUM is decoded before the first insert, so a malformed AUM aborts
    // the round without a partial insert. The decoded AUMs are moved into the
    // store because nothing else needs them afterwards.
    for aum in decode_aums(&offer_resp.missing_aums)? {
        store.insert(aum);
    }

    // Best effort: if this fails we still finish the round and report our head,
    // but the failure is logged instead of being dropped silently.
    let to_send = authority
        .missing_aums(&store, &control_offer, oldest)
        .unwrap_or_else(|e| {
            tracing::warn!(
                error = ?e,
                "TKA missing-AUM computation failed; sending no AUMs to control"
            );
            Default::default()
        });

    authority = rebuild_authority(&store, oldest)?;

    let send_req = TkaSyncSendRequest {
        version: Default::default(),
        node_key: keys.node_keys.public,
        head: authority.head().to_base32(),
        missing_aums: to_send.iter().map(Aum::serialize).collect(),
        interactive: false,
    };
    if let Err(e) = tka_sync_send(control_url, keys, send_req, allow_http_key_fetch).await {
        tracing::warn!(error = ?e, "TKA sync/send failed (local Authority already advanced)");
    }

    Ok(Some(SyncedTka {
        authority: Arc::new(authority),
        store,
        oldest,
    }))
}
/// Parses a base32-encoded head and ancestor list into a [`SyncOffer`].
///
/// The head is parsed first, then the ancestors in order; the output keeps
/// the ancestors in their input order.
///
/// # Errors
///
/// Returns `TkaError::Decode` for the first value that
/// `AumHash::from_base32` rejects.
fn parse_offer(head: &str, ancestors: &[String]) -> Result<SyncOffer, ts_tka::TkaError> {
    let Some(head) = AumHash::from_base32(head) else {
        return Err(ts_tka::TkaError::Decode("bad base32 head"));
    };

    let mut parsed = Vec::with_capacity(ancestors.len());
    for encoded in ancestors {
        match AumHash::from_base32(encoded) {
            Some(hash) => parsed.push(hash),
            None => return Err(ts_tka::TkaError::Decode("bad base32 ancestor")),
        }
    }

    Ok(SyncOffer {
        head,
        ancestors: parsed,
    })
}
// Unit tests for offer parsing, AUM decoding and log-entry rendering.
// `sync_tka` itself is not exercised here.
#[cfg(test)]
mod tests {
use super::*;
/// A head and ancestors encoded with `to_base32` parse back to the same hashes,
/// with ancestor order preserved.
#[test]
fn parse_offer_roundtrips_base32() {
let h0 = AumHash([0x11; 32]);
let h1 = AumHash([0x22; 32]);
let h2 = AumHash([0x33; 32]);
let offer = parse_offer(&h0.to_base32(), &[h1.to_base32(), h2.to_base32()]).expect("parse");
assert_eq!(offer.head, h0);
assert_eq!(offer.ancestors, vec![h1, h2]);
}
/// A malformed head, or a malformed ancestor next to a valid head, is an error.
#[test]
fn parse_offer_rejects_bad_base32() {
assert!(parse_offer("not valid base32!", &[]).is_err());
let good = AumHash([1u8; 32]).to_base32();
assert!(parse_offer(&good, &["@@@@".to_string()]).is_err());
}
/// A serialized AUM decodes back to an AUM with the same hash, and one bad
/// buffer makes the whole batch fail.
#[test]
fn decode_aums_roundtrips_and_rejects_garbage() {
let aum = Aum {
message_kind: ts_tka::AumKind::NoOp,
prev_aum_hash: None,
key: None,
key_id: Vec::new(),
state: None,
votes: None,
meta: Vec::new(),
signatures: Vec::new(),
};
let good = aum.serialize();
let decoded = decode_aums(std::slice::from_ref(&good)).expect("decode");
assert_eq!(decoded.len(), 1);
assert_eq!(decoded[0].hash(), aum.hash());
// A valid buffer followed by garbage: the error wins over the partial result.
assert!(decode_aums(&[good, vec![0xff, 0x00, 0x13]]).is_err());
}
/// Builds an Ed25519 `AumKey` with the given vote weight. The public key is
/// derived from a signing key made of 32 copies of `seed`.
fn test_aum_key(seed: u8, votes: u32) -> ts_tka::AumKey {
use ed25519_dalek::SigningKey;
ts_tka::AumKey {
kind: ts_tka::KeyKind::Ed25519,
votes,
public: SigningKey::from_bytes(&[seed; 32])
.verifying_key()
.to_bytes()
.to_vec(),
meta: Vec::new(),
}
}
/// Builds an unsigned, parentless `Checkpoint` AUM whose state holds `key`
/// as its only key and a single 32-byte disablement value.
fn genesis_checkpoint(key: ts_tka::AumKey) -> Aum {
Aum {
message_kind: ts_tka::AumKind::Checkpoint,
prev_aum_hash: None,
key: None,
key_id: Vec::new(),
state: Some(ts_tka::AumState {
last_aum_hash: None,
disablement_values: Some(vec![vec![0x11; 32]]),
keys: Some(vec![key]),
state_id1: 0,
state_id2: 0,
}),
votes: None,
meta: Vec::new(),
signatures: Vec::new(),
}
}
/// Builds an unsigned `AddKey` AUM for `key` whose parent is `parent`.
fn add_key_child(parent: &Aum, key: ts_tka::AumKey) -> Aum {
Aum {
message_kind: ts_tka::AumKind::AddKey,
prev_aum_hash: Some(parent.hash()),
key: Some(key),
key_id: Vec::new(),
state: None,
votes: None,
meta: Vec::new(),
signatures: Vec::new(),
}
}
/// For a three-AUM chain the log is newest-first, and each row carries the
/// hash, the change name and a `raw` payload that decodes back to its AUM.
#[test]
fn tka_log_entries_head_first_with_fields() {
let g = genesis_checkpoint(test_aum_key(1, 1));
let a1 = add_key_child(&g, test_aum_key(2, 1));
let a2 = add_key_child(&a1, test_aum_key(3, 1));
let mut store = MemAumStore::new();
// Inserted out of chain order; the expected log order below is still chain order.
store.insert(a1.clone());
store.insert(a2.clone());
store.insert(g.clone());
let log = tka_log_entries(&store, g.hash(), 100);
let got_hashes: Vec<[u8; 32]> = log.iter().map(|e| e.aum_hash).collect();
assert_eq!(
got_hashes,
vec![a2.hash().0, a1.hash().0, g.hash().0],
"log must be head-first (a2, a1, genesis)"
);
let changes: Vec<&str> = log.iter().map(|e| e.change.as_str()).collect();
assert_eq!(changes, vec!["add-key", "add-key", "checkpoint"]);
assert_eq!(log[2].aum_hash, g.hash().0);
for (entry, aum) in log.iter().zip([&a2, &a1, &g]) {
let decoded = Aum::from_cbor(&entry.raw).expect("raw is canonical AUM CBOR");
assert_eq!(&decoded, aum, "raw must decode back to the source AUM");
}
}
/// `limit` keeps the newest rows and drops the oldest; a limit of 0 yields nothing.
#[test]
fn tka_log_entries_limit_truncates_from_head() {
let g = genesis_checkpoint(test_aum_key(1, 1));
let a1 = add_key_child(&g, test_aum_key(2, 1));
let a2 = add_key_child(&a1, test_aum_key(3, 1));
let store = MemAumStore::from_aums([g.clone(), a1.clone(), a2.clone()]);
let log = tka_log_entries(&store, g.hash(), 2);
assert_eq!(log.len(), 2, "limit caps the row count");
assert_eq!(
log.iter().map(|e| e.aum_hash).collect::<Vec<_>>(),
vec![a2.hash().0, a1.hash().0],
"limit keeps the newest entries (head-first)"
);
assert!(tka_log_entries(&store, g.hash(), 0).is_empty());
}
/// A signed AUM reports its signer's key id; an unsigned AUM reports none.
#[test]
fn tka_log_entries_extracts_signer_key_ids() {
use ed25519_dalek::SigningKey;
let mut g = genesis_checkpoint(test_aum_key(1, 1));
let sk = SigningKey::from_bytes(&[1u8; 32]);
g.sign(&sk);
// The test expects the signature's key id to be the signer's public key bytes.
let signer_id = sk.verifying_key().to_bytes().to_vec();
let store = MemAumStore::from_aums([g.clone()]);
let log = tka_log_entries(&store, g.hash(), 100);
assert_eq!(log.len(), 1);
assert_eq!(
log[0].signer_key_ids,
vec![signer_id],
"signer_key_ids carries each signature's key_id"
);
let unsigned = genesis_checkpoint(test_aum_key(2, 1));
let store2 = MemAumStore::from_aums([unsigned.clone()]);
assert!(
tka_log_entries(&store2, unsigned.hash(), 100)[0]
.signer_key_ids
.is_empty()
);
}
/// An empty store, or a start hash the store does not contain, yields an
/// empty log rather than an error or a panic.
#[test]
fn tka_log_entries_unwalkable_store_is_empty() {
let empty = MemAumStore::new();
assert!(tka_log_entries(&empty, AumHash([0u8; 32]), 100).is_empty());
let g = genesis_checkpoint(test_aum_key(1, 1));
let store = MemAumStore::from_aums([g]);
assert!(tka_log_entries(&store, AumHash([0xEE; 32]), 100).is_empty());
}
}