Skip to main content

iroh_auth/
auth.rs

1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
2
3use hkdf::Hkdf;
4use iroh::{endpoint::Connection, Endpoint, EndpointId, PublicKey};
5use lru::LruCache;
6use n0_watcher::Watchable;
7use secrecy::{ExposeSecret, SecretSlice};
8use sha2::Sha512;
9use spake2::{Ed25519Group, Identity, Password, Spake2};
10use subtle::ConstantTimeEq;
11use tokio::{
12    sync::Mutex,
13    time::{timeout, Instant},
14};
15use tracing::{error, info, trace, warn};
16
17use crate::{
18    protocol::release_in_flight, AuthenticatorError, IntoSecret, ALPN, AUTH_TIMEOUT,
19    TRANSMISSION_TIMEOUT,
20};
21
22#[derive(Debug, Clone)]
23pub struct Authenticator {
24    secret: SecretSlice<u8>,
25    endpoint: Arc<Mutex<Option<iroh::Endpoint>>>,
26    pub(crate) auth_state: Arc<Mutex<LruCache<EndpointId, WatchableRemote>>>,
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum AuthState {
31    Unauthenticated,
32    InFlight,
33    Authenticated,
34    Blocked,
35}
36
37impl std::fmt::Display for AuthState {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        match self {
40            AuthState::Unauthenticated => write!(f, "Unauthenticated"),
41            AuthState::InFlight => write!(f, "InFlight"),
42            AuthState::Authenticated => write!(f, "Authenticated"),
43            AuthState::Blocked => write!(f, "Blocked"),
44        }
45    }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) enum RegisterResponse {
50    InFlightRegistered,   // auth after this
51    AlreadyInFlight, // another auth was running while we called this, only check is_authenticated, don't auth again
52    AlreadyAuthenticated, // we are already authenticated, same as AlreadyInFlight => only check is_authenticated, don't auth again
53    AlreadyBlocked,       // we are blocked, DO NOT AUTH AGAIN, just reject
54}
55
56#[derive(Debug, Clone)]
57pub(crate) struct WatchableRemote {
58    id: PublicKey,
59    inner: Watchable<AuthState>,
60}
61
62impl WatchableRemote {
63    /// inner is AuthState::Unauthenticated by default
64    pub fn new(id: PublicKey) -> Self {
65        Self {
66            id,
67            inner: Watchable::new(AuthState::Unauthenticated),
68        }
69    }
70
71    pub fn watcher(&self) -> Watchable<AuthState> {
72        self.inner.clone()
73    }
74
75    pub fn id(&self) -> &PublicKey {
76        &self.id
77    }
78
79    pub fn state(&self) -> AuthState {
80        self.inner.get()
81    }
82
83    pub fn set_state(&self, state: AuthState) {
84        let previous_state = self.inner.get();
85        if previous_state == state {
86            trace!(
87                "[watchable_remote] endpoint {} state unchanged at {}",
88                self.id,
89                state
90            );
91        } else {
92            trace!(
93                "[watchable_remote] endpoint {} state transition {} -> {}",
94                self.id,
95                previous_state,
96                state
97            );
98        }
99        self.inner.set(state).ok();
100    }
101}
102
103impl PartialEq for WatchableRemote {
104    fn eq(&self, other: &Self) -> bool {
105        self.id() == other.id()
106    }
107}
108
109impl Eq for WatchableRemote {}
110
111impl PartialEq<PublicKey> for WatchableRemote {
112    fn eq(&self, other: &PublicKey) -> bool {
113        self.id() == other
114    }
115}
116
117impl std::hash::Hash for WatchableRemote {
118    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
119        self.id().hash(state);
120    }
121}
122
123impl Authenticator {
124    pub const ALPN: &'static [u8] = crate::ALPN;
125    const ACCEPT_CONTEXT: &'static [u8] = b"iroh-auth-accept";
126    const OPEN_CONTEXT: &'static [u8] = b"iroh-auth-open";
127
128    pub fn new<S: IntoSecret>(secret: S) -> Self {
129        Self {
130            secret: secret.into_secret(),
131            endpoint: Arc::new(Mutex::new(None)),
132            auth_state: Arc::new(Mutex::new(LruCache::new(
133                NonZeroUsize::new(crate::LRU_CACHE_SIZE).expect("LRU_CACHE_SIZE must be > 0"),
134            ))),
135        }
136    }
137
138    pub async fn set_endpoint(&self, endpoint: &Endpoint) {
139        let mut guard = self.endpoint.lock().await;
140        if guard.is_none() {
141            *guard = Some(endpoint.clone());
142            trace!("Authenticator endpoint set to {}", endpoint.id());
143        } else {
144            trace!("Authenticator endpoint already set, ignoring {}", endpoint.id());
145        }
146    }
147
148    async fn id(&self) -> Result<PublicKey, AuthenticatorError> {
149        self.endpoint
150            .lock()
151            .await
152            .as_ref()
153            .map(|ep| ep.id())
154            .ok_or(AuthenticatorError::EndpointNotSet)
155    }
156
157    pub(crate) async fn endpoint(&self) -> Result<iroh::Endpoint, AuthenticatorError> {
158        self.endpoint
159            .lock()
160            .await
161            .as_ref()
162            .cloned()
163            .ok_or(AuthenticatorError::EndpointNotSet)
164    }
165
166    pub async fn is_authenticated(&self, id: &PublicKey) -> bool {
167        let state = self
168            .auth_state
169            .lock()
170            .await
171            .get(id)
172            .map(|watchable| watchable.state());
173
174        match state {
175            Some(AuthState::Authenticated) => {
176                trace!("[is_authenticated] endpoint {} is authenticated", id);
177                true
178            }
179            Some(other) => {
180                trace!(
181                    "[is_authenticated] endpoint {} is not authenticated, current state {}",
182                    id,
183                    other
184                );
185                false
186            }
187            None => {
188                trace!("[is_authenticated] endpoint {} has no auth state entry", id);
189                false
190            }
191        }
192    }
193
194    #[cfg(test)]
195    pub async fn list_authenticated(&self) -> Vec<PublicKey> {
196        self.auth_state
197            .lock()
198            .await
199            .iter()
200            .filter_map(|(id, watchable)| {
201                if watchable.state() == AuthState::Authenticated {
202                    Some(*id)
203                } else {
204                    None
205                }
206            })
207            .collect::<Vec<_>>()
208    }
209
210    #[cfg(test)]
211    pub async fn list_blocked(&self) -> Vec<PublicKey> {
212        self.auth_state
213            .lock()
214            .await
215            .iter()
216            .filter_map(|(id, watchable)| {
217                if watchable.state() == AuthState::Blocked {
218                    Some(*id)
219                } else {
220                    None
221                }
222            })
223            .collect::<Vec<_>>()
224    }
225}
226
227impl Authenticator {
228    async fn end_of_auth(
229        &self,
230        send: &mut iroh::endpoint::SendStream,
231        recv: &mut iroh::endpoint::RecvStream,
232        open: bool,
233    ) -> Result<(), AuthenticatorError> {
234        let start = Instant::now();
235        trace!(
236            "[end_of_auth] starting shutdown sequence for {} side",
237            if open { "open" } else { "accept" }
238        );
239        send.finish().map_err(|err| {
240            error!("[end_of_auth] failed to finish stream: {}", err);
241            if open {
242                AuthenticatorError::OpenFailed(format!("Failed to finish stream: {}", err))
243            } else {
244                AuthenticatorError::AcceptFailed(format!("Failed to finish stream: {}", err))
245            }
246        })?;
247
248        const MAX_READ_SIZE: usize = 1024;
249        if let Err(err) = tokio::time::timeout(AUTH_TIMEOUT, recv.read_to_end(MAX_READ_SIZE))
250            .await
251            .map_err(|_| {
252                if open {
253                    AuthenticatorError::OpenFailed(
254                        "Failed to wait for stream stopped: timeout".to_string(),
255                    )
256                } else {
257                    AuthenticatorError::AcceptFailed(
258                        "Failed to wait for stream stopped: timeout".to_string(),
259                    )
260                }
261            })
262            .and_then(|res| {
263                res.map_err(|err| {
264                    if open {
265                        AuthenticatorError::OpenFailed(format!(
266                            "Failed to read remaining data from stream: {}",
267                            err
268                        ))
269                    } else {
270                        AuthenticatorError::AcceptFailed(format!(
271                            "Failed to read remaining data from stream: {}",
272                            err
273                        ))
274                    }
275                })
276            })
277        {
278            warn!("[end_of_auth] {}", err);
279        }
280        trace!(
281            "[end_of_auth] shutdown sequence for {} side completed in {:?}",
282            if open { "open" } else { "accept" },
283            start.elapsed()
284        );
285        Ok(())
286    }
287
288    /// Accept an incoming connection and perform SPAKE2 authentication.
289    /// On success, adds the remote ID to the authenticated set.
290    /// Returns Ok(()) on success, or an AuthenticatorError on failure.
291    pub(crate) async fn auth_accept(&self, conn: Connection) -> Result<(), AuthenticatorError> {
292        let remote_id = conn.remote_id();
293        let start = Instant::now();
294        trace!("[auth_accept] accepting auth connection from {}", remote_id);
295        trace!("[auth_accept] waiting for inbound bidirectional stream from {}", remote_id);
296        let (mut send, mut recv) = timeout(TRANSMISSION_TIMEOUT, conn.accept_bi())
297            .await
298            .map_err(|_| {
299                error!("[auth_accept] accept bidirectional stream timed out");
300                AuthenticatorError::AcceptFailed(
301                    "Accept bidirectional stream timed out".to_string(),
302                )
303            })?
304            .map_err(|err| {
305                error!("[auth_accept] accept bidirectional stream failed: {}", err);
306                AuthenticatorError::AcceptFailed(format!(
307                    "Accept bidirectional stream failed: {}",
308                    err
309                ))
310            })?;
311        trace!(
312            "[auth_accept] bidirectional stream accepted from {} after {:?}",
313            remote_id,
314            start.elapsed()
315        );
316
317        let (spake, token_b) = Spake2::<Ed25519Group>::start_b(
318            &Password::new(self.secret.expose_secret()),
319            &Identity::new(conn.remote_id().as_bytes()),
320            &Identity::new(self.id().await?.as_bytes()),
321        );
322
323        let mut token_a = [0u8; 33];
324        trace!("[auth_accept] waiting for token_a from {}", remote_id);
325        recv.read_exact(&mut token_a).await.map_err(|err| {
326            error!("[auth_accept] failed to read token_a: {}", err);
327            AuthenticatorError::AcceptFailed(format!("Failed to read token_a: {}", err))
328        })?;
329        trace!("[auth_accept] received token_a from {}", remote_id);
330
331        trace!("[auth_accept] sending token_b to {}", remote_id);
332        send.write_all(&token_b).await.map_err(|err| {
333            error!("[auth_accept] failed to write token_b: {}", err);
334            AuthenticatorError::AcceptFailed(format!("Failed to write token_b: {}", err))
335        })?;
336        trace!("[auth_accept] sent token_b to {}", remote_id);
337
338        let shared_secret = spake.finish(&token_a).map_err(|err| {
339            error!("[auth_accept] SPAKE2 invalid: {}", err);
340            AuthenticatorError::AcceptFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
341        })?;
342        trace!("[auth_accept] derived shared secret for {}", remote_id);
343
344        let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
345        let mut accept_key = [0u8; 64];
346        let mut open_key = [0u8; 64];
347        hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
348            .map_err(|err| {
349                error!("[auth_accept] failed to expand accept_key: {}", err);
350                AuthenticatorError::AcceptFailed(format!("Failed to expand accept_key: {}", err))
351            })?;
352        hk.expand(Self::OPEN_CONTEXT, &mut open_key)
353            .map_err(|err| {
354                error!("[auth_accept] failed to expand open_key: {}", err);
355                AuthenticatorError::AcceptFailed(format!("Failed to expand open_key: {}", err))
356            })?;
357
358        trace!("[auth_accept] sending accept_key to {}", remote_id);
359        send.write_all(&accept_key).await.map_err(|err| {
360            error!("[auth_accept] failed to write accept_key: {}", err);
361            AuthenticatorError::AcceptFailed(format!("Failed to write accept_key: {}", err))
362        })?;
363        let mut remote_open_key = [0u8; 64];
364        trace!("[auth_accept] waiting for remote_open_key from {}", remote_id);
365        recv.read_exact(&mut remote_open_key).await.map_err(|err| {
366            error!("[auth_accept] failed to read remote_open_key: {}", err);
367            AuthenticatorError::AcceptFailed(format!("Failed to read remote_open_key: {}", err))
368        })?;
369        trace!("[auth_accept] received remote_open_key from {}", remote_id);
370
371        let _ = self.end_of_auth(&mut send, &mut recv, false).await;
372
373        if !bool::from(remote_open_key.ct_eq(&open_key)) {
374            error!("[auth_accept] remote open_key mismatch");
375            return Err(AuthenticatorError::AcceptFailedAndBlock(
376                "Remote open_key mismatch".to_string(),
377                remote_id,
378            ));
379        }
380
381        info!(
382            "[auth_accept] authenticated connection from {} in {:?}",
383            remote_id,
384            start.elapsed()
385        );
386
387        Ok(())
388    }
389
390    /// Open an outgoing connection and perform SPAKE2 authentication.
391    /// On success, adds the remote ID to the authenticated set.
392    /// Returns Ok(()) on success, or an AuthenticatorError on failure.
393    pub(crate) async fn auth_open(&self, conn: Connection) -> Result<(), AuthenticatorError> {
394        let remote_id = conn.remote_id();
395        let start = Instant::now();
396        trace!("[auth_open] opening auth connection to {}", remote_id);
397        trace!("[auth_open] waiting to open bidirectional stream to {}", remote_id);
398        let (mut send, mut recv) = timeout(TRANSMISSION_TIMEOUT, conn.open_bi())
399            .await
400            .map_err(|_| {
401                error!("[auth_open] open bidirectional stream timed out");
402                AuthenticatorError::OpenFailed("Open bidirectional stream timed out".to_string())
403            })?
404            .map_err(|err| {
405                error!("[auth_open] open bidirectional stream failed: {}", err);
406                AuthenticatorError::OpenFailed(format!("Open bidirectional stream failed: {}", err))
407            })?;
408        trace!(
409            "[auth_open] bidirectional stream opened to {} after {:?}",
410            remote_id,
411            start.elapsed()
412        );
413
414        let (spake, token_a) = Spake2::<Ed25519Group>::start_a(
415            &Password::new(self.secret.expose_secret()),
416            &Identity::new(self.id().await?.as_bytes()),
417            &Identity::new(conn.remote_id().as_bytes()),
418        );
419
420        trace!("[auth_open] sending token_a to {}", remote_id);
421        send.write_all(&token_a).await.map_err(|err| {
422            error!("[auth_open] failed to write token_a: {}", err);
423            AuthenticatorError::OpenFailed(format!("Failed to write token_a: {}", err))
424        })?;
425        trace!("[auth_open] sent token_a to {}", remote_id);
426
427        let mut token_b = [0u8; 33];
428        trace!("[auth_open] waiting for token_b from {}", remote_id);
429        recv.read_exact(&mut token_b).await.map_err(|err| {
430            error!("[auth_open] failed to read token_b: {}", err);
431            AuthenticatorError::OpenFailed(format!("Failed to read token_b: {}", err))
432        })?;
433        trace!("[auth_open] received token_b from {}", remote_id);
434
435        let shared_secret = spake.finish(&token_b).map_err(|err| {
436            error!("[auth_open] SPAKE2 invalid: {}", err);
437            AuthenticatorError::OpenFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
438        })?;
439        trace!("[auth_open] derived shared secret for {}", remote_id);
440
441        let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
442        let mut accept_key = [0u8; 64];
443        let mut open_key = [0u8; 64];
444        hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
445            .map_err(|err| {
446                error!("[auth_open] failed to expand accept_key: {}", err);
447                AuthenticatorError::OpenFailed(format!("Failed to expand accept_key: {}", err))
448            })?;
449        hk.expand(Self::OPEN_CONTEXT, &mut open_key)
450            .map_err(|err| {
451                error!("[auth_open] failed to expand open_key: {}", err);
452                AuthenticatorError::OpenFailed(format!("Failed to expand open_key: {}", err))
453            })?;
454
455        let mut remote_accept_key = [0u8; 64];
456        trace!("[auth_open] waiting for remote_accept_key from {}", remote_id);
457        recv.read_exact(&mut remote_accept_key)
458            .await
459            .map_err(|err| {
460                error!("[auth_open] failed to read remote_accept_key: {}", err);
461                AuthenticatorError::OpenFailed(format!("Failed to read remote_accept_key: {}", err))
462            })?;
463        trace!("[auth_open] received remote_accept_key from {}", remote_id);
464
465        if !bool::from(remote_accept_key.ct_eq(&accept_key)) {
466            error!("[auth_open] remote accept_key mismatch");
467
468            // Writing a random dummy open_key back to finishing the stream but not give away
469            // that the accept_key was correct to avoid leaking information to an attacker about valid accept_keys
470            // (probably not needed but better safe than sorry ^^)
471            send.write_all(&rand::random::<[u8; 64]>()).await.ok();
472            let _ = self.end_of_auth(&mut send, &mut recv, true).await;
473
474            return Err(AuthenticatorError::OpenFailedAndBlock(
475                "Remote accept_key mismatch".to_string(),
476                remote_id,
477            ));
478        }
479
480        trace!("[auth_open] sending open_key to {}", remote_id);
481        send.write_all(&open_key).await.map_err(|err| {
482            error!("[auth_open] failed to write open_key: {}", err);
483            AuthenticatorError::OpenFailed(format!("Failed to write open_key: {}", err))
484        })?;
485        let _ = self.end_of_auth(&mut send, &mut recv, true).await;
486
487        info!(
488            "[auth_open] authenticated connection to {} in {:?}",
489            remote_id,
490            start.elapsed()
491        );
492
493        Ok(())
494    }
495}
496
497impl Authenticator {
498    pub(crate) async fn perform_auth(
499        &self,
500        remote_id: EndpointId,
501        endpoint: Endpoint,
502    ) -> Result<(), AuthenticatorError> {
503        let start_time = Instant::now();
504        let mut attempt = 0usize;
505        trace!("[perform_auth] starting authentication workflow for {}", remote_id);
506        if let Err(err) = timeout(AUTH_TIMEOUT, endpoint.online()).await.map_err(|_| {
507            AuthenticatorError::OpenFailed(
508                "[before_connect] awaiting endpoint.online() timed out".to_string(),
509            )
510        }) {
511            error!(
512                "[before_connect] awaiting endpoint.online() failed: {}",
513                err
514            );
515            release_in_flight(
516                self.auth_state.clone(),
517                remote_id,
518                AuthState::Unauthenticated,
519            )
520            .await
521            .map_err(|err| {
522                AuthenticatorError::OpenFailed(format!(
523                    "[before_connect] failed to release in-flight state for {}: {}",
524                    remote_id, err
525                ))
526            })?;
527            return Err(err);
528        }
529        trace!(
530            "[perform_auth] endpoint is online for {}, entering retry loop after {:?}",
531            remote_id,
532            start_time.elapsed()
533        );
534
535        while start_time.elapsed() < AUTH_TIMEOUT {
536            attempt += 1;
537            let attempt_start = Instant::now();
538            trace!(
539                "[perform_auth] attempt {} connecting to {} with {:?} remaining",
540                attempt,
541                remote_id,
542                remaining_timeout(start_time, AUTH_TIMEOUT)
543            );
544            match timeout(
545                remaining_timeout(start_time, AUTH_TIMEOUT),
546                endpoint.connect(remote_id, ALPN),
547            )
548            .await
549            {
550                Ok(Ok(conn)) => {
551                    trace!(
552                        "[perform_auth] attempt {} connected to {} after {:?}, starting auth_open",
553                        attempt,
554                        remote_id,
555                        attempt_start.elapsed()
556                    );
557                    match timeout(
558                        remaining_timeout(start_time, AUTH_TIMEOUT),
559                        self.auth_open(conn),
560                    )
561                    .await
562                    {
563                        Ok(Ok(())) => {
564                            trace!(
565                                "[perform_auth] attempt {} authentication successful for {} after {:?}",
566                                attempt,
567                                remote_id,
568                                attempt_start.elapsed()
569                            );
570
571                            release_in_flight(
572                                self.auth_state.clone(),
573                                remote_id,
574                                AuthState::Authenticated,
575                            )
576                            .await
577                            .map_err(|err| {
578                                error!(
579                                    "[before_connect] failed to release in-flight state for {}: {}",
580                                    remote_id, err
581                                );
582                                AuthenticatorError::OpenFailed(format!(
583                                    "[before_connect] failed to release in-flight state for {}: {}",
584                                    remote_id, err
585                                ))
586                            })?;
587                            info!(
588                                "[perform_auth] authentication workflow for {} completed successfully in {:?}",
589                                remote_id,
590                                start_time.elapsed()
591                            );
592                            return Ok(());
593                        }
594                        Ok(Err(err)) => match &err {
595                            AuthenticatorError::OpenFailedAndBlock(msg, public_key) => {
596                                warn!(
597                                    "[perform_auth] attempt {} authentication failed and blocking {} after {:?}: {}",
598                                    attempt,
599                                    public_key,
600                                    attempt_start.elapsed(),
601                                    msg
602                                );
603                                release_in_flight(
604                                    self.auth_state.clone(),
605                                    remote_id,
606                                    AuthState::Blocked,
607                                )
608                                .await
609                                .map_err(|err| {
610                                    AuthenticatorError::OpenFailedAndBlock(format!(
611                                        "[before_connect] failed to release in-flight state for {}: {}",
612                                        public_key, err
613                                    ), *public_key)
614                                })?;
615                                return Err(AuthenticatorError::OpenFailedAndBlock(
616                                    msg.clone(),
617                                    *public_key,
618                                ));
619                            }
620                            _ => {
621                                warn!(
622                                    "[perform_auth] attempt {} authentication failed for {} after {:?}: {}",
623                                    attempt,
624                                    remote_id,
625                                    attempt_start.elapsed(),
626                                    err
627                                );
628                            }
629                        },
630                        Err(_) => {
631                            warn!(
632                                "[perform_auth] attempt {} auth_open timed out for {} after {:?}, retrying",
633                                attempt,
634                                remote_id,
635                                attempt_start.elapsed()
636                            );
637                        }
638                    }
639                }
640                Ok(Err(e)) => {
641                    warn!(
642                            "[perform_auth] attempt {} failed to connect auth channel to {} after {:?}: {}, retrying",
643                            attempt, remote_id, attempt_start.elapsed(), e
644                        );
645                }
646                Err(e) => {
647                    warn!(
648                        "[perform_auth] attempt {} connection timed out for {} after {:?}: {}, retrying",
649                        attempt, remote_id, attempt_start.elapsed(), e
650                    );
651                }
652            };
653            trace!(
654                "[perform_auth] attempt {} for {} sleeping before retry with {:?} remaining",
655                attempt,
656                remote_id,
657                remaining_timeout(start_time, AUTH_TIMEOUT)
658            );
659            tokio::time::sleep(Duration::from_secs(1)).await;
660        }
661
662        warn!(
663            "[perform_auth] authentication workflow timed out for {} after {} attempts and {:?}",
664            remote_id,
665            attempt,
666            start_time.elapsed()
667        );
668
669        // Timeout no more retries, clean up in-flight state
670        release_in_flight(
671            self.auth_state.clone(),
672            remote_id,
673            AuthState::Unauthenticated,
674        )
675        .await
676        .map_err(|err| {
677            AuthenticatorError::OpenFailed(format!(
678                "[before_connect] failed to release in-flight state for {}: {}",
679                remote_id, err
680            ))
681        })?;
682        Err(AuthenticatorError::OpenFailed(format!(
683            "Authentication timed out for {}",
684            remote_id
685        )))
686    }
687}
688
689fn remaining_timeout(start: Instant, timeout_duration: Duration) -> Duration {
690    timeout_duration.saturating_sub(Instant::now().saturating_duration_since(start))
691}