use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use crate::boxes::{ByteBox, OpenBox};
use crate::crypto::{KeyPair, AuthToken, PublicKey};
use crate::errors::{SignalingError, SignalingResult, SaltyError, ValidationError};
use rmpv::{Value};
use rust_sodium::crypto::box_;
pub(crate) mod context;
pub(crate) mod cookie;
pub(crate) mod csn;
pub(crate) mod messages;
pub(crate) mod nonce;
pub(crate) mod send_error;
pub(crate) mod state;
pub(crate) mod types;
#[cfg(test)] mod tests;
use crate::{Event, CloseCode};
use crate::tasks::{Tasks, BoxedTask, TaskMessage};
use self::context::{PeerContext, ServerContext, InitiatorContext, ResponderContext};
pub(crate) use self::cookie::{Cookie};
use self::messages::{
Message, ServerHello, ServerAuth, ClientHello, ClientAuth,
NewInitiator, NewResponder, DropResponder, DropReason, Disconnected,
SendError, Token, Key, Auth, InitiatorAuthBuilder, ResponderAuthBuilder, Close,
};
pub(crate) use self::nonce::{Nonce};
pub use self::types::Role;
pub(crate) use self::types::{HandleAction};
use self::types::{Identity, ClientIdentity, Address};
use self::state::{
SignalingState, ServerHandshakeState,
InitiatorHandshakeState, ResponderHandshakeState,
};
pub(crate) trait Signaling {
fn common(&self) -> &Common;
fn common_mut(&mut self) -> &mut Common;
fn server(&self) -> &ServerContext {
&self.common().server
}
fn server_mut(&mut self) -> &mut ServerContext {
&mut self.common_mut().server
}
fn identity(&self) -> ClientIdentity {
self.common().identity
}
fn role(&self) -> Role {
self.common().role
}
fn auth_token(&self) -> Option<&AuthToken> {
if let Some(AuthProvider::Token(ref token)) = self.common().auth_provider {
Some(token)
} else {
None
}
}
fn server_handshake_state(&self) -> ServerHandshakeState {
self.server().handshake_state()
}
fn validate_nonce(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
self.validate_nonce_destination(nonce)?;
self.validate_nonce_source(nonce)?;
self.validate_nonce_csn(nonce)?;
self.validate_nonce_cookie(nonce)?;
Ok(())
}
fn validate_repeated_cookie(&self, repeated_cookie: &Cookie,
our_cookie: &Cookie, identity: Identity)
-> Result<(), SignalingError> {
if repeated_cookie != our_cookie {
debug!("Our cookie: {:?}", our_cookie);
debug!("Their cookie: {:?}", repeated_cookie);
return Err(SignalingError::Protocol(
format!("Repeated cookie in auth message from {} does not match our cookie", identity)
))
}
Ok(())
}
fn get_peer(&self) -> Option<&dyn PeerContext>;
fn get_peer_with_address_mut(&mut self, addr: Address) -> Option<&mut dyn PeerContext>;
fn initiator_pubkey(&self) -> &PublicKey;
fn current_peer_sequence_numbers(&self) -> Option<csn::PeerSequenceNumbers> {
self.get_peer()
.map(|peer: &dyn PeerContext| peer.csn_pair())
.map(|csn_pair_lock| csn_pair_lock.read().expect("CSN pair rwlock is poisoned"))
.and_then(|csn_pair| match csn_pair.theirs {
Some(ref theirs) => Some(csn::PeerSequenceNumbers {
outgoing: csn_pair.ours.combined_sequence_number(),
incoming: theirs.combined_sequence_number(),
}),
None => None,
})
}
fn validate_nonce_destination(&mut self, nonce: &Nonce) -> Result<(), ValidationError>;
fn validate_nonce_source(&mut self, nonce: &Nonce) -> Result<(), ValidationError>;
fn validate_nonce_csn(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
let role = self.role();
let peer: &mut dyn PeerContext = self.get_peer_with_address_mut(nonce.source()).ok_or_else(|| {
if role == Role::Initiator && nonce.source().is_responder() {
ValidationError::DropMsg(format!("Could not find responder with address {}", nonce.source()))
} else {
ValidationError::Crash("Got message from invalid sender that wasn't dropped".into())
}
})?;
let peer_identity = peer.identity();
let mut csn_pair = peer.csn_pair().try_write()?;
if let Some(ref mut csn) = csn_pair.theirs {
let previous = csn;
let current = nonce.csn();
if current < previous {
let msg = format!("The {} CSN is lower than last time", peer_identity);
return Err(ValidationError::Fail(msg));
} else if current == previous {
let msg = format!("The {} CSN hasn't been incremented", peer_identity);
return Err(ValidationError::Fail(msg));
} else {
*previous = current.clone();
}
}
if csn_pair.theirs.is_none() {
if nonce.csn().overflow_number() != 0 {
let msg = format!("First message from {} must have set the overflow number to 0", peer.identity());
return Err(ValidationError::Fail(msg));
}
csn_pair.theirs = Some(nonce.csn().clone());
}
Ok(())
}
fn validate_nonce_cookie(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
let role = self.role();
let peer: &mut dyn PeerContext = self.get_peer_with_address_mut(nonce.source()).ok_or_else(|| {
if role == Role::Initiator && nonce.source().is_responder() {
ValidationError::DropMsg(format!("Could not find responder with address {}", nonce.source()))
} else {
ValidationError::Crash("Got message from invalid sender that wasn't dropped".into())
}
})?;
let peer_identity = peer.identity();
let cookie_pair = peer.cookie_pair_mut();
match cookie_pair.theirs {
None => {
if *nonce.cookie() == cookie_pair.ours {
Err(ValidationError::Fail(
format!("Cookie from {} is identical to our own cookie", peer_identity)
))
} else {
cookie_pair.theirs = Some(nonce.cookie().clone());
Ok(())
}
},
Some(ref cookie) => {
if nonce.cookie() != cookie {
Err(ValidationError::Fail(
format!("Cookie from {} has changed", peer_identity)
))
} else {
Ok(())
}
},
}
}
fn handle_message(&mut self, bbox: ByteBox) -> SignalingResult<Vec<HandleAction>> {
trace!("handle_message");
match self.validate_nonce(&bbox.nonce) {
Ok(_) => {},
Err(ValidationError::DropMsg(warning)) => {
warn!("Invalid nonce: {}", warning);
return Ok(vec![]);
},
Err(ValidationError::Fail(reason)) =>
return Err(SignalingError::InvalidNonce(reason)),
Err(ValidationError::Crash(reason)) =>
return Err(SignalingError::Crash(reason)),
};
if bbox.nonce.source().is_server() {
let nonce_unsafe_clone = unsafe { bbox.nonce.clone() };
let obox: OpenBox<Message> = self.decode_server_message(bbox)?;
let nonce_clone_opt = if obox.message.get_type() == "server-auth" {
Some(nonce_unsafe_clone)
} else {
None
};
self.handle_server_message(obox, nonce_clone_opt)
} else {
match self.common().signaling_state() {
SignalingState::ServerHandshake => self.handle_handshake_peer_message(bbox),
SignalingState::PeerHandshake => self.handle_handshake_peer_message(bbox),
SignalingState::Task => self.handle_task_peer_message(bbox),
}
}
}
fn handle_handshake_peer_message(&mut self, bbox: ByteBox) -> SignalingResult<Vec<HandleAction>> {
trace!("handle_handshake_peer_message");
if bbox.nonce.source().is_server() {
return Err(SignalingError::Crash(
"Message in handle_handshake_peer_message is from server!".into()
));
}
let obox: OpenBox<Message> = {
let source_address = bbox.nonce.source();
match self.decode_peer_message(bbox) {
Ok(obox) => obox,
Err(SignalingError::InitiatorCouldNotDecrypt) => {
let drop_responder = self.send_drop_responder(
source_address,
DropReason::InitiatorCouldNotDecrypt,
)?;
debug!("<-- Enqueuing drop-responder to {}", self.server().identity());
return Ok(vec![drop_responder]);
},
Err(e) => return Err(e),
}
};
match self.common().signaling_state() {
SignalingState::ServerHandshake =>
Err(SignalingError::Crash("Illegal signaling state: ServerHandshake".into())),
SignalingState::PeerHandshake if obox.nonce.source().is_server() =>
self.handle_server_message(obox, None),
SignalingState::PeerHandshake =>
self.handle_peer_message(obox),
SignalingState::Task =>
Err(SignalingError::Crash("Illegal signaling state: Task".into())),
}
}
fn handle_task_peer_message(&mut self, bbox: ByteBox) -> SignalingResult<Vec<HandleAction>> {
trace!("handle_task_peer_message");
if bbox.nonce.source().is_server() {
return Err(SignalingError::Crash(
"Message in handle_task_peer_message is from server!".into()
));
}
let obox: OpenBox<Value> = self.decode_task_message(bbox)?;
let mut map: HashMap<String, Value> = HashMap::new();
match obox.message {
Value::Map(pairs) => {
for (k, v) in pairs {
let key = k.as_str().ok_or_else(|| SignalingError::InvalidMessage(
"Task message map contains non-hashable key".into()
))?;
map.insert(key.into(), v);
}
},
_ => return Err(SignalingError::InvalidMessage("Task message is not a map".into())),
};
let msg_type = map.get("type")
.ok_or_else(|| SignalingError::InvalidMessage("Task message does not contain type field".into()))?
.as_str()
.ok_or_else(|| SignalingError::InvalidMessage("Task message type is not a string".into()))?
.to_owned();
debug!("Received {} message from peer", msg_type);
if msg_type == "application" {
let data: Value = map.get("data")
.ok_or_else(|| SignalingError::InvalidMessage("Application message does not contain a data field".into()))?
.to_owned();
return Ok(vec![HandleAction::TaskMessage(TaskMessage::Application(data))]);
}
if msg_type == "close" {
let reason: CloseCode = map.get("reason")
.ok_or_else(|| SignalingError::InvalidMessage("Close message does not contain a reason field".into()))?
.as_u64()
.ok_or_else(|| SignalingError::InvalidMessage("Close message reason is not an integer".into()))
.and_then(|val: u64| {
if val > u64::from(::std::u16::MAX) {
Err(SignalingError::InvalidMessage("Close message reason code is too large".into()))
} else {
Ok(val as u16)
}
})
.map(CloseCode::from_number)?;
return Ok(vec![HandleAction::TaskMessage(TaskMessage::Close(reason))]);
}
let task_supported_types = self.common()
.task_supported_types
.ok_or_else(|| SignalingError::Crash("Task supported types not set".into()))?;
if task_supported_types.iter().any(|t| *t == msg_type) {
return Ok(vec![HandleAction::TaskMessage(TaskMessage::Value(map))])
}
warn!("Received task message with unsupported type: {}. Ignoring.", msg_type);
Ok(vec![])
}
fn decode_server_message(&self, bbox: ByteBox) -> SignalingResult<OpenBox<Message>> {
if self.common().signaling_state() == SignalingState::ServerHandshake
&& self.server_handshake_state() == ServerHandshakeState::New {
return OpenBox::decode(bbox);
}
match self.server().session_key {
Some(ref pubkey) => OpenBox::<Message>::decrypt(bbox, &self.common().permanent_keypair, pubkey),
None => Err(SignalingError::Crash("Missing server session key".into())),
}
}
fn decode_peer_message(&self, bbox: ByteBox) -> SignalingResult<OpenBox<Message>>;
fn decode_task_message(&self, bbox: ByteBox) -> SignalingResult<OpenBox<Value>> {
let peer = self.get_peer()
.ok_or_else(|| SignalingError::Crash("Peer not set".into()))?;
let session_key = peer.session_key()
.ok_or_else(|| SignalingError::Crash("Peer session key not set".into()))?;
OpenBox::<Value>::decrypt(
bbox,
peer.keypair().ok_or_else(|| SignalingError::Crash("Peer session keypair not available".into()))?,
session_key,
)
}
fn encode_task_message(&self, value: Value) -> SignalingResult<ByteBox> {
let signaling_state = self.common().signaling_state();
if signaling_state != SignalingState::Task {
return Err(SignalingError::Crash(
format!("Called encode_task_message in state {:?}", signaling_state)
));
}
let peer = self.get_peer()
.ok_or_else(|| SignalingError::Crash("Peer not set".into()))?;
let nonce = Nonce::new(
peer.cookie_pair().ours.clone(),
self.common().identity.into(),
peer.identity().into(),
peer.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Value>::new(value, nonce);
let bbox = obox.encrypt(
peer.keypair().ok_or_else(|| SignalingError::Crash("Session keypair not available".into()))?,
peer.session_key().ok_or_else(|| SignalingError::Crash("Peer session key not set".into()))?,
);
Ok(bbox)
}
fn encode_close_message(
&self,
reason: CloseCode,
peer_ctx: Option<&dyn PeerContext>,
) -> SignalingResult<ByteBox> {
let peer = match peer_ctx {
Some(p) => p,
None => {
let signaling_state = self.common().signaling_state();
if signaling_state != SignalingState::Task {
return Err(SignalingError::Crash(
format!("Called encode_close_message in state {:?}", signaling_state)
));
}
self.get_peer()
.ok_or_else(|| SignalingError::Crash("Peer not set".into()))?
},
};
let nonce = Nonce::new(
peer.cookie_pair().ours.clone(),
self.common().identity.into(),
peer.identity().into(),
peer.csn_pair().try_write()?.ours.increment()?,
);
let msg = Close::from_close_code(reason).into_message();
let obox = OpenBox::<Message>::new(msg, nonce);
let bbox = obox.encrypt(
peer.keypair().ok_or_else(|| SignalingError::Crash("Session keypair not available".into()))?,
peer.session_key().ok_or_else(|| SignalingError::Crash("Peer session key not set".into()))?,
);
Ok(bbox)
}
fn handle_server_message(&mut self, obox: OpenBox<Message>, nonce_clone: Option<Nonce>) -> SignalingResult<Vec<HandleAction>> {
let old_state = self.server_handshake_state();
match (old_state, obox.message) {
(ServerHandshakeState::New, Message::ServerHello(msg)) =>
self.handle_server_hello(msg),
(ServerHandshakeState::ClientInfoSent, Message::ServerAuth(msg)) =>
self.handle_server_auth(msg, nonce_clone),
(ServerHandshakeState::Done, Message::NewInitiator(msg)) =>
self.handle_new_initiator(msg),
(ServerHandshakeState::Done, Message::NewResponder(msg)) =>
self.handle_new_responder(msg),
(ServerHandshakeState::Done, Message::DropResponder(_msg)) =>
unimplemented!("TODO (#36): Handling DropResponder messages not yet implemented"),
(ServerHandshakeState::Done, Message::SendError(msg)) =>
self.handle_send_error(msg),
(ServerHandshakeState::Done, Message::Disconnected(msg)) =>
self.handle_disconnected(msg),
(s, message) => Err(SignalingError::InvalidStateTransition(
format!("Got '{}' message from server in {:?} state", message.get_type(), s)
)),
}
}
fn handle_peer_message(&mut self, obox: OpenBox<Message>) -> SignalingResult<Vec<HandleAction>>;
fn handle_server_hello(&mut self, msg: ServerHello) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received server-hello from server");
let mut actions = Vec::with_capacity(2);
trace!("Server session key is {:?}", msg.key);
if self.server().session_key.is_some() {
return Err(SignalingError::Protocol(
"Got a server-hello message, but server session key is already set".to_string()
));
}
self.common_mut().server.session_key = Some(msg.key);
if self.role() == Role::Responder {
let client_hello = {
let key = self.common().permanent_keypair.public_key();
ClientHello::new(*key).into_message()
};
let client_hello_nonce = Nonce::new(
self.server().cookie_pair().ours.clone(),
self.common().identity.into(),
self.server().identity().into(),
self.server().csn_pair().try_write()?.ours.increment()?,
);
let reply = OpenBox::<Message>::new(client_hello, client_hello_nonce);
debug!("<-- Enqueuing client-hello to server");
actions.push(HandleAction::Reply(reply.encode()));
}
let ping_interval = self.common()
.ping_interval
.map(|duration| duration.as_secs())
.map(|secs| if secs > u64::from(::std::u32::MAX) {
warn!("Ping interval is too large. Truncating it to {} seconds.", ::std::u32::MAX);
::std::u32::MAX
} else {
secs as u32
})
.unwrap_or(0u32);
match ping_interval {
0 => debug!("Requesting WebSocket ping messages to be disabled"),
n => debug!("Requesting WebSocket ping messages every {}s", n),
};
let client_auth = ClientAuth {
your_cookie: self.server().cookie_pair().theirs.clone().unwrap(),
subprotocols: vec![crate::SUBPROTOCOL.into()],
ping_interval,
your_key: self.server().permanent_key().cloned(),
}.into_message();
let client_auth_nonce = Nonce::new(
self.server().cookie_pair().ours.clone(),
self.identity().into(),
self.server().identity().into(),
self.server().csn_pair().try_write()?.ours.increment()?,
);
let reply = OpenBox::<Message>::new(client_auth, client_auth_nonce);
match self.server().session_key {
Some(ref pubkey) => {
debug!("<-- Enqueuing client-auth to server");
actions.push(HandleAction::Reply(reply.encrypt(&self.common().permanent_keypair, pubkey)));
},
None => return Err(SignalingError::Crash("Missing server permanent key".into())),
};
self.server_mut().set_handshake_state(ServerHandshakeState::ClientInfoSent);
Ok(actions)
}
fn handle_server_auth(&mut self, msg: ServerAuth, nonce_clone: Option<Nonce>) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received server-auth from server");
if self.identity() == ClientIdentity::Unknown {
return Err(SignalingError::Crash(
"No identity assigned when receiving server-auth message".into()
));
}
self.validate_repeated_cookie(
&msg.your_cookie,
&self.server().cookie_pair().ours,
self.server().identity(),
)?;
if let Some(server_public_permanent_key) = self.server().permanent_key() {
let nonce = nonce_clone.ok_or_else(|| SignalingError::Crash(
"This is a server-auth message, but no nonce clone was passed in".into()
))?;
let signed_keys = msg.signed_keys.as_ref().ok_or_else(|| SignalingError::Protocol(
"Server's public permanent key is known, but server did not send signed keys".into()
))?;
let decrypted = signed_keys.decrypt(
&self.common().permanent_keypair,
server_public_permanent_key,
nonce,
)?;
let server_public_session_key = self.server().session_key()
.ok_or_else(|| SignalingError::Crash("Server session key not set".into()))?;
if &decrypted.server_public_session_key != server_public_session_key {
return Err(SignalingError::Protocol("Server public session key sent in `signed_keys` is not valid".into()));
}
if &decrypted.client_public_permanent_key != self.common().permanent_keypair.public_key() {
return Err(SignalingError::Protocol("Our public permanent key sent in `signed_keys` is not valid".into()));
}
} else if msg.signed_keys.is_some() {
warn!("Server sent signed keys, but we're not verifying them");
}
let actions = self.handle_server_auth_impl(&msg)?;
info!("Server handshake completed");
self.server_mut().set_handshake_state(ServerHandshakeState::Done);
self.common_mut().set_signaling_state(SignalingState::PeerHandshake)?;
Ok(actions)
}
fn handle_server_auth_impl(&mut self, msg: &ServerAuth) -> SignalingResult<Vec<HandleAction>>;
fn handle_new_initiator(&mut self, msg: NewInitiator) -> SignalingResult<Vec<HandleAction>>;
fn handle_new_responder(&mut self, msg: NewResponder) -> SignalingResult<Vec<HandleAction>>;
fn handle_send_error(&mut self, msg: SendError) -> SignalingResult<Vec<HandleAction>> {
warn!("--> Received send-error from server");
debug!("Message that could not be relayed: {:#?}", msg.id);
Err(SignalingError::SendError)
}
fn handle_disconnected(&mut self, msg: Disconnected) -> SignalingResult<Vec<HandleAction>>;
fn send_drop_responder(&self, addr: Address, reason: DropReason) -> SignalingResult<HandleAction> {
if self.role() != Role::Initiator {
return Err(SignalingError::Crash(
"Non-initiator should never need to encode a DropResponder message".into()
));
}
let drop = DropResponder::with_reason(addr, reason).into_message();
let drop_nonce = Nonce::new(
self.server().cookie_pair.ours.clone(),
self.common().identity.into(),
self.server().identity().into(),
self.server().csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(drop, drop_nonce);
let bbox = obox.encrypt(
&self.common().permanent_keypair,
self.server().session_key()
.ok_or_else(|| SignalingError::Crash("Server session key not set".into()))?
);
Ok(HandleAction::Reply(bbox))
}
fn encrypt_raw_with_session_keys(&self, data: &[u8], nonce: &box_::Nonce) -> SignalingResult<Vec<u8>> {
let peer = self.get_peer()
.ok_or_else(|| SignalingError::NoPeer)?;
let peer_session_public_key = peer.session_key()
.ok_or_else(|| SignalingError::Crash("Peer session public key not set".into()))?;
let our_session_private_key = peer.keypair()
.map(|keypair: &KeyPair| keypair.private_key())
.ok_or_else(|| SignalingError::Crash("Our session private key not set".into()))?;
Ok(box_::seal(data, nonce, peer_session_public_key, our_session_private_key))
}
fn decrypt_raw_with_session_keys(&self, data: &[u8], nonce: &box_::Nonce) -> SignalingResult<Vec<u8>> {
let peer = self.get_peer()
.ok_or_else(|| SignalingError::NoPeer)?;
let peer_session_public_key = peer.session_key()
.ok_or_else(|| SignalingError::Crash("Peer session public key not set".into()))?;
let our_session_private_key = peer.keypair()
.map(|keypair: &KeyPair| keypair.private_key())
.ok_or_else(|| SignalingError::Crash("Our session private key not set".into()))?;
box_::open(data, nonce, peer_session_public_key, our_session_private_key)
.map_err(|_| SignalingError::Crypto("Could not decrypt bytes".into()))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum AuthProvider {
Token(AuthToken),
TrustedKey(PublicKey)
}
pub(crate) struct Common {
signaling_state: SignalingState,
pub(crate) permanent_keypair: KeyPair,
pub(crate) auth_provider: Option<AuthProvider>,
pub(crate) role: Role,
pub(crate) identity: ClientIdentity,
pub(crate) server: ServerContext,
pub(crate) tasks: Option<Tasks>,
pub(crate) task: Option<Arc<Mutex<BoxedTask>>>,
pub(crate) task_supported_types: Option<&'static [&'static str]>,
pub(crate) ping_interval: Option<Duration>,
}
impl Common {
fn signaling_state(&self) -> SignalingState {
self.signaling_state
}
fn set_signaling_state(&mut self, state: SignalingState) -> SignalingResult<()> {
if self.signaling_state == state {
trace!("Ignoring signaling state transition: {:?} -> {:?}", self.signaling_state(), state);
return Ok(())
}
if !self.signaling_state.may_transition_to(state) {
return Err(SignalingError::InvalidStateTransition(
format!("Signaling state: {:?} -> {:?}", self.signaling_state(), state)
));
}
trace!("Signaling state transition: {:?} -> {:?}", self.signaling_state(), state);
self.signaling_state = state;
Ok(())
}
#[cfg(test)]
fn set_signaling_state_forced(&mut self, state: SignalingState) -> SignalingResult<()> {
trace!("Setting signaling state to {:?} for tests", state);
self.signaling_state = state;
Ok(())
}
}
pub(crate) struct ResponderCounter(u32);
impl ResponderCounter {
fn new() -> Self {
ResponderCounter(0)
}
fn increment(&mut self) -> SignalingResult<u32> {
let old_val = self.0;
self.0 = self.0.checked_add(1)
.ok_or_else(|| SignalingError::Crash("Overflow when incrementing responder counter".into()))?;
Ok(old_val)
}
}
pub(crate) struct InitiatorSignaling {
pub(crate) common: Common,
pub(crate) responders: HashMap<Address, ResponderContext>,
pub(crate) responder: Option<ResponderContext>,
pub(crate) responder_counter: ResponderCounter,
}
impl Signaling for InitiatorSignaling {
fn common(&self) -> &Common {
&self.common
}
fn common_mut(&mut self) -> &mut Common {
&mut self.common
}
fn get_peer(&self) -> Option<&dyn PeerContext> {
self.responder.as_ref().map(|p| p as &dyn PeerContext)
}
fn get_peer_with_address_mut(&mut self, addr: Address) -> Option<&mut dyn PeerContext> {
let identity: Identity = addr.into();
match identity {
Identity::Server => Some(&mut self.common.server as &mut dyn PeerContext),
Identity::Initiator => None,
Identity::Responder(_) => {
if self.common().signaling_state() == SignalingState::Task {
let peer = self.responder.as_mut().map(|p| p as &mut dyn PeerContext);
let valid = match peer {
Some(ref p) => {
let peer_addr: Address = p.identity().into();
peer_addr == addr
},
None => false,
};
if valid {
peer
} else {
None
}
} else {
self.responders.get_mut(&addr).map(|r| r as &mut dyn PeerContext)
}
}
}
}
fn initiator_pubkey(&self) -> &PublicKey {
self.common().permanent_keypair.public_key()
}
fn validate_nonce_destination(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
if self.identity() == ClientIdentity::Unknown
&& !nonce.destination().is_unknown()
&& self.server_handshake_state() != ServerHandshakeState::New {
if nonce.destination().is_initiator() {
self.common.identity = ClientIdentity::Initiator;
debug!("Assigned identity: {}", self.identity());
} else {
return Err(ValidationError::Fail(
format!("cannot assign address {} to initiator", nonce.destination())
));
};
}
if nonce.destination() != self.identity().into() {
return Err(ValidationError::Fail(
format!("Bad destination: {} (our identity is {})", nonce.destination(), self.identity())
));
}
Ok(())
}
fn validate_nonce_source(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
match nonce.source() {
Address(0x00) => Ok(()),
Address(0x01) => Err(ValidationError::DropMsg(
format!("Bad source: {} (our identity is {})", nonce.source(), self.identity())
)),
Address(0x02..=0xff) => {
if self.identity() == ClientIdentity::Initiator {
Ok(())
} else {
Err(ValidationError::DropMsg(
format!("Bad source: {} (our identity is {})", nonce.source(), self.identity())
))
}
},
}
}
fn decode_peer_message(&self, bbox: ByteBox) -> SignalingResult<OpenBox<Message>> {
if !bbox.nonce.source().is_responder() {
return Err(SignalingError::Crash("Received message from an initiator".to_string()));
}
let source = bbox.nonce.source();
let responder = match self.responders.get(&source) {
Some(responder) => responder,
None => return Err(SignalingError::Crash(
format!("Did not find responder with address {}", source)
)),
};
fn responder_permanent_key(responder: &ResponderContext) -> SignalingResult<&PublicKey> {
responder.permanent_key.as_ref()
.ok_or_else(|| SignalingError::Crash(
format!("Did not find public permanent key for responder {}", responder.address.0)
))
}
fn responder_session_key(responder: &ResponderContext) -> SignalingResult<&PublicKey> {
responder.session_key.as_ref()
.ok_or_else(|| SignalingError::Crash(
format!("Did not find public session key for responder {}", responder.address.0)
))
}
match responder.handshake_state() {
ResponderHandshakeState::New => {
debug!("Expect token message");
match self.common.auth_provider {
Some(AuthProvider::Token(ref token)) => OpenBox::decrypt_token(bbox, token),
Some(AuthProvider::TrustedKey(_)) => Err(SignalingError::Crash(
"Handshake state is \"New\" even though a trusted key is available".into()
)),
None => Err(SignalingError::Crash(
"Handshake state is \"New\" without an auth provider available".into()
)),
}
},
ResponderHandshakeState::TokenReceived => {
debug!("Expect key message");
OpenBox::<Message>::decrypt(
bbox,
&self.common.permanent_keypair,
responder_permanent_key(&responder)?
).map_err(|e| match e {
SignalingError::Decode(_) => {
warn!("Could not decrypt key message");
SignalingError::InitiatorCouldNotDecrypt
},
e => e,
})
},
ResponderHandshakeState::KeySent => {
OpenBox::<Message>::decrypt(bbox, &responder.keypair, responder_session_key(&responder)?)
},
other => {
Err(SignalingError::Crash(format!("Invalid responder handshake state: {:?}", other)))
},
}
}
fn handle_peer_message(&mut self, obox: OpenBox<Message>) -> SignalingResult<Vec<HandleAction>> {
let source = obox.nonce.source();
let old_state = {
let responder = self.responders.get(&source)
.ok_or_else(|| SignalingError::Crash(
format!("Did not find responder with address {}", source)
))?;
responder.handshake_state()
};
match (old_state, obox.message) {
(ResponderHandshakeState::New, Message::Token(msg)) => self.handle_token(msg, source),
(ResponderHandshakeState::TokenReceived, Message::Key(msg)) => self.handle_key(msg, source),
(ResponderHandshakeState::KeySent, Message::Auth(msg)) => self.handle_auth(msg, source),
(s, message) => Err(SignalingError::InvalidStateTransition(
format!("Got {} message from responder {} in {:?} state", message.get_type(), obox.nonce.source().0, s)
)),
}
}
fn handle_server_auth_impl(&mut self, msg: &ServerAuth) -> SignalingResult<Vec<HandleAction>> {
if msg.initiator_connected.is_some() {
return Err(SignalingError::InvalidMessage(
"We're the initiator, but the `initiator_connected` field in the server-auth message is set".into()
));
}
let responders = match msg.responders {
Some(ref responders) => responders,
None => return Err(SignalingError::InvalidMessage(
"`responders` field in server-auth message not set".into()
)),
};
let responders_set: HashSet<Address> = responders.iter().cloned().collect();
if responders_set.contains(&Address(0x00)) || responders_set.contains(&Address(0x01)) {
return Err(SignalingError::InvalidMessage(
"`responders` field in server-auth message may not contain addresses <0x02".into()
));
}
if responders.len() != responders_set.len() {
return Err(SignalingError::InvalidMessage(
"`responders` field in server-auth message may not contain duplicates".into()
));
}
let mut actions = vec![];
for address in responders_set {
if let Some(drop_responder) = self.process_new_responder(address)? {
actions.push(drop_responder);
}
}
actions.push(HandleAction::Event(Event::ServerHandshakeDone(responders.is_empty())));
Ok(actions)
}
fn handle_new_initiator(&mut self, _msg: NewInitiator) -> SignalingResult<Vec<HandleAction>> {
Err(SignalingError::Protocol("Received 'new-responder' message as initiator".into()))
}
fn handle_new_responder(&mut self, msg: NewResponder) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received new-responder ({}) from server", msg.id);
if !msg.id.is_responder() {
return Err(SignalingError::InvalidMessage(
"`id` field in new-responder message is not a valid responder address".into()
));
}
match self.process_new_responder(msg.id)? {
Some(drop_responder) => Ok(vec![drop_responder]),
None => Ok(vec![]),
}
}
fn handle_disconnected(&mut self, msg: Disconnected) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received disconnected from server");
if !msg.id.is_responder() {
return Err(SignalingError::Protocol(
"Received 'disconnected' message with non-responder id".into()
));
}
Ok(vec![HandleAction::Event(Event::Disconnected(msg.id.0))])
}
}
impl InitiatorSignaling {
pub(crate) fn new(permanent_keypair: KeyPair,
tasks: Tasks,
responder_trusted_pubkey: Option<PublicKey>,
server_public_permanent_key: Option<PublicKey>,
ping_interval: Option<Duration>) -> Self {
InitiatorSignaling {
common: Common {
signaling_state: SignalingState::ServerHandshake,
role: Role::Initiator,
identity: ClientIdentity::Unknown,
permanent_keypair,
auth_provider: Some(match responder_trusted_pubkey {
Some(key) => AuthProvider::TrustedKey(key),
None => AuthProvider::Token(AuthToken::new()),
}),
server: {
let mut ctx = ServerContext::new();
ctx.permanent_key = server_public_permanent_key;
ctx
},
tasks: Some(tasks),
task: None,
task_supported_types: None,
ping_interval,
},
responders: HashMap::new(),
responder: None,
responder_counter: ResponderCounter::new(),
}
}
#[cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
fn handle_token(&mut self, msg: Token, source: Address) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received token from {}", Identity::from(source));
{
let responder = self.responders.get_mut(&source)
.ok_or_else(|| SignalingError::Crash(
format!("Did not find responder with address {}", source)
))?;
if responder.permanent_key.is_some() {
return Err(SignalingError::Crash("Responder already has a permanent key set!".into()));
}
responder.permanent_key = Some(msg.key);
responder.set_handshake_state(ResponderHandshakeState::TokenReceived);
}
match self.common().auth_provider {
Some(AuthProvider::Token(_)) => {},
_ => return Err(SignalingError::Crash("Auth provider is not a token".into())),
}
self.common_mut().auth_provider = None;
Ok(vec![])
}
#[cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
fn handle_key(&mut self, msg: Key, source: Address) -> SignalingResult<Vec<HandleAction>> {
let source_identity = Identity::from(source);
debug!("--> Received key from {}", source_identity);
let responder = self.responders.get_mut(&source)
.ok_or_else(|| SignalingError::Crash(
format!("Did not find responder with address {}", source)
))?;
if responder.session_key.is_some() {
return Err(SignalingError::Crash("Responder already has a session key set!".into()));
}
match responder.permanent_key {
Some(pk) if pk == msg.key => {
return Err(SignalingError::Protocol("Responder session key and permanent key are equal".into()));
},
Some(_) => {},
None => {
return Err(SignalingError::Crash("Responder permanent key not set".into()));
}
};
responder.session_key = Some(msg.key);
responder.set_handshake_state(ResponderHandshakeState::KeyReceived);
let key: Message = Key { key: *responder.keypair.public_key() }.into_message();
let key_nonce = Nonce::new(
responder.cookie_pair().ours.clone(),
self.common.identity.into(),
responder.identity().into(),
responder.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(key, key_nonce);
let bbox = obox.encrypt(
&self.common.permanent_keypair,
responder.permanent_key.as_ref()
.ok_or_else(|| SignalingError::Crash("Responder permanent key not set".into()))?,
);
responder.set_handshake_state(ResponderHandshakeState::KeySent);
debug!("<-- Enqueuing key to {}", source_identity);
Ok(vec![HandleAction::Reply(bbox)])
}
fn handle_auth(&mut self, msg: Auth, source: Address) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received auth from {}", Identity::from(source));
let mut actions = vec![];
let mut responder = self.responders.remove(&source)
.ok_or_else(|| SignalingError::Crash(
format!("Did not find responder with address {}", source)
))?;
self.validate_repeated_cookie(
&msg.your_cookie,
&responder.cookie_pair().ours,
responder.identity(),
)?;
if msg.task.is_some() {
return Err(SignalingError::InvalidMessage("We're an initiator, but the `task` field in the auth message is set".into()));
}
let proposed_tasks = match msg.tasks {
None => return Err(SignalingError::InvalidMessage("The `tasks` field in the auth message is not set".into())),
Some(ref tasks) if tasks.is_empty() => return Err(SignalingError::InvalidMessage("The `tasks` field in the auth message is empty".into())),
Some(tasks) => tasks,
};
if msg.data.len() != proposed_tasks.len() {
return Err(SignalingError::InvalidMessage("The `tasks` and `data` fields in the auth message have a different number of entries".into()));
};
for task in &proposed_tasks {
if !msg.data.contains_key(task) {
return Err(SignalingError::InvalidMessage(format!("The task \"{}\" in the auth message does not have a corresponding data entry", task)));
}
}
let our_tasks = mem::replace(&mut self.common_mut().tasks, None)
.ok_or_else(|| SignalingError::Crash("No tasks defined".into()))?;
trace!("Our tasks: {:?}", &our_tasks);
trace!("Proposed tasks: {:?}", &proposed_tasks);
let mut chosen_task: BoxedTask = match our_tasks.choose_shared_task(&proposed_tasks) {
Some(task) => task,
None => {
let mut actions = vec![];
match self.encode_close_message(CloseCode::NoSharedTask, Some(&responder)) {
Ok(bbox) => actions.push(HandleAction::Reply(bbox)),
Err(e) => error!("Could not encode close message: {}", e),
};
actions.push(HandleAction::HandshakeError(SaltyError::NoSharedTask));
return Ok(actions);
},
};
let task_data = msg.data.get(&*chosen_task.name())
.ok_or_else(|| SignalingError::Crash("Task data not found".into()))?;
chosen_task.init(task_data)
.map_err(|e| SignalingError::TaskInitialization(format!("{}", e)))?;
info!("Responder {:#04x} authenticated", source.0);
if !self.responders.is_empty() {
info!("Dropping {} other responders", self.responders.len());
for addr in self.responders.keys() {
let drop_responder = self.send_drop_responder(*addr, DropReason::DroppedByInitiator)?;
debug!("<-- Enqueuing drop-responder to {}", self.server().identity());
actions.push(drop_responder);
}
self.responders.clear();
self.responders.shrink_to_fit();
}
responder.set_handshake_state(ResponderHandshakeState::AuthReceived);
let responder_cookie = responder.cookie_pair.theirs.as_ref().cloned()
.ok_or_else(|| SignalingError::Crash("Responder cookie not set".into()))?;
let auth: Message = InitiatorAuthBuilder::new(responder_cookie)
.set_task(chosen_task.name(), chosen_task.data())
.build()?
.into_message();
let auth_nonce = Nonce::new(
responder.cookie_pair().ours.clone(),
self.common.identity.into(),
responder.address,
responder.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(auth, auth_nonce);
let bbox = obox.encrypt(
&responder.keypair,
responder.session_key.as_ref()
.ok_or_else(|| SignalingError::Crash("Responder session key not set".into()))?,
);
debug!("<-- Enqueuing auth to {}", &responder.identity());
actions.push(HandleAction::Reply(bbox));
self.common_mut().task_supported_types = Some(chosen_task.supported_types());
self.common_mut().task = Some(Arc::new(Mutex::new(chosen_task)));
responder.set_handshake_state(ResponderHandshakeState::AuthSent);
self.common.set_signaling_state(SignalingState::Task)?;
info!("Peer handshake completed");
actions.push(HandleAction::HandshakeDone);
self.responder = Some(responder);
Ok(actions)
}
fn process_new_responder(&mut self, address: Address) -> SignalingResult<Option<HandleAction>> {
if self.responders.contains_key(&address) {
warn!("Overwriting responder context for address {:?}", address);
self.responders.remove(&address);
} else {
info!("Registering new responder with address {:?}", address);
}
let mut responder = ResponderContext::new(address, self.responder_counter.increment()?);
if let Some(AuthProvider::TrustedKey(key)) = self.common.auth_provider {
if responder.permanent_key.is_some() { return Err(SignalingError::Crash("Responder already has a permanent key set!".into()));
}
responder.permanent_key = Some(key);
responder.set_handshake_state(ResponderHandshakeState::TokenReceived);
}
self.responders.insert(address, responder);
let mut action = None;
if self.responders.len() > (254 - 2) {
if let Some(drop_action) = self.drop_oldest_inactive_responder()? {
debug!("<-- Enqueuing drop-responder to {}", self.server().identity());
action = Some(drop_action);
}
}
Ok(action)
}
fn drop_oldest_inactive_responder(&mut self) -> SignalingResult<Option<HandleAction>> {
debug!("Path almost full, dropping the oldest inactive responder.");
let address = self.responders
.values()
.filter(|r| r.handshake_state() == ResponderHandshakeState::New)
.min_by_key(|r| r.counter)
.map(|r| r.address);
let responder: ResponderContext = match address {
Some(ref addr) => {
self.responders
.remove(addr)
.ok_or_else(|| SignalingError::Crash("Inactive responder not found anymore in responders list".into()))?
},
None => {
warn!("Did not find a valid responder candidate to drop!");
return Ok(None);
}
};
self
.send_drop_responder(responder.address, DropReason::DroppedByInitiator)
.map(Option::Some)
}
}
pub(crate) struct ResponderSignaling {
pub(crate) common: Common,
pub(crate) initiator: InitiatorContext,
}
impl Signaling for ResponderSignaling {
fn common(&self) -> &Common {
&self.common
}
fn common_mut(&mut self) -> &mut Common {
&mut self.common
}
fn get_peer(&self) -> Option<&dyn PeerContext> {
Some(&self.initiator as &dyn PeerContext)
}
fn get_peer_with_address_mut(&mut self, addr: Address) -> Option<&mut dyn PeerContext> {
let identity: Identity = addr.into();
match identity {
Identity::Server => Some(&mut self.common.server),
Identity::Initiator => Some(&mut self.initiator),
Identity::Responder(_) => None,
}
}
fn initiator_pubkey(&self) -> &PublicKey {
&self.initiator.permanent_key
}
fn validate_nonce_destination(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
if self.identity() == ClientIdentity::Unknown
&& !nonce.destination().is_unknown()
&& self.server_handshake_state() != ServerHandshakeState::New {
if nonce.destination().is_responder() {
self.common.identity = ClientIdentity::Responder(nonce.destination().0);
debug!("Assigned identity: {}", self.identity());
} else {
return Err(ValidationError::Fail(
format!("cannot assign address {} to a responder", nonce.destination())
));
};
}
if nonce.destination() != self.identity().into() {
return Err(ValidationError::Fail(
format!("Bad destination: {} (our identity is {})", nonce.destination(), self.identity())
));
}
Ok(())
}
fn validate_nonce_source(&mut self, nonce: &Nonce) -> Result<(), ValidationError> {
match nonce.source() {
Address(0x00) => Ok(()),
Address(0x01) => {
if let ClientIdentity::Responder(_) = self.identity() {
Ok(())
} else {
Err(ValidationError::DropMsg(
format!("Bad source: {} (our identity is {})", nonce.source(), self.identity())
))
}
},
Address(0x02..=0xff) => Err(ValidationError::DropMsg(
format!("Bad source: {} (our identity is {})", nonce.source(), self.identity())
)),
}
}
fn decode_peer_message(&self, bbox: ByteBox) -> SignalingResult<OpenBox<Message>> {
if !bbox.nonce.source().is_initiator() {
return Err(SignalingError::Crash("Received message from a responder".to_string()));
}
match self.initiator.handshake_state() {
InitiatorHandshakeState::KeySent => {
OpenBox::<Message>::decrypt(bbox, &self.common.permanent_keypair, &self.initiator.permanent_key)
},
InitiatorHandshakeState::AuthSent => {
let initiator_session_key = self.initiator.session_key.as_ref()
.ok_or_else(|| SignalingError::Crash("Initiator session key not set".into()))?;
OpenBox::<Message>::decrypt(bbox, &self.initiator.keypair, initiator_session_key)
},
other => {
Err(SignalingError::Crash(format!("Invalid initiator handshake state: {:?}", other)))
},
}
}
fn handle_peer_message(&mut self, obox: OpenBox<Message>) -> SignalingResult<Vec<HandleAction>> {
let old_state = self.initiator.handshake_state();
match (old_state, obox.message) {
(InitiatorHandshakeState::KeySent, Message::Key(msg)) => self.handle_key(msg, &obox.nonce),
(InitiatorHandshakeState::AuthSent, Message::Auth(msg)) => self.handle_auth(msg, obox.nonce.source()),
(InitiatorHandshakeState::AuthSent, Message::Close(msg)) => self.handle_peer_handshake_close(msg),
(s, message) => Err(SignalingError::InvalidStateTransition(
format!("Got {} message from initiator in {:?} state", message.get_type(), s)
)),
}
}
fn handle_server_auth_impl(&mut self, msg: &ServerAuth) -> SignalingResult<Vec<HandleAction>> {
if msg.responders.is_some() {
return Err(SignalingError::InvalidMessage(
"We're a responder, but the `responders` field in the server-auth message is set".into()
));
}
let mut actions: Vec<HandleAction> = vec![];
match msg.initiator_connected {
Some(true) => {
let mut send_token = false;
match self.common().auth_provider {
Some(AuthProvider::Token(_)) => {
send_token = true;
},
Some(AuthProvider::TrustedKey(_)) => {
debug!("Trusted key available, skipping token message");
},
None => {
return Err(SignalingError::Crash("No auth provider set".into()));
},
}
if send_token {
let old_auth_provider = mem::replace(&mut self.common_mut().auth_provider, None);
if let Some(AuthProvider::Token(token)) = old_auth_provider {
actions.push(self.send_token(token)?);
} else {
return Err(SignalingError::Crash("Auth provider is not a token".into()));
}
}
actions.push(self.send_key()?);
actions.push(HandleAction::Event(Event::ServerHandshakeDone(true)));
self.initiator.set_handshake_state(InitiatorHandshakeState::KeySent);
},
Some(false) => {
debug!("No initiator connected so far");
actions.push(HandleAction::Event(Event::ServerHandshakeDone(false)));
},
None => return Err(SignalingError::InvalidMessage(
"We're a responder, but the `initiator_connected` field in the server-auth message is not set".into()
)),
}
Ok(actions)
}
fn handle_new_initiator(&mut self, _msg: NewInitiator) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received new-initiator from server");
let mut actions: Vec<HandleAction> = vec![];
self.initiator = InitiatorContext::new(self.initiator.permanent_key);
let mut send_token = false;
match self.common().auth_provider {
Some(AuthProvider::Token(_)) => {
send_token = true;
},
Some(AuthProvider::TrustedKey(_)) => {
debug!("Trusted key available, skipping token message");
},
None => {
return Err(SignalingError::Crash("No auth provider set".into()));
},
}
if send_token {
let old_auth_provider = mem::replace(&mut self.common_mut().auth_provider, None);
if let Some(AuthProvider::Token(token)) = old_auth_provider {
actions.push(self.send_token(token)?);
} else {
return Err(SignalingError::Crash("Auth provider is not a token".into()));
}
}
actions.push(self.send_key()?);
self.initiator.set_handshake_state(InitiatorHandshakeState::KeySent);
Ok(actions)
}
fn handle_new_responder(&mut self, _msg: NewResponder) -> SignalingResult<Vec<HandleAction>> {
Err(SignalingError::Protocol("Received 'new-responder' message as responder".into()))
}
fn handle_disconnected(&mut self, msg: Disconnected) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received disconnected from server");
if !msg.id.is_initiator() {
return Err(SignalingError::Protocol(
"Received 'disconnected' message with non-initiator id".into()
));
}
Ok(vec![HandleAction::Event(Event::Disconnected(msg.id.0))])
}
}
impl ResponderSignaling {
pub(crate) fn new(permanent_keypair: KeyPair,
initiator_pubkey: PublicKey,
auth_token: Option<AuthToken>,
server_public_permanent_key: Option<PublicKey>,
tasks: Tasks,
ping_interval: Option<Duration>) -> Self {
ResponderSignaling {
common: Common {
signaling_state: SignalingState::ServerHandshake,
role: Role::Responder,
identity: ClientIdentity::Unknown,
permanent_keypair,
auth_provider: Some(match auth_token {
Some(token) => AuthProvider::Token(token),
None => AuthProvider::TrustedKey(initiator_pubkey),
}),
server: {
let mut ctx = ServerContext::new();
ctx.permanent_key = server_public_permanent_key;
ctx
},
tasks: Some(tasks),
task: None,
task_supported_types: None,
ping_interval,
},
initiator: InitiatorContext::new(initiator_pubkey),
}
}
#[cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
fn send_token(&self, token: AuthToken) -> SignalingResult<HandleAction> {
let msg: Message = Token {
key: self.common().permanent_keypair.public_key().to_owned(),
}.into_message();
let nonce = Nonce::new(
self.initiator.cookie_pair().ours.clone(),
self.identity().into(),
self.initiator.identity().into(),
self.initiator.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(msg, nonce);
let bbox = obox.encrypt_token(&token);
debug!("<-- Enqueuing token to {}", self.initiator.identity());
Ok(HandleAction::Reply(bbox))
}
fn send_key(&self) -> SignalingResult<HandleAction> {
let msg: Message = Key {
key: self.initiator.keypair.public_key().to_owned(),
}.into_message();
let nonce = Nonce::new(
self.initiator.cookie_pair().ours.clone(),
self.identity().into(),
self.initiator.identity().into(),
self.initiator.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(msg, nonce);
let bbox = obox.encrypt(&self.common().permanent_keypair, &self.initiator.permanent_key);
debug!("<-- Enqueuing key to {}", self.initiator.identity());
Ok(HandleAction::Reply(bbox))
}
#[cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
fn handle_key(&mut self, msg: Key, nonce: &Nonce) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received key from {}", nonce.source_identity());
if self.initiator.session_key.is_some() {
return Err(SignalingError::Crash("Initiator already has a session key set!".into()));
}
if msg.key == self.initiator.permanent_key {
return Err(SignalingError::Protocol("Responder session key and permanent key are equal".into()));
}
self.initiator.session_key = Some(msg.key);
self.initiator.set_handshake_state(InitiatorHandshakeState::KeyReceived);
let auth: Message = ResponderAuthBuilder::new(nonce.cookie().clone())
.add_tasks(
self.common()
.tasks
.as_ref()
.ok_or_else(|| SignalingError::Crash("Tasks are not set".into()))?
)
.build()?
.into_message();
let auth_nonce = Nonce::new(
self.initiator.cookie_pair().ours.clone(),
self.common().identity.into(),
self.initiator.identity().into(),
self.initiator.csn_pair().try_write()?.ours.increment()?,
);
let obox = OpenBox::<Message>::new(auth, auth_nonce);
let bbox = obox.encrypt(
&self.initiator.keypair,
self.initiator.session_key.as_ref()
.ok_or_else(|| SignalingError::Crash("Initiator session key not set".into()))?,
);
self.initiator.set_handshake_state(InitiatorHandshakeState::AuthSent);
debug!("<-- Enqueuing auth to {}", self.initiator.identity());
Ok(vec![HandleAction::Reply(bbox)])
}
fn handle_auth(&mut self, msg: Auth, source: Address) -> SignalingResult<Vec<HandleAction>> {
debug!("--> Received auth from {}", Identity::from(source));
self.validate_repeated_cookie(
&msg.your_cookie,
&self.initiator.cookie_pair().ours,
self.initiator.identity(),
)?;
if msg.tasks.is_some() {
return Err(SignalingError::InvalidMessage("We're a responder, but the `tasks` field in the auth message is set".into()));
}
let mut chosen_task: BoxedTask = match msg.task {
Some(task) => {
let our_tasks = mem::replace(&mut self.common_mut().tasks, None)
.ok_or_else(|| SignalingError::Crash("No tasks defined".into()))?;
our_tasks
.into_iter()
.find(|t: &BoxedTask| t.name() == task)
.ok_or_else(|| SignalingError::Protocol(
"The `task` field in the auth message contains an unknown task".into()
))?
},
None => return Err(SignalingError::InvalidMessage(
"The `task` field in the auth message is not set".into()
)),
};
if msg.data.is_empty() {
return Err(SignalingError::Protocol(
"The `data` field in the auth message is empty".into()
));
}
if msg.data.len() > 1 {
return Err(SignalingError::Protocol(
"The `data` field in the auth message contains more than one entry".into()
));
}
let task_data = msg.data.get(&*chosen_task.name())
.ok_or_else(|| SignalingError::Protocol(
"The task in the auth message does not have a corresponding data entry".into()
))?;
chosen_task.init(task_data)
.map_err(|e| SignalingError::TaskInitialization(format!("{}", e)))?;
info!("Initiator authenticated");
self.common_mut().task_supported_types = Some(chosen_task.supported_types());
self.common_mut().task = Some(Arc::new(Mutex::new(chosen_task)));
self.initiator.set_handshake_state(InitiatorHandshakeState::AuthReceived);
self.common.set_signaling_state(SignalingState::Task)?;
info!("Peer handshake completed");
Ok(vec![HandleAction::HandshakeDone])
}
#[cfg_attr(feature="cargo-clippy", allow(needless_pass_by_value))]
fn handle_peer_handshake_close(&mut self, msg: Close) -> SignalingResult<Vec<HandleAction>> {
let close_code = CloseCode::from_number(msg.reason);
match close_code {
CloseCode::NoSharedTask => Err(SignalingError::NoSharedTask),
_ => Err(SignalingError::Protocol(
format!("Received unexpected close message with code {} during peer handshake", msg.reason)
)),
}
}
}