nym_sdk/mixnet/
native_client.rs

1use crate::mixnet::client::MixnetClientBuilder;
2use crate::mixnet::traits::MixnetMessageSender;
3use crate::{Error, Result};
4use async_trait::async_trait;
5use futures::{ready, Stream, StreamExt};
6use log::{debug, error};
7use nym_client_core::client::base_client::GatewayConnection;
8use nym_client_core::client::mix_traffic::ClientRequestSender;
9use nym_client_core::client::{
10    base_client::{ClientInput, ClientOutput, ClientState},
11    inbound_messages::InputMessage,
12    received_buffer::ReconstructedMessagesReceiver,
13};
14use nym_client_core::config::{ForgetMe, RememberMe};
15use nym_crypto::asymmetric::ed25519;
16use nym_gateway_requests::ClientRequest;
17use nym_sphinx::addressing::clients::Recipient;
18use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
19use nym_statistics_common::clients::{ClientStatsEvents, ClientStatsSender};
20use nym_task::connections::{ConnectionCommandSender, LaneQueueLengths};
21use nym_task::ShutdownTracker;
22use nym_topology::{NymRouteProvider, NymTopology};
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26use tokio::sync::RwLockReadGuard;
27use tokio_util::sync::CancellationToken;
28
29/// Client connected to the Nym mixnet.
30pub struct MixnetClient {
31    /// The nym address of this connected client.
32    pub(crate) nym_address: Recipient,
33
34    pub(crate) identity_keys: Arc<ed25519::KeyPair>,
35
36    /// Input to the client from the users perspective. This can be either data to send or control
37    /// messages.
38    pub(crate) client_input: ClientInput,
39
40    /// Output from the client from the users perspective. This is typically messages arriving from
41    /// the mixnet.
42    #[allow(dead_code)]
43    pub(crate) client_output: ClientOutput,
44
45    /// The current state of the client that is exposed to the user. This includes things like
46    /// current message send queue length.
47    pub(crate) client_state: ClientState,
48
49    /// A channel for messages arriving from the mixnet after they have been reconstructed.
50    pub(crate) reconstructed_receiver: ReconstructedMessagesReceiver,
51
52    /// A channel for sending stats event to be reported.
53    pub(crate) stats_events_reporter: ClientStatsSender,
54
55    /// The task manager that controls all the spawned tasks that the clients uses to do it's job.
56    pub(crate) shutdown_handle: ShutdownTracker,
57    pub(crate) packet_type: Option<PacketType>,
58
59    // internal state used for the `Stream` implementation
60    _buffered: Vec<ReconstructedMessage>,
61    pub(crate) forget_me: ForgetMe,
62    pub(crate) remember_me: RememberMe,
63}
64
65impl MixnetClient {
66    #[allow(clippy::too_many_arguments)]
67    pub(crate) fn new(
68        nym_address: Recipient,
69        identity_keys: Arc<ed25519::KeyPair>,
70        client_input: ClientInput,
71        client_output: ClientOutput,
72        client_state: ClientState,
73        reconstructed_receiver: ReconstructedMessagesReceiver,
74        stats_events_reporter: ClientStatsSender,
75        task_handle: ShutdownTracker,
76        packet_type: Option<PacketType>,
77        forget_me: ForgetMe,
78        remember_me: RememberMe,
79    ) -> Self {
80        Self {
81            nym_address,
82            identity_keys,
83            client_input,
84            client_output,
85            client_state,
86            reconstructed_receiver,
87            stats_events_reporter,
88            shutdown_handle: task_handle,
89            packet_type,
90            _buffered: Vec::new(),
91            forget_me,
92            remember_me,
93        }
94    }
95
96    /// Create a new client and connect to the mixnet using ephemeral in-memory keys that are
97    /// discarded at application close.
98    ///
99    /// # Examples
100    ///
101    /// ```no_run
102    /// use nym_sdk::mixnet;
103    ///
104    /// #[tokio::main]
105    /// async fn main() {
106    ///     let mut client = mixnet::MixnetClient::connect_new().await;
107    /// }
108    ///
109    /// ```
110    pub async fn connect_new() -> Result<Self> {
111        MixnetClientBuilder::new_ephemeral()
112            .build()?
113            .connect_to_mixnet()
114            .await
115    }
116
117    /// Get the nym address for this client, if it is available. The nym address is composed of the
118    /// client identity, the client encryption key, and the gateway identity.
119    pub fn nym_address(&self) -> &Recipient {
120        &self.nym_address
121    }
122
123    /// Get a child token of the root, to monitor unexpected shutdown, without causing one
124    pub fn cancellation_token(&self) -> CancellationToken {
125        self.shutdown_handle.child_shutdown_token().inner().clone()
126    }
127
128    pub fn client_request_sender(&self) -> ClientRequestSender {
129        self.client_input.client_request_sender.clone()
130    }
131
132    /// Get the client's identity keys.
133    pub fn identity_keypair(&self) -> Arc<ed25519::KeyPair> {
134        self.identity_keys.clone()
135    }
136
137    /// Sign a message with the client's private identity key.
138    pub fn sign(&self, data: &[u8]) -> ed25519::Signature {
139        self.identity_keys.private_key().sign(data)
140    }
141
142    /// Sign a message with the client's private identity key and return it as a base58 encoded
143    /// signature.
144    pub fn sign_text(&self, text: &str) -> String {
145        self.identity_keys.private_key().sign_text(text)
146    }
147
148    /// Get gateway connection information, like the file descriptor of the WebSocket
149    pub fn gateway_connection(&self) -> GatewayConnection {
150        self.client_state.gateway_connection
151    }
152
153    /// Get a shallow clone of [`MixnetClientSender`]. Useful if you want split the send and
154    /// receive logic in different locations.
155    pub fn split_sender(&self) -> MixnetClientSender {
156        MixnetClientSender {
157            client_input: self.client_input.clone(),
158            packet_type: self.packet_type,
159        }
160    }
161
162    /// Get a shallow clone of [`ConnectionCommandSender`]. This is useful if you want to e.g
163    /// explicitly close a transmission lane that is still sending data even though it should
164    /// cancel.
165    pub fn connection_command_sender(&self) -> ConnectionCommandSender {
166        self.client_input.connection_command_sender.clone()
167    }
168
169    /// Get a shallow clone of [`LaneQueueLengths`]. This is useful to manually implement some form
170    /// of backpressure logic.
171    pub fn shared_lane_queue_lengths(&self) -> LaneQueueLengths {
172        self.client_state.shared_lane_queue_lengths.clone()
173    }
174
175    /// Change the network topology used by this client for constructing sphinx packets into the
176    /// provided one.
177    pub async fn manually_overwrite_topology(&self, new_topology: NymTopology) {
178        self.client_state
179            .topology_accessor
180            .manually_change_topology(new_topology)
181            .await
182    }
183
184    /// Gets the value of the currently used network topology.
185    pub async fn read_current_route_provider(
186        &self,
187    ) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
188        self.client_state
189            .topology_accessor
190            .current_route_provider()
191            .await
192    }
193
194    /// Restore default topology refreshing behaviour of this client.
195    pub fn restore_automatic_topology_refreshing(&self) {
196        self.client_state.topology_accessor.release_manual_control()
197    }
198
199    /// Wait for messages from the mixnet
200    pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
201        self.reconstructed_receiver.next().await
202    }
203
204    /// Provide a callback to execute on incoming messages from the mixnet.
205    pub async fn on_messages<F>(&mut self, fun: F)
206    where
207        F: Fn(ReconstructedMessage),
208    {
209        while let Some(msgs) = self.wait_for_messages().await {
210            for msg in msgs {
211                fun(msg)
212            }
213        }
214    }
215
216    pub fn send_stats_event(&self, event: ClientStatsEvents) {
217        self.stats_events_reporter.report(event);
218    }
219
220    /// Get a clone of stats_events_reporter for easier use
221    pub fn stats_events_reporter(&self) -> ClientStatsSender {
222        self.stats_events_reporter.clone()
223    }
224
225    /// Disconnect from the mixnet. Currently, it is not supported to reconnect a disconnected
226    /// client.
227    pub async fn disconnect(self) {
228        if self.forget_me.any() {
229            log::debug!("Sending forget me request: {:?}", self.forget_me);
230            match self.send_forget_me().await {
231                Ok(_) => (),
232                Err(e) => error!("Failed to send forget me request: {e}"),
233            };
234            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
235        } else if self.remember_me.stats() {
236            log::debug!("Sending remember me request: {:?}", self.remember_me);
237            match self.send_remember_me().await {
238                Ok(_) => (),
239                Err(e) => error!("Failed to send remember me request: {e}"),
240            };
241            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
242        }
243
244        self.shutdown_handle.shutdown().await;
245    }
246
247    pub async fn send_forget_me(&self) -> Result<()> {
248        let client_request = ClientRequest::ForgetMe {
249            client: self.forget_me.client(),
250            stats: self.forget_me.stats(),
251        };
252        match self
253            .client_input
254            .client_request_sender
255            .send(client_request)
256            .await
257        {
258            Ok(_) => Ok(()),
259            Err(e) => {
260                error!("Failed to send forget me request: {e}");
261                Err(Error::MessageSendingFailure)
262            }
263        }
264    }
265
266    pub async fn send_remember_me(&self) -> Result<()> {
267        let client_request = ClientRequest::RememberMe {
268            session_type: self.remember_me.session_type(),
269        };
270        match self
271            .client_input
272            .client_request_sender
273            .send(client_request)
274            .await
275        {
276            Ok(_) => Ok(()),
277            Err(e) => {
278                error!("Failed to send forget me request: {e}");
279                Err(Error::MessageSendingFailure)
280            }
281        }
282    }
283}
284
285#[derive(Clone)]
286pub struct MixnetClientSender {
287    client_input: ClientInput,
288    packet_type: Option<PacketType>,
289}
290
291impl Stream for MixnetClient {
292    type Item = ReconstructedMessage;
293
294    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295        if let Some(next) = self._buffered.pop() {
296            cx.waker().wake_by_ref();
297            return Poll::Ready(Some(next));
298        }
299        match ready!(Pin::new(&mut self.reconstructed_receiver).poll_next(cx)) {
300            None => Poll::Ready(None),
301            Some(mut msgs) => {
302                // the vector itself should never be empty
303                if let Some(next) = msgs.pop() {
304                    // there's more than a single message - buffer them and wake the waker
305                    // to get polled again immediately
306                    if !msgs.is_empty() {
307                        self._buffered = msgs;
308                        cx.waker().wake_by_ref();
309                    }
310                    Poll::Ready(Some(next))
311                } else {
312                    // I *think* this happens for SURBs, but I'm not 100% sure. Nonetheless it's
313                    // beneign, but let's log it here anyway as a reminder
314                    debug!("the reconstructed messages vector is empty");
315                    cx.waker().wake_by_ref();
316                    Poll::Pending
317                }
318            }
319        }
320    }
321}
322
323#[async_trait]
324impl MixnetMessageSender for MixnetClient {
325    fn packet_type(&self) -> Option<PacketType> {
326        self.packet_type
327    }
328
329    async fn send(&self, message: InputMessage) -> Result<()> {
330        self.client_input
331            .send(message)
332            .await
333            .map_err(|_| Error::MessageSendingFailure)
334    }
335}
336
337#[async_trait]
338impl MixnetMessageSender for MixnetClientSender {
339    fn packet_type(&self) -> Option<PacketType> {
340        self.packet_type
341    }
342
343    async fn send(&self, message: InputMessage) -> Result<()> {
344        self.client_input
345            .send(message)
346            .await
347            .map_err(|_| Error::MessageSendingFailure)
348    }
349}