use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::Receiver;
use std::time::Instant;
use protobuf::Message;
use crate::network::dispatch::DispatchMessageSender;
use crate::peer::connector::PeerLookup;
use crate::protos::network::{NetworkMessage, NetworkMessageType};
use crate::transport::matrix::{ConnectionMatrixEnvelope, ConnectionMatrixSender};
use super::PeerTokenPair;
/// Queue-length threshold used by `run_pending_loop`: once a pending queue grows past this
/// point, the oldest pending message is dropped before a new one is enqueued.
const DEFAULT_PENDING_QUEUE_SIZE: usize = 100;
/// Number of seconds that must have elapsed since a pending message's `last_attempt` before
/// `run_pending_loop` will try to process it (compared against `elapsed().as_secs()`).
pub const DEFAULT_TIME_BETWEEN_ATTEMPTS: u64 = 10;
/// A received message that is queued in `run_pending_loop` until the peer ID for its
/// connection can be resolved, at which point it is dispatched.
pub struct PendingIncomingMsg {
/// The received envelope; its `id()` is used as the connection ID and its `payload()` is
/// parsed as a `NetworkMessage` when the message is dispatched.
pub envelope: ConnectionMatrixEnvelope,
/// Reference instant for throttling: the message is skipped until at least
/// `DEFAULT_TIME_BETWEEN_ATTEMPTS` seconds have elapsed since this instant.
pub last_attempt: Instant,
/// Number of failed attempts still allowed; the message is dropped when an attempt fails
/// and this is already zero.
pub remaining_attempts: usize,
}
/// An outbound message that is queued in `run_pending_loop` until a connection ID for its
/// recipient can be resolved and the send succeeds.
pub struct PendingOutgoingMsg {
/// The peer the message is addressed to; used to look up the connection ID to send on.
pub recipient: PeerTokenPair,
/// The bytes handed to the `ConnectionMatrixSender` on each send attempt.
pub payload: Vec<u8>,
/// Reference instant for throttling: the message is skipped until at least
/// `DEFAULT_TIME_BETWEEN_ATTEMPTS` seconds have elapsed since this instant.
pub last_attempt: Instant,
/// Number of failed attempts still allowed; the message is dropped when an attempt fails
/// and this is already zero.
pub remaining_attempts: usize,
}
/// Control messages consumed by `run_pending_loop`.
pub enum RetryMessage {
/// Wake the loop and make it process both pending queues.
Retry,
/// Enqueue an incoming message for a later attempt; does not itself trigger processing.
PendingIncoming(PendingIncomingMsg),
/// Enqueue an outgoing message for a later attempt; does not itself trigger processing.
PendingOutgoing(PendingOutgoingMsg),
/// Stop the loop; `run_pending_loop` returns `Ok(())`.
Shutdown,
}
pub fn run_pending_loop<S>(
peer_connector: &dyn PeerLookup,
receiver: Receiver<RetryMessage>,
dispatch_msg_sender: DispatchMessageSender<NetworkMessageType>,
message_sender: S,
) -> Result<(), String>
where
S: ConnectionMatrixSender + 'static,
{
let mut connection_id_to_peer_id: HashMap<String, PeerTokenPair> = HashMap::new();
let mut peer_id_to_connection_id: HashMap<PeerTokenPair, String> = HashMap::new();
let mut pending_queue_incoming = VecDeque::new();
let mut pending_queue_outgoing = VecDeque::new();
loop {
match receiver.recv() {
Ok(RetryMessage::PendingIncoming(pending)) => {
if pending_queue_incoming.len() > DEFAULT_PENDING_QUEUE_SIZE {
warn!(
"PeerInterconnect pending recv queue is to large, dropping oldest message"
);
pending_queue_incoming.pop_front();
}
pending_queue_incoming.push_back(pending);
continue;
}
Ok(RetryMessage::PendingOutgoing(pending)) => {
if pending_queue_outgoing.len() > DEFAULT_PENDING_QUEUE_SIZE {
warn!(
"PeerInterconnect pending send queue is to large, dropping oldest message"
);
pending_queue_outgoing.pop_front();
}
pending_queue_outgoing.push_back(pending);
continue;
}
Ok(RetryMessage::Retry) => (),
Ok(RetryMessage::Shutdown) => {
info!("Received Shutdown");
break Ok(());
}
Err(_) => break Err("Pending retry receiver dropped".to_string()),
};
let mut still_need_retry_incoming = VecDeque::new();
for mut pending in pending_queue_incoming.into_iter() {
if pending.last_attempt.elapsed().as_secs() < DEFAULT_TIME_BETWEEN_ATTEMPTS {
still_need_retry_incoming.push_back(pending);
continue;
}
let connection_id = pending.envelope.id().to_string();
let peer_id = if let Some(peer_id) = connection_id_to_peer_id.get(&connection_id) {
Some(peer_id.to_owned())
} else if let Some(peer_id) = peer_connector
.peer_id(&connection_id)
.map_err(|err| format!("Unable to get peer ID for {}: {}", connection_id, err))?
{
connection_id_to_peer_id.insert(connection_id.to_string(), peer_id.clone());
Some(peer_id)
} else {
None
};
if let Some(peer_id) = peer_id {
let mut network_msg: NetworkMessage =
match Message::parse_from_bytes(pending.envelope.payload()) {
Ok(msg) => msg,
Err(err) => {
error!("Unable to dispatch message: {}", err);
continue;
}
};
trace!(
"Received message from {}({}): {:?}",
peer_id,
connection_id,
network_msg.get_message_type()
);
match dispatch_msg_sender.send(
network_msg.get_message_type(),
network_msg.take_payload(),
peer_id.into(),
) {
Ok(()) => (),
Err((message_type, _, _)) => {
error!("Unable to dispatch message of type {:?}", message_type)
}
}
} else if pending.remaining_attempts > 0 {
pending.remaining_attempts -= 1;
debug!(
"Received message from removed or unknown peer with connection_id {},\
attempts left {}",
connection_id, pending.remaining_attempts
);
still_need_retry_incoming.push_back(pending);
} else {
error!(
"Received message from removed or unknown peer with connection_id {},\
dropping",
connection_id
);
}
}
pending_queue_incoming = still_need_retry_incoming;
let mut still_need_retry_outgoing = VecDeque::new();
for mut pending in pending_queue_outgoing.into_iter() {
if pending.last_attempt.elapsed().as_secs() < DEFAULT_TIME_BETWEEN_ATTEMPTS {
still_need_retry_outgoing.push_back(pending);
continue;
}
let connection_id = if let Some(connection_id) =
peer_id_to_connection_id.get(&pending.recipient)
{
Some(connection_id.to_owned())
} else if let Some(connection_id) = peer_connector
.connection_id(&pending.recipient)
.map_err(|err| {
format!(
"Unable to get connection ID for {}: {}",
pending.recipient, err
)
})?
{
peer_id_to_connection_id.insert(pending.recipient.clone(), connection_id.clone());
Some(connection_id)
} else {
None
};
if let Some(connection_id) = connection_id {
if message_sender
.send(connection_id.to_string(), pending.payload.to_vec())
.is_err()
{
if let Some(new_connection_id) = peer_connector
.connection_id(&pending.recipient)
.map_err(|err| {
format!(
"Unable to get connection ID for {}: {}",
&pending.recipient, err
)
})?
{
if new_connection_id != connection_id {
peer_id_to_connection_id
.insert(pending.recipient.clone(), new_connection_id.clone());
if message_sender
.send(new_connection_id, pending.payload.to_vec())
.is_ok()
{
continue;
}
}
}
} else {
continue;
}
}
if pending.remaining_attempts > 0 {
pending.remaining_attempts -= 1;
debug!(
"Tried to send message to removed or unknown peer with \
peer_id {}, attempts left {}",
pending.recipient, pending.remaining_attempts
);
still_need_retry_outgoing.push_back(pending);
} else {
error!(
"Cannot send message, unknown peer: {}, dropping",
pending.recipient
);
}
}
pending_queue_outgoing = still_need_retry_outgoing;
}
}