webrtc_unreliable/
client.rs

1use std::{
2    collections::VecDeque,
3    error::Error,
4    fmt,
5    io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write},
6    iter::Iterator,
7    mem,
8    net::SocketAddr,
9    time::{Duration, Instant},
10};
11
12use openssl::{
13    error::ErrorStack as OpenSslErrorStack,
14    ssl::{
15        Error as SslError, ErrorCode, HandshakeError, MidHandshakeSslStream, ShutdownResult,
16        SslAcceptor, SslStream,
17    },
18};
19use rand::{thread_rng, Rng};
20
21use crate::buffer_pool::{BufferPool, OwnedBuffer};
22use crate::sctp::{
23    read_sctp_packet, write_sctp_packet, SctpChunk, SctpPacket, SctpWriteError,
24    SCTP_FLAG_BEGIN_FRAGMENT, SCTP_FLAG_COMPLETE_UNRELIABLE, SCTP_FLAG_END_FRAGMENT,
25};
26
27/// Heartbeat packets will be generated at a maximum of this rate (if the connection is otherwise
28/// idle).
29pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
30
31// Maximum theoretical UDP payload size
32pub const MAX_UDP_PAYLOAD_SIZE: usize = 65507;
33
34// Derived through experimentation, any larger and openssl reports 'dtls message too big'.
35pub const MAX_DTLS_MESSAGE_SIZE: usize = 16384;
36
37pub const MAX_SCTP_PACKET_SIZE: usize = MAX_DTLS_MESSAGE_SIZE;
38
39// The overhead of sending a single SCTP packet with a single data message.
40pub const SCTP_MESSAGE_OVERHEAD: usize = 28;
41
42/// Maximum supported theoretical size of a single WebRTC message, based on DTLS and SCTP packet
43/// size limits.
44///
45/// WebRTC makes no attempt at packet fragmentation and re-assembly or to support fragmented
46/// received messages, all sent and received unreliable messages must fit into a single SCTP packet.
47/// As such, this maximum size is almost certainly too large for browsers to actually support.
48/// Start with a much lower MTU (around 1200) and test it.
49pub const MAX_MESSAGE_LEN: usize = MAX_SCTP_PACKET_SIZE - SCTP_MESSAGE_OVERHEAD;
50
51#[derive(Debug)]
52pub enum ClientError {
53    TlsError(SslError),
54    OpenSslError(OpenSslErrorStack),
55    NotConnected,
56    NotEstablished,
57    IncompletePacketRead,
58    IncompletePacketWrite,
59}
60
61impl fmt::Display for ClientError {
62    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
63        match self {
64            ClientError::TlsError(err) => fmt::Display::fmt(err, f),
65            ClientError::OpenSslError(err) => fmt::Display::fmt(err, f),
66            ClientError::NotConnected => write!(f, "client is not connected"),
67            ClientError::NotEstablished => {
68                write!(f, "client does not have an established WebRTC data channel")
69            }
70            ClientError::IncompletePacketRead => {
71                write!(f, "WebRTC connection packet not completely read")
72            }
73            ClientError::IncompletePacketWrite => {
74                write!(f, "WebRTC connection packet not completely written")
75            }
76        }
77    }
78}
79
80impl Error for ClientError {}
81
82#[derive(Copy, Clone, Eq, PartialEq, Debug)]
83pub enum MessageType {
84    Text,
85    Binary,
86}
87
88pub struct Client {
89    buffer_pool: BufferPool,
90    remote_addr: SocketAddr,
91    ssl_state: ClientSslState,
92    client_state: ClientState,
93}
94
95impl Client {
96    pub fn new(
97        ssl_acceptor: &SslAcceptor,
98        buffer_pool: BufferPool,
99        remote_addr: SocketAddr,
100    ) -> Result<Client, OpenSslErrorStack> {
101        match ssl_acceptor.accept(ClientSslPackets {
102            buffer_pool: buffer_pool.clone(),
103            incoming_udp: VecDeque::new(),
104            outgoing_udp: VecDeque::new(),
105        }) {
106            Ok(_) => unreachable!("handshake cannot finish with no incoming packets"),
107            Err(HandshakeError::SetupFailure(err)) => return Err(err),
108            Err(HandshakeError::Failure(_)) => {
109                unreachable!("handshake cannot fail before starting")
110            }
111            Err(HandshakeError::WouldBlock(mid_handshake)) => Ok(Client {
112                buffer_pool,
113                remote_addr,
114                ssl_state: ClientSslState::Handshake(mid_handshake),
115                client_state: ClientState {
116                    last_activity: Instant::now(),
117                    last_sent: Instant::now(),
118                    received_messages: Vec::new(),
119                    sctp_state: SctpState::Shutdown,
120                    sctp_local_port: 0,
121                    sctp_remote_port: 0,
122                    sctp_local_verification_tag: 0,
123                    sctp_remote_verification_tag: 0,
124                    sctp_local_tsn: 0,
125                    sctp_remote_tsn: 0,
126                },
127            }),
128        }
129    }
130
131    /// DTLS and SCTP states are established, and RTC messages may be sent
132    pub fn is_established(&self) -> bool {
133        match (&self.ssl_state, self.client_state.sctp_state) {
134            (ClientSslState::Established(_), SctpState::Established) => true,
135            _ => false,
136        }
137    }
138
139    /// Time of last activity that indicates a working connection
140    pub fn last_activity(&self) -> Instant {
141        self.client_state.last_activity
142    }
143
144    /// Request SCTP and DTLS shutdown, connection immediately becomes un-established
145    pub fn start_shutdown(&mut self) -> Result<bool, ClientError> {
146        let started;
147        self.ssl_state = match mem::replace(&mut self.ssl_state, ClientSslState::Shutdown) {
148            ClientSslState::Established(mut ssl_stream) => {
149                started = true;
150                if self.client_state.sctp_state != SctpState::Shutdown {
151                    // TODO: For now, we just do an immediate one-sided SCTP abort
152                    send_sctp_packet(
153                        &self.buffer_pool,
154                        &mut ssl_stream,
155                        SctpPacket {
156                            source_port: self.client_state.sctp_local_port,
157                            dest_port: self.client_state.sctp_remote_port,
158                            verification_tag: self.client_state.sctp_remote_verification_tag,
159                            chunks: &[SctpChunk::Abort],
160                        },
161                    )?;
162                    self.client_state.last_sent = Instant::now();
163                    self.client_state.sctp_state = SctpState::Shutdown;
164                }
165                match ssl_stream.shutdown() {
166                    Err(err) => {
167                        if err.code() == ErrorCode::ZERO_RETURN {
168                            ClientSslState::Shutdown
169                        } else {
170                            return Err(ssl_err_to_client_err(err));
171                        }
172                    }
173                    Ok(res) => ClientSslState::ShuttingDown(ssl_stream, res),
174                }
175            }
176            prev_state => {
177                started = false;
178                prev_state
179            }
180        };
181        Ok(started)
182    }
183
184    /// Returns true if the shutdown process has been started or has already finished.
185    pub fn shutdown_started(&self) -> bool {
186        match &self.ssl_state {
187            ClientSslState::ShuttingDown(_, _) | ClientSslState::Shutdown => true,
188            _ => false,
189        }
190    }
191
192    /// Connection has finished shutting down.
193    pub fn is_shutdown(&self) -> bool {
194        match &self.ssl_state {
195            ClientSslState::ShuttingDown(_, ShutdownResult::Received)
196            | ClientSslState::Shutdown => true,
197            _ => false,
198        }
199    }
200
201    /// Generate any periodic packets, currently only heartbeat packets.
202    pub fn generate_periodic(&mut self) -> Result<(), ClientError> {
203        // We send heartbeat packets if the last sent packet was more than HEARTBEAT_INTERVAL ago
204        if self.client_state.last_sent.elapsed() > HEARTBEAT_INTERVAL {
205            match &mut self.ssl_state {
206                ClientSslState::Established(ssl_stream) => {
207                    if self.client_state.sctp_state == SctpState::Established {
208                        send_sctp_packet(
209                            &self.buffer_pool,
210                            ssl_stream,
211                            SctpPacket {
212                                source_port: self.client_state.sctp_local_port,
213                                dest_port: self.client_state.sctp_remote_port,
214                                verification_tag: self.client_state.sctp_remote_verification_tag,
215                                chunks: &[SctpChunk::Heartbeat {
216                                    heartbeat_info: Some(SCTP_HEARTBEAT),
217                                }],
218                            },
219                        )?;
220                        self.client_state.last_sent = Instant::now();
221                    }
222                }
223                _ => {}
224            }
225        }
226        Ok(())
227    }
228
229    /// Pushes an available UDP packet.  Will error if called when the client is currently in the
230    /// shutdown state.
231    pub fn receive_incoming_packet(&mut self, udp_packet: OwnedBuffer) -> Result<(), ClientError> {
232        self.ssl_state = match mem::replace(&mut self.ssl_state, ClientSslState::Shutdown) {
233            ClientSslState::Handshake(mut mid_handshake) => {
234                mid_handshake.get_mut().incoming_udp.push_back(udp_packet);
235                match mid_handshake.handshake() {
236                    Ok(ssl_stream) => {
237                        log::info!("DTLS handshake finished for remote {}", self.remote_addr);
238                        ClientSslState::Established(ssl_stream)
239                    }
240                    Err(handshake_error) => match handshake_error {
241                        HandshakeError::SetupFailure(err) => {
242                            return Err(ClientError::OpenSslError(err));
243                        }
244                        HandshakeError::Failure(mid_handshake) => {
245                            log::warn!(
246                                "SSL handshake failure with remote {}: {}",
247                                self.remote_addr,
248                                mid_handshake.error()
249                            );
250                            ClientSslState::Handshake(mid_handshake)
251                        }
252                        HandshakeError::WouldBlock(mid_handshake) => {
253                            ClientSslState::Handshake(mid_handshake)
254                        }
255                    },
256                }
257            }
258            ClientSslState::Established(mut ssl_stream) => {
259                ssl_stream.get_mut().incoming_udp.push_back(udp_packet);
260                ClientSslState::Established(ssl_stream)
261            }
262            ClientSslState::ShuttingDown(mut ssl_stream, shutdown_result) => {
263                ssl_stream.get_mut().incoming_udp.push_back(udp_packet);
264                match ssl_stream.shutdown() {
265                    Err(err) => {
266                        if err.code() == ErrorCode::WANT_READ {
267                            ClientSslState::ShuttingDown(ssl_stream, shutdown_result)
268                        } else if err.code() == ErrorCode::ZERO_RETURN {
269                            ClientSslState::Shutdown
270                        } else {
271                            return Err(ssl_err_to_client_err(err));
272                        }
273                    }
274                    Ok(res) => ClientSslState::ShuttingDown(ssl_stream, res),
275                }
276            }
277            ClientSslState::Shutdown => ClientSslState::Shutdown,
278        };
279
280        while let ClientSslState::Established(ssl_stream) = &mut self.ssl_state {
281            let mut ssl_buffer = self.buffer_pool.acquire();
282            ssl_buffer.resize(MAX_SCTP_PACKET_SIZE, 0);
283            match ssl_stream.ssl_read(&mut ssl_buffer) {
284                Ok(size) => {
285                    let mut sctp_chunks = [SctpChunk::Abort; SCTP_MAX_CHUNKS];
286                    match read_sctp_packet(&ssl_buffer[0..size], false, &mut sctp_chunks) {
287                        Ok(sctp_packet) => {
288                            if !receive_sctp_packet(
289                                &self.buffer_pool,
290                                ssl_stream,
291                                &mut self.client_state,
292                                &sctp_packet,
293                            )? {
294                                drop(ssl_buffer);
295                                self.start_shutdown()?;
296                            }
297                        }
298                        Err(err) => {
299                            log::debug!("sctp read error on packet received over DTLS: {}", err);
300                        }
301                    }
302                }
303                Err(err) => {
304                    if err.code() == ErrorCode::WANT_READ {
305                        break;
306                    } else if err.code() == ErrorCode::ZERO_RETURN {
307                        log::info!("DTLS received close notify");
308                        drop(ssl_buffer);
309                        self.start_shutdown()?;
310                    } else {
311                        return Err(ssl_err_to_client_err(err));
312                    }
313                }
314            }
315        }
316
317        Ok(())
318    }
319
320    pub fn take_outgoing_packets<'a>(&'a mut self) -> impl Iterator<Item = OwnedBuffer> + 'a {
321        (match &mut self.ssl_state {
322            ClientSslState::Handshake(mid_handshake) => {
323                Some(mid_handshake.get_mut().outgoing_udp.drain(..))
324            }
325            ClientSslState::Established(ssl_stream)
326            | ClientSslState::ShuttingDown(ssl_stream, _) => {
327                Some(ssl_stream.get_mut().outgoing_udp.drain(..))
328            }
329            ClientSslState::Shutdown => None,
330        })
331        .into_iter()
332        .flatten()
333    }
334
335    pub fn send_message(
336        &mut self,
337        message_type: MessageType,
338        message: &[u8],
339    ) -> Result<(), ClientError> {
340        let ssl_stream = match &mut self.ssl_state {
341            ClientSslState::Established(ssl_stream) => ssl_stream,
342            _ => {
343                return Err(ClientError::NotConnected);
344            }
345        };
346
347        if self.client_state.sctp_state != SctpState::Established {
348            return Err(ClientError::NotEstablished);
349        }
350
351        let proto_id = if message_type == MessageType::Text {
352            DATA_CHANNEL_PROTO_STRING
353        } else {
354            DATA_CHANNEL_PROTO_BINARY
355        };
356
357        send_sctp_packet(
358            &self.buffer_pool,
359            ssl_stream,
360            SctpPacket {
361                source_port: self.client_state.sctp_local_port,
362                dest_port: self.client_state.sctp_remote_port,
363                verification_tag: self.client_state.sctp_remote_verification_tag,
364                chunks: &[SctpChunk::Data {
365                    chunk_flags: SCTP_FLAG_COMPLETE_UNRELIABLE,
366                    tsn: self.client_state.sctp_local_tsn,
367                    stream_id: 0,
368                    stream_seq: 0,
369                    proto_id,
370                    user_data: message,
371                }],
372            },
373        )?;
374        self.client_state.sctp_local_tsn = self.client_state.sctp_local_tsn.wrapping_add(1);
375
376        Ok(())
377    }
378
379    pub fn receive_messages<'a>(
380        &'a mut self,
381    ) -> impl Iterator<Item = (MessageType, OwnedBuffer)> + 'a {
382        self.client_state.received_messages.drain(..)
383    }
384}
385
386pub struct ClientState {
387    last_activity: Instant,
388    last_sent: Instant,
389
390    received_messages: Vec<(MessageType, OwnedBuffer)>,
391
392    sctp_state: SctpState,
393
394    sctp_local_port: u16,
395    sctp_remote_port: u16,
396
397    sctp_local_verification_tag: u32,
398    sctp_remote_verification_tag: u32,
399
400    sctp_local_tsn: u32,
401    sctp_remote_tsn: u32,
402}
403
404enum ClientSslState {
405    Handshake(MidHandshakeSslStream<ClientSslPackets>),
406    Established(SslStream<ClientSslPackets>),
407    ShuttingDown(SslStream<ClientSslPackets>, ShutdownResult),
408    Shutdown,
409}
410
411#[derive(Debug)]
412struct ClientSslPackets {
413    buffer_pool: BufferPool,
414    incoming_udp: VecDeque<OwnedBuffer>,
415    outgoing_udp: VecDeque<OwnedBuffer>,
416}
417
418impl Read for ClientSslPackets {
419    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
420        if let Some(next_packet) = self.incoming_udp.pop_front() {
421            let next_packet = self.buffer_pool.adopt(next_packet);
422            if next_packet.len() > buf.len() {
423                return Err(IoError::new(
424                    IoErrorKind::Other,
425                    ClientError::IncompletePacketRead,
426                ));
427            }
428            buf[0..next_packet.len()].copy_from_slice(&next_packet);
429            Ok(next_packet.len())
430        } else {
431            Err(IoErrorKind::WouldBlock.into())
432        }
433    }
434}
435
436impl Write for ClientSslPackets {
437    fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
438        let mut buffer = self.buffer_pool.acquire();
439        buffer.extend_from_slice(buf);
440        self.outgoing_udp.push_back(buffer.into_owned());
441        Ok(buf.len())
442    }
443
444    fn flush(&mut self) -> Result<(), IoError> {
445        Ok(())
446    }
447}
448
449const SCTP_COOKIE: &[u8] = b"WEBRTC-UNRELIABLE-COOKIE";
450const SCTP_HEARTBEAT: &[u8] = b"WEBRTC-UNRELIABLE-HEARTBEAT";
451const SCTP_MAX_CHUNKS: usize = 16;
452const SCTP_BUFFER_SIZE: u32 = 0x40000;
453
454const DATA_CHANNEL_PROTO_CONTROL: u32 = 50;
455const DATA_CHANNEL_PROTO_STRING: u32 = 51;
456const DATA_CHANNEL_PROTO_BINARY: u32 = 53;
457
458const DATA_CHANNEL_MESSAGE_ACK: u8 = 2;
459const DATA_CHANNEL_MESSAGE_OPEN: u8 = 3;
460
461#[derive(Debug, Eq, PartialEq, Copy, Clone)]
462enum SctpState {
463    Shutdown,
464    InitAck,
465    Established,
466}
467
468fn ssl_err_to_client_err(err: SslError) -> ClientError {
469    if let Some(io_err) = err.io_error() {
470        if let Some(inner) = io_err.get_ref() {
471            if inner.is::<ClientError>() {
472                return *err
473                    .into_io_error()
474                    .unwrap()
475                    .into_inner()
476                    .unwrap()
477                    .downcast()
478                    .unwrap();
479            }
480        }
481    }
482
483    ClientError::TlsError(err)
484}
485
486fn max_tsn(a: u32, b: u32) -> u32 {
487    if a > b {
488        if a - b < (1 << 31) {
489            a
490        } else {
491            b
492        }
493    } else {
494        if b - a < (1 << 31) {
495            b
496        } else {
497            a
498        }
499    }
500}
501
502fn send_sctp_packet(
503    buffer_pool: &BufferPool,
504    ssl_stream: &mut SslStream<ClientSslPackets>,
505    sctp_packet: SctpPacket,
506) -> Result<(), ClientError> {
507    let mut sctp_buffer = buffer_pool.acquire();
508    sctp_buffer.resize(MAX_SCTP_PACKET_SIZE, 0);
509
510    let packet_len = match write_sctp_packet(&mut sctp_buffer, sctp_packet) {
511        Ok(len) => len,
512        Err(SctpWriteError::BufferSize) => {
513            return Err(ClientError::IncompletePacketWrite);
514        }
515        Err(err) => panic!("error writing SCTP packet: {}", err),
516    };
517
518    assert_eq!(
519        ssl_stream
520            .ssl_write(&sctp_buffer[0..packet_len])
521            .map_err(ssl_err_to_client_err)?,
522        packet_len
523    );
524
525    Ok(())
526}
527
528fn receive_sctp_packet(
529    buffer_pool: &BufferPool,
530    ssl_stream: &mut SslStream<ClientSslPackets>,
531    client_state: &mut ClientState,
532    sctp_packet: &SctpPacket,
533) -> Result<bool, ClientError> {
534    for chunk in sctp_packet.chunks {
535        match *chunk {
536            SctpChunk::Init {
537                initiate_tag,
538                window_credit: _,
539                num_outbound_streams,
540                num_inbound_streams,
541                initial_tsn,
542                support_unreliable,
543            } => {
544                if !support_unreliable {
545                    log::warn!("peer does not support selective unreliability, abort connection");
546                    client_state.sctp_state = SctpState::Shutdown;
547                    return Ok(false);
548                }
549
550                let mut rng = thread_rng();
551
552                client_state.sctp_local_port = sctp_packet.dest_port;
553                client_state.sctp_remote_port = sctp_packet.source_port;
554
555                client_state.sctp_local_verification_tag = rng.gen();
556                client_state.sctp_remote_verification_tag = initiate_tag;
557
558                client_state.sctp_local_tsn = rng.gen();
559                client_state.sctp_remote_tsn = initial_tsn;
560
561                send_sctp_packet(
562                    &buffer_pool,
563                    ssl_stream,
564                    SctpPacket {
565                        source_port: client_state.sctp_local_port,
566                        dest_port: client_state.sctp_remote_port,
567                        verification_tag: client_state.sctp_remote_verification_tag,
568                        chunks: &[SctpChunk::InitAck {
569                            initiate_tag: client_state.sctp_local_verification_tag,
570                            window_credit: SCTP_BUFFER_SIZE,
571                            num_outbound_streams: num_outbound_streams,
572                            num_inbound_streams: num_inbound_streams,
573                            initial_tsn: client_state.sctp_local_tsn,
574                            state_cookie: SCTP_COOKIE,
575                        }],
576                    },
577                )?;
578
579                client_state.sctp_state = SctpState::InitAck;
580                client_state.last_activity = Instant::now();
581                client_state.last_sent = Instant::now();
582            }
583            SctpChunk::CookieEcho { state_cookie } => {
584                if state_cookie == SCTP_COOKIE && client_state.sctp_state != SctpState::Shutdown {
585                    send_sctp_packet(
586                        &buffer_pool,
587                        ssl_stream,
588                        SctpPacket {
589                            source_port: client_state.sctp_local_port,
590                            dest_port: client_state.sctp_remote_port,
591                            verification_tag: client_state.sctp_remote_verification_tag,
592                            chunks: &[SctpChunk::CookieAck],
593                        },
594                    )?;
595                    client_state.last_sent = Instant::now();
596
597                    if client_state.sctp_state == SctpState::InitAck {
598                        client_state.sctp_state = SctpState::Established;
599                        client_state.last_activity = Instant::now();
600                    }
601                }
602            }
603            SctpChunk::Data {
604                chunk_flags,
605                tsn,
606                stream_id,
607                stream_seq: _,
608                proto_id,
609                user_data,
610            } => {
611                if chunk_flags & SCTP_FLAG_BEGIN_FRAGMENT == 0
612                    || chunk_flags & SCTP_FLAG_END_FRAGMENT == 0
613                {
614                    log::debug!("received fragmented SCTP packet, dropping");
615                } else {
616                    client_state.sctp_remote_tsn = max_tsn(client_state.sctp_remote_tsn, tsn);
617
618                    if proto_id == DATA_CHANNEL_PROTO_CONTROL {
619                        if !user_data.is_empty() {
620                            if user_data[0] == DATA_CHANNEL_MESSAGE_OPEN {
621                                send_sctp_packet(
622                                    &buffer_pool,
623                                    ssl_stream,
624                                    SctpPacket {
625                                        source_port: client_state.sctp_local_port,
626                                        dest_port: client_state.sctp_remote_port,
627                                        verification_tag: client_state.sctp_remote_verification_tag,
628                                        chunks: &[SctpChunk::Data {
629                                            chunk_flags: SCTP_FLAG_COMPLETE_UNRELIABLE,
630                                            tsn: client_state.sctp_local_tsn,
631                                            stream_id,
632                                            stream_seq: 0,
633                                            proto_id: DATA_CHANNEL_PROTO_CONTROL,
634                                            user_data: &[DATA_CHANNEL_MESSAGE_ACK],
635                                        }],
636                                    },
637                                )?;
638                                client_state.sctp_local_tsn =
639                                    client_state.sctp_local_tsn.wrapping_add(1);
640                            }
641                        }
642                    } else if proto_id == DATA_CHANNEL_PROTO_STRING {
643                        let mut msg_buffer = buffer_pool.acquire();
644                        msg_buffer.extend(user_data);
645                        client_state
646                            .received_messages
647                            .push((MessageType::Text, msg_buffer.into_owned()));
648                    } else if proto_id == DATA_CHANNEL_PROTO_BINARY {
649                        let mut msg_buffer = buffer_pool.acquire();
650                        msg_buffer.extend(user_data);
651                        client_state
652                            .received_messages
653                            .push((MessageType::Binary, msg_buffer.into_owned()));
654                    }
655
656                    send_sctp_packet(
657                        &buffer_pool,
658                        ssl_stream,
659                        SctpPacket {
660                            source_port: client_state.sctp_local_port,
661                            dest_port: client_state.sctp_remote_port,
662                            verification_tag: client_state.sctp_remote_verification_tag,
663                            chunks: &[SctpChunk::SAck {
664                                cumulative_tsn_ack: client_state.sctp_remote_tsn,
665                                adv_recv_window: SCTP_BUFFER_SIZE,
666                                num_gap_ack_blocks: 0,
667                                num_dup_tsn: 0,
668                            }],
669                        },
670                    )?;
671
672                    client_state.last_activity = Instant::now();
673                    client_state.last_sent = Instant::now();
674                }
675            }
676            SctpChunk::Heartbeat { heartbeat_info } => {
677                send_sctp_packet(
678                    &buffer_pool,
679                    ssl_stream,
680                    SctpPacket {
681                        source_port: client_state.sctp_local_port,
682                        dest_port: client_state.sctp_remote_port,
683                        verification_tag: client_state.sctp_remote_verification_tag,
684                        chunks: &[SctpChunk::HeartbeatAck { heartbeat_info }],
685                    },
686                )?;
687                client_state.last_activity = Instant::now();
688                client_state.last_sent = Instant::now();
689            }
690            SctpChunk::HeartbeatAck { .. } => {
691                client_state.last_activity = Instant::now();
692            }
693            SctpChunk::SAck {
694                cumulative_tsn_ack: _,
695                adv_recv_window: _,
696                num_gap_ack_blocks,
697                num_dup_tsn: _,
698            } => {
699                if num_gap_ack_blocks > 0 {
700                    send_sctp_packet(
701                        &buffer_pool,
702                        ssl_stream,
703                        SctpPacket {
704                            source_port: client_state.sctp_local_port,
705                            dest_port: client_state.sctp_remote_port,
706                            verification_tag: client_state.sctp_remote_verification_tag,
707                            chunks: &[SctpChunk::ForwardTsn {
708                                new_cumulative_tsn: client_state.sctp_local_tsn,
709                            }],
710                        },
711                    )?;
712                    client_state.last_sent = Instant::now();
713                }
714                client_state.last_activity = Instant::now();
715            }
716            SctpChunk::Shutdown { .. } => {
717                send_sctp_packet(
718                    &buffer_pool,
719                    ssl_stream,
720                    SctpPacket {
721                        source_port: client_state.sctp_local_port,
722                        dest_port: client_state.sctp_remote_port,
723                        verification_tag: client_state.sctp_remote_verification_tag,
724                        chunks: &[SctpChunk::ShutdownAck],
725                    },
726                )?;
727            }
728            SctpChunk::ShutdownAck { .. } | SctpChunk::Abort => {
729                client_state.sctp_state = SctpState::Shutdown;
730                return Ok(false);
731            }
732            SctpChunk::ForwardTsn { new_cumulative_tsn } => {
733                client_state.sctp_remote_tsn = new_cumulative_tsn;
734            }
735            SctpChunk::InitAck { .. } | SctpChunk::CookieAck => {}
736            SctpChunk::Error {
737                first_param_type,
738                first_param_data,
739            } => {
740                log::warn!(
741                    "SCTP error chunk received: {} {:?}",
742                    first_param_type,
743                    first_param_data
744                );
745            }
746            chunk => log::debug!("unhandled SCTP chunk {:?}", chunk),
747        }
748    }
749
750    Ok(true)
751}