mod config;
mod post_queues;
pub use self::config::Config;
use self::post_queues::PostQueues;
use super::core::{
MessageId, Mixnet, MixnodeIndex, NetworkStatus, PostErr, RelSessionIndex, Scattered,
SessionIndex, SessionPhase, SessionStatus,
};
use rand::RngCore;
use std::{
cmp::max,
collections::VecDeque,
time::{Duration, Instant},
};
pub trait Request {
type Context;
fn with_data<T>(&self, f: impl FnOnce(Scattered<u8>) -> T, context: &Self::Context) -> T;
fn num_surbs(&self, context: &Self::Context) -> usize;
fn handling_delay(&self, message_id: &MessageId, context: &Self::Context) -> Duration;
fn handle_post_err(self, err: PostErr, context: &Self::Context);
fn handle_retry_limit_reached(self, context: &Self::Context);
}
struct RequestState<R> {
request: R,
destinations_remaining: u32,
attempts_remaining: u32,
posts_remaining: u32,
message_id: MessageId,
session_index: Option<SessionIndex>,
destination_index: Option<MixnodeIndex>,
retry_deadline: Instant,
}
impl<R> RequestState<R> {
fn new_destination(&mut self, past: Instant) {
rand::thread_rng().fill_bytes(&mut self.message_id);
self.session_index = None;
self.destination_index = None;
self.retry_deadline = past;
}
}
pub struct RequestManager<R> {
config: Config,
created_at: Instant,
session_status: SessionStatus,
post_queues: PostQueues<RequestState<R>>,
retry_queue: VecDeque<RequestState<R>>,
next_retry_deadline_changed: bool,
}
impl<C, R: Request<Context = C>> RequestManager<R> {
pub fn new(config: Config) -> Self {
let capacity = config.capacity;
Self {
config,
created_at: Instant::now(),
session_status: SessionStatus { current_index: 0, phase: SessionPhase::CoverToCurrent },
post_queues: PostQueues::new(capacity),
retry_queue: VecDeque::with_capacity(capacity),
next_retry_deadline_changed: false,
}
}
pub fn update_session_status<X>(
&mut self,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) {
let session_status = mixnet.session_status();
if self.session_status == session_status {
return
}
let prev_default_len = self.post_queues.default.len();
if self.session_status.current_index != session_status.current_index {
self.post_queues.default.append(&mut self.post_queues.prev); if session_status.current_index.saturating_sub(self.session_status.current_index) == 1 {
std::mem::swap(&mut self.post_queues.current, &mut self.post_queues.prev);
} else {
self.post_queues.default.append(&mut self.post_queues.current); }
}
if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Current) {
self.post_queues.default.append(&mut self.post_queues.current); }
if !session_status.phase.allow_requests_and_replies(RelSessionIndex::Prev) {
self.post_queues.default.append(&mut self.post_queues.prev); }
for state in self.post_queues.default.iter_mut().skip(prev_default_len) {
state.new_destination(self.created_at);
}
self.session_status = session_status;
self.process_post_queues(mixnet, ns, context);
}
pub fn has_space(&self) -> bool {
let len =
self.post_queues.iter().map(VecDeque::len).sum::<usize>() + self.retry_queue.len();
len < self.config.capacity
}
pub fn insert<X>(
&mut self,
request: R,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) {
debug_assert!(self.has_space());
let state = RequestState {
request,
destinations_remaining: self.config.num_destinations,
attempts_remaining: 0,
posts_remaining: 0,
message_id: Default::default(),
session_index: None,
destination_index: None,
retry_deadline: self.created_at,
};
self.retry(state, mixnet, ns, context);
}
pub fn remove(&mut self, message_id: &MessageId) -> Option<R> {
for post_queue in self.post_queues.iter_mut() {
if let Some(i) = post_queue.iter().position(|state| &state.message_id == message_id) {
return Some(post_queue.remove(i).expect("i returned by position()").request)
}
}
if let Some(i) = self.retry_queue.iter().position(|state| &state.message_id == message_id) {
if i == 0 {
self.next_retry_deadline_changed = true;
}
return Some(self.retry_queue.remove(i).expect("i returned by position()").request)
}
None
}
fn process_post_queue<X>(
&mut self,
rel_session_index: Option<RelSessionIndex>,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) {
let rel_session_index_or_default =
rel_session_index.unwrap_or(self.session_status.phase.default_request_session());
if (rel_session_index_or_default == RelSessionIndex::Prev) &&
(self.session_status.current_index == 0)
{
debug_assert!(self.post_queues.prev.is_empty());
return
}
let session_index = rel_session_index
.map(|rel_session_index| rel_session_index + self.session_status.current_index);
let session_index_or_default =
rel_session_index_or_default + self.session_status.current_index;
while let Some(mut state) = self.post_queues[rel_session_index].pop_front() {
debug_assert_eq!(state.session_index, session_index);
let res = state.request.with_data(
|data| {
mixnet.post_request(
session_index_or_default,
&mut state.destination_index,
&state.message_id,
data,
state.request.num_surbs(context),
ns,
)
},
context,
);
match res {
Ok(metrics) => {
state.session_index = Some(session_index_or_default);
let handling_delay = state.request.handling_delay(&state.message_id, context);
let rtt = metrics.estimate_rtt(handling_delay);
state.retry_deadline = max(state.retry_deadline, Instant::now() + rtt);
match state.posts_remaining.checked_sub(1) {
Some(posts_remaining) => {
state.posts_remaining = posts_remaining;
self.post_queues[Some(rel_session_index_or_default)].push_back(state);
},
None => {
let i = self
.retry_queue
.partition_point(|s| s.retry_deadline < state.retry_deadline);
self.retry_queue.insert(i, state);
if i == 0 {
self.next_retry_deadline_changed = true;
}
},
}
},
Err(PostErr::NotEnoughSpaceInQueue) => {
self.post_queues[rel_session_index].push_front(state);
break
},
Err(err) => state.request.handle_post_err(err, context),
}
}
}
pub fn process_post_queues<X>(
&mut self,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) {
self.process_post_queue(None, mixnet, ns, context);
self.process_post_queue(Some(RelSessionIndex::Current), mixnet, ns, context);
self.process_post_queue(Some(RelSessionIndex::Prev), mixnet, ns, context);
}
fn session_post_queues_empty(&self, rel_session_index: Option<RelSessionIndex>) -> bool {
if !self.post_queues[rel_session_index].is_empty() {
return false
}
let default = self.session_status.phase.default_request_session();
match rel_session_index {
Some(rel_session_index) if rel_session_index == default =>
self.post_queues.default.is_empty(),
Some(_) => true,
None => self.post_queues[Some(default)].is_empty(),
}
}
fn retry<X>(
&mut self,
mut state: RequestState<R>,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) {
debug_assert_eq!(state.posts_remaining, 0);
match state.attempts_remaining.checked_sub(1) {
Some(attempts_remaining) => state.attempts_remaining = attempts_remaining,
None => {
let Some(destinations_remaining) = state.destinations_remaining.checked_sub(1)
else {
state.request.handle_retry_limit_reached(context);
return
};
state.destinations_remaining = destinations_remaining;
state.attempts_remaining = self.config.num_attempts_per_destination - 1;
state.new_destination(self.created_at);
},
}
state.posts_remaining = self.config.num_posts_per_attempt - 1;
let rel_session_index = state.session_index.and_then(|session_index| {
let rel_session_index = RelSessionIndex::from_session_index(
session_index,
self.session_status.current_index,
);
if !rel_session_index.map_or(false, |rel_session_index| {
self.session_status.phase.allow_requests_and_replies(rel_session_index)
}) {
state.new_destination(self.created_at);
return None
}
rel_session_index
});
let empty = self.session_post_queues_empty(rel_session_index);
self.post_queues[rel_session_index].push_back(state);
if empty {
self.process_post_queue(rel_session_index, mixnet, ns, context);
if rel_session_index.is_none() {
self.process_post_queue(
Some(self.session_status.phase.default_request_session()),
mixnet,
ns,
context,
);
}
}
}
pub fn next_retry_deadline(&self) -> Option<Instant> {
self.retry_queue.front().map(|state| state.retry_deadline)
}
pub fn pop_next_retry<X>(
&mut self,
mixnet: &mut Mixnet<X>,
ns: &dyn NetworkStatus,
context: &C,
) -> bool {
if let Some(state) = self.retry_queue.pop_front() {
self.next_retry_deadline_changed = true;
self.retry(state, mixnet, ns, context);
true
} else {
false
}
}
pub fn next_retry_deadline_changed(&mut self) -> bool {
let changed = self.next_retry_deadline_changed;
self.next_retry_deadline_changed = false;
changed
}
}