nym_gateway_client/
packet_router.rs1use crate::error::GatewayClientError;
8use crate::GatewayPacketRouter;
9use futures::channel::mpsc;
10use nym_task::ShutdownToken;
11
12pub type MixnetMessageSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
13pub type MixnetMessageReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
14
15pub type AcknowledgementSender = mpsc::UnboundedSender<Vec<Vec<u8>>>;
16pub type AcknowledgementReceiver = mpsc::UnboundedReceiver<Vec<Vec<u8>>>;
17
18#[derive(Clone, Debug)]
19pub struct PacketRouter {
20 ack_sender: AcknowledgementSender,
21 mixnet_message_sender: MixnetMessageSender,
22 shutdown: ShutdownToken,
23}
24
25impl PacketRouter {
26 pub fn new(
27 ack_sender: AcknowledgementSender,
28 mixnet_message_sender: MixnetMessageSender,
29 shutdown: ShutdownToken,
30 ) -> Self {
31 PacketRouter {
32 ack_sender,
33 mixnet_message_sender,
34 shutdown,
35 }
36 }
37
38 #[allow(clippy::panic)]
39 pub fn route_mixnet_messages(
40 &self,
41 received_messages: Vec<Vec<u8>>,
42 ) -> Result<(), GatewayClientError> {
43 if let Err(err) = self.mixnet_message_sender.unbounded_send(received_messages) {
44 if self.shutdown.is_cancelled() {
47 tracing::warn!("Failed to send mixnet messages due to receiver task shutdown");
49 return Err(GatewayClientError::ShutdownInProgress);
50 }
51 panic!("Failed to send mixnet message: {err}");
54 }
55 Ok(())
56 }
57
58 #[allow(clippy::panic)]
59 pub fn route_acks(&self, received_acks: Vec<Vec<u8>>) -> Result<(), GatewayClientError> {
60 if let Err(err) = self.ack_sender.unbounded_send(received_acks) {
61 if self.shutdown.is_cancelled() {
64 tracing::warn!("Failed to send acks due to receiver task shutdown");
66 return Err(GatewayClientError::ShutdownInProgress);
67 }
68 panic!("Failed to send acks: {err}");
71 }
72 Ok(())
73 }
74}
75
76impl GatewayPacketRouter for PacketRouter {
77 type Error = GatewayClientError;
78
79 fn route_mixnet_messages(&self, received_messages: Vec<Vec<u8>>) -> Result<(), Self::Error> {
82 self.route_mixnet_messages(received_messages)
83 }
84
85 fn route_acks(&self, received_acks: Vec<Vec<u8>>) -> Result<(), Self::Error> {
86 self.route_acks(received_acks)
87 }
88}