nym_sdk/mixnet/
native_client.rs1use crate::mixnet::client::MixnetClientBuilder;
2use crate::mixnet::traits::MixnetMessageSender;
3use crate::{Error, Result};
4use async_trait::async_trait;
5use futures::{ready, Stream, StreamExt};
6use log::{debug, error};
7use nym_client_core::client::base_client::GatewayConnection;
8use nym_client_core::client::mix_traffic::ClientRequestSender;
9use nym_client_core::client::{
10 base_client::{ClientInput, ClientOutput, ClientState},
11 inbound_messages::InputMessage,
12 received_buffer::ReconstructedMessagesReceiver,
13};
14use nym_client_core::config::{ForgetMe, RememberMe};
15use nym_crypto::asymmetric::ed25519;
16use nym_gateway_requests::ClientRequest;
17use nym_sphinx::addressing::clients::Recipient;
18use nym_sphinx::{params::PacketType, receiver::ReconstructedMessage};
19use nym_statistics_common::clients::{ClientStatsEvents, ClientStatsSender};
20use nym_task::connections::{ConnectionCommandSender, LaneQueueLengths};
21use nym_task::ShutdownTracker;
22use nym_topology::{NymRouteProvider, NymTopology};
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::{Context, Poll};
26use tokio::sync::RwLockReadGuard;
27use tokio_util::sync::CancellationToken;
28
29pub struct MixnetClient {
31 pub(crate) nym_address: Recipient,
33
34 pub(crate) identity_keys: Arc<ed25519::KeyPair>,
35
36 pub(crate) client_input: ClientInput,
39
40 #[allow(dead_code)]
43 pub(crate) client_output: ClientOutput,
44
45 pub(crate) client_state: ClientState,
48
49 pub(crate) reconstructed_receiver: ReconstructedMessagesReceiver,
51
52 pub(crate) stats_events_reporter: ClientStatsSender,
54
55 pub(crate) shutdown_handle: ShutdownTracker,
57 pub(crate) packet_type: Option<PacketType>,
58
59 _buffered: Vec<ReconstructedMessage>,
61 pub(crate) forget_me: ForgetMe,
62 pub(crate) remember_me: RememberMe,
63}
64
65impl MixnetClient {
66 #[allow(clippy::too_many_arguments)]
67 pub(crate) fn new(
68 nym_address: Recipient,
69 identity_keys: Arc<ed25519::KeyPair>,
70 client_input: ClientInput,
71 client_output: ClientOutput,
72 client_state: ClientState,
73 reconstructed_receiver: ReconstructedMessagesReceiver,
74 stats_events_reporter: ClientStatsSender,
75 task_handle: ShutdownTracker,
76 packet_type: Option<PacketType>,
77 forget_me: ForgetMe,
78 remember_me: RememberMe,
79 ) -> Self {
80 Self {
81 nym_address,
82 identity_keys,
83 client_input,
84 client_output,
85 client_state,
86 reconstructed_receiver,
87 stats_events_reporter,
88 shutdown_handle: task_handle,
89 packet_type,
90 _buffered: Vec::new(),
91 forget_me,
92 remember_me,
93 }
94 }
95
96 pub async fn connect_new() -> Result<Self> {
111 MixnetClientBuilder::new_ephemeral()
112 .build()?
113 .connect_to_mixnet()
114 .await
115 }
116
117 pub fn nym_address(&self) -> &Recipient {
120 &self.nym_address
121 }
122
123 pub fn cancellation_token(&self) -> CancellationToken {
125 self.shutdown_handle.child_shutdown_token().inner().clone()
126 }
127
128 pub fn client_request_sender(&self) -> ClientRequestSender {
129 self.client_input.client_request_sender.clone()
130 }
131
132 pub fn identity_keypair(&self) -> Arc<ed25519::KeyPair> {
134 self.identity_keys.clone()
135 }
136
137 pub fn sign(&self, data: &[u8]) -> ed25519::Signature {
139 self.identity_keys.private_key().sign(data)
140 }
141
142 pub fn sign_text(&self, text: &str) -> String {
145 self.identity_keys.private_key().sign_text(text)
146 }
147
148 pub fn gateway_connection(&self) -> GatewayConnection {
150 self.client_state.gateway_connection
151 }
152
153 pub fn split_sender(&self) -> MixnetClientSender {
156 MixnetClientSender {
157 client_input: self.client_input.clone(),
158 packet_type: self.packet_type,
159 }
160 }
161
162 pub fn connection_command_sender(&self) -> ConnectionCommandSender {
166 self.client_input.connection_command_sender.clone()
167 }
168
169 pub fn shared_lane_queue_lengths(&self) -> LaneQueueLengths {
172 self.client_state.shared_lane_queue_lengths.clone()
173 }
174
175 pub async fn manually_overwrite_topology(&self, new_topology: NymTopology) {
178 self.client_state
179 .topology_accessor
180 .manually_change_topology(new_topology)
181 .await
182 }
183
184 pub async fn read_current_route_provider(
186 &self,
187 ) -> Option<RwLockReadGuard<'_, NymRouteProvider>> {
188 self.client_state
189 .topology_accessor
190 .current_route_provider()
191 .await
192 }
193
194 pub fn restore_automatic_topology_refreshing(&self) {
196 self.client_state.topology_accessor.release_manual_control()
197 }
198
199 pub async fn wait_for_messages(&mut self) -> Option<Vec<ReconstructedMessage>> {
201 self.reconstructed_receiver.next().await
202 }
203
204 pub async fn on_messages<F>(&mut self, fun: F)
206 where
207 F: Fn(ReconstructedMessage),
208 {
209 while let Some(msgs) = self.wait_for_messages().await {
210 for msg in msgs {
211 fun(msg)
212 }
213 }
214 }
215
216 pub fn send_stats_event(&self, event: ClientStatsEvents) {
217 self.stats_events_reporter.report(event);
218 }
219
220 pub fn stats_events_reporter(&self) -> ClientStatsSender {
222 self.stats_events_reporter.clone()
223 }
224
225 pub async fn disconnect(self) {
228 if self.forget_me.any() {
229 log::debug!("Sending forget me request: {:?}", self.forget_me);
230 match self.send_forget_me().await {
231 Ok(_) => (),
232 Err(e) => error!("Failed to send forget me request: {e}"),
233 };
234 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
235 } else if self.remember_me.stats() {
236 log::debug!("Sending remember me request: {:?}", self.remember_me);
237 match self.send_remember_me().await {
238 Ok(_) => (),
239 Err(e) => error!("Failed to send remember me request: {e}"),
240 };
241 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
242 }
243
244 self.shutdown_handle.shutdown().await;
245 }
246
247 pub async fn send_forget_me(&self) -> Result<()> {
248 let client_request = ClientRequest::ForgetMe {
249 client: self.forget_me.client(),
250 stats: self.forget_me.stats(),
251 };
252 match self
253 .client_input
254 .client_request_sender
255 .send(client_request)
256 .await
257 {
258 Ok(_) => Ok(()),
259 Err(e) => {
260 error!("Failed to send forget me request: {e}");
261 Err(Error::MessageSendingFailure)
262 }
263 }
264 }
265
266 pub async fn send_remember_me(&self) -> Result<()> {
267 let client_request = ClientRequest::RememberMe {
268 session_type: self.remember_me.session_type(),
269 };
270 match self
271 .client_input
272 .client_request_sender
273 .send(client_request)
274 .await
275 {
276 Ok(_) => Ok(()),
277 Err(e) => {
278 error!("Failed to send forget me request: {e}");
279 Err(Error::MessageSendingFailure)
280 }
281 }
282 }
283}
284
285#[derive(Clone)]
286pub struct MixnetClientSender {
287 client_input: ClientInput,
288 packet_type: Option<PacketType>,
289}
290
291impl Stream for MixnetClient {
292 type Item = ReconstructedMessage;
293
294 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295 if let Some(next) = self._buffered.pop() {
296 cx.waker().wake_by_ref();
297 return Poll::Ready(Some(next));
298 }
299 match ready!(Pin::new(&mut self.reconstructed_receiver).poll_next(cx)) {
300 None => Poll::Ready(None),
301 Some(mut msgs) => {
302 if let Some(next) = msgs.pop() {
304 if !msgs.is_empty() {
307 self._buffered = msgs;
308 cx.waker().wake_by_ref();
309 }
310 Poll::Ready(Some(next))
311 } else {
312 debug!("the reconstructed messages vector is empty");
315 cx.waker().wake_by_ref();
316 Poll::Pending
317 }
318 }
319 }
320 }
321}
322
323#[async_trait]
324impl MixnetMessageSender for MixnetClient {
325 fn packet_type(&self) -> Option<PacketType> {
326 self.packet_type
327 }
328
329 async fn send(&self, message: InputMessage) -> Result<()> {
330 self.client_input
331 .send(message)
332 .await
333 .map_err(|_| Error::MessageSendingFailure)
334 }
335}
336
337#[async_trait]
338impl MixnetMessageSender for MixnetClientSender {
339 fn packet_type(&self) -> Option<PacketType> {
340 self.packet_type
341 }
342
343 async fn send(&self, message: InputMessage) -> Result<()> {
344 self.client_input
345 .send(message)
346 .await
347 .map_err(|_| Error::MessageSendingFailure)
348 }
349}