use calimero_crypto::Nonce;
use calimero_network_primitives::stream::Stream;
use calimero_node_primitives::sync::{InitPayload, MessagePayload, StreamMessage};
use calimero_primitives::context::Context;
use calimero_primitives::identity::PublicKey;
use ed25519_dalek::Signature;
use eyre::{bail, OptionExt};
use rand::{thread_rng, Rng};
use tracing::{debug, info};
use super::manager::SyncManager;
use super::tracking::Sequencer;
pub const CHALLENGE_DOMAIN: [u8; 38] = *b"CALIMERO_KEY_SHARE_CHALLENGE_HANDSHAKE";
impl SyncManager {
pub(super) async fn initiate_key_share_process(
&self,
context: &mut Context,
our_identity: PublicKey,
stream: &mut Stream,
) -> eyre::Result<()> {
info!(
context_id=%context.id,
our_identity=%our_identity,
"Initiating key share",
);
let our_nonce = thread_rng().gen::<Nonce>();
self.send(
stream,
&StreamMessage::Init {
context_id: context.id,
party_id: our_identity,
payload: InitPayload::KeyShare,
next_nonce: our_nonce,
},
None,
)
.await?;
let Some(ack) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting state sync handshake");
};
let their_identity = match ack {
StreamMessage::Init {
party_id,
payload: InitPayload::KeyShare,
..
} => party_id,
unexpected @ (StreamMessage::Init { .. }
| StreamMessage::Message { .. }
| StreamMessage::OpaqueError) => {
bail!("unexpected message: {:?}", unexpected)
}
};
let is_initiator = <PublicKey as AsRef<[u8; 32]>>::as_ref(&our_identity)
> <PublicKey as AsRef<[u8; 32]>>::as_ref(&their_identity);
debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
is_initiator=%is_initiator,
"Determined role via deterministic comparison (prevents deadlock)"
);
self.bidirectional_key_share(context, our_identity, their_identity, stream, is_initiator)
.await
}
pub(super) async fn handle_key_share_request(
&self,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
_their_nonce: Nonce,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
their_identity=%their_identity,
"Received key share request",
);
let our_nonce = thread_rng().gen::<Nonce>();
self.send(
stream,
&StreamMessage::Init {
context_id: context.id,
party_id: our_identity,
payload: InitPayload::KeyShare,
next_nonce: our_nonce,
},
None,
)
.await?;
let is_initiator = <PublicKey as AsRef<[u8; 32]>>::as_ref(&our_identity)
> <PublicKey as AsRef<[u8; 32]>>::as_ref(&their_identity);
debug!(
context_id=%context.id,
is_initiator=%is_initiator,
"Determined role via deterministic comparison (consistent with peer)"
);
self.bidirectional_key_share(context, our_identity, their_identity, stream, is_initiator)
.await
}
async fn bidirectional_key_share(
&self,
context: &Context,
our_identity: PublicKey,
their_identity: PublicKey,
stream: &mut Stream,
is_initiator: bool,
) -> eyre::Result<()> {
debug!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity,
is_initiator=%is_initiator,
"Starting bidirectional key share with challenge-response authentication",
);
let mut their_identity_record = self
.context_client
.get_identity(&context.id, &their_identity)?
.ok_or_eyre("expected peer identity to exist")?;
let (our_private_key, sender_key) = self
.context_client
.get_identity(&context.id, &our_identity)?
.and_then(|i| Some((i.private_key?, i.sender_key?)))
.ok_or_eyre("expected own identity to have private & sender keys")?;
let our_nonce = thread_rng().gen::<Nonce>();
let mut sqx_out = Sequencer::default();
let mut sqx_in = Sequencer::default();
if is_initiator {
let challenge: [u8; 32] = thread_rng().gen();
debug!(
context_id=%context.id,
their_identity=%their_identity,
"Sending authentication challenge to peer (initiator)"
);
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::Challenge { challenge },
next_nonce: our_nonce,
},
None,
)
.await?;
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting challenge response");
};
let (sequence_id, their_signature_bytes) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::ChallengeResponse { signature },
..
} => (sequence_id, signature),
unexpected => {
bail!("expected ChallengeResponse, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
let mut peer_payload = CHALLENGE_DOMAIN.to_vec();
peer_payload.extend_from_slice(&challenge);
let their_signature = Signature::from_bytes(&their_signature_bytes);
their_identity
.verify(&peer_payload, &their_signature)
.map_err(|e| eyre::eyre!("Peer failed to prove identity ownership: {}", e))?;
info!(
context_id=%context.id,
their_identity=%their_identity,
"Peer successfully authenticated via challenge-response"
);
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting challenge");
};
let (sequence_id, their_challenge) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::Challenge { challenge },
..
} => (sequence_id, challenge),
unexpected => {
bail!("expected Challenge, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
let mut payload = CHALLENGE_DOMAIN.to_vec();
payload.extend_from_slice(&their_challenge);
let our_signature = our_private_key.sign(&payload)?;
debug!(
context_id=%context.id,
our_identity=%our_identity,
"Sending authentication response to peer (initiator)"
);
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::ChallengeResponse {
signature: our_signature.to_bytes(),
},
next_nonce: our_nonce,
},
None,
)
.await?;
} else {
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting challenge");
};
let (sequence_id, their_challenge) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::Challenge { challenge },
..
} => (sequence_id, challenge),
unexpected => {
bail!("expected Challenge, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
let mut payload = CHALLENGE_DOMAIN.to_vec();
payload.extend_from_slice(&their_challenge);
let our_signature = our_private_key.sign(&payload)?;
debug!(
context_id=%context.id,
our_identity=%our_identity,
"Sending authentication response to peer (responder)"
);
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::ChallengeResponse {
signature: our_signature.to_bytes(),
},
next_nonce: our_nonce,
},
None,
)
.await?;
let challenge: [u8; 32] = thread_rng().gen();
debug!(
context_id=%context.id,
their_identity=%their_identity,
"Sending authentication challenge to peer (responder)"
);
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::Challenge { challenge },
next_nonce: our_nonce,
},
None,
)
.await?;
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting challenge response");
};
let (sequence_id, their_signature_bytes) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::ChallengeResponse { signature },
..
} => (sequence_id, signature),
unexpected => {
bail!("expected ChallengeResponse, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
let mut peer_payload = CHALLENGE_DOMAIN.to_vec();
peer_payload.extend_from_slice(&challenge);
let their_signature = Signature::from_bytes(&their_signature_bytes);
their_identity
.verify(&peer_payload, &their_signature)
.map_err(|e| eyre::eyre!("Peer failed to prove identity ownership: {}", e))?;
info!(
context_id=%context.id,
their_identity=%their_identity,
"Peer successfully authenticated via challenge-response"
);
}
if is_initiator {
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::KeyShare { sender_key },
next_nonce: our_nonce,
},
None,
)
.await?;
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting key share");
};
let (sequence_id, peer_sender_key) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::KeyShare { sender_key },
..
} => (sequence_id, sender_key),
unexpected => {
bail!("expected KeyShare, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
their_identity_record.sender_key = Some(peer_sender_key);
} else {
let Some(msg) = self.recv(stream, None).await? else {
bail!("connection closed while awaiting key share");
};
let (sequence_id, peer_sender_key) = match msg {
StreamMessage::Message {
sequence_id,
payload: MessagePayload::KeyShare { sender_key },
..
} => (sequence_id, sender_key),
unexpected => {
bail!("expected KeyShare, got {:?}", unexpected)
}
};
sqx_in.expect(sequence_id)?;
their_identity_record.sender_key = Some(peer_sender_key);
self.send(
stream,
&StreamMessage::Message {
sequence_id: sqx_out.next(),
payload: MessagePayload::KeyShare { sender_key },
next_nonce: our_nonce,
},
None,
)
.await?;
}
self.context_client
.update_identity(&context.id, &their_identity_record)?;
info!(
context_id=%context.id,
our_identity=%our_identity,
their_identity=%their_identity_record.public_key,
"Key share completed with mutual authentication",
);
Ok(())
}
}