use super::{MsgResponse, Session};
use crate::{Error, Result};
use qp2p::{RecvStream, UsrMsgBytes};
use sn_interface::{
messaging::{
data::{ClientDataResponse, ClientMsg},
AuthorityProof, ClientAuth, Dst, MsgId, MsgKind, MsgType, WireMsg,
},
network_knowledge::SectionTreeUpdate,
types::{log_markers::LogMarker, Peer},
};
use itertools::Itertools;
/// Upper bound for the `attempt` counter kept by `Session::recv_stream_listener`.
///
/// The counter is bumped every time a message is re-sent in reaction to an Anti-Entropy
/// response; once it exceeds this value the listener stops waiting and reports
/// `Error::AntiEntropyMaxRetries`.
const MAX_AE_RETRIES_TO_ATTEMPT: u8 = 5;
/// What `Session::handle_ae_msg` hands back after it has re-sent a bounced message.
struct MsgResent {
/// The elder the bounced message was re-sent to.
new_peer: Peer,
/// The receive stream returned by `send_bi` for the re-sent message; the response is read from it.
new_recv_stream: RecvStream,
}
impl Session {
#[instrument(skip_all, level = "debug")]
async fn read_resp_from_recvstream(
recv_stream: &mut RecvStream,
peer: Peer,
correlation_id: MsgId,
) -> Result<(MsgId, ClientDataResponse), Error> {
if let Some(bytes) = recv_stream.next().await? {
match WireMsg::deserialize(bytes)? {
MsgType::ClientDataResponse { msg_id, msg } => Ok((msg_id, msg)),
msg => {
warn!(
"Unexpected msg type received on {} from {peer:?} in response \
to {correlation_id:?}: {msg:?}",
recv_stream.id()
);
Err(Error::UnexpectedMsgType {
correlation_id,
peer,
msg,
})
}
}
} else {
Err(Error::ResponseStreamClosed {
msg_id: correlation_id,
peer,
})
}
}
#[instrument(skip_all, level = "debug")]
pub(crate) async fn recv_stream_listener(
&self,
correlation_id: MsgId,
mut peer: Peer,
peer_index: usize,
mut recv_stream: RecvStream,
) -> MsgResponse {
let mut attempt = 0;
let result = loop {
let addr = peer.addr();
if attempt > MAX_AE_RETRIES_TO_ATTEMPT {
break MsgResponse::Failure(
addr,
Error::AntiEntropyMaxRetries {
msg_id: correlation_id,
retries: attempt - 1,
},
);
}
let stream_id = recv_stream.id();
debug!("Waiting for response msg on {stream_id} from {peer:?} @ index: {peer_index} for {correlation_id:?}, attempt #{attempt}");
let (msg_id, resp_msg) =
match Self::read_resp_from_recvstream(&mut recv_stream, peer, correlation_id).await
{
Ok(resp_info) => resp_info,
Err(err) => break MsgResponse::Failure(addr, err),
};
match resp_msg {
ClientDataResponse::QueryResponse {
response,
correlation_id,
} => {
trace!(
"QueryResponse with id {msg_id:?} regarding correlation_id \
{correlation_id:?} from {peer:?} with response: {response:?}"
);
break MsgResponse::QueryResponse(addr, Box::new(response));
}
ClientDataResponse::CmdResponse {
response,
correlation_id,
} => {
trace!(
"CmdResponse with id {msg_id:?} regarding correlation_id \
{correlation_id:?} from {peer:?} with response {response:?}"
);
break MsgResponse::CmdResponse(addr, Box::new(response));
}
ClientDataResponse::AntiEntropy {
section_tree_update,
bounced_msg,
} => {
debug!(
"AntiEntropy msg with id {msg_id:?} received for {correlation_id:?} \
from {peer:?}@{peer_index}"
);
let ae_resp_outcome = self
.handle_ae_msg(
section_tree_update,
bounced_msg,
peer,
peer_index,
correlation_id,
)
.await;
match ae_resp_outcome {
Err(err) => break MsgResponse::Failure(addr, err),
Ok(MsgResent {
new_peer,
new_recv_stream,
}) => {
recv_stream = new_recv_stream;
trace!(
"{} of correlation {correlation_id:?} to {} on {stream_id}",
LogMarker::ReceiveCompleted,
addr,
);
peer = new_peer;
attempt += 1;
continue;
}
}
}
}
};
trace!(
"{} of correlation {correlation_id:?} to {}, on {}, with {result:?}",
LogMarker::ReceiveCompleted,
peer.addr(),
recv_stream.id()
);
result
}
#[instrument(skip_all, level = "debug")]
async fn handle_ae_msg(
&self,
section_tree_update: SectionTreeUpdate,
bounced_msg: UsrMsgBytes,
src_peer: Peer,
src_peer_index: usize,
correlation_id: MsgId,
) -> Result<MsgResent> {
let target_sap = section_tree_update.signed_sap.value.clone();
debug!(
"Received Anti-Entropy msg from {src_peer}@{src_peer_index}, with SAP: {target_sap:?}"
);
self.update_network_knowledge(section_tree_update, src_peer)
.await;
let (msg_id, elders, client_msg, dst, auth) = self
.new_target_elders(src_peer, bounced_msg, correlation_id)
.await?;
let ordered_elders: Vec<_> = elders
.into_iter()
.sorted_by(|lhs, rhs| dst.name.cmp_distance(&lhs.name(), &rhs.name()))
.collect();
let target_elder = ordered_elders.get(src_peer_index);
if let Some(elder) = target_elder {
let payload = WireMsg::serialize_msg_payload(&client_msg)?;
let wire_msg =
WireMsg::new_msg(msg_id, payload, MsgKind::Client(auth.into_inner()), dst);
let bytes = wire_msg.serialize()?;
debug!("{msg_id:?} AE bounced msg going out again. Resending original message (sent to index {src_peer_index:?} peer: {src_peer:?}) to new section elder {elder:?}");
let link = self
.peer_links
.get_or_create_link(elder, false, Some(correlation_id))
.await;
let new_recv_stream = link
.send_bi(bytes, msg_id)
.await
.map_err(|error| Error::FailedToInitateBiDiStream { msg_id, error })?;
Ok(MsgResent {
new_peer: *elder,
new_recv_stream,
})
} else {
Err(Error::AntiEntropyNoSapElders)
}
}
/// Applies `section_tree_update` to our network knowledge and logs the outcome.
///
/// Nothing is returned: a failed update is only reported with a warning, which also names
/// `src_peer` as the sender of the update.
async fn update_network_knowledge(
    &self,
    section_tree_update: SectionTreeUpdate,
    src_peer: Peer,
) {
    debug!("Attempting to update our network knowledge...");

    // The update is consumed below, so keep a copy of the SAP for the log messages.
    let sap = section_tree_update.signed_sap.value.clone();
    let prefix = sap.prefix();

    let mut network = self.network.write().await;
    debug!("Attempting to update our network knowledge... WRITE LOCK GOT");

    let update_outcome = network.update_the_section_tree(section_tree_update);
    match update_outcome {
        Ok(updated) => {
            if updated {
                debug!("Anti-Entropy: updated remote section SAP for {prefix:?} to {sap:?}");
            } else {
                debug!(
                    "Anti-Entropy: discarded SAP for {prefix:?} since it's the same as \
                    the one in our records: {sap:?}",
                );
            }
        }
        Err(err) => {
            warn!(
                "Anti-Entropy: failed to update remote section SAP and DAG \
                sent by: {src_peer:?}, section key: {:?}, w/ err: {err:?}",
                sap.section_key()
            );
        }
    }
}
#[instrument(skip_all, level = "debug")]
#[allow(clippy::type_complexity)]
async fn new_target_elders(
&self,
src_peer: Peer,
bounced_msg: UsrMsgBytes,
correlation_id: MsgId,
) -> Result<(MsgId, Vec<Peer>, ClientMsg, Dst, AuthorityProof<ClientAuth>), Error> {
let (msg_id, client_msg, bounced_msg_dst, auth) = match WireMsg::deserialize(bounced_msg)? {
MsgType::Client {
msg_id,
msg,
dst,
auth,
} => (msg_id, msg, dst, auth),
msg => {
warn!("Unexpected bounced msg received in AE response: {msg:?}");
return Err(Error::UnexpectedMsgType {
correlation_id,
peer: src_peer,
msg,
});
}
};
trace!(
"Bounced msg {msg_id:?} received in an AE response: {client_msg:?} from {src_peer:?}"
);
let knowlege = self.network.read().await;
let best_sap = knowlege
.closest(&bounced_msg_dst.name, None)
.ok_or(Error::NoCloseSapFound(bounced_msg_dst.name))?;
trace!("{msg_id:?} from {src_peer:?}. New SAP of for bounced msg: {best_sap:?}");
let target_elders = best_sap.elders_vec();
if target_elders.is_empty() {
Err(Error::AntiEntropyNoSapElders)
} else {
let dst = Dst {
name: bounced_msg_dst.name,
section_key: best_sap.section_key(),
};
debug!(
"Final target elders for resending {msg_id:?}: {client_msg:?} msg \
are {target_elders:?}"
);
Ok((msg_id, target_elders, client_msg, dst, auth))
}
}
}