1use crate::association::{
2 state::{AckMode, AckState, AssociationState},
3 stats::AssociationStats,
4};
5use crate::chunk::{
6 Chunk, ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT, chunk_abort::ChunkAbort,
7 chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho, chunk_error::ChunkError,
8 chunk_forward_tsn::ChunkForwardTsn, chunk_forward_tsn::ChunkForwardTsnStream,
9 chunk_heartbeat::ChunkHeartbeat, chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit,
10 chunk_init::ChunkInitAck, chunk_payload_data::ChunkPayloadData,
11 chunk_payload_data::PayloadProtocolIdentifier, chunk_reconfig::ChunkReconfig,
12 chunk_selective_ack::ChunkSelectiveAck, chunk_shutdown::ChunkShutdown,
13 chunk_shutdown_ack::ChunkShutdownAck, chunk_shutdown_complete::ChunkShutdownComplete,
14 chunk_type::CT_FORWARD_TSN,
15};
16use crate::config::{COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE, ServerConfig, TransportConfig};
17use crate::packet::{CommonHeader, Packet};
18use crate::param::{
19 Param,
20 param_heartbeat_info::ParamHeartbeatInfo,
21 param_outgoing_reset_request::ParamOutgoingResetRequest,
22 param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23 param_state_cookie::ParamStateCookie,
24 param_supported_extensions::ParamSupportedExtensions,
25};
26use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
27use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
28use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
29use crate::{AssociationEvent, Payload, Side};
30use shared::error::{Error, Result};
31use shared::{TransportContext, TransportMessage, TransportProtocol};
32use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
33use timer::{ACK_INTERVAL, RtoManager, Timer, TimerTable};
34
35use crate::association::stream::RecvSendState;
36use bytes::Bytes;
37use log::{debug, error, trace, warn};
38use rand::random;
39use std::collections::{HashMap, VecDeque};
40use std::net::SocketAddr;
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49pub(crate) mod timer;
50
51#[cfg(test)]
52mod association_test;
53
54#[derive(Debug, Error, Clone, PartialEq)]
56pub enum AssociationError {
57 #[error("handshake failed due to {0}")]
59 HandshakeFailed(String),
60 #[error("transport error")]
62 TransportError,
63 #[error("aborted by peer")]
65 AssociationClosed,
66 #[error("closed by peer")]
68 ApplicationClosed,
69 #[error("reset by peer")]
71 Reset,
72 #[error("timed out")]
77 TimedOut,
78 #[error("closed")]
80 LocallyClosed,
81}
82
83#[derive(Debug)]
85pub enum Event {
86 HandshakeFailed {
88 reason: AssociationError,
90 },
91
92 Connected,
94 AssociationLost {
98 reason: AssociationError,
100 id: StreamId,
101 },
102 Stream(StreamEvent),
104 DatagramReceived,
106}
107
108pub struct Association {
126 side: Side,
127 state: AssociationState,
128 handshake_completed: bool,
129 max_message_size: u32,
130 inflight_queue_length: usize,
131 will_send_shutdown: bool,
132 bytes_received: usize,
133 bytes_sent: usize,
134
135 peer_verification_tag: u32,
136 my_verification_tag: u32,
137 my_next_tsn: u32,
138 peer_last_tsn: u32,
139 min_tsn2measure_rtt: u32,
141 will_send_forward_tsn: bool,
142 will_retransmit_fast: bool,
143 will_retransmit_reconfig: bool,
144
145 will_send_shutdown_ack: bool,
146 will_send_shutdown_complete: bool,
147
148 my_next_rsn: u32,
150 reconfigs: HashMap<u32, ChunkReconfig>,
151 reconfig_requests: HashMap<u32, ParamOutgoingResetRequest>,
152
153 remote_addr: SocketAddr,
155 local_addr: SocketAddr,
156 transport_protocol: TransportProtocol,
157
158 source_port: u16,
159 destination_port: u16,
160 my_max_num_inbound_streams: u16,
161 my_max_num_outbound_streams: u16,
162 my_cookie: Option<ParamStateCookie>,
163
164 payload_queue: PayloadQueue,
165 inflight_queue: PayloadQueue,
166 pending_queue: PendingQueue,
167 control_queue: VecDeque<Packet>,
168 stream_queue: VecDeque<u16>,
169
170 pub(crate) mtu: u32,
171 max_payload_size: u32,
173 cumulative_tsn_ack_point: u32,
174 advanced_peer_tsn_ack_point: u32,
175 use_forward_tsn: bool,
176
177 pub(crate) rto_mgr: RtoManager,
178 timers: TimerTable,
179
180 max_receive_buffer_size: u32,
182 pub(crate) cwnd: u32,
184 rwnd: u32,
186 pub(crate) ssthresh: u32,
188 partial_bytes_acked: u32,
189 pub(crate) in_fast_recovery: bool,
190 fast_recover_exit_point: u32,
191
192 stored_init: Option<ChunkInit>,
194 stored_cookie_echo: Option<ChunkCookieEcho>,
195 pub(crate) streams: HashMap<StreamId, StreamState>,
196
197 events: VecDeque<Event>,
198 endpoint_events: VecDeque<EndpointEventInner>,
199 error: Option<AssociationError>,
200
201 delayed_ack_triggered: bool,
203 immediate_ack_triggered: bool,
204
205 pub(crate) stats: AssociationStats,
206 ack_state: AckState,
207
208 pub(crate) ack_mode: AckMode,
210}
211
212impl Default for Association {
213 fn default() -> Self {
214 Association {
215 side: Side::default(),
216 state: AssociationState::default(),
217 handshake_completed: false,
218 max_message_size: 0,
219 inflight_queue_length: 0,
220 will_send_shutdown: false,
221 bytes_received: 0,
222 bytes_sent: 0,
223
224 peer_verification_tag: 0,
225 my_verification_tag: 0,
226 my_next_tsn: 0,
227 peer_last_tsn: 0,
228 min_tsn2measure_rtt: 0,
230 will_send_forward_tsn: false,
231 will_retransmit_fast: false,
232 will_retransmit_reconfig: false,
233
234 will_send_shutdown_ack: false,
235 will_send_shutdown_complete: false,
236
237 my_next_rsn: 0,
239 reconfigs: HashMap::default(),
240 reconfig_requests: HashMap::default(),
241
242 remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
244 local_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
245 transport_protocol: TransportProtocol::UDP,
246
247 source_port: 0,
248 destination_port: 0,
249 my_max_num_inbound_streams: 0,
250 my_max_num_outbound_streams: 0,
251 my_cookie: None,
252
253 payload_queue: PayloadQueue::default(),
254 inflight_queue: PayloadQueue::default(),
255 pending_queue: PendingQueue::default(),
256 control_queue: VecDeque::default(),
257 stream_queue: VecDeque::default(),
258
259 mtu: 0,
260 max_payload_size: 0,
262 cumulative_tsn_ack_point: 0,
263 advanced_peer_tsn_ack_point: 0,
264 use_forward_tsn: false,
265
266 rto_mgr: RtoManager::default(),
267 timers: TimerTable::default(),
268
269 max_receive_buffer_size: 0,
271 cwnd: 0,
273 rwnd: 0,
275 ssthresh: 0,
277 partial_bytes_acked: 0,
278 in_fast_recovery: false,
279 fast_recover_exit_point: 0,
280
281 stored_init: None,
283 stored_cookie_echo: None,
284 streams: HashMap::default(),
285
286 events: VecDeque::default(),
287 endpoint_events: VecDeque::default(),
288 error: None,
289
290 delayed_ack_triggered: false,
292 immediate_ack_triggered: false,
293
294 stats: AssociationStats::default(),
295 ack_state: AckState::default(),
296
297 ack_mode: AckMode::default(),
299 }
300 }
301}
302
303impl Association {
304 #[allow(clippy::too_many_arguments)]
305 pub(crate) fn new(
306 server_config: Option<Arc<ServerConfig>>,
307 config: Arc<TransportConfig>,
308 max_payload_size: u32,
309 local_aid: AssociationId,
310 remote_addr: SocketAddr,
311 local_addr: SocketAddr,
312 protocol: TransportProtocol,
313 now: Instant,
314 ) -> Self {
315 let side = if server_config.is_some() {
316 Side::Server
317 } else {
318 Side::Client
319 };
320
321 let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
324
325 let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
329 let mut tsn = random::<u32>();
330 if tsn == 0 {
331 tsn += 1;
332 }
333
334 let mut this = Association {
335 side,
336 handshake_completed: false,
337 max_receive_buffer_size: config.max_receive_buffer_size(),
338 max_message_size: config.max_message_size(),
339 my_max_num_outbound_streams: config.max_num_outbound_streams(),
340 my_max_num_inbound_streams: config.max_num_inbound_streams(),
341 max_payload_size,
342
343 rto_mgr: RtoManager::new(),
344 timers: TimerTable::new(config.timer_config()),
345
346 mtu,
347 cwnd,
348 remote_addr,
349 local_addr,
350 transport_protocol: protocol,
351
352 my_verification_tag: local_aid,
353 my_next_tsn: tsn,
354 my_next_rsn: tsn,
355 min_tsn2measure_rtt: tsn,
356 cumulative_tsn_ack_point: tsn - 1,
357 advanced_peer_tsn_ack_point: tsn - 1,
358 error: None,
359
360 ..Default::default()
361 };
362
363 if side.is_client() {
364 let mut init = ChunkInit {
365 initial_tsn: this.my_next_tsn,
366 num_outbound_streams: this.my_max_num_outbound_streams,
367 num_inbound_streams: this.my_max_num_inbound_streams,
368 initiate_tag: this.my_verification_tag,
369 advertised_receiver_window_credit: this.max_receive_buffer_size,
370 ..Default::default()
371 };
372 init.set_supported_extensions();
373
374 this.set_state(AssociationState::CookieWait);
375 this.stored_init = Some(init);
376 let _ = this.send_init();
377 this.timers
378 .start(Timer::T1Init, now, this.rto_mgr.get_rto());
379 }
380
381 this
382 }
383
384 #[must_use]
390 pub fn poll(&mut self) -> Option<Event> {
391 if let Some(x) = self.events.pop_front() {
392 return Some(x);
393 }
394
395 if let Some(err) = self.error.take() {
400 return Some(Event::HandshakeFailed { reason: err });
401 }
402
403 None
404 }
405
406 #[must_use]
408 pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
409 self.endpoint_events.pop_front().map(EndpointEvent)
410 }
411
412 #[must_use]
420 pub fn poll_timeout(&self) -> Option<Instant> {
421 self.timers.next_timeout()
422 }
423
424 #[must_use]
431 pub fn poll_transmit(&mut self, now: Instant) -> Option<TransportMessage<Payload>> {
432 let (contents, _) = self.gather_outbound(now);
433 if contents.is_empty() {
434 None
435 } else {
436 trace!(
437 "[{}] sending {} bytes (total {} datagrams)",
438 self.side,
439 contents.iter().fold(0, |l, c| l + c.len()),
440 contents.len()
441 );
442 Some(TransportMessage {
443 now,
444 transport: TransportContext {
445 local_addr: self.local_addr,
446 peer_addr: self.remote_addr,
447 ecn: None,
448 transport_protocol: Default::default(),
449 },
450 message: Payload::RawEncode(contents),
451 })
452 }
453 }
454
455 pub fn handle_timeout(&mut self, now: Instant) {
465 for &timer in &Timer::VALUES {
466 let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
467 if !expired {
468 continue;
469 }
470 self.timers.set(timer, None);
471 if timer == Timer::Ack {
474 self.on_ack_timeout();
475 } else if failure {
476 self.on_retransmission_failure(timer);
477 } else {
478 self.on_retransmission_timeout(timer, n_rtos);
479 self.timers.start(timer, now, self.rto_mgr.get_rto());
480 }
481 }
482 }
483
484 pub fn handle_event(&mut self, event: AssociationEvent) {
490 match event.0 {
491 AssociationEventInner::Datagram(transmit) => {
492 if let Payload::PartialDecode(partial_decode) = transmit.message {
502 debug!(
503 "[{}] recving {} bytes",
504 self.side,
505 COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
506 );
507
508 let pkt = match partial_decode.finish() {
509 Ok(p) => p,
510 Err(err) => {
511 warn!("[{}] unable to parse SCTP packet {}", self.side, err);
512 return;
513 }
514 };
515
516 if let Err(err) = self.handle_inbound(pkt, transmit.now) {
517 error!("handle_inbound got err: {}", err);
518 let _ = self.close(AssociationError::TransportError);
519 }
520 } else {
521 trace!("discarding invalid partial_decode");
522 }
523 } }
525 }
526
527 pub fn stats(&self) -> AssociationStats {
529 self.stats
530 }
531
532 pub fn is_handshaking(&self) -> bool {
537 !self.handshake_completed
538 }
539
540 pub fn is_closed(&self) -> bool {
548 self.state == AssociationState::Closed
549 }
550
551 pub fn is_drained(&self) -> bool {
556 self.state.is_drained()
557 }
558
559 pub fn side(&self) -> Side {
561 self.side
562 }
563
564 pub fn remote_addr(&self) -> SocketAddr {
566 self.remote_addr
567 }
568
569 pub fn rtt(&self) -> Duration {
571 Duration::from_millis(self.rto_mgr.get_rto())
572 }
573
574 pub fn local_addr(&self) -> SocketAddr {
589 self.local_addr
590 }
591
592 pub fn shutdown(&mut self) -> Result<()> {
596 debug!("[{}] closing association..", self.side);
597
598 let state = self.state();
599 if state != AssociationState::Established {
600 return Err(Error::ErrShutdownNonEstablished);
601 }
602
603 self.set_state(AssociationState::ShutdownPending);
605
606 if self.inflight_queue_length == 0 {
607 self.will_send_shutdown = true;
609 self.awake_write_loop();
610 self.set_state(AssociationState::ShutdownSent);
611 }
612
613 self.endpoint_events.push_back(EndpointEventInner::Drained);
614
615 Ok(())
616 }
617
618 pub fn close(&mut self, reason: AssociationError) -> Result<()> {
620 if self.state() != AssociationState::Closed {
621 self.set_state(AssociationState::Closed);
622
623 debug!("[{}] closing association..", self.side);
624
625 self.close_all_timers();
626
627 for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
628 self.unregister_stream(si, reason.clone());
629 }
630
631 debug!("[{}] association closed", self.side);
632 debug!(
633 "[{}] stats nDATAs (in) : {}",
634 self.side,
635 self.stats.get_num_datas()
636 );
637 debug!(
638 "[{}] stats nSACKs (in) : {}",
639 self.side,
640 self.stats.get_num_sacks()
641 );
642 debug!(
643 "[{}] stats nT3Timeouts : {}",
644 self.side,
645 self.stats.get_num_t3timeouts()
646 );
647 debug!(
648 "[{}] stats nAckTimeouts: {}",
649 self.side,
650 self.stats.get_num_ack_timeouts()
651 );
652 debug!(
653 "[{}] stats nFastRetrans: {}",
654 self.side,
655 self.stats.get_num_fast_retrans()
656 );
657 }
658
659 Ok(())
660 }
661
662 pub fn open_stream(
664 &mut self,
665 stream_identifier: StreamId,
666 default_payload_type: PayloadProtocolIdentifier,
667 ) -> Result<Stream<'_>> {
668 if self.streams.contains_key(&stream_identifier) {
669 return Err(Error::ErrStreamAlreadyExist);
670 }
671
672 if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
673 Ok(s)
674 } else {
675 Err(Error::ErrStreamCreateFailed)
676 }
677 }
678
679 pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
681 self.stream_queue
682 .pop_front()
683 .map(move |stream_identifier| Stream {
684 stream_identifier,
685 association: self,
686 })
687 }
688
689 pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
691 if !self.streams.contains_key(&stream_identifier) {
692 Err(Error::ErrStreamNotExisted)
693 } else {
694 Ok(Stream {
695 stream_identifier,
696 association: self,
697 })
698 }
699 }
700
701 pub fn stream_ids(&self) -> Vec<StreamId> {
702 self.streams.keys().cloned().collect()
703 }
704
705 pub(crate) fn bytes_sent(&self) -> usize {
707 self.bytes_sent
708 }
709
710 pub(crate) fn bytes_received(&self) -> usize {
712 self.bytes_received
713 }
714
715 pub(crate) fn max_message_size(&self) -> u32 {
717 self.max_message_size
718 }
719
720 pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
722 self.max_message_size = max_message_size;
723 }
724
725 fn unregister_stream(&mut self, stream_identifier: StreamId, reason: AssociationError) {
728 if let Some(mut s) = self.streams.remove(&stream_identifier) {
729 debug!("[{}] unregister_stream {}", self.side, stream_identifier);
730 self.events.push_back(Event::AssociationLost {
731 reason,
732 id: stream_identifier,
733 });
734 s.state = RecvSendState::Closed;
735 }
736 }
737
738 fn set_state(&mut self, new_state: AssociationState) {
740 if new_state != self.state {
741 debug!(
742 "[{}] state change: '{}' => '{}'",
743 self.side, self.state, new_state,
744 );
745 }
746 self.state = new_state;
747 }
748
749 pub(crate) fn state(&self) -> AssociationState {
751 self.state
752 }
753
754 fn send_init(&mut self) -> Result<()> {
756 if let Some(stored_init) = &self.stored_init {
757 debug!("[{}] sending INIT", self.side);
758
759 self.source_port = 5000; self.destination_port = 5000; let outbound = Packet {
763 common_header: CommonHeader {
764 source_port: self.source_port,
765 destination_port: self.destination_port,
766 verification_tag: self.peer_verification_tag,
767 },
768 chunks: vec![Box::new(stored_init.clone())],
769 };
770
771 self.control_queue.push_back(outbound);
772 self.awake_write_loop();
773
774 Ok(())
775 } else {
776 Err(Error::ErrInitNotStoredToSend)
777 }
778 }
779
780 fn send_cookie_echo(&mut self) -> Result<()> {
782 if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
783 debug!("[{}] sending COOKIE-ECHO", self.side);
784
785 let outbound = Packet {
786 common_header: CommonHeader {
787 source_port: self.source_port,
788 destination_port: self.destination_port,
789 verification_tag: self.peer_verification_tag,
790 },
791 chunks: vec![Box::new(stored_cookie_echo.clone())],
792 };
793
794 self.control_queue.push_back(outbound);
795 self.awake_write_loop();
796
797 Ok(())
798 } else {
799 Err(Error::ErrCookieEchoNotStoredToSend)
800 }
801 }
802
803 fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
805 if let Err(err) = p.check_packet() {
806 warn!("[{}] failed validating packet {}", self.side, err);
807 return Ok(());
808 }
809
810 self.handle_chunk_start();
811
812 for c in &p.chunks {
813 self.handle_chunk(&p, c, now)?;
814 }
815
816 self.handle_chunk_end(now);
817
818 Ok(())
819 }
820
821 fn handle_chunk_start(&mut self) {
822 self.delayed_ack_triggered = false;
823 self.immediate_ack_triggered = false;
824 }
825
826 fn handle_chunk_end(&mut self, now: Instant) {
827 if self.immediate_ack_triggered {
828 self.ack_state = AckState::Immediate;
829 self.timers.stop(Timer::Ack);
830 self.awake_write_loop();
831 } else if self.delayed_ack_triggered {
832 self.ack_state = AckState::Delay;
834 self.timers.start(Timer::Ack, now, ACK_INTERVAL);
835 }
836 }
837
838 #[allow(clippy::borrowed_box)]
839 fn handle_chunk(&mut self, p: &Packet, chunk: &Box<dyn Chunk>, now: Instant) -> Result<()> {
840 chunk.check()?;
841 let chunk_any = chunk.as_any();
842 let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
843 if c.is_ack {
844 self.handle_init_ack(p, c, now)?
845 } else {
846 self.handle_init(p, c)?
847 }
848 } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
849 let mut err_str = String::new();
850 for e in &c.error_causes {
851 if matches!(e.code, USER_INITIATED_ABORT) {
852 debug!("User initiated abort received");
853 let _ = self.close(AssociationError::Reset);
854 return Ok(());
855 }
856 err_str += &format!("({})", e);
857 }
858 return Err(Error::ErrAbortChunk(err_str));
859 } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
860 let mut err_str = String::new();
861 for e in &c.error_causes {
862 err_str += &format!("({})", e);
863 }
864 return Err(Error::ErrAbortChunk(err_str));
865 } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
866 self.handle_heartbeat(c)?
867 } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
868 self.handle_cookie_echo(c)?
869 } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
870 self.handle_cookie_ack()?
871 } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
872 self.handle_data(c)?
873 } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
874 self.handle_sack(c, now)?
875 } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
876 self.handle_reconfig(c)?
877 } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
878 self.handle_forward_tsn(c)?
879 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
880 self.handle_shutdown(c)?
881 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
882 self.handle_shutdown_ack(c)?
883 } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
884 self.handle_shutdown_complete(c)?
885 } else {
886 return Err(Error::ErrChunkTypeUnhandled);
887 };
888
889 if !packets.is_empty() {
890 let mut buf: VecDeque<_> = packets.into_iter().collect();
891 self.control_queue.append(&mut buf);
892 self.awake_write_loop();
893 }
894
895 Ok(())
896 }
897
898 fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
899 let state = self.state();
900 debug!("[{}] chunkInit received in state '{}'", self.side, state);
901
902 if state != AssociationState::Closed
910 && state != AssociationState::CookieWait
911 && state != AssociationState::CookieEchoed
912 {
913 return Err(Error::ErrHandleInitState);
916 }
917
918 self.my_max_num_inbound_streams =
920 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
921 self.my_max_num_outbound_streams =
922 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
923 self.peer_verification_tag = i.initiate_tag;
924 self.source_port = p.common_header.destination_port;
925 self.destination_port = p.common_header.source_port;
926
927 self.peer_last_tsn = if i.initial_tsn == 0 {
932 u32::MAX
933 } else {
934 i.initial_tsn - 1
935 };
936
937 for param in &i.params {
938 if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
939 for t in &v.chunk_types {
940 if *t == CT_FORWARD_TSN {
941 debug!("[{}] use ForwardTSN (on init)", self.side);
942 self.use_forward_tsn = true;
943 }
944 }
945 }
946 }
947 if !self.use_forward_tsn {
948 warn!("[{}] not using ForwardTSN (on init)", self.side);
949 }
950
951 let mut outbound = Packet {
952 common_header: CommonHeader {
953 verification_tag: self.peer_verification_tag,
954 source_port: self.source_port,
955 destination_port: self.destination_port,
956 },
957 chunks: vec![],
958 };
959
960 let mut init_ack = ChunkInit {
961 is_ack: true,
962 initial_tsn: self.my_next_tsn,
963 num_outbound_streams: self.my_max_num_outbound_streams,
964 num_inbound_streams: self.my_max_num_inbound_streams,
965 initiate_tag: self.my_verification_tag,
966 advertised_receiver_window_credit: self.max_receive_buffer_size,
967 ..Default::default()
968 };
969
970 if self.my_cookie.is_none() {
971 self.my_cookie = Some(ParamStateCookie::new());
972 }
973
974 if let Some(my_cookie) = &self.my_cookie {
975 init_ack.params = vec![Box::new(my_cookie.clone())];
976 }
977
978 init_ack.set_supported_extensions();
979
980 outbound.chunks = vec![Box::new(init_ack)];
981
982 Ok(vec![outbound])
983 }
984
985 fn handle_init_ack(
986 &mut self,
987 p: &Packet,
988 i: &ChunkInitAck,
989 now: Instant,
990 ) -> Result<Vec<Packet>> {
991 let state = self.state();
992 debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
993 if state != AssociationState::CookieWait {
994 return Ok(vec![]);
1001 }
1002
1003 self.my_max_num_inbound_streams =
1004 std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
1005 self.my_max_num_outbound_streams =
1006 std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
1007 self.peer_verification_tag = i.initiate_tag;
1008 self.peer_last_tsn = if i.initial_tsn == 0 {
1009 u32::MAX
1010 } else {
1011 i.initial_tsn - 1
1012 };
1013 if self.source_port != p.common_header.destination_port
1014 || self.destination_port != p.common_header.source_port
1015 {
1016 warn!("[{}] handle_init_ack: port mismatch", self.side);
1017 return Ok(vec![]);
1018 }
1019
1020 self.rwnd = i.advertised_receiver_window_credit;
1021 debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1022
1023 self.ssthresh = self.rwnd;
1028 trace!(
1029 "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1030 self.side,
1031 self.cwnd,
1032 self.ssthresh,
1033 self.inflight_queue.get_num_bytes()
1034 );
1035
1036 self.timers.stop(Timer::T1Init);
1037 self.stored_init = None;
1038
1039 let mut cookie_param = None;
1040 for param in &i.params {
1041 if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1042 cookie_param = Some(v);
1043 } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1044 for t in &v.chunk_types {
1045 if *t == CT_FORWARD_TSN {
1046 debug!("[{}] use ForwardTSN (on initAck)", self.side);
1047 self.use_forward_tsn = true;
1048 }
1049 }
1050 }
1051 }
1052 if !self.use_forward_tsn {
1053 warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1054 }
1055
1056 if let Some(v) = cookie_param {
1057 self.stored_cookie_echo = Some(ChunkCookieEcho {
1058 cookie: v.cookie.clone(),
1059 });
1060
1061 self.send_cookie_echo()?;
1062
1063 self.timers
1064 .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1065
1066 self.set_state(AssociationState::CookieEchoed);
1067
1068 Ok(vec![])
1069 } else {
1070 Err(Error::ErrInitAckNoCookie)
1071 }
1072 }
1073
1074 fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1075 trace!("[{}] chunkHeartbeat", self.side);
1076 if let Some(p) = c.params.first() {
1077 if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1078 return Ok(vec![Packet {
1079 common_header: CommonHeader {
1080 verification_tag: self.peer_verification_tag,
1081 source_port: self.source_port,
1082 destination_port: self.destination_port,
1083 },
1084 chunks: vec![Box::new(ChunkHeartbeatAck {
1085 params: vec![Box::new(ParamHeartbeatInfo {
1086 heartbeat_information: hbi.heartbeat_information.clone(),
1087 })],
1088 })],
1089 }]);
1090 } else {
1091 warn!(
1092 "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1093 self.side,
1094 );
1095 }
1096 }
1097
1098 Ok(vec![])
1099 }
1100
1101 fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1102 let state = self.state();
1103 debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1104
1105 if let Some(my_cookie) = &self.my_cookie {
1106 match state {
1107 AssociationState::Established => {
1108 if my_cookie.cookie != c.cookie {
1109 return Ok(vec![]);
1110 }
1111 }
1112 AssociationState::Closed
1113 | AssociationState::CookieWait
1114 | AssociationState::CookieEchoed => {
1115 if my_cookie.cookie != c.cookie {
1116 return Ok(vec![]);
1117 }
1118
1119 self.timers.stop(Timer::T1Init);
1120 self.stored_init = None;
1121
1122 self.timers.stop(Timer::T1Cookie);
1123 self.stored_cookie_echo = None;
1124
1125 self.events.push_back(Event::Connected);
1126 self.set_state(AssociationState::Established);
1127 self.handshake_completed = true;
1128 }
1129 _ => return Ok(vec![]),
1130 };
1131 } else {
1132 debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1133 return Ok(vec![]);
1134 }
1135
1136 Ok(vec![Packet {
1137 common_header: CommonHeader {
1138 verification_tag: self.peer_verification_tag,
1139 source_port: self.source_port,
1140 destination_port: self.destination_port,
1141 },
1142 chunks: vec![Box::new(ChunkCookieAck {})],
1143 }])
1144 }
1145
1146 fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1147 let state = self.state();
1148 debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1149 if state != AssociationState::CookieEchoed {
1150 return Ok(vec![]);
1155 }
1156
1157 self.timers.stop(Timer::T1Cookie);
1158 self.stored_cookie_echo = None;
1159
1160 self.events.push_back(Event::Connected);
1161 self.set_state(AssociationState::Established);
1162 self.handshake_completed = true;
1163
1164 Ok(vec![])
1165 }
1166
1167 fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1168 debug!(
1169 "[{}] DATA: tsn={} peer_last_tsn={} immediateSack={} len={}, unordered={}",
1170 self.side,
1171 d.tsn,
1172 self.peer_last_tsn,
1173 d.immediate_sack,
1174 d.user_data.len(),
1175 d.unordered,
1176 );
1177 self.stats.inc_datas();
1178
1179 let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1180 let mut stream_handle_data = false;
1181 if can_push {
1182 if self.get_or_create_stream(d.stream_identifier).is_some() {
1183 if self.get_my_receiver_window_credit() > 0 {
1184 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1186 stream_handle_data = true;
1187 } else {
1188 if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1190 if sna32lt(d.tsn, *last_tsn) {
1191 debug!(
1192 "[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}",
1193 self.side, d.tsn, d.stream_sequence_number
1194 );
1195 self.payload_queue.push(d.clone(), self.peer_last_tsn);
1196 stream_handle_data = true; }
1198 } else {
1199 debug!(
1200 "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1201 self.side, d.tsn, d.stream_sequence_number
1202 );
1203 }
1204 }
1205 } else {
1206 debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1208 return Ok(vec![]);
1209 }
1210 }
1211
1212 let immediate_sack = d.immediate_sack;
1213
1214 if stream_handle_data && let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1215 self.events.push_back(Event::DatagramReceived);
1216 if s.handle_data(d) && s.reassembly_queue.is_readable() {
1217 self.events.push_back(Event::Stream(StreamEvent::Readable {
1218 id: s.stream_identifier,
1219 }));
1220 }
1221 }
1222
1223 self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1224 }
1225
1226 fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1227 trace!(
1228 "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1229 self.side,
1230 self.cumulative_tsn_ack_point,
1231 d.cumulative_tsn_ack,
1232 d.advertised_receiver_window_credit
1233 );
1234 let state = self.state();
1235 if state != AssociationState::Established
1236 && state != AssociationState::ShutdownPending
1237 && state != AssociationState::ShutdownReceived
1238 {
1239 return Ok(vec![]);
1240 }
1241
1242 self.stats.inc_sacks();
1243
1244 if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1245 debug!(
1254 "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1255 self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1256 );
1257
1258 return Ok(vec![]);
1259 }
1260
1261 let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1263
1264 let mut total_bytes_acked = 0;
1265 for n_bytes_acked in bytes_acked_per_stream.values() {
1266 total_bytes_acked += *n_bytes_acked;
1267 }
1268
1269 let mut cum_tsn_ack_point_advanced = false;
1270 if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1271 trace!(
1272 "[{}] SACK: cumTSN advanced: {} -> {}",
1273 self.side, self.cumulative_tsn_ack_point, d.cumulative_tsn_ack
1274 );
1275
1276 self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1277 cum_tsn_ack_point_advanced = true;
1278 self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1279 }
1280
1281 for (si, n_bytes_acked) in &bytes_acked_per_stream {
1282 if let Some(s) = self.streams.get_mut(si)
1283 && s.on_buffer_released(*n_bytes_acked)
1284 {
1285 trace!("StreamEvent::BufferedAmountLow");
1286 self.events
1287 .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1288 }
1289 }
1290
1291 let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1300 if bytes_outstanding >= d.advertised_receiver_window_credit {
1301 self.rwnd = 0;
1302 } else {
1303 self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1304 }
1305
1306 self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1307
1308 if self.use_forward_tsn {
1309 if sna32lt(
1311 self.advanced_peer_tsn_ack_point,
1312 self.cumulative_tsn_ack_point,
1313 ) {
1314 self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1315 }
1316
1317 let mut i = self.advanced_peer_tsn_ack_point + 1;
1319 while let Some(c) = self.inflight_queue.get(i) {
1320 if !c.abandoned() {
1321 break;
1322 }
1323 self.advanced_peer_tsn_ack_point = i;
1324 i += 1;
1325 }
1326
1327 if sna32gt(
1329 self.advanced_peer_tsn_ack_point,
1330 self.cumulative_tsn_ack_point,
1331 ) {
1332 self.will_send_forward_tsn = true;
1333 debug!(
1334 "[{}] handleSack {}: sna32GT({}, {})",
1335 self.side,
1336 self.will_send_forward_tsn,
1337 self.advanced_peer_tsn_ack_point,
1338 self.cumulative_tsn_ack_point
1339 );
1340 }
1341 self.awake_write_loop();
1342 }
1343
1344 self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1345
1346 Ok(vec![])
1347 }
1348
1349 fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1350 trace!("[{}] handle_reconfig", self.side);
1351
1352 let mut pp = vec![];
1353
1354 if let Some(param_a) = &c.param_a {
1355 self.handle_reconfig_param(param_a, &mut pp)?;
1356 }
1357
1358 if let Some(param_b) = &c.param_b {
1359 self.handle_reconfig_param(param_b, &mut pp)?;
1360 }
1361
1362 Ok(pp)
1363 }
1364
1365 fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1366 trace!("[{}] FwdTSN: {}", self.side, c);
1367
1368 if !self.use_forward_tsn {
1369 warn!("[{}] received FwdTSN but not enabled", self.side);
1370 let cerr = ChunkError {
1372 error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1373 };
1374
1375 let outbound = Packet {
1376 common_header: CommonHeader {
1377 verification_tag: self.peer_verification_tag,
1378 source_port: self.source_port,
1379 destination_port: self.destination_port,
1380 },
1381 chunks: vec![Box::new(cerr)],
1382 };
1383 return Ok(vec![outbound]);
1384 }
1385
1386 trace!(
1395 "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1396 self.side, c.new_cumulative_tsn, self.peer_last_tsn
1397 );
1398 if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1399 trace!("[{}] sending ack on Forward TSN", self.side);
1400 self.ack_state = AckState::Immediate;
1401 self.timers.stop(Timer::Ack);
1402 self.awake_write_loop();
1403 return Ok(vec![]);
1404 }
1405
1406 while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1418 self.payload_queue.pop(self.peer_last_tsn + 1); self.peer_last_tsn += 1;
1420 }
1421
1422 for forwarded in &c.streams {
1426 if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1427 s.handle_forward_tsn_for_ordered(forwarded.sequence);
1428 if s.reassembly_queue.is_readable() {
1429 self.events.push_back(Event::Stream(StreamEvent::Readable {
1430 id: s.stream_identifier,
1431 }));
1432 }
1433 }
1434 }
1435
1436 for s in self.streams.values_mut() {
1441 s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1442 if s.reassembly_queue.is_readable() {
1443 self.events.push_back(Event::Stream(StreamEvent::Readable {
1444 id: s.stream_identifier,
1445 }));
1446 }
1447 }
1448
1449 self.handle_peer_last_tsn_and_acknowledgement(false)
1450 }
1451
1452 fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1453 let state = self.state();
1454
1455 if state == AssociationState::Established {
1456 if !self.inflight_queue.is_empty() {
1457 self.set_state(AssociationState::ShutdownReceived);
1458 } else {
1459 self.will_send_shutdown_ack = true;
1461 self.set_state(AssociationState::ShutdownAckSent);
1462
1463 self.awake_write_loop();
1464 }
1465 } else if state == AssociationState::ShutdownSent {
1466 self.will_send_shutdown_ack = true;
1469 self.set_state(AssociationState::ShutdownAckSent);
1470
1471 self.awake_write_loop();
1472 }
1473
1474 Ok(vec![])
1475 }
1476
1477 fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1478 let state = self.state();
1479 if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1480 self.timers.stop(Timer::T2Shutdown);
1481 self.will_send_shutdown_complete = true;
1482
1483 self.awake_write_loop();
1484 }
1485
1486 Ok(vec![])
1487 }
1488
1489 fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1490 let state = self.state();
1491 if state == AssociationState::ShutdownAckSent {
1492 self.timers.stop(Timer::T2Shutdown);
1493 self.close(AssociationError::AssociationClosed)?;
1494 }
1495
1496 Ok(vec![])
1497 }
1498
1499 fn handle_peer_last_tsn_and_acknowledgement(
1501 &mut self,
1502 sack_immediately: bool,
1503 ) -> Result<Vec<Packet>> {
1504 let mut reply = vec![];
1505
1506 while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1515 self.peer_last_tsn += 1;
1516 let rst_reqs: Vec<ParamOutgoingResetRequest> =
1519 self.reconfig_requests.values().cloned().collect();
1520 for rst_req in rst_reqs {
1521 self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1522 }
1523 }
1524
1525 let has_packet_loss = !self.payload_queue.is_empty();
1526 if has_packet_loss {
1527 trace!(
1528 "[{}] packetloss: {}",
1529 self.side,
1530 self.payload_queue
1531 .get_gap_ack_blocks_string(self.peer_last_tsn)
1532 );
1533 }
1534
1535 if (self.ack_state != AckState::Immediate
1536 && !sack_immediately
1537 && !has_packet_loss
1538 && self.ack_mode == AckMode::Normal)
1539 || self.ack_mode == AckMode::AlwaysDelay
1540 {
1541 if self.ack_state == AckState::Idle {
1542 self.delayed_ack_triggered = true;
1543 } else {
1544 self.immediate_ack_triggered = true;
1545 }
1546 } else {
1547 self.immediate_ack_triggered = true;
1548 }
1549
1550 Ok(reply)
1551 }
1552
1553 #[allow(clippy::borrowed_box)]
1554 fn handle_reconfig_param(
1555 &mut self,
1556 raw: &Box<dyn Param>,
1557 reply: &mut Vec<Packet>,
1558 ) -> Result<()> {
1559 if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1560 self.reconfig_requests
1561 .insert(p.reconfig_request_sequence_number, p.clone());
1562 self.reset_streams_if_any(p, true, reply)?;
1563 Ok(())
1564 } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1565 self.reconfigs.remove(&p.reconfig_response_sequence_number);
1566 if self.reconfigs.is_empty() {
1567 self.timers.stop(Timer::Reconfig);
1568 }
1569 Ok(())
1570 } else {
1571 Err(Error::ErrParameterType)
1572 }
1573 }
1574
1575 fn process_selective_ack(
1576 &mut self,
1577 d: &ChunkSelectiveAck,
1578 now: Instant,
1579 ) -> Result<(HashMap<u16, i64>, u32)> {
1580 let mut bytes_acked_per_stream = HashMap::new();
1581
1582 let mut i = self.cumulative_tsn_ack_point + 1;
1586 while sna32lte(i, d.cumulative_tsn_ack) {
1588 if let Some(c) = self.inflight_queue.pop(i) {
1589 if !c.acked {
1590 if i == self.cumulative_tsn_ack_point + 1 {
1596 self.timers.stop(Timer::T3RTX);
1598 }
1599
1600 let n_bytes_acked = c.user_data.len() as i64;
1601
1602 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1604 *amount += n_bytes_acked;
1605 } else {
1606 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1607 }
1608
1609 if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1619 self.min_tsn2measure_rtt = self.my_next_tsn;
1620 if let Some(since) = &c.since {
1621 let rtt = now.duration_since(*since);
1622 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1623 trace!(
1624 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1625 self.side,
1626 rtt.as_millis(),
1627 srtt,
1628 self.rto_mgr.get_rto()
1629 );
1630 } else {
1631 error!("[{}] invalid c.since", self.side);
1632 }
1633 }
1634 }
1635
1636 if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1637 debug!("[{}] exit fast-recovery", self.side);
1638 self.in_fast_recovery = false;
1639 }
1640 } else {
1641 return Err(Error::ErrInflightQueueTsnPop);
1642 }
1643
1644 i += 1;
1645 }
1646
1647 let mut htna = d.cumulative_tsn_ack;
1648
1649 for g in &d.gap_ack_blocks {
1651 for i in g.start..=g.end {
1652 let tsn = d.cumulative_tsn_ack + i as u32;
1653
1654 let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1655 (true, c.acked)
1656 } else {
1657 (false, false)
1658 };
1659 let n_bytes_acked = if is_existed && !is_acked {
1660 self.inflight_queue.mark_as_acked(tsn) as i64
1661 } else {
1662 0
1663 };
1664
1665 if let Some(c) = self.inflight_queue.get(tsn) {
1666 if !is_acked {
1667 if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1669 *amount += n_bytes_acked;
1670 } else {
1671 bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1672 }
1673
1674 trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1675
1676 if c.nsent == 1 {
1677 self.min_tsn2measure_rtt = self.my_next_tsn;
1678 if let Some(since) = &c.since {
1679 let rtt = now.duration_since(*since);
1680 let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1681 trace!(
1682 "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1683 self.side,
1684 rtt.as_millis(),
1685 srtt,
1686 self.rto_mgr.get_rto()
1687 );
1688 } else {
1689 error!("[{}] invalid c.since", self.side);
1690 }
1691 }
1692
1693 if sna32lt(htna, tsn) {
1694 htna = tsn;
1695 }
1696 }
1697 } else {
1698 return Err(Error::ErrTsnRequestNotExist);
1699 }
1700 }
1701 }
1702
1703 Ok((bytes_acked_per_stream, htna))
1704 }
1705
1706 fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1707 if self.inflight_queue.is_empty() {
1711 trace!(
1712 "[{}] SACK: no more packet in-flight (pending={})",
1713 self.side,
1714 self.pending_queue.len()
1715 );
1716 self.timers.stop(Timer::T3RTX);
1717 } else {
1718 trace!("[{}] T3-rtx timer start (pt2)", self.side);
1719 self.timers
1720 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1721 }
1722
1723 if self.cwnd <= self.ssthresh {
1725 if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1737 self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); trace!(
1740 "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1741 self.side, self.cwnd, self.ssthresh, total_bytes_acked
1742 );
1743 } else {
1744 trace!(
1745 "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1746 self.side,
1747 self.cwnd,
1748 self.ssthresh,
1749 total_bytes_acked,
1750 self.in_fast_recovery,
1751 self.pending_queue.len()
1752 );
1753 }
1754 } else {
1755 self.partial_bytes_acked += total_bytes_acked as u32;
1762
1763 if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1769 self.partial_bytes_acked -= self.cwnd;
1770 self.cwnd += self.mtu;
1771 trace!(
1772 "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1773 self.side, self.cwnd, self.ssthresh, total_bytes_acked
1774 );
1775 }
1776 }
1777 }
1778
1779 fn process_fast_retransmission(
1780 &mut self,
1781 cum_tsn_ack_point: u32,
1782 htna: u32,
1783 cum_tsn_ack_point_advanced: bool,
1784 ) -> Result<()> {
1785 if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1795 let max_tsn = if !self.in_fast_recovery {
1796 htna
1798 } else {
1799 cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1801 };
1802
1803 let mut tsn = cum_tsn_ack_point + 1;
1804 while sna32lt(tsn, max_tsn) {
1805 if let Some(c) = self.inflight_queue.get_mut(tsn) {
1806 if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1807 c.miss_indicator += 1;
1808 if c.miss_indicator == 3 && !self.in_fast_recovery {
1809 self.in_fast_recovery = true;
1813 self.fast_recover_exit_point = htna;
1814 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1815 self.cwnd = self.ssthresh;
1816 self.partial_bytes_acked = 0;
1817 self.will_retransmit_fast = true;
1818
1819 trace!(
1820 "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1821 self.side,
1822 self.cwnd,
1823 self.ssthresh,
1824 self.inflight_queue.get_num_bytes()
1825 );
1826 }
1827 }
1828 } else {
1829 return Err(Error::ErrTsnRequestNotExist);
1830 }
1831
1832 tsn += 1;
1833 }
1834 }
1835
1836 if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1837 self.will_retransmit_fast = true;
1838 }
1839
1840 Ok(())
1841 }
1842
1843 fn postprocess_sack(
1846 &mut self,
1847 state: AssociationState,
1848 mut should_awake_write_loop: bool,
1849 now: Instant,
1850 ) {
1851 if !self.inflight_queue.is_empty() {
1852 trace!("[{}] T3-rtx timer start (pt3)", self.side);
1854 self.timers
1855 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1856 } else if state == AssociationState::ShutdownPending {
1857 should_awake_write_loop = true;
1859 self.will_send_shutdown = true;
1860 self.set_state(AssociationState::ShutdownSent);
1861 } else if state == AssociationState::ShutdownReceived {
1862 should_awake_write_loop = true;
1864 self.will_send_shutdown_ack = true;
1865 self.set_state(AssociationState::ShutdownAckSent);
1866 }
1867
1868 if should_awake_write_loop {
1869 self.awake_write_loop();
1870 }
1871 }
1872
1873 fn reset_streams_if_any(
1874 &mut self,
1875 p: &ParamOutgoingResetRequest,
1876 respond: bool,
1877 reply: &mut Vec<Packet>,
1878 ) -> Result<()> {
1879 let mut result = ReconfigResult::SuccessPerformed;
1880 let mut sis_to_reset = vec![];
1881
1882 if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1883 debug!(
1884 "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1885 self.side, p.sender_last_tsn, self.peer_last_tsn
1886 );
1887 for id in &p.stream_identifiers {
1888 if self.streams.contains_key(id) {
1889 if respond {
1890 sis_to_reset.push(*id);
1891 }
1892 self.unregister_stream(*id, AssociationError::Reset);
1893 }
1894 }
1895 self.reconfig_requests
1896 .remove(&p.reconfig_request_sequence_number);
1897 } else {
1898 debug!(
1899 "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1900 self.side, p.sender_last_tsn, self.peer_last_tsn
1901 );
1902 result = ReconfigResult::InProgress;
1903 }
1904
1905 if !sis_to_reset.is_empty() {
1908 let rsn = self.generate_next_rsn();
1909 let tsn = self.my_next_tsn - 1;
1910
1911 let c = ChunkReconfig {
1912 param_a: Some(Box::new(ParamOutgoingResetRequest {
1913 reconfig_request_sequence_number: rsn,
1914 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1915 sender_last_tsn: tsn,
1916 stream_identifiers: sis_to_reset,
1917 })),
1918 ..Default::default()
1919 };
1920
1921 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
1924 reply.push(p);
1925 }
1926
1927 let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1928 param_a: Some(Box::new(ParamReconfigResponse {
1929 reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1930 result,
1931 })),
1932 param_b: None,
1933 })]);
1934
1935 debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1936
1937 reply.push(packet);
1938
1939 Ok(())
1940 }
1941
1942 pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk>>) -> Packet {
1945 Packet {
1946 common_header: CommonHeader {
1947 verification_tag: self.peer_verification_tag,
1948 source_port: self.source_port,
1949 destination_port: self.destination_port,
1950 },
1951 chunks,
1952 }
1953 }
1954
1955 fn create_stream(
1957 &mut self,
1958 stream_identifier: StreamId,
1959 accept: bool,
1960 default_payload_type: PayloadProtocolIdentifier,
1961 ) -> Option<Stream<'_>> {
1962 let s = StreamState::new(
1963 self.side,
1964 stream_identifier,
1965 self.max_payload_size,
1966 default_payload_type,
1967 );
1968
1969 if accept {
1970 self.stream_queue.push_back(stream_identifier);
1971 self.events.push_back(Event::Stream(StreamEvent::Opened {
1972 id: stream_identifier,
1973 }));
1974 }
1975
1976 self.streams.insert(stream_identifier, s);
1977
1978 Some(Stream {
1979 stream_identifier,
1980 association: self,
1981 })
1982 }
1983
1984 fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1986 if self.streams.contains_key(&stream_identifier) {
1987 Some(Stream {
1988 stream_identifier,
1989 association: self,
1990 })
1991 } else {
1992 self.create_stream(
1993 stream_identifier,
1994 true,
1995 PayloadProtocolIdentifier::default(),
1996 )
1997 }
1998 }
1999
2000 pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
2001 let mut bytes_queued = 0;
2002 for s in self.streams.values() {
2003 bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
2004 }
2005
2006 self.max_receive_buffer_size.saturating_sub(bytes_queued)
2007 }
2008
2009 fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
2012 let mut raw_packets = vec![];
2013
2014 if !self.control_queue.is_empty() {
2015 for p in self.control_queue.drain(..) {
2016 if let Ok(raw) = p.marshal() {
2017 raw_packets.push(raw);
2018 } else {
2019 warn!("[{}] failed to serialize a control packet", self.side);
2020 continue;
2021 }
2022 }
2023 }
2024
2025 let state = self.state();
2026 match state {
2027 AssociationState::Established => {
2028 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2029 raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2030 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2031 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2032 raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2033 (raw_packets, true)
2034 }
2035 AssociationState::ShutdownPending
2036 | AssociationState::ShutdownSent
2037 | AssociationState::ShutdownReceived => {
2038 raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2039 raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2040 raw_packets = self.gather_outbound_sack_packets(raw_packets);
2041 self.gather_outbound_shutdown_packets(raw_packets, now)
2042 }
2043 AssociationState::ShutdownAckSent => {
2044 self.gather_outbound_shutdown_packets(raw_packets, now)
2045 }
2046 _ => (raw_packets, true),
2047 }
2048 }
2049
2050 fn gather_data_packets_to_retransmit(
2051 &mut self,
2052 mut raw_packets: Vec<Bytes>,
2053 now: Instant,
2054 ) -> Vec<Bytes> {
2055 for p in &self.get_data_packets_to_retransmit(now) {
2056 if let Ok(raw) = p.marshal() {
2057 raw_packets.push(raw);
2058 } else {
2059 warn!(
2060 "[{}] failed to serialize a DATA packet to be retransmitted",
2061 self.side
2062 );
2063 }
2064 }
2065
2066 raw_packets
2067 }
2068
2069 fn gather_outbound_data_and_reconfig_packets(
2070 &mut self,
2071 mut raw_packets: Vec<Bytes>,
2072 now: Instant,
2073 ) -> Vec<Bytes> {
2074 let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2077 if !chunks.is_empty() {
2078 trace!("[{}] T3-rtx timer start (pt1)", self.side);
2080 self.timers
2081 .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2082
2083 for p in &self.bundle_data_chunks_into_packets(chunks) {
2084 if let Ok(raw) = p.marshal() {
2085 raw_packets.push(raw);
2086 } else {
2087 warn!("[{}] failed to serialize a DATA packet", self.side);
2088 }
2089 }
2090 }
2091
2092 if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2093 if self.will_retransmit_reconfig {
2094 self.will_retransmit_reconfig = false;
2095 debug!(
2096 "[{}] retransmit {} RECONFIG chunk(s)",
2097 self.side,
2098 self.reconfigs.len()
2099 );
2100 for c in self.reconfigs.values() {
2101 let p = self.create_packet(vec![Box::new(c.clone())]);
2102 if let Ok(raw) = p.marshal() {
2103 raw_packets.push(raw);
2104 } else {
2105 warn!(
2106 "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2107 self.side,
2108 );
2109 }
2110 }
2111 }
2112
2113 if !sis_to_reset.is_empty() {
2114 let rsn = self.generate_next_rsn();
2115 let tsn = self.my_next_tsn - 1;
2116 debug!(
2117 "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2118 self.side,
2119 rsn,
2120 self.my_next_tsn - 1,
2121 sis_to_reset
2122 );
2123
2124 let c = ChunkReconfig {
2125 param_a: Some(Box::new(ParamOutgoingResetRequest {
2126 reconfig_request_sequence_number: rsn,
2127 sender_last_tsn: tsn,
2128 stream_identifiers: sis_to_reset,
2129 ..Default::default()
2130 })),
2131 ..Default::default()
2132 };
2133 self.reconfigs.insert(rsn, c.clone()); let p = self.create_packet(vec![Box::new(c)]);
2136 if let Ok(raw) = p.marshal() {
2137 raw_packets.push(raw);
2138 } else {
2139 warn!(
2140 "[{}] failed to serialize a RECONFIG packet to be transmitted",
2141 self.side
2142 );
2143 }
2144 }
2145
2146 if !self.reconfigs.is_empty() {
2147 self.timers
2148 .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2149 }
2150 }
2151
2152 raw_packets
2153 }
2154
2155 fn gather_outbound_fast_retransmission_packets(
2156 &mut self,
2157 mut raw_packets: Vec<Bytes>,
2158 now: Instant,
2159 ) -> Vec<Bytes> {
2160 if self.will_retransmit_fast {
2161 self.will_retransmit_fast = false;
2162
2163 let mut to_fast_retrans: Vec<Box<dyn Chunk>> = vec![];
2164 let mut fast_retrans_size = COMMON_HEADER_SIZE;
2165
2166 let mut i = 0;
2167 loop {
2168 let tsn = self.cumulative_tsn_ack_point + i + 1;
2169 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2170 if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2171 i += 1;
2172 continue;
2173 }
2174
2175 let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2186 if self.mtu < fast_retrans_size + data_chunk_size {
2187 break;
2188 }
2189
2190 fast_retrans_size += data_chunk_size;
2191 self.stats.inc_fast_retrans();
2192 c.nsent += 1;
2193 } else {
2194 break; }
2196
2197 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2198 Association::check_partial_reliability_status(
2199 c,
2200 now,
2201 self.use_forward_tsn,
2202 self.side,
2203 &self.streams,
2204 );
2205 to_fast_retrans.push(Box::new(c.clone()));
2206 trace!(
2207 "[{}] fast-retransmit: tsn={} sent={} htna={}",
2208 self.side, c.tsn, c.nsent, self.fast_recover_exit_point
2209 );
2210 }
2211 i += 1;
2212 }
2213
2214 if !to_fast_retrans.is_empty() {
2215 if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2216 raw_packets.push(raw);
2217 } else {
2218 warn!(
2219 "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2220 self.side
2221 );
2222 }
2223 }
2224 }
2225
2226 raw_packets
2227 }
2228
2229 fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2230 if self.ack_state == AckState::Immediate {
2231 self.ack_state = AckState::Idle;
2232 let sack = self.create_selective_ack_chunk();
2233 debug!("[{}] sending SACK: {}", self.side, sack);
2234 if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2235 raw_packets.push(raw);
2236 } else {
2237 warn!("[{}] failed to serialize a SACK packet", self.side);
2238 }
2239 }
2240
2241 raw_packets
2242 }
2243
2244 fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2245 if self.will_send_forward_tsn {
2251 self.will_send_forward_tsn = false;
2252 if sna32gt(
2253 self.advanced_peer_tsn_ack_point,
2254 self.cumulative_tsn_ack_point,
2255 ) {
2256 let fwd_tsn = self.create_forward_tsn();
2257 if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2258 raw_packets.push(raw);
2259 } else {
2260 warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2261 }
2262 }
2263 }
2264
2265 raw_packets
2266 }
2267
2268 fn gather_outbound_shutdown_packets(
2269 &mut self,
2270 mut raw_packets: Vec<Bytes>,
2271 now: Instant,
2272 ) -> (Vec<Bytes>, bool) {
2273 let mut ok = true;
2274
2275 if self.will_send_shutdown {
2276 self.will_send_shutdown = false;
2277
2278 let shutdown = ChunkShutdown {
2279 cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2280 };
2281
2282 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2283 self.timers
2284 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2285 raw_packets.push(raw);
2286 } else {
2287 warn!("[{}] failed to serialize a Shutdown packet", self.side);
2288 }
2289 } else if self.will_send_shutdown_ack {
2290 self.will_send_shutdown_ack = false;
2291
2292 let shutdown_ack = ChunkShutdownAck {};
2293
2294 if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2295 self.timers
2296 .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2297 raw_packets.push(raw);
2298 } else {
2299 warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2300 }
2301 } else if self.will_send_shutdown_complete {
2302 self.will_send_shutdown_complete = false;
2303
2304 let shutdown_complete = ChunkShutdownComplete {};
2305
2306 if let Ok(raw) = self
2307 .create_packet(vec![Box::new(shutdown_complete)])
2308 .marshal()
2309 {
2310 raw_packets.push(raw);
2311 ok = false;
2312 } else {
2313 warn!(
2314 "[{}] failed to serialize a ShutdownComplete packet",
2315 self.side
2316 );
2317 }
2318 }
2319
2320 (raw_packets, ok)
2321 }
2322
2323 fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2326 let awnd = std::cmp::min(self.cwnd, self.rwnd);
2327 let mut chunks = vec![];
2328 let mut bytes_to_send = 0;
2329 let mut done = false;
2330 let mut i = 0;
2331 while !done {
2332 let tsn = self.cumulative_tsn_ack_point + i + 1;
2333 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2334 if !c.retransmit {
2335 i += 1;
2336 continue;
2337 }
2338
2339 if i == 0 && self.rwnd < c.user_data.len() as u32 {
2340 done = true;
2342 } else if bytes_to_send + c.user_data.len() > awnd as usize {
2343 break;
2344 }
2345
2346 c.retransmit = false;
2349 bytes_to_send += c.user_data.len();
2350
2351 c.nsent += 1;
2352 } else {
2353 break; }
2355
2356 if let Some(c) = self.inflight_queue.get_mut(tsn) {
2357 Association::check_partial_reliability_status(
2358 c,
2359 now,
2360 self.use_forward_tsn,
2361 self.side,
2362 &self.streams,
2363 );
2364
2365 trace!(
2366 "[{}] retransmitting tsn={} ssn={} sent={}",
2367 self.side, c.tsn, c.stream_sequence_number, c.nsent
2368 );
2369
2370 chunks.push(c.clone());
2371 }
2372 i += 1;
2373 }
2374
2375 self.bundle_data_chunks_into_packets(chunks)
2376 }
2377
2378 fn pop_pending_data_chunks_to_send(
2381 &mut self,
2382 now: Instant,
2383 ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2384 let mut chunks = vec![];
2385 let mut sis_to_reset = vec![]; if !self.pending_queue.is_empty() {
2387 while let Some(c) = self.pending_queue.peek() {
2396 let (beginning_fragment, unordered, data_len, stream_identifier) = (
2397 c.beginning_fragment,
2398 c.unordered,
2399 c.user_data.len(),
2400 c.stream_identifier,
2401 );
2402
2403 if data_len == 0 {
2404 sis_to_reset.push(stream_identifier);
2405 if self
2406 .pending_queue
2407 .pop(beginning_fragment, unordered)
2408 .is_none()
2409 {
2410 error!("[{}] failed to pop from pending queue", self.side);
2411 }
2412 continue;
2413 }
2414
2415 if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2416 break; }
2418
2419 if data_len > self.rwnd as usize {
2420 break; }
2422
2423 self.rwnd -= data_len as u32;
2424
2425 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2426 beginning_fragment,
2427 unordered,
2428 now,
2429 ) {
2430 chunks.push(chunk);
2431 }
2432 }
2433
2434 if chunks.is_empty() && self.inflight_queue.is_empty() {
2436 if let Some(c) = self.pending_queue.peek() {
2438 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2439
2440 if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2441 beginning_fragment,
2442 unordered,
2443 now,
2444 ) {
2445 chunks.push(chunk);
2446 }
2447 }
2448 }
2449 }
2450
2451 (chunks, sis_to_reset)
2452 }
2453
2454 fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2458 let mut packets = vec![];
2459 let mut chunks_to_send = vec![];
2460 let mut bytes_in_packet = COMMON_HEADER_SIZE;
2461
2462 for c in chunks {
2463 if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2469 packets.push(self.create_packet(chunks_to_send));
2470 chunks_to_send = vec![];
2471 bytes_in_packet = COMMON_HEADER_SIZE;
2472 }
2473
2474 bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2475 chunks_to_send.push(Box::new(c));
2476 }
2477
2478 if !chunks_to_send.is_empty() {
2479 packets.push(self.create_packet(chunks_to_send));
2480 }
2481
2482 packets
2483 }
2484
2485 fn generate_next_tsn(&mut self) -> u32 {
2487 let tsn = self.my_next_tsn;
2488 self.my_next_tsn += 1;
2489 tsn
2490 }
2491
2492 fn generate_next_rsn(&mut self) -> u32 {
2494 let rsn = self.my_next_rsn;
2495 self.my_next_rsn += 1;
2496 rsn
2497 }
2498
2499 fn check_partial_reliability_status(
2500 c: &mut ChunkPayloadData,
2501 now: Instant,
2502 use_forward_tsn: bool,
2503 side: Side,
2504 streams: &HashMap<u16, StreamState>,
2505 ) {
2506 if !use_forward_tsn {
2507 return;
2508 }
2509
2510 if c.payload_type == PayloadProtocolIdentifier::Dcep {
2516 return;
2517 }
2518
2519 if let Some(s) = streams.get(&c.stream_identifier) {
2521 let reliability_type: ReliabilityType = s.reliability_type;
2522 let reliability_value = s.reliability_value;
2523
2524 if reliability_type == ReliabilityType::Rexmit {
2525 if c.nsent >= reliability_value {
2526 c.set_abandoned(true);
2527 trace!(
2528 "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2529 side, c.tsn, c.payload_type, c.nsent
2530 );
2531 }
2532 } else if reliability_type == ReliabilityType::Timed {
2533 if let Some(since) = &c.since {
2534 let elapsed = now.duration_since(*since);
2535 if elapsed.as_millis() as u32 >= reliability_value {
2536 c.set_abandoned(true);
2537 trace!(
2538 "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2539 side, c.tsn, c.payload_type, elapsed
2540 );
2541 }
2542 } else {
2543 error!("[{}] invalid c.since", side);
2544 }
2545 }
2546 } else {
2547 error!("[{}] stream {} not found)", side, c.stream_identifier);
2548 }
2549 }
2550
2551 fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2552 ChunkSelectiveAck {
2553 cumulative_tsn_ack: self.peer_last_tsn,
2554 advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2555 gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2556 duplicate_tsn: self.payload_queue.pop_duplicates(),
2557 }
2558 }
2559
2560 fn create_forward_tsn(&self) -> ChunkForwardTsn {
2563 let mut stream_map: HashMap<u16, u16> = HashMap::new(); let mut i = self.cumulative_tsn_ack_point + 1;
2566 while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2567 if let Some(c) = self.inflight_queue.get(i) {
2568 if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2569 if sna16lt(*ssn, c.stream_sequence_number) {
2570 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2572 }
2573 } else {
2574 stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2575 }
2576 } else {
2577 break;
2578 }
2579
2580 i += 1;
2581 }
2582
2583 let mut fwd_tsn = ChunkForwardTsn {
2584 new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2585 streams: vec![],
2586 };
2587
2588 let mut stream_str = String::new();
2589 for (si, ssn) in &stream_map {
2590 stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2591 fwd_tsn.streams.push(ChunkForwardTsnStream {
2592 identifier: *si,
2593 sequence: *ssn,
2594 });
2595 }
2596 trace!(
2597 "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2598 self.side, fwd_tsn.new_cumulative_tsn, self.cumulative_tsn_ack_point, stream_str
2599 );
2600
2601 fwd_tsn
2602 }
2603
2604 fn move_pending_data_chunk_to_inflight_queue(
2606 &mut self,
2607 beginning_fragment: bool,
2608 unordered: bool,
2609 now: Instant,
2610 ) -> Option<ChunkPayloadData> {
2611 if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2612 if c.ending_fragment {
2614 c.set_all_inflight();
2615 }
2616
2617 c.tsn = self.generate_next_tsn();
2619
2620 c.since = Some(now); c.nsent = 1; Association::check_partial_reliability_status(
2624 &mut c,
2625 now,
2626 self.use_forward_tsn,
2627 self.side,
2628 &self.streams,
2629 );
2630
2631 trace!(
2632 "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2633 self.side,
2634 c.payload_type as u32,
2635 c.tsn,
2636 c.stream_sequence_number,
2637 c.nsent,
2638 c.user_data.len(),
2639 c.beginning_fragment,
2640 c.ending_fragment
2641 );
2642
2643 self.inflight_queue.push_no_check(c.clone());
2644
2645 Some(c)
2646 } else {
2647 error!("[{}] failed to pop from pending queue", self.side);
2648 None
2649 }
2650 }
2651
2652 pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2653 let state = self.state();
2654 if state != AssociationState::Established {
2655 return Err(Error::ErrResetPacketInStateNotExist);
2656 }
2657
2658 let c = ChunkPayloadData {
2661 stream_identifier,
2662 beginning_fragment: true,
2663 ending_fragment: true,
2664 user_data: Bytes::new(),
2665 ..Default::default()
2666 };
2667
2668 self.pending_queue.push(c);
2669 self.awake_write_loop();
2670
2671 Ok(())
2672 }
2673
2674 pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2676 let state = self.state();
2677 if state != AssociationState::Established {
2678 return Err(Error::ErrPayloadDataStateNotExist);
2679 }
2680
2681 for c in chunks {
2683 self.pending_queue.push(c);
2684 }
2685
2686 self.awake_write_loop();
2687 Ok(())
2688 }
2689
2690 pub(crate) fn buffered_amount(&self) -> usize {
2693 self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2694 }
2695
2696 fn awake_write_loop(&self) {
2697 }
2699
2700 fn close_all_timers(&mut self) {
2701 for timer in Timer::VALUES {
2703 self.timers.stop(timer);
2704 }
2705 }
2706
2707 fn on_ack_timeout(&mut self) {
2708 trace!(
2709 "[{}] ack timed out (ack_state: {})",
2710 self.side, self.ack_state
2711 );
2712 self.stats.inc_ack_timeouts();
2713 self.ack_state = AckState::Immediate;
2714 self.awake_write_loop();
2715 }
2716
2717 fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2718 match timer_id {
2719 Timer::T1Init => {
2720 if let Err(err) = self.send_init() {
2721 debug!(
2722 "[{}] failed to retransmit init (n_rtos={}): {:?}",
2723 self.side, n_rtos, err
2724 );
2725 }
2726 }
2727
2728 Timer::T1Cookie => {
2729 if let Err(err) = self.send_cookie_echo() {
2730 debug!(
2731 "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2732 self.side, n_rtos, err
2733 );
2734 }
2735 }
2736
2737 Timer::T2Shutdown => {
2738 debug!(
2739 "[{}] retransmission of shutdown timeout (n_rtos={})",
2740 self.side, n_rtos
2741 );
2742 let state = self.state();
2743 match state {
2744 AssociationState::ShutdownSent => {
2745 self.will_send_shutdown = true;
2746 self.awake_write_loop();
2747 }
2748 AssociationState::ShutdownAckSent => {
2749 self.will_send_shutdown_ack = true;
2750 self.awake_write_loop();
2751 }
2752 _ => {}
2753 }
2754 }
2755
2756 Timer::T3RTX => {
2757 self.stats.inc_t3timeouts();
2758
2759 self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2770 self.cwnd = self.mtu;
2771 trace!(
2772 "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2773 self.side,
2774 self.cwnd,
2775 self.ssthresh,
2776 self.inflight_queue.get_num_bytes()
2777 );
2778
2779 if self.use_forward_tsn {
2784 let mut i = self.advanced_peer_tsn_ack_point + 1;
2786 while let Some(c) = self.inflight_queue.get(i) {
2787 if !c.abandoned() {
2788 break;
2789 }
2790 self.advanced_peer_tsn_ack_point = i;
2791 i += 1;
2792 }
2793
2794 if sna32gt(
2796 self.advanced_peer_tsn_ack_point,
2797 self.cumulative_tsn_ack_point,
2798 ) {
2799 self.will_send_forward_tsn = true;
2800 debug!(
2801 "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2802 self.side,
2803 self.will_send_forward_tsn,
2804 self.advanced_peer_tsn_ack_point,
2805 self.cumulative_tsn_ack_point
2806 );
2807 }
2808 }
2809
2810 debug!(
2811 "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2812 self.side, n_rtos, self.cwnd, self.ssthresh
2813 );
2814
2815 self.inflight_queue.mark_all_to_retrasmit();
2816 self.awake_write_loop();
2817 }
2818
2819 Timer::Reconfig => {
2820 self.will_retransmit_reconfig = true;
2821 self.awake_write_loop();
2822 }
2823
2824 _ => {}
2825 }
2826 }
2827
2828 fn on_retransmission_failure(&mut self, id: Timer) {
2829 match id {
2830 Timer::T1Init => {
2831 error!("[{}] retransmission failure: T1-init", self.side);
2832 self.error = Some(AssociationError::HandshakeFailed(
2833 Error::ErrHandshakeInitAck.to_string(),
2834 ));
2835 }
2836
2837 Timer::T1Cookie => {
2838 error!("[{}] retransmission failure: T1-cookie", self.side);
2839 self.error = Some(AssociationError::HandshakeFailed(
2840 Error::ErrHandshakeCookieEcho.to_string(),
2841 ));
2842 }
2843
2844 Timer::T2Shutdown => {
2845 error!("[{}] retransmission failure: T2-shutdown", self.side);
2846 }
2847
2848 Timer::T3RTX => {
2849 error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2854 }
2855
2856 _ => {}
2857 }
2858 }
2859
2860 #[cfg(test)]
2862 pub(crate) fn is_idle(&self) -> bool {
2863 Timer::VALUES
2864 .iter()
2865 .filter_map(|&t| Some((t, self.timers.get(t)?)))
2867 .min_by_key(|&(_, time)| time)
2868 .is_none()
2870 }
2871}