Skip to main content

iroh_auth/
lib.rs

1use n0_watcher::Watchable;
2use std::{
3    collections::BTreeSet,
4    sync::{Arc, Mutex},
5    time::Duration,
6};
7use tracing::{debug, error, info, trace, warn};
8
9use hkdf::Hkdf;
10use iroh::{
11    endpoint::{AfterHandshakeOutcome, Connection, EndpointHooks, VarInt},
12    protocol::ProtocolHandler,
13    Endpoint, EndpointId, PublicKey, Watcher,
14};
15use n0_future::{task::spawn, time::timeout, StreamExt};
16use secrecy::{ExposeSecret, SecretSlice};
17use sha2::Sha512;
18use spake2::{Ed25519Group, Identity, Password, Spake2};
19use subtle::ConstantTimeEq;
20
21// Errors
22#[derive(Debug)]
23pub enum AuthenticatorError {
24    AddFailed,
25    AcceptFailed(String),
26    OpenFailed(String),
27    AcceptFailedAndBlock(String, EndpointId),
28    OpenFailedAndBlock(String, EndpointId),
29    EndpointNotSet,
30}
31
32impl std::fmt::Display for AuthenticatorError {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            AuthenticatorError::AddFailed => write!(f, "Failed to add authenticated ID"),
36            AuthenticatorError::AcceptFailed(msg) => write!(f, "Accept failed: {}", msg),
37            AuthenticatorError::OpenFailed(msg) => write!(f, "Open failed: {}", msg),
38            AuthenticatorError::EndpointNotSet => write!(
39                f,
40                "Authenticator endpoint not set: missing authenticator.start(endpoint)"
41            ),
42            AuthenticatorError::AcceptFailedAndBlock(msg, id) => {
43                write!(f, "Blocked endpoint ID: {}: {}", msg, id)
44            }
45            AuthenticatorError::OpenFailedAndBlock(msg, id) => {
46                write!(f, "Blocked endpoint ID: {}: {}", msg, id)
47            }
48        }
49    }
50}
51
52impl std::error::Error for AuthenticatorError {}
53
54pub trait IntoSecret {
55    fn into_secret(self) -> SecretSlice<u8>;
56}
57
58impl IntoSecret for SecretSlice<u8> {
59    fn into_secret(self) -> SecretSlice<u8> {
60        self
61    }
62}
63
64impl IntoSecret for String {
65    fn into_secret(self) -> SecretSlice<u8> {
66        SecretSlice::new(self.into_bytes().into_boxed_slice())
67    }
68}
69
70impl IntoSecret for &str {
71    fn into_secret(self) -> SecretSlice<u8> {
72        SecretSlice::new(self.as_bytes().to_vec().into_boxed_slice())
73    }
74}
75
76impl IntoSecret for Vec<u8> {
77    fn into_secret(self) -> SecretSlice<u8> {
78        SecretSlice::new(self.into_boxed_slice())
79    }
80}
81
82impl IntoSecret for &[u8] {
83    fn into_secret(self) -> SecretSlice<u8> {
84        SecretSlice::new(self.to_vec().into_boxed_slice())
85    }
86}
87
88impl<const N: usize> IntoSecret for &[u8; N] {
89    fn into_secret(self) -> SecretSlice<u8> {
90        SecretSlice::new(self.as_slice().to_vec().into_boxed_slice())
91    }
92}
93
94impl IntoSecret for Box<[u8]> {
95    fn into_secret(self) -> SecretSlice<u8> {
96        SecretSlice::new(self)
97    }
98}
99
/// Snapshot of handshake outcomes published through the authenticator's
/// watcher; both counts start at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct WatchableCounter {
    /// Incremented when a peer is recorded as authenticated.
    authenticated: usize,
    /// Incremented when a handshake ends with a "block" verdict.
    blocked: usize,
}
105
106#[derive(Debug, Clone)]
107pub struct Authenticator {
108    secret: SecretSlice<u8>,
109    authenticated: Arc<Mutex<BTreeSet<PublicKey>>>,
110    watcher: Watchable<WatchableCounter>,
111    endpoint: Arc<Mutex<Option<iroh::Endpoint>>>,
112}
113
/// ALPN identifier under which the authentication handshake is served.
pub const ALPN: &[u8] = b"/iroh/auth/0.1";
/// Time budget applied to a handshake and to waiting for a peer to become
/// authenticated.
pub const AUTH_TIMEOUT: Duration = Duration::from_secs(10);
116
117impl Authenticator {
118    pub const ALPN: &'static [u8] = ALPN;
119    const ACCEPT_CONTEXT: &'static [u8] = b"iroh-auth-accept";
120    const OPEN_CONTEXT: &'static [u8] = b"iroh-auth-open";
121
122    pub fn new<S: IntoSecret>(secret: S) -> Self {
123        Self {
124            secret: secret.into_secret(),
125            authenticated: Arc::new(Mutex::new(BTreeSet::new())),
126            watcher: Watchable::new(WatchableCounter::default()),
127            endpoint: Arc::new(Mutex::new(None)),
128        }
129    }
130
131    pub fn set_endpoint(&self, endpoint: &Endpoint) {
132        if let Ok(mut guard) = self.endpoint.lock() {
133            if guard.is_none() {
134                *guard = Some(endpoint.clone());
135                trace!("Authenticator endpoint set to {}", endpoint.id());
136            }
137        }
138    }
139
140    fn id(&self) -> Result<PublicKey, AuthenticatorError> {
141        self.endpoint
142            .lock()
143            .map_err(|_| AuthenticatorError::EndpointNotSet)?
144            .as_ref()
145            .map(|ep| ep.id())
146            .ok_or(AuthenticatorError::EndpointNotSet)
147    }
148
149    fn endpoint(&self) -> Result<iroh::Endpoint, AuthenticatorError> {
150        self.endpoint
151            .lock()
152            .map_err(|_| AuthenticatorError::EndpointNotSet)?
153            .as_ref()
154            .cloned()
155            .ok_or(AuthenticatorError::EndpointNotSet)
156    }
157
158    fn is_authenticated(&self, id: &PublicKey) -> bool {
159        self.authenticated
160            .lock()
161            .map(|set| set.contains(id))
162            .unwrap_or(false)
163    }
164
165    fn add_authenticated(&self, id: PublicKey) -> Result<(), AuthenticatorError> {
166        self.authenticated
167            .lock()
168            .map_err(|_| AuthenticatorError::AddFailed)?
169            .insert(id);
170        let mut counter = self.watcher.get();
171        counter.authenticated += 1;
172        self.watcher
173            .set(counter)
174            .map_err(|_| AuthenticatorError::AddFailed)?;
175        Ok(())
176    }
177
178    fn add_blocked(&self) -> Result<(), AuthenticatorError> {
179        let mut counter = self.watcher.get();
180        counter.blocked += 1;
181        self.watcher
182            .set(counter)
183            .map_err(|_| AuthenticatorError::AddFailed)?;
184        Ok(())
185    }
186
187    #[doc(hidden)]
188    pub fn list_authenticated(&self) -> Vec<PublicKey> {
189        self.authenticated
190            .lock()
191            .map(|set| set.iter().cloned().collect())
192            .unwrap_or_default()
193    }
194
195    async fn end_of_auth(&self, send: &mut iroh::endpoint::SendStream, open: bool) -> Result<(), AuthenticatorError> {
196        send.finish().map_err(|err| {
197            error!("[end_of_auth] failed to finish stream: {}", err);
198            if open {
199                AuthenticatorError::OpenFailed(format!("Failed to finish stream: {}", err))
200            } else {
201                AuthenticatorError::AcceptFailed(format!("Failed to finish stream: {}", err))
202            }
203        })?;
204        send.stopped().await.map_err(|err| {
205            error!("[end_of_auth] failed to wait for stream stopped: {}", err);
206            if open {
207                AuthenticatorError::OpenFailed(format!("Failed to wait for stream stopped: {}", err))
208            } else {
209                AuthenticatorError::AcceptFailed(format!("Failed to wait for stream stopped: {}", err))
210            }
211        })?;
212        Ok(())
213    }
214
215    /// Accept an incoming connection and perform SPAKE2 authentication.
216    /// On success, adds the remote ID to the authenticated set.
217    /// Returns Ok(()) on success, or an AuthenticatorError on failure.
218    async fn auth_accept(&self, conn: Connection) -> Result<(), AuthenticatorError> {
219        let remote_id = conn.remote_id();
220        debug!("[auth_accept] accepting auth connection from {}", remote_id);
221        let (mut send, mut recv) = conn.accept_bi().await.map_err(|err| {
222            error!("[auth_accept] accept bidirectional stream failed: {}", err);
223            AuthenticatorError::AcceptFailed(format!("Accept bidirectional stream failed: {}", err))
224        })?;
225
226        let (spake, token_b) = Spake2::<Ed25519Group>::start_b(
227            &Password::new(self.secret.expose_secret()),
228            &Identity::new(conn.remote_id().as_bytes()),
229            &Identity::new(self.id()?.as_bytes()),
230        );
231
232        let mut token_a = [0u8; 33];
233        recv.read_exact(&mut token_a).await.map_err(|err| {
234            error!("[auth_accept] failed to read token_a: {}", err);
235            AuthenticatorError::AcceptFailed(format!("Failed to read token_a: {}", err))
236        })?;
237
238        send.write_all(&token_b).await.map_err(|err| {
239            error!("[auth_accept] failed to write token_b: {}", err);
240            AuthenticatorError::AcceptFailed(format!("Failed to write token_b: {}", err))
241        })?;
242
243        let shared_secret = spake.finish(&token_a).map_err(|err| {
244            error!("[auth_accept] SPAKE2 invalid: {}", err);
245            AuthenticatorError::AcceptFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
246        })?;
247
248        let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
249        let mut accept_key = [0u8; 64];
250        let mut open_key = [0u8; 64];
251        hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
252            .map_err(|err| {
253                error!("[auth_accept] failed to expand accept_key: {}", err);
254                AuthenticatorError::AcceptFailed(format!("Failed to expand accept_key: {}", err))
255            })?;
256        hk.expand(Self::OPEN_CONTEXT, &mut open_key)
257            .map_err(|err| {
258                error!("[auth_accept] failed to expand open_key: {}", err);
259                AuthenticatorError::AcceptFailed(format!("Failed to expand open_key: {}", err))
260            })?;
261
262        send.write_all(&accept_key).await.map_err(|err| {
263            error!("[auth_accept] failed to write accept_key: {}", err);
264            AuthenticatorError::AcceptFailed(format!("Failed to write accept_key: {}", err))
265        })?;
266        let mut remote_open_key = [0u8; 64];
267        recv.read_exact(&mut remote_open_key).await.map_err(|err| {
268            error!("[auth_accept] failed to read remote_open_key: {}", err);
269            AuthenticatorError::AcceptFailed(format!("Failed to read remote_open_key: {}", err))
270        })?;
271
272        self.end_of_auth(&mut send, false).await?;
273
274        if !bool::from(remote_open_key.ct_eq(&open_key)) {
275            error!("[auth_accept] remote open_key mismatch");
276            return Err(AuthenticatorError::AcceptFailedAndBlock(
277                "Remote open_key mismatch".to_string(),
278                remote_id,
279            ));
280        }
281
282        self.add_authenticated(conn.remote_id())?;
283        info!("[auth_accept] authenticated connection from {}", remote_id);
284
285        Ok(())
286    }
287
288    /// Open an outgoing connection and perform SPAKE2 authentication.
289    /// On success, adds the remote ID to the authenticated set.
290    /// Returns Ok(()) on success, or an AuthenticatorError on failure.
291    async fn auth_open(&self, conn: Connection) -> Result<(), AuthenticatorError> {
292        let remote_id = conn.remote_id();
293        debug!("[auth_open] opening auth connection to {}", remote_id);
294        let (mut send, mut recv) = conn.open_bi().await.map_err(|err| {
295            error!("[auth_open] open bidirectional stream failed: {}", err);
296            AuthenticatorError::OpenFailed(format!("Open bidirectional stream failed: {}", err))
297        })?;
298
299        let (spake, token_a) = Spake2::<Ed25519Group>::start_a(
300            &Password::new(self.secret.expose_secret()),
301            &Identity::new(self.id()?.as_bytes()),
302            &Identity::new(conn.remote_id().as_bytes()),
303        );
304
305        send.write_all(&token_a).await.map_err(|err| {
306            error!("[auth_open] failed to write token_a: {}", err);
307            AuthenticatorError::OpenFailed(format!("Failed to write token_a: {}", err))
308        })?;
309
310        let mut token_b = [0u8; 33];
311        recv.read_exact(&mut token_b).await.map_err(|err| {
312            error!("[auth_open] failed to read token_b: {}", err);
313            AuthenticatorError::OpenFailed(format!("Failed to read token_b: {}", err))
314        })?;
315
316        let shared_secret = spake.finish(&token_b).map_err(|err| {
317            error!("[auth_open] SPAKE2 invalid: {}", err);
318            AuthenticatorError::OpenFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
319        })?;
320
321        let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
322        let mut accept_key = [0u8; 64];
323        let mut open_key = [0u8; 64];
324        hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
325            .map_err(|err| {
326                error!("[auth_open] failed to expand accept_key: {}", err);
327                AuthenticatorError::OpenFailed(format!("Failed to expand accept_key: {}", err))
328            })?;
329        hk.expand(Self::OPEN_CONTEXT, &mut open_key)
330            .map_err(|err| {
331                error!("[auth_open] failed to expand open_key: {}", err);
332                AuthenticatorError::OpenFailed(format!("Failed to expand open_key: {}", err))
333            })?;
334
335        let mut remote_accept_key = [0u8; 64];
336        recv.read_exact(&mut remote_accept_key)
337            .await
338            .map_err(|err| {
339                error!("[auth_open] failed to read remote_accept_key: {}", err);
340                AuthenticatorError::OpenFailed(format!("Failed to read remote_accept_key: {}", err))
341            })?;
342
343        if !bool::from(remote_accept_key.ct_eq(&accept_key)) {
344            error!("[auth_open] remote accept_key mismatch");
345
346            // Writing a random dummy open_key back to finishing the stream but not give away
347            // that the accept_key was correct to avoid leaking information to an attacker about valid accept_keys
348            // (probably not needed but better safe than sorry ^^)
349            send.write_all(&rand::random::<[u8; 64]>()).await.ok();
350            self.end_of_auth(&mut send, true).await?;
351
352            return Err(AuthenticatorError::OpenFailedAndBlock(
353                "Remote accept_key mismatch".to_string(),
354                remote_id,
355            ));
356        }
357
358        send.write_all(&open_key).await.map_err(|err| {
359            error!("[auth_open] failed to write open_key: {}", err);
360            AuthenticatorError::OpenFailed(format!("Failed to write open_key: {}", err))
361        })?;
362        self.end_of_auth(&mut send, true).await?;
363
364        self.add_authenticated(conn.remote_id())?;
365        info!("[auth_open] authenticated connection to {}", remote_id);
366
367        Ok(())
368    }
369}
370
371impl ProtocolHandler for Authenticator {
372    async fn accept(
373        &self,
374        connection: iroh::endpoint::Connection,
375    ) -> Result<(), iroh::protocol::AcceptError> {
376        match timeout(AUTH_TIMEOUT, self.auth_accept(connection)).await {
377            Ok(Ok(())) => Ok(()),
378            Ok(Err(err)) => match &err {
379                AuthenticatorError::AcceptFailedAndBlock(msg, public_key) => {
380                    warn!("[accept] authentication failed and blocking {}: {}", public_key, msg);
381                    self.add_blocked().ok();
382                    Err(iroh::protocol::AcceptError::from_err(err))
383                }
384                _ => {
385                    warn!("[accept] authentication failed: {}", err);
386                    Err(iroh::protocol::AcceptError::from_err(err))
387                }
388            },
389            Err(_) => {
390                warn!("[accept] authentication failed: timed out");
391                Err(iroh::protocol::AcceptError::from_err(
392                    AuthenticatorError::AcceptFailed("Authentication timed out".into()),
393                ))
394            }
395        }
396    }
397}
398
399impl EndpointHooks for Authenticator {
400    async fn after_handshake<'a>(
401        &'a self,
402        conn_info: &'a iroh::endpoint::ConnectionInfo,
403    ) -> iroh::endpoint::AfterHandshakeOutcome {
404        if self.is_authenticated(&conn_info.remote_id()) {
405            debug!("[after_handshake] already authenticated: {}", conn_info.remote_id());
406            return AfterHandshakeOutcome::accept();
407        }
408
409        if conn_info.alpn() == Self::ALPN {
410            debug!(
411                "[after_handshake] skipping auth for connection with alpn {}",
412                String::from_utf8_lossy(conn_info.alpn())
413            );
414            return AfterHandshakeOutcome::accept();
415        }
416
417        let remote_id = conn_info.remote_id();
418        let counter = self.watcher.get();
419
420        let wait_for_auth = async {
421            let mut stream = self.watcher.watch().stream();
422            while let Some(next_counter) = stream.next().await {
423                if next_counter != counter && self.is_authenticated(&remote_id) {
424                    return Ok(()) as Result<(), AuthenticatorError>;
425                }
426            }
427            Err(AuthenticatorError::AcceptFailed(
428                "Watcher stream ended unexpectedly".to_string(),
429            ))
430        };
431
432        match timeout(AUTH_TIMEOUT, wait_for_auth).await {
433            Ok(_) => AfterHandshakeOutcome::accept(),
434            Err(_) => {
435                warn!("[after_handshake] authentication timed out for {}", remote_id);
436                AfterHandshakeOutcome::Reject {
437                    error_code: VarInt::from_u32(401),
438                    reason: b"Authentication timed out".to_vec(),
439                }
440            }
441        }
442    }
443
444    async fn before_connect<'a>(
445        &'a self,
446        remote_addr: &'a iroh::EndpointAddr,
447        alpn: &'a [u8],
448    ) -> iroh::endpoint::BeforeConnectOutcome {
449        if self.is_authenticated(&remote_addr.id) {
450            debug!("[before_connect] already authenticated: {}", remote_addr.id);
451            return iroh::endpoint::BeforeConnectOutcome::Accept;
452        }
453
454        if alpn == Self::ALPN {
455            debug!(
456                "[before_connect] skipping auth for connection to {} with alpn {:?}",
457                remote_addr.id, alpn
458            );
459            return iroh::endpoint::BeforeConnectOutcome::Accept;
460        }
461
462        debug!(
463            "[before_connect] initiating auth for client connection with alpn {} to {}",
464            String::from_utf8_lossy(alpn),
465            remote_addr.id
466        );
467        let endpoint = match self.endpoint() {
468            Ok(ep) => ep,
469            Err(_) => {
470                warn!("[before_connect] authenticator endpoint not set");
471                return iroh::endpoint::BeforeConnectOutcome::Reject;
472            }
473        };
474        spawn({
475            let auth = self.clone();
476            let remote_id = remote_addr.id;
477
478            async move {
479                debug!("[before_connect] background: connecting to {} for auth", remote_id);
480                let start = std::time::Instant::now();
481                while start.elapsed() < AUTH_TIMEOUT {
482                    match endpoint.connect(remote_id, Self::ALPN).await {
483                        Ok(conn) => {
484                            debug!("[before_connect] background: connected to {}, performing auth", remote_id);
485                            match timeout(AUTH_TIMEOUT, auth.auth_open(conn)).await {
486                                Ok(Ok(())) => {
487                                    debug!(
488                                        "[before_connect] background: authentication successful for {}",
489                                        remote_id
490                                    );
491                                    return;
492                                }
493                                Ok(Err(err)) => match &err {
494                                    AuthenticatorError::OpenFailedAndBlock(msg, public_key) => {
495                                        warn!(
496                                            "[before_connect] authentication failed and blocking {}: {}",
497                                            public_key, msg
498                                        );
499                                        auth.add_blocked().ok();
500                                        return;
501                                    }
502                                    _ => {
503                                        warn!("[before_connect] authentication failed for {}: {}", remote_id, err);
504                                    }
505                                },
506                                Err(_) => {
507                                    warn!(
508                                        "[before_connect] background: authentication timed out for {}, retrying...",
509                                        remote_id
510                                    );
511                                }
512                            }
513                        }
514                        Err(e) => {
515                            warn!(
516                                "[before_connect] background: failed to open connection for authentication to {}: {}, retrying...",
517                                remote_id, e
518                            );
519                        }
520                    };
521                    
522                    tokio::time::sleep(Duration::from_millis(500)).await;
523                }
524                warn!("[before_connect] background: authentication timed out for {}", remote_id);
525            }
526        });
527        iroh::endpoint::BeforeConnectOutcome::Accept
528    }
529}
530
#[cfg(test)]
mod tests {
    use iroh::Watcher;

    use super::*;

    /// Both SPAKE2 sides produce different messages but agree on the key.
    #[test]
    fn test_token_different() {
        let password = Password::new(b"testpassword");
        let identity_a = Identity::new(b"identityA");
        let identity_b = Identity::new(b"identityB");

        let (side_a, token_a) =
            Spake2::<Ed25519Group>::start_a(&password, &identity_a, &identity_b);
        let (side_b, token_b) =
            Spake2::<Ed25519Group>::start_b(&password, &identity_a, &identity_b);

        assert_ne!(token_a, token_b);

        let key_a = side_a.finish(&token_b).unwrap();
        let key_b = side_b.finish(&token_a).unwrap();

        assert_eq!(key_a, key_b);
    }

    /// Application protocol that accepts every connection and does nothing.
    #[derive(Debug, Clone)]
    struct DummyProtocol;
    impl ProtocolHandler for DummyProtocol {
        async fn accept(&self, _conn: Connection) -> Result<(), iroh::protocol::AcceptError> {
            Ok(())
        }
    }

    /// Matching secrets: both peers end up authenticated.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_auth_success() {
        let secret = b"supersecrettoken1234567890123456";
        assert!(run_auth_test(secret, secret).await.unwrap());
    }

    /// Different secrets: mutual authentication must not be reached.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_auth_failure() {
        let secret_a = b"supersecrettoken1234567890123456";
        let secret_b = b"differentsecrettoken123456789012";
        assert!(!run_auth_test(secret_a, secret_b).await.unwrap());
    }

    /// Binds an endpoint that uses `auth` as its hooks and registers the
    /// endpoint with `auth`.
    async fn bind_endpoint(auth: &Authenticator) -> Result<iroh::Endpoint, String> {
        let endpoint = iroh::Endpoint::builder(iroh::endpoint::presets::N0)
            .hooks(auth.clone())
            .bind()
            .await
            .map_err(|e| e.to_string())?;
        auth.set_endpoint(&endpoint);
        Ok(endpoint)
    }

    /// Spawns a router serving the auth protocol and the dummy protocol.
    fn spawn_router(endpoint: &iroh::Endpoint, auth: &Authenticator) -> iroh::protocol::Router {
        iroh::protocol::Router::builder(endpoint.clone())
            .accept(Authenticator::ALPN, auth.clone())
            .accept(b"/dummy/1", DummyProtocol)
            .spawn()
    }

    /// Connects peer A to peer B on the dummy ALPN and waits until both
    /// authenticators report an outcome (authenticated or blocked).
    ///
    /// Returns `Ok(true)` when both sides consider each other authenticated,
    /// `Ok(false)` otherwise, and `Err` on setup failure or when no outcome
    /// is reported within twice `AUTH_TIMEOUT`.
    async fn run_auth_test(
        secret_a: &'static [u8],
        secret_b: &'static [u8],
    ) -> Result<bool, String> {
        let auth_a = Authenticator::new(secret_a);
        let endpoint_a = bind_endpoint(&auth_a).await?;

        let auth_b = Authenticator::new(secret_b);
        let endpoint_b = bind_endpoint(&auth_b).await?;

        let router_a = spawn_router(&endpoint_a, &auth_a);
        let router_b = spawn_router(&endpoint_b, &auth_b);

        // The application connection triggers authentication via the hooks;
        // its own result is irrelevant here.
        spawn({
            let endpoint_a = endpoint_a.clone();
            let endpoint_b = endpoint_b.clone();
            async move {
                endpoint_a
                    .connect(endpoint_b.addr(), b"/dummy/1")
                    .await
                    .ok();
            }
        });

        let wait_loop = async {
            use n0_future::StreamExt;

            let wait_a = async {
                let mut updates = auth_a.watcher.watch().stream();
                while let Some(WatchableCounter { authenticated, blocked }) = updates.next().await {
                    debug!(
                        "auth_a watcher: authenticated={}, blocked={}",
                        authenticated, blocked
                    );
                    if authenticated >= 1 || blocked >= 1 {
                        break;
                    }
                }
            };
            let wait_b = async {
                let mut updates = auth_b.watcher.watch().stream();
                while let Some(WatchableCounter { authenticated, blocked }) = updates.next().await {
                    debug!(
                        "auth_b watcher: authenticated={}, blocked={}",
                        authenticated, blocked
                    );
                    if authenticated >= 1 || blocked >= 1 {
                        break;
                    }
                }
            };
            tokio::join!(wait_a, wait_b);
        };

        let finished_in_time = timeout(AUTH_TIMEOUT * 2, wait_loop).await.is_ok();

        // Routers are shut down on every path before the verdict is returned.
        router_a.shutdown().await.ok();
        router_b.shutdown().await.ok();

        if !finished_in_time {
            return Err("Authentication did not complete in time".to_string());
        }

        let a_trusts_b = auth_a.is_authenticated(&endpoint_b.id());
        Ok(a_trusts_b && auth_b.is_authenticated(&endpoint_a.id()))
    }

    /// Every `IntoSecret` implementation yields the same secret bytes.
    #[test]
    fn test_into_secret_impls() {
        use secrecy::ExposeSecret;

        let expected_bytes = b"my-secret-key";

        // &str
        let from_str = "my-secret-key".into_secret();
        assert_eq!(from_str.expose_secret(), expected_bytes);

        // String
        let from_string = String::from("my-secret-key").into_secret();
        assert_eq!(from_string.expose_secret(), expected_bytes);

        // Vec<u8>
        let from_vec = b"my-secret-key".to_vec().into_secret();
        assert_eq!(from_vec.expose_secret(), expected_bytes);

        // &[u8]
        let slice: &[u8] = b"my-secret-key";
        let from_slice = slice.into_secret();
        assert_eq!(from_slice.expose_secret(), expected_bytes);

        // &[u8; N]
        let array: &[u8; 13] = b"my-secret-key";
        let from_array = array.into_secret();
        assert_eq!(from_array.expose_secret(), expected_bytes);

        // Box<[u8]>
        let boxed: Box<[u8]> = Box::new(*b"my-secret-key");
        let from_box = boxed.into_secret();
        assert_eq!(from_box.expose_secret(), expected_bytes);

        // SecretSlice<u8>
        let existing = SecretSlice::new(Box::new(*b"my-secret-key"));
        let from_secret = existing.into_secret();
        assert_eq!(from_secret.expose_secret(), expected_bytes);
    }
}
705}