1use crate::{
5 ack,
6 ack::AckManager,
7 connection, endpoint, path,
8 path::{path_event, Path},
9 processed_packet::ProcessedPacket,
10 stream::Manager as _,
11 transmission,
12};
13use bytes::Bytes;
14use core::{
15 any::Any,
16 fmt,
17 ops::RangeInclusive,
18 task::{Poll, Waker},
19};
20use s2n_codec::DecoderBufferMut;
21use s2n_quic_core::{
22 application::ServerName,
23 connection::{limits::Limits, InitialId, PeerId},
24 crypto::{tls, tls::Session, CryptoSuite, Key},
25 event::{self, IntoEvent},
26 frame::{
27 ack::AckRanges, crypto::CryptoRef, datagram::DatagramRef, stream::StreamRef, Ack,
28 ConnectionClose, DataBlocked, DcStatelessResetTokens, HandshakeDone, MaxData,
29 MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse,
30 ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked,
31 },
32 inet::DatagramInfo,
33 packet::number::{PacketNumber, PacketNumberSpace},
34 time::{timer, Timestamp},
35 transport,
36 varint::VarInt,
37};
38
39mod application;
40mod crypto_stream;
41pub(crate) mod datagram;
42mod handshake;
43mod handshake_status;
44mod initial;
45mod keep_alive;
46mod session_context;
47mod tx_packet_numbers;
48
49pub(crate) use application::ApplicationSpace;
50pub(crate) use crypto_stream::CryptoStream;
51pub(crate) use handshake::HandshakeSpace;
52pub(crate) use handshake_status::HandshakeStatus;
53pub(crate) use initial::InitialSpace;
54pub(crate) use session_context::SessionContext;
55pub(crate) use tx_packet_numbers::TxPacketNumbers;
56
/// Pairs the TLS session with the connection ID that is handed to the
/// `SessionContext` every time the session is driven.
struct SessionInfo<Config: endpoint::Config> {
    /// TLS session driven by `poll_crypto` and `post_handshake_crypto`.
    session: <Config::TLSEndpoint as tls::Endpoint>::Session,
    /// Initial connection ID lent to the session through `SessionContext`.
    initial_cid: InitialId,
}
61
/// Owns the state of the three packet number spaces of a connection
/// (initial, handshake, application) together with the TLS session and the
/// handshake status shared between them.
pub struct PacketSpaceManager<Config: endpoint::Config> {
    /// TLS session state; set to `None` by `discard_session` and `close`.
    session_info: Option<SessionInfo<Config>>,
    /// Connection ID stored by `on_retry_packet`; cleared when the session
    /// poll completes or the connection is closed.
    retry_cid: Option<Box<PeerId>>,
    /// Initial space; created in `new` and removed by `discard_initial`.
    initial: Option<Box<InitialSpace<Config>>>,
    /// Handshake space; removed by `discard_handshake`.
    handshake: Option<Box<HandshakeSpace<Config>>>,
    /// Type-erased value lent to the `SessionContext` on every session call.
    pub tls_context: Option<Box<dyn Any + Send>>,
    /// Application space; `close` keeps it and closes its stream manager.
    application: Option<Box<ApplicationSpace<Config>>>,
    /// 0-RTT key; dropped by `discard_zero_rtt_crypto`.
    zero_rtt_crypto:
        Option<Box<<<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::ZeroRttKey>>,
    /// Handshake progress, handed out next to each space by the `*_mut` accessors.
    handshake_status: HandshakeStatus,
    /// Server name; `None` after `new`, writable by the session via `SessionContext`.
    pub server_name: Option<ServerName>,
    /// Application protocol; empty after `new`, writable by the session via
    /// `SessionContext`.
    pub application_protocol: Bytes,
}
83
84impl<Config: endpoint::Config> fmt::Debug for PacketSpaceManager<Config> {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("PacketSpaceManager")
87 .field("initial", &self.initial)
88 .field("handshake", &self.handshake)
89 .field("application", &self.application)
90 .field("handshake_status", &self.handshake_status)
91 .finish()
92 }
93}
94
/// Generates a shared and a mutable accessor for one boxed packet space field.
///
/// * `$ty` - type of the space stored in the field
/// * `$field` - name of the field; also used as the name of the shared accessor
/// * `$get_mut` - name of the mutable accessor, which additionally returns the
///   `HandshakeStatus` so that both can be mutated at the same time
macro_rules! packet_space_api {
    ($ty:ty, $field:ident, $get_mut:ident) => {
        #[allow(dead_code)]
        pub fn $field(&self) -> Option<&$ty> {
            self.$field.as_deref()
        }

        pub fn $get_mut(&mut self) -> Option<(&mut $ty, &mut HandshakeStatus)> {
            // the space and the status live in different fields, so both can
            // be borrowed mutably together
            match self.$field.as_deref_mut() {
                Some(space) => Some((space, &mut self.handshake_status)),
                None => None,
            }
        }
    };
}
108
109impl<Config: endpoint::Config> PacketSpaceManager<Config> {
110 pub fn new<Pub: event::ConnectionPublisher>(
111 initial_cid: InitialId,
112 session: <Config::TLSEndpoint as tls::Endpoint>::Session,
113 initial_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialKey,
114 header_key: <<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::InitialHeaderKey,
115 now: Timestamp,
116 publisher: &mut Pub,
117 ) -> Self {
118 let ack_manager = AckManager::new(PacketNumberSpace::Initial, ack::Settings::EARLY);
119
120 publisher.on_key_update(event::builder::KeyUpdate {
121 key_type: event::builder::KeyType::Initial,
122 cipher_suite: initial_key.cipher_suite().into_event(),
123 });
124 Self {
125 session_info: Some(SessionInfo {
126 session,
127 initial_cid,
128 }),
129 tls_context: None,
130 retry_cid: None,
131 initial: Some(Box::new(InitialSpace::new(
132 initial_key,
133 header_key,
134 now,
135 ack_manager,
136 ))),
137 handshake: None,
138 application: None,
139 zero_rtt_crypto: None,
140 handshake_status: HandshakeStatus::default(),
141 server_name: None,
142 application_protocol: Bytes::new(),
143 }
144 }
145
// Accessors for each packet number space: a shared getter named after the
// field and a `*_mut` getter that also returns the handshake status.
packet_space_api!(InitialSpace<Config>, initial, initial_mut);

packet_space_api!(HandshakeSpace<Config>, handshake, handshake_mut);

packet_space_api!(ApplicationSpace<Config>, application, application_mut);
151
152 #[allow(dead_code)] pub fn zero_rtt_crypto(
154 &self,
155 ) -> Option<&<<Config::TLSEndpoint as tls::Endpoint>::Session as CryptoSuite>::ZeroRttKey> {
156 self.zero_rtt_crypto.as_ref().map(Box::as_ref)
157 }
158
159 pub fn discard_initial<Pub: event::ConnectionPublisher>(
161 &mut self,
162 path_manager: &mut path::Manager<Config>,
163 random_generator: &mut Config::RandomGenerator,
164 now: Timestamp,
165 publisher: &mut Pub,
166 ) {
167 if let Some(mut space) = self.initial.take() {
173 path_manager.active_path_mut().reset_pto_backoff();
174 let path_id = path_manager.active_path_id();
175 space.on_discard(path_manager.active_path_mut(), path_id, publisher);
176
177 if let Some((handshake, handshake_status)) = self.handshake_mut() {
178 handshake.update_pto_timer(
192 path_manager,
193 now,
194 handshake_status.is_confirmed(),
195 random_generator,
196 );
197 }
198 }
199
200 debug_assert!(
207 self.initial.is_none(),
208 "initial space should have been discarded"
209 );
210 }
211
212 pub fn discard_handshake<Pub: event::ConnectionPublisher>(
214 &mut self,
215 path_manager: &mut path::Manager<Config>,
216 publisher: &mut Pub,
217 ) {
218 if let Some(mut space) = self.handshake.take() {
224 path_manager.active_path_mut().reset_pto_backoff();
225 let path_id = path_manager.active_path_id();
226 space.on_discard(path_manager.active_path_mut(), path_id, publisher);
227 }
235
236 debug_assert!(
237 self.initial.is_none(),
238 "initial space should have been discarded"
239 );
240 debug_assert!(
241 self.handshake.is_none(),
242 "handshake space should have been discarded"
243 );
244 }
245
246 pub fn discard_zero_rtt_crypto(&mut self) {
247 self.zero_rtt_crypto = None;
248 }
249
250 pub fn poll_crypto<Pub: event::ConnectionPublisher>(
251 &mut self,
252 path_manager: &mut path::Manager<Config>,
253 local_id_registry: &mut connection::LocalIdRegistry,
254 limits: &mut Limits,
255 now: Timestamp,
256 waker: &Waker,
257 publisher: &mut Pub,
258 datagram: &mut Config::DatagramEndpoint,
259 dc: &mut Config::DcEndpoint,
260 limits_endpoint: &mut Config::ConnectionLimits,
261 random_generator: &mut Config::RandomGenerator,
262 ) -> Poll<Result<(), transport::Error>> {
263 if let Some(session_info) = self.session_info.as_mut() {
264 let mut context: SessionContext<Config, Pub> = SessionContext {
265 now,
266 initial_cid: &session_info.initial_cid,
267 retry_cid: self.retry_cid.as_deref(),
268 initial: &mut self.initial,
269 handshake: &mut self.handshake,
270 application: &mut self.application,
271 zero_rtt_crypto: &mut self.zero_rtt_crypto,
272 tls_context: &mut self.tls_context,
273 path_manager,
274 handshake_status: &mut self.handshake_status,
275 local_id_registry,
276 limits,
277 server_name: &mut self.server_name,
278 application_protocol: &mut self.application_protocol,
279 waker,
280 publisher,
281 datagram,
282 dc,
283 limits_endpoint,
284 random_generator,
285 };
286
287 match session_info.session.poll(&mut context)? {
288 Poll::Ready(_success) => {
289 if session_info.session.should_discard_session() {
290 self.discard_session();
291 }
292
293 self.retry_cid = None;
294 }
295 Poll::Pending => return Poll::Pending,
296 };
297 }
298
299 Poll::Ready(Ok(()))
300 }
301
302 pub fn post_handshake_crypto<Pub: event::ConnectionPublisher>(
303 &mut self,
304 path_manager: &mut path::Manager<Config>,
305 local_id_registry: &mut connection::LocalIdRegistry,
306 limits: &mut Limits,
307 now: Timestamp,
308 waker: &Waker,
309 publisher: &mut Pub,
310 datagram: &mut Config::DatagramEndpoint,
311 dc: &mut Config::DcEndpoint,
312 limits_endpoint: &mut Config::ConnectionLimits,
313 random_generator: &mut Config::RandomGenerator,
314 ) -> Result<(), transport::Error> {
315 if let Some(session_info) = self.session_info.as_mut() {
316 let mut context: SessionContext<Config, Pub> = SessionContext {
317 now,
318 initial_cid: &session_info.initial_cid,
319 retry_cid: self.retry_cid.as_deref(),
320 initial: &mut self.initial,
321 handshake: &mut self.handshake,
322 tls_context: &mut self.tls_context,
323 application: &mut self.application,
324 zero_rtt_crypto: &mut self.zero_rtt_crypto,
325 path_manager,
326 handshake_status: &mut self.handshake_status,
327 local_id_registry,
328 limits,
329 server_name: &mut self.server_name,
330 application_protocol: &mut self.application_protocol,
331 waker,
332 publisher,
333 datagram,
334 dc,
335 limits_endpoint,
336 random_generator,
337 };
338
339 session_info
340 .session
341 .process_post_handshake_message(&mut context)?;
342 if session_info.session.should_discard_session() {
343 self.discard_session();
344 }
345 }
346
347 Ok(())
348 }
349
350 fn discard_session(&mut self) {
351 self.session_info = None;
352 if let Some((application_space, _status)) = self.application_mut() {
353 application_space.crypto_stream.rx.reset();
354 application_space.buffer_crypto_frames = false;
355 }
356 }
357
358 pub fn on_timeout<Pub: event::ConnectionPublisher>(
360 &mut self,
361 local_id_registry: &mut connection::LocalIdRegistry,
362 path_manager: &mut path::Manager<Config>,
363 random_generator: &mut Config::RandomGenerator,
364 timestamp: Timestamp,
365 publisher: &mut Pub,
366 ) -> Result<(), connection::Error> {
367 let path_id = path_manager.active_path_id();
368 let path = path_manager.active_path();
369
370 let max_backoff = path.pto_backoff.checked_mul(2).ok_or_else(|| {
372 connection::Error::immediate_close("PTO backoff multiplier exceeded maximum value")
373 })?;
374
375 if let Some((space, handshake_status)) = self.initial_mut() {
376 space.on_timeout(
377 handshake_status,
378 path_id,
379 path_manager,
380 random_generator,
381 timestamp,
382 max_backoff,
383 publisher,
384 )
385 }
386 if let Some((space, handshake_status)) = self.handshake_mut() {
387 space.on_timeout(
388 handshake_status,
389 path_id,
390 path_manager,
391 random_generator,
392 timestamp,
393 max_backoff,
394 publisher,
395 )
396 }
397 if let Some((space, handshake_status)) = self.application_mut() {
398 space.on_timeout(
399 path_manager,
400 handshake_status,
401 local_id_registry,
402 random_generator,
403 timestamp,
404 max_backoff,
405 publisher,
406 )
407 }
408
409 debug_assert!(path_manager.active_path().pto_backoff <= max_backoff);
410 Ok(())
411 }
412
413 pub fn on_amplification_unblocked(
416 &mut self,
417 path_manager: &path::Manager<Config>,
418 random_generator: &mut Config::RandomGenerator,
419 timestamp: Timestamp,
420 ) {
421 if let Some((space, handshake_status)) = self.initial_mut() {
422 space.on_amplification_unblocked(
423 path_manager,
424 timestamp,
425 handshake_status.is_confirmed(),
426 random_generator,
427 );
428 }
429
430 if let Some((space, handshake_status)) = self.handshake_mut() {
431 space.on_amplification_unblocked(
432 path_manager,
433 timestamp,
434 handshake_status.is_confirmed(),
435 random_generator,
436 );
437 }
438
439 if let Some((space, handshake_status)) = self.application_mut() {
440 space.on_amplification_unblocked(
441 path_manager,
442 timestamp,
443 handshake_status.is_confirmed(),
444 random_generator,
445 );
446 }
447 }
448
449 pub fn on_transmit_burst_complete(
451 &mut self,
452 active_path: &Path<Config>,
453 random_generator: &mut Config::RandomGenerator,
454 timestamp: Timestamp,
455 ) {
456 debug_assert!(active_path.is_active());
457
458 if let Some((space, handshake_status)) = self.initial_mut() {
459 space.on_transmit_burst_complete(
460 active_path,
461 timestamp,
462 handshake_status.is_confirmed(),
463 random_generator,
464 );
465 }
466
467 if let Some((space, handshake_status)) = self.handshake_mut() {
468 space.on_transmit_burst_complete(
469 active_path,
470 timestamp,
471 handshake_status.is_confirmed(),
472 random_generator,
473 );
474 }
475
476 if let Some((space, handshake_status)) = self.application_mut() {
477 space.on_transmit_burst_complete(
478 active_path,
479 timestamp,
480 handshake_status.is_confirmed(),
481 random_generator,
482 );
483 }
484 }
485
486 pub fn requires_probe(&self) -> bool {
487 core::iter::empty()
488 .chain(self.initial.iter().map(|space| space.requires_probe()))
489 .chain(self.handshake.iter().map(|space| space.requires_probe()))
490 .chain(self.application.iter().map(|space| space.requires_probe()))
491 .any(|requires_probe| requires_probe)
492 }
493
494 pub fn is_handshake_confirmed(&self) -> bool {
495 self.handshake_status.is_confirmed()
496 }
497
498 pub fn is_handshake_complete(&self) -> bool {
499 self.handshake_status.is_complete()
500 }
501
/// Writes connection-close packets into `packet_buffer` for every packet
/// space that is still allowed to transmit.
///
/// The initial and handshake spaces are given `early_connection_close`, the
/// application space is given `connection_close`. Spaces are written in the
/// order initial, handshake, application into the same buffer. Returns the
/// result of `packet_buffer.write`.
pub(crate) fn on_transmit_close(
    &mut self,
    early_connection_close: &ConnectionClose,
    connection_close: &ConnectionClose,
    context: &mut connection::ConnectionTransmissionContext<Config>,
    packet_buffer: &mut endpoint::PacketBuffer,
) -> Option<Bytes> {
    // start from "every space that still exists may transmit"
    let mut can_send_initial = self.initial.is_some();
    let mut can_send_handshake = self.handshake.is_some();
    let can_send_application = self.application.is_some();

    // once the handshake is confirmed only the application space is used
    if self.is_handshake_confirmed() {
        can_send_initial = false;
        can_send_handshake = false;
        debug_assert!(
            can_send_application,
            "if the handshake is confirmed, 1rtt keys should be available"
        );
    }

    if can_send_handshake {
        match Config::ENDPOINT_TYPE {
            endpoint::Type::Client => {
                // a client that can send in the handshake space does not
                // also send in the initial space
                can_send_initial = false;
            }
            endpoint::Type::Server => {
                // deliberately a no-op: a server keeps whatever was decided
                // above for the initial space
                can_send_initial &= true;
            }
        }
    }

    packet_buffer.write(|buffer| {
        // Writes the close frame for one space when `$check` holds and the
        // space exists; evaluates to the buffer that remains afterwards.
        macro_rules! write_packet {
            ($buffer:expr, $space:ident, $check:expr, $frame:expr) => {
                if let Some((space, _handshake_status)) = self.$space().filter(|_| $check) {
                    let result = space.on_transmit_close(context, &$frame, $buffer);

                    match result {
                        Ok((outcome, buffer)) => {
                            *context.outcome += outcome;
                            buffer
                        }
                        // a failed write is not fatal: recover the buffer and
                        // continue with the next space
                        Err(err) => err.take_buffer(),
                    }
                } else {
                    $buffer
                }
            };
        }

        let buffer = write_packet!(
            buffer,
            initial_mut,
            can_send_initial,
            early_connection_close
        );
        let buffer = write_packet!(
            buffer,
            handshake_mut,
            can_send_handshake,
            early_connection_close
        );
        write_packet!(
            buffer,
            application_mut,
            can_send_application,
            connection_close
        )
    })
}
594
595 pub fn close<Pub: event::ConnectionPublisher>(
596 &mut self,
597 error: connection::Error,
598 path_manager: &mut path::Manager<Config>,
599 random_generator: &mut Config::RandomGenerator,
600 now: Timestamp,
601 publisher: &mut Pub,
602 ) {
603 self.session_info = None;
604 self.retry_cid = None;
605 self.discard_initial(path_manager, random_generator, now, publisher);
606 self.discard_handshake(path_manager, publisher);
607 self.discard_zero_rtt_crypto();
608
609 if let Some((application, _handshake_status)) = self.application_mut() {
611 application.stream_manager.close(error);
618 }
619 }
620
621 pub fn on_retry_packet(&mut self, retry_source_connection_id: PeerId) {
622 debug_assert!(Config::ENDPOINT_TYPE.is_client());
623 self.retry_cid = Some(Box::new(retry_source_connection_id));
624 }
625
626 pub fn retry_cid(&self) -> Option<&PeerId> {
627 self.retry_cid.as_deref()
628 }
629}
630
631impl<Config: endpoint::Config> timer::Provider for PacketSpaceManager<Config> {
632 #[inline]
633 fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
634 if let Some(space) = self.application.as_ref() {
640 space.timers(query)?;
641 }
642 if let Some(space) = self.handshake.as_ref() {
643 space.timers(query)?;
644 }
645 if let Some(space) = self.initial.as_ref() {
646 space.timers(query)?;
647 }
648 Ok(())
649 }
650}
651
652impl<Config: endpoint::Config> transmission::interest::Provider for PacketSpaceManager<Config> {
653 #[inline]
654 fn transmission_interest<Q: transmission::interest::Query>(
655 &self,
656 query: &mut Q,
657 ) -> transmission::interest::Result {
658 if let Some(space) = self.application.as_ref() {
659 space.transmission_interest(query)?;
660 }
661
662 self.handshake_status.transmission_interest(query)?;
663
664 if let Some(space) = self.initial.as_ref() {
665 space.transmission_interest(query)?;
666 }
667
668 if let Some(space) = self.handshake.as_ref() {
669 space.transmission_interest(query)?;
670 }
671
672 Ok(())
673 }
674}
675
676impl<Config: endpoint::Config> connection::finalization::Provider for PacketSpaceManager<Config> {
677 fn finalization_status(&self) -> connection::finalization::Status {
678 core::iter::empty()
679 .chain(self.initial.iter().map(|space| space.finalization_status()))
680 .chain(
681 self.handshake
682 .iter()
683 .map(|space| space.finalization_status()),
684 )
685 .chain(
686 self.application
687 .iter()
688 .map(|space| space.finalization_status()),
689 )
690 .sum()
691 }
692}
693
/// Generates a frame handler named `$name` for frames of type `$frame` that
/// rejects the frame with a `PROTOCOL_VIOLATION` carrying the space's
/// `INVALID_FRAME_ERROR` reason and the frame's type.
macro_rules! default_frame_handler {
    ($name:ident, $frame:ty) => {
        fn $name(&mut self, frame: $frame) -> Result<(), transport::Error> {
            let rejection =
                transport::Error::PROTOCOL_VIOLATION.with_reason(Self::INVALID_FRAME_ERROR);
            Err(rejection.with_frame_type(frame.tag().into()))
        }
    };
}
703
704pub trait PacketSpace<Config: endpoint::Config>: Sized {
/// Reason string attached to the `PROTOCOL_VIOLATION` that the default
/// frame handlers of this trait return.
const INVALID_FRAME_ERROR: &'static str;
706
/// Called by `handle_cleartext_payload` when processing a packet reports
/// that the active path has been unblocked from amplification limits.
fn on_amplification_unblocked(
    &mut self,
    path_manager: &path::Manager<Config>,
    timestamp: Timestamp,
    is_handshake_confirmed: bool,
    random_generator: &mut Config::RandomGenerator,
);
714
/// Handles a CRYPTO frame decoded by `handle_cleartext_payload`.
///
/// `path` is the path the packet was received on.
fn handle_crypto_frame<Pub: event::ConnectionPublisher>(
    &mut self,
    frame: CryptoRef,
    datagram: &DatagramInfo,
    path: &mut Path<Config>,
    publisher: &mut Pub,
) -> Result<(), transport::Error>;
722
/// Handles an ACK frame.
///
/// Called for ACK frames decoded from the payload as well as for ranges
/// injected by the packet interceptor through `AckInterceptContext`.
/// `packet_number` is the number of the packet being processed.
fn handle_ack_frame<A: AckRanges, Pub: event::ConnectionPublisher>(
    &mut self,
    frame: Ack<A>,
    timestamp: Timestamp,
    path_id: path::Id,
    path_manager: &mut path::Manager<Config>,
    packet_number: PacketNumber,
    handshake_status: &mut HandshakeStatus,
    local_id_registry: &mut connection::LocalIdRegistry,
    random_generator: &mut Config::RandomGenerator,
    publisher: &mut Pub,
) -> Result<(), transport::Error>;
735
/// Handles a CONNECTION_CLOSE frame.
///
/// After this handler returns `Ok`, `handle_cleartext_payload` stops
/// processing the packet and returns a connection error built from the frame.
fn handle_connection_close_frame<Pub: event::ConnectionPublisher>(
    &mut self,
    frame: ConnectionClose,
    path_id: path::Id,
    path: &mut Path<Config>,
    packet_number: PacketNumber,
    publisher: &mut Pub,
) -> Result<(), transport::Error>;
744
745 fn handle_handshake_done_frame<Pub: event::ConnectionPublisher>(
746 &mut self,
747 frame: HandshakeDone,
748 _timestamp: Timestamp,
749 _path: &mut Path<Config>,
750 _local_id_registry: &mut connection::LocalIdRegistry,
751 _handshake_status: &mut HandshakeStatus,
752 _random_generator: &mut Config::RandomGenerator,
753 _publisher: &mut Pub,
754 ) -> Result<(), transport::Error> {
755 Err(transport::Error::PROTOCOL_VIOLATION
756 .with_reason(Self::INVALID_FRAME_ERROR)
757 .with_frame_type(frame.tag().into()))
758 }
759
760 fn handle_retire_connection_id_frame(
761 &mut self,
762 frame: RetireConnectionId,
763 _datagram: &DatagramInfo,
764 _path: &mut Path<Config>,
765 _local_id_registry: &mut connection::LocalIdRegistry,
766 ) -> Result<(), transport::Error> {
767 Err(transport::Error::PROTOCOL_VIOLATION
768 .with_reason(Self::INVALID_FRAME_ERROR)
769 .with_frame_type(frame.tag().into()))
770 }
771
772 fn handle_new_connection_id_frame<Pub: event::ConnectionPublisher>(
773 &mut self,
774 frame: NewConnectionId,
775 _datagram: &DatagramInfo,
776 _path_manager: &mut path::Manager<Config>,
777 _publisher: &mut Pub,
778 ) -> Result<(), transport::Error> {
779 Err(transport::Error::PROTOCOL_VIOLATION
780 .with_reason(Self::INVALID_FRAME_ERROR)
781 .with_frame_type(frame.tag().into()))
782 }
783
784 fn handle_path_response_frame<Pub: event::ConnectionPublisher>(
785 &mut self,
786 frame: PathResponse,
787 _timestamp: Timestamp,
788 _path_manager: &mut path::Manager<Config>,
789 _handshake_status: &mut HandshakeStatus,
790 _random_generator: &mut Config::RandomGenerator,
791 _publisher: &mut Pub,
792 ) -> Result<(), transport::Error> {
793 Err(transport::Error::PROTOCOL_VIOLATION
794 .with_reason(Self::INVALID_FRAME_ERROR)
795 .with_frame_type(frame.tag().into()))
796 }
797
798 fn handle_path_challenge_frame(
799 &mut self,
800 frame: PathChallenge,
801 _path_id: path::Id,
802 _path_manager: &mut path::Manager<Config>,
803 ) -> Result<(), transport::Error> {
804 Err(transport::Error::PROTOCOL_VIOLATION
805 .with_reason(Self::INVALID_FRAME_ERROR)
806 .with_frame_type(frame.tag().into()))
807 }
808
809 fn handle_stream_frame(
810 &mut self,
811 frame: StreamRef,
812 _packet: &mut ProcessedPacket,
813 ) -> Result<(), transport::Error> {
814 Err(transport::Error::PROTOCOL_VIOLATION
815 .with_reason(Self::INVALID_FRAME_ERROR)
816 .with_frame_type(frame.tag().into()))
817 }
818
819 fn handle_datagram_frame(
820 &mut self,
821 _path: s2n_quic_core::event::api::Path<'_>,
822 frame: DatagramRef,
823 ) -> Result<(), transport::Error> {
824 Err(transport::Error::PROTOCOL_VIOLATION
825 .with_reason(Self::INVALID_FRAME_ERROR)
826 .with_frame_type(frame.tag().into()))
827 }
828
829 fn handle_dc_stateless_reset_tokens_frame<Pub: event::ConnectionPublisher>(
830 &mut self,
831 frame: DcStatelessResetTokens,
832 _publisher: &mut Pub,
833 ) -> Result<(), transport::Error> {
834 Err(transport::Error::PROTOCOL_VIOLATION
835 .with_reason(Self::INVALID_FRAME_ERROR)
836 .with_frame_type(frame.tag()))
837 }
838
// Frames whose handlers take only the frame itself: by default each one is
// rejected with a `PROTOCOL_VIOLATION` (see `default_frame_handler!`).
default_frame_handler!(handle_data_blocked_frame, DataBlocked);
default_frame_handler!(handle_max_data_frame, MaxData);
default_frame_handler!(handle_max_stream_data_frame, MaxStreamData);
default_frame_handler!(handle_max_streams_frame, MaxStreams);
default_frame_handler!(handle_reset_stream_frame, ResetStream);
default_frame_handler!(handle_stop_sending_frame, StopSending);
default_frame_handler!(handle_stream_data_blocked_frame, StreamDataBlocked);
default_frame_handler!(handle_streams_blocked_frame, StreamsBlocked);
default_frame_handler!(handle_new_token_frame, NewToken);
848
/// Called by `handle_cleartext_payload` once all frames of a packet have
/// been handled and the path manager has been notified.
///
/// `path` is the path the packet was received on.
fn on_processed_packet<Pub: event::ConnectionPublisher>(
    &mut self,
    processed_packet: ProcessedPacket,
    path_id: path::Id,
    path: &Path<Config>,
    publisher: &mut Pub,
) -> Result<(), transport::Error>;
856
/// Decodes every frame of a decrypted packet payload and dispatches it to
/// the matching `handle_*_frame` method of this space.
///
/// Steps, in order:
/// 1. the payload is passed through `packet_interceptor.intercept_rx_payload`
/// 2. the interceptor may inject ACK ranges (`intercept_rx_ack`), which are
///    handled like received ACK frames
/// 3. frames are decoded one by one; a frame-received event is published for
///    each and the frame is recorded on the `ProcessedPacket` before its
///    handler runs
/// 4. the path manager and then the space (`on_processed_packet`) are told
///    about the finished packet
///
/// # Errors
///
/// * decoding failures and handler errors; handler errors are tagged with the
///   type of the offending frame
/// * a connection error built from the frame when a CONNECTION_CLOSE frame is
///   received (remaining frames are not processed)
/// * `PROTOCOL_VIOLATION` when the packet contained no frames
fn handle_cleartext_payload<'a, Pub: event::ConnectionPublisher>(
    &mut self,
    packet_number: PacketNumber,
    payload: DecoderBufferMut<'a>,
    datagram: &'a DatagramInfo,
    path_id: path::Id,
    path_manager: &mut path::Manager<Config>,
    handshake_status: &mut HandshakeStatus,
    local_id_registry: &mut connection::LocalIdRegistry,
    random_generator: &mut Config::RandomGenerator,
    publisher: &mut Pub,
    packet_interceptor: &mut Config::PacketInterceptor,
) -> Result<ProcessedPacket<'a>, connection::Error> {
    use s2n_quic_core::frame::{Frame, FrameMut};

    // give the interceptor the chance to replace the payload before decoding
    let mut payload = {
        use s2n_quic_core::packet::interceptor::{Interceptor, Packet};

        packet_interceptor.intercept_rx_payload(
            &publisher.subject(),
            &Packet {
                number: packet_number,
                timestamp: datagram.timestamp,
            },
            payload,
        )
    };

    let mut processed_packet = ProcessedPacket::new(packet_number, datagram);

    // Records the frame on `processed_packet` and evaluates to a closure
    // that tags a handler error with the type of that frame.
    macro_rules! on_frame_processed {
        ($frame:ident) => {{
            let frame_type = $frame.tag();
            processed_packet.on_processed_frame(&$frame);
            move |err: transport::Error| err.with_frame_type(frame_type.into())
        }};
    }

    {
        use s2n_quic_core::packet::interceptor::Interceptor;
        // every range the interceptor inserts is handled as an ACK frame by
        // `AckInterceptContext::insert_range`
        let mut ack_context = AckInterceptContext {
            packet_space: self,
            timestamp: datagram.timestamp,
            path_id,
            path_manager,
            packet_number,
            handshake_status,
            local_id_registry,
            random_generator,
            publisher,
            on_processed_frame: |ack_frame| processed_packet.on_processed_frame(ack_frame),
            error: None,
        };
        packet_interceptor.intercept_rx_ack(&ack_context.publisher.subject(), &mut ack_context);
        // `?` converts the transport error into a connection error
        if let Some(error) = ack_context.error {
            Err(error)?;
        }
    }

    while !payload.is_empty() {
        let (frame, remaining) = payload
            .decode::<FrameMut>()
            .map_err(transport::Error::from)?;

        let path = &path_manager[path_id];
        publisher.on_frame_received(event::builder::FrameReceived {
            packet_header: event::builder::PacketHeader::new(
                packet_number,
                publisher.quic_version(),
            ),
            path: path_event!(path, path_id),
            frame: frame.into_event(),
        });

        match frame {
            Frame::Padding(frame) => {
                // only recorded on the processed packet; there is no handler
                let _ = on_frame_processed!(frame);
            }
            Frame::Ping(frame) => {
                // only recorded on the processed packet; there is no handler
                let _ = on_frame_processed!(frame);
            }
            Frame::Crypto(frame) => {
                let on_error = on_frame_processed!(frame);

                self.handle_crypto_frame(
                    frame.into(),
                    datagram,
                    &mut path_manager[path_id],
                    publisher,
                )
                .map_err(on_error)?;
                processed_packet.contains_crypto = true;
            }
            Frame::Ack(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_ack_frame(
                    frame,
                    datagram.timestamp,
                    path_id,
                    path_manager,
                    packet_number,
                    handshake_status,
                    local_id_registry,
                    random_generator,
                    publisher,
                )
                .map_err(on_error)?;
            }
            Frame::ConnectionClose(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_connection_close_frame(
                    frame,
                    path_id,
                    &mut path_manager[path_id],
                    packet_number,
                    publisher,
                )
                .map_err(on_error)?;

                // stop here: the remaining frames are not processed
                return Err(connection::Error::from(frame));
            }
            Frame::Stream(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_stream_frame(frame.into(), &mut processed_packet)
                    .map_err(on_error)?;
            }
            Frame::Datagram(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_datagram_frame(
                    path_event!(path, path_id).into_event(),
                    frame.into(),
                )
                .map_err(on_error)?;
            }
            Frame::DataBlocked(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_data_blocked_frame(frame).map_err(on_error)?;
            }
            Frame::MaxData(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_max_data_frame(frame).map_err(on_error)?;
            }
            Frame::MaxStreamData(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_max_stream_data_frame(frame).map_err(on_error)?;
            }
            Frame::MaxStreams(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_max_streams_frame(frame).map_err(on_error)?;
            }
            Frame::ResetStream(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_reset_stream_frame(frame).map_err(on_error)?;
            }
            Frame::StopSending(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_stop_sending_frame(frame).map_err(on_error)?;
            }
            Frame::StreamDataBlocked(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_stream_data_blocked_frame(frame)
                    .map_err(on_error)?;
            }
            Frame::StreamsBlocked(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_streams_blocked_frame(frame).map_err(on_error)?;
            }
            Frame::NewToken(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_new_token_frame(frame).map_err(on_error)?;
            }
            Frame::NewConnectionId(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_new_connection_id_frame(frame, datagram, path_manager, publisher)
                    .map_err(on_error)?;
            }
            Frame::RetireConnectionId(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_retire_connection_id_frame(
                    frame,
                    datagram,
                    &mut path_manager[path_id],
                    local_id_registry,
                )
                .map_err(on_error)?;
            }
            Frame::PathChallenge(frame) => {
                let on_error = on_frame_processed!(frame);

                // remember that the challenge arrived on the active path
                // before handing the frame to the space
                if path_manager.active_path_id() == path_id {
                    processed_packet.path_challenge_on_active_path = true;
                }
                self.handle_path_challenge_frame(frame, path_id, path_manager)
                    .map_err(on_error)?;
            }
            Frame::PathResponse(frame) => {
                let on_error = on_frame_processed!(frame);

                self.handle_path_response_frame(
                    frame,
                    datagram.timestamp,
                    path_manager,
                    handshake_status,
                    random_generator,
                    publisher,
                )
                .map_err(on_error)?;
            }
            Frame::HandshakeDone(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_handshake_done_frame(
                    frame,
                    datagram.timestamp,
                    &mut path_manager[path_id],
                    local_id_registry,
                    handshake_status,
                    random_generator,
                    publisher,
                )
                .map_err(on_error)?;
            }
            Frame::DcStatelessResetTokens(frame) => {
                let on_error = on_frame_processed!(frame);
                self.handle_dc_stateless_reset_tokens_frame(frame, publisher)
                    .map_err(on_error)?;
            }
        }

        payload = remaining;
    }

    // a packet without any frame is rejected as a protocol violation
    if processed_packet.frames == 0 {
        return Err(transport::Error::PROTOCOL_VIOLATION
            .with_reason("packet contained no frames")
            .into());
    }

    let amplification_outcome = path_manager.on_processed_packet(
        path_id,
        datagram.source_connection_id,
        processed_packet.path_validation_probing,
        random_generator,
        publisher,
    )?;

    if amplification_outcome.is_active_path_unblocked() {
        self.on_amplification_unblocked(
            path_manager,
            datagram.timestamp,
            handshake_status.is_confirmed(),
            random_generator,
        );
    }

    self.on_processed_packet(processed_packet, path_id, &path_manager[path_id], publisher)?;

    Ok(processed_packet)
}
1153}
1154
/// Context handed to the packet interceptor's `intercept_rx_ack` hook.
///
/// It carries everything `PacketSpace::handle_ack_frame` needs, so that each
/// range the interceptor inserts can be handled as if it were an ACK frame
/// received in the packet currently being processed.
struct AckInterceptContext<
    'a,
    Config: endpoint::Config,
    Pub: event::ConnectionPublisher,
    Space: PacketSpace<Config>,
    OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
> {
    /// Space whose `handle_ack_frame` receives the injected ACK frames.
    packet_space: &'a mut Space,
    /// Timestamp passed to `handle_ack_frame`.
    timestamp: Timestamp,
    /// Path ID passed to `handle_ack_frame`.
    path_id: path::Id,
    path_manager: &'a mut path::Manager<Config>,
    /// Number of the packet being processed; also determines the packet
    /// number space of the injected ranges.
    packet_number: PacketNumber,
    handshake_status: &'a mut HandshakeStatus,
    local_id_registry: &'a mut connection::LocalIdRegistry,
    random_generator: &'a mut Config::RandomGenerator,
    publisher: &'a mut Pub,
    /// Invoked with every injected ACK frame before it is handled.
    on_processed_frame: OnProcessedFrame,
    /// Outcome of the most recent `insert_range` call: `Some` if it failed.
    error: Option<transport::Error>,
}
1174
1175impl<
1176 Config: endpoint::Config,
1177 Pub: event::ConnectionPublisher,
1178 Space: PacketSpace<Config>,
1179 OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
1180 > s2n_quic_core::packet::interceptor::Ack
1181 for AckInterceptContext<'_, Config, Pub, Space, OnProcessedFrame>
1182{
1183 fn space(&self) -> PacketNumberSpace {
1184 self.packet_number.space()
1185 }
1186
1187 fn insert_range(&mut self, range: RangeInclusive<VarInt>) {
1188 use s2n_quic_core::packet::number::PacketNumberRange;
1189
1190 let mut ack_ranges = ack::Ranges::default();
1191 let pn_range = PacketNumberRange::new(
1192 self.space().new_packet_number(*range.start()),
1193 self.space().new_packet_number(*range.end()),
1194 );
1195 if ack_ranges.insert_packet_number_range(pn_range).is_err() {
1196 self.error = Some(
1197 transport::Error::INTERNAL_ERROR
1198 .with_reason("Ack interceptor inserted an invalid packet range"),
1199 );
1200 return;
1201 }
1202 let ack_frame = Ack {
1203 ack_delay: Default::default(),
1204 ack_ranges: &ack_ranges,
1205 ecn_counts: None,
1206 };
1207 (self.on_processed_frame)(&ack_frame);
1208
1209 let on_error = {
1210 let frame_type = ack_frame.tag();
1211 move |err: transport::Error| err.with_frame_type(frame_type.into())
1212 };
1213 self.error = self
1214 .packet_space
1215 .handle_ack_frame(
1216 ack_frame,
1217 self.timestamp,
1218 self.path_id,
1219 self.path_manager,
1220 self.packet_number,
1221 self.handshake_status,
1222 self.local_id_registry,
1223 self.random_generator,
1224 self.publisher,
1225 )
1226 .map_err(on_error)
1227 .err();
1228 }
1229}