nym_gateway_client/
packet_router.rs

1// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4// JS: I personally don't like this name very much, but could not think of anything better.
5// I will gladly take any suggestions on how to rename this.
6
7use crate::error::GatewayClientError;
8use crate::GatewayPacketRouter;
9use futures::channel::mpsc;
10use nym_task::ShutdownToken;
11
12pub type MixnetMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
13pub type MixnetMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
14
15pub type AcknowledgementSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
16pub type AcknowledgementReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
17
18#[derive(Clone, Debug)]
19pub struct PacketRouter {
20    ack_sender: AcknowledgementSender,
21    mixnet_message_sender: MixnetMessageSender,
22    shutdown: ShutdownToken,
23}
24
25impl PacketRouter {
26    pub fn new(
27        ack_sender: AcknowledgementSender,
28        mixnet_message_sender: MixnetMessageSender,
29        shutdown: ShutdownToken,
30    ) -> Self {
31        PacketRouter {
32            ack_sender,
33            mixnet_message_sender,
34            shutdown,
35        }
36    }
37
38    #[allow(clippy::panic)]
39    pub fn route_mixnet_messages(
40        &self,
41        received_messages: Vec<Vec<u8>>,
42    ) -> Result<(), GatewayClientError> {
43        if let Err(err) = self.mixnet_message_sender.unbounded_send(received_messages) {
44            // check if the failure is due to the shutdown being in progress and thus the receiver channel
45            // having already been dropped
46            if self.shutdown.is_cancelled() {
47                // This should ideally not happen, but it's ok
48                tracing::warn!("Failed to send mixnet messages due to receiver task shutdown");
49                return Err(GatewayClientError::ShutdownInProgress);
50            }
51            // This should never happen during ordinary operation the way it's currently used.
52            // Abort to be on the safe side
53            panic!("Failed to send mixnet message: {err}");
54        }
55        Ok(())
56    }
57
58    #[allow(clippy::panic)]
59    pub fn route_acks(&self, received_acks: Vec<Vec<u8>>) -> Result<(), GatewayClientError> {
60        if let Err(err) = self.ack_sender.unbounded_send(received_acks) {
61            // check if the failure is due to the shutdown being in progress and thus the receiver channel
62            // having already been dropped
63            if self.shutdown.is_cancelled() {
64                // This should ideally not happen, but it's ok
65                tracing::warn!("Failed to send acks due to receiver task shutdown");
66                return Err(GatewayClientError::ShutdownInProgress);
67            }
68            // This should never happen during ordinary operation the way it's currently used.
69            // Abort to be on the safe side
70            panic!("Failed to send acks: {err}");
71        }
72        Ok(())
73    }
74}
75
76impl GatewayPacketRouter for PacketRouter {
77    type Error = GatewayClientError;
78
79    // note: this trait tries to decide whether a given message is an ack or a data message
80
81    fn route_mixnet_messages(&self, received_messages: Vec<Vec<u8>>) -> Result<(), Self::Error> {
82        self.route_mixnet_messages(received_messages)
83    }
84
85    fn route_acks(&self, received_acks: Vec<Vec<u8>>) -> Result<(), Self::Error> {
86        self.route_acks(received_acks)
87    }
88}