// nym_client_core/client/received_buffer.rs

// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

4use crate::client::helpers::get_time_now;
5use crate::client::replies::{
6    reply_controller::ReplyControllerSender, reply_storage::SentReplyKeys,
7};
8use futures::channel::mpsc;
9use futures::lock::Mutex;
10use futures::StreamExt;
11use nym_crypto::asymmetric::x25519;
12use nym_crypto::Digest;
13use nym_gateway_client::MixnetMessageReceiver;
14use nym_sphinx::anonymous_replies::requests::{
15    RepliableMessage, RepliableMessageContent, ReplyMessage, ReplyMessageContent,
16};
17use nym_sphinx::anonymous_replies::{encryption_key::EncryptionKeyDigest, SurbEncryptionKey};
18use nym_sphinx::message::{NymMessage, PlainMessage};
19use nym_sphinx::params::ReplySurbKeyDigestAlgorithm;
20use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError, ReconstructedMessage};
21use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
22use nym_task::ShutdownToken;
23use std::collections::HashSet;
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::*;
27
28// The interval at which we check for stale buffers
29const STALE_BUFFER_CHECK_INTERVAL: Duration = Duration::from_secs(10);
30
31// Buffer Requests to say "hey, send any reconstructed messages to this channel"
32// or to say "hey, I'm going offline, don't send anything more to me. Just buffer them instead"
33pub type ReceivedBufferRequestSender = mpsc::UnboundedSender<ReceivedBufferMessage>;
34pub type ReceivedBufferRequestReceiver = mpsc::UnboundedReceiver<ReceivedBufferMessage>;
35
36// The channel set for the above
37pub type ReconstructedMessagesSender = mpsc::UnboundedSender<Vec<ReconstructedMessage>>;
38pub type ReconstructedMessagesReceiver = mpsc::UnboundedReceiver<Vec<ReconstructedMessage>>;
39
40struct ReceivedMessagesBufferInner<R: MessageReceiver> {
41    messages: Vec<ReconstructedMessage>,
42    local_encryption_keypair: Arc<x25519::KeyPair>,
43
44    // TODO: looking how it 'looks' here, perhaps `MessageReceiver` should be renamed to something
45    // else instead.
46    message_receiver: R,
47    message_sender: Option<ReconstructedMessagesSender>,
48
49    // TODO: this will get cleared upon re-running the client
50    // but perhaps it should be changed to include timestamps of when the message was reconstructed
51    // and every now and then remove ids older than X
52    recently_reconstructed: HashSet<i32>,
53
54    stats_tx: ClientStatsSender,
55
56    // Periodically check for stale buffers to clean up
57    last_stale_check: crate::client::helpers::Instant,
58}
59
60impl<R: MessageReceiver> ReceivedMessagesBufferInner<R> {
61    fn recover_from_fragment(
62        &mut self,
63        fragment_data: &[u8],
64        fragment_data_size: usize,
65    ) -> Option<NymMessage> {
66        if nym_sphinx::cover::is_cover(fragment_data) {
67            trace!("The message was a loop cover message! Skipping it");
68            // NOTE: it's important to note that there is quite a bit of difference in size of
69            // received and sent packets due to the sphinx layers being removed by the exit gateway
70            // before it reaches the mixnet client.
71            self.stats_tx
72                .report(PacketStatisticsEvent::CoverPacketReceived(fragment_data_size).into());
73            return None;
74        }
75
76        self.stats_tx
77            .report(PacketStatisticsEvent::RealPacketReceived(fragment_data_size).into());
78
79        let fragment = match self.message_receiver.recover_fragment(fragment_data) {
80            Err(err) => {
81                warn!("failed to recover fragment from raw data: {err}. The whole underlying message might be corrupted and unrecoverable!");
82                return None;
83            }
84            Ok(frag) => frag,
85        };
86
87        if self.recently_reconstructed.contains(&fragment.id()) {
88            debug!("Received a chunk of already re-assembled message ({:?})! It probably got here because the ack got lost", fragment.id());
89            return None;
90        }
91
92        // if we returned an error the underlying message is malformed in some way
93        match self.message_receiver.insert_new_fragment(fragment) {
94            Err(err) => match err {
95                MessageRecoveryError::MalformedReconstructedMessage { source, used_sets } => {
96                    error!("message reconstruction failed - {source}. Attempting to re-use the message sets...");
97                    // TODO: should we really insert reconstructed sets? could this be abused for some attack?
98                    for set_id in used_sets {
99                        if !self.recently_reconstructed.insert(set_id) {
100                            // or perhaps we should even panic at this point?
101                            error!("Reconstructed another message containing already used set id!")
102                        }
103                    }
104                    None
105                }
106                _ => {
107                    error!("unexpected error occurred during message reconstruction: {err}");
108                    None
109                }
110            },
111            Ok(reconstruction_result) => match reconstruction_result {
112                Some((reconstructed_message, used_sets)) => {
113                    for set_id in used_sets {
114                        if !self.recently_reconstructed.insert(set_id) {
115                            // or perhaps we should even panic at this point?
116                            error!("Reconstructed another message containing already used set id!")
117                        }
118                    }
119                    Some(reconstructed_message)
120                }
121                None => None,
122            },
123        }
124    }
125
126    fn process_received_reply(
127        &mut self,
128        reply_ciphertext: &mut [u8],
129        reply_key: SurbEncryptionKey,
130    ) -> Result<Option<NymMessage>, MessageRecoveryError> {
131        let reply_ciphertext_size = reply_ciphertext.len();
132        // note: this performs decryption IN PLACE without extra allocation
133        self.message_receiver
134            .recover_plaintext_from_reply(reply_ciphertext, reply_key)?;
135        let fragment_data = reply_ciphertext;
136
137        Ok(self.recover_from_fragment(fragment_data, reply_ciphertext_size))
138    }
139
140    fn process_received_regular_packet(&mut self, mut raw_fragment: Vec<u8>) -> Option<NymMessage> {
141        let raw_fragment_size = raw_fragment.len();
142        let fragment_data = match self.message_receiver.recover_plaintext_from_regular_packet(
143            self.local_encryption_keypair.private_key(),
144            &mut raw_fragment,
145        ) {
146            Err(err) => {
147                warn!("failed to recover fragment data: {err}. The whole underlying message might be corrupted and unrecoverable!");
148                return None;
149            }
150            Ok(frag_data) => frag_data,
151        };
152
153        self.recover_from_fragment(fragment_data, raw_fragment_size)
154    }
155
156    fn cleanup_stale_buffers(&mut self) {
157        let now = get_time_now();
158        if now - self.last_stale_check > STALE_BUFFER_CHECK_INTERVAL {
159            self.last_stale_check = now;
160            self.message_receiver
161                .reconstructor()
162                .cleanup_stale_buffers();
163        }
164    }
165}
166
167#[derive(Debug, Clone)]
168// Note: you should NEVER create more than a single instance of this using 'new()'.
169// You should always use .clone() to create additional instances
170struct ReceivedMessagesBuffer<R: MessageReceiver> {
171    inner: Arc<Mutex<ReceivedMessagesBufferInner<R>>>,
172    reply_key_storage: SentReplyKeys,
173    reply_controller_sender: ReplyControllerSender,
174    shutdown_token: ShutdownToken,
175}
176
177impl<R: MessageReceiver> ReceivedMessagesBuffer<R> {
178    fn new(
179        local_encryption_keypair: Arc<x25519::KeyPair>,
180        reply_key_storage: SentReplyKeys,
181        reply_controller_sender: ReplyControllerSender,
182        stats_tx: ClientStatsSender,
183        shutdown_token: ShutdownToken,
184    ) -> Self {
185        ReceivedMessagesBuffer {
186            inner: Arc::new(Mutex::new(ReceivedMessagesBufferInner {
187                messages: Vec::new(),
188                local_encryption_keypair,
189                message_receiver: R::new(),
190                message_sender: None,
191                recently_reconstructed: HashSet::new(),
192                stats_tx,
193                last_stale_check: get_time_now(),
194            })),
195            reply_key_storage,
196            reply_controller_sender,
197            shutdown_token,
198        }
199    }
200
201    #[allow(clippy::panic)]
202    async fn disconnect_sender(&mut self) {
203        let mut guard = self.inner.lock().await;
204        if guard.message_sender.is_none() {
205            // in theory we could just ignore it, but that situation should have never happened
206            // in the first place, so this way we at least know we have an important bug to fix
207            panic!("trying to disconnect non-existent sender!")
208        }
209        guard.message_sender = None;
210    }
211
212    #[allow(clippy::panic)]
213    async fn connect_sender(&mut self, sender: ReconstructedMessagesSender) {
214        let mut guard = self.inner.lock().await;
215        if guard.message_sender.is_some() {
216            // in theory we could just ignore it, but that situation should have never happened
217            // in the first place, so this way we at least know we have an important bug to fix
218            panic!("trying overwrite an existing sender!")
219        }
220
221        // while we're at it, also empty the buffer if we happened to receive anything while
222        // no sender was connected
223        let stored_messages = std::mem::take(&mut guard.messages);
224        if !stored_messages.is_empty() {
225            if let Err(err) = sender.unbounded_send(stored_messages) {
226                error!("The sender channel we just received is already invalidated - {err:?}");
227                // put the values back to the buffer
228                // the returned error has two fields: err: SendError and val: T,
229                // where val is the value that was failed to get sent;
230                // it's returned by the `into_inner` call
231                guard.messages = err.into_inner();
232                return;
233            }
234        }
235        guard.message_sender = Some(sender);
236    }
237
238    fn handle_reconstructed_plain_messages(
239        &mut self,
240        msgs: Vec<PlainMessage>,
241    ) -> Vec<ReconstructedMessage> {
242        msgs.into_iter().map(Into::into).collect()
243    }
244
245    fn handle_reconstructed_repliable_messages(
246        &mut self,
247        msgs: Vec<RepliableMessage>,
248    ) -> Vec<ReconstructedMessage> {
249        let mut reconstructed = Vec::new();
250        for msg in msgs {
251            let (reply_surbs, from_surb_request) = match msg.content {
252                RepliableMessageContent::Data(content) => {
253                    let reply_surbs = content.reply_surbs;
254                    let message = content.message;
255
256                    trace!(
257                        "received message that also contained additional {} reply surbs from {:?}!",
258                        reply_surbs.len(),
259                        msg.sender_tag
260                    );
261
262                    reconstructed.push(ReconstructedMessage::new(message, msg.sender_tag));
263
264                    (reply_surbs, false)
265                }
266                RepliableMessageContent::AdditionalSurbs(content) => {
267                    let reply_surbs = content.reply_surbs;
268
269                    trace!(
270                        "received additional {} reply surbs from {:?}!",
271                        reply_surbs.len(),
272                        msg.sender_tag
273                    );
274                    (reply_surbs, true)
275                }
276                RepliableMessageContent::Heartbeat(content) => {
277                    let additional_reply_surbs = content.additional_reply_surbs;
278                    error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
279                    (additional_reply_surbs, false)
280                }
281                RepliableMessageContent::DataV2(content) => {
282                    let reply_surbs = content.reply_surbs;
283                    let message = content.message;
284
285                    trace!(
286                        "received message that also contained additional {} reply surbs from {:?}!",
287                        reply_surbs.len(),
288                        msg.sender_tag
289                    );
290
291                    reconstructed.push(ReconstructedMessage::new(message, msg.sender_tag));
292
293                    (reply_surbs, false)
294                }
295                RepliableMessageContent::AdditionalSurbsV2(content) => {
296                    let reply_surbs = content.reply_surbs;
297
298                    trace!(
299                        "received additional {} reply surbs from {:?}!",
300                        reply_surbs.len(),
301                        msg.sender_tag
302                    );
303                    (reply_surbs, true)
304                }
305                RepliableMessageContent::HeartbeatV2(content) => {
306                    let additional_reply_surbs = content.additional_reply_surbs;
307                    error!("received a repliable heartbeat message - we don't know how to handle it yet (and we won't know until future PRs)");
308                    (additional_reply_surbs, false)
309                }
310            };
311
312            if !reply_surbs.is_empty() {
313                if let Err(err) = self.reply_controller_sender.send_additional_surbs(
314                    msg.sender_tag,
315                    reply_surbs,
316                    from_surb_request,
317                ) {
318                    if !self.shutdown_token.is_cancelled() {
319                        error!("{err}");
320                    }
321                }
322            }
323        }
324        reconstructed
325    }
326
327    fn handle_reconstructed_reply_messages(
328        &mut self,
329        msgs: Vec<ReplyMessage>,
330    ) -> Vec<ReconstructedMessage> {
331        let mut reconstructed = Vec::new();
332        for msg in msgs {
333            match msg.content {
334                ReplyMessageContent::Data { message } => reconstructed.push(message.into()),
335                ReplyMessageContent::SurbRequest { recipient, amount } => {
336                    debug!("received request for {amount} additional reply SURBs from {recipient}");
337                    if let Err(err) = self
338                        .reply_controller_sender
339                        .send_additional_surbs_request(*recipient, amount)
340                    {
341                        if !self.shutdown_token.is_cancelled() {
342                            error!("{err}");
343                        }
344                    }
345                }
346            }
347        }
348        reconstructed
349    }
350
351    async fn handle_reconstructed_messages(&mut self, msgs: Vec<NymMessage>) {
352        if msgs.is_empty() {
353            return;
354        }
355
356        let mut plain_messages = Vec::new();
357        let mut repliable_messages = Vec::new();
358        let mut reply_messages = Vec::new();
359
360        for msg in msgs {
361            match msg {
362                NymMessage::Plain(plain) => plain_messages.push(plain),
363                NymMessage::Repliable(repliable) => repliable_messages.push(repliable),
364                NymMessage::Reply(reply) => reply_messages.push(reply),
365            }
366        }
367
368        let mut reconstructed_messages = self.handle_reconstructed_plain_messages(plain_messages);
369        reconstructed_messages
370            .append(&mut self.handle_reconstructed_repliable_messages(repliable_messages));
371        reconstructed_messages
372            .append(&mut self.handle_reconstructed_reply_messages(reply_messages));
373
374        let mut inner_guard = self.inner.lock().await;
375        debug!(
376            "Adding {:?} new messages to the buffer!",
377            reconstructed_messages.len()
378        );
379
380        if let Some(sender) = &inner_guard.message_sender {
381            trace!("Sending reconstructed messages to announced sender");
382            if let Err(err) = sender.unbounded_send(reconstructed_messages) {
383                warn!("The reconstructed message receiver went offline without explicit notification (relevant error: - {err})");
384                inner_guard.message_sender = None;
385                inner_guard.messages.extend(err.into_inner());
386            }
387        } else {
388            trace!("No sender available - buffering reconstructed messages");
389            inner_guard.messages.extend(reconstructed_messages)
390        }
391    }
392
393    // this function doesn't really belong here...
394    fn get_reply_key<'a>(
395        &self,
396        raw_message: &'a mut [u8],
397    ) -> Option<(SurbEncryptionKey, &'a mut [u8])> {
398        let reply_surb_digest_size = ReplySurbKeyDigestAlgorithm::output_size();
399        if raw_message.len() < reply_surb_digest_size {
400            return None;
401        }
402
403        let possible_key_digest =
404            EncryptionKeyDigest::clone_from_slice(&raw_message[..reply_surb_digest_size]);
405        self.reply_key_storage
406            .try_pop(possible_key_digest)
407            .map(|reply_encryption_key| {
408                (
409                    *reply_encryption_key,
410                    &mut raw_message[reply_surb_digest_size..],
411                )
412            })
413    }
414
415    async fn handle_new_received(
416        &mut self,
417        msgs: Vec<Vec<u8>>,
418    ) -> Result<(), MessageRecoveryError> {
419        trace!(
420            "Processing {:?} new message that might get added to the buffer!",
421            msgs.len()
422        );
423
424        let mut completed_messages = Vec::new();
425        let mut inner_guard = self.inner.lock().await;
426
427        // first check if this is a reply or a chunked message
428        // note: there's a possible information leakage associated with this check https://github.com/nymtech/nym/issues/296
429        for mut msg in msgs {
430            // check first `HasherOutputSize` bytes if they correspond to known encryption key
431            // if yes - this is a reply message
432            let completed_message =
433                if let Some((reply_key, reply_message)) = self.get_reply_key(&mut msg) {
434                    inner_guard.process_received_reply(reply_message, reply_key)?
435                } else {
436                    inner_guard.process_received_regular_packet(msg)
437                };
438
439            if let Some(completed) = completed_message {
440                debug!("received {completed}");
441                completed_messages.push(completed)
442            }
443        }
444
445        // Cleanup stale buffers, if there are any fragments that simply never arrived.
446        // We do this here as part of handling new received fragments so that we can keep the event
447        // loop focused on processing new messages.
448        inner_guard.cleanup_stale_buffers();
449
450        drop(inner_guard);
451
452        if !completed_messages.is_empty() {
453            self.handle_reconstructed_messages(completed_messages).await
454        }
455        Ok(())
456    }
457}
458
459pub enum ReceivedBufferMessage {
460    // Signals a websocket connection (or a native implementation) was established and we should stop buffering messages,
461    // and instead send them directly to the received channel
462    ReceiverAnnounce(ReconstructedMessagesSender),
463
464    // Explicit signal that Receiver connection will no longer accept messages
465    ReceiverDisconnect,
466}
467
468pub(crate) struct RequestReceiver<R: MessageReceiver> {
469    received_buffer: ReceivedMessagesBuffer<R>,
470    query_receiver: ReceivedBufferRequestReceiver,
471    shutdown_token: ShutdownToken,
472}
473
474impl<R: MessageReceiver> RequestReceiver<R> {
475    fn new(
476        received_buffer: ReceivedMessagesBuffer<R>,
477        query_receiver: ReceivedBufferRequestReceiver,
478        shutdown_token: ShutdownToken,
479    ) -> Self {
480        RequestReceiver {
481            received_buffer,
482            query_receiver,
483            shutdown_token,
484        }
485    }
486
487    async fn handle_message(&mut self, message: ReceivedBufferMessage) {
488        match message {
489            ReceivedBufferMessage::ReceiverAnnounce(sender) => {
490                self.received_buffer.connect_sender(sender).await;
491            }
492            ReceivedBufferMessage::ReceiverDisconnect => {
493                self.received_buffer.disconnect_sender().await
494            }
495        }
496    }
497
498    pub(crate) async fn run(&mut self) {
499        debug!("Started RequestReceiver with graceful shutdown support");
500        loop {
501            tokio::select! {
502                biased;
503                _ = self.shutdown_token.cancelled() => {
504                    tracing::trace!("RequestReceiver: Received shutdown");
505                    break;
506                }
507                request = self.query_receiver.next() => {
508                    if let Some(message) = request {
509                        self.handle_message(message).await
510                    } else {
511                        tracing::trace!("RequestReceiver: Stopping since channel closed");
512                        self.shutdown_token.cancelled().await;
513                        break;
514                    }
515                },
516            }
517        }
518        tracing::debug!("RequestReceiver: Exiting");
519    }
520}
521
522pub(crate) struct FragmentedMessageReceiver<R: MessageReceiver> {
523    received_buffer: ReceivedMessagesBuffer<R>,
524    mixnet_packet_receiver: MixnetMessageReceiver,
525    shutdown_token: ShutdownToken,
526}
527
528impl<R: MessageReceiver> FragmentedMessageReceiver<R> {
529    fn new(
530        received_buffer: ReceivedMessagesBuffer<R>,
531        mixnet_packet_receiver: MixnetMessageReceiver,
532        shutdown_token: ShutdownToken,
533    ) -> Self {
534        FragmentedMessageReceiver {
535            received_buffer,
536            mixnet_packet_receiver,
537            shutdown_token,
538        }
539    }
540
541    pub(crate) async fn run(&mut self) -> Result<(), MessageRecoveryError> {
542        debug!("Started FragmentedMessageReceiver with graceful shutdown support");
543        loop {
544            tokio::select! {
545                biased;
546                _ = self.shutdown_token.cancelled() => {
547                    tracing::trace!("FragmentedMessageReceiver: Received shutdown");
548                    break;
549                }
550                new_messages = self.mixnet_packet_receiver.next() => {
551                    if let Some(new_messages) = new_messages {
552                        self.received_buffer.handle_new_received(new_messages).await?;
553                    } else {
554                        tracing::trace!("FragmentedMessageReceiver: Stopping since channel closed");
555                        self.shutdown_token.cancelled().await;
556                        break;
557                    }
558                },
559
560            }
561        }
562        tracing::debug!("FragmentedMessageReceiver: Exiting");
563        Ok(())
564    }
565}
566
567pub(crate) struct ReceivedMessagesBufferController<R: MessageReceiver> {
568    fragmented_message_receiver: FragmentedMessageReceiver<R>,
569    request_receiver: RequestReceiver<R>,
570}
571
572impl<R: MessageReceiver + Clone + Send + 'static> ReceivedMessagesBufferController<R> {
573    pub(crate) fn new(
574        local_encryption_keypair: Arc<x25519::KeyPair>,
575        query_receiver: ReceivedBufferRequestReceiver,
576        mixnet_packet_receiver: MixnetMessageReceiver,
577        reply_key_storage: SentReplyKeys,
578        reply_controller_sender: ReplyControllerSender,
579        metrics_reporter: ClientStatsSender,
580        shutdown_token: ShutdownToken,
581    ) -> Self {
582        let received_buffer = ReceivedMessagesBuffer::new(
583            local_encryption_keypair,
584            reply_key_storage,
585            reply_controller_sender,
586            metrics_reporter,
587            shutdown_token.clone(),
588        );
589
590        ReceivedMessagesBufferController {
591            fragmented_message_receiver: FragmentedMessageReceiver::new(
592                received_buffer.clone(),
593                mixnet_packet_receiver,
594                shutdown_token.clone(),
595            ),
596            request_receiver: RequestReceiver::new(
597                received_buffer,
598                query_receiver,
599                shutdown_token.clone(),
600            ),
601        }
602    }
603
604    pub(crate) fn into_tasks(self) -> (FragmentedMessageReceiver<R>, RequestReceiver<R>) {
605        (self.fragmented_message_receiver, self.request_receiver)
606    }
607}