rtc_sctp/association/
mod.rs

1use crate::association::{
2    state::{AckMode, AckState, AssociationState},
3    stats::AssociationStats,
4};
5use crate::chunk::{
6    Chunk, ErrorCauseUnrecognizedChunkType, USER_INITIATED_ABORT, chunk_abort::ChunkAbort,
7    chunk_cookie_ack::ChunkCookieAck, chunk_cookie_echo::ChunkCookieEcho, chunk_error::ChunkError,
8    chunk_forward_tsn::ChunkForwardTsn, chunk_forward_tsn::ChunkForwardTsnStream,
9    chunk_heartbeat::ChunkHeartbeat, chunk_heartbeat_ack::ChunkHeartbeatAck, chunk_init::ChunkInit,
10    chunk_init::ChunkInitAck, chunk_payload_data::ChunkPayloadData,
11    chunk_payload_data::PayloadProtocolIdentifier, chunk_reconfig::ChunkReconfig,
12    chunk_selective_ack::ChunkSelectiveAck, chunk_shutdown::ChunkShutdown,
13    chunk_shutdown_ack::ChunkShutdownAck, chunk_shutdown_complete::ChunkShutdownComplete,
14    chunk_type::CT_FORWARD_TSN,
15};
16use crate::config::{COMMON_HEADER_SIZE, DATA_CHUNK_HEADER_SIZE, ServerConfig, TransportConfig};
17use crate::packet::{CommonHeader, Packet};
18use crate::param::{
19    Param,
20    param_heartbeat_info::ParamHeartbeatInfo,
21    param_outgoing_reset_request::ParamOutgoingResetRequest,
22    param_reconfig_response::{ParamReconfigResponse, ReconfigResult},
23    param_state_cookie::ParamStateCookie,
24    param_supported_extensions::ParamSupportedExtensions,
25};
26use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
27use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
28use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
29use crate::{AssociationEvent, Payload, Side};
30use shared::error::{Error, Result};
31use shared::{TransportContext, TransportMessage, TransportProtocol};
32use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
33use timer::{ACK_INTERVAL, RtoManager, Timer, TimerTable};
34
35use crate::association::stream::RecvSendState;
36use bytes::Bytes;
37use log::{debug, error, trace, warn};
38use rand::random;
39use std::collections::{HashMap, VecDeque};
40use std::net::SocketAddr;
41use std::str::FromStr;
42use std::sync::Arc;
43use std::time::{Duration, Instant};
44use thiserror::Error;
45
46pub(crate) mod state;
47pub(crate) mod stats;
48pub(crate) mod stream;
49pub(crate) mod timer;
50
51#[cfg(test)]
52mod association_test;
53
/// Reasons why an association might be lost
#[derive(Debug, Error, Clone, PartialEq)]
pub enum AssociationError {
    /// Handshake failed; the payload is a human-readable description of the cause
    #[error("handshake failed due to {0}")]
    HandshakeFailed(String),
    /// The peer violated the SCTP specification as understood by this implementation
    ///
    /// `Association::handle_event` closes the association with this reason when
    /// processing an inbound packet returns an error.
    #[error("transport error")]
    TransportError,
    /// The peer's SCTP stack aborted the association automatically
    #[error("aborted by peer")]
    AssociationClosed,
    /// The peer closed the association
    #[error("closed by peer")]
    ApplicationClosed,
    /// The peer is unable to continue processing this association, usually due to having restarted
    ///
    /// Also used when an ABORT chunk carrying a user-initiated-abort cause is received.
    #[error("reset by peer")]
    Reset,
    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
    ///
    /// If neither side is sending keep-alives, an association will time out after a long enough idle
    /// period even if the peer is still reachable
    #[error("timed out")]
    TimedOut,
    /// The local application closed the association
    #[error("closed")]
    LocallyClosed,
}
82
/// Events of interest to the application
///
/// Retrieved one at a time through `Association::poll`.
#[derive(Debug)]
pub enum Event {
    /// The handshake failed
    ///
    /// `Association::poll` emits this when a stored association error is pending and
    /// no other event is queued.
    HandshakeFailed {
        /// Reason that the association was closed
        reason: AssociationError,
    },

    /// The association was successfully established
    Connected,
    /// The association was lost
    ///
    /// Emitted if the peer closes the association or an error is encountered.
    /// One event is queued for every stream that is unregistered.
    AssociationLost {
        /// Reason that the association was closed
        reason: AssociationError,
        /// Identifier of the stream that was unregistered
        id: StreamId,
    },
    /// Stream events
    Stream(StreamEvent),
    /// One or more application datagrams have been received
    DatagramReceived,
}
107
/// Association represents an SCTP association.
///
/// The state kept here corresponds to RFC 4960 section 13.2, "Parameters Necessary per
/// Association (i.e., the TCB)":
///
/// - Peer Verification Tag: tag value to be sent in every packet; it is received in the
///   INIT or INIT ACK chunk.
/// - My Verification Tag: tag expected in every inbound packet and sent in the INIT or
///   INIT ACK chunk.
/// - State: a state variable indicating what state the association is in, i.e.,
///   COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED, SHUTDOWN-PENDING, SHUTDOWN-SENT,
///   SHUTDOWN-RECEIVED, SHUTDOWN-ACK-SENT.
///
/// No Closed state is illustrated in the RFC since if an association is Closed its TCB
/// SHOULD be removed.
pub struct Association {
    // Client or server role; `new` picks Server when a server config is supplied.
    side: Side,
    // Current protocol state; written through `set_state`, read through `state`.
    state: AssociationState,
    // `is_handshaking` reports the negation of this flag.
    handshake_completed: bool,
    // Initialised from the transport config; see `max_message_size`/`set_max_message_size`.
    max_message_size: u32,
    // `shutdown` only requests an immediate SHUTDOWN when this is zero.
    inflight_queue_length: usize,
    // Set by `shutdown` to request that a SHUTDOWN chunk be sent.
    will_send_shutdown: bool,
    bytes_received: usize,
    bytes_sent: usize,

    // Placed in the common header of outgoing INIT and COOKIE-ECHO packets.
    peer_verification_tag: u32,
    // Initialised from the local association id; sent as the initiate tag in INIT.
    my_verification_tag: u32,
    // Starts at a random non-zero value chosen in `new`.
    my_next_tsn: u32,
    peer_last_tsn: u32,
    // for RTT measurement
    min_tsn2measure_rtt: u32,
    will_send_forward_tsn: bool,
    will_retransmit_fast: bool,
    will_retransmit_reconfig: bool,

    will_send_shutdown_ack: bool,
    will_send_shutdown_complete: bool,

    // Reconfig
    // Starts at the same random value as `my_next_tsn`.
    my_next_rsn: u32,
    reconfigs: HashMap<u32, ChunkReconfig>,
    reconfig_requests: HashMap<u32, ParamOutgoingResetRequest>,

    // Non-RFC internal data
    remote_addr: SocketAddr,
    local_addr: SocketAddr,
    transport_protocol: TransportProtocol,

    // Both ports are assigned in `send_init`.
    source_port: u16,
    destination_port: u16,
    my_max_num_inbound_streams: u16,
    my_max_num_outbound_streams: u16,
    my_cookie: Option<ParamStateCookie>,

    payload_queue: PayloadQueue,
    inflight_queue: PayloadQueue,
    pending_queue: PendingQueue,
    // Control packets waiting to be transmitted (INIT, COOKIE-ECHO, chunk replies).
    control_queue: VecDeque<Packet>,
    // Stream identifiers waiting to be handed out by `accept_stream`.
    stream_queue: VecDeque<u16>,

    // Computed in `new` as max_payload_size + common header + DATA chunk header.
    pub(crate) mtu: u32,
    // max DATA chunk payload size
    max_payload_size: u32,
    cumulative_tsn_ack_point: u32,
    advanced_peer_tsn_ack_point: u32,
    use_forward_tsn: bool,

    pub(crate) rto_mgr: RtoManager,
    timers: TimerTable,

    // Congestion control parameters
    // Advertised as the receiver window credit in the INIT chunk.
    max_receive_buffer_size: u32,
    // my congestion window size
    pub(crate) cwnd: u32,
    // calculated peer's receiver windows size
    rwnd: u32,
    // slow start threshold
    pub(crate) ssthresh: u32,
    partial_bytes_acked: u32,
    pub(crate) in_fast_recovery: bool,
    fast_recover_exit_point: u32,

    // Chunks stored for retransmission
    stored_init: Option<ChunkInit>,
    stored_cookie_echo: Option<ChunkCookieEcho>,
    pub(crate) streams: HashMap<StreamId, StreamState>,

    // Application-facing events drained by `poll`.
    events: VecDeque<Event>,
    // Endpoint-facing events drained by `poll_endpoint_event`.
    endpoint_events: VecDeque<EndpointEventInner>,
    // Pending error; `poll` takes it and reports `Event::HandshakeFailed`.
    error: Option<AssociationError>,

    // per inbound packet context
    // Both flags are cleared before each inbound packet's chunks are processed and
    // are examined once the packet has been handled.
    delayed_ack_triggered: bool,
    immediate_ack_triggered: bool,

    pub(crate) stats: AssociationStats,
    ack_state: AckState,

    // for testing
    pub(crate) ack_mode: AckMode,
}
211
212impl Default for Association {
213    fn default() -> Self {
214        Association {
215            side: Side::default(),
216            state: AssociationState::default(),
217            handshake_completed: false,
218            max_message_size: 0,
219            inflight_queue_length: 0,
220            will_send_shutdown: false,
221            bytes_received: 0,
222            bytes_sent: 0,
223
224            peer_verification_tag: 0,
225            my_verification_tag: 0,
226            my_next_tsn: 0,
227            peer_last_tsn: 0,
228            // for RTT measurement
229            min_tsn2measure_rtt: 0,
230            will_send_forward_tsn: false,
231            will_retransmit_fast: false,
232            will_retransmit_reconfig: false,
233
234            will_send_shutdown_ack: false,
235            will_send_shutdown_complete: false,
236
237            // Reconfig
238            my_next_rsn: 0,
239            reconfigs: HashMap::default(),
240            reconfig_requests: HashMap::default(),
241
242            // Non-RFC internal data
243            remote_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
244            local_addr: SocketAddr::from_str("0.0.0.0:0").unwrap(),
245            transport_protocol: TransportProtocol::UDP,
246
247            source_port: 0,
248            destination_port: 0,
249            my_max_num_inbound_streams: 0,
250            my_max_num_outbound_streams: 0,
251            my_cookie: None,
252
253            payload_queue: PayloadQueue::default(),
254            inflight_queue: PayloadQueue::default(),
255            pending_queue: PendingQueue::default(),
256            control_queue: VecDeque::default(),
257            stream_queue: VecDeque::default(),
258
259            mtu: 0,
260            // max DATA chunk payload size
261            max_payload_size: 0,
262            cumulative_tsn_ack_point: 0,
263            advanced_peer_tsn_ack_point: 0,
264            use_forward_tsn: false,
265
266            rto_mgr: RtoManager::default(),
267            timers: TimerTable::default(),
268
269            // Congestion control parameters
270            max_receive_buffer_size: 0,
271            // my congestion window size
272            cwnd: 0,
273            // calculated peer's receiver windows size
274            rwnd: 0,
275            // slow start threshold
276            ssthresh: 0,
277            partial_bytes_acked: 0,
278            in_fast_recovery: false,
279            fast_recover_exit_point: 0,
280
281            // Chunks stored for retransmission
282            stored_init: None,
283            stored_cookie_echo: None,
284            streams: HashMap::default(),
285
286            events: VecDeque::default(),
287            endpoint_events: VecDeque::default(),
288            error: None,
289
290            // per inbound packet context
291            delayed_ack_triggered: false,
292            immediate_ack_triggered: false,
293
294            stats: AssociationStats::default(),
295            ack_state: AckState::default(),
296
297            // for testing
298            ack_mode: AckMode::default(),
299        }
300    }
301}
302
303impl Association {
304    #[allow(clippy::too_many_arguments)]
305    pub(crate) fn new(
306        server_config: Option<Arc<ServerConfig>>,
307        config: Arc<TransportConfig>,
308        max_payload_size: u32,
309        local_aid: AssociationId,
310        remote_addr: SocketAddr,
311        local_addr: SocketAddr,
312        protocol: TransportProtocol,
313        now: Instant,
314    ) -> Self {
315        let side = if server_config.is_some() {
316            Side::Server
317        } else {
318            Side::Client
319        };
320
321        // It's a bit strange, but we're going backwards from the calculation in
322        // config.rs to get max_payload_size from INITIAL_MTU.
323        let mtu = max_payload_size + COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE;
324
325        // RFC 4690 Sec 7.2.1
326        // The initial cwnd before DATA transmission or after a sufficiently
327        // long idle period MUST be set to min(4*MTU, max (2*MTU, 4380bytes)).
328        let cwnd = (2 * mtu).clamp(4380, 4 * mtu);
329        let mut tsn = random::<u32>();
330        if tsn == 0 {
331            tsn += 1;
332        }
333
334        let mut this = Association {
335            side,
336            handshake_completed: false,
337            max_receive_buffer_size: config.max_receive_buffer_size(),
338            max_message_size: config.max_message_size(),
339            my_max_num_outbound_streams: config.max_num_outbound_streams(),
340            my_max_num_inbound_streams: config.max_num_inbound_streams(),
341            max_payload_size,
342
343            rto_mgr: RtoManager::new(),
344            timers: TimerTable::new(config.timer_config()),
345
346            mtu,
347            cwnd,
348            remote_addr,
349            local_addr,
350            transport_protocol: protocol,
351
352            my_verification_tag: local_aid,
353            my_next_tsn: tsn,
354            my_next_rsn: tsn,
355            min_tsn2measure_rtt: tsn,
356            cumulative_tsn_ack_point: tsn - 1,
357            advanced_peer_tsn_ack_point: tsn - 1,
358            error: None,
359
360            ..Default::default()
361        };
362
363        if side.is_client() {
364            let mut init = ChunkInit {
365                initial_tsn: this.my_next_tsn,
366                num_outbound_streams: this.my_max_num_outbound_streams,
367                num_inbound_streams: this.my_max_num_inbound_streams,
368                initiate_tag: this.my_verification_tag,
369                advertised_receiver_window_credit: this.max_receive_buffer_size,
370                ..Default::default()
371            };
372            init.set_supported_extensions();
373
374            this.set_state(AssociationState::CookieWait);
375            this.stored_init = Some(init);
376            let _ = this.send_init();
377            this.timers
378                .start(Timer::T1Init, now, this.rto_mgr.get_rto());
379        }
380
381        this
382    }
383
384    /// Returns application-facing event
385    ///
386    /// Associations should be polled for events after:
387    /// - a call was made to `handle_event`
388    /// - a call was made to `handle_timeout`
389    #[must_use]
390    pub fn poll(&mut self) -> Option<Event> {
391        if let Some(x) = self.events.pop_front() {
392            return Some(x);
393        }
394
395        /*TODO: if let Some(event) = self.streams.poll() {
396            return Some(Event::Stream(event));
397        }*/
398
399        if let Some(err) = self.error.take() {
400            return Some(Event::HandshakeFailed { reason: err });
401        }
402
403        None
404    }
405
406    /// Return endpoint-facing event
407    #[must_use]
408    pub fn poll_endpoint_event(&mut self) -> Option<EndpointEvent> {
409        self.endpoint_events.pop_front().map(EndpointEvent)
410    }
411
    /// Returns the next time at which `handle_timeout` should be called
    ///
    /// The value is whatever the timer table reports as its next timeout; `None`
    /// means it reports no deadline.
    ///
    /// The value returned may change after:
    /// - the application performed some I/O on the association
    /// - a call was made to `handle_event`
    /// - a call to `poll_transmit` returned `Some`
    /// - a call was made to `handle_timeout`
    #[must_use]
    pub fn poll_timeout(&self) -> Option<Instant> {
        self.timers.next_timeout()
    }
423
424    /// Returns packets to transmit
425    ///
426    /// Associations should be polled for transmit after:
427    /// - the application performed some I/O on the Association
428    /// - a call was made to `handle_event`
429    /// - a call was made to `handle_timeout`
430    #[must_use]
431    pub fn poll_transmit(&mut self, now: Instant) -> Option<TransportMessage<Payload>> {
432        let (contents, _) = self.gather_outbound(now);
433        if contents.is_empty() {
434            None
435        } else {
436            trace!(
437                "[{}] sending {} bytes (total {} datagrams)",
438                self.side,
439                contents.iter().fold(0, |l, c| l + c.len()),
440                contents.len()
441            );
442            Some(TransportMessage {
443                now,
444                transport: TransportContext {
445                    local_addr: self.local_addr,
446                    peer_addr: self.remote_addr,
447                    ecn: None,
448                    transport_protocol: Default::default(),
449                },
450                message: Payload::RawEncode(contents),
451            })
452        }
453    }
454
455    /// Process timer expirations
456    ///
457    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
458    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
459    /// methods.
460    ///
461    /// It is most efficient to call this immediately after the system clock reaches the latest
462    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
463    /// no-op and therefore are safe.
464    pub fn handle_timeout(&mut self, now: Instant) {
465        for &timer in &Timer::VALUES {
466            let (expired, failure, n_rtos) = self.timers.is_expired(timer, now);
467            if !expired {
468                continue;
469            }
470            self.timers.set(timer, None);
471            //trace!("{:?} timeout", timer);
472
473            if timer == Timer::Ack {
474                self.on_ack_timeout();
475            } else if failure {
476                self.on_retransmission_failure(timer);
477            } else {
478                self.on_retransmission_timeout(timer, n_rtos);
479                self.timers.start(timer, now, self.rto_mgr.get_rto());
480            }
481        }
482    }
483
484    /// Process `AssociationEvent`s generated by the associated `Endpoint`
485    ///
486    /// Will execute protocol logic upon receipt of an association event, in turn preparing signals
487    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
488    /// extracted through the relevant methods.
489    pub fn handle_event(&mut self, event: AssociationEvent) {
490        match event.0 {
491            AssociationEventInner::Datagram(transmit) => {
492                // If this packet could initiate a migration and we're a client or a server that
493                // forbids migration, drop the datagram. This could be relaxed to heuristically
494                // permit NAT-rebinding-like migration.
495                /*TODO:if remote != self.remote && self.server_config.as_ref().map_or(true, |x| !x.migration)
496                {
497                    trace!("discarding packet from unrecognized peer {}", remote);
498                    return;
499                }*/
500
501                if let Payload::PartialDecode(partial_decode) = transmit.message {
502                    debug!(
503                        "[{}] recving {} bytes",
504                        self.side,
505                        COMMON_HEADER_SIZE as usize + partial_decode.remaining.len()
506                    );
507
508                    let pkt = match partial_decode.finish() {
509                        Ok(p) => p,
510                        Err(err) => {
511                            warn!("[{}] unable to parse SCTP packet {}", self.side, err);
512                            return;
513                        }
514                    };
515
516                    if let Err(err) = self.handle_inbound(pkt, transmit.now) {
517                        error!("handle_inbound got err: {}", err);
518                        let _ = self.close(AssociationError::TransportError);
519                    }
520                } else {
521                    trace!("discarding invalid partial_decode");
522                }
523            } //TODO:
524        }
525    }
526
    /// Returns a copy of the statistics collected for this Association
    pub fn stats(&self) -> AssociationStats {
        self.stats
    }
531
    /// Whether the Association is in the process of being established
    ///
    /// Returns `true` for as long as the handshake has not been marked as completed.
    ///
    /// If this returns `false`, the Association may be either established or closed, signaled by the
    /// emission of a `Connected` or `AssociationLost` message respectively.
    pub fn is_handshaking(&self) -> bool {
        !self.handshake_completed
    }
539
540    /// Whether the Association is closed
541    ///
542    /// Closed Associations cannot transport any further data. An association becomes closed when
543    /// either peer application intentionally closes it, or when either transport layer detects an
544    /// error such as a time-out or certificate validation failure.
545    ///
546    /// A `AssociationLost` event is emitted with details when the association becomes closed.
547    pub fn is_closed(&self) -> bool {
548        self.state == AssociationState::Closed
549    }
550
551    /// Whether there is no longer any need to keep the association around
552    ///
553    /// Closed associations become drained after a brief timeout to absorb any remaining in-flight
554    /// packets from the peer. All drained associations have been closed.
555    pub fn is_drained(&self) -> bool {
556        self.state.is_drained()
557    }
558
    /// Returns whether this end of the Association acts as the client or the server
    pub fn side(&self) -> Side {
        self.side
    }
563
    /// The socket address recorded for this Association's peer
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }
568
569    /// Current best estimate of this Association's latency (round-trip-time)
570    pub fn rtt(&self) -> Duration {
571        Duration::from_millis(self.rto_mgr.get_rto())
572    }
573
    /// The local socket address recorded for this Association
    ///
    /// This is the address supplied when the association was created; a
    /// default-constructed association reports `0.0.0.0:0`.
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }
591
592    /// Shutdown initiates the shutdown sequence. The method blocks until the
593    /// shutdown sequence is completed and the association is closed, or until the
594    /// passed context is done, in which case the context's error is returned.
595    pub fn shutdown(&mut self) -> Result<()> {
596        debug!("[{}] closing association..", self.side);
597
598        let state = self.state();
599        if state != AssociationState::Established {
600            return Err(Error::ErrShutdownNonEstablished);
601        }
602
603        // Attempt a graceful shutdown.
604        self.set_state(AssociationState::ShutdownPending);
605
606        if self.inflight_queue_length == 0 {
607            // No more outstanding, send shutdown.
608            self.will_send_shutdown = true;
609            self.awake_write_loop();
610            self.set_state(AssociationState::ShutdownSent);
611        }
612
613        self.endpoint_events.push_back(EndpointEventInner::Drained);
614
615        Ok(())
616    }
617
618    /// Close ends the SCTP Association and cleans up any state
619    pub fn close(&mut self, reason: AssociationError) -> Result<()> {
620        if self.state() != AssociationState::Closed {
621            self.set_state(AssociationState::Closed);
622
623            debug!("[{}] closing association..", self.side);
624
625            self.close_all_timers();
626
627            for si in self.streams.keys().cloned().collect::<Vec<u16>>() {
628                self.unregister_stream(si, reason.clone());
629            }
630
631            debug!("[{}] association closed", self.side);
632            debug!(
633                "[{}] stats nDATAs (in) : {}",
634                self.side,
635                self.stats.get_num_datas()
636            );
637            debug!(
638                "[{}] stats nSACKs (in) : {}",
639                self.side,
640                self.stats.get_num_sacks()
641            );
642            debug!(
643                "[{}] stats nT3Timeouts : {}",
644                self.side,
645                self.stats.get_num_t3timeouts()
646            );
647            debug!(
648                "[{}] stats nAckTimeouts: {}",
649                self.side,
650                self.stats.get_num_ack_timeouts()
651            );
652            debug!(
653                "[{}] stats nFastRetrans: {}",
654                self.side,
655                self.stats.get_num_fast_retrans()
656            );
657        }
658
659        Ok(())
660    }
661
662    /// open_stream opens a stream
663    pub fn open_stream(
664        &mut self,
665        stream_identifier: StreamId,
666        default_payload_type: PayloadProtocolIdentifier,
667    ) -> Result<Stream<'_>> {
668        if self.streams.contains_key(&stream_identifier) {
669            return Err(Error::ErrStreamAlreadyExist);
670        }
671
672        if let Some(s) = self.create_stream(stream_identifier, false, default_payload_type) {
673            Ok(s)
674        } else {
675            Err(Error::ErrStreamCreateFailed)
676        }
677    }
678
679    /// accept_stream accepts a stream
680    pub fn accept_stream(&mut self) -> Option<Stream<'_>> {
681        self.stream_queue
682            .pop_front()
683            .map(move |stream_identifier| Stream {
684                stream_identifier,
685                association: self,
686            })
687    }
688
689    /// stream returns a stream
690    pub fn stream(&mut self, stream_identifier: StreamId) -> Result<Stream<'_>> {
691        if !self.streams.contains_key(&stream_identifier) {
692            Err(Error::ErrStreamNotExisted)
693        } else {
694            Ok(Stream {
695                stream_identifier,
696                association: self,
697            })
698        }
699    }
700
701    pub fn stream_ids(&self) -> Vec<StreamId> {
702        self.streams.keys().cloned().collect()
703    }
704
    /// Returns the current value of the sent-bytes counter.
    pub(crate) fn bytes_sent(&self) -> usize {
        self.bytes_sent
    }
709
    /// Returns the current value of the received-bytes counter.
    pub(crate) fn bytes_received(&self) -> usize {
        self.bytes_received
    }
714
    /// Returns the maximum message size that can be sent.
    ///
    /// The value starts out as the transport configuration's maximum message size and
    /// can be changed with `set_max_message_size`.
    pub(crate) fn max_message_size(&self) -> u32 {
        self.max_message_size
    }
719
    /// Sets the maximum message size that can be sent, replacing the previous value.
    pub(crate) fn set_max_message_size(&mut self, max_message_size: u32) {
        self.max_message_size = max_message_size;
    }
724
725    /// unregister_stream un-registers a stream from the association
726    /// The caller should hold the association write lock.
727    fn unregister_stream(&mut self, stream_identifier: StreamId, reason: AssociationError) {
728        if let Some(mut s) = self.streams.remove(&stream_identifier) {
729            debug!("[{}] unregister_stream {}", self.side, stream_identifier);
730            self.events.push_back(Event::AssociationLost {
731                reason,
732                id: stream_identifier,
733            });
734            s.state = RecvSendState::Closed;
735        }
736    }
737
738    /// set_state atomically sets the state of the Association.
739    fn set_state(&mut self, new_state: AssociationState) {
740        if new_state != self.state {
741            debug!(
742                "[{}] state change: '{}' => '{}'",
743                self.side, self.state, new_state,
744            );
745        }
746        self.state = new_state;
747    }
748
    /// Returns the current state of the Association.
    pub(crate) fn state(&self) -> AssociationState {
        self.state
    }
753
754    /// caller must hold self.lock
755    fn send_init(&mut self) -> Result<()> {
756        if let Some(stored_init) = &self.stored_init {
757            debug!("[{}] sending INIT", self.side);
758
759            self.source_port = 5000; // Spec??
760            self.destination_port = 5000; // Spec??
761
762            let outbound = Packet {
763                common_header: CommonHeader {
764                    source_port: self.source_port,
765                    destination_port: self.destination_port,
766                    verification_tag: self.peer_verification_tag,
767                },
768                chunks: vec![Box::new(stored_init.clone())],
769            };
770
771            self.control_queue.push_back(outbound);
772            self.awake_write_loop();
773
774            Ok(())
775        } else {
776            Err(Error::ErrInitNotStoredToSend)
777        }
778    }
779
780    /// caller must hold self.lock
781    fn send_cookie_echo(&mut self) -> Result<()> {
782        if let Some(stored_cookie_echo) = &self.stored_cookie_echo {
783            debug!("[{}] sending COOKIE-ECHO", self.side);
784
785            let outbound = Packet {
786                common_header: CommonHeader {
787                    source_port: self.source_port,
788                    destination_port: self.destination_port,
789                    verification_tag: self.peer_verification_tag,
790                },
791                chunks: vec![Box::new(stored_cookie_echo.clone())],
792            };
793
794            self.control_queue.push_back(outbound);
795            self.awake_write_loop();
796
797            Ok(())
798        } else {
799            Err(Error::ErrCookieEchoNotStoredToSend)
800        }
801    }
802
803    /// handle_inbound parses incoming raw packets
804    fn handle_inbound(&mut self, p: Packet, now: Instant) -> Result<()> {
805        if let Err(err) = p.check_packet() {
806            warn!("[{}] failed validating packet {}", self.side, err);
807            return Ok(());
808        }
809
810        self.handle_chunk_start();
811
812        for c in &p.chunks {
813            self.handle_chunk(&p, c, now)?;
814        }
815
816        self.handle_chunk_end(now);
817
818        Ok(())
819    }
820
    /// Clears the per-packet ACK flags before the chunks of an inbound packet are
    /// processed.
    fn handle_chunk_start(&mut self) {
        self.delayed_ack_triggered = false;
        self.immediate_ack_triggered = false;
    }
825
826    fn handle_chunk_end(&mut self, now: Instant) {
827        if self.immediate_ack_triggered {
828            self.ack_state = AckState::Immediate;
829            self.timers.stop(Timer::Ack);
830            self.awake_write_loop();
831        } else if self.delayed_ack_triggered {
832            // Will send delayed ack in the next ack timeout
833            self.ack_state = AckState::Delay;
834            self.timers.start(Timer::Ack, now, ACK_INTERVAL);
835        }
836    }
837
838    #[allow(clippy::borrowed_box)]
839    fn handle_chunk(&mut self, p: &Packet, chunk: &Box<dyn Chunk>, now: Instant) -> Result<()> {
840        chunk.check()?;
841        let chunk_any = chunk.as_any();
842        let packets = if let Some(c) = chunk_any.downcast_ref::<ChunkInit>() {
843            if c.is_ack {
844                self.handle_init_ack(p, c, now)?
845            } else {
846                self.handle_init(p, c)?
847            }
848        } else if let Some(c) = chunk_any.downcast_ref::<ChunkAbort>() {
849            let mut err_str = String::new();
850            for e in &c.error_causes {
851                if matches!(e.code, USER_INITIATED_ABORT) {
852                    debug!("User initiated abort received");
853                    let _ = self.close(AssociationError::Reset);
854                    return Ok(());
855                }
856                err_str += &format!("({})", e);
857            }
858            return Err(Error::ErrAbortChunk(err_str));
859        } else if let Some(c) = chunk_any.downcast_ref::<ChunkError>() {
860            let mut err_str = String::new();
861            for e in &c.error_causes {
862                err_str += &format!("({})", e);
863            }
864            return Err(Error::ErrAbortChunk(err_str));
865        } else if let Some(c) = chunk_any.downcast_ref::<ChunkHeartbeat>() {
866            self.handle_heartbeat(c)?
867        } else if let Some(c) = chunk_any.downcast_ref::<ChunkCookieEcho>() {
868            self.handle_cookie_echo(c)?
869        } else if chunk_any.downcast_ref::<ChunkCookieAck>().is_some() {
870            self.handle_cookie_ack()?
871        } else if let Some(c) = chunk_any.downcast_ref::<ChunkPayloadData>() {
872            self.handle_data(c)?
873        } else if let Some(c) = chunk_any.downcast_ref::<ChunkSelectiveAck>() {
874            self.handle_sack(c, now)?
875        } else if let Some(c) = chunk_any.downcast_ref::<ChunkReconfig>() {
876            self.handle_reconfig(c)?
877        } else if let Some(c) = chunk_any.downcast_ref::<ChunkForwardTsn>() {
878            self.handle_forward_tsn(c)?
879        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdown>() {
880            self.handle_shutdown(c)?
881        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownAck>() {
882            self.handle_shutdown_ack(c)?
883        } else if let Some(c) = chunk_any.downcast_ref::<ChunkShutdownComplete>() {
884            self.handle_shutdown_complete(c)?
885        } else {
886            return Err(Error::ErrChunkTypeUnhandled);
887        };
888
889        if !packets.is_empty() {
890            let mut buf: VecDeque<_> = packets.into_iter().collect();
891            self.control_queue.append(&mut buf);
892            self.awake_write_loop();
893        }
894
895        Ok(())
896    }
897
898    fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
899        let state = self.state();
900        debug!("[{}] chunkInit received in state '{}'", self.side, state);
901
902        // https://tools.ietf.org/html/rfc4960#section-5.2.1
903        // Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
904        // respond with an INIT ACK using the same parameters it sent in its
905        // original INIT chunk (including its Initiate Tag, unchanged).  When
906        // responding, the endpoint MUST send the INIT ACK back to the same
907        // address that the original INIT (sent by this endpoint) was sent.
908
909        if state != AssociationState::Closed
910            && state != AssociationState::CookieWait
911            && state != AssociationState::CookieEchoed
912        {
913            // 5.2.2.  Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
914            //        COOKIE-WAIT, and SHUTDOWN-ACK-SENT
915            return Err(Error::ErrHandleInitState);
916        }
917
918        // Should we be setting any of these permanently until we've ACKed further?
919        self.my_max_num_inbound_streams =
920            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
921        self.my_max_num_outbound_streams =
922            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
923        self.peer_verification_tag = i.initiate_tag;
924        self.source_port = p.common_header.destination_port;
925        self.destination_port = p.common_header.source_port;
926
927        // 13.2 This is the last TSN received in sequence.  This value
928        // is set initially by taking the peer's initial TSN,
929        // received in the INIT or INIT ACK chunk, and
930        // subtracting one from it.
931        self.peer_last_tsn = if i.initial_tsn == 0 {
932            u32::MAX
933        } else {
934            i.initial_tsn - 1
935        };
936
937        for param in &i.params {
938            if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
939                for t in &v.chunk_types {
940                    if *t == CT_FORWARD_TSN {
941                        debug!("[{}] use ForwardTSN (on init)", self.side);
942                        self.use_forward_tsn = true;
943                    }
944                }
945            }
946        }
947        if !self.use_forward_tsn {
948            warn!("[{}] not using ForwardTSN (on init)", self.side);
949        }
950
951        let mut outbound = Packet {
952            common_header: CommonHeader {
953                verification_tag: self.peer_verification_tag,
954                source_port: self.source_port,
955                destination_port: self.destination_port,
956            },
957            chunks: vec![],
958        };
959
960        let mut init_ack = ChunkInit {
961            is_ack: true,
962            initial_tsn: self.my_next_tsn,
963            num_outbound_streams: self.my_max_num_outbound_streams,
964            num_inbound_streams: self.my_max_num_inbound_streams,
965            initiate_tag: self.my_verification_tag,
966            advertised_receiver_window_credit: self.max_receive_buffer_size,
967            ..Default::default()
968        };
969
970        if self.my_cookie.is_none() {
971            self.my_cookie = Some(ParamStateCookie::new());
972        }
973
974        if let Some(my_cookie) = &self.my_cookie {
975            init_ack.params = vec![Box::new(my_cookie.clone())];
976        }
977
978        init_ack.set_supported_extensions();
979
980        outbound.chunks = vec![Box::new(init_ack)];
981
982        Ok(vec![outbound])
983    }
984
985    fn handle_init_ack(
986        &mut self,
987        p: &Packet,
988        i: &ChunkInitAck,
989        now: Instant,
990    ) -> Result<Vec<Packet>> {
991        let state = self.state();
992        debug!("[{}] chunkInitAck received in state '{}'", self.side, state);
993        if state != AssociationState::CookieWait {
994            // RFC 4960
995            // 5.2.3.  Unexpected INIT ACK
996            //   If an INIT ACK is received by an endpoint in any state other than the
997            //   COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
998            //   An unexpected INIT ACK usually indicates the processing of an old or
999            //   duplicated INIT chunk.
1000            return Ok(vec![]);
1001        }
1002
1003        self.my_max_num_inbound_streams =
1004            std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
1005        self.my_max_num_outbound_streams =
1006            std::cmp::min(i.num_outbound_streams, self.my_max_num_outbound_streams);
1007        self.peer_verification_tag = i.initiate_tag;
1008        self.peer_last_tsn = if i.initial_tsn == 0 {
1009            u32::MAX
1010        } else {
1011            i.initial_tsn - 1
1012        };
1013        if self.source_port != p.common_header.destination_port
1014            || self.destination_port != p.common_header.source_port
1015        {
1016            warn!("[{}] handle_init_ack: port mismatch", self.side);
1017            return Ok(vec![]);
1018        }
1019
1020        self.rwnd = i.advertised_receiver_window_credit;
1021        debug!("[{}] initial rwnd={}", self.side, self.rwnd);
1022
1023        // RFC 4690 Sec 7.2.1
1024        //  o  The initial value of ssthresh MAY be arbitrarily high (for
1025        //     example, implementations MAY use the size of the receiver
1026        //     advertised window).
1027        self.ssthresh = self.rwnd;
1028        trace!(
1029            "[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
1030            self.side,
1031            self.cwnd,
1032            self.ssthresh,
1033            self.inflight_queue.get_num_bytes()
1034        );
1035
1036        self.timers.stop(Timer::T1Init);
1037        self.stored_init = None;
1038
1039        let mut cookie_param = None;
1040        for param in &i.params {
1041            if let Some(v) = param.as_any().downcast_ref::<ParamStateCookie>() {
1042                cookie_param = Some(v);
1043            } else if let Some(v) = param.as_any().downcast_ref::<ParamSupportedExtensions>() {
1044                for t in &v.chunk_types {
1045                    if *t == CT_FORWARD_TSN {
1046                        debug!("[{}] use ForwardTSN (on initAck)", self.side);
1047                        self.use_forward_tsn = true;
1048                    }
1049                }
1050            }
1051        }
1052        if !self.use_forward_tsn {
1053            warn!("[{}] not using ForwardTSN (on initAck)", self.side);
1054        }
1055
1056        if let Some(v) = cookie_param {
1057            self.stored_cookie_echo = Some(ChunkCookieEcho {
1058                cookie: v.cookie.clone(),
1059            });
1060
1061            self.send_cookie_echo()?;
1062
1063            self.timers
1064                .start(Timer::T1Cookie, now, self.rto_mgr.get_rto());
1065
1066            self.set_state(AssociationState::CookieEchoed);
1067
1068            Ok(vec![])
1069        } else {
1070            Err(Error::ErrInitAckNoCookie)
1071        }
1072    }
1073
1074    fn handle_heartbeat(&self, c: &ChunkHeartbeat) -> Result<Vec<Packet>> {
1075        trace!("[{}] chunkHeartbeat", self.side);
1076        if let Some(p) = c.params.first() {
1077            if let Some(hbi) = p.as_any().downcast_ref::<ParamHeartbeatInfo>() {
1078                return Ok(vec![Packet {
1079                    common_header: CommonHeader {
1080                        verification_tag: self.peer_verification_tag,
1081                        source_port: self.source_port,
1082                        destination_port: self.destination_port,
1083                    },
1084                    chunks: vec![Box::new(ChunkHeartbeatAck {
1085                        params: vec![Box::new(ParamHeartbeatInfo {
1086                            heartbeat_information: hbi.heartbeat_information.clone(),
1087                        })],
1088                    })],
1089                }]);
1090            } else {
1091                warn!(
1092                    "[{}] failed to handle Heartbeat, no ParamHeartbeatInfo",
1093                    self.side,
1094                );
1095            }
1096        }
1097
1098        Ok(vec![])
1099    }
1100
1101    fn handle_cookie_echo(&mut self, c: &ChunkCookieEcho) -> Result<Vec<Packet>> {
1102        let state = self.state();
1103        debug!("[{}] COOKIE-ECHO received in state '{}'", self.side, state);
1104
1105        if let Some(my_cookie) = &self.my_cookie {
1106            match state {
1107                AssociationState::Established => {
1108                    if my_cookie.cookie != c.cookie {
1109                        return Ok(vec![]);
1110                    }
1111                }
1112                AssociationState::Closed
1113                | AssociationState::CookieWait
1114                | AssociationState::CookieEchoed => {
1115                    if my_cookie.cookie != c.cookie {
1116                        return Ok(vec![]);
1117                    }
1118
1119                    self.timers.stop(Timer::T1Init);
1120                    self.stored_init = None;
1121
1122                    self.timers.stop(Timer::T1Cookie);
1123                    self.stored_cookie_echo = None;
1124
1125                    self.events.push_back(Event::Connected);
1126                    self.set_state(AssociationState::Established);
1127                    self.handshake_completed = true;
1128                }
1129                _ => return Ok(vec![]),
1130            };
1131        } else {
1132            debug!("[{}] COOKIE-ECHO received before initialization", self.side);
1133            return Ok(vec![]);
1134        }
1135
1136        Ok(vec![Packet {
1137            common_header: CommonHeader {
1138                verification_tag: self.peer_verification_tag,
1139                source_port: self.source_port,
1140                destination_port: self.destination_port,
1141            },
1142            chunks: vec![Box::new(ChunkCookieAck {})],
1143        }])
1144    }
1145
1146    fn handle_cookie_ack(&mut self) -> Result<Vec<Packet>> {
1147        let state = self.state();
1148        debug!("[{}] COOKIE-ACK received in state '{}'", self.side, state);
1149        if state != AssociationState::CookieEchoed {
1150            // RFC 4960
1151            // 5.2.5.  Handle Duplicate COOKIE-ACK.
1152            //   At any state other than COOKIE-ECHOED, an endpoint should silently
1153            //   discard a received COOKIE ACK chunk.
1154            return Ok(vec![]);
1155        }
1156
1157        self.timers.stop(Timer::T1Cookie);
1158        self.stored_cookie_echo = None;
1159
1160        self.events.push_back(Event::Connected);
1161        self.set_state(AssociationState::Established);
1162        self.handshake_completed = true;
1163
1164        Ok(vec![])
1165    }
1166
1167    fn handle_data(&mut self, d: &ChunkPayloadData) -> Result<Vec<Packet>> {
1168        debug!(
1169            "[{}] DATA: tsn={} peer_last_tsn={} immediateSack={} len={}, unordered={}",
1170            self.side,
1171            d.tsn,
1172            self.peer_last_tsn,
1173            d.immediate_sack,
1174            d.user_data.len(),
1175            d.unordered,
1176        );
1177        self.stats.inc_datas();
1178
1179        let can_push = self.payload_queue.can_push(d, self.peer_last_tsn);
1180        let mut stream_handle_data = false;
1181        if can_push {
1182            if self.get_or_create_stream(d.stream_identifier).is_some() {
1183                if self.get_my_receiver_window_credit() > 0 {
1184                    // Pass the new chunk to stream level as soon as it arrives
1185                    self.payload_queue.push(d.clone(), self.peer_last_tsn);
1186                    stream_handle_data = true;
1187                } else {
1188                    // Receive buffer is full
1189                    if let Some(last_tsn) = self.payload_queue.get_last_tsn_received() {
1190                        if sna32lt(d.tsn, *last_tsn) {
1191                            debug!(
1192                                "[{}] receive buffer full, but accepted as this is a missing chunk with tsn={} ssn={}",
1193                                self.side, d.tsn, d.stream_sequence_number
1194                            );
1195                            self.payload_queue.push(d.clone(), self.peer_last_tsn);
1196                            stream_handle_data = true; //s.handle_data(d.clone());
1197                        }
1198                    } else {
1199                        debug!(
1200                            "[{}] receive buffer full. dropping DATA with tsn={} ssn={}",
1201                            self.side, d.tsn, d.stream_sequence_number
1202                        );
1203                    }
1204                }
1205            } else {
1206                // silently discard the data. (sender will retry on T3-rtx timeout)
1207                debug!("[{}] discard {}", self.side, d.stream_sequence_number);
1208                return Ok(vec![]);
1209            }
1210        }
1211
1212        let immediate_sack = d.immediate_sack;
1213
1214        if stream_handle_data && let Some(s) = self.streams.get_mut(&d.stream_identifier) {
1215            self.events.push_back(Event::DatagramReceived);
1216            if s.handle_data(d) && s.reassembly_queue.is_readable() {
1217                self.events.push_back(Event::Stream(StreamEvent::Readable {
1218                    id: s.stream_identifier,
1219                }));
1220            }
1221        }
1222
1223        self.handle_peer_last_tsn_and_acknowledgement(immediate_sack)
1224    }
1225
1226    fn handle_sack(&mut self, d: &ChunkSelectiveAck, now: Instant) -> Result<Vec<Packet>> {
1227        trace!(
1228            "[{}] {}, SACK: cumTSN={} a_rwnd={}",
1229            self.side,
1230            self.cumulative_tsn_ack_point,
1231            d.cumulative_tsn_ack,
1232            d.advertised_receiver_window_credit
1233        );
1234        let state = self.state();
1235        if state != AssociationState::Established
1236            && state != AssociationState::ShutdownPending
1237            && state != AssociationState::ShutdownReceived
1238        {
1239            return Ok(vec![]);
1240        }
1241
1242        self.stats.inc_sacks();
1243
1244        if sna32gt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1245            // RFC 4960 sec 6.2.1.  Processing a Received SACK
1246            // D)
1247            //   i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
1248            //      Point, then drop the SACK.  Since Cumulative TSN Ack is
1249            //      monotonically increasing, a SACK whose Cumulative TSN Ack is
1250            //      less than the Cumulative TSN Ack Point indicates an out-of-
1251            //      order SACK.
1252
1253            debug!(
1254                "[{}] SACK Cumulative ACK {} is older than ACK point {}",
1255                self.side, d.cumulative_tsn_ack, self.cumulative_tsn_ack_point
1256            );
1257
1258            return Ok(vec![]);
1259        }
1260
1261        // Process selective ack
1262        let (bytes_acked_per_stream, htna) = self.process_selective_ack(d, now)?;
1263
1264        let mut total_bytes_acked = 0;
1265        for n_bytes_acked in bytes_acked_per_stream.values() {
1266            total_bytes_acked += *n_bytes_acked;
1267        }
1268
1269        let mut cum_tsn_ack_point_advanced = false;
1270        if sna32lt(self.cumulative_tsn_ack_point, d.cumulative_tsn_ack) {
1271            trace!(
1272                "[{}] SACK: cumTSN advanced: {} -> {}",
1273                self.side, self.cumulative_tsn_ack_point, d.cumulative_tsn_ack
1274            );
1275
1276            self.cumulative_tsn_ack_point = d.cumulative_tsn_ack;
1277            cum_tsn_ack_point_advanced = true;
1278            self.on_cumulative_tsn_ack_point_advanced(total_bytes_acked, now);
1279        }
1280
1281        for (si, n_bytes_acked) in &bytes_acked_per_stream {
1282            if let Some(s) = self.streams.get_mut(si)
1283                && s.on_buffer_released(*n_bytes_acked)
1284            {
1285                trace!("StreamEvent::BufferedAmountLow");
1286                self.events
1287                    .push_back(Event::Stream(StreamEvent::BufferedAmountLow { id: *si }))
1288            }
1289        }
1290
1291        // New rwnd value
1292        // RFC 4960 sec 6.2.1.  Processing a Received SACK
1293        // D)
1294        //   ii) Set rwnd equal to the newly received a_rwnd minus the number
1295        //       of bytes still outstanding after processing the Cumulative
1296        //       TSN Ack and the Gap Ack Blocks.
1297
1298        // bytes acked were already subtracted by markAsAcked() method
1299        let bytes_outstanding = self.inflight_queue.get_num_bytes() as u32;
1300        if bytes_outstanding >= d.advertised_receiver_window_credit {
1301            self.rwnd = 0;
1302        } else {
1303            self.rwnd = d.advertised_receiver_window_credit - bytes_outstanding;
1304        }
1305
1306        self.process_fast_retransmission(d.cumulative_tsn_ack, htna, cum_tsn_ack_point_advanced)?;
1307
1308        if self.use_forward_tsn {
1309            // RFC 3758 Sec 3.5 C1
1310            if sna32lt(
1311                self.advanced_peer_tsn_ack_point,
1312                self.cumulative_tsn_ack_point,
1313            ) {
1314                self.advanced_peer_tsn_ack_point = self.cumulative_tsn_ack_point
1315            }
1316
1317            // RFC 3758 Sec 3.5 C2
1318            let mut i = self.advanced_peer_tsn_ack_point + 1;
1319            while let Some(c) = self.inflight_queue.get(i) {
1320                if !c.abandoned() {
1321                    break;
1322                }
1323                self.advanced_peer_tsn_ack_point = i;
1324                i += 1;
1325            }
1326
1327            // RFC 3758 Sec 3.5 C3
1328            if sna32gt(
1329                self.advanced_peer_tsn_ack_point,
1330                self.cumulative_tsn_ack_point,
1331            ) {
1332                self.will_send_forward_tsn = true;
1333                debug!(
1334                    "[{}] handleSack {}: sna32GT({}, {})",
1335                    self.side,
1336                    self.will_send_forward_tsn,
1337                    self.advanced_peer_tsn_ack_point,
1338                    self.cumulative_tsn_ack_point
1339                );
1340            }
1341            self.awake_write_loop();
1342        }
1343
1344        self.postprocess_sack(state, cum_tsn_ack_point_advanced, now);
1345
1346        Ok(vec![])
1347    }
1348
1349    fn handle_reconfig(&mut self, c: &ChunkReconfig) -> Result<Vec<Packet>> {
1350        trace!("[{}] handle_reconfig", self.side);
1351
1352        let mut pp = vec![];
1353
1354        if let Some(param_a) = &c.param_a {
1355            self.handle_reconfig_param(param_a, &mut pp)?;
1356        }
1357
1358        if let Some(param_b) = &c.param_b {
1359            self.handle_reconfig_param(param_b, &mut pp)?;
1360        }
1361
1362        Ok(pp)
1363    }
1364
1365    fn handle_forward_tsn(&mut self, c: &ChunkForwardTsn) -> Result<Vec<Packet>> {
1366        trace!("[{}] FwdTSN: {}", self.side, c);
1367
1368        if !self.use_forward_tsn {
1369            warn!("[{}] received FwdTSN but not enabled", self.side);
1370            // Return an error chunk
1371            let cerr = ChunkError {
1372                error_causes: vec![ErrorCauseUnrecognizedChunkType::default()],
1373            };
1374
1375            let outbound = Packet {
1376                common_header: CommonHeader {
1377                    verification_tag: self.peer_verification_tag,
1378                    source_port: self.source_port,
1379                    destination_port: self.destination_port,
1380                },
1381                chunks: vec![Box::new(cerr)],
1382            };
1383            return Ok(vec![outbound]);
1384        }
1385
1386        // From RFC 3758 Sec 3.6:
1387        //   Note, if the "New Cumulative TSN" value carried in the arrived
1388        //   FORWARD TSN chunk is found to be behind or at the current cumulative
1389        //   TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
1390        //   date and MUST NOT update its Cumulative TSN.  The receiver SHOULD
1391        //   send a SACK to its peer (the sender of the FORWARD TSN) since such a
1392        //   duplicate may indicate the previous SACK was lost in the network.
1393
1394        trace!(
1395            "[{}] should send ack? newCumTSN={} peer_last_tsn={}",
1396            self.side, c.new_cumulative_tsn, self.peer_last_tsn
1397        );
1398        if sna32lte(c.new_cumulative_tsn, self.peer_last_tsn) {
1399            trace!("[{}] sending ack on Forward TSN", self.side);
1400            self.ack_state = AckState::Immediate;
1401            self.timers.stop(Timer::Ack);
1402            self.awake_write_loop();
1403            return Ok(vec![]);
1404        }
1405
1406        // From RFC 3758 Sec 3.6:
1407        //   the receiver MUST perform the same TSN handling, including duplicate
1408        //   detection, gap detection, SACK generation, cumulative TSN
1409        //   advancement, etc. as defined in RFC 2960 [2]---with the following
1410        //   exceptions and additions.
1411
1412        //   When a FORWARD TSN chunk arrives, the data receiver MUST first update
1413        //   its cumulative TSN point to the value carried in the FORWARD TSN
1414        //   chunk,
1415
1416        // Advance peer_last_tsn
1417        while sna32lt(self.peer_last_tsn, c.new_cumulative_tsn) {
1418            self.payload_queue.pop(self.peer_last_tsn + 1); // may not exist
1419            self.peer_last_tsn += 1;
1420        }
1421
1422        // Report new peer_last_tsn value and abandoned largest SSN value to
1423        // corresponding streams so that the abandoned chunks can be removed
1424        // from the reassemblyQueue.
1425        for forwarded in &c.streams {
1426            if let Some(s) = self.streams.get_mut(&forwarded.identifier) {
1427                s.handle_forward_tsn_for_ordered(forwarded.sequence);
1428                if s.reassembly_queue.is_readable() {
1429                    self.events.push_back(Event::Stream(StreamEvent::Readable {
1430                        id: s.stream_identifier,
1431                    }));
1432                }
1433            }
1434        }
1435
1436        // TSN may be forwarded for unordered chunks. ForwardTSN chunk does not
1437        // report which stream identifier it skipped for unordered chunks.
1438        // Therefore, we need to broadcast this event to all existing streams for
1439        // unordered chunks.
1440        for s in self.streams.values_mut() {
1441            s.handle_forward_tsn_for_unordered(c.new_cumulative_tsn);
1442            if s.reassembly_queue.is_readable() {
1443                self.events.push_back(Event::Stream(StreamEvent::Readable {
1444                    id: s.stream_identifier,
1445                }));
1446            }
1447        }
1448
1449        self.handle_peer_last_tsn_and_acknowledgement(false)
1450    }
1451
1452    fn handle_shutdown(&mut self, _: &ChunkShutdown) -> Result<Vec<Packet>> {
1453        let state = self.state();
1454
1455        if state == AssociationState::Established {
1456            if !self.inflight_queue.is_empty() {
1457                self.set_state(AssociationState::ShutdownReceived);
1458            } else {
1459                // No more outstanding, send shutdown ack.
1460                self.will_send_shutdown_ack = true;
1461                self.set_state(AssociationState::ShutdownAckSent);
1462
1463                self.awake_write_loop();
1464            }
1465        } else if state == AssociationState::ShutdownSent {
1466            // self.cumulative_tsn_ack_point = c.cumulative_tsn_ack
1467
1468            self.will_send_shutdown_ack = true;
1469            self.set_state(AssociationState::ShutdownAckSent);
1470
1471            self.awake_write_loop();
1472        }
1473
1474        Ok(vec![])
1475    }
1476
1477    fn handle_shutdown_ack(&mut self, _: &ChunkShutdownAck) -> Result<Vec<Packet>> {
1478        let state = self.state();
1479        if state == AssociationState::ShutdownSent || state == AssociationState::ShutdownAckSent {
1480            self.timers.stop(Timer::T2Shutdown);
1481            self.will_send_shutdown_complete = true;
1482
1483            self.awake_write_loop();
1484        }
1485
1486        Ok(vec![])
1487    }
1488
1489    fn handle_shutdown_complete(&mut self, _: &ChunkShutdownComplete) -> Result<Vec<Packet>> {
1490        let state = self.state();
1491        if state == AssociationState::ShutdownAckSent {
1492            self.timers.stop(Timer::T2Shutdown);
1493            self.close(AssociationError::AssociationClosed)?;
1494        }
1495
1496        Ok(vec![])
1497    }
1498
1499    /// A common routine for handle_data and handle_forward_tsn routines
1500    fn handle_peer_last_tsn_and_acknowledgement(
1501        &mut self,
1502        sack_immediately: bool,
1503    ) -> Result<Vec<Packet>> {
1504        let mut reply = vec![];
1505
1506        // Try to advance peer_last_tsn
1507
1508        // From RFC 3758 Sec 3.6:
1509        //   .. and then MUST further advance its cumulative TSN point locally
1510        //   if possible
1511        // Meaning, if peer_last_tsn+1 points to a chunk that is received,
1512        // advance peer_last_tsn until peer_last_tsn+1 points to unreceived chunk.
1513        //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1514        while self.payload_queue.pop(self.peer_last_tsn + 1).is_some() {
1515            self.peer_last_tsn += 1;
1516            //debug!("[{}] peer_last_tsn = {}", self.side, self.peer_last_tsn);
1517
1518            let rst_reqs: Vec<ParamOutgoingResetRequest> =
1519                self.reconfig_requests.values().cloned().collect();
1520            for rst_req in rst_reqs {
1521                self.reset_streams_if_any(&rst_req, false, &mut reply)?;
1522            }
1523        }
1524
1525        let has_packet_loss = !self.payload_queue.is_empty();
1526        if has_packet_loss {
1527            trace!(
1528                "[{}] packetloss: {}",
1529                self.side,
1530                self.payload_queue
1531                    .get_gap_ack_blocks_string(self.peer_last_tsn)
1532            );
1533        }
1534
1535        if (self.ack_state != AckState::Immediate
1536            && !sack_immediately
1537            && !has_packet_loss
1538            && self.ack_mode == AckMode::Normal)
1539            || self.ack_mode == AckMode::AlwaysDelay
1540        {
1541            if self.ack_state == AckState::Idle {
1542                self.delayed_ack_triggered = true;
1543            } else {
1544                self.immediate_ack_triggered = true;
1545            }
1546        } else {
1547            self.immediate_ack_triggered = true;
1548        }
1549
1550        Ok(reply)
1551    }
1552
1553    #[allow(clippy::borrowed_box)]
1554    fn handle_reconfig_param(
1555        &mut self,
1556        raw: &Box<dyn Param>,
1557        reply: &mut Vec<Packet>,
1558    ) -> Result<()> {
1559        if let Some(p) = raw.as_any().downcast_ref::<ParamOutgoingResetRequest>() {
1560            self.reconfig_requests
1561                .insert(p.reconfig_request_sequence_number, p.clone());
1562            self.reset_streams_if_any(p, true, reply)?;
1563            Ok(())
1564        } else if let Some(p) = raw.as_any().downcast_ref::<ParamReconfigResponse>() {
1565            self.reconfigs.remove(&p.reconfig_response_sequence_number);
1566            if self.reconfigs.is_empty() {
1567                self.timers.stop(Timer::Reconfig);
1568            }
1569            Ok(())
1570        } else {
1571            Err(Error::ErrParameterType)
1572        }
1573    }
1574
1575    fn process_selective_ack(
1576        &mut self,
1577        d: &ChunkSelectiveAck,
1578        now: Instant,
1579    ) -> Result<(HashMap<u16, i64>, u32)> {
1580        let mut bytes_acked_per_stream = HashMap::new();
1581
1582        // New ack point, so pop all ACKed packets from inflight_queue
1583        // We add 1 because the "currentAckPoint" has already been popped from the inflight queue
1584        // For the first SACK we take care of this by setting the ackpoint to cumAck - 1
1585        let mut i = self.cumulative_tsn_ack_point + 1;
1586        //log::debug!("[{}] i={} d={}", self.name, i, d.cumulative_tsn_ack);
1587        while sna32lte(i, d.cumulative_tsn_ack) {
1588            if let Some(c) = self.inflight_queue.pop(i) {
1589                if !c.acked {
1590                    // RFC 4096 sec 6.3.2.  Retransmission Timer Rules
1591                    //   R3)  Whenever a SACK is received that acknowledges the DATA chunk
1592                    //        with the earliest outstanding TSN for that address, restart the
1593                    //        T3-rtx timer for that address with its current RTO (if there is
1594                    //        still outstanding data on that address).
1595                    if i == self.cumulative_tsn_ack_point + 1 {
1596                        // T3 timer needs to be reset. Stop it for now.
1597                        self.timers.stop(Timer::T3RTX);
1598                    }
1599
1600                    let n_bytes_acked = c.user_data.len() as i64;
1601
1602                    // Sum the number of bytes acknowledged per stream
1603                    if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1604                        *amount += n_bytes_acked;
1605                    } else {
1606                        bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1607                    }
1608
1609                    // RFC 4960 sec 6.3.1.  RTO Calculation
1610                    //   C4)  When data is in flight and when allowed by rule C5 below, a new
1611                    //        RTT measurement MUST be made each round trip.  Furthermore, new
1612                    //        RTT measurements SHOULD be made no more than once per round trip
1613                    //        for a given destination transport address.
1614                    //   C5)  Karn's algorithm: RTT measurements MUST NOT be made using
1615                    //        packets that were retransmitted (and thus for which it is
1616                    //        ambiguous whether the reply was for the first instance of the
1617                    //        chunk or for a later instance)
1618                    if c.nsent == 1 && sna32gte(c.tsn, self.min_tsn2measure_rtt) {
1619                        self.min_tsn2measure_rtt = self.my_next_tsn;
1620                        if let Some(since) = &c.since {
1621                            let rtt = now.duration_since(*since);
1622                            let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1623                            trace!(
1624                                "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1625                                self.side,
1626                                rtt.as_millis(),
1627                                srtt,
1628                                self.rto_mgr.get_rto()
1629                            );
1630                        } else {
1631                            error!("[{}] invalid c.since", self.side);
1632                        }
1633                    }
1634                }
1635
1636                if self.in_fast_recovery && c.tsn == self.fast_recover_exit_point {
1637                    debug!("[{}] exit fast-recovery", self.side);
1638                    self.in_fast_recovery = false;
1639                }
1640            } else {
1641                return Err(Error::ErrInflightQueueTsnPop);
1642            }
1643
1644            i += 1;
1645        }
1646
1647        let mut htna = d.cumulative_tsn_ack;
1648
1649        // Mark selectively acknowledged chunks as "acked"
1650        for g in &d.gap_ack_blocks {
1651            for i in g.start..=g.end {
1652                let tsn = d.cumulative_tsn_ack + i as u32;
1653
1654                let (is_existed, is_acked) = if let Some(c) = self.inflight_queue.get(tsn) {
1655                    (true, c.acked)
1656                } else {
1657                    (false, false)
1658                };
1659                let n_bytes_acked = if is_existed && !is_acked {
1660                    self.inflight_queue.mark_as_acked(tsn) as i64
1661                } else {
1662                    0
1663                };
1664
1665                if let Some(c) = self.inflight_queue.get(tsn) {
1666                    if !is_acked {
1667                        // Sum the number of bytes acknowledged per stream
1668                        if let Some(amount) = bytes_acked_per_stream.get_mut(&c.stream_identifier) {
1669                            *amount += n_bytes_acked;
1670                        } else {
1671                            bytes_acked_per_stream.insert(c.stream_identifier, n_bytes_acked);
1672                        }
1673
1674                        trace!("[{}] tsn={} has been sacked", self.side, c.tsn);
1675
1676                        if c.nsent == 1 {
1677                            self.min_tsn2measure_rtt = self.my_next_tsn;
1678                            if let Some(since) = &c.since {
1679                                let rtt = now.duration_since(*since);
1680                                let srtt = self.rto_mgr.set_new_rtt(rtt.as_millis() as u64);
1681                                trace!(
1682                                    "[{}] SACK: measured-rtt={} srtt={} new-rto={}",
1683                                    self.side,
1684                                    rtt.as_millis(),
1685                                    srtt,
1686                                    self.rto_mgr.get_rto()
1687                                );
1688                            } else {
1689                                error!("[{}] invalid c.since", self.side);
1690                            }
1691                        }
1692
1693                        if sna32lt(htna, tsn) {
1694                            htna = tsn;
1695                        }
1696                    }
1697                } else {
1698                    return Err(Error::ErrTsnRequestNotExist);
1699                }
1700            }
1701        }
1702
1703        Ok((bytes_acked_per_stream, htna))
1704    }
1705
1706    fn on_cumulative_tsn_ack_point_advanced(&mut self, total_bytes_acked: i64, now: Instant) {
1707        // RFC 4096, sec 6.3.2.  Retransmission Timer Rules
1708        //   R2)  Whenever all outstanding data sent to an address have been
1709        //        acknowledged, turn off the T3-rtx timer of that address.
1710        if self.inflight_queue.is_empty() {
1711            trace!(
1712                "[{}] SACK: no more packet in-flight (pending={})",
1713                self.side,
1714                self.pending_queue.len()
1715            );
1716            self.timers.stop(Timer::T3RTX);
1717        } else {
1718            trace!("[{}] T3-rtx timer start (pt2)", self.side);
1719            self.timers
1720                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1721        }
1722
1723        // Update congestion control parameters
1724        if self.cwnd <= self.ssthresh {
1725            // RFC 4096, sec 7.2.1.  Slow-Start
1726            //   o  When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
1727            //		use the slow-start algorithm to increase cwnd only if the current
1728            //      congestion window is being fully utilized, an incoming SACK
1729            //      advances the Cumulative TSN Ack Point, and the data sender is not
1730            //      in Fast Recovery.  Only when these three conditions are met can
1731            //      the cwnd be increased; otherwise, the cwnd MUST not be increased.
1732            //		If these conditions are met, then cwnd MUST be increased by, at
1733            //      most, the lesser of 1) the total size of the previously
1734            //      outstanding DATA chunk(s) acknowledged, and 2) the destination's
1735            //      path MTU.
1736            if !self.in_fast_recovery && !self.pending_queue.is_empty() {
1737                self.cwnd += std::cmp::min(total_bytes_acked as u32, self.cwnd); // TCP way
1738                // self.cwnd += min32(uint32(total_bytes_acked), self.mtu) // SCTP way (slow)
1739                trace!(
1740                    "[{}] updated cwnd={} ssthresh={} acked={} (SS)",
1741                    self.side, self.cwnd, self.ssthresh, total_bytes_acked
1742                );
1743            } else {
1744                trace!(
1745                    "[{}] cwnd did not grow: cwnd={} ssthresh={} acked={} FR={} pending={}",
1746                    self.side,
1747                    self.cwnd,
1748                    self.ssthresh,
1749                    total_bytes_acked,
1750                    self.in_fast_recovery,
1751                    self.pending_queue.len()
1752                );
1753            }
1754        } else {
1755            // RFC 4096, sec 7.2.2.  Congestion Avoidance
1756            //   o  Whenever cwnd is greater than ssthresh, upon each SACK arrival
1757            //      that advances the Cumulative TSN Ack Point, increase
1758            //      partial_bytes_acked by the total number of bytes of all new chunks
1759            //      acknowledged in that SACK including chunks acknowledged by the new
1760            //      Cumulative TSN Ack and by Gap Ack Blocks.
1761            self.partial_bytes_acked += total_bytes_acked as u32;
1762
1763            //   o  When partial_bytes_acked is equal to or greater than cwnd and
1764            //      before the arrival of the SACK the sender had cwnd or more bytes
1765            //      of data outstanding (i.e., before arrival of the SACK, flight size
1766            //      was greater than or equal to cwnd), increase cwnd by MTU, and
1767            //      reset partial_bytes_acked to (partial_bytes_acked - cwnd).
1768            if self.partial_bytes_acked >= self.cwnd && !self.pending_queue.is_empty() {
1769                self.partial_bytes_acked -= self.cwnd;
1770                self.cwnd += self.mtu;
1771                trace!(
1772                    "[{}] updated cwnd={} ssthresh={} acked={} (CA)",
1773                    self.side, self.cwnd, self.ssthresh, total_bytes_acked
1774                );
1775            }
1776        }
1777    }
1778
1779    fn process_fast_retransmission(
1780        &mut self,
1781        cum_tsn_ack_point: u32,
1782        htna: u32,
1783        cum_tsn_ack_point_advanced: bool,
1784    ) -> Result<()> {
1785        // HTNA algorithm - RFC 4960 Sec 7.2.4
1786        // Increment missIndicator of each chunks that the SACK reported missing
1787        // when either of the following is met:
1788        // a)  Not in fast-recovery
1789        //     miss indications are incremented only for missing TSNs prior to the
1790        //     highest TSN newly acknowledged in the SACK.
1791        // b)  In fast-recovery AND the Cumulative TSN Ack Point advanced
1792        //     the miss indications are incremented for all TSNs reported missing
1793        //     in the SACK.
1794        if !self.in_fast_recovery || cum_tsn_ack_point_advanced {
1795            let max_tsn = if !self.in_fast_recovery {
1796                // a) increment only for missing TSNs prior to the HTNA
1797                htna
1798            } else {
1799                // b) increment for all TSNs reported missing
1800                cum_tsn_ack_point + (self.inflight_queue.len() as u32) + 1
1801            };
1802
1803            let mut tsn = cum_tsn_ack_point + 1;
1804            while sna32lt(tsn, max_tsn) {
1805                if let Some(c) = self.inflight_queue.get_mut(tsn) {
1806                    if !c.acked && !c.abandoned() && c.miss_indicator < 3 {
1807                        c.miss_indicator += 1;
1808                        if c.miss_indicator == 3 && !self.in_fast_recovery {
1809                            // 2)  If not in Fast Recovery, adjust the ssthresh and cwnd of the
1810                            //     destination address(es) to which the missing DATA chunks were
1811                            //     last sent, according to the formula described in Section 7.2.3.
1812                            self.in_fast_recovery = true;
1813                            self.fast_recover_exit_point = htna;
1814                            self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
1815                            self.cwnd = self.ssthresh;
1816                            self.partial_bytes_acked = 0;
1817                            self.will_retransmit_fast = true;
1818
1819                            trace!(
1820                                "[{}] updated cwnd={} ssthresh={} inflight={} (FR)",
1821                                self.side,
1822                                self.cwnd,
1823                                self.ssthresh,
1824                                self.inflight_queue.get_num_bytes()
1825                            );
1826                        }
1827                    }
1828                } else {
1829                    return Err(Error::ErrTsnRequestNotExist);
1830                }
1831
1832                tsn += 1;
1833            }
1834        }
1835
1836        if self.in_fast_recovery && cum_tsn_ack_point_advanced {
1837            self.will_retransmit_fast = true;
1838        }
1839
1840        Ok(())
1841    }
1842
1843    /// The caller must hold the lock. This method was only added because the
1844    /// linter was complaining about the "cognitive complexity" of handle_sack.
1845    fn postprocess_sack(
1846        &mut self,
1847        state: AssociationState,
1848        mut should_awake_write_loop: bool,
1849        now: Instant,
1850    ) {
1851        if !self.inflight_queue.is_empty() {
1852            // Start timer. (noop if already started)
1853            trace!("[{}] T3-rtx timer start (pt3)", self.side);
1854            self.timers
1855                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
1856        } else if state == AssociationState::ShutdownPending {
1857            // No more outstanding, send shutdown.
1858            should_awake_write_loop = true;
1859            self.will_send_shutdown = true;
1860            self.set_state(AssociationState::ShutdownSent);
1861        } else if state == AssociationState::ShutdownReceived {
1862            // No more outstanding, send shutdown ack.
1863            should_awake_write_loop = true;
1864            self.will_send_shutdown_ack = true;
1865            self.set_state(AssociationState::ShutdownAckSent);
1866        }
1867
1868        if should_awake_write_loop {
1869            self.awake_write_loop();
1870        }
1871    }
1872
1873    fn reset_streams_if_any(
1874        &mut self,
1875        p: &ParamOutgoingResetRequest,
1876        respond: bool,
1877        reply: &mut Vec<Packet>,
1878    ) -> Result<()> {
1879        let mut result = ReconfigResult::SuccessPerformed;
1880        let mut sis_to_reset = vec![];
1881
1882        if sna32lte(p.sender_last_tsn, self.peer_last_tsn) {
1883            debug!(
1884                "[{}] resetStream(): senderLastTSN={} <= peer_last_tsn={}",
1885                self.side, p.sender_last_tsn, self.peer_last_tsn
1886            );
1887            for id in &p.stream_identifiers {
1888                if self.streams.contains_key(id) {
1889                    if respond {
1890                        sis_to_reset.push(*id);
1891                    }
1892                    self.unregister_stream(*id, AssociationError::Reset);
1893                }
1894            }
1895            self.reconfig_requests
1896                .remove(&p.reconfig_request_sequence_number);
1897        } else {
1898            debug!(
1899                "[{}] resetStream(): senderLastTSN={} > peer_last_tsn={}",
1900                self.side, p.sender_last_tsn, self.peer_last_tsn
1901            );
1902            result = ReconfigResult::InProgress;
1903        }
1904
1905        // Answer incoming reset requests with the same reset request, but with
1906        // reconfig_response_sequence_number.
1907        if !sis_to_reset.is_empty() {
1908            let rsn = self.generate_next_rsn();
1909            let tsn = self.my_next_tsn - 1;
1910
1911            let c = ChunkReconfig {
1912                param_a: Some(Box::new(ParamOutgoingResetRequest {
1913                    reconfig_request_sequence_number: rsn,
1914                    reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1915                    sender_last_tsn: tsn,
1916                    stream_identifiers: sis_to_reset,
1917                })),
1918                ..Default::default()
1919            };
1920
1921            self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
1922
1923            let p = self.create_packet(vec![Box::new(c)]);
1924            reply.push(p);
1925        }
1926
1927        let packet = self.create_packet(vec![Box::new(ChunkReconfig {
1928            param_a: Some(Box::new(ParamReconfigResponse {
1929                reconfig_response_sequence_number: p.reconfig_request_sequence_number,
1930                result,
1931            })),
1932            param_b: None,
1933        })]);
1934
1935        debug!("[{}] RESET RESPONSE: {}", self.side, packet);
1936
1937        reply.push(packet);
1938
1939        Ok(())
1940    }
1941
1942    /// create_packet wraps chunks in a packet.
1943    /// The caller should hold the read lock.
1944    pub(crate) fn create_packet(&self, chunks: Vec<Box<dyn Chunk>>) -> Packet {
1945        Packet {
1946            common_header: CommonHeader {
1947                verification_tag: self.peer_verification_tag,
1948                source_port: self.source_port,
1949                destination_port: self.destination_port,
1950            },
1951            chunks,
1952        }
1953    }
1954
1955    /// create_stream creates a stream. The caller should hold the lock and check no stream exists for this id.
1956    fn create_stream(
1957        &mut self,
1958        stream_identifier: StreamId,
1959        accept: bool,
1960        default_payload_type: PayloadProtocolIdentifier,
1961    ) -> Option<Stream<'_>> {
1962        let s = StreamState::new(
1963            self.side,
1964            stream_identifier,
1965            self.max_payload_size,
1966            default_payload_type,
1967        );
1968
1969        if accept {
1970            self.stream_queue.push_back(stream_identifier);
1971            self.events.push_back(Event::Stream(StreamEvent::Opened {
1972                id: stream_identifier,
1973            }));
1974        }
1975
1976        self.streams.insert(stream_identifier, s);
1977
1978        Some(Stream {
1979            stream_identifier,
1980            association: self,
1981        })
1982    }
1983
1984    /// get_or_create_stream gets or creates a stream. The caller should hold the lock.
1985    fn get_or_create_stream(&mut self, stream_identifier: StreamId) -> Option<Stream<'_>> {
1986        if self.streams.contains_key(&stream_identifier) {
1987            Some(Stream {
1988                stream_identifier,
1989                association: self,
1990            })
1991        } else {
1992            self.create_stream(
1993                stream_identifier,
1994                true,
1995                PayloadProtocolIdentifier::default(),
1996            )
1997        }
1998    }
1999
2000    pub(crate) fn get_my_receiver_window_credit(&self) -> u32 {
2001        let mut bytes_queued = 0;
2002        for s in self.streams.values() {
2003            bytes_queued += s.get_num_bytes_in_reassembly_queue() as u32;
2004        }
2005
2006        self.max_receive_buffer_size.saturating_sub(bytes_queued)
2007    }
2008
2009    /// gather_outbound gathers outgoing packets. The returned bool value set to
2010    /// false means the association should be closed down after the final send.
2011    fn gather_outbound(&mut self, now: Instant) -> (Vec<Bytes>, bool) {
2012        let mut raw_packets = vec![];
2013
2014        if !self.control_queue.is_empty() {
2015            for p in self.control_queue.drain(..) {
2016                if let Ok(raw) = p.marshal() {
2017                    raw_packets.push(raw);
2018                } else {
2019                    warn!("[{}] failed to serialize a control packet", self.side);
2020                    continue;
2021                }
2022            }
2023        }
2024
2025        let state = self.state();
2026        match state {
2027            AssociationState::Established => {
2028                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2029                raw_packets = self.gather_outbound_data_and_reconfig_packets(raw_packets, now);
2030                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2031                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2032                raw_packets = self.gather_outbound_forward_tsn_packets(raw_packets);
2033                (raw_packets, true)
2034            }
2035            AssociationState::ShutdownPending
2036            | AssociationState::ShutdownSent
2037            | AssociationState::ShutdownReceived => {
2038                raw_packets = self.gather_data_packets_to_retransmit(raw_packets, now);
2039                raw_packets = self.gather_outbound_fast_retransmission_packets(raw_packets, now);
2040                raw_packets = self.gather_outbound_sack_packets(raw_packets);
2041                self.gather_outbound_shutdown_packets(raw_packets, now)
2042            }
2043            AssociationState::ShutdownAckSent => {
2044                self.gather_outbound_shutdown_packets(raw_packets, now)
2045            }
2046            _ => (raw_packets, true),
2047        }
2048    }
2049
2050    fn gather_data_packets_to_retransmit(
2051        &mut self,
2052        mut raw_packets: Vec<Bytes>,
2053        now: Instant,
2054    ) -> Vec<Bytes> {
2055        for p in &self.get_data_packets_to_retransmit(now) {
2056            if let Ok(raw) = p.marshal() {
2057                raw_packets.push(raw);
2058            } else {
2059                warn!(
2060                    "[{}] failed to serialize a DATA packet to be retransmitted",
2061                    self.side
2062                );
2063            }
2064        }
2065
2066        raw_packets
2067    }
2068
2069    fn gather_outbound_data_and_reconfig_packets(
2070        &mut self,
2071        mut raw_packets: Vec<Bytes>,
2072        now: Instant,
2073    ) -> Vec<Bytes> {
2074        // Pop unsent data chunks from the pending queue to send as much as
2075        // cwnd and rwnd allow.
2076        let (chunks, sis_to_reset) = self.pop_pending_data_chunks_to_send(now);
2077        if !chunks.is_empty() {
2078            // Start timer. (noop if already started)
2079            trace!("[{}] T3-rtx timer start (pt1)", self.side);
2080            self.timers
2081                .restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
2082
2083            for p in &self.bundle_data_chunks_into_packets(chunks) {
2084                if let Ok(raw) = p.marshal() {
2085                    raw_packets.push(raw);
2086                } else {
2087                    warn!("[{}] failed to serialize a DATA packet", self.side);
2088                }
2089            }
2090        }
2091
2092        if !sis_to_reset.is_empty() || self.will_retransmit_reconfig {
2093            if self.will_retransmit_reconfig {
2094                self.will_retransmit_reconfig = false;
2095                debug!(
2096                    "[{}] retransmit {} RECONFIG chunk(s)",
2097                    self.side,
2098                    self.reconfigs.len()
2099                );
2100                for c in self.reconfigs.values() {
2101                    let p = self.create_packet(vec![Box::new(c.clone())]);
2102                    if let Ok(raw) = p.marshal() {
2103                        raw_packets.push(raw);
2104                    } else {
2105                        warn!(
2106                            "[{}] failed to serialize a RECONFIG packet to be retransmitted",
2107                            self.side,
2108                        );
2109                    }
2110                }
2111            }
2112
2113            if !sis_to_reset.is_empty() {
2114                let rsn = self.generate_next_rsn();
2115                let tsn = self.my_next_tsn - 1;
2116                debug!(
2117                    "[{}] sending RECONFIG: rsn={} tsn={} streams={:?}",
2118                    self.side,
2119                    rsn,
2120                    self.my_next_tsn - 1,
2121                    sis_to_reset
2122                );
2123
2124                let c = ChunkReconfig {
2125                    param_a: Some(Box::new(ParamOutgoingResetRequest {
2126                        reconfig_request_sequence_number: rsn,
2127                        sender_last_tsn: tsn,
2128                        stream_identifiers: sis_to_reset,
2129                        ..Default::default()
2130                    })),
2131                    ..Default::default()
2132                };
2133                self.reconfigs.insert(rsn, c.clone()); // store in the map for retransmission
2134
2135                let p = self.create_packet(vec![Box::new(c)]);
2136                if let Ok(raw) = p.marshal() {
2137                    raw_packets.push(raw);
2138                } else {
2139                    warn!(
2140                        "[{}] failed to serialize a RECONFIG packet to be transmitted",
2141                        self.side
2142                    );
2143                }
2144            }
2145
2146            if !self.reconfigs.is_empty() {
2147                self.timers
2148                    .start(Timer::Reconfig, now, self.rto_mgr.get_rto());
2149            }
2150        }
2151
2152        raw_packets
2153    }
2154
2155    fn gather_outbound_fast_retransmission_packets(
2156        &mut self,
2157        mut raw_packets: Vec<Bytes>,
2158        now: Instant,
2159    ) -> Vec<Bytes> {
2160        if self.will_retransmit_fast {
2161            self.will_retransmit_fast = false;
2162
2163            let mut to_fast_retrans: Vec<Box<dyn Chunk>> = vec![];
2164            let mut fast_retrans_size = COMMON_HEADER_SIZE;
2165
2166            let mut i = 0;
2167            loop {
2168                let tsn = self.cumulative_tsn_ack_point + i + 1;
2169                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2170                    if c.acked || c.abandoned() || c.nsent > 1 || c.miss_indicator < 3 {
2171                        i += 1;
2172                        continue;
2173                    }
2174
2175                    // RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
2176                    //  3)  Determine how many of the earliest (i.e., lowest TSN) DATA chunks
2177                    //      marked for retransmission will fit into a single packet, subject
2178                    //      to constraint of the path MTU of the destination transport
2179                    //      address to which the packet is being sent.  Call this value K.
2180                    //      Retransmit those K DATA chunks in a single packet.  When a Fast
2181                    //      Retransmit is being performed, the sender SHOULD ignore the value
2182                    //      of cwnd and SHOULD NOT delay retransmission for this single
2183                    //		packet.
2184
2185                    let data_chunk_size = DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2186                    if self.mtu < fast_retrans_size + data_chunk_size {
2187                        break;
2188                    }
2189
2190                    fast_retrans_size += data_chunk_size;
2191                    self.stats.inc_fast_retrans();
2192                    c.nsent += 1;
2193                } else {
2194                    break; // end of pending data
2195                }
2196
2197                if let Some(c) = self.inflight_queue.get_mut(tsn) {
2198                    Association::check_partial_reliability_status(
2199                        c,
2200                        now,
2201                        self.use_forward_tsn,
2202                        self.side,
2203                        &self.streams,
2204                    );
2205                    to_fast_retrans.push(Box::new(c.clone()));
2206                    trace!(
2207                        "[{}] fast-retransmit: tsn={} sent={} htna={}",
2208                        self.side, c.tsn, c.nsent, self.fast_recover_exit_point
2209                    );
2210                }
2211                i += 1;
2212            }
2213
2214            if !to_fast_retrans.is_empty() {
2215                if let Ok(raw) = self.create_packet(to_fast_retrans).marshal() {
2216                    raw_packets.push(raw);
2217                } else {
2218                    warn!(
2219                        "[{}] failed to serialize a DATA packet to be fast-retransmitted",
2220                        self.side
2221                    );
2222                }
2223            }
2224        }
2225
2226        raw_packets
2227    }
2228
2229    fn gather_outbound_sack_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2230        if self.ack_state == AckState::Immediate {
2231            self.ack_state = AckState::Idle;
2232            let sack = self.create_selective_ack_chunk();
2233            debug!("[{}] sending SACK: {}", self.side, sack);
2234            if let Ok(raw) = self.create_packet(vec![Box::new(sack)]).marshal() {
2235                raw_packets.push(raw);
2236            } else {
2237                warn!("[{}] failed to serialize a SACK packet", self.side);
2238            }
2239        }
2240
2241        raw_packets
2242    }
2243
2244    fn gather_outbound_forward_tsn_packets(&mut self, mut raw_packets: Vec<Bytes>) -> Vec<Bytes> {
2245        /*log::debug!(
2246            "[{}] gatherOutboundForwardTSNPackets {}",
2247            self.name,
2248            self.will_send_forward_tsn
2249        );*/
2250        if self.will_send_forward_tsn {
2251            self.will_send_forward_tsn = false;
2252            if sna32gt(
2253                self.advanced_peer_tsn_ack_point,
2254                self.cumulative_tsn_ack_point,
2255            ) {
2256                let fwd_tsn = self.create_forward_tsn();
2257                if let Ok(raw) = self.create_packet(vec![Box::new(fwd_tsn)]).marshal() {
2258                    raw_packets.push(raw);
2259                } else {
2260                    warn!("[{}] failed to serialize a Forward TSN packet", self.side);
2261                }
2262            }
2263        }
2264
2265        raw_packets
2266    }
2267
2268    fn gather_outbound_shutdown_packets(
2269        &mut self,
2270        mut raw_packets: Vec<Bytes>,
2271        now: Instant,
2272    ) -> (Vec<Bytes>, bool) {
2273        let mut ok = true;
2274
2275        if self.will_send_shutdown {
2276            self.will_send_shutdown = false;
2277
2278            let shutdown = ChunkShutdown {
2279                cumulative_tsn_ack: self.cumulative_tsn_ack_point,
2280            };
2281
2282            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown)]).marshal() {
2283                self.timers
2284                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2285                raw_packets.push(raw);
2286            } else {
2287                warn!("[{}] failed to serialize a Shutdown packet", self.side);
2288            }
2289        } else if self.will_send_shutdown_ack {
2290            self.will_send_shutdown_ack = false;
2291
2292            let shutdown_ack = ChunkShutdownAck {};
2293
2294            if let Ok(raw) = self.create_packet(vec![Box::new(shutdown_ack)]).marshal() {
2295                self.timers
2296                    .start(Timer::T2Shutdown, now, self.rto_mgr.get_rto());
2297                raw_packets.push(raw);
2298            } else {
2299                warn!("[{}] failed to serialize a ShutdownAck packet", self.side);
2300            }
2301        } else if self.will_send_shutdown_complete {
2302            self.will_send_shutdown_complete = false;
2303
2304            let shutdown_complete = ChunkShutdownComplete {};
2305
2306            if let Ok(raw) = self
2307                .create_packet(vec![Box::new(shutdown_complete)])
2308                .marshal()
2309            {
2310                raw_packets.push(raw);
2311                ok = false;
2312            } else {
2313                warn!(
2314                    "[{}] failed to serialize a ShutdownComplete packet",
2315                    self.side
2316                );
2317            }
2318        }
2319
2320        (raw_packets, ok)
2321    }
2322
2323    /// get_data_packets_to_retransmit is called when T3-rtx is timed out and retransmit outstanding data chunks
2324    /// that are not acked or abandoned yet.
2325    fn get_data_packets_to_retransmit(&mut self, now: Instant) -> Vec<Packet> {
2326        let awnd = std::cmp::min(self.cwnd, self.rwnd);
2327        let mut chunks = vec![];
2328        let mut bytes_to_send = 0;
2329        let mut done = false;
2330        let mut i = 0;
2331        while !done {
2332            let tsn = self.cumulative_tsn_ack_point + i + 1;
2333            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2334                if !c.retransmit {
2335                    i += 1;
2336                    continue;
2337                }
2338
2339                if i == 0 && self.rwnd < c.user_data.len() as u32 {
2340                    // Send it as a zero window probe
2341                    done = true;
2342                } else if bytes_to_send + c.user_data.len() > awnd as usize {
2343                    break;
2344                }
2345
2346                // reset the retransmit flag not to retransmit again before the next
2347                // t3-rtx timer fires
2348                c.retransmit = false;
2349                bytes_to_send += c.user_data.len();
2350
2351                c.nsent += 1;
2352            } else {
2353                break; // end of pending data
2354            }
2355
2356            if let Some(c) = self.inflight_queue.get_mut(tsn) {
2357                Association::check_partial_reliability_status(
2358                    c,
2359                    now,
2360                    self.use_forward_tsn,
2361                    self.side,
2362                    &self.streams,
2363                );
2364
2365                trace!(
2366                    "[{}] retransmitting tsn={} ssn={} sent={}",
2367                    self.side, c.tsn, c.stream_sequence_number, c.nsent
2368                );
2369
2370                chunks.push(c.clone());
2371            }
2372            i += 1;
2373        }
2374
2375        self.bundle_data_chunks_into_packets(chunks)
2376    }
2377
2378    /// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
2379    /// the cwnd and rwnd allows to send.
2380    fn pop_pending_data_chunks_to_send(
2381        &mut self,
2382        now: Instant,
2383    ) -> (Vec<ChunkPayloadData>, Vec<u16>) {
2384        let mut chunks = vec![];
2385        let mut sis_to_reset = vec![]; // stream identifiers to reset
2386        if !self.pending_queue.is_empty() {
2387            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2388            //   A) At any given time, the data sender MUST NOT transmit new data to
2389            //      any destination transport address if its peer's rwnd indicates
2390            //      that the peer has no buffer space (i.e., rwnd is 0; see Section
2391            //      6.2.1).  However, regardless of the value of rwnd (including if it
2392            //      is 0), the data sender can always have one DATA chunk in flight to
2393            //      the receiver if allowed by cwnd (see rule B, below).
2394
2395            while let Some(c) = self.pending_queue.peek() {
2396                let (beginning_fragment, unordered, data_len, stream_identifier) = (
2397                    c.beginning_fragment,
2398                    c.unordered,
2399                    c.user_data.len(),
2400                    c.stream_identifier,
2401                );
2402
2403                if data_len == 0 {
2404                    sis_to_reset.push(stream_identifier);
2405                    if self
2406                        .pending_queue
2407                        .pop(beginning_fragment, unordered)
2408                        .is_none()
2409                    {
2410                        error!("[{}] failed to pop from pending queue", self.side);
2411                    }
2412                    continue;
2413                }
2414
2415                if self.inflight_queue.get_num_bytes() + data_len > self.cwnd as usize {
2416                    break; // would exceeds cwnd
2417                }
2418
2419                if data_len > self.rwnd as usize {
2420                    break; // no more rwnd
2421                }
2422
2423                self.rwnd -= data_len as u32;
2424
2425                if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2426                    beginning_fragment,
2427                    unordered,
2428                    now,
2429                ) {
2430                    chunks.push(chunk);
2431                }
2432            }
2433
2434            // the data sender can always have one DATA chunk in flight to the receiver
2435            if chunks.is_empty() && self.inflight_queue.is_empty() {
2436                // Send zero window probe
2437                if let Some(c) = self.pending_queue.peek() {
2438                    let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
2439
2440                    if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
2441                        beginning_fragment,
2442                        unordered,
2443                        now,
2444                    ) {
2445                        chunks.push(chunk);
2446                    }
2447                }
2448            }
2449        }
2450
2451        (chunks, sis_to_reset)
2452    }
2453
2454    /// bundle_data_chunks_into_packets packs DATA chunks into packets. It tries to bundle
2455    /// DATA chunks into a packet so long as the resulting packet size does not exceed
2456    /// the path MTU.
2457    fn bundle_data_chunks_into_packets(&self, chunks: Vec<ChunkPayloadData>) -> Vec<Packet> {
2458        let mut packets = vec![];
2459        let mut chunks_to_send = vec![];
2460        let mut bytes_in_packet = COMMON_HEADER_SIZE;
2461
2462        for c in chunks {
2463            // RFC 4960 sec 6.1.  Transmission of DATA Chunks
2464            //   Multiple DATA chunks committed for transmission MAY be bundled in a
2465            //   single packet.  Furthermore, DATA chunks being retransmitted MAY be
2466            //   bundled with new DATA chunks, as long as the resulting packet size
2467            //   does not exceed the path MTU.
2468            if bytes_in_packet + c.user_data.len() as u32 > self.mtu {
2469                packets.push(self.create_packet(chunks_to_send));
2470                chunks_to_send = vec![];
2471                bytes_in_packet = COMMON_HEADER_SIZE;
2472            }
2473
2474            bytes_in_packet += DATA_CHUNK_HEADER_SIZE + c.user_data.len() as u32;
2475            chunks_to_send.push(Box::new(c));
2476        }
2477
2478        if !chunks_to_send.is_empty() {
2479            packets.push(self.create_packet(chunks_to_send));
2480        }
2481
2482        packets
2483    }
2484
2485    /// generate_next_tsn returns the my_next_tsn and increases it. The caller should hold the lock.
2486    fn generate_next_tsn(&mut self) -> u32 {
2487        let tsn = self.my_next_tsn;
2488        self.my_next_tsn += 1;
2489        tsn
2490    }
2491
2492    /// generate_next_rsn returns the my_next_rsn and increases it. The caller should hold the lock.
2493    fn generate_next_rsn(&mut self) -> u32 {
2494        let rsn = self.my_next_rsn;
2495        self.my_next_rsn += 1;
2496        rsn
2497    }
2498
2499    fn check_partial_reliability_status(
2500        c: &mut ChunkPayloadData,
2501        now: Instant,
2502        use_forward_tsn: bool,
2503        side: Side,
2504        streams: &HashMap<u16, StreamState>,
2505    ) {
2506        if !use_forward_tsn {
2507            return;
2508        }
2509
2510        // draft-ietf-rtcweb-data-protocol-09.txt section 6
2511        //	6.  Procedures
2512        //		All Data Channel Establishment TransportProtocol messages MUST be sent using
2513        //		ordered delivery and reliable transmission.
2514        //
2515        if c.payload_type == PayloadProtocolIdentifier::Dcep {
2516            return;
2517        }
2518
2519        // PR-SCTP
2520        if let Some(s) = streams.get(&c.stream_identifier) {
2521            let reliability_type: ReliabilityType = s.reliability_type;
2522            let reliability_value = s.reliability_value;
2523
2524            if reliability_type == ReliabilityType::Rexmit {
2525                if c.nsent >= reliability_value {
2526                    c.set_abandoned(true);
2527                    trace!(
2528                        "[{}] marked as abandoned: tsn={} ppi={} (remix: {})",
2529                        side, c.tsn, c.payload_type, c.nsent
2530                    );
2531                }
2532            } else if reliability_type == ReliabilityType::Timed {
2533                if let Some(since) = &c.since {
2534                    let elapsed = now.duration_since(*since);
2535                    if elapsed.as_millis() as u32 >= reliability_value {
2536                        c.set_abandoned(true);
2537                        trace!(
2538                            "[{}] marked as abandoned: tsn={} ppi={} (timed: {:?})",
2539                            side, c.tsn, c.payload_type, elapsed
2540                        );
2541                    }
2542                } else {
2543                    error!("[{}] invalid c.since", side);
2544                }
2545            }
2546        } else {
2547            error!("[{}] stream {} not found)", side, c.stream_identifier);
2548        }
2549    }
2550
2551    fn create_selective_ack_chunk(&mut self) -> ChunkSelectiveAck {
2552        ChunkSelectiveAck {
2553            cumulative_tsn_ack: self.peer_last_tsn,
2554            advertised_receiver_window_credit: self.get_my_receiver_window_credit(),
2555            gap_ack_blocks: self.payload_queue.get_gap_ack_blocks(self.peer_last_tsn),
2556            duplicate_tsn: self.payload_queue.pop_duplicates(),
2557        }
2558    }
2559
2560    /// create_forward_tsn generates ForwardTSN chunk.
2561    /// This method will be be called if use_forward_tsn is set to false.
2562    fn create_forward_tsn(&self) -> ChunkForwardTsn {
2563        // RFC 3758 Sec 3.5 C4
2564        let mut stream_map: HashMap<u16, u16> = HashMap::new(); // to report only once per SI
2565        let mut i = self.cumulative_tsn_ack_point + 1;
2566        while sna32lte(i, self.advanced_peer_tsn_ack_point) {
2567            if let Some(c) = self.inflight_queue.get(i) {
2568                if let Some(ssn) = stream_map.get(&c.stream_identifier) {
2569                    if sna16lt(*ssn, c.stream_sequence_number) {
2570                        // to report only once with greatest SSN
2571                        stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2572                    }
2573                } else {
2574                    stream_map.insert(c.stream_identifier, c.stream_sequence_number);
2575                }
2576            } else {
2577                break;
2578            }
2579
2580            i += 1;
2581        }
2582
2583        let mut fwd_tsn = ChunkForwardTsn {
2584            new_cumulative_tsn: self.advanced_peer_tsn_ack_point,
2585            streams: vec![],
2586        };
2587
2588        let mut stream_str = String::new();
2589        for (si, ssn) in &stream_map {
2590            stream_str += format!("(si={} ssn={})", si, ssn).as_str();
2591            fwd_tsn.streams.push(ChunkForwardTsnStream {
2592                identifier: *si,
2593                sequence: *ssn,
2594            });
2595        }
2596        trace!(
2597            "[{}] building fwd_tsn: newCumulativeTSN={} cumTSN={} - {}",
2598            self.side, fwd_tsn.new_cumulative_tsn, self.cumulative_tsn_ack_point, stream_str
2599        );
2600
2601        fwd_tsn
2602    }
2603
2604    /// Move the chunk peeked with self.pending_queue.peek() to the inflight_queue.
2605    fn move_pending_data_chunk_to_inflight_queue(
2606        &mut self,
2607        beginning_fragment: bool,
2608        unordered: bool,
2609        now: Instant,
2610    ) -> Option<ChunkPayloadData> {
2611        if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
2612            // Mark all fragements are in-flight now
2613            if c.ending_fragment {
2614                c.set_all_inflight();
2615            }
2616
2617            // Assign TSN
2618            c.tsn = self.generate_next_tsn();
2619
2620            c.since = Some(now); // use to calculate RTT and also for maxPacketLifeTime
2621            c.nsent = 1; // being sent for the first time
2622
2623            Association::check_partial_reliability_status(
2624                &mut c,
2625                now,
2626                self.use_forward_tsn,
2627                self.side,
2628                &self.streams,
2629            );
2630
2631            trace!(
2632                "[{}] sending ppi={} tsn={} ssn={} sent={} len={} ({},{})",
2633                self.side,
2634                c.payload_type as u32,
2635                c.tsn,
2636                c.stream_sequence_number,
2637                c.nsent,
2638                c.user_data.len(),
2639                c.beginning_fragment,
2640                c.ending_fragment
2641            );
2642
2643            self.inflight_queue.push_no_check(c.clone());
2644
2645            Some(c)
2646        } else {
2647            error!("[{}] failed to pop from pending queue", self.side);
2648            None
2649        }
2650    }
2651
2652    pub(crate) fn send_reset_request(&mut self, stream_identifier: StreamId) -> Result<()> {
2653        let state = self.state();
2654        if state != AssociationState::Established {
2655            return Err(Error::ErrResetPacketInStateNotExist);
2656        }
2657
2658        // Create DATA chunk which only contains valid stream identifier with
2659        // nil userData and use it as a EOS from the stream.
2660        let c = ChunkPayloadData {
2661            stream_identifier,
2662            beginning_fragment: true,
2663            ending_fragment: true,
2664            user_data: Bytes::new(),
2665            ..Default::default()
2666        };
2667
2668        self.pending_queue.push(c);
2669        self.awake_write_loop();
2670
2671        Ok(())
2672    }
2673
2674    /// send_payload_data sends the data chunks.
2675    pub(crate) fn send_payload_data(&mut self, chunks: Vec<ChunkPayloadData>) -> Result<()> {
2676        let state = self.state();
2677        if state != AssociationState::Established {
2678            return Err(Error::ErrPayloadDataStateNotExist);
2679        }
2680
2681        // Push the chunks into the pending queue first.
2682        for c in chunks {
2683            self.pending_queue.push(c);
2684        }
2685
2686        self.awake_write_loop();
2687        Ok(())
2688    }
2689
2690    /// buffered_amount returns total amount (in bytes) of currently buffered user data.
2691    /// This is used only by testing.
2692    pub(crate) fn buffered_amount(&self) -> usize {
2693        self.pending_queue.get_num_bytes() + self.inflight_queue.get_num_bytes()
2694    }
2695
2696    fn awake_write_loop(&self) {
2697        // No Op on Purpose
2698    }
2699
2700    fn close_all_timers(&mut self) {
2701        // Close all retransmission & ack timers
2702        for timer in Timer::VALUES {
2703            self.timers.stop(timer);
2704        }
2705    }
2706
2707    fn on_ack_timeout(&mut self) {
2708        trace!(
2709            "[{}] ack timed out (ack_state: {})",
2710            self.side, self.ack_state
2711        );
2712        self.stats.inc_ack_timeouts();
2713        self.ack_state = AckState::Immediate;
2714        self.awake_write_loop();
2715    }
2716
2717    fn on_retransmission_timeout(&mut self, timer_id: Timer, n_rtos: usize) {
2718        match timer_id {
2719            Timer::T1Init => {
2720                if let Err(err) = self.send_init() {
2721                    debug!(
2722                        "[{}] failed to retransmit init (n_rtos={}): {:?}",
2723                        self.side, n_rtos, err
2724                    );
2725                }
2726            }
2727
2728            Timer::T1Cookie => {
2729                if let Err(err) = self.send_cookie_echo() {
2730                    debug!(
2731                        "[{}] failed to retransmit cookie-echo (n_rtos={}): {:?}",
2732                        self.side, n_rtos, err
2733                    );
2734                }
2735            }
2736
2737            Timer::T2Shutdown => {
2738                debug!(
2739                    "[{}] retransmission of shutdown timeout (n_rtos={})",
2740                    self.side, n_rtos
2741                );
2742                let state = self.state();
2743                match state {
2744                    AssociationState::ShutdownSent => {
2745                        self.will_send_shutdown = true;
2746                        self.awake_write_loop();
2747                    }
2748                    AssociationState::ShutdownAckSent => {
2749                        self.will_send_shutdown_ack = true;
2750                        self.awake_write_loop();
2751                    }
2752                    _ => {}
2753                }
2754            }
2755
2756            Timer::T3RTX => {
2757                self.stats.inc_t3timeouts();
2758
2759                // RFC 4960 sec 6.3.3
2760                //  E1)  For the destination address for which the timer expires, adjust
2761                //       its ssthresh with rules defined in Section 7.2.3 and set the
2762                //       cwnd <- MTU.
2763                // RFC 4960 sec 7.2.3
2764                //   When the T3-rtx timer expires on an address, SCTP should perform slow
2765                //   start by:
2766                //      ssthresh = max(cwnd/2, 4*MTU)
2767                //      cwnd = 1*MTU
2768
2769                self.ssthresh = std::cmp::max(self.cwnd / 2, 4 * self.mtu);
2770                self.cwnd = self.mtu;
2771                trace!(
2772                    "[{}] updated cwnd={} ssthresh={} inflight={} (RTO)",
2773                    self.side,
2774                    self.cwnd,
2775                    self.ssthresh,
2776                    self.inflight_queue.get_num_bytes()
2777                );
2778
2779                // RFC 3758 sec 3.5
2780                //  A5) Any time the T3-rtx timer expires, on any destination, the sender
2781                //  SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
2782                //  the procedures outlined in C2 - C5.
2783                if self.use_forward_tsn {
2784                    // RFC 3758 Sec 3.5 C2
2785                    let mut i = self.advanced_peer_tsn_ack_point + 1;
2786                    while let Some(c) = self.inflight_queue.get(i) {
2787                        if !c.abandoned() {
2788                            break;
2789                        }
2790                        self.advanced_peer_tsn_ack_point = i;
2791                        i += 1;
2792                    }
2793
2794                    // RFC 3758 Sec 3.5 C3
2795                    if sna32gt(
2796                        self.advanced_peer_tsn_ack_point,
2797                        self.cumulative_tsn_ack_point,
2798                    ) {
2799                        self.will_send_forward_tsn = true;
2800                        debug!(
2801                            "[{}] on_retransmission_timeout {}: sna32GT({}, {})",
2802                            self.side,
2803                            self.will_send_forward_tsn,
2804                            self.advanced_peer_tsn_ack_point,
2805                            self.cumulative_tsn_ack_point
2806                        );
2807                    }
2808                }
2809
2810                debug!(
2811                    "[{}] T3-rtx timed out: n_rtos={} cwnd={} ssthresh={}",
2812                    self.side, n_rtos, self.cwnd, self.ssthresh
2813                );
2814
2815                self.inflight_queue.mark_all_to_retrasmit();
2816                self.awake_write_loop();
2817            }
2818
2819            Timer::Reconfig => {
2820                self.will_retransmit_reconfig = true;
2821                self.awake_write_loop();
2822            }
2823
2824            _ => {}
2825        }
2826    }
2827
2828    fn on_retransmission_failure(&mut self, id: Timer) {
2829        match id {
2830            Timer::T1Init => {
2831                error!("[{}] retransmission failure: T1-init", self.side);
2832                self.error = Some(AssociationError::HandshakeFailed(
2833                    Error::ErrHandshakeInitAck.to_string(),
2834                ));
2835            }
2836
2837            Timer::T1Cookie => {
2838                error!("[{}] retransmission failure: T1-cookie", self.side);
2839                self.error = Some(AssociationError::HandshakeFailed(
2840                    Error::ErrHandshakeCookieEcho.to_string(),
2841                ));
2842            }
2843
2844            Timer::T2Shutdown => {
2845                error!("[{}] retransmission failure: T2-shutdown", self.side);
2846            }
2847
2848            Timer::T3RTX => {
2849                // T3-rtx timer will not fail by design
2850                // Justifications:
2851                //  * ICE would fail if the connectivity is lost
2852                //  * WebRTC spec is not clear how this incident should be reported to ULP
2853                error!("[{}] retransmission failure: T3-rtx (DATA)", self.side);
2854            }
2855
2856            _ => {}
2857        }
2858    }
2859
2860    /// Whether no timers are running
2861    #[cfg(test)]
2862    pub(crate) fn is_idle(&self) -> bool {
2863        Timer::VALUES
2864            .iter()
2865            //.filter(|&&t| t != Timer::KeepAlive && t != Timer::PushNewCid)
2866            .filter_map(|&t| Some((t, self.timers.get(t)?)))
2867            .min_by_key(|&(_, time)| time)
2868            //.map_or(true, |(timer, _)| timer == Timer::Idle)
2869            .is_none()
2870    }
2871}