use self::{
acknowledgement_control::AcknowledgementController, real_traffic_stream::OutQueueControl,
};
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller;
use crate::client::replies::reply_controller::{
ReplyController, ReplyControllerReceiver, ReplyControllerSender,
};
use crate::client::replies::reply_storage::CombinedReplyStorage;
use crate::client::{
inbound_messages::InputMessageReceiver, mix_traffic::BatchMixMessageSender,
real_messages_control::acknowledgement_control::AcknowledgementControllerConnectors,
topology_control::TopologyAccessor,
};
use crate::config;
use futures::channel::mpsc;
use nym_gateway_client::AcknowledgementReceiver;
use nym_sphinx::acknowledgements::AckKey;
use nym_sphinx::addressing::clients::Recipient;
use nym_statistics_common::clients::ClientStatsSender;
use nym_task::connections::{ConnectionCommandReceiver, LaneQueueLengths};
use nym_task::ShutdownToken;
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::sync::Arc;
use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
pub(crate) use acknowledgement_control::{AckActionSender, Action};
pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
pub(crate) mod real_traffic_stream;
pub struct Config {
ack_key: Arc<AckKey>,
self_recipient: Recipient,
traffic: config::Traffic,
cover_traffic: config::CoverTraffic,
acks: config::Acknowledgements,
reply_surbs: config::ReplySurbs,
}
impl<'a> From<&'a Config> for acknowledgement_control::Config {
fn from(cfg: &'a Config) -> Self {
acknowledgement_control::Config::new(
cfg.traffic.maximum_number_of_retransmissions,
cfg.acks.ack_wait_addition,
cfg.acks.ack_wait_multiplier,
cfg.traffic.packet_type,
)
.with_custom_packet_size(cfg.traffic.primary_packet_size)
}
}
impl<'a> From<&'a Config> for real_traffic_stream::Config {
fn from(cfg: &'a Config) -> Self {
real_traffic_stream::Config::new(
Arc::clone(&cfg.ack_key),
cfg.self_recipient,
cfg.acks.average_ack_delay,
cfg.traffic,
cfg.cover_traffic.cover_traffic_primary_size_ratio,
)
}
}
impl<'a> From<&'a Config> for message_handler::Config {
fn from(cfg: &'a Config) -> Self {
message_handler::Config::new(
Arc::clone(&cfg.ack_key),
cfg.self_recipient,
cfg.traffic.average_packet_delay,
cfg.acks.average_ack_delay,
cfg.traffic.deterministic_route_selection,
cfg.traffic.use_legacy_sphinx_format,
)
.with_custom_primary_packet_size(cfg.traffic.primary_packet_size)
.with_custom_secondary_packet_size(cfg.traffic.secondary_packet_size)
.disable_mix_hops(cfg.traffic.disable_mix_hops)
}
}
impl Config {
pub fn new(
base_client_debug_config: &config::DebugConfig,
ack_key: Arc<AckKey>,
self_recipient: Recipient,
) -> Self {
Config {
ack_key,
self_recipient,
traffic: base_client_debug_config.traffic,
cover_traffic: base_client_debug_config.cover_traffic,
acks: base_client_debug_config.acknowledgements,
reply_surbs: base_client_debug_config.reply_surbs,
}
}
}
pub(crate) struct RealMessagesController<R>
where
R: CryptoRng + Rng,
{
out_queue_control: OutQueueControl<R>,
ack_control: AcknowledgementController<R>,
reply_control: ReplyController<R>,
}
impl RealMessagesController<OsRng> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: Config,
key_rotation_config: KeyRotationConfig,
ack_receiver: AcknowledgementReceiver,
input_receiver: InputMessageReceiver,
mix_sender: BatchMixMessageSender,
topology_access: TopologyAccessor,
reply_storage: CombinedReplyStorage,
reply_controller_sender: ReplyControllerSender,
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: ClientStatsSender,
shutdown_token: ShutdownToken,
) -> Self {
let rng = OsRng;
let (real_message_sender, real_message_receiver) = tokio::sync::mpsc::channel(8);
let (sent_notifier_tx, sent_notifier_rx) = mpsc::unbounded();
let (ack_action_tx, ack_action_rx) = mpsc::unbounded();
let ack_controller_connectors = AcknowledgementControllerConnectors::new(
input_receiver,
sent_notifier_rx,
ack_receiver,
ack_action_tx.clone(),
ack_action_rx,
);
let ack_control_config = (&config).into();
let out_queue_config = (&config).into();
let reply_controller_config =
reply_controller::Config::new(config.reply_surbs, key_rotation_config);
let message_handler_config = (&config).into();
let message_handler = MessageHandler::new(
message_handler_config,
rng,
ack_action_tx,
real_message_sender,
topology_access.clone(),
reply_storage.key_storage(),
reply_storage.tags_storage(),
shutdown_token.clone(),
);
let ack_control = AcknowledgementController::new(
ack_control_config,
Arc::clone(&config.ack_key),
ack_controller_connectors,
message_handler.clone(),
reply_controller_sender,
stats_tx.clone(),
);
let reply_control = ReplyController::new(
reply_controller_config,
message_handler,
reply_storage,
reply_controller_receiver,
);
let out_queue_control = OutQueueControl::new(
out_queue_config,
rng,
sent_notifier_tx,
mix_sender,
real_message_receiver,
topology_access,
lane_queue_lengths,
client_connection_rx,
stats_tx,
shutdown_token.clone(),
);
RealMessagesController {
out_queue_control,
ack_control,
reply_control,
}
}
pub fn into_tasks(
self,
) -> (
OutQueueControl<OsRng>,
ReplyController<OsRng>,
AcknowledgementController<OsRng>,
) {
(self.out_queue_control, self.reply_control, self.ack_control)
}
}