1use std::{num::NonZeroUsize, sync::Arc, time::Duration};
2
3use hkdf::Hkdf;
4use iroh::{endpoint::Connection, Endpoint, EndpointId, PublicKey};
5use lru::LruCache;
6use n0_watcher::Watchable;
7use secrecy::{ExposeSecret, SecretSlice};
8use sha2::Sha512;
9use spake2::{Ed25519Group, Identity, Password, Spake2};
10use subtle::ConstantTimeEq;
11use tokio::{
12 sync::Mutex,
13 time::{timeout, Instant},
14};
15use tracing::{error, info, trace, warn};
16
17use crate::{
18 protocol::release_in_flight, AuthenticatorError, IntoSecret, ALPN, AUTH_TIMEOUT,
19 TRANSMISSION_TIMEOUT,
20};
21
/// Shared-secret authenticator for iroh endpoints.
///
/// Cloning is cheap: the endpoint slot and the per-remote state cache are behind `Arc`s, so
/// all clones observe the same endpoint and the same authentication state.
#[derive(Debug, Clone)]
pub struct Authenticator {
    // Shared secret, used as the SPAKE2 password for both handshake sides.
    secret: SecretSlice<u8>,
    // Local endpoint; starts as `None` and is filled in once by `set_endpoint`.
    endpoint: Arc<Mutex<Option<iroh::Endpoint>>>,
    // Per-remote authentication state, bounded by `crate::LRU_CACHE_SIZE` entries.
    pub(crate) auth_state: Arc<Mutex<LruCache<EndpointId, WatchableRemote>>>,
}
28
/// Authentication status tracked for a single remote endpoint.
///
/// New entries start as `Unauthenticated` (see `WatchableRemote::new`); `perform_auth`
/// resolves an outgoing attempt to `Authenticated`, `Blocked`, or back to `Unauthenticated`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthState {
    /// No successful authentication is recorded for the remote.
    Unauthenticated,
    /// An authentication attempt is in progress.
    /// NOTE(review): set by registration code not visible here — confirm against the caller.
    InFlight,
    /// The remote completed the handshake successfully.
    Authenticated,
    /// The handshake failed in a way that requested blocking the remote
    /// (e.g. an invalid SPAKE2 message or a confirmation-key mismatch).
    Blocked,
}
36
37impl std::fmt::Display for AuthState {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 match self {
40 AuthState::Unauthenticated => write!(f, "Unauthenticated"),
41 AuthState::InFlight => write!(f, "InFlight"),
42 AuthState::Authenticated => write!(f, "Authenticated"),
43 AuthState::Blocked => write!(f, "Blocked"),
44 }
45 }
46}
47
/// Result of registering an authentication attempt for a remote.
///
/// NOTE(review): the code that produces these values is not in this file; the variant
/// descriptions below are inferred from their names and should be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RegisterResponse {
    /// Presumably: a new in-flight attempt was recorded for the remote.
    InFlightRegistered,
    /// Presumably: another attempt for the remote is already in flight.
    AlreadyInFlight,
    /// Presumably: the remote is already authenticated.
    AlreadyAuthenticated,
    /// Presumably: the remote is blocked.
    AlreadyBlocked,
}
55
/// Per-remote authentication state wrapped in a `Watchable`, keyed by the remote's public key.
///
/// Equality and hashing consider only `id` (see the `PartialEq`/`Hash` impls below the
/// inherent impl).
#[derive(Debug, Clone)]
pub(crate) struct WatchableRemote {
    // Public key identifying the remote endpoint.
    id: PublicKey,
    // Current state; initialised to `AuthState::Unauthenticated` by `new`.
    inner: Watchable<AuthState>,
}
61
62impl WatchableRemote {
63 pub fn new(id: PublicKey) -> Self {
65 Self {
66 id,
67 inner: Watchable::new(AuthState::Unauthenticated),
68 }
69 }
70
71 pub fn watcher(&self) -> Watchable<AuthState> {
72 self.inner.clone()
73 }
74
75 pub fn id(&self) -> &PublicKey {
76 &self.id
77 }
78
79 pub fn state(&self) -> AuthState {
80 self.inner.get()
81 }
82
83 pub fn set_state(&self, state: AuthState) {
84 let previous_state = self.inner.get();
85 if previous_state == state {
86 trace!(
87 "[watchable_remote] endpoint {} state unchanged at {}",
88 self.id,
89 state
90 );
91 } else {
92 trace!(
93 "[watchable_remote] endpoint {} state transition {} -> {}",
94 self.id,
95 previous_state,
96 state
97 );
98 }
99 self.inner.set(state).ok();
100 }
101}
102
103impl PartialEq for WatchableRemote {
104 fn eq(&self, other: &Self) -> bool {
105 self.id() == other.id()
106 }
107}
108
/// Equality compares only the remote's public key, so it is a full equivalence relation.
impl Eq for WatchableRemote {}
110
111impl PartialEq<PublicKey> for WatchableRemote {
112 fn eq(&self, other: &PublicKey) -> bool {
113 self.id() == other
114 }
115}
116
117impl std::hash::Hash for WatchableRemote {
118 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
119 self.id().hash(state);
120 }
121}
122
123impl Authenticator {
124 pub const ALPN: &'static [u8] = crate::ALPN;
125 const ACCEPT_CONTEXT: &'static [u8] = b"iroh-auth-accept";
126 const OPEN_CONTEXT: &'static [u8] = b"iroh-auth-open";
127
128 pub fn new<S: IntoSecret>(secret: S) -> Self {
129 Self {
130 secret: secret.into_secret(),
131 endpoint: Arc::new(Mutex::new(None)),
132 auth_state: Arc::new(Mutex::new(LruCache::new(
133 NonZeroUsize::new(crate::LRU_CACHE_SIZE).expect("LRU_CACHE_SIZE must be > 0"),
134 ))),
135 }
136 }
137
138 pub async fn set_endpoint(&self, endpoint: &Endpoint) {
139 let mut guard = self.endpoint.lock().await;
140 if guard.is_none() {
141 *guard = Some(endpoint.clone());
142 trace!("Authenticator endpoint set to {}", endpoint.id());
143 } else {
144 trace!("Authenticator endpoint already set, ignoring {}", endpoint.id());
145 }
146 }
147
148 async fn id(&self) -> Result<PublicKey, AuthenticatorError> {
149 self.endpoint
150 .lock()
151 .await
152 .as_ref()
153 .map(|ep| ep.id())
154 .ok_or(AuthenticatorError::EndpointNotSet)
155 }
156
157 pub(crate) async fn endpoint(&self) -> Result<iroh::Endpoint, AuthenticatorError> {
158 self.endpoint
159 .lock()
160 .await
161 .as_ref()
162 .cloned()
163 .ok_or(AuthenticatorError::EndpointNotSet)
164 }
165
166 pub async fn is_authenticated(&self, id: &PublicKey) -> bool {
167 let state = self
168 .auth_state
169 .lock()
170 .await
171 .get(id)
172 .map(|watchable| watchable.state());
173
174 match state {
175 Some(AuthState::Authenticated) => {
176 trace!("[is_authenticated] endpoint {} is authenticated", id);
177 true
178 }
179 Some(other) => {
180 trace!(
181 "[is_authenticated] endpoint {} is not authenticated, current state {}",
182 id,
183 other
184 );
185 false
186 }
187 None => {
188 trace!("[is_authenticated] endpoint {} has no auth state entry", id);
189 false
190 }
191 }
192 }
193
194 #[cfg(test)]
195 pub async fn list_authenticated(&self) -> Vec<PublicKey> {
196 self.auth_state
197 .lock()
198 .await
199 .iter()
200 .filter_map(|(id, watchable)| {
201 if watchable.state() == AuthState::Authenticated {
202 Some(*id)
203 } else {
204 None
205 }
206 })
207 .collect::<Vec<_>>()
208 }
209
210 #[cfg(test)]
211 pub async fn list_blocked(&self) -> Vec<PublicKey> {
212 self.auth_state
213 .lock()
214 .await
215 .iter()
216 .filter_map(|(id, watchable)| {
217 if watchable.state() == AuthState::Blocked {
218 Some(*id)
219 } else {
220 None
221 }
222 })
223 .collect::<Vec<_>>()
224 }
225}
226
227impl Authenticator {
228 async fn end_of_auth(
229 &self,
230 send: &mut iroh::endpoint::SendStream,
231 recv: &mut iroh::endpoint::RecvStream,
232 open: bool,
233 ) -> Result<(), AuthenticatorError> {
234 let start = Instant::now();
235 trace!(
236 "[end_of_auth] starting shutdown sequence for {} side",
237 if open { "open" } else { "accept" }
238 );
239 send.finish().map_err(|err| {
240 error!("[end_of_auth] failed to finish stream: {}", err);
241 if open {
242 AuthenticatorError::OpenFailed(format!("Failed to finish stream: {}", err))
243 } else {
244 AuthenticatorError::AcceptFailed(format!("Failed to finish stream: {}", err))
245 }
246 })?;
247
248 const MAX_READ_SIZE: usize = 1024;
249 if let Err(err) = tokio::time::timeout(AUTH_TIMEOUT, recv.read_to_end(MAX_READ_SIZE))
250 .await
251 .map_err(|_| {
252 if open {
253 AuthenticatorError::OpenFailed(
254 "Failed to wait for stream stopped: timeout".to_string(),
255 )
256 } else {
257 AuthenticatorError::AcceptFailed(
258 "Failed to wait for stream stopped: timeout".to_string(),
259 )
260 }
261 })
262 .and_then(|res| {
263 res.map_err(|err| {
264 if open {
265 AuthenticatorError::OpenFailed(format!(
266 "Failed to read remaining data from stream: {}",
267 err
268 ))
269 } else {
270 AuthenticatorError::AcceptFailed(format!(
271 "Failed to read remaining data from stream: {}",
272 err
273 ))
274 }
275 })
276 })
277 {
278 warn!("[end_of_auth] {}", err);
279 }
280 trace!(
281 "[end_of_auth] shutdown sequence for {} side completed in {:?}",
282 if open { "open" } else { "accept" },
283 start.elapsed()
284 );
285 Ok(())
286 }
287
288 pub(crate) async fn auth_accept(&self, conn: Connection) -> Result<(), AuthenticatorError> {
292 let remote_id = conn.remote_id();
293 let start = Instant::now();
294 trace!("[auth_accept] accepting auth connection from {}", remote_id);
295 trace!("[auth_accept] waiting for inbound bidirectional stream from {}", remote_id);
296 let (mut send, mut recv) = timeout(TRANSMISSION_TIMEOUT, conn.accept_bi())
297 .await
298 .map_err(|_| {
299 error!("[auth_accept] accept bidirectional stream timed out");
300 AuthenticatorError::AcceptFailed(
301 "Accept bidirectional stream timed out".to_string(),
302 )
303 })?
304 .map_err(|err| {
305 error!("[auth_accept] accept bidirectional stream failed: {}", err);
306 AuthenticatorError::AcceptFailed(format!(
307 "Accept bidirectional stream failed: {}",
308 err
309 ))
310 })?;
311 trace!(
312 "[auth_accept] bidirectional stream accepted from {} after {:?}",
313 remote_id,
314 start.elapsed()
315 );
316
317 let (spake, token_b) = Spake2::<Ed25519Group>::start_b(
318 &Password::new(self.secret.expose_secret()),
319 &Identity::new(conn.remote_id().as_bytes()),
320 &Identity::new(self.id().await?.as_bytes()),
321 );
322
323 let mut token_a = [0u8; 33];
324 trace!("[auth_accept] waiting for token_a from {}", remote_id);
325 recv.read_exact(&mut token_a).await.map_err(|err| {
326 error!("[auth_accept] failed to read token_a: {}", err);
327 AuthenticatorError::AcceptFailed(format!("Failed to read token_a: {}", err))
328 })?;
329 trace!("[auth_accept] received token_a from {}", remote_id);
330
331 trace!("[auth_accept] sending token_b to {}", remote_id);
332 send.write_all(&token_b).await.map_err(|err| {
333 error!("[auth_accept] failed to write token_b: {}", err);
334 AuthenticatorError::AcceptFailed(format!("Failed to write token_b: {}", err))
335 })?;
336 trace!("[auth_accept] sent token_b to {}", remote_id);
337
338 let shared_secret = spake.finish(&token_a).map_err(|err| {
339 error!("[auth_accept] SPAKE2 invalid: {}", err);
340 AuthenticatorError::AcceptFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
341 })?;
342 trace!("[auth_accept] derived shared secret for {}", remote_id);
343
344 let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
345 let mut accept_key = [0u8; 64];
346 let mut open_key = [0u8; 64];
347 hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
348 .map_err(|err| {
349 error!("[auth_accept] failed to expand accept_key: {}", err);
350 AuthenticatorError::AcceptFailed(format!("Failed to expand accept_key: {}", err))
351 })?;
352 hk.expand(Self::OPEN_CONTEXT, &mut open_key)
353 .map_err(|err| {
354 error!("[auth_accept] failed to expand open_key: {}", err);
355 AuthenticatorError::AcceptFailed(format!("Failed to expand open_key: {}", err))
356 })?;
357
358 trace!("[auth_accept] sending accept_key to {}", remote_id);
359 send.write_all(&accept_key).await.map_err(|err| {
360 error!("[auth_accept] failed to write accept_key: {}", err);
361 AuthenticatorError::AcceptFailed(format!("Failed to write accept_key: {}", err))
362 })?;
363 let mut remote_open_key = [0u8; 64];
364 trace!("[auth_accept] waiting for remote_open_key from {}", remote_id);
365 recv.read_exact(&mut remote_open_key).await.map_err(|err| {
366 error!("[auth_accept] failed to read remote_open_key: {}", err);
367 AuthenticatorError::AcceptFailed(format!("Failed to read remote_open_key: {}", err))
368 })?;
369 trace!("[auth_accept] received remote_open_key from {}", remote_id);
370
371 let _ = self.end_of_auth(&mut send, &mut recv, false).await;
372
373 if !bool::from(remote_open_key.ct_eq(&open_key)) {
374 error!("[auth_accept] remote open_key mismatch");
375 return Err(AuthenticatorError::AcceptFailedAndBlock(
376 "Remote open_key mismatch".to_string(),
377 remote_id,
378 ));
379 }
380
381 info!(
382 "[auth_accept] authenticated connection from {} in {:?}",
383 remote_id,
384 start.elapsed()
385 );
386
387 Ok(())
388 }
389
390 pub(crate) async fn auth_open(&self, conn: Connection) -> Result<(), AuthenticatorError> {
394 let remote_id = conn.remote_id();
395 let start = Instant::now();
396 trace!("[auth_open] opening auth connection to {}", remote_id);
397 trace!("[auth_open] waiting to open bidirectional stream to {}", remote_id);
398 let (mut send, mut recv) = timeout(TRANSMISSION_TIMEOUT, conn.open_bi())
399 .await
400 .map_err(|_| {
401 error!("[auth_open] open bidirectional stream timed out");
402 AuthenticatorError::OpenFailed("Open bidirectional stream timed out".to_string())
403 })?
404 .map_err(|err| {
405 error!("[auth_open] open bidirectional stream failed: {}", err);
406 AuthenticatorError::OpenFailed(format!("Open bidirectional stream failed: {}", err))
407 })?;
408 trace!(
409 "[auth_open] bidirectional stream opened to {} after {:?}",
410 remote_id,
411 start.elapsed()
412 );
413
414 let (spake, token_a) = Spake2::<Ed25519Group>::start_a(
415 &Password::new(self.secret.expose_secret()),
416 &Identity::new(self.id().await?.as_bytes()),
417 &Identity::new(conn.remote_id().as_bytes()),
418 );
419
420 trace!("[auth_open] sending token_a to {}", remote_id);
421 send.write_all(&token_a).await.map_err(|err| {
422 error!("[auth_open] failed to write token_a: {}", err);
423 AuthenticatorError::OpenFailed(format!("Failed to write token_a: {}", err))
424 })?;
425 trace!("[auth_open] sent token_a to {}", remote_id);
426
427 let mut token_b = [0u8; 33];
428 trace!("[auth_open] waiting for token_b from {}", remote_id);
429 recv.read_exact(&mut token_b).await.map_err(|err| {
430 error!("[auth_open] failed to read token_b: {}", err);
431 AuthenticatorError::OpenFailed(format!("Failed to read token_b: {}", err))
432 })?;
433 trace!("[auth_open] received token_b from {}", remote_id);
434
435 let shared_secret = spake.finish(&token_b).map_err(|err| {
436 error!("[auth_open] SPAKE2 invalid: {}", err);
437 AuthenticatorError::OpenFailedAndBlock(format!("SPAKE2 invalid: {}", err), remote_id)
438 })?;
439 trace!("[auth_open] derived shared secret for {}", remote_id);
440
441 let hk = Hkdf::<Sha512>::new(None, shared_secret.as_slice());
442 let mut accept_key = [0u8; 64];
443 let mut open_key = [0u8; 64];
444 hk.expand(Self::ACCEPT_CONTEXT, &mut accept_key)
445 .map_err(|err| {
446 error!("[auth_open] failed to expand accept_key: {}", err);
447 AuthenticatorError::OpenFailed(format!("Failed to expand accept_key: {}", err))
448 })?;
449 hk.expand(Self::OPEN_CONTEXT, &mut open_key)
450 .map_err(|err| {
451 error!("[auth_open] failed to expand open_key: {}", err);
452 AuthenticatorError::OpenFailed(format!("Failed to expand open_key: {}", err))
453 })?;
454
455 let mut remote_accept_key = [0u8; 64];
456 trace!("[auth_open] waiting for remote_accept_key from {}", remote_id);
457 recv.read_exact(&mut remote_accept_key)
458 .await
459 .map_err(|err| {
460 error!("[auth_open] failed to read remote_accept_key: {}", err);
461 AuthenticatorError::OpenFailed(format!("Failed to read remote_accept_key: {}", err))
462 })?;
463 trace!("[auth_open] received remote_accept_key from {}", remote_id);
464
465 if !bool::from(remote_accept_key.ct_eq(&accept_key)) {
466 error!("[auth_open] remote accept_key mismatch");
467
468 send.write_all(&rand::random::<[u8; 64]>()).await.ok();
472 let _ = self.end_of_auth(&mut send, &mut recv, true).await;
473
474 return Err(AuthenticatorError::OpenFailedAndBlock(
475 "Remote accept_key mismatch".to_string(),
476 remote_id,
477 ));
478 }
479
480 trace!("[auth_open] sending open_key to {}", remote_id);
481 send.write_all(&open_key).await.map_err(|err| {
482 error!("[auth_open] failed to write open_key: {}", err);
483 AuthenticatorError::OpenFailed(format!("Failed to write open_key: {}", err))
484 })?;
485 let _ = self.end_of_auth(&mut send, &mut recv, true).await;
486
487 info!(
488 "[auth_open] authenticated connection to {} in {:?}",
489 remote_id,
490 start.elapsed()
491 );
492
493 Ok(())
494 }
495}
496
497impl Authenticator {
498 pub(crate) async fn perform_auth(
499 &self,
500 remote_id: EndpointId,
501 endpoint: Endpoint,
502 ) -> Result<(), AuthenticatorError> {
503 let start_time = Instant::now();
504 let mut attempt = 0usize;
505 trace!("[perform_auth] starting authentication workflow for {}", remote_id);
506 if let Err(err) = timeout(AUTH_TIMEOUT, endpoint.online()).await.map_err(|_| {
507 AuthenticatorError::OpenFailed(
508 "[before_connect] awaiting endpoint.online() timed out".to_string(),
509 )
510 }) {
511 error!(
512 "[before_connect] awaiting endpoint.online() failed: {}",
513 err
514 );
515 release_in_flight(
516 self.auth_state.clone(),
517 remote_id,
518 AuthState::Unauthenticated,
519 )
520 .await
521 .map_err(|err| {
522 AuthenticatorError::OpenFailed(format!(
523 "[before_connect] failed to release in-flight state for {}: {}",
524 remote_id, err
525 ))
526 })?;
527 return Err(err);
528 }
529 trace!(
530 "[perform_auth] endpoint is online for {}, entering retry loop after {:?}",
531 remote_id,
532 start_time.elapsed()
533 );
534
535 while start_time.elapsed() < AUTH_TIMEOUT {
536 attempt += 1;
537 let attempt_start = Instant::now();
538 trace!(
539 "[perform_auth] attempt {} connecting to {} with {:?} remaining",
540 attempt,
541 remote_id,
542 remaining_timeout(start_time, AUTH_TIMEOUT)
543 );
544 match timeout(
545 remaining_timeout(start_time, AUTH_TIMEOUT),
546 endpoint.connect(remote_id, ALPN),
547 )
548 .await
549 {
550 Ok(Ok(conn)) => {
551 trace!(
552 "[perform_auth] attempt {} connected to {} after {:?}, starting auth_open",
553 attempt,
554 remote_id,
555 attempt_start.elapsed()
556 );
557 match timeout(
558 remaining_timeout(start_time, AUTH_TIMEOUT),
559 self.auth_open(conn),
560 )
561 .await
562 {
563 Ok(Ok(())) => {
564 trace!(
565 "[perform_auth] attempt {} authentication successful for {} after {:?}",
566 attempt,
567 remote_id,
568 attempt_start.elapsed()
569 );
570
571 release_in_flight(
572 self.auth_state.clone(),
573 remote_id,
574 AuthState::Authenticated,
575 )
576 .await
577 .map_err(|err| {
578 error!(
579 "[before_connect] failed to release in-flight state for {}: {}",
580 remote_id, err
581 );
582 AuthenticatorError::OpenFailed(format!(
583 "[before_connect] failed to release in-flight state for {}: {}",
584 remote_id, err
585 ))
586 })?;
587 info!(
588 "[perform_auth] authentication workflow for {} completed successfully in {:?}",
589 remote_id,
590 start_time.elapsed()
591 );
592 return Ok(());
593 }
594 Ok(Err(err)) => match &err {
595 AuthenticatorError::OpenFailedAndBlock(msg, public_key) => {
596 warn!(
597 "[perform_auth] attempt {} authentication failed and blocking {} after {:?}: {}",
598 attempt,
599 public_key,
600 attempt_start.elapsed(),
601 msg
602 );
603 release_in_flight(
604 self.auth_state.clone(),
605 remote_id,
606 AuthState::Blocked,
607 )
608 .await
609 .map_err(|err| {
610 AuthenticatorError::OpenFailedAndBlock(format!(
611 "[before_connect] failed to release in-flight state for {}: {}",
612 public_key, err
613 ), *public_key)
614 })?;
615 return Err(AuthenticatorError::OpenFailedAndBlock(
616 msg.clone(),
617 *public_key,
618 ));
619 }
620 _ => {
621 warn!(
622 "[perform_auth] attempt {} authentication failed for {} after {:?}: {}",
623 attempt,
624 remote_id,
625 attempt_start.elapsed(),
626 err
627 );
628 }
629 },
630 Err(_) => {
631 warn!(
632 "[perform_auth] attempt {} auth_open timed out for {} after {:?}, retrying",
633 attempt,
634 remote_id,
635 attempt_start.elapsed()
636 );
637 }
638 }
639 }
640 Ok(Err(e)) => {
641 warn!(
642 "[perform_auth] attempt {} failed to connect auth channel to {} after {:?}: {}, retrying",
643 attempt, remote_id, attempt_start.elapsed(), e
644 );
645 }
646 Err(e) => {
647 warn!(
648 "[perform_auth] attempt {} connection timed out for {} after {:?}: {}, retrying",
649 attempt, remote_id, attempt_start.elapsed(), e
650 );
651 }
652 };
653 trace!(
654 "[perform_auth] attempt {} for {} sleeping before retry with {:?} remaining",
655 attempt,
656 remote_id,
657 remaining_timeout(start_time, AUTH_TIMEOUT)
658 );
659 tokio::time::sleep(Duration::from_secs(1)).await;
660 }
661
662 warn!(
663 "[perform_auth] authentication workflow timed out for {} after {} attempts and {:?}",
664 remote_id,
665 attempt,
666 start_time.elapsed()
667 );
668
669 release_in_flight(
671 self.auth_state.clone(),
672 remote_id,
673 AuthState::Unauthenticated,
674 )
675 .await
676 .map_err(|err| {
677 AuthenticatorError::OpenFailed(format!(
678 "[before_connect] failed to release in-flight state for {}: {}",
679 remote_id, err
680 ))
681 })?;
682 Err(AuthenticatorError::OpenFailed(format!(
683 "Authentication timed out for {}",
684 remote_id
685 )))
686 }
687}
688
/// Returns how much of `timeout_duration` is left since `start`, or zero once it has elapsed.
///
/// A `start` in the future counts as zero elapsed time, so the full duration is returned.
fn remaining_timeout(start: Instant, timeout_duration: Duration) -> Duration {
    let elapsed = Instant::now().saturating_duration_since(start);
    if elapsed >= timeout_duration {
        Duration::ZERO
    } else {
        timeout_duration - elapsed
    }
}