nym_client_core/client/base_client/
mod.rs

1// Copyright 2022-2023 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use super::mix_traffic::ClientRequestSender;
5use super::received_buffer::ReceivedBufferMessage;
6use super::statistics_control::StatisticsControl;
7use crate::client::base_client::storage::helpers::store_client_keys;
8use crate::client::base_client::storage::MixnetClientStorage;
9use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
10use crate::client::event_control::EventControl;
11use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
12use crate::client::key_manager::persistence::KeyStore;
13use crate::client::key_manager::ClientKeys;
14use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
15use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
16use crate::client::real_messages_control;
17use crate::client::real_messages_control::RealMessagesController;
18use crate::client::received_buffer::{
19    ReceivedBufferRequestReceiver, ReceivedBufferRequestSender, ReceivedMessagesBufferController,
20};
21use crate::client::replies::reply_controller;
22use crate::client::replies::reply_controller::key_rotation_helpers::KeyRotationConfig;
23use crate::client::replies::reply_controller::{ReplyControllerReceiver, ReplyControllerSender};
24use crate::client::replies::reply_storage::{
25    CombinedReplyStorage, PersistentReplyStorage, ReplyStorageBackend, SentReplyKeys,
26};
27use crate::client::topology_control::nym_api_provider::NymApiTopologyProvider;
28use crate::client::topology_control::{
29    TopologyAccessor, TopologyRefresher, TopologyRefresherConfig,
30};
31use crate::config;
32use crate::config::{Config, DebugConfig};
33use crate::error::ClientCoreError;
34use crate::init::{
35    setup_gateway,
36    types::{GatewaySetup, InitialisationResult},
37};
38use futures::channel::mpsc;
39use nym_bandwidth_controller::BandwidthController;
40use nym_client_core_config_types::{ForgetMe, RememberMe};
41use nym_client_core_gateways_storage::{GatewayDetails, GatewaysDetailsStore};
42use nym_credential_storage::storage::Storage as CredentialStorage;
43use nym_crypto::asymmetric::{ed25519, x25519};
44use nym_crypto::hkdf::DerivationMaterial;
45use nym_gateway_client::client::config::GatewayClientConfig;
46use nym_gateway_client::{
47    AcknowledgementReceiver, GatewayClient, GatewayConfig, MixnetMessageReceiver, PacketRouter,
48};
49use nym_sphinx::acknowledgements::AckKey;
50use nym_sphinx::addressing::clients::Recipient;
51use nym_sphinx::addressing::nodes::NodeIdentity;
52use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
53use nym_statistics_common::clients::ClientStatsSender;
54use nym_statistics_common::generate_client_stats_id;
55use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
56use nym_task::ShutdownTracker;
57use nym_topology::provider_trait::TopologyProvider;
58use nym_topology::HardcodedTopologyProvider;
59use nym_validator_client::nym_api::NymApiClientExt;
60use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
61use rand::prelude::SliceRandom;
62use rand::rngs::OsRng;
63use rand::thread_rng;
64use std::fmt::Debug;
65use std::os::raw::c_int as RawFd;
66use std::path::Path;
67use std::sync::Arc;
68use time::OffsetDateTime;
69use tokio::sync::mpsc::Sender;
70use url::Url;
71
72#[cfg(target_arch = "wasm32")]
73#[cfg(debug_assertions)]
74use nym_wasm_utils::console_log;
75
76/// Default number of retries for Nym API requests when using network details with domain fronting.
77/// This allows the client to try alternative URLs if the primary endpoint is unavailable.
78const DEFAULT_NYM_API_RETRIES: usize = 3;
79
// Only compiled for non-wasm32 targets, and only when both the `fs-surb-storage` and
// `fs-gateways-storage` features are enabled.
#[cfg(all(
    not(target_arch = "wasm32"),
    feature = "fs-surb-storage",
    feature = "fs-gateways-storage"
))]
pub mod non_wasm_helpers;

pub mod helpers;
// Home of `MixnetClientStorage` and of `helpers::store_client_keys`, both imported at the top
// of this file.
pub mod storage;
89
/// Event type carried by the [`EventSender`] / [`EventReceiver`] channel pair.
#[derive(Clone, Copy, Debug)]
pub enum MixnetClientEvent {
    /// Wraps a [`MixTrafficEvent`] (defined in `crate::client::mix_traffic`).
    Traffic(MixTrafficEvent),
}
94
95pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
96#[derive(Clone)]
97pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
98
99impl EventSender {
100    pub fn send(&self, event: MixnetClientEvent) {
101        if let Err(err) = self.0.unbounded_send(event) {
102            tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
103        }
104    }
105}
106
/// Cloneable bundle of the sender handles that make up the input side of a client.
#[derive(Clone)]
pub struct ClientInput {
    /// Sender of connection commands (`ConnectionCommandSender` from `nym_task::connections`).
    pub connection_command_sender: ConnectionCommandSender,
    /// Sender of [`InputMessage`]s; this is the channel [`ClientInput::send`] writes to.
    pub input_sender: InputMessageSender,
    /// Sender of client requests (`ClientRequestSender` from the `mix_traffic` module).
    pub client_request_sender: ClientRequestSender,
}

impl ClientInput {
    /// Forwards `message` to `input_sender` and awaits the send.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError` (carrying the message) when the send fails.
    pub async fn send(
        &self,
        message: InputMessage,
    ) -> Result<(), tokio::sync::mpsc::error::SendError<InputMessage>> {
        self.input_sender.send(message).await
    }
}
122
123pub struct ClientOutput {
124    pub received_buffer_request_sender: ReceivedBufferRequestSender,
125}
126
127impl ClientOutput {
128    pub fn register_receiver(
129        &mut self,
130    ) -> Result<mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>, ClientCoreError> {
131        let (reconstructed_sender, reconstructed_receiver) = mpsc::unbounded();
132
133        self.received_buffer_request_sender
134            .unbounded_send(ReceivedBufferMessage::ReceiverAnnounce(
135                reconstructed_sender,
136            ))
137            .map_err(|_| ClientCoreError::FailedToRegisterReceiver)?;
138
139        Ok(reconstructed_receiver)
140    }
141}
142
/// Cloneable collection of handles onto state shared with the rest of the client.
#[derive(Clone, Debug)]
pub struct ClientState {
    /// Lane queue lengths handle (`LaneQueueLengths` from `nym_task::connections`).
    pub shared_lane_queue_lengths: LaneQueueLengths,
    /// Sender half of the reply controller channel.
    pub reply_controller_sender: ReplyControllerSender,
    /// Accessor to the topology (`TopologyAccessor` from `topology_control`).
    pub topology_accessor: TopologyAccessor,
    /// Details of the gateway connection, see [`GatewayConnection`].
    pub gateway_connection: GatewayConnection,
}
150
/// Copyable description of the connection to the gateway.
#[derive(Clone, Copy, Debug)]
pub struct GatewayConnection {
    /// Presumably the descriptor of the gateway websocket, when one is known — confirm against
    /// the code that fills it in. `RawFd` is this file's alias for `std::os::raw::c_int`.
    pub gateway_ws_fd: Option<RawFd>,
}
155
156pub enum ClientInputStatus {
157    AwaitingProducer { client_input: ClientInput },
158    Connected,
159}
160
161impl ClientInputStatus {
162    #[allow(clippy::panic)]
163    pub fn register_producer(&mut self) -> ClientInput {
164        match std::mem::replace(self, ClientInputStatus::Connected) {
165            ClientInputStatus::AwaitingProducer { client_input } => client_input,
166            // critical failure implying misuse of software
167            ClientInputStatus::Connected => panic!("producer was already registered before"),
168        }
169    }
170}
171
172pub enum ClientOutputStatus {
173    AwaitingConsumer { client_output: ClientOutput },
174    Connected,
175}
176
177impl ClientOutputStatus {
178    #[allow(clippy::panic)]
179    pub fn register_consumer(&mut self) -> ClientOutput {
180        match std::mem::replace(self, ClientOutputStatus::Connected) {
181            ClientOutputStatus::AwaitingConsumer { client_output } => client_output,
182            // critical failure implying misuse of software
183            ClientOutputStatus::Connected => panic!("consumer was already registered before"),
184        }
185    }
186}
187
/// Explicit two-state replacement for a bare `bool` credentials flag.
///
/// Convertible from `bool`: `true` maps to [`CredentialsToggle::Enabled`] and `false` to
/// [`CredentialsToggle::Disabled`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CredentialsToggle {
    Enabled,
    Disabled,
}

impl CredentialsToggle {
    /// Returns `true` only for [`CredentialsToggle::Enabled`].
    pub fn is_enabled(&self) -> bool {
        matches!(self, CredentialsToggle::Enabled)
    }

    /// Returns `true` only for [`CredentialsToggle::Disabled`].
    pub fn is_disabled(&self) -> bool {
        matches!(self, CredentialsToggle::Disabled)
    }
}

impl From<bool> for CredentialsToggle {
    /// Maps `true` to `Enabled` and `false` to `Disabled`.
    fn from(value: bool) -> Self {
        if value {
            CredentialsToggle::Enabled
        } else {
            CredentialsToggle::Disabled
        }
    }
}
213
/// Builder gathering the configuration, the storage backend, an optional query client and a
/// number of optional overrides (set through the `with_*` methods) for a base client.
pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
    // base configuration; `with_forget_me` / `with_remember_me` overwrite parts of its `debug` section
    config: Config,
    // storage backend implementing `MixnetClientStorage`
    client_store: S,
    // optional client of type `C` (bounded by `DkgQueryClient` on the impl block)
    dkg_query_client: Option<C>,

    // Optional API URLs for domain fronting support
    nym_api_urls: Option<Vec<nym_network_defaults::ApiUrl>>,

    // `false` unless changed through `with_wait_for_gateway`
    wait_for_gateway: bool,
    // custom topology provider, set by `with_topology_provider` or `with_stored_topology`
    custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
    // custom gateway transceiver, set by `with_gateway_transceiver`
    custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
    // externally provided shutdown tracker, set by `with_shutdown`
    shutdown: Option<ShutdownTracker>,
    // externally provided event sender, set by `with_event_tx`
    event_tx: Option<EventSender>,
    // user agent, set by `with_user_agent`
    user_agent: Option<UserAgent>,

    // gateway setup method; `new` starts with `GatewaySetup::MustLoad { gateway_id: None }`
    setup_method: GatewaySetup,

    // unix-only callback taking a `RawFd`, set by `with_connection_fd_callback`
    #[cfg(unix)]
    connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,

    // optional key derivation material, set by `with_derivation_material`
    derivation_material: Option<DerivationMaterial>,
}
236
237impl<C, S> BaseClientBuilder<C, S>
238where
239    S: MixnetClientStorage + 'static,
240    C: DkgQueryClient + Send + Sync + 'static,
241{
242    pub fn new(
243        base_config: Config,
244        client_store: S,
245        dkg_query_client: Option<C>,
246    ) -> BaseClientBuilder<C, S> {
247        BaseClientBuilder {
248            config: base_config,
249            client_store,
250            dkg_query_client,
251            nym_api_urls: None,
252            wait_for_gateway: false,
253            custom_topology_provider: None,
254            custom_gateway_transceiver: None,
255            shutdown: None,
256            event_tx: None,
257            user_agent: None,
258            setup_method: GatewaySetup::MustLoad { gateway_id: None },
259            #[cfg(unix)]
260            connection_fd_callback: None,
261            derivation_material: None,
262        }
263    }
264
265    #[must_use]
266    pub fn with_derivation_material(
267        mut self,
268        derivation_material: Option<DerivationMaterial>,
269    ) -> Self {
270        self.derivation_material = derivation_material;
271        self
272    }
273
274    /// Set Nym API URLs for domain fronting support.
275    ///
276    /// When provided, the client will use these API URLs (which include front_hosts)
277    /// to construct HTTP clients with domain fronting enabled.
278    #[must_use]
279    pub fn with_nym_api_urls(mut self, nym_api_urls: Vec<nym_network_defaults::ApiUrl>) -> Self {
280        self.nym_api_urls = Some(nym_api_urls);
281        self
282    }
283
284    #[must_use]
285    pub fn with_forget_me(mut self, forget_me: &ForgetMe) -> Self {
286        self.config.debug.forget_me = *forget_me;
287        self
288    }
289
290    #[must_use]
291    pub fn with_remember_me(mut self, remember_me: &RememberMe) -> Self {
292        self.config.debug.remember_me = *remember_me;
293        self
294    }
295
296    #[must_use]
297    pub fn with_gateway_setup(mut self, setup: GatewaySetup) -> Self {
298        self.setup_method = setup;
299        self
300    }
301
302    #[must_use]
303    pub fn with_wait_for_gateway(mut self, wait_for_gateway: bool) -> Self {
304        self.wait_for_gateway = wait_for_gateway;
305        self
306    }
307
308    #[must_use]
309    pub fn with_topology_provider(
310        mut self,
311        provider: Box<dyn TopologyProvider + Send + Sync>,
312    ) -> Self {
313        self.custom_topology_provider = Some(provider);
314        self
315    }
316
317    #[must_use]
318    pub fn with_gateway_transceiver(mut self, sender: Box<dyn GatewayTransceiver + Send>) -> Self {
319        self.custom_gateway_transceiver = Some(sender);
320        self
321    }
322
323    #[must_use]
324    pub fn with_shutdown(mut self, shutdown: ShutdownTracker) -> Self {
325        self.shutdown = Some(shutdown);
326        self
327    }
328
329    #[must_use]
330    pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
331        self.event_tx = Some(event_tx);
332        self
333    }
334
335    #[must_use]
336    pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
337        self.user_agent = Some(user_agent);
338        self
339    }
340
341    pub fn with_stored_topology<P: AsRef<Path>>(
342        mut self,
343        file: P,
344    ) -> Result<Self, ClientCoreError> {
345        self.custom_topology_provider =
346            Some(Box::new(HardcodedTopologyProvider::new_from_file(file)?));
347        Ok(self)
348    }
349
350    #[cfg(unix)]
351    pub fn with_connection_fd_callback(
352        mut self,
353        callback: Arc<dyn Fn(RawFd) + Send + Sync>,
354    ) -> Self {
355        self.connection_fd_callback = Some(callback);
356        self
357    }
358
    // note: do **NOT** make this method public as its only valid usage is from within `start_base`
    // because it relies on the crypto keys being already loaded
    //
    // returns the `Recipient` that `details.client_address()` reports
    fn mix_address(details: &InitialisationResult) -> Recipient {
        details.client_address()
    }
364
365    fn start_event_control(
366        parent_event_tx: Option<EventSender>,
367        children_event_rx: EventReceiver,
368        shutdown_tracker: &ShutdownTracker,
369    ) {
370        let event_control = EventControl::new(parent_event_tx, children_event_rx);
371        shutdown_tracker.try_spawn_named_with_shutdown(
372            async move { event_control.run().await },
373            "EventControl",
374        );
375    }
376
377    // future constantly pumping loop cover traffic at some specified average rate
378    // the pumped traffic goes to the MixTrafficController
379    fn start_cover_traffic_stream(
380        debug_config: &DebugConfig,
381        ack_key: Arc<AckKey>,
382        self_address: Recipient,
383        topology_accessor: TopologyAccessor,
384        mix_tx: BatchMixMessageSender,
385        stats_tx: ClientStatsSender,
386        shutdown_tracker: &ShutdownTracker,
387    ) {
388        tracing::info!("Starting loop cover traffic stream...");
389
390        let mut stream = LoopCoverTrafficStream::new(
391            ack_key,
392            debug_config.acknowledgements.average_ack_delay,
393            mix_tx,
394            self_address,
395            topology_accessor,
396            debug_config.traffic,
397            debug_config.cover_traffic,
398            stats_tx,
399        );
400        shutdown_tracker
401            .try_spawn_named_with_shutdown(async move { stream.run().await }, "CoverTrafficStream");
402    }
403
404    #[allow(clippy::too_many_arguments)]
405    fn start_real_traffic_controller(
406        controller_config: real_messages_control::Config,
407        key_rotation_config: KeyRotationConfig,
408        topology_accessor: TopologyAccessor,
409        ack_receiver: AcknowledgementReceiver,
410        input_receiver: InputMessageReceiver,
411        mix_sender: BatchMixMessageSender,
412        reply_storage: CombinedReplyStorage,
413        reply_controller_sender: ReplyControllerSender,
414        reply_controller_receiver: ReplyControllerReceiver,
415        lane_queue_lengths: LaneQueueLengths,
416        client_connection_rx: ConnectionCommandReceiver,
417        stats_tx: ClientStatsSender,
418        shutdown_tracker: &ShutdownTracker,
419    ) {
420        tracing::info!("Starting real traffic stream...");
421
422        let real_messages_controller = RealMessagesController::new(
423            controller_config,
424            key_rotation_config,
425            ack_receiver,
426            input_receiver,
427            mix_sender,
428            topology_accessor,
429            reply_storage,
430            reply_controller_sender,
431            reply_controller_receiver,
432            lane_queue_lengths,
433            client_connection_rx,
434            stats_tx,
435            shutdown_tracker.clone_shutdown_token(),
436        );
437
438        // break out all the subtasks
439        let (mut out_queue_control, mut reply_controller, ack_controller) =
440            real_messages_controller.into_tasks();
441        let (
442            mut ack_listener,
443            mut input_listener,
444            mut retransmission_listener,
445            mut sent_notification_listener,
446            mut ack_action_controller,
447        ) = ack_controller.into_tasks();
448
449        shutdown_tracker.try_spawn_named(
450            async move { out_queue_control.run().await },
451            "RealMessagesController::OutQueueControl",
452        );
453
454        let shutdown_token = shutdown_tracker.clone_shutdown_token();
455        shutdown_tracker.try_spawn_named(
456            async move { reply_controller.run(shutdown_token).await },
457            "RealMessagesController::ReplyController",
458        );
459
460        let shutdown_token = shutdown_tracker.clone_shutdown_token();
461        shutdown_tracker.try_spawn_named(
462            async move { ack_listener.run(shutdown_token).await },
463            "AcknowledgementController::AcknowledgementListener",
464        );
465
466        let shutdown_token = shutdown_tracker.clone_shutdown_token();
467        shutdown_tracker.try_spawn_named(
468            async move { input_listener.run(shutdown_token).await },
469            "AcknowledgementController::InputMessageListener",
470        );
471
472        let shutdown_token = shutdown_tracker.clone_shutdown_token();
473        shutdown_tracker.try_spawn_named(
474            async move { retransmission_listener.run(shutdown_token).await },
475            "AcknowledgementController::RetransmissionRequestListener",
476        );
477
478        shutdown_tracker.try_spawn_named_with_shutdown(
479            async move {
480                sent_notification_listener.run().await;
481            },
482            "AcknowledgementController::SentNotificationListener",
483        );
484
485        let shutdown_token = shutdown_tracker.clone_shutdown_token();
486        shutdown_tracker.try_spawn_named(
487            async move { ack_action_controller.run(shutdown_token).await },
488            "AcknowledgementController::ActionController",
489        );
490
491        // .start(packet_type);
492    }
493
494    // buffer controlling all messages fetched from provider
495    // required so that other components would be able to use them (say the websocket)
496    fn start_received_messages_buffer_controller(
497        local_encryption_keypair: Arc<x25519::KeyPair>,
498        query_receiver: ReceivedBufferRequestReceiver,
499        mixnet_receiver: MixnetMessageReceiver,
500        reply_key_storage: SentReplyKeys,
501        reply_controller_sender: ReplyControllerSender,
502        metrics_reporter: ClientStatsSender,
503        shutdown_tracker: &ShutdownTracker,
504    ) {
505        tracing::info!("Starting received messages buffer controller...");
506        let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
507            local_encryption_keypair,
508            query_receiver,
509            mixnet_receiver,
510            reply_key_storage,
511            reply_controller_sender,
512            metrics_reporter,
513            shutdown_tracker.clone_shutdown_token(),
514        );
515        let (mut msg_receiver, mut req_receiver) = controller.into_tasks();
516
517        shutdown_tracker.try_spawn_named(
518            async move { msg_receiver.run().await },
519            "ReceivedMessagesBufferController::FragmentedMessageReceiver",
520        );
521        shutdown_tracker.try_spawn_named(
522            async move { req_receiver.run().await },
523            "ReceivedMessagesBufferController::RequestReceiver",
524        );
525    }
526
    // Produces a started `GatewayClient` for the remote gateway described by
    // `initialisation_result`: either upgrades the already authenticated ephemeral client or
    // constructs a new one, then authenticates, claims the initial bandwidth and starts
    // listening for mixnet messages.
    //
    // Errors:
    // - `ClientCoreError::UnexpectedPersistedCustomGatewayDetails` when the gateway details
    //   are not the `GatewayDetails::Remote` variant,
    // - `ClientCoreError::GatewayClientError` when any of the three startup steps fails.
    #[allow(clippy::too_many_arguments)]
    async fn start_gateway_client(
        config: &Config,
        initialisation_result: InitialisationResult,
        bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
        packet_router: PacketRouter,
        stats_reporter: ClientStatsSender,
        #[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
        shutdown_tracker: &ShutdownTracker,
    ) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
    where
        <S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
        <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
        <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
    {
        let managed_keys = initialisation_result.client_keys;
        // only remote gateway details are acceptable here
        let GatewayDetails::Remote(details) = initialisation_result.gateway_registration.details
        else {
            return Err(ClientCoreError::UnexpectedPersistedCustomGatewayDetails);
        };

        let mut gateway_client =
            if let Some(existing_client) = initialisation_result.authenticated_ephemeral_client {
                // reuse the client left over from initialisation
                // (note: `connection_fd_callback` is not passed along on this path)
                existing_client.upgrade(
                    packet_router,
                    bandwidth_controller,
                    stats_reporter,
                    shutdown_tracker.clone_shutdown_token(),
                )
            } else {
                // no client to reuse: build one from the persisted gateway details
                let cfg = GatewayConfig::new(details.gateway_id, details.published_data.listeners);
                GatewayClient::new(
                    GatewayClientConfig::new_default()
                        .with_disabled_credentials_mode(config.client.disabled_credentials_mode)
                        .with_response_timeout(
                            config.debug.gateway_connection.gateway_response_timeout,
                        ),
                    cfg,
                    managed_keys.identity_keypair(),
                    Some(details.shared_key),
                    packet_router,
                    bandwidth_controller,
                    stats_reporter,
                    #[cfg(unix)]
                    connection_fd_callback,
                    shutdown_tracker.clone_shutdown_token(),
                )
            };

        // shared error mapper for the startup steps below: logs the failure and wraps it
        // together with the base58 id of the gateway
        let gateway_failure = |err| {
            tracing::error!("Could not authenticate and start up the gateway connection - {err}");
            ClientCoreError::GatewayClientError {
                gateway_id: details.gateway_id.to_base58_string(),
                source: Box::new(err),
            }
        };

        // the gateway client startup procedure is slightly more complicated now
        // we need to:
        // - perform handshake (reg or auth)
        // - check for bandwidth
        // - start background tasks
        // (the value returned by the authentication step is deliberately discarded)
        let _ = gateway_client
            .perform_initial_authentication()
            .await
            .map_err(gateway_failure)?;

        gateway_client
            .claim_initial_bandwidth()
            .await
            .map_err(gateway_failure)?;

        gateway_client
            .start_listening_for_mixnet_messages()
            .map_err(gateway_failure)?;

        Ok(gateway_client)
    }
605
606    #[allow(clippy::too_many_arguments)]
607    async fn setup_gateway_transceiver(
608        custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
609        config: &Config,
610        initialisation_result: InitialisationResult,
611        bandwidth_controller: Option<BandwidthController<C, S::CredentialStore>>,
612        packet_router: PacketRouter,
613        stats_reporter: ClientStatsSender,
614        #[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
615        shutdown_tracker: &ShutdownTracker,
616    ) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
617    where
618        <S::KeyStore as KeyStore>::StorageError: Send + Sync + 'static,
619        <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
620        <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
621    {
622        // if we have setup custom gateway sender and persisted details agree with it, return it
623        if let Some(mut custom_gateway_transceiver) = custom_gateway_transceiver {
624            return if !initialisation_result
625                .gateway_registration
626                .details
627                .is_custom()
628            {
629                Err(ClientCoreError::CustomGatewaySelectionExpected)
630            } else {
631                // and make sure to invalidate the task client, so we wouldn't cause premature shutdown
632                custom_gateway_transceiver.set_packet_router(packet_router)?;
633                Ok(custom_gateway_transceiver)
634            };
635        }
636
637        // otherwise, setup normal gateway client, etc
638        let gateway_client = Self::start_gateway_client(
639            config,
640            initialisation_result,
641            bandwidth_controller,
642            packet_router,
643            stats_reporter,
644            #[cfg(unix)]
645            connection_fd_callback,
646            shutdown_tracker,
647        )
648        .await?;
649
650        Ok(Box::new(RemoteGateway::new(gateway_client)))
651    }
652
653    fn setup_topology_provider(
654        custom_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
655        config_topology: config::Topology,
656        nym_api_urls: Vec<Url>,
657        nym_api_client: nym_http_api_client::Client,
658    ) -> Box<dyn TopologyProvider + Send + Sync> {
659        // if no custom provider was ... provided ..., create one using nym-api
660        custom_provider.unwrap_or_else(|| {
661            Box::new(NymApiTopologyProvider::new(
662                config_topology,
663                nym_api_urls,
664                nym_api_client,
665            ))
666        })
667    }
668
669    // future responsible for periodically polling directory server and updating
670    // the current global view of topology
671    async fn start_topology_refresher(
672        topology_provider: Box<dyn TopologyProvider + Send + Sync>,
673        topology_config: config::Topology,
674        topology_accessor: TopologyAccessor,
675        local_gateway: NodeIdentity,
676        wait_for_gateway: bool,
677        shutdown_tracker: &ShutdownTracker,
678    ) -> Result<(), ClientCoreError> {
679        let topology_refresher_config =
680            TopologyRefresherConfig::new(topology_config.topology_refresh_rate);
681
682        if topology_config.disable_refreshing {
683            // if we're not spawning the refresher, don't cause shutdown immediately
684            tracing::info!("The background topology refresher is not going to be started");
685        }
686
687        let mut topology_refresher = TopologyRefresher::new(
688            topology_refresher_config,
689            topology_accessor,
690            topology_provider,
691        );
692        // before returning, block entire runtime to refresh the current network view so that any
693        // components depending on topology would see a non-empty view
694        tracing::info!("Obtaining initial network topology");
695        topology_refresher.try_refresh().await;
696
697        if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
698            tracing::error!(
699                "The current network topology seem to be insufficient to route any packets through \
700                - check if enough nodes and a gateway are online - source: {err}"
701            );
702            return Err(ClientCoreError::InsufficientNetworkTopology(err));
703        }
704
705        let gateway_wait_timeout = if wait_for_gateway {
706            Some(topology_config.max_startup_gateway_waiting_period)
707        } else {
708            None
709        };
710
711        if let Err(err) = topology_refresher
712            .ensure_contains_routable_egress(local_gateway)
713            .await
714        {
715            if let Some(waiting_timeout) = gateway_wait_timeout {
716                if let Err(err) = topology_refresher
717                    .wait_for_gateway(local_gateway, waiting_timeout)
718                    .await
719                {
720                    tracing::error!(
721                        "the gateway did not come back online within the specified timeout: {err}"
722                    );
723                    return Err(err.into());
724                }
725            } else {
726                tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
727                return Err(err.into());
728            }
729        }
730
731        if !topology_config.disable_refreshing {
732            // don't spawn the refresher if we don't want to be refreshing the topology.
733            // only use the initial values obtained
734            tracing::info!("Starting topology refresher...");
735            shutdown_tracker.try_spawn_named_with_shutdown(
736                async move { topology_refresher.run().await },
737                "TopologyRefresher",
738            );
739        }
740
741        Ok(())
742    }
743
744    fn start_statistics_control(
745        config: &Config,
746        user_agent: Option<UserAgent>,
747        client_stats_id: String,
748        input_sender: Sender<InputMessage>,
749        shutdown_tracker: &ShutdownTracker,
750    ) -> ClientStatsSender {
751        tracing::info!("Starting statistics control...");
752        StatisticsControl::create_and_start(
753            config.debug.stats_reporting,
754            user_agent
755                .map(|u| u.application)
756                .unwrap_or("unknown".to_string()),
757            client_stats_id,
758            input_sender.clone(),
759            shutdown_tracker,
760        )
761    }
762
763    fn start_mix_traffic_controller(
764        gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
765        shutdown_tracker: &ShutdownTracker,
766        event_tx: EventSender,
767    ) -> (BatchMixMessageSender, ClientRequestSender) {
768        tracing::info!("Starting mix traffic controller...");
769        let mut mix_traffic_controller = MixTrafficController::new(
770            gateway_transceiver,
771            shutdown_tracker.clone_shutdown_token(),
772            event_tx,
773        );
774
775        let mix_tx = mix_traffic_controller.mix_tx();
776        let client_tx = mix_traffic_controller.client_tx();
777
778        shutdown_tracker.try_spawn_named(
779            async move { mix_traffic_controller.run().await },
780            "MixTrafficController",
781        );
782
783        (mix_tx, client_tx)
784    }
785
786    // TODO: rename it as it implies the data is persistent whilst one can use InMemBackend
787    async fn setup_persistent_reply_storage(
788        backend: S::ReplyStore,
789        key_rotation_config: KeyRotationConfig,
790        shutdown_tracker: &ShutdownTracker,
791    ) -> Result<CombinedReplyStorage, ClientCoreError>
792    where
793        <S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
794        S::ReplyStore: Send + Sync,
795    {
796        tracing::trace!("Setup persistent reply storage");
797        let now = OffsetDateTime::now_utc();
798        let expected_current_key_rotation_start =
799            key_rotation_config.expected_current_key_rotation_start(now);
800        // time of the start of one epoch BEFORE the CURRENT rotation has begun
801        // this indicates the starting time of when packets with the current keys might have been constructed
802        // (i.e. any surbs OLDER than that MUST BE invalid)
803        let prior_epoch_start =
804            expected_current_key_rotation_start - key_rotation_config.epoch_duration;
805
806        let persistent_storage = PersistentReplyStorage::new(backend);
807        let mem_store = persistent_storage
808            .load_state_from_backend(prior_epoch_start)
809            .await
810            .map_err(|err| ClientCoreError::SurbStorageError {
811                source: Box::new(err),
812            })?;
813
814        let store_clone = mem_store.clone();
815        let shutdown_token = shutdown_tracker.clone_shutdown_token();
816        shutdown_tracker.try_spawn_named(
817            async move {
818                persistent_storage
819                    .flush_on_shutdown(store_clone, shutdown_token)
820                    .await
821            },
822            "PersistentReplyStorage::flush_on_shutdown",
823        );
824
825        Ok(mem_store)
826    }
827
828    async fn initialise_keys_and_gateway(
829        setup_method: GatewaySetup,
830        key_store: &S::KeyStore,
831        details_store: &S::GatewaysDetailsStore,
832        derivation_material: Option<DerivationMaterial>,
833    ) -> Result<InitialisationResult, ClientCoreError>
834    where
835        <S::KeyStore as KeyStore>::StorageError: Sync + Send,
836        <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
837    {
838        // if client keys do not exist already, create and persist them
839        if key_store.load_keys().await.is_err() {
840            tracing::info!("could not find valid client keys - a new set will be generated");
841            let mut rng = OsRng;
842            let keys = if let Some(derivation_material) = derivation_material {
843                ClientKeys::from_master_key(&mut rng, &derivation_material)
844                    .map_err(|_| ClientCoreError::HkdfDerivationError)?
845            } else {
846                ClientKeys::generate_new(&mut rng)
847            };
848            store_client_keys(keys, key_store).await?;
849        }
850
851        setup_gateway(setup_method, key_store, details_store).await
852    }
853
854    fn construct_nym_api_client(
855        nym_api_urls: Option<&Vec<nym_network_defaults::ApiUrl>>,
856        config: &Config,
857        user_agent: Option<UserAgent>,
858    ) -> Result<nym_http_api_client::Client, ClientCoreError> {
859        tracing::debug!(
860            "construct_nym_api_client called with nym_api_urls: {}",
861            nym_api_urls.is_some()
862        );
863
864        // If API URLs are provided, use new_with_fronted_urls() which handles domain fronting
865        if let Some(nym_api_urls) = nym_api_urls {
866            if nym_api_urls.is_empty() {
867                tracing::warn!("Provided nym_api_urls is empty, falling back to config endpoints");
868            } else {
869                tracing::info!(
870                    "Building nym-api client from provided URLs (with domain fronting support): {} URLs",
871                    nym_api_urls.len()
872                );
873
874                let mut builder =
875                    nym_http_api_client::ClientBuilder::new_with_fronted_urls(nym_api_urls.clone())
876                        .map_err(ClientCoreError::from)?
877                        .with_retries(DEFAULT_NYM_API_RETRIES);
878
879                if let Some(user_agent) = user_agent {
880                    builder = builder.with_user_agent(user_agent);
881                }
882
883                return builder.build().map_err(ClientCoreError::from);
884            }
885        }
886
887        // Fallback to basic client for backwards compatibility
888        tracing::debug!("Building basic nym-api HTTP client from config endpoints");
889
890        let mut nym_api_urls = config.get_nym_api_endpoints();
891        if nym_api_urls.is_empty() {
892            tracing::warn!("No API endpoints configured in config, this may cause issues");
893        }
894        nym_api_urls.shuffle(&mut thread_rng());
895
896        // Convert config URLs to ApiUrl format for consistency
897        let api_urls: Vec<nym_network_defaults::ApiUrl> = nym_api_urls
898            .into_iter()
899            .map(|url| nym_network_defaults::ApiUrl {
900                url: url.to_string(),
901                front_hosts: None,
902            })
903            .collect();
904
905        tracing::debug!("Using {} config API endpoints", api_urls.len());
906
907        let mut builder = nym_http_api_client::ClientBuilder::new_with_fronted_urls(api_urls)
908            .map_err(ClientCoreError::from)?
909            .with_retries(DEFAULT_NYM_API_RETRIES)
910            .with_bincode();
911
912        if let Some(user_agent) = user_agent {
913            builder = builder.with_user_agent(user_agent);
914        }
915
916        builder.build().map_err(ClientCoreError::from)
917    }
918
919    async fn determine_key_rotation_state(
920        client: &nym_http_api_client::Client,
921    ) -> Result<KeyRotationConfig, ClientCoreError> {
922        Ok(client.get_key_rotation_info().await?.into())
923    }
924
    /// Consumes the builder, starts all client components and returns the resulting
    /// [`BaseClient`].
    ///
    /// In order, this method:
    /// 1. loads or creates the client keys and performs gateway setup,
    /// 2. creates the channels connecting the individual components,
    /// 3. obtains a shutdown tracker (the one supplied to the builder, or one created via
    ///    `nym_task::create_sdk_shutdown_tracker`),
    /// 4. builds the nym-api client and determines the key rotation configuration,
    /// 5. starts, in this order: event control, statistics control, the topology refresher,
    ///    the gateway transceiver, reply storage, the received messages buffer controller,
    ///    the mix traffic controller, the real traffic controller and - unless disabled in
    ///    the debug config - the loop cover traffic stream.
    ///
    /// The start-up order is significant; do not reorder the calls below without
    /// understanding the dependencies between the components.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientCoreError`] if any of the fallible steps fails: key/gateway
    /// initialisation, shutdown tracker creation, nym-api client construction, the key
    /// rotation query, the topology refresher start, gateway transceiver setup or reply
    /// storage setup.
    pub async fn start_base(mut self) -> Result<BaseClient, ClientCoreError>
    where
        S::ReplyStore: Send + Sync,
        <S::KeyStore as KeyStore>::StorageError: Send + Sync,
        <S::ReplyStore as ReplyStorageBackend>::StorageError: Sync + Send,
        <S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
        <S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
    {
        tracing::info!("Starting nym client");
        // additionally log to the browser console on debug wasm builds
        #[cfg(debug_assertions)]
        #[cfg(target_arch = "wasm32")]
        {
            console_log!("Starting base Nym Client");
        }

        // derive (or load) client keys and gateway configuration
        let init_res = Self::initialise_keys_and_gateway(
            self.setup_method,
            self.client_store.key_store(),
            self.client_store.gateway_details_store(),
            self.derivation_material,
        )
        .await?;

        // split the storage into its runtime parts; the third element is not needed here
        let (reply_storage_backend, credential_store, _) = self.client_store.into_runtime_stores();

        // channels for inter-component communication
        // TODO: make the channels be internally created by the relevant components
        // rather than creating them here, so say for example the buffer controller would create the request channels
        // and would allow anyone to clone the sender channel

        // mixnet_messages_sender is the transmitter of mixnet messages received from the gateway
        // (handed to the PacketRouter below)
        // mixnet_messages_receiver is the receiver for said messages - used by the
        // received messages buffer controller
        let (mixnet_messages_sender, mixnet_messages_receiver) = mpsc::unbounded();

        // used for announcing connection or disconnection of a channel for pushing re-assembled messages to
        let (received_buffer_request_sender, received_buffer_request_receiver) = mpsc::unbounded();

        // channels responsible for controlling real messages
        // (bounded tokio channel with a capacity of a single message)
        let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);

        // channels responsible for event management
        let (event_sender, event_receiver) = mpsc::unbounded();

        // channels responsible for controlling ack messages
        // (sender goes to the PacketRouter, receiver to the real traffic controller)
        let (ack_sender, ack_receiver) = mpsc::unbounded();
        let shared_topology_accessor =
            TopologyAccessor::new(self.config.debug.topology.ignore_egress_epoch_role);

        // Obtain the shutdown tracker for this client - either a clone of the tracker provided
        // to the builder or a new one created by `nym_task::create_sdk_shutdown_tracker`
        // NOTE(review): whether the clone behaves as a child of the provided tracker depends on
        // `ShutdownTracker`'s `Clone` implementation - confirm
        let shutdown_tracker = match self.shutdown {
            Some(parent_tracker) => parent_tracker.clone(),
            None => nym_task::create_sdk_shutdown_tracker()?,
        };

        Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);

        // channels responsible for dealing with reply-related fun
        let (reply_controller_sender, reply_controller_receiver) =
            reply_controller::requests::new_control_channels();

        // extract everything needed from `init_res` now, as it is moved into
        // `setup_gateway_transceiver` further down
        let self_address = Self::mix_address(&init_res);
        let ack_key = init_res.client_keys.ack_key();
        let encryption_keys = init_res.client_keys.encryption_keypair();
        let identity_keys = init_res.client_keys.identity_keypair();

        // the components are started in very specific order. Unless you know what you are doing,
        // do not change that.
        // a bandwidth controller only exists if a dkg query client has been provided
        let bandwidth_controller = self
            .dkg_query_client
            .map(|client| BandwidthController::new(credential_store, client));

        let nym_api_client = Self::construct_nym_api_client(
            self.nym_api_urls.as_ref(),
            &self.config,
            self.user_agent.clone(),
        )?;
        // queried before the api client is moved into the topology provider
        let key_rotation_config = Self::determine_key_rotation_state(&nym_api_client).await?;

        let topology_provider = Self::setup_topology_provider(
            self.custom_topology_provider.take(),
            self.config.debug.topology,
            self.config.get_nym_api_endpoints(),
            nym_api_client,
        );

        let stats_reporter = Self::start_statistics_control(
            &self.config,
            self.user_agent.clone(),
            generate_client_stats_id(*self_address.identity()),
            input_sender.clone(),
            &shutdown_tracker.clone(),
        );

        // needs to be started as the first thing to block if required waiting for the gateway
        Self::start_topology_refresher(
            topology_provider,
            self.config.debug.topology,
            shared_topology_accessor.clone(),
            self_address.gateway(),
            self.wait_for_gateway,
            &shutdown_tracker.clone(),
        )
        .await?;

        // receives the sending halves of the ack and mixnet message channels
        let gateway_packet_router = PacketRouter::new(
            ack_sender,
            mixnet_messages_sender,
            shutdown_tracker.clone_shutdown_token(),
        );

        let gateway_transceiver = Self::setup_gateway_transceiver(
            self.custom_gateway_transceiver,
            &self.config,
            init_res,
            bandwidth_controller,
            gateway_packet_router,
            stats_reporter.clone(),
            // the fd callback argument only exists on unix targets
            #[cfg(unix)]
            self.connection_fd_callback,
            &shutdown_tracker.clone(),
        )
        .await?;
        // grabbed before the transceiver is moved into the mix traffic controller
        let gateway_ws_fd = gateway_transceiver.ws_fd();

        let reply_storage = Self::setup_persistent_reply_storage(
            reply_storage_backend,
            key_rotation_config,
            &shutdown_tracker.clone(),
        )
        .await?;

        Self::start_received_messages_buffer_controller(
            encryption_keys,
            received_buffer_request_receiver,
            mixnet_messages_receiver,
            reply_storage.key_storage(),
            reply_controller_sender.clone(),
            stats_reporter.clone(),
            &shutdown_tracker.clone(),
        );

        // The message_sender is the transmitter for any component generating sphinx packets
        // that are to be sent to the mixnet. They are used by cover traffic stream and real
        // traffic stream.
        // The MixTrafficController then sends the actual traffic

        let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
            gateway_transceiver,
            &shutdown_tracker.clone(),
            EventSender(event_sender),
        );

        // Channels that the websocket listener can use to signal downstream to the real traffic
        // controller that connections are closed.
        let (client_connection_tx, client_connection_rx) = mpsc::unbounded();

        // Shared queue length data. Published by the `OutQueueController` in the client, and used
        // primarily to throttle incoming connections (e.g socks5 for attached network-requesters)
        let shared_lane_queue_lengths = LaneQueueLengths::new();

        let controller_config = real_messages_control::Config::new(
            &self.config.debug,
            Arc::clone(&ack_key),
            self_address,
        );

        Self::start_real_traffic_controller(
            controller_config,
            key_rotation_config,
            shared_topology_accessor.clone(),
            ack_receiver,
            input_receiver,
            message_sender.clone(),
            reply_storage,
            reply_controller_sender.clone(),
            reply_controller_receiver,
            shared_lane_queue_lengths.clone(),
            client_connection_rx,
            stats_reporter.clone(),
            &shutdown_tracker.clone(),
        );

        // the loop cover traffic stream is started unless explicitly disabled in the debug config
        if !self
            .config
            .debug
            .cover_traffic
            .disable_loop_cover_traffic_stream
        {
            Self::start_cover_traffic_stream(
                &self.config.debug,
                ack_key,
                self_address,
                shared_topology_accessor.clone(),
                message_sender,
                stats_reporter.clone(),
                &shutdown_tracker.clone(),
            );
        }

        tracing::debug!("Core client startup finished!");
        tracing::debug!("The address of this client is: {self_address}");

        #[cfg(debug_assertions)]
        #[cfg(target_arch = "wasm32")]
        {
            console_log!("Core client startup finished!");
            console_log!("Rust::start_base: the address of this client is: {self_address}");
        }

        // hand the caller-facing channel ends and shared state over to the caller
        Ok(BaseClient {
            address: self_address,
            identity_keys,
            // input side starts out waiting for a producer to claim it
            client_input: ClientInputStatus::AwaitingProducer {
                client_input: ClientInput {
                    connection_command_sender: client_connection_tx,
                    input_sender,
                    client_request_sender,
                },
            },
            // output side starts out waiting for a consumer to claim it
            client_output: ClientOutputStatus::AwaitingConsumer {
                client_output: ClientOutput {
                    received_buffer_request_sender,
                },
            },
            client_state: ClientState {
                shared_lane_queue_lengths,
                reply_controller_sender,
                topology_accessor: shared_topology_accessor,
                gateway_connection: GatewayConnection { gateway_ws_fd },
            },
            stats_reporter,
            shutdown_handle: shutdown_tracker, // The primary tracker for this client
            forget_me: self.config.debug.forget_me,
            remember_me: self.config.debug.remember_me,
        })
    }
1163}
1164
/// Handle to a started client, as returned by `start_base`.
///
/// It bundles the client's address and identity keys with the channel ends, shared state
/// and shutdown tracker that were created while the client components were being started.
pub struct BaseClient {
    /// The address of this client.
    pub address: Recipient,
    /// Identity keypair taken from the client keys obtained during initialisation.
    pub identity_keys: Arc<ed25519::KeyPair>,
    /// Input side of the client; created by `start_base` in the `AwaitingProducer` state,
    /// wrapping the connection command, input message and client request senders.
    pub client_input: ClientInputStatus,
    /// Output side of the client; created by `start_base` in the `AwaitingConsumer` state,
    /// wrapping the received buffer request sender.
    pub client_output: ClientOutputStatus,
    /// Shared state: lane queue lengths, the reply controller sender, the topology accessor
    /// and gateway connection information.
    pub client_state: ClientState,
    /// Sender handle for client statistics, shared with the started components.
    pub stats_reporter: ClientStatsSender,
    /// The primary shutdown tracker for this client.
    pub shutdown_handle: ShutdownTracker,
    /// Copied from `config.debug.forget_me` when the client was started.
    pub forget_me: ForgetMe,
    /// Copied from `config.debug.remember_me` when the client was started.
    pub remember_me: RememberMe,
}
1176
#[cfg(test)]
mod tests {
    use super::*;
    use nym_network_defaults::{ApiUrl, NymNetworkDetails};

    /// Endpoint fixture carrying a domain fronting configuration.
    fn fronted_api_url() -> ApiUrl {
        ApiUrl {
            url: String::from("https://nym-frontdoor.vercel.app/api/"),
            front_hosts: Some(vec![String::from("vercel.app"), String::from("vercel.com")]),
        }
    }

    /// Network details accept several API URLs, with and without front hosts.
    #[test]
    fn test_network_details_with_multiple_urls() {
        let plain_api_url = ApiUrl {
            url: String::from("https://validator.nymtech.net/api/"),
            front_hosts: None,
        };

        let mut network_details = NymNetworkDetails::new_empty();
        network_details.nym_api_urls = Some(vec![plain_api_url, fronted_api_url()]);

        let configured = network_details.nym_api_urls.as_ref().unwrap();
        assert_eq!(configured.len(), 2);
        assert!(configured[1].front_hosts.is_some());
    }

    /// `ApiUrl` keeps both the URL and its domain fronting hosts.
    #[test]
    fn test_network_details_with_front_hosts() {
        let api_url = fronted_api_url();
        assert_eq!(api_url.url, "https://nym-frontdoor.vercel.app/api/");

        let front_hosts = api_url.front_hosts.as_ref().unwrap();
        assert_eq!(front_hosts.len(), 2);
        assert!(front_hosts.iter().any(|host| host == "vercel.app"));
    }

    /// Guards against accidental changes to the retry constant.
    #[test]
    fn test_default_nym_api_retries_constant() {
        assert_eq!(DEFAULT_NYM_API_RETRIES, 3);
    }
}