use super::core::{MessageId, Mixnet, RequestMessage, SessionIndex, Surb, MESSAGE_ID_SIZE};
use hashlink::{linked_hash_map::Entry, LinkedHashMap};
use log::{debug, warn};
use rand::RngCore;
use std::time::{Duration, Instant};
#[derive(Clone, Debug)]
pub struct Config {
pub log_target: &'static str,
pub capacity: usize,
pub max_posts: usize,
pub cooldown: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
log_target: "mixnet",
capacity: 400,
max_posts: 2,
cooldown: Duration::from_secs(10),
}
}
}
struct Reply {
message_id: MessageId,
data: Vec<u8>,
}
impl Reply {
fn new(data: Vec<u8>) -> Self {
let mut message_id = [0; MESSAGE_ID_SIZE];
rand::thread_rng().fill_bytes(&mut message_id);
Self { message_id, data }
}
}
pub struct ReplyContext {
session_index: SessionIndex,
message_id: MessageId,
surbs: Vec<Surb>,
}
impl ReplyContext {
pub fn message_id(&self) -> &MessageId {
&self.message_id
}
fn post_reply(&mut self, reply: &Reply, mixnet: &mut Mixnet, config: &Config) {
for _ in 0..config.max_posts {
if let Err(err) = mixnet.post_reply(
&mut self.surbs,
self.session_index,
&reply.message_id,
reply.data.as_slice().into(),
) {
warn!(target: config.log_target,
"Failed to post reply to request with message ID {:x?}: {err}",
self.message_id);
break
}
}
}
}
enum ReplyState {
Pending,
Complete { reply: Reply, last_post: Instant },
}
pub struct ReplyManager {
config: Config,
states: LinkedHashMap<MessageId, ReplyState>,
}
impl ReplyManager {
pub fn new(config: Config) -> Self {
let states = LinkedHashMap::with_capacity(
config.capacity.saturating_add(1),
);
Self { config, states }
}
fn maybe_evict(&mut self) {
if self.states.len() > self.config.capacity {
self.states.pop_front();
debug_assert_eq!(self.states.len(), self.config.capacity);
}
}
pub fn insert(
&mut self,
message: RequestMessage,
mixnet: &mut Mixnet,
) -> Option<(ReplyContext, Vec<u8>)> {
let mut reply_context = ReplyContext {
session_index: message.session_index,
message_id: message.id,
surbs: message.surbs,
};
match self.states.entry(message.id) {
Entry::Occupied(mut entry) => {
match entry.get_mut() {
ReplyState::Pending => debug!(target: self.config.log_target,
"Ignoring repeat request with message ID {:x?}; currently handling", message.id),
ReplyState::Complete { reply, last_post } => {
let now = Instant::now();
let since_last = now.saturating_duration_since(*last_post);
if since_last < self.config.cooldown {
debug!(target: self.config.log_target,
"Ignoring repeat request with message ID {:x?}; posted a reply {:.1}s ago",
message.id, since_last.as_secs_f32());
} else {
*last_post = now;
reply_context.post_reply(reply, mixnet, &self.config);
}
},
}
None
},
Entry::Vacant(entry) => {
entry.insert(ReplyState::Pending);
self.maybe_evict();
Some((reply_context, message.data))
},
}
}
pub fn abandon(&mut self, reply_context: ReplyContext) {
if let Entry::Occupied(entry) = self.states.entry(reply_context.message_id) {
match entry.get() {
ReplyState::Pending => {
entry.remove();
},
ReplyState::Complete { .. } => warn!(
target: self.config.log_target,
"Ignoring abandon of request with message ID {:x?}; already completed",
reply_context.message_id
),
}
}
}
pub fn complete(
&mut self,
mut reply_context: ReplyContext,
data: Vec<u8>,
mixnet: &mut Mixnet,
) {
let state = match self.states.entry(reply_context.message_id) {
Entry::Occupied(entry) => match entry.into_mut() {
state @ ReplyState::Pending => state,
ReplyState::Complete { .. } => {
warn!(target: self.config.log_target,
"Request with message ID {:x?} completed twice",
reply_context.message_id);
return
},
},
Entry::Vacant(entry) => entry.insert(ReplyState::Pending),
};
let reply = Reply::new(data);
reply_context.post_reply(&reply, mixnet, &self.config);
*state = ReplyState::Complete { reply, last_post: Instant::now() };
self.maybe_evict();
}
}