1use crate::association::{
2 state::{AckMode, AckState, AssociationState},
3 stats::AssociationStats,
4};
5use crate::chunk::{
6 chunk_abort::ChunkAbort, chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho,
7 chunk_error::ChunkError, chunk_forward_tsn::ChunkForwardTsn,
8 chunk_forward_tsn::ChunkForwardTsnStream, chunk_heartbeat::ChunkHeartbeat,
9 chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit, chunk_init::ChunkInitAck,
10 chunk_payload_data::ChunkPayloadData, chunk_payload_data::PayloadProtocolIdentifier,
11 chunk_reconfig::ChunkReconfig, chunk_selective_ack::ChunkSelectiveAck,
12 chunk_shutdown::ChunkShutdown, chunk_shutdown_ack::ChunkShutdownAck,
13 chunk_shutdown_complete::ChunkShutdownComplete, chunk_type::CT_FORWARD_TSN, Chunk,
14 ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT,
15};
16use crate::config::{ServerConfig, TransportConfig, COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE};
17use crate::error::{Error, Result};
18use crate::packet::{CommonHeader, Packet};
19use crate::param::{
20 param_heartbeat_info::ParamHeartbeatInfo,
21 param_outgoing_reset_request::ParamOutgoingResetRequest,
22 param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23 param_state_cookie::ParamStateCookie,
24 param_supported_extensions::ParamSupportedExtensions,
25 Param,
26};
27use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
28use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
29use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
30use crate::{AssociationEvent, Payload, Side, Transmit};
31use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
32use timer::{RtoManager, Timer, TimerTable, ACK_INTERVAL};
33
34use crate::association::stream::RecvSendState;
35use bytes::Bytes;
36use log::{debug, error, trace, warn};
37use rand::random;
38use rustc_hash::FxHashMap;
39use std::collections::{HashMap, VecDeque};
40use std::net::{IpAddr, SocketAddr};
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49mod timer;
50
51#[cfg(test)]
52mod association_test;
53
54#[derive(Debug, Error, Eq, Clone, PartialEq)]
56pub enum AssociationError {
57 #[error("{0}")]
59 HandshakeFailed(#[from] Error),
60 #[error("transport error")]
62 TransportError,
63 #[error("aborted by peer")]
65 AssociationClosed,
66 #[error("closed by peer")]
68 ApplicationClosed,
69 #[error("reset by peer")]
71 Reset,
72 #[error("timed out")]
77 TimedOut,
78 #[error("closed")]
80 LocallyClosed,
81}
82
83#[derive(Debug)]
85pub enum Event {
86 Connected,
88 AssociationLost {
92 reason: AssociationError,
94 },
95 Stream(StreamEvent),
97 DatagramReceived,
99}
100
101#[derive(Debug)]
119pub struct Association {
120 side: Side,
121 state: AssociationState,
122 handshake_completed: bool,
123 max_message_size: u32,
124 inflight_queue_length: usize,
125 will_send_shutdown: bool,
126 bytes_received: usize,
127 bytes_sent: usize,
128
129 peer_verification_tag: u32,
130 my_verification_tag: u32,
131 my_next_tsn: u32,
132 peer_last_tsn: u32,
133 min_tsn2measure_rtt: u32,
135 will_send_forward_tsn: bool,
136 will_retransmit_fast: bool,
137 will_retransmit_reconfig: bool,
138
139 will_send_shutdown_ack: bool,
140 will_send_shutdown_complete: bool,
141
142 my_next_rsn: u32,
144 reconfigs: FxHashMap<u32, ChunkReconfig>,
145 reconfig_requests: FxHashMap<u32, ParamOutgoingResetRequest>,
146
147 remote_addr: SocketAddr,
149 local_ip: Option<IpAddr>,
150 source_port: u16,
151 destination_port: u16,
152 my_max_num_inbound_streams: u16,
153 my_max_num_outbound_streams: u16,
154 my_cookie: Option<ParamStateCookie>,
155
156 payload_queue: PayloadQueue,
157 inflight_queue: PayloadQueue,
158 pending_queue: PendingQueue,
159 control_queue: VecDeque<Packet>,
160 stream_queue: VecDeque<u16>,
161
162 pub(crate) mtu: u32,
163 max_payload_size: u32,
165 cumulative_tsn_ack_point: u32,
166 advanced_peer_tsn_ack_point: u32,
167 use_forward_tsn: bool,
168
169 pub(crate) rto_mgr: RtoManager,
170 timers: TimerTable,
171
172 max_receive_buffer_size: u32,
174 pub(crate) cwnd: u32,
176 rwnd: u32,
178 pub(crate) ssthresh: u32,
180 partial_bytes_acked: u32,
181 pub(crate) in_fast_recovery: bool,
182 fast_recover_exit_point: u32,
183
184 stored_init: Option<ChunkInit>,
186 stored_cookie_echo: Option<ChunkCookieEcho>,
187 pub(crate) streams: FxHashMap<StreamId, StreamState>,
188
189 events: VecDeque<Event>,
190 endpoint_events: VecDeque<EndpointEventInner>,
191 error: Option<AssociationError>,
192
193 delayed_ack_triggered: bool,
195 immediate_ack_triggered: bool,
196
197 pub(crate) stats: AssociationStats,
198 ack_state: AckState,
199
200 pub(crate) ack_mode: AckMode,
202}
203
204impl Default for Association {
205 fn default() -> Self {
206 Association {
207 side: Side::default(),
208 state: AssociationState::default(),
209 handshake_completed: false,
210 max_message_size: 0,
211 inflight_queue_length: 0,
212 will_send_shutdown: false,
213 bytes_received: 0,
214 bytes_sent: 0,
215
216 peer_verification_tag: 0,
217 my_verification_tag: 0,
218 my_next_tsn: 0,
219 peer_last_tsn: 0,
220 min_tsn2measure_rtt: 0,
222 will_send_forward_tsn: false,
223 will_retransmit_fast: false,
224 will_retransmit_reconfig: false,
225
226 will_send_shutdown_ack: false,
227 will_send_shutdown_complete: false,
228
229 my_next_rsn: 0,
231 reconfigs: FxHashMap::default(),
232 reconfig_requests: FxHashMap::default(),
233
234 remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
236 local_ip: None,
237 source_port: 0,
238 destination_port: 0,
239 my_max_num_inbound_streams: 0,
240 my_max_num_outbound_streams: 0,
241 my_cookie: None,
242
243 payload_queue: PayloadQueue::default(),
244 inflight_queue: PayloadQueue::default(),
245 pending_queue: PendingQueue::default(),
246 control_queue: VecDeque::default(),
247 stream_queue: VecDeque::default(),
248
249 mtu: 0,
250 max_payload_size: 0,
252 cumulative_tsn_ack_point: 0,
253 advanced_peer_tsn_ack_point: 0,
254 use_forward_tsn: false,
255
256 rto_mgr: RtoManager::default(),
257 timers: TimerTable::default(),
258
259 max_receive_buffer_size: 0,
261 cwnd: 0,
263 rwnd: 0,
265 ssthresh: 0,
267 partial_bytes_acked: 0,
268 in_fast_recovery: false,
269 fast_recover_exit_point: 0,
270
271 stored_init: None,
273 stored_cookie_echo: None,
274 streams: FxHashMap::default(),
275
276 events: VecDeque::default(),
277 endpoint_events: VecDeque::default(),
278 error: None,
279
280 delayed_ack_triggered: false,
282 immediate_ack_triggered: false,
283
284 stats: AssociationStats::default(),
285 ack_state: AckState::default(),
286
287 ack_mode: AckMode::default(),
289 }
290 }
291}
292
293impl Association {
294 pub(crate) fn new(
295 server_config: Option<Arc<ServerConfig>>,
296 config: Arc<TransportConfig>,
297 max_payload_size: u32,
298 local_aid: AssociationId,
299 remote_addr: SocketAddr,
300 local_ip: Option<IpAddr>,
301 now: Instant,
302 ) -> Self {
303 let side = if server_config.is_some() {
304 Side::Server
305 } else {
306 Side::Client
307 };
308
309 let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
312
313 let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
317 let mut tsn = random::<u32>();
318 if tsn == 0 {
319 tsn += 1;
320 }
321
322 let mut this = Association {
323 side,
324 handshake_completed: false,
325 max_receive_buffer_size: config.max_receive_buffer_size(),
326 max_message_size: config.max_message_size(),
327 my_max_num_outbound_streams: config.max_num_outbound_streams(),
328 my_max_num_inbound_streams: config.max_num_inbound_streams(),
329 max_payload_size,
330
331 rto_mgr: RtoManager::new(
332 config.rto_initial_ms(),
333 config.rto_min_ms(),
334 config.rto_max_ms(),
335 ),
336 timers: TimerTable::new(
337 config.max_init_retransmits(),
338 config.max_data_retransmits(),
339 config.rto_max_ms(),
340 ),
341
342 mtu,
343 cwnd,
344 remote_addr,
345 local_ip,
346
347 my_verification_tag: local_aid,
348 my_next_tsn: tsn,
349 my_next_rsn: tsn,
350 min_tsn2measure_rtt: tsn,
351 cumulative_tsn_ack_point: tsn - 1,
352 advanced_peer_tsn_ack_point: tsn - 1,
353 error: None,
354
355 ..Default::default()
356 };
357
358 if side.is_client() {
359 let mut init = ChunkInit {
360 initial_tsn: this.my_next_tsn,
361 num_outbound_streams: this.my_max_num_outbound_streams,
362 num_inbound_streams: this.my_max_num_inbound_streams,
363 initiate_tag: this.my_verification_tag,
364 advertised_receiver_window_credit: this.max_receive_buffer_size,
365 ..Default::default()
366 };
367 init.set_supported_extensions();
368
369 this.set_state(AssociationState::CookieWait);
370 this.stored_init = Some(init);
371 let _ = this.send_init();
372 this.timers
373 .start(Timer::T1Init, now, this.rto_mgr.get_rto());
374 }
375
376 this
377 }
378
379 #[must_use]
385 pub fn poll(&mut self) -> Option<Event> {
386 if let Some(x) = self.events.pop_front() {
387 return Some(x);
388 }
389
390 if let Some(err) = self.error.take() {
395 return Some(Event::AssociationLost { reason: err });
396 }
397
398 None
399 }
400
401 #[must_use]
403 pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
404 self.endpoint_events.pop_front().map(EndpointEvent)
405 }
406
407 #[must_use]
415 pub fn poll_timeout(&mut self) -> Option<Instant> {
416 self.timers.next_timeout()
417 }
418
419 #[must_use]
426 pub fn poll_transmit(&mut self, now: Instant) -> Option<Transmit> {
427 let (contents, _) = self.gather_outbound(now);
428 if contents.is_empty() {
429 None
430 } else {
431 trace!(
432 "[{}] sending {} bytes (total {} datagrams)",
433 self.side,
434 contents.iter().fold(0, |l, c| l + c.len()),
435 contents.len()
436 );
437 Some(Transmit {
438 now,
439 remote: self.remote_addr,
440 payload: Payload::RawEncode(contents),
441 ecn: None,
442 local_ip: self.local_ip,
443 })
444 }
445 }
446
447 pub fn handle_timeout(&mut self, now: Instant) {
457 for &timer in &Timer::VALUES {
458 let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
459 if !expired {
460 continue;
461 }
462 self.timers.set(timer, None);
463 if timer == Timer::Ack {
466 self.on_ack_timeout();
467 } else if failure {
468 self.on_retransmission_failure(timer);
469 } else {
470 self.on_retransmission_timeout(timer, n_rtos);
471 self.timers.start(timer, now, self.rto_mgr.get_rto());
472 }
473 }
474 }
475
476 pub fn handle_event(&mut self, event: AssociationEvent) {
482 match event.0 {
483 AssociationEventInner::Datagram(transmit) => {
484 if let Payload::PartialDecode(partial_decode) = transmit.payload {
494 trace!(
495 "[{}] receiving {} bytes",
496 self.side,
497 COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
498 );
499
500 let pkt = match partial_decode.finish() {
501 Ok(p) => p,
502 Err(err) => {
503 warn!("[{}] unable to parse SCTP packet {}", self.side, err);
504 return;
505 }
506 };
507
508 if let Err(err) = self.handle_inbound(pkt, transmit.now) {
509 error!("handle_inbound got err: {}", err);
510 let _ = self.close();
511 }
512 } else {
513 trace!("discarding invalid partial_decode");
514 }
515 } }
517 }
518
519 pub fn stats(&self) -> AssociationStats {
521 self.stats
522 }
523
524 pub fn is_handshaking(&self) -> bool {
529 !self.handshake_completed
530 }
531
532 pub fn is_closed(&self) -> bool {
540 self.state == AssociationState::Closed
541 }
542
543 pub fn is_drained(&self) -> bool {
548 self.state.is_drained()
549 }
550
551 pub fn side(&self) -> Side {
553 self.side
554 }
555
556 pub fn remote_addr(&self) -> SocketAddr {
558 self.remote_addr
559 }
560
561 pub fn rtt(&self) -> Duration {
563 Duration::from_millis(self.rto_mgr.get_rto())
564 }
565
566 pub fn local_ip(&self) -> Option<IpAddr> {
581 self.local_ip
582 }
583
584 pub fn shutdown(&mut self) -> Result<()> {
588 debug!("[{}] closing association..", self.side);
589
590 let state = self.state();
591 if state != AssociationState::Established {
592 return Err(Error::ErrShutdownNonEstablished);
593 }
594
595 self.set_state(AssociationState::ShutdownPending);
597
598 if self.inflight_queue_length == 0 {
599 self.will_send_shutdown = true;
601 self.awake_write_loop();
602 self.set_state(AssociationState::ShutdownSent);
603 }
604
605 self.endpoint_events.push_back(EndpointEventInner::Drained);
606
607 Ok(())
608 }
609
610 pub fn close(&mut self) -> Result<()> {
612 if self.state() != AssociationState::Closed {
613 self.set_state(AssociationState::Closed);
614
615 debug!("[{}] closing association..", self.side);
616
617 self.close_all_timers();
618
619 for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
620 self.unregister_stream(si);
621 }
622
623 debug!("[{}] association closed", self.side);
624 debug!(
625 "[{}] stats nDATAs (in) : {}",
626 self.side,
627 self.stats.get_num_datas()
628 );
629 debug!(
630 "[{}] stats nSACKs (in) : {}",
631 self.side,
632 self.stats.get_num_sacks()
633 );
634 debug!(
635 "[{}] stats nT3Timeouts : {}",
636 self.side,
637 self.stats.get_num_t3timeouts()
638 );
639 debug!(
640 "[{}] stats nAckTimeouts: {}",
641 self.side,
642 self.stats.get_num_ack_timeouts()
643 );
644 debug!(
645 "[{}] stats nFastRetrans: {}",
646 self.side,
647 self.stats.get_num_fast_retrans()
648 );
649 }
650
651 Ok(())
652 }
653
654 pub fn open_stream(
656 &mut self,
657 stream_identifier: StreamId,
658 default_payload_type: PayloadProtocolIdentifier,
659 ) -> Result<Stream<'_>> {
660 if self.streams.contains_key(&stream_identifier) {
661 return Err(Error::ErrStreamAlreadyExist);
662 }
663
664 if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
665 Ok(s)
666 } else {
667 Err(Error::ErrStreamCreateFailed)
668 }
669 }
670
671 pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
673 self.stream_queue
674 .pop_front()
675 .map(move |stream_identifier| Stream {
676 stream_identifier,
677 association: self,
678 })
679 }
680
681 pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
683 if !self.streams.contains_key(&stream_identifier) {
684 Err(Error::ErrStreamNotExisted)
685 } else {
686 Ok(Stream {
687 stream_identifier,
688 association: self,
689 })
690 }
691 }
692
693 pub(crate) fn bytes_sent(&self) -> usize {
695 self.bytes_sent
696 }
697
698 pub(crate) fn bytes_received(&self) -> usize {
700 self.bytes_received
701 }
702
703 pub(crate) fn max_message_size(&self) -> u32 {
705 self.max_message_size
706 }
707
708 pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
710 self.max_message_size = max_message_size;
711 }
712
713 fn unregister_stream(&mut self, stream_identifier: StreamId) {
716 if let Some(mut s) = self.streams.remove(&stream_identifier) {
717 debug!("[{}] unregister_stream {}", self.side, stream_identifier);
718 s.state = RecvSendState::Closed;
719 }
720 }
721
722 fn set_state(&mut self, new_state: AssociationState) {
724 if new_state != self.state {
725 debug!(
726 "[{}] state change: '{}' => '{}'",
727 self.side, self.state, new_state,
728 );
729 }
730 self.state = new_state;
731 }
732
733 pub(crate) fn state(&self) -> AssociationState {
735 self.state
736 }
737
738 fn send_init(&mut self) -> Result<()> {
740 if let Some(stored_init) = &self.stored_init {
741 debug!("[{}] sending INIT", self.side);
742
743 self.source_port = 5000; self.destination_port = 5000; let outbound = Packet {
747 common_header: CommonHeader {
748 source_port: self.source_port,
749 destination_port: self.destination_port,
750 verification_tag: self.peer_verification_tag,
751 },
752 chunks: vec![Box::new(stored_init.clone())],
753 };
754
755 self.control_queue.push_back(outbound);
756 self.awake_write_loop();
757
758 Ok(())
759 } else {
760 Err(Error::ErrInitNotStoredToSend)
761 }
762 }
763
764 fn send_cookie_echo(&mut self) -> Result<()> {
766 if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
767 debug!("[{}] sending COOKIE-ECHO", self.side);
768
769 let outbound = Packet {
770 common_header: CommonHeader {
771 source_port: self.source_port,
772 destination_port: self.destination_port,
773 verification_tag: self.peer_verification_tag,
774 },
775 chunks: vec![Box::new(stored_cookie_echo.clone())],
776 };
777
778 self.control_queue.push_back(outbound);
779 self.awake_write_loop();
780
781 Ok(())
782 } else {
783 Err(Error::ErrCookieEchoNotStoredToSend)
784 }
785 }
786
787 fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
789 if let Err(err) = p.check_packet() {
790 warn!("[{}] failed validating packet {}", self.side, err);
791 return Ok(());
792 }
793
794 self.handle_chunk_start();
795
796 for c in &p.chunks {
797 self.handle_chunk(&p, c, now)?;
798 }
799
800 self.handle_chunk_end(now);
801
802 Ok(())
803 }
804
805 fn handle_chunk_start(&mut self) {
806 self.delayed_ack_triggered = false;
807 self.immediate_ack_triggered = false;
808 }
809
810 fn handle_chunk_end(&mut self, now: Instant) {
811 if self.immediate_ack_triggered {
812 self.ack_state = AckState::Immediate;
813 self.timers.stop(Timer::Ack);
814 self.awake_write_loop();
815 } else if self.delayed_ack_triggered {
816 self.ack_state = AckState::Delay;
818 self.timers.start(Timer::Ack, now, ACK_INTERVAL);
819 }
820 }
821
822 #[allow(clippy::borrowed_box)]
823 fn handle_chunk(
824 &mut self,
825 p: &Packet,
826 chunk: &Box<dyn Chunk + Send + Sync>,
827 now: Instant,
828 ) -> Result<()> {
829 chunk.check()?;
830 let chunk_any = chunk.as_any();
831 let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
832 if c.is_ack {
833 self.handle_init_ack(p, c, now)?
834 } else {
835 self.handle_init(p, c)?
836 }
837 } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
838 let mut err_str = String::new();
839 for e in &c.error_causes {
840 if matches!(e.code, USER_INITIATED_ABORT) {
841 debug!("User initiated abort received");
842 let _ = self.close();
843 return Ok(());
844 }
845 err_str += &format!("({})", e);
846 }
847 return Err(Error::ErrAbortChunk(err_str));
848 } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
849 let mut err_str = String::new();
850 for e in &c.error_causes {
851 err_str += &format!("({})", e);
852 }
853 return Err(Error::ErrAbortChunk(err_str));
854 } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
855 self.handle_heartbeat(c)?
856 } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
857 self.handle_cookie_echo(c)?
858 } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
859 self.handle_cookie_ack()?
860 } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
861 self.handle_data(c)?
862 } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
863 self.handle_sack(c, now)?
864 } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
865 self.handle_reconfig(c)?
866 } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
867 self.handle_forward_tsn(c)?
868 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
869 self.handle_shutdown(c)?
870 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
871 self.handle_shutdown_ack(c)?
872 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
873 self.handle_shutdown_complete(c)?
874 } else {
875 return Err(Error::ErrChunkTypeUnhandled);
876 };
877
878 if !packets.is_empty() {
879 let mut buf: VecDeque<_> = packets.into_iter().collect();
880 self.control_queue.append(&mut buf);
881 self.awake_write_loop();
882 }
883
884 Ok(())
885 }
886
887 fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
888 let state = self.state();
889 debug!("[{}] chunkInit received in state '{}'", self.side, state);
890
891 if state != AssociationState::Closed
899 && state != AssociationState::CookieWait
900 && state != AssociationState::CookieEchoed
901 {
902 return Err(Error::ErrHandleInitState);
905 }
906
907 self.my_max_num_inbound_streams =
909 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
910 self.my_max_num_outbound_streams =
911 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
912 self.peer_verification_tag = i.initiate_tag;
913 self.source_port = p.common_header.destination_port;
914 self.destination_port = p.common_header.source_port;
915
916 self.peer_last_tsn = if i.initial_tsn == 0 {
921 u32::MAX
922 } else {
923 i.initial_tsn - 1
924 };
925
926 for param in &i.params {
927 if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
928 for t in &v.chunk_types {
929 if *t == CT_FORWARD_TSN {
930 debug!("[{}] use ForwardTSN (on init)", self.side);
931 self.use_forward_tsn = true;
932 }
933 }
934 }
935 }
936 if !self.use_forward_tsn {
937 warn!("[{}] not using ForwardTSN (on init)", self.side);
938 }
939
940 let mut outbound = Packet {
941 common_header: CommonHeader {
942 verification_tag: self.peer_verification_tag,
943 source_port: self.source_port,
944 destination_port: self.destination_port,
945 },
946 chunks: vec![],
947 };
948
949 let mut init_ack = ChunkInit {
950 is_ack: true,
951 initial_tsn: self.my_next_tsn,
952 num_outbound_streams: self.my_max_num_outbound_streams,
953 num_inbound_streams: self.my_max_num_inbound_streams,
954 initiate_tag: self.my_verification_tag,
955 advertised_receiver_window_credit: self.max_receive_buffer_size,
956 ..Default::default()
957 };
958
959 if self.my_cookie.is_none() {
960 self.my_cookie = Some(ParamStateCookie::new());
961 }
962
963 if let Some(my_cookie) = &self.my_cookie {
964 init_ack.params = vec![Box::new(my_cookie.clone())];
965 }
966
967 init_ack.set_supported_extensions();
968
969 outbound.chunks = vec![Box::new(init_ack)];
970
971 Ok(vec![outbound])
972 }
973
974 fn handle_init_ack(
975 &mut self,
976 p: &Packet,
977 i: &ChunkInitAck,
978 now: Instant,
979 ) -> Result<Vec<Packet>> {
980 let state = self.state();
981 debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
982 if state != AssociationState::CookieWait {
983 return Ok(vec![]);
990 }
991
992 self.my_max_num_inbound_streams =
993 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
994 self.my_max_num_outbound_streams =
995 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
996 self.peer_verification_tag = i.initiate_tag;
997 self.peer_last_tsn = if i.initial_tsn == 0 {
998 u32::MAX
999 } else {
1000 i.initial_tsn - 1
1001 };
1002 if self.source_port != p.common_header.destination_port
1003 || self.destination_port != p.common_header.source_port
1004 {
1005 warn!("[{}] handle_init_ack: port mismatch", self.side);
1006 return Ok(vec![]);
1007 }
1008
1009 self.rwnd = i.advertised_receiver_window_credit;
1010 debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1011
1012 self.ssthresh = self.rwnd;
1017 trace!(
1018 "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1019 self.side,
1020 self.cwnd,
1021 self.ssthresh,
1022 self.inflight_queue.get_num_bytes()
1023 );
1024
1025 self.timers.stop(Timer::T1Init);
1026 self.stored_init = None;
1027
1028 let mut cookie_param = None;
1029 for param in &i.params {
1030 if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1031 cookie_param = Some(v);
1032 } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1033 for t in &v.chunk_types {
1034 if *t == CT_FORWARD_TSN {
1035 debug!("[{}] use ForwardTSN (on initAck)", self.side);
1036 self.use_forward_tsn = true;
1037 }
1038 }
1039 }
1040 }
1041 if !self.use_forward_tsn {
1042 warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1043 }
1044
1045 if let Some(v) = cookie_param {
1046 self.stored_cookie_echo = Some(ChunkCookieEcho {
1047 cookie: v.cookie.clone(),
1048 });
1049
1050 self.send_cookie_echo()?;
1051
1052 self.timers
1053 .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1054
1055 self.set_state(AssociationState::CookieEchoed);
1056
1057 Ok(vec![])
1058 } else {
1059 Err(Error::ErrInitAckNoCookie)
1060 }
1061 }
1062
1063 fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1064 trace!("[{}] chunkHeartbeat", self.side);
1065 if let Some(p) = c.params.first() {
1066 if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1067 return Ok(vec![Packet {
1068 common_header: CommonHeader {
1069 verification_tag: self.peer_verification_tag,
1070 source_port: self.source_port,
1071 destination_port: self.destination_port,
1072 },
1073 chunks: vec![Box::new(ChunkHeartbeatAck {
1074 params: vec![Box::new(ParamHeartbeatInfo {
1075 heartbeat_information: hbi.heartbeat_information.clone(),
1076 })],
1077 })],
1078 }]);
1079 } else {
1080 warn!(
1081 "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1082 self.side,
1083 );
1084 }
1085 }
1086
1087 Ok(vec![])
1088 }
1089
1090 fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1091 let state = self.state();
1092 debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1093
1094 if let Some(my_cookie) = &self.my_cookie {
1095 match state {
1096 AssociationState::Established => {
1097 if my_cookie.cookie != c.cookie {
1098 return Ok(vec![]);
1099 }
1100 }
1101 AssociationState::Closed
1102 | AssociationState::CookieWait
1103 | AssociationState::CookieEchoed => {
1104 if my_cookie.cookie != c.cookie {
1105 return Ok(vec![]);
1106 }
1107
1108 self.timers.stop(Timer::T1Init);
1109 self.stored_init = None;
1110
1111 self.timers.stop(Timer::T1Cookie);
1112 self.stored_cookie_echo = None;
1113
1114 self.events.push_back(Event::Connected);
1115 self.set_state(AssociationState::Established);
1116 self.handshake_completed = true;
1117 }
1118 _ => return Ok(vec![]),
1119 };
1120 } else {
1121 debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1122 return Ok(vec![]);
1123 }
1124
1125 Ok(vec![Packet {
1126 common_header: CommonHeader {
1127 verification_tag: self.peer_verification_tag,
1128 source_port: self.source_port,
1129 destination_port: self.destination_port,
1130 },
1131 chunks: vec![Box::new(ChunkCookieAck {})],
1132 }])
1133 }
1134
1135 fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1136 let state = self.state();
1137 debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1138 if state != AssociationState::CookieEchoed {
1139 return Ok(vec![]);
1144 }
1145
1146 self.timers.stop(Timer::T1Cookie);
1147 self.stored_cookie_echo = None;
1148
1149 self.events.push_back(Event::Connected);
1150 self.set_state(AssociationState::Established);
1151 self.handshake_completed = true;
1152
1153 Ok(vec![])
1154 }
1155
1156 fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1157 trace!(
1158 "[{}] DATA: tsn={} immediateSack={} len={}",
1159 self.side,
1160 d.tsn,
1161 d.immediate_sack,
1162 d.user_data.len()
1163 );
1164 self.stats.inc_datas();
1165
1166 let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1167 let mut stream_handle_data = false;
1168 if can_push {
1169 if self.get_or_create_stream(d.stream_identifier).is_some() {
1170 if self.get_my_receiver_window_credit() > 0 {
1171 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1173 stream_handle_data = true;
1174 } else {
1175 if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1177 if sna32lt(d.tsn, *last_tsn) {
1178 debug!("[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}", self.side, d.tsn, d.stream_sequence_number);
1179 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1180 stream_handle_data = true; }
1182 } else {
1183 debug!(
1184 "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1185 self.side, d.tsn, d.stream_sequence_number
1186 );
1187 }
1188 }
1189 } else {
1190 debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1193 return Ok(vec![]);
1194 }
1195 }
1196
1197 let immediate_sack = d.immediate_sack;
1198
1199 if stream_handle_data {
1200 if let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1201 self.events.push_back(Event::DatagramReceived);
1202 s.handle_data(d);
1203 if s.reassembly_queue.is_readable() {
1204 self.events.push_back(Event::Stream(StreamEvent::Readable {
1205 id: d.stream_identifier,
1206 }))
1207 }
1208 }
1209 }
1210
1211 self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1212 }
1213
1214 fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1215 trace!(
1216 "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1217 self.side,
1218 self.cumulative_tsn_ack_point,
1219 d.cumulative_tsn_ack,
1220 d.advertised_receiver_window_credit
1221 );
1222 let state = self.state();
1223 if state != AssociationState::Established
1224 && state != AssociationState::ShutdownPending
1225 && state != AssociationState::ShutdownReceived
1226 {
1227 return Ok(vec![]);
1228 }
1229
1230 self.stats.inc_sacks();
1231
1232 if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1233 debug!(
1242 "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1243 self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1244 );
1245
1246 return Ok(vec![]);
1247 }
1248
1249 let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1251
1252 let mut total_bytes_acked = 0;
1253 for n_bytes_acked in bytes_acked_per_stream.values() {
1254 total_bytes_acked += *n_bytes_acked;
1255 }
1256
1257 let mut cum_tsn_ack_point_advanced = false;
1258 if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1259 trace!(
1260 "[{}] SACK: cumTSN advanced: {} -> {}",
1261 self.side,
1262 self.cumulative_tsn_ack_point,
1263 d.cumulative_tsn_ack
1264 );
1265
1266 self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1267 cum_tsn_ack_point_advanced = true;
1268 self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1269 }
1270
1271 for (si, n_bytes_acked) in &bytes_acked_per_stream {
1272 if let Some(s) = self.streams.get_mut(si) {
1273 if s.on_buffer_released(*n_bytes_acked) {
1274 self.events
1275 .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1276 }
1277 }
1278 }
1279
1280 let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1289 if bytes_outstanding >= d.advertised_receiver_window_credit {
1290 self.rwnd = 0;
1291 } else {
1292 self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1293 }
1294
1295 self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1296
1297 if self.use_forward_tsn {
1298 if sna32lt(
1300 self.advanced_peer_tsn_ack_point,
1301 self.cumulative_tsn_ack_point,
1302 ) {
1303 self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1304 }
1305
1306 let mut i = self.advanced_peer_tsn_ack_point + 1;
1308 while let Some(c) = self.inflight_queue.get(i) {
1309 if !c.abandoned() {
1310 break;
1311 }
1312 self.advanced_peer_tsn_ack_point = i;
1313 i += 1;
1314 }
1315
1316 if sna32gt(
1318 self.advanced_peer_tsn_ack_point,
1319 self.cumulative_tsn_ack_point,
1320 ) {
1321 self.will_send_forward_tsn = true;
1322 debug!(
1323 "[{}] handleSack {}: sna32GT({}, {})",
1324 self.side,
1325 self.will_send_forward_tsn,
1326 self.advanced_peer_tsn_ack_point,
1327 self.cumulative_tsn_ack_point
1328 );
1329 }
1330 self.awake_write_loop();
1331 }
1332
1333 self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1334
1335 Ok(vec![])
1336 }
1337
1338 fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1339 trace!("[{}] handle_reconfig", self.side);
1340
1341 let mut pp = vec![];
1342
1343 if let Some(param_a) = &c.param_a {
1344 self.handle_reconfig_param(param_a, &mut pp)?;
1345 }
1346
1347 if let Some(param_b) = &c.param_b {
1348 self.handle_reconfig_param(param_b, &mut pp)?;
1349 }
1350
1351 Ok(pp)
1352 }
1353
1354 fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1355 trace!("[{}] FwdTSN: {}", self.side, c);
1356
1357 if !self.use_forward_tsn {
1358 warn!("[{}] received FwdTSN but not enabled", self.side);
1359 let cerr = ChunkError {
1361 error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1362 };
1363
1364 let outbound = Packet {
1365 common_header: CommonHeader {
1366 verification_tag: self.peer_verification_tag,
1367 source_port: self.source_port,
1368 destination_port: self.destination_port,
1369 },
1370 chunks: vec![Box::new(cerr)],
1371 };
1372 return Ok(vec![outbound]);
1373 }
1374
1375 trace!(
1384 "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1385 self.side,
1386 c.new_cumulative_tsn,
1387 self.peer_last_tsn
1388 );
1389 if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1390 trace!("[{}] sending ack on Forward TSN", self.side);
1391 self.ack_state = AckState::Immediate;
1392 self.timers.stop(Timer::Ack);
1393 self.awake_write_loop();
1394 return Ok(vec![]);
1395 }
1396
1397 while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1409 self.payload_queue.pop(self.peer_last_tsn + 1); self.peer_last_tsn += 1;
1411 }
1412
1413 for forwarded in &c.streams {
1417 if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1418 s.handle_forward_tsn_for_ordered(forwarded.sequence);
1419 }
1420 }
1421
1422 for s in self.streams.values_mut() {
1428 s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1429 }
1430
1431 self.handle_peer_last_tsn_and_acknowledgement(false)
1432 }
1433
1434 fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1435 let state = self.state();
1436
1437 if state == AssociationState::Established {
1438 if !self.inflight_queue.is_empty() {
1439 self.set_state(AssociationState::ShutdownReceived);
1440 } else {
1441 self.will_send_shutdown_ack = true;
1443 self.set_state(AssociationState::ShutdownAckSent);
1444
1445 self.awake_write_loop();
1446 }
1447 } else if state == AssociationState::ShutdownSent {
1448 self.will_send_shutdown_ack = true;
1451 self.set_state(AssociationState::ShutdownAckSent);
1452
1453 self.awake_write_loop();
1454 }
1455
1456 Ok(vec![])
1457 }
1458
1459 fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1460 let state = self.state();
1461 if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1462 self.timers.stop(Timer::T2Shutdown);
1463 self.will_send_shutdown_complete = true;
1464
1465 self.awake_write_loop();
1466 }
1467
1468 Ok(vec![])
1469 }
1470
1471 fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1472 let state = self.state();
1473 if state == AssociationState::ShutdownAckSent {
1474 self.timers.stop(Timer::T2Shutdown);
1475 self.close()?;
1476 }
1477
1478 Ok(vec![])
1479 }
1480
1481 fn handle_peer_last_tsn_and_acknowledgement(
1483 &mut self,
1484 sack_immediately: bool,
1485 ) -> Result<Vec<Packet>> {
1486 let mut reply = vec![];
1487
1488 while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1497 self.peer_last_tsn += 1;
1498 let rst_reqs: Vec<ParamOutgoingResetRequest> =
1501 self.reconfig_requests.values().cloned().collect();
1502 for rst_req in rst_reqs {
1503 self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1504 }
1505 }
1506
1507 let has_packet_loss = !self.payload_queue.is_empty();
1508 if has_packet_loss {
1509 trace!(
1510 "[{}] packetloss: {}",
1511 self.side,
1512 self.payload_queue
1513 .get_gap_ack_blocks_string(self.peer_last_tsn)
1514 );
1515 }
1516
1517 if (self.ack_state != AckState::Immediate
1518 && !sack_immediately
1519 && !has_packet_loss
1520 && self.ack_mode == AckMode::Normal)
1521 || self.ack_mode == AckMode::AlwaysDelay
1522 {
1523 if self.ack_state == AckState::Idle {
1524 self.delayed_ack_triggered = true;
1525 } else {
1526 self.immediate_ack_triggered = true;
1527 }
1528 } else {
1529 self.immediate_ack_triggered = true;
1530 }
1531
1532 Ok(reply)
1533 }
1534
1535 #[allow(clippy::borrowed_box)]
1536 fn handle_reconfig_param(
1537 &mut self,
1538 raw: &Box<dyn Param + Send + Sync>,
1539 reply: &mut Vec<Packet>,
1540 ) -> Result<()> {
1541 if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1542 self.reconfig_requests
1543 .insert(p.reconfig_request_sequence_number, p.clone());
1544 self.reset_streams_if_any(p, true, reply)?;
1545 Ok(())
1546 } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1547 self.reconfigs.remove(&p.reconfig_response_sequence_number);
1548 if self.reconfigs.is_empty() {
1549 self.timers.stop(Timer::Reconfig);
1550 }
1551 Ok(())
1552 } else {
1553 Err(Error::ErrParameterType)
1554 }
1555 }
1556
1557 fn process_selective_ack(
1558 &mut self,
1559 d: &ChunkSelectiveAck,
1560 now: Instant,
1561 ) -> Result<(HashMap<u16, i64>, u32)> {
1562 let mut bytes_acked_per_stream = HashMap::new();
1563
1564 let mut i = self.cumulative_tsn_ack_point + 1;
1568 while sna32lte(i, d.cumulative_tsn_ack) {
1570 if let Some(c) = self.inflight_queue.pop(i) {
1571 if !c.acked {
1572 if i == self.cumulative_tsn_ack_point + 1 {
1578 self.timers.stop(Timer::T3RTX);
1580 }
1581
1582 let n_bytes_acked = c.user_data.len() as i64;
1583
1584 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1586 *amount += n_bytes_acked;
1587 } else {
1588 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1589 }
1590
1591 if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1601 self.min_tsn2measure_rtt = self.my_next_tsn;
1602 if let Some(since) = &c.since {
1603 let rtt = now.duration_since(*since);
1604 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1605 trace!(
1606 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1607 self.side,
1608 rtt.as_millis(),
1609 srtt,
1610 self.rto_mgr.get_rto()
1611 );
1612 } else {
1613 error!("[{}] invalid c.since", self.side);
1614 }
1615 }
1616 }
1617
1618 if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1619 debug!("[{}] exit fast-recovery", self.side);
1620 self.in_fast_recovery = false;
1621 }
1622 } else {
1623 return Err(Error::ErrInflightQueueTsnPop);
1624 }
1625
1626 i += 1;
1627 }
1628
1629 let mut htna = d.cumulative_tsn_ack;
1630
1631 for g in &d.gap_ack_blocks {
1633 for i in g.start..=g.end {
1634 let tsn = d.cumulative_tsn_ack + i as u32;
1635
1636 let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1637 (true, c.acked)
1638 } else {
1639 (false, false)
1640 };
1641 let n_bytes_acked = if is_existed && !is_acked {
1642 self.inflight_queue.mark_as_acked(tsn) as i64
1643 } else {
1644 0
1645 };
1646
1647 if let Some(c) = self.inflight_queue.get(tsn) {
1648 if !is_acked {
1649 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1651 *amount += n_bytes_acked;
1652 } else {
1653 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1654 }
1655
1656 trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1657
1658 if c.nsent == 1 {
1659 self.min_tsn2measure_rtt = self.my_next_tsn;
1660 if let Some(since) = &c.since {
1661 let rtt = now.duration_since(*since);
1662 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1663 trace!(
1664 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1665 self.side,
1666 rtt.as_millis(),
1667 srtt,
1668 self.rto_mgr.get_rto()
1669 );
1670 } else {
1671 error!("[{}] invalid c.since", self.side);
1672 }
1673 }
1674
1675 if sna32lt(htna, tsn) {
1676 htna = tsn;
1677 }
1678 }
1679 } else {
1680 return Err(Error::ErrTsnRequestNotExist);
1681 }
1682 }
1683 }
1684
1685 Ok((bytes_acked_per_stream, htna))
1686 }
1687
1688 fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1689 if self.inflight_queue.is_empty() {
1693 trace!(
1694 "[{}] SACK: no more packet in-flight (pending={})",
1695 self.side,
1696 self.pending_queue.len()
1697 );
1698 self.timers.stop(Timer::T3RTX);
1699 } else {
1700 trace!("[{}] T3-rtx timer start (pt2)", self.side);
1701 self.timers
1702 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1703 }
1704
1705 if self.cwnd <= self.ssthresh {
1707 if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1719 self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); trace!(
1722 "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1723 self.side,
1724 self.cwnd,
1725 self.ssthresh,
1726 total_bytes_acked
1727 );
1728 } else {
1729 trace!(
1730 "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1731 self.side,
1732 self.cwnd,
1733 self.ssthresh,
1734 total_bytes_acked,
1735 self.in_fast_recovery,
1736 self.pending_queue.len()
1737 );
1738 }
1739 } else {
1740 self.partial_bytes_acked += total_bytes_acked as u32;
1747
1748 if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1754 self.partial_bytes_acked -= self.cwnd;
1755 self.cwnd += self.mtu;
1756 trace!(
1757 "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1758 self.side,
1759 self.cwnd,
1760 self.ssthresh,
1761 total_bytes_acked
1762 );
1763 }
1764 }
1765 }
1766
1767 fn process_fast_retransmission(
1768 &mut self,
1769 cum_tsn_ack_point: u32,
1770 htna: u32,
1771 cum_tsn_ack_point_advanced: bool,
1772 ) -> Result<()> {
1773 if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1783 let max_tsn = if !self.in_fast_recovery {
1784 htna
1786 } else {
1787 cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1789 };
1790
1791 let mut tsn = cum_tsn_ack_point + 1;
1792 while sna32lt(tsn, max_tsn) {
1793 if let Some(c) = self.inflight_queue.get_mut(tsn) {
1794 if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1795 c.miss_indicator += 1;
1796 if c.miss_indicator == 3 && !self.in_fast_recovery {
1797 self.in_fast_recovery = true;
1801 self.fast_recover_exit_point = htna;
1802 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1803 self.cwnd = self.ssthresh;
1804 self.partial_bytes_acked = 0;
1805 self.will_retransmit_fast = true;
1806
1807 trace!(
1808 "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1809 self.side,
1810 self.cwnd,
1811 self.ssthresh,
1812 self.inflight_queue.get_num_bytes()
1813 );
1814 }
1815 }
1816 } else {
1817 return Err(Error::ErrTsnRequestNotExist);
1818 }
1819
1820 tsn += 1;
1821 }
1822 }
1823
1824 if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1825 self.will_retransmit_fast = true;
1826 }
1827
1828 Ok(())
1829 }
1830
1831 fn postprocess_sack(
1834 &mut self,
1835 state: AssociationState,
1836 mut should_awake_write_loop: bool,
1837 now: Instant,
1838 ) {
1839 if !self.inflight_queue.is_empty() {
1840 trace!("[{}] T3-rtx timer start (pt3)", self.side);
1842 self.timers
1843 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1844 } else if state == AssociationState::ShutdownPending {
1845 should_awake_write_loop = true;
1847 self.will_send_shutdown = true;
1848 self.set_state(AssociationState::ShutdownSent);
1849 } else if state == AssociationState::ShutdownReceived {
1850 should_awake_write_loop = true;
1852 self.will_send_shutdown_ack = true;
1853 self.set_state(AssociationState::ShutdownAckSent);
1854 }
1855
1856 if should_awake_write_loop {
1857 self.awake_write_loop();
1858 }
1859 }
1860
1861 fn reset_streams_if_any(
1862 &mut self,
1863 p: &ParamOutgoingResetRequest,
1864 respond: bool,
1865 reply: &mut Vec<Packet>,
1866 ) -> Result<()> {
1867 let mut result = ReconfigResult::SuccessPerformed;
1868 let mut sis_to_reset = vec![];
1869
1870 if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1871 debug!(
1872 "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1873 self.side, p.sender_last_tsn, self.peer_last_tsn
1874 );
1875 for id in &p.stream_identifiers {
1876 if self.streams.contains_key(id) {
1877 if respond {
1878 sis_to_reset.push(*id);
1879 }
1880 self.unregister_stream(*id);
1881 }
1882 }
1883 self.reconfig_requests
1884 .remove(&p.reconfig_request_sequence_number);
1885 } else {
1886 debug!(
1887 "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1888 self.side, p.sender_last_tsn, self.peer_last_tsn
1889 );
1890 result = ReconfigResult::InProgress;
1891 }
1892
1893 if !sis_to_reset.is_empty() {
1896 let rsn = self.generate_next_rsn();
1897 let tsn = self.my_next_tsn - 1;
1898
1899 let c = ChunkReconfig {
1900 param_a: Some(Box::new(ParamOutgoingResetRequest {
1901 reconfig_request_sequence_number: rsn,
1902 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1903 sender_last_tsn: tsn,
1904 stream_identifiers: sis_to_reset,
1905 })),
1906 ..Default::default()
1907 };
1908
1909 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
1912 reply.push(p);
1913 }
1914
1915 let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1916 param_a: Some(Box::new(ParamReconfigResponse {
1917 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1918 result,
1919 })),
1920 param_b: None,
1921 })]);
1922
1923 debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1924
1925 reply.push(packet);
1926
1927 Ok(())
1928 }
1929
1930 pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk + Send + Sync>>) -> Packet {
1933 Packet {
1934 common_header: CommonHeader {
1935 verification_tag: self.peer_verification_tag,
1936 source_port: self.source_port,
1937 destination_port: self.destination_port,
1938 },
1939 chunks,
1940 }
1941 }
1942
1943 fn create_stream(
1945 &mut self,
1946 stream_identifier: StreamId,
1947 accept: bool,
1948 default_payload_type: PayloadProtocolIdentifier,
1949 ) -> Option<Stream<'_>> {
1950 let s = StreamState::new(
1951 self.side,
1952 stream_identifier,
1953 self.max_payload_size,
1954 default_payload_type,
1955 );
1956
1957 if accept {
1958 self.stream_queue.push_back(stream_identifier);
1959 self.events.push_back(Event::Stream(StreamEvent::Opened));
1960 }
1961
1962 self.streams.insert(stream_identifier, s);
1963
1964 Some(Stream {
1965 stream_identifier,
1966 association: self,
1967 })
1968 }
1969
1970 fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1972 if self.streams.contains_key(&stream_identifier) {
1973 Some(Stream {
1974 stream_identifier,
1975 association: self,
1976 })
1977 } else {
1978 self.create_stream(
1979 stream_identifier,
1980 true,
1981 PayloadProtocolIdentifier::default(),
1982 )
1983 }
1984 }
1985
1986 pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
1987 let mut bytes_queued = 0;
1988 for s in self.streams.values() {
1989 bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
1990 }
1991
1992 self.max_receive_buffer_size.saturating_sub(bytes_queued)
1993 }
1994
1995 fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
1998 let mut raw_packets = vec![];
1999
2000 if !self.control_queue.is_empty() {
2001 for p in self.control_queue.drain(..) {
2002 if let Ok(raw) = p.marshal() {
2003 raw_packets.push(raw);
2004 } else {
2005 warn!("[{}] failed to serialize a control packet", self.side);
2006 continue;
2007 }
2008 }
2009 }
2010
2011 let state = self.state();
2012 match state {
2013 AssociationState::Established => {
2014 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2015 raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2016 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2017 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2018 raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2019 (raw_packets, true)
2020 }
2021 AssociationState::ShutdownPending
2022 | AssociationState::ShutdownSent
2023 | AssociationState::ShutdownReceived => {
2024 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2025 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2026 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2027 self.gather_outbound_shutdown_packets(raw_packets, now)
2028 }
2029 AssociationState::ShutdownAckSent => {
2030 self.gather_outbound_shutdown_packets(raw_packets, now)
2031 }
2032 _ => (raw_packets, true),
2033 }
2034 }
2035
2036 fn gather_data_packets_to_retransmit(
2037 &mut self,
2038 mut raw_packets: Vec<Bytes>,
2039 now: Instant,
2040 ) -> Vec<Bytes> {
2041 for p in &self.get_data_packets_to_retransmit(now) {
2042 if let Ok(raw) = p.marshal() {
2043 raw_packets.push(raw);
2044 } else {
2045 warn!(
2046 "[{}] failed to serialize a DATA packet to be retransmitted",
2047 self.side
2048 );
2049 }
2050 }
2051
2052 raw_packets
2053 }
2054
2055 fn gather_outbound_data_and_reconfig_packets(
2056 &mut self,
2057 mut raw_packets: Vec<Bytes>,
2058 now: Instant,
2059 ) -> Vec<Bytes> {
2060 let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2063 if !chunks.is_empty() {
2064 trace!("[{}] T3-rtx timer start (pt1)", self.side);
2066 self.timers
2067 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2068
2069 for p in &self.bundle_data_chunks_into_packets(chunks) {
2070 if let Ok(raw) = p.marshal() {
2071 raw_packets.push(raw);
2072 } else {
2073 warn!("[{}] failed to serialize a DATA packet", self.side);
2074 }
2075 }
2076 }
2077
2078 if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2079 if self.will_retransmit_reconfig {
2080 self.will_retransmit_reconfig = false;
2081 debug!(
2082 "[{}] retransmit {} RECONFIG chunk(s)",
2083 self.side,
2084 self.reconfigs.len()
2085 );
2086 for c in self.reconfigs.values() {
2087 let p = self.create_packet(vec![Box::new(c.clone())]);
2088 if let Ok(raw) = p.marshal() {
2089 raw_packets.push(raw);
2090 } else {
2091 warn!(
2092 "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2093 self.side,
2094 );
2095 }
2096 }
2097 }
2098
2099 if !sis_to_reset.is_empty() {
2100 let rsn = self.generate_next_rsn();
2101 let tsn = self.my_next_tsn - 1;
2102 debug!(
2103 "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2104 self.side,
2105 rsn,
2106 self.my_next_tsn - 1,
2107 sis_to_reset
2108 );
2109
2110 let c = ChunkReconfig {
2111 param_a: Some(Box::new(ParamOutgoingResetRequest {
2112 reconfig_request_sequence_number: rsn,
2113 sender_last_tsn: tsn,
2114 stream_identifiers: sis_to_reset,
2115 ..Default::default()
2116 })),
2117 ..Default::default()
2118 };
2119 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
2122 if let Ok(raw) = p.marshal() {
2123 raw_packets.push(raw);
2124 } else {
2125 warn!(
2126 "[{}] failed to serialize a RECONFIG packet to be transmitted",
2127 self.side
2128 );
2129 }
2130 }
2131
2132 if !self.reconfigs.is_empty() {
2133 self.timers
2134 .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2135 }
2136 }
2137
2138 raw_packets
2139 }
2140
2141 fn gather_outbound_fast_retransmission_packets(
2142 &mut self,
2143 mut raw_packets: Vec<Bytes>,
2144 now: Instant,
2145 ) -> Vec<Bytes> {
2146 if self.will_retransmit_fast {
2147 self.will_retransmit_fast = false;
2148
2149 let mut to_fast_retrans: Vec<Box<dyn Chunk + Send + Sync>> = vec![];
2150 let mut fast_retrans_size = COMMON_HEADER_SIZE;
2151
2152 let mut i = 0;
2153 loop {
2154 let tsn = self.cumulative_tsn_ack_point + i + 1;
2155 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2156 if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2157 i += 1;
2158 continue;
2159 }
2160
2161 let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2172 if self.mtu < fast_retrans_size + data_chunk_size {
2173 break;
2174 }
2175
2176 fast_retrans_size += data_chunk_size;
2177 self.stats.inc_fast_retrans();
2178 c.nsent += 1;
2179 } else {
2180 break; }
2182
2183 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2184 Association::check_partial_reliability_status(
2185 c,
2186 now,
2187 self.use_forward_tsn,
2188 self.side,
2189 &self.streams,
2190 );
2191 to_fast_retrans.push(Box::new(c.clone()));
2192 trace!(
2193 "[{}] fast-retransmit: tsn={} sent={} htna={}",
2194 self.side,
2195 c.tsn,
2196 c.nsent,
2197 self.fast_recover_exit_point
2198 );
2199 }
2200 i += 1;
2201 }
2202
2203 if !to_fast_retrans.is_empty() {
2204 if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2205 raw_packets.push(raw);
2206 } else {
2207 warn!(
2208 "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2209 self.side
2210 );
2211 }
2212 }
2213 }
2214
2215 raw_packets
2216 }
2217
2218 fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2219 if self.ack_state == AckState::Immediate {
2220 self.ack_state = AckState::Idle;
2221 let sack = self.create_selective_ack_chunk();
2222 trace!("[{}] sending SACK: {}", self.side, sack);
2223 if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2224 raw_packets.push(raw);
2225 } else {
2226 warn!("[{}] failed to serialize a SACK packet", self.side);
2227 }
2228 }
2229
2230 raw_packets
2231 }
2232
2233 fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2234 if self.will_send_forward_tsn {
2240 self.will_send_forward_tsn = false;
2241 if sna32gt(
2242 self.advanced_peer_tsn_ack_point,
2243 self.cumulative_tsn_ack_point,
2244 ) {
2245 let fwd_tsn = self.create_forward_tsn();
2246 if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2247 raw_packets.push(raw);
2248 } else {
2249 warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2250 }
2251 }
2252 }
2253
2254 raw_packets
2255 }
2256
2257 fn gather_outbound_shutdown_packets(
2258 &mut self,
2259 mut raw_packets: Vec<Bytes>,
2260 now: Instant,
2261 ) -> (Vec<Bytes>, bool) {
2262 let mut ok = true;
2263
2264 if self.will_send_shutdown {
2265 self.will_send_shutdown = false;
2266
2267 let shutdown = ChunkShutdown {
2268 cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2269 };
2270
2271 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2272 self.timers
2273 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2274 raw_packets.push(raw);
2275 } else {
2276 warn!("[{}] failed to serialize a Shutdown packet", self.side);
2277 }
2278 } else if self.will_send_shutdown_ack {
2279 self.will_send_shutdown_ack = false;
2280
2281 let shutdown_ack = ChunkShutdownAck {};
2282
2283 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2284 self.timers
2285 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2286 raw_packets.push(raw);
2287 } else {
2288 warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2289 }
2290 } else if self.will_send_shutdown_complete {
2291 self.will_send_shutdown_complete = false;
2292
2293 let shutdown_complete = ChunkShutdownComplete {};
2294
2295 if let Ok(raw) = self
2296 .create_packet(vec![Box::new(shutdown_complete)])
2297 .marshal()
2298 {
2299 raw_packets.push(raw);
2300 ok = false;
2301 } else {
2302 warn!(
2303 "[{}] failed to serialize a ShutdownComplete packet",
2304 self.side
2305 );
2306 }
2307 }
2308
2309 (raw_packets, ok)
2310 }
2311
2312 fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2315 let awnd = std::cmp::min(self.cwnd, self.rwnd);
2316 let mut chunks = vec![];
2317 let mut bytes_to_send = 0;
2318 let mut done = false;
2319 let mut i = 0;
2320 while !done {
2321 let tsn = self.cumulative_tsn_ack_point + i + 1;
2322 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2323 if !c.retransmit {
2324 i += 1;
2325 continue;
2326 }
2327
2328 if i == 0 && self.rwnd < c.user_data.len() as u32 {
2329 done = true;
2331 } else if bytes_to_send + c.user_data.len() > awnd as usize {
2332 break;
2333 }
2334
2335 c.retransmit = false;
2338 bytes_to_send += c.user_data.len();
2339
2340 c.nsent += 1;
2341 } else {
2342 break; }
2344
2345 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2346 Association::check_partial_reliability_status(
2347 c,
2348 now,
2349 self.use_forward_tsn,
2350 self.side,
2351 &self.streams,
2352 );
2353
2354 trace!(
2355 "[{}] retransmitting tsn={} ssn={} sent={}",
2356 self.side,
2357 c.tsn,
2358 c.stream_sequence_number,
2359 c.nsent
2360 );
2361
2362 chunks.push(c.clone());
2363 }
2364 i += 1;
2365 }
2366
2367 self.bundle_data_chunks_into_packets(chunks)
2368 }
2369
2370 fn pop_pending_data_chunks_to_send(
2373 &mut self,
2374 now: Instant,
2375 ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2376 let mut chunks = vec![];
2377 let mut sis_to_reset = vec![]; if !self.pending_queue.is_empty() {
2379 while let Some(c) = self.pending_queue.peek() {
2388 let (beginning_fragment, unordered, data_len, stream_identifier) = (
2389 c.beginning_fragment,
2390 c.unordered,
2391 c.user_data.len(),
2392 c.stream_identifier,
2393 );
2394
2395 if data_len == 0 {
2396 sis_to_reset.push(stream_identifier);
2397 if self
2398 .pending_queue
2399 .pop(beginning_fragment, unordered)
2400 .is_none()
2401 {
2402 error!("[{}] failed to pop from pending queue", self.side);
2403 }
2404 continue;
2405 }
2406
2407 if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2408 break; }
2410
2411 if data_len > self.rwnd as usize {
2412 break; }
2414
2415 self.rwnd -= data_len as u32;
2416
2417 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2418 beginning_fragment,
2419 unordered,
2420 now,
2421 ) {
2422 chunks.push(chunk);
2423 }
2424 }
2425
2426 if chunks.is_empty() && self.inflight_queue.is_empty() {
2428 if let Some(c) = self.pending_queue.peek() {
2430 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2431
2432 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2433 beginning_fragment,
2434 unordered,
2435 now,
2436 ) {
2437 chunks.push(chunk);
2438 }
2439 }
2440 }
2441 }
2442
2443 (chunks, sis_to_reset)
2444 }
2445
2446 fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2450 let mut packets = vec![];
2451 let mut chunks_to_send = vec![];
2452 let mut bytes_in_packet = COMMON_HEADER_SIZE;
2453
2454 for c in chunks {
2455 if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2461 packets.push(self.create_packet(chunks_to_send));
2462 chunks_to_send = vec![];
2463 bytes_in_packet = COMMON_HEADER_SIZE;
2464 }
2465
2466 bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2467 chunks_to_send.push(Box::new(c));
2468 }
2469
2470 if !chunks_to_send.is_empty() {
2471 packets.push(self.create_packet(chunks_to_send));
2472 }
2473
2474 packets
2475 }
2476
2477 fn generate_next_tsn(&mut self) -> u32 {
2479 let tsn = self.my_next_tsn;
2480 self.my_next_tsn += 1;
2481 tsn
2482 }
2483
2484 fn generate_next_rsn(&mut self) -> u32 {
2486 let rsn = self.my_next_rsn;
2487 self.my_next_rsn += 1;
2488 rsn
2489 }
2490
2491 fn check_partial_reliability_status(
2492 c: &mut ChunkPayloadData,
2493 now: Instant,
2494 use_forward_tsn: bool,
2495 side: Side,
2496 streams: &FxHashMap<u16, StreamState>,
2497 ) {
2498 if !use_forward_tsn {
2499 return;
2500 }
2501
2502 if c.payload_type == PayloadProtocolIdentifier::Dcep {
2508 return;
2509 }
2510
2511 if let Some(s) = streams.get(&c.stream_identifier) {
2513 let reliability_type: ReliabilityType = s.reliability_type;
2514 let reliability_value = s.reliability_value;
2515
2516 if reliability_type == ReliabilityType::Rexmit {
2517 if c.nsent >= reliability_value {
2518 c.set_abandoned(true);
2519 trace!(
2520 "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2521 side,
2522 c.tsn,
2523 c.payload_type,
2524 c.nsent
2525 );
2526 }
2527 } else if reliability_type == ReliabilityType::Timed {
2528 if let Some(since) = &c.since {
2529 let elapsed = now.duration_since(*since);
2530 if elapsed.as_millis() as u32 >= reliability_value {
2531 c.set_abandoned(true);
2532 trace!(
2533 "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2534 side,
2535 c.tsn,
2536 c.payload_type,
2537 elapsed
2538 );
2539 }
2540 } else {
2541 error!("[{}] invalid c.since", side);
2542 }
2543 }
2544 } else {
2545 error!("[{}] stream {} not found)", side, c.stream_identifier);
2546 }
2547 }
2548
2549 fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2550 ChunkSelectiveAck {
2551 cumulative_tsn_ack: self.peer_last_tsn,
2552 advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2553 gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2554 duplicate_tsn: self.payload_queue.pop_duplicates(),
2555 }
2556 }
2557
2558 fn create_forward_tsn(&self) -> ChunkForwardTsn {
2561 let mut stream_map: HashMap<u16, u16> = HashMap::new(); let mut i = self.cumulative_tsn_ack_point + 1;
2564 while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2565 if let Some(c) = self.inflight_queue.get(i) {
2566 if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2567 if sna16lt(*ssn, c.stream_sequence_number) {
2568 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2570 }
2571 } else {
2572 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2573 }
2574 } else {
2575 break;
2576 }
2577
2578 i += 1;
2579 }
2580
2581 let mut fwd_tsn = ChunkForwardTsn {
2582 new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2583 streams: vec![],
2584 };
2585
2586 let mut stream_str = String::new();
2587 for (si, ssn) in &stream_map {
2588 stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2589 fwd_tsn.streams.push(ChunkForwardTsnStream {
2590 identifier: *si,
2591 sequence: *ssn,
2592 });
2593 }
2594 trace!(
2595 "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2596 self.side,
2597 fwd_tsn.new_cumulative_tsn,
2598 self.cumulative_tsn_ack_point,
2599 stream_str
2600 );
2601
2602 fwd_tsn
2603 }
2604
2605 fn move_pending_data_chunk_to_inflight_queue(
2607 &mut self,
2608 beginning_fragment: bool,
2609 unordered: bool,
2610 now: Instant,
2611 ) -> Option<ChunkPayloadData> {
2612 if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2613 if c.ending_fragment {
2615 c.set_all_inflight();
2616 }
2617
2618 c.tsn = self.generate_next_tsn();
2620
2621 c.since = Some(now); c.nsent = 1; Association::check_partial_reliability_status(
2625 &mut c,
2626 now,
2627 self.use_forward_tsn,
2628 self.side,
2629 &self.streams,
2630 );
2631
2632 trace!(
2633 "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2634 self.side,
2635 c.payload_type as u32,
2636 c.tsn,
2637 c.stream_sequence_number,
2638 c.nsent,
2639 c.user_data.len(),
2640 c.beginning_fragment,
2641 c.ending_fragment
2642 );
2643
2644 self.inflight_queue.push_no_check(c.clone());
2645
2646 Some(c)
2647 } else {
2648 error!("[{}] failed to pop from pending queue", self.side);
2649 None
2650 }
2651 }
2652
2653 pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2654 let state = self.state();
2655 if state != AssociationState::Established {
2656 return Err(Error::ErrResetPacketInStateNotExist);
2657 }
2658
2659 let c = ChunkPayloadData {
2662 stream_identifier,
2663 beginning_fragment: true,
2664 ending_fragment: true,
2665 user_data: Bytes::new(),
2666 ..Default::default()
2667 };
2668
2669 self.pending_queue.push(c);
2670 self.awake_write_loop();
2671
2672 Ok(())
2673 }
2674
2675 pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2677 let state = self.state();
2678 if state != AssociationState::Established {
2679 return Err(Error::ErrPayloadDataStateNotExist);
2680 }
2681
2682 for c in chunks {
2684 self.pending_queue.push(c);
2685 }
2686
2687 self.awake_write_loop();
2688 Ok(())
2689 }
2690
2691 pub(crate) fn buffered_amount(&self) -> usize {
2694 self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2695 }
2696
2697 fn awake_write_loop(&self) {
2698 }
2700
2701 fn close_all_timers(&mut self) {
2702 for timer in Timer::VALUES {
2704 self.timers.stop(timer);
2705 }
2706 }
2707
2708 fn on_ack_timeout(&mut self) {
2709 trace!(
2710 "[{}] ack timed out (ack_state: {})",
2711 self.side,
2712 self.ack_state
2713 );
2714 self.stats.inc_ack_timeouts();
2715 self.ack_state = AckState::Immediate;
2716 self.awake_write_loop();
2717 }
2718
2719 fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2720 match timer_id {
2721 Timer::T1Init => {
2722 if let Err(err) = self.send_init() {
2723 debug!(
2724 "[{}] failed to retransmit init (n_rtos={}): {:?}",
2725 self.side, n_rtos, err
2726 );
2727 }
2728 }
2729
2730 Timer::T1Cookie => {
2731 if let Err(err) = self.send_cookie_echo() {
2732 debug!(
2733 "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2734 self.side, n_rtos, err
2735 );
2736 }
2737 }
2738
2739 Timer::T2Shutdown => {
2740 debug!(
2741 "[{}] retransmission of shutdown timeout (n_rtos={})",
2742 self.side, n_rtos
2743 );
2744 let state = self.state();
2745 match state {
2746 AssociationState::ShutdownSent => {
2747 self.will_send_shutdown = true;
2748 self.awake_write_loop();
2749 }
2750 AssociationState::ShutdownAckSent => {
2751 self.will_send_shutdown_ack = true;
2752 self.awake_write_loop();
2753 }
2754 _ => {}
2755 }
2756 }
2757
2758 Timer::T3RTX => {
2759 self.stats.inc_t3timeouts();
2760
2761 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2772 self.cwnd = self.mtu;
2773 trace!(
2774 "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2775 self.side,
2776 self.cwnd,
2777 self.ssthresh,
2778 self.inflight_queue.get_num_bytes()
2779 );
2780
2781 if self.use_forward_tsn {
2786 let mut i = self.advanced_peer_tsn_ack_point + 1;
2788 while let Some(c) = self.inflight_queue.get(i) {
2789 if !c.abandoned() {
2790 break;
2791 }
2792 self.advanced_peer_tsn_ack_point = i;
2793 i += 1;
2794 }
2795
2796 if sna32gt(
2798 self.advanced_peer_tsn_ack_point,
2799 self.cumulative_tsn_ack_point,
2800 ) {
2801 self.will_send_forward_tsn = true;
2802 debug!(
2803 "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2804 self.side,
2805 self.will_send_forward_tsn,
2806 self.advanced_peer_tsn_ack_point,
2807 self.cumulative_tsn_ack_point
2808 );
2809 }
2810 }
2811
2812 debug!(
2813 "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2814 self.side, n_rtos, self.cwnd, self.ssthresh
2815 );
2816
2817 self.inflight_queue.mark_all_to_retrasmit();
2818 self.awake_write_loop();
2819 }
2820
2821 Timer::Reconfig => {
2822 self.will_retransmit_reconfig = true;
2823 self.awake_write_loop();
2824 }
2825
2826 _ => {}
2827 }
2828 }
2829
2830 fn on_retransmission_failure(&mut self, id: Timer) {
2831 match id {
2832 Timer::T1Init => {
2833 error!("[{}] retransmission failure: T1-init", self.side);
2834 self.error = Some(AssociationError::HandshakeFailed(
2835 Error::ErrHandshakeInitAck,
2836 ));
2837 }
2838
2839 Timer::T1Cookie => {
2840 error!("[{}] retransmission failure: T1-cookie", self.side);
2841 self.error = Some(AssociationError::HandshakeFailed(
2842 Error::ErrHandshakeCookieEcho,
2843 ));
2844 }
2845
2846 Timer::T2Shutdown => {
2847 error!("[{}] retransmission failure: T2-shutdown", self.side);
2848 }
2849
2850 Timer::T3RTX => {
2851 error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2856 }
2857
2858 _ => {}
2859 }
2860 }
2861
/// Test-only helper: returns `true` when `self.timers.get` yields `None` for
/// every timer in `Timer::VALUES`.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    Timer::VALUES
        .iter()
        .all(|&timer| self.timers.get(timer).is_none())
}
2873}