1use super::mix_traffic::ClientRequestSender;
5use super::received_buffer::ReceivedBufferMessage;
6use super::statistics_control::StatisticsControl;
7use crate::client::base_client::storage::helpers::store_client_keys;
8use crate::client::base_client::storage::MixnetClientStorage;
9use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
10use crate::client::event_control::EventControl;
11use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
12use crate::client::key_manager::persistence::KeyStore;
13use crate::client::key_manager::ClientKeys;
14use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
15use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
16use crate::client::real_messages_control;
17use crate::client::real_messages_control::RealMessagesController;
18use crate::client::received_buffer::{
19 ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
20};
21use crate::client::replies::reply_controller;
22use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
23use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyControllerSender};
24use crate::client::replies::reply_storage::{
25 CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
26};
27use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
28use crate::client::topology_control::{
29 TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
30};
31use crate::config;
32use crate::config::{Config, DebugConfig};
33use crate::error::ClientCoreError;
34use crate::init::{
35 setup_gateway,
36 types::{GatewaySetup, InitialisationResult},
37};
38use futures::channel::mpsc;
39use nym_bandwidth_controller::BandwidthController;
40use nym_client_core_config_types::{ForgetMe, RememberMe};
41use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
42use nym_credential_storage::storage::Storage as CredentialStorage;
43use nym_crypto::asymmetric::{ed25519, x25519};
44use nym_crypto::hkdf::DerivationMaterial;
45use nym_gateway_client::client::config::GatewayClientConfig;
46use nym_gateway_client::{
47 AcknowledgementReceiver, GatewayClient, GatewayConfig, MixnetMessageReceiver, PacketRouter,
48};
49use nym_sphinx::acknowledgements::AckKey;
50use nym_sphinx::addressing::clients::Recipient;
51use nym_sphinx::addressing::nodes::NodeIdentity;
52use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
53use nym_statistics_common::clients::ClientStatsSender;
54use nym_statistics_common::generate_client_stats_id;
55use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
56use nym_task::ShutdownTracker;
57use nym_topology::provider_trait::TopologyProvider;
58use nym_topology::HardcodedTopologyProvider;
59use nym_validator_client::nym_api::NymApiClientExt;
60use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
61use rand::prelude::SliceRandom;
62use rand::rngs::OsRng;
63use rand::thread_rng;
64use std::fmt::Debug;
65use std::os::raw::c_int as RawFd;
66use std::path::Path;
67use std::sync::Arc;
68use time::OffsetDateTime;
69use tokio::sync::mpsc::Sender;
70use url::Url;
71
72#[cfg(target_arch = "wasm32")]
73#[cfg(debug_assertions)]
74use nym_wasm_utils::console_log;
75
76const DEFAULT_NYM_API_RETRIES: usize = 3;
79
80#[cfg(all(
81 not(target_arch = "wasm32"),
82 feature = "fs-surb-storage",
83 feature = "fs-gateways-storage"
84))]
85pub mod non_wasm_helpers;
86
87pub mod helpers;
88pub mod storage;
89
90#[derive(Clone, Copy, Debug)]
91pub enum MixnetClientEvent {
92 Traffic(MixTrafficEvent),
93}
94
95pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
96#[derive(Clone)]
97pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
98
99impl EventSender {
100 pub fn send(&self, event: MixnetClientEvent) {
101 if let Err(err) = self.0.unbounded_send(event) {
102 tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
103 }
104 }
105}
106
107#[derive(Clone)]
108pub struct ClientInput {
109 pub connection_command_sender: ConnectionCommandSender,
110 pub input_sender: InputMessageSender,
111 pub client_request_sender: ClientRequestSender,
112}
113
114impl ClientInput {
115 pub async fn send(
116 &self,
117 message: InputMessage,
118 ) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
119 self.input_sender.send(message).await
120 }
121}
122
123pub struct ClientOutput {
124 pub received_buffer_request_sender: ReceivedBufferRequestSender,
125}
126
127impl ClientOutput {
128 pub fn register_receiver(
129 &mut self,
130 ) -> Result<mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>, ClientCoreError> {
131 let (reconstructed_sender, reconstructed_receiver) = mpsc::unbounded();
132
133 self.received_buffer_request_sender
134 .unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
135 reconstructed_sender,
136 ))
137 .map_err(|_| ClientCoreError::FailedToRegisterReceiver)?;
138
139 Ok(reconstructed_receiver)
140 }
141}
142
143#[derive(Clone, Debug)]
144pub struct ClientState {
145 pub shared_lane_queue_lengths: LaneQueueLengths,
146 pub reply_controller_sender: ReplyControllerSender,
147 pub topology_accessor: TopologyAccessor,
148 pub gateway_connection: GatewayConnection,
149}
150
151#[derive(Clone, Copy, Debug)]
152pub struct GatewayConnection {
153 pub gateway_ws_fd: Option<RawFd>,
154}
155
156pub enum ClientInputStatus {
157 AwaitingProducer { client_input: ClientInput },
158 Connected,
159}
160
161impl ClientInputStatus {
162 #[allow(clippy::panic)]
163 pub fn register_producer(&mut self) -> ClientInput {
164 match std::mem::replace(self, ClientInputStatus::Connected) {
165 ClientInputStatus::AwaitingProducer { client_input } => client_input,
166 ClientInputStatus::Connected => panic!("producer was already registered before"),
168 }
169 }
170}
171
172pub enum ClientOutputStatus {
173 AwaitingConsumer { client_output: ClientOutput },
174 Connected,
175}
176
177impl ClientOutputStatus {
178 #[allow(clippy::panic)]
179 pub fn register_consumer(&mut self) -> ClientOutput {
180 match std::mem::replace(self, ClientOutputStatus::Connected) {
181 ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
182 ClientOutputStatus::Connected => panic!("consumer was already registered before"),
184 }
185 }
186}
187
188#[derive(Copy, Clone, PartialEq, Eq)]
189pub enum CredentialsToggle {
190 Enabled,
191 Disabled,
192}
193
194impl CredentialsToggle {
195 pub fn is_enabled(&self) -> bool {
196 self == &CredentialsToggle::Enabled
197 }
198
199 pub fn is_disabled(&self) -> bool {
200 self == &CredentialsToggle::Disabled
201 }
202}
203
204impl From<bool> for CredentialsToggle {
205 fn from(value: bool) -> Self {
206 if value {
207 CredentialsToggle::Enabled
208 } else {
209 CredentialsToggle::Disabled
210 }
211 }
212}
213
214pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
215 config: Config,
216 client_store: S,
217 dkg_query_client: Option<C>,
218
219 nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,
221
222 wait_for_gateway: bool,
223 custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
224 custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
225 shutdown: Option<ShutdownTracker>,
226 event_tx: Option<EventSender>,
227 user_agent: Option<UserAgent>,
228
229 setup_method: GatewaySetup,
230
231 #[cfg(unix)]
232 connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
233
234 derivation_material: Option<DerivationMaterial>,
235}
236
237impl<C, S> BaseClientBuilder<C, S>
238where
239 S: MixnetClientStorage + 'static,
240 C: DkgQueryClient + Send + Sync + 'static,
241{
242 pub fn new(
243 base_config: Config,
244 client_store: S,
245 dkg_query_client: Option<C>,
246 ) -> BaseClientBuilder<C, S> {
247 BaseClientBuilder {
248 config: base_config,
249 client_store,
250 dkg_query_client,
251 nym_api_urls: None,
252 wait_for_gateway: false,
253 custom_topology_provider: None,
254 custom_gateway_transceiver: None,
255 shutdown: None,
256 event_tx: None,
257 user_agent: None,
258 setup_method: GatewaySetup::MustLoad { gateway_id: None },
259 #[cfg(unix)]
260 connection_fd_callback: None,
261 derivation_material: None,
262 }
263 }
264
265 #[must_use]
266 pub fn with_derivation_material(
267 mut self,
268 derivation_material: Option<DerivationMaterial>,
269 ) -> Self {
270 self.derivation_material = derivation_material;
271 self
272 }
273
274 #[must_use]
279 pub fn with_nym_api_urls(mut self, nym_api_urls: Vec<nym_network_defaults::ApiUrl>) -> Self {
280 self.nym_api_urls = Some(nym_api_urls);
281 self
282 }
283
284 #[must_use]
285 pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
286 self.config.debug.forget_me = *forget_me;
287 self
288 }
289
290 #[must_use]
291 pub fn with_remember_me(mut self, remember_me: &RememberMe) -> Self {
292 self.config.debug.remember_me = *remember_me;
293 self
294 }
295
296 #[must_use]
297 pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
298 self.setup_method = setup;
299 self
300 }
301
302 #[must_use]
303 pub fn with_wait_for_gateway(mut self, wait_for_gateway: bool) -> Self {
304 self.wait_for_gateway = wait_for_gateway;
305 self
306 }
307
308 #[must_use]
309 pub fn with_topology_provider(
310 mut self,
311 provider: Box<dyn TopologyProvider + Send + Sync>,
312 ) -> Self {
313 self.custom_topology_provider = Some(provider);
314 self
315 }
316
317 #[must_use]
318 pub fn with_gateway_transceiver(mut self, sender: Box<dyn GatewayTransceiver + Send>) -> Self {
319 self.custom_gateway_transceiver = Some(sender);
320 self
321 }
322
323 #[must_use]
324 pub fn with_shutdown(mut self, shutdown: ShutdownTracker) -> Self {
325 self.shutdown = Some(shutdown);
326 self
327 }
328
329 #[must_use]
330 pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
331 self.event_tx = Some(event_tx);
332 self
333 }
334
335 #[must_use]
336 pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
337 self.user_agent = Some(user_agent);
338 self
339 }
340
341 pub fn with_stored_topology<P: AsRef<Path>>(
342 mut self,
343 file: P,
344 ) -> Result<Self, ClientCoreError> {
345 self.custom_topology_provider =
346 Some(Box::new(HardcodedTopologyProvider::new_from_file(file)?));
347 Ok(self)
348 }
349
350 #[cfg(unix)]
351 pub fn with_connection_fd_callback(
352 mut self,
353 callback: Arc<dyn Fn(RawFd) + Send + Sync>,
354 ) -> Self {
355 self.connection_fd_callback = Some(callback);
356 self
357 }
358
359 fn mix_address(details: &InitialisationResult) -> Recipient {
362 details.client_address()
363 }
364
365 fn start_event_control(
366 parent_event_tx: Option<EventSender>,
367 children_event_rx: EventReceiver,
368 shutdown_tracker: &ShutdownTracker,
369 ) {
370 let event_control = EventControl::new(parent_event_tx, children_event_rx);
371 shutdown_tracker.try_spawn_named_with_shutdown(
372 async move { event_control.run().await },
373 "EventControl",
374 );
375 }
376
377 fn start_cover_traffic_stream(
380 debug_config: &DebugConfig,
381 ack_key: Arc<AckKey>,
382 self_address: Recipient,
383 topology_accessor: TopologyAccessor,
384 mix_tx: BatchMixMessageSender,
385 stats_tx: ClientStatsSender,
386 shutdown_tracker: &ShutdownTracker,
387 ) {
388 tracing::info!("Starting loop cover traffic stream...");
389
390 let mut stream = LoopCoverTrafficStream::new(
391 ack_key,
392 debug_config.acknowledgements.average_ack_delay,
393 mix_tx,
394 self_address,
395 topology_accessor,
396 debug_config.traffic,
397 debug_config.cover_traffic,
398 stats_tx,
399 );
400 shutdown_tracker
401 .try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
402 }
403
404 #[allow(clippy::too_many_arguments)]
405 fn start_real_traffic_controller(
406 controller_config: real_messages_control::Config,
407 key_rotation_config: KeyRotationConfig,
408 topology_accessor: TopologyAccessor,
409 ack_receiver: AcknowledgementReceiver,
410 input_receiver: InputMessageReceiver,
411 mix_sender: BatchMixMessageSender,
412 reply_storage: CombinedReplyStorage,
413 reply_controller_sender: ReplyControllerSender,
414 reply_controller_receiver: ReplyControllerReceiver,
415 lane_queue_lengths: LaneQueueLengths,
416 client_connection_rx: ConnectionCommandReceiver,
417 stats_tx: ClientStatsSender,
418 shutdown_tracker: &ShutdownTracker,
419 ) {
420 tracing::info!("Starting real traffic stream...");
421
422 let real_messages_controller = RealMessagesController::new(
423 controller_config,
424 key_rotation_config,
425 ack_receiver,
426 input_receiver,
427 mix_sender,
428 topology_accessor,
429 reply_storage,
430 reply_controller_sender,
431 reply_controller_receiver,
432 lane_queue_lengths,
433 client_connection_rx,
434 stats_tx,
435 shutdown_tracker.clone_shutdown_token(),
436 );
437
438 let (mut out_queue_control, mut reply_controller, ack_controller) =
440 real_messages_controller.into_tasks();
441 let (
442 mut ack_listener,
443 mut input_listener,
444 mut retransmission_listener,
445 mut sent_notification_listener,
446 mut ack_action_controller,
447 ) = ack_controller.into_tasks();
448
449 shutdown_tracker.try_spawn_named(
450 async move { out_queue_control.run().await },
451 "RealMessagesController::OutQueueControl",
452 );
453
454 let shutdown_token = shutdown_tracker.clone_shutdown_token();
455 shutdown_tracker.try_spawn_named(
456 async move { reply_controller.run(shutdown_token).await },
457 "RealMessagesController::ReplyController",
458 );
459
460 let shutdown_token = shutdown_tracker.clone_shutdown_token();
461 shutdown_tracker.try_spawn_named(
462 async move { ack_listener.run(shutdown_token).await },
463 "AcknowledgementController::AcknowledgementListener",
464 );
465
466 let shutdown_token = shutdown_tracker.clone_shutdown_token();
467 shutdown_tracker.try_spawn_named(
468 async move { input_listener.run(shutdown_token).await },
469 "AcknowledgementController::InputMessageListener",
470 );
471
472 let shutdown_token = shutdown_tracker.clone_shutdown_token();
473 shutdown_tracker.try_spawn_named(
474 async move { retransmission_listener.run(shutdown_token).await },
475 "AcknowledgementController::RetransmissionRequestListener",
476 );
477
478 shutdown_tracker.try_spawn_named_with_shutdown(
479 async move {
480 sent_notification_listener.run().await;
481 },
482 "AcknowledgementController::SentNotificationListener",
483 );
484
485 let shutdown_token = shutdown_tracker.clone_shutdown_token();
486 shutdown_tracker.try_spawn_named(
487 async move { ack_action_controller.run(shutdown_token).await },
488 "AcknowledgementController::ActionController",
489 );
490
491 }
493
494 fn start_received_messages_buffer_controller(
497 local_encryption_keypair: Arc<x25519::KeyPair>,
498 query_receiver: ReceivedBufferRequestReceiver,
499 mixnet_receiver: MixnetMessageReceiver,
500 reply_key_storage: SentReplyKeys,
501 reply_controller_sender: ReplyControllerSender,
502 metrics_reporter: ClientStatsSender,
503 shutdown_tracker: &ShutdownTracker,
504 ) {
505 tracing::info!("Starting received messages buffer controller...");
506 let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
507 local_encryption_keypair,
508 query_receiver,
509 mixnet_receiver,
510 reply_key_storage,
511 reply_controller_sender,
512 metrics_reporter,
513 shutdown_tracker.clone_shutdown_token(),
514 );
515 let (mut msg_receiver, mut req_receiver) = controller.into_tasks();
516
517 shutdown_tracker.try_spawn_named(
518 async move { msg_receiver.run().await },
519 "ReceivedMessagesBufferController::FragmentedMessageReceiver",
520 );
521 shutdown_tracker.try_spawn_named(
522 async move { req_receiver.run().await },
523 "ReceivedMessagesBufferController::RequestReceiver",
524 );
525 }
526
527 #[allow(clippy::too_many_arguments)]
528 async fn start_gateway_client(
529 config: &Config,
530 initialisation_result: InitialisationResult,
531 bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
532 packet_router: PacketRouter,
533 stats_reporter: ClientStatsSender,
534 #[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
535 shutdown_tracker: &ShutdownTracker,
536 ) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
537 where
538 <S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
539 <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
540 <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
541 {
542 let managed_keys = initialisation_result.client_keys;
543 let GatewayDetails::Remote(details) = initialisation_result.gateway_registration.details
544 else {
545 return Err(ClientCoreError::UnexpectedPersistedCustomGatewayDetails);
546 };
547
548 let mut gateway_client =
549 if let Some(existing_client) = initialisation_result.authenticated_ephemeral_client {
550 existing_client.upgrade(
551 packet_router,
552 bandwidth_controller,
553 stats_reporter,
554 shutdown_tracker.clone_shutdown_token(),
555 )
556 } else {
557 let cfg = GatewayConfig::new(details.gateway_id, details.published_data.listeners);
558 GatewayClient::new(
559 GatewayClientConfig::new_default()
560 .with_disabled_credentials_mode(config.client.disabled_credentials_mode)
561 .with_response_timeout(
562 config.debug.gateway_connection.gateway_response_timeout,
563 ),
564 cfg,
565 managed_keys.identity_keypair(),
566 Some(details.shared_key),
567 packet_router,
568 bandwidth_controller,
569 stats_reporter,
570 #[cfg(unix)]
571 connection_fd_callback,
572 shutdown_tracker.clone_shutdown_token(),
573 )
574 };
575
576 let gateway_failure = |err| {
577 tracing::error!("Could not authenticate and start up the gateway connection - {err}");
578 ClientCoreError::GatewayClientError {
579 gateway_id: details.gateway_id.to_base58_string(),
580 source: Box::new(err),
581 }
582 };
583
584 let _ = gateway_client
590 .perform_initial_authentication()
591 .await
592 .map_err(gateway_failure)?;
593
594 gateway_client
595 .claim_initial_bandwidth()
596 .await
597 .map_err(gateway_failure)?;
598
599 gateway_client
600 .start_listening_for_mixnet_messages()
601 .map_err(gateway_failure)?;
602
603 Ok(gateway_client)
604 }
605
606 #[allow(clippy::too_many_arguments)]
607 async fn setup_gateway_transceiver(
608 custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
609 config: &Config,
610 initialisation_result: InitialisationResult,
611 bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
612 packet_router: PacketRouter,
613 stats_reporter: ClientStatsSender,
614 #[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
615 shutdown_tracker: &ShutdownTracker,
616 ) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
617 where
618 <S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
619 <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
620 <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
621 {
622 if let Some(mut custom_gateway_transceiver) = custom_gateway_transceiver {
624 return if !initialisation_result
625 .gateway_registration
626 .details
627 .is_custom()
628 {
629 Err(ClientCoreError::CustomGatewaySelectionExpected)
630 } else {
631 custom_gateway_transceiver.set_packet_router(packet_router)?;
633 Ok(custom_gateway_transceiver)
634 };
635 }
636
637 let gateway_client = Self::start_gateway_client(
639 config,
640 initialisation_result,
641 bandwidth_controller,
642 packet_router,
643 stats_reporter,
644 #[cfg(unix)]
645 connection_fd_callback,
646 shutdown_tracker,
647 )
648 .await?;
649
650 Ok(Box::new(RemoteGateway::new(gateway_client)))
651 }
652
653 fn setup_topology_provider(
654 custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
655 config_topology: config::Topology,
656 nym_api_urls: Vec<Url>,
657 nym_api_client: nym_http_api_client::Client,
658 ) -> Box<dyn TopologyProvider + Send + Sync> {
659 custom_provider.unwrap_or_else(|| {
661 Box::new(NymApiTopologyProvider::new(
662 config_topology,
663 nym_api_urls,
664 nym_api_client,
665 ))
666 })
667 }
668
669 async fn start_topology_refresher(
672 topology_provider: Box<dyn TopologyProvider + Send + Sync>,
673 topology_config: config::Topology,
674 topology_accessor: TopologyAccessor,
675 local_gateway: NodeIdentity,
676 wait_for_gateway: bool,
677 shutdown_tracker: &ShutdownTracker,
678 ) -> Result<(), ClientCoreError> {
679 let topology_refresher_config =
680 TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
681
682 if topology_config.disable_refreshing {
683 tracing::info!("The background topology refresher is not going to be started");
685 }
686
687 let mut topology_refresher = TopologyRefresher::new(
688 topology_refresher_config,
689 topology_accessor,
690 topology_provider,
691 );
692 tracing::info!("Obtaining initial network topology");
695 topology_refresher.try_refresh().await;
696
697 if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
698 tracing::error!(
699 "The current network topology seem to be insufficient to route any packets through \
700 - check if enough nodes and a gateway are online - source: {err}"
701 );
702 return Err(ClientCoreError::InsufficientNetworkTopology(err));
703 }
704
705 let gateway_wait_timeout = if wait_for_gateway {
706 Some(topology_config.max_startup_gateway_waiting_period)
707 } else {
708 None
709 };
710
711 if let Err(err) = topology_refresher
712 .ensure_contains_routable_egress(local_gateway)
713 .await
714 {
715 if let Some(waiting_timeout) = gateway_wait_timeout {
716 if let Err(err) = topology_refresher
717 .wait_for_gateway(local_gateway, waiting_timeout)
718 .await
719 {
720 tracing::error!(
721 "the gateway did not come back online within the specified timeout: {err}"
722 );
723 return Err(err.into());
724 }
725 } else {
726 tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
727 return Err(err.into());
728 }
729 }
730
731 if !topology_config.disable_refreshing {
732 tracing::info!("Starting topology refresher...");
735 shutdown_tracker.try_spawn_named_with_shutdown(
736 async move { topology_refresher.run().await },
737 "TopologyRefresher",
738 );
739 }
740
741 Ok(())
742 }
743
744 fn start_statistics_control(
745 config: &Config,
746 user_agent: Option<UserAgent>,
747 client_stats_id: String,
748 input_sender: Sender<InputMessage>,
749 shutdown_tracker: &ShutdownTracker,
750 ) -> ClientStatsSender {
751 tracing::info!("Starting statistics control...");
752 StatisticsControl::create_and_start(
753 config.debug.stats_reporting,
754 user_agent
755 .map(|u| u.application)
756 .unwrap_or("unknown".to_string()),
757 client_stats_id,
758 input_sender.clone(),
759 shutdown_tracker,
760 )
761 }
762
763 fn start_mix_traffic_controller(
764 gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
765 shutdown_tracker: &ShutdownTracker,
766 event_tx: EventSender,
767 ) -> (BatchMixMessageSender, ClientRequestSender) {
768 tracing::info!("Starting mix traffic controller...");
769 let mut mix_traffic_controller = MixTrafficController::new(
770 gateway_transceiver,
771 shutdown_tracker.clone_shutdown_token(),
772 event_tx,
773 );
774
775 let mix_tx = mix_traffic_controller.mix_tx();
776 let client_tx = mix_traffic_controller.client_tx();
777
778 shutdown_tracker.try_spawn_named(
779 async move { mix_traffic_controller.run().await },
780 "MixTrafficController",
781 );
782
783 (mix_tx, client_tx)
784 }
785
786 async fn setup_persistent_reply_storage(
788 backend: S::ReplyStore,
789 key_rotation_config: KeyRotationConfig,
790 shutdown_tracker: &ShutdownTracker,
791 ) -> Result<CombinedReplyStorage, ClientCoreError>
792 where
793 <S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
794 S::ReplyStore: Send + Sync,
795 {
796 tracing::trace!("Setup persistent reply storage");
797 let now = OffsetDateTime::now_utc();
798 let expected_current_key_rotation_start =
799 key_rotation_config.expected_current_key_rotation_start(now);
800 let prior_epoch_start =
804 expected_current_key_rotation_start - key_rotation_config.epoch_duration;
805
806 let persistent_storage = PersistentReplyStorage::new(backend);
807 let mem_store = persistent_storage
808 .load_state_from_backend(prior_epoch_start)
809 .await
810 .map_err(|err| ClientCoreError::SurbStorageError {
811 source: Box::new(err),
812 })?;
813
814 let store_clone = mem_store.clone();
815 let shutdown_token = shutdown_tracker.clone_shutdown_token();
816 shutdown_tracker.try_spawn_named(
817 async move {
818 persistent_storage
819 .flush_on_shutdown(store_clone, shutdown_token)
820 .await
821 },
822 "PersistentReplyStorage::flush_on_shutdown",
823 );
824
825 Ok(mem_store)
826 }
827
828 async fn initialise_keys_and_gateway(
829 setup_method: GatewaySetup,
830 key_store: &S::KeyStore,
831 details_store: &S::GatewaysDetailsStore,
832 derivation_material: Option<DerivationMaterial>,
833 ) -> Result<InitialisationResult, ClientCoreError>
834 where
835 <S::KeyStore as KeyStore>::StorageError: Sync + Send,
836 <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
837 {
838 if key_store.load_keys().await.is_err() {
840 tracing::info!("could not find valid client keys - a new set will be generated");
841 let mut rng = OsRng;
842 let keys = if let Some(derivation_material) = derivation_material {
843 ClientKeys::from_master_key(&mut rng, &derivation_material)
844 .map_err(|_| ClientCoreError::HkdfDerivationError)?
845 } else {
846 ClientKeys::generate_new(&mut rng)
847 };
848 store_client_keys(keys, key_store).await?;
849 }
850
851 setup_gateway(setup_method, key_store, details_store).await
852 }
853
854 fn construct_nym_api_client(
855 nym_api_urls: Option<&Vec<nym_network_defaults::ApiUrl>>,
856 config: &Config,
857 user_agent: Option<UserAgent>,
858 ) -> Result<nym_http_api_client::Client, ClientCoreError> {
859 tracing::debug!(
860 "construct_nym_api_client called with nym_api_urls: {}",
861 nym_api_urls.is_some()
862 );
863
864 if let Some(nym_api_urls) = nym_api_urls {
866 if nym_api_urls.is_empty() {
867 tracing::warn!("Provided nym_api_urls is empty, falling back to config endpoints");
868 } else {
869 tracing::info!(
870 "Building nym-api client from provided URLs (with domain fronting support): {} URLs",
871 nym_api_urls.len()
872 );
873
874 let mut builder =
875 nym_http_api_client::ClientBuilder::new_with_fronted_urls(nym_api_urls.clone())
876 .map_err(ClientCoreError::from)?
877 .with_retries(DEFAULT_NYM_API_RETRIES);
878
879 if let Some(user_agent) = user_agent {
880 builder = builder.with_user_agent(user_agent);
881 }
882
883 return builder.build().map_err(ClientCoreError::from);
884 }
885 }
886
887 tracing::debug!("Building basic nym-api HTTP client from config endpoints");
889
890 let mut nym_api_urls = config.get_nym_api_endpoints();
891 if nym_api_urls.is_empty() {
892 tracing::warn!("No API endpoints configured in config, this may cause issues");
893 }
894 nym_api_urls.shuffle(&mut thread_rng());
895
896 let api_urls: Vec<nym_network_defaults::ApiUrl> = nym_api_urls
898 .into_iter()
899 .map(|url| nym_network_defaults::ApiUrl {
900 url: url.to_string(),
901 front_hosts: None,
902 })
903 .collect();
904
905 tracing::debug!("Using {} config API endpoints", api_urls.len());
906
907 let mut builder = nym_http_api_client::ClientBuilder::new_with_fronted_urls(api_urls)
908 .map_err(ClientCoreError::from)?
909 .with_retries(DEFAULT_NYM_API_RETRIES)
910 .with_bincode();
911
912 if let Some(user_agent) = user_agent {
913 builder = builder.with_user_agent(user_agent);
914 }
915
916 builder.build().map_err(ClientCoreError::from)
917 }
918
919 async fn determine_key_rotation_state(
920 client: &nym_http_api_client::Client,
921 ) -> Result<KeyRotationConfig, ClientCoreError> {
922 Ok(client.get_key_rotation_info().await?.into())
923 }
924
925 pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
926 where
927 S::ReplyStore: Send + Sync,
928 <S::KeyStore as KeyStore>::StorageError: Send + Sync,
929 <S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
930 <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
931 <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
932 {
933 tracing::info!("Starting nym client");
934 #[cfg(debug_assertions)]
935 #[cfg(target_arch = "wasm32")]
936 {
937 console_log!("Starting base Nym Client");
938 }
939
940 let init_res = Self::initialise_keys_and_gateway(
942 self.setup_method,
943 self.client_store.key_store(),
944 self.client_store.gateway_details_store(),
945 self.derivation_material,
946 )
947 .await?;
948
949 let (reply_storage_backend, credential_store, _) = self.client_store.into_runtime_stores();
950
951 let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();
959
960 let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();
962
963 let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
965
966 let (event_sender, event_receiver) = mpsc::unbounded();
968
969 let (ack_sender, ack_receiver) = mpsc::unbounded();
971 let shared_topology_accessor =
972 TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);
973
974 let shutdown_tracker = match self.shutdown {
977 Some(parent_tracker) => parent_tracker.clone(),
978 None => nym_task::create_sdk_shutdown_tracker()?,
979 };
980
981 Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
982
983 let (reply_controller_sender, reply_controller_receiver) =
985 reply_controller::requests::new_control_channels();
986
987 let self_address = Self::mix_address(&init_res);
988 let ack_key = init_res.client_keys.ack_key();
989 let encryption_keys = init_res.client_keys.encryption_keypair();
990 let identity_keys = init_res.client_keys.identity_keypair();
991
992 let bandwidth_controller = self
995 .dkg_query_client
996 .map(|client| BandwidthController::new(credential_store, client));
997
998 let nym_api_client = Self::construct_nym_api_client(
999 self.nym_api_urls.as_ref(),
1000 &self.config,
1001 self.user_agent.clone(),
1002 )?;
1003 let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;
1004
1005 let topology_provider = Self::setup_topology_provider(
1006 self.custom_topology_provider.take(),
1007 self.config.debug.topology,
1008 self.config.get_nym_api_endpoints(),
1009 nym_api_client,
1010 );
1011
1012 let stats_reporter = Self::start_statistics_control(
1013 &self.config,
1014 self.user_agent.clone(),
1015 generate_client_stats_id(*self_address.identity()),
1016 input_sender.clone(),
1017 &shutdown_tracker.clone(),
1018 );
1019
1020 Self::start_topology_refresher(
1022 topology_provider,
1023 self.config.debug.topology,
1024 shared_topology_accessor.clone(),
1025 self_address.gateway(),
1026 self.wait_for_gateway,
1027 &shutdown_tracker.clone(),
1028 )
1029 .await?;
1030
1031 let gateway_packet_router = PacketRouter::new(
1032 ack_sender,
1033 mixnet_messages_sender,
1034 shutdown_tracker.clone_shutdown_token(),
1035 );
1036
1037 let gateway_transceiver = Self::setup_gateway_transceiver(
1038 self.custom_gateway_transceiver,
1039 &self.config,
1040 init_res,
1041 bandwidth_controller,
1042 gateway_packet_router,
1043 stats_reporter.clone(),
1044 #[cfg(unix)]
1045 self.connection_fd_callback,
1046 &shutdown_tracker.clone(),
1047 )
1048 .await?;
1049 let gateway_ws_fd = gateway_transceiver.ws_fd();
1050
1051 let reply_storage = Self::setup_persistent_reply_storage(
1052 reply_storage_backend,
1053 key_rotation_config,
1054 &shutdown_tracker.clone(),
1055 )
1056 .await?;
1057
1058 Self::start_received_messages_buffer_controller(
1059 encryption_keys,
1060 received_buffer_request_receiver,
1061 mixnet_messages_receiver,
1062 reply_storage.key_storage(),
1063 reply_controller_sender.clone(),
1064 stats_reporter.clone(),
1065 &shutdown_tracker.clone(),
1066 );
1067
1068 let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
1074 gateway_transceiver,
1075 &shutdown_tracker.clone(),
1076 EventSender(event_sender),
1077 );
1078
1079 let (client_connection_tx, client_connection_rx) = mpsc::unbounded();
1082
1083 let shared_lane_queue_lengths = LaneQueueLengths::new();
1086
1087 let controller_config = real_messages_control::Config::new(
1088 &self.config.debug,
1089 Arc::clone(&ack_key),
1090 self_address,
1091 );
1092
1093 Self::start_real_traffic_controller(
1094 controller_config,
1095 key_rotation_config,
1096 shared_topology_accessor.clone(),
1097 ack_receiver,
1098 input_receiver,
1099 message_sender.clone(),
1100 reply_storage,
1101 reply_controller_sender.clone(),
1102 reply_controller_receiver,
1103 shared_lane_queue_lengths.clone(),
1104 client_connection_rx,
1105 stats_reporter.clone(),
1106 &shutdown_tracker.clone(),
1107 );
1108
1109 if !self
1110 .config
1111 .debug
1112 .cover_traffic
1113 .disable_loop_cover_traffic_stream
1114 {
1115 Self::start_cover_traffic_stream(
1116 &self.config.debug,
1117 ack_key,
1118 self_address,
1119 shared_topology_accessor.clone(),
1120 message_sender,
1121 stats_reporter.clone(),
1122 &shutdown_tracker.clone(),
1123 );
1124 }
1125
1126 tracing::debug!("Core client startup finished!");
1127 tracing::debug!("The address of this client is: {self_address}");
1128
1129 #[cfg(debug_assertions)]
1130 #[cfg(target_arch = "wasm32")]
1131 {
1132 console_log!("Core client startup finished!");
1133 console_log!("Rust::start_base: the address of this client is: {self_address}");
1134 }
1135
1136 Ok(BaseClient {
1137 address: self_address,
1138 identity_keys,
1139 client_input: ClientInputStatus::AwaitingProducer {
1140 client_input: ClientInput {
1141 connection_command_sender: client_connection_tx,
1142 input_sender,
1143 client_request_sender,
1144 },
1145 },
1146 client_output: ClientOutputStatus::AwaitingConsumer {
1147 client_output: ClientOutput {
1148 received_buffer_request_sender,
1149 },
1150 },
1151 client_state: ClientState {
1152 shared_lane_queue_lengths,
1153 reply_controller_sender,
1154 topology_accessor: shared_topology_accessor,
1155 gateway_connection: GatewayConnection { gateway_ws_fd },
1156 },
1157 stats_reporter,
1158 shutdown_handle: shutdown_tracker, forget_me: self.config.debug.forget_me,
1160 remember_me: self.config.debug.remember_me,
1161 })
1162 }
1163}
1164
1165pub struct BaseClient {
1166 pub address: Recipient,
1167 pub identity_keys: Arc<ed25519::KeyPair>,
1168 pub client_input: ClientInputStatus,
1169 pub client_output: ClientOutputStatus,
1170 pub client_state: ClientState,
1171 pub stats_reporter: ClientStatsSender,
1172 pub shutdown_handle: ShutdownTracker,
1173 pub forget_me: ForgetMe,
1174 pub remember_me: RememberMe,
1175}
1176
1177#[cfg(test)]
1178mod tests {
1179 use super::*;
1180 use nym_network_defaults::{ApiUrl, NymNetworkDetails};
1181
1182 #[test]
1183 fn test_network_details_with_multiple_urls() {
1184 let mut network_details = NymNetworkDetails::new_empty();
1186 network_details.nym_api_urls = Some(vec![
1187 ApiUrl {
1188 url: "https://validator.nymtech.net/api/".to_string(),
1189 front_hosts: None,
1190 },
1191 ApiUrl {
1192 url: "https://nym-frontdoor.vercel.app/api/".to_string(),
1193 front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
1194 },
1195 ]);
1196
1197 assert_eq!(network_details.nym_api_urls.as_ref().unwrap().len(), 2);
1198 assert!(network_details.nym_api_urls.as_ref().unwrap()[1]
1199 .front_hosts
1200 .is_some());
1201 }
1202
1203 #[test]
1204 fn test_network_details_with_front_hosts() {
1205 let api_url = ApiUrl {
1207 url: "https://nym-frontdoor.vercel.app/api/".to_string(),
1208 front_hosts: Some(vec!["vercel.app".to_string(), "vercel.com".to_string()]),
1209 };
1210
1211 assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");
1212 assert_eq!(api_url.front_hosts.as_ref().unwrap().len(), 2);
1213 assert!(api_url
1214 .front_hosts
1215 .as_ref()
1216 .unwrap()
1217 .contains(&"vercel.app".to_string()));
1218 }
1219
1220 #[test]
1221 fn test_default_nym_api_retries_constant() {
1222 assert_eq!(DEFAULT_NYM_API_RETRIES, 3);
1224 }
1225}