sctp_proto/association/
mod.rs

1use crate::association::{
2    state::{AckMode, AckState, AssociationState},
3    stats::AssociationStats,
4};
5use crate::chunk::{
6    chunk_abort::ChunkAbort, chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho,
7    chunk_error::ChunkError, chunk_forward_tsn::ChunkForwardTsn,
8    chunk_forward_tsn::ChunkForwardTsnStream, chunk_heartbeat::ChunkHeartbeat,
9    chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit, chunk_init::ChunkInitAck,
10    chunk_payload_data::ChunkPayloadData, chunk_payload_data::PayloadProtocolIdentifier,
11    chunk_reconfig::ChunkReconfig, chunk_selective_ack::ChunkSelectiveAck,
12    chunk_shutdown::ChunkShutdown, chunk_shutdown_ack::ChunkShutdownAck,
13    chunk_shutdown_complete::ChunkShutdownComplete, chunk_type::CT_FORWARD_TSN, Chunk,
14    ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT,
15};
16use crate::config::{ServerConfig, TransportConfig, COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE};
17use crate::error::{Error, Result};
18use crate::packet::{CommonHeader, Packet};
19use crate::param::{
20    param_heartbeat_info::ParamHeartbeatInfo,
21    param_outgoing_reset_request::ParamOutgoingResetRequest,
22    param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23    param_state_cookie::ParamStateCookie,
24    param_supported_extensions::ParamSupportedExtensions,
25    Param,
26};
27use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
28use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
29use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
30use crate::{AssociationEvent, Payload, Side, Transmit};
31use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
32use timer::{RtoManager, Timer, TimerTable, ACK_INTERVAL};
33
34use crate::association::stream::RecvSendState;
35use bytes::Bytes;
36use log::{debug, error, trace, warn};
37use rand::random;
38use rustc_hash::FxHashMap;
39use std::collections::{HashMap, VecDeque};
40use std::net::{IpAddr, SocketAddr};
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49mod timer;
50
51#[cfg(test)]
52mod association_test;
53
54/// Reasons why an association might be lost
55#[derive(Debug, Error, Eq, Clone, PartialEq)]
56pub enum AssociationError {
57    /// Handshake failed
58    #[error("{0}")]
59    HandshakeFailed(#[from] Error),
60    /// The peer violated the QUIC specification as understood by this implementation
61    #[error("transport error")]
62    TransportError,
63    /// The peer's QUIC stack aborted the association automatically
64    #[error("aborted by peer")]
65    AssociationClosed,
66    /// The peer closed the association
67    #[error("closed by peer")]
68    ApplicationClosed,
69    /// The peer is unable to continue processing this association, usually due to having restarted
70    #[error("reset by peer")]
71    Reset,
72    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
73    ///
74    /// If neither side is sending keep-alives, an association will time out after a long enough idle
75    /// period even if the peer is still reachable
76    #[error("timed out")]
77    TimedOut,
78    /// The local application closed the association
79    #[error("closed")]
80    LocallyClosed,
81}
82
83/// Events of interest to the application
84#[derive(Debug)]
85pub enum Event {
86    /// The association was successfully established
87    Connected,
88    /// The association was lost
89    ///
90    /// Emitted if the peer closes the association or an error is encountered.
91    AssociationLost {
92        /// Reason that the association was closed
93        reason: AssociationError,
94    },
95    /// Stream events
96    Stream(StreamEvent),
97    /// One or more application datagrams have been received
98    DatagramReceived,
99}
100
101///Association represents an SCTP association
102//13.2.  Parameters Necessary per Association (i.e., the TCB)
103//Peer : Tag value to be sent in every packet and is received
104//Verification: in the INIT or INIT ACK chunk.
105//Tag :
106//
107//My : Tag expected in every inbound packet and sent in the
108//Verification: INIT or INIT ACK chunk.
109//
110//Tag :
111//State : A state variable indicating what state the association
112// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
113// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
114// : SHUTDOWN-ACK-SENT.
115//
116// No Closed state is illustrated since if a
117// association is Closed its TCB SHOULD be removed.
118#[derive(Debug)]
119pub struct Association {
120    side: Side,
121    state: AssociationState,
122    handshake_completed: bool,
123    max_message_size: u32,
124    inflight_queue_length: usize,
125    will_send_shutdown: bool,
126    bytes_received: usize,
127    bytes_sent: usize,
128
129    peer_verification_tag: u32,
130    my_verification_tag: u32,
131    my_next_tsn: u32,
132    peer_last_tsn: u32,
133    // for RTT measurement
134    min_tsn2measure_rtt: u32,
135    will_send_forward_tsn: bool,
136    will_retransmit_fast: bool,
137    will_retransmit_reconfig: bool,
138
139    will_send_shutdown_ack: bool,
140    will_send_shutdown_complete: bool,
141
142    // Reconfig
143    my_next_rsn: u32,
144    reconfigs: FxHashMap<u32, ChunkReconfig>,
145    reconfig_requests: FxHashMap<u32, ParamOutgoingResetRequest>,
146
147    // Non-RFC internal data
148    remote_addr: SocketAddr,
149    local_ip: Option<IpAddr>,
150    source_port: u16,
151    destination_port: u16,
152    my_max_num_inbound_streams: u16,
153    my_max_num_outbound_streams: u16,
154    my_cookie: Option<ParamStateCookie>,
155
156    payload_queue: PayloadQueue,
157    inflight_queue: PayloadQueue,
158    pending_queue: PendingQueue,
159    control_queue: VecDeque<Packet>,
160    stream_queue: VecDeque<u16>,
161
162    pub(crate) mtu: u32,
163    // max DATA chunk payload size
164    max_payload_size: u32,
165    cumulative_tsn_ack_point: u32,
166    advanced_peer_tsn_ack_point: u32,
167    use_forward_tsn: bool,
168
169    pub(crate) rto_mgr: RtoManager,
170    timers: TimerTable,
171
172    // Congestion control parameters
173    max_receive_buffer_size: u32,
174    // my congestion window size
175    pub(crate) cwnd: u32,
176    // calculated peer's receiver windows size
177    rwnd: u32,
178    // slow start threshold
179    pub(crate) ssthresh: u32,
180    partial_bytes_acked: u32,
181    pub(crate) in_fast_recovery: bool,
182    fast_recover_exit_point: u32,
183
184    // Chunks stored for retransmission
185    stored_init: Option<ChunkInit>,
186    stored_cookie_echo: Option<ChunkCookieEcho>,
187    pub(crate) streams: FxHashMap<StreamId, StreamState>,
188
189    events: VecDeque<Event>,
190    endpoint_events: VecDeque<EndpointEventInner>,
191    error: Option<AssociationError>,
192
193    // per inbound packet context
194    delayed_ack_triggered: bool,
195    immediate_ack_triggered: bool,
196
197    pub(crate) stats: AssociationStats,
198    ack_state: AckState,
199
200    // for testing
201    pub(crate) ack_mode: AckMode,
202}
203
204impl Default for Association {
205    fn default() -> Self {
206        Association {
207            side: Side::default(),
208            state: AssociationState::default(),
209            handshake_completed: false,
210            max_message_size: 0,
211            inflight_queue_length: 0,
212            will_send_shutdown: false,
213            bytes_received: 0,
214            bytes_sent: 0,
215
216            peer_verification_tag: 0,
217            my_verification_tag: 0,
218            my_next_tsn: 0,
219            peer_last_tsn: 0,
220            // for RTT measurement
221            min_tsn2measure_rtt: 0,
222            will_send_forward_tsn: false,
223            will_retransmit_fast: false,
224            will_retransmit_reconfig: false,
225
226            will_send_shutdown_ack: false,
227            will_send_shutdown_complete: false,
228
229            // Reconfig
230            my_next_rsn: 0,
231            reconfigs: FxHashMap::default(),
232            reconfig_requests: FxHashMap::default(),
233
234            // Non-RFC internal data
235            remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
236            local_ip: None,
237            source_port: 0,
238            destination_port: 0,
239            my_max_num_inbound_streams: 0,
240            my_max_num_outbound_streams: 0,
241            my_cookie: None,
242
243            payload_queue: PayloadQueue::default(),
244            inflight_queue: PayloadQueue::default(),
245            pending_queue: PendingQueue::default(),
246            control_queue: VecDeque::default(),
247            stream_queue: VecDeque::default(),
248
249            mtu: 0,
250            // max DATA chunk payload size
251            max_payload_size: 0,
252            cumulative_tsn_ack_point: 0,
253            advanced_peer_tsn_ack_point: 0,
254            use_forward_tsn: false,
255
256            rto_mgr: RtoManager::default(),
257            timers: TimerTable::default(),
258
259            // Congestion control parameters
260            max_receive_buffer_size: 0,
261            // my congestion window size
262            cwnd: 0,
263            // calculated peer's receiver windows size
264            rwnd: 0,
265            // slow start threshold
266            ssthresh: 0,
267            partial_bytes_acked: 0,
268            in_fast_recovery: false,
269            fast_recover_exit_point: 0,
270
271            // Chunks stored for retransmission
272            stored_init: None,
273            stored_cookie_echo: None,
274            streams: FxHashMap::default(),
275
276            events: VecDeque::default(),
277            endpoint_events: VecDeque::default(),
278            error: None,
279
280            // per inbound packet context
281            delayed_ack_triggered: false,
282            immediate_ack_triggered: false,
283
284            stats: AssociationStats::default(),
285            ack_state: AckState::default(),
286
287            // for testing
288            ack_mode: AckMode::default(),
289        }
290    }
291}
292
293impl Association {
294    pub(crate) fn new(
295        server_config: Option<Arc<ServerConfig>>,
296        config: Arc<TransportConfig>,
297        max_payload_size: u32,
298        local_aid: AssociationId,
299        remote_addr: SocketAddr,
300        local_ip: Option<IpAddr>,
301        now: Instant,
302    ) -> Self {
303        let side = if server_config.is_some() {
304            Side::Server
305        } else {
306            Side::Client
307        };
308
309        // It's a bit strange, but we're going backwards from the calculation in
310        // config.rs to get max_payload_size from INITIAL_MTU.
311        let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
312
313        // RFC 4690 Sec 7.2.1
314        // The initial cwnd before DATA transmission or after a sufficiently
315        // long idle period MUST be set to min(4*MTU, max (2*MTU, 4380bytes)).
316        let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
317        let mut tsn = random::<u32>();
318        if tsn == 0 {
319            tsn += 1;
320        }
321
322        let mut this = Association {
323            side,
324            handshake_completed: false,
325            max_receive_buffer_size: config.max_receive_buffer_size(),
326            max_message_size: config.max_message_size(),
327            my_max_num_outbound_streams: config.max_num_outbound_streams(),
328            my_max_num_inbound_streams: config.max_num_inbound_streams(),
329            max_payload_size,
330
331            rto_mgr: RtoManager::new(
332                config.rto_initial_ms(),
333                config.rto_min_ms(),
334                config.rto_max_ms(),
335            ),
336            timers: TimerTable::new(
337                config.max_init_retransmits(),
338                config.max_data_retransmits(),
339                config.rto_max_ms(),
340            ),
341
342            mtu,
343            cwnd,
344            remote_addr,
345            local_ip,
346
347            my_verification_tag: local_aid,
348            my_next_tsn: tsn,
349            my_next_rsn: tsn,
350            min_tsn2measure_rtt: tsn,
351            cumulative_tsn_ack_point: tsn - 1,
352            advanced_peer_tsn_ack_point: tsn - 1,
353            error: None,
354
355            ..Default::default()
356        };
357
358        if side.is_client() {
359            let mut init = ChunkInit {
360                initial_tsn: this.my_next_tsn,
361                num_outbound_streams: this.my_max_num_outbound_streams,
362                num_inbound_streams: this.my_max_num_inbound_streams,
363                initiate_tag: this.my_verification_tag,
364                advertised_receiver_window_credit: this.max_receive_buffer_size,
365                ..Default::default()
366            };
367            init.set_supported_extensions();
368
369            this.set_state(AssociationState::CookieWait);
370            this.stored_init = Some(init);
371            let _ = this.send_init();
372            this.timers
373                .start(Timer::T1Init, now, this.rto_mgr.get_rto());
374        }
375
376        this
377    }
378
379    /// Returns application-facing event
380    ///
381    /// Associations should be polled for events after:
382    /// - a call was made to `handle_event`
383    /// - a call was made to `handle_timeout`
384    #[must_use]
385    pub fn poll(&mut self) -> Option<Event> {
386        if let Some(x) = self.events.pop_front() {
387            return Some(x);
388        }
389
390        /*TODO: if let Some(event) = self.streams.poll() {
391            return Some(Event::Stream(event));
392        }*/
393
394        if let Some(err) = self.error.take() {
395            return Some(Event::AssociationLost { reason: err });
396        }
397
398        None
399    }
400
401    /// Return endpoint-facing event
402    #[must_use]
403    pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
404        self.endpoint_events.pop_front().map(EndpointEvent)
405    }
406
407    /// Returns the next time at which `handle_timeout` should be called
408    ///
409    /// The value returned may change after:
410    /// - the application performed some I/O on the association
411    /// - a call was made to `handle_transmit`
412    /// - a call to `poll_transmit` returned `Some`
413    /// - a call was made to `handle_timeout`
414    #[must_use]
415    pub fn poll_timeout(&mut self) -> Option<Instant> {
416        self.timers.next_timeout()
417    }
418
419    /// Returns packets to transmit
420    ///
421    /// Associations should be polled for transmit after:
422    /// - the application performed some I/O on the Association
423    /// - a call was made to `handle_event`
424    /// - a call was made to `handle_timeout`
425    #[must_use]
426    pub fn poll_transmit(&mut self, now: Instant) -> Option<Transmit> {
427        let (contents, _) = self.gather_outbound(now);
428        if contents.is_empty() {
429            None
430        } else {
431            trace!(
432                "[{}] sending {} bytes (total {} datagrams)",
433                self.side,
434                contents.iter().fold(0, |l, c| l + c.len()),
435                contents.len()
436            );
437            Some(Transmit {
438                now,
439                remote: self.remote_addr,
440                payload: Payload::RawEncode(contents),
441                ecn: None,
442                local_ip: self.local_ip,
443            })
444        }
445    }
446
447    /// Process timer expirations
448    ///
449    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
450    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
451    /// methods.
452    ///
453    /// It is most efficient to call this immediately after the system clock reaches the latest
454    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
455    /// no-op and therefore are safe.
456    pub fn handle_timeout(&mut self, now: Instant) {
457        for &timer in &Timer::VALUES {
458            let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
459            if !expired {
460                continue;
461            }
462            self.timers.set(timer, None);
463            //trace!("{:?} timeout", timer);
464
465            if timer == Timer::Ack {
466                self.on_ack_timeout();
467            } else if failure {
468                self.on_retransmission_failure(timer);
469            } else {
470                self.on_retransmission_timeout(timer, n_rtos);
471                self.timers.start(timer, now, self.rto_mgr.get_rto());
472            }
473        }
474    }
475
476    /// Process `AssociationEvent`s generated by the associated `Endpoint`
477    ///
478    /// Will execute protocol logic upon receipt of an association event, in turn preparing signals
479    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
480    /// extracted through the relevant methods.
481    pub fn handle_event(&mut self, event: AssociationEvent) {
482        match event.0 {
483            AssociationEventInner::Datagram(transmit) => {
484                // If this packet could initiate a migration and we're a client or a server that
485                // forbids migration, drop the datagram. This could be relaxed to heuristically
486                // permit NAT-rebinding-like migration.
487                /*TODO:if remote != self.remote && self.server_config.as_ref().map_or(true, |x| !x.migration)
488                {
489                    trace!("discarding packet from unrecognized peer {}", remote);
490                    return;
491                }*/
492
493                if let Payload::PartialDecode(partial_decode) = transmit.payload {
494                    trace!(
495                        "[{}] receiving {} bytes",
496                        self.side,
497                        COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
498                    );
499
500                    let pkt = match partial_decode.finish() {
501                        Ok(p) => p,
502                        Err(err) => {
503                            warn!("[{}] unable to parse SCTP packet {}", self.side, err);
504                            return;
505                        }
506                    };
507
508                    if let Err(err) = self.handle_inbound(pkt, transmit.now) {
509                        error!("handle_inbound got err: {}", err);
510                        let _ = self.close();
511                    }
512                } else {
513                    trace!("discarding invalid partial_decode");
514                }
515            } //TODO:
516        }
517    }
518
519    /// Returns Association statistics
520    pub fn stats(&self) -> AssociationStats {
521        self.stats
522    }
523
524    /// Whether the Association is in the process of being established
525    ///
526    /// If this returns `false`, the Association may be either established or closed, signaled by the
527    /// emission of a `Connected` or `AssociationLost` message respectively.
528    pub fn is_handshaking(&self) -> bool {
529        !self.handshake_completed
530    }
531
532    /// Whether the Association is closed
533    ///
534    /// Closed Associations cannot transport any further data. An association becomes closed when
535    /// either peer application intentionally closes it, or when either transport layer detects an
536    /// error such as a time-out or certificate validation failure.
537    ///
538    /// A `AssociationLost` event is emitted with details when the association becomes closed.
539    pub fn is_closed(&self) -> bool {
540        self.state == AssociationState::Closed
541    }
542
543    /// Whether there is no longer any need to keep the association around
544    ///
545    /// Closed associations become drained after a brief timeout to absorb any remaining in-flight
546    /// packets from the peer. All drained associations have been closed.
547    pub fn is_drained(&self) -> bool {
548        self.state.is_drained()
549    }
550
551    /// Look up whether we're the client or server of this Association
552    pub fn side(&self) -> Side {
553        self.side
554    }
555
556    /// The latest socket address for this Association's peer
557    pub fn remote_addr(&self) -> SocketAddr {
558        self.remote_addr
559    }
560
561    /// Current best estimate of this Association's latency (round-trip-time)
562    pub fn rtt(&self) -> Duration {
563        Duration::from_millis(self.rto_mgr.get_rto())
564    }
565
566    /// The local IP address which was used when the peer established
567    /// the association
568    ///
569    /// This can be different from the address the endpoint is bound to, in case
570    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
571    ///
572    /// This will return `None` for clients.
573    ///
574    /// Retrieving the local IP address is currently supported on the following
575    /// platforms:
576    /// - Linux
577    ///
578    /// On all non-supported platforms the local IP address will not be available,
579    /// and the method will return `None`.
580    pub fn local_ip(&self) -> Option<IpAddr> {
581        self.local_ip
582    }
583
584    /// Shutdown initiates the shutdown sequence. The method blocks until the
585    /// shutdown sequence is completed and the association is closed, or until the
586    /// passed context is done, in which case the context's error is returned.
587    pub fn shutdown(&mut self) -> Result<()> {
588        debug!("[{}] closing association..", self.side);
589
590        let state = self.state();
591        if state != AssociationState::Established {
592            return Err(Error::ErrShutdownNonEstablished);
593        }
594
595        // Attempt a graceful shutdown.
596        self.set_state(AssociationState::ShutdownPending);
597
598        if self.inflight_queue_length == 0 {
599            // No more outstanding, send shutdown.
600            self.will_send_shutdown = true;
601            self.awake_write_loop();
602            self.set_state(AssociationState::ShutdownSent);
603        }
604
605        self.endpoint_events.push_back(EndpointEventInner::Drained);
606
607        Ok(())
608    }
609
610    /// Close ends the SCTP Association and cleans up any state
611    pub fn close(&mut self) -> Result<()> {
612        if self.state() != AssociationState::Closed {
613            self.set_state(AssociationState::Closed);
614
615            debug!("[{}] closing association..", self.side);
616
617            self.close_all_timers();
618
619            for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
620                self.unregister_stream(si);
621            }
622
623            debug!("[{}] association closed", self.side);
624            debug!(
625                "[{}] stats nDATAs (in) : {}",
626                self.side,
627                self.stats.get_num_datas()
628            );
629            debug!(
630                "[{}] stats nSACKs (in) : {}",
631                self.side,
632                self.stats.get_num_sacks()
633            );
634            debug!(
635                "[{}] stats nT3Timeouts : {}",
636                self.side,
637                self.stats.get_num_t3timeouts()
638            );
639            debug!(
640                "[{}] stats nAckTimeouts: {}",
641                self.side,
642                self.stats.get_num_ack_timeouts()
643            );
644            debug!(
645                "[{}] stats nFastRetrans: {}",
646                self.side,
647                self.stats.get_num_fast_retrans()
648            );
649        }
650
651        Ok(())
652    }
653
654    /// open_stream opens a stream
655    pub fn open_stream(
656        &mut self,
657        stream_identifier: StreamId,
658        default_payload_type: PayloadProtocolIdentifier,
659    ) -> Result<Stream<'_>> {
660        if self.streams.contains_key(&stream_identifier) {
661            return Err(Error::ErrStreamAlreadyExist);
662        }
663
664        if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
665            Ok(s)
666        } else {
667            Err(Error::ErrStreamCreateFailed)
668        }
669    }
670
671    /// accept_stream accepts a stream
672    pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
673        self.stream_queue
674            .pop_front()
675            .map(move |stream_identifier| Stream {
676                stream_identifier,
677                association: self,
678            })
679    }
680
681    /// stream returns a stream
682    pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
683        if !self.streams.contains_key(&stream_identifier) {
684            Err(Error::ErrStreamNotExisted)
685        } else {
686            Ok(Stream {
687                stream_identifier,
688                association: self,
689            })
690        }
691    }
692
693    /// bytes_sent returns the number of bytes sent
694    pub(crate) fn bytes_sent(&self) -> usize {
695        self.bytes_sent
696    }
697
698    /// bytes_received returns the number of bytes received
699    pub(crate) fn bytes_received(&self) -> usize {
700        self.bytes_received
701    }
702
703    /// max_message_size returns the maximum message size you can send.
704    pub(crate) fn max_message_size(&self) -> u32 {
705        self.max_message_size
706    }
707
708    /// set_max_message_size sets the maximum message size you can send.
709    pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
710        self.max_message_size = max_message_size;
711    }
712
713    /// unregister_stream un-registers a stream from the association
714    /// The caller should hold the association write lock.
715    fn unregister_stream(&mut self, stream_identifier: StreamId) {
716        if let Some(mut s) = self.streams.remove(&stream_identifier) {
717            debug!("[{}] unregister_stream {}", self.side, stream_identifier);
718            s.state = RecvSendState::Closed;
719        }
720    }
721
722    /// set_state atomically sets the state of the Association.
723    fn set_state(&mut self, new_state: AssociationState) {
724        if new_state != self.state {
725            debug!(
726                "[{}] state change: '{}' => '{}'",
727                self.side, self.state, new_state,
728            );
729        }
730        self.state = new_state;
731    }
732
733    /// state atomically returns the state of the Association.
734    pub(crate) fn state(&self) -> AssociationState {
735        self.state
736    }
737
738    /// caller must hold self.lock
739    fn send_init(&mut self) -> Result<()> {
740        if let Some(stored_init) = &self.stored_init {
741            debug!("[{}] sending INIT", self.side);
742
743            self.source_port = 5000; // Spec??
744            self.destination_port = 5000; // Spec??
745
746            let outbound = Packet {
747                common_header: CommonHeader {
748                    source_port: self.source_port,
749                    destination_port: self.destination_port,
750                    verification_tag: self.peer_verification_tag,
751                },
752                chunks: vec![Box::new(stored_init.clone())],
753            };
754
755            self.control_queue.push_back(outbound);
756            self.awake_write_loop();
757
758            Ok(())
759        } else {
760            Err(Error::ErrInitNotStoredToSend)
761        }
762    }
763
764    /// caller must hold self.lock
765    fn send_cookie_echo(&mut self) -> Result<()> {
766        if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
767            debug!("[{}] sending COOKIE-ECHO", self.side);
768
769            let outbound = Packet {
770                common_header: CommonHeader {
771                    source_port: self.source_port,
772                    destination_port: self.destination_port,
773                    verification_tag: self.peer_verification_tag,
774                },
775                chunks: vec![Box::new(stored_cookie_echo.clone())],
776            };
777
778            self.control_queue.push_back(outbound);
779            self.awake_write_loop();
780
781            Ok(())
782        } else {
783            Err(Error::ErrCookieEchoNotStoredToSend)
784        }
785    }
786
787    /// handle_inbound parses incoming raw packets
788    fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
789        if let Err(err) = p.check_packet() {
790            warn!("[{}] failed validating packet {}", self.side, err);
791            return Ok(());
792        }
793
794        self.handle_chunk_start();
795
796        for c in &p.chunks {
797            self.handle_chunk(&p, c, now)?;
798        }
799
800        self.handle_chunk_end(now);
801
802        Ok(())
803    }
804
805    fn handle_chunk_start(&mut self) {
806        self.delayed_ack_triggered = false;
807        self.immediate_ack_triggered = false;
808    }
809
810    fn handle_chunk_end(&mut self, now: Instant) {
811        if self.immediate_ack_triggered {
812            self.ack_state = AckState::Immediate;
813            self.timers.stop(Timer::Ack);
814            self.awake_write_loop();
815        } else if self.delayed_ack_triggered {
816            // Will send delayed ack in the next ack timeout
817            self.ack_state = AckState::Delay;
818            self.timers.start(Timer::Ack, now, ACK_INTERVAL);
819        }
820    }
821
822    #[allow(clippy::borrowed_box)]
823    fn handle_chunk(
824        &mut self,
825        p: &Packet,
826        chunk: &Box<dyn Chunk + Send + Sync>,
827        now: Instant,
828    ) -> Result<()> {
829        chunk.check()?;
830        let chunk_any = chunk.as_any();
831        let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
832            if c.is_ack {
833                self.handle_init_ack(p, c, now)?
834            } else {
835                self.handle_init(p, c)?
836            }
837        } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
838            let mut err_str = String::new();
839            for e in &c.error_causes {
840                if matches!(e.code, USER_INITIATED_ABORT) {
841                    debug!("User initiated abort received");
842                    let _ = self.close();
843                    return Ok(());
844                }
845                err_str += &format!("({})", e);
846            }
847            return Err(Error::ErrAbortChunk(err_str));
848        } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
849            let mut err_str = String::new();
850            for e in &c.error_causes {
851                err_str += &format!("({})", e);
852            }
853            return Err(Error::ErrAbortChunk(err_str));
854        } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
855            self.handle_heartbeat(c)?
856        } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
857            self.handle_cookie_echo(c)?
858        } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
859            self.handle_cookie_ack()?
860        } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
861            self.handle_data(c)?
862        } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
863            self.handle_sack(c, now)?
864        } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
865            self.handle_reconfig(c)?
866        } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
867            self.handle_forward_tsn(c)?
868        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
869            self.handle_shutdown(c)?
870        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
871            self.handle_shutdown_ack(c)?
872        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
873            self.handle_shutdown_complete(c)?
874        } else {
875            return Err(Error::ErrChunkTypeUnhandled);
876        };
877
878        if !packets.is_empty() {
879            let mut buf: VecDeque<_> = packets.into_iter().collect();
880            self.control_queue.append(&mut buf);
881            self.awake_write_loop();
882        }
883
884        Ok(())
885    }
886
887    fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
888        let state = self.state();
889        debug!("[{}] chunkInit received in state '{}'", self.side, state);
890
891        // https://tools.ietf.org/html/rfc4960#section-5.2.1
892        // Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
893        // respond with an INIT ACK using the same parameters it sent in its
894        // original INIT chunk (including its Initiate Tag, unchanged).  When
895        // responding, the endpoint MUST send the INIT ACK back to the same
896        // address that the original INIT (sent by this endpoint) was sent.
897
898        if state != AssociationState::Closed
899            && state != AssociationState::CookieWait
900            && state != AssociationState::CookieEchoed
901        {
902            // 5.2.2.  Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
903            //        COOKIE-WAIT, and SHUTDOWN-ACK-SENT
904            return Err(Error::ErrHandleInitState);
905        }
906
907        // Should we be setting any of these permanently until we've ACKed further?
908        self.my_max_num_inbound_streams =
909            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
910        self.my_max_num_outbound_streams =
911            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
912        self.peer_verification_tag = i.initiate_tag;
913        self.source_port = p.common_header.destination_port;
914        self.destination_port = p.common_header.source_port;
915
916        // 13.2 This is the last TSN received in sequence.  This value
917        // is set initially by taking the peer's initial TSN,
918        // received in the INIT or INIT ACK chunk, and
919        // subtracting one from it.
920        self.peer_last_tsn = if i.initial_tsn == 0 {
921            u32::MAX
922        } else {
923            i.initial_tsn - 1
924        };
925
926        for param in &i.params {
927            if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
928                for t in &v.chunk_types {
929                    if *t == CT_FORWARD_TSN {
930                        debug!("[{}] use ForwardTSN (on init)", self.side);
931                        self.use_forward_tsn = true;
932                    }
933                }
934            }
935        }
936        if !self.use_forward_tsn {
937            warn!("[{}] not using ForwardTSN (on init)", self.side);
938        }
939
940        let mut outbound = Packet {
941            common_header: CommonHeader {
942                verification_tag: self.peer_verification_tag,
943                source_port: self.source_port,
944                destination_port: self.destination_port,
945            },
946            chunks: vec![],
947        };
948
949        let mut init_ack = ChunkInit {
950            is_ack: true,
951            initial_tsn: self.my_next_tsn,
952            num_outbound_streams: self.my_max_num_outbound_streams,
953            num_inbound_streams: self.my_max_num_inbound_streams,
954            initiate_tag: self.my_verification_tag,
955            advertised_receiver_window_credit: self.max_receive_buffer_size,
956            ..Default::default()
957        };
958
959        if self.my_cookie.is_none() {
960            self.my_cookie = Some(ParamStateCookie::new());
961        }
962
963        if let Some(my_cookie) = &self.my_cookie {
964            init_ack.params = vec![Box::new(my_cookie.clone())];
965        }
966
967        init_ack.set_supported_extensions();
968
969        outbound.chunks = vec![Box::new(init_ack)];
970
971        Ok(vec![outbound])
972    }
973
974    fn handle_init_ack(
975        &mut self,
976        p: &Packet,
977        i: &ChunkInitAck,
978        now: Instant,
979    ) -> Result<Vec<Packet>> {
980        let state = self.state();
981        debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
982        if state != AssociationState::CookieWait {
983            // RFC 4960
984            // 5.2.3.  Unexpected INIT ACK
985            //   If an INIT ACK is received by an endpoint in any state other than the
986            //   COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
987            //   An unexpected INIT ACK usually indicates the processing of an old or
988            //   duplicated INIT chunk.
989            return Ok(vec![]);
990        }
991
992        self.my_max_num_inbound_streams =
993            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
994        self.my_max_num_outbound_streams =
995            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
996        self.peer_verification_tag = i.initiate_tag;
997        self.peer_last_tsn = if i.initial_tsn == 0 {
998            u32::MAX
999        } else {
1000            i.initial_tsn - 1
1001        };
1002        if self.source_port != p.common_header.destination_port
1003            || self.destination_port != p.common_header.source_port
1004        {
1005            warn!("[{}] handle_init_ack: port mismatch", self.side);
1006            return Ok(vec![]);
1007        }
1008
1009        self.rwnd = i.advertised_receiver_window_credit;
1010        debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1011
1012        // RFC 4690 Sec 7.2.1
1013        //  o  The initial value of ssthresh MAY be arbitrarily high (for
1014        //     example, implementations MAY use the size of the receiver
1015        //     advertised window).
1016        self.ssthresh = self.rwnd;
1017        trace!(
1018            "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1019            self.side,
1020            self.cwnd,
1021            self.ssthresh,
1022            self.inflight_queue.get_num_bytes()
1023        );
1024
1025        self.timers.stop(Timer::T1Init);
1026        self.stored_init = None;
1027
1028        let mut cookie_param = None;
1029        for param in &i.params {
1030            if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1031                cookie_param = Some(v);
1032            } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1033                for t in &v.chunk_types {
1034                    if *t == CT_FORWARD_TSN {
1035                        debug!("[{}] use ForwardTSN (on initAck)", self.side);
1036                        self.use_forward_tsn = true;
1037                    }
1038                }
1039            }
1040        }
1041        if !self.use_forward_tsn {
1042            warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1043        }
1044
1045        if let Some(v) = cookie_param {
1046            self.stored_cookie_echo = Some(ChunkCookieEcho {
1047                cookie: v.cookie.clone(),
1048            });
1049
1050            self.send_cookie_echo()?;
1051
1052            self.timers
1053                .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1054
1055            self.set_state(AssociationState::CookieEchoed);
1056
1057            Ok(vec![])
1058        } else {
1059            Err(Error::ErrInitAckNoCookie)
1060        }
1061    }
1062
1063    fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1064        trace!("[{}] chunkHeartbeat", self.side);
1065        if let Some(p) = c.params.first() {
1066            if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1067                return Ok(vec![Packet {
1068                    common_header: CommonHeader {
1069                        verification_tag: self.peer_verification_tag,
1070                        source_port: self.source_port,
1071                        destination_port: self.destination_port,
1072                    },
1073                    chunks: vec![Box::new(ChunkHeartbeatAck {
1074                        params: vec![Box::new(ParamHeartbeatInfo {
1075                            heartbeat_information: hbi.heartbeat_information.clone(),
1076                        })],
1077                    })],
1078                }]);
1079            } else {
1080                warn!(
1081                    "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1082                    self.side,
1083                );
1084            }
1085        }
1086
1087        Ok(vec![])
1088    }
1089
1090    fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1091        let state = self.state();
1092        debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1093
1094        if let Some(my_cookie) = &self.my_cookie {
1095            match state {
1096                AssociationState::Established => {
1097                    if my_cookie.cookie != c.cookie {
1098                        return Ok(vec![]);
1099                    }
1100                }
1101                AssociationState::Closed
1102                | AssociationState::CookieWait
1103                | AssociationState::CookieEchoed => {
1104                    if my_cookie.cookie != c.cookie {
1105                        return Ok(vec![]);
1106                    }
1107
1108                    self.timers.stop(Timer::T1Init);
1109                    self.stored_init = None;
1110
1111                    self.timers.stop(Timer::T1Cookie);
1112                    self.stored_cookie_echo = None;
1113
1114                    self.events.push_back(Event::Connected);
1115                    self.set_state(AssociationState::Established);
1116                    self.handshake_completed = true;
1117                }
1118                _ => return Ok(vec![]),
1119            };
1120        } else {
1121            debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1122            return Ok(vec![]);
1123        }
1124
1125        Ok(vec![Packet {
1126            common_header: CommonHeader {
1127                verification_tag: self.peer_verification_tag,
1128                source_port: self.source_port,
1129                destination_port: self.destination_port,
1130            },
1131            chunks: vec![Box::new(ChunkCookieAck {})],
1132        }])
1133    }
1134
1135    fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1136        let state = self.state();
1137        debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1138        if state != AssociationState::CookieEchoed {
1139            // RFC 4960
1140            // 5.2.5.  Handle Duplicate COOKIE-ACK.
1141            //   At any state other than COOKIE-ECHOED, an endpoint should silently
1142            //   discard a received COOKIE ACK chunk.
1143            return Ok(vec![]);
1144        }
1145
1146        self.timers.stop(Timer::T1Cookie);
1147        self.stored_cookie_echo = None;
1148
1149        self.events.push_back(Event::Connected);
1150        self.set_state(AssociationState::Established);
1151        self.handshake_completed = true;
1152
1153        Ok(vec![])
1154    }
1155
1156    fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1157        trace!(
1158            "[{}] DATA: tsn={} immediateSack={} len={}",
1159            self.side,
1160            d.tsn,
1161            d.immediate_sack,
1162            d.user_data.len()
1163        );
1164        self.stats.inc_datas();
1165
1166        let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1167        let mut stream_handle_data = false;
1168        if can_push {
1169            if self.get_or_create_stream(d.stream_identifier).is_some() {
1170                if self.get_my_receiver_window_credit() > 0 {
1171                    // Pass the new chunk to stream level as soon as it arrives
1172                    self.payload_queue.push(d.clone(), self.peer_last_tsn);
1173                    stream_handle_data = true;
1174                } else {
1175                    // Receive buffer is full
1176                    if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1177                        if sna32lt(d.tsn, *last_tsn) {
1178                            debug!("[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}", self.side, d.tsn, d.stream_sequence_number);
1179                            self.payload_queue.push(d.clone(), self.peer_last_tsn);
1180                            stream_handle_data = true; //s.handle_data(d.clone());
1181                        }
1182                    } else {
1183                        debug!(
1184                            "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1185                            self.side, d.tsn, d.stream_sequence_number
1186                        );
1187                    }
1188                }
1189            } else {
1190                // silently discard the data. (sender will retry on T3-rtx timeout)
1191                // see pion/sctp#30
1192                debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1193                return Ok(vec![]);
1194            }
1195        }
1196
1197        let immediate_sack = d.immediate_sack;
1198
1199        if stream_handle_data {
1200            if let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1201                self.events.push_back(Event::DatagramReceived);
1202                s.handle_data(d);
1203                if s.reassembly_queue.is_readable() {
1204                    self.events.push_back(Event::Stream(StreamEvent::Readable {
1205                        id: d.stream_identifier,
1206                    }))
1207                }
1208            }
1209        }
1210
1211        self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1212    }
1213
1214    fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1215        trace!(
1216            "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1217            self.side,
1218            self.cumulative_tsn_ack_point,
1219            d.cumulative_tsn_ack,
1220            d.advertised_receiver_window_credit
1221        );
1222        let state = self.state();
1223        if state != AssociationState::Established
1224            && state != AssociationState::ShutdownPending
1225            && state != AssociationState::ShutdownReceived
1226        {
1227            return Ok(vec![]);
1228        }
1229
1230        self.stats.inc_sacks();
1231
1232        if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1233            // RFC 4960 sec 6.2.1.  Processing a Received SACK
1234            // D)
1235            //   i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
1236            //      Point, then drop the SACK.  Since Cumulative TSN Ack is
1237            //      monotonically increasing, a SACK whose Cumulative TSN Ack is
1238            //      less than the Cumulative TSN Ack Point indicates an out-of-
1239            //      order SACK.
1240
1241            debug!(
1242                "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1243                self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1244            );
1245
1246            return Ok(vec![]);
1247        }
1248
1249        // Process selective ack
1250        let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1251
1252        let mut total_bytes_acked = 0;
1253        for n_bytes_acked in bytes_acked_per_stream.values() {
1254            total_bytes_acked += *n_bytes_acked;
1255        }
1256
1257        let mut cum_tsn_ack_point_advanced = false;
1258        if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1259            trace!(
1260                "[{}] SACK: cumTSN advanced: {} -> {}",
1261                self.side,
1262                self.cumulative_tsn_ack_point,
1263                d.cumulative_tsn_ack
1264            );
1265
1266            self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1267            cum_tsn_ack_point_advanced = true;
1268            self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1269        }
1270
1271        for (si, n_bytes_acked) in &bytes_acked_per_stream {
1272            if let Some(s) = self.streams.get_mut(si) {
1273                if s.on_buffer_released(*n_bytes_acked) {
1274                    self.events
1275                        .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1276                }
1277            }
1278        }
1279
1280        // New rwnd value
1281        // RFC 4960 sec 6.2.1.  Processing a Received SACK
1282        // D)
1283        //   ii) Set rwnd equal to the newly received a_rwnd minus the number
1284        //       of bytes still outstanding after processing the Cumulative
1285        //       TSN Ack and the Gap Ack Blocks.
1286
1287        // bytes acked were already subtracted by markAsAcked() method
1288        let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1289        if bytes_outstanding >= d.advertised_receiver_window_credit {
1290            self.rwnd = 0;
1291        } else {
1292            self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1293        }
1294
1295        self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1296
1297        if self.use_forward_tsn {
1298            // RFC 3758 Sec 3.5 C1
1299            if sna32lt(
1300                self.advanced_peer_tsn_ack_point,
1301                self.cumulative_tsn_ack_point,
1302            ) {
1303                self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1304            }
1305
1306            // RFC 3758 Sec 3.5 C2
1307            let mut i = self.advanced_peer_tsn_ack_point + 1;
1308            while let Some(c) = self.inflight_queue.get(i) {
1309                if !c.abandoned() {
1310                    break;
1311                }
1312                self.advanced_peer_tsn_ack_point = i;
1313                i += 1;
1314            }
1315
1316            // RFC 3758 Sec 3.5 C3
1317            if sna32gt(
1318                self.advanced_peer_tsn_ack_point,
1319                self.cumulative_tsn_ack_point,
1320            ) {
1321                self.will_send_forward_tsn = true;
1322                debug!(
1323                    "[{}] handleSack {}: sna32GT({}, {})",
1324                    self.side,
1325                    self.will_send_forward_tsn,
1326                    self.advanced_peer_tsn_ack_point,
1327                    self.cumulative_tsn_ack_point
1328                );
1329            }
1330            self.awake_write_loop();
1331        }
1332
1333        self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1334
1335        Ok(vec![])
1336    }
1337
1338    fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1339        trace!("[{}] handle_reconfig", self.side);
1340
1341        let mut pp = vec![];
1342
1343        if let Some(param_a) = &c.param_a {
1344            self.handle_reconfig_param(param_a, &mut pp)?;
1345        }
1346
1347        if let Some(param_b) = &c.param_b {
1348            self.handle_reconfig_param(param_b, &mut pp)?;
1349        }
1350
1351        Ok(pp)
1352    }
1353
1354    fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1355        trace!("[{}] FwdTSN: {}", self.side, c);
1356
1357        if !self.use_forward_tsn {
1358            warn!("[{}] received FwdTSN but not enabled", self.side);
1359            // Return an error chunk
1360            let cerr = ChunkError {
1361                error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1362            };
1363
1364            let outbound = Packet {
1365                common_header: CommonHeader {
1366                    verification_tag: self.peer_verification_tag,
1367                    source_port: self.source_port,
1368                    destination_port: self.destination_port,
1369                },
1370                chunks: vec![Box::new(cerr)],
1371            };
1372            return Ok(vec![outbound]);
1373        }
1374
1375        // From RFC 3758 Sec 3.6:
1376        //   Note, if the "New Cumulative TSN" value carried in the arrived
1377        //   FORWARD TSN chunk is found to be behind or at the current cumulative
1378        //   TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
1379        //   date and MUST NOT update its Cumulative TSN.  The receiver SHOULD
1380        //   send a SACK to its peer (the sender of the FORWARD TSN) since such a
1381        //   duplicate may indicate the previous SACK was lost in the network.
1382
1383        trace!(
1384            "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1385            self.side,
1386            c.new_cumulative_tsn,
1387            self.peer_last_tsn
1388        );
1389        if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1390            trace!("[{}] sending ack on Forward TSN", self.side);
1391            self.ack_state = AckState::Immediate;
1392            self.timers.stop(Timer::Ack);
1393            self.awake_write_loop();
1394            return Ok(vec![]);
1395        }
1396
1397        // From RFC 3758 Sec 3.6:
1398        //   the receiver MUST perform the same TSN handling, including duplicate
1399        //   detection, gap detection, SACK generation, cumulative TSN
1400        //   advancement, etc. as defined in RFC 2960 [2]---with the following
1401        //   exceptions and additions.
1402
1403        //   When a FORWARD TSN chunk arrives, the data receiver MUST first update
1404        //   its cumulative TSN point to the value carried in the FORWARD TSN
1405        //   chunk,
1406
1407        // Advance peer_last_tsn
1408        while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1409            self.payload_queue.pop(self.peer_last_tsn + 1); // may not exist
1410            self.peer_last_tsn += 1;
1411        }
1412
1413        // Report new peer_last_tsn value and abandoned largest SSN value to
1414        // corresponding streams so that the abandoned chunks can be removed
1415        // from the reassemblyQueue.
1416        for forwarded in &c.streams {
1417            if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1418                s.handle_forward_tsn_for_ordered(forwarded.sequence);
1419            }
1420        }
1421
1422        // TSN may be forewared for unordered chunks. ForwardTSN chunk does not
1423        // report which stream identifier it skipped for unordered chunks.
1424        // Therefore, we need to broadcast this event to all existing streams for
1425        // unordered chunks.
1426        // See https://github.com/pion/sctp/issues/106
1427        for s in self.streams.values_mut() {
1428            s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1429        }
1430
1431        self.handle_peer_last_tsn_and_acknowledgement(false)
1432    }
1433
1434    fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1435        let state = self.state();
1436
1437        if state == AssociationState::Established {
1438            if !self.inflight_queue.is_empty() {
1439                self.set_state(AssociationState::ShutdownReceived);
1440            } else {
1441                // No more outstanding, send shutdown ack.
1442                self.will_send_shutdown_ack = true;
1443                self.set_state(AssociationState::ShutdownAckSent);
1444
1445                self.awake_write_loop();
1446            }
1447        } else if state == AssociationState::ShutdownSent {
1448            // self.cumulative_tsn_ack_point = c.cumulative_tsn_ack
1449
1450            self.will_send_shutdown_ack = true;
1451            self.set_state(AssociationState::ShutdownAckSent);
1452
1453            self.awake_write_loop();
1454        }
1455
1456        Ok(vec![])
1457    }
1458
1459    fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1460        let state = self.state();
1461        if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1462            self.timers.stop(Timer::T2Shutdown);
1463            self.will_send_shutdown_complete = true;
1464
1465            self.awake_write_loop();
1466        }
1467
1468        Ok(vec![])
1469    }
1470
1471    fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1472        let state = self.state();
1473        if state == AssociationState::ShutdownAckSent {
1474            self.timers.stop(Timer::T2Shutdown);
1475            self.close()?;
1476        }
1477
1478        Ok(vec![])
1479    }
1480
1481    /// A common routine for handle_data and handle_forward_tsn routines
1482    fn handle_peer_last_tsn_and_acknowledgement(
1483        &mut self,
1484        sack_immediately: bool,
1485    ) -> Result<Vec<Packet>> {
1486        let mut reply = vec![];
1487
1488        // Try to advance peer_last_tsn
1489
1490        // From RFC 3758 Sec 3.6:
1491        //   .. and then MUST further advance its cumulative TSN point locally
1492        //   if possible
1493        // Meaning, if peer_last_tsn+1 points to a chunk that is received,
1494        // advance peer_last_tsn until peer_last_tsn+1 points to unreceived chunk.
1495        //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1496        while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1497            self.peer_last_tsn += 1;
1498            //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1499
1500            let rst_reqs: Vec<ParamOutgoingResetRequest> =
1501                self.reconfig_requests.values().cloned().collect();
1502            for rst_req in rst_reqs {
1503                self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1504            }
1505        }
1506
1507        let has_packet_loss = !self.payload_queue.is_empty();
1508        if has_packet_loss {
1509            trace!(
1510                "[{}] packetloss: {}",
1511                self.side,
1512                self.payload_queue
1513                    .get_gap_ack_blocks_string(self.peer_last_tsn)
1514            );
1515        }
1516
1517        if (self.ack_state != AckState::Immediate
1518            && !sack_immediately
1519            && !has_packet_loss
1520            && self.ack_mode == AckMode::Normal)
1521            || self.ack_mode == AckMode::AlwaysDelay
1522        {
1523            if self.ack_state == AckState::Idle {
1524                self.delayed_ack_triggered = true;
1525            } else {
1526                self.immediate_ack_triggered = true;
1527            }
1528        } else {
1529            self.immediate_ack_triggered = true;
1530        }
1531
1532        Ok(reply)
1533    }
1534
1535    #[allow(clippy::borrowed_box)]
1536    fn handle_reconfig_param(
1537        &mut self,
1538        raw: &Box<dyn Param + Send + Sync>,
1539        reply: &mut Vec<Packet>,
1540    ) -> Result<()> {
1541        if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1542            self.reconfig_requests
1543                .insert(p.reconfig_request_sequence_number, p.clone());
1544            self.reset_streams_if_any(p, true, reply)?;
1545            Ok(())
1546        } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1547            self.reconfigs.remove(&p.reconfig_response_sequence_number);
1548            if self.reconfigs.is_empty() {
1549                self.timers.stop(Timer::Reconfig);
1550            }
1551            Ok(())
1552        } else {
1553            Err(Error::ErrParameterType)
1554        }
1555    }
1556
1557    fn process_selective_ack(
1558        &mut self,
1559        d: &ChunkSelectiveAck,
1560        now: Instant,
1561    ) -> Result<(HashMap<u16, i64>, u32)> {
1562        let mut bytes_acked_per_stream = HashMap::new();
1563
1564        // New ack point, so pop all ACKed packets from inflight_queue
1565        // We add 1 because the "currentAckPoint" has already been popped from the inflight queue
1566        // For the first SACK we take care of this by setting the ackpoint to cumAck - 1
1567        let mut i = self.cumulative_tsn_ack_point + 1;
1568        //log::debug!("[{}] i={} d={}", self.name, i, d.cumulative_tsn_ack);
1569        while sna32lte(i, d.cumulative_tsn_ack) {
1570            if let Some(c) = self.inflight_queue.pop(i) {
1571                if !c.acked {
1572                    // RFC 4096 sec 6.3.2.  Retransmission Timer Rules
1573                    //   R3)  Whenever a SACK is received that acknowledges the DATA chunk
1574                    //        with the earliest outstanding TSN for that address, restart the
1575                    //        T3-rtx timer for that address with its current RTO (if there is
1576                    //        still outstanding data on that address).
1577                    if i == self.cumulative_tsn_ack_point + 1 {
1578                        // T3 timer needs to be reset. Stop it for now.
1579                        self.timers.stop(Timer::T3RTX);
1580                    }
1581
1582                    let n_bytes_acked = c.user_data.len() as i64;
1583
1584                    // Sum the number of bytes acknowledged per stream
1585                    if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1586                        *amount += n_bytes_acked;
1587                    } else {
1588                        bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1589                    }
1590
1591                    // RFC 4960 sec 6.3.1.  RTO Calculation
1592                    //   C4)  When data is in flight and when allowed by rule C5 below, a new
1593                    //        RTT measurement MUST be made each round trip.  Furthermore, new
1594                    //        RTT measurements SHOULD be made no more than once per round trip
1595                    //        for a given destination transport address.
1596                    //   C5)  Karn's algorithm: RTT measurements MUST NOT be made using
1597                    //        packets that were retransmitted (and thus for which it is
1598                    //        ambiguous whether the reply was for the first instance of the
1599                    //        chunk or for a later instance)
1600                    if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1601                        self.min_tsn2measure_rtt = self.my_next_tsn;
1602                        if let Some(since) = &c.since {
1603                            let rtt = now.duration_since(*since);
1604                            let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1605                            trace!(
1606                                "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1607                                self.side,
1608                                rtt.as_millis(),
1609                                srtt,
1610                                self.rto_mgr.get_rto()
1611                            );
1612                        } else {
1613                            error!("[{}] invalid c.since", self.side);
1614                        }
1615                    }
1616                }
1617
1618                if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1619                    debug!("[{}] exit fast-recovery", self.side);
1620                    self.in_fast_recovery = false;
1621                }
1622            } else {
1623                return Err(Error::ErrInflightQueueTsnPop);
1624            }
1625
1626            i += 1;
1627        }
1628
1629        let mut htna = d.cumulative_tsn_ack;
1630
1631        // Mark selectively acknowledged chunks as "acked"
1632        for g in &d.gap_ack_blocks {
1633            for i in g.start..=g.end {
1634                let tsn = d.cumulative_tsn_ack + i as u32;
1635
1636                let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1637                    (true, c.acked)
1638                } else {
1639                    (false, false)
1640                };
1641                let n_bytes_acked = if is_existed && !is_acked {
1642                    self.inflight_queue.mark_as_acked(tsn) as i64
1643                } else {
1644                    0
1645                };
1646
1647                if let Some(c) = self.inflight_queue.get(tsn) {
1648                    if !is_acked {
1649                        // Sum the number of bytes acknowledged per stream
1650                        if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1651                            *amount += n_bytes_acked;
1652                        } else {
1653                            bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1654                        }
1655
1656                        trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1657
1658                        if c.nsent == 1 {
1659                            self.min_tsn2measure_rtt = self.my_next_tsn;
1660                            if let Some(since) = &c.since {
1661                                let rtt = now.duration_since(*since);
1662                                let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1663                                trace!(
1664                                    "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1665                                    self.side,
1666                                    rtt.as_millis(),
1667                                    srtt,
1668                                    self.rto_mgr.get_rto()
1669                                );
1670                            } else {
1671                                error!("[{}] invalid c.since", self.side);
1672                            }
1673                        }
1674
1675                        if sna32lt(htna, tsn) {
1676                            htna = tsn;
1677                        }
1678                    }
1679                } else {
1680                    return Err(Error::ErrTsnRequestNotExist);
1681                }
1682            }
1683        }
1684
1685        Ok((bytes_acked_per_stream, htna))
1686    }
1687
1688    fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1689        // RFC 4096, sec 6.3.2.  Retransmission Timer Rules
1690        //   R2)  Whenever all outstanding data sent to an address have been
1691        //        acknowledged, turn off the T3-rtx timer of that address.
1692        if self.inflight_queue.is_empty() {
1693            trace!(
1694                "[{}] SACK: no more packet in-flight (pending={})",
1695                self.side,
1696                self.pending_queue.len()
1697            );
1698            self.timers.stop(Timer::T3RTX);
1699        } else {
1700            trace!("[{}] T3-rtx timer start (pt2)", self.side);
1701            self.timers
1702                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1703        }
1704
1705        // Update congestion control parameters
1706        if self.cwnd <= self.ssthresh {
1707            // RFC 4096, sec 7.2.1.  Slow-Start
1708            //   o  When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
1709            //		use the slow-start algorithm to increase cwnd only if the current
1710            //      congestion window is being fully utilized, an incoming SACK
1711            //      advances the Cumulative TSN Ack Point, and the data sender is not
1712            //      in Fast Recovery.  Only when these three conditions are met can
1713            //      the cwnd be increased; otherwise, the cwnd MUST not be increased.
1714            //		If these conditions are met, then cwnd MUST be increased by, at
1715            //      most, the lesser of 1) the total size of the previously
1716            //      outstanding DATA chunk(s) acknowledged, and 2) the destination's
1717            //      path MTU.
1718            if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1719                self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); // TCP way
1720                                                                                 // self.cwnd += min32(uint32(total_bytes_acked), self.mtu) // SCTP way (slow)
1721                trace!(
1722                    "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1723                    self.side,
1724                    self.cwnd,
1725                    self.ssthresh,
1726                    total_bytes_acked
1727                );
1728            } else {
1729                trace!(
1730                    "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1731                    self.side,
1732                    self.cwnd,
1733                    self.ssthresh,
1734                    total_bytes_acked,
1735                    self.in_fast_recovery,
1736                    self.pending_queue.len()
1737                );
1738            }
1739        } else {
1740            // RFC 4096, sec 7.2.2.  Congestion Avoidance
1741            //   o  Whenever cwnd is greater than ssthresh, upon each SACK arrival
1742            //      that advances the Cumulative TSN Ack Point, increase
1743            //      partial_bytes_acked by the total number of bytes of all new chunks
1744            //      acknowledged in that SACK including chunks acknowledged by the new
1745            //      Cumulative TSN Ack and by Gap Ack Blocks.
1746            self.partial_bytes_acked += total_bytes_acked as u32;
1747
1748            //   o  When partial_bytes_acked is equal to or greater than cwnd and
1749            //      before the arrival of the SACK the sender had cwnd or more bytes
1750            //      of data outstanding (i.e., before arrival of the SACK, flight size
1751            //      was greater than or equal to cwnd), increase cwnd by MTU, and
1752            //      reset partial_bytes_acked to (partial_bytes_acked - cwnd).
1753            if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1754                self.partial_bytes_acked -= self.cwnd;
1755                self.cwnd += self.mtu;
1756                trace!(
1757                    "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1758                    self.side,
1759                    self.cwnd,
1760                    self.ssthresh,
1761                    total_bytes_acked
1762                );
1763            }
1764        }
1765    }
1766
1767    fn process_fast_retransmission(
1768        &mut self,
1769        cum_tsn_ack_point: u32,
1770        htna: u32,
1771        cum_tsn_ack_point_advanced: bool,
1772    ) -> Result<()> {
1773        // HTNA algorithm - RFC 4960 Sec 7.2.4
1774        // Increment missIndicator of each chunks that the SACK reported missing
1775        // when either of the following is met:
1776        // a)  Not in fast-recovery
1777        //     miss indications are incremented only for missing TSNs prior to the
1778        //     highest TSN newly acknowledged in the SACK.
1779        // b)  In fast-recovery AND the Cumulative TSN Ack Point advanced
1780        //     the miss indications are incremented for all TSNs reported missing
1781        //     in the SACK.
1782        if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1783            let max_tsn = if !self.in_fast_recovery {
1784                // a) increment only for missing TSNs prior to the HTNA
1785                htna
1786            } else {
1787                // b) increment for all TSNs reported missing
1788                cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1789            };
1790
1791            let mut tsn = cum_tsn_ack_point + 1;
1792            while sna32lt(tsn, max_tsn) {
1793                if let Some(c) = self.inflight_queue.get_mut(tsn) {
1794                    if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1795                        c.miss_indicator += 1;
1796                        if c.miss_indicator == 3 && !self.in_fast_recovery {
1797                            // 2)  If not in Fast Recovery, adjust the ssthresh and cwnd of the
1798                            //     destination address(es) to which the missing DATA chunks were
1799                            //     last sent, according to the formula described in Section 7.2.3.
1800                            self.in_fast_recovery = true;
1801                            self.fast_recover_exit_point = htna;
1802                            self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1803                            self.cwnd = self.ssthresh;
1804                            self.partial_bytes_acked = 0;
1805                            self.will_retransmit_fast = true;
1806
1807                            trace!(
1808                                "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1809                                self.side,
1810                                self.cwnd,
1811                                self.ssthresh,
1812                                self.inflight_queue.get_num_bytes()
1813                            );
1814                        }
1815                    }
1816                } else {
1817                    return Err(Error::ErrTsnRequestNotExist);
1818                }
1819
1820                tsn += 1;
1821            }
1822        }
1823
1824        if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1825            self.will_retransmit_fast = true;
1826        }
1827
1828        Ok(())
1829    }
1830
1831    /// The caller must hold the lock. This method was only added because the
1832    /// linter was complaining about the "cognitive complexity" of handle_sack.
1833    fn postprocess_sack(
1834        &mut self,
1835        state: AssociationState,
1836        mut should_awake_write_loop: bool,
1837        now: Instant,
1838    ) {
1839        if !self.inflight_queue.is_empty() {
1840            // Start timer. (noop if already started)
1841            trace!("[{}] T3-rtx timer start (pt3)", self.side);
1842            self.timers
1843                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1844        } else if state == AssociationState::ShutdownPending {
1845            // No more outstanding, send shutdown.
1846            should_awake_write_loop = true;
1847            self.will_send_shutdown = true;
1848            self.set_state(AssociationState::ShutdownSent);
1849        } else if state == AssociationState::ShutdownReceived {
1850            // No more outstanding, send shutdown ack.
1851            should_awake_write_loop = true;
1852            self.will_send_shutdown_ack = true;
1853            self.set_state(AssociationState::ShutdownAckSent);
1854        }
1855
1856        if should_awake_write_loop {
1857            self.awake_write_loop();
1858        }
1859    }
1860
1861    fn reset_streams_if_any(
1862        &mut self,
1863        p: &ParamOutgoingResetRequest,
1864        respond: bool,
1865        reply: &mut Vec<Packet>,
1866    ) -> Result<()> {
1867        let mut result = ReconfigResult::SuccessPerformed;
1868        let mut sis_to_reset = vec![];
1869
1870        if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1871            debug!(
1872                "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1873                self.side, p.sender_last_tsn, self.peer_last_tsn
1874            );
1875            for id in &p.stream_identifiers {
1876                if self.streams.contains_key(id) {
1877                    if respond {
1878                        sis_to_reset.push(*id);
1879                    }
1880                    self.unregister_stream(*id);
1881                }
1882            }
1883            self.reconfig_requests
1884                .remove(&p.reconfig_request_sequence_number);
1885        } else {
1886            debug!(
1887                "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1888                self.side, p.sender_last_tsn, self.peer_last_tsn
1889            );
1890            result = ReconfigResult::InProgress;
1891        }
1892
1893        // Answer incoming reset requests with the same reset request, but with
1894        // reconfig_response_sequence_number.
1895        if !sis_to_reset.is_empty() {
1896            let rsn = self.generate_next_rsn();
1897            let tsn = self.my_next_tsn - 1;
1898
1899            let c = ChunkReconfig {
1900                param_a: Some(Box::new(ParamOutgoingResetRequest {
1901                    reconfig_request_sequence_number: rsn,
1902                    reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1903                    sender_last_tsn: tsn,
1904                    stream_identifiers: sis_to_reset,
1905                })),
1906                ..Default::default()
1907            };
1908
1909            self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
1910
1911            let p = self.create_packet(vec![Box::new(c)]);
1912            reply.push(p);
1913        }
1914
1915        let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1916            param_a: Some(Box::new(ParamReconfigResponse {
1917                reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1918                result,
1919            })),
1920            param_b: None,
1921        })]);
1922
1923        debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1924
1925        reply.push(packet);
1926
1927        Ok(())
1928    }
1929
1930    /// create_packet wraps chunks in a packet.
1931    /// The caller should hold the read lock.
1932    pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk + Send + Sync>>) -> Packet {
1933        Packet {
1934            common_header: CommonHeader {
1935                verification_tag: self.peer_verification_tag,
1936                source_port: self.source_port,
1937                destination_port: self.destination_port,
1938            },
1939            chunks,
1940        }
1941    }
1942
1943    /// create_stream creates a stream. The caller should hold the lock and check no stream exists for this id.
1944    fn create_stream(
1945        &mut self,
1946        stream_identifier: StreamId,
1947        accept: bool,
1948        default_payload_type: PayloadProtocolIdentifier,
1949    ) -> Option<Stream<'_>> {
1950        let s = StreamState::new(
1951            self.side,
1952            stream_identifier,
1953            self.max_payload_size,
1954            default_payload_type,
1955        );
1956
1957        if accept {
1958            self.stream_queue.push_back(stream_identifier);
1959            self.events.push_back(Event::Stream(StreamEvent::Opened));
1960        }
1961
1962        self.streams.insert(stream_identifier, s);
1963
1964        Some(Stream {
1965            stream_identifier,
1966            association: self,
1967        })
1968    }
1969
1970    /// get_or_create_stream gets or creates a stream. The caller should hold the lock.
1971    fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1972        if self.streams.contains_key(&stream_identifier) {
1973            Some(Stream {
1974                stream_identifier,
1975                association: self,
1976            })
1977        } else {
1978            self.create_stream(
1979                stream_identifier,
1980                true,
1981                PayloadProtocolIdentifier::default(),
1982            )
1983        }
1984    }
1985
1986    pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
1987        let mut bytes_queued = 0;
1988        for s in self.streams.values() {
1989            bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
1990        }
1991
1992        self.max_receive_buffer_size.saturating_sub(bytes_queued)
1993    }
1994
1995    /// gather_outbound gathers outgoing packets. The returned bool value set to
1996    /// false means the association should be closed down after the final send.
1997    fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
1998        let mut raw_packets = vec![];
1999
2000        if !self.control_queue.is_empty() {
2001            for p in self.control_queue.drain(..) {
2002                if let Ok(raw) = p.marshal() {
2003                    raw_packets.push(raw);
2004                } else {
2005                    warn!("[{}] failed to serialize a control packet", self.side);
2006                    continue;
2007                }
2008            }
2009        }
2010
2011        let state = self.state();
2012        match state {
2013            AssociationState::Established => {
2014                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2015                raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2016                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2017                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2018                raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2019                (raw_packets, true)
2020            }
2021            AssociationState::ShutdownPending
2022            | AssociationState::ShutdownSent
2023            | AssociationState::ShutdownReceived => {
2024                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2025                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2026                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2027                self.gather_outbound_shutdown_packets(raw_packets, now)
2028            }
2029            AssociationState::ShutdownAckSent => {
2030                self.gather_outbound_shutdown_packets(raw_packets, now)
2031            }
2032            _ => (raw_packets, true),
2033        }
2034    }
2035
2036    fn gather_data_packets_to_retransmit(
2037        &mut self,
2038        mut raw_packets: Vec<Bytes>,
2039        now: Instant,
2040    ) -> Vec<Bytes> {
2041        for p in &self.get_data_packets_to_retransmit(now) {
2042            if let Ok(raw) = p.marshal() {
2043                raw_packets.push(raw);
2044            } else {
2045                warn!(
2046                    "[{}] failed to serialize a DATA packet to be retransmitted",
2047                    self.side
2048                );
2049            }
2050        }
2051
2052        raw_packets
2053    }
2054
2055    fn gather_outbound_data_and_reconfig_packets(
2056        &mut self,
2057        mut raw_packets: Vec<Bytes>,
2058        now: Instant,
2059    ) -> Vec<Bytes> {
2060        // Pop unsent data chunks from the pending queue to send as much as
2061        // cwnd and rwnd allow.
2062        let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2063        if !chunks.is_empty() {
2064            // Start timer. (noop if already started)
2065            trace!("[{}] T3-rtx timer start (pt1)", self.side);
2066            self.timers
2067                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2068
2069            for p in &self.bundle_data_chunks_into_packets(chunks) {
2070                if let Ok(raw) = p.marshal() {
2071                    raw_packets.push(raw);
2072                } else {
2073                    warn!("[{}] failed to serialize a DATA packet", self.side);
2074                }
2075            }
2076        }
2077
2078        if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2079            if self.will_retransmit_reconfig {
2080                self.will_retransmit_reconfig = false;
2081                debug!(
2082                    "[{}] retransmit {} RECONFIG chunk(s)",
2083                    self.side,
2084                    self.reconfigs.len()
2085                );
2086                for c in self.reconfigs.values() {
2087                    let p = self.create_packet(vec![Box::new(c.clone())]);
2088                    if let Ok(raw) = p.marshal() {
2089                        raw_packets.push(raw);
2090                    } else {
2091                        warn!(
2092                            "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2093                            self.side,
2094                        );
2095                    }
2096                }
2097            }
2098
2099            if !sis_to_reset.is_empty() {
2100                let rsn = self.generate_next_rsn();
2101                let tsn = self.my_next_tsn - 1;
2102                debug!(
2103                    "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2104                    self.side,
2105                    rsn,
2106                    self.my_next_tsn - 1,
2107                    sis_to_reset
2108                );
2109
2110                let c = ChunkReconfig {
2111                    param_a: Some(Box::new(ParamOutgoingResetRequest {
2112                        reconfig_request_sequence_number: rsn,
2113                        sender_last_tsn: tsn,
2114                        stream_identifiers: sis_to_reset,
2115                        ..Default::default()
2116                    })),
2117                    ..Default::default()
2118                };
2119                self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
2120
2121                let p = self.create_packet(vec![Box::new(c)]);
2122                if let Ok(raw) = p.marshal() {
2123                    raw_packets.push(raw);
2124                } else {
2125                    warn!(
2126                        "[{}] failed to serialize a RECONFIG packet to be transmitted",
2127                        self.side
2128                    );
2129                }
2130            }
2131
2132            if !self.reconfigs.is_empty() {
2133                self.timers
2134                    .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2135            }
2136        }
2137
2138        raw_packets
2139    }
2140
2141    fn gather_outbound_fast_retransmission_packets(
2142        &mut self,
2143        mut raw_packets: Vec<Bytes>,
2144        now: Instant,
2145    ) -> Vec<Bytes> {
2146        if self.will_retransmit_fast {
2147            self.will_retransmit_fast = false;
2148
2149            let mut to_fast_retrans: Vec<Box<dyn Chunk + Send + Sync>> = vec![];
2150            let mut fast_retrans_size = COMMON_HEADER_SIZE;
2151
2152            let mut i = 0;
2153            loop {
2154                let tsn = self.cumulative_tsn_ack_point + i + 1;
2155                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2156                    if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2157                        i += 1;
2158                        continue;
2159                    }
2160
2161                    // RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
2162                    //  3)  Determine how many of the earliest (i.e., lowest TSN) DATA chunks
2163                    //      marked for retransmission will fit into a single packet, subject
2164                    //      to constraint of the path MTU of the destination transport
2165                    //      address to which the packet is being sent.  Call this value K.
2166                    //      Retransmit those K DATA chunks in a single packet.  When a Fast
2167                    //      Retransmit is being performed, the sender SHOULD ignore the value
2168                    //      of cwnd and SHOULD NOT delay retransmission for this single
2169                    //		packet.
2170
2171                    let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2172                    if self.mtu < fast_retrans_size + data_chunk_size {
2173                        break;
2174                    }
2175
2176                    fast_retrans_size += data_chunk_size;
2177                    self.stats.inc_fast_retrans();
2178                    c.nsent += 1;
2179                } else {
2180                    break; // end of pending data
2181                }
2182
2183                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2184                    Association::check_partial_reliability_status(
2185                        c,
2186                        now,
2187                        self.use_forward_tsn,
2188                        self.side,
2189                        &self.streams,
2190                    );
2191                    to_fast_retrans.push(Box::new(c.clone()));
2192                    trace!(
2193                        "[{}] fast-retransmit: tsn={} sent={} htna={}",
2194                        self.side,
2195                        c.tsn,
2196                        c.nsent,
2197                        self.fast_recover_exit_point
2198                    );
2199                }
2200                i += 1;
2201            }
2202
2203            if !to_fast_retrans.is_empty() {
2204                if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2205                    raw_packets.push(raw);
2206                } else {
2207                    warn!(
2208                        "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2209                        self.side
2210                    );
2211                }
2212            }
2213        }
2214
2215        raw_packets
2216    }
2217
2218    fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2219        if self.ack_state == AckState::Immediate {
2220            self.ack_state = AckState::Idle;
2221            let sack = self.create_selective_ack_chunk();
2222            trace!("[{}] sending SACK: {}", self.side, sack);
2223            if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2224                raw_packets.push(raw);
2225            } else {
2226                warn!("[{}] failed to serialize a SACK packet", self.side);
2227            }
2228        }
2229
2230        raw_packets
2231    }
2232
2233    fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2234        /*log::debug!(
2235            "[{}] gatherOutboundForwardTSNPackets {}",
2236            self.name,
2237            self.will_send_forward_tsn
2238        );*/
2239        if self.will_send_forward_tsn {
2240            self.will_send_forward_tsn = false;
2241            if sna32gt(
2242                self.advanced_peer_tsn_ack_point,
2243                self.cumulative_tsn_ack_point,
2244            ) {
2245                let fwd_tsn = self.create_forward_tsn();
2246                if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2247                    raw_packets.push(raw);
2248                } else {
2249                    warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2250                }
2251            }
2252        }
2253
2254        raw_packets
2255    }
2256
2257    fn gather_outbound_shutdown_packets(
2258        &mut self,
2259        mut raw_packets: Vec<Bytes>,
2260        now: Instant,
2261    ) -> (Vec<Bytes>, bool) {
2262        let mut ok = true;
2263
2264        if self.will_send_shutdown {
2265            self.will_send_shutdown = false;
2266
2267            let shutdown = ChunkShutdown {
2268                cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2269            };
2270
2271            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2272                self.timers
2273                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2274                raw_packets.push(raw);
2275            } else {
2276                warn!("[{}] failed to serialize a Shutdown packet", self.side);
2277            }
2278        } else if self.will_send_shutdown_ack {
2279            self.will_send_shutdown_ack = false;
2280
2281            let shutdown_ack = ChunkShutdownAck {};
2282
2283            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2284                self.timers
2285                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2286                raw_packets.push(raw);
2287            } else {
2288                warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2289            }
2290        } else if self.will_send_shutdown_complete {
2291            self.will_send_shutdown_complete = false;
2292
2293            let shutdown_complete = ChunkShutdownComplete {};
2294
2295            if let Ok(raw) = self
2296                .create_packet(vec![Box::new(shutdown_complete)])
2297                .marshal()
2298            {
2299                raw_packets.push(raw);
2300                ok = false;
2301            } else {
2302                warn!(
2303                    "[{}] failed to serialize a ShutdownComplete packet",
2304                    self.side
2305                );
2306            }
2307        }
2308
2309        (raw_packets, ok)
2310    }
2311
2312    /// get_data_packets_to_retransmit is called when T3-rtx is timed out and retransmit outstanding data chunks
2313    /// that are not acked or abandoned yet.
2314    fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2315        let awnd = std::cmp::min(self.cwnd, self.rwnd);
2316        let mut chunks = vec![];
2317        let mut bytes_to_send = 0;
2318        let mut done = false;
2319        let mut i = 0;
2320        while !done {
2321            let tsn = self.cumulative_tsn_ack_point + i + 1;
2322            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2323                if !c.retransmit {
2324                    i += 1;
2325                    continue;
2326                }
2327
2328                if i == 0 && self.rwnd < c.user_data.len() as u32 {
2329                    // Send it as a zero window probe
2330                    done = true;
2331                } else if bytes_to_send + c.user_data.len() > awnd as usize {
2332                    break;
2333                }
2334
2335                // reset the retransmit flag not to retransmit again before the next
2336                // t3-rtx timer fires
2337                c.retransmit = false;
2338                bytes_to_send += c.user_data.len();
2339
2340                c.nsent += 1;
2341            } else {
2342                break; // end of pending data
2343            }
2344
2345            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2346                Association::check_partial_reliability_status(
2347                    c,
2348                    now,
2349                    self.use_forward_tsn,
2350                    self.side,
2351                    &self.streams,
2352                );
2353
2354                trace!(
2355                    "[{}] retransmitting tsn={} ssn={} sent={}",
2356                    self.side,
2357                    c.tsn,
2358                    c.stream_sequence_number,
2359                    c.nsent
2360                );
2361
2362                chunks.push(c.clone());
2363            }
2364            i += 1;
2365        }
2366
2367        self.bundle_data_chunks_into_packets(chunks)
2368    }
2369
2370    /// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
2371    /// the cwnd and rwnd allows to send.
2372    fn pop_pending_data_chunks_to_send(
2373        &mut self,
2374        now: Instant,
2375    ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2376        let mut chunks = vec![];
2377        let mut sis_to_reset = vec![]; // stream identifiers to reset
2378        if !self.pending_queue.is_empty() {
2379            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2380            //   A) At any given time, the data sender MUST NOT transmit new data to
2381            //      any destination transport address if its peer's rwnd indicates
2382            //      that the peer has no buffer space (i.e., rwnd is 0; see Section
2383            //      6.2.1).  However, regardless of the value of rwnd (including if it
2384            //      is 0), the data sender can always have one DATA chunk in flight to
2385            //      the receiver if allowed by cwnd (see rule B, below).
2386
2387            while let Some(c) = self.pending_queue.peek() {
2388                let (beginning_fragment, unordered, data_len, stream_identifier) = (
2389                    c.beginning_fragment,
2390                    c.unordered,
2391                    c.user_data.len(),
2392                    c.stream_identifier,
2393                );
2394
2395                if data_len == 0 {
2396                    sis_to_reset.push(stream_identifier);
2397                    if self
2398                        .pending_queue
2399                        .pop(beginning_fragment, unordered)
2400                        .is_none()
2401                    {
2402                        error!("[{}] failed to pop from pending queue", self.side);
2403                    }
2404                    continue;
2405                }
2406
2407                if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2408                    break; // would exceeds cwnd
2409                }
2410
2411                if data_len > self.rwnd as usize {
2412                    break; // no more rwnd
2413                }
2414
2415                self.rwnd -= data_len as u32;
2416
2417                if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2418                    beginning_fragment,
2419                    unordered,
2420                    now,
2421                ) {
2422                    chunks.push(chunk);
2423                }
2424            }
2425
2426            // the data sender can always have one DATA chunk in flight to the receiver
2427            if chunks.is_empty() && self.inflight_queue.is_empty() {
2428                // Send zero window probe
2429                if let Some(c) = self.pending_queue.peek() {
2430                    let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2431
2432                    if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2433                        beginning_fragment,
2434                        unordered,
2435                        now,
2436                    ) {
2437                        chunks.push(chunk);
2438                    }
2439                }
2440            }
2441        }
2442
2443        (chunks, sis_to_reset)
2444    }
2445
2446    /// bundle_data_chunks_into_packets packs DATA chunks into packets. It tries to bundle
2447    /// DATA chunks into a packet so long as the resulting packet size does not exceed
2448    /// the path MTU.
2449    fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2450        let mut packets = vec![];
2451        let mut chunks_to_send = vec![];
2452        let mut bytes_in_packet = COMMON_HEADER_SIZE;
2453
2454        for c in chunks {
2455            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2456            //   Multiple DATA chunks committed for transmission MAY be bundled in a
2457            //   single packet.  Furthermore, DATA chunks being retransmitted MAY be
2458            //   bundled with new DATA chunks, as long as the resulting packet size
2459            //   does not exceed the path MTU.
2460            if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2461                packets.push(self.create_packet(chunks_to_send));
2462                chunks_to_send = vec![];
2463                bytes_in_packet = COMMON_HEADER_SIZE;
2464            }
2465
2466            bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2467            chunks_to_send.push(Box::new(c));
2468        }
2469
2470        if !chunks_to_send.is_empty() {
2471            packets.push(self.create_packet(chunks_to_send));
2472        }
2473
2474        packets
2475    }
2476
2477    /// generate_next_tsn returns the my_next_tsn and increases it. The caller should hold the lock.
2478    fn generate_next_tsn(&mut self) -> u32 {
2479        let tsn = self.my_next_tsn;
2480        self.my_next_tsn += 1;
2481        tsn
2482    }
2483
2484    /// generate_next_rsn returns the my_next_rsn and increases it. The caller should hold the lock.
2485    fn generate_next_rsn(&mut self) -> u32 {
2486        let rsn = self.my_next_rsn;
2487        self.my_next_rsn += 1;
2488        rsn
2489    }
2490
2491    fn check_partial_reliability_status(
2492        c: &mut ChunkPayloadData,
2493        now: Instant,
2494        use_forward_tsn: bool,
2495        side: Side,
2496        streams: &FxHashMap<u16, StreamState>,
2497    ) {
2498        if !use_forward_tsn {
2499            return;
2500        }
2501
2502        // draft-ietf-rtcweb-data-protocol-09.txt section 6
2503        //	6.  Procedures
2504        //		All Data Channel Establishment Protocol messages MUST be sent using
2505        //		ordered delivery and reliable transmission.
2506        //
2507        if c.payload_type == PayloadProtocolIdentifier::Dcep {
2508            return;
2509        }
2510
2511        // PR-SCTP
2512        if let Some(s) = streams.get(&c.stream_identifier) {
2513            let reliability_type: ReliabilityType = s.reliability_type;
2514            let reliability_value = s.reliability_value;
2515
2516            if reliability_type == ReliabilityType::Rexmit {
2517                if c.nsent >= reliability_value {
2518                    c.set_abandoned(true);
2519                    trace!(
2520                        "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2521                        side,
2522                        c.tsn,
2523                        c.payload_type,
2524                        c.nsent
2525                    );
2526                }
2527            } else if reliability_type == ReliabilityType::Timed {
2528                if let Some(since) = &c.since {
2529                    let elapsed = now.duration_since(*since);
2530                    if elapsed.as_millis() as u32 >= reliability_value {
2531                        c.set_abandoned(true);
2532                        trace!(
2533                            "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2534                            side,
2535                            c.tsn,
2536                            c.payload_type,
2537                            elapsed
2538                        );
2539                    }
2540                } else {
2541                    error!("[{}] invalid c.since", side);
2542                }
2543            }
2544        } else {
2545            error!("[{}] stream {} not found)", side, c.stream_identifier);
2546        }
2547    }
2548
2549    fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2550        ChunkSelectiveAck {
2551            cumulative_tsn_ack: self.peer_last_tsn,
2552            advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2553            gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2554            duplicate_tsn: self.payload_queue.pop_duplicates(),
2555        }
2556    }
2557
2558    /// create_forward_tsn generates ForwardTSN chunk.
2559    /// This method will be be called if use_forward_tsn is set to false.
2560    fn create_forward_tsn(&self) -> ChunkForwardTsn {
2561        // RFC 3758 Sec 3.5 C4
2562        let mut stream_map: HashMap<u16, u16> = HashMap::new(); // to report only once per SI
2563        let mut i = self.cumulative_tsn_ack_point + 1;
2564        while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2565            if let Some(c) = self.inflight_queue.get(i) {
2566                if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2567                    if sna16lt(*ssn, c.stream_sequence_number) {
2568                        // to report only once with greatest SSN
2569                        stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2570                    }
2571                } else {
2572                    stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2573                }
2574            } else {
2575                break;
2576            }
2577
2578            i += 1;
2579        }
2580
2581        let mut fwd_tsn = ChunkForwardTsn {
2582            new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2583            streams: vec![],
2584        };
2585
2586        let mut stream_str = String::new();
2587        for (si, ssn) in &stream_map {
2588            stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2589            fwd_tsn.streams.push(ChunkForwardTsnStream {
2590                identifier: *si,
2591                sequence: *ssn,
2592            });
2593        }
2594        trace!(
2595            "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2596            self.side,
2597            fwd_tsn.new_cumulative_tsn,
2598            self.cumulative_tsn_ack_point,
2599            stream_str
2600        );
2601
2602        fwd_tsn
2603    }
2604
2605    /// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
2606    fn move_pending_data_chunk_to_inflight_queue(
2607        &mut self,
2608        beginning_fragment: bool,
2609        unordered: bool,
2610        now: Instant,
2611    ) -> Option<ChunkPayloadData> {
2612        if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2613            // Mark all fragements are in-flight now
2614            if c.ending_fragment {
2615                c.set_all_inflight();
2616            }
2617
2618            // Assign TSN
2619            c.tsn = self.generate_next_tsn();
2620
2621            c.since = Some(now); // use to calculate RTT and also for maxPacketLifeTime
2622            c.nsent = 1; // being sent for the first time
2623
2624            Association::check_partial_reliability_status(
2625                &mut c,
2626                now,
2627                self.use_forward_tsn,
2628                self.side,
2629                &self.streams,
2630            );
2631
2632            trace!(
2633                "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2634                self.side,
2635                c.payload_type as u32,
2636                c.tsn,
2637                c.stream_sequence_number,
2638                c.nsent,
2639                c.user_data.len(),
2640                c.beginning_fragment,
2641                c.ending_fragment
2642            );
2643
2644            self.inflight_queue.push_no_check(c.clone());
2645
2646            Some(c)
2647        } else {
2648            error!("[{}] failed to pop from pending queue", self.side);
2649            None
2650        }
2651    }
2652
2653    pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2654        let state = self.state();
2655        if state != AssociationState::Established {
2656            return Err(Error::ErrResetPacketInStateNotExist);
2657        }
2658
2659        // Create DATA chunk which only contains valid stream identifier with
2660        // nil userData and use it as a EOS from the stream.
2661        let c = ChunkPayloadData {
2662            stream_identifier,
2663            beginning_fragment: true,
2664            ending_fragment: true,
2665            user_data: Bytes::new(),
2666            ..Default::default()
2667        };
2668
2669        self.pending_queue.push(c);
2670        self.awake_write_loop();
2671
2672        Ok(())
2673    }
2674
2675    /// send_payload_data sends the data chunks.
2676    pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2677        let state = self.state();
2678        if state != AssociationState::Established {
2679            return Err(Error::ErrPayloadDataStateNotExist);
2680        }
2681
2682        // Push the chunks into the pending queue first.
2683        for c in chunks {
2684            self.pending_queue.push(c);
2685        }
2686
2687        self.awake_write_loop();
2688        Ok(())
2689    }
2690
2691    /// buffered_amount returns total amount (in bytes) of currently buffered user data.
2692    /// This is used only by testing.
2693    pub(crate) fn buffered_amount(&self) -> usize {
2694        self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2695    }
2696
2697    fn awake_write_loop(&self) {
2698        // No Op on Purpose
2699    }
2700
2701    fn close_all_timers(&mut self) {
2702        // Close all retransmission & ack timers
2703        for timer in Timer::VALUES {
2704            self.timers.stop(timer);
2705        }
2706    }
2707
2708    fn on_ack_timeout(&mut self) {
2709        trace!(
2710            "[{}] ack timed out (ack_state: {})",
2711            self.side,
2712            self.ack_state
2713        );
2714        self.stats.inc_ack_timeouts();
2715        self.ack_state = AckState::Immediate;
2716        self.awake_write_loop();
2717    }
2718
2719    fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2720        match timer_id {
2721            Timer::T1Init => {
2722                if let Err(err) = self.send_init() {
2723                    debug!(
2724                        "[{}] failed to retransmit init (n_rtos={}): {:?}",
2725                        self.side, n_rtos, err
2726                    );
2727                }
2728            }
2729
2730            Timer::T1Cookie => {
2731                if let Err(err) = self.send_cookie_echo() {
2732                    debug!(
2733                        "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2734                        self.side, n_rtos, err
2735                    );
2736                }
2737            }
2738
2739            Timer::T2Shutdown => {
2740                debug!(
2741                    "[{}] retransmission of shutdown timeout (n_rtos={})",
2742                    self.side, n_rtos
2743                );
2744                let state = self.state();
2745                match state {
2746                    AssociationState::ShutdownSent => {
2747                        self.will_send_shutdown = true;
2748                        self.awake_write_loop();
2749                    }
2750                    AssociationState::ShutdownAckSent => {
2751                        self.will_send_shutdown_ack = true;
2752                        self.awake_write_loop();
2753                    }
2754                    _ => {}
2755                }
2756            }
2757
2758            Timer::T3RTX => {
2759                self.stats.inc_t3timeouts();
2760
2761                // RFC 4960 sec 6.3.3
2762                //  E1)  For the destination address for which the timer expires, adjust
2763                //       its ssthresh with rules defined in Section 7.2.3 and set the
2764                //       cwnd <- MTU.
2765                // RFC 4960 sec 7.2.3
2766                //   When the T3-rtx timer expires on an address, SCTP should perform slow
2767                //   start by:
2768                //      ssthresh = max(cwnd/2, 4*MTU)
2769                //      cwnd = 1*MTU
2770
2771                self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2772                self.cwnd = self.mtu;
2773                trace!(
2774                    "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2775                    self.side,
2776                    self.cwnd,
2777                    self.ssthresh,
2778                    self.inflight_queue.get_num_bytes()
2779                );
2780
2781                // RFC 3758 sec 3.5
2782                //  A5) Any time the T3-rtx timer expires, on any destination, the sender
2783                //  SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
2784                //  the procedures outlined in C2 - C5.
2785                if self.use_forward_tsn {
2786                    // RFC 3758 Sec 3.5 C2
2787                    let mut i = self.advanced_peer_tsn_ack_point + 1;
2788                    while let Some(c) = self.inflight_queue.get(i) {
2789                        if !c.abandoned() {
2790                            break;
2791                        }
2792                        self.advanced_peer_tsn_ack_point = i;
2793                        i += 1;
2794                    }
2795
2796                    // RFC 3758 Sec 3.5 C3
2797                    if sna32gt(
2798                        self.advanced_peer_tsn_ack_point,
2799                        self.cumulative_tsn_ack_point,
2800                    ) {
2801                        self.will_send_forward_tsn = true;
2802                        debug!(
2803                            "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2804                            self.side,
2805                            self.will_send_forward_tsn,
2806                            self.advanced_peer_tsn_ack_point,
2807                            self.cumulative_tsn_ack_point
2808                        );
2809                    }
2810                }
2811
2812                debug!(
2813                    "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2814                    self.side, n_rtos, self.cwnd, self.ssthresh
2815                );
2816
2817                self.inflight_queue.mark_all_to_retrasmit();
2818                self.awake_write_loop();
2819            }
2820
2821            Timer::Reconfig => {
2822                self.will_retransmit_reconfig = true;
2823                self.awake_write_loop();
2824            }
2825
2826            _ => {}
2827        }
2828    }
2829
2830    fn on_retransmission_failure(&mut self, id: Timer) {
2831        match id {
2832            Timer::T1Init => {
2833                error!("[{}] retransmission failure: T1-init", self.side);
2834                self.error = Some(AssociationError::HandshakeFailed(
2835                    Error::ErrHandshakeInitAck,
2836                ));
2837            }
2838
2839            Timer::T1Cookie => {
2840                error!("[{}] retransmission failure: T1-cookie", self.side);
2841                self.error = Some(AssociationError::HandshakeFailed(
2842                    Error::ErrHandshakeCookieEcho,
2843                ));
2844            }
2845
2846            Timer::T2Shutdown => {
2847                error!("[{}] retransmission failure: T2-shutdown", self.side);
2848            }
2849
2850            Timer::T3RTX => {
2851                // T3-rtx timer will not fail by design
2852                // Justifications:
2853                //  * ICE would fail if the connectivity is lost
2854                //  * WebRTC spec is not clear how this incident should be reported to ULP
2855                error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2856            }
2857
2858            _ => {}
2859        }
2860    }
2861
2862    /// Whether no timers are running
2863    #[cfg(test)]
2864    pub(crate) fn is_idle(&self) -> bool {
2865        Timer::VALUES
2866            .iter()
2867            //.filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
2868            .filter_map(|&t| Some((t, self.timers.get(t)?)))
2869            .min_by_key(|&(_, time)| time)
2870            //.map_or(true, |(timer, _)| timer == Timer::Idle)
2871            .is_none()
2872    }
2873}