Struct librice::stun::message::Message

source ·
pub struct Message { /* private fields */ }
Expand description

The structure that encapsulates the entirety of a STUN message

Contains the MessageType, a transaction ID, and a list of STUN Attributes.

Implementations§

Create a new Message with the provided MessageType and transaction ID

Note you probably want to use one of the other helper constructors instead.

Examples
let mtype = MessageType::from_class_method(MessageClass::Indication, BINDING);
let message = Message::new(mtype, 0.into());
assert!(message.has_class(MessageClass::Indication));
assert!(message.has_method(BINDING));
Examples found in repository?
src/stun/message.rs (lines 352-355)
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
    pub fn new_request(method: u16) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Request, method),
            Message::generate_transaction(),
        )
    }

    /// Create a new success [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_success(&message);
    /// assert!(success.has_class(MessageClass::Success));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_success(orig: &Message) -> Self {
        if !orig.has_class(MessageClass::Request) {
            panic!(
                "A success response message was attempted to be created from a non-request message"
            );
        }
        Message::new(
            MessageType::from_class_method(MessageClass::Success, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Create a new error [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_error(&message);
    /// assert!(success.has_class(MessageClass::Error));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_error(orig: &Message) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Error, orig.method()),
            orig.transaction_id(),
        )
    }

Create a new request Message of the provided method

Examples
let message = Message::new_request(BINDING);
assert!(message.has_class(MessageClass::Request));
assert!(message.has_method(BINDING));
Examples found in repository?
src/gathering.rs (line 61)
60
61
62
63
64
65
66
67
fn generate_bind_request() -> std::io::Result<Message> {
    let mut out = Message::new_request(BINDING);
    out.add_fingerprint()
        .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid message"))?;

    trace!("generated to {}", out);
    Ok(out)
}
More examples
Hide additional examples
src/conncheck.rs (line 161)
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
    fn generate_stun_request(
        conncheck: Arc<ConnCheck>,
        username: String,
        controlling: bool,
        tie_breaker: u64,
    ) -> Result<StunRequest, StunError> {
        let mut msg = Message::new_request(BINDING);

        // XXX: this needs to be the priority as if the candidate was peer-reflexive
        msg.add_attribute(Priority::new(conncheck.pair.local.priority))?;
        if controlling {
            msg.add_attribute(IceControlling::new(tie_breaker))?;
        } else {
            msg.add_attribute(IceControlled::new(tie_breaker))?;
        }
        if conncheck.nominate {
            msg.add_attribute(UseCandidate::new())?;
        }
        msg.add_attribute(Username::new(&username)?)?;
        msg.add_message_integrity(&conncheck.agent.local_credentials().unwrap())?;
        msg.add_fingerprint()?;

        let to = conncheck.pair.remote.address;
        conncheck.agent.stun_request_transaction(&msg, to)?.build()
    }

Create a new success Message response from the provided request

Panics

When a non-request Message is passed as the original input Message

Examples
let message = Message::new_request(BINDING);
let success = Message::new_success(&message);
assert!(success.has_class(MessageClass::Success));
assert!(success.has_method(BINDING));
Examples found in repository?
src/conncheck.rs (line 772)
767
768
769
770
771
772
773
774
775
776
777
fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}

Create a new error Message response from the provided request

Panics

When a non-request Message is passed as the original input Message

Examples
let message = Message::new_request(BINDING);
let success = Message::new_error(&message);
assert!(success.has_class(MessageClass::Error));
assert!(success.has_method(BINDING));
Examples found in repository?
src/stun/message.rs (line 1012)
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
    pub fn unknown_attributes(
        src: &Message,
        attributes: &[AttributeType],
    ) -> Result<Message, StunError> {
        let mut out = Message::new_error(src);
        out.add_attribute(Software::new("stund - librice v0.1")?)?;
        out.add_attribute(ErrorCode::new(420, "Unknown Attributes")?)?;
        if !attributes.is_empty() {
            out.add_attribute(UnknownAttributes::new(attributes))?;
        }
        Ok(out)
    }

    /// Generate an error message with an [`ERROR_CODE`] attribute signalling a 'Bad Request'
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// # use librice::stun::attribute::*;
    /// # use std::convert::TryInto;
    /// let msg = Message::new_request(BINDING);
    /// let error_msg = Message::bad_request(&msg).unwrap();
    /// assert!(error_msg.has_attribute(ERROR_CODE));
    /// let error_code =  error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
    /// assert_eq!(error_code.code(), 400);
    /// ```
    pub fn bad_request(src: &Message) -> Result<Message, StunError> {
        let mut out = Message::new_error(src);
        out.add_attribute(Software::new("stund - librice v0.1")?)?;
        out.add_attribute(ErrorCode::new(400, "Bad Request")?)?;
        Ok(out)
    }
More examples
Hide additional examples
src/stun/agent.rs (line 491)
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
    pub async fn perform(&self) -> Result<(Message, SocketAddr), StunError> {
        StunAgent::maybe_store_message(&self.agent.inner.state, self.msg.clone());
        let tid = self.msg.transaction_id();
        let (send_abortable, send_abort_handle) =
            futures::future::abortable(Self::send_request(&self.agent, self.msg.clone(), self.to));

        let to = self.to;
        let mut receive_s =
            self.agent
                .receive_stream_filter(move |stun_or_data| match stun_or_data {
                    StunOrData::Stun(msg, from) => tid == msg.transaction_id() && from == &to,
                    _ => false,
                });
        let (recv_abortable, recv_abort_handle) = {
            let send_abort_handle = send_abort_handle.clone();
            futures::future::abortable(clock::timeout(
                self.agent.clock.clone(),
                Duration::from_secs(40),
                receive_s.next().then(|msg| async move {
                    send_abort_handle.abort();
                    msg.and_then(|msg| msg.stun())
                        .ok_or(StunError::ResourceNotFound)
                }),
            ))
        };

        {
            let mut inner = self.inner.lock().unwrap();
            inner.send_abort = Some(send_abort_handle);
            inner.recv_abort = Some(recv_abort_handle);
        }

        futures::pin_mut!(send_abortable);
        futures::pin_mut!(recv_abortable);

        // race the sending and receiving futures returning the first that succeeds
        let ret = match futures::future::try_select(send_abortable, recv_abortable).await {
            Ok(Either::Left((x, _))) => x.map(|_| (Message::new_error(&self.msg), self.to)),
            Ok(Either::Right((y, _))) => y.map_err(|_| StunError::TimedOut)?,
            Err(Either::Left((_send_aborted, recv_abortable))) => {
                // if both have been aborted, then we return aborted, otherwise, we continue
                // waiting for a response until timeout
                recv_abortable
                    .await
                    .map_err(|_| StunError::Aborted)?
                    .unwrap_or(Err(StunError::TimedOut))
            }
            _ => unreachable!(),
        };
        let _ = StunAgent::take_outstanding_request(
            &self.agent.inner.state,
            &self.msg.transaction_id(),
        );

        ret
    }
src/conncheck.rs (line 887)
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    async fn handle_binding_request(
        weak_inner: Weak<Mutex<ConnCheckListInner>>,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        msg: &Message,
        from: SocketAddr,
    ) -> Result<Option<Message>, AgentError> {
        trace!("have request {}", msg);

        let local_credentials = agent
            .local_credentials()
            .ok_or(AgentError::ResourceNotFound)?;

        if let Some(error_msg) = Message::check_attribute_types(
            msg,
            &[
                USERNAME,
                FINGERPRINT,
                MESSAGE_INTEGRITY,
                ICE_CONTROLLED,
                ICE_CONTROLLING,
                PRIORITY,
                USE_CANDIDATE,
            ],
            &[USERNAME, FINGERPRINT, MESSAGE_INTEGRITY, PRIORITY],
        ) {
            // failure -> send error response
            return Ok(Some(error_msg));
        }
        let peer_nominating =
            if let Some(use_candidate_raw) = msg.attribute::<RawAttribute>(USE_CANDIDATE) {
                if UseCandidate::from_raw(&use_candidate_raw).is_ok() {
                    true
                } else {
                    return Ok(Some(Message::bad_request(msg)?));
                }
            } else {
                false
            };

        let priority = match msg.attribute::<Priority>(PRIORITY) {
            Some(p) => p.priority(),
            None => {
                return Ok(Some(Message::bad_request(msg)?));
            }
        };

        let ice_controlling = msg.attribute::<IceControlling>(ICE_CONTROLLING);
        let ice_controlled = msg.attribute::<IceControlled>(ICE_CONTROLLED);

        let response = {
            let checklist = weak_inner.upgrade().ok_or(AgentError::ConnectionClosed)?;
            let mut checklist = checklist.lock().unwrap();

            if checklist.state == CheckListState::Completed && !peer_nominating {
                // ignore binding requests if we are completed
                trace!("ignoring binding request as we have completed");
                return Ok(None);
            }

            // validate username
            if let Some(username) = msg.attribute::<Username>(USERNAME) {
                if !validate_username(username, &checklist.local_credentials) {
                    warn!("binding request failed username validation -> UNAUTHORIZED");
                    let mut response = Message::new_error(msg);
                    response.add_attribute(ErrorCode::builder(ErrorCode::UNAUTHORIZED).build()?)?;
                    return Ok(Some(response));
                }
            } else {
                // existence is checked above so can only fail when the username is invalid
                return Ok(Some(Message::bad_request(msg)?));
            }

            {
                // Deal with role conflicts
                // RFC 8445 7.3.1.1.  Detecting and Repairing Role Conflicts
                let set = checklist
                    .set_inner
                    .upgrade()
                    .ok_or(AgentError::ConnectionClosed)?;
                let mut set = set.lock().unwrap();
                if let Some(ice_controlling) = ice_controlling {
                    //  o  If the agent is in the controlling role, and the ICE-CONTROLLING
                    //     attribute is present in the request:
                    if set.controlling {
                        if set.tie_breaker >= ice_controlling.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLING attribute, the agent generates
                            //    a Binding error response and includes an ERROR-CODE attribute
                            //    with a value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLING attribute, the agent switches to the
                            //    controlled role.
                            set.controlling = false;
                            checklist.controlling = false;
                            // TODO: update priorities and other things
                        }
                    }
                }
                if let Some(ice_controlled) = ice_controlled {
                    // o  If the agent is in the controlled role, and the ICE-CONTROLLED
                    //    attribute is present in the request:
                    if !set.controlling {
                        if set.tie_breaker >= ice_controlled.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLED attribute, the agent switches to
                            //    the controlling role.
                            set.controlling = true;
                            checklist.set_controlling(false);
                            for l in set.checklists.iter() {
                                if l.checklist_id == checklist.checklist_id {
                                    continue;
                                }
                                let mut l = l.inner.lock().unwrap();
                                l.set_controlling(false);
                            }
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLED attribute, the agent generates a Binding
                            //    error response and includes an ERROR-CODE attribute with a
                            //    value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        }
                    }
                }
            }

            checklist.handle_binding_request(
                peer_nominating,
                component_id,
                local,
                agent,
                from,
                priority,
            )?
        };
        if let Some(component) = response {
            component.set_state(ComponentState::Connected).await;
        }
        Ok(Some(binding_success_response(
            msg,
            from,
            local_credentials,
        )?))
    }

Retrieve the MessageType of a Message

Examples
let message = Message::new_request(BINDING);
assert!(message.get_type().has_class(MessageClass::Request));
assert!(message.get_type().has_method(BINDING));
Examples found in repository?
src/stun/message.rs (line 290)
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Message(class: {:?}, method: {} ({:#x}), transaction: {}, attributes: ",
            self.get_type().class(),
            self.get_type().method(),
            self.get_type().method(),
            self.transaction_id()
        )?;
        if self.attributes.is_empty() {
            write!(f, "[]")?;
        } else {
            write!(f, "[")?;
            for (i, a) in self.attributes.iter().enumerate() {
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "{}", a)?;
            }
            write!(f, "]")?;
        }
        write!(f, ")")
    }
}

fn padded_attr_size(attr: &RawAttribute) -> usize {
    if attr.length() % 4 == 0 {
        4 + attr.length() as usize
    } else {
        8 + attr.length() as usize - attr.length() as usize % 4
    }
}

impl Message {
    /// Create a new [`Message`] with the provided [`MessageType`] and transaction ID
    ///
    /// Note you probably want to use one of the other helper constructors instead.
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let mtype = MessageType::from_class_method(MessageClass::Indication, BINDING);
    /// let message = Message::new(mtype, 0.into());
    /// assert!(message.has_class(MessageClass::Indication));
    /// assert!(message.has_method(BINDING));
    /// ```
    pub fn new(mtype: MessageType, transaction: TransactionId) -> Self {
        Self {
            msg_type: mtype,
            transaction,
            attributes: vec![],
        }
    }

    /// Create a new request [`Message`] of the provided method
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.has_class(MessageClass::Request));
    /// assert!(message.has_method(BINDING));
    /// ```
    pub fn new_request(method: u16) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Request, method),
            Message::generate_transaction(),
        )
    }

    /// Create a new success [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_success(&message);
    /// assert!(success.has_class(MessageClass::Success));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_success(orig: &Message) -> Self {
        if !orig.has_class(MessageClass::Request) {
            panic!(
                "A success response message was attempted to be created from a non-request message"
            );
        }
        Message::new(
            MessageType::from_class_method(MessageClass::Success, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Create a new error [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_error(&message);
    /// assert!(success.has_class(MessageClass::Error));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_error(orig: &Message) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Error, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Retrieve the [`MessageType`] of a [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.get_type().has_class(MessageClass::Request));
    /// assert!(message.get_type().has_method(BINDING));
    /// ```
    pub fn get_type(&self) -> MessageType {
        self.msg_type
    }

    /// Retrieve the [`MessageClass`] of a [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.class(), MessageClass::Request);
    /// ```
    pub fn class(&self) -> MessageClass {
        self.get_type().class()
    }

    /// Returns whether the [`Message`] is of the specified [`MessageClass`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.has_class(MessageClass::Request));
    /// ```
    pub fn has_class(&self, cls: MessageClass) -> bool {
        self.class() == cls
    }

    /// Returns whether the [`Message`] is a response
    ///
    /// This means that the [`Message`] has a class of either success or error
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.is_response(), false);
    ///
    /// let error = Message::new_error(&message);
    /// assert_eq!(error.is_response(), true);
    ///
    /// let success = Message::new_success(&message);
    /// assert_eq!(success.is_response(), true);
    /// ```
    pub fn is_response(&self) -> bool {
        self.class().is_response()
    }

    /// Retrieves the method of the [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.method(), BINDING);
    /// ```
    pub fn method(&self) -> u16 {
        self.get_type().method()
    }

Retrieve the MessageClass of a Message

Examples
let message = Message::new_request(BINDING);
assert_eq!(message.class(), MessageClass::Request);
Examples found in repository?
src/stun/message.rs (line 444)
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
    pub fn has_class(&self, cls: MessageClass) -> bool {
        self.class() == cls
    }

    /// Returns whether the [`Message`] is a response
    ///
    /// This means that the [`Message`] has a class of either success or error
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.is_response(), false);
    ///
    /// let error = Message::new_error(&message);
    /// assert_eq!(error.is_response(), true);
    ///
    /// let success = Message::new_success(&message);
    /// assert_eq!(success.is_response(), true);
    /// ```
    pub fn is_response(&self) -> bool {
        self.class().is_response()
    }

Returns whether the Message is of the specified MessageClass

Examples
let message = Message::new_request(BINDING);
assert!(message.has_class(MessageClass::Request));
Examples found in repository?
src/stun/agent.rs (line 116)
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    fn maybe_store_message(state: &Mutex<StunAgentState>, msg: Message) {
        if msg.has_class(MessageClass::Request) {
            let mut state = state.lock().unwrap();
            trace!("storing request {}", msg.transaction_id());
            state.outstanding_requests.insert(msg.transaction_id(), msg);
        }
    }

    fn take_outstanding_request(
        state: &Mutex<StunAgentState>,
        transaction_id: &TransactionId,
    ) -> Option<Message> {
        let mut state = state.lock().unwrap();
        state.take_outstanding_request(transaction_id)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            stun.id = ?self.id
        )
    )]
    pub fn set_local_credentials(&self, credentials: MessageIntegrityCredentials) {
        let mut state = self.inner.state.lock().unwrap();
        state.local_credentials = Some(credentials)
    }

    pub fn local_credentials(&self) -> Option<MessageIntegrityCredentials> {
        let state = self.inner.state.lock().unwrap();
        state.local_credentials.clone()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            stun.id = ?self.id
        )
    )]
    pub fn set_remote_credentials(&self, credentials: MessageIntegrityCredentials) {
        let mut state = self.inner.state.lock().unwrap();
        state.remote_credentials = Some(credentials)
    }

    pub fn remote_credentials(&self) -> Option<MessageIntegrityCredentials> {
        let state = self.inner.state.lock().unwrap();
        state.remote_credentials.clone()
    }

    pub async fn send_data_to(&self, bytes: &[u8], to: SocketAddr) -> Result<(), std::io::Error> {
        self.inner.channel.send(DataFraming::from(bytes, to)).await
    }

    #[tracing::instrument(
        name = "send_to",
        skip(self, msg, to),
        fields(
            stun.id = self.id,
            msg.transaction = %msg.transaction_id(),
            to
        )
    )]
    pub async fn send_to(&self, msg: Message, to: SocketAddr) -> Result<(), std::io::Error> {
        StunAgent::maybe_store_message(&self.inner.state, msg.clone());
        self.send_data_to(&msg.to_bytes(), to).await
    }

    pub async fn send(&self, msg: Message) -> Result<(), std::io::Error> {
        let to = self.inner.channel.remote_addr()?;
        self.send_to(msg, to).await
    }

    fn receive_task_loop(inner_weak: Weak<StunAgentInner>, channel: &StunChannel, inner_id: usize) {
        // XXX: can we remove this demuxing task?
        // retrieve stream outside task to avoid a race
        let recv_stream = channel.receive_stream();
        let local_addr = channel.local_addr();
        debug!(
            "starting stun_recv_loop stun.id={} local_addr={:?}",
            inner_id, local_addr
        );
        async_std::task::spawn({
            let span = debug_span!("stun_recv_loop", stun.id = inner_id, ?local_addr);
            async move {
                futures::pin_mut!(recv_stream);

                debug!("started");
                while let Some(data_address) = recv_stream.next().await {
                    trace!(
                        "got {} bytes from {:?}",
                        data_address.data.len(),
                        data_address.address
                    );
                    let inner = match Weak::upgrade(&inner_weak) {
                        Some(inner) => inner,
                        None => {
                            warn!("stun agent has disappeared, exiting receive loop");
                            break;
                        }
                    };
                    match Message::from_bytes(&data_address.data) {
                        Ok(stun_msg) => {
                            debug!("received from {:?} {}", data_address.address, stun_msg);
                            let handle = {
                                let mut state = inner.state.lock().unwrap();
                                state.handle_stun(stun_msg, &data_address.data, data_address.address)
                            };
                            match handle {
                                HandleStunReply::Broadcast(stun_msg) => {
                                    inner
                                        .broadcaster
                                        .broadcast(StunOrData::Stun(stun_msg, data_address.address))
                                        .await;
                                }
                                HandleStunReply::Failure(err) => {
                                    warn!("Failed to handle message. {:?}", err);
                                    break;
                                }
                                _ => {}
                            }
                        }
                        Err(_) => {
                            let peer_validated = {
                                let state = inner.state.lock().unwrap();
                                state.validated_peers.get(&data_address.address).is_some()
                            };
                            if peer_validated {
                                inner
                                    .broadcaster
                                    .broadcast(StunOrData::Data(
                                        data_address.data,
                                        data_address.address,
                                    ))
                                    .await
                            } else if matches!(inner.channel, StunChannel::Tcp(_)) {
                                // close the tcp channel
                                warn!("stun message not the first message sent over TCP channel, closing");
                                if let Err(e) = inner.channel.close().await {
                                    warn!("error closing channel {:?}", e);
                                }
                                break;
                            } else {
                                trace!("dropping unvalidated data from peer");
                            }
                        }
                    }
                }
                debug!("task exit");
            }
            .instrument(span.or_current())
        });
    }

    fn ensure_receive_task_loop(&self) {
        {
            let mut state = self.inner.state.lock().unwrap();
            if !state.receive_loop_started {
                let inner_weak = Arc::downgrade(&self.inner);
                StunAgent::receive_task_loop(inner_weak, &self.inner.channel, self.inner.id);
                state.receive_loop_started = true;
            }
        }
    }

    pub fn receive_stream_filter<F>(&self, filter: F) -> impl Stream<Item = StunOrData>
    where
        F: Fn(&StunOrData) -> bool + Send + Sync + 'static,
    {
        let ret = self.inner.broadcaster.channel_with_filter(filter);
        self.ensure_receive_task_loop();
        ret
    }

    pub fn receive_stream(&self) -> impl Stream<Item = StunOrData> {
        self.receive_stream_filter(|_| true)
    }

    #[tracing::instrument(
        level = "debug",
        err,
        skip(self, msg, addr),
        fields(
            agent_id = %self.inner.id,
            transaction_id = %msg.transaction_id(),
            target_addr = ?addr,
            source_addr = ?self.inner.channel.local_addr()
        ),
    )]
    pub fn stun_request_transaction(
        &self,
        msg: &Message,
        addr: SocketAddr,
    ) -> Result<StunRequestBuilder, StunError> {
        StunRequestBuilder::new(self.clone(), msg.clone(), addr)
    }
}

pub struct StunRequestBuilder {
    agent: StunAgent,
    msg: Message,
    to: SocketAddr,
}

impl StunRequestBuilder {
    fn new(agent: StunAgent, msg: Message, addr: SocketAddr) -> Result<Self, StunError> {
        if !msg.has_class(MessageClass::Request) {
            return Err(StunError::WrongImplementation);
        }
        Ok(Self {
            agent,
            msg,
            to: addr,
        })
    }
More examples
Hide additional examples
src/stun/message.rs (line 374)
373
374
375
376
377
378
379
380
381
382
383
    pub fn new_success(orig: &Message) -> Self {
        if !orig.has_class(MessageClass::Request) {
            panic!(
                "A success response message was attempted to be created from a non-request message"
            );
        }
        Message::new(
            MessageType::from_class_method(MessageClass::Success, orig.method()),
            orig.transaction_id(),
        )
    }
src/conncheck.rs (line 205)
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }

Returns whether the Message is a response

This means that the Message has a class of either success or error

Examples
let message = Message::new_request(BINDING);
assert_eq!(message.is_response(), false);

let error = Message::new_error(&message);
assert_eq!(error.is_response(), true);

let success = Message::new_success(&message);
assert_eq!(success.is_response(), true);
Examples found in repository?
src/stun/agent.rs (line 607)
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
    fn handle_stun(&mut self, msg: Message, orig_data: &[u8], from: SocketAddr) -> HandleStunReply {
        if msg.is_response() {
            if let Some(orig_request) = self.take_outstanding_request(&msg.transaction_id()) {
                // only validate response if the original request had credentials
                if orig_request
                    .attribute::<MessageIntegrity>(MESSAGE_INTEGRITY)
                    .is_some()
                {
                    if let Some(remote_creds) = &self.remote_credentials {
                        match msg.validate_integrity(orig_data, remote_creds) {
                            Ok(_) => {
                                self.validated_peer(from);
                                HandleStunReply::Broadcast(msg)
                            }
                            Err(e) => {
                                debug!("message failed integrity check: {:?}", e);
                                HandleStunReply::Ignore
                            }
                        }
                    } else {
                        debug!("no remote credentials, ignoring");
                        HandleStunReply::Ignore
                    }
                } else {
                    // original message didn't have integrity, reply doesn't need to either
                    self.validated_peer(from);
                    HandleStunReply::Broadcast(msg)
                }
            } else {
                debug!("unmatched stun response, dropping {}", msg);
                // unmatched response -> drop
                HandleStunReply::Ignore
            }
        } else {
            self.validated_peer(from);
            HandleStunReply::Broadcast(msg)
        }
    }
More examples
Hide additional examples
src/conncheck.rs (line 199)
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }

Retrieves the method of the Message

Examples
let message = Message::new_request(BINDING);
assert_eq!(message.method(), BINDING);
Examples found in repository?
src/stun/message.rs (line 380)
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
    pub fn new_success(orig: &Message) -> Self {
        if !orig.has_class(MessageClass::Request) {
            panic!(
                "A success response message was attempted to be created from a non-request message"
            );
        }
        Message::new(
            MessageType::from_class_method(MessageClass::Success, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Create a new error [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_error(&message);
    /// assert!(success.has_class(MessageClass::Error));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_error(orig: &Message) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Error, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Retrieve the [`MessageType`] of a [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.get_type().has_class(MessageClass::Request));
    /// assert!(message.get_type().has_method(BINDING));
    /// ```
    pub fn get_type(&self) -> MessageType {
        self.msg_type
    }

    /// Retrieve the [`MessageClass`] of a [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.class(), MessageClass::Request);
    /// ```
    pub fn class(&self) -> MessageClass {
        self.get_type().class()
    }

    /// Returns whether the [`Message`] is of the specified [`MessageClass`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.has_class(MessageClass::Request));
    /// ```
    pub fn has_class(&self, cls: MessageClass) -> bool {
        self.class() == cls
    }

    /// Returns whether the [`Message`] is a response
    ///
    /// This means that the [`Message`] has a class of either success or error
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.is_response(), false);
    ///
    /// let error = Message::new_error(&message);
    /// assert_eq!(error.is_response(), true);
    ///
    /// let success = Message::new_success(&message);
    /// assert_eq!(success.is_response(), true);
    /// ```
    pub fn is_response(&self) -> bool {
        self.class().is_response()
    }

    /// Retrieves the method of the [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.method(), BINDING);
    /// ```
    pub fn method(&self) -> u16 {
        self.get_type().method()
    }

    /// Returns whether the [`Message`] is of the specified method
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert_eq!(message.has_method(BINDING), true);
    /// assert_eq!(message.has_method(0), false);
    /// ```
    pub fn has_method(&self, method: u16) -> bool {
        self.method() == method
    }

Returns whether the Message is of the specified method

Examples
let message = Message::new_request(BINDING);
assert_eq!(message.has_method(BINDING), true);
assert_eq!(message.has_method(0), false);

Retrieves the 96-bit transaction ID of the Message

Examples
let mtype = MessageType::from_class_method(MessageClass::Request, BINDING);
let transaction_id = Message::generate_transaction();
let message = Message::new(mtype, transaction_id);
assert_eq!(message.transaction_id(), transaction_id);
Examples found in repository?
src/stun/agent.rs (line 118)
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
    fn maybe_store_message(state: &Mutex<StunAgentState>, msg: Message) {
        if msg.has_class(MessageClass::Request) {
            let mut state = state.lock().unwrap();
            trace!("storing request {}", msg.transaction_id());
            state.outstanding_requests.insert(msg.transaction_id(), msg);
        }
    }

    fn take_outstanding_request(
        state: &Mutex<StunAgentState>,
        transaction_id: &TransactionId,
    ) -> Option<Message> {
        let mut state = state.lock().unwrap();
        state.take_outstanding_request(transaction_id)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            stun.id = ?self.id
        )
    )]
    pub fn set_local_credentials(&self, credentials: MessageIntegrityCredentials) {
        let mut state = self.inner.state.lock().unwrap();
        state.local_credentials = Some(credentials)
    }

    pub fn local_credentials(&self) -> Option<MessageIntegrityCredentials> {
        let state = self.inner.state.lock().unwrap();
        state.local_credentials.clone()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            stun.id = ?self.id
        )
    )]
    pub fn set_remote_credentials(&self, credentials: MessageIntegrityCredentials) {
        let mut state = self.inner.state.lock().unwrap();
        state.remote_credentials = Some(credentials)
    }

    pub fn remote_credentials(&self) -> Option<MessageIntegrityCredentials> {
        let state = self.inner.state.lock().unwrap();
        state.remote_credentials.clone()
    }

    pub async fn send_data_to(&self, bytes: &[u8], to: SocketAddr) -> Result<(), std::io::Error> {
        self.inner.channel.send(DataFraming::from(bytes, to)).await
    }

    #[tracing::instrument(
        name = "send_to",
        skip(self, msg, to),
        fields(
            stun.id = self.id,
            msg.transaction = %msg.transaction_id(),
            to
        )
    )]
    pub async fn send_to(&self, msg: Message, to: SocketAddr) -> Result<(), std::io::Error> {
        StunAgent::maybe_store_message(&self.inner.state, msg.clone());
        self.send_data_to(&msg.to_bytes(), to).await
    }

    pub async fn send(&self, msg: Message) -> Result<(), std::io::Error> {
        let to = self.inner.channel.remote_addr()?;
        self.send_to(msg, to).await
    }

    fn receive_task_loop(inner_weak: Weak<StunAgentInner>, channel: &StunChannel, inner_id: usize) {
        // XXX: can we remove this demuxing task?
        // retrieve stream outside task to avoid a race
        let recv_stream = channel.receive_stream();
        let local_addr = channel.local_addr();
        debug!(
            "starting stun_recv_loop stun.id={} local_addr={:?}",
            inner_id, local_addr
        );
        async_std::task::spawn({
            let span = debug_span!("stun_recv_loop", stun.id = inner_id, ?local_addr);
            async move {
                futures::pin_mut!(recv_stream);

                debug!("started");
                while let Some(data_address) = recv_stream.next().await {
                    trace!(
                        "got {} bytes from {:?}",
                        data_address.data.len(),
                        data_address.address
                    );
                    let inner = match Weak::upgrade(&inner_weak) {
                        Some(inner) => inner,
                        None => {
                            warn!("stun agent has disappeared, exiting receive loop");
                            break;
                        }
                    };
                    match Message::from_bytes(&data_address.data) {
                        Ok(stun_msg) => {
                            debug!("received from {:?} {}", data_address.address, stun_msg);
                            let handle = {
                                let mut state = inner.state.lock().unwrap();
                                state.handle_stun(stun_msg, &data_address.data, data_address.address)
                            };
                            match handle {
                                HandleStunReply::Broadcast(stun_msg) => {
                                    inner
                                        .broadcaster
                                        .broadcast(StunOrData::Stun(stun_msg, data_address.address))
                                        .await;
                                }
                                HandleStunReply::Failure(err) => {
                                    warn!("Failed to handle message. {:?}", err);
                                    break;
                                }
                                _ => {}
                            }
                        }
                        Err(_) => {
                            let peer_validated = {
                                let state = inner.state.lock().unwrap();
                                state.validated_peers.get(&data_address.address).is_some()
                            };
                            if peer_validated {
                                inner
                                    .broadcaster
                                    .broadcast(StunOrData::Data(
                                        data_address.data,
                                        data_address.address,
                                    ))
                                    .await
                            } else if matches!(inner.channel, StunChannel::Tcp(_)) {
                                // close the tcp channel
                                warn!("stun message not the first message sent over TCP channel, closing");
                                if let Err(e) = inner.channel.close().await {
                                    warn!("error closing channel {:?}", e);
                                }
                                break;
                            } else {
                                trace!("dropping unvalidated data from peer");
                            }
                        }
                    }
                }
                debug!("task exit");
            }
            .instrument(span.or_current())
        });
    }

    fn ensure_receive_task_loop(&self) {
        {
            let mut state = self.inner.state.lock().unwrap();
            if !state.receive_loop_started {
                let inner_weak = Arc::downgrade(&self.inner);
                StunAgent::receive_task_loop(inner_weak, &self.inner.channel, self.inner.id);
                state.receive_loop_started = true;
            }
        }
    }

    pub fn receive_stream_filter<F>(&self, filter: F) -> impl Stream<Item = StunOrData>
    where
        F: Fn(&StunOrData) -> bool + Send + Sync + 'static,
    {
        let ret = self.inner.broadcaster.channel_with_filter(filter);
        self.ensure_receive_task_loop();
        ret
    }

    pub fn receive_stream(&self) -> impl Stream<Item = StunOrData> {
        self.receive_stream_filter(|_| true)
    }

    #[tracing::instrument(
        level = "debug",
        err,
        skip(self, msg, addr),
        fields(
            agent_id = %self.inner.id,
            transaction_id = %msg.transaction_id(),
            target_addr = ?addr,
            source_addr = ?self.inner.channel.local_addr()
        ),
    )]
    pub fn stun_request_transaction(
        &self,
        msg: &Message,
        addr: SocketAddr,
    ) -> Result<StunRequestBuilder, StunError> {
        StunRequestBuilder::new(self.clone(), msg.clone(), addr)
    }
}

pub struct StunRequestBuilder {
    agent: StunAgent,
    msg: Message,
    to: SocketAddr,
}

impl StunRequestBuilder {
    fn new(agent: StunAgent, msg: Message, addr: SocketAddr) -> Result<Self, StunError> {
        if !msg.has_class(MessageClass::Request) {
            return Err(StunError::WrongImplementation);
        }
        Ok(Self {
            agent,
            msg,
            to: addr,
        })
    }

    pub fn build(self) -> Result<StunRequest, StunError> {
        let transaction_id = self.msg.transaction_id();
        Ok(StunRequest(Arc::new(StunRequestState {
            agent: self.agent,
            msg: self.msg,
            to: self.to,
            inner: Mutex::new(StunRequestInner {
                transaction_id,
                send_abort: None,
                recv_abort: None,
            }),
        })))
    }
}

#[derive(Debug)]
struct StunRequestInner {
    transaction_id: TransactionId,
    send_abort: Option<AbortHandle>,
    recv_abort: Option<AbortHandle>,
}

impl StunRequestInner {
    #[tracing::instrument(
        name = "stun_request_cancel_retransmissions",
        level = "debug",
        skip(self),
        fields(
            msg.transaction_id = %self.transaction_id
        )
    )]
    fn cancel_retransmissions(&mut self) {
        if let Some(send_abort) = self.send_abort.take() {
            trace!("aborting sending stun request");
            send_abort.abort();
        }
    }
}

#[derive(Debug)]
pub struct StunRequestState {
    agent: StunAgent,
    msg: Message,
    to: SocketAddr,
    inner: Mutex<StunRequestInner>,
}

#[derive(Debug, Clone)]
pub struct StunRequest(Arc<StunRequestState>);

impl Deref for StunRequest {
    type Target = StunRequestState;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl StunRequest {
    pub fn request(&self) -> &Message {
        &self.msg
    }

    pub fn peer_address(&self) -> SocketAddr {
        self.to
    }

    pub fn cancel_retransmissions(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.cancel_retransmissions();
    }

    #[tracing::instrument(
        name = "stun_request_cancel",
        level = "debug",
        skip(self),
        fields(
            msg.transaction_id = %self.msg.transaction_id()
        )
    )]
    pub fn cancel(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.cancel_retransmissions();
        if let Some(recv_abort) = inner.recv_abort.take() {
            trace!("aborting recv stun request");
            recv_abort.abort();
        }
    }

    #[tracing::instrument(
        name = "stun_send_request",
        level = "debug",
        err,
        skip(agent, msg),
        fields(
            msg.transaction_id = %msg.transaction_id()
        )
    )]
    async fn send_request(
        agent: &StunAgent,
        msg: Message,
        to: SocketAddr,
    ) -> Result<(), StunError> {
        // FIXME: configurable timeout values: RFC 4389 Secion 7.2.1
        let timeouts: [u64; 7] = [0, 500, 1500, 3500, 7500, 15500, 31500];
        for timeout in timeouts.iter() {
            agent
                .clock
                .delay(Duration::from_millis(*timeout))
                .await
                .wait()
                .await;
            trace!("sending {}", msg);
            agent
                .inner
                .channel
                .send(DataFraming::from(&msg.to_bytes(), to))
                .await?;
        }

        Err(StunError::TimedOut)
    }

    pub async fn perform(&self) -> Result<(Message, SocketAddr), StunError> {
        StunAgent::maybe_store_message(&self.agent.inner.state, self.msg.clone());
        let tid = self.msg.transaction_id();
        let (send_abortable, send_abort_handle) =
            futures::future::abortable(Self::send_request(&self.agent, self.msg.clone(), self.to));

        let to = self.to;
        let mut receive_s =
            self.agent
                .receive_stream_filter(move |stun_or_data| match stun_or_data {
                    StunOrData::Stun(msg, from) => tid == msg.transaction_id() && from == &to,
                    _ => false,
                });
        let (recv_abortable, recv_abort_handle) = {
            let send_abort_handle = send_abort_handle.clone();
            futures::future::abortable(clock::timeout(
                self.agent.clock.clone(),
                Duration::from_secs(40),
                receive_s.next().then(|msg| async move {
                    send_abort_handle.abort();
                    msg.and_then(|msg| msg.stun())
                        .ok_or(StunError::ResourceNotFound)
                }),
            ))
        };

        {
            let mut inner = self.inner.lock().unwrap();
            inner.send_abort = Some(send_abort_handle);
            inner.recv_abort = Some(recv_abort_handle);
        }

        futures::pin_mut!(send_abortable);
        futures::pin_mut!(recv_abortable);

        // race the sending and receiving futures returning the first that succeeds
        let ret = match futures::future::try_select(send_abortable, recv_abortable).await {
            Ok(Either::Left((x, _))) => x.map(|_| (Message::new_error(&self.msg), self.to)),
            Ok(Either::Right((y, _))) => y.map_err(|_| StunError::TimedOut)?,
            Err(Either::Left((_send_aborted, recv_abortable))) => {
                // if both have been aborted, then we return aborted, otherwise, we continue
                // waiting for a response until timeout
                recv_abortable
                    .await
                    .map_err(|_| StunError::Aborted)?
                    .unwrap_or(Err(StunError::TimedOut))
            }
            _ => unreachable!(),
        };
        let _ = StunAgent::take_outstanding_request(
            &self.agent.inner.state,
            &self.msg.transaction_id(),
        );

        ret
    }
}

#[derive(Debug, Clone)]
pub enum StunOrData {
    Stun(Message, SocketAddr),
    Data(Vec<u8>, SocketAddr),
}

impl StunOrData {
    pub fn stun(self) -> Option<(Message, SocketAddr)> {
        match self {
            StunOrData::Stun(msg, addr) => Some((msg, addr)),
            _ => None,
        }
    }
    pub fn data(self) -> Option<(Vec<u8>, SocketAddr)> {
        match self {
            StunOrData::Data(data, addr) => Some((data, addr)),
            _ => None,
        }
    }
    pub fn addr(&self) -> SocketAddr {
        match self {
            StunOrData::Stun(_msg, addr) => *addr,
            StunOrData::Data(_data, addr) => *addr,
        }
    }
}

#[derive(Debug)]
enum HandleStunReply {
    Broadcast(Message),
    Failure(StunError),
    Ignore,
}
impl From<StunError> for HandleStunReply {
    fn from(e: StunError) -> Self {
        HandleStunReply::Failure(e)
    }
}

#[derive(Debug)]
pub enum StunError {
    Failed,
    WrongImplementation,
    AlreadyExists,
    ResourceNotFound,
    TimedOut,
    IntegrityCheckFailed,
    ParseError(StunParseError),
    IoError(std::io::Error),
    Aborted,
}

impl std::error::Error for StunError {}

impl std::fmt::Display for StunError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl From<std::io::Error> for StunError {
    fn from(e: std::io::Error) -> Self {
        Self::IoError(e)
    }
}

impl From<StunParseError> for StunError {
    fn from(e: StunParseError) -> Self {
        match e {
            StunParseError::WrongImplementation => StunError::WrongImplementation,
            _ => StunError::ParseError(e),
        }
    }
}

impl StunAgentState {
    fn new(id: usize) -> Self {
        Self {
            id,
            outstanding_requests: HashMap::new(),
            local_credentials: None,
            remote_credentials: None,
            receive_loop_started: false,
            validated_peers: HashSet::new(),
        }
    }

    fn validated_peer(&mut self, addr: SocketAddr) {
        if self.validated_peers.get(&addr).is_none() {
            debug!("validated peer {:?}", addr);
            self.validated_peers.insert(addr);
        }
    }

    fn handle_stun(&mut self, msg: Message, orig_data: &[u8], from: SocketAddr) -> HandleStunReply {
        if msg.is_response() {
            if let Some(orig_request) = self.take_outstanding_request(&msg.transaction_id()) {
                // only validate response if the original request had credentials
                if orig_request
                    .attribute::<MessageIntegrity>(MESSAGE_INTEGRITY)
                    .is_some()
                {
                    if let Some(remote_creds) = &self.remote_credentials {
                        match msg.validate_integrity(orig_data, remote_creds) {
                            Ok(_) => {
                                self.validated_peer(from);
                                HandleStunReply::Broadcast(msg)
                            }
                            Err(e) => {
                                debug!("message failed integrity check: {:?}", e);
                                HandleStunReply::Ignore
                            }
                        }
                    } else {
                        debug!("no remote credentials, ignoring");
                        HandleStunReply::Ignore
                    }
                } else {
                    // original message didn't have integrity, reply doesn't need to either
                    self.validated_peer(from);
                    HandleStunReply::Broadcast(msg)
                }
            } else {
                debug!("unmatched stun response, dropping {}", msg);
                // unmatched response -> drop
                HandleStunReply::Ignore
            }
        } else {
            self.validated_peer(from);
            HandleStunReply::Broadcast(msg)
        }
    }
More examples
Hide additional examples
src/stun/message.rs (line 293)
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Message(class: {:?}, method: {} ({:#x}), transaction: {}, attributes: ",
            self.get_type().class(),
            self.get_type().method(),
            self.get_type().method(),
            self.transaction_id()
        )?;
        if self.attributes.is_empty() {
            write!(f, "[]")?;
        } else {
            write!(f, "[")?;
            for (i, a) in self.attributes.iter().enumerate() {
                if i > 0 {
                    write!(f, ", ")?;
                }
                write!(f, "{}", a)?;
            }
            write!(f, "]")?;
        }
        write!(f, ")")
    }
}

fn padded_attr_size(attr: &RawAttribute) -> usize {
    if attr.length() % 4 == 0 {
        4 + attr.length() as usize
    } else {
        8 + attr.length() as usize - attr.length() as usize % 4
    }
}

impl Message {
    /// Create a new [`Message`] with the provided [`MessageType`] and transaction ID
    ///
    /// Note you probably want to use one of the other helper constructors instead.
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let mtype = MessageType::from_class_method(MessageClass::Indication, BINDING);
    /// let message = Message::new(mtype, 0.into());
    /// assert!(message.has_class(MessageClass::Indication));
    /// assert!(message.has_method(BINDING));
    /// ```
    pub fn new(mtype: MessageType, transaction: TransactionId) -> Self {
        Self {
            msg_type: mtype,
            transaction,
            attributes: vec![],
        }
    }

    /// Create a new request [`Message`] of the provided method
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// assert!(message.has_class(MessageClass::Request));
    /// assert!(message.has_method(BINDING));
    /// ```
    pub fn new_request(method: u16) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Request, method),
            Message::generate_transaction(),
        )
    }

    /// Create a new success [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_success(&message);
    /// assert!(success.has_class(MessageClass::Success));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_success(orig: &Message) -> Self {
        if !orig.has_class(MessageClass::Request) {
            panic!(
                "A success response message was attempted to be created from a non-request message"
            );
        }
        Message::new(
            MessageType::from_class_method(MessageClass::Success, orig.method()),
            orig.transaction_id(),
        )
    }

    /// Create a new error [`Message`] response from the provided request
    ///
    /// # Panics
    ///
    /// When a non-request [`Message`] is passed as the original input [`Message`]
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// let message = Message::new_request(BINDING);
    /// let success = Message::new_error(&message);
    /// assert!(success.has_class(MessageClass::Error));
    /// assert!(success.has_method(BINDING));
    /// ```
    pub fn new_error(orig: &Message) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Error, orig.method()),
            orig.transaction_id(),
        )
    }
src/gathering.rs (line 96)
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
async fn gather_stun_xor_address(
    local_preference: u8,
    agent: StunAgent,
    transport: TransportType,
    stun_server: SocketAddr,
) -> Result<GatherCandidateAddress, StunError> {
    let msg = generate_bind_request()?;

    agent
        .stun_request_transaction(&msg, stun_server)?
        .build()?
        .perform()
        .await
        .and_then(move |(response, from)| {
            if let Some(attr) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
                debug!(
                    "got external address {:?}",
                    attr.addr(response.transaction_id())
                );
                return Ok(GatherCandidateAddress {
                    ctype: CandidateType::ServerReflexive,
                    local_preference,
                    transport,
                    address: attr.addr(response.transaction_id()),
                    base: from,
                    related: Some(stun_server),
                });
            }
            Err(StunError::Failed)
        })
}
src/conncheck.rs (line 232)
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum CheckListState {
    Running,
    Completed,
    Failed,
}

static CONN_CHECK_LIST_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub struct ConnCheckList {
    checklist_id: usize,
    inner: Arc<Mutex<ConnCheckListInner>>,
}

fn candidate_is_same_connection(a: &Candidate, b: &Candidate) -> bool {
    if a.component_id != b.component_id {
        return false;
    }
    if a.transport_type != b.transport_type {
        return false;
    }
    if a.base_address != b.base_address {
        return false;
    }
    if a.address != b.address {
        return false;
    }
    // TODO: active vs passive vs simultaneous open
    if a.tcp_type != b.tcp_type {
        return false;
    }
    // XXX: extensions?
    true
}

fn candidate_pair_is_same_connection(a: &CandidatePair, b: &CandidatePair) -> bool {
    if !candidate_is_same_connection(&a.local, &b.local) {
        return false;
    }
    if !candidate_is_same_connection(&a.remote, &b.remote) {
        return false;
    }
    true
}

#[derive(Debug)]
struct ConnCheckLocalCandidate {
    candidate: Candidate,
    stun_agent: StunAgent,
    #[allow(dead_code)]
    stun_recv_abort: AbortHandle,
    #[allow(dead_code)]
    data_recv_abort: AbortHandle,
}

#[derive(Debug)]
struct ConnCheckListInner {
    checklist_id: usize,
    set_inner: Weak<Mutex<CheckListSetInner>>,
    state: CheckListState,
    component_ids: Vec<usize>,
    components: Vec<Weak<Component>>,
    local_credentials: Credentials,
    remote_credentials: Credentials,
    local_candidates: Vec<ConnCheckLocalCandidate>,
    remote_candidates: Vec<Candidate>,
    // TODO: move to BinaryHeap or similar
    triggered: VecDeque<Arc<ConnCheck>>,
    pairs: VecDeque<Arc<ConnCheck>>,
    valid: Vec<CandidatePair>,
    nominated: Vec<CandidatePair>,
    controlling: bool,
}

impl ConnCheckListInner {
    fn new(
        checklist_id: usize,
        set_inner: Weak<Mutex<CheckListSetInner>>,
        controlling: bool,
    ) -> Self {
        Self {
            checklist_id,
            set_inner,
            state: CheckListState::Running,
            component_ids: vec![],
            components: vec![],
            local_credentials: Self::generate_random_credentials(),
            remote_credentials: Self::generate_random_credentials(),
            local_candidates: vec![],
            remote_candidates: vec![],
            triggered: VecDeque::new(),
            pairs: VecDeque::new(),
            valid: vec![],
            nominated: vec![],
            controlling,
        }
    }

    fn generate_random_ice_string(alphabet: &[u8], length: usize) -> String {
        use rand::{seq::SliceRandom, thread_rng};
        let mut rng = thread_rng();
        String::from_utf8(
            (0..length)
                .map(|_| *alphabet.choose(&mut rng).unwrap())
                .collect(),
        )
        .unwrap()
    }

    fn generate_random_credentials() -> Credentials {
        let alphabet =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/".as_bytes();
        let user = Self::generate_random_ice_string(alphabet, 4);
        let pass = Self::generate_random_ice_string(alphabet, 22);
        Credentials::new(user, pass)
    }

    #[tracing::instrument(
        name = "set_checklist_state",
        level = "debug",
        skip(self),
        fields(
            self.checklist_id,
        )
    )]
    fn set_state(&mut self, state: CheckListState) {
        if self.state != state {
            trace!(old_state = ?self.state, new_state = ?state, "changing state");
            self.state = state;
        }
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            self.checklist_id
        )
    )]
    fn find_remote_candidate(
        &self,
        component_id: usize,
        ttype: TransportType,
        addr: SocketAddr,
    ) -> Option<Candidate> {
        self.remote_candidates
            .iter()
            .find(|&remote| {
                remote.component_id == component_id
                    && remote.transport_type == ttype
                    && remote.address == addr
            })
            .cloned()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, check),
        fields(
            self.checklist_id,
            check.conncheck_id
        )
    )]
    fn add_triggered(&mut self, check: Arc<ConnCheck>) {
        if let Some(idx) = self
            .triggered
            .iter()
            .position(|existing| candidate_pair_is_same_connection(&existing.pair, &check.pair))
        {
            // a nominating check trumps not nominating.  Otherwise, if the peers are delay sync,
            // then the non-nominating trigerred check may override the nomination process for a
            // long time and delay the connection process
            if check.nominate() && !self.triggered[idx].nominate() {
                let existing = self.triggered.remove(idx).unwrap();
                debug!("removing existing triggered {:?}", existing);
            } else {
                debug!("not adding duplicate triggered check");
                return;
            }
        }
        debug!("adding triggered check {:?}", check);
        self.triggered.push_front(check)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self)
        fields(
            self.checklist_id,
            remote.ctype = ?remote.candidate_type,
            remote.foundation = ?remote.foundation,
            remote.address = ?remote.address
        )
    )]
    fn add_remote_candidate(&mut self, remote: Candidate) {
        self.remote_candidates.push(remote);
    }

    fn check_is_equal(check: &Arc<ConnCheck>, pair: &CandidatePair, nominate: Nominate) -> bool {
        candidate_is_same_connection(&check.pair.local, &pair.local)
            && candidate_is_same_connection(&check.pair.remote, &pair.remote)
            && nominate.eq(&check.nominate)
    }

    #[tracing::instrument(level = "trace", ret, skip(self, pair))]
    fn matching_check(&self, pair: &CandidatePair, nominate: Nominate) -> Option<Arc<ConnCheck>> {
        self.triggered
            .iter()
            .find(|&check| Self::check_is_equal(check, pair, nominate))
            .or_else(|| {
                self.pairs
                    .iter()
                    .find(|&check| Self::check_is_equal(check, pair, nominate))
            })
            .cloned()
    }

    fn take_matching_check(&mut self, pair: &CandidatePair) -> Option<Arc<ConnCheck>> {
        let pos = self
            .pairs
            .iter()
            .position(|check| Self::check_is_equal(check, pair, Nominate::DontCare));
        if let Some(position) = pos {
            self.pairs.remove(position)
        } else {
            None
        }
    }

    fn add_check(&mut self, check: Arc<ConnCheck>) {
        let idx = self
            .pairs
            .binary_search_by(|existing| {
                existing
                    .pair
                    .priority(self.controlling)
                    .cmp(&check.pair.priority(self.controlling))
                    .reverse()
            })
            .unwrap_or_else(|x| x);
        self.pairs.insert(idx, check);
    }

    fn set_controlling(&mut self, controlling: bool) {
        self.controlling = controlling;
        // changing the controlling (and therefore priority) requires resorting
        self.pairs.make_contiguous().sort_by(|a, b| {
            a.pair
                .priority(self.controlling)
                .cmp(&b.pair.priority(self.controlling))
                .reverse()
        })
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, pair),
        fields(component.id = pair.local.component_id)
    )]
    fn nominated_pair(&mut self, pair: &CandidatePair) -> Option<Arc<Component>> {
        if let Some(idx) = self
            .valid
            .iter()
            .position(|valid_pair| candidate_pair_is_same_connection(valid_pair, pair))
        {
            info!(
                ttype = ?pair.local.transport_type,
                local.address = ?pair.local.address,
                remote.address = ?pair.remote.address,
                local.ctype = ?pair.local.candidate_type,
                remote.ctype = ?pair.remote.candidate_type,
                foundation = %pair.foundation(),
                "nominated"
            );
            self.nominated.push(self.valid.remove(idx));
            let component = self
                .components
                .iter()
                .filter_map(|component| component.upgrade())
                .find(|component| component.id == pair.local.component_id);
            if self.state == CheckListState::Running {
                // o Once a candidate pair for a component of a data stream has been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent MUST remove all candidate pairs
                //   for the same component from the checklist and from the triggered-
                //   check queue.  If the state of a pair is In-Progress, the agent
                //   cancels the In-Progress transaction.  Cancellation means that the
                //   agent will not retransmit the Binding requests associated with the
                //   connectivity-check transaction, will not treat the lack of
                //   response to be a failure, but will wait the duration of the
                //   transaction timeout for a response.
                self.dump_check_state();
                self.triggered.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                self.pairs.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                // XXX: do we also need to clear self.valid?
                // o Once candidate pairs for each component of a data stream have been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent sets the state of the checklist
                //   to Completed.
                let all_nominated = self.component_ids.iter().all(|&component_id| {
                    self.nominated
                        .iter()
                        .any(|valid_pair| valid_pair.local.component_id == component_id)
                });
                if all_nominated {
                    // ... Once an ICE agent sets the
                    // state of the checklist to Completed (when there is a nominated pair
                    // for each component of the data stream), that pair becomes the
                    // selected pair for that agent and is used for sending and receiving
                    // data for that component of the data stream.
                    info!(
                        "all {} component/s nominated, setting selected pair/s",
                        self.component_ids.len()
                    );
                    self.nominated
                        .iter()
                        .fold(vec![], |mut component_ids_selected, valid_pair| {
                            // Only nominate one valid candidatePair
                            if !component_ids_selected
                                .iter()
                                .any(|&comp_id| comp_id == valid_pair.local.component_id)
                            {
                                if let Some(component) = &component {
                                    let local_agent = self
                                        .local_candidates
                                        .iter()
                                        .find(|cand| {
                                            cand.candidate.base_address == pair.local.base_address
                                        })
                                        .map(|cand| cand.stun_agent.clone());
                                    if let Some(local_agent) = local_agent {
                                        component.set_selected_pair(SelectedPair::new(
                                            pair.clone(),
                                            local_agent,
                                        ));
                                    } else {
                                        panic!("Cannot find existing local stun agent!");
                                    }
                                }
                                component_ids_selected.push(valid_pair.local.component_id);
                            }
                            component_ids_selected
                        });
                    self.set_state(CheckListState::Completed);
                }
            }
            debug!(
                "trying to signal component {:?}",
                component.clone().map(|c| c.id)
            );
            return component;
        } else {
            warn!("unknown nomination");
        }
        None
    }

    fn dump_check_state(&self) {
        let mut s = format!("checklist {}", self.checklist_id);
        for pair in self.pairs.iter() {
            use std::fmt::Write as _;
            let _ = write!(&mut s,
                "\nID:{id} foundation:{foundation} state:{state} nom:{nominate} priority:{local_pri},{remote_pri} trans:{transport} local:{local_cand_type} {local_addr} remote:{remote_cand_type} {remote_addr}",
                id = format_args!("{:<3}", pair.conncheck_id),
                foundation = format_args!("{:10}", pair.pair.foundation()),
                state = format_args!("{:10}", pair.state()),
                nominate = format_args!("{:5}", pair.nominate()),
                local_pri = format_args!("{:10}", pair.pair.local.priority),
                remote_pri = format_args!("{:10}", pair.pair.remote.priority),
                transport = format_args!("{:4}", pair.pair.local.transport_type),
                local_cand_type = format_args!("{:5}", pair.pair.local.candidate_type),
                local_addr = format_args!("{:32}", pair.pair.local.address),
                remote_cand_type = format_args!("{:5}", pair.pair.remote.candidate_type),
                remote_addr = format_args!("{:32}", pair.pair.remote.address)
            );
        }
        debug!("{}", s);
    }

    #[tracing::instrument(
        level = "debug",
        err
        skip(self, local, agent, from, priority)
        fields(
            checklist_id = self.checklist_id,
            state = ?self.state,
        )
    )]
    fn handle_binding_request(
        &mut self,
        peer_nominating: bool,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        from: SocketAddr,
        priority: u32,
    ) -> Result<Option<Arc<Component>>, AgentError> {
        let remote = self
            .find_remote_candidate(component_id, local.transport_type, from)
            .unwrap_or_else(|| {
                // If the source transport address of the request does not match any
                // existing remote candidates, it represents a new peer-reflexive remote
                // candidate.  This candidate is constructed as follows:
                //
                //   o  The priority is the value of the PRIORITY attribute in the Binding
                //      request.
                //   o  The type is peer reflexive.
                //   o  The component ID is the component ID of the local candidate to
                //      which the request was sent.
                //   o  The foundation is an arbitrary value, different from the
                //      foundations of all other remote candidates.  If any subsequent
                //      candidate exchanges contain this peer-reflexive candidate, it will
                //      signal the actual foundation for the candidate.
                let cand = Candidate::builder(
                    component_id,
                    CandidateType::PeerReflexive,
                    local.transport_type,
                    /* FIXME */ "rflx",
                    from,
                )
                .priority(priority)
                .build();
                debug!("new reflexive remote {:?}", cand);
                self.add_remote_candidate(cand.clone());
                cand
            });
        // RFC 8445 Section 7.3.1.4. Triggered Checks
        let pair = CandidatePair::new(local.clone(), remote);
        if let Some(mut check) = self.take_matching_check(&pair) {
            // When the pair is already on the checklist:
            trace!("found existing {:?} check {:?}", check.state(), check);
            match check.state() {
                // If the state of that pair is Succeeded, nothing further is
                // done.
                CandidatePairState::Succeeded => {
                    if peer_nominating {
                        debug!("existing pair succeeded -> nominate");
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            true,
                        ));
                        if let Some(component) = self.nominated_pair(&pair) {
                            self.add_check(check);
                            return Ok(Some(component));
                        }
                    }
                }
                // If the state of that pair is In-Progress, the agent cancels the
                // In-Progress transaction.  Cancellation means that the agent
                // will not retransmit the Binding requests associated with the
                // connectivity-check transaction, will not treat the lack of
                // response to be a failure, but will wait the duration of the
                // transaction timeout for a response.  In addition, the agent
                // MUST enqueue the pair in the triggered checklist associated
                // with the checklist, and set the state of the pair to Waiting,
                // in order to trigger a new connectivity check of the pair.
                // Creating a new connectivity check enables validating
                // In-Progress pairs as soon as possible, without having to wait
                // for retransmissions of the Binding requests associated with the
                // original connectivity-check transaction.
                CandidatePairState::InProgress => {
                    check.cancel_retransmissions();
                    // TODO: ignore response timeouts

                    self.add_check(check.clone());
                    check = Arc::new(ConnCheck::new(
                        check.pair.clone(),
                        check.agent.clone(),
                        peer_nominating,
                    ));
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
                // If the state of that pair is Waiting, Frozen, or Failed, the
                // agent MUST enqueue the pair in the triggered checklist
                // associated with the checklist (if not already present), and set
                // the state of the pair to Waiting, in order to trigger a new
                // connectivity check of the pair.  Note that a state change of
                // the pair from Failed to Waiting might also trigger a state
                // change of the associated checklist.
                CandidatePairState::Waiting
                | CandidatePairState::Frozen
                | CandidatePairState::Failed => {
                    if peer_nominating && !check.nominate() {
                        check.cancel();
                        self.add_check(check.clone());
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            peer_nominating,
                        ));
                    }
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
            }
            self.add_check(check);
        } else {
            debug!("creating new check for pair {:?}", pair);
            let check = Arc::new(ConnCheck::new(pair, agent.clone(), peer_nominating));
            check.set_state(CandidatePairState::Waiting);
            self.add_check(check.clone());
            self.add_triggered(check);
        }

        Ok(None)
    }
}

fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}
Examples found in repository?
src/stun/message.rs (line 354)
351
352
353
354
355
356
    pub fn new_request(method: u16) -> Self {
        Message::new(
            MessageType::from_class_method(MessageClass::Request, method),
            Message::generate_transaction(),
        )
    }

Serialize a Message to network bytes

Examples
let mut message = Message::new(MessageType::from_class_method(MessageClass::Request, BINDING), 1000.into());
let attr = RawAttribute::new(1.into(), &[3]);
assert!(message.add_attribute(attr).is_ok());
assert_eq!(message.to_bytes(), vec![0, 1, 0, 8, 33, 18, 164, 66, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 232, 0, 1, 0, 1, 3, 0, 0, 0]);
Examples found in repository?
src/stun/message.rs (line 1048)
1047
1048
1049
    fn from(f: Message) -> Self {
        f.to_bytes()
    }

Deserialize a Message

Examples
let msg_data = vec![0, 1, 0, 8, 33, 18, 164, 66, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 232, 0, 1, 0, 1, 3, 0, 0, 0];
let mut message = Message::from_bytes(&msg_data).unwrap();
let attr = RawAttribute::new(1.into(), &[3]);
let msg_attr = message.attribute::<RawAttribute>(1.into()).unwrap();
assert_eq!(msg_attr, attr);
assert_eq!(message.get_type(), MessageType::from_class_method(MessageClass::Request, BINDING));
assert_eq!(message.transaction_id(), 1000.into());
Examples found in repository?
src/stun/message.rs (line 1055)
1054
1055
1056
    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
        Message::from_bytes(value)
    }
More examples
Hide additional examples
src/stun/agent.rs (line 216)
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
    fn receive_task_loop(inner_weak: Weak<StunAgentInner>, channel: &StunChannel, inner_id: usize) {
        // XXX: can we remove this demuxing task?
        // retrieve stream outside task to avoid a race
        let recv_stream = channel.receive_stream();
        let local_addr = channel.local_addr();
        debug!(
            "starting stun_recv_loop stun.id={} local_addr={:?}",
            inner_id, local_addr
        );
        async_std::task::spawn({
            let span = debug_span!("stun_recv_loop", stun.id = inner_id, ?local_addr);
            async move {
                futures::pin_mut!(recv_stream);

                debug!("started");
                while let Some(data_address) = recv_stream.next().await {
                    trace!(
                        "got {} bytes from {:?}",
                        data_address.data.len(),
                        data_address.address
                    );
                    let inner = match Weak::upgrade(&inner_weak) {
                        Some(inner) => inner,
                        None => {
                            warn!("stun agent has disappeared, exiting receive loop");
                            break;
                        }
                    };
                    match Message::from_bytes(&data_address.data) {
                        Ok(stun_msg) => {
                            debug!("received from {:?} {}", data_address.address, stun_msg);
                            let handle = {
                                let mut state = inner.state.lock().unwrap();
                                state.handle_stun(stun_msg, &data_address.data, data_address.address)
                            };
                            match handle {
                                HandleStunReply::Broadcast(stun_msg) => {
                                    inner
                                        .broadcaster
                                        .broadcast(StunOrData::Stun(stun_msg, data_address.address))
                                        .await;
                                }
                                HandleStunReply::Failure(err) => {
                                    warn!("Failed to handle message. {:?}", err);
                                    break;
                                }
                                _ => {}
                            }
                        }
                        Err(_) => {
                            let peer_validated = {
                                let state = inner.state.lock().unwrap();
                                state.validated_peers.get(&data_address.address).is_some()
                            };
                            if peer_validated {
                                inner
                                    .broadcaster
                                    .broadcast(StunOrData::Data(
                                        data_address.data,
                                        data_address.address,
                                    ))
                                    .await
                            } else if matches!(inner.channel, StunChannel::Tcp(_)) {
                                // close the tcp channel
                                warn!("stun message not the first message sent over TCP channel, closing");
                                if let Err(e) = inner.channel.close().await {
                                    warn!("error closing channel {:?}", e);
                                }
                                break;
                            } else {
                                trace!("dropping unvalidated data from peer");
                            }
                        }
                    }
                }
                debug!("task exit");
            }
            .instrument(span.or_current())
        });
    }

Validates the MESSAGE_INTEGRITY attribute with the provided credentials

The Original data that was used to construct this Message must be provided in order to successfully validate the Message

Examples found in repository?
src/stun/agent.rs (line 615)
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
    fn handle_stun(&mut self, msg: Message, orig_data: &[u8], from: SocketAddr) -> HandleStunReply {
        if msg.is_response() {
            if let Some(orig_request) = self.take_outstanding_request(&msg.transaction_id()) {
                // only validate response if the original request had credentials
                if orig_request
                    .attribute::<MessageIntegrity>(MESSAGE_INTEGRITY)
                    .is_some()
                {
                    if let Some(remote_creds) = &self.remote_credentials {
                        match msg.validate_integrity(orig_data, remote_creds) {
                            Ok(_) => {
                                self.validated_peer(from);
                                HandleStunReply::Broadcast(msg)
                            }
                            Err(e) => {
                                debug!("message failed integrity check: {:?}", e);
                                HandleStunReply::Ignore
                            }
                        }
                    } else {
                        debug!("no remote credentials, ignoring");
                        HandleStunReply::Ignore
                    }
                } else {
                    // original message didn't have integrity, reply doesn't need to either
                    self.validated_peer(from);
                    HandleStunReply::Broadcast(msg)
                }
            } else {
                debug!("unmatched stun response, dropping {}", msg);
                // unmatched response -> drop
                HandleStunReply::Ignore
            }
        } else {
            self.validated_peer(from);
            HandleStunReply::Broadcast(msg)
        }
    }

Adds MESSAGE_INTEGRITY attribute to a Message using the provided credentials

Errors
  • If a MESSAGE_INTEGRITY attribute is already present
  • If a FINGERPRINT attribute is already present
Examples
    MessageIntegrityCredentials, ShortTermCredentials};
let mut message = Message::new_request(BINDING);
let credentials = MessageIntegrityCredentials::ShortTerm(ShortTermCredentials { password:
    "pass".to_owned() });
assert!(message.add_message_integrity(&credentials).is_ok());
let data = message.to_bytes();
assert!(message.validate_integrity(&data, &credentials).is_ok());

// duplicate MESSAGE_INTEGRITY is an error
assert!(message.add_message_integrity(&credentials).is_err());
Examples found in repository?
src/conncheck.rs (line 174)
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
    fn generate_stun_request(
        conncheck: Arc<ConnCheck>,
        username: String,
        controlling: bool,
        tie_breaker: u64,
    ) -> Result<StunRequest, StunError> {
        let mut msg = Message::new_request(BINDING);

        // XXX: this needs to be the priority as if the candidate was peer-reflexive
        msg.add_attribute(Priority::new(conncheck.pair.local.priority))?;
        if controlling {
            msg.add_attribute(IceControlling::new(tie_breaker))?;
        } else {
            msg.add_attribute(IceControlled::new(tie_breaker))?;
        }
        if conncheck.nominate {
            msg.add_attribute(UseCandidate::new())?;
        }
        msg.add_attribute(Username::new(&username)?)?;
        msg.add_message_integrity(&conncheck.agent.local_credentials().unwrap())?;
        msg.add_fingerprint()?;

        let to = conncheck.pair.remote.address;
        conncheck.agent.stun_request_transaction(&msg, to)?.build()
    }

    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum CheckListState {
    Running,
    Completed,
    Failed,
}

static CONN_CHECK_LIST_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub struct ConnCheckList {
    checklist_id: usize,
    inner: Arc<Mutex<ConnCheckListInner>>,
}

fn candidate_is_same_connection(a: &Candidate, b: &Candidate) -> bool {
    if a.component_id != b.component_id {
        return false;
    }
    if a.transport_type != b.transport_type {
        return false;
    }
    if a.base_address != b.base_address {
        return false;
    }
    if a.address != b.address {
        return false;
    }
    // TODO: active vs passive vs simultaneous open
    if a.tcp_type != b.tcp_type {
        return false;
    }
    // XXX: extensions?
    true
}

fn candidate_pair_is_same_connection(a: &CandidatePair, b: &CandidatePair) -> bool {
    if !candidate_is_same_connection(&a.local, &b.local) {
        return false;
    }
    if !candidate_is_same_connection(&a.remote, &b.remote) {
        return false;
    }
    true
}

#[derive(Debug)]
struct ConnCheckLocalCandidate {
    candidate: Candidate,
    stun_agent: StunAgent,
    #[allow(dead_code)]
    stun_recv_abort: AbortHandle,
    #[allow(dead_code)]
    data_recv_abort: AbortHandle,
}

#[derive(Debug)]
struct ConnCheckListInner {
    checklist_id: usize,
    set_inner: Weak<Mutex<CheckListSetInner>>,
    state: CheckListState,
    component_ids: Vec<usize>,
    components: Vec<Weak<Component>>,
    local_credentials: Credentials,
    remote_credentials: Credentials,
    local_candidates: Vec<ConnCheckLocalCandidate>,
    remote_candidates: Vec<Candidate>,
    // TODO: move to BinaryHeap or similar
    triggered: VecDeque<Arc<ConnCheck>>,
    pairs: VecDeque<Arc<ConnCheck>>,
    valid: Vec<CandidatePair>,
    nominated: Vec<CandidatePair>,
    controlling: bool,
}

impl ConnCheckListInner {
    fn new(
        checklist_id: usize,
        set_inner: Weak<Mutex<CheckListSetInner>>,
        controlling: bool,
    ) -> Self {
        Self {
            checklist_id,
            set_inner,
            state: CheckListState::Running,
            component_ids: vec![],
            components: vec![],
            local_credentials: Self::generate_random_credentials(),
            remote_credentials: Self::generate_random_credentials(),
            local_candidates: vec![],
            remote_candidates: vec![],
            triggered: VecDeque::new(),
            pairs: VecDeque::new(),
            valid: vec![],
            nominated: vec![],
            controlling,
        }
    }

    fn generate_random_ice_string(alphabet: &[u8], length: usize) -> String {
        use rand::{seq::SliceRandom, thread_rng};
        let mut rng = thread_rng();
        String::from_utf8(
            (0..length)
                .map(|_| *alphabet.choose(&mut rng).unwrap())
                .collect(),
        )
        .unwrap()
    }

    fn generate_random_credentials() -> Credentials {
        let alphabet =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/".as_bytes();
        let user = Self::generate_random_ice_string(alphabet, 4);
        let pass = Self::generate_random_ice_string(alphabet, 22);
        Credentials::new(user, pass)
    }

    #[tracing::instrument(
        name = "set_checklist_state",
        level = "debug",
        skip(self),
        fields(
            self.checklist_id,
        )
    )]
    fn set_state(&mut self, state: CheckListState) {
        if self.state != state {
            trace!(old_state = ?self.state, new_state = ?state, "changing state");
            self.state = state;
        }
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            self.checklist_id
        )
    )]
    fn find_remote_candidate(
        &self,
        component_id: usize,
        ttype: TransportType,
        addr: SocketAddr,
    ) -> Option<Candidate> {
        self.remote_candidates
            .iter()
            .find(|&remote| {
                remote.component_id == component_id
                    && remote.transport_type == ttype
                    && remote.address == addr
            })
            .cloned()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, check),
        fields(
            self.checklist_id,
            check.conncheck_id
        )
    )]
    fn add_triggered(&mut self, check: Arc<ConnCheck>) {
        if let Some(idx) = self
            .triggered
            .iter()
            .position(|existing| candidate_pair_is_same_connection(&existing.pair, &check.pair))
        {
            // a nominating check trumps not nominating.  Otherwise, if the peers are delay sync,
            // then the non-nominating trigerred check may override the nomination process for a
            // long time and delay the connection process
            if check.nominate() && !self.triggered[idx].nominate() {
                let existing = self.triggered.remove(idx).unwrap();
                debug!("removing existing triggered {:?}", existing);
            } else {
                debug!("not adding duplicate triggered check");
                return;
            }
        }
        debug!("adding triggered check {:?}", check);
        self.triggered.push_front(check)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self)
        fields(
            self.checklist_id,
            remote.ctype = ?remote.candidate_type,
            remote.foundation = ?remote.foundation,
            remote.address = ?remote.address
        )
    )]
    fn add_remote_candidate(&mut self, remote: Candidate) {
        self.remote_candidates.push(remote);
    }

    fn check_is_equal(check: &Arc<ConnCheck>, pair: &CandidatePair, nominate: Nominate) -> bool {
        candidate_is_same_connection(&check.pair.local, &pair.local)
            && candidate_is_same_connection(&check.pair.remote, &pair.remote)
            && nominate.eq(&check.nominate)
    }

    #[tracing::instrument(level = "trace", ret, skip(self, pair))]
    fn matching_check(&self, pair: &CandidatePair, nominate: Nominate) -> Option<Arc<ConnCheck>> {
        self.triggered
            .iter()
            .find(|&check| Self::check_is_equal(check, pair, nominate))
            .or_else(|| {
                self.pairs
                    .iter()
                    .find(|&check| Self::check_is_equal(check, pair, nominate))
            })
            .cloned()
    }

    fn take_matching_check(&mut self, pair: &CandidatePair) -> Option<Arc<ConnCheck>> {
        let pos = self
            .pairs
            .iter()
            .position(|check| Self::check_is_equal(check, pair, Nominate::DontCare));
        if let Some(position) = pos {
            self.pairs.remove(position)
        } else {
            None
        }
    }

    fn add_check(&mut self, check: Arc<ConnCheck>) {
        let idx = self
            .pairs
            .binary_search_by(|existing| {
                existing
                    .pair
                    .priority(self.controlling)
                    .cmp(&check.pair.priority(self.controlling))
                    .reverse()
            })
            .unwrap_or_else(|x| x);
        self.pairs.insert(idx, check);
    }

    fn set_controlling(&mut self, controlling: bool) {
        self.controlling = controlling;
        // changing the controlling (and therefore priority) requires resorting
        self.pairs.make_contiguous().sort_by(|a, b| {
            a.pair
                .priority(self.controlling)
                .cmp(&b.pair.priority(self.controlling))
                .reverse()
        })
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, pair),
        fields(component.id = pair.local.component_id)
    )]
    fn nominated_pair(&mut self, pair: &CandidatePair) -> Option<Arc<Component>> {
        if let Some(idx) = self
            .valid
            .iter()
            .position(|valid_pair| candidate_pair_is_same_connection(valid_pair, pair))
        {
            info!(
                ttype = ?pair.local.transport_type,
                local.address = ?pair.local.address,
                remote.address = ?pair.remote.address,
                local.ctype = ?pair.local.candidate_type,
                remote.ctype = ?pair.remote.candidate_type,
                foundation = %pair.foundation(),
                "nominated"
            );
            self.nominated.push(self.valid.remove(idx));
            let component = self
                .components
                .iter()
                .filter_map(|component| component.upgrade())
                .find(|component| component.id == pair.local.component_id);
            if self.state == CheckListState::Running {
                // o Once a candidate pair for a component of a data stream has been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent MUST remove all candidate pairs
                //   for the same component from the checklist and from the triggered-
                //   check queue.  If the state of a pair is In-Progress, the agent
                //   cancels the In-Progress transaction.  Cancellation means that the
                //   agent will not retransmit the Binding requests associated with the
                //   connectivity-check transaction, will not treat the lack of
                //   response to be a failure, but will wait the duration of the
                //   transaction timeout for a response.
                self.dump_check_state();
                self.triggered.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                self.pairs.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                // XXX: do we also need to clear self.valid?
                // o Once candidate pairs for each component of a data stream have been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent sets the state of the checklist
                //   to Completed.
                let all_nominated = self.component_ids.iter().all(|&component_id| {
                    self.nominated
                        .iter()
                        .any(|valid_pair| valid_pair.local.component_id == component_id)
                });
                if all_nominated {
                    // ... Once an ICE agent sets the
                    // state of the checklist to Completed (when there is a nominated pair
                    // for each component of the data stream), that pair becomes the
                    // selected pair for that agent and is used for sending and receiving
                    // data for that component of the data stream.
                    info!(
                        "all {} component/s nominated, setting selected pair/s",
                        self.component_ids.len()
                    );
                    self.nominated
                        .iter()
                        .fold(vec![], |mut component_ids_selected, valid_pair| {
                            // Only nominate one valid candidatePair
                            if !component_ids_selected
                                .iter()
                                .any(|&comp_id| comp_id == valid_pair.local.component_id)
                            {
                                if let Some(component) = &component {
                                    let local_agent = self
                                        .local_candidates
                                        .iter()
                                        .find(|cand| {
                                            cand.candidate.base_address == pair.local.base_address
                                        })
                                        .map(|cand| cand.stun_agent.clone());
                                    if let Some(local_agent) = local_agent {
                                        component.set_selected_pair(SelectedPair::new(
                                            pair.clone(),
                                            local_agent,
                                        ));
                                    } else {
                                        panic!("Cannot find existing local stun agent!");
                                    }
                                }
                                component_ids_selected.push(valid_pair.local.component_id);
                            }
                            component_ids_selected
                        });
                    self.set_state(CheckListState::Completed);
                }
            }
            debug!(
                "trying to signal component {:?}",
                component.clone().map(|c| c.id)
            );
            return component;
        } else {
            warn!("unknown nomination");
        }
        None
    }

    fn dump_check_state(&self) {
        let mut s = format!("checklist {}", self.checklist_id);
        for pair in self.pairs.iter() {
            use std::fmt::Write as _;
            let _ = write!(&mut s,
                "\nID:{id} foundation:{foundation} state:{state} nom:{nominate} priority:{local_pri},{remote_pri} trans:{transport} local:{local_cand_type} {local_addr} remote:{remote_cand_type} {remote_addr}",
                id = format_args!("{:<3}", pair.conncheck_id),
                foundation = format_args!("{:10}", pair.pair.foundation()),
                state = format_args!("{:10}", pair.state()),
                nominate = format_args!("{:5}", pair.nominate()),
                local_pri = format_args!("{:10}", pair.pair.local.priority),
                remote_pri = format_args!("{:10}", pair.pair.remote.priority),
                transport = format_args!("{:4}", pair.pair.local.transport_type),
                local_cand_type = format_args!("{:5}", pair.pair.local.candidate_type),
                local_addr = format_args!("{:32}", pair.pair.local.address),
                remote_cand_type = format_args!("{:5}", pair.pair.remote.candidate_type),
                remote_addr = format_args!("{:32}", pair.pair.remote.address)
            );
        }
        debug!("{}", s);
    }

    #[tracing::instrument(
        level = "debug",
        err
        skip(self, local, agent, from, priority)
        fields(
            checklist_id = self.checklist_id,
            state = ?self.state,
        )
    )]
    fn handle_binding_request(
        &mut self,
        peer_nominating: bool,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        from: SocketAddr,
        priority: u32,
    ) -> Result<Option<Arc<Component>>, AgentError> {
        let remote = self
            .find_remote_candidate(component_id, local.transport_type, from)
            .unwrap_or_else(|| {
                // If the source transport address of the request does not match any
                // existing remote candidates, it represents a new peer-reflexive remote
                // candidate.  This candidate is constructed as follows:
                //
                //   o  The priority is the value of the PRIORITY attribute in the Binding
                //      request.
                //   o  The type is peer reflexive.
                //   o  The component ID is the component ID of the local candidate to
                //      which the request was sent.
                //   o  The foundation is an arbitrary value, different from the
                //      foundations of all other remote candidates.  If any subsequent
                //      candidate exchanges contain this peer-reflexive candidate, it will
                //      signal the actual foundation for the candidate.
                let cand = Candidate::builder(
                    component_id,
                    CandidateType::PeerReflexive,
                    local.transport_type,
                    /* FIXME */ "rflx",
                    from,
                )
                .priority(priority)
                .build();
                debug!("new reflexive remote {:?}", cand);
                self.add_remote_candidate(cand.clone());
                cand
            });
        // RFC 8445 Section 7.3.1.4. Triggered Checks
        let pair = CandidatePair::new(local.clone(), remote);
        if let Some(mut check) = self.take_matching_check(&pair) {
            // When the pair is already on the checklist:
            trace!("found existing {:?} check {:?}", check.state(), check);
            match check.state() {
                // If the state of that pair is Succeeded, nothing further is
                // done.
                CandidatePairState::Succeeded => {
                    if peer_nominating {
                        debug!("existing pair succeeded -> nominate");
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            true,
                        ));
                        if let Some(component) = self.nominated_pair(&pair) {
                            self.add_check(check);
                            return Ok(Some(component));
                        }
                    }
                }
                // If the state of that pair is In-Progress, the agent cancels the
                // In-Progress transaction.  Cancellation means that the agent
                // will not retransmit the Binding requests associated with the
                // connectivity-check transaction, will not treat the lack of
                // response to be a failure, but will wait the duration of the
                // transaction timeout for a response.  In addition, the agent
                // MUST enqueue the pair in the triggered checklist associated
                // with the checklist, and set the state of the pair to Waiting,
                // in order to trigger a new connectivity check of the pair.
                // Creating a new connectivity check enables validating
                // In-Progress pairs as soon as possible, without having to wait
                // for retransmissions of the Binding requests associated with the
                // original connectivity-check transaction.
                CandidatePairState::InProgress => {
                    check.cancel_retransmissions();
                    // TODO: ignore response timeouts

                    self.add_check(check.clone());
                    check = Arc::new(ConnCheck::new(
                        check.pair.clone(),
                        check.agent.clone(),
                        peer_nominating,
                    ));
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
                // If the state of that pair is Waiting, Frozen, or Failed, the
                // agent MUST enqueue the pair in the triggered checklist
                // associated with the checklist (if not already present), and set
                // the state of the pair to Waiting, in order to trigger a new
                // connectivity check of the pair.  Note that a state change of
                // the pair from Failed to Waiting might also trigger a state
                // change of the associated checklist.
                CandidatePairState::Waiting
                | CandidatePairState::Frozen
                | CandidatePairState::Failed => {
                    if peer_nominating && !check.nominate() {
                        check.cancel();
                        self.add_check(check.clone());
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            peer_nominating,
                        ));
                    }
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
            }
            self.add_check(check);
        } else {
            debug!("creating new check for pair {:?}", pair);
            let check = Arc::new(ConnCheck::new(pair, agent.clone(), peer_nominating));
            check.set_state(CandidatePairState::Waiting);
            self.add_check(check.clone());
            self.add_triggered(check);
        }

        Ok(None)
    }
}

fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}

Adds FINGERPRINT attribute to a Message

Errors
  • If a FINGERPRINT attribute is already present
Examples
let mut message = Message::new_request(BINDING);
assert!(message.add_fingerprint().is_ok());

// duplicate FINGERPRINT is an error
assert!(message.add_fingerprint().is_err());
Examples found in repository?
src/gathering.rs (line 62)
60
61
62
63
64
65
66
67
fn generate_bind_request() -> std::io::Result<Message> {
    let mut out = Message::new_request(BINDING);
    out.add_fingerprint()
        .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid message"))?;

    trace!("generated to {}", out);
    Ok(out)
}
More examples
Hide additional examples
src/conncheck.rs (line 175)
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
    fn generate_stun_request(
        conncheck: Arc<ConnCheck>,
        username: String,
        controlling: bool,
        tie_breaker: u64,
    ) -> Result<StunRequest, StunError> {
        let mut msg = Message::new_request(BINDING);

        // XXX: this needs to be the priority as if the candidate was peer-reflexive
        msg.add_attribute(Priority::new(conncheck.pair.local.priority))?;
        if controlling {
            msg.add_attribute(IceControlling::new(tie_breaker))?;
        } else {
            msg.add_attribute(IceControlled::new(tie_breaker))?;
        }
        if conncheck.nominate {
            msg.add_attribute(UseCandidate::new())?;
        }
        msg.add_attribute(Username::new(&username)?)?;
        msg.add_message_integrity(&conncheck.agent.local_credentials().unwrap())?;
        msg.add_fingerprint()?;

        let to = conncheck.pair.remote.address;
        conncheck.agent.stun_request_transaction(&msg, to)?.build()
    }

    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum CheckListState {
    Running,
    Completed,
    Failed,
}

static CONN_CHECK_LIST_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub struct ConnCheckList {
    checklist_id: usize,
    inner: Arc<Mutex<ConnCheckListInner>>,
}

fn candidate_is_same_connection(a: &Candidate, b: &Candidate) -> bool {
    if a.component_id != b.component_id {
        return false;
    }
    if a.transport_type != b.transport_type {
        return false;
    }
    if a.base_address != b.base_address {
        return false;
    }
    if a.address != b.address {
        return false;
    }
    // TODO: active vs passive vs simultaneous open
    if a.tcp_type != b.tcp_type {
        return false;
    }
    // XXX: extensions?
    true
}

fn candidate_pair_is_same_connection(a: &CandidatePair, b: &CandidatePair) -> bool {
    if !candidate_is_same_connection(&a.local, &b.local) {
        return false;
    }
    if !candidate_is_same_connection(&a.remote, &b.remote) {
        return false;
    }
    true
}

#[derive(Debug)]
struct ConnCheckLocalCandidate {
    candidate: Candidate,
    stun_agent: StunAgent,
    #[allow(dead_code)]
    stun_recv_abort: AbortHandle,
    #[allow(dead_code)]
    data_recv_abort: AbortHandle,
}

#[derive(Debug)]
struct ConnCheckListInner {
    checklist_id: usize,
    set_inner: Weak<Mutex<CheckListSetInner>>,
    state: CheckListState,
    component_ids: Vec<usize>,
    components: Vec<Weak<Component>>,
    local_credentials: Credentials,
    remote_credentials: Credentials,
    local_candidates: Vec<ConnCheckLocalCandidate>,
    remote_candidates: Vec<Candidate>,
    // TODO: move to BinaryHeap or similar
    triggered: VecDeque<Arc<ConnCheck>>,
    pairs: VecDeque<Arc<ConnCheck>>,
    valid: Vec<CandidatePair>,
    nominated: Vec<CandidatePair>,
    controlling: bool,
}

impl ConnCheckListInner {
    fn new(
        checklist_id: usize,
        set_inner: Weak<Mutex<CheckListSetInner>>,
        controlling: bool,
    ) -> Self {
        Self {
            checklist_id,
            set_inner,
            state: CheckListState::Running,
            component_ids: vec![],
            components: vec![],
            local_credentials: Self::generate_random_credentials(),
            remote_credentials: Self::generate_random_credentials(),
            local_candidates: vec![],
            remote_candidates: vec![],
            triggered: VecDeque::new(),
            pairs: VecDeque::new(),
            valid: vec![],
            nominated: vec![],
            controlling,
        }
    }

    fn generate_random_ice_string(alphabet: &[u8], length: usize) -> String {
        use rand::{seq::SliceRandom, thread_rng};
        let mut rng = thread_rng();
        String::from_utf8(
            (0..length)
                .map(|_| *alphabet.choose(&mut rng).unwrap())
                .collect(),
        )
        .unwrap()
    }

    fn generate_random_credentials() -> Credentials {
        let alphabet =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/".as_bytes();
        let user = Self::generate_random_ice_string(alphabet, 4);
        let pass = Self::generate_random_ice_string(alphabet, 22);
        Credentials::new(user, pass)
    }

    #[tracing::instrument(
        name = "set_checklist_state",
        level = "debug",
        skip(self),
        fields(
            self.checklist_id,
        )
    )]
    fn set_state(&mut self, state: CheckListState) {
        if self.state != state {
            trace!(old_state = ?self.state, new_state = ?state, "changing state");
            self.state = state;
        }
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            self.checklist_id
        )
    )]
    fn find_remote_candidate(
        &self,
        component_id: usize,
        ttype: TransportType,
        addr: SocketAddr,
    ) -> Option<Candidate> {
        self.remote_candidates
            .iter()
            .find(|&remote| {
                remote.component_id == component_id
                    && remote.transport_type == ttype
                    && remote.address == addr
            })
            .cloned()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, check),
        fields(
            self.checklist_id,
            check.conncheck_id
        )
    )]
    fn add_triggered(&mut self, check: Arc<ConnCheck>) {
        if let Some(idx) = self
            .triggered
            .iter()
            .position(|existing| candidate_pair_is_same_connection(&existing.pair, &check.pair))
        {
            // a nominating check trumps not nominating.  Otherwise, if the peers are delay sync,
            // then the non-nominating trigerred check may override the nomination process for a
            // long time and delay the connection process
            if check.nominate() && !self.triggered[idx].nominate() {
                let existing = self.triggered.remove(idx).unwrap();
                debug!("removing existing triggered {:?}", existing);
            } else {
                debug!("not adding duplicate triggered check");
                return;
            }
        }
        debug!("adding triggered check {:?}", check);
        self.triggered.push_front(check)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self)
        fields(
            self.checklist_id,
            remote.ctype = ?remote.candidate_type,
            remote.foundation = ?remote.foundation,
            remote.address = ?remote.address
        )
    )]
    fn add_remote_candidate(&mut self, remote: Candidate) {
        self.remote_candidates.push(remote);
    }

    fn check_is_equal(check: &Arc<ConnCheck>, pair: &CandidatePair, nominate: Nominate) -> bool {
        candidate_is_same_connection(&check.pair.local, &pair.local)
            && candidate_is_same_connection(&check.pair.remote, &pair.remote)
            && nominate.eq(&check.nominate)
    }

    #[tracing::instrument(level = "trace", ret, skip(self, pair))]
    fn matching_check(&self, pair: &CandidatePair, nominate: Nominate) -> Option<Arc<ConnCheck>> {
        self.triggered
            .iter()
            .find(|&check| Self::check_is_equal(check, pair, nominate))
            .or_else(|| {
                self.pairs
                    .iter()
                    .find(|&check| Self::check_is_equal(check, pair, nominate))
            })
            .cloned()
    }

    fn take_matching_check(&mut self, pair: &CandidatePair) -> Option<Arc<ConnCheck>> {
        let pos = self
            .pairs
            .iter()
            .position(|check| Self::check_is_equal(check, pair, Nominate::DontCare));
        if let Some(position) = pos {
            self.pairs.remove(position)
        } else {
            None
        }
    }

    fn add_check(&mut self, check: Arc<ConnCheck>) {
        let idx = self
            .pairs
            .binary_search_by(|existing| {
                existing
                    .pair
                    .priority(self.controlling)
                    .cmp(&check.pair.priority(self.controlling))
                    .reverse()
            })
            .unwrap_or_else(|x| x);
        self.pairs.insert(idx, check);
    }

    fn set_controlling(&mut self, controlling: bool) {
        self.controlling = controlling;
        // changing the controlling (and therefore priority) requires resorting
        self.pairs.make_contiguous().sort_by(|a, b| {
            a.pair
                .priority(self.controlling)
                .cmp(&b.pair.priority(self.controlling))
                .reverse()
        })
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, pair),
        fields(component.id = pair.local.component_id)
    )]
    fn nominated_pair(&mut self, pair: &CandidatePair) -> Option<Arc<Component>> {
        if let Some(idx) = self
            .valid
            .iter()
            .position(|valid_pair| candidate_pair_is_same_connection(valid_pair, pair))
        {
            info!(
                ttype = ?pair.local.transport_type,
                local.address = ?pair.local.address,
                remote.address = ?pair.remote.address,
                local.ctype = ?pair.local.candidate_type,
                remote.ctype = ?pair.remote.candidate_type,
                foundation = %pair.foundation(),
                "nominated"
            );
            self.nominated.push(self.valid.remove(idx));
            let component = self
                .components
                .iter()
                .filter_map(|component| component.upgrade())
                .find(|component| component.id == pair.local.component_id);
            if self.state == CheckListState::Running {
                // o Once a candidate pair for a component of a data stream has been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent MUST remove all candidate pairs
                //   for the same component from the checklist and from the triggered-
                //   check queue.  If the state of a pair is In-Progress, the agent
                //   cancels the In-Progress transaction.  Cancellation means that the
                //   agent will not retransmit the Binding requests associated with the
                //   connectivity-check transaction, will not treat the lack of
                //   response to be a failure, but will wait the duration of the
                //   transaction timeout for a response.
                self.dump_check_state();
                self.triggered.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                self.pairs.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                // XXX: do we also need to clear self.valid?
                // o Once candidate pairs for each component of a data stream have been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent sets the state of the checklist
                //   to Completed.
                let all_nominated = self.component_ids.iter().all(|&component_id| {
                    self.nominated
                        .iter()
                        .any(|valid_pair| valid_pair.local.component_id == component_id)
                });
                if all_nominated {
                    // ... Once an ICE agent sets the
                    // state of the checklist to Completed (when there is a nominated pair
                    // for each component of the data stream), that pair becomes the
                    // selected pair for that agent and is used for sending and receiving
                    // data for that component of the data stream.
                    info!(
                        "all {} component/s nominated, setting selected pair/s",
                        self.component_ids.len()
                    );
                    self.nominated
                        .iter()
                        .fold(vec![], |mut component_ids_selected, valid_pair| {
                            // Only nominate one valid candidatePair
                            if !component_ids_selected
                                .iter()
                                .any(|&comp_id| comp_id == valid_pair.local.component_id)
                            {
                                if let Some(component) = &component {
                                    let local_agent = self
                                        .local_candidates
                                        .iter()
                                        .find(|cand| {
                                            cand.candidate.base_address == pair.local.base_address
                                        })
                                        .map(|cand| cand.stun_agent.clone());
                                    if let Some(local_agent) = local_agent {
                                        component.set_selected_pair(SelectedPair::new(
                                            pair.clone(),
                                            local_agent,
                                        ));
                                    } else {
                                        panic!("Cannot find existing local stun agent!");
                                    }
                                }
                                component_ids_selected.push(valid_pair.local.component_id);
                            }
                            component_ids_selected
                        });
                    self.set_state(CheckListState::Completed);
                }
            }
            debug!(
                "trying to signal component {:?}",
                component.clone().map(|c| c.id)
            );
            return component;
        } else {
            warn!("unknown nomination");
        }
        None
    }

    fn dump_check_state(&self) {
        let mut s = format!("checklist {}", self.checklist_id);
        for pair in self.pairs.iter() {
            use std::fmt::Write as _;
            let _ = write!(&mut s,
                "\nID:{id} foundation:{foundation} state:{state} nom:{nominate} priority:{local_pri},{remote_pri} trans:{transport} local:{local_cand_type} {local_addr} remote:{remote_cand_type} {remote_addr}",
                id = format_args!("{:<3}", pair.conncheck_id),
                foundation = format_args!("{:10}", pair.pair.foundation()),
                state = format_args!("{:10}", pair.state()),
                nominate = format_args!("{:5}", pair.nominate()),
                local_pri = format_args!("{:10}", pair.pair.local.priority),
                remote_pri = format_args!("{:10}", pair.pair.remote.priority),
                transport = format_args!("{:4}", pair.pair.local.transport_type),
                local_cand_type = format_args!("{:5}", pair.pair.local.candidate_type),
                local_addr = format_args!("{:32}", pair.pair.local.address),
                remote_cand_type = format_args!("{:5}", pair.pair.remote.candidate_type),
                remote_addr = format_args!("{:32}", pair.pair.remote.address)
            );
        }
        debug!("{}", s);
    }

    #[tracing::instrument(
        level = "debug",
        err
        skip(self, local, agent, from, priority)
        fields(
            checklist_id = self.checklist_id,
            state = ?self.state,
        )
    )]
    fn handle_binding_request(
        &mut self,
        peer_nominating: bool,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        from: SocketAddr,
        priority: u32,
    ) -> Result<Option<Arc<Component>>, AgentError> {
        let remote = self
            .find_remote_candidate(component_id, local.transport_type, from)
            .unwrap_or_else(|| {
                // If the source transport address of the request does not match any
                // existing remote candidates, it represents a new peer-reflexive remote
                // candidate.  This candidate is constructed as follows:
                //
                //   o  The priority is the value of the PRIORITY attribute in the Binding
                //      request.
                //   o  The type is peer reflexive.
                //   o  The component ID is the component ID of the local candidate to
                //      which the request was sent.
                //   o  The foundation is an arbitrary value, different from the
                //      foundations of all other remote candidates.  If any subsequent
                //      candidate exchanges contain this peer-reflexive candidate, it will
                //      signal the actual foundation for the candidate.
                let cand = Candidate::builder(
                    component_id,
                    CandidateType::PeerReflexive,
                    local.transport_type,
                    /* FIXME */ "rflx",
                    from,
                )
                .priority(priority)
                .build();
                debug!("new reflexive remote {:?}", cand);
                self.add_remote_candidate(cand.clone());
                cand
            });
        // RFC 8445 Section 7.3.1.4. Triggered Checks
        let pair = CandidatePair::new(local.clone(), remote);
        if let Some(mut check) = self.take_matching_check(&pair) {
            // When the pair is already on the checklist:
            trace!("found existing {:?} check {:?}", check.state(), check);
            match check.state() {
                // If the state of that pair is Succeeded, nothing further is
                // done.
                CandidatePairState::Succeeded => {
                    if peer_nominating {
                        debug!("existing pair succeeded -> nominate");
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            true,
                        ));
                        if let Some(component) = self.nominated_pair(&pair) {
                            self.add_check(check);
                            return Ok(Some(component));
                        }
                    }
                }
                // If the state of that pair is In-Progress, the agent cancels the
                // In-Progress transaction.  Cancellation means that the agent
                // will not retransmit the Binding requests associated with the
                // connectivity-check transaction, will not treat the lack of
                // response to be a failure, but will wait the duration of the
                // transaction timeout for a response.  In addition, the agent
                // MUST enqueue the pair in the triggered checklist associated
                // with the checklist, and set the state of the pair to Waiting,
                // in order to trigger a new connectivity check of the pair.
                // Creating a new connectivity check enables validating
                // In-Progress pairs as soon as possible, without having to wait
                // for retransmissions of the Binding requests associated with the
                // original connectivity-check transaction.
                CandidatePairState::InProgress => {
                    check.cancel_retransmissions();
                    // TODO: ignore response timeouts

                    self.add_check(check.clone());
                    check = Arc::new(ConnCheck::new(
                        check.pair.clone(),
                        check.agent.clone(),
                        peer_nominating,
                    ));
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
                // If the state of that pair is Waiting, Frozen, or Failed, the
                // agent MUST enqueue the pair in the triggered checklist
                // associated with the checklist (if not already present), and set
                // the state of the pair to Waiting, in order to trigger a new
                // connectivity check of the pair.  Note that a state change of
                // the pair from Failed to Waiting might also trigger a state
                // change of the associated checklist.
                CandidatePairState::Waiting
                | CandidatePairState::Frozen
                | CandidatePairState::Failed => {
                    if peer_nominating && !check.nominate() {
                        check.cancel();
                        self.add_check(check.clone());
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            peer_nominating,
                        ));
                    }
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
            }
            self.add_check(check);
        } else {
            debug!("creating new check for pair {:?}", pair);
            let check = Arc::new(ConnCheck::new(pair, agent.clone(), peer_nominating));
            check.set_state(CandidatePairState::Waiting);
            self.add_check(check.clone());
            self.add_triggered(check);
        }

        Ok(None)
    }
}

fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}

Add a Attribute to this Message. Only one AttributeType can be added for each Attribute. Attempting to add multiple Atributes of the same AttributeType` will fail.

Errors
  • if a MESSAGE_INTEGRITY attribute is attempted to be added. Use Message::add_message_integrity instead.
  • if a FINGERPRINT attribute is attempted to be added. Use Message::add_fingerprint instead.
  • If the attribute already exists within the message
  • If attempting to add attributes when MESSAGE_INTEGRITY or FINGERPRINT atributes already exist
Examples

Add an Attribute

let mut message = Message::new_request(BINDING);
let attr = RawAttribute::new(1.into(), &[3]);
assert!(message.add_attribute(attr.clone()).is_ok());
assert!(message.add_attribute(attr).is_err());
Examples found in repository?
src/stun/message.rs (line 1013)
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
    pub fn unknown_attributes(
        src: &Message,
        attributes: &[AttributeType],
    ) -> Result<Message, StunError> {
        let mut out = Message::new_error(src);
        out.add_attribute(Software::new("stund - librice v0.1")?)?;
        out.add_attribute(ErrorCode::new(420, "Unknown Attributes")?)?;
        if !attributes.is_empty() {
            out.add_attribute(UnknownAttributes::new(attributes))?;
        }
        Ok(out)
    }

    /// Generate an error message with an [`ERROR_CODE`] attribute signalling a 'Bad Request'
    ///
    /// # Examples
    ///
    /// ```
    /// # use librice::stun::message::{Message, MessageType, MessageClass, BINDING};
    /// # use librice::stun::attribute::*;
    /// # use std::convert::TryInto;
    /// let msg = Message::new_request(BINDING);
    /// let error_msg = Message::bad_request(&msg).unwrap();
    /// assert!(error_msg.has_attribute(ERROR_CODE));
    /// let error_code =  error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
    /// assert_eq!(error_code.code(), 400);
    /// ```
    pub fn bad_request(src: &Message) -> Result<Message, StunError> {
        let mut out = Message::new_error(src);
        out.add_attribute(Software::new("stund - librice v0.1")?)?;
        out.add_attribute(ErrorCode::new(400, "Bad Request")?)?;
        Ok(out)
    }
More examples
Hide additional examples
src/conncheck.rs (line 164)
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    fn generate_stun_request(
        conncheck: Arc<ConnCheck>,
        username: String,
        controlling: bool,
        tie_breaker: u64,
    ) -> Result<StunRequest, StunError> {
        let mut msg = Message::new_request(BINDING);

        // XXX: this needs to be the priority as if the candidate was peer-reflexive
        msg.add_attribute(Priority::new(conncheck.pair.local.priority))?;
        if controlling {
            msg.add_attribute(IceControlling::new(tie_breaker))?;
        } else {
            msg.add_attribute(IceControlled::new(tie_breaker))?;
        }
        if conncheck.nominate {
            msg.add_attribute(UseCandidate::new())?;
        }
        msg.add_attribute(Username::new(&username)?)?;
        msg.add_message_integrity(&conncheck.agent.local_credentials().unwrap())?;
        msg.add_fingerprint()?;

        let to = conncheck.pair.remote.address;
        conncheck.agent.stun_request_transaction(&msg, to)?.build()
    }

    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum CheckListState {
    Running,
    Completed,
    Failed,
}

static CONN_CHECK_LIST_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub struct ConnCheckList {
    checklist_id: usize,
    inner: Arc<Mutex<ConnCheckListInner>>,
}

fn candidate_is_same_connection(a: &Candidate, b: &Candidate) -> bool {
    if a.component_id != b.component_id {
        return false;
    }
    if a.transport_type != b.transport_type {
        return false;
    }
    if a.base_address != b.base_address {
        return false;
    }
    if a.address != b.address {
        return false;
    }
    // TODO: active vs passive vs simultaneous open
    if a.tcp_type != b.tcp_type {
        return false;
    }
    // XXX: extensions?
    true
}

fn candidate_pair_is_same_connection(a: &CandidatePair, b: &CandidatePair) -> bool {
    if !candidate_is_same_connection(&a.local, &b.local) {
        return false;
    }
    if !candidate_is_same_connection(&a.remote, &b.remote) {
        return false;
    }
    true
}

#[derive(Debug)]
struct ConnCheckLocalCandidate {
    candidate: Candidate,
    stun_agent: StunAgent,
    #[allow(dead_code)]
    stun_recv_abort: AbortHandle,
    #[allow(dead_code)]
    data_recv_abort: AbortHandle,
}

#[derive(Debug)]
struct ConnCheckListInner {
    checklist_id: usize,
    set_inner: Weak<Mutex<CheckListSetInner>>,
    state: CheckListState,
    component_ids: Vec<usize>,
    components: Vec<Weak<Component>>,
    local_credentials: Credentials,
    remote_credentials: Credentials,
    local_candidates: Vec<ConnCheckLocalCandidate>,
    remote_candidates: Vec<Candidate>,
    // TODO: move to BinaryHeap or similar
    triggered: VecDeque<Arc<ConnCheck>>,
    pairs: VecDeque<Arc<ConnCheck>>,
    valid: Vec<CandidatePair>,
    nominated: Vec<CandidatePair>,
    controlling: bool,
}

impl ConnCheckListInner {
    fn new(
        checklist_id: usize,
        set_inner: Weak<Mutex<CheckListSetInner>>,
        controlling: bool,
    ) -> Self {
        Self {
            checklist_id,
            set_inner,
            state: CheckListState::Running,
            component_ids: vec![],
            components: vec![],
            local_credentials: Self::generate_random_credentials(),
            remote_credentials: Self::generate_random_credentials(),
            local_candidates: vec![],
            remote_candidates: vec![],
            triggered: VecDeque::new(),
            pairs: VecDeque::new(),
            valid: vec![],
            nominated: vec![],
            controlling,
        }
    }

    fn generate_random_ice_string(alphabet: &[u8], length: usize) -> String {
        use rand::{seq::SliceRandom, thread_rng};
        let mut rng = thread_rng();
        String::from_utf8(
            (0..length)
                .map(|_| *alphabet.choose(&mut rng).unwrap())
                .collect(),
        )
        .unwrap()
    }

    fn generate_random_credentials() -> Credentials {
        let alphabet =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/".as_bytes();
        let user = Self::generate_random_ice_string(alphabet, 4);
        let pass = Self::generate_random_ice_string(alphabet, 22);
        Credentials::new(user, pass)
    }

    #[tracing::instrument(
        name = "set_checklist_state",
        level = "debug",
        skip(self),
        fields(
            self.checklist_id,
        )
    )]
    fn set_state(&mut self, state: CheckListState) {
        if self.state != state {
            trace!(old_state = ?self.state, new_state = ?state, "changing state");
            self.state = state;
        }
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            self.checklist_id
        )
    )]
    fn find_remote_candidate(
        &self,
        component_id: usize,
        ttype: TransportType,
        addr: SocketAddr,
    ) -> Option<Candidate> {
        self.remote_candidates
            .iter()
            .find(|&remote| {
                remote.component_id == component_id
                    && remote.transport_type == ttype
                    && remote.address == addr
            })
            .cloned()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, check),
        fields(
            self.checklist_id,
            check.conncheck_id
        )
    )]
    fn add_triggered(&mut self, check: Arc<ConnCheck>) {
        if let Some(idx) = self
            .triggered
            .iter()
            .position(|existing| candidate_pair_is_same_connection(&existing.pair, &check.pair))
        {
            // a nominating check trumps not nominating.  Otherwise, if the peers are delay sync,
            // then the non-nominating trigerred check may override the nomination process for a
            // long time and delay the connection process
            if check.nominate() && !self.triggered[idx].nominate() {
                let existing = self.triggered.remove(idx).unwrap();
                debug!("removing existing triggered {:?}", existing);
            } else {
                debug!("not adding duplicate triggered check");
                return;
            }
        }
        debug!("adding triggered check {:?}", check);
        self.triggered.push_front(check)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self)
        fields(
            self.checklist_id,
            remote.ctype = ?remote.candidate_type,
            remote.foundation = ?remote.foundation,
            remote.address = ?remote.address
        )
    )]
    fn add_remote_candidate(&mut self, remote: Candidate) {
        self.remote_candidates.push(remote);
    }

    fn check_is_equal(check: &Arc<ConnCheck>, pair: &CandidatePair, nominate: Nominate) -> bool {
        candidate_is_same_connection(&check.pair.local, &pair.local)
            && candidate_is_same_connection(&check.pair.remote, &pair.remote)
            && nominate.eq(&check.nominate)
    }

    #[tracing::instrument(level = "trace", ret, skip(self, pair))]
    fn matching_check(&self, pair: &CandidatePair, nominate: Nominate) -> Option<Arc<ConnCheck>> {
        self.triggered
            .iter()
            .find(|&check| Self::check_is_equal(check, pair, nominate))
            .or_else(|| {
                self.pairs
                    .iter()
                    .find(|&check| Self::check_is_equal(check, pair, nominate))
            })
            .cloned()
    }

    fn take_matching_check(&mut self, pair: &CandidatePair) -> Option<Arc<ConnCheck>> {
        let pos = self
            .pairs
            .iter()
            .position(|check| Self::check_is_equal(check, pair, Nominate::DontCare));
        if let Some(position) = pos {
            self.pairs.remove(position)
        } else {
            None
        }
    }

    fn add_check(&mut self, check: Arc<ConnCheck>) {
        let idx = self
            .pairs
            .binary_search_by(|existing| {
                existing
                    .pair
                    .priority(self.controlling)
                    .cmp(&check.pair.priority(self.controlling))
                    .reverse()
            })
            .unwrap_or_else(|x| x);
        self.pairs.insert(idx, check);
    }

    fn set_controlling(&mut self, controlling: bool) {
        self.controlling = controlling;
        // changing the controlling (and therefore priority) requires resorting
        self.pairs.make_contiguous().sort_by(|a, b| {
            a.pair
                .priority(self.controlling)
                .cmp(&b.pair.priority(self.controlling))
                .reverse()
        })
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, pair),
        fields(component.id = pair.local.component_id)
    )]
    fn nominated_pair(&mut self, pair: &CandidatePair) -> Option<Arc<Component>> {
        if let Some(idx) = self
            .valid
            .iter()
            .position(|valid_pair| candidate_pair_is_same_connection(valid_pair, pair))
        {
            info!(
                ttype = ?pair.local.transport_type,
                local.address = ?pair.local.address,
                remote.address = ?pair.remote.address,
                local.ctype = ?pair.local.candidate_type,
                remote.ctype = ?pair.remote.candidate_type,
                foundation = %pair.foundation(),
                "nominated"
            );
            self.nominated.push(self.valid.remove(idx));
            let component = self
                .components
                .iter()
                .filter_map(|component| component.upgrade())
                .find(|component| component.id == pair.local.component_id);
            if self.state == CheckListState::Running {
                // o Once a candidate pair for a component of a data stream has been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent MUST remove all candidate pairs
                //   for the same component from the checklist and from the triggered-
                //   check queue.  If the state of a pair is In-Progress, the agent
                //   cancels the In-Progress transaction.  Cancellation means that the
                //   agent will not retransmit the Binding requests associated with the
                //   connectivity-check transaction, will not treat the lack of
                //   response to be a failure, but will wait the duration of the
                //   transaction timeout for a response.
                self.dump_check_state();
                self.triggered.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                self.pairs.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                // XXX: do we also need to clear self.valid?
                // o Once candidate pairs for each component of a data stream have been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent sets the state of the checklist
                //   to Completed.
                let all_nominated = self.component_ids.iter().all(|&component_id| {
                    self.nominated
                        .iter()
                        .any(|valid_pair| valid_pair.local.component_id == component_id)
                });
                if all_nominated {
                    // ... Once an ICE agent sets the
                    // state of the checklist to Completed (when there is a nominated pair
                    // for each component of the data stream), that pair becomes the
                    // selected pair for that agent and is used for sending and receiving
                    // data for that component of the data stream.
                    info!(
                        "all {} component/s nominated, setting selected pair/s",
                        self.component_ids.len()
                    );
                    self.nominated
                        .iter()
                        .fold(vec![], |mut component_ids_selected, valid_pair| {
                            // Only nominate one valid candidatePair
                            if !component_ids_selected
                                .iter()
                                .any(|&comp_id| comp_id == valid_pair.local.component_id)
                            {
                                if let Some(component) = &component {
                                    let local_agent = self
                                        .local_candidates
                                        .iter()
                                        .find(|cand| {
                                            cand.candidate.base_address == pair.local.base_address
                                        })
                                        .map(|cand| cand.stun_agent.clone());
                                    if let Some(local_agent) = local_agent {
                                        component.set_selected_pair(SelectedPair::new(
                                            pair.clone(),
                                            local_agent,
                                        ));
                                    } else {
                                        panic!("Cannot find existing local stun agent!");
                                    }
                                }
                                component_ids_selected.push(valid_pair.local.component_id);
                            }
                            component_ids_selected
                        });
                    self.set_state(CheckListState::Completed);
                }
            }
            debug!(
                "trying to signal component {:?}",
                component.clone().map(|c| c.id)
            );
            return component;
        } else {
            warn!("unknown nomination");
        }
        None
    }

    fn dump_check_state(&self) {
        let mut s = format!("checklist {}", self.checklist_id);
        for pair in self.pairs.iter() {
            use std::fmt::Write as _;
            let _ = write!(&mut s,
                "\nID:{id} foundation:{foundation} state:{state} nom:{nominate} priority:{local_pri},{remote_pri} trans:{transport} local:{local_cand_type} {local_addr} remote:{remote_cand_type} {remote_addr}",
                id = format_args!("{:<3}", pair.conncheck_id),
                foundation = format_args!("{:10}", pair.pair.foundation()),
                state = format_args!("{:10}", pair.state()),
                nominate = format_args!("{:5}", pair.nominate()),
                local_pri = format_args!("{:10}", pair.pair.local.priority),
                remote_pri = format_args!("{:10}", pair.pair.remote.priority),
                transport = format_args!("{:4}", pair.pair.local.transport_type),
                local_cand_type = format_args!("{:5}", pair.pair.local.candidate_type),
                local_addr = format_args!("{:32}", pair.pair.local.address),
                remote_cand_type = format_args!("{:5}", pair.pair.remote.candidate_type),
                remote_addr = format_args!("{:32}", pair.pair.remote.address)
            );
        }
        debug!("{}", s);
    }

    #[tracing::instrument(
        level = "debug",
        err
        skip(self, local, agent, from, priority)
        fields(
            checklist_id = self.checklist_id,
            state = ?self.state,
        )
    )]
    fn handle_binding_request(
        &mut self,
        peer_nominating: bool,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        from: SocketAddr,
        priority: u32,
    ) -> Result<Option<Arc<Component>>, AgentError> {
        let remote = self
            .find_remote_candidate(component_id, local.transport_type, from)
            .unwrap_or_else(|| {
                // If the source transport address of the request does not match any
                // existing remote candidates, it represents a new peer-reflexive remote
                // candidate.  This candidate is constructed as follows:
                //
                //   o  The priority is the value of the PRIORITY attribute in the Binding
                //      request.
                //   o  The type is peer reflexive.
                //   o  The component ID is the component ID of the local candidate to
                //      which the request was sent.
                //   o  The foundation is an arbitrary value, different from the
                //      foundations of all other remote candidates.  If any subsequent
                //      candidate exchanges contain this peer-reflexive candidate, it will
                //      signal the actual foundation for the candidate.
                let cand = Candidate::builder(
                    component_id,
                    CandidateType::PeerReflexive,
                    local.transport_type,
                    /* FIXME */ "rflx",
                    from,
                )
                .priority(priority)
                .build();
                debug!("new reflexive remote {:?}", cand);
                self.add_remote_candidate(cand.clone());
                cand
            });
        // RFC 8445 Section 7.3.1.4. Triggered Checks
        let pair = CandidatePair::new(local.clone(), remote);
        if let Some(mut check) = self.take_matching_check(&pair) {
            // When the pair is already on the checklist:
            trace!("found existing {:?} check {:?}", check.state(), check);
            match check.state() {
                // If the state of that pair is Succeeded, nothing further is
                // done.
                CandidatePairState::Succeeded => {
                    if peer_nominating {
                        debug!("existing pair succeeded -> nominate");
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            true,
                        ));
                        if let Some(component) = self.nominated_pair(&pair) {
                            self.add_check(check);
                            return Ok(Some(component));
                        }
                    }
                }
                // If the state of that pair is In-Progress, the agent cancels the
                // In-Progress transaction.  Cancellation means that the agent
                // will not retransmit the Binding requests associated with the
                // connectivity-check transaction, will not treat the lack of
                // response to be a failure, but will wait the duration of the
                // transaction timeout for a response.  In addition, the agent
                // MUST enqueue the pair in the triggered checklist associated
                // with the checklist, and set the state of the pair to Waiting,
                // in order to trigger a new connectivity check of the pair.
                // Creating a new connectivity check enables validating
                // In-Progress pairs as soon as possible, without having to wait
                // for retransmissions of the Binding requests associated with the
                // original connectivity-check transaction.
                CandidatePairState::InProgress => {
                    check.cancel_retransmissions();
                    // TODO: ignore response timeouts

                    self.add_check(check.clone());
                    check = Arc::new(ConnCheck::new(
                        check.pair.clone(),
                        check.agent.clone(),
                        peer_nominating,
                    ));
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
                // If the state of that pair is Waiting, Frozen, or Failed, the
                // agent MUST enqueue the pair in the triggered checklist
                // associated with the checklist (if not already present), and set
                // the state of the pair to Waiting, in order to trigger a new
                // connectivity check of the pair.  Note that a state change of
                // the pair from Failed to Waiting might also trigger a state
                // change of the associated checklist.
                CandidatePairState::Waiting
                | CandidatePairState::Frozen
                | CandidatePairState::Failed => {
                    if peer_nominating && !check.nominate() {
                        check.cancel();
                        self.add_check(check.clone());
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            peer_nominating,
                        ));
                    }
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
            }
            self.add_check(check);
        } else {
            debug!("creating new check for pair {:?}", pair);
            let check = Arc::new(ConnCheck::new(pair, agent.clone(), peer_nominating));
            check.set_state(CandidatePairState::Waiting);
            self.add_check(check.clone());
            self.add_triggered(check);
        }

        Ok(None)
    }
}

fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}

#[derive(Clone, Copy, Debug)]
enum Nominate {
    True,
    False,
    DontCare,
}

impl PartialEq<Nominate> for Nominate {
    fn eq(&self, other: &Nominate) -> bool {
        matches!(self, &Nominate::DontCare)
            || matches!(other, &Nominate::DontCare)
            || (matches!(self, Nominate::True) && matches!(other, Nominate::True))
            || (matches!(self, Nominate::False) && matches!(other, Nominate::False))
    }
}
impl PartialEq<bool> for Nominate {
    fn eq(&self, other: &bool) -> bool {
        matches!(self, Nominate::DontCare)
            || (*other && self.eq(&Nominate::True))
            || (!*other && self.eq(&Nominate::False))
    }
}

impl ConnCheckList {
    fn state(&self) -> CheckListState {
        self.inner.lock().unwrap().state
    }

    fn set_state(&self, state: CheckListState) {
        let mut inner = self.inner.lock().unwrap();
        inner.set_state(state);
    }

    pub(crate) fn set_local_credentials(&self, credentials: Credentials) {
        let mut inner = self.inner.lock().unwrap();
        inner.local_credentials = credentials;
    }

    pub(crate) fn set_remote_credentials(&self, credentials: Credentials) {
        let mut inner = self.inner.lock().unwrap();
        inner.remote_credentials = credentials;
    }

    async fn handle_binding_request(
        weak_inner: Weak<Mutex<ConnCheckListInner>>,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        msg: &Message,
        from: SocketAddr,
    ) -> Result<Option<Message>, AgentError> {
        trace!("have request {}", msg);

        let local_credentials = agent
            .local_credentials()
            .ok_or(AgentError::ResourceNotFound)?;

        if let Some(error_msg) = Message::check_attribute_types(
            msg,
            &[
                USERNAME,
                FINGERPRINT,
                MESSAGE_INTEGRITY,
                ICE_CONTROLLED,
                ICE_CONTROLLING,
                PRIORITY,
                USE_CANDIDATE,
            ],
            &[USERNAME, FINGERPRINT, MESSAGE_INTEGRITY, PRIORITY],
        ) {
            // failure -> send error response
            return Ok(Some(error_msg));
        }
        let peer_nominating =
            if let Some(use_candidate_raw) = msg.attribute::<RawAttribute>(USE_CANDIDATE) {
                if UseCandidate::from_raw(&use_candidate_raw).is_ok() {
                    true
                } else {
                    return Ok(Some(Message::bad_request(msg)?));
                }
            } else {
                false
            };

        let priority = match msg.attribute::<Priority>(PRIORITY) {
            Some(p) => p.priority(),
            None => {
                return Ok(Some(Message::bad_request(msg)?));
            }
        };

        let ice_controlling = msg.attribute::<IceControlling>(ICE_CONTROLLING);
        let ice_controlled = msg.attribute::<IceControlled>(ICE_CONTROLLED);

        let response = {
            let checklist = weak_inner.upgrade().ok_or(AgentError::ConnectionClosed)?;
            let mut checklist = checklist.lock().unwrap();

            if checklist.state == CheckListState::Completed && !peer_nominating {
                // ignore binding requests if we are completed
                trace!("ignoring binding request as we have completed");
                return Ok(None);
            }

            // validate username
            if let Some(username) = msg.attribute::<Username>(USERNAME) {
                if !validate_username(username, &checklist.local_credentials) {
                    warn!("binding request failed username validation -> UNAUTHORIZED");
                    let mut response = Message::new_error(msg);
                    response.add_attribute(ErrorCode::builder(ErrorCode::UNAUTHORIZED).build()?)?;
                    return Ok(Some(response));
                }
            } else {
                // existence is checked above so can only fail when the username is invalid
                return Ok(Some(Message::bad_request(msg)?));
            }

            {
                // Deal with role conflicts
                // RFC 8445 7.3.1.1.  Detecting and Repairing Role Conflicts
                let set = checklist
                    .set_inner
                    .upgrade()
                    .ok_or(AgentError::ConnectionClosed)?;
                let mut set = set.lock().unwrap();
                if let Some(ice_controlling) = ice_controlling {
                    //  o  If the agent is in the controlling role, and the ICE-CONTROLLING
                    //     attribute is present in the request:
                    if set.controlling {
                        if set.tie_breaker >= ice_controlling.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLING attribute, the agent generates
                            //    a Binding error response and includes an ERROR-CODE attribute
                            //    with a value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLING attribute, the agent switches to the
                            //    controlled role.
                            set.controlling = false;
                            checklist.controlling = false;
                            // TODO: update priorities and other things
                        }
                    }
                }
                if let Some(ice_controlled) = ice_controlled {
                    // o  If the agent is in the controlled role, and the ICE-CONTROLLED
                    //    attribute is present in the request:
                    if !set.controlling {
                        if set.tie_breaker >= ice_controlled.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLED attribute, the agent switches to
                            //    the controlling role.
                            set.controlling = true;
                            checklist.set_controlling(false);
                            for l in set.checklists.iter() {
                                if l.checklist_id == checklist.checklist_id {
                                    continue;
                                }
                                let mut l = l.inner.lock().unwrap();
                                l.set_controlling(false);
                            }
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLED attribute, the agent generates a Binding
                            //    error response and includes an ERROR-CODE attribute with a
                            //    value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        }
                    }
                }
            }

            checklist.handle_binding_request(
                peer_nominating,
                component_id,
                local,
                agent,
                from,
                priority,
            )?
        };
        if let Some(component) = response {
            component.set_state(ComponentState::Connected).await;
        }
        Ok(Some(binding_success_response(
            msg,
            from,
            local_credentials,
        )?))
    }

Retrieve an Attribute from this Message.

Examples

Retrieve an Attribute

let mut message = Message::new_request(BINDING);
let attr = RawAttribute::new(1.into(), &[3]);
assert!(message.add_attribute(attr.clone()).is_ok());
assert_eq!(message.attribute::<RawAttribute>(1.into()).unwrap(), attr);
Examples found in repository?
src/gathering.rs (line 93)
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
async fn gather_stun_xor_address(
    local_preference: u8,
    agent: StunAgent,
    transport: TransportType,
    stun_server: SocketAddr,
) -> Result<GatherCandidateAddress, StunError> {
    let msg = generate_bind_request()?;

    agent
        .stun_request_transaction(&msg, stun_server)?
        .build()?
        .perform()
        .await
        .and_then(move |(response, from)| {
            if let Some(attr) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
                debug!(
                    "got external address {:?}",
                    attr.addr(response.transaction_id())
                );
                return Ok(GatherCandidateAddress {
                    ctype: CandidateType::ServerReflexive,
                    local_preference,
                    transport,
                    address: attr.addr(response.transaction_id()),
                    base: from,
                    related: Some(stun_server),
                });
            }
            Err(StunError::Failed)
        })
}
More examples
Hide additional examples
src/stun/agent.rs (line 611)
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
    fn handle_stun(&mut self, msg: Message, orig_data: &[u8], from: SocketAddr) -> HandleStunReply {
        if msg.is_response() {
            if let Some(orig_request) = self.take_outstanding_request(&msg.transaction_id()) {
                // only validate response if the original request had credentials
                if orig_request
                    .attribute::<MessageIntegrity>(MESSAGE_INTEGRITY)
                    .is_some()
                {
                    if let Some(remote_creds) = &self.remote_credentials {
                        match msg.validate_integrity(orig_data, remote_creds) {
                            Ok(_) => {
                                self.validated_peer(from);
                                HandleStunReply::Broadcast(msg)
                            }
                            Err(e) => {
                                debug!("message failed integrity check: {:?}", e);
                                HandleStunReply::Ignore
                            }
                        }
                    } else {
                        debug!("no remote credentials, ignoring");
                        HandleStunReply::Ignore
                    }
                } else {
                    // original message didn't have integrity, reply doesn't need to either
                    self.validated_peer(from);
                    HandleStunReply::Broadcast(msg)
                }
            } else {
                debug!("unmatched stun response, dropping {}", msg);
                // unmatched response -> drop
                HandleStunReply::Ignore
            }
        } else {
            self.validated_peer(from);
            HandleStunReply::Broadcast(msg)
        }
    }
src/conncheck.rs (line 207)
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum CheckListState {
    Running,
    Completed,
    Failed,
}

static CONN_CHECK_LIST_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
pub struct ConnCheckList {
    checklist_id: usize,
    inner: Arc<Mutex<ConnCheckListInner>>,
}

fn candidate_is_same_connection(a: &Candidate, b: &Candidate) -> bool {
    if a.component_id != b.component_id {
        return false;
    }
    if a.transport_type != b.transport_type {
        return false;
    }
    if a.base_address != b.base_address {
        return false;
    }
    if a.address != b.address {
        return false;
    }
    // TODO: active vs passive vs simultaneous open
    if a.tcp_type != b.tcp_type {
        return false;
    }
    // XXX: extensions?
    true
}

fn candidate_pair_is_same_connection(a: &CandidatePair, b: &CandidatePair) -> bool {
    if !candidate_is_same_connection(&a.local, &b.local) {
        return false;
    }
    if !candidate_is_same_connection(&a.remote, &b.remote) {
        return false;
    }
    true
}

#[derive(Debug)]
struct ConnCheckLocalCandidate {
    candidate: Candidate,
    stun_agent: StunAgent,
    #[allow(dead_code)]
    stun_recv_abort: AbortHandle,
    #[allow(dead_code)]
    data_recv_abort: AbortHandle,
}

#[derive(Debug)]
struct ConnCheckListInner {
    checklist_id: usize,
    set_inner: Weak<Mutex<CheckListSetInner>>,
    state: CheckListState,
    component_ids: Vec<usize>,
    components: Vec<Weak<Component>>,
    local_credentials: Credentials,
    remote_credentials: Credentials,
    local_candidates: Vec<ConnCheckLocalCandidate>,
    remote_candidates: Vec<Candidate>,
    // TODO: move to BinaryHeap or similar
    triggered: VecDeque<Arc<ConnCheck>>,
    pairs: VecDeque<Arc<ConnCheck>>,
    valid: Vec<CandidatePair>,
    nominated: Vec<CandidatePair>,
    controlling: bool,
}

impl ConnCheckListInner {
    fn new(
        checklist_id: usize,
        set_inner: Weak<Mutex<CheckListSetInner>>,
        controlling: bool,
    ) -> Self {
        Self {
            checklist_id,
            set_inner,
            state: CheckListState::Running,
            component_ids: vec![],
            components: vec![],
            local_credentials: Self::generate_random_credentials(),
            remote_credentials: Self::generate_random_credentials(),
            local_candidates: vec![],
            remote_candidates: vec![],
            triggered: VecDeque::new(),
            pairs: VecDeque::new(),
            valid: vec![],
            nominated: vec![],
            controlling,
        }
    }

    fn generate_random_ice_string(alphabet: &[u8], length: usize) -> String {
        use rand::{seq::SliceRandom, thread_rng};
        let mut rng = thread_rng();
        String::from_utf8(
            (0..length)
                .map(|_| *alphabet.choose(&mut rng).unwrap())
                .collect(),
        )
        .unwrap()
    }

    fn generate_random_credentials() -> Credentials {
        let alphabet =
            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/".as_bytes();
        let user = Self::generate_random_ice_string(alphabet, 4);
        let pass = Self::generate_random_ice_string(alphabet, 22);
        Credentials::new(user, pass)
    }

    #[tracing::instrument(
        name = "set_checklist_state",
        level = "debug",
        skip(self),
        fields(
            self.checklist_id,
        )
    )]
    fn set_state(&mut self, state: CheckListState) {
        if self.state != state {
            trace!(old_state = ?self.state, new_state = ?state, "changing state");
            self.state = state;
        }
    }

    #[tracing::instrument(
        level = "debug",
        skip(self),
        fields(
            self.checklist_id
        )
    )]
    fn find_remote_candidate(
        &self,
        component_id: usize,
        ttype: TransportType,
        addr: SocketAddr,
    ) -> Option<Candidate> {
        self.remote_candidates
            .iter()
            .find(|&remote| {
                remote.component_id == component_id
                    && remote.transport_type == ttype
                    && remote.address == addr
            })
            .cloned()
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, check),
        fields(
            self.checklist_id,
            check.conncheck_id
        )
    )]
    fn add_triggered(&mut self, check: Arc<ConnCheck>) {
        if let Some(idx) = self
            .triggered
            .iter()
            .position(|existing| candidate_pair_is_same_connection(&existing.pair, &check.pair))
        {
            // a nominating check trumps not nominating.  Otherwise, if the peers are delay sync,
            // then the non-nominating trigerred check may override the nomination process for a
            // long time and delay the connection process
            if check.nominate() && !self.triggered[idx].nominate() {
                let existing = self.triggered.remove(idx).unwrap();
                debug!("removing existing triggered {:?}", existing);
            } else {
                debug!("not adding duplicate triggered check");
                return;
            }
        }
        debug!("adding triggered check {:?}", check);
        self.triggered.push_front(check)
    }

    #[tracing::instrument(
        level = "debug",
        skip(self)
        fields(
            self.checklist_id,
            remote.ctype = ?remote.candidate_type,
            remote.foundation = ?remote.foundation,
            remote.address = ?remote.address
        )
    )]
    fn add_remote_candidate(&mut self, remote: Candidate) {
        self.remote_candidates.push(remote);
    }

    fn check_is_equal(check: &Arc<ConnCheck>, pair: &CandidatePair, nominate: Nominate) -> bool {
        candidate_is_same_connection(&check.pair.local, &pair.local)
            && candidate_is_same_connection(&check.pair.remote, &pair.remote)
            && nominate.eq(&check.nominate)
    }

    #[tracing::instrument(level = "trace", ret, skip(self, pair))]
    fn matching_check(&self, pair: &CandidatePair, nominate: Nominate) -> Option<Arc<ConnCheck>> {
        self.triggered
            .iter()
            .find(|&check| Self::check_is_equal(check, pair, nominate))
            .or_else(|| {
                self.pairs
                    .iter()
                    .find(|&check| Self::check_is_equal(check, pair, nominate))
            })
            .cloned()
    }

    fn take_matching_check(&mut self, pair: &CandidatePair) -> Option<Arc<ConnCheck>> {
        let pos = self
            .pairs
            .iter()
            .position(|check| Self::check_is_equal(check, pair, Nominate::DontCare));
        if let Some(position) = pos {
            self.pairs.remove(position)
        } else {
            None
        }
    }

    fn add_check(&mut self, check: Arc<ConnCheck>) {
        let idx = self
            .pairs
            .binary_search_by(|existing| {
                existing
                    .pair
                    .priority(self.controlling)
                    .cmp(&check.pair.priority(self.controlling))
                    .reverse()
            })
            .unwrap_or_else(|x| x);
        self.pairs.insert(idx, check);
    }

    fn set_controlling(&mut self, controlling: bool) {
        self.controlling = controlling;
        // changing the controlling (and therefore priority) requires resorting
        self.pairs.make_contiguous().sort_by(|a, b| {
            a.pair
                .priority(self.controlling)
                .cmp(&b.pair.priority(self.controlling))
                .reverse()
        })
    }

    #[tracing::instrument(
        level = "debug",
        skip(self, pair),
        fields(component.id = pair.local.component_id)
    )]
    fn nominated_pair(&mut self, pair: &CandidatePair) -> Option<Arc<Component>> {
        if let Some(idx) = self
            .valid
            .iter()
            .position(|valid_pair| candidate_pair_is_same_connection(valid_pair, pair))
        {
            info!(
                ttype = ?pair.local.transport_type,
                local.address = ?pair.local.address,
                remote.address = ?pair.remote.address,
                local.ctype = ?pair.local.candidate_type,
                remote.ctype = ?pair.remote.candidate_type,
                foundation = %pair.foundation(),
                "nominated"
            );
            self.nominated.push(self.valid.remove(idx));
            let component = self
                .components
                .iter()
                .filter_map(|component| component.upgrade())
                .find(|component| component.id == pair.local.component_id);
            if self.state == CheckListState::Running {
                // o Once a candidate pair for a component of a data stream has been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent MUST remove all candidate pairs
                //   for the same component from the checklist and from the triggered-
                //   check queue.  If the state of a pair is In-Progress, the agent
                //   cancels the In-Progress transaction.  Cancellation means that the
                //   agent will not retransmit the Binding requests associated with the
                //   connectivity-check transaction, will not treat the lack of
                //   response to be a failure, but will wait the duration of the
                //   transaction timeout for a response.
                self.dump_check_state();
                self.triggered.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                self.pairs.retain(|check| {
                    if check.pair.local.component_id == pair.local.component_id {
                        check.cancel_retransmissions();
                        false
                    } else {
                        true
                    }
                });
                // XXX: do we also need to clear self.valid?
                // o Once candidate pairs for each component of a data stream have been
                //   nominated, and the state of the checklist associated with the data
                //   stream is Running, the ICE agent sets the state of the checklist
                //   to Completed.
                let all_nominated = self.component_ids.iter().all(|&component_id| {
                    self.nominated
                        .iter()
                        .any(|valid_pair| valid_pair.local.component_id == component_id)
                });
                if all_nominated {
                    // ... Once an ICE agent sets the
                    // state of the checklist to Completed (when there is a nominated pair
                    // for each component of the data stream), that pair becomes the
                    // selected pair for that agent and is used for sending and receiving
                    // data for that component of the data stream.
                    info!(
                        "all {} component/s nominated, setting selected pair/s",
                        self.component_ids.len()
                    );
                    self.nominated
                        .iter()
                        .fold(vec![], |mut component_ids_selected, valid_pair| {
                            // Only nominate one valid candidatePair
                            if !component_ids_selected
                                .iter()
                                .any(|&comp_id| comp_id == valid_pair.local.component_id)
                            {
                                if let Some(component) = &component {
                                    let local_agent = self
                                        .local_candidates
                                        .iter()
                                        .find(|cand| {
                                            cand.candidate.base_address == pair.local.base_address
                                        })
                                        .map(|cand| cand.stun_agent.clone());
                                    if let Some(local_agent) = local_agent {
                                        component.set_selected_pair(SelectedPair::new(
                                            pair.clone(),
                                            local_agent,
                                        ));
                                    } else {
                                        panic!("Cannot find existing local stun agent!");
                                    }
                                }
                                component_ids_selected.push(valid_pair.local.component_id);
                            }
                            component_ids_selected
                        });
                    self.set_state(CheckListState::Completed);
                }
            }
            debug!(
                "trying to signal component {:?}",
                component.clone().map(|c| c.id)
            );
            return component;
        } else {
            warn!("unknown nomination");
        }
        None
    }

    fn dump_check_state(&self) {
        let mut s = format!("checklist {}", self.checklist_id);
        for pair in self.pairs.iter() {
            use std::fmt::Write as _;
            let _ = write!(&mut s,
                "\nID:{id} foundation:{foundation} state:{state} nom:{nominate} priority:{local_pri},{remote_pri} trans:{transport} local:{local_cand_type} {local_addr} remote:{remote_cand_type} {remote_addr}",
                id = format_args!("{:<3}", pair.conncheck_id),
                foundation = format_args!("{:10}", pair.pair.foundation()),
                state = format_args!("{:10}", pair.state()),
                nominate = format_args!("{:5}", pair.nominate()),
                local_pri = format_args!("{:10}", pair.pair.local.priority),
                remote_pri = format_args!("{:10}", pair.pair.remote.priority),
                transport = format_args!("{:4}", pair.pair.local.transport_type),
                local_cand_type = format_args!("{:5}", pair.pair.local.candidate_type),
                local_addr = format_args!("{:32}", pair.pair.local.address),
                remote_cand_type = format_args!("{:5}", pair.pair.remote.candidate_type),
                remote_addr = format_args!("{:32}", pair.pair.remote.address)
            );
        }
        debug!("{}", s);
    }

    #[tracing::instrument(
        level = "debug",
        err
        skip(self, local, agent, from, priority)
        fields(
            checklist_id = self.checklist_id,
            state = ?self.state,
        )
    )]
    fn handle_binding_request(
        &mut self,
        peer_nominating: bool,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        from: SocketAddr,
        priority: u32,
    ) -> Result<Option<Arc<Component>>, AgentError> {
        let remote = self
            .find_remote_candidate(component_id, local.transport_type, from)
            .unwrap_or_else(|| {
                // If the source transport address of the request does not match any
                // existing remote candidates, it represents a new peer-reflexive remote
                // candidate.  This candidate is constructed as follows:
                //
                //   o  The priority is the value of the PRIORITY attribute in the Binding
                //      request.
                //   o  The type is peer reflexive.
                //   o  The component ID is the component ID of the local candidate to
                //      which the request was sent.
                //   o  The foundation is an arbitrary value, different from the
                //      foundations of all other remote candidates.  If any subsequent
                //      candidate exchanges contain this peer-reflexive candidate, it will
                //      signal the actual foundation for the candidate.
                let cand = Candidate::builder(
                    component_id,
                    CandidateType::PeerReflexive,
                    local.transport_type,
                    /* FIXME */ "rflx",
                    from,
                )
                .priority(priority)
                .build();
                debug!("new reflexive remote {:?}", cand);
                self.add_remote_candidate(cand.clone());
                cand
            });
        // RFC 8445 Section 7.3.1.4. Triggered Checks
        let pair = CandidatePair::new(local.clone(), remote);
        if let Some(mut check) = self.take_matching_check(&pair) {
            // When the pair is already on the checklist:
            trace!("found existing {:?} check {:?}", check.state(), check);
            match check.state() {
                // If the state of that pair is Succeeded, nothing further is
                // done.
                CandidatePairState::Succeeded => {
                    if peer_nominating {
                        debug!("existing pair succeeded -> nominate");
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            true,
                        ));
                        if let Some(component) = self.nominated_pair(&pair) {
                            self.add_check(check);
                            return Ok(Some(component));
                        }
                    }
                }
                // If the state of that pair is In-Progress, the agent cancels the
                // In-Progress transaction.  Cancellation means that the agent
                // will not retransmit the Binding requests associated with the
                // connectivity-check transaction, will not treat the lack of
                // response to be a failure, but will wait the duration of the
                // transaction timeout for a response.  In addition, the agent
                // MUST enqueue the pair in the triggered checklist associated
                // with the checklist, and set the state of the pair to Waiting,
                // in order to trigger a new connectivity check of the pair.
                // Creating a new connectivity check enables validating
                // In-Progress pairs as soon as possible, without having to wait
                // for retransmissions of the Binding requests associated with the
                // original connectivity-check transaction.
                CandidatePairState::InProgress => {
                    check.cancel_retransmissions();
                    // TODO: ignore response timeouts

                    self.add_check(check.clone());
                    check = Arc::new(ConnCheck::new(
                        check.pair.clone(),
                        check.agent.clone(),
                        peer_nominating,
                    ));
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
                // If the state of that pair is Waiting, Frozen, or Failed, the
                // agent MUST enqueue the pair in the triggered checklist
                // associated with the checklist (if not already present), and set
                // the state of the pair to Waiting, in order to trigger a new
                // connectivity check of the pair.  Note that a state change of
                // the pair from Failed to Waiting might also trigger a state
                // change of the associated checklist.
                CandidatePairState::Waiting
                | CandidatePairState::Frozen
                | CandidatePairState::Failed => {
                    if peer_nominating && !check.nominate() {
                        check.cancel();
                        self.add_check(check.clone());
                        check = Arc::new(ConnCheck::new(
                            check.pair.clone(),
                            check.agent.clone(),
                            peer_nominating,
                        ));
                    }
                    check.set_state(CandidatePairState::Waiting);
                    self.add_triggered(check.clone());
                }
            }
            self.add_check(check);
        } else {
            debug!("creating new check for pair {:?}", pair);
            let check = Arc::new(ConnCheck::new(pair, agent.clone(), peer_nominating));
            check.set_state(CandidatePairState::Waiting);
            self.add_check(check.clone());
            self.add_triggered(check);
        }

        Ok(None)
    }
}

fn binding_success_response(
    msg: &Message,
    from: SocketAddr,
    local_credentials: MessageIntegrityCredentials,
) -> Result<Message, AgentError> {
    let mut response = Message::new_success(msg);
    response.add_attribute(XorMappedAddress::new(from, msg.transaction_id()))?;
    response.add_message_integrity(&local_credentials)?;
    response.add_fingerprint()?;
    Ok(response)
}

#[derive(Clone, Copy, Debug)]
enum Nominate {
    True,
    False,
    DontCare,
}

impl PartialEq<Nominate> for Nominate {
    fn eq(&self, other: &Nominate) -> bool {
        matches!(self, &Nominate::DontCare)
            || matches!(other, &Nominate::DontCare)
            || (matches!(self, Nominate::True) && matches!(other, Nominate::True))
            || (matches!(self, Nominate::False) && matches!(other, Nominate::False))
    }
}
impl PartialEq<bool> for Nominate {
    fn eq(&self, other: &bool) -> bool {
        matches!(self, Nominate::DontCare)
            || (*other && self.eq(&Nominate::True))
            || (!*other && self.eq(&Nominate::False))
    }
}

impl ConnCheckList {
    fn state(&self) -> CheckListState {
        self.inner.lock().unwrap().state
    }

    fn set_state(&self, state: CheckListState) {
        let mut inner = self.inner.lock().unwrap();
        inner.set_state(state);
    }

    pub(crate) fn set_local_credentials(&self, credentials: Credentials) {
        let mut inner = self.inner.lock().unwrap();
        inner.local_credentials = credentials;
    }

    pub(crate) fn set_remote_credentials(&self, credentials: Credentials) {
        let mut inner = self.inner.lock().unwrap();
        inner.remote_credentials = credentials;
    }

    async fn handle_binding_request(
        weak_inner: Weak<Mutex<ConnCheckListInner>>,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        msg: &Message,
        from: SocketAddr,
    ) -> Result<Option<Message>, AgentError> {
        trace!("have request {}", msg);

        let local_credentials = agent
            .local_credentials()
            .ok_or(AgentError::ResourceNotFound)?;

        if let Some(error_msg) = Message::check_attribute_types(
            msg,
            &[
                USERNAME,
                FINGERPRINT,
                MESSAGE_INTEGRITY,
                ICE_CONTROLLED,
                ICE_CONTROLLING,
                PRIORITY,
                USE_CANDIDATE,
            ],
            &[USERNAME, FINGERPRINT, MESSAGE_INTEGRITY, PRIORITY],
        ) {
            // failure -> send error response
            return Ok(Some(error_msg));
        }
        let peer_nominating =
            if let Some(use_candidate_raw) = msg.attribute::<RawAttribute>(USE_CANDIDATE) {
                if UseCandidate::from_raw(&use_candidate_raw).is_ok() {
                    true
                } else {
                    return Ok(Some(Message::bad_request(msg)?));
                }
            } else {
                false
            };

        let priority = match msg.attribute::<Priority>(PRIORITY) {
            Some(p) => p.priority(),
            None => {
                return Ok(Some(Message::bad_request(msg)?));
            }
        };

        let ice_controlling = msg.attribute::<IceControlling>(ICE_CONTROLLING);
        let ice_controlled = msg.attribute::<IceControlled>(ICE_CONTROLLED);

        let response = {
            let checklist = weak_inner.upgrade().ok_or(AgentError::ConnectionClosed)?;
            let mut checklist = checklist.lock().unwrap();

            if checklist.state == CheckListState::Completed && !peer_nominating {
                // ignore binding requests if we are completed
                trace!("ignoring binding request as we have completed");
                return Ok(None);
            }

            // validate username
            if let Some(username) = msg.attribute::<Username>(USERNAME) {
                if !validate_username(username, &checklist.local_credentials) {
                    warn!("binding request failed username validation -> UNAUTHORIZED");
                    let mut response = Message::new_error(msg);
                    response.add_attribute(ErrorCode::builder(ErrorCode::UNAUTHORIZED).build()?)?;
                    return Ok(Some(response));
                }
            } else {
                // existence is checked above so can only fail when the username is invalid
                return Ok(Some(Message::bad_request(msg)?));
            }

            {
                // Deal with role conflicts
                // RFC 8445 7.3.1.1.  Detecting and Repairing Role Conflicts
                let set = checklist
                    .set_inner
                    .upgrade()
                    .ok_or(AgentError::ConnectionClosed)?;
                let mut set = set.lock().unwrap();
                if let Some(ice_controlling) = ice_controlling {
                    //  o  If the agent is in the controlling role, and the ICE-CONTROLLING
                    //     attribute is present in the request:
                    if set.controlling {
                        if set.tie_breaker >= ice_controlling.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLING attribute, the agent generates
                            //    a Binding error response and includes an ERROR-CODE attribute
                            //    with a value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLING attribute, the agent switches to the
                            //    controlled role.
                            set.controlling = false;
                            checklist.controlling = false;
                            // TODO: update priorities and other things
                        }
                    }
                }
                if let Some(ice_controlled) = ice_controlled {
                    // o  If the agent is in the controlled role, and the ICE-CONTROLLED
                    //    attribute is present in the request:
                    if !set.controlling {
                        if set.tie_breaker >= ice_controlled.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLED attribute, the agent switches to
                            //    the controlling role.
                            set.controlling = true;
                            checklist.set_controlling(false);
                            for l in set.checklists.iter() {
                                if l.checklist_id == checklist.checklist_id {
                                    continue;
                                }
                                let mut l = l.inner.lock().unwrap();
                                l.set_controlling(false);
                            }
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLED attribute, the agent generates a Binding
                            //    error response and includes an ERROR-CODE attribute with a
                            //    value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        }
                    }
                }
            }

            checklist.handle_binding_request(
                peer_nominating,
                component_id,
                local,
                agent,
                from,
                priority,
            )?
        };
        if let Some(component) = response {
            component.set_state(ComponentState::Connected).await;
        }
        Ok(Some(binding_success_response(
            msg,
            from,
            local_credentials,
        )?))
    }

Returns an iterator over the attributes in the Message.

Check that a message Message only contains required attributes that are supported and have at least some set of required attributes. Returns an appropriate error message on failure to meet these requirements.

Examples
let mut message = Message::new_request(BINDING);
// If nothing is required, no error response is returned
assert!(matches!(Message::check_attribute_types(&message, &[], &[]), None));

// If an atttribute is required that is not in the message, then and error response message
// is generated
let error_msg = Message::check_attribute_types(
    &message,
    &[],
    &[SOFTWARE]
).unwrap();
assert!(error_msg.has_attribute(ERROR_CODE));
let error_code = error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
assert_eq!(error_code.code(), 400);

message.add_attribute(Username::new("user").unwrap());
// If a Username is in the message but is not advertised as supported then an
// 'UNKNOWN-ATTRIBUTES' error response is returned
let error_msg = Message::check_attribute_types(&message, &[], &[]).unwrap();
assert!(error_msg.is_response());
assert!(error_msg.has_attribute(ERROR_CODE));
let error_code : ErrorCode = error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
assert_eq!(error_code.code(), 420);
assert!(error_msg.has_attribute(UNKNOWN_ATTRIBUTES));
Examples found in repository?
src/conncheck.rs (lines 836-848)
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    async fn handle_binding_request(
        weak_inner: Weak<Mutex<ConnCheckListInner>>,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        msg: &Message,
        from: SocketAddr,
    ) -> Result<Option<Message>, AgentError> {
        trace!("have request {}", msg);

        let local_credentials = agent
            .local_credentials()
            .ok_or(AgentError::ResourceNotFound)?;

        if let Some(error_msg) = Message::check_attribute_types(
            msg,
            &[
                USERNAME,
                FINGERPRINT,
                MESSAGE_INTEGRITY,
                ICE_CONTROLLED,
                ICE_CONTROLLING,
                PRIORITY,
                USE_CANDIDATE,
            ],
            &[USERNAME, FINGERPRINT, MESSAGE_INTEGRITY, PRIORITY],
        ) {
            // failure -> send error response
            return Ok(Some(error_msg));
        }
        let peer_nominating =
            if let Some(use_candidate_raw) = msg.attribute::<RawAttribute>(USE_CANDIDATE) {
                if UseCandidate::from_raw(&use_candidate_raw).is_ok() {
                    true
                } else {
                    return Ok(Some(Message::bad_request(msg)?));
                }
            } else {
                false
            };

        let priority = match msg.attribute::<Priority>(PRIORITY) {
            Some(p) => p.priority(),
            None => {
                return Ok(Some(Message::bad_request(msg)?));
            }
        };

        let ice_controlling = msg.attribute::<IceControlling>(ICE_CONTROLLING);
        let ice_controlled = msg.attribute::<IceControlled>(ICE_CONTROLLED);

        let response = {
            let checklist = weak_inner.upgrade().ok_or(AgentError::ConnectionClosed)?;
            let mut checklist = checklist.lock().unwrap();

            if checklist.state == CheckListState::Completed && !peer_nominating {
                // ignore binding requests if we are completed
                trace!("ignoring binding request as we have completed");
                return Ok(None);
            }

            // validate username
            if let Some(username) = msg.attribute::<Username>(USERNAME) {
                if !validate_username(username, &checklist.local_credentials) {
                    warn!("binding request failed username validation -> UNAUTHORIZED");
                    let mut response = Message::new_error(msg);
                    response.add_attribute(ErrorCode::builder(ErrorCode::UNAUTHORIZED).build()?)?;
                    return Ok(Some(response));
                }
            } else {
                // existence is checked above so can only fail when the username is invalid
                return Ok(Some(Message::bad_request(msg)?));
            }

            {
                // Deal with role conflicts
                // RFC 8445 7.3.1.1.  Detecting and Repairing Role Conflicts
                let set = checklist
                    .set_inner
                    .upgrade()
                    .ok_or(AgentError::ConnectionClosed)?;
                let mut set = set.lock().unwrap();
                if let Some(ice_controlling) = ice_controlling {
                    //  o  If the agent is in the controlling role, and the ICE-CONTROLLING
                    //     attribute is present in the request:
                    if set.controlling {
                        if set.tie_breaker >= ice_controlling.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLING attribute, the agent generates
                            //    a Binding error response and includes an ERROR-CODE attribute
                            //    with a value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLING attribute, the agent switches to the
                            //    controlled role.
                            set.controlling = false;
                            checklist.controlling = false;
                            // TODO: update priorities and other things
                        }
                    }
                }
                if let Some(ice_controlled) = ice_controlled {
                    // o  If the agent is in the controlled role, and the ICE-CONTROLLED
                    //    attribute is present in the request:
                    if !set.controlling {
                        if set.tie_breaker >= ice_controlled.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLED attribute, the agent switches to
                            //    the controlling role.
                            set.controlling = true;
                            checklist.set_controlling(false);
                            for l in set.checklists.iter() {
                                if l.checklist_id == checklist.checklist_id {
                                    continue;
                                }
                                let mut l = l.inner.lock().unwrap();
                                l.set_controlling(false);
                            }
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLED attribute, the agent generates a Binding
                            //    error response and includes an ERROR-CODE attribute with a
                            //    value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        }
                    }
                }
            }

            checklist.handle_binding_request(
                peer_nominating,
                component_id,
                local,
                agent,
                from,
                priority,
            )?
        };
        if let Some(component) = response {
            component.set_state(ComponentState::Connected).await;
        }
        Ok(Some(binding_success_response(
            msg,
            from,
            local_credentials,
        )?))
    }

Generate an error message with an ERROR_CODE attribute signalling ‘Unknown Attribute’ and an UNKNOWN_ATTRIBUTES attribute containing the attributes that are unknown.

Examples
let msg = Message::new_request(BINDING);
let error_msg = Message::unknown_attributes(&msg, &[USERNAME]).unwrap();
assert!(error_msg.is_response());
assert!(error_msg.has_attribute(ERROR_CODE));
let error_code = error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
assert_eq!(error_code.code(), 420);
let unknown = error_msg.attribute::<UnknownAttributes>(UNKNOWN_ATTRIBUTES).unwrap();
assert!(unknown.has_attribute(USERNAME));

Generate an error message with an ERROR_CODE attribute signalling a ‘Bad Request’

Examples
let msg = Message::new_request(BINDING);
let error_msg = Message::bad_request(&msg).unwrap();
assert!(error_msg.has_attribute(ERROR_CODE));
let error_code =  error_msg.attribute::<ErrorCode>(ERROR_CODE).unwrap();
assert_eq!(error_code.code(), 400);
Examples found in repository?
src/conncheck.rs (line 857)
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    async fn handle_binding_request(
        weak_inner: Weak<Mutex<ConnCheckListInner>>,
        component_id: usize,
        local: &Candidate,
        agent: StunAgent,
        msg: &Message,
        from: SocketAddr,
    ) -> Result<Option<Message>, AgentError> {
        trace!("have request {}", msg);

        let local_credentials = agent
            .local_credentials()
            .ok_or(AgentError::ResourceNotFound)?;

        if let Some(error_msg) = Message::check_attribute_types(
            msg,
            &[
                USERNAME,
                FINGERPRINT,
                MESSAGE_INTEGRITY,
                ICE_CONTROLLED,
                ICE_CONTROLLING,
                PRIORITY,
                USE_CANDIDATE,
            ],
            &[USERNAME, FINGERPRINT, MESSAGE_INTEGRITY, PRIORITY],
        ) {
            // failure -> send error response
            return Ok(Some(error_msg));
        }
        let peer_nominating =
            if let Some(use_candidate_raw) = msg.attribute::<RawAttribute>(USE_CANDIDATE) {
                if UseCandidate::from_raw(&use_candidate_raw).is_ok() {
                    true
                } else {
                    return Ok(Some(Message::bad_request(msg)?));
                }
            } else {
                false
            };

        let priority = match msg.attribute::<Priority>(PRIORITY) {
            Some(p) => p.priority(),
            None => {
                return Ok(Some(Message::bad_request(msg)?));
            }
        };

        let ice_controlling = msg.attribute::<IceControlling>(ICE_CONTROLLING);
        let ice_controlled = msg.attribute::<IceControlled>(ICE_CONTROLLED);

        let response = {
            let checklist = weak_inner.upgrade().ok_or(AgentError::ConnectionClosed)?;
            let mut checklist = checklist.lock().unwrap();

            if checklist.state == CheckListState::Completed && !peer_nominating {
                // ignore binding requests if we are completed
                trace!("ignoring binding request as we have completed");
                return Ok(None);
            }

            // validate username
            if let Some(username) = msg.attribute::<Username>(USERNAME) {
                if !validate_username(username, &checklist.local_credentials) {
                    warn!("binding request failed username validation -> UNAUTHORIZED");
                    let mut response = Message::new_error(msg);
                    response.add_attribute(ErrorCode::builder(ErrorCode::UNAUTHORIZED).build()?)?;
                    return Ok(Some(response));
                }
            } else {
                // existence is checked above so can only fail when the username is invalid
                return Ok(Some(Message::bad_request(msg)?));
            }

            {
                // Deal with role conflicts
                // RFC 8445 7.3.1.1.  Detecting and Repairing Role Conflicts
                let set = checklist
                    .set_inner
                    .upgrade()
                    .ok_or(AgentError::ConnectionClosed)?;
                let mut set = set.lock().unwrap();
                if let Some(ice_controlling) = ice_controlling {
                    //  o  If the agent is in the controlling role, and the ICE-CONTROLLING
                    //     attribute is present in the request:
                    if set.controlling {
                        if set.tie_breaker >= ice_controlling.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLING attribute, the agent generates
                            //    a Binding error response and includes an ERROR-CODE attribute
                            //    with a value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLING attribute, the agent switches to the
                            //    controlled role.
                            set.controlling = false;
                            checklist.controlling = false;
                            // TODO: update priorities and other things
                        }
                    }
                }
                if let Some(ice_controlled) = ice_controlled {
                    // o  If the agent is in the controlled role, and the ICE-CONTROLLED
                    //    attribute is present in the request:
                    if !set.controlling {
                        if set.tie_breaker >= ice_controlled.tie_breaker() {
                            // *  If the agent's tiebreaker value is larger than or equal to the
                            //    contents of the ICE-CONTROLLED attribute, the agent switches to
                            //    the controlling role.
                            set.controlling = true;
                            checklist.set_controlling(false);
                            for l in set.checklists.iter() {
                                if l.checklist_id == checklist.checklist_id {
                                    continue;
                                }
                                let mut l = l.inner.lock().unwrap();
                                l.set_controlling(false);
                            }
                        } else {
                            // *  If the agent's tiebreaker value is less than the contents of
                            //    the ICE-CONTROLLED attribute, the agent generates a Binding
                            //    error response and includes an ERROR-CODE attribute with a
                            //    value of 487 (Role Conflict) but retains its role.
                            let mut response = Message::new_error(msg);
                            response.add_attribute(
                                ErrorCode::builder(ErrorCode::ROLE_CONFLICT).build()?,
                            )?;
                            return Ok(Some(response));
                        }
                    }
                }
            }

            checklist.handle_binding_request(
                peer_nominating,
                component_id,
                local,
                agent,
                from,
                priority,
            )?
        };
        if let Some(component) = response {
            component.set_state(ComponentState::Connected).await;
        }
        Ok(Some(binding_success_response(
            msg,
            from,
            local_credentials,
        )?))
    }
Examples found in repository?
src/conncheck.rs (line 212)
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    async fn do_stun_request(
        conncheck: Arc<ConnCheck>,
        stun_request: StunRequest,
    ) -> Result<ConnCheckResponse, AgentError> {
        // send binding request
        // wait for response
        // if timeout -> resend?
        // if longer timeout -> fail
        // TODO: optional: if icmp error -> fail
        let (response, from) = match stun_request.perform().await {
            Err(e) => {
                warn!("connectivity check produced error: {:?}", e);
                return Ok(ConnCheckResponse::Failure(conncheck));
            }
            Ok(v) => v,
        };
        trace!("have response: {}", response);

        if !response.is_response() {
            // response is not a response!
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response error -> fail TODO: might be a recoverable error!
        if response.has_class(MessageClass::Error) {
            warn!("error response {}", response);
            if let Some(err) = response.attribute::<ErrorCode>(ERROR_CODE) {
                if err.code() == ErrorCode::ROLE_CONFLICT {
                    info!("Role conflict received {}", response);
                    return Ok(ConnCheckResponse::RoleConflict(
                        conncheck,
                        stun_request.request().has_attribute(ICE_CONTROLLED),
                    ));
                }
            }
            // FIXME: some failures are recoverable
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        // if response success:
        // if mismatched address -> fail
        if from != stun_request.peer_address() {
            warn!(
                "response came from different ip {:?} than candidate {:?}",
                from,
                stun_request.peer_address()
            );
            return Ok(ConnCheckResponse::Failure(conncheck));
        }

        if let Some(xor) = response.attribute::<XorMappedAddress>(XOR_MAPPED_ADDRESS) {
            let xor_addr = xor.addr(response.transaction_id());
            // TODO: if response mapped address not in remote candidate list -> new peer-reflexive candidate
            // TODO glare
            return Ok(ConnCheckResponse::Success(conncheck, xor_addr));
        }

        Ok(ConnCheckResponse::Failure(conncheck))
    }

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more
Formats the value using the given formatter. Read more
Formats the value using the given formatter. Read more
Converts to this type from the input type.
The type returned in the event of a conversion error.
Performs the conversion.

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Should always be Self
The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
Converts the given value to a String. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more