s2n_quic_transport/space/
mod.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    ack,
6    ack::AckManager,
7    connection, endpoint, path,
8    path::{path_event, Path},
9    processed_packet::ProcessedPacket,
10    stream::Manager as _,
11    transmission,
12};
13use bytes::Bytes;
14use core::{
15    any::Any,
16    fmt,
17    ops::RangeInclusive,
18    task::{Poll, Waker},
19};
20use s2n_codec::DecoderBufferMut;
21use s2n_quic_core::{
22    application::ServerName,
23    connection::{limits::Limits, InitialId, PeerId},
24    crypto::{tls, tls::Session, CryptoSuite, Key},
25    event::{self, IntoEvent},
26    frame::{
27        ack::AckRanges, crypto::CryptoRef, datagram::DatagramRef, stream::StreamRef, Ack,
28        ConnectionClose, DataBlocked, DcStatelessResetTokens, HandshakeDone, MaxData,
29        MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse,
30        ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked,
31    },
32    inet::DatagramInfo,
33    packet::number::{PacketNumber, PacketNumberSpace},
34    time::{timer, Timestamp},
35    transport,
36    varint::VarInt,
37};
38
39mod application;
40mod crypto_stream;
41pub(crate) mod datagram;
42mod handshake;
43mod handshake_status;
44mod initial;
45mod keep_alive;
46mod session_context;
47mod tx_packet_numbers;
48
49pub(crate) use application::ApplicationSpace;
50pub(crate) use crypto_stream::CryptoStream;
51pub(crate) use handshake::HandshakeSpace;
52pub(crate) use handshake_status::HandshakeStatus;
53pub(crate) use initial::InitialSpace;
54pub(crate) use session_context::SessionContext;
55pub(crate) use tx_packet_numbers::TxPacketNumbers;
56
57struct SessionInfo<Config: endpoint::Config> {
58    session: <Config::TLSEndpoint as tls::Endpoint>::Session,
59    initial_cid: InitialId,
60}
61
62pub struct PacketSpaceManager<Config: endpoint::Config> {
63    session_info: Option<SessionInfo<Config>>,
64    retry_cid: Option<Box<PeerId>>,
65    initial: Option<Box<InitialSpace<Config>>>,
66    handshake: Option<Box<HandshakeSpace<Config>>>,
67    pub tls_context: Option<Box<dyn Any + Send>>,
68    application: Option<Box<ApplicationSpace<Config>>>,
69    zero_rtt_crypto:
70        Option<Box<<<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::ZeroRttKey>>,
71    handshake_status: HandshakeStatus,
72    /// Server Name Indication
73    pub server_name: Option<ServerName>,
74    //= https://www.rfc-editor.org/rfc/rfc9000#section-7
75    //# Endpoints MUST explicitly negotiate an application protocol.
76
77    //= https://www.rfc-editor.org/rfc/rfc9001#section-8.1
78    //# Unless
79    //# another mechanism is used for agreeing on an application protocol,
80    //# endpoints MUST use ALPN for this purpose.
81    pub application_protocol: Bytes,
82}
83
84impl<Config: endpoint::Config> fmt::Debug for PacketSpaceManager<Config> {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("PacketSpaceManager")
87            .field("initial", &self.initial)
88            .field("handshake", &self.handshake)
89            .field("application", &self.application)
90            .field("handshake_status", &self.handshake_status)
91            .finish()
92    }
93}
94
95macro_rules! packet_space_api {
96    ($ty:ty, $field:ident, $get_mut:ident) => {
97        #[allow(dead_code)]
98        pub fn $field(&self) -> Option<&$ty> {
99            self.$field.as_ref().map(Box::as_ref)
100        }
101
102        pub fn $get_mut(&mut self) -> Option<(&mut $ty, &mut HandshakeStatus)> {
103            let space = self.$field.as_mut().map(Box::as_mut)?;
104            Some((space, &mut self.handshake_status))
105        }
106    };
107}
108
109impl<Config: endpoint::Config> PacketSpaceManager<Config> {
110    pub fn new<Pub: event::ConnectionPublisher>(
111        initial_cid: InitialId,
112        session: <Config::TLSEndpoint as tls::Endpoint>::Session,
113        initial_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialKey,
114        header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialHeaderKey,
115        now: Timestamp,
116        publisher: &mut Pub,
117    ) -> Self {
118        let ack_manager = AckManager::new(PacketNumberSpace::Initial, ack::Settings::EARLY);
119
120        publisher.on_key_update(event::builder::KeyUpdate {
121            key_type: event::builder::KeyType::Initial,
122            cipher_suite: initial_key.cipher_suite().into_event(),
123        });
124        Self {
125            session_info: Some(SessionInfo {
126                session,
127                initial_cid,
128            }),
129            tls_context: None,
130            retry_cid: None,
131            initial: Some(Box::new(InitialSpace::new(
132                initial_key,
133                header_key,
134                now,
135                ack_manager,
136            ))),
137            handshake: None,
138            application: None,
139            zero_rtt_crypto: None,
140            handshake_status: HandshakeStatus::default(),
141            server_name: None,
142            application_protocol: Bytes::new(),
143        }
144    }
145
146    packet_space_api!(InitialSpace<Config>, initial, initial_mut);
147
148    packet_space_api!(HandshakeSpace<Config>, handshake, handshake_mut);
149
150    packet_space_api!(ApplicationSpace<Config>, application, application_mut);
151
152    #[allow(dead_code)] // 0RTT hasn't been started yet
153    pub fn zero_rtt_crypto(
154        &self,
155    ) -> Option<&<<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::ZeroRttKey> {
156        self.zero_rtt_crypto.as_ref().map(Box::as_ref)
157    }
158
159    /// Discard the initial packet space
160    pub fn discard_initial<Pub: event::ConnectionPublisher>(
161        &mut self,
162        path_manager: &mut path::Manager<Config>,
163        random_generator: &mut Config::RandomGenerator,
164        now: Timestamp,
165        publisher: &mut Pub,
166    ) {
167        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2
168        //# When Initial or Handshake keys are discarded, the PTO and loss
169        //# detection timers MUST be reset, because discarding keys indicates
170        //# forward progress and the loss detection timer might have been set for
171        //# a now discarded packet number space.
172        if let Some(mut space) = self.initial.take() {
173            path_manager.active_path_mut().reset_pto_backoff();
174            let path_id = path_manager.active_path_id();
175            space.on_discard(path_manager.active_path_mut(), path_id, publisher);
176
177            if let Some((handshake, handshake_status)) = self.handshake_mut() {
178                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
179                //# A sender SHOULD restart its PTO timer every time an ack-eliciting
180                //# packet is sent or acknowledged, or when Initial or Handshake keys are
181                //# discarded (Section 4.9 of [QUIC-TLS]).
182
183                //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2.1
184                //# Since the server could be blocked until more datagrams are received
185                //# from the client, it is the client's responsibility to send packets to
186                //# unblock the server until it is certain that the server has finished
187                //# its address validation
188
189                // Arm the PTO timer on the handshake space so the handshake can make progress
190                // even if no handshake packets have been transmitted or received yet
191                handshake.update_pto_timer(
192                    path_manager,
193                    now,
194                    handshake_status.is_confirmed(),
195                    random_generator,
196                );
197            }
198        }
199
200        //= https://www.rfc-editor.org/rfc/rfc9001#section-4.9.1
201        //# Endpoints MUST NOT send
202        //# Initial packets after this point.
203        // By discarding a space, we are no longer capable of sending packets with those
204        // keys.
205
206        debug_assert!(
207            self.initial.is_none(),
208            "initial space should have been discarded"
209        );
210    }
211
212    /// Discard the handshake packet space
213    pub fn discard_handshake<Pub: event::ConnectionPublisher>(
214        &mut self,
215        path_manager: &mut path::Manager<Config>,
216        publisher: &mut Pub,
217    ) {
218        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2
219        //# When Initial or Handshake keys are discarded, the PTO and loss
220        //# detection timers MUST be reset, because discarding keys indicates
221        //# forward progress and the loss detection timer might have been set for
222        //# a now discarded packet number space.
223        if let Some(mut space) = self.handshake.take() {
224            path_manager.active_path_mut().reset_pto_backoff();
225            let path_id = path_manager.active_path_id();
226            space.on_discard(path_manager.active_path_mut(), path_id, publisher);
227            // Dropping handshake will clear the PTO timer for the handshake space.
228            // The PTO timer for the application space is reset when the
229            // handshake is confirmed.
230
231            //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
232            //# An endpoint MUST NOT set its PTO timer for the Application Data
233            //# packet number space until the handshake is confirmed.
234        }
235
236        debug_assert!(
237            self.initial.is_none(),
238            "initial space should have been discarded"
239        );
240        debug_assert!(
241            self.handshake.is_none(),
242            "handshake space should have been discarded"
243        );
244    }
245
246    pub fn discard_zero_rtt_crypto(&mut self) {
247        self.zero_rtt_crypto = None;
248    }
249
250    pub fn poll_crypto<Pub: event::ConnectionPublisher>(
251        &mut self,
252        path_manager: &mut path::Manager<Config>,
253        local_id_registry: &mut connection::LocalIdRegistry,
254        limits: &mut Limits,
255        now: Timestamp,
256        waker: &Waker,
257        publisher: &mut Pub,
258        datagram: &mut Config::DatagramEndpoint,
259        dc: &mut Config::DcEndpoint,
260        limits_endpoint: &mut Config::ConnectionLimits,
261        random_generator: &mut Config::RandomGenerator,
262    ) -> Poll<Result<(), transport::Error>> {
263        if let Some(session_info) = self.session_info.as_mut() {
264            let mut context: SessionContext<Config, Pub> = SessionContext {
265                now,
266                initial_cid: &session_info.initial_cid,
267                retry_cid: self.retry_cid.as_deref(),
268                initial: &mut self.initial,
269                handshake: &mut self.handshake,
270                application: &mut self.application,
271                zero_rtt_crypto: &mut self.zero_rtt_crypto,
272                tls_context: &mut self.tls_context,
273                path_manager,
274                handshake_status: &mut self.handshake_status,
275                local_id_registry,
276                limits,
277                server_name: &mut self.server_name,
278                application_protocol: &mut self.application_protocol,
279                waker,
280                publisher,
281                datagram,
282                dc,
283                limits_endpoint,
284                random_generator,
285            };
286
287            match session_info.session.poll(&mut context)? {
288                Poll::Ready(_success) => {
289                    if session_info.session.should_discard_session() {
290                        self.discard_session();
291                    }
292
293                    self.retry_cid = None;
294                }
295                Poll::Pending => return Poll::Pending,
296            };
297        }
298
299        Poll::Ready(Ok(()))
300    }
301
302    pub fn post_handshake_crypto<Pub: event::ConnectionPublisher>(
303        &mut self,
304        path_manager: &mut path::Manager<Config>,
305        local_id_registry: &mut connection::LocalIdRegistry,
306        limits: &mut Limits,
307        now: Timestamp,
308        waker: &Waker,
309        publisher: &mut Pub,
310        datagram: &mut Config::DatagramEndpoint,
311        dc: &mut Config::DcEndpoint,
312        limits_endpoint: &mut Config::ConnectionLimits,
313        random_generator: &mut Config::RandomGenerator,
314    ) -> Result<(), transport::Error> {
315        if let Some(session_info) = self.session_info.as_mut() {
316            let mut context: SessionContext<Config, Pub> = SessionContext {
317                now,
318                initial_cid: &session_info.initial_cid,
319                retry_cid: self.retry_cid.as_deref(),
320                initial: &mut self.initial,
321                handshake: &mut self.handshake,
322                tls_context: &mut self.tls_context,
323                application: &mut self.application,
324                zero_rtt_crypto: &mut self.zero_rtt_crypto,
325                path_manager,
326                handshake_status: &mut self.handshake_status,
327                local_id_registry,
328                limits,
329                server_name: &mut self.server_name,
330                application_protocol: &mut self.application_protocol,
331                waker,
332                publisher,
333                datagram,
334                dc,
335                limits_endpoint,
336                random_generator,
337            };
338
339            session_info
340                .session
341                .process_post_handshake_message(&mut context)?;
342            if session_info.session.should_discard_session() {
343                self.discard_session();
344            }
345        }
346
347        Ok(())
348    }
349
350    fn discard_session(&mut self) {
351        self.session_info = None;
352        if let Some((application_space, _status)) = self.application_mut() {
353            application_space.crypto_stream.rx.reset();
354            application_space.buffer_crypto_frames = false;
355        }
356    }
357
358    /// Called when the connection timer expired
359    pub fn on_timeout<Pub: event::ConnectionPublisher>(
360        &mut self,
361        local_id_registry: &mut connection::LocalIdRegistry,
362        path_manager: &mut path::Manager<Config>,
363        random_generator: &mut Config::RandomGenerator,
364        timestamp: Timestamp,
365        publisher: &mut Pub,
366    ) -> Result<(), connection::Error> {
367        let path_id = path_manager.active_path_id();
368        let path = path_manager.active_path();
369
370        // ensure the backoff doesn't grow too quickly
371        let max_backoff = path.pto_backoff.checked_mul(2).ok_or_else(|| {
372            connection::Error::immediate_close("PTO backoff multiplier exceeded maximum value")
373        })?;
374
375        if let Some((space, handshake_status)) = self.initial_mut() {
376            space.on_timeout(
377                handshake_status,
378                path_id,
379                path_manager,
380                random_generator,
381                timestamp,
382                max_backoff,
383                publisher,
384            )
385        }
386        if let Some((space, handshake_status)) = self.handshake_mut() {
387            space.on_timeout(
388                handshake_status,
389                path_id,
390                path_manager,
391                random_generator,
392                timestamp,
393                max_backoff,
394                publisher,
395            )
396        }
397        if let Some((space, handshake_status)) = self.application_mut() {
398            space.on_timeout(
399                path_manager,
400                handshake_status,
401                local_id_registry,
402                random_generator,
403                timestamp,
404                max_backoff,
405                publisher,
406            )
407        }
408
409        debug_assert!(path_manager.active_path().pto_backoff <= max_backoff);
410        Ok(())
411    }
412
413    /// Signals the connection was previously blocked by anti-amplification limits
414    /// but is now no longer limited.
415    pub fn on_amplification_unblocked(
416        &mut self,
417        path_manager: &path::Manager<Config>,
418        random_generator: &mut Config::RandomGenerator,
419        timestamp: Timestamp,
420    ) {
421        if let Some((space, handshake_status)) = self.initial_mut() {
422            space.on_amplification_unblocked(
423                path_manager,
424                timestamp,
425                handshake_status.is_confirmed(),
426                random_generator,
427            );
428        }
429
430        if let Some((space, handshake_status)) = self.handshake_mut() {
431            space.on_amplification_unblocked(
432                path_manager,
433                timestamp,
434                handshake_status.is_confirmed(),
435                random_generator,
436            );
437        }
438
439        if let Some((space, handshake_status)) = self.application_mut() {
440            space.on_amplification_unblocked(
441                path_manager,
442                timestamp,
443                handshake_status.is_confirmed(),
444                random_generator,
445            );
446        }
447    }
448
449    /// Called after a burst of one or more packets have finished being transmitted
450    pub fn on_transmit_burst_complete(
451        &mut self,
452        active_path: &Path<Config>,
453        random_generator: &mut Config::RandomGenerator,
454        timestamp: Timestamp,
455    ) {
456        debug_assert!(active_path.is_active());
457
458        if let Some((space, handshake_status)) = self.initial_mut() {
459            space.on_transmit_burst_complete(
460                active_path,
461                timestamp,
462                handshake_status.is_confirmed(),
463                random_generator,
464            );
465        }
466
467        if let Some((space, handshake_status)) = self.handshake_mut() {
468            space.on_transmit_burst_complete(
469                active_path,
470                timestamp,
471                handshake_status.is_confirmed(),
472                random_generator,
473            );
474        }
475
476        if let Some((space, handshake_status)) = self.application_mut() {
477            space.on_transmit_burst_complete(
478                active_path,
479                timestamp,
480                handshake_status.is_confirmed(),
481                random_generator,
482            );
483        }
484    }
485
486    pub fn requires_probe(&self) -> bool {
487        core::iter::empty()
488            .chain(self.initial.iter().map(|space| space.requires_probe()))
489            .chain(self.handshake.iter().map(|space| space.requires_probe()))
490            .chain(self.application.iter().map(|space| space.requires_probe()))
491            .any(|requires_probe| requires_probe)
492    }
493
494    pub fn is_handshake_confirmed(&self) -> bool {
495        self.handshake_status.is_confirmed()
496    }
497
498    pub fn is_handshake_complete(&self) -> bool {
499        self.handshake_status.is_complete()
500    }
501
502    pub(crate) fn on_transmit_close(
503        &mut self,
504        early_connection_close: &ConnectionClose,
505        connection_close: &ConnectionClose,
506        context: &mut connection::ConnectionTransmissionContext<Config>,
507        packet_buffer: &mut endpoint::PacketBuffer,
508    ) -> Option<Bytes> {
509        //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.3
510        //# When sending a CONNECTION_CLOSE frame, the goal is to ensure that the
511        //# peer will process the frame.  Generally, this means sending the frame
512        //# in a packet with the highest level of packet protection to avoid the
513        //# packet being discarded.
514        let mut can_send_initial = self.initial.is_some();
515        let mut can_send_handshake = self.handshake.is_some();
516        let can_send_application = self.application.is_some();
517
518        //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.3
519        //# After the handshake is confirmed (see
520        //# Section 4.1.2 of [QUIC-TLS]), an endpoint MUST send any
521        //# CONNECTION_CLOSE frames in a 1-RTT packet.
522        if self.is_handshake_confirmed() {
523            can_send_initial = false;
524            can_send_handshake = false;
525            debug_assert!(
526                can_send_application,
527                "if the handshake is confirmed, 1rtt keys should be available"
528            );
529        }
530
531        //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2.3
532        //# A client will always know whether the server has Handshake keys
533        //# (see Section 17.2.2.1), but it is possible that a server does not
534        //# know whether the client has Handshake keys.  Under these
535        //# circumstances, a server SHOULD send a CONNECTION_CLOSE frame in
536        //# both Handshake and Initial packets to ensure that at least one of
537        //# them is processable by the client.
538        if can_send_handshake {
539            match Config::ENDPOINT_TYPE {
540                endpoint::Type::Client => {
541                    // if we are a client and have handshake keys, we know the server
542                    // has handshake keys as well, so no need to transmit in initial.
543                    can_send_initial = false;
544                }
545                endpoint::Type::Server => {
546                    // try to send an initial packet if the space is still available
547                    //
548                    // Note: this assignment isn't actually needed; it's mostly to make
549                    //       the code easier to follow
550                    can_send_initial &= true;
551                }
552            }
553        }
554
555        packet_buffer.write(|buffer| {
556            macro_rules! write_packet {
557                ($buffer:expr, $space:ident, $check:expr, $frame:expr) => {
558                    if let Some((space, _handshake_status)) = self.$space().filter(|_| $check) {
559                        let result = space.on_transmit_close(context, &$frame, $buffer);
560
561                        match result {
562                            Ok((outcome, buffer)) => {
563                                *context.outcome += outcome;
564                                buffer
565                            }
566                            Err(err) => err.take_buffer(),
567                        }
568                    } else {
569                        $buffer
570                    }
571                };
572            }
573
574            let buffer = write_packet!(
575                buffer,
576                initial_mut,
577                can_send_initial,
578                early_connection_close
579            );
580            let buffer = write_packet!(
581                buffer,
582                handshake_mut,
583                can_send_handshake,
584                early_connection_close
585            );
586            write_packet!(
587                buffer,
588                application_mut,
589                can_send_application,
590                connection_close
591            )
592        })
593    }
594
595    pub fn close<Pub: event::ConnectionPublisher>(
596        &mut self,
597        error: connection::Error,
598        path_manager: &mut path::Manager<Config>,
599        random_generator: &mut Config::RandomGenerator,
600        now: Timestamp,
601        publisher: &mut Pub,
602    ) {
603        self.session_info = None;
604        self.retry_cid = None;
605        self.discard_initial(path_manager, random_generator, now, publisher);
606        self.discard_handshake(path_manager, publisher);
607        self.discard_zero_rtt_crypto();
608
609        // Don't discard the application space until the application has read the error
610        if let Some((application, _handshake_status)) = self.application_mut() {
611            //= https://www.rfc-editor.org/rfc/rfc9000#section-10.2
612            //# A CONNECTION_CLOSE frame
613            //# causes all streams to immediately become closed; open streams can be
614            //# assumed to be implicitly reset.
615
616            // Close all streams with the derived error
617            application.stream_manager.close(error);
618        }
619    }
620
621    pub fn on_retry_packet(&mut self, retry_source_connection_id: PeerId) {
622        debug_assert!(Config::ENDPOINT_TYPE.is_client());
623        self.retry_cid = Some(Box::new(retry_source_connection_id));
624    }
625
626    pub fn retry_cid(&self) -> Option<&PeerId> {
627        self.retry_cid.as_deref()
628    }
629}
630
631impl<Config: endpoint::Config> timer::Provider for PacketSpaceManager<Config> {
632    #[inline]
633    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
634        //= https://www.rfc-editor.org/rfc/rfc9002#section-6.2.1
635        //# When ack-eliciting packets in multiple packet number spaces are in
636        //# flight, the timer MUST be set to the earlier value of the Initial and
637        //# Handshake packet number spaces.
638
639        if let Some(space) = self.application.as_ref() {
640            space.timers(query)?;
641        }
642        if let Some(space) = self.handshake.as_ref() {
643            space.timers(query)?;
644        }
645        if let Some(space) = self.initial.as_ref() {
646            space.timers(query)?;
647        }
648        Ok(())
649    }
650}
651
652impl<Config: endpoint::Config> transmission::interest::Provider for PacketSpaceManager<Config> {
653    #[inline]
654    fn transmission_interest<Q: transmission::interest::Query>(
655        &self,
656        query: &mut Q,
657    ) -> transmission::interest::Result {
658        if let Some(space) = self.application.as_ref() {
659            space.transmission_interest(query)?;
660        }
661
662        self.handshake_status.transmission_interest(query)?;
663
664        if let Some(space) = self.initial.as_ref() {
665            space.transmission_interest(query)?;
666        }
667
668        if let Some(space) = self.handshake.as_ref() {
669            space.transmission_interest(query)?;
670        }
671
672        Ok(())
673    }
674}
675
676impl<Config: endpoint::Config> connection::finalization::Provider for PacketSpaceManager<Config> {
677    fn finalization_status(&self) -> connection::finalization::Status {
678        core::iter::empty()
679            .chain(self.initial.iter().map(|space| space.finalization_status()))
680            .chain(
681                self.handshake
682                    .iter()
683                    .map(|space| space.finalization_status()),
684            )
685            .chain(
686                self.application
687                    .iter()
688                    .map(|space| space.finalization_status()),
689            )
690            .sum()
691    }
692}
693
694macro_rules! default_frame_handler {
695    ($name:ident, $frame:ty) => {
696        fn $name(&mut self, frame: $frame) -> Result<(), transport::Error> {
697            Err(transport::Error::PROTOCOL_VIOLATION
698                .with_reason(Self::INVALID_FRAME_ERROR)
699                .with_frame_type(frame.tag().into()))
700        }
701    };
702}
703
704pub trait PacketSpace<Config: endpoint::Config>: Sized {
705    const INVALID_FRAME_ERROR: &'static str;
706
707    fn on_amplification_unblocked(
708        &mut self,
709        path_manager: &path::Manager<Config>,
710        timestamp: Timestamp,
711        is_handshake_confirmed: bool,
712        random_generator: &mut Config::RandomGenerator,
713    );
714
715    fn handle_crypto_frame<Pub: event::ConnectionPublisher>(
716        &mut self,
717        frame: CryptoRef,
718        datagram: &DatagramInfo,
719        path: &mut Path<Config>,
720        publisher: &mut Pub,
721    ) -> Result<(), transport::Error>;
722
723    fn handle_ack_frame<A: AckRanges, Pub: event::ConnectionPublisher>(
724        &mut self,
725        frame: Ack<A>,
726        timestamp: Timestamp,
727        path_id: path::Id,
728        path_manager: &mut path::Manager<Config>,
729        packet_number: PacketNumber,
730        handshake_status: &mut HandshakeStatus,
731        local_id_registry: &mut connection::LocalIdRegistry,
732        random_generator: &mut Config::RandomGenerator,
733        publisher: &mut Pub,
734    ) -> Result<(), transport::Error>;
735
736    fn handle_connection_close_frame<Pub: event::ConnectionPublisher>(
737        &mut self,
738        frame: ConnectionClose,
739        path_id: path::Id,
740        path: &mut Path<Config>,
741        packet_number: PacketNumber,
742        publisher: &mut Pub,
743    ) -> Result<(), transport::Error>;
744
745    fn handle_handshake_done_frame<Pub: event::ConnectionPublisher>(
746        &mut self,
747        frame: HandshakeDone,
748        _timestamp: Timestamp,
749        _path: &mut Path<Config>,
750        _local_id_registry: &mut connection::LocalIdRegistry,
751        _handshake_status: &mut HandshakeStatus,
752        _random_generator: &mut Config::RandomGenerator,
753        _publisher: &mut Pub,
754    ) -> Result<(), transport::Error> {
755        Err(transport::Error::PROTOCOL_VIOLATION
756            .with_reason(Self::INVALID_FRAME_ERROR)
757            .with_frame_type(frame.tag().into()))
758    }
759
760    fn handle_retire_connection_id_frame(
761        &mut self,
762        frame: RetireConnectionId,
763        _datagram: &DatagramInfo,
764        _path: &mut Path<Config>,
765        _local_id_registry: &mut connection::LocalIdRegistry,
766    ) -> Result<(), transport::Error> {
767        Err(transport::Error::PROTOCOL_VIOLATION
768            .with_reason(Self::INVALID_FRAME_ERROR)
769            .with_frame_type(frame.tag().into()))
770    }
771
772    fn handle_new_connection_id_frame<Pub: event::ConnectionPublisher>(
773        &mut self,
774        frame: NewConnectionId,
775        _datagram: &DatagramInfo,
776        _path_manager: &mut path::Manager<Config>,
777        _publisher: &mut Pub,
778    ) -> Result<(), transport::Error> {
779        Err(transport::Error::PROTOCOL_VIOLATION
780            .with_reason(Self::INVALID_FRAME_ERROR)
781            .with_frame_type(frame.tag().into()))
782    }
783
784    fn handle_path_response_frame<Pub: event::ConnectionPublisher>(
785        &mut self,
786        frame: PathResponse,
787        _timestamp: Timestamp,
788        _path_manager: &mut path::Manager<Config>,
789        _handshake_status: &mut HandshakeStatus,
790        _random_generator: &mut Config::RandomGenerator,
791        _publisher: &mut Pub,
792    ) -> Result<(), transport::Error> {
793        Err(transport::Error::PROTOCOL_VIOLATION
794            .with_reason(Self::INVALID_FRAME_ERROR)
795            .with_frame_type(frame.tag().into()))
796    }
797
798    fn handle_path_challenge_frame(
799        &mut self,
800        frame: PathChallenge,
801        _path_id: path::Id,
802        _path_manager: &mut path::Manager<Config>,
803    ) -> Result<(), transport::Error> {
804        Err(transport::Error::PROTOCOL_VIOLATION
805            .with_reason(Self::INVALID_FRAME_ERROR)
806            .with_frame_type(frame.tag().into()))
807    }
808
809    fn handle_stream_frame(
810        &mut self,
811        frame: StreamRef,
812        _packet: &mut ProcessedPacket,
813    ) -> Result<(), transport::Error> {
814        Err(transport::Error::PROTOCOL_VIOLATION
815            .with_reason(Self::INVALID_FRAME_ERROR)
816            .with_frame_type(frame.tag().into()))
817    }
818
819    fn handle_datagram_frame(
820        &mut self,
821        _path: s2n_quic_core::event::api::Path<'_>,
822        frame: DatagramRef,
823    ) -> Result<(), transport::Error> {
824        Err(transport::Error::PROTOCOL_VIOLATION
825            .with_reason(Self::INVALID_FRAME_ERROR)
826            .with_frame_type(frame.tag().into()))
827    }
828
829    fn handle_dc_stateless_reset_tokens_frame<Pub: event::ConnectionPublisher>(
830        &mut self,
831        frame: DcStatelessResetTokens,
832        _publisher: &mut Pub,
833    ) -> Result<(), transport::Error> {
834        Err(transport::Error::PROTOCOL_VIOLATION
835            .with_reason(Self::INVALID_FRAME_ERROR)
836            .with_frame_type(frame.tag()))
837    }
838
839    default_frame_handler!(handle_data_blocked_frame, DataBlocked);
840    default_frame_handler!(handle_max_data_frame, MaxData);
841    default_frame_handler!(handle_max_stream_data_frame, MaxStreamData);
842    default_frame_handler!(handle_max_streams_frame, MaxStreams);
843    default_frame_handler!(handle_reset_stream_frame, ResetStream);
844    default_frame_handler!(handle_stop_sending_frame, StopSending);
845    default_frame_handler!(handle_stream_data_blocked_frame, StreamDataBlocked);
846    default_frame_handler!(handle_streams_blocked_frame, StreamsBlocked);
847    default_frame_handler!(handle_new_token_frame, NewToken);
848
849    fn on_processed_packet<Pub: event::ConnectionPublisher>(
850        &mut self,
851        processed_packet: ProcessedPacket,
852        path_id: path::Id,
853        path: &Path<Config>,
854        publisher: &mut Pub,
855    ) -> Result<(), transport::Error>;
856
857    fn handle_cleartext_payload<'a, Pub: event::ConnectionPublisher>(
858        &mut self,
859        packet_number: PacketNumber,
860        payload: DecoderBufferMut<'a>,
861        datagram: &'a DatagramInfo,
862        path_id: path::Id,
863        path_manager: &mut path::Manager<Config>,
864        handshake_status: &mut HandshakeStatus,
865        local_id_registry: &mut connection::LocalIdRegistry,
866        random_generator: &mut Config::RandomGenerator,
867        publisher: &mut Pub,
868        packet_interceptor: &mut Config::PacketInterceptor,
869    ) -> Result<ProcessedPacket<'a>, connection::Error> {
870        use s2n_quic_core::frame::{Frame, FrameMut};
871
872        let mut payload = {
873            use s2n_quic_core::packet::interceptor::{Interceptor, Packet};
874
875            // intercept the payload after it is decrypted, but before we process the frames
876            packet_interceptor.intercept_rx_payload(
877                &publisher.subject(),
878                &Packet {
879                    number: packet_number,
880                    timestamp: datagram.timestamp,
881                },
882                payload,
883            )
884        };
885
886        let mut processed_packet = ProcessedPacket::new(packet_number, datagram);
887
888        macro_rules! on_frame_processed {
889            ($frame:ident) => {{
890                let frame_type = $frame.tag();
891                processed_packet.on_processed_frame(&$frame);
892                move |err: transport::Error| err.with_frame_type(frame_type.into())
893            }};
894        }
895
896        {
897            // allow for an ACK frame to be injected by the packet interceptor
898            use s2n_quic_core::packet::interceptor::Interceptor;
899            let mut ack_context = AckInterceptContext {
900                packet_space: self,
901                timestamp: datagram.timestamp,
902                path_id,
903                path_manager,
904                packet_number,
905                handshake_status,
906                local_id_registry,
907                random_generator,
908                publisher,
909                on_processed_frame: |ack_frame| processed_packet.on_processed_frame(ack_frame),
910                error: None,
911            };
912            packet_interceptor.intercept_rx_ack(&ack_context.publisher.subject(), &mut ack_context);
913            if let Some(error) = ack_context.error {
914                Err(error)?;
915            }
916        }
917
918        while !payload.is_empty() {
919            let (frame, remaining) = payload
920                .decode::<FrameMut>()
921                .map_err(transport::Error::from)?;
922
923            let path = &path_manager[path_id];
924            publisher.on_frame_received(event::builder::FrameReceived {
925                packet_header: event::builder::PacketHeader::new(
926                    packet_number,
927                    publisher.quic_version(),
928                ),
929                path: path_event!(path, path_id),
930                frame: frame.into_event(),
931            });
932
933            match frame {
934                Frame::Padding(frame) => {
935                    //= https://www.rfc-editor.org/rfc/rfc9000#section-19.1
936                    //# A PADDING frame (type=0x00) has no semantic value.  PADDING frames
937                    //# can be used to increase the size of a packet.  Padding can be used to
938                    //# increase an Initial packet to the minimum required size or to provide
939                    //# protection against traffic analysis for protected packets.
940                    let _ = on_frame_processed!(frame);
941                }
942                Frame::Ping(frame) => {
943                    //= https://www.rfc-editor.org/rfc/rfc9000#section-19.2
944                    //# Endpoints can use PING frames (type=0x01) to verify that their peers
945                    //# are still alive or to check reachability to the peer.
946                    let _ = on_frame_processed!(frame);
947                }
948                Frame::Crypto(frame) => {
949                    let on_error = on_frame_processed!(frame);
950
951                    //= https://www.rfc-editor.org/rfc/rfc9000#section-7.5
952                    //# Packets containing
953                    //# discarded CRYPTO frames MUST be acknowledged because the packet has
954                    //# been received and processed by the transport even though the CRYPTO
955                    //# frame was discarded.
956
957                    self.handle_crypto_frame(
958                        frame.into(),
959                        datagram,
960                        &mut path_manager[path_id],
961                        publisher,
962                    )
963                    .map_err(on_error)?;
964                    processed_packet.contains_crypto = true;
965                }
966                Frame::Ack(frame) => {
967                    let on_error = on_frame_processed!(frame);
968                    self.handle_ack_frame(
969                        frame,
970                        datagram.timestamp,
971                        path_id,
972                        path_manager,
973                        packet_number,
974                        handshake_status,
975                        local_id_registry,
976                        random_generator,
977                        publisher,
978                    )
979                    .map_err(on_error)?;
980                }
981                Frame::ConnectionClose(frame) => {
982                    let on_error = on_frame_processed!(frame);
983                    self.handle_connection_close_frame(
984                        frame,
985                        path_id,
986                        &mut path_manager[path_id],
987                        packet_number,
988                        publisher,
989                    )
990                    .map_err(on_error)?;
991
992                    // skip processing any other frames and return an error
993
994                    // use `from` instead of `into` so the location is correctly captured
995                    return Err(connection::Error::from(frame));
996                }
997                Frame::Stream(frame) => {
998                    let on_error = on_frame_processed!(frame);
999                    self.handle_stream_frame(frame.into(), &mut processed_packet)
1000                        .map_err(on_error)?;
1001                }
1002                Frame::Datagram(frame) => {
1003                    let on_error = on_frame_processed!(frame);
1004                    self.handle_datagram_frame(
1005                        path_event!(path, path_id).into_event(),
1006                        frame.into(),
1007                    )
1008                    .map_err(on_error)?;
1009                }
1010                Frame::DataBlocked(frame) => {
1011                    let on_error = on_frame_processed!(frame);
1012                    self.handle_data_blocked_frame(frame).map_err(on_error)?;
1013                }
1014                Frame::MaxData(frame) => {
1015                    let on_error = on_frame_processed!(frame);
1016                    self.handle_max_data_frame(frame).map_err(on_error)?;
1017                }
1018                Frame::MaxStreamData(frame) => {
1019                    let on_error = on_frame_processed!(frame);
1020                    self.handle_max_stream_data_frame(frame).map_err(on_error)?;
1021                }
1022                Frame::MaxStreams(frame) => {
1023                    let on_error = on_frame_processed!(frame);
1024                    self.handle_max_streams_frame(frame).map_err(on_error)?;
1025                }
1026                Frame::ResetStream(frame) => {
1027                    let on_error = on_frame_processed!(frame);
1028                    self.handle_reset_stream_frame(frame).map_err(on_error)?;
1029                }
1030                Frame::StopSending(frame) => {
1031                    let on_error = on_frame_processed!(frame);
1032                    self.handle_stop_sending_frame(frame).map_err(on_error)?;
1033                }
1034                Frame::StreamDataBlocked(frame) => {
1035                    let on_error = on_frame_processed!(frame);
1036                    self.handle_stream_data_blocked_frame(frame)
1037                        .map_err(on_error)?;
1038                }
1039                Frame::StreamsBlocked(frame) => {
1040                    let on_error = on_frame_processed!(frame);
1041                    self.handle_streams_blocked_frame(frame).map_err(on_error)?;
1042                }
1043                Frame::NewToken(frame) => {
1044                    let on_error = on_frame_processed!(frame);
1045                    self.handle_new_token_frame(frame).map_err(on_error)?;
1046                }
1047                Frame::NewConnectionId(frame) => {
1048                    let on_error = on_frame_processed!(frame);
1049                    self.handle_new_connection_id_frame(frame, datagram, path_manager, publisher)
1050                        .map_err(on_error)?;
1051                }
1052                Frame::RetireConnectionId(frame) => {
1053                    let on_error = on_frame_processed!(frame);
1054                    self.handle_retire_connection_id_frame(
1055                        frame,
1056                        datagram,
1057                        &mut path_manager[path_id],
1058                        local_id_registry,
1059                    )
1060                    .map_err(on_error)?;
1061                }
1062                Frame::PathChallenge(frame) => {
1063                    let on_error = on_frame_processed!(frame);
1064
1065                    //= https://www.rfc-editor.org/rfc/rfc9000#section-9.3.3
1066                    //# An endpoint that receives a PATH_CHALLENGE on an active path SHOULD
1067                    //# send a non-probing packet in response.
1068                    if path_manager.active_path_id() == path_id {
1069                        processed_packet.path_challenge_on_active_path = true;
1070                    }
1071                    self.handle_path_challenge_frame(frame, path_id, path_manager)
1072                        .map_err(on_error)?;
1073                }
1074                Frame::PathResponse(frame) => {
1075                    let on_error = on_frame_processed!(frame);
1076
1077                    self.handle_path_response_frame(
1078                        frame,
1079                        datagram.timestamp,
1080                        path_manager,
1081                        handshake_status,
1082                        random_generator,
1083                        publisher,
1084                    )
1085                    .map_err(on_error)?;
1086                }
1087                Frame::HandshakeDone(frame) => {
1088                    let on_error = on_frame_processed!(frame);
1089                    self.handle_handshake_done_frame(
1090                        frame,
1091                        datagram.timestamp,
1092                        &mut path_manager[path_id],
1093                        local_id_registry,
1094                        handshake_status,
1095                        random_generator,
1096                        publisher,
1097                    )
1098                    .map_err(on_error)?;
1099                }
1100                Frame::DcStatelessResetTokens(frame) => {
1101                    let on_error = on_frame_processed!(frame);
1102                    self.handle_dc_stateless_reset_tokens_frame(frame, publisher)
1103                        .map_err(on_error)?;
1104                }
1105            }
1106
1107            payload = remaining;
1108        }
1109
1110        //= https://www.rfc-editor.org/rfc/rfc9000#section-12.4
1111        //# The payload of a packet that contains frames MUST contain at least
1112        //# one frame, and MAY contain multiple frames and multiple frame types.
1113        //# An endpoint MUST treat receipt of a packet containing no frames as a
1114        //# connection error of type PROTOCOL_VIOLATION.
1115        if processed_packet.frames == 0 {
1116            return Err(transport::Error::PROTOCOL_VIOLATION
1117                .with_reason("packet contained no frames")
1118                .into());
1119        }
1120
1121        let amplification_outcome = path_manager.on_processed_packet(
1122            path_id,
1123            datagram.source_connection_id,
1124            processed_packet.path_validation_probing,
1125            random_generator,
1126            publisher,
1127        )?;
1128
1129        if amplification_outcome.is_active_path_unblocked() {
1130            self.on_amplification_unblocked(
1131                path_manager,
1132                datagram.timestamp,
1133                handshake_status.is_confirmed(),
1134                random_generator,
1135            );
1136        }
1137
1138        //= https://www.rfc-editor.org/rfc/rfc9000#section-13.1
1139        //# A packet MUST NOT be acknowledged until packet protection has been
1140        //# successfully removed and all frames contained in the packet have been
1141        //# processed.  For STREAM frames, this means the data has been enqueued
1142        //# in preparation to be received by the application protocol, but it
1143        //# does not require that data be delivered and consumed.
1144        //#
1145        //# Once the packet has been fully processed, a receiver acknowledges
1146        //# receipt by sending one or more ACK frames containing the packet
1147        //# number of the received packet.
1148
1149        self.on_processed_packet(processed_packet, path_id, &path_manager[path_id], publisher)?;
1150
1151        Ok(processed_packet)
1152    }
1153}
1154
1155struct AckInterceptContext<
1156    'a,
1157    Config: endpoint::Config,
1158    Pub: event::ConnectionPublisher,
1159    Space: PacketSpace<Config>,
1160    OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
1161> {
1162    packet_space: &'a mut Space,
1163    timestamp: Timestamp,
1164    path_id: path::Id,
1165    path_manager: &'a mut path::Manager<Config>,
1166    packet_number: PacketNumber,
1167    handshake_status: &'a mut HandshakeStatus,
1168    local_id_registry: &'a mut connection::LocalIdRegistry,
1169    random_generator: &'a mut Config::RandomGenerator,
1170    publisher: &'a mut Pub,
1171    on_processed_frame: OnProcessedFrame,
1172    error: Option<transport::Error>,
1173}
1174
1175impl<
1176        Config: endpoint::Config,
1177        Pub: event::ConnectionPublisher,
1178        Space: PacketSpace<Config>,
1179        OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
1180    > s2n_quic_core::packet::interceptor::Ack
1181    for AckInterceptContext<'_, Config, Pub, Space, OnProcessedFrame>
1182{
1183    fn space(&self) -> PacketNumberSpace {
1184        self.packet_number.space()
1185    }
1186
1187    fn insert_range(&mut self, range: RangeInclusive<VarInt>) {
1188        use s2n_quic_core::packet::number::PacketNumberRange;
1189
1190        let mut ack_ranges = ack::Ranges::default();
1191        let pn_range = PacketNumberRange::new(
1192            self.space().new_packet_number(*range.start()),
1193            self.space().new_packet_number(*range.end()),
1194        );
1195        if ack_ranges.insert_packet_number_range(pn_range).is_err() {
1196            self.error = Some(
1197                transport::Error::INTERNAL_ERROR
1198                    .with_reason("Ack interceptor inserted an invalid packet range"),
1199            );
1200            return;
1201        }
1202        let ack_frame = Ack {
1203            ack_delay: Default::default(),
1204            ack_ranges: &ack_ranges,
1205            ecn_counts: None,
1206        };
1207        (self.on_processed_frame)(&ack_frame);
1208
1209        let on_error = {
1210            let frame_type = ack_frame.tag();
1211            move |err: transport::Error| err.with_frame_type(frame_type.into())
1212        };
1213        self.error = self
1214            .packet_space
1215            .handle_ack_frame(
1216                ack_frame,
1217                self.timestamp,
1218                self.path_id,
1219                self.path_manager,
1220                self.packet_number,
1221                self.handshake_status,
1222                self.local_id_registry,
1223                self.random_generator,
1224                self.publisher,
1225            )
1226            .map_err(on_error)
1227            .err();
1228    }
1229}