use crate::client::helpers::new_interval_stream;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::config;
use futures::StreamExt;
use nym_task::ShutdownToken;
use rand::rngs::OsRng;
use rand::{CryptoRng, Rng};
use std::time::Duration;
use time::OffsetDateTime;
use tracing::debug;
use crate::client::replies::reply_controller::receiver_controller::ReceiverReplyController;
use crate::client::replies::reply_controller::sender_controller::SenderReplyController;
pub(crate) use requests::{ReplyControllerMessage, ReplyControllerReceiver, ReplyControllerSender};
pub mod key_rotation_helpers;
mod receiver_controller;
pub mod requests;
mod sender_controller;
#[derive(Clone, Copy)]
pub struct Config {
reply_surbs: config::ReplySurbs,
key_rotation: KeyRotationConfig,
}
impl Config {
pub(crate) fn new(
reply_surbs_cfg: config::ReplySurbs,
key_rotation: KeyRotationConfig,
) -> Self {
Self {
reply_surbs: reply_surbs_cfg,
key_rotation,
}
}
}
pub type MaxRetransmissions = Option<u32>;
pub struct ReplyController<R> {
config: Config,
sender_controller: SenderReplyController<R>,
receiver_controller: ReceiverReplyController<R>,
request_receiver: ReplyControllerReceiver,
}
impl ReplyController<OsRng> {
pub(crate) fn new(
config: Config,
message_handler: MessageHandler<OsRng>,
full_reply_storage: CombinedReplyStorage,
request_receiver: ReplyControllerReceiver,
) -> Self {
ReplyController {
config,
sender_controller: SenderReplyController::new(
config,
&full_reply_storage,
message_handler.clone(),
),
receiver_controller: ReceiverReplyController::new(
config,
full_reply_storage.surbs_storage(),
message_handler,
),
request_receiver,
}
}
}
impl<R> ReplyController<R>
where
R: CryptoRng + Rng,
{
async fn handle_request(&mut self, request: ReplyControllerMessage) {
match request {
ReplyControllerMessage::RetransmitReply {
recipient,
timed_out_ack,
extra_surb_request,
} => {
self.receiver_controller
.handle_reply_retransmission(recipient, timed_out_ack, extra_surb_request)
.await
}
ReplyControllerMessage::SendReply {
recipient,
message,
lane,
max_retransmissions,
} => {
self.receiver_controller
.handle_send_reply(recipient, message, lane, max_retransmissions)
.await
}
ReplyControllerMessage::AdditionalSurbs {
sender_tag,
reply_surbs,
from_surb_request,
} => {
self.receiver_controller
.handle_received_surbs(sender_tag, reply_surbs, from_surb_request)
.await
}
ReplyControllerMessage::LaneQueueLength {
connection_id,
response_channel,
} => self
.receiver_controller
.handle_lane_queue_length(connection_id, response_channel),
ReplyControllerMessage::AdditionalSurbsRequest { recipient, amount } => {
self.sender_controller
.handle_surb_request(*recipient, amount)
.await
}
}
}
async fn remove_stale_storage(&mut self) {
let now = OffsetDateTime::now_utc();
self.receiver_controller
.inspect_and_clear_stale_data(now)
.await;
self.sender_controller.inspect_and_clear_stale_data(now)
}
pub(crate) async fn run(&mut self, shutdown_token: ShutdownToken) {
debug!("Started ReplyController with graceful shutdown support");
let polling_rate = Duration::from_secs(5);
let mut stale_inspection = new_interval_stream(polling_rate);
let polling_rate = self.config.key_rotation.epoch_duration / 8;
let mut invalidation_inspection = new_interval_stream(polling_rate);
loop {
tokio::select! {
biased;
_ = shutdown_token.cancelled() => {
tracing::trace!("ReplyController: Received shutdown");
break;
},
req = self.request_receiver.next() => match req {
Some(req) => self.handle_request(req).await,
None => {
tracing::trace!("ReplyController: Stopping since channel closed");
break;
}
},
_ = stale_inspection.next() => {
self.receiver_controller.inspect_stale_pending_data().await
},
_ = invalidation_inspection.next() => {
self.receiver_controller.check_surb_refresh().await;
self.remove_stale_storage().await;
}
}
}
tracing::debug!("ReplyController: Exiting");
}
}