Skip to main content

turn_server_proto/
server.rs

1// Copyright (C) 2025 Matthew Waters <matthew@centricular.com>
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8//
9// SPDX-License-Identifier: MIT OR Apache-2.0
10
11//! A TURN server that can handle UDP and TCP connections.
12
13use alloc::borrow::ToOwned;
14use alloc::collections::{BTreeMap, VecDeque};
15use alloc::string::String;
16use alloc::vec;
17use alloc::vec::Vec;
18use byteorder::{BigEndian, ByteOrder};
19use core::net::{IpAddr, SocketAddr};
20use core::time::Duration;
21use pnet_packet::Packet;
22use stun_proto::auth::{
23    Feature, LongTermServerAuth, LongTermServerAuthErrorReason, LongTermServerValidation,
24};
25use turn_types::stun::attribute::{MessageIntegritySha256, PasswordAlgorithm, Userhash};
26use turn_types::stun::prelude::AttributeExt;
27use turn_types::tcp::{IncomingTcp, StoredTcp, TurnTcpBuffer};
28use turn_types::transmit::{DelayedChannel, DelayedMessage, TransmitBuild};
29
30use stun_proto::agent::{StunAgent, Transmit};
31use stun_proto::types::attribute::{
32    AttributeType, ErrorCode, Fingerprint, MessageIntegrity, Nonce, Realm, Username,
33    XorMappedAddress,
34};
35use stun_proto::types::message::{
36    LongTermCredentials, Message, MessageClass, MessageType, MessageWrite, MessageWriteExt,
37    MessageWriteVec, TransactionId, BINDING,
38};
39use stun_proto::types::prelude::{Attribute, AttributeFromRaw, AttributeStaticType};
40use stun_proto::types::TransportType;
41use stun_proto::Instant;
42use turn_types::channel::ChannelData;
43
44use turn_types::message::{CONNECT, CONNECTION_ATTEMPT, CONNECTION_BIND, CREATE_PERMISSION};
45
46use turn_types::attribute::{
47    AdditionalAddressFamily, AddressErrorCode, ConnectionId, Data as AData, DontFragment, EvenPort,
48    Icmp, RequestedAddressFamily, ReservationToken,
49};
50use turn_types::attribute::{
51    ChannelNumber, Lifetime, RequestedTransport, XorPeerAddress, XorRelayedAddress,
52};
53use turn_types::message::{ALLOCATE, CHANNEL_BIND, DATA, REFRESH, SEND};
54use turn_types::stun::message::{IntegrityAlgorithm, MessageHeader};
55use turn_types::AddressFamily;
56
57use tracing::{debug, error, info, trace, warn};
58
59use crate::api::{
60    DelayedMessageOrChannelSend, SocketAllocateError, TcpConnectError, TurnServerApi,
61    TurnServerPollRet,
62};
63
/// Upper bound applied to a requested allocation lifetime (one hour).
static MAXIMUM_ALLOCATION_DURATION: Duration = Duration::from_secs(60 * 60);
/// Allocation lifetime used when the client does not request one (ten minutes).
static DEFAULT_ALLOCATION_DURATION: Duration = Duration::from_secs(10 * 60);
/// Five minutes; by its name, how long an installed permission lasts.
static PERMISSION_DURATION: Duration = Duration::from_secs(5 * 60);
/// Ten minutes; by its name, how long a channel binding lasts.
static CHANNEL_DURATION: Duration = Duration::from_secs(10 * 60);
/// Thirty seconds; by its name, the time allowed for a TCP peer connection.
static TCP_PEER_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
69
70/// A TURN server.
71#[derive(Debug)]
72pub struct TurnServer {
73    protocol: TurnServerProtocol,
74
75    // client_addr, listen_addr
76    incoming_tcp_buffers: BTreeMap<(SocketAddr, SocketAddr), TcpBuffer>,
77    // allocation_addr, peer_addr, pending
78    peer_tcp: BTreeMap<(SocketAddr, SocketAddr), PeerTcp>,
79}
80
81#[derive(Debug)]
82struct TurnServerProtocol {
83    stun: StunAgent,
84    auth: LongTermServerAuth,
85
86    clients: Vec<Client>,
87    pending_transmits: VecDeque<Transmit<Vec<u8>>>,
88    pending_allocates: VecDeque<PendingClient>,
89    pending_socket_removals: VecDeque<Socket5Tuple>,
90    pending_socket_listen_removals: VecDeque<(TransportType, SocketAddr)>,
91
92    tcp_connection_id: u32,
93    pending_tcp_connection_binds: Vec<PendingConnectionBind>,
94}
95
96#[derive(Debug, Copy, Clone, PartialEq, Eq)]
97struct Socket5Tuple {
98    transport: TransportType,
99    local_addr: SocketAddr,
100    remote_addr: SocketAddr,
101}
102
103#[derive(Debug)]
104struct PendingClient {
105    client: Client,
106    allocation_transport: TransportType,
107    transaction_id: TransactionId,
108    to_ask_families: smallvec::SmallVec<[AddressFamily; 2]>,
109    pending_families: smallvec::SmallVec<[AddressFamily; 2]>,
110    pending_sockets:
111        smallvec::SmallVec<[(AddressFamily, Result<SocketAddr, SocketAllocateError>); 2]>,
112    requested_lifetime: Option<u32>,
113}
114
115#[derive(Debug)]
116struct ForwardChannelData {
117    transport: TransportType,
118    from: SocketAddr,
119    to: SocketAddr,
120}
121
/// A TCP peer connection that is waiting for the client's ConnectionBind.
#[derive(Debug)]
struct PendingConnectionBind {
    /// Connection id associated with this pending bind.
    connection_id: u32,
    /// Listening address on the server.
    listen_addr: SocketAddr,
    /// Relayed address of the allocation.
    relayed_addr: SocketAddr,
    /// Address of the peer.
    peer_addr: SocketAddr,
    /// The remote address of the client's control connection.
    client_control_addr: SocketAddr,
}
131
132/// TCP buffers on a TURN listening socket
133#[derive(Debug)]
134enum TcpBuffer {
135    // It is unknown what kind of connection this is.
136    Unknown(TurnTcpBuffer),
137    // The control TURN connection. Always buffered.
138    Control(TurnTcpBuffer),
139    Passthrough {
140        relayed_addr: SocketAddr,
141        peer_addr: SocketAddr,
142        pending_data: Vec<u8>,
143    },
144}
145
146/// TCP buffers when sending/receiving between a relayed address and a peer.
147#[derive(Debug)]
148enum PeerTcp {
149    PendingConnectionBind {
150        peer_data: Vec<u8>,
151        expires_at: Instant,
152    },
153    Passthrough {
154        client_addr: SocketAddr,
155        listen_addr: SocketAddr,
156        pending_data: Vec<u8>,
157    },
158}
159
160impl TurnServerProtocol {
161    // returns the username being accessed
162    fn validate_stun(
163        &mut self,
164        transmit: &Transmit<&Message<'_>>,
165        now: Instant,
166    ) -> Result<(), MessageWriteVec> {
167        let msg = transmit.data;
168
169        match self.auth.validate_incoming_message(msg, transmit.from, now) {
170            Ok(LongTermServerValidation::Validated(_algo)) => {
171                // checked by auth already
172                let username = msg.attribute::<Username>().ok();
173                let userhash = msg.attribute::<Userhash>().ok();
174
175                // All requests after the initial Allocate must use the same username as
176                // that used to create the allocation, to prevent attackers from
177                // hijacking the client's allocation.  Specifically, if the server
178                // requires the use of the long-term credential mechanism, and if a non-
179                // Allocate request passes authentication under this mechanism, and if
180                // the 5-tuple identifies an existing allocation, but the request does
181                // not use the same username as used to create the allocation, then the
182                // request MUST be rejected with a 441 (Wrong Credentials) error.
183                if let Some(client) =
184                    self.client_from_5tuple(transmit.transport, transmit.to, transmit.from)
185                {
186                    trace!(
187                        "checking user: {username:?} vs {:?} || {userhash:?} vs {:?}",
188                        client.username,
189                        client.userhash
190                    );
191                    if username.as_ref().is_some_and(|u| {
192                        client
193                            .username
194                            .as_ref()
195                            .map_or(true, |cu| u.username() != cu)
196                    }) || userhash.as_ref().map(|u| *u.hash()) != client.userhash
197                    {
198                        trace!(
199                            "mismatched username: {username:?} vs {:?} || {userhash:?} vs {:?}",
200                            client.username,
201                            client.userhash
202                        );
203                        let error = ErrorCode::builder(ErrorCode::WRONG_CREDENTIALS)
204                            .build()
205                            .unwrap();
206                        let mut builder = Message::builder_error(
207                            msg,
208                            MessageWriteVec::with_capacity(
209                                MessageHeader::LENGTH + error.padded_len() + 24,
210                            ),
211                        );
212                        builder.add_attribute(&error).unwrap();
213                        let builder = self
214                            .auth
215                            .sign_outgoing_message(builder, transmit.from)
216                            .unwrap();
217                        return Err(builder);
218                    }
219                }
220
221                Ok(())
222            }
223            Err(err) => match err.reason() {
224                LongTermServerAuthErrorReason::BadRequest => {
225                    trace!("auth bad request");
226                    Err(Self::bad_request(msg, 0))
227                }
228                LongTermServerAuthErrorReason::StaleNonce => {
229                    trace!("auth stale nonce");
230                    let error = ErrorCode::builder(ErrorCode::STALE_NONCE).build().unwrap();
231                    let nonce_value = self.auth.nonce_for_client(transmit.from).unwrap();
232                    let realm = Realm::new(self.auth.realm()).unwrap();
233                    let nonce = Nonce::new(nonce_value).unwrap();
234                    let mut builder = Message::builder_error(
235                        msg,
236                        MessageWriteVec::with_capacity(
237                            MessageHeader::LENGTH
238                                + nonce.padded_len()
239                                + realm.padded_len()
240                                + error.padded_len(),
241                        ),
242                    );
243                    builder.add_attribute(&error).unwrap();
244                    builder.add_attribute(&realm).unwrap();
245                    builder.add_attribute(&nonce).unwrap();
246
247                    Err(builder)
248                }
249                LongTermServerAuthErrorReason::Unauthorized
250                | LongTermServerAuthErrorReason::IntegrityFailed
251                | LongTermServerAuthErrorReason::UnsupportedFeature => {
252                    trace!("auth unauthorized");
253                    let error = ErrorCode::builder(ErrorCode::UNAUTHORIZED).build().unwrap();
254                    let mut builder = Message::builder_error(
255                        msg,
256                        MessageWriteVec::with_capacity(
257                            MessageHeader::LENGTH
258                                + error.padded_len()
259                                + self.auth.message_signature_bytes(transmit.from, false),
260                        ),
261                    );
262                    builder.add_attribute(&error).unwrap();
263                    let builder = self
264                        .auth
265                        .sign_outgoing_message(builder, transmit.from)
266                        .unwrap();
267
268                    Err(builder)
269                }
270            },
271        }
272    }
273
274    fn server_error(msg: &Message<'_>) -> MessageWriteVec {
275        let error = ErrorCode::builder(ErrorCode::SERVER_ERROR).build().unwrap();
276        let mut response = Message::builder_error(
277            msg,
278            MessageWriteVec::with_capacity(MessageHeader::LENGTH + error.padded_len() + 8),
279        );
280        response.add_attribute(&error).unwrap();
281        response.add_fingerprint().unwrap();
282        response
283    }
284
285    fn bad_request(msg: &Message<'_>, additional_bytes: usize) -> MessageWriteVec {
286        let error = ErrorCode::builder(ErrorCode::BAD_REQUEST).build().unwrap();
287        let mut builder = Message::builder_error(
288            msg,
289            MessageWriteVec::with_capacity(
290                MessageHeader::LENGTH + error.padded_len() + additional_bytes,
291            ),
292        );
293        builder.add_attribute(&error).unwrap();
294        builder
295    }
296
297    fn bad_request_signed(
298        auth: &mut LongTermServerAuth,
299        msg: &Message<'_>,
300        to: SocketAddr,
301    ) -> MessageWriteVec {
302        let builder = Self::bad_request(msg, auth.message_signature_bytes(to, false));
303        auth.sign_outgoing_message(builder, to).unwrap()
304    }
305
306    fn allocation_mismatch(
307        auth: &mut LongTermServerAuth,
308        msg: &Message<'_>,
309        to: SocketAddr,
310    ) -> MessageWriteVec {
311        let error = ErrorCode::builder(ErrorCode::ALLOCATION_MISMATCH)
312            .build()
313            .unwrap();
314        let mut response = Message::builder_error(
315            msg,
316            MessageWriteVec::with_capacity(
317                MessageHeader::LENGTH
318                    + error.padded_len()
319                    + auth.message_signature_bytes(to, false)
320                    + 8,
321            ),
322        );
323        response.add_attribute(&error).unwrap();
324        let mut response = auth.sign_outgoing_message(response, to).unwrap();
325        response.add_fingerprint().unwrap();
326        response
327    }
328
329    fn handle_stun_binding(
330        &mut self,
331        transmit: Transmit<&Message<'_>>,
332        now: Instant,
333    ) -> Result<Transmit<Vec<u8>>, MessageWriteVec> {
334        let msg = transmit.data;
335        let response = if let Some(error_msg) = Message::check_attribute_types(
336            msg,
337            &[Fingerprint::TYPE],
338            &[],
339            MessageWriteVec::with_capacity(64),
340        ) {
341            error_msg
342        } else {
343            let xor_addr = XorMappedAddress::new(transmit.from, msg.transaction_id());
344            let mut response = Message::builder_success(
345                msg,
346                MessageWriteVec::with_capacity(MessageHeader::LENGTH + xor_addr.padded_len() + 8),
347            );
348            response.add_attribute(&xor_addr).unwrap();
349            response.add_fingerprint().unwrap();
350            response
351        };
352        let response = response.finish();
353
354        let Ok(transmit) = self.stun.send(response, transmit.to, now) else {
355            error!("Failed to send");
356            return Err(Self::server_error(msg));
357        };
358
359        Ok(transmit)
360    }
361
362    fn handle_stun_allocate(
363        &mut self,
364        transmit: Transmit<&Message<'_>>,
365        tcp_type: TcpStunType,
366        now: Instant,
367        tcp_stun_change: &mut Option<TcpStunChange>,
368    ) -> Result<(), MessageWriteVec> {
369        let msg = transmit.data;
370        self.validate_stun(&transmit, now)?;
371        let mut address_families = smallvec::SmallVec::<[AddressFamily; 2]>::new();
372
373        if let Some(_client) =
374            self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
375        {
376            trace!("allocation mismatch");
377            return Err(Self::allocation_mismatch(
378                &mut self.auth,
379                msg,
380                transmit.from,
381            ));
382        };
383
384        let mut requested_transport = None;
385        let mut lifetime = None;
386        let mut reservation_token = None;
387        let mut even_port = None;
388        let mut requested_address_family = None;
389        let mut additional_address_family = None;
390        let mut username = None;
391        let mut userhash = None;
392        let mut dont_fragment = None;
393
394        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
395        for (_offset, attr) in msg.iter_attributes() {
396            match attr.get_type() {
397                // checked by validate_stun()
398                Realm::TYPE
399                | Nonce::TYPE
400                | PasswordAlgorithm::TYPE
401                | MessageIntegrity::TYPE
402                | MessageIntegritySha256::TYPE => (),
403                Userhash::TYPE => {
404                    userhash = Userhash::from_raw(attr).ok().map(|u| *u.hash());
405                }
406                Username::TYPE => {
407                    username = Username::from_raw(attr)
408                        .ok()
409                        .map(|u| u.username().to_owned())
410                }
411                RequestedTransport::TYPE => {
412                    requested_transport = RequestedTransport::from_raw(attr).ok()
413                }
414                Lifetime::TYPE => lifetime = Lifetime::from_raw(attr).ok(),
415                ReservationToken::TYPE => reservation_token = Some(attr),
416                EvenPort::TYPE => even_port = Some(attr),
417                RequestedAddressFamily::TYPE => {
418                    if additional_address_family.is_some() {
419                        return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
420                    } else {
421                        requested_address_family = Some(attr)
422                    }
423                }
424                AdditionalAddressFamily::TYPE => {
425                    if requested_address_family.is_some() {
426                        return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
427                    } else {
428                        additional_address_family = Some(attr)
429                    }
430                }
431                DontFragment::TYPE => {
432                    dont_fragment = DontFragment::from_raw(attr).ok();
433                }
434                atype => {
435                    if atype.comprehension_required() {
436                        unknown_attributes.push(atype);
437                    }
438                }
439            }
440        }
441
442        if !unknown_attributes.is_empty() {
443            trace!("unknown attributes: {unknown_attributes:?}");
444            let err = Message::unknown_attributes(
445                msg,
446                &unknown_attributes,
447                MessageWriteVec::with_capacity(64),
448            );
449            let err = self.auth.sign_outgoing_message(err, transmit.from).unwrap();
450            return Err(err);
451        }
452
453        let Some(requested_transport) = requested_transport else {
454            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
455        };
456
457        let allocation_transport = match requested_transport.protocol() {
458            RequestedTransport::UDP => TransportType::Udp,
459            RequestedTransport::TCP => {
460                // RFC 6062 Section 5.1
461                // 2.  If the client connection transport is not TCP or TLS, the server
462                //     MUST reject the request with a 400 (Bad Request) error.
463                // 3.  If the request contains the DONT-FRAGMENT, EVEN-PORT, or
464                //     RESERVATION-TOKEN attribute, the server MUST reject the request
465                //     with a 400 (Bad Request) error.
466                if self.stun.transport() != TransportType::Tcp
467                    || even_port.is_some()
468                    || dont_fragment.is_some()
469                    || reservation_token.is_some()
470                {
471                    return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
472                }
473                TransportType::Tcp
474            }
475            protocol => {
476                debug!("unsupported RequestedTransport {protocol}",);
477                let error = ErrorCode::builder(ErrorCode::UNSUPPORTED_TRANSPORT_PROTOCOL)
478                    .build()
479                    .unwrap();
480                let mut builder = Message::builder_error(
481                    msg,
482                    MessageWriteVec::with_capacity(MessageHeader::LENGTH + error.padded_len() + 24),
483                );
484                builder.add_attribute(&error).unwrap();
485                let builder = self
486                    .auth
487                    .sign_outgoing_message(builder, transmit.from)
488                    .unwrap();
489                return Err(builder);
490            }
491        };
492
493        if let Some(additional) = additional_address_family {
494            let Ok(additional) = AdditionalAddressFamily::from_raw(additional) else {
495                return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
496            };
497            /* The server checks if the request contains both
498             * REQUESTED-ADDRESS-FAMILY and ADDITIONAL-ADDRESS-FAMILY attributes. If yes,
499             * then the server rejects the request with a 400 (Bad Request) error.
500             */
501            /* The server checks if the request contains an ADDITIONAL-ADDRESS-FAMILY
502             * attribute. If yes, and the attribute value is 0x01 (IPv4 address family),
503             * then the server rejects the request with a 400 (Bad Request) error.
504             */
505            if requested_address_family.is_some()
506                || additional.family() == AddressFamily::IPV4
507                || reservation_token.is_some()
508                || even_port.is_some()
509            {
510                debug!(
511                    "AdditionalAddressFamily with either {} == IPV4, ReservationToken {}, RequestedAddressFamily {}, or EvenPort {}. Bad Request",
512                    additional.family(),
513                    reservation_token.is_some(),
514                    requested_address_family.is_some(),
515                    even_port.is_some(),
516                );
517                return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
518            }
519            address_families.push(AddressFamily::IPV4);
520            address_families.push(additional.family());
521        }
522
523        if let Some(requested) = requested_address_family {
524            let Ok(requested) = RequestedAddressFamily::from_raw(requested) else {
525                return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
526            };
527            if reservation_token.is_some() {
528                debug!("RequestedAddressFamily with ReservationToken -> Bad Request");
529                return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
530            }
531            address_families.push(requested.family());
532        } else if address_families.is_empty() {
533            address_families.push(AddressFamily::IPV4);
534        }
535
536        if let Some(_reservation_token) = reservation_token {
537            /* The server checks if the request contains a RESERVATION-TOKEN
538             * attribute. If yes, and the request also contains an EVEN-PORT or
539             * REQUESTED-ADDRESS-FAMILY or ADDITIONAL-ADDRESS-FAMILY attribute,
540             * the server rejects the request with a 400 (Bad Request) error.
541             * Otherwise, it checks to see if the token is valid (i.e., the
542             * token is in range and has not expired, and the corresponding
543             * relayed transport address is still available). If the token is
544             * not valid for some reason, the server rejects the request with a
545             * 508 (Insufficient Capacity) error.
546             */
547            if even_port.is_some() {
548                debug!("ReservationToken with EvenPort -> Bad Request");
549                return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
550            }
551
552            // TODO: further RESERVATION-TOKEN handling
553        }
554
555        // TODO: DONT-FRAGMENT
556        // TODO: EVEN-PORT
557        // TODO: allocation quota
558        // XXX: TRY-ALTERNATE
559
560        let client = Client {
561            transport: transmit.transport,
562            remote_addr: transmit.from,
563            local_addr: transmit.to,
564            allocations: vec![],
565            username,
566            userhash,
567        };
568        debug!(
569            "have new pending ALLOCATE from client {} from {} to {}",
570            transmit.transport, transmit.from, transmit.to
571        );
572
573        self.pending_allocates.push_front(PendingClient {
574            client,
575            allocation_transport,
576            transaction_id: msg.transaction_id(),
577            to_ask_families: address_families.clone(),
578            pending_families: address_families,
579            pending_sockets: Default::default(),
580            requested_lifetime: lifetime.map(|lt| lt.seconds()),
581        });
582
583        if tcp_type == TcpStunType::Unknown {
584            *tcp_stun_change = Some(TcpStunChange::Control);
585        }
586
587        Ok(())
588    }
589
590    fn peer_address_family_mismatch_signed(
591        auth: &mut LongTermServerAuth,
592        msg: &Message<'_>,
593        to: SocketAddr,
594    ) -> MessageWriteVec {
595        let error = ErrorCode::builder(ErrorCode::PEER_ADDRESS_FAMILY_MISMATCH)
596            .build()
597            .unwrap();
598        let mut response = Message::builder_error(
599            msg,
600            MessageWriteVec::with_capacity(
601                MessageHeader::LENGTH
602                    + error.padded_len()
603                    + auth.message_signature_bytes(to, false)
604                    + 8,
605            ),
606        );
607        response.add_attribute(&error).unwrap();
608        let mut response = auth.sign_outgoing_message(response, to).unwrap();
609        response.add_fingerprint().unwrap();
610        response
611    }
612
613    fn handle_stun_refresh(
614        &mut self,
615        transmit: Transmit<&Message<'_>>,
616        now: Instant,
617        tcp_stun_change: &mut Option<TcpStunChange>,
618    ) -> Result<Transmit<Vec<u8>>, MessageWriteVec> {
619        let msg = transmit.data;
620        self.validate_stun(&transmit, now)?;
621
622        let Some(client) =
623            self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
624        else {
625            trace!("allocation mismatch");
626            return Err(Self::allocation_mismatch(
627                &mut self.auth,
628                msg,
629                transmit.from,
630            ));
631        };
632
633        let mut request_lifetime = None;
634        let mut requested_family = None;
635
636        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
637        for (_offset, attr) in msg.iter_attributes() {
638            match attr.get_type() {
639                // handled by validate_stun
640                Username::TYPE
641                | Userhash::TYPE
642                | Realm::TYPE
643                | Nonce::TYPE
644                | PasswordAlgorithm::TYPE
645                | MessageIntegrity::TYPE
646                | MessageIntegritySha256::TYPE => (),
647                Lifetime::TYPE => {
648                    request_lifetime = Lifetime::from_raw(attr).ok().map(|lt| lt.seconds())
649                }
650                RequestedAddressFamily::TYPE => {
651                    requested_family = RequestedAddressFamily::from_raw(attr)
652                        .ok()
653                        .map(|r| r.family())
654                }
655                atype => {
656                    if atype.comprehension_required() {
657                        unknown_attributes.push(atype);
658                    }
659                }
660            }
661        }
662        if !unknown_attributes.is_empty() {
663            trace!("unknown attributes: {unknown_attributes:?}");
664            let err = Message::unknown_attributes(
665                msg,
666                &unknown_attributes,
667                MessageWriteVec::with_capacity(64),
668            );
669            let err = self.auth.sign_outgoing_message(err, transmit.from).unwrap();
670            return Err(err);
671        }
672
673        // TODO: proper lifetime handling
674        let mut request_lifetime =
675            request_lifetime.unwrap_or(DEFAULT_ALLOCATION_DURATION.as_secs() as u32);
676        if request_lifetime > 0 {
677            request_lifetime = request_lifetime.clamp(
678                DEFAULT_ALLOCATION_DURATION.as_secs() as u32,
679                MAXIMUM_ALLOCATION_DURATION.as_secs() as u32,
680            );
681        }
682        let mut modified = false;
683        let mut removed_client = false;
684        if request_lifetime == 0 {
685            error!("deleting allocation");
686            if let Some(family) = requested_family {
687                if let Some(allocation_idx) = client.allocations.iter().position(|allocation| {
688                    (family == AddressFamily::IPV4 && allocation.addr.is_ipv4())
689                        || (family == AddressFamily::IPV6 && allocation.addr.is_ipv6())
690                }) {
691                    modified = true;
692                    *tcp_stun_change = Some(TcpStunChange::Delete(vec![client
693                        .allocations
694                        .swap_remove(allocation_idx)]));
695                    if client.allocations.is_empty() {
696                        self.remove_client_by_5tuple(
697                            transmit.transport,
698                            transmit.to,
699                            transmit.from,
700                        )
701                        .unwrap();
702                        removed_client = true;
703                    }
704                }
705            } else {
706                let Some(client) =
707                    self.remove_client_by_5tuple(transmit.transport, transmit.to, transmit.from)
708                else {
709                    unreachable!();
710                };
711                *tcp_stun_change = Some(TcpStunChange::Delete(client.allocations));
712                removed_client = true;
713                modified = true;
714            }
715        } else {
716            for allocation in client.allocations.iter_mut() {
717                if requested_family.map_or(true, |family| {
718                    (family == AddressFamily::IPV4 && allocation.addr.is_ipv4())
719                        || (family == AddressFamily::IPV6 && allocation.addr.is_ipv6())
720                }) {
721                    modified = true;
722                    allocation.expires_at = now + Duration::from_secs(request_lifetime as u64)
723                }
724            }
725        }
726
727        let response = if modified {
728            let lifetime = Lifetime::new(request_lifetime);
729            let mut builder = Message::builder_success(
730                msg,
731                MessageWriteVec::with_capacity(MessageHeader::LENGTH + lifetime.padded_len() + 24),
732            );
733            builder.add_attribute(&lifetime).unwrap();
734            let builder = self
735                .auth
736                .sign_outgoing_message(builder, transmit.from)
737                .unwrap();
738            builder.finish()
739        } else {
740            trace!("peer address family mismatch");
741            return Err(Self::peer_address_family_mismatch_signed(
742                &mut self.auth,
743                msg,
744                transmit.from,
745            ));
746        };
747        if removed_client {
748            self.auth.remove_client(transmit.from);
749        }
750
751        let Ok(transmit) = self.stun.send(response, transmit.from, now) else {
752            error!("Failed to send");
753            return Err(Self::server_error(msg));
754        };
755
756        if request_lifetime == 0 {
757            error!("{:?}", tcp_stun_change);
758            info!(
759                "Successfully deleted allocation {}, client {} to {}",
760                transmit.transport, transmit.from, transmit.to
761            );
762        } else {
763            info!(
764                "Successfully refreshed allocation {}, client {} to {}",
765                transmit.transport, transmit.from, transmit.to
766            );
767        }
768
769        Ok(transmit)
770    }
771
772    fn handle_stun_create_permission(
773        &mut self,
774        transmit: Transmit<&Message<'_>>,
775        now: Instant,
776    ) -> Result<Transmit<Vec<u8>>, MessageWriteVec> {
777        let msg = transmit.data;
778        self.validate_stun(&transmit, now)?;
779
780        let Some(client) =
781            self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
782        else {
783            trace!("allocation mismatch");
784            return Err(Self::allocation_mismatch(
785                &mut self.auth,
786                msg,
787                transmit.from,
788            ));
789        };
790
791        let mut peer_addresses = smallvec::SmallVec::<[SocketAddr; 4]>::default();
792        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
793        for (_offset, attr) in msg.iter_attributes() {
794            match attr.get_type() {
795                // checked by validate_stun()
796                Username::TYPE
797                | Userhash::TYPE
798                | Realm::TYPE
799                | Nonce::TYPE
800                | PasswordAlgorithm::TYPE
801                | MessageIntegrity::TYPE
802                | MessageIntegritySha256::TYPE => (),
803                XorPeerAddress::TYPE => {
804                    let Ok(xor_peer_addr) = XorPeerAddress::from_raw(attr) else {
805                        return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
806                    };
807                    peer_addresses.push(xor_peer_addr.addr(msg.transaction_id()));
808                }
809                atype => {
810                    if atype.comprehension_required() {
811                        unknown_attributes.push(atype);
812                    }
813                }
814            }
815        }
816        if !unknown_attributes.is_empty() {
817            trace!("unknown attributes: {unknown_attributes:?}");
818            let err = Message::unknown_attributes(
819                msg,
820                &unknown_attributes,
821                MessageWriteVec::with_capacity(64),
822            );
823            let err = self.auth.sign_outgoing_message(err, transmit.from).unwrap();
824            return Err(err);
825        }
826        if peer_addresses.is_empty() {
827            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
828        }
829
830        for peer_addr in peer_addresses.iter() {
831            let Some(alloc) = client
832                .allocations
833                .iter_mut()
834                .find(|a| a.addr.is_ipv4() == peer_addr.is_ipv4())
835            else {
836                trace!("peer address family mismatch");
837                return Err(Self::peer_address_family_mismatch_signed(
838                    &mut self.auth,
839                    msg,
840                    transmit.from,
841                ));
842            };
843
844            if now > alloc.expires_at {
845                trace!("allocation has expired");
846                // allocation has expired
847                return Err(Self::allocation_mismatch(
848                    &mut self.auth,
849                    msg,
850                    transmit.from,
851                ));
852            }
853
854            // TODO: support TCP allocations
855            if let Some(position) = alloc
856                .permissions
857                .iter()
858                .position(|perm| perm.ttype == TransportType::Udp && perm.addr == peer_addr.ip())
859            {
860                alloc.permissions[position].expires_at = now + PERMISSION_DURATION;
861            } else {
862                alloc.permissions.push(Permission {
863                    addr: peer_addr.ip(),
864                    ttype: TransportType::Udp,
865                    expires_at: now + PERMISSION_DURATION,
866                });
867            }
868        }
869
870        let builder = Message::builder_success(
871            msg,
872            MessageWriteVec::with_capacity(
873                MessageHeader::LENGTH + self.auth.message_signature_bytes(transmit.from, false),
874            ),
875        );
876        let builder = self
877            .auth
878            .sign_outgoing_message(builder, transmit.from)
879            .unwrap();
880        let response = builder.finish();
881
882        let Ok(transmit) = self.stun.send(response, transmit.from, now) else {
883            error!("Failed to send");
884            return Err(Self::server_error(msg));
885        };
886        debug!(
887            "allocation {} from {} to {} successfully created permission for {:?}",
888            transmit.transport, transmit.from, transmit.to, peer_addresses
889        );
890
891        Ok(transmit)
892    }
893
894    fn handle_stun_channel_bind(
895        &mut self,
896        transmit: Transmit<&Message<'_>>,
897        now: Instant,
898    ) -> Result<Transmit<Vec<u8>>, MessageWriteVec> {
899        let msg = transmit.data;
900        self.validate_stun(&transmit, now)?;
901
902        let Some(client) =
903            self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
904        else {
905            trace!("allocation mismatch");
906            return Err(Self::allocation_mismatch(
907                &mut self.auth,
908                msg,
909                transmit.from,
910            ));
911        };
912
913        let mut xor_peer_address = None;
914        let mut channel_number = None;
915
916        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
917        for (_offset, attr) in msg.iter_attributes() {
918            match attr.get_type() {
919                // checked by validate_stun()
920                Username::TYPE
921                | Userhash::TYPE
922                | Realm::TYPE
923                | Nonce::TYPE
924                | PasswordAlgorithm::TYPE
925                | MessageIntegrity::TYPE
926                | MessageIntegritySha256::TYPE => (),
927                XorPeerAddress::TYPE => xor_peer_address = XorPeerAddress::from_raw(attr).ok(),
928                ChannelNumber::TYPE => channel_number = ChannelNumber::from_raw(attr).ok(),
929                atype => {
930                    if atype.comprehension_required() {
931                        unknown_attributes.push(atype);
932                    }
933                }
934            }
935        }
936        if !unknown_attributes.is_empty() {
937            trace!("unknown attributes: {unknown_attributes:?}");
938            let err = Message::unknown_attributes(
939                msg,
940                &unknown_attributes,
941                MessageWriteVec::with_capacity(64),
942            );
943            let err = self.auth.sign_outgoing_message(err, transmit.from).unwrap();
944            return Err(err);
945        }
946
947        let peer_addr = xor_peer_address.map(|peer_addr| peer_addr.addr(msg.transaction_id()));
948        let channel_no = channel_number.map(|channel| channel.channel());
949
950        let Some(peer_addr) = peer_addr else {
951            trace!("No peer address");
952            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
953        };
954
955        let Some(alloc) = client
956            .allocations
957            .iter_mut()
958            .find(|allocation| allocation.addr.is_ipv4() == peer_addr.is_ipv4())
959        else {
960            trace!("peer address family mismatch");
961            return Err(Self::peer_address_family_mismatch_signed(
962                &mut self.auth,
963                msg,
964                transmit.from,
965            ));
966        };
967
968        if now > alloc.expires_at {
969            trace!("allocation has expired");
970            // allocation has expired
971            return Err(Self::allocation_mismatch(
972                &mut self.auth,
973                msg,
974                transmit.from,
975            ));
976        }
977
978        let mut existing = alloc.channels.iter_mut().find(|channel| {
979            channel.peer_addr == peer_addr && channel.peer_transport == TransportType::Udp
980        });
981
982        let Some(channel_no) = channel_no else {
983            debug!("Bad request: no requested channel id");
984            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
985        };
986
987        // RFC8656 reduces this range to 0x4000..=0x4fff but we keep the RFC5766 range for
988        // backwards compatibility
989        if !(0x4000..=0x7fff).contains(&channel_no) {
990            trace!("Channel id out of range");
991            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
992        }
993        if existing
994            .as_ref()
995            .is_some_and(|existing| existing.id != channel_no)
996        {
997            trace!("channel peer address does not match channel ID");
998            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
999        }
1000
1001        if let Some(existing) = existing.as_mut() {
1002            existing.expires_at = now + CHANNEL_DURATION;
1003        } else {
1004            alloc.channels.push(Channel {
1005                id: channel_no,
1006                peer_addr,
1007                peer_transport: TransportType::Udp,
1008                expires_at: now + CHANNEL_DURATION,
1009            });
1010        }
1011
1012        if let Some(existing) = alloc
1013            .permissions
1014            .iter_mut()
1015            .find(|perm| perm.ttype == TransportType::Udp && perm.addr == peer_addr.ip())
1016        {
1017            existing.expires_at = now + PERMISSION_DURATION;
1018        } else {
1019            alloc.permissions.push(Permission {
1020                addr: peer_addr.ip(),
1021                ttype: TransportType::Udp,
1022                expires_at: now + PERMISSION_DURATION,
1023            });
1024        }
1025
1026        let builder = Message::builder_success(
1027            msg,
1028            MessageWriteVec::with_capacity(MessageHeader::LENGTH + 24),
1029        );
1030        let builder = self
1031            .auth
1032            .sign_outgoing_message(builder, transmit.from)
1033            .unwrap();
1034        let response = builder.finish();
1035
1036        let Ok(transmit) = self.stun.send(response, transmit.from, now) else {
1037            error!("Failed to send");
1038            return Err(Self::server_error(msg));
1039        };
1040
1041        debug!(
1042            "allocation {} from {} to {} successfully created channel {channel_no} for {:?}",
1043            transmit.transport,
1044            transmit.from,
1045            transmit.to,
1046            peer_addr.ip()
1047        );
1048
1049        Ok(transmit)
1050    }
1051
1052    fn connection_already_exists_error_signed(
1053        auth: &mut LongTermServerAuth,
1054        msg: &Message<'_>,
1055        to: SocketAddr,
1056    ) -> MessageWriteVec {
1057        let error = ErrorCode::builder(ErrorCode::CONNECTION_ALREADY_EXISTS)
1058            .build()
1059            .unwrap();
1060        let mut response = Message::builder_error(
1061            msg,
1062            MessageWriteVec::with_capacity(
1063                MessageHeader::LENGTH
1064                    + error.padded_len()
1065                    + auth.message_signature_bytes(to, false)
1066                    + 8,
1067            ),
1068        );
1069        response.add_attribute(&error).unwrap();
1070        let mut response = auth.sign_outgoing_message(response, to).unwrap();
1071        response.add_fingerprint().unwrap();
1072        response
1073    }
1074
1075    fn handle_stun_connect(
1076        &mut self,
1077        peer_tcp: &BTreeMap<(SocketAddr, SocketAddr), PeerTcp>,
1078        transmit: Transmit<&Message<'_>>,
1079        now: Instant,
1080    ) -> Result<(), MessageWriteVec> {
1081        let msg = transmit.data;
1082        self.validate_stun(&transmit, now)?;
1083
1084        let Some(client) =
1085            self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
1086        else {
1087            trace!("allocation mismatch");
1088            return Err(Self::allocation_mismatch(
1089                &mut self.auth,
1090                msg,
1091                transmit.from,
1092            ));
1093        };
1094
1095        let mut peer_addr = None;
1096
1097        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
1098        for (_offset, attr) in msg.iter_attributes() {
1099            match attr.get_type() {
1100                // handled by validate_stun
1101                Username::TYPE
1102                | Userhash::TYPE
1103                | Realm::TYPE
1104                | Nonce::TYPE
1105                | PasswordAlgorithm::TYPE
1106                | MessageIntegrity::TYPE
1107                | MessageIntegritySha256::TYPE => (),
1108                XorPeerAddress::TYPE => {
1109                    peer_addr = XorPeerAddress::from_raw(attr)
1110                        .ok()
1111                        .map(|r| r.addr(msg.transaction_id()))
1112                }
1113                atype => {
1114                    if atype.comprehension_required() {
1115                        unknown_attributes.push(atype);
1116                    }
1117                }
1118            }
1119        }
1120        if !unknown_attributes.is_empty() {
1121            trace!("unknown attributes: {unknown_attributes:?}");
1122            let err = Message::unknown_attributes(
1123                msg,
1124                &unknown_attributes,
1125                MessageWriteVec::with_capacity(64),
1126            );
1127            let err = self.auth.sign_outgoing_message(err, transmit.from).unwrap();
1128            return Err(err);
1129        }
1130
1131        let Some(peer_addr) = peer_addr else {
1132            return Err(Self::bad_request_signed(&mut self.auth, msg, transmit.from));
1133        };
1134
1135        let Some(alloc) = client
1136            .allocations
1137            .iter_mut()
1138            .find(|allocation| allocation.addr.is_ipv4() == peer_addr.is_ipv4())
1139        else {
1140            trace!("peer address family mismatch");
1141            return Err(Self::peer_address_family_mismatch_signed(
1142                &mut self.auth,
1143                msg,
1144                transmit.from,
1145            ));
1146        };
1147
1148        error!("now: {now}, expires {}", alloc.expires_at);
1149        if alloc.expires_at < now {
1150            trace!("allocation has expired");
1151            // allocation has expired
1152            return Err(Self::allocation_mismatch(
1153                &mut self.auth,
1154                msg,
1155                transmit.from,
1156            ));
1157        }
1158
1159        if alloc
1160            .pending_tcp_connect
1161            .iter()
1162            .any(|pending| pending.peer_addr == peer_addr)
1163            || peer_tcp
1164                .iter()
1165                .any(|((_relayed_addr, tcp_peer_addr), _)| tcp_peer_addr == &peer_addr)
1166        {
1167            trace!("already exists");
1168            return Err(Self::connection_already_exists_error_signed(
1169                &mut self.auth,
1170                msg,
1171                transmit.from,
1172            ));
1173        }
1174
1175        alloc.pending_tcp_connect.push(PendingTcpConnect {
1176            transaction_id: msg.transaction_id(),
1177            client_control_addr: transmit.from,
1178            listen_addr: transmit.to,
1179            relayed_addr: alloc.addr,
1180            peer_addr,
1181            expires_at: None,
1182        });
1183        Ok(())
1184    }
1185
1186    fn handle_stun_connection_bind(
1187        &mut self,
1188        transmit: Transmit<&Message<'_>>,
1189        now: Instant,
1190        tcp_stun_change: &mut Option<TcpStunChange>,
1191    ) -> Result<Transmit<Vec<u8>>, MessageWriteVec> {
1192        let msg = transmit.data;
1193
1194        if transmit.transport != TransportType::Tcp {
1195            return Err(Self::bad_request(msg, 0));
1196        }
1197
1198        if self
1199            .client_from_5tuple(transmit.transport, transmit.to, transmit.from)
1200            .is_some()
1201        {
1202            trace!("allocation mismatch");
1203            let error = ErrorCode::builder(ErrorCode::ALLOCATION_MISMATCH)
1204                .build()
1205                .unwrap();
1206            let mut response = Message::builder_error(
1207                msg,
1208                MessageWriteVec::with_capacity(MessageHeader::LENGTH + error.padded_len() + 24 + 8),
1209            );
1210            response.add_attribute(&error).unwrap();
1211            response.add_fingerprint().unwrap();
1212            return Err(response);
1213        };
1214
1215        let mut connection_id = None;
1216
1217        let mut unknown_attributes = smallvec::SmallVec::<[AttributeType; 4]>::default();
1218        for (_offset, attr) in msg.iter_attributes() {
1219            match attr.get_type() {
1220                // handled by validate_stun
1221                Username::TYPE
1222                | Userhash::TYPE
1223                | Realm::TYPE
1224                | Nonce::TYPE
1225                | PasswordAlgorithm::TYPE
1226                | MessageIntegrity::TYPE
1227                | MessageIntegritySha256::TYPE => (),
1228                ConnectionId::TYPE => {
1229                    connection_id = ConnectionId::from_raw(attr).ok().map(|r| r.id())
1230                }
1231                atype => {
1232                    if atype.comprehension_required() {
1233                        unknown_attributes.push(atype);
1234                    }
1235                }
1236            }
1237        }
1238
1239        // If the request does not contain the CONNECTION-ID attribute, or if
1240        // this attribute does not refer to an existing pending connection, the
1241        // server MUST return a 400 (Bad Request) error.
1242        let Some(connection_id) = connection_id else {
1243            trace!("missing connection id");
1244            return Err(Self::bad_request(msg, 0));
1245        };
1246        let Some(idx) = self
1247            .pending_tcp_connection_binds
1248            .iter()
1249            .position(|pending| {
1250                pending.connection_id == connection_id && pending.listen_addr == transmit.to
1251            })
1252        else {
1253            trace!("no pending connection with id {connection_id}");
1254            return Err(Self::bad_request(msg, 0));
1255        };
1256
1257        let pending = &self.pending_tcp_connection_binds[idx];
1258        // need to validate based on the client control connection.
1259        let client_transmit = Transmit::new(
1260            transmit.data,
1261            TransportType::Tcp,
1262            pending.client_control_addr,
1263            pending.listen_addr,
1264        );
1265        self.validate_stun(&client_transmit, now)?;
1266
1267        if !unknown_attributes.is_empty() {
1268            trace!("unknown attributes: {unknown_attributes:?}");
1269            let err = Message::unknown_attributes(
1270                msg,
1271                &unknown_attributes,
1272                MessageWriteVec::with_capacity(64),
1273            );
1274            let err = self
1275                .auth
1276                .sign_outgoing_message(err, client_transmit.from)
1277                .unwrap();
1278            return Err(err);
1279        }
1280
1281        // only once the incoming credentials are validated can we remove the pending request.
1282        let pending = self.pending_tcp_connection_binds.swap_remove(idx);
1283
1284        let Some(client) = self.mut_client_from_5tuple(
1285            client_transmit.transport,
1286            client_transmit.to,
1287            client_transmit.from,
1288        ) else {
1289            trace!("allocation mismatch, no client");
1290            return Err(Self::allocation_mismatch(
1291                &mut self.auth,
1292                msg,
1293                transmit.from,
1294            ));
1295        };
1296
1297        if client.allocations.iter().all(|alloc| {
1298            alloc.addr.is_ipv4() != pending.peer_addr.is_ipv4() || alloc.expires_at < now
1299        }) {
1300            // no allocation with
1301            trace!("allocation mismatch, no allocation");
1302            return Err(Self::allocation_mismatch(
1303                &mut self.auth,
1304                msg,
1305                transmit.from,
1306            ));
1307        };
1308
1309        *tcp_stun_change = Some(TcpStunChange::Data {
1310            client_data_addr: transmit.from,
1311            listen_addr: pending.listen_addr,
1312            relayed_addr: pending.relayed_addr,
1313            peer_addr: pending.peer_addr,
1314        });
1315
1316        // TODO: state changes required for sending/receiving TCP
1317
1318        debug!("TCP connection bound for pending {pending:?}");
1319
1320        let msg = Message::builder_success(msg, MessageWriteVec::new());
1321        let msg = self
1322            .auth
1323            .sign_outgoing_message(msg, client_transmit.from)
1324            .unwrap();
1325        Ok(Transmit::new(
1326            msg.finish(),
1327            transmit.transport,
1328            transmit.to,
1329            transmit.from,
1330        ))
1331    }
1332
1333    fn handle_stun_send_indication(
1334        &mut self,
1335        transmit: Transmit<&Message<'_>>,
1336        now: Instant,
1337    ) -> Result<
1338        (
1339            TransportType,
1340            SocketAddr,
1341            SocketAddr,
1342            core::ops::Range<usize>,
1343        ),
1344        (),
1345    > {
1346        let msg = transmit.data;
1347        let mut peer_address = None;
1348        let mut data = None;
1349
1350        for (offset, attr) in msg.iter_attributes() {
1351            match attr.get_type() {
1352                XorPeerAddress::TYPE => {
1353                    peer_address = Some(
1354                        XorPeerAddress::from_raw(attr)
1355                            .map_err(|_| ())?
1356                            .addr(msg.transaction_id()),
1357                    );
1358                }
1359                AData::TYPE => data = AData::from_raw(attr).ok().map(|adata| (offset + 4, adata)),
1360                atype => {
1361                    if atype.comprehension_required() {
1362                        return Err(());
1363                    }
1364                }
1365            }
1366        }
1367        let Some((peer_address, (offset, data))) = peer_address.zip(data) else {
1368            return Err(());
1369        };
1370
1371        let Some(client) = self.client_from_5tuple(transmit.transport, transmit.to, transmit.from)
1372        else {
1373            trace!(
1374                "no client for transport {} from {}, to {}",
1375                transmit.transport,
1376                transmit.from,
1377                transmit.to
1378            );
1379            return Err(());
1380        };
1381
1382        let Some(alloc) = client
1383            .allocations
1384            .iter()
1385            .find(|allocation| allocation.addr.ip().is_ipv4() == peer_address.is_ipv4())
1386        else {
1387            trace!("no allocation available");
1388            return Err(());
1389        };
1390        if now > alloc.expires_at {
1391            debug!("{} allocation {} expired", alloc.ttype, alloc.addr);
1392            return Err(());
1393        }
1394
1395        let Some(_permission) = alloc.have_permission(peer_address.ip(), now) else {
1396            trace!("no permission for {}", peer_address);
1397            return Err(());
1398        };
1399
1400        trace!("have {} to send to {:?}", data.data().len(), peer_address);
1401        Ok((
1402            alloc.ttype,
1403            alloc.addr,
1404            peer_address,
1405            offset..offset + data.data().len(),
1406        ))
1407    }
1408
1409    #[tracing::instrument(
1410        name = "turn_server_handle_stun",
1411        skip(self, transmit, now, tcp_stun_change, peer_tcp),
1412        fields(
1413            msg.transaction = %transmit.data.transaction_id(),
1414            msg.method = %transmit.data.method(),
1415        )
1416    )]
1417    fn handle_stun(
1418        &mut self,
1419        peer_tcp: &BTreeMap<(SocketAddr, SocketAddr), PeerTcp>,
1420        transmit: Transmit<&Message<'_>>,
1421        tcp_type: TcpStunType,
1422        now: Instant,
1423        tcp_stun_change: &mut Option<TcpStunChange>,
1424    ) -> Result<Option<InternalHandleStun>, MessageWriteVec> {
1425        trace!("received STUN message {}", transmit.data);
1426        let ret = if transmit
1427            .data
1428            .has_class(stun_proto::types::message::MessageClass::Request)
1429        {
1430            match transmit.data.method() {
1431                BINDING if matches!(tcp_type, TcpStunType::Control | TcpStunType::Unknown) => self
1432                    .handle_stun_binding(transmit, now)
1433                    .map(|t| Some(InternalHandleStun::Transmit(t))),
1434                ALLOCATE if matches!(tcp_type, TcpStunType::Unknown | TcpStunType::Control) => self
1435                    .handle_stun_allocate(transmit, tcp_type, now, tcp_stun_change)
1436                    .map(|_| None),
1437                REFRESH if matches!(tcp_type, TcpStunType::Control) => self
1438                    .handle_stun_refresh(transmit, now, tcp_stun_change)
1439                    .map(|t| Some(InternalHandleStun::Transmit(t))),
1440                CREATE_PERMISSION if matches!(tcp_type, TcpStunType::Control) => self
1441                    .handle_stun_create_permission(transmit, now)
1442                    .map(|t| Some(InternalHandleStun::Transmit(t))),
1443                CHANNEL_BIND if matches!(tcp_type, TcpStunType::Control) => self
1444                    .handle_stun_channel_bind(transmit, now)
1445                    .map(|t| Some(InternalHandleStun::Transmit(t))),
1446                CONNECT if matches!(tcp_type, TcpStunType::Control) => self
1447                    .handle_stun_connect(peer_tcp, transmit, now)
1448                    .map(|_| None),
1449                CONNECTION_BIND if matches!(tcp_type, TcpStunType::Unknown) => self
1450                    .handle_stun_connection_bind(transmit, now, tcp_stun_change)
1451                    .map(|t| Some(InternalHandleStun::Transmit(t))),
1452                _ => {
1453                    self.validate_stun(&transmit, now)?;
1454                    let Some(client) =
1455                        self.mut_client_from_5tuple(transmit.transport, transmit.to, transmit.from)
1456                    else {
1457                        return Err(Self::allocation_mismatch(
1458                            &mut self.auth,
1459                            transmit.data,
1460                            transmit.from,
1461                        ));
1462                    };
1463
1464                    if client
1465                        .allocations
1466                        .iter()
1467                        .all(|alloc| alloc.expires_at < now)
1468                    {
1469                        return Err(Self::allocation_mismatch(
1470                            &mut self.auth,
1471                            transmit.data,
1472                            transmit.from,
1473                        ));
1474                    }
1475
1476                    Err(Self::bad_request_signed(
1477                        &mut self.auth,
1478                        transmit.data,
1479                        transmit.from,
1480                    ))
1481                }
1482            }
1483        } else if transmit
1484            .data
1485            .has_class(stun_proto::types::message::MessageClass::Indication)
1486        {
1487            match transmit.data.method() {
1488                SEND if tcp_type == TcpStunType::Control => Ok(self
1489                    .handle_stun_send_indication(transmit, now)
1490                    .ok()
1491                    .map(|(transport, from, to, range)| {
1492                        InternalHandleStun::Data(transport, from, to, range)
1493                    })),
1494                _ => Ok(None),
1495            }
1496        } else if transmit.data.class().is_response() {
1497            match transmit.data.method() {
1498                CONNECTION_ATTEMPT if tcp_type == TcpStunType::Control => {
1499                    // TODO: handle connection attempt
1500                    Ok(None)
1501                }
1502                _ => Ok(None),
1503            }
1504        } else {
1505            Ok(None)
1506        };
1507        ret
1508    }
1509
1510    fn client_from_5tuple(
1511        &self,
1512        ttype: TransportType,
1513        local_addr: SocketAddr,
1514        remote_addr: SocketAddr,
1515    ) -> Option<&Client> {
1516        self.clients.iter().find(|client| {
1517            client.transport == ttype
1518                && client.remote_addr == remote_addr
1519                && client.local_addr == local_addr
1520        })
1521    }
1522
1523    fn mut_client_from_5tuple(
1524        &mut self,
1525        ttype: TransportType,
1526        local_addr: SocketAddr,
1527        remote_addr: SocketAddr,
1528    ) -> Option<&mut Client> {
1529        self.clients.iter_mut().find(|client| {
1530            client.transport == ttype
1531                && client.remote_addr == remote_addr
1532                && client.local_addr == local_addr
1533        })
1534    }
1535
1536    fn remove_client_by_5tuple(
1537        &mut self,
1538        ttype: TransportType,
1539        local_addr: SocketAddr,
1540        remote_addr: SocketAddr,
1541    ) -> Option<Client> {
1542        info!("attempting to remove client {ttype}, {remote_addr} -> {local_addr}");
1543        if let Some(idx) = self.clients.iter().position(|client| {
1544            client.transport == ttype
1545                && client.remote_addr == remote_addr
1546                && client.local_addr == local_addr
1547        }) {
1548            Some(self.clients.swap_remove(idx))
1549        } else {
1550            None
1551        }
1552    }
1553
1554    fn allocation_from_public_5tuple(
1555        &self,
1556        ttype: TransportType,
1557        local_addr: SocketAddr,
1558        remote_addr: SocketAddr,
1559    ) -> Option<(&Client, &Allocation, &Permission)> {
1560        self.clients.iter().find_map(|client| {
1561            client
1562                .allocations
1563                .iter()
1564                .find_map(|allocation| {
1565                    if allocation.ttype == ttype && allocation.addr == local_addr {
1566                        allocation
1567                            .permissions
1568                            .iter()
1569                            .find(|permission| permission.addr == remote_addr.ip())
1570                            .map(|permission| (allocation, permission))
1571                    } else {
1572                        None
1573                    }
1574                })
1575                .map(|(allocation, permission)| (client, allocation, permission))
1576        })
1577    }
1578
1579    fn handle_channel(
1580        &mut self,
1581        transport: TransportType,
1582        from: SocketAddr,
1583        to: SocketAddr,
1584        channel: ChannelData<'_>,
1585        now: Instant,
1586    ) -> Option<ForwardChannelData> {
1587        let Some(client) = self.client_from_5tuple(transport, to, from) else {
1588            trace!(
1589                "No handler for {} bytes over {:?} from {:?}, to {:?}. Ignoring",
1590                channel.data().len() + 4,
1591                transport,
1592                from,
1593                to
1594            );
1595            return None;
1596        };
1597        trace!(
1598            "received channel {} with {} bytes from {:?}",
1599            channel.id(),
1600            channel.data().len(),
1601            from
1602        );
1603        let Some((allocation, existing)) = client.allocations.iter().find_map(|allocation| {
1604            allocation
1605                .channel_from_id(channel.id())
1606                .map(|perm| (allocation, perm))
1607        }) else {
1608            warn!(
1609                "no channel id {} for this client {:?}",
1610                channel.id(),
1611                client.remote_addr
1612            );
1613            // no channel with that id
1614            return None;
1615        };
1616        if existing.expires_at < now {
1617            trace!(
1618                "channel for {from} expired {:?} ago",
1619                now - existing.expires_at
1620            );
1621            return None;
1622        }
1623
1624        // A packet from the client needs to be sent to the peer referenced by the
1625        // configured channel.
1626        let Some(permission) = allocation.permission_from_5tuple(
1627            allocation.ttype,
1628            allocation.addr,
1629            existing.peer_addr,
1630        ) else {
1631            warn!(
1632                "no permission for {:?} for this allocation {:?}",
1633                existing.peer_addr, allocation.addr
1634            );
1635            return None;
1636        };
1637        if permission.expires_at < now {
1638            trace!(
1639                "permission for {from} expired {:?} ago",
1640                now - permission.expires_at
1641            );
1642            return None;
1643        }
1644        Some(ForwardChannelData {
1645            transport: allocation.ttype,
1646            from: allocation.addr,
1647            to: existing.peer_addr,
1648        })
1649    }
1650
1651    fn handle_listen_tcp_stored_message(
1652        &mut self,
1653        peer_tcp: &BTreeMap<(SocketAddr, SocketAddr), PeerTcp>,
1654        remote_addr: SocketAddr,
1655        data: Vec<u8>,
1656        tcp_type: TcpStunType,
1657        now: Instant,
1658        tcp_stun_change: &mut Option<TcpStunChange>,
1659    ) -> Option<Transmit<Vec<u8>>> {
1660        let listen_addr = self.stun.local_addr();
1661        let Ok(msg) = Message::from_bytes(&data) else {
1662            return None;
1663        };
1664        let msg_transmit = Transmit::new(&msg, TransportType::Tcp, remote_addr, listen_addr);
1665        match self.handle_stun(peer_tcp, msg_transmit, tcp_type, now, tcp_stun_change) {
1666            Err(builder) => Some(Transmit::new(
1667                builder.finish(),
1668                TransportType::Tcp,
1669                listen_addr,
1670                remote_addr,
1671            )),
1672            Ok(Some(InternalHandleStun::Transmit(transmit))) => Some(transmit),
1673            Ok(Some(InternalHandleStun::Data(transport, from, to, range))) => Some(Transmit::new(
1674                data[range.start..range.end].to_vec(),
1675                transport,
1676                from,
1677                to,
1678            )),
1679            Ok(None) => None,
1680        }
1681    }
1682
1683    fn connection_attempt(
1684        auth: &mut LongTermServerAuth,
1685        to: SocketAddr,
1686        connection_id: u32,
1687        peer_addr: SocketAddr,
1688    ) -> MessageWriteVec {
1689        let transaction_id = TransactionId::generate();
1690        let connection_id = ConnectionId::new(connection_id);
1691        let peer_addr = XorPeerAddress::new(peer_addr, transaction_id);
1692        let mut response = Message::builder(
1693            MessageType::from_class_method(MessageClass::Request, CONNECTION_ATTEMPT),
1694            transaction_id,
1695            MessageWriteVec::with_capacity(
1696                MessageHeader::LENGTH
1697                    + connection_id.padded_len()
1698                    + auth.message_signature_bytes(to, false)
1699                    + 8,
1700            ),
1701        );
1702        response.add_attribute(&connection_id).unwrap();
1703        response.add_attribute(&peer_addr).unwrap();
1704        let mut response = auth.sign_outgoing_message(response, to).unwrap();
1705        response.add_fingerprint().unwrap();
1706        response
1707    }
1708}
1709
/// How an incoming TCP connection on the listen address is currently
/// classified when its STUN messages are handled.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum TcpStunType {
    /// TCP connection type not determined yet.
    Unknown,
    /// Control connection. Always STUN messages.
    Control,
}
1717
/// A change to TCP connection state requested while handling a STUN message.
///
/// The STUN handler reports the change through an out-parameter and the
/// caller applies it to the TCP buffers and peer connection table.
#[derive(Debug)]
enum TcpStunChange {
    /// The connection the message arrived on becomes a control connection.
    Control,
    /// The connection the message arrived on becomes a data (passthrough)
    /// connection that is paired with a peer connection.
    Data {
        /// Address of the client side of the data connection.
        client_data_addr: SocketAddr,
        /// Server listen address the client data connection is attached to.
        listen_addr: SocketAddr,
        /// Relayed address of the paired peer connection.
        relayed_addr: SocketAddr,
        /// Address of the peer on the paired peer connection.
        peer_addr: SocketAddr,
    },
    /// The contained allocations were removed and their associated TCP
    /// resources must be released.
    Delete(Vec<Allocation>),
}
1729
1730impl TurnServer {
1731    /// Construct a new [`TurnServer`]
1732    ///
1733    /// # Examples
1734    /// ```
1735    /// # use turn_server_proto::server::TurnServer;
1736    /// # use turn_server_proto::api::TurnServerApi;
1737    /// # use stun_proto::types::TransportType;
1738    /// let realm = String::from("realm");
1739    /// let listen_addr = "10.0.0.1:3478".parse().unwrap();
1740    /// let server = TurnServer::new(TransportType::Udp, listen_addr, realm);
1741    /// assert_eq!(server.listen_address(), listen_addr);
1742    /// ```
1743    pub fn new(ttype: TransportType, listen_addr: SocketAddr, realm: String) -> Self {
1744        let stun = StunAgent::builder(ttype, listen_addr).build();
1745        let mut auth = LongTermServerAuth::new(realm);
1746        auth.set_supported_integrity(IntegrityAlgorithm::Sha1);
1747        auth.add_supported_integrity(IntegrityAlgorithm::Sha256);
1748        auth.set_anonymous_username(Feature::Auto);
1749        Self {
1750            protocol: TurnServerProtocol {
1751                stun,
1752                auth,
1753                clients: vec![],
1754                pending_transmits: VecDeque::default(),
1755                pending_allocates: VecDeque::default(),
1756                pending_socket_removals: VecDeque::default(),
1757                pending_socket_listen_removals: VecDeque::default(),
1758                tcp_connection_id: 0,
1759                pending_tcp_connection_binds: Default::default(),
1760            },
1761            incoming_tcp_buffers: Default::default(),
1762            peer_tcp: Default::default(),
1763        }
1764    }
1765
1766    /// The [`TransportType`] of this TURN server.
1767    pub fn transport(&self) -> TransportType {
1768        self.protocol.stun.transport()
1769    }
1770
1771    fn remove_allocation_resources(
1772        allocation: &mut Allocation,
1773        peer_tcp: &mut BTreeMap<(SocketAddr, SocketAddr), PeerTcp>,
1774        incoming_tcp_buffers: &mut BTreeMap<(SocketAddr, SocketAddr), TcpBuffer>,
1775        pending_socket_removals: &mut VecDeque<Socket5Tuple>,
1776        pending_socket_listen_removals: &mut VecDeque<(TransportType, SocketAddr)>,
1777        pending_tcp_connection_binds: &mut Vec<PendingConnectionBind>,
1778    ) {
1779        trace!("removing allocation: {allocation:?}");
1780        let mut remove_peer_connections = vec![];
1781        let mut remove_client_connections = vec![];
1782        for pending in allocation.pending_tcp_connect.drain(..) {
1783            trace!(
1784                "removing pending tcp connection to peer {} from {}",
1785                pending.peer_addr,
1786                pending.relayed_addr
1787            );
1788            pending_socket_removals.push_back(Socket5Tuple {
1789                transport: TransportType::Tcp,
1790                local_addr: pending.relayed_addr,
1791                remote_addr: pending.peer_addr,
1792            });
1793        }
1794        peer_tcp.retain(|&(relayed_addr, peer_addr), peer_tcp| {
1795            if relayed_addr == allocation.addr {
1796                remove_peer_connections.push((relayed_addr, peer_addr));
1797                trace!(
1798                    "removing tcp peer connection from {} to {}",
1799                    relayed_addr,
1800                    peer_addr
1801                );
1802                pending_socket_removals.push_back(Socket5Tuple {
1803                    transport: TransportType::Tcp,
1804                    local_addr: relayed_addr,
1805                    remote_addr: peer_addr,
1806                });
1807                if let PeerTcp::Passthrough {
1808                    client_addr,
1809                    listen_addr,
1810                    pending_data: _,
1811                } = peer_tcp
1812                {
1813                    remove_client_connections.push((*client_addr, *listen_addr));
1814                }
1815                false
1816            } else {
1817                true
1818            }
1819        });
1820        Self::remove_tcp_resources(
1821            remove_peer_connections,
1822            remove_client_connections,
1823            incoming_tcp_buffers,
1824            pending_socket_removals,
1825            pending_tcp_connection_binds,
1826        );
1827        pending_socket_listen_removals.push_back((allocation.ttype, allocation.addr));
1828    }
1829
1830    fn remove_tcp_resources(
1831        remove_peer_connections: Vec<(SocketAddr, SocketAddr)>,
1832        remove_client_connections: Vec<(SocketAddr, SocketAddr)>,
1833        incoming_tcp_buffers: &mut BTreeMap<(SocketAddr, SocketAddr), TcpBuffer>,
1834        pending_socket_removals: &mut VecDeque<Socket5Tuple>,
1835        pending_tcp_connection_binds: &mut Vec<PendingConnectionBind>,
1836    ) {
1837        incoming_tcp_buffers.retain(|&(client_addr, listen_addr), _tcp_buffer| {
1838            if remove_client_connections.contains(&(client_addr, listen_addr)) {
1839                trace!(
1840                    "removing tcp connection to client {} from {}",
1841                    client_addr,
1842                    listen_addr
1843                );
1844                pending_socket_removals.push_back(Socket5Tuple {
1845                    transport: TransportType::Tcp,
1846                    local_addr: listen_addr,
1847                    remote_addr: client_addr,
1848                });
1849                false
1850            } else {
1851                true
1852            }
1853        });
1854        pending_tcp_connection_binds.retain(|pending| {
1855            !remove_peer_connections.contains(&(pending.relayed_addr, pending.peer_addr))
1856        });
1857    }
1858}
1859
1860impl TurnServerApi for TurnServer {
1861    fn add_user(&mut self, username: String, password: String) {
1862        self.protocol
1863            .auth
1864            .add_user(LongTermCredentials::new(username, password));
1865    }
1866
1867    fn listen_address(&self) -> SocketAddr {
1868        self.protocol.stun.local_addr()
1869    }
1870
1871    fn set_nonce_expiry_duration(&mut self, expiry_duration: Duration) {
1872        self.protocol
1873            .auth
1874            .set_nonce_expiry_duration(expiry_duration);
1875    }
1876
1877    #[tracing::instrument(
1878        name = "turn_server_recv_icmp",
1879        skip(self, bytes, now),
1880        fields(
1881            data_len = bytes.as_ref().len(),
1882        )
1883    )]
1884    fn recv_icmp<T: AsRef<[u8]>>(
1885        &mut self,
1886        family: AddressFamily,
1887        bytes: T,
1888        now: Instant,
1889    ) -> Option<Transmit<Vec<u8>>> {
1890        use pnet_packet::udp;
1891        let bytes = bytes.as_ref();
1892        trace!("have incoming icmp data");
1893        if bytes.len() < 8 {
1894            return None;
1895        }
1896
1897        let icmpv4;
1898        let ipv4;
1899        let icmpv6;
1900        let ipv6;
1901        let source;
1902        let destination;
1903        let icmp_code;
1904        let icmp_type;
1905        let icmp_data;
1906        let payload = match family {
1907            AddressFamily::IPV4 => {
1908                use pnet_packet::{icmp, ipv4};
1909                icmpv4 = icmp::IcmpPacket::new(bytes)?;
1910                trace!("parsed icmp: {icmpv4:?}");
1911                icmp_code = icmpv4.get_icmp_code().0;
1912                icmp_type = icmpv4.get_icmp_type().0;
1913                // the server verifies that the type is either 3 or 11 for an ICMPv4
1914                if ![
1915                    icmp::IcmpTypes::DestinationUnreachable,
1916                    icmp::IcmpTypes::TimeExceeded,
1917                ]
1918                .contains(&icmpv4.get_icmp_type())
1919                {
1920                    debug!("ICMPv4 is not an actionable type");
1921                    return None;
1922                }
1923                if icmpv4.get_icmp_type() == icmp::IcmpTypes::DestinationUnreachable &&
1924                    icmpv4.get_icmp_code() == icmp::destination_unreachable::IcmpCodes::FragmentationRequiredAndDFFlagSet
1925                {
1926                    icmp_data = BigEndian::read_u32(icmpv4.payload());
1927                } else {
1928                    icmp_data = 0;
1929                };
1930                ipv4 = ipv4::Ipv4Packet::new(&icmpv4.payload()[4..])?;
1931                trace!("parsed ipv4: {ipv4:?}");
1932                source = IpAddr::V4(ipv4.get_source().octets().into());
1933                destination = IpAddr::V4(ipv4.get_destination().octets().into());
1934                ipv4.payload()
1935            }
1936            AddressFamily::IPV6 => {
1937                use pnet_packet::{icmpv6, ipv6};
1938                icmpv6 = icmpv6::Icmpv6Packet::new(bytes)?;
1939                icmp_type = icmpv6.get_icmpv6_type().0;
1940                icmp_code = icmpv6.get_icmpv6_code().0;
1941                // the server verifies that the type is either 1, 2, or 3 for an ICMPv6
1942                if ![
1943                    icmpv6::Icmpv6Types::DestinationUnreachable,
1944                    icmpv6::Icmpv6Types::PacketTooBig,
1945                    icmpv6::Icmpv6Types::TimeExceeded,
1946                ]
1947                .contains(&icmpv6.get_icmpv6_type())
1948                {
1949                    debug!("ICMPv6 is not an actionable type");
1950                    return None;
1951                }
1952                if icmpv6.get_icmpv6_type() == icmpv6::Icmpv6Types::PacketTooBig {
1953                    icmp_data = BigEndian::read_u32(icmpv6.payload());
1954                } else {
1955                    icmp_data = 0;
1956                };
1957                ipv6 = ipv6::Ipv6Packet::new(&icmpv6.payload()[4..])?;
1958                trace!("parsed ipv6: {ipv6:?}");
1959                source = IpAddr::V6(ipv6.get_source().segments().into());
1960                destination = IpAddr::V6(ipv6.get_destination().segments().into());
1961                ipv6.payload()
1962            }
1963        };
1964        let udp = udp::UdpPacket::new(payload)?;
1965        let source = SocketAddr::new(source, udp.get_source());
1966        let destination = SocketAddr::new(destination, udp.get_destination());
1967        let (client, allocation, permission) =
1968            self.protocol
1969                .allocation_from_public_5tuple(TransportType::Udp, source, destination)?;
1970        if allocation.expires_at < now || permission.expires_at < now {
1971            return None;
1972        }
1973
1974        info!(
1975            "sending ICMP (type:{icmp_type}, code:{icmp_code}, data{icmp_data}) DATA indication to client {}",
1976            client.remote_addr
1977        );
1978        let transaction_id = TransactionId::generate();
1979        let xor_addr = XorPeerAddress::new(destination, transaction_id);
1980        let icmp = Icmp::new(icmp_type, icmp_code, icmp_data);
1981        let mut msg = Message::builder(
1982            MessageType::from_class_method(MessageClass::Indication, DATA),
1983            transaction_id,
1984            MessageWriteVec::with_capacity(
1985                MessageHeader::LENGTH + xor_addr.padded_len() + icmp.padded_len(),
1986            ),
1987        );
1988        msg.add_attribute(&xor_addr).unwrap();
1989        msg.add_attribute(&icmp).unwrap();
1990        self.protocol
1991            .stun
1992            .send(msg.finish(), client.remote_addr, now)
1993            .ok()
1994    }
1995
1996    #[tracing::instrument(
1997        name = "turn_server_recv",
1998        skip(self, transmit, now),
1999        fields(
2000            transport = %transmit.transport,
2001            remote_addr = %transmit.from,
2002            local_addr = %transmit.to,
2003            data_len = transmit.data.as_ref().len(),
2004        )
2005    )]
2006    fn recv<T: AsRef<[u8]> + core::fmt::Debug>(
2007        &mut self,
2008        transmit: Transmit<T>,
2009        now: Instant,
2010    ) -> Option<TransmitBuild<DelayedMessageOrChannelSend<T>>> {
2011        trace!("executing at {now:?}");
2012        if let Some((client, allocation, permission)) = self.protocol.allocation_from_public_5tuple(
2013            transmit.transport,
2014            transmit.to,
2015            transmit.from,
2016        ) {
2017            // A packet from the relayed address needs to be sent to the client that set up
2018            // the allocation.
2019            if permission.expires_at < now {
2020                trace!(
2021                    "permission for {} expired {:?} ago",
2022                    permission.addr,
2023                    now - permission.expires_at
2024                );
2025                return None;
2026            }
2027
2028            if allocation.ttype == TransportType::Tcp {
2029                let connection_id = self.protocol.tcp_connection_id;
2030
2031                if let Some(peer_tcp) = self.peer_tcp.get_mut(&(transmit.to, transmit.from)) {
2032                    match peer_tcp {
2033                        PeerTcp::PendingConnectionBind {
2034                            peer_data,
2035                            expires_at: _,
2036                        } => {
2037                            peer_data.extend_from_slice(transmit.data.as_ref());
2038                            return None;
2039                        }
2040                        PeerTcp::Passthrough {
2041                            client_addr,
2042                            listen_addr,
2043                            pending_data,
2044                        } => {
2045                            if pending_data.is_empty() {
2046                                let len = transmit.data.as_ref().len();
2047                                if len > 0 {
2048                                    return Some(TransmitBuild::new(
2049                                        DelayedMessageOrChannelSend::Range(transmit.data, 0..len),
2050                                        TransportType::Tcp,
2051                                        *listen_addr,
2052                                        *client_addr,
2053                                    ));
2054                                } else {
2055                                    let client_addr = *client_addr;
2056                                    let listen_addr = *listen_addr;
2057                                    if self
2058                                        .incoming_tcp_buffers
2059                                        .remove(&(client_addr, listen_addr))
2060                                        .is_some()
2061                                    {
2062                                        self.protocol.pending_socket_removals.push_back(
2063                                            Socket5Tuple {
2064                                                transport: TransportType::Tcp,
2065                                                local_addr: listen_addr,
2066                                                remote_addr: client_addr,
2067                                            },
2068                                        );
2069                                    }
2070                                    self.peer_tcp.remove(&(transmit.to, transmit.from));
2071                                    return None;
2072                                }
2073                            } else {
2074                                let mut peer_data = core::mem::take(pending_data);
2075                                peer_data.extend_from_slice(transmit.data.as_ref());
2076                                return Some(TransmitBuild::new(
2077                                    DelayedMessageOrChannelSend::Owned(peer_data),
2078                                    TransportType::Tcp,
2079                                    *listen_addr,
2080                                    *client_addr,
2081                                ));
2082                            }
2083                        }
2084                    }
2085                } else {
2086                    // No TCP connection set up for this peer address. Ask the client if they want
2087                    // to accept this peer.
2088                    let Some((allocation, msg, listen_addr, client_addr)) =
2089                        self.protocol.clients.iter_mut().find_map(|client| {
2090                            client
2091                                .allocations
2092                                .iter_mut()
2093                                .find(|allocation| {
2094                                    allocation.ttype == TransportType::Tcp
2095                                        && allocation.addr == transmit.to
2096                                        && allocation
2097                                            .permissions
2098                                            .iter()
2099                                            .any(|permission| permission.addr == transmit.from.ip())
2100                                })
2101                                .map(|allocation| {
2102                                    let msg = TurnServerProtocol::connection_attempt(
2103                                        &mut self.protocol.auth,
2104                                        client.remote_addr,
2105                                        connection_id,
2106                                        transmit.from,
2107                                    );
2108                                    (allocation, msg, client.local_addr, client.remote_addr)
2109                                })
2110                        })
2111                    else {
2112                        // unknown relayed address
2113                        return None;
2114                    };
2115                    let relayed_addr = allocation.addr;
2116                    self.protocol.tcp_connection_id =
2117                        self.protocol.tcp_connection_id.wrapping_add(1);
2118
2119                    self.peer_tcp.insert(
2120                        (transmit.to, transmit.from),
2121                        PeerTcp::PendingConnectionBind {
2122                            peer_data: transmit.data.as_ref().to_vec(),
2123                            expires_at: now + TCP_PEER_CONNECTION_TIMEOUT,
2124                        },
2125                    );
2126                    self.protocol
2127                        .pending_tcp_connection_binds
2128                        .push(PendingConnectionBind {
2129                            connection_id,
2130                            listen_addr,
2131                            relayed_addr,
2132                            peer_addr: transmit.from,
2133                            client_control_addr: client_addr,
2134                        });
2135                    return Some(TransmitBuild::new(
2136                        DelayedMessageOrChannelSend::Owned(msg.finish()),
2137                        TransportType::Tcp,
2138                        listen_addr,
2139                        client_addr,
2140                    ));
2141                }
2142            }
2143
2144            if let Some(existing) =
2145                allocation.channel_from_5tuple(transmit.transport, transmit.to, transmit.from)
2146            {
2147                debug!(
2148                    "found existing channel {} for {:?} for this allocation {:?}",
2149                    existing.id, transmit.from, allocation.addr
2150                );
2151                Some(TransmitBuild::new(
2152                    DelayedMessageOrChannelSend::Channel(DelayedChannel::new(
2153                        existing.id,
2154                        transmit.data,
2155                    )),
2156                    client.transport,
2157                    client.local_addr,
2158                    client.remote_addr,
2159                ))
2160            } else {
2161                // no channel with that id
2162                debug!(
2163                    "no channel for {:?} for this allocation {:?}, using DATA indication",
2164                    transmit.from, allocation.addr
2165                );
2166
2167                Some(TransmitBuild::new(
2168                    DelayedMessageOrChannelSend::Message(DelayedMessage::for_client(
2169                        transmit.from,
2170                        transmit.data,
2171                    )),
2172                    client.transport,
2173                    client.local_addr,
2174                    client.remote_addr,
2175                ))
2176            }
2177        } else if transmit.transport == self.protocol.stun.transport()
2178            && transmit.to == self.protocol.stun.local_addr()
2179        {
2180            match transmit.transport {
2181                TransportType::Tcp => {
2182                    let mut incoming_tcp_hoist;
2183                    let incoming_tcp = if transmit.data.as_ref().is_empty() {
2184                        incoming_tcp_hoist = self
2185                            .incoming_tcp_buffers
2186                            .remove(&(transmit.from, transmit.to))?;
2187                        &mut incoming_tcp_hoist
2188                    } else {
2189                        self.incoming_tcp_buffers
2190                            .entry((transmit.from, transmit.to))
2191                            .or_insert_with(|| TcpBuffer::Unknown(TurnTcpBuffer::new()))
2192                    };
2193                    let (tcp_type, tcp_buffer) = match incoming_tcp {
2194                        TcpBuffer::Unknown(tcp_buffer) => (TcpStunType::Unknown, tcp_buffer),
2195                        TcpBuffer::Control(tcp_buffer) => (TcpStunType::Control, tcp_buffer),
2196                        TcpBuffer::Passthrough {
2197                            relayed_addr,
2198                            peer_addr,
2199                            pending_data,
2200                        } => {
2201                            if pending_data.is_empty() {
2202                                let len = transmit.data.as_ref().len();
2203                                return Some(TransmitBuild::new(
2204                                    DelayedMessageOrChannelSend::Range(transmit.data, 0..len),
2205                                    TransportType::Tcp,
2206                                    *relayed_addr,
2207                                    *peer_addr,
2208                                ));
2209                            } else {
2210                                let mut peer_data = core::mem::take(pending_data);
2211                                peer_data.extend_from_slice(transmit.data.as_ref());
2212                                return Some(TransmitBuild::new(
2213                                    DelayedMessageOrChannelSend::Owned(peer_data),
2214                                    TransportType::Tcp,
2215                                    *relayed_addr,
2216                                    *peer_addr,
2217                                ));
2218                            }
2219                        }
2220                    };
2221
2222                    match tcp_buffer.incoming_tcp(transmit) {
2223                        None => None,
2224                        Some(IncomingTcp::CompleteMessage(transmit, range)) => {
2225                            let Ok(msg) = Message::from_bytes(
2226                                &transmit.data.as_ref()[range.start..range.end],
2227                            ) else {
2228                                return None;
2229                            };
2230                            let msg_transmit =
2231                                Transmit::new(&msg, transmit.transport, transmit.from, transmit.to);
2232                            let mut tcp_stun_change = None;
2233                            let ret = self.protocol.handle_stun(
2234                                &self.peer_tcp,
2235                                msg_transmit,
2236                                tcp_type,
2237                                now,
2238                                &mut tcp_stun_change,
2239                            );
2240                            if let Some(tcp_stun_change) = tcp_stun_change {
2241                                debug!("have tcp connection type change to {tcp_stun_change:?}");
2242                                match tcp_stun_change {
2243                                    TcpStunChange::Control => {
2244                                        *incoming_tcp =
2245                                            TcpBuffer::Control(core::mem::take(tcp_buffer));
2246                                    }
2247                                    TcpStunChange::Data {
2248                                        client_data_addr,
2249                                        listen_addr,
2250                                        relayed_addr,
2251                                        peer_addr,
2252                                    } => {
2253                                        *incoming_tcp = TcpBuffer::Passthrough {
2254                                            relayed_addr,
2255                                            peer_addr,
2256                                            pending_data: core::mem::take(tcp_buffer).into_inner(),
2257                                        };
2258                                        self.peer_tcp
2259                                            .entry((relayed_addr, peer_addr))
2260                                            .and_modify(|peer_tcp| {
2261                                                if let PeerTcp::PendingConnectionBind {
2262                                                    peer_data,
2263                                                    expires_at: _,
2264                                                } = peer_tcp
2265                                                {
2266                                                    *peer_tcp = PeerTcp::Passthrough {
2267                                                        client_addr: client_data_addr,
2268                                                        listen_addr,
2269                                                        pending_data: core::mem::take(peer_data),
2270                                                    };
2271                                                }
2272                                            })
2273                                            .or_insert_with(|| PeerTcp::Passthrough {
2274                                                client_addr: client_data_addr,
2275                                                listen_addr,
2276                                                pending_data: Vec::new(),
2277                                            });
2278                                    }
2279                                    TcpStunChange::Delete(allocations) => {
2280                                        for mut allocation in allocations {
2281                                            Self::remove_allocation_resources(
2282                                                &mut allocation,
2283                                                &mut self.peer_tcp,
2284                                                &mut self.incoming_tcp_buffers,
2285                                                &mut self.protocol.pending_socket_removals,
2286                                                &mut self.protocol.pending_socket_listen_removals,
2287                                                &mut self.protocol.pending_tcp_connection_binds,
2288                                            );
2289                                        }
2290                                    }
2291                                }
2292                            }
2293                            match ret {
2294                                Err(builder) => Some(TransmitBuild::new(
2295                                    DelayedMessageOrChannelSend::Owned(builder.finish()),
2296                                    transmit.transport,
2297                                    transmit.to,
2298                                    transmit.from,
2299                                )),
2300                                Ok(Some(InternalHandleStun::Transmit(transmit))) => {
2301                                    Some(TransmitBuild::new(
2302                                        DelayedMessageOrChannelSend::Owned(transmit.data),
2303                                        transmit.transport,
2304                                        transmit.from,
2305                                        transmit.to,
2306                                    ))
2307                                }
2308                                Ok(Some(InternalHandleStun::Data(transport, from, to, range))) => {
2309                                    Some(TransmitBuild::new(
2310                                        DelayedMessageOrChannelSend::Range(transmit.data, range),
2311                                        transport,
2312                                        from,
2313                                        to,
2314                                    ))
2315                                }
2316                                Ok(None) => None,
2317                            }
2318                        }
2319                        Some(IncomingTcp::CompleteChannel(transmit, range)) => {
2320                            let Ok(channel) =
2321                                ChannelData::parse(&transmit.data.as_ref()[range.start..range.end])
2322                            else {
2323                                return None;
2324                            };
2325                            let ForwardChannelData {
2326                                transport,
2327                                from,
2328                                to,
2329                            } = self.protocol.handle_channel(
2330                                transmit.transport,
2331                                transmit.from,
2332                                transmit.to,
2333                                channel,
2334                                now,
2335                            )?;
2336                            Some(TransmitBuild::new(
2337                                DelayedMessageOrChannelSend::Range(
2338                                    transmit.data,
2339                                    4 + range.start..range.end,
2340                                ),
2341                                transport,
2342                                from,
2343                                to,
2344                            ))
2345                        }
2346                        Some(IncomingTcp::StoredMessage(data, transmit)) => {
2347                            let mut tcp_stun_change = None;
2348                            let ret = self
2349                                .protocol
2350                                .handle_listen_tcp_stored_message(
2351                                    &self.peer_tcp,
2352                                    transmit.from,
2353                                    data,
2354                                    tcp_type,
2355                                    now,
2356                                    &mut tcp_stun_change,
2357                                )
2358                                .map(|transmit| {
2359                                    TransmitBuild::new(
2360                                        DelayedMessageOrChannelSend::Owned(transmit.data),
2361                                        transmit.transport,
2362                                        transmit.from,
2363                                        transmit.to,
2364                                    )
2365                                });
2366                            match tcp_stun_change {
2367                                Some(TcpStunChange::Control) => {
2368                                    *incoming_tcp = TcpBuffer::Control(core::mem::take(tcp_buffer));
2369                                }
2370                                Some(TcpStunChange::Data {
2371                                    client_data_addr,
2372                                    listen_addr,
2373                                    relayed_addr,
2374                                    peer_addr,
2375                                }) => {
2376                                    *incoming_tcp = TcpBuffer::Passthrough {
2377                                        relayed_addr,
2378                                        peer_addr,
2379                                        pending_data: core::mem::take(tcp_buffer).into_inner(),
2380                                    };
2381                                    self.peer_tcp
2382                                        .entry((relayed_addr, peer_addr))
2383                                        .and_modify(|peer_tcp| {
2384                                            if let PeerTcp::PendingConnectionBind {
2385                                                peer_data,
2386                                                expires_at: _,
2387                                            } = peer_tcp
2388                                            {
2389                                                *peer_tcp = PeerTcp::Passthrough {
2390                                                    client_addr: client_data_addr,
2391                                                    listen_addr,
2392                                                    pending_data: core::mem::take(peer_data),
2393                                                };
2394                                            }
2395                                        })
2396                                        .or_insert_with(|| PeerTcp::Passthrough {
2397                                            client_addr: client_data_addr,
2398                                            listen_addr,
2399                                            pending_data: Vec::new(),
2400                                        });
2401                                }
2402                                Some(TcpStunChange::Delete(allocations)) => {
2403                                    for mut allocation in allocations {
2404                                        Self::remove_allocation_resources(
2405                                            &mut allocation,
2406                                            &mut self.peer_tcp,
2407                                            &mut self.incoming_tcp_buffers,
2408                                            &mut self.protocol.pending_socket_removals,
2409                                            &mut self.protocol.pending_socket_listen_removals,
2410                                            &mut self.protocol.pending_tcp_connection_binds,
2411                                        );
2412                                    }
2413                                }
2414                                None => (),
2415                            }
2416                            ret
2417                        }
2418                        Some(IncomingTcp::StoredChannel(data, transmit)) => {
2419                            let Ok(channel) = ChannelData::parse(&data) else {
2420                                return None;
2421                            };
2422                            let ForwardChannelData {
2423                                transport,
2424                                from,
2425                                to,
2426                            } = self.protocol.handle_channel(
2427                                transmit.transport,
2428                                transmit.from,
2429                                transmit.to,
2430                                channel,
2431                                now,
2432                            )?;
2433                            Some(TransmitBuild::new(
2434                                DelayedMessageOrChannelSend::Owned(data[4..].to_vec()),
2435                                transport,
2436                                from,
2437                                to,
2438                            ))
2439                        }
2440                    }
2441                }
2442                TransportType::Udp => match Message::from_bytes(transmit.data.as_ref()) {
2443                    Ok(msg) => {
2444                        let msg_transmit =
2445                            Transmit::new(&msg, transmit.transport, transmit.from, transmit.to);
2446                        let mut change = None;
2447                        let ret = match self.protocol.handle_stun(
2448                            &self.peer_tcp,
2449                            msg_transmit,
2450                            TcpStunType::Control,
2451                            now,
2452                            &mut change,
2453                        ) {
2454                            Err(builder) => Some(TransmitBuild::new(
2455                                DelayedMessageOrChannelSend::Owned(builder.finish()),
2456                                transmit.transport,
2457                                transmit.to,
2458                                transmit.from,
2459                            )),
2460                            Ok(Some(InternalHandleStun::Transmit(transmit))) => {
2461                                Some(TransmitBuild::new(
2462                                    DelayedMessageOrChannelSend::Owned(transmit.data),
2463                                    transmit.transport,
2464                                    transmit.from,
2465                                    transmit.to,
2466                                ))
2467                            }
2468                            Ok(Some(InternalHandleStun::Data(transport, from, to, range))) => {
2469                                Some(TransmitBuild::new(
2470                                    DelayedMessageOrChannelSend::Range(transmit.data, range),
2471                                    transport,
2472                                    from,
2473                                    to,
2474                                ))
2475                            }
2476                            Ok(None) => None,
2477                        };
2478                        if let Some(TcpStunChange::Delete(allocations)) = change {
2479                            for mut allocation in allocations {
2480                                Self::remove_allocation_resources(
2481                                    &mut allocation,
2482                                    &mut self.peer_tcp,
2483                                    &mut self.incoming_tcp_buffers,
2484                                    &mut self.protocol.pending_socket_removals,
2485                                    &mut self.protocol.pending_socket_listen_removals,
2486                                    &mut self.protocol.pending_tcp_connection_binds,
2487                                );
2488                            }
2489                        }
2490                        ret
2491                    }
2492                    Err(_) => {
2493                        let Ok(channel) = ChannelData::parse(transmit.data.as_ref()) else {
2494                            return None;
2495                        };
2496                        let ForwardChannelData {
2497                            transport,
2498                            from,
2499                            to,
2500                        } = self.protocol.handle_channel(
2501                            transmit.transport,
2502                            transmit.from,
2503                            transmit.to,
2504                            channel,
2505                            now,
2506                        )?;
2507                        let channel_len = channel.data().len();
2508                        Some(TransmitBuild::new(
2509                            DelayedMessageOrChannelSend::Range(transmit.data, 4..4 + channel_len),
2510                            transport,
2511                            from,
2512                            to,
2513                        ))
2514                    }
2515                },
2516            }
2517        } else {
2518            None
2519        }
2520    }
2521
2522    #[tracing::instrument(level = "debug", name = "turn_server_poll", skip(self), ret)]
2523    fn poll(&mut self, now: Instant) -> TurnServerPollRet {
2524        let mut lowest_wait = now + Duration::from_secs(3600);
2525        for pending in self.protocol.pending_allocates.iter_mut() {
2526            if let Some(family) = pending.to_ask_families.pop() {
2527                return TurnServerPollRet::AllocateSocket {
2528                    transport: pending.client.transport,
2529                    listen_addr: pending.client.local_addr,
2530                    client_addr: pending.client.remote_addr,
2531                    allocation_transport: pending.allocation_transport,
2532                    family,
2533                };
2534            }
2535        }
2536
2537        let mut remove_peer_connections = vec![];
2538        let mut remove_client_connections = vec![];
2539        let mut remove_clients = vec![];
2540        for (client_idx, client) in self.protocol.clients.iter_mut().enumerate() {
2541            let mut remove_allocation_indices = vec![];
2542            for (alloc_idx, allocation) in client.allocations.iter_mut().enumerate() {
2543                let mut remove_permission = vec![];
2544                if allocation.expires_at < now {
2545                    remove_allocation_indices.push(alloc_idx);
2546                    // removal of allocation resources will be performed after this loop
2547                    continue;
2548                } else {
2549                    allocation
2550                        .channels
2551                        .retain(|channel| channel.expires_at >= now);
2552                    for (permission_idx, permission) in
2553                        allocation.permissions.iter_mut().enumerate()
2554                    {
2555                        if permission.expires_at < now {
2556                            remove_permission.push(permission_idx);
2557                        } else {
2558                            lowest_wait = lowest_wait.min(permission.expires_at);
2559                        }
2560                    }
2561                    lowest_wait = lowest_wait.min(allocation.expires_at);
2562                }
2563
2564                let mut remove_pending_tcp = vec![];
2565                for (pending_idx, pending) in allocation.pending_tcp_connect.iter_mut().enumerate()
2566                {
2567                    if let Some(expires_at) = pending.expires_at {
2568                        if expires_at >= now {
2569                            remove_pending_tcp.push(pending_idx);
2570                            let response = pending.as_timeout_or_failure_response(
2571                                &mut self.protocol.auth,
2572                                pending.client_control_addr,
2573                            );
2574                            self.protocol.pending_transmits.push_back(Transmit::new(
2575                                response.finish(),
2576                                TransportType::Tcp,
2577                                pending.listen_addr,
2578                                pending.client_control_addr,
2579                            ));
2580                            lowest_wait = now;
2581                        }
2582                    } else {
2583                        pending.expires_at = Some(now + TCP_PEER_CONNECTION_TIMEOUT);
2584                        return TurnServerPollRet::TcpConnect {
2585                            relayed_addr: allocation.addr,
2586                            peer_addr: pending.peer_addr,
2587                            listen_addr: client.local_addr,
2588                            client_addr: client.remote_addr,
2589                        };
2590                    }
2591                }
2592                for (idx, permission_idx) in remove_permission.into_iter().enumerate() {
2593                    let permission = allocation.permissions.remove(permission_idx - idx);
2594                    self.peer_tcp
2595                        .retain(|&(relayed_addr, peer_addr), peer_tcp| {
2596                            if peer_addr.ip() == permission.addr {
2597                                remove_peer_connections.push((relayed_addr, peer_addr));
2598                                self.protocol
2599                                    .pending_socket_removals
2600                                    .push_back(Socket5Tuple {
2601                                        transport: TransportType::Tcp,
2602                                        local_addr: relayed_addr,
2603                                        remote_addr: peer_addr,
2604                                    });
2605                                if let PeerTcp::Passthrough {
2606                                    client_addr,
2607                                    listen_addr,
2608                                    pending_data: _,
2609                                } = peer_tcp
2610                                {
2611                                    remove_client_connections.push((*client_addr, *listen_addr));
2612                                }
2613                                false
2614                            } else {
2615                                true
2616                            }
2617                        });
2618                }
2619                for (idx, pending_idx) in remove_pending_tcp.into_iter().enumerate() {
2620                    let pending = allocation.pending_tcp_connect.remove(pending_idx - idx);
2621                    self.incoming_tcp_buffers
2622                        .retain(|&(client_addr, listen_addr), _tcp_buffer| {
2623                            pending.client_control_addr != client_addr
2624                                && pending.listen_addr == listen_addr
2625                        });
2626                    self.peer_tcp.retain(|&(alloc_addr, peer_addr), _tcp| {
2627                        pending.relayed_addr != alloc_addr && pending.peer_addr == peer_addr
2628                    });
2629                    if pending.expires_at.is_some() {
2630                        self.protocol
2631                            .pending_socket_removals
2632                            .push_back(Socket5Tuple {
2633                                transport: TransportType::Tcp,
2634                                local_addr: pending.relayed_addr,
2635                                remote_addr: pending.peer_addr,
2636                            });
2637                    }
2638                }
2639            }
2640
2641            for (idx, allocation_idx) in remove_allocation_indices.into_iter().enumerate() {
2642                let mut allocation = client.allocations.remove(allocation_idx - idx);
2643                Self::remove_allocation_resources(
2644                    &mut allocation,
2645                    &mut self.peer_tcp,
2646                    &mut self.incoming_tcp_buffers,
2647                    &mut self.protocol.pending_socket_removals,
2648                    &mut self.protocol.pending_socket_listen_removals,
2649                    &mut self.protocol.pending_tcp_connection_binds,
2650                )
2651            }
2652            if client.allocations.is_empty() {
2653                remove_clients.push(client_idx);
2654            }
2655        }
2656        for (idx, client_idx) in remove_clients.into_iter().enumerate() {
2657            let client = self.protocol.clients.remove(client_idx - idx);
2658            self.protocol.auth.remove_client(client.remote_addr);
2659        }
2660
2661        for (key, value) in self.peer_tcp.iter_mut() {
2662            if let PeerTcp::PendingConnectionBind {
2663                peer_data: _,
2664                expires_at,
2665            } = value
2666            {
2667                if *expires_at < now {
2668                    remove_peer_connections.push(*key);
2669                }
2670            }
2671        }
2672        Self::remove_tcp_resources(
2673            remove_peer_connections,
2674            remove_client_connections,
2675            &mut self.incoming_tcp_buffers,
2676            &mut self.protocol.pending_socket_removals,
2677            &mut self.protocol.pending_tcp_connection_binds,
2678        );
2679
2680        if let Some(remove) = self.protocol.pending_socket_removals.pop_front() {
2681            return TurnServerPollRet::TcpClose {
2682                local_addr: remove.local_addr,
2683                remote_addr: remove.remote_addr,
2684            };
2685        }
2686
2687        if let Some((transport, listen_addr)) =
2688            self.protocol.pending_socket_listen_removals.pop_front()
2689        {
2690            return TurnServerPollRet::SocketClose {
2691                transport,
2692                listen_addr,
2693            };
2694        }
2695
2696        TurnServerPollRet::WaitUntil(lowest_wait.max(now))
2697    }
2698
2699    #[tracing::instrument(name = "turn_server_poll_transmit", skip(self))]
2700    fn poll_transmit(&mut self, now: Instant) -> Option<Transmit<Vec<u8>>> {
2701        if let Some(transmit) = self.protocol.pending_transmits.pop_back() {
2702            return Some(transmit);
2703        }
2704        if self.protocol.stun.transport() != TransportType::Tcp {
2705            return None;
2706        }
2707        let mut removed_allocations = vec![];
2708        for (&(remote_addr, local_addr), incoming_tcp) in self.incoming_tcp_buffers.iter_mut() {
2709            let (tcp_type, tcp_buffer) = match incoming_tcp {
2710                TcpBuffer::Unknown(tcp_buffer) => (TcpStunType::Unknown, tcp_buffer),
2711                TcpBuffer::Control(tcp_buffer) => (TcpStunType::Control, tcp_buffer),
2712                TcpBuffer::Passthrough {
2713                    relayed_addr,
2714                    peer_addr,
2715                    pending_data,
2716                } => {
2717                    if pending_data.is_empty() {
2718                        continue;
2719                    } else {
2720                        let peer_data = core::mem::take(pending_data);
2721                        return Some(Transmit::new(
2722                            peer_data,
2723                            TransportType::Tcp,
2724                            *relayed_addr,
2725                            *peer_addr,
2726                        ));
2727                    }
2728                }
2729            };
2730
2731            let ret = match tcp_buffer.poll_recv() {
2732                Some(StoredTcp::Message(msg)) => {
2733                    let mut tcp_stun_change = None;
2734                    let ret = self.protocol.handle_listen_tcp_stored_message(
2735                        &self.peer_tcp,
2736                        remote_addr,
2737                        msg,
2738                        tcp_type,
2739                        now,
2740                        &mut tcp_stun_change,
2741                    );
2742                    match tcp_stun_change {
2743                        Some(TcpStunChange::Control) => {
2744                            *incoming_tcp = TcpBuffer::Control(core::mem::take(tcp_buffer));
2745                        }
2746                        Some(TcpStunChange::Data {
2747                            client_data_addr,
2748                            listen_addr,
2749                            relayed_addr,
2750                            peer_addr,
2751                        }) => {
2752                            *incoming_tcp = TcpBuffer::Passthrough {
2753                                relayed_addr,
2754                                peer_addr,
2755                                pending_data: core::mem::take(tcp_buffer).into_inner(),
2756                            };
2757                            self.peer_tcp
2758                                .entry((relayed_addr, peer_addr))
2759                                .and_modify(|peer_tcp| {
2760                                    if let PeerTcp::PendingConnectionBind {
2761                                        peer_data,
2762                                        expires_at: _,
2763                                    } = peer_tcp
2764                                    {
2765                                        *peer_tcp = PeerTcp::Passthrough {
2766                                            client_addr: client_data_addr,
2767                                            listen_addr,
2768                                            pending_data: core::mem::take(peer_data),
2769                                        };
2770                                    }
2771                                })
2772                                .or_insert_with(|| PeerTcp::Passthrough {
2773                                    client_addr: client_data_addr,
2774                                    listen_addr,
2775                                    pending_data: Vec::new(),
2776                                });
2777                        }
2778                        Some(TcpStunChange::Delete(allocations)) => {
2779                            removed_allocations.extend(allocations);
2780                        }
2781                        None => (),
2782                    }
2783                    ret
2784                }
2785                Some(StoredTcp::Channel(channel)) => {
2786                    let Ok(channel) = ChannelData::parse(&channel) else {
2787                        continue;
2788                    };
2789                    let ForwardChannelData {
2790                        transport,
2791                        from,
2792                        to,
2793                    } = self.protocol.handle_channel(
2794                        TransportType::Tcp,
2795                        remote_addr,
2796                        local_addr,
2797                        channel,
2798                        now,
2799                    )?;
2800                    Some(Transmit::new(channel.data().to_vec(), transport, from, to))
2801                }
2802                None => continue,
2803            };
2804            if ret.is_some() {
2805                return ret;
2806            }
2807        }
2808
2809        for mut allocation in removed_allocations {
2810            Self::remove_allocation_resources(
2811                &mut allocation,
2812                &mut self.peer_tcp,
2813                &mut self.incoming_tcp_buffers,
2814                &mut self.protocol.pending_socket_removals,
2815                &mut self.protocol.pending_socket_listen_removals,
2816                &mut self.protocol.pending_tcp_connection_binds,
2817            );
2818        }
2819
2820        for ((_relayed_addr, _peer_addr), peer_tcp) in self.peer_tcp.iter_mut() {
2821            if let PeerTcp::Passthrough {
2822                client_addr,
2823                listen_addr,
2824                pending_data,
2825            } = peer_tcp
2826            {
2827                if !pending_data.is_empty() {
2828                    return Some(Transmit::new(
2829                        core::mem::take(pending_data),
2830                        TransportType::Tcp,
2831                        *listen_addr,
2832                        *client_addr,
2833                    ));
2834                }
2835            }
2836        }
2837        None
2838    }
2839
2840    #[tracing::instrument(name = "turn_server_allocated_socket", skip(self))]
2841    fn allocated_socket(
2842        &mut self,
2843        transport: TransportType,
2844        local_addr: SocketAddr,
2845        remote_addr: SocketAddr,
2846        allocation_transport: TransportType,
2847        family: AddressFamily,
2848        socket_addr: Result<SocketAddr, SocketAllocateError>,
2849        now: Instant,
2850    ) {
2851        let Some(position) = self.protocol.pending_allocates.iter().position(|pending| {
2852            pending.client.transport == transport
2853                && pending.client.local_addr == local_addr
2854                && pending.client.remote_addr == remote_addr
2855                && pending.allocation_transport == allocation_transport
2856                && pending.pending_families.contains(&family)
2857        }) else {
2858            warn!("No pending allocation for transport: {transport}, local: {local_addr:?}, remote {remote_addr:?}");
2859            return;
2860        };
2861        info!("pending allocation for transport: {transport}, local: {local_addr:?}, remote {remote_addr:?} family {family} resulted in Udp {socket_addr:?}");
2862        let pending = &mut self.protocol.pending_allocates[position];
2863        pending.pending_sockets.push((family, socket_addr));
2864        pending.pending_families.retain(|fam| *fam != family);
2865        if !pending.pending_families.is_empty() || !pending.to_ask_families.is_empty() {
2866            trace!(
2867                "Still waiting for more allocation results before sending a reply to the client"
2868            );
2869            return;
2870        }
2871
2872        let mut pending = self.protocol.pending_allocates.remove(position).unwrap();
2873        let transaction_id = pending.transaction_id;
2874        let to = pending.client.remote_addr;
2875        let lifetime_seconds = pending
2876            .requested_lifetime
2877            .unwrap_or(DEFAULT_ALLOCATION_DURATION.as_secs() as u32)
2878            .clamp(
2879                DEFAULT_ALLOCATION_DURATION.as_secs() as u32,
2880                MAXIMUM_ALLOCATION_DURATION.as_secs() as u32,
2881            );
2882
2883        let is_all_error = pending.pending_sockets.iter().all(|addr| addr.1.is_err());
2884        let n_pending_sockets = pending.pending_sockets.len();
2885
2886        let mut builder = Message::builder(
2887            MessageType::from_class_method(
2888                if is_all_error {
2889                    MessageClass::Error
2890                } else {
2891                    MessageClass::Success
2892                },
2893                ALLOCATE,
2894            ),
2895            transaction_id,
2896            MessageWriteVec::with_capacity(80),
2897        );
2898
2899        if is_all_error && pending.pending_sockets.len() > 1 {
2900            trace!("Returning insufficient capacity");
2901            // RFC8656 ADDITIONAL-ADDRESS-FAMILY path
2902            let error = ErrorCode::builder(ErrorCode::INSUFFICIENT_CAPACITY)
2903                .build()
2904                .unwrap();
2905            builder.add_attribute(&error).unwrap();
2906        } else {
2907            for (family, socket_addr) in pending.pending_sockets {
2908                match socket_addr {
2909                    Ok(addr) => {
2910                        pending.client.allocations.push(Allocation {
2911                            addr,
2912                            ttype: allocation_transport,
2913                            expires_at: now + Duration::from_secs(lifetime_seconds as u64),
2914                            permissions: vec![],
2915                            channels: vec![],
2916                            pending_tcp_connect: Vec::new(),
2917                        });
2918                        let relayed_address = XorRelayedAddress::new(addr, transaction_id);
2919                        builder.add_attribute(&relayed_address).unwrap();
2920                        let lifetime = Lifetime::new(lifetime_seconds);
2921                        builder.add_attribute(&lifetime).unwrap();
2922                        // TODO RESERVATION-TOKEN
2923                        let mapped_address =
2924                            XorMappedAddress::new(pending.client.remote_addr, transaction_id);
2925                        builder.add_attribute(&mapped_address).unwrap();
2926                    }
2927                    Err(e) => {
2928                        if n_pending_sockets > 1 {
2929                            // RFC8656 ADDITIONAL-ADDRESS-FAMILY path when at least one socket
2930                            // allocate succeeds
2931                            let error = AddressErrorCode::new(
2932                                family,
2933                                ErrorCode::builder(e.into_error_code()).build().unwrap(),
2934                            );
2935                            builder.add_attribute(&error).unwrap();
2936                        } else {
2937                            let error = ErrorCode::builder(e.into_error_code()).build().unwrap();
2938                            builder.add_attribute(&error).unwrap();
2939                        }
2940                    }
2941                }
2942            }
2943        }
2944        let builder = self
2945            .protocol
2946            .auth
2947            .sign_outgoing_message(builder, to)
2948            .unwrap();
2949        let msg = builder.finish();
2950
2951        let Ok(transmit) = self.protocol.stun.send(msg, to, now) else {
2952            unreachable!();
2953        };
2954        if socket_addr.is_ok() {
2955            self.protocol.clients.push(pending.client);
2956        }
2957        self.protocol.pending_transmits.push_back(transmit);
2958    }
2959
2960    fn tcp_connected(
2961        &mut self,
2962        relayed_addr: SocketAddr,
2963        peer_addr: SocketAddr,
2964        listen_addr: SocketAddr,
2965        client_addr: SocketAddr,
2966        socket_addr: Result<SocketAddr, TcpConnectError>,
2967        now: Instant,
2968    ) {
2969        let connection_id = self.protocol.tcp_connection_id;
2970        let Some(client) =
2971            self.protocol
2972                .mut_client_from_5tuple(TransportType::Tcp, listen_addr, client_addr)
2973        else {
2974            warn!("No client for transport: TCP, local: {listen_addr}, remote {client_addr}. Ignoring TCP connect");
2975            return;
2976        };
2977        let Some(allocation) = client.allocations.iter_mut().find(|allocation| {
2978            allocation.ttype == TransportType::Tcp
2979                && allocation.addr == relayed_addr
2980                && allocation.have_permission(peer_addr.ip(), now).is_some()
2981        }) else {
2982            warn!("No TCP allocation for TCP, relayed: {relayed_addr}, peer {peer_addr}");
2983            return;
2984        };
2985        let Some((position, _pending)) = allocation
2986            .pending_tcp_connect
2987            .iter_mut()
2988            .enumerate()
2989            .find(|(_idx, pending)| {
2990                pending.client_control_addr == client_addr
2991                    && pending.listen_addr == listen_addr
2992                    && pending.relayed_addr == relayed_addr
2993                    && pending.peer_addr == peer_addr
2994            })
2995        else {
2996            warn!("No outstanding TCP connect for relayed: {relayed_addr}, peer {peer_addr}");
2997            return;
2998        };
2999        let pending = allocation.pending_tcp_connect.swap_remove(position);
3000        if pending
3001            .expires_at
3002            .is_some_and(|expires_at| expires_at < now)
3003        {
3004            info!("Pending TCP connect has expired for relayed {relayed_addr}, peer {peer_addr}");
3005            return;
3006        }
3007
3008        let response = match socket_addr {
3009            Ok(socket_addr) => match self.peer_tcp.entry((socket_addr, peer_addr)) {
3010                alloc::collections::btree_map::Entry::Occupied(_) => {
3011                    let mut response = Message::builder(
3012                        MessageType::from_class_method(MessageClass::Error, CONNECT),
3013                        pending.transaction_id,
3014                        MessageWriteVec::new(),
3015                    );
3016                    response
3017                        .add_attribute(
3018                            &ErrorCode::builder(ErrorCode::CONNECTION_ALREADY_EXISTS)
3019                                .build()
3020                                .unwrap(),
3021                        )
3022                        .unwrap();
3023                    response
3024                }
3025                alloc::collections::btree_map::Entry::Vacant(vacant) => {
3026                    let mut response = Message::builder(
3027                        MessageType::from_class_method(MessageClass::Success, CONNECT),
3028                        pending.transaction_id,
3029                        MessageWriteVec::new(),
3030                    );
3031                    response
3032                        .add_attribute(&ConnectionId::new(connection_id))
3033                        .unwrap();
3034                    vacant.insert(PeerTcp::PendingConnectionBind {
3035                        peer_data: vec![],
3036                        expires_at: now + TCP_PEER_CONNECTION_TIMEOUT,
3037                    });
3038                    response
3039                }
3040            },
3041            Err(e) => {
3042                let mut response = Message::builder(
3043                    MessageType::from_class_method(MessageClass::Error, CONNECT),
3044                    pending.transaction_id,
3045                    MessageWriteVec::new(),
3046                );
3047                response
3048                    .add_attribute(&ErrorCode::builder(e.into_error_code()).build().unwrap())
3049                    .unwrap();
3050                response
3051            }
3052        };
3053        let mut response = self
3054            .protocol
3055            .auth
3056            .sign_outgoing_message(response, client_addr)
3057            .unwrap();
3058        response.add_fingerprint().unwrap();
3059        if socket_addr.is_ok() && response.has_class(MessageClass::Success) {
3060            self.protocol.tcp_connection_id += 1;
3061            self.protocol
3062                .pending_tcp_connection_binds
3063                .push(PendingConnectionBind {
3064                    connection_id,
3065                    listen_addr,
3066                    relayed_addr,
3067                    peer_addr,
3068                    client_control_addr: pending.client_control_addr,
3069                });
3070        }
3071        self.protocol.pending_transmits.push_front(
3072            self.protocol
3073                .stun
3074                .send(response.finish(), client_addr, now)
3075                .unwrap(),
3076        );
3077    }
3078}
3079
/// Server-side state kept for one client, identified by the transport and the address pair it
/// uses to talk to the server.
#[derive(Debug)]
struct Client {
    /// Transport used between the client and the server.
    transport: TransportType,
    /// Server-side address of the client's connection.
    local_addr: SocketAddr,
    /// Client-side address of the client's connection; replies are sent here.
    remote_addr: SocketAddr,

    /// Allocations held by this client.
    allocations: Vec<Allocation>,
    // NOTE(review): presumably the identity the client authenticated with (plain username or
    // its hashed form) -- confirm against the authentication code.
    username: Option<String>,
    userhash: Option<[u8; 32]>,
}
3090
/// A relayed transport address handed out to a client, together with the permissions, channels
/// and outstanding TCP connects attached to it.
#[derive(Debug)]
struct Allocation {
    // the peer-side address of this allocation
    addr: SocketAddr,
    /// Transport of the relayed address.
    ttype: TransportType,

    /// Instant at which the allocation's lifetime ends.
    expires_at: Instant,

    /// Permissions installed on this allocation.
    permissions: Vec<Permission>,
    /// Channels bound on this allocation.
    channels: Vec<Channel>,

    /// TCP connects towards peers that are still waiting for a result.
    pending_tcp_connect: Vec<PendingTcpConnect>,
}
3104
/// Connecting to a Peer on behalf of a client and asking the user to perform the actual TCP socket
/// connection.
#[derive(Debug)]
struct PendingTcpConnect {
    /// CONNECT request transaction ID
    transaction_id: TransactionId,
    /// Address of the client on its control connection; the CONNECT response is sent here.
    client_control_addr: SocketAddr,
    /// Server-side address of the client's control connection.
    listen_addr: SocketAddr,
    /// Relayed address of the allocation the connect was requested on.
    relayed_addr: SocketAddr,
    /// Address of the peer being connected to.
    peer_addr: SocketAddr,
    /// Deadline for the connect; a result arriving after this instant is discarded.  `None`
    /// means no deadline is checked.
    expires_at: Option<Instant>,
}
3117
3118impl PendingTcpConnect {
3119    fn as_timeout_or_failure_response(
3120        &self,
3121        auth: &mut LongTermServerAuth,
3122        to: SocketAddr,
3123    ) -> MessageWriteVec {
3124        let error = ErrorCode::builder(ErrorCode::CONNECTION_TIMEOUT_OR_FAILURE)
3125            .build()
3126            .unwrap();
3127        let response = Message::builder(
3128            MessageType::from_class_method(MessageClass::Error, CONNECT),
3129            self.transaction_id,
3130            MessageWriteVec::with_capacity(
3131                MessageHeader::LENGTH
3132                    + error.padded_len()
3133                    + auth.message_signature_bytes(to, false)
3134                    + 8,
3135            ),
3136        );
3137        let mut response = auth.sign_outgoing_message(response, to).unwrap();
3138        response.add_fingerprint().unwrap();
3139        response
3140    }
3141}
3142
3143impl Allocation {
3144    fn permission_from_5tuple(
3145        &self,
3146        ttype: TransportType,
3147        local_addr: SocketAddr,
3148        remote_addr: SocketAddr,
3149    ) -> Option<&Permission> {
3150        if local_addr != self.addr {
3151            return None;
3152        }
3153        self.permissions
3154            .iter()
3155            .find(|permission| permission.ttype == ttype && remote_addr.ip() == permission.addr)
3156    }
3157
3158    fn channel_from_id(&self, id: u16) -> Option<&Channel> {
3159        self.channels.iter().find(|channel| channel.id == id)
3160    }
3161
3162    fn channel_from_5tuple(
3163        &self,
3164        transport: TransportType,
3165        local_addr: SocketAddr,
3166        remote_addr: SocketAddr,
3167    ) -> Option<&Channel> {
3168        if self.addr != local_addr {
3169            return None;
3170        }
3171        self.channels
3172            .iter()
3173            .find(|channel| transport == channel.peer_transport && remote_addr == channel.peer_addr)
3174    }
3175
3176    #[tracing::instrument(level = "trace", skip(self, now), fields(ttype = %self.ttype, relayed = %self.addr))]
3177    fn have_permission(&self, addr: IpAddr, now: Instant) -> Option<&Permission> {
3178        let Some(permission) = self
3179            .permissions
3180            .iter()
3181            .find(|permission| permission.addr == addr)
3182        else {
3183            trace!("no permission available");
3184            // no permission installed for this peer, ignoring
3185            return None;
3186        };
3187        if now > permission.expires_at {
3188            trace!("permission has expired");
3189            return None;
3190        }
3191        debug!("have permission");
3192        Some(permission)
3193    }
3194}
3195
/// A permission installed on an allocation for a single peer IP address.
#[derive(Debug)]
struct Permission {
    /// IP address of the peer the permission applies to (no port).
    addr: IpAddr,
    /// Transport the permission applies to.
    ttype: TransportType,

    /// The permission is treated as expired once the current time is later than this instant.
    expires_at: Instant,
}
3203
/// A channel bound on an allocation, associating a channel number with a peer.
#[derive(Debug)]
struct Channel {
    /// Channel number.
    id: u16,
    /// Address of the peer the channel is bound to.
    peer_addr: SocketAddr,
    /// Transport used towards the peer.
    peer_transport: TransportType,

    // NOTE(review): presumably the instant at which the channel binding lapses -- confirm
    // against the code that refreshes and expires channels.
    expires_at: Instant,
}
3212
// Result of internally handling a STUN message.
// NOTE(review): the variant descriptions below are inferred from the payload types only --
// confirm against the code that produces and consumes this enum.
enum InternalHandleStun {
    // A fully built transmission.
    Transmit(Transmit<Vec<u8>>),
    // A transport, two socket addresses and a byte range; presumably data to forward, with the
    // range locating the payload inside the received buffer.
    Data(
        TransportType,
        SocketAddr,
        SocketAddr,
        core::ops::Range<usize>,
    ),
}
3222
3223#[cfg(test)]
3224mod tests {
3225    use alloc::string::{String, ToString};
3226    use turn_types::{
3227        prelude::DelayedTransmitBuild,
3228        stun::message::{IntegrityAlgorithm, LongTermKeyCredentials, Method},
3229        TurnCredentials,
3230    };
3231
3232    use super::*;
3233
3234    fn listen_address() -> SocketAddr {
3235        "127.0.0.1:3478".parse().unwrap()
3236    }
3237
3238    fn client_address() -> SocketAddr {
3239        "127.0.0.1:1000".parse().unwrap()
3240    }
3241
3242    fn relayed_address() -> SocketAddr {
3243        "10.0.0.1:2222".parse().unwrap()
3244    }
3245
3246    fn ipv6_relayed_address() -> SocketAddr {
3247        "[fda9:8765:4321:1::1]:2222".parse().unwrap()
3248    }
3249
3250    fn peer_address() -> SocketAddr {
3251        "10.0.0.2:44444".parse().unwrap()
3252    }
3253
3254    fn ipv6_peer_address() -> SocketAddr {
3255        "[fd12:3456:789a:1::1]:44444".parse().unwrap()
3256    }
3257
3258    fn credentials() -> TurnCredentials {
3259        TurnCredentials::new("tuser", "tpass")
3260    }
3261
3262    fn new_server(transport: TransportType) -> TurnServer {
3263        let mut server = TurnServer::new(transport, listen_address(), "realm".to_string());
3264        let credentials = credentials();
3265        server.add_user(
3266            credentials.username().to_string(),
3267            credentials.password().to_string(),
3268        );
3269        server
3270    }
3271
3272    fn client_transmit_from<T: AsRef<[u8]> + core::fmt::Debug>(
3273        data: T,
3274        transport: TransportType,
3275        from: SocketAddr,
3276    ) -> Transmit<T> {
3277        Transmit::new(data, transport, from, listen_address())
3278    }
3279
3280    fn client_transmit<T: AsRef<[u8]> + core::fmt::Debug>(
3281        data: T,
3282        transport: TransportType,
3283    ) -> Transmit<T> {
3284        client_transmit_from(data, transport, client_address())
3285    }
3286
3287    #[test]
3288    fn test_server_stun_binding() {
3289        let _init = crate::tests::test_init_log();
3290        let now = Instant::ZERO;
3291        let mut server = new_server(TransportType::Udp);
3292        let (_realm, _nonce) = initial_allocate(&mut server, now);
3293        let reply = server
3294            .recv(
3295                client_transmit(
3296                    {
3297                        let binding = Message::builder_request(BINDING, MessageWriteVec::new());
3298                        binding.finish()
3299                    },
3300                    server.transport(),
3301                ),
3302                now,
3303            )
3304            .unwrap();
3305        let reply = reply.build();
3306        let msg = Message::from_bytes(&reply.data).unwrap();
3307        assert!(msg.has_method(BINDING));
3308        assert!(msg.has_class(MessageClass::Success));
3309        assert_eq!(
3310            msg.attribute::<XorMappedAddress>()
3311                .unwrap()
3312                .addr(msg.transaction_id()),
3313            client_address()
3314        );
3315    }
3316
3317    fn initial_allocate_msg() -> Vec<u8> {
3318        let allocate = Message::builder_request(ALLOCATE, MessageWriteVec::new());
3319        allocate.finish()
3320    }
3321
3322    fn validate_unsigned_error_reply(msg: &[u8], method: Method, code: u16) -> Message<'_> {
3323        let msg = Message::from_bytes(msg).unwrap();
3324        assert!(msg.has_method(method));
3325        assert!(msg.has_class(MessageClass::Error));
3326        let err = msg.attribute::<ErrorCode>().unwrap();
3327        assert_eq!(err.code(), code);
3328        msg
3329    }
3330
3331    fn validate_signed_error_reply(
3332        msg: &[u8],
3333        method: Method,
3334        code: u16,
3335        credentials: LongTermKeyCredentials,
3336    ) -> Message<'_> {
3337        let msg = Message::from_bytes(msg).unwrap();
3338        assert!(msg.has_method(method));
3339        assert!(msg.has_class(MessageClass::Error));
3340        let err = msg.attribute::<ErrorCode>().unwrap();
3341        assert_eq!(err.code(), code);
3342        msg.validate_integrity(&credentials.into()).unwrap();
3343        msg
3344    }
3345
3346    fn validate_initial_allocate_reply(msg: &[u8]) -> (String, String) {
3347        let msg = validate_unsigned_error_reply(msg, ALLOCATE, ErrorCode::UNAUTHORIZED);
3348        let realm = msg.attribute::<Realm>().unwrap();
3349        let nonce = msg.attribute::<Nonce>().unwrap();
3350        (realm.realm().to_string(), nonce.nonce().to_string())
3351    }
3352
3353    #[test]
3354    fn test_server_initial_allocate_unauthorized_reply() {
3355        let _init = crate::tests::test_init_log();
3356        let now = Instant::ZERO;
3357        let mut server = new_server(TransportType::Udp);
3358        let reply = server
3359            .recv(
3360                client_transmit(initial_allocate_msg(), server.transport()),
3361                now,
3362            )
3363            .unwrap();
3364        validate_initial_allocate_reply(&reply.build().data);
3365    }
3366
3367    #[test]
3368    fn test_server_duplicate_initial_allocate_unauthorized_reply() {
3369        let _init = crate::tests::test_init_log();
3370        let now = Instant::ZERO;
3371        let mut server = new_server(TransportType::Udp);
3372        let reply = server
3373            .recv(
3374                client_transmit(initial_allocate_msg(), server.transport()),
3375                now,
3376            )
3377            .unwrap();
3378        let (realm, nonce) = validate_initial_allocate_reply(&reply.build().data);
3379        let reply = server
3380            .recv(
3381                client_transmit(initial_allocate_msg(), server.transport()),
3382                now,
3383            )
3384            .unwrap();
3385        let (realm2, nonce2) = validate_initial_allocate_reply(&reply.build().data);
3386        assert_eq!(nonce, nonce2);
3387        assert_eq!(realm, realm2);
3388    }
3389
3390    fn initial_allocate(server: &mut TurnServer, now: Instant) -> (String, String) {
3391        let reply = server
3392            .recv(
3393                client_transmit(initial_allocate_msg(), server.transport()),
3394                now,
3395            )
3396            .unwrap();
3397        validate_initial_allocate_reply(&reply.build().data)
3398    }
3399
3400    #[test]
3401    fn test_server_authenticated_allocate_missing_attributes() {
3402        let _init = crate::tests::test_init_log();
3403        let now = Instant::ZERO;
3404        let attributes = [
3405            Nonce::TYPE,
3406            Realm::TYPE,
3407            Username::TYPE,
3408            RequestedTransport::TYPE,
3409        ];
3410        for attr in attributes {
3411            let mut server = new_server(TransportType::Udp);
3412            let (realm, nonce) = initial_allocate(&mut server, now);
3413            let creds = credentials().into_long_term_credentials(&realm);
3414            let mut allocate = Message::builder_request(ALLOCATE, MessageWriteVec::new());
3415            if attr != Nonce::TYPE {
3416                allocate
3417                    .add_attribute(&Nonce::new(&nonce).unwrap())
3418                    .unwrap();
3419            }
3420            if attr != Realm::TYPE {
3421                allocate
3422                    .add_attribute(&Realm::new(&realm).unwrap())
3423                    .unwrap();
3424            }
3425            if attr != Username::TYPE {
3426                allocate
3427                    .add_attribute(&Username::new(creds.username()).unwrap())
3428                    .unwrap();
3429            }
3430            if attr != RequestedTransport::TYPE {
3431                allocate
3432                    .add_attribute(&RequestedTransport::new(RequestedTransport::UDP))
3433                    .unwrap();
3434            }
3435            allocate
3436                .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
3437                .unwrap();
3438            let reply = server
3439                .recv(client_transmit(allocate.finish(), server.transport()), now)
3440                .unwrap();
3441            if attr != RequestedTransport::TYPE {
3442                validate_unsigned_error_reply(
3443                    &reply.build().data,
3444                    ALLOCATE,
3445                    ErrorCode::BAD_REQUEST,
3446                );
3447            } else {
3448                validate_signed_error_reply(
3449                    &reply.build().data,
3450                    ALLOCATE,
3451                    ErrorCode::BAD_REQUEST,
3452                    creds,
3453                );
3454            }
3455        }
3456    }
3457
3458    fn add_authenticated_request_required_attributes(
3459        msg: &mut MessageWriteVec,
3460        credentials: LongTermKeyCredentials,
3461        nonce: &str,
3462    ) {
3463        msg.add_attribute(&Nonce::new(nonce).unwrap()).unwrap();
3464        msg.add_attribute(&Realm::new(credentials.realm()).unwrap())
3465            .unwrap();
3466        msg.add_attribute(&Username::new(credentials.username()).unwrap())
3467            .unwrap();
3468    }
3469
3470    fn authenticated_allocate_msg(
3471        credentials: LongTermKeyCredentials,
3472        nonce: &str,
3473        transport: u8,
3474        families: &[(AddressFamily, Result<SocketAddr, SocketAllocateError>)],
3475    ) -> Vec<u8> {
3476        let mut allocate = Message::builder_request(ALLOCATE, MessageWriteVec::new());
3477        add_authenticated_request_required_attributes(&mut allocate, credentials.clone(), nonce);
3478        allocate
3479            .add_attribute(&RequestedTransport::new(transport))
3480            .unwrap();
3481        if families.len() > 1 {
3482            for (family, _) in families {
3483                if *family != AddressFamily::IPV4 {
3484                    allocate
3485                        .add_attribute(&AdditionalAddressFamily::new(*family))
3486                        .unwrap();
3487                }
3488            }
3489        } else if families[0].0 != AddressFamily::IPV4 {
3490            allocate
3491                .add_attribute(&RequestedAddressFamily::new(families[0].0))
3492                .unwrap();
3493        }
3494        allocate
3495            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
3496            .unwrap();
3497        allocate.finish()
3498    }
3499
3500    fn authenticated_allocate_reply(
3501        server: &mut TurnServer,
3502        families: &[(AddressFamily, Result<SocketAddr, SocketAllocateError>)],
3503        now: Instant,
3504    ) -> Transmit<Vec<u8>> {
3505        for _ in 0..families.len() {
3506            let TurnServerPollRet::AllocateSocket {
3507                transport,
3508                listen_addr,
3509                client_addr,
3510                allocation_transport,
3511                family,
3512            } = server.poll(now)
3513            else {
3514                unreachable!();
3515            };
3516            let socket_addr = families
3517                .iter()
3518                .find_map(|(fam, socket_addr)| {
3519                    if *fam == family {
3520                        Some(*socket_addr)
3521                    } else {
3522                        None
3523                    }
3524                })
3525                .unwrap();
3526            server.allocated_socket(
3527                transport,
3528                listen_addr,
3529                client_addr,
3530                allocation_transport,
3531                family,
3532                socket_addr,
3533                now,
3534            );
3535        }
3536        server.poll_transmit(now).unwrap()
3537    }
3538
3539    fn authenticated_allocate_with_credentials_transport_families(
3540        server: &mut TurnServer,
3541        credentials: LongTermKeyCredentials,
3542        nonce: &str,
3543        from: SocketAddr,
3544        transport: u8,
3545        families: &[(AddressFamily, Result<SocketAddr, SocketAllocateError>)],
3546        now: Instant,
3547    ) -> Transmit<Vec<u8>> {
3548        let ret = server.recv(
3549            client_transmit_from(
3550                authenticated_allocate_msg(credentials.clone(), nonce, transport, families),
3551                server.transport(),
3552                from,
3553            ),
3554            now,
3555        );
3556        if let Some(transmit) = ret {
3557            return transmit.build();
3558        }
3559        authenticated_allocate_reply(server, families, now)
3560    }
3561
3562    fn authenticated_allocate_with_credentials_transport(
3563        server: &mut TurnServer,
3564        credentials: LongTermKeyCredentials,
3565        nonce: &str,
3566        transport: u8,
3567        now: Instant,
3568    ) -> Transmit<Vec<u8>> {
3569        authenticated_allocate_with_credentials_transport_families(
3570            server,
3571            credentials,
3572            nonce,
3573            client_address(),
3574            transport,
3575            &[(AddressFamily::IPV4, Ok(relayed_address()))],
3576            now,
3577        )
3578    }
3579
3580    fn authenticated_allocate_with_credentials(
3581        server: &mut TurnServer,
3582        transport: TransportType,
3583        credentials: LongTermKeyCredentials,
3584        nonce: &str,
3585        now: Instant,
3586    ) -> Transmit<Vec<u8>> {
3587        authenticated_allocate_with_credentials_transport(
3588            server,
3589            credentials,
3590            nonce,
3591            match transport {
3592                TransportType::Udp => RequestedTransport::UDP,
3593                TransportType::Tcp => RequestedTransport::TCP,
3594            },
3595            now,
3596        )
3597    }
3598
3599    #[test]
3600    fn test_server_authenticated_allocate_wrong_credentials() {
3601        let _init = crate::tests::test_init_log();
3602        let now = Instant::ZERO;
3603        let mut server = new_server(TransportType::Udp);
3604        let (realm, nonce) = initial_allocate(&mut server, now);
3605        let creds = credentials();
3606        let creds = TurnCredentials::new(creds.username(), "another-password")
3607            .into_long_term_credentials(&realm);
3608        let reply = authenticated_allocate_with_credentials(
3609            &mut server,
3610            TransportType::Udp,
3611            creds.clone(),
3612            &nonce,
3613            now,
3614        );
3615        validate_initial_allocate_reply(&reply.data);
3616
3617        let mut server = new_server(TransportType::Udp);
3618        let (realm, nonce) = initial_allocate(&mut server, now);
3619        let creds = credentials();
3620        let creds = TurnCredentials::new("another-user", creds.password())
3621            .into_long_term_credentials(&realm);
3622        let reply = authenticated_allocate_with_credentials(
3623            &mut server,
3624            TransportType::Udp,
3625            creds.clone(),
3626            &nonce,
3627            now,
3628        );
3629        validate_initial_allocate_reply(&reply.data);
3630
3631        let mut server = new_server(TransportType::Udp);
3632        let (_realm, nonce) = initial_allocate(&mut server, now);
3633        let creds = credentials();
3634        let creds = TurnCredentials::new(creds.username(), creds.password())
3635            .into_long_term_credentials("another-realm");
3636        let reply = authenticated_allocate_with_credentials(
3637            &mut server,
3638            TransportType::Udp,
3639            creds.clone(),
3640            &nonce,
3641            now,
3642        );
3643        validate_initial_allocate_reply(&reply.data);
3644    }
3645
3646    #[test]
3647    fn test_server_authenticated_allocate_without_initial() {
3648        let _init = crate::tests::test_init_log();
3649        let now = Instant::ZERO;
3650        let mut server = new_server(TransportType::Udp);
3651        let nonce = String::from("random");
3652        let creds = credentials();
3653        let creds = creds.into_long_term_credentials("realm");
3654        let reply = authenticated_allocate_with_credentials(
3655            &mut server,
3656            TransportType::Udp,
3657            creds.clone(),
3658            &nonce,
3659            now,
3660        );
3661        validate_unsigned_error_reply(&reply.data, ALLOCATE, ErrorCode::STALE_NONCE);
3662    }
3663
3664    #[test]
3665    fn test_server_authenticated_allocate_wrong_transport_type() {
3666        let _init = crate::tests::test_init_log();
3667        let now = Instant::ZERO;
3668        let mut server = new_server(TransportType::Udp);
3669        let (realm, nonce) = initial_allocate(&mut server, now);
3670        let creds = credentials().into_long_term_credentials(&realm);
3671        let reply = authenticated_allocate_with_credentials_transport(
3672            &mut server,
3673            creds.clone(),
3674            &nonce,
3675            0x0,
3676            now,
3677        );
3678        validate_signed_error_reply(
3679            &reply.data,
3680            ALLOCATE,
3681            ErrorCode::UNSUPPORTED_TRANSPORT_PROTOCOL,
3682            creds,
3683        );
3684    }
3685
3686    fn validate_signed_success(
3687        msg: &[u8],
3688        method: Method,
3689        credentials: LongTermKeyCredentials,
3690    ) -> Message<'_> {
3691        let msg = Message::from_bytes(msg).unwrap();
3692        assert!(msg.has_method(method));
3693        assert!(msg.has_class(MessageClass::Success));
3694        msg.validate_integrity(&credentials.into()).unwrap();
3695        msg
3696    }
3697
3698    fn validate_authenticated_allocate_reply(
3699        msg: &[u8],
3700        credentials: LongTermKeyCredentials,
3701    ) -> (Message<'_>, u32) {
3702        let msg = validate_signed_success(msg, ALLOCATE, credentials);
3703        let lifetime = msg.attribute::<Lifetime>().unwrap();
3704        let _xor_relayed_address = msg.attribute::<XorRelayedAddress>().unwrap();
3705        let _xor_mapped_address = msg.attribute::<XorMappedAddress>().unwrap();
3706        (msg, lifetime.seconds())
3707    }
3708
3709    #[test]
3710    fn test_server_authenticated_allocate_ipv6() {
3711        let _init = crate::tests::test_init_log();
3712        let now = Instant::ZERO;
3713        let mut server = new_server(TransportType::Udp);
3714        let (realm, nonce) = initial_allocate(&mut server, now);
3715        let creds = credentials().into_long_term_credentials(&realm);
3716        let reply = authenticated_allocate_with_credentials_transport_families(
3717            &mut server,
3718            creds.clone(),
3719            &nonce,
3720            client_address(),
3721            RequestedTransport::UDP,
3722            &[(AddressFamily::IPV6, Ok(ipv6_relayed_address()))],
3723            now,
3724        );
3725        validate_authenticated_allocate_reply(&reply.data, creds);
3726    }
3727
3728    #[test]
3729    fn test_server_authenticated_allocate_ipv6_error() {
3730        let _init = crate::tests::test_init_log();
3731        let now = Instant::ZERO;
3732        let mut server = new_server(TransportType::Udp);
3733        let (realm, nonce) = initial_allocate(&mut server, now);
3734        let creds = credentials().into_long_term_credentials(&realm);
3735        let reply = authenticated_allocate_with_credentials_transport_families(
3736            &mut server,
3737            creds.clone(),
3738            &nonce,
3739            client_address(),
3740            RequestedTransport::UDP,
3741            &[(
3742                AddressFamily::IPV6,
3743                Err(SocketAllocateError::AddressFamilyNotSupported),
3744            )],
3745            now,
3746        );
3747        validate_signed_error_reply(
3748            &reply.data,
3749            ALLOCATE,
3750            ErrorCode::ADDRESS_FAMILY_NOT_SUPPORTED,
3751            creds,
3752        );
3753    }
3754
3755    #[test]
3756    fn test_server_authenticated_allocate_dual_ipv6_error() {
3757        let _init = crate::tests::test_init_log();
3758        let now = Instant::ZERO;
3759        let mut server = new_server(TransportType::Udp);
3760        let (realm, nonce) = initial_allocate(&mut server, now);
3761        let creds = credentials().into_long_term_credentials(&realm);
3762        let reply = authenticated_allocate_with_credentials_transport_families(
3763            &mut server,
3764            creds.clone(),
3765            &nonce,
3766            client_address(),
3767            RequestedTransport::UDP,
3768            &[
3769                (
3770                    AddressFamily::IPV6,
3771                    Err(SocketAllocateError::AddressFamilyNotSupported),
3772                ),
3773                (AddressFamily::IPV4, Ok(relayed_address())),
3774            ],
3775            now,
3776        );
3777        let (msg, _lifetime) = validate_authenticated_allocate_reply(&reply.data, creds);
3778        let address_error_code = msg.attribute::<AddressErrorCode>().unwrap();
3779        assert_eq!(address_error_code.family(), AddressFamily::IPV6);
3780        assert_eq!(
3781            address_error_code.error().code(),
3782            ErrorCode::ADDRESS_FAMILY_NOT_SUPPORTED
3783        );
3784    }
3785
3786    #[test]
3787    fn test_server_authenticated_allocate_dual_ipv4_error() {
3788        let _init = crate::tests::test_init_log();
3789        let now = Instant::ZERO;
3790        let mut server = new_server(TransportType::Udp);
3791        let (realm, nonce) = initial_allocate(&mut server, now);
3792        let creds = credentials().into_long_term_credentials(&realm);
3793        let reply = authenticated_allocate_with_credentials_transport_families(
3794            &mut server,
3795            creds.clone(),
3796            &nonce,
3797            client_address(),
3798            RequestedTransport::UDP,
3799            &[
3800                (AddressFamily::IPV6, Ok(ipv6_relayed_address())),
3801                (
3802                    AddressFamily::IPV4,
3803                    Err(SocketAllocateError::AddressFamilyNotSupported),
3804                ),
3805            ],
3806            now,
3807        );
3808        let (msg, _lifetime) = validate_authenticated_allocate_reply(&reply.data, creds);
3809        let address_error_code = msg.attribute::<AddressErrorCode>().unwrap();
3810        assert_eq!(address_error_code.family(), AddressFamily::IPV4);
3811        assert_eq!(
3812            address_error_code.error().code(),
3813            ErrorCode::ADDRESS_FAMILY_NOT_SUPPORTED
3814        );
3815    }
3816
3817    #[test]
3818    fn test_server_allocation_expire() {
3819        let _init = crate::tests::test_init_log();
3820        let now = Instant::ZERO;
3821        let mut server = new_server(TransportType::Udp);
3822        let (realm, nonce) = initial_allocate(&mut server, now);
3823        let creds = credentials().into_long_term_credentials(&realm);
3824        let reply = authenticated_allocate_with_credentials(
3825            &mut server,
3826            TransportType::Udp,
3827            creds.clone(),
3828            &nonce,
3829            now,
3830        );
3831        let (_msg, lifetime) = validate_authenticated_allocate_reply(&reply.data, creds.clone());
3832        let TurnServerPollRet::WaitUntil(wait) = server.poll(now) else {
3833            unreachable!();
3834        };
3835        assert_eq!(wait, now + Duration::from_secs(lifetime as u64));
3836    }
3837
3838    fn create_permission_request(
3839        credentials: LongTermKeyCredentials,
3840        nonce: &str,
3841        peer: SocketAddr,
3842    ) -> Vec<u8> {
3843        let mut request = Message::builder_request(CREATE_PERMISSION, MessageWriteVec::new());
3844        request
3845            .add_attribute(&XorPeerAddress::new(peer, request.transaction_id()))
3846            .unwrap();
3847        add_authenticated_request_required_attributes(&mut request, credentials.clone(), nonce);
3848        request
3849            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
3850            .unwrap();
3851        request.finish()
3852    }
3853
3854    #[test]
3855    fn test_server_create_permission_without_allocation() {
3856        let _init = crate::tests::test_init_log();
3857        let now = Instant::ZERO;
3858        let mut server = new_server(TransportType::Udp);
3859        let (realm, nonce) = initial_allocate(&mut server, now);
3860        let creds = credentials().into_long_term_credentials(&realm);
3861        let reply = server
3862            .recv(
3863                client_transmit(
3864                    create_permission_request(creds.clone(), &nonce, peer_address()),
3865                    server.transport(),
3866                ),
3867                now,
3868            )
3869            .unwrap();
3870        validate_signed_error_reply(
3871            &reply.build().data,
3872            CREATE_PERMISSION,
3873            ErrorCode::ALLOCATION_MISMATCH,
3874            creds,
3875        );
3876    }
3877
3878    #[test]
3879    fn test_server_create_permission_without_peer_address() {
3880        let _init = crate::tests::test_init_log();
3881        let now = Instant::ZERO;
3882        let mut server = new_server(TransportType::Udp);
3883        let (realm, nonce) = initial_allocate(&mut server, now);
3884        let creds = credentials().into_long_term_credentials(&realm);
3885        let reply = authenticated_allocate_with_credentials(
3886            &mut server,
3887            TransportType::Udp,
3888            creds.clone(),
3889            &nonce,
3890            now,
3891        );
3892        validate_authenticated_allocate_reply(&reply.data, creds.clone());
3893        let reply = server
3894            .recv(
3895                client_transmit(
3896                    {
3897                        let mut request =
3898                            Message::builder_request(CREATE_PERMISSION, MessageWriteVec::new());
3899                        add_authenticated_request_required_attributes(
3900                            &mut request,
3901                            creds.clone(),
3902                            &nonce,
3903                        );
3904                        request
3905                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
3906                            .unwrap();
3907                        request.finish()
3908                    },
3909                    server.transport(),
3910                ),
3911                now,
3912            )
3913            .unwrap();
3914        validate_signed_error_reply(
3915            &reply.build().data,
3916            CREATE_PERMISSION,
3917            ErrorCode::BAD_REQUEST,
3918            creds,
3919        );
3920    }
3921
3922    #[test]
3923    fn test_server_create_permission_wrong_family() {
3924        let _init = crate::tests::test_init_log();
3925        let now = Instant::ZERO;
3926        let mut server = new_server(TransportType::Udp);
3927        let (realm, nonce) = initial_allocate(&mut server, now);
3928        let creds = credentials().into_long_term_credentials(&realm);
3929        let reply = authenticated_allocate_with_credentials(
3930            &mut server,
3931            TransportType::Udp,
3932            creds.clone(),
3933            &nonce,
3934            now,
3935        );
3936        validate_authenticated_allocate_reply(&reply.data, creds.clone());
3937        let reply = server
3938            .recv(
3939                client_transmit(
3940                    create_permission_request(creds.clone(), &nonce, ipv6_peer_address()),
3941                    server.transport(),
3942                ),
3943                now,
3944            )
3945            .unwrap();
3946        validate_signed_error_reply(
3947            &reply.build().data,
3948            CREATE_PERMISSION,
3949            ErrorCode::PEER_ADDRESS_FAMILY_MISMATCH,
3950            creds,
3951        );
3952    }
3953
3954    #[test]
3955    fn test_server_create_permission_ipv4_wrong_family() {
3956        let _init = crate::tests::test_init_log();
3957        let now = Instant::ZERO;
3958        let mut server = new_server(TransportType::Udp);
3959        let (realm, nonce) = initial_allocate(&mut server, now);
3960        let creds = credentials().into_long_term_credentials(&realm);
3961        let reply = authenticated_allocate_with_credentials_transport_families(
3962            &mut server,
3963            creds.clone(),
3964            &nonce,
3965            client_address(),
3966            RequestedTransport::UDP,
3967            &[(AddressFamily::IPV6, Ok(ipv6_relayed_address()))],
3968            now,
3969        );
3970        validate_authenticated_allocate_reply(&reply.data, creds.clone());
3971        let reply = server
3972            .recv(
3973                client_transmit(
3974                    create_permission_request(creds.clone(), &nonce, peer_address()),
3975                    server.transport(),
3976                ),
3977                now,
3978            )
3979            .unwrap();
3980        validate_signed_error_reply(
3981            &reply.build().data,
3982            CREATE_PERMISSION,
3983            ErrorCode::PEER_ADDRESS_FAMILY_MISMATCH,
3984            creds,
3985        );
3986    }
3987
3988    #[test]
3989    fn test_server_create_permission_wrong_username() {
3990        let _init = crate::tests::test_init_log();
3991        let now = Instant::ZERO;
3992        let mut server = new_server(TransportType::Udp);
3993        let (realm, nonce) = initial_allocate(&mut server, now);
3994        let creds = credentials().into_long_term_credentials(&realm);
3995        server.add_user("another-user".to_string(), creds.password().to_string());
3996        let reply = authenticated_allocate_with_credentials(
3997            &mut server,
3998            TransportType::Udp,
3999            creds.clone(),
4000            &nonce,
4001            now,
4002        );
4003        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4004        let creds = TurnCredentials::new("another-user", creds.password())
4005            .into_long_term_credentials(&realm);
4006        let reply = server
4007            .recv(
4008                client_transmit(
4009                    create_permission_request(creds, &nonce, peer_address()),
4010                    server.transport(),
4011                ),
4012                now,
4013            )
4014            .unwrap();
4015        validate_unsigned_error_reply(
4016            &reply.build().data,
4017            CREATE_PERMISSION,
4018            ErrorCode::WRONG_CREDENTIALS,
4019        );
4020    }
4021
4022    #[test]
4023    fn test_server_create_permission_malformed_peer_address() {
4024        let _init = crate::tests::test_init_log();
4025        let now = Instant::ZERO;
4026        let mut server = new_server(TransportType::Udp);
4027        let (realm, nonce) = initial_allocate(&mut server, now);
4028        let creds = credentials().into_long_term_credentials(&realm);
4029        let reply = authenticated_allocate_with_credentials(
4030            &mut server,
4031            TransportType::Udp,
4032            creds.clone(),
4033            &nonce,
4034            now,
4035        );
4036        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4037        let reply = server
4038            .recv(
4039                client_transmit(
4040                    {
4041                        let mut request =
4042                            Message::builder_request(CREATE_PERMISSION, MessageWriteVec::new());
4043                        request
4044                            .add_attribute(&XorPeerAddress::new(
4045                                peer_address(),
4046                                request.transaction_id(),
4047                            ))
4048                            .unwrap();
4049                        // modify the XorPeerAddress to be invalid
4050                        request[25] = 0x80;
4051                        add_authenticated_request_required_attributes(
4052                            &mut request,
4053                            creds.clone(),
4054                            &nonce,
4055                        );
4056                        request
4057                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4058                            .unwrap();
4059                        request.finish()
4060                    },
4061                    server.transport(),
4062                ),
4063                now,
4064            )
4065            .unwrap();
4066        validate_signed_error_reply(
4067            &reply.build().data,
4068            CREATE_PERMISSION,
4069            ErrorCode::BAD_REQUEST,
4070            creds,
4071        );
4072    }
4073
4074    fn channel_bind_request(credentials: LongTermKeyCredentials, nonce: &str) -> Vec<u8> {
4075        let mut request = Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4076        request.add_attribute(&ChannelNumber::new(0x4000)).unwrap();
4077        request
4078            .add_attribute(&XorPeerAddress::new(
4079                peer_address(),
4080                request.transaction_id(),
4081            ))
4082            .unwrap();
4083        add_authenticated_request_required_attributes(&mut request, credentials.clone(), nonce);
4084        request
4085            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
4086            .unwrap();
4087        request.finish()
4088    }
4089
4090    #[test]
4091    fn test_server_channel_bind_without_allocation() {
4092        let _init = crate::tests::test_init_log();
4093        let now = Instant::ZERO;
4094        let mut server = new_server(TransportType::Udp);
4095        let (realm, nonce) = initial_allocate(&mut server, now);
4096        let creds = credentials().into_long_term_credentials(&realm);
4097        let reply = server
4098            .recv(
4099                client_transmit(
4100                    channel_bind_request(creds.clone(), &nonce),
4101                    server.transport(),
4102                ),
4103                now,
4104            )
4105            .unwrap();
4106        validate_signed_error_reply(
4107            &reply.build().data,
4108            CHANNEL_BIND,
4109            ErrorCode::ALLOCATION_MISMATCH,
4110            creds,
4111        );
4112    }
4113
4114    #[test]
4115    fn test_server_channel_bind_missing_attributes() {
4116        let _init = crate::tests::test_init_log();
4117        let now = Instant::ZERO;
4118        let mut server = new_server(TransportType::Udp);
4119        let (realm, nonce) = initial_allocate(&mut server, now);
4120        let creds = credentials().into_long_term_credentials(&realm);
4121        let reply = authenticated_allocate_with_credentials(
4122            &mut server,
4123            TransportType::Udp,
4124            creds.clone(),
4125            &nonce,
4126            now,
4127        );
4128        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4129        let reply = server
4130            .recv(
4131                client_transmit(
4132                    {
4133                        let mut request =
4134                            Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4135                        request
4136                            .add_attribute(&XorPeerAddress::new(
4137                                peer_address(),
4138                                request.transaction_id(),
4139                            ))
4140                            .unwrap();
4141                        add_authenticated_request_required_attributes(
4142                            &mut request,
4143                            creds.clone(),
4144                            &nonce,
4145                        );
4146                        request
4147                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4148                            .unwrap();
4149                        request.finish()
4150                    },
4151                    server.transport(),
4152                ),
4153                now,
4154            )
4155            .unwrap();
4156        validate_signed_error_reply(
4157            &reply.build().data,
4158            CHANNEL_BIND,
4159            ErrorCode::BAD_REQUEST,
4160            creds.clone(),
4161        );
4162
4163        let mut server = new_server(TransportType::Udp);
4164        let (realm, nonce) = initial_allocate(&mut server, now);
4165        let creds = credentials().into_long_term_credentials(&realm);
4166        let reply = authenticated_allocate_with_credentials(
4167            &mut server,
4168            TransportType::Udp,
4169            creds.clone(),
4170            &nonce,
4171            now,
4172        );
4173        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4174        let reply = server
4175            .recv(
4176                client_transmit(
4177                    {
4178                        let mut request =
4179                            Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4180                        request.add_attribute(&ChannelNumber::new(0x4000)).unwrap();
4181                        add_authenticated_request_required_attributes(
4182                            &mut request,
4183                            creds.clone(),
4184                            &nonce,
4185                        );
4186                        request
4187                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4188                            .unwrap();
4189                        request.finish()
4190                    },
4191                    server.transport(),
4192                ),
4193                now,
4194            )
4195            .unwrap();
4196        validate_signed_error_reply(
4197            &reply.build().data,
4198            CHANNEL_BIND,
4199            ErrorCode::BAD_REQUEST,
4200            creds.clone(),
4201        );
4202    }
4203
4204    #[test]
4205    fn test_server_channel_bind_invalid_id() {
4206        let _init = crate::tests::test_init_log();
4207        let now = Instant::ZERO;
4208        let mut server = new_server(TransportType::Udp);
4209        let (realm, nonce) = initial_allocate(&mut server, now);
4210        let creds = credentials().into_long_term_credentials(&realm);
4211        let reply = authenticated_allocate_with_credentials(
4212            &mut server,
4213            TransportType::Udp,
4214            creds.clone(),
4215            &nonce,
4216            now,
4217        );
4218        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4219        let reply = server
4220            .recv(
4221                client_transmit(
4222                    {
4223                        let mut request =
4224                            Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4225                        request.add_attribute(&ChannelNumber::new(0x0)).unwrap();
4226                        request
4227                            .add_attribute(&XorPeerAddress::new(
4228                                peer_address(),
4229                                request.transaction_id(),
4230                            ))
4231                            .unwrap();
4232                        add_authenticated_request_required_attributes(
4233                            &mut request,
4234                            creds.clone(),
4235                            &nonce,
4236                        );
4237                        request
4238                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4239                            .unwrap();
4240                        request.finish()
4241                    },
4242                    server.transport(),
4243                ),
4244                now,
4245            )
4246            .unwrap();
4247        validate_signed_error_reply(
4248            &reply.build().data,
4249            CHANNEL_BIND,
4250            ErrorCode::BAD_REQUEST,
4251            creds.clone(),
4252        );
4253    }
4254
4255    #[test]
4256    fn test_server_channel_bind_wrong_family() {
4257        let _init = crate::tests::test_init_log();
4258        let now = Instant::ZERO;
4259        let mut server = new_server(TransportType::Udp);
4260        let (realm, nonce) = initial_allocate(&mut server, now);
4261        let creds = credentials().into_long_term_credentials(&realm);
4262        let reply = authenticated_allocate_with_credentials(
4263            &mut server,
4264            TransportType::Udp,
4265            creds.clone(),
4266            &nonce,
4267            now,
4268        );
4269        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4270        let reply = server
4271            .recv(
4272                client_transmit(
4273                    {
4274                        let mut request =
4275                            Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4276                        request.add_attribute(&ChannelNumber::new(0x4000)).unwrap();
4277                        request
4278                            .add_attribute(&XorPeerAddress::new(
4279                                ipv6_peer_address(),
4280                                request.transaction_id(),
4281                            ))
4282                            .unwrap();
4283                        add_authenticated_request_required_attributes(
4284                            &mut request,
4285                            creds.clone(),
4286                            &nonce,
4287                        );
4288                        request
4289                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4290                            .unwrap();
4291                        request.finish()
4292                    },
4293                    server.transport(),
4294                ),
4295                now,
4296            )
4297            .unwrap();
4298        validate_signed_error_reply(
4299            &reply.build().data,
4300            CHANNEL_BIND,
4301            ErrorCode::PEER_ADDRESS_FAMILY_MISMATCH,
4302            creds,
4303        );
4304    }
4305
4306    #[test]
4307    fn test_server_allocation_expire_channel_bind() {
4308        let _init = crate::tests::test_init_log();
4309        let now = Instant::ZERO;
4310        let mut server = new_server(TransportType::Udp);
4311        let (realm, nonce) = initial_allocate(&mut server, now);
4312        let creds = credentials().into_long_term_credentials(&realm);
4313        let reply = authenticated_allocate_with_credentials(
4314            &mut server,
4315            TransportType::Udp,
4316            creds.clone(),
4317            &nonce,
4318            now,
4319        );
4320        let (_msg, lifetime) = validate_authenticated_allocate_reply(&reply.data, creds.clone());
4321        let now = now + Duration::from_secs(lifetime as u64 + 1);
4322        let reply = server
4323            .recv(
4324                client_transmit(
4325                    channel_bind_request(creds.clone(), &nonce),
4326                    server.transport(),
4327                ),
4328                now,
4329            )
4330            .unwrap();
4331        validate_signed_error_reply(
4332            &reply.build().data,
4333            CHANNEL_BIND,
4334            ErrorCode::ALLOCATION_MISMATCH,
4335            creds,
4336        );
4337    }
4338
4339    #[test]
4340    fn test_server_duplicate_channel_bind() {
4341        let _init = crate::tests::test_init_log();
4342        let now = Instant::ZERO;
4343        let mut server = new_server(TransportType::Udp);
4344        let (realm, nonce) = initial_allocate(&mut server, now);
4345        let creds = credentials().into_long_term_credentials(&realm);
4346        let reply = authenticated_allocate_with_credentials(
4347            &mut server,
4348            TransportType::Udp,
4349            creds.clone(),
4350            &nonce,
4351            now,
4352        );
4353        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4354        channel_bind(&mut server, creds.clone(), &nonce, now);
4355        channel_bind(&mut server, creds.clone(), &nonce, now);
4356    }
4357
4358    fn channel_bind(
4359        server: &mut TurnServer,
4360        creds: LongTermKeyCredentials,
4361        nonce: &str,
4362        now: Instant,
4363    ) {
4364        let reply = server
4365            .recv(
4366                client_transmit(
4367                    channel_bind_request(creds.clone(), nonce),
4368                    server.transport(),
4369                ),
4370                now,
4371            )
4372            .unwrap();
4373        validate_signed_success(&reply.build().data, CHANNEL_BIND, creds.clone());
4374    }
4375
4376    #[test]
4377    fn test_server_channel_bind_refresh_wrong_address() {
4378        let _init = crate::tests::test_init_log();
4379        let now = Instant::ZERO;
4380        let mut server = new_server(TransportType::Udp);
4381        let (realm, nonce) = initial_allocate(&mut server, now);
4382        let creds = credentials().into_long_term_credentials(&realm);
4383        let reply = authenticated_allocate_with_credentials(
4384            &mut server,
4385            TransportType::Udp,
4386            creds.clone(),
4387            &nonce,
4388            now,
4389        );
4390        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4391        channel_bind(&mut server, creds.clone(), &nonce, now);
4392        let reply = server
4393            .recv(
4394                client_transmit(
4395                    {
4396                        let mut request =
4397                            Message::builder_request(CHANNEL_BIND, MessageWriteVec::new());
4398                        request.add_attribute(&ChannelNumber::new(0x4100)).unwrap();
4399                        request
4400                            .add_attribute(&XorPeerAddress::new(
4401                                peer_address(),
4402                                request.transaction_id(),
4403                            ))
4404                            .unwrap();
4405                        add_authenticated_request_required_attributes(
4406                            &mut request,
4407                            creds.clone(),
4408                            &nonce,
4409                        );
4410                        request
4411                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4412                            .unwrap();
4413                        request.finish()
4414                    },
4415                    server.transport(),
4416                ),
4417                now,
4418            )
4419            .unwrap();
4420        validate_signed_error_reply(
4421            &reply.build().data,
4422            CHANNEL_BIND,
4423            ErrorCode::BAD_REQUEST,
4424            creds,
4425        );
4426    }
4427
4428    #[test]
4429    fn test_server_channel_bind_send_data() {
4430        let _init = crate::tests::test_init_log();
4431        let now = Instant::ZERO;
4432        let mut server = new_server(TransportType::Udp);
4433        let (realm, nonce) = initial_allocate(&mut server, now);
4434        let creds = credentials().into_long_term_credentials(&realm);
4435        let reply = authenticated_allocate_with_credentials(
4436            &mut server,
4437            TransportType::Udp,
4438            creds.clone(),
4439            &nonce,
4440            now,
4441        );
4442        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4443        channel_bind(&mut server, creds.clone(), &nonce, now);
4444        let data = [8; 9];
4445        let reply = server
4446            .recv(
4447                client_transmit(
4448                    {
4449                        let mut out = [0; 13];
4450                        ChannelData::new(0x4000, data.as_slice()).write_into_unchecked(&mut out);
4451                        out
4452                    },
4453                    server.transport(),
4454                ),
4455                now,
4456            )
4457            .unwrap();
4458        assert_eq!(reply.transport, TransportType::Udp);
4459        assert_eq!(reply.from, relayed_address());
4460        assert_eq!(reply.to, peer_address());
4461        assert_eq!(reply.data.build(), data);
4462    }
4463
4464    fn refresh_request_with_lifetime(
4465        credentials: LongTermKeyCredentials,
4466        nonce: &str,
4467        lifetime: u32,
4468        requested_address: Option<AddressFamily>,
4469    ) -> Vec<u8> {
4470        let mut request = Message::builder_request(REFRESH, MessageWriteVec::new());
4471        request.add_attribute(&Lifetime::new(lifetime)).unwrap();
4472        add_authenticated_request_required_attributes(&mut request, credentials.clone(), nonce);
4473        if let Some(family) = requested_address {
4474            request
4475                .add_attribute(&RequestedAddressFamily::new(family))
4476                .unwrap();
4477        }
4478        request
4479            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
4480            .unwrap();
4481        request.finish()
4482    }
4483
4484    fn refresh_request(
4485        credentials: LongTermKeyCredentials,
4486        nonce: &str,
4487        requested_address: Option<AddressFamily>,
4488    ) -> Vec<u8> {
4489        refresh_request_with_lifetime(credentials, nonce, 1800, requested_address)
4490    }
4491
4492    #[test]
4493    fn test_server_refresh_without_allocation() {
4494        let _init = crate::tests::test_init_log();
4495        let now = Instant::ZERO;
4496        let mut server = new_server(TransportType::Udp);
4497        let (realm, nonce) = initial_allocate(&mut server, now);
4498        let creds = credentials().into_long_term_credentials(&realm);
4499        let reply = server
4500            .recv(
4501                client_transmit(
4502                    refresh_request(creds.clone(), &nonce, None),
4503                    server.transport(),
4504                ),
4505                now,
4506            )
4507            .unwrap();
4508        validate_signed_error_reply(
4509            &reply.build().data,
4510            REFRESH,
4511            ErrorCode::ALLOCATION_MISMATCH,
4512            creds,
4513        );
4514    }
4515
4516    #[test]
4517    fn test_server_refresh_dual_allocation() {
4518        let _init = crate::tests::test_init_log();
4519        let now = Instant::ZERO;
4520        let mut server = new_server(TransportType::Udp);
4521        let (realm, nonce) = initial_allocate(&mut server, now);
4522        let creds = credentials().into_long_term_credentials(&realm);
4523        authenticated_allocate_with_credentials_transport_families(
4524            &mut server,
4525            creds.clone(),
4526            &nonce,
4527            client_address(),
4528            RequestedTransport::UDP,
4529            &[
4530                (AddressFamily::IPV4, Ok(relayed_address())),
4531                (AddressFamily::IPV6, Ok(ipv6_relayed_address())),
4532            ],
4533            now,
4534        );
4535        let TurnServerPollRet::WaitUntil(now) = server.poll(now) else {
4536            unreachable!();
4537        };
4538        let reply = server
4539            .recv(
4540                client_transmit(
4541                    refresh_request(creds.clone(), &nonce, None),
4542                    server.transport(),
4543                ),
4544                now,
4545            )
4546            .unwrap();
4547        validate_signed_success(&reply.build().data, REFRESH, creds);
4548    }
4549
4550    fn delete_request(
4551        credentials: LongTermKeyCredentials,
4552        nonce: &str,
4553        requested_address: Option<AddressFamily>,
4554    ) -> Vec<u8> {
4555        refresh_request_with_lifetime(credentials, nonce, 0, requested_address)
4556    }
4557
4558    #[test]
4559    fn test_server_dual_allocation_delete_single() {
4560        let _init = crate::tests::test_init_log();
4561        let now = Instant::ZERO;
4562        let mut server = new_server(TransportType::Udp);
4563        let (realm, nonce) = initial_allocate(&mut server, now);
4564        let creds = credentials().into_long_term_credentials(&realm);
4565        authenticated_allocate_with_credentials_transport_families(
4566            &mut server,
4567            creds.clone(),
4568            &nonce,
4569            client_address(),
4570            RequestedTransport::UDP,
4571            &[
4572                (AddressFamily::IPV4, Ok(relayed_address())),
4573                (AddressFamily::IPV6, Ok(ipv6_relayed_address())),
4574            ],
4575            now,
4576        );
4577        let reply = server
4578            .recv(
4579                client_transmit(
4580                    delete_request(creds.clone(), &nonce, Some(AddressFamily::IPV4)),
4581                    server.transport(),
4582                ),
4583                now,
4584            )
4585            .unwrap();
4586        validate_signed_success(&reply.build().data, REFRESH, creds.clone());
4587        // duplicate delete results in error
4588        let reply = server
4589            .recv(
4590                client_transmit(
4591                    refresh_request(creds.clone(), &nonce, Some(AddressFamily::IPV4)),
4592                    server.transport(),
4593                ),
4594                now,
4595            )
4596            .unwrap();
4597        validate_signed_error_reply(
4598            &reply.build().data,
4599            REFRESH,
4600            ErrorCode::PEER_ADDRESS_FAMILY_MISMATCH,
4601            creds.clone(),
4602        );
4603
4604        // delete the other relayed address
4605        let reply = server
4606            .recv(
4607                client_transmit(
4608                    delete_request(creds.clone(), &nonce, Some(AddressFamily::IPV6)),
4609                    server.transport(),
4610                ),
4611                now,
4612            )
4613            .unwrap();
4614        validate_signed_success(&reply.build().data, REFRESH, creds.clone());
4615        // duplicate delete when there are no allocation results in error
4616        let reply = server
4617            .recv(
4618                client_transmit(
4619                    refresh_request(creds.clone(), &nonce, Some(AddressFamily::IPV6)),
4620                    server.transport(),
4621                ),
4622                now,
4623            )
4624            .unwrap();
4625        validate_signed_error_reply(
4626            &reply.build().data,
4627            REFRESH,
4628            ErrorCode::ALLOCATION_MISMATCH,
4629            creds.clone(),
4630        );
4631        let reply = server
4632            .recv(
4633                client_transmit(
4634                    refresh_request(creds.clone(), &nonce, None),
4635                    server.transport(),
4636                ),
4637                now,
4638            )
4639            .unwrap();
4640        validate_signed_error_reply(
4641            &reply.build().data,
4642            REFRESH,
4643            ErrorCode::ALLOCATION_MISMATCH,
4644            creds,
4645        );
4646    }
4647
4648    fn send_indication(peer_addr: SocketAddr, data: &[u8]) -> Vec<u8> {
4649        let mut msg = Message::builder(
4650            MessageType::from_class_method(MessageClass::Indication, SEND),
4651            TransactionId::generate(),
4652            MessageWriteVec::new(),
4653        );
4654        msg.add_attribute(&XorPeerAddress::new(peer_addr, msg.transaction_id()))
4655            .unwrap();
4656        msg.add_attribute(&AData::new(data)).unwrap();
4657        msg.finish()
4658    }
4659
4660    #[test]
4661    fn test_server_send_without_allocation() {
4662        let _init = crate::tests::test_init_log();
4663        let now = Instant::ZERO;
4664        let mut server = new_server(TransportType::Udp);
4665        assert!(server
4666            .recv(
4667                client_transmit(
4668                    send_indication(peer_address(), [8; 9].as_slice()),
4669                    server.transport()
4670                ),
4671                now,
4672            )
4673            .is_none());
4674    }
4675
4676    #[test]
4677    fn test_server_send_allocation_expired() {
4678        let _init = crate::tests::test_init_log();
4679        let now = Instant::ZERO;
4680        let mut server = new_server(TransportType::Udp);
4681        let (realm, nonce) = initial_allocate(&mut server, now);
4682        let creds = credentials().into_long_term_credentials(&realm);
4683        let reply = authenticated_allocate_with_credentials(
4684            &mut server,
4685            TransportType::Udp,
4686            creds.clone(),
4687            &nonce,
4688            now,
4689        );
4690        let (_msg, lifetime) = validate_authenticated_allocate_reply(&reply.data, creds.clone());
4691        let now = now + Duration::from_secs(lifetime as u64 + 1);
4692        assert!(server
4693            .recv(
4694                client_transmit(
4695                    send_indication(peer_address(), [8; 9].as_slice()),
4696                    server.transport()
4697                ),
4698                now,
4699            )
4700            .is_none());
4701    }
4702
4703    #[test]
4704    fn test_server_send_no_allocation() {
4705        let _init = crate::tests::test_init_log();
4706        let now = Instant::ZERO;
4707        let mut server = new_server(TransportType::Udp);
4708        let (realm, nonce) = initial_allocate(&mut server, now);
4709        let creds = credentials().into_long_term_credentials(&realm);
4710        let reply = authenticated_allocate_with_credentials(
4711            &mut server,
4712            TransportType::Udp,
4713            creds.clone(),
4714            &nonce,
4715            now,
4716        );
4717        let (_msg, lifetime) = validate_authenticated_allocate_reply(&reply.data, creds.clone());
4718        let now = now + Duration::from_secs(lifetime as u64 + 1);
4719        assert!(server
4720            .recv(
4721                client_transmit(
4722                    send_indication(ipv6_peer_address(), [8; 9].as_slice()),
4723                    server.transport()
4724                ),
4725                now,
4726            )
4727            .is_none());
4728    }
4729
4730    #[test]
4731    fn test_server_send_without_permission() {
4732        let _init = crate::tests::test_init_log();
4733        let now = Instant::ZERO;
4734        let mut server = new_server(TransportType::Udp);
4735        let (realm, nonce) = initial_allocate(&mut server, now);
4736        let creds = credentials().into_long_term_credentials(&realm);
4737        let reply = authenticated_allocate_with_credentials(
4738            &mut server,
4739            TransportType::Udp,
4740            creds.clone(),
4741            &nonce,
4742            now,
4743        );
4744        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4745        assert!(server
4746            .recv(
4747                client_transmit(
4748                    send_indication(peer_address(), [8; 9].as_slice()),
4749                    server.transport()
4750                ),
4751                now,
4752            )
4753            .is_none());
4754    }
4755
4756    fn create_permission_with_address(
4757        server: &mut TurnServer,
4758        creds: LongTermKeyCredentials,
4759        nonce: &str,
4760        peer_addr: SocketAddr,
4761        now: Instant,
4762    ) {
4763        let reply = server
4764            .recv(
4765                client_transmit(
4766                    create_permission_request(creds.clone(), nonce, peer_addr),
4767                    server.transport(),
4768                ),
4769                now,
4770            )
4771            .unwrap();
4772        validate_signed_success(&reply.build().data, CREATE_PERMISSION, creds);
4773    }
4774
4775    fn create_permission(
4776        server: &mut TurnServer,
4777        creds: LongTermKeyCredentials,
4778        nonce: &str,
4779        now: Instant,
4780    ) {
4781        create_permission_with_address(server, creds, nonce, peer_address(), now);
4782    }
4783
4784    #[test]
4785    fn test_server_send_indication_with_permission() {
4786        let _init = crate::tests::test_init_log();
4787        let now = Instant::ZERO;
4788        let mut server = new_server(TransportType::Udp);
4789        let (realm, nonce) = initial_allocate(&mut server, now);
4790        let creds = credentials().into_long_term_credentials(&realm);
4791        let reply = authenticated_allocate_with_credentials(
4792            &mut server,
4793            TransportType::Udp,
4794            creds.clone(),
4795            &nonce,
4796            now,
4797        );
4798        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4799        create_permission(&mut server, creds.clone(), &nonce, now);
4800        let data = [8; 9];
4801        let reply = server
4802            .recv(
4803                client_transmit(
4804                    send_indication(peer_address(), data.as_slice()),
4805                    server.transport(),
4806                ),
4807                now,
4808            )
4809            .unwrap();
4810        assert_eq!(reply.transport, TransportType::Udp);
4811        assert_eq!(reply.from, relayed_address());
4812        assert_eq!(reply.to, peer_address());
4813        assert_eq!(reply.data.build(), data);
4814    }
4815
4816    #[test]
4817    fn test_server_unknown_request() {
4818        let _init = crate::tests::test_init_log();
4819        let now = Instant::ZERO;
4820        let mut server = new_server(TransportType::Udp);
4821        let (realm, nonce) = initial_allocate(&mut server, now);
4822        let creds = credentials().into_long_term_credentials(&realm);
4823        let reply = authenticated_allocate_with_credentials(
4824            &mut server,
4825            TransportType::Udp,
4826            creds.clone(),
4827            &nonce,
4828            now,
4829        );
4830        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4831        let reply = server
4832            .recv(
4833                client_transmit(
4834                    {
4835                        let mut request =
4836                            Message::builder_request(Method::new(0x123), MessageWriteVec::new());
4837                        add_authenticated_request_required_attributes(
4838                            &mut request,
4839                            creds.clone(),
4840                            &nonce,
4841                        );
4842                        request
4843                            .add_message_integrity(&creds.clone().into(), IntegrityAlgorithm::Sha1)
4844                            .unwrap();
4845                        request.finish()
4846                    },
4847                    server.transport(),
4848                ),
4849                now,
4850            )
4851            .unwrap();
4852        validate_signed_error_reply(
4853            &reply.build().data,
4854            Method::new(0x123),
4855            ErrorCode::BAD_REQUEST,
4856            creds,
4857        );
4858    }
4859
4860    #[test]
4861    fn test_server_unknown_indication() {
4862        let _init = crate::tests::test_init_log();
4863        let now = Instant::ZERO;
4864        let mut server = new_server(TransportType::Udp);
4865        let (realm, nonce) = initial_allocate(&mut server, now);
4866        let creds = credentials().into_long_term_credentials(&realm);
4867        let reply = authenticated_allocate_with_credentials(
4868            &mut server,
4869            TransportType::Udp,
4870            creds.clone(),
4871            &nonce,
4872            now,
4873        );
4874        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4875        assert!(server
4876            .recv(
4877                client_transmit(
4878                    {
4879                        let request = Message::builder(
4880                            MessageType::from_class_method(
4881                                MessageClass::Indication,
4882                                Method::new(0x123),
4883                            ),
4884                            TransactionId::generate(),
4885                            MessageWriteVec::new(),
4886                        );
4887                        request.finish()
4888                    },
4889                    server.transport(),
4890                ),
4891                now,
4892            )
4893            .is_none());
4894    }
4895
4896    #[test]
4897    fn test_server_unknown_source_address() {
4898        let _init = crate::tests::test_init_log();
4899        let now = Instant::ZERO;
4900        let mut server = new_server(TransportType::Udp);
4901        assert!(server
4902            .recv(client_transmit([4; 12], server.transport()), now)
4903            .is_none());
4904    }
4905
4906    #[test]
4907    fn test_server_invalid_client_data() {
4908        let _init = crate::tests::test_init_log();
4909        let now = Instant::ZERO;
4910        let mut server = new_server(TransportType::Udp);
4911        let (realm, nonce) = initial_allocate(&mut server, now);
4912        let creds = credentials().into_long_term_credentials(&realm);
4913        let reply = authenticated_allocate_with_credentials(
4914            &mut server,
4915            TransportType::Udp,
4916            creds.clone(),
4917            &nonce,
4918            now,
4919        );
4920        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4921        assert!(server
4922            .recv(client_transmit([4; 12], server.transport()), now)
4923            .is_none());
4924    }
4925
4926    #[test]
4927    fn test_server_recv_no_channel() {
4928        let _init = crate::tests::test_init_log();
4929        let now = Instant::ZERO;
4930        let mut server = new_server(TransportType::Udp);
4931        let (realm, nonce) = initial_allocate(&mut server, now);
4932        let creds = credentials().into_long_term_credentials(&realm);
4933        let reply = authenticated_allocate_with_credentials(
4934            &mut server,
4935            TransportType::Udp,
4936            creds.clone(),
4937            &nonce,
4938            now,
4939        );
4940        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4941        assert!(server
4942            .recv(
4943                client_transmit(
4944                    {
4945                        let channel = ChannelData::new(0x4000, [7; 3].as_slice());
4946                        let mut out = vec![0; 7];
4947                        channel.write_into_unchecked(&mut out);
4948                        out
4949                    },
4950                    server.transport()
4951                ),
4952                now
4953            )
4954            .is_none());
4955    }
4956
4957    #[test]
4958    fn test_server_recv_channel_permission_expire() {
4959        let _init = crate::tests::test_init_log();
4960        let now = Instant::ZERO;
4961        let mut server = new_server(TransportType::Udp);
4962        let (realm, nonce) = initial_allocate(&mut server, now);
4963        let creds = credentials().into_long_term_credentials(&realm);
4964        let reply = authenticated_allocate_with_credentials(
4965            &mut server,
4966            TransportType::Udp,
4967            creds.clone(),
4968            &nonce,
4969            now,
4970        );
4971        validate_authenticated_allocate_reply(&reply.data, creds.clone());
4972        channel_bind(&mut server, creds.clone(), &nonce, now);
4973        let now = now + PERMISSION_DURATION + Duration::from_secs(1);
4974        assert!(server
4975            .recv(
4976                client_transmit(
4977                    {
4978                        let channel = ChannelData::new(0x4000, [7; 3].as_slice());
4979                        let mut out = vec![0; 7];
4980                        channel.write_into_unchecked(&mut out);
4981                        out
4982                    },
4983                    server.transport()
4984                ),
4985                now
4986            )
4987            .is_none());
4988    }
4989
4990    #[test]
4991    fn test_server_peer_recv_permission_expire() {
4992        let _init = crate::tests::test_init_log();
4993        let now = Instant::ZERO;
4994        let mut server = new_server(TransportType::Udp);
4995        let (realm, nonce) = initial_allocate(&mut server, now);
4996        let creds = credentials().into_long_term_credentials(&realm);
4997        let reply = authenticated_allocate_with_credentials(
4998            &mut server,
4999            TransportType::Udp,
5000            creds.clone(),
5001            &nonce,
5002            now,
5003        );
5004        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5005        create_permission(&mut server, creds.clone(), &nonce, now);
5006        let now = now + PERMISSION_DURATION + Duration::from_secs(1);
5007        assert!(server
5008            .recv(
5009                Transmit::new(
5010                    [6; 7],
5011                    TransportType::Udp,
5012                    peer_address(),
5013                    relayed_address()
5014                ),
5015                now
5016            )
5017            .is_none());
5018    }
5019
5020    fn create_udp(source: SocketAddr, destination: SocketAddr) -> Vec<u8> {
5021        assert_eq!(source.is_ipv4(), destination.is_ipv4());
5022        assert_eq!(source.is_ipv6(), destination.is_ipv6());
5023        let mut udp = [0; pnet_packet::udp::UdpPacket::minimum_packet_size()];
5024        let mut udp_packet = pnet_packet::udp::MutableUdpPacket::new(&mut udp).unwrap();
5025        udp_packet.populate(&pnet_packet::udp::Udp {
5026            source: source.port(),
5027            destination: destination.port(),
5028            length: 0x10,
5029            checksum: 0x0000,
5030            payload: vec![],
5031        });
5032        match (source, destination) {
5033            (SocketAddr::V4(source), SocketAddr::V4(destination)) => {
5034                let mut ip = [0; pnet_packet::ipv4::Ipv4Packet::minimum_packet_size()
5035                    + pnet_packet::udp::UdpPacket::minimum_packet_size()];
5036                let mut ip_packet = pnet_packet::ipv4::MutableIpv4Packet::new(&mut ip).unwrap();
5037                ip_packet.set_version(0x4);
5038                ip_packet.set_header_length(5);
5039                ip_packet.set_total_length(48);
5040                ip_packet.set_flags(pnet_packet::ipv4::Ipv4Flags::DontFragment);
5041                ip_packet.set_ttl(16);
5042                ip_packet.set_next_level_protocol(pnet_packet::ip::IpNextHeaderProtocols::Udp);
5043                ip_packet.set_source(source.ip().octets().into());
5044                ip_packet.set_destination(destination.ip().octets().into());
5045                ip_packet.set_payload(&udp);
5046                ip.to_vec()
5047            }
5048            (SocketAddr::V6(source), SocketAddr::V6(destination)) => {
5049                let mut ip = [0; pnet_packet::ipv6::Ipv6Packet::minimum_packet_size()
5050                    + pnet_packet::udp::UdpPacket::minimum_packet_size()];
5051                let mut ip_packet = pnet_packet::ipv6::MutableIpv6Packet::new(&mut ip).unwrap();
5052                ip_packet.set_version(0x6);
5053                ip_packet.set_payload_length(48);
5054                ip_packet.set_hop_limit(16);
5055                ip_packet.set_next_header(pnet_packet::ip::IpNextHeaderProtocols::Udp);
5056                ip_packet.set_source(source.ip().segments().into());
5057                ip_packet.set_destination(destination.ip().segments().into());
5058                ip_packet.set_payload(&udp);
5059                ip.to_vec()
5060            }
5061            _ => unreachable!(),
5062        }
5063    }
5064
5065    fn create_icmpv4<'p, T: AsRef<[u8]>>(
5066        typ: pnet_packet::icmp::IcmpType,
5067        code: pnet_packet::icmp::IcmpCode,
5068        icmp_data: u32,
5069        other_packet: T,
5070    ) -> pnet_packet::icmp::IcmpPacket<'p> {
5071        let data = other_packet.as_ref();
5072        let ret = vec![0; data.len() + 8];
5073        let mut icmp = pnet_packet::icmp::MutableIcmpPacket::owned(ret).unwrap();
5074        icmp.set_icmp_type(typ);
5075        icmp.set_icmp_code(code);
5076        let mut payload = vec![0; 4];
5077        BigEndian::write_u32(&mut payload, icmp_data);
5078        payload.extend_from_slice(data);
5079        icmp.set_payload(&payload);
5080        icmp.consume_to_immutable()
5081    }
5082
5083    fn create_icmpv6<'p, T: AsRef<[u8]>>(
5084        typ: pnet_packet::icmpv6::Icmpv6Type,
5085        code: pnet_packet::icmpv6::Icmpv6Code,
5086        icmp_data: u32,
5087        other_packet: T,
5088    ) -> pnet_packet::icmpv6::Icmpv6Packet<'p> {
5089        let data = other_packet.as_ref();
5090        let ret = vec![0; data.len() + 8];
5091        let mut icmp = pnet_packet::icmpv6::MutableIcmpv6Packet::owned(ret).unwrap();
5092        icmp.set_icmpv6_type(typ);
5093        icmp.set_icmpv6_code(code);
5094        let mut payload = vec![0; 4];
5095        BigEndian::write_u32(&mut payload, icmp_data);
5096        payload.extend_from_slice(data);
5097        icmp.set_payload(&payload);
5098        icmp.consume_to_immutable()
5099    }
5100
5101    fn validate_icmp(msg: &[u8], peer_addr: SocketAddr, typ: u8, code: u8, data: u32) {
5102        let msg = Message::from_bytes(msg).unwrap();
5103        assert!(msg.has_method(DATA));
5104        let xor_peer_address = msg.attribute::<XorPeerAddress>().unwrap();
5105        assert_eq!(xor_peer_address.addr(msg.transaction_id()), peer_addr);
5106        let icmp = msg.attribute::<Icmp>().unwrap();
5107        assert_eq!(icmp.icmp_type(), typ);
5108        assert_eq!(icmp.code(), code);
5109        assert_eq!(icmp.data(), data);
5110    }
5111
5112    #[test]
5113    fn test_server_recv_icmpv4() {
5114        let _init = crate::tests::test_init_log();
5115        let now = Instant::ZERO;
5116        let mut server = new_server(TransportType::Udp);
5117        let (realm, nonce) = initial_allocate(&mut server, now);
5118        let creds = credentials().into_long_term_credentials(&realm);
5119        let reply = authenticated_allocate_with_credentials(
5120            &mut server,
5121            TransportType::Udp,
5122            creds.clone(),
5123            &nonce,
5124            now,
5125        );
5126        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5127        create_permission(&mut server, creds.clone(), &nonce, now);
5128        // icmpv6 for ipv4 allocation is ignored
5129        assert!(server
5130            .recv_icmp(
5131                AddressFamily::IPV6,
5132                create_icmpv6(
5133                    pnet_packet::icmpv6::Icmpv6Types::DestinationUnreachable,
5134                    pnet_packet::icmpv6::Icmpv6Code::new(0),
5135                    0,
5136                    create_udp(ipv6_peer_address(), ipv6_relayed_address())
5137                )
5138                .packet(),
5139                now
5140            )
5141            .is_none());
5142        let icmp_type = pnet_packet::icmp::IcmpTypes::DestinationUnreachable;
5143        let icmp_code =
5144            pnet_packet::icmp::destination_unreachable::IcmpCodes::DestinationHostUnreachable;
5145        let transmit = server
5146            .recv_icmp(
5147                AddressFamily::IPV4,
5148                create_icmpv4(
5149                    icmp_type,
5150                    icmp_code,
5151                    0,
5152                    create_udp(relayed_address(), peer_address()),
5153                )
5154                .packet(),
5155                now,
5156            )
5157            .unwrap();
5158        assert_eq!(transmit.transport, TransportType::Udp);
5159        assert_eq!(transmit.from, server.listen_address());
5160        assert_eq!(transmit.to, client_address());
5161        validate_icmp(&transmit.data, peer_address(), icmp_type.0, icmp_code.0, 0);
5162    }
5163
5164    #[test]
5165    fn test_server_recv_icmpv6() {
5166        let _init = crate::tests::test_init_log();
5167        let now = Instant::ZERO;
5168        let mut server = new_server(TransportType::Udp);
5169        let (realm, nonce) = initial_allocate(&mut server, now);
5170        let creds = credentials().into_long_term_credentials(&realm);
5171        let reply = authenticated_allocate_with_credentials_transport_families(
5172            &mut server,
5173            creds.clone(),
5174            &nonce,
5175            client_address(),
5176            RequestedTransport::UDP,
5177            &[(AddressFamily::IPV6, Ok(ipv6_relayed_address()))],
5178            now,
5179        );
5180        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5181        create_permission_with_address(
5182            &mut server,
5183            creds.clone(),
5184            &nonce,
5185            ipv6_peer_address(),
5186            now,
5187        );
5188        // icmpv4 for ipv6 allocation is ignored
5189        assert!(server
5190            .recv_icmp(
5191                AddressFamily::IPV4,
5192                create_icmpv4(
5193                    pnet_packet::icmp::IcmpTypes::DestinationUnreachable,
5194                    pnet_packet::icmp::IcmpCode::new(0),
5195                    0,
5196                    create_udp(peer_address(), relayed_address())
5197                )
5198                .packet(),
5199                now
5200            )
5201            .is_none());
5202        let icmp_type = pnet_packet::icmpv6::Icmpv6Types::DestinationUnreachable;
5203        let icmp_code = pnet_packet::icmpv6::Icmpv6Code::new(3);
5204        let transmit = server
5205            .recv_icmp(
5206                AddressFamily::IPV6,
5207                create_icmpv6(
5208                    icmp_type,
5209                    icmp_code,
5210                    0,
5211                    create_udp(ipv6_relayed_address(), ipv6_peer_address()),
5212                )
5213                .packet(),
5214                now,
5215            )
5216            .unwrap();
5217        assert_eq!(transmit.transport, TransportType::Udp);
5218        assert_eq!(transmit.from, server.listen_address());
5219        assert_eq!(transmit.to, client_address());
5220        validate_icmp(
5221            &transmit.data,
5222            ipv6_peer_address(),
5223            icmp_type.0,
5224            icmp_code.0,
5225            0,
5226        );
5227    }
5228
5229    #[test]
5230    fn test_tcp_server_split_recv_channel() {
5231        let _init = crate::tests::test_init_log();
5232        let now = Instant::ZERO;
5233        let mut server = new_server(TransportType::Tcp);
5234        let (realm, nonce) = initial_allocate(&mut server, now);
5235        let creds = credentials().into_long_term_credentials(&realm);
5236        let reply = authenticated_allocate_with_credentials(
5237            &mut server,
5238            TransportType::Udp,
5239            creds.clone(),
5240            &nonce,
5241            now,
5242        );
5243        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5244        channel_bind(&mut server, creds.clone(), &nonce, now);
5245        let data = {
5246            let channel = ChannelData::new(0x4000, [7; 3].as_slice());
5247            let mut out = vec![0; 7];
5248            channel.write_into_unchecked(&mut out);
5249            out
5250        };
5251        for i in 1..data.len() - 1 {
5252            assert!(server
5253                .recv(client_transmit(&data[..i], server.transport()), now)
5254                .is_none());
5255            let ret = server
5256                .recv(client_transmit(&data[i..], server.transport()), now)
5257                .unwrap();
5258            assert_eq!(ret.transport, TransportType::Udp);
5259            assert_eq!(ret.from, relayed_address());
5260            assert_eq!(ret.to, peer_address());
5261            assert_eq!(&ret.data.build(), &data[4..]);
5262        }
5263    }
5264
5265    #[test]
5266    fn test_tcp_server_split_recv_indication() {
5267        let _init = crate::tests::test_init_log();
5268        let now = Instant::ZERO;
5269        let mut server = new_server(TransportType::Tcp);
5270        let (realm, nonce) = initial_allocate(&mut server, now);
5271        let creds = credentials().into_long_term_credentials(&realm);
5272        let reply = authenticated_allocate_with_credentials(
5273            &mut server,
5274            TransportType::Udp,
5275            creds.clone(),
5276            &nonce,
5277            now,
5278        );
5279        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5280        create_permission(&mut server, creds.clone(), &nonce, now);
5281        let mut msg = Message::builder_indication(SEND, MessageWriteVec::new());
5282        msg.add_attribute(&XorPeerAddress::new(peer_address(), msg.transaction_id()))
5283            .unwrap();
5284        let offset = msg.len() + 4;
5285        msg.add_attribute(&AData::new(&[7; 3])).unwrap();
5286        let data = msg.clone().build();
5287        for i in 1..data.len() - 1 {
5288            assert!(server
5289                .recv(client_transmit(&data[..i], server.transport()), now)
5290                .is_none());
5291            let ret = server
5292                .recv(client_transmit(&data[i..], server.transport()), now)
5293                .unwrap();
5294            assert_eq!(ret.transport, TransportType::Udp);
5295            assert_eq!(ret.from, relayed_address());
5296            assert_eq!(ret.to, peer_address());
5297            assert_eq!(&ret.data.build(), &data[offset..data.len() - 1]);
5298        }
5299    }
5300
5301    #[test]
5302    fn test_tcp_server_two_interleaved_clients() {
5303        let _init = crate::tests::test_init_log();
5304        let now = Instant::ZERO;
5305
5306        let client_address2 = {
5307            let mut addr = client_address();
5308            addr.set_port(1001);
5309            addr
5310        };
5311        let relayed_address2 = {
5312            let mut addr = relayed_address();
5313            addr.set_port(2223);
5314            addr
5315        };
5316        let peer_address2 = {
5317            let mut addr = peer_address();
5318            addr.set_port(44445);
5319            addr
5320        };
5321
5322        for split in [3, 9] {
5323            let mut server = new_server(TransportType::Tcp);
5324
5325            let initial_allocate1 = initial_allocate_msg();
5326            let initial_allocate2 = initial_allocate_msg();
5327            assert!(server
5328                .recv(
5329                    client_transmit(&initial_allocate1[..split], TransportType::Tcp,),
5330                    now
5331                )
5332                .is_none());
5333
5334            assert!(server
5335                .recv(
5336                    client_transmit_from(
5337                        &initial_allocate2[..split],
5338                        TransportType::Tcp,
5339                        client_address2,
5340                    ),
5341                    now
5342                )
5343                .is_none());
5344
5345            let reply = server
5346                .recv(
5347                    client_transmit(&initial_allocate1[split..], TransportType::Tcp),
5348                    now,
5349                )
5350                .unwrap();
5351            let (realm, nonce) = validate_initial_allocate_reply(&reply.build().data);
5352            let creds = credentials().into_long_term_credentials(&realm);
5353
5354            let reply = server
5355                .recv(
5356                    client_transmit_from(
5357                        &initial_allocate2[split..],
5358                        TransportType::Tcp,
5359                        client_address2,
5360                    ),
5361                    now,
5362                )
5363                .unwrap();
5364            let (realm2, nonce2) = validate_initial_allocate_reply(&reply.build().data);
5365            let creds2 = credentials().into_long_term_credentials(&realm2);
5366
5367            let families = [(AddressFamily::IPV4, Ok(relayed_address()))];
5368            let auth_alloc = authenticated_allocate_msg(
5369                creds.clone(),
5370                &nonce,
5371                RequestedTransport::UDP,
5372                &families,
5373            );
5374            let families2 = [(AddressFamily::IPV4, Ok(relayed_address2))];
5375            let auth_alloc2 = authenticated_allocate_msg(
5376                creds2.clone(),
5377                &nonce2,
5378                RequestedTransport::UDP,
5379                &families2,
5380            );
5381
5382            assert!(server
5383                .recv(
5384                    client_transmit(&auth_alloc[..split], TransportType::Tcp,),
5385                    now
5386                )
5387                .is_none());
5388
5389            assert!(server
5390                .recv(
5391                    client_transmit_from(
5392                        &auth_alloc2[..split],
5393                        TransportType::Tcp,
5394                        client_address2,
5395                    ),
5396                    now
5397                )
5398                .is_none());
5399
5400            assert!(server
5401                .recv(
5402                    client_transmit(&auth_alloc[split..], TransportType::Tcp),
5403                    now,
5404                )
5405                .is_none());
5406            let reply = authenticated_allocate_reply(&mut server, &families, now);
5407            validate_authenticated_allocate_reply(&reply.data, creds.clone());
5408
5409            assert!(server
5410                .recv(
5411                    client_transmit_from(
5412                        &auth_alloc2[split..],
5413                        TransportType::Tcp,
5414                        client_address2
5415                    ),
5416                    now,
5417                )
5418                .is_none());
5419            let reply = authenticated_allocate_reply(&mut server, &families2, now);
5420            validate_authenticated_allocate_reply(&reply.data, creds2.clone());
5421
5422            let perm = create_permission_request(creds.clone(), &nonce, peer_address());
5423            let perm2 = create_permission_request(creds.clone(), &nonce2, peer_address2);
5424
5425            assert!(server
5426                .recv(client_transmit(&perm[..split], TransportType::Tcp,), now)
5427                .is_none());
5428
5429            assert!(server
5430                .recv(
5431                    client_transmit_from(&perm2[..split], TransportType::Tcp, client_address2,),
5432                    now
5433                )
5434                .is_none());
5435
5436            let reply = server
5437                .recv(client_transmit(&perm[split..], TransportType::Tcp), now)
5438                .unwrap();
5439            validate_signed_success(&reply.build().data, CREATE_PERMISSION, creds);
5440
5441            let reply = server
5442                .recv(
5443                    client_transmit_from(&perm2[split..], TransportType::Tcp, client_address2),
5444                    now,
5445                )
5446                .unwrap();
5447            validate_signed_success(&reply.build().data, CREATE_PERMISSION, creds2);
5448        }
5449    }
5450
5451    fn tcp_connect_msg(
5452        peer_addr: SocketAddr,
5453        credentials: LongTermKeyCredentials,
5454        nonce: &str,
5455    ) -> Vec<u8> {
5456        let mut connect = Message::builder_request(CONNECT, MessageWriteVec::new());
5457        connect
5458            .add_attribute(&XorPeerAddress::new(peer_addr, connect.transaction_id()))
5459            .unwrap();
5460        add_authenticated_request_required_attributes(&mut connect, credentials.clone(), nonce);
5461        connect
5462            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
5463            .unwrap();
5464        connect.add_fingerprint().unwrap();
5465        connect.finish()
5466    }
5467
5468    fn tcp_connection_bind_msg(
5469        connection_id: u32,
5470        credentials: LongTermKeyCredentials,
5471        nonce: &str,
5472    ) -> Vec<u8> {
5473        let mut connect = Message::builder_request(CONNECTION_BIND, MessageWriteVec::new());
5474        connect
5475            .add_attribute(&ConnectionId::new(connection_id))
5476            .unwrap();
5477        add_authenticated_request_required_attributes(&mut connect, credentials.clone(), nonce);
5478        connect
5479            .add_message_integrity(&credentials.into(), IntegrityAlgorithm::Sha1)
5480            .unwrap();
5481        connect.add_fingerprint().unwrap();
5482        connect.finish()
5483    }
5484
5485    fn tcp_local_address() -> SocketAddr {
5486        "127.0.0.1:22222".parse().unwrap()
5487    }
5488
5489    fn tcp_connect(
5490        server: &mut TurnServer,
5491        peer_addr: SocketAddr,
5492        credentials: LongTermKeyCredentials,
5493        nonce: &str,
5494        now: Instant,
5495    ) -> u32 {
5496        let msg = tcp_connect_msg(peer_addr, credentials, nonce);
5497        assert!(server
5498            .recv(client_transmit(msg, server.transport()), now)
5499            .is_none());
5500        let TurnServerPollRet::TcpConnect {
5501            relayed_addr,
5502            peer_addr,
5503            listen_addr,
5504            client_addr,
5505        } = server.poll(now)
5506        else {
5507            unreachable!();
5508        };
5509        assert_eq!(relayed_addr, relayed_address());
5510        assert_eq!(peer_addr, peer_address());
5511        assert_eq!(listen_addr, server.listen_address());
5512        assert_eq!(client_addr, client_address());
5513
5514        server.tcp_connected(
5515            relayed_addr,
5516            peer_addr,
5517            listen_addr,
5518            client_addr,
5519            Ok(relayed_addr),
5520            now,
5521        );
5522
5523        let reply = server.poll_transmit(now).unwrap();
5524        assert_eq!(reply.transport, server.transport());
5525        assert_eq!(reply.from, server.listen_address());
5526        assert_eq!(reply.to, client_address());
5527        let reply = reply.data.build();
5528        let reply = Message::from_bytes(&reply).unwrap();
5529        assert!(reply.has_method(CONNECT));
5530        assert!(reply.has_class(MessageClass::Success));
5531        reply.attribute::<ConnectionId>().unwrap().id()
5532    }
5533
5534    fn tcp_connection_bind_with_peer_data(
5535        server: &mut TurnServer,
5536        connection_id: u32,
5537        local_addr: SocketAddr,
5538        creds: LongTermKeyCredentials,
5539        nonce: &str,
5540        now: Instant,
5541        peer_data: &[u8],
5542    ) {
5543        let mut msg = tcp_connection_bind_msg(connection_id, creds.clone(), nonce);
5544        msg.extend_from_slice(peer_data);
5545        let reply = server
5546            .recv(
5547                Transmit::new(msg, server.transport(), local_addr, server.listen_address()),
5548                now,
5549            )
5550            .unwrap();
5551
5552        assert_eq!(reply.transport, server.transport());
5553        assert_eq!(reply.from, server.listen_address());
5554        assert_eq!(reply.to, local_addr);
5555
5556        let reply = reply.data.build();
5557        let reply = Message::from_bytes(&reply).unwrap();
5558        assert!(reply.has_method(CONNECTION_BIND));
5559        assert!(reply.has_class(MessageClass::Success));
5560    }
5561
5562    fn tcp_connection_bind(
5563        server: &mut TurnServer,
5564        connection_id: u32,
5565        local_addr: SocketAddr,
5566        creds: LongTermKeyCredentials,
5567        nonce: &str,
5568        now: Instant,
5569    ) {
5570        tcp_connection_bind_with_peer_data(
5571            server,
5572            connection_id,
5573            local_addr,
5574            creds,
5575            nonce,
5576            now,
5577            &[],
5578        );
5579    }
5580
5581    fn tcp_data_transfer(server: &mut TurnServer, now: Instant) {
5582        let data = [9; 5];
5583        let forward = server
5584            .recv(
5585                Transmit::new(
5586                    data,
5587                    server.transport(),
5588                    tcp_local_address(),
5589                    server.listen_address(),
5590                ),
5591                now,
5592            )
5593            .unwrap();
5594        assert_eq!(forward.transport, TransportType::Tcp);
5595        assert_eq!(forward.from, relayed_address());
5596        assert_eq!(forward.to, peer_address());
5597        assert_eq!(&forward.data.build(), data.as_slice());
5598
5599        let data = [12; 6];
5600        let forward = server
5601            .recv(
5602                Transmit::new(data, server.transport(), peer_address(), relayed_address()),
5603                now,
5604            )
5605            .unwrap();
5606        assert_eq!(forward.transport, TransportType::Tcp);
5607        assert_eq!(forward.from, server.listen_address());
5608        assert_eq!(forward.to, tcp_local_address());
5609        assert_eq!(&forward.data.build(), data.as_slice());
5610    }
5611
5612    #[test]
5613    fn test_server_tcp_allocation_success() {
5614        let _init = crate::tests::test_init_log();
5615        let now = Instant::ZERO;
5616        let mut server = new_server(TransportType::Tcp);
5617        let (realm, nonce) = initial_allocate(&mut server, now);
5618        let creds = credentials().into_long_term_credentials(&realm);
5619        let reply = authenticated_allocate_with_credentials(
5620            &mut server,
5621            TransportType::Tcp,
5622            creds.clone(),
5623            &nonce,
5624            now,
5625        );
5626        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5627        create_permission(&mut server, creds.clone(), &nonce, now);
5628
5629        let connection_id = tcp_connect(&mut server, peer_address(), creds.clone(), &nonce, now);
5630
5631        tcp_connection_bind(
5632            &mut server,
5633            connection_id,
5634            tcp_local_address(),
5635            creds,
5636            &nonce,
5637            now,
5638        );
5639
5640        tcp_data_transfer(&mut server, now);
5641    }
5642
5643    #[test]
5644    fn test_server_tcp_allocation_early_peer_data() {
5645        let _init = crate::tests::test_init_log();
5646        let now = Instant::ZERO;
5647        let mut server = new_server(TransportType::Tcp);
5648        let (realm, nonce) = initial_allocate(&mut server, now);
5649        let creds = credentials().into_long_term_credentials(&realm);
5650        let reply = authenticated_allocate_with_credentials(
5651            &mut server,
5652            TransportType::Tcp,
5653            creds.clone(),
5654            &nonce,
5655            now,
5656        );
5657        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5658        create_permission(&mut server, creds.clone(), &nonce, now);
5659
5660        let connection_id = tcp_connect(&mut server, peer_address(), creds.clone(), &nonce, now);
5661
5662        // early peer data
5663        let peer_data = [12; 6];
5664        assert!(server
5665            .recv(
5666                Transmit::new(
5667                    peer_data,
5668                    server.transport(),
5669                    peer_address(),
5670                    relayed_address()
5671                ),
5672                now,
5673            )
5674            .is_none());
5675
5676        // client data sent directly after CONNECTION-BIND
5677        let data = [9; 5];
5678        tcp_connection_bind_with_peer_data(
5679            &mut server,
5680            connection_id,
5681            tcp_local_address(),
5682            creds,
5683            &nonce,
5684            now,
5685            &data,
5686        );
5687
5688        // order of these transmits is undefined
5689        let forward = server.poll_transmit(now).unwrap();
5690        assert_eq!(forward.transport, TransportType::Tcp);
5691        assert_eq!(forward.from, relayed_address());
5692        assert_eq!(forward.to, peer_address());
5693        assert_eq!(&forward.data, data.as_slice());
5694
5695        let forward = server.poll_transmit(now).unwrap();
5696        assert_eq!(forward.transport, TransportType::Tcp);
5697        assert_eq!(forward.from, server.listen_address());
5698        assert_eq!(forward.to, tcp_local_address());
5699        assert_eq!(&forward.data.build(), peer_data.as_slice());
5700    }
5701
5702    #[test]
5703    fn test_server_tcp_incoming_peer_data() {
5704        let _init = crate::tests::test_init_log();
5705        let now = Instant::ZERO;
5706        let mut server = new_server(TransportType::Tcp);
5707        let (realm, nonce) = initial_allocate(&mut server, now);
5708        let creds = credentials().into_long_term_credentials(&realm);
5709        let reply = authenticated_allocate_with_credentials(
5710            &mut server,
5711            TransportType::Tcp,
5712            creds.clone(),
5713            &nonce,
5714            now,
5715        );
5716        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5717        create_permission(&mut server, creds.clone(), &nonce, now);
5718
5719        let peer_data = [12; 6];
5720        let client_request = server
5721            .recv(
5722                Transmit::new(
5723                    peer_data,
5724                    server.transport(),
5725                    peer_address(),
5726                    relayed_address(),
5727                ),
5728                now,
5729            )
5730            .unwrap();
5731
5732        assert_eq!(client_request.transport, TransportType::Tcp);
5733        assert_eq!(client_request.from, server.listen_address());
5734        assert_eq!(client_request.to, client_address());
5735        let data = client_request.data.build();
5736        let msg = Message::from_bytes(&data).unwrap();
5737        assert!(msg.has_method(CONNECTION_ATTEMPT));
5738        assert!(msg.has_class(MessageClass::Request));
5739        let connection_id = msg.attribute::<ConnectionId>().unwrap().id();
5740        let peer_addr = msg
5741            .attribute::<XorPeerAddress>()
5742            .unwrap()
5743            .addr(msg.transaction_id());
5744        assert_eq!(peer_addr, peer_address());
5745
5746        tcp_connection_bind(
5747            &mut server,
5748            connection_id,
5749            tcp_local_address(),
5750            creds,
5751            &nonce,
5752            now,
5753        );
5754
5755        let data = [9; 5];
5756        let forward = server
5757            .recv(
5758                Transmit::new(
5759                    data,
5760                    server.transport(),
5761                    tcp_local_address(),
5762                    server.listen_address(),
5763                ),
5764                now,
5765            )
5766            .unwrap();
5767        assert_eq!(forward.transport, TransportType::Tcp);
5768        assert_eq!(forward.from, relayed_address());
5769        assert_eq!(forward.to, peer_address());
5770        assert_eq!(&forward.data.build(), data.as_slice());
5771
5772        // the peer data
5773        let forward = server.poll_transmit(now).unwrap();
5774        assert_eq!(forward.transport, TransportType::Tcp);
5775        assert_eq!(forward.from, server.listen_address());
5776        assert_eq!(forward.to, tcp_local_address());
5777        assert_eq!(&forward.data.build(), peer_data.as_slice());
5778    }
5779
5780    #[test]
5781    fn test_server_tcp_connection_already_exists_after_success() {
5782        let _init = crate::tests::test_init_log();
5783        let now = Instant::ZERO;
5784        let mut server = new_server(TransportType::Tcp);
5785        let (realm, nonce) = initial_allocate(&mut server, now);
5786        let creds = credentials().into_long_term_credentials(&realm);
5787        let reply = authenticated_allocate_with_credentials(
5788            &mut server,
5789            TransportType::Tcp,
5790            creds.clone(),
5791            &nonce,
5792            now,
5793        );
5794        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5795        create_permission(&mut server, creds.clone(), &nonce, now);
5796
5797        let connection_id = tcp_connect(&mut server, peer_address(), creds.clone(), &nonce, now);
5798
5799        tcp_connection_bind(
5800            &mut server,
5801            connection_id,
5802            tcp_local_address(),
5803            creds.clone(),
5804            &nonce,
5805            now,
5806        );
5807
5808        let msg = tcp_connect_msg(peer_address(), creds, &nonce);
5809        let reply = server
5810            .recv(
5811                Transmit::new(
5812                    msg,
5813                    server.transport(),
5814                    client_address(),
5815                    server.listen_address(),
5816                ),
5817                now,
5818            )
5819            .unwrap()
5820            .build();
5821        let reply = Message::from_bytes(&reply.data).unwrap();
5822        assert!(reply.has_method(CONNECT));
5823        let err_code = reply.attribute::<ErrorCode>().unwrap();
5824        assert_eq!(err_code.code(), ErrorCode::CONNECTION_ALREADY_EXISTS);
5825
5826        tcp_data_transfer(&mut server, now);
5827    }
5828
5829    #[test]
5830    fn test_server_tcp_connection_already_exists_while_pending() {
5831        let _init = crate::tests::test_init_log();
5832        let now = Instant::ZERO;
5833        let mut server = new_server(TransportType::Tcp);
5834        let (realm, nonce) = initial_allocate(&mut server, now);
5835        let creds = credentials().into_long_term_credentials(&realm);
5836        let reply = authenticated_allocate_with_credentials(
5837            &mut server,
5838            TransportType::Tcp,
5839            creds.clone(),
5840            &nonce,
5841            now,
5842        );
5843        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5844        create_permission(&mut server, creds.clone(), &nonce, now);
5845
5846        let connection_id = tcp_connect(&mut server, peer_address(), creds.clone(), &nonce, now);
5847
5848        let msg = tcp_connect_msg(peer_address(), creds.clone(), &nonce);
5849        let reply = server
5850            .recv(
5851                Transmit::new(
5852                    msg,
5853                    server.transport(),
5854                    client_address(),
5855                    server.listen_address(),
5856                ),
5857                now,
5858            )
5859            .unwrap()
5860            .build();
5861        let reply = Message::from_bytes(&reply.data).unwrap();
5862        assert!(reply.has_method(CONNECT));
5863        let err_code = reply.attribute::<ErrorCode>().unwrap();
5864        assert_eq!(err_code.code(), ErrorCode::CONNECTION_ALREADY_EXISTS);
5865
5866        tcp_connection_bind(
5867            &mut server,
5868            connection_id,
5869            tcp_local_address(),
5870            creds.clone(),
5871            &nonce,
5872            now,
5873        );
5874
5875        tcp_data_transfer(&mut server, now);
5876    }
5877
5878    #[test]
5879    fn test_server_tcp_connection_socket_error() {
5880        let _init = crate::tests::test_init_log();
5881        for (tcp_err, err_code) in [
5882            (
5883                TcpConnectError::InsufficientCapacity,
5884                ErrorCode::INSUFFICIENT_CAPACITY,
5885            ),
5886            (TcpConnectError::Forbidden, ErrorCode::FORBIDDEN),
5887            (
5888                TcpConnectError::TimedOut,
5889                ErrorCode::CONNECTION_TIMEOUT_OR_FAILURE,
5890            ),
5891            (
5892                TcpConnectError::Failure,
5893                ErrorCode::CONNECTION_TIMEOUT_OR_FAILURE,
5894            ),
5895        ] {
5896            let now = Instant::ZERO;
5897            let mut server = new_server(TransportType::Tcp);
5898            let (realm, nonce) = initial_allocate(&mut server, now);
5899            let creds = credentials().into_long_term_credentials(&realm);
5900            let reply = authenticated_allocate_with_credentials(
5901                &mut server,
5902                TransportType::Tcp,
5903                creds.clone(),
5904                &nonce,
5905                now,
5906            );
5907            validate_authenticated_allocate_reply(&reply.data, creds.clone());
5908            create_permission(&mut server, creds.clone(), &nonce, now);
5909
5910            let msg = tcp_connect_msg(peer_address(), creds.clone(), &nonce);
5911            assert!(server
5912                .recv(client_transmit(msg, server.transport()), now)
5913                .is_none());
5914            let TurnServerPollRet::TcpConnect {
5915                relayed_addr,
5916                peer_addr,
5917                listen_addr,
5918                client_addr,
5919            } = server.poll(now)
5920            else {
5921                unreachable!();
5922            };
5923            assert_eq!(relayed_addr, relayed_address());
5924            assert_eq!(peer_addr, peer_address());
5925            assert_eq!(listen_addr, server.listen_address());
5926            assert_eq!(client_addr, client_address());
5927
5928            server.tcp_connected(
5929                relayed_addr,
5930                peer_addr,
5931                listen_addr,
5932                client_addr,
5933                Err(tcp_err),
5934                now,
5935            );
5936
5937            let reply = server.poll_transmit(now).unwrap();
5938            assert_eq!(reply.transport, server.transport());
5939            assert_eq!(reply.from, server.listen_address());
5940            assert_eq!(reply.to, client_address());
5941            let reply = reply.data.build();
5942            let reply = Message::from_bytes(&reply).unwrap();
5943            assert!(reply.has_method(CONNECT));
5944            assert!(reply.has_class(MessageClass::Error));
5945            assert_eq!(reply.attribute::<ErrorCode>().unwrap().code(), err_code);
5946        }
5947    }
5948
5949    #[test]
5950    fn test_server_tcp_connection_bind_allocation_mismatch() {
5951        let _init = crate::tests::test_init_log();
5952        let now = Instant::ZERO;
5953        let mut server = new_server(TransportType::Tcp);
5954        let (realm, nonce) = initial_allocate(&mut server, now);
5955        let creds = credentials().into_long_term_credentials(&realm);
5956        let reply = authenticated_allocate_with_credentials(
5957            &mut server,
5958            TransportType::Tcp,
5959            creds.clone(),
5960            &nonce,
5961            now,
5962        );
5963        validate_authenticated_allocate_reply(&reply.data, creds.clone());
5964
5965        let now = now + DEFAULT_ALLOCATION_DURATION - Duration::from_secs(1);
5966        create_permission(&mut server, creds.clone(), &nonce, now);
5967
5968        // CONNECTION-BIND sent from a control connection results in an error.
5969        let connection_id = tcp_connect(&mut server, peer_address(), creds.clone(), &nonce, now);
5970
5971        let msg = tcp_connection_bind_msg(connection_id, creds.clone(), &nonce);
5972        let reply = server
5973            .recv(
5974                Transmit::new(
5975                    msg,
5976                    server.transport(),
5977                    client_address(),
5978                    server.listen_address(),
5979                ),
5980                now,
5981            )
5982            .unwrap();
5983
5984        assert_eq!(reply.transport, server.transport());
5985        assert_eq!(reply.from, server.listen_address());
5986        assert_eq!(reply.to, client_address());
5987
5988        let reply = reply.data.build();
5989        let reply = Message::from_bytes(&reply).unwrap();
5990        assert!(reply.has_method(CONNECTION_BIND));
5991        assert!(reply.has_class(MessageClass::Error));
5992        assert_eq!(
5993            reply.attribute::<ErrorCode>().unwrap().code(),
5994            ErrorCode::BAD_REQUEST
5995        );
5996
5997        // allocation has expired
5998        let now = now + Duration::from_secs(5);
5999
6000        let msg = tcp_connection_bind_msg(connection_id, creds.clone(), &nonce);
6001        let reply = server
6002            .recv(
6003                Transmit::new(
6004                    msg,
6005                    server.transport(),
6006                    tcp_local_address(),
6007                    server.listen_address(),
6008                ),
6009                now,
6010            )
6011            .unwrap();
6012
6013        assert_eq!(reply.transport, server.transport());
6014        assert_eq!(reply.from, server.listen_address());
6015        assert_eq!(reply.to, tcp_local_address());
6016
6017        let reply = reply.data.build();
6018        let reply = Message::from_bytes(&reply).unwrap();
6019        assert!(reply.has_method(CONNECTION_BIND));
6020        assert!(reply.has_class(MessageClass::Error));
6021        assert_eq!(
6022            reply.attribute::<ErrorCode>().unwrap().code(),
6023            ErrorCode::ALLOCATION_MISMATCH
6024        );
6025    }
6026}