nym-client-core 1.21.1

Crate containing core client functionality and configs, used by all other Nym client implementations
Documentation
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

use crate::client::{
    base_client::{EventSender, MixnetClientEvent},
    mix_traffic::transceiver::GatewayTransceiver,
};
use nym_gateway_requests::ClientRequest;
use nym_sphinx::forwarding::packet::MixPacket;
use nym_task::ShutdownToken;
use tracing::*;
use transceiver::ErasedGatewayError;

/// Sending half of the bounded tokio channel that carries batches of mix packets.
pub type BatchMixMessageSender = tokio::sync::mpsc::Sender<Vec<MixPacket>>;
/// Receiving half of the bounded tokio channel that carries batches of mix packets.
pub type BatchMixMessageReceiver = tokio::sync::mpsc::Receiver<Vec<MixPacket>>;
/// Receiving half of the bounded tokio channel that carries gateway client requests.
pub type ClientRequestReceiver = tokio::sync::mpsc::Receiver<ClientRequest>;
/// Sending half of the bounded tokio channel that carries gateway client requests.
pub type ClientRequestSender = tokio::sync::mpsc::Sender<ClientRequest>;

/// Submodule providing `GatewayTransceiver` and `ErasedGatewayError`, which this file imports.
pub mod transceiver;

// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
// NOTE(review): the 32kb figure is the original author's estimate; every channel slot holds a
// whole `Vec<MixPacket>` batch, so the real memory bound depends on batch sizes — confirm.
/// Capacity, counted in batches, of the bounded mix-packet channel owned by
/// `MixTrafficController`.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;

/// Number of consecutive failed gateway sends after which `MixTrafficController::run`
/// treats the gateway as dead, emits a failure event and cancels the shutdown token.
///
/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
/// If 20 sends in a row fail, the gateway is assumed to be unreachable.
const MAX_FAILURE_COUNT: usize = 20;

// that's also disgusting.
// NOTE(review): unit marker type; nothing in the visible part of this file uses it —
// confirm whether external callers still depend on it before touching it.
pub struct Empty;

/// Events reported by the mix traffic controller, wrapped in `MixnetClientEvent::Traffic`
/// when sent through the controller's event sender.
#[derive(Clone, Copy, Debug)]
pub enum MixTrafficEvent {
    /// Sending sphinx packets to the gateway failed `MAX_FAILURE_COUNT` times in a row;
    /// emitted right before the controller cancels the shutdown token and stops.
    FailedSendingSphinx,
}

/// Forwards batches of mix packets and client requests, received over internal channels,
/// to a gateway through a boxed `GatewayTransceiver`.
///
/// The controller keeps one sender of each channel itself (handed out as clones via
/// `mix_tx()` / `client_tx()`), so neither receiver closes while the controller is alive.
pub struct MixTrafficController {
    /// Type-erased transport used for every send towards the gateway.
    gateway_transceiver: Box<dyn GatewayTransceiver + Send>,

    /// Sender kept so callers can obtain clones and so `mix_rx` never closes.
    mix_tx: BatchMixMessageSender,
    /// Incoming batches of mix packets to forward to the gateway.
    mix_rx: BatchMixMessageReceiver,
    /// Incoming client requests to forward to the gateway.
    client_rx: ClientRequestReceiver,
    /// Sender kept so callers can obtain clones and so `client_rx` never closes.
    client_tx: ClientRequestSender,

    // TODO: this is temporary work-around.
    // in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
    /// Number of mix-packet sends that failed in a row; reset to 0 by any successful send.
    consecutive_gateway_failure_count: usize,

    /// Observed to stop processing; also cancelled by the controller itself once the
    /// gateway is considered dead.
    shutdown_token: ShutdownToken,
    /// Channel used to report `MixTrafficEvent`s to the rest of the client.
    event_tx: EventSender,
}

impl MixTrafficController {
    /// Creates a controller driving the given concrete gateway transceiver.
    ///
    /// The transceiver is boxed exactly once and handed to [`Self::new_dynamic`], which
    /// performs the actual construction.
    pub fn new<T>(
        gateway_transceiver: T,
        shutdown_token: ShutdownToken,
        event_tx: EventSender,
    ) -> MixTrafficController
    where
        T: GatewayTransceiver + Send + 'static,
    {
        Self::new_dynamic(Box::new(gateway_transceiver), shutdown_token, event_tx)
    }

    /// Creates a controller from an already type-erased gateway transceiver.
    ///
    /// The box is stored as-is; it is deliberately *not* routed through [`Self::new`],
    /// which would wrap it in a second `Box` and add an extra indirection to every send.
    ///
    /// Two bounded channels are created here: one for batches of mix packets (capacity
    /// `MIX_MESSAGE_RECEIVER_BUFFER_SIZE`) and one for client requests (capacity 8).
    /// Both senders are retained so that they can be cloned out via [`Self::mix_tx`] and
    /// [`Self::client_tx`].
    pub fn new_dynamic(
        gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
        shutdown_token: ShutdownToken,
        event_tx: EventSender,
    ) -> MixTrafficController {
        // Capacity of the client request channel.
        const CLIENT_REQUEST_BUFFER_SIZE: usize = 8;

        let (mix_tx, mix_rx) = tokio::sync::mpsc::channel(MIX_MESSAGE_RECEIVER_BUFFER_SIZE);
        let (client_tx, client_rx) = tokio::sync::mpsc::channel(CLIENT_REQUEST_BUFFER_SIZE);

        MixTrafficController {
            gateway_transceiver,
            mix_tx,
            mix_rx,
            client_rx,
            client_tx,
            consecutive_gateway_failure_count: 0,
            shutdown_token,
            event_tx,
        }
    }

    /// Returns a new handle for submitting client requests to this controller.
    pub fn client_tx(&self) -> ClientRequestSender {
        self.client_tx.clone()
    }

    /// Returns a new handle for submitting batches of mix packets to this controller.
    pub fn mix_tx(&self) -> BatchMixMessageSender {
        self.mix_tx.clone()
    }

    /// Forwards one batch of mix packets to the gateway.
    ///
    /// A batch holding exactly one packet goes through `send_mix_packet`; anything else
    /// goes through `batch_send_mix_packets`. Callers are expected to pass a non-empty
    /// batch (checked in debug builds only).
    ///
    /// A failed send increments the consecutive failure counter and a successful one
    /// resets it. If shutdown is signalled first, the send is abandoned, the counter is
    /// left untouched and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns the transceiver's error when the send fails.
    async fn on_messages(
        &mut self,
        mut mix_packets: Vec<MixPacket>,
    ) -> Result<(), ErasedGatewayError> {
        debug_assert!(!mix_packets.is_empty());
        let send_future = if mix_packets.len() == 1 {
            // SAFETY: we just checked we have one packet
            #[allow(clippy::unwrap_used)]
            let mix_packet = mix_packets.pop().unwrap();
            self.gateway_transceiver.send_mix_packet(mix_packet)
        } else {
            self.gateway_transceiver.batch_send_mix_packets(mix_packets)
        };

        tokio::select! {
            // `biased` polls the branches in order, so a pending shutdown always wins.
            biased;
            _ = self.shutdown_token.cancelled() => {
                trace!("received shutdown while handling messages");
                Ok(())
            }
            result = send_future => {
                if result.is_err() {
                    self.consecutive_gateway_failure_count += 1;
                } else {
                    trace!("We *might* have managed to forward sphinx packet(s) to the gateway!");
                    self.consecutive_gateway_failure_count = 0;
                }

                result
            }
        }
    }

    /// Forwards a single client request to the gateway.
    ///
    /// Failures are only logged; they do not affect the consecutive failure counter.
    /// If shutdown is signalled first, the request is abandoned.
    async fn on_client_request(&mut self, client_request: ClientRequest) {
        tokio::select! {
            // `biased` polls the branches in order, so a pending shutdown always wins.
            biased;
            _ = self.shutdown_token.cancelled() => {
                trace!("received shutdown while handling client request");
            }
            result = self.gateway_transceiver.send_client_request(client_request) => {
                if let Err(err) = result {
                    error!("Failed to send client request: {err}");
                }
            }
        }
    }

    /// Runs the controller loop until shutdown.
    ///
    /// Each iteration handles, in priority order: a shutdown signal, an incoming batch of
    /// mix packets, or an incoming client request. Once `MAX_FAILURE_COUNT` consecutive
    /// mix-packet sends have failed, the controller emits
    /// `MixTrafficEvent::FailedSendingSphinx`, cancels the shutdown token itself and exits.
    pub async fn run(&mut self) {
        debug!("Started MixTrafficController with graceful shutdown support");
        loop {
            tokio::select! {
                biased;
                _ = self.shutdown_token.cancelled() => {
                    trace!("MixTrafficController: Received shutdown");
                    break;
                }
                // mix_rx should never error out as we're holding one instance of the sender
                Some(mix_packets) = self.mix_rx.recv() => {
                    if let Err(err) = self.on_messages(mix_packets).await {
                        error!("Failed to send sphinx packet(s) to the gateway: {err}");
                        // `>=` rather than `==`: the threshold must never be skipped, even if
                        // the counter were to move past it.
                        if self.consecutive_gateway_failure_count >= MAX_FAILURE_COUNT {
                            // Disconnect from the gateway. If we should try to re-connect
                            // is handled at a higher layer.
                            error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
                            // Do we need to handle the embedded mixnet client case
                            // separately?
                            self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
                            // IMO it shouldn't be signalled from there but it is how it is
                            // TODO : report the failure upwards and shutdown from upwards
                            // Gateway is dead, we have to shut down currently
                            error!("Signalling shutdown from the MixTrafficController");
                            self.shutdown_token.cancel();
                            break;
                        }
                    }
                },
                // client_rx should never error out as we're holding one instance of the sender
                Some(client_request) = self.client_rx.recv() => {
                    self.on_client_request(client_request).await;
                }
            }
        }
        debug!("MixTrafficController: Exiting");
    }
}