1use crate::client::helpers::get_time_now;
5use crate::client::replies::{
6 reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
7};
8use futures::channel::mpsc;
9use futures::lock::Mutex;
10use futures::StreamExt;
11use nym_crypto::asymmetric::x25519;
12use nym_crypto::Digest;
13use nym_gateway_client::MixnetMessageReceiver;
14use nym_sphinx::anonymous_replies::requests::{
15 RepliableMessage, RepliableMessageContent, ReplyMessage, ReplyMessageContent,
16};
17use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEncryptionKey};
18use nym_sphinx::message::{NymMessage, PlainMessage};
19use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
20use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
21use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
22use nym_task::ShutdownToken;
23use std::collections::HashSet;
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::*;
27
28const STALE_BUFFER_CHECK_INTERVAL: Duration = Duration::from_secs(10);
30
31pub type ReceivedBufferRequestSender = mpsc::UnboundedSender<ReceivedBufferMessage>;
34pub type ReceivedBufferRequestReceiver = mpsc::UnboundedReceiver<ReceivedBufferMessage>;
35
36pub type ReconstructedMessagesSender = mpsc::UnboundedSender<Vec<ReconstructedMessage>>;
38pub type ReconstructedMessagesReceiver = mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>;
39
40struct ReceivedMessagesBufferInner<R: MessageReceiver> {
41 messages: Vec<ReconstructedMessage>,
42 local_encryption_keypair: Arc<x25519::KeyPair>,
43
44 message_receiver: R,
47 message_sender: Option<ReconstructedMessagesSender>,
48
49 recently_reconstructed: HashSet<i32>,
53
54 stats_tx: ClientStatsSender,
55
56 last_stale_check: crate::client::helpers::Instant,
58}
59
60impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
61 fn recover_from_fragment(
62 &mut self,
63 fragment_data: &[u8],
64 fragment_data_size: usize,
65 ) -> Option<NymMessage> {
66 if nym_sphinx::cover::is_cover(fragment_data) {
67 trace!("The message was a loop cover message! Skipping it");
68 self.stats_tx
72 .report(PacketStatisticsEvent::CoverPacketReceived(fragment_data_size).into());
73 return None;
74 }
75
76 self.stats_tx
77 .report(PacketStatisticsEvent::RealPacketReceived(fragment_data_size).into());
78
79 let fragment = match self.message_receiver.recover_fragment(fragment_data) {
80 Err(err) => {
81 warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
82 return None;
83 }
84 Ok(frag) => frag,
85 };
86
87 if self.recently_reconstructed.contains(&fragment.id()) {
88 debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
89 return None;
90 }
91
92 match self.message_receiver.insert_new_fragment(fragment) {
94 Err(err) => match err {
95 MessageRecoveryError::MalformedReconstructedMessage { source, used_sets } => {
96 error!("message reconstruction failed - {source}. Attempting to re-use the message sets...");
97 for set_id in used_sets {
99 if !self.recently_reconstructed.insert(set_id) {
100 error!("Reconstructed another message containing already used set id!")
102 }
103 }
104 None
105 }
106 _ => {
107 error!("unexpected error occurred during message reconstruction: {err}");
108 None
109 }
110 },
111 Ok(reconstruction_result) => match reconstruction_result {
112 Some((reconstructed_message, used_sets)) => {
113 for set_id in used_sets {
114 if !self.recently_reconstructed.insert(set_id) {
115 error!("Reconstructed another message containing already used set id!")
117 }
118 }
119 Some(reconstructed_message)
120 }
121 None => None,
122 },
123 }
124 }
125
126 fn process_received_reply(
127 &mut self,
128 reply_ciphertext: &mut [u8],
129 reply_key: SurbEncryptionKey,
130 ) -> Result<Option<NymMessage>, MessageRecoveryError> {
131 let reply_ciphertext_size = reply_ciphertext.len();
132 self.message_receiver
134 .recover_plaintext_from_reply(reply_ciphertext, reply_key)?;
135 let fragment_data = reply_ciphertext;
136
137 Ok(self.recover_from_fragment(fragment_data, reply_ciphertext_size))
138 }
139
140 fn process_received_regular_packet(&mut self, mut raw_fragment: Vec<u8>) -> Option<NymMessage> {
141 let raw_fragment_size = raw_fragment.len();
142 let fragment_data = match self.message_receiver.recover_plaintext_from_regular_packet(
143 self.local_encryption_keypair.private_key(),
144 &mut raw_fragment,
145 ) {
146 Err(err) => {
147 warn!("failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!");
148 return None;
149 }
150 Ok(frag_data) => frag_data,
151 };
152
153 self.recover_from_fragment(fragment_data, raw_fragment_size)
154 }
155
156 fn cleanup_stale_buffers(&mut self) {
157 let now = get_time_now();
158 if now - self.last_stale_check > STALE_BUFFER_CHECK_INTERVAL {
159 self.last_stale_check = now;
160 self.message_receiver
161 .reconstructor()
162 .cleanup_stale_buffers();
163 }
164 }
165}
166
167#[derive(Debug, Clone)]
168struct ReceivedMessagesBuffer<R: MessageReceiver> {
171 inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
172 reply_key_storage: SentReplyKeys,
173 reply_controller_sender: ReplyControllerSender,
174 shutdown_token: ShutdownToken,
175}
176
177impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
178 fn new(
179 local_encryption_keypair: Arc<x25519::KeyPair>,
180 reply_key_storage: SentReplyKeys,
181 reply_controller_sender: ReplyControllerSender,
182 stats_tx: ClientStatsSender,
183 shutdown_token: ShutdownToken,
184 ) -> Self {
185 ReceivedMessagesBuffer {
186 inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
187 messages: Vec::new(),
188 local_encryption_keypair,
189 message_receiver: R::new(),
190 message_sender: None,
191 recently_reconstructed: HashSet::new(),
192 stats_tx,
193 last_stale_check: get_time_now(),
194 })),
195 reply_key_storage,
196 reply_controller_sender,
197 shutdown_token,
198 }
199 }
200
201 #[allow(clippy::panic)]
202 async fn disconnect_sender(&mut self) {
203 let mut guard = self.inner.lock().await;
204 if guard.message_sender.is_none() {
205 panic!("trying to disconnect non-existent sender!")
208 }
209 guard.message_sender = None;
210 }
211
212 #[allow(clippy::panic)]
213 async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
214 let mut guard = self.inner.lock().await;
215 if guard.message_sender.is_some() {
216 panic!("trying overwrite an existing sender!")
219 }
220
221 let stored_messages = std::mem::take(&mut guard.messages);
224 if !stored_messages.is_empty() {
225 if let Err(err) = sender.unbounded_send(stored_messages) {
226 error!("The sender channel we just received is already invalidated - {err:?}");
227 guard.messages = err.into_inner();
232 return;
233 }
234 }
235 guard.message_sender = Some(sender);
236 }
237
238 fn handle_reconstructed_plain_messages(
239 &mut self,
240 msgs: Vec<PlainMessage>,
241 ) -> Vec<ReconstructedMessage> {
242 msgs.into_iter().map(Into::into).collect()
243 }
244
245 fn handle_reconstructed_repliable_messages(
246 &mut self,
247 msgs: Vec<RepliableMessage>,
248 ) -> Vec<ReconstructedMessage> {
249 let mut reconstructed = Vec::new();
250 for msg in msgs {
251 let (reply_surbs, from_surb_request) = match msg.content {
252 RepliableMessageContent::Data(content) => {
253 let reply_surbs = content.reply_surbs;
254 let message = content.message;
255
256 trace!(
257 "received message that also contained additional {} reply surbs from {:?}!",
258 reply_surbs.len(),
259 msg.sender_tag
260 );
261
262 reconstructed.push(ReconstructedMessage::new(message, msg.sender_tag));
263
264 (reply_surbs, false)
265 }
266 RepliableMessageContent::AdditionalSurbs(content) => {
267 let reply_surbs = content.reply_surbs;
268
269 trace!(
270 "received additional {} reply surbs from {:?}!",
271 reply_surbs.len(),
272 msg.sender_tag
273 );
274 (reply_surbs, true)
275 }
276 RepliableMessageContent::Heartbeat(content) => {
277 let additional_reply_surbs = content.additional_reply_surbs;
278 error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
279 (additional_reply_surbs, false)
280 }
281 RepliableMessageContent::DataV2(content) => {
282 let reply_surbs = content.reply_surbs;
283 let message = content.message;
284
285 trace!(
286 "received message that also contained additional {} reply surbs from {:?}!",
287 reply_surbs.len(),
288 msg.sender_tag
289 );
290
291 reconstructed.push(ReconstructedMessage::new(message, msg.sender_tag));
292
293 (reply_surbs, false)
294 }
295 RepliableMessageContent::AdditionalSurbsV2(content) => {
296 let reply_surbs = content.reply_surbs;
297
298 trace!(
299 "received additional {} reply surbs from {:?}!",
300 reply_surbs.len(),
301 msg.sender_tag
302 );
303 (reply_surbs, true)
304 }
305 RepliableMessageContent::HeartbeatV2(content) => {
306 let additional_reply_surbs = content.additional_reply_surbs;
307 error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
308 (additional_reply_surbs, false)
309 }
310 };
311
312 if !reply_surbs.is_empty() {
313 if let Err(err) = self.reply_controller_sender.send_additional_surbs(
314 msg.sender_tag,
315 reply_surbs,
316 from_surb_request,
317 ) {
318 if !self.shutdown_token.is_cancelled() {
319 error!("{err}");
320 }
321 }
322 }
323 }
324 reconstructed
325 }
326
327 fn handle_reconstructed_reply_messages(
328 &mut self,
329 msgs: Vec<ReplyMessage>,
330 ) -> Vec<ReconstructedMessage> {
331 let mut reconstructed = Vec::new();
332 for msg in msgs {
333 match msg.content {
334 ReplyMessageContent::Data { message } => reconstructed.push(message.into()),
335 ReplyMessageContent::SurbRequest { recipient, amount } => {
336 debug!("received request for {amount} additional reply SURBs from {recipient}");
337 if let Err(err) = self
338 .reply_controller_sender
339 .send_additional_surbs_request(*recipient, amount)
340 {
341 if !self.shutdown_token.is_cancelled() {
342 error!("{err}");
343 }
344 }
345 }
346 }
347 }
348 reconstructed
349 }
350
351 async fn handle_reconstructed_messages(&mut self, msgs: Vec<NymMessage>) {
352 if msgs.is_empty() {
353 return;
354 }
355
356 let mut plain_messages = Vec::new();
357 let mut repliable_messages = Vec::new();
358 let mut reply_messages = Vec::new();
359
360 for msg in msgs {
361 match msg {
362 NymMessage::Plain(plain) => plain_messages.push(plain),
363 NymMessage::Repliable(repliable) => repliable_messages.push(repliable),
364 NymMessage::Reply(reply) => reply_messages.push(reply),
365 }
366 }
367
368 let mut reconstructed_messages = self.handle_reconstructed_plain_messages(plain_messages);
369 reconstructed_messages
370 .append(&mut self.handle_reconstructed_repliable_messages(repliable_messages));
371 reconstructed_messages
372 .append(&mut self.handle_reconstructed_reply_messages(reply_messages));
373
374 let mut inner_guard = self.inner.lock().await;
375 debug!(
376 "Adding {:?} new messages to the buffer!",
377 reconstructed_messages.len()
378 );
379
380 if let Some(sender) = &inner_guard.message_sender {
381 trace!("Sending reconstructed messages to announced sender");
382 if let Err(err) = sender.unbounded_send(reconstructed_messages) {
383 warn!("The reconstructed message receiver went offline without explicit notification (relevant error: - {err})");
384 inner_guard.message_sender = None;
385 inner_guard.messages.extend(err.into_inner());
386 }
387 } else {
388 trace!("No sender available - buffering reconstructed messages");
389 inner_guard.messages.extend(reconstructed_messages)
390 }
391 }
392
393 fn get_reply_key<'a>(
395 &self,
396 raw_message: &'a mut [u8],
397 ) -> Option<(SurbEncryptionKey, &'a mut [u8])> {
398 let reply_surb_digest_size = ReplySurbKeyDigestAlgorithm::output_size();
399 if raw_message.len() < reply_surb_digest_size {
400 return None;
401 }
402
403 let possible_key_digest =
404 EncryptionKeyDigest::clone_from_slice(&raw_message[..reply_surb_digest_size]);
405 self.reply_key_storage
406 .try_pop(possible_key_digest)
407 .map(|reply_encryption_key| {
408 (
409 *reply_encryption_key,
410 &mut raw_message[reply_surb_digest_size..],
411 )
412 })
413 }
414
415 async fn handle_new_received(
416 &mut self,
417 msgs: Vec<Vec<u8>>,
418 ) -> Result<(), MessageRecoveryError> {
419 trace!(
420 "Processing {:?} new message that might get added to the buffer!",
421 msgs.len()
422 );
423
424 let mut completed_messages = Vec::new();
425 let mut inner_guard = self.inner.lock().await;
426
427 for mut msg in msgs {
430 let completed_message =
433 if let Some((reply_key, reply_message)) = self.get_reply_key(&mut msg) {
434 inner_guard.process_received_reply(reply_message, reply_key)?
435 } else {
436 inner_guard.process_received_regular_packet(msg)
437 };
438
439 if let Some(completed) = completed_message {
440 debug!("received {completed}");
441 completed_messages.push(completed)
442 }
443 }
444
445 inner_guard.cleanup_stale_buffers();
449
450 drop(inner_guard);
451
452 if !completed_messages.is_empty() {
453 self.handle_reconstructed_messages(completed_messages).await
454 }
455 Ok(())
456 }
457}
458
459pub enum ReceivedBufferMessage {
460 ReceiverAnnounce(ReconstructedMessagesSender),
463
464 ReceiverDisconnect,
466}
467
468pub(crate) struct RequestReceiver<R: MessageReceiver> {
469 received_buffer: ReceivedMessagesBuffer<R>,
470 query_receiver: ReceivedBufferRequestReceiver,
471 shutdown_token: ShutdownToken,
472}
473
474impl<R: MessageReceiver> RequestReceiver<R> {
475 fn new(
476 received_buffer: ReceivedMessagesBuffer<R>,
477 query_receiver: ReceivedBufferRequestReceiver,
478 shutdown_token: ShutdownToken,
479 ) -> Self {
480 RequestReceiver {
481 received_buffer,
482 query_receiver,
483 shutdown_token,
484 }
485 }
486
487 async fn handle_message(&mut self, message: ReceivedBufferMessage) {
488 match message {
489 ReceivedBufferMessage::ReceiverAnnounce(sender) => {
490 self.received_buffer.connect_sender(sender).await;
491 }
492 ReceivedBufferMessage::ReceiverDisconnect => {
493 self.received_buffer.disconnect_sender().await
494 }
495 }
496 }
497
498 pub(crate) async fn run(&mut self) {
499 debug!("Started RequestReceiver with graceful shutdown support");
500 loop {
501 tokio::select! {
502 biased;
503 _ = self.shutdown_token.cancelled() => {
504 tracing::trace!("RequestReceiver: Received shutdown");
505 break;
506 }
507 request = self.query_receiver.next() => {
508 if let Some(message) = request {
509 self.handle_message(message).await
510 } else {
511 tracing::trace!("RequestReceiver: Stopping since channel closed");
512 self.shutdown_token.cancelled().await;
513 break;
514 }
515 },
516 }
517 }
518 tracing::debug!("RequestReceiver: Exiting");
519 }
520}
521
522pub(crate) struct FragmentedMessageReceiver<R: MessageReceiver> {
523 received_buffer: ReceivedMessagesBuffer<R>,
524 mixnet_packet_receiver: MixnetMessageReceiver,
525 shutdown_token: ShutdownToken,
526}
527
528impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
529 fn new(
530 received_buffer: ReceivedMessagesBuffer<R>,
531 mixnet_packet_receiver: MixnetMessageReceiver,
532 shutdown_token: ShutdownToken,
533 ) -> Self {
534 FragmentedMessageReceiver {
535 received_buffer,
536 mixnet_packet_receiver,
537 shutdown_token,
538 }
539 }
540
541 pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
542 debug!("Started FragmentedMessageReceiver with graceful shutdown support");
543 loop {
544 tokio::select! {
545 biased;
546 _ = self.shutdown_token.cancelled() => {
547 tracing::trace!("FragmentedMessageReceiver: Received shutdown");
548 break;
549 }
550 new_messages = self.mixnet_packet_receiver.next() => {
551 if let Some(new_messages) = new_messages {
552 self.received_buffer.handle_new_received(new_messages).await?;
553 } else {
554 tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
555 self.shutdown_token.cancelled().await;
556 break;
557 }
558 },
559
560 }
561 }
562 tracing::debug!("FragmentedMessageReceiver: Exiting");
563 Ok(())
564 }
565}
566
567pub(crate) struct ReceivedMessagesBufferController<R: MessageReceiver> {
568 fragmented_message_receiver: FragmentedMessageReceiver<R>,
569 request_receiver: RequestReceiver<R>,
570}
571
572impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferController<R> {
573 pub(crate) fn new(
574 local_encryption_keypair: Arc<x25519::KeyPair>,
575 query_receiver: ReceivedBufferRequestReceiver,
576 mixnet_packet_receiver: MixnetMessageReceiver,
577 reply_key_storage: SentReplyKeys,
578 reply_controller_sender: ReplyControllerSender,
579 metrics_reporter: ClientStatsSender,
580 shutdown_token: ShutdownToken,
581 ) -> Self {
582 let received_buffer = ReceivedMessagesBuffer::new(
583 local_encryption_keypair,
584 reply_key_storage,
585 reply_controller_sender,
586 metrics_reporter,
587 shutdown_token.clone(),
588 );
589
590 ReceivedMessagesBufferController {
591 fragmented_message_receiver: FragmentedMessageReceiver::new(
592 received_buffer.clone(),
593 mixnet_packet_receiver,
594 shutdown_token.clone(),
595 ),
596 request_receiver: RequestReceiver::new(
597 received_buffer,
598 query_receiver,
599 shutdown_token.clone(),
600 ),
601 }
602 }
603
604 pub(crate) fn into_tasks(self) -> (FragmentedMessageReceiver<R>, RequestReceiver<R>) {
605 (self.fragmented_message_receiver, self.request_receiver)
606 }
607}