Skip to main content

magic_wormhole/
transit.rs

1//! Connect two sides via TCP, no matter where they are
2//!
3//! This protocol is the second part where the Wormhole magic happens. It does not strictly require a Wormhole connection,
4//! but it depends on some kind of secure communication channel to talk to the other side. Conveniently, Wormhole provides
5//! exactly such a thing :)
6//!
7//! Both clients exchange messages containing hints on how to find each other. These may be local IP addresses, in case they
8//! are in the same network, or the URL of a relay server. In case a direct connection fails, both will connect to the relay server
9//! which will transparently glue the connections together.
10//!
11//! Each side might implement (or use/enable) some [abilities](Abilities).
12//!
13//! **Notice:** while the resulting TCP connection is naturally bi-directional, the handshake is not symmetric. There *must* be one
14//! "leader" side and one "follower" side (formerly called "sender" and "receiver").
15
16use crate::{Key, KeyPurpose, core::key::GenericKey};
17use serde_derive::{Deserialize, Serialize};
18
19#[cfg(not(target_family = "wasm"))]
20use async_net::{TcpListener, TcpStream};
21#[allow(unused_imports)] /* We need them for the docs */
22use futures::{
23    Sink, SinkExt, Stream, StreamExt, TryStreamExt,
24    future::FutureExt,
25    future::TryFutureExt,
26    io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
27};
28use std::{
29    collections::HashSet,
30    net::{IpAddr, SocketAddr},
31    sync::Arc,
32    time::Instant,
33};
34
35mod crypto;
36mod transport;
37use crypto::TransitHandshakeError;
38use transport::{TransitTransport, TransitTransportRx, TransitTransportTx};
39
/// URL to a default hosted relay server. Please don't abuse or DOS.
pub const DEFAULT_RELAY_SERVER: &str = "tcp://transit.magic-wormhole.io:4001";
// STUN server address in `host:port` form. Only compiled for non-wasm targets.
// No need to make public, it's hard-coded anyways (:
// Open an issue if you want an API for this
// Use <stun.stunprotocol.org:3478> for non-production testing
#[cfg(not(target_family = "wasm"))]
const PUBLIC_STUN_SERVER: &str = "stun.piegames.de:3478";
47
/// Marker type for base key used in the Transit protocol.
#[derive(Debug)]
pub struct TransitKey;

impl KeyPurpose for TransitKey {}

/// Crate-internal key marker type.
///
/// NOTE(review): presumably tags the key used for the receiving direction — confirm in `crypto`.
#[derive(Debug)]
pub(crate) struct TransitRxKey;

impl KeyPurpose for TransitRxKey {}

/// Crate-internal key marker type.
///
/// NOTE(review): presumably tags the key used for the sending direction — confirm in `crypto`.
#[derive(Debug)]
pub(crate) struct TransitTxKey;

impl KeyPurpose for TransitTxKey {}
63
/// An error occurred when connecting to the peer.
///
/// The enum is `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TransitConnectError {
    /// Incompatible abilities, or wrong hints
    ///
    /// The payload is the message; it is printed verbatim by `Display`.
    #[error("{}", _0)]
    Protocol(Box<str>),

    /// All (relay) handshakes failed or timed out; could not establish a connection with the peer
    #[error(
        "All (relay) handshakes failed or timed out; could not establish a connection with the peer"
    )]
    Handshake,

    /// I/O error
    ///
    /// Converted automatically from [`std::io::Error`], which is also exposed as the error source.
    #[error("I/O error")]
    IO(
        #[from]
        #[source]
        std::io::Error,
    ),

    /// WASM error
    ///
    /// Only exists when compiling for a wasm target.
    #[cfg(target_family = "wasm")]
    #[error("WASM error")]
    WASM(
        #[from]
        #[source]
        ws_stream_wasm::WsErr,
    ),
}
95
/// An error occurred during transit
///
/// The enum is `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TransitError {
    /// Cryptography error. This is probably an implementation bug, but may also be caused by an attack
    #[error(
        "Cryptography error. This is probably an implementation bug, but may also be caused by an attack."
    )]
    Crypto,

    /// Wrong nonce received. This is probably an implementation bug, but may also be caused by an attack
    ///
    /// The first field is the nonce that was received, the second one the nonce that was expected.
    #[error(
        "Wrong nonce received, got {:x?} but expected {:x?}. This is probably an implementation bug, but may also be caused by an attack.",
        _0,
        _1
    )]
    Nonce(Box<[u8]>, Box<[u8]>),

    /// I/O error
    ///
    /// Converted automatically from [`std::io::Error`], which is also exposed as the error source.
    #[error("I/O error")]
    IO(
        #[from]
        #[source]
        std::io::Error,
    ),

    /// WASM error
    ///
    /// Only exists when compiling for a wasm target.
    #[cfg(target_family = "wasm")]
    #[error("WASM error")]
    WASM(
        #[from]
        #[source]
        ws_stream_wasm::WsErr,
    ),
}
131
132impl From<()> for TransitError {
133    fn from(_: ()) -> Self {
134        Self::Crypto
135    }
136}
137
/**
 * Defines a way to find the other side.
 *
 * Each ability comes with a set of [`Hints`] to encode how to meet up.
 * The default value has every ability switched off.
 */
#[derive(Copy, Clone, Debug, Default)]
pub struct Abilities {
    /** Direct connection to the peer */
    pub direct_tcp_v1: bool,
    /** Connection over a relay */
    pub relay_v1: bool,
    #[cfg(any())]
    /** **Experimental** Use the [noise protocol](https://noiseprotocol.org) for the encryption. */
    pub noise_v1: bool,
}

impl Abilities {
    /// Preset with both the direct and the relay ability switched on
    pub const ALL: Self = Self {
        direct_tcp_v1: true,
        relay_v1: true,
        #[cfg(any())]
        noise_v1: false,
    };

    /**
     * Preset for when relay servers must not be used at all.
     *
     * If the other side forces relay usage or doesn't support any of your connection modes
     * the attempt will fail.
     */
    pub const FORCE_DIRECT: Self = Self {
        direct_tcp_v1: true,
        relay_v1: false,
        #[cfg(any())]
        noise_v1: false,
    };

    /**
     * Preset for when you don't want to disclose your IP address to your peer.
     *
     * If the other side forces the usage of a direct connection the attempt will fail.
     * Note that the other side might control the relay server being used, if you really
     * don't want your IP to potentially be disclosed use TOR instead (not supported by
     * the Rust implementation yet).
     */
    pub const FORCE_RELAY: Self = Self {
        direct_tcp_v1: false,
        relay_v1: true,
        #[cfg(any())]
        noise_v1: false,
    };

    /// Returns `true` if a direct connection to the peer is allowed
    pub fn can_direct(&self) -> bool {
        let Self { direct_tcp_v1, .. } = *self;
        direct_tcp_v1
    }

    /// Returns `true` if a connection over a relay server is allowed
    pub fn can_relay(&self) -> bool {
        let Self { relay_v1, .. } = *self;
        relay_v1
    }

    #[cfg(any())]
    pub(crate) fn can_noise_crypto(&self) -> bool {
        self.noise_v1
    }

    /// Whether noise cryptography is supported (currently always `false`,
    /// since the variant reading `noise_v1` is compiled out)
    pub(crate) fn can_noise_crypto(&self) -> bool {
        false
    }

    /// Build the set of abilities that are enabled on both `self` and `other`
    pub fn intersect(self, other: &Self) -> Self {
        Self {
            direct_tcp_v1: self.direct_tcp_v1 && other.direct_tcp_v1,
            relay_v1: self.relay_v1 && other.relay_v1,
            #[cfg(any())]
            noise_v1: self.noise_v1 && other.noise_v1,
        }
    }
}
222
223impl serde::Serialize for Abilities {
224    fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
225    where
226        S: serde::Serializer,
227    {
228        let mut hints = Vec::new();
229        if self.direct_tcp_v1 {
230            hints.push(serde_json::json!({
231                "type": "direct-tcp-v1",
232            }));
233        }
234        if self.relay_v1 {
235            hints.push(serde_json::json!({
236                "type": "relay-v1",
237            }));
238        }
239        #[cfg(any())]
240        if self.noise_v1 {
241            hints.push(serde_json::json!({
242                "type": "noise-crypto-v1",
243            }));
244        }
245        serde_json::Value::Array(hints).serialize(ser)
246    }
247}
248
249impl<'de> serde::Deserialize<'de> for Abilities {
250    fn deserialize<D>(de: D) -> Result<Self, D::Error>
251    where
252        D: serde::Deserializer<'de>,
253    {
254        #[derive(Deserialize)]
255        #[serde(rename_all = "kebab-case", tag = "type")]
256        enum Ability {
257            DirectTcpV1,
258            RelayV1,
259            RelayV2,
260            #[cfg(any())]
261            NoiseCryptoV1,
262            #[serde(other)]
263            Other,
264        }
265
266        let mut abilities = Self::default();
267        /* Specifying a hint multiple times is undefined behavior. Here, we simply merge all features. */
268        for ability in <Vec<Ability> as serde::Deserialize>::deserialize(de)? {
269            match ability {
270                Ability::DirectTcpV1 => {
271                    abilities.direct_tcp_v1 = true;
272                },
273                Ability::RelayV1 => {
274                    abilities.relay_v1 = true;
275                },
276                #[cfg(any())]
277                Ability::NoiseCryptoV1 => {
278                    abilities.noise_v1 = true;
279                },
280                _ => (),
281            }
282        }
283        Ok(abilities)
284    }
285}
286
/* Wire representation of a single hint.
 *
 * The variant is selected by the "type" field, using the kebab-case variant name
 * ("direct-tcp-v1", "relay-v1"). Any other type value deserializes to `Unknown`.
 */
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "kebab-case", tag = "type")]
#[non_exhaustive]
enum HintSerde {
    DirectTcpV1(DirectHint),
    RelayV1(RelayHint),
    #[serde(other)]
    Unknown,
}
297
/**
 * Information about how to find a peer
 *
 * The default value contains no hints at all.
 */
#[derive(Clone, Debug, Default)]
pub struct Hints {
    /** Hints for direct connection */
    pub direct_tcp: HashSet<DirectHint>,
    /** List of relay servers */
    pub relay: Vec<RelayHint>,
}
306
307impl Hints {
308    /// Create new hints
309    pub fn new(
310        direct_tcp: impl IntoIterator<Item = DirectHint>,
311        relay: impl IntoIterator<Item = RelayHint>,
312    ) -> Self {
313        Self {
314            direct_tcp: direct_tcp.into_iter().collect(),
315            relay: relay.into_iter().collect(),
316        }
317    }
318}
319
320impl<'de> serde::Deserialize<'de> for Hints {
321    fn deserialize<D>(de: D) -> Result<Self, D::Error>
322    where
323        D: serde::Deserializer<'de>,
324    {
325        let hints: Vec<HintSerde> = serde::Deserialize::deserialize(de)?;
326        let mut direct_tcp = HashSet::new();
327        let mut relay = Vec::<RelayHint>::new();
328        let mut relay_v2 = Vec::<RelayHint>::new();
329
330        for hint in hints {
331            match hint {
332                HintSerde::DirectTcpV1(hint) => {
333                    direct_tcp.insert(hint);
334                },
335                HintSerde::RelayV1(hint) => {
336                    relay_v2.push(hint);
337                },
338                /* Ignore unknown hints */
339                _ => {},
340            }
341        }
342
343        /* If there are any relay-v2 hints, there relay-v1 are redundant */
344        if !relay_v2.is_empty() {
345            relay.clear();
346        }
347        relay.extend(relay_v2);
348
349        Ok(Hints { direct_tcp, relay })
350    }
351}
352
353impl serde::Serialize for Hints {
354    fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
355    where
356        S: serde::Serializer,
357    {
358        let direct = self.direct_tcp.iter().cloned().map(HintSerde::DirectTcpV1);
359        let relay = self.relay.iter().cloned().map(HintSerde::RelayV1);
360        ser.collect_seq(direct.chain(relay))
361    }
362}
363
/**
 * Hostname and port for direct connection
 *
 * Displayed as `tcp://<hostname>:<port>`.
 */
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash, derive_more::Display)]
#[display("tcp://{}:{}", hostname, port)]
pub struct DirectHint {
    // DirectHint also contains a `priority` field, but it is underspecified
    // and we won't use it
    // pub priority: f32,
    /// The hostname via which to reach this peer
    pub hostname: String,
    /// The port to use
    pub port: u16,
}
376
377impl DirectHint {
378    /// Create a new direct hint
379    pub fn new(hostname: impl Into<String>, port: u16) -> Self {
380        Self {
381            hostname: hostname.into(),
382            port,
383        }
384    }
385}
386
/* Wire representation of a single relay hint (Helper struct for serialization).
 *
 * The list of endpoints is stored under the key "hints" on the wire.
 */
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "kebab-case", tag = "type")]
#[non_exhaustive]
struct RelayHintSerde {
    name: Option<String>,
    #[serde(rename = "hints")]
    endpoints: Vec<RelayHintSerdeInner>,
}
396
/* Wire representation of a single relay endpoint (Helper struct for serialization).
 *
 * The variant is selected by the "type" field: "direct-tcp-v1" for TCP endpoints and
 * the kebab-case variant name ("websocket") for WebSocket endpoints. Any other type
 * value deserializes to `Unknown`.
 */
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "kebab-case", tag = "type")]
#[non_exhaustive]
enum RelayHintSerdeInner {
    #[serde(rename = "direct-tcp-v1")]
    Tcp(DirectHint),
    Websocket {
        url: url::Url,
    },
    #[serde(other)]
    Unknown,
}
410
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
/// An error occurred while parsing a relay hint
///
/// Returned by [`RelayHint::from_urls`].
pub enum RelayHintParseError {
    #[error(
        "Invalid TCP hint endpoint: '{}' (Does it have hostname and port?)",
        _0
    )]
    /// Invalid TCP hint endpoint: the `tcp` URL lacks a hostname or a port.
    /// Carries the offending URL.
    InvalidTcp(url::Url),
    #[error(
        "Unknown schema: '{}'. Currently known values are 'tcp', 'ws'  and 'wss'.",
        _0
    )]
    /// Unknown schema. Currently known values are 'tcp', 'ws'  and 'wss'.
    /// Carries the schema that was found.
    UnknownSchema(Box<str>),
    #[error("'{}' is not an absolute URL (must start with a '/')", _0)]
    /// The provided URL is not absolute. Carries the offending URL.
    UrlNotAbsolute(url::Url),
}
431
/**
 * Hint describing a relay server
 *
 * A server may be reachable at multiple locations. Any two must be relayable
 * over that server, therefore a client may pick only one of these per hint.
 *
 * All locations are URLs, but here they are already deconstructed and grouped
 * by schema out of convenience.
 */
/* RelayHint::default() gives the empty server (cannot be reached), and is only there for struct update syntax */
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct RelayHint {
    /** Human readable name. When a server has multiple endpoints, the expectation is
     * that the domain name is used as name
     */
    pub name: Option<String>,
    /** TCP endpoints of that relay */
    pub tcp: HashSet<DirectHint>,
    /** WebSockets endpoints of that relay */
    pub ws: HashSet<url::Url>,
}
453
454impl RelayHint {
455    /// Create a new relay hint
456    pub fn new(
457        name: Option<String>,
458        tcp: impl IntoIterator<Item = DirectHint>,
459        ws: impl IntoIterator<Item = url::Url>,
460    ) -> Self {
461        Self {
462            name,
463            tcp: tcp.into_iter().collect(),
464            ws: ws.into_iter().collect(),
465        }
466    }
467
468    /// Construct a relay hint from a list of multiple endpoints, and optionally a name.
469    ///
470    /// Not all URLs are acceptable, therefore this method is fallible. Especially, TCP endpoints
471    /// must be encoded as `tcp://hostname:port`. All URLs must be absolute, i.e. start with a `/`.
472    ///
473    /// Basic usage (default server):
474    ///
475    /// ```
476    /// use magic_wormhole::transit;
477    /// let hint =
478    ///     transit::RelayHint::from_urls(None, [transit::DEFAULT_RELAY_SERVER.parse().unwrap()])
479    ///         .unwrap();
480    /// ```
481    ///
482    /// Custom relay server from url with name:
483    ///
484    /// ```
485    /// use magic_wormhole::transit;
486    /// # let url: url::Url = transit::DEFAULT_RELAY_SERVER.parse().unwrap();
487    /// let hint = transit::RelayHint::from_urls(url.host_str().map(str::to_owned), [url]).unwrap();
488    /// ```
489    pub fn from_urls(
490        name: Option<String>,
491        urls: impl IntoIterator<Item = url::Url>,
492    ) -> Result<Self, RelayHintParseError> {
493        let mut this = Self {
494            name,
495            ..Self::default()
496        };
497        for url in urls.into_iter() {
498            ensure!(
499                !url.cannot_be_a_base(),
500                RelayHintParseError::UrlNotAbsolute(url)
501            );
502            match url.scheme() {
503                "tcp" => {
504                    /* Using match */
505                    let (hostname, port) = match (url.host_str(), url.port()) {
506                        (Some(hostname), Some(port)) => (hostname.into(), port),
507                        _ => bail!(RelayHintParseError::InvalidTcp(url)),
508                    };
509                    this.tcp.insert(DirectHint { hostname, port });
510                },
511                "ws" | "wss" => {
512                    this.ws.insert(url);
513                },
514                other => bail!(RelayHintParseError::UnknownSchema(other.into())),
515            }
516        }
517        assert!(
518            !this.tcp.is_empty() || !this.ws.is_empty(),
519            "No URLs provided"
520        );
521        Ok(this)
522    }
523
524    /// Whether the relay server is probably the same
525    pub(crate) fn can_merge(&self, other: &Self) -> bool {
526        !self.tcp.is_disjoint(&other.tcp) || !self.ws.is_disjoint(&other.ws)
527    }
528
529    /// Extend this server with additional endpoints
530    pub(crate) fn merge_mut(&mut self, other: Self) {
531        self.tcp.extend(other.tcp);
532        self.ws.extend(other.ws);
533    }
534
535    /// Deduplicate and merge the hints us into theirs
536    pub(crate) fn merge_into(self, collection: &mut Vec<RelayHint>) {
537        for item in collection.iter_mut() {
538            if item.can_merge(&self) {
539                item.merge_mut(self);
540                return;
541            }
542        }
543        collection.push(self);
544    }
545}
546
547impl serde::Serialize for RelayHint {
548    fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
549    where
550        S: serde::Serializer,
551    {
552        let mut hints = Vec::new();
553        hints.extend(self.tcp.iter().cloned().map(RelayHintSerdeInner::Tcp));
554        hints.extend(
555            self.ws
556                .iter()
557                .cloned()
558                .map(|h| RelayHintSerdeInner::Websocket { url: h }),
559        );
560
561        serde_json::json!({
562            "name": self.name,
563            "hints": hints,
564        })
565        .serialize(ser)
566    }
567}
568
569impl<'de> serde::Deserialize<'de> for RelayHint {
570    fn deserialize<D>(de: D) -> Result<Self, D::Error>
571    where
572        D: serde::Deserializer<'de>,
573    {
574        let raw = RelayHintSerde::deserialize(de)?;
575        let mut hint = RelayHint {
576            name: raw.name,
577            tcp: HashSet::new(),
578            ws: HashSet::new(),
579        };
580
581        for e in raw.endpoints {
582            match e {
583                RelayHintSerdeInner::Tcp(tcp) => {
584                    hint.tcp.insert(tcp);
585                },
586                RelayHintSerdeInner::Websocket { url } => {
587                    hint.ws.insert(url);
588                },
589                /* Ignore unknown hints */
590                _ => {},
591            }
592        }
593
594        Ok(hint)
595    }
596}
597
598impl TryFrom<&DirectHint> for IpAddr {
599    type Error = std::net::AddrParseError;
600    fn try_from(hint: &DirectHint) -> Result<IpAddr, std::net::AddrParseError> {
601        hint.hostname.parse()
602    }
603}
604
605impl TryFrom<&DirectHint> for SocketAddr {
606    type Error = std::net::AddrParseError;
607    /** This does not do the obvious thing and also implicitly maps all V4 addresses into V6 */
608    fn try_from(hint: &DirectHint) -> Result<SocketAddr, std::net::AddrParseError> {
609        let addr = hint.try_into()?;
610        let addr = match addr {
611            IpAddr::V4(v4) => IpAddr::V6(v4.to_ipv6_mapped()),
612            IpAddr::V6(_) => addr,
613        };
614        Ok(SocketAddr::new(addr, hint.port))
615    }
616}
617
/// Direct or relay
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ConnectionType {
    /// We are directly connected to our peer
    Direct,
    /// We are connected to a relay server, and may even know its name
    Relay {
        /// The name of the relay server
        name: Option<String>,
    },
}

/// Renders as a sentence fragment: "directly", "via relay" or "via relay (<name>)".
impl std::fmt::Display for ConnectionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let relay_name = match self {
            ConnectionType::Direct => return f.write_str("directly"),
            ConnectionType::Relay { name } => name,
        };
        match relay_name {
            Some(name) => write!(f, "via relay ({name})"),
            None => f.write_str("via relay"),
        }
    }
}
640
/// Metadata for the established transit connection
///
/// Its `Display` implementation renders a human readable "Established … connection" message.
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub struct TransitInfo {
    /// Whether we are connected directly or via a relay server
    pub conn_type: ConnectionType,
    /// Target address of our connection. This may be our peer, or the relay server.
    /// This says nothing about the actual transport protocol used.
    ///
    /// This field does not exist on wasm targets.
    #[cfg(not(target_family = "wasm"))]
    pub peer_addr: SocketAddr,
}
652
/* A boxed transport together with the metadata describing the connection */
type TransitConnection = (Box<dyn TransitTransport>, TransitInfo);
654
/* Errors from the STUN query for our external address. Not available on wasm targets. */
#[cfg(not(target_family = "wasm"))]
#[derive(Debug, thiserror::Error)]
enum StunError {
    #[error("No IPv4 addresses were found for the selected STUN server")]
    ServerIsV6Only,
    #[error("Server did not tell us our IP address")]
    ServerNoResponse,
    /* The query did not finish in time; `init` maps an elapsed timeout to this variant */
    #[error("Connection timed out")]
    Timeout,
    /* Converted automatically from `std::io::Error` */
    #[error("IO error")]
    IO(
        #[from]
        #[source]
        std::io::Error,
    ),
    /* Converted automatically from `bytecodec::Error` */
    #[error("Malformed STUN packet")]
    Codec(
        #[from]
        #[source]
        bytecodec::Error,
    ),
}
677
678#[cfg(not(target_family = "wasm"))]
679impl std::fmt::Display for TransitInfo {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        match &self.conn_type {
682            ConnectionType::Direct => {
683                write!(
684                    f,
685                    "Established direct transit connection to '{}'",
686                    self.peer_addr,
687                )
688            },
689            ConnectionType::Relay { name: Some(name) } => {
690                write!(
691                    f,
692                    "Established transit connection via relay '{}' ({})",
693                    name, self.peer_addr,
694                )
695            },
696            ConnectionType::Relay { name: None } => {
697                write!(
698                    f,
699                    "Established transit connection via relay ({})",
700                    self.peer_addr,
701                )
702            },
703        }
704    }
705}
706
/// Human readable summary of the connection. On wasm targets there is no peer address to show.
#[cfg(target_family = "wasm")]
impl std::fmt::Display for TransitInfo {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let relay_name = match &self.conn_type {
            ConnectionType::Direct => {
                return f.write_str("Established direct transit connection");
            },
            ConnectionType::Relay { name } => name,
        };
        match relay_name {
            Some(name) => write!(f, "Established transit connection via relay '{}'", name),
            None => f.write_str("Established transit connection via relay"),
        }
    }
}
723
/**
 * Initialize a transit handshake
 *
 * Bind a port and generate our [`Hints`]. This does not communicate with the peer yet.
 * However, if direct connections are enabled (non-wasm targets only), a STUN query with a
 * timeout of 4 seconds is attempted in order to find out our external address.
 *
 * - `abilities`: what we are willing to do. If `peer_abilities` is given, it is intersected
 *   with them first, and the result is what the returned connector reports as our abilities.
 * - `relay_hints`: relay servers to advertise; only used if the relay ability is enabled.
 *
 * A failure to set up the sockets for direct connections is logged and otherwise ignored:
 * the connector is then returned without sockets. As written, this function does not
 * return `Err`.
 */
pub async fn init(
    mut abilities: Abilities,
    peer_abilities: Option<Abilities>,
    relay_hints: Vec<RelayHint>,
) -> Result<TransitConnector, std::io::Error> {
    let mut our_hints = Hints::default();
    #[cfg(not(target_family = "wasm"))]
    let mut sockets = None;

    /* Only keep what both sides support, if we already know the abilities of the peer */
    if let Some(peer_abilities) = peer_abilities {
        abilities = abilities.intersect(&peer_abilities);
    }

    /* Detect our IP addresses if the ability is enabled */
    #[cfg(not(target_family = "wasm"))]
    if abilities.can_direct() {
        let create_sockets = async {
            /* Do a STUN query to get our public IP. If it works, we must reuse the same socket (port)
             * so that we will be NATted to the same port again. If it doesn't, simply bind a new socket
             * and use that instead.
             */

            let socket: MaybeConnectedSocket = match crate::util::timeout(
                std::time::Duration::from_secs(4),
                transport::tcp_get_external_ip(),
            )
            .await
            .map_err(|_| StunError::Timeout)
            {
                Ok(Ok((external_ip, stream))) => {
                    tracing::debug!("Our external IP address is {}", external_ip);
                    /* The externally visible address becomes a direct hint of its own */
                    our_hints.direct_tcp.insert(DirectHint {
                        hostname: external_ip.ip().to_string(),
                        port: external_ip.port(),
                    });
                    tracing::debug!(
                        "Our socket for connecting is bound to {} and connected to {}",
                        stream.local_addr()?,
                        stream.peer_addr()?,
                    );
                    stream.into()
                },
                // TODO replace with .flatten() once stable
                // https://github.com/rust-lang/rust/issues/70142
                /* Both a timeout (outer error) and a failed query (inner error) end up here */
                Err(err) | Ok(Err(err)) => {
                    tracing::warn!("Failed to get external address via STUN, {}", err);
                    let socket =
                        socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, None)?;
                    transport::set_socket_opts(&socket)?;

                    socket.bind(&"[::]:0".parse::<SocketAddr>().unwrap().into())?;
                    tracing::debug!(
                        "Our socket for connecting is bound to {}",
                        socket.local_addr()?.as_socket().unwrap(),
                    );

                    socket.into()
                },
            };

            /* Get a second socket, but this time open a listener on that port.
             * This sadly doubles the number of hints, but the method above doesn't work
             * for systems which don't have any firewalls. Also, this time we can't reuse
             * the port. In theory, we could, but it really confused the kernel to the point
             * of `accept` calls never returning again.
             */
            let listener = TcpListener::bind("[::]:0").await?;

            /* Find our ports, iterate all our local addresses, combine them with the ports and that's our hints */
            let port = socket.local_addr()?.as_socket().unwrap().port();
            let port2 = listener.local_addr()?.port();
            our_hints.direct_tcp.extend(
                if_addrs::get_if_addrs()?
                    .iter()
                    /* Loopback interfaces are skipped */
                    .filter(|iface| !iface.is_loopback())
                    /* Two hints per interface address: one for each of our two ports */
                    .flat_map(|ip| {
                        [
                            DirectHint {
                                hostname: ip.ip().to_string(),
                                port,
                            },
                            DirectHint {
                                hostname: ip.ip().to_string(),
                                port: port2,
                            },
                        ]
                        .into_iter()
                    }),
            );
            tracing::debug!("Our socket for listening is {}", listener.local_addr()?);

            Ok::<_, std::io::Error>((socket, listener))
        };

        /* Errors are logged and turned into `None`; they do not abort `init` */
        sockets = create_sockets
            .await
            // TODO replace with inspect_err once stable
            .map_err(|err| {
                tracing::error!("Failed to create direct hints for our side: {}", err);
                err
            })
            .ok();
    }

    if abilities.can_relay() {
        our_hints.relay.extend(relay_hints);
    }

    Ok(TransitConnector {
        #[cfg(not(target_family = "wasm"))]
        sockets,
        our_abilities: abilities,
        our_hints: Arc::new(our_hints),
    })
}
844
/// Bound socket, maybe also connected. Guaranteed to have SO_REUSEADDR.
///
/// Both variants can be created from their payload via `From`/`Into`.
#[cfg(not(target_family = "wasm"))]
#[derive(derive_more::From)]
enum MaybeConnectedSocket {
    /// A raw socket
    #[from]
    Socket(socket2::Socket),
    /// A TCP stream
    #[from]
    Stream(TcpStream),
}
854
855#[cfg(not(target_family = "wasm"))]
856impl MaybeConnectedSocket {
857    fn local_addr(&self) -> std::io::Result<socket2::SockAddr> {
858        match &self {
859            Self::Socket(socket) => socket.local_addr(),
860            Self::Stream(stream) => Ok(stream.local_addr()?.into()),
861        }
862    }
863}
864
/// The role a party takes in a transit connection handshake.
///
/// (Despite the handshake being asymmetric, the resulting connection is symmetric.)
///
/// One side must take the leader role and the other one the follower role.
#[derive(Clone, Debug)]
pub enum TransitRole {
    /// The leader role in the handshake, formerly called "sender".
    Leader,
    /// The follower role in the handshake, formerly called "receiver".
    Follower,
}
875
/**
 * A partially set up [`Transit`] connection.
 *
 * For the transit handshake, each side generates a [`Hints`] with all the information to find the other. You need
 * to exchange it (as in: send yours, receive theirs) with them. This is outside of the transit protocol, because we
 * are protocol agnostic.
 */
pub struct TransitConnector {
    /* Only `Some` if direct-tcp-v1 ability has been enabled.
     * The first socket is the port from which we will start connection attempts.
     * For in case the user is behind no firewalls, we must also listen to the second socket.
     * This field does not exist on wasm targets.
     */
    #[cfg(not(target_family = "wasm"))]
    sockets: Option<(MaybeConnectedSocket, TcpListener)>,
    /* Our abilities, as exposed through `our_abilities()` */
    our_abilities: Abilities,
    /* Our hints, as exposed through `our_hints()` */
    our_hints: Arc<Hints>,
}
893
894impl TransitConnector {
    /// The abilities that we've sent to the other side
    ///
    /// Returns a reference to the abilities stored in this connector.
    pub fn our_abilities(&self) -> &Abilities {
        &self.our_abilities
    }
899
    /**
     * Our hints. Send this one to the other side
     *
     * Returns a reference to the shared pointer; clone it if you need an owned handle.
     */
    pub fn our_hints(&self) -> &Arc<Hints> {
        &self.our_hints
    }
904
    /// Connect to the other side.
    ///
    /// One side must call with `role` set to [`TransitRole::Leader`]
    /// and the other with [`TransitRole::Follower`].
    ///
    /// This consumes the connector and forwards all remaining arguments unchanged to the
    /// role specific connect routine. On success, the established [`Transit`] is returned
    /// together with the [`TransitInfo`] describing the connection.
    pub async fn connect(
        self,
        role: TransitRole,
        transit_key: Key<TransitKey>,
        their_abilities: Abilities,
        their_hints: Arc<Hints>,
    ) -> Result<(Transit, TransitInfo), TransitConnectError> {
        /* Pure dispatch on the role */
        match role {
            TransitRole::Leader => {
                self.leader_connect(transit_key, their_abilities, their_hints)
                    .await
            },
            TransitRole::Follower => {
                self.follower_connect(transit_key, their_abilities, their_hints)
                    .await
            },
        }
    }
927
928    /**
929     * Connect to the other side, as sender.
930     */
931    async fn leader_connect(
932        self,
933        transit_key: Key<TransitKey>,
934        their_abilities: Abilities,
935        their_hints: Arc<Hints>,
936    ) -> Result<(Transit, TransitInfo), TransitConnectError> {
937        let Self {
938            #[cfg(not(target_family = "wasm"))]
939            sockets,
940            our_abilities,
941            our_hints,
942        } = self;
943        let transit_key = Arc::new(transit_key);
944
945        let start = Instant::now();
946        let mut connection_stream = Box::pin(
947            Self::connect_inner(
948                true,
949                transit_key,
950                our_abilities,
951                our_hints,
952                their_abilities,
953                their_hints,
954                #[cfg(not(target_family = "wasm"))]
955                sockets,
956            )
957            .filter_map(|result| async {
958                match result {
959                    Ok(val) => Some(val),
960                    Err(err) => {
961                        tracing::debug!("Some leader handshake failed: {:?}", err);
962                        None
963                    },
964                }
965            }),
966        );
967
968        let (mut transit, mut finalizer, mut conn_info) =
969            crate::util::timeout(std::time::Duration::from_secs(60), connection_stream.next())
970                .await
971                .map_err(|_| {
972                    tracing::debug!("`leader_connect` timed out");
973                    TransitConnectError::Handshake
974                })?
975                .ok_or(TransitConnectError::Handshake)?;
976
977        if conn_info.conn_type != ConnectionType::Direct && our_abilities.can_direct() {
978            tracing::debug!(
979                "Established transit connection over relay. Trying to find a direct connection …"
980            );
981            /* Measure the time it took us to get a response. Based on this, wait some more for more responses
982             * in case we like one better.
983             */
984            let elapsed = start.elapsed();
985            let to_wait = if elapsed.as_secs() > 5 {
986                /* If our RTT was *that* long, let's just be happy we even got one connection */
987                std::time::Duration::from_secs(1)
988            } else {
989                elapsed.mul_f32(0.3)
990            };
991            let _ = crate::util::timeout(to_wait, async {
992                while let Some((new_transit, new_finalizer, new_conn_info)) =
993                    connection_stream.next().await
994                {
995                    /* We already got a connection, so we're only interested in direct ones */
996                    if new_conn_info.conn_type == ConnectionType::Direct {
997                        transit = new_transit;
998                        finalizer = new_finalizer;
999                        conn_info = new_conn_info;
1000                        tracing::debug!("Found direct connection; using that instead.");
1001                        break;
1002                    }
1003                }
1004            })
1005            .await;
1006            tracing::debug!("Did not manage to establish a better connection in time.");
1007        } else {
1008            tracing::debug!("Established direct transit connection");
1009        }
1010
1011        /* Cancel all remaining non-finished handshakes. We could send "nevermind" to explicitly tell
1012         * the other side (probably, this is mostly for relay server statistics), but eeh, nevermind :)
1013         */
1014        std::mem::drop(connection_stream);
1015
1016        let (tx, rx) = finalizer
1017            .handshake_finalize(&mut transit)
1018            .await
1019            .map_err(|e| {
1020                tracing::debug!("`handshake_finalize` failed: {e}");
1021                TransitConnectError::Handshake
1022            })?;
1023
1024        Ok((
1025            Transit {
1026                socket: transit,
1027                tx,
1028                rx,
1029            },
1030            conn_info,
1031        ))
1032    }
1033
1034    /**
1035     * Connect to the other side, as receiver
1036     */
1037    async fn follower_connect(
1038        self,
1039        transit_key: Key<TransitKey>,
1040        their_abilities: Abilities,
1041        their_hints: Arc<Hints>,
1042    ) -> Result<(Transit, TransitInfo), TransitConnectError> {
1043        let Self {
1044            #[cfg(not(target_family = "wasm"))]
1045            sockets,
1046            our_abilities,
1047            our_hints,
1048        } = self;
1049        let transit_key = Arc::new(transit_key);
1050
1051        let mut connection_stream = Box::pin(
1052            Self::connect_inner(
1053                false,
1054                transit_key,
1055                our_abilities,
1056                our_hints,
1057                their_abilities,
1058                their_hints,
1059                #[cfg(not(target_family = "wasm"))]
1060                sockets,
1061            )
1062            .filter_map(|result| async {
1063                match result {
1064                    Ok(val) => Some(val),
1065                    Err(err) => {
1066                        tracing::debug!("Some follower handshake failed: {:?}", err);
1067                        None
1068                    },
1069                }
1070            }),
1071        );
1072
1073        let transit = match crate::util::timeout(
1074            std::time::Duration::from_secs(60),
1075            &mut connection_stream.next(),
1076        )
1077        .await
1078        {
1079            Ok(Some((mut socket, finalizer, conn_info))) => {
1080                let (tx, rx) = finalizer
1081                    .handshake_finalize(&mut socket)
1082                    .await
1083                    .map_err(|e| {
1084                        tracing::debug!("`handshake_finalize` failed: {e}");
1085                        TransitConnectError::Handshake
1086                    })?;
1087
1088                Ok((Transit { socket, tx, rx }, conn_info))
1089            },
1090            Ok(None) | Err(_) => {
1091                tracing::debug!("`follower_connect` timed out");
1092                Err(TransitConnectError::Handshake)
1093            },
1094        };
1095
1096        /* Cancel all remaining non-finished handshakes. We could send "nevermind" to explicitly tell
1097         * the other side (probably, this is mostly for relay server statistics), but eeh, nevermind :)
1098         */
1099        std::mem::drop(connection_stream);
1100
1101        transit
1102    }
1103
    /** Try to establish a connection with the peer.
     *
     * This encapsulates code that is common to both the leader and the follower.
     *
     * It builds one future per connection attempt (direct hints of the peer, relay hints of
     * both sides and, if available, our own listening socket). Each of these futures connects
     * and then runs `handshake_exchange` on the new connection. The returned stream yields the
     * results of these futures in the order in which they complete. Nothing is cancelled here:
     * to abort the remaining attempts, drop the stream.
     *
     * `is_leader` selects the side of the handshake and is passed on to `handshake_exchange`.
     *
     * ## Panics
     *
     * If `sockets` is `Some` although `our_abilities` does not include direct connections,
     * or if the local address of the provided socket cannot be determined.
     */
    fn connect_inner(
        is_leader: bool,
        transit_key: Arc<Key<TransitKey>>,
        our_abilities: Abilities,
        our_hints: Arc<Hints>,
        their_abilities: Abilities,
        their_hints: Arc<Hints>,
        #[cfg(not(target_family = "wasm"))] sockets: Option<(MaybeConnectedSocket, TcpListener)>,
    ) -> impl Stream<Item = Result<HandshakeResult, TransitHandshakeError>> + 'static {
        /* Have Some(sockets) → Can direct */
        #[cfg(not(target_family = "wasm"))]
        assert!(sockets.is_none() || our_abilities.can_direct());

        /* Noise is only used if both sides support it; otherwise we fall back to secretbox */
        let cryptor = if our_abilities.can_noise_crypto() && their_abilities.can_noise_crypto() {
            tracing::debug!("Using noise protocol for encryption");
            Arc::new(crypto::NoiseInit {
                key: transit_key.clone(),
            }) as Arc<dyn crypto::TransitCryptoInit>
        } else {
            tracing::debug!("Using secretbox for encryption");
            Arc::new(crypto::SecretboxInit {
                key: transit_key.clone(),
            }) as Arc<dyn crypto::TransitCryptoInit>
        };

        // 8. listen for connections on the port and simultaneously try connecting to the peer port.
        /* Random identifier of our side; it is sent to the relay server as part of the relay handshake */
        let tside = Arc::new(hex::encode(rand::random::<[u8; 8]>()));

        /* Iterator of futures yielding a connection. They'll be then mapped with the handshake, collected into
         * a `FuturesUnordered` and polled concurrently.
         */
        #[cfg(not(target_family = "wasm"))]
        use futures::future::BoxFuture;
        #[cfg(target_family = "wasm")]
        use futures::future::LocalBoxFuture as BoxFuture;
        type BoxIterator<T> = Box<dyn Iterator<Item = T>>;
        type ConnectorFuture = BoxFuture<'static, Result<TransitConnection, TransitHandshakeError>>;
        let mut connectors: BoxIterator<ConnectorFuture> = Box::new(std::iter::empty());

        #[cfg(not(target_family = "wasm"))]
        let (socket, listener) = sockets.unzip();
        #[cfg(not(target_family = "wasm"))]
        if our_abilities.can_direct() && their_abilities.can_direct() {
            /* The outgoing connection attempts are given the local address of our first socket (if any) */
            let local_addr = socket.map(|socket| {
                Arc::new(
                    socket
                        .local_addr()
                        .expect("This is guaranteed to be an IP socket"),
                )
            });
            /* Connect to each hint of the peer */
            connectors = Box::new(
                connectors.chain(
                    their_hints
                        .direct_tcp
                        .clone()
                        .into_iter()
                        /* Nobody should have that many IP addresses, even with NATing */
                        .take(50)
                        .map(move |hint| transport::connect_tcp_direct(local_addr.clone(), hint))
                        .map(|fut| Box::pin(fut) as ConnectorFuture),
                ),
            ) as BoxIterator<ConnectorFuture>;
        }

        /* Relay hints. Make sure that both sides advertise it, since it is fine to support it without providing own hints. */
        if our_abilities.can_relay() && their_abilities.can_relay() {
            /* Start with (at most two of) our own relay hints, then merge (at most two of) the peer's
             * hints into that list via `merge_into` (presumably this is what de-duplicates relay
             * servers that both sides know about; confirm against `RelayHint::merge_into`).
             */
            let mut relay_hints = Vec::<RelayHint>::new();
            relay_hints.extend(our_hints.relay.iter().take(2).cloned());
            for hint in their_hints.relay.iter().take(2).cloned() {
                hint.merge_into(&mut relay_hints);
            }

            #[cfg(not(target_family = "wasm"))]
            {
                connectors = Box::new(
                    connectors.chain(
                    relay_hints
                        .into_iter()
                        /* A hint may have multiple addresses pointing towards the server. This may be multiple
                         * domain aliases or different ports or an IPv6 or IPv4 address. We only need
                         * to connect to one of them, since they are considered equivalent. However, we
                         * also want to be prepared for the rare case of one failing, thus we try to reach
                         * up to three different addresses. To not flood the system with requests, we
                         * start them in a 5 seconds interval spread. If one of them succeeds, the remaining ones
                         * will be cancelled anyways. Note that a hint might not necessarily be reachable via TCP.
                         */
                        .flat_map(|hint| {
                            /* If the hint has no name, take the first domain name as fallback */
                            let name = hint.name
                            .or_else(|| {
                                /* Skip everything that parses as an IP address. We are only interested in human readable names (the IP address will be printed anyways) */
                                hint.tcp.iter()
                                        .filter_map(|hint| match url::Host::parse(&hint.hostname) {
                                            Ok(url::Host::Domain(_)) => Some(hint.hostname.clone()),
                                            _ => None,
                                        })
                                        .next()
                                    });
                            hint.tcp
                                .into_iter()
                                .take(3)
                                .enumerate()
                                .map(move |(i, h)| (i, h, name.clone()))
                            })
                            /* The n-th address of a hint is tried after n * 5 seconds */
                            .map(|(index, host, name)| async move {
                                async_io::Timer::after(std::time::Duration::from_secs(
                                    index as u64 * 5,
                                ))
                                .await;
                                transport::connect_tcp_relay(host, name).await
                            })
                            .map(|fut| Box::pin(fut) as ConnectorFuture),
                    ),
                ) as BoxIterator<ConnectorFuture>;
            }

            #[cfg(target_family = "wasm")]
            {
                connectors = Box::new(
                    connectors.chain(
                        relay_hints
                            .into_iter()
                            /* Same strategy as on the other targets, except that we go through the
                             * WebSocket URLs of the hint instead of its TCP addresses: all of them are
                             * considered equivalent, so connecting to one of them is enough. To be prepared
                             * for the rare case of one failing, we try to reach up to three of them. To not
                             * flood the system with requests, we start them in a 5 seconds interval spread.
                             * If one of them succeeds, the remaining ones will be cancelled anyways.
                             */
                            .flat_map(|hint| {
                                /* If the hint has no name, take the first domain name as fallback */
                                let name = hint.name
                                    .or_else(|| {
                                        /* Skip everything that parses as an IP address. We are only interested in human readable names (the IP address will be printed anyways) */
                                        hint.tcp.iter()
                                            .filter_map(|hint| match url::Host::parse(&hint.hostname) {
                                                Ok(url::Host::Domain(_)) => Some(hint.hostname.clone()),
                                                _ => None,
                                            })
                                            .next()
                                    });
                                hint.ws
                                    .into_iter()
                                    .take(3)
                                    .enumerate()
                                    .map(move |(i, u)| (i, u, name.clone()))
                            })
                            /* The n-th URL of a hint is tried after n * 5 seconds */
                            .map(|(index, url, name)| async move {
                                crate::util::sleep(std::time::Duration::from_secs(
                                    index as u64 * 5,
                                ))
                                .await;
                                transport::connect_ws_relay(url, name).await
                            })
                            .map(|fut| Box::pin(fut) as ConnectorFuture),
                    ),
                ) as BoxIterator<ConnectorFuture>;
            }
        }

        /* Do a handshake on all our found connections */
        /* The closure below takes ownership of these copies; the originals are still needed for the listener */
        let transit_key2 = transit_key.clone();
        let tside2 = tside.clone();
        let cryptor2 = cryptor.clone();
        #[allow(unused_mut)] // For WASM targets
        let mut connectors = Box::new(
            connectors
                .map(move |fut| {
                    let transit_key = transit_key2.clone();
                    let tside = tside2.clone();
                    let cryptor = cryptor2.clone();
                    async move {
                        let (socket, conn_info) = fut.await?;
                        let (transit, finalizer) = handshake_exchange(
                            is_leader,
                            tside,
                            socket,
                            &conn_info.conn_type,
                            &*cryptor,
                            transit_key,
                        )
                        .await?;
                        Ok((transit, finalizer, conn_info))
                    }
                })
                .map(|fut| {
                    Box::pin(fut) as BoxFuture<Result<HandshakeResult, TransitHandshakeError>>
                }),
        )
            as BoxIterator<BoxFuture<Result<HandshakeResult, TransitHandshakeError>>>;

        /* Also listen on some port just in case. */
        #[cfg(not(target_family = "wasm"))]
        if let Some(listener) = listener {
            connectors = Box::new(
                connectors.chain(
                    std::iter::once(async move {
                        let transit_key = transit_key.clone();
                        let tside = tside.clone();
                        let cryptor = cryptor.clone();
                        /* Accept a single incoming connection and do the handshake on it */
                        let connect = || async {
                            let (socket, peer) = listener.accept().await?;
                            let (socket, info) =
                                transport::wrap_tcp_connection(socket, ConnectionType::Direct)?;
                            tracing::debug!("Got connection from {}!", peer);
                            let (transit, finalizer) = handshake_exchange(
                                is_leader,
                                tside.clone(),
                                socket,
                                &ConnectionType::Direct,
                                &*cryptor,
                                transit_key.clone(),
                            )
                            .await?;
                            Result::<_, TransitHandshakeError>::Ok((transit, finalizer, info))
                        };
                        /* Keep accepting connections until one of them passes the handshake. This
                         * future never resolves to an error; it only ends by succeeding or by being
                         * dropped.
                         * NOTE(review): errors of `accept()` itself are retried immediately as well,
                         * which looks like it could busy-loop if accepting fails persistently — confirm.
                         */
                        loop {
                            match connect().await {
                                Ok(success) => break Ok(success),
                                Err(err) => {
                                    tracing::debug!(
                                        "Some handshake failed on the listening port: {:?}",
                                        err
                                    );
                                    continue;
                                },
                            }
                        }
                    })
                    .map(|fut| {
                        Box::pin(fut) as BoxFuture<Result<HandshakeResult, TransitHandshakeError>>
                    }),
                ),
            )
                as BoxIterator<BoxFuture<Result<HandshakeResult, TransitHandshakeError>>>;
        }
        /* All attempts are polled concurrently and yield their results as they complete */
        connectors.collect::<futures::stream::futures_unordered::FuturesUnordered<_>>()
    }
1354}
1355
/**
 * An established Transit connection.
 *
 * While you can manually send and receive bytes over the TCP stream, this is not recommended as the transit protocol
 * also specifies an encrypted record pipe that does all the hard work for you. See the provided methods.
 */
pub struct Transit {
    /** Raw transit connection */
    socket: Box<dyn TransitTransport>,
    /** Encrypts outgoing records and writes them to the connection, see [`Transit::send_record`] */
    tx: Box<dyn crypto::TransitCryptoEncrypt>,
    /** Reads incoming records from the connection and decrypts them, see [`Transit::receive_record`] */
    rx: Box<dyn crypto::TransitCryptoDecrypt>,
}
1368
1369impl Transit {
1370    /** Receive and decrypt one message from the other side. */
1371    pub async fn receive_record(&mut self) -> Result<Box<[u8]>, TransitError> {
1372        self.rx.decrypt(&mut self.socket).await
1373    }
1374
1375    /** Send an encrypted message to the other side */
1376    pub async fn send_record(&mut self, plaintext: &[u8]) -> Result<(), TransitError> {
1377        assert!(!plaintext.is_empty());
1378        self.tx.encrypt(&mut self.socket, plaintext).await
1379    }
1380
1381    /// Flush the socket
1382    pub async fn flush(&mut self) -> Result<(), TransitError> {
1383        tracing::debug!("Flush");
1384        self.socket.flush().await.map_err(Into::into)
1385    }
1386
1387    /** Convert the transit connection to a [`Stream`]/[`Sink`] pair */
1388    #[cfg(not(target_family = "wasm"))]
1389    #[expect(clippy::type_complexity)]
1390    pub fn split(
1391        self,
1392    ) -> (
1393        impl futures::sink::Sink<Box<[u8]>, Error = TransitError>,
1394        impl futures_lite::stream::Stream<Item = Result<Box<[u8]>, TransitError>>,
1395    ) {
1396        let (reader, writer) = self.socket.split();
1397        (
1398            futures::sink::unfold(
1399                (writer, self.tx),
1400                |(mut writer, mut tx), plaintext: Box<[u8]>| async move {
1401                    tx.encrypt(&mut writer, &plaintext)
1402                        .await
1403                        .map(|()| (writer, tx))
1404                },
1405            ),
1406            futures::stream::try_unfold((reader, self.rx), |(mut reader, mut rx)| async move {
1407                rx.decrypt(&mut reader)
1408                    .await
1409                    .map(|record| Some((record, (reader, rx))))
1410            }),
1411        )
1412    }
1413}
1414
/**
 * The outcome of one successful handshake exchange: the connection the handshake was done on,
 * the finalizer that is used to complete the handshake on the connection that gets selected,
 * and the information about that connection.
 */
type HandshakeResult = (
    Box<dyn TransitTransport>,
    Box<dyn crypto::TransitCryptoInitFinalizer>,
    TransitInfo,
);
1420
1421/**
1422 * Do a transit handshake exchange, to establish a direct connection.
1423 *
1424 * This automatically does the relay handshake first if necessary. On the follower
1425 * side, the future will successfully run to completion if a connection could be
1426 * established. On the leader side, the handshake is not 100% completed: the caller
1427 * must write `Ok\n` into the stream that should be used (and optionally `Nevermind\n`
1428 * into all others).
1429 */
1430async fn handshake_exchange(
1431    is_leader: bool,
1432    tside: Arc<String>,
1433    mut socket: Box<dyn TransitTransport>,
1434    host_type: &ConnectionType,
1435    cryptor: &dyn crypto::TransitCryptoInit,
1436    key: Arc<Key<TransitKey>>,
1437) -> Result<
1438    (
1439        Box<dyn TransitTransport>,
1440        Box<dyn crypto::TransitCryptoInitFinalizer>,
1441    ),
1442    TransitHandshakeError,
1443> {
1444    if host_type != &ConnectionType::Direct {
1445        tracing::trace!("initiating relay handshake");
1446
1447        let sub_key = key.derive_subkey_from_purpose::<GenericKey>("transit_relay_token");
1448        socket
1449            .write_all(format!("please relay {} for side {}\n", sub_key.to_hex(), tside).as_bytes())
1450            .await?;
1451        let mut rx = [0u8; 3];
1452        socket.read_exact(&mut rx).await?;
1453        let ok_msg: [u8; 3] = *b"ok\n";
1454        ensure!(ok_msg == rx, TransitHandshakeError::RelayHandshakeFailed);
1455    }
1456
1457    let finalizer = if is_leader {
1458        cryptor.handshake_leader(&mut socket).await?
1459    } else {
1460        cryptor.handshake_follower(&mut socket).await?
1461    };
1462
1463    Ok((socket, finalizer))
1464}
1465
#[cfg(test)]
mod test {
    use super::*;
    use serde_json::json;

    /// Abilities are encoded as a list of objects that carry nothing but their type.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    pub fn test_abilities_encoding() {
        let all = serde_json::to_value(Abilities::ALL).unwrap();
        let expected_all = json!([{"type": "direct-tcp-v1"}, {"type": "relay-v1"}]);
        assert_eq!(all, expected_all);

        let force_direct = serde_json::to_value(Abilities::FORCE_DIRECT).unwrap();
        let expected_force_direct = json!([{"type": "direct-tcp-v1"}]);
        assert_eq!(force_direct, expected_force_direct);
    }

    /// Hints are encoded as one flat list: the direct hints, followed by the relay hints
    /// which in turn nest the TCP and WebSocket endpoints of the relay server.
    #[test]
    #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
    pub fn test_hints_encoding() {
        let direct = DirectHint {
            hostname: "localhost".into(),
            port: 1234,
        };
        let relay = RelayHint::new(
            Some("default".into()),
            [DirectHint::new("transit.magic-wormhole.io", 4001)],
            ["ws://transit.magic-wormhole.io/relay".parse().unwrap()],
        );
        let encoded = serde_json::to_value(Hints::new([direct], [relay])).unwrap();

        let expected = json!([
            {
                "type": "direct-tcp-v1",
                "hostname": "localhost",
                "port": 1234
            },
            {
                "type": "relay-v1",
                "name": "default",
                "hints": [
                    {
                        "type": "direct-tcp-v1",
                        "hostname": "transit.magic-wormhole.io",
                        "port": 4001,
                    },
                    {
                        "type": "websocket",
                        "url": "ws://transit.magic-wormhole.io/relay",
                    },
                ]
            }
        ]);
        assert_eq!(encoded, expected)
    }
}