Skip to main content

viam_rust_utils/rpc/
dial.rs

1use super::{
2    client_channel::*,
3    log_prefixes,
4    webrtc::{webrtc_action_with_timeout, Options},
5};
6use crate::gen::google;
7use crate::gen::proto::rpc::v1::{
8    auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
9};
10use crate::gen::proto::rpc::webrtc::v1::{
11    call_response::Stage, call_update_request::Update,
12    signaling_service_client::SignalingServiceClient, CallUpdateRequest,
13    OptionalWebRtcConfigRequest, OptionalWebRtcConfigResponse,
14};
15use crate::gen::proto::rpc::webrtc::v1::{
16    CallRequest, IceCandidate, Metadata, RequestHeaders, Strings,
17};
18use crate::rpc::webrtc;
19use ::http::header::HeaderName;
20use ::http::{
21    uri::{Authority, Parts, PathAndQuery, Scheme},
22    HeaderValue, Version,
23};
24use ::viam_mdns::{discover, RecordKind, Response};
25use ::webrtc::ice_transport::{
26    ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
27    ice_connection_state::RTCIceConnectionState,
28};
29use ::webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
30use anyhow::{Context, Result};
31use core::fmt;
32use futures::stream::FuturesUnordered;
33use futures_util::{pin_mut, stream::StreamExt};
34use local_ip_address::list_afinet_netifas;
35use std::{
36    collections::HashMap,
37    net::{IpAddr, Ipv4Addr},
38    sync::{
39        atomic::{AtomicBool, Ordering},
40        Arc, Mutex, RwLock,
41    },
42    task::{Context as TaskContext, Poll},
43    time::{Duration, Instant},
44};
45use tokio::sync::{mpsc, watch};
46use tonic::body::BoxBody;
47use tonic::codegen::BoxFuture;
48use tonic::transport::{Body, Channel, Uri};
49
50/// TLS certificate verifier that accepts any certificate, used for mDNS connections
51/// where the local device's cert is signed by a public CA (Let's Encrypt or Google)
52/// but previously only the leaf cert was sent in the TLS handshake.  Without the
53/// intermediate CA cert in the chain, clients cannot build the path to the root even
54/// though the root is in the system trust store, so verification fails.
55///
56/// TODO(RSDK-13879): app commit d83137fc added `Bundle: true` to ACME certificate
57/// issuance (both Let's Encrypt and Google CA paths), which causes the full chain
58/// (leaf + intermediate) to be stored and sent during TLS handshakes.  A follow-up
59/// migration (RSDK-13799) redistributes renewal dates so that all existing robot
60/// certificates are re-issued with the bundled chain by May 22, 2026.  After that
61/// date, `NoCertVerification` can be replaced with tonic's standard TLS:
62///   1. Remove this struct entirely.
63///   2. Remove the `hyper-rustls` and `rustls` dependencies from Cargo.toml.
64///   3. Restore the `domain: &str` parameter to `create_channel` (it was removed
65///      along with this workaround).  The domain is the robot's canonical hostname
66///      from the original URI (e.g. `my-robot.abcdefg.local.viam.cloud`), NOT the
67///      discovered IP — it is needed for SNI and SAN verification since the cert's
68///      SANs are hostnames, not IP addresses.
69///   4. In `create_channel` (non-loopback mDNS path below), replace the entire
70///      custom-connector block with:
71///         let tls_config = ClientTlsConfig::new().domain_name(domain);
72///         let mut parts = uri.into_parts();
73///         parts.scheme = Some(Scheme::HTTPS);
74///         let uri = Uri::from_parts(parts)?;
75///         return Channel::builder(uri.clone())
76///             .tls_config(tls_config)?
77///             .connect()
78///             .await
79///             .with_context(|| format!("Connecting to {:?}", uri));
80///      `ClientTlsConfig::new()` uses the webpki root CA bundle (tonic is built
81///      with `tls-webpki-roots`), which already contains the Let's Encrypt and
82///      Google CA roots — no custom cert pinning required.
#[derive(Debug)]
struct NoCertVerification;

// SECURITY: this verifier performs no checks at all. Every argument (leaf cert,
// intermediates, server name, SCTs, OCSP response, current time) is ignored and the
// certificate is unconditionally reported as verified, so a connection using it has
// no protection against an impersonating server.
impl rustls::client::ServerCertVerifier for NoCertVerification {
    /// Accepts whatever certificate the server presents without inspecting it.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::Certificate,
        _intermediates: &[rustls::Certificate],
        _server_name: &rustls::ServerName,
        _scts: &mut dyn Iterator<Item = &[u8]>,
        _ocsp_response: &[u8],
        _now: std::time::SystemTime,
    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::ServerCertVerified::assertion())
    }
}
99use tower::{Service, ServiceBuilder};
100use tower_http::auth::AddAuthorization;
101use tower_http::auth::AddAuthorizationLayer;
102use tower_http::set_header::{SetRequestHeader, SetRequestHeaderLayer};
103
// gRPC status codes, written into the `grpc-status` response header.
/// The call completed successfully.
const STATUS_CODE_OK: i32 = 0;
/// The call failed for a reason that has no more specific code.
const STATUS_CODE_UNKNOWN: i32 = 2;
/// A resource needed to serve the call has been exhausted.
const STATUS_CODE_RESOURCE_EXHAUSTED: i32 = 8;

/// mDNS service name queried when looking for machines on the local network.
pub const VIAM_MDNS_SERVICE_NAME: &str = "_rpc._tcp.local";

/// The credential type identifier stored in `Credentials::type`.
type SecretType = String;
112
#[derive(Clone)]
/// A communication channel to a given uri. The channel is either a direct tonic channel,
/// or a webRTC channel.
pub enum ViamChannel {
    /// A plain tonic [`Channel`]; requests are forwarded to it unchanged.
    Direct(Channel),
    /// A tonic [`Channel`] wrapped in tower layers that add an authorization header
    /// and set a request header on every call.
    DirectPreAuthorized(AddAuthorization<SetRequestHeader<Channel, HeaderValue>>),
    /// A shared handle to a WebRTC client channel; each request is sent on its own
    /// stream of that channel.
    WebRTC(Arc<WebRTCClientChannel>),
}
121
122#[derive(Debug, Clone)]
123pub struct RPCCredentials {
124    entity: Option<String>,
125    credentials: Credentials,
126}
127
128impl RPCCredentials {
129    pub fn new(entity: Option<String>, r#type: SecretType, payload: String) -> Self {
130        Self {
131            credentials: Credentials { r#type, payload },
132            entity,
133        }
134    }
135}
136
137impl ViamChannel {
138    async fn create_resp(
139        channel: &mut Arc<WebRTCClientChannel>,
140        stream: crate::gen::proto::rpc::webrtc::v1::Stream,
141        request: http::Request<BoxBody>,
142        response: http::response::Builder,
143    ) -> http::Response<Body> {
144        let (parts, body) = request.into_parts();
145        let mut status_code = STATUS_CODE_OK;
146        let stream_id = stream.id;
147        let metadata = Some(metadata_from_parts(&parts));
148        let headers = RequestHeaders {
149            method: parts
150                .uri
151                .path_and_query()
152                .map(PathAndQuery::to_string)
153                .unwrap_or_default(),
154            metadata,
155            timeout: None,
156        };
157
158        if let Err(e) = channel.write_headers(&stream, headers).await {
159            log::error!("error writing headers: {e}");
160            channel.close_stream_with_recv_error(stream_id, e);
161            status_code = STATUS_CODE_UNKNOWN;
162        }
163
164        let data = hyper::body::to_bytes(body).await.unwrap().to_vec();
165        if let Err(e) = channel.write_message(Some(stream), data).await {
166            log::error!("error sending message: {e}");
167            channel.close_stream_with_recv_error(stream_id, e);
168            status_code = STATUS_CODE_UNKNOWN;
169        };
170
171        let body = match channel.resp_body_from_stream(stream_id) {
172            Ok(body) => body,
173            Err(e) => {
174                log::error!("error receiving response from stream: {e}");
175                channel.close_stream_with_recv_error(stream_id, e);
176                status_code = STATUS_CODE_UNKNOWN;
177                Body::empty()
178            }
179        };
180
181        let response = if status_code != STATUS_CODE_OK {
182            response.header("grpc-status", &status_code.to_string())
183        } else {
184            response
185        };
186
187        response.body(body).unwrap()
188    }
189}
190
191impl Service<http::Request<BoxBody>> for ViamChannel {
192    type Response = http::Response<Body>;
193    type Error = tonic::transport::Error;
194    type Future = BoxFuture<Self::Response, Self::Error>;
195
196    fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
197        match self {
198            Self::Direct(channel) => channel.poll_ready(cx),
199            Self::DirectPreAuthorized(channel) => channel.poll_ready(cx),
200            Self::WebRTC(_channel) => Poll::Ready(Ok(())),
201        }
202    }
203
204    fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
205        match self {
206            Self::Direct(channel) => Box::pin(channel.call(request)),
207            Self::DirectPreAuthorized(channel) => Box::pin(channel.call(request)),
208            Self::WebRTC(channel) => {
209                let mut channel = channel.clone();
210                let fut = async move {
211                    let response = http::response::Response::builder()
212                        // standardized gRPC headers.
213                        .header("content-type", "application/grpc")
214                        .version(Version::HTTP_2);
215
216                    match channel.new_stream() {
217                        Err(e) => {
218                            log::error!("{e}");
219                            let response = response
220                                .header("grpc-status", &STATUS_CODE_RESOURCE_EXHAUSTED.to_string())
221                                .body(Body::default())
222                                .unwrap();
223
224                            Ok(response)
225                        }
226                        Ok(stream) => {
227                            Ok(Self::create_resp(&mut channel, stream, request, response).await)
228                        }
229                    }
230                };
231                Box::pin(fut)
232            }
233        }
234    }
235}
236
/// Options for modifying the connection parameters
#[derive(Debug)]
pub struct DialOptions {
    /// Credentials to use when connecting, if any were supplied.
    credentials: Option<RPCCredentials>,
    /// WebRTC-specific options; `None` until one of the WebRTC-related builder
    /// methods is called.
    webrtc_options: Option<Options>,
    /// Parsed parts of the uri to connect to.
    uri: Option<Parts>,
    /// When true, connection via mDNS is disabled.
    disable_mdns: bool,
    /// When true, a failed HTTPS connection may be retried over HTTP.
    allow_downgrade: bool,
    /// When true, the connection uses the HTTP scheme.
    insecure: bool,
    /// Signaling server address to use instead of the default for WebRTC negotiation.
    signaling_server_override: Option<String>,
}
/// Builder state: a uri has been set and the credential choice is still pending.
#[derive(Clone)]
pub struct WantsCredentials(());
/// Builder state: no uri has been set yet.
#[derive(Clone)]
pub struct WantsUri(());
/// Builder state: credentials were supplied.
#[derive(Clone)]
pub struct WithCredentials(());
/// Builder state: the caller explicitly chose to connect without credentials.
#[derive(Clone)]
pub struct WithoutCredentials(());

/// Marker for builder states in which the credential choice has been made; the
/// connection-tuning builder methods are only available on these states.
pub trait AuthMethod {}
impl AuthMethod for WithCredentials {}
impl AuthMethod for WithoutCredentials {}
/// A DialBuilder allows us to set options before establishing a connection to a server
// NOTE(review): the reason `dead_code` is allowed is not visible here; presumably
// `state` is only used as a type-level marker and never read. Confirm.
#[allow(dead_code)]
pub struct DialBuilder<T> {
    /// Marker value for the builder's current state (one of the `Wants*`/`With*` types).
    state: T,
    /// The options accumulated so far.
    config: DialOptions,
}
266
267impl<T> fmt::Debug for DialBuilder<T> {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        f.debug_struct("Dial")
270            .field("State", &format_args!("{}", &std::any::type_name::<T>()))
271            .field("Opt", &format_args!("{:?}", self.config))
272            .finish()
273    }
274}
275
276impl DialOptions {
277    /// Creates a new DialBuilder
278    pub fn builder() -> DialBuilder<WantsUri> {
279        DialBuilder {
280            state: WantsUri(()),
281            config: DialOptions {
282                credentials: None,
283                uri: None,
284                allow_downgrade: false,
285                disable_mdns: false,
286                insecure: false,
287                webrtc_options: None,
288                signaling_server_override: None,
289            },
290        }
291    }
292}
293
294impl DialBuilder<WantsUri> {
295    /// Sets the uri to connect to
296    pub fn uri(self, uri: &str) -> DialBuilder<WantsCredentials> {
297        let uri_parts = uri_parts_with_defaults(uri);
298        DialBuilder {
299            state: WantsCredentials(()),
300            config: DialOptions {
301                credentials: None,
302                uri: Some(uri_parts),
303                allow_downgrade: false,
304                disable_mdns: false,
305                insecure: false,
306                webrtc_options: None,
307                signaling_server_override: None,
308            },
309        }
310    }
311}
312impl DialBuilder<WantsCredentials> {
313    /// Tells connecting logic to not expect/require credentials
314    pub fn without_credentials(self) -> DialBuilder<WithoutCredentials> {
315        DialBuilder {
316            state: WithoutCredentials(()),
317            config: DialOptions {
318                credentials: None,
319                uri: self.config.uri,
320                allow_downgrade: false,
321                disable_mdns: false,
322                insecure: false,
323                webrtc_options: None,
324                signaling_server_override: None,
325            },
326        }
327    }
328    /// Sets credentials to use when connecting
329    pub fn with_credentials(self, creds: RPCCredentials) -> DialBuilder<WithCredentials> {
330        DialBuilder {
331            state: WithCredentials(()),
332            config: DialOptions {
333                credentials: Some(creds),
334                uri: self.config.uri,
335                allow_downgrade: false,
336                disable_mdns: false,
337                insecure: false,
338                webrtc_options: None,
339                signaling_server_override: None,
340            },
341        }
342    }
343}
344
345impl<T: AuthMethod> DialBuilder<T> {
346    /// Attempts to connect insecurely with scheme of HTTP as a default
347    pub fn insecure(mut self) -> Self {
348        self.config.insecure = true;
349        self
350    }
351    /// Allows for downgrading and attempting to connect via HTTP if HTTPS fails
352    pub fn allow_downgrade(mut self) -> Self {
353        self.config.allow_downgrade = true;
354        self
355    }
356    /// Disables connection via mDNS
357    pub fn disable_mdns(mut self) -> Self {
358        self.config.disable_mdns = true;
359        self
360    }
361
362    /// Overrides any default connection behavior, forcing direct connection. Note that
363    /// the connection itself will fail if it is between a client and server on separate
364    /// networks and not over webRTC
365    pub fn disable_webrtc(mut self) -> Self {
366        let webrtc_options = Options::default().disable_webrtc();
367        self.config.webrtc_options = Some(webrtc_options);
368        self
369    }
370
371    /// Forces ICE transport policy to relay-only so only TURN candidates are used.
372    /// Useful for testing relay connectivity through a TURN server.
373    pub fn force_relay(mut self) -> Self {
374        self.config
375            .webrtc_options
376            .get_or_insert_with(Options::default)
377            .force_relay = true;
378        self
379    }
380
381    /// Strips TURN servers from the ICE config so only host and server-reflexive
382    /// candidates are used. Useful for testing direct connectivity without relay fallback.
383    pub fn force_p2p(mut self) -> Self {
384        self.config
385            .webrtc_options
386            .get_or_insert_with(Options::default)
387            .force_p2p = true;
388        self
389    }
390
391    /// Filters the signaling server's TURN list to only the server whose parsed URI
392    /// matches (compared by scheme, host, port, and transport — defaulting transport
393    /// to UDP if unspecified). Example: "turn:turn.viam.com:443"
394    pub fn turn_uri(mut self, uri: String) -> Self {
395        self.config
396            .webrtc_options
397            .get_or_insert_with(Options::default)
398            .turn_uri = Some(uri);
399        self
400    }
401
402    /// Overrides the signaling server address used for WebRTC negotiation.
403    pub fn signaling_server(mut self, address: String) -> Self {
404        self.config.signaling_server_override = Some(address);
405        self
406    }
407
408    async fn get_addr_from_interface(
409        iface: (&str, Vec<&IpAddr>),
410        candidates: &Vec<String>,
411        local_ipv4s: &std::collections::HashSet<Ipv4Addr>,
412    ) -> Option<String> {
413        let addresses: Vec<Ipv4Addr> = iface
414            .1
415            .iter()
416            .filter_map(|ip| match ip {
417                IpAddr::V4(v4) => Some(*v4),
418                IpAddr::V6(_) => None,
419            })
420            .collect();
421
422        let mut resp: Option<Response> = None;
423        for ipv4 in addresses {
424            for candidate in candidates {
425                let discovery = match discover::interface_with_loopback(
426                    VIAM_MDNS_SERVICE_NAME,
427                    Duration::from_millis(250),
428                    ipv4,
429                ) {
430                    Ok(d) => d,
431                    Err(e) => {
432                        log::debug!("mDNS socket error on {ipv4}: {e}");
433                        continue;
434                    }
435                };
436                let stream = discovery.listen();
437                pin_mut!(stream);
438                while let Some(Ok(response)) = stream.next().await {
439                    if let Some(hostname) = response.hostname() {
440                        // Machine uris come in local ("my-cool-robot.abcdefg.local.viam.cloud")
441                        // and non-local ("my-cool-robot.abcdefg.viam.cloud") forms. Sometimes
442                        // (namely with micro-rdk), our mdns query can only see one (the local) version.
443                        // However, users are typically passing the non-local version. By splitting at
444                        // "viam" and taking the only the first value, we can still search for
445                        // candidates based on the actual "my-cool-robot" name without being opinionated
446                        // on whether the candidate is locally named or not.
447                        let local_agnostic_candidate = candidate.as_str().split("viam").next()?;
448                        log::debug!(
449                            "mDNS response on {ipv4}: hostname={hostname:?}, candidate={candidate:?}, local_agnostic={local_agnostic_candidate:?}, matches={}",
450                            hostname.contains(local_agnostic_candidate)
451                        );
452                        if hostname.contains(local_agnostic_candidate) {
453                            resp = Some(response);
454                            break;
455                        }
456                    } else {
457                        log::debug!(
458                            "mDNS response on {ipv4}: no hostname (no PTR record); answers={:?}",
459                            response.answers
460                        );
461                    }
462                    if resp.is_some() {
463                        break;
464                    }
465                }
466            }
467        }
468
469        let resp = resp?;
470        let mut has_grpc = false;
471        let mut has_webrtc = false;
472        for field in resp.txt_records() {
473            has_grpc = has_grpc || field.contains("grpc");
474            has_webrtc = has_webrtc || field.contains("webrtc");
475        }
476
477        // Log all records in the response for diagnostics.
478        log::debug!(
479            "mDNS matched response records: {:?}",
480            resp.records().collect::<Vec<_>>()
481        );
482
483        // Prefer a non-loopback IPv4 address that is currently present on one of our own
484        // network interfaces.  viam-server may advertise a stale IP (e.g. a WiFi address
485        // from when it was started that is now unreachable because WiFi was turned off).
486        // Accepting only IPs we can actually route to avoids ENETUNREACH at connect time.
487        // If no reachable non-loopback IP is found, fall back to any advertised IPv4
488        // (including loopback) so that offline-only (loopback-only) machines still work.
489        let ip_addr = resp
490            .records()
491            .filter_map(|r| match r.kind {
492                RecordKind::A(addr) if !addr.is_loopback() && local_ipv4s.contains(&addr) => {
493                    Some(addr)
494                }
495                _ => None,
496            })
497            .next()
498            .or_else(|| {
499                resp.records().find_map(|r| match r.kind {
500                    RecordKind::A(addr) => Some(addr),
501                    _ => None,
502                })
503            });
504
505        if !(has_grpc || has_webrtc) || ip_addr.is_none() {
506            return None;
507        }
508        let mut local_addr = ip_addr?.to_string();
509        local_addr.push(':');
510        local_addr.push_str(&resp.port()?.to_string());
511        log::debug!("mDNS resolved address: {local_addr}");
512        Some(local_addr)
513    }
514
515    fn duplicate_uri(&self) -> Option<Parts> {
516        match &self.config.uri {
517            None => None,
518            Some(uri) => duplicate_uri(uri),
519        }
520    }
521
522    async fn get_mdns_uri(&self) -> Option<Parts> {
523        log::debug!("{}", log_prefixes::MDNS_QUERY_ATTEMPT);
524        if self.config.disable_mdns {
525            return None;
526        }
527
528        let mut uri = self.duplicate_uri()?;
529        let candidate = uri.authority.clone()?.to_string();
530
531        let candidates: Vec<String> = vec![candidate.replace('.', "-"), candidate];
532
533        let ifaces = list_afinet_netifas().ok()?;
534
535        // Collect all local IPv4 addresses for use in get_addr_from_interface, which prefers
536        // mDNS response IPs that are currently assigned to one of our own interfaces.
537        // viam-server may advertise a stale IP (e.g. a WiFi address from when it started,
538        // now unreachable because WiFi is off); filtering by current interface addresses
539        // avoids connecting to an unreachable address.
540        let local_ipv4s: std::collections::HashSet<Ipv4Addr> = ifaces
541            .iter()
542            .filter_map(|(_, ip)| match ip {
543                IpAddr::V4(v4) => Some(*v4),
544                _ => None,
545            })
546            .collect();
547
548        let ifaces: HashMap<&str, Vec<&IpAddr>> =
549            ifaces.iter().fold(HashMap::new(), |mut map, (k, v)| {
550                map.entry(k).or_default().push(v);
551                map
552            });
553
554        let mut iface_futures = FuturesUnordered::new();
555        for iface in ifaces {
556            iface_futures.push(Self::get_addr_from_interface(
557                iface,
558                &candidates,
559                &local_ipv4s,
560            ));
561        }
562
563        let mut local_addr: Option<String> = None;
564        while let Some(maybe_addr) = iface_futures.next().await {
565            if maybe_addr.is_some() {
566                local_addr = maybe_addr;
567                break;
568            }
569        }
570        let local_addr = match local_addr {
571            None => {
572                log::debug!("Unable to connect via mDNS");
573                return None;
574            }
575            Some(addr) => {
576                log::debug!("{}: {addr}", log_prefixes::MDNS_ADDRESS_FOUND);
577                addr
578            }
579        };
580
581        let auth = local_addr.parse::<Authority>().ok()?;
582        uri.authority = Some(auth);
583        uri.scheme = Some(Scheme::HTTP);
584
585        Some(uri)
586    }
587
588    // TODO(RSDK-13879) step 3: restore `domain: &str` as a second parameter (after
589    // `allow_downgrade`) and thread it through all four call sites below.  See the
590    // `NoCertVerification` doc comment for the full migration instructions.
591    async fn create_channel(allow_downgrade: bool, uri: Uri, for_mdns: bool) -> Result<Channel> {
592        if for_mdns {
593            let host = uri.host().unwrap_or("");
594            let is_loopback = host
595                .parse::<std::net::IpAddr>()
596                .map(|ip| ip.is_loopback())
597                .unwrap_or(false);
598
599            if is_loopback {
600                // viam-server serves plain gRPC (no TLS) on the loopback port it advertises
601                // via mDNS. The server only accepts HTTP/2 (gRPC requires it), so we must use
602                // h2c (HTTP/2 cleartext Prior Knowledge). tonic's own connect() sets
603                // http2_only=true which sends the h2c connection preface directly over TCP.
604                // We intentionally do NOT use connect_with_connector here: hyper-rustls's
605                // plain-TCP path sends HTTP/1.1, which the server rejects with an empty reply
606                // (BrokenPipe from our side).
607                log::debug!("mDNS create_channel: loopback {host}, using tonic h2c connect");
608                return Channel::builder(uri.clone())
609                    .connect()
610                    .await
611                    .with_context(|| format!("Connecting to {:?}", uri));
612            }
613
614            // Non-loopback LAN address: viam-server uses TLS but currently only sends the leaf
615            // cert (no intermediate), so standard verification fails.  Use a custom connector
616            // that skips cert verification for now.
617            // TODO(RSDK-13879) step 4: replace the block below with standard tonic TLS once all
618            // robot certs include the full chain (after May 22, 2026).  See the
619            // `NoCertVerification` doc comment for the exact replacement snippet.
620            log::debug!("mDNS create_channel: LAN address {host}, using no-cert-verify TLS");
621            let tls_config = rustls::ClientConfig::builder()
622                .with_safe_defaults()
623                .with_custom_certificate_verifier(Arc::new(NoCertVerification))
624                .with_no_client_auth();
625
626            let connector = hyper_rustls::HttpsConnectorBuilder::new()
627                .with_tls_config(tls_config)
628                .https_or_http()
629                .enable_http2()
630                .build();
631
632            let mut parts = uri.into_parts();
633            parts.scheme = Some(Scheme::HTTPS);
634            let uri = Uri::from_parts(parts)?;
635            log::debug!("mDNS create_channel: connecting to {uri:?}");
636            return Channel::builder(uri.clone())
637                .connect_with_connector(connector)
638                .await
639                .with_context(|| format!("Connecting to {:?}", uri));
640        }
641
642        let chan = match Channel::builder(uri.clone())
643            .connect()
644            .await
645            .with_context(|| format!("Connecting to {:?}", uri.clone()))
646        {
647            Ok(c) => c,
648            Err(e) => {
649                if allow_downgrade {
650                    let mut uri_parts = uri.clone().into_parts();
651                    uri_parts.scheme = Some(Scheme::HTTP);
652                    let uri = Uri::from_parts(uri_parts)?;
653                    Channel::builder(uri).connect().await?
654                } else {
655                    return Err(anyhow::anyhow!(e));
656                }
657            }
658        };
659        Ok(chan)
660    }
661}
662
663impl DialBuilder<WithoutCredentials> {
664    fn clone(&self) -> Self {
665        DialBuilder {
666            state: WithoutCredentials(()),
667            config: DialOptions {
668                credentials: None,
669                webrtc_options: self.config.webrtc_options.clone(),
670                uri: self.duplicate_uri(),
671                disable_mdns: self.config.disable_mdns,
672                allow_downgrade: self.config.allow_downgrade,
673                insecure: self.config.insecure,
674                signaling_server_override: self.config.signaling_server_override.clone(),
675            },
676        }
677    }
678
679    /// attempts to establish a connection without credentials to the DialBuilder's given uri
680    async fn connect_inner(
681        self,
682        mdns_uri: Option<Parts>,
683        mut original_uri_parts: Parts,
684    ) -> Result<ViamChannel> {
685        let webrtc_options = self.config.webrtc_options;
686        let disable_webrtc = match &webrtc_options {
687            Some(options) => options.disable_webrtc,
688            None => false,
689        };
690        if self.config.insecure {
691            original_uri_parts.scheme = Some(Scheme::HTTP);
692        }
693        let original_uri = Uri::from_parts(original_uri_parts)?;
694        let uri2 = original_uri.clone();
695        let uri = infer_remote_uri_from_authority(
696            original_uri,
697            self.config.signaling_server_override.as_deref(),
698        );
699        let domain = uri2.authority().to_owned().unwrap().as_str();
700
701        let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
702        let attempting_mdns = mdns_uri.is_some();
703        if attempting_mdns {
704            log::debug!("Attempting to connect via mDNS");
705        } else {
706            log::debug!("Attempting to connect");
707        }
708
709        let channel = match mdns_uri {
710            // TODO(RSDK-13879) step 3: pass `domain` as second arg once NoCertVerification is removed.
711            Some(uri) => Self::create_channel(self.config.allow_downgrade, uri, true).await,
712            // not actually an error necessarily, but we want to ensure that a channel is still
713            // created with the default uri
714            None => Err(anyhow::anyhow!("")),
715        };
716
717        let channel = match channel {
718            Ok(c) => {
719                log::debug!("Connected via mDNS");
720                c
721            }
722            Err(e) => {
723                if attempting_mdns {
724                    log::debug!(
725                        "Unable to connect via mDNS; falling back to robot URI. Error: {e:#}"
726                    );
727                }
728                // TODO(RSDK-13879) step 3: pass `domain` as second arg once NoCertVerification is removed.
729                Self::create_channel(self.config.allow_downgrade, uri.clone(), false).await?
730            }
731        };
732
733        // TODO (RSDK-517) make maybe_connect_via_webrtc take a more generic type so we don't
734        // need to add these dummy layers.
735        let intercepted_channel = ServiceBuilder::new()
736            .layer(AddAuthorizationLayer::basic(
737                "fake username",
738                "fake password",
739            ))
740            .layer(SetRequestHeaderLayer::overriding(
741                HeaderName::from_static("rpc-host"),
742                HeaderValue::from_str(domain)?,
743            ))
744            .service(channel.clone());
745
746        if disable_webrtc {
747            log::debug!("{}", log_prefixes::DIALED_GRPC);
748            Ok(ViamChannel::Direct(channel.clone()))
749        } else {
750            match maybe_connect_via_webrtc(uri, intercepted_channel.clone(), webrtc_options).await {
751                Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
752                Err(e) => {
753                    log::error!("error connecting via webrtc: {e}. Attempting to connect directly");
754                    log::debug!("{}", log_prefixes::DIALED_GRPC);
755                    Ok(ViamChannel::Direct(channel.clone()))
756                }
757            }
758        }
759    }
760
761    async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
762        let mdns_uri =
763            webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
764                .await
765                .ok()
766                .flatten()
767                .ok_or(anyhow::anyhow!(
768                    "Unable to establish connection via mDNS; uri not found"
769                ))?;
770
771        self.connect_inner(Some(mdns_uri), original_uri).await
772    }
773
774    pub async fn connect(self) -> Result<ViamChannel> {
775        log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
776        let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
777            "Attempting to connect but there was no uri"
778        ))?;
779        let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
780            "Attempting to connect but there was no uri"
781        ))?;
782
783        let skip_mdns = self.config.disable_mdns;
784
785        // We want to short circuit and return the first `Ok` result from our connection
786        // attempts, which `tokio::select!` does great. Buuuuut, we don't want to
787        // abandon the `Err` results, and we want to provide comprehensive logging for
788        // debugging purposes. Hence the loop and pinning. The pinning lets us reference
789        // the same future multiple times, while the loop lets us immediately return on the
790        // first `Ok` result while still seeing and logging any error results.
791        //
792        // When mDNS is skipped (disable_mdns), with_mdns_err is pre-set so
793        // the select guard disables that branch and only the direct connection is attempted.
794        tokio::pin! {
795            let with_mdns = self.clone().connect_mdns(original_uri);
796            let without_mdns = self.connect_inner(None, original_uri2);
797        }
798        let mut with_mdns_err: Option<anyhow::Error> =
799            skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
800        let mut without_mdns_err: Option<anyhow::Error> = None;
801        while with_mdns_err.is_none() || without_mdns_err.is_none() {
802            tokio::select! {
803                with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
804                    match with_mdns {
805                        Ok(chan) => return Ok(chan),
806                        Err(e) => {
807                            log::debug!("Error connecting with mdns: {e}");
808                            with_mdns_err = Some(e);
809                        }
810                    }
811                }
812                without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
813                    match without_mdns {
814                        Ok(chan) => return Ok(chan),
815                        Err(e) => {
816                            log::debug!("Error connecting without mdns: {e}");
817                            without_mdns_err = Some(e);
818                        }
819                    }
820                }
821            }
822        }
823        Err(anyhow::anyhow!(
824            "Unable to connect with or without mdns.
825                    with_mdns err: {with_mdns_err:?}
826                    without_mdns err: {without_mdns_err:?}"
827        ))
828    }
829}
830
831async fn get_auth_token(
832    channel: &mut Channel,
833    creds: Credentials,
834    entity: String,
835) -> Result<String> {
836    let mut auth_service = AuthServiceClient::new(channel);
837    let req = AuthenticateRequest {
838        entity,
839        credentials: Some(creds),
840    };
841
842    let rsp = auth_service.authenticate(req).await?;
843    Ok(rsp.into_inner().access_token)
844}
845
846impl DialBuilder<WithCredentials> {
847    fn clone(&self) -> Self {
848        DialBuilder {
849            state: WithCredentials(()),
850            config: DialOptions {
851                credentials: self.config.credentials.clone(),
852                webrtc_options: self.config.webrtc_options.clone(),
853                uri: self.duplicate_uri(),
854                disable_mdns: self.config.disable_mdns,
855                allow_downgrade: self.config.allow_downgrade,
856                insecure: self.config.insecure,
857                signaling_server_override: self.config.signaling_server_override.clone(),
858            },
859        }
860    }
861
862    async fn connect_inner(
863        self,
864        mdns_uri: Option<Parts>,
865        mut original_uri_parts: Parts,
866    ) -> Result<ViamChannel> {
867        let is_insecure = self.config.insecure;
868
869        let webrtc_options = self.config.webrtc_options;
870        let disable_webrtc = match &webrtc_options {
871            Some(options) => options.disable_webrtc,
872            None => false,
873        };
874
875        if is_insecure {
876            original_uri_parts.scheme = Some(Scheme::HTTP);
877        }
878
879        let original_uri = Uri::from_parts(original_uri_parts)?;
880
881        let domain = original_uri.authority().unwrap().to_string();
882        let uri_for_auth = infer_remote_uri_from_authority(
883            original_uri.clone(),
884            self.config.signaling_server_override.as_deref(),
885        );
886
887        let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
888        let attempting_mdns = mdns_uri.is_some();
889
890        let allow_downgrade = self.config.allow_downgrade;
891        if attempting_mdns {
892            log::debug!("Attempting to connect via mDNS");
893        } else {
894            log::debug!("Attempting to connect");
895        }
896        let channel = match mdns_uri {
897            // TODO(RSDK-13879) step 3: pass `domain` as second arg once NoCertVerification is removed.
898            Some(uri) => Self::create_channel(allow_downgrade, uri, true).await,
899            // not actually an error necessarily, but we want to ensure that a channel is still
900            // created with the default uri
901            None => Err(anyhow::anyhow!("")),
902        };
903        let real_channel = match channel {
904            Ok(c) => {
905                log::debug!("Connected via mDNS");
906                c
907            }
908            Err(e) => {
909                if attempting_mdns {
910                    log::debug!(
911                        "Unable to connect via mDNS; falling back to robot URI. Error: {e:#}"
912                    );
913                }
914                // TODO(RSDK-13879) step 3: pass `domain` as second arg once NoCertVerification is removed.
915                Self::create_channel(allow_downgrade, uri_for_auth, false).await?
916            }
917        };
918
919        log::debug!("{}", log_prefixes::ACQUIRING_AUTH_TOKEN);
920        let token = get_auth_token(
921            &mut real_channel.clone(),
922            self.config
923                .credentials
924                .as_ref()
925                .unwrap()
926                .credentials
927                .clone(),
928            self.config
929                .credentials
930                .unwrap()
931                .entity
932                .unwrap_or_else(|| domain.clone()),
933        )
934        .await?;
935        log::debug!("{}", log_prefixes::ACQUIRED_AUTH_TOKEN);
936
937        let channel = ServiceBuilder::new()
938            .layer(AddAuthorizationLayer::bearer(&token))
939            .layer(SetRequestHeaderLayer::overriding(
940                HeaderName::from_static("rpc-host"),
941                HeaderValue::from_str(domain.as_str())?,
942            ))
943            .service(real_channel);
944
945        if disable_webrtc {
946            log::debug!("Connected via gRPC");
947            Ok(ViamChannel::DirectPreAuthorized(channel))
948        } else {
949            match maybe_connect_via_webrtc(original_uri, channel.clone(), webrtc_options).await {
950                Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
951                Err(e) => {
952                    log::error!(
953                    "Unable to establish webrtc connection due to error: [{e}]. Attempting direct connection."
954                );
955                    log::debug!("Connected via gRPC");
956                    Ok(ViamChannel::DirectPreAuthorized(channel))
957                }
958            }
959        }
960    }
961
962    async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
963        // NOTE(benjirewis): Use a duration of 1500ms for getting the mDNS URI. I've anecdotally
964        // seen times as great as 922ms to fetch a non-loopback mDNS URI. With an
965        // interface_with_loopback query interval of 250ms, 1500ms here should give us time for ~6
966        // queries.
967        let mdns_uri =
968            webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
969                .await
970                .ok()
971                .flatten()
972                .ok_or(anyhow::anyhow!(
973                    "Unable to establish connection via mDNS; uri not found"
974                ))?;
975
976        self.connect_inner(Some(mdns_uri), original_uri).await
977    }
978
979    /// attempts to establish a connection with credentials to the DialBuilder's given uri
980    pub async fn connect(self) -> Result<ViamChannel> {
981        log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
982        let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
983            "Attempting to connect but there was no uri"
984        ))?;
985        let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
986            "Attempting to connect but there was no uri"
987        ))?;
988
989        let skip_mdns = self.config.disable_mdns;
990
991        // We want to short circuit and return the first `Ok` result from our connection
992        // attempts, which `tokio::select!` does great. Buuuuut, we don't want to
993        // abandon the `Err` results, and we want to provide comprehensive logging for
994        // debugging purposes. Hence the loop and pinning. The pinning lets us reference
995        // the same future multiple times, while the loop lets us immediately return on the
996        // first `Ok` result while still seeing and logging any error results.
997        //
998        // When mDNS is skipped (disable_mdns), with_mdns_err is pre-set so
999        // the select guard disables that branch and only the direct connection is attempted.
1000        tokio::pin! {
1001            let with_mdns = self.clone().connect_mdns(original_uri);
1002            let without_mdns = self.connect_inner(None, original_uri2);
1003        }
1004        let mut with_mdns_err: Option<anyhow::Error> =
1005            skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
1006        let mut without_mdns_err: Option<anyhow::Error> = None;
1007        while with_mdns_err.is_none() || without_mdns_err.is_none() {
1008            tokio::select! {
1009                with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
1010                    match with_mdns {
1011                        Ok(chan) => return Ok(chan),
1012                        Err(e) => {
1013                            log::debug!("Error connecting with mdns: {e}");
1014                            with_mdns_err = Some(e);
1015                        }
1016                    }
1017                }
1018                without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
1019                    match without_mdns {
1020                        Ok(chan) => return Ok(chan),
1021                        Err(e) => {
1022                            log::debug!("Error connecting without mdns: {e}");
1023                            without_mdns_err = Some(e);
1024                        }
1025                    }
1026                }
1027            }
1028        }
1029        Err(anyhow::anyhow!(
1030            "Unable to connect with or without mdns.
1031                    with_mdns err: {with_mdns_err:?}
1032                    without_mdns err: {without_mdns_err:?}"
1033        ))
1034    }
1035}
1036
1037async fn send_done_or_error_update(
1038    update: CallUpdateRequest,
1039    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1040) {
1041    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1042
1043    if let Err(e) = signaling_client
1044        .call_update(update)
1045        .await
1046        .map_err(anyhow::Error::from)
1047        .map(|_| ())
1048    {
1049        log::error!("Error sending done or error update: {e}")
1050    }
1051}
1052
1053async fn send_error_once(
1054    sent_error: Arc<AtomicBool>,
1055    uuid: &String,
1056    err: &anyhow::Error,
1057    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1058) {
1059    if sent_error.load(Ordering::Acquire) {
1060        return;
1061    }
1062
1063    let err = google::rpc::Status {
1064        code: google::rpc::Code::Unknown.into(),
1065        message: err.to_string(),
1066        details: Vec::new(),
1067    };
1068    sent_error.store(true, Ordering::Release);
1069    let update_request = CallUpdateRequest {
1070        uuid: uuid.to_string(),
1071        update: Some(Update::Error(err)),
1072    };
1073
1074    send_done_or_error_update(update_request, channel).await
1075}
1076
1077async fn send_done_once(
1078    sent_done: Arc<AtomicBool>,
1079    uuid: &String,
1080    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1081) {
1082    if sent_done.load(Ordering::Acquire) {
1083        return;
1084    }
1085    sent_done.store(true, Ordering::Release);
1086    let update_request = CallUpdateRequest {
1087        uuid: uuid.to_string(),
1088        update: Some(Update::Done(true)),
1089    };
1090
1091    send_done_or_error_update(update_request, channel).await
1092}
1093
/// Running statistics about caller update requests.
#[derive(Default)]
struct CallerUpdateStats {
    /// Number of updates recorded.
    count: u128,
    /// Sum of the durations of all recorded updates.
    total_duration: Duration,
    /// Longest single recorded duration.
    max_duration: Duration,
}

impl fmt::Display for CallerUpdateStats {
    /// Writes a one-line summary (terminated by a newline) with the update count, the
    /// average duration and the maximum duration, both in whole milliseconds.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // With no recorded updates a plain division would panic (division by zero);
        // report an average of 0ms in that case.
        let average_duration = self
            .total_duration
            .as_millis()
            .checked_div(self.count)
            .unwrap_or(0);
        writeln!(
            f,
            "Caller update statistics: num_updates: {}, average_duration: {}ms, max_duration: {}ms",
            self.count,
            average_duration,
            self.max_duration.as_millis()
        )
    }
}
1114
1115async fn maybe_connect_via_webrtc(
1116    uri: Uri,
1117    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1118    webrtc_options: Option<Options>,
1119) -> Result<Arc<WebRTCClientChannel>> {
1120    let webrtc_options = webrtc_options.unwrap_or_else(|| Options::infer_from_uri(uri.clone()));
1121    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1122    let response = match signaling_client
1123        .optional_web_rtc_config(OptionalWebRtcConfigRequest::default())
1124        .await
1125    {
1126        Ok(resp) => resp,
1127        Err(e) => {
1128            if e.code() == tonic::Code::Unimplemented {
1129                tonic::Response::new(OptionalWebRtcConfigResponse::default())
1130            } else {
1131                return Err(anyhow::anyhow!(e));
1132            }
1133        }
1134    };
1135
1136    let optional_config = response.into_inner().config;
1137
1138    if webrtc_options.force_relay && webrtc_options.force_p2p {
1139        log::warn!("force_relay and force_p2p are both set; forceP2P strips TURN servers that forceRelay requires so the connection will fail");
1140    }
1141
1142    let (base_config, optional_config) = webrtc::apply_ice_policy(
1143        webrtc_options.config,
1144        optional_config,
1145        webrtc_options.force_relay,
1146        webrtc_options.force_p2p,
1147    );
1148
1149    if webrtc_options.force_relay {
1150        log::debug!("force relay enabled; using relay-only ICE transport policy");
1151    }
1152
1153    if webrtc_options.force_p2p {
1154        log::debug!("force P2P enabled; stripping TURN servers and ignoring signaling server ICE config");
1155    }
1156
1157    let mut config = webrtc::extend_webrtc_config(base_config, optional_config);
1158    
1159    if webrtc_options.force_p2p && webrtc_options.turn_uri.is_some() {
1160        log::warn!("force_p2p is set alongside turn_uri; the TURN filter will have no effect since TURN servers were already stripped");
1161    }
1162    let turn_uri = webrtc_options.turn_uri.as_deref().and_then(|s| {
1163        let parsed = webrtc::TurnUri::parse(s);
1164        if parsed.is_none() {
1165            log::warn!("Failed to parse turn_uri, ignoring: {s:?}");
1166        }
1167        parsed
1168    });
1169    config = webrtc::apply_turn_options(config, turn_uri.as_ref());
1170    if let Some(ref uri) = turn_uri {
1171        log::debug!("TURN filter options set: turn_uri={uri:?}");
1172    }
1173
1174    let (peer_connection, data_channel) =
1175        webrtc::new_peer_connection_for_client(config, webrtc_options.disable_trickle_ice).await?;
1176
1177    let sent_done_or_error = Arc::new(AtomicBool::new(false));
1178    let uuid_lock = Arc::new(RwLock::new("".to_string()));
1179    let uuid_for_ice_gathering_thread = uuid_lock.clone();
1180
1181    // Using an mpsc channel to report unrecoverable errors during Signaling, so we
1182    // don't have to wait until the timeout expires before giving up on this attempt.
1183    // The size of the channel is set to 1 since any error (or success) should terminate the function
1184    let (is_open_s, mut is_open_r) = mpsc::channel(1);
1185    let on_open_is_open = is_open_s.clone();
1186
1187    data_channel.on_open(Box::new(move || {
1188        let _ = on_open_is_open.try_send(None); // ignore sending errors, either an error (or success) was already sent or the operation will succeed
1189        Box::pin(async move {})
1190    }));
1191
1192    let exchange_done = Arc::new(AtomicBool::new(false));
1193    let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
1194    let ice_done = Arc::new(tokio::sync::Notify::new());
1195    let ice_done2 = ice_done.clone();
1196    let caller_update_stats = Arc::new(Mutex::new(CallerUpdateStats::default()));
1197
1198    if !webrtc_options.disable_trickle_ice {
1199        let offer = peer_connection.create_offer(None).await?;
1200        let channel2 = channel.clone();
1201        let uuid_lock2 = uuid_lock.clone();
1202        let sent_done_or_error2 = sent_done_or_error.clone();
1203
1204        let exchange_done = exchange_done.clone();
1205
1206        let on_local_ice_candidate_failure = is_open_s.clone();
1207
1208        let caller_update_stats = caller_update_stats.clone();
1209        let caller_update_stats2 = caller_update_stats.clone();
1210        peer_connection.on_ice_connection_state_change(Box::new(
1211            move |state: RTCIceConnectionState| {
1212                let caller_update_stats = caller_update_stats.clone();
1213                Box::pin(async move {
1214                    if state == RTCIceConnectionState::Completed {
1215                        let caller_update_stats_inner = caller_update_stats.lock().unwrap();
1216                        log::debug!("{}", caller_update_stats_inner);
1217                    }
1218                })
1219            },
1220        ));
1221        peer_connection.on_ice_candidate(Box::new(
1222            move |ice_candidate: Option<RTCIceCandidate>| {
1223                if exchange_done.load(Ordering::Acquire) {
1224                    return Box::pin(async move {});
1225                }
1226                let channel = channel2.clone();
1227                let sent_done_or_error = sent_done_or_error2.clone();
1228                let ice_done = ice_done.clone();
1229                let uuid_lock = uuid_lock2.clone();
1230                let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
1231                let mut remote_description_set_r = remote_description_set_r.clone();
1232                let caller_update_stats = caller_update_stats2.clone();
1233                Box::pin(async move {
1234                    // If the value in the watch channel has not been set yet, we wait until it does.
1235                    // Afterwards Some(()) should be visible to all watcher and any watcher waiting  will
1236                    // return
1237                    if remote_description_set_r.borrow().is_none() {
1238                        match webrtc_action_with_timeout(remote_description_set_r.changed()).await {
1239                            Ok(Err(e)) => {
1240                                let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1241                                    anyhow::anyhow!(
1242                                        "remote description watch channel is closed with error {e}"
1243                                    ),
1244                                )));
1245                            }
1246                            Err(_) => {
1247                                log::info!(
1248                                    "timed out on_ice_candidate; remote description was never set"
1249                                );
1250                                let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1251                                    anyhow::anyhow!("timed out waiting for remote description"),
1252                                )));
1253                            }
1254                            _ => (),
1255                        }
1256                    }
1257
1258                    let uuid = uuid_lock.read().unwrap().to_string();
1259                    // Note(ethan): for reasons that aren't entirely clear to me, parallel dialing
1260                    // occasionally causes us to not receive a signaling client response when
1261                    // trying to establish a connection. This results in noisy error messages that
1262                    // fortunately are harmless (this problem seems to only ever affect one branch
1263                    // of the parallel dial, so we still end up with a successful connection).
1264                    // By checking if the `uuid` is empty, we can tell if we're in such a case and
1265                    // exit out before it results in logging noisy error messages.
1266                    //
1267                    // It would be lovely to understand this problem better, but given that it's
1268                    // not actually causing performance failures it's probably not worth the effort
1269                    // at this time.
1270                    if uuid.is_empty() {
1271                        log::debug!(
1272                            "UUID never updated. This is likely because we never received a response \
1273                            from the signaling client. This happens occasionally with parallel dialing \
1274                            and isn't concerning provided connection still occurs."
1275                        );
1276                        return;
1277                    }
1278                    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1279                    match ice_candidate {
1280                        Some(ice_candidate) => {
1281                            log::debug!("Gathered local candidate of {ice_candidate}");
1282                            if sent_done_or_error.load(Ordering::Acquire) {
1283                                return;
1284                            }
1285                            let proto_candidate = ice_candidate_to_proto(ice_candidate).await;
1286                            match proto_candidate {
1287                                Ok(proto_candidate) => {
1288                                    let update_request = CallUpdateRequest {
1289                                        uuid: uuid.clone(),
1290                                        update: Some(Update::Candidate(proto_candidate)),
1291                                    };
1292                                    let call_update_start = Instant::now();
1293                                    if let Err(e) = webrtc_action_with_timeout(
1294                                        signaling_client.call_update(update_request),
1295                                    )
1296                                    .await
1297                                    .and_then(|resp| resp.map_err(anyhow::Error::from))
1298                                    {
1299                                        log::error!("Error sending ice candidate: {e}");
1300                                        let _ = on_local_ice_candidate_failure.try_send(Some(
1301                                            Box::new(anyhow::anyhow!(
1302                                                "Error sending ice candidate: {e}"
1303                                            )),
1304                                        ));
1305                                    }
1306                                    let mut caller_update_stats_inner =
1307                                        caller_update_stats.lock().unwrap();
1308                                    caller_update_stats_inner.count += 1;
1309                                    let call_update_duration = call_update_start.elapsed();
1310                                    if call_update_duration > caller_update_stats_inner.max_duration
1311                                    {
1312                                        caller_update_stats_inner.max_duration =
1313                                            call_update_duration;
1314                                    }
1315                                    caller_update_stats_inner.total_duration +=
1316                                        call_update_duration;
1317                                }
1318                                Err(e) => log::error!("Error parsing ice candidate: {e}"),
1319                            }
1320                        }
1321                        None => {
1322                            // will only be executed once when gathering is finished
1323                            ice_done.notify_one();
1324                            send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1325                        }
1326                    }
1327                })
1328            },
1329        ));
1330
1331        peer_connection.set_local_description(offer).await?;
1332    }
1333
1334    let local_description = peer_connection.local_description().await.unwrap();
1335
1336    // Local SD will be multi-line, so use two log messages to indicate start, SD and end.
1337    log::debug!(
1338        "{}\n{}",
1339        log_prefixes::START_LOCAL_SESSION_DESCRIPTION,
1340        local_description.sdp
1341    );
1342    log::debug!("{}", log_prefixes::END_LOCAL_SESSION_DESCRIPTION);
1343
1344    let sdp = encode_sdp(local_description)?;
1345    let call_request = CallRequest {
1346        sdp,
1347        disable_trickle: webrtc_options.disable_trickle_ice,
1348    };
1349
1350    let client_channel = WebRTCClientChannel::new(peer_connection, data_channel).await;
1351    let client_channel_for_ice_gathering_thread = Arc::downgrade(&client_channel);
1352    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1353    let mut call_client = signaling_client.call(call_request).await?.into_inner();
1354
1355    let channel2 = channel.clone();
1356    let sent_done_or_error2 = sent_done_or_error.clone();
1357    tokio::spawn(async move {
1358        let uuid = uuid_for_ice_gathering_thread;
1359        let client_channel = client_channel_for_ice_gathering_thread;
1360        let init_received = AtomicBool::new(false);
1361        let sent_done = sent_done_or_error2;
1362
1363        loop {
1364            let response = match webrtc_action_with_timeout(call_client.message())
1365                .await
1366                .and_then(|resp| resp.map_err(anyhow::Error::from))
1367            {
1368                Ok(cr) => match cr {
1369                    Some(cr) => cr,
1370                    None => {
1371                        // want to delay sending done until we either are actually done, or
1372                        // we hit a timeout
1373                        let _ = webrtc_action_with_timeout(ice_done2.notified()).await;
1374                        let uuid = uuid.read().unwrap().to_string();
1375                        send_done_once(sent_done.clone(), &uuid, channel2.clone()).await;
1376                        break;
1377                    }
1378                },
1379                Err(e) => {
1380                    log::error!("Error processing call response: {e}");
1381                    let _ = is_open_s.try_send(Some(Box::new(e)));
1382                    break;
1383                }
1384            };
1385
1386            match response.stage {
1387                Some(Stage::Init(init)) => {
1388                    if init_received.load(Ordering::Acquire) {
1389                        let uuid = uuid.read().unwrap().to_string();
1390                        let e = anyhow::anyhow!("Init received more than once");
1391                        send_error_once(sent_done.clone(), &uuid, &e, channel2.clone()).await;
1392                        let _ = is_open_s.try_send(Some(Box::new(e)));
1393                        break;
1394                    }
1395                    init_received.store(true, Ordering::Release);
1396                    {
1397                        let mut uuid_s = uuid.write().unwrap();
1398                        uuid_s.clone_from(&response.uuid);
1399                    }
1400
1401                    let answer = match decode_sdp(init.sdp) {
1402                        Ok(a) => a,
1403                        Err(e) => {
1404                            send_error_once(
1405                                sent_done.clone(),
1406                                &response.uuid,
1407                                &e,
1408                                channel2.clone(),
1409                            )
1410                            .await;
1411                            let _ = is_open_s.try_send(Some(Box::new(e)));
1412                            break;
1413                        }
1414                    };
1415                    {
1416                        let cc = match client_channel.upgrade() {
1417                            Some(cc) => cc,
1418                            None => {
1419                                break;
1420                            }
1421                        };
1422                        if let Err(e) = cc
1423                            .base_channel
1424                            .peer_connection
1425                            .set_remote_description(answer)
1426                            .await
1427                        {
1428                            let e = anyhow::Error::from(e);
1429                            send_error_once(
1430                                sent_done.clone(),
1431                                &response.uuid,
1432                                &e,
1433                                channel2.clone(),
1434                            )
1435                            .await;
1436                            let _ = is_open_s.try_send(Some(Box::new(e)));
1437                            break;
1438                        }
1439                    }
1440                    let _ = remote_description_set_s.send_replace(Some(()));
1441                    if webrtc_options.disable_trickle_ice {
1442                        send_done_once(sent_done.clone(), &response.uuid, channel2.clone()).await;
1443                        break;
1444                    }
1445                }
1446
1447                Some(Stage::Update(update)) => {
1448                    let uuid_s = uuid.read().unwrap().to_string();
1449                    if !init_received.load(Ordering::Acquire) {
1450                        let e = anyhow::anyhow!("Got update before init stage");
1451                        send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1452                        let _ = is_open_s.try_send(Some(Box::new(e)));
1453                        break;
1454                    }
1455
1456                    if response.uuid != *uuid.read().unwrap() {
1457                        let e = anyhow::anyhow!(
1458                            "uuid mismatch: have {}, want {}",
1459                            response.uuid,
1460                            uuid_s,
1461                        );
1462                        send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1463                        let _ = is_open_s.try_send(Some(Box::new(e)));
1464                        break;
1465                    }
1466                    match ice_candidate_from_proto(update.candidate) {
1467                        Ok(candidate) => {
1468                            let client_channel = match client_channel.upgrade() {
1469                                Some(cc) => cc,
1470                                None => {
1471                                    break;
1472                                }
1473                            };
1474                            log::debug!("Received remote ICE candidate of {candidate:#?}");
1475                            if let Err(e) = client_channel
1476                                .base_channel
1477                                .peer_connection
1478                                .add_ice_candidate(candidate)
1479                                .await
1480                            {
1481                                let e = anyhow::Error::from(e);
1482                                send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone())
1483                                    .await;
1484                                let _ = is_open_s.try_send(Some(Box::new(e)));
1485                                break;
1486                            }
1487                        }
1488                        Err(e) => log::error!("Error parsing ice candidate: {e}"),
1489                    }
1490                }
1491                None => continue,
1492            }
1493        }
1494    });
1495
1496    // TODO (GOUT-11): create separate authorization if external_auth_addr and/or creds.Type is `Some`
1497
1498    // Delay returning the client channel until data channel is open, so we don't lose messages
1499    let is_open = webrtc_action_with_timeout(is_open_r.recv()).await;
1500    match is_open {
1501        Ok(is_open) => {
1502            if let Some(Some(e)) = is_open {
1503                return Err(anyhow::anyhow!("Couldn't connect to peer with error {e}"));
1504            }
1505        }
1506        Err(_) => {
1507            return Err(anyhow::anyhow!("Timed out opening data channel."));
1508        }
1509    }
1510
1511    exchange_done.store(true, Ordering::Release);
1512    let uuid = uuid_lock.read().unwrap().to_string();
1513    send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1514    Ok(client_channel)
1515}
1516
1517async fn ice_candidate_to_proto(ice_candidate: RTCIceCandidate) -> Result<IceCandidate> {
1518    let ice_candidate = ice_candidate.to_json()?;
1519    Ok(IceCandidate {
1520        candidate: ice_candidate.candidate,
1521        sdp_mid: ice_candidate.sdp_mid,
1522        sdpm_line_index: ice_candidate.sdp_mline_index.map(u32::from),
1523        username_fragment: ice_candidate.username_fragment,
1524    })
1525}
1526
1527fn ice_candidate_from_proto(proto: Option<IceCandidate>) -> Result<RTCIceCandidateInit> {
1528    match proto {
1529        Some(proto) => {
1530            let proto_sdpm: usize = proto.sdpm_line_index().try_into()?;
1531            let sdp_mline_index: Option<u16> = proto_sdpm.try_into().ok();
1532
1533            Ok(RTCIceCandidateInit {
1534                candidate: proto.candidate.clone(),
1535                sdp_mid: Some(proto.sdp_mid().to_string()),
1536                sdp_mline_index,
1537                username_fragment: Some(proto.username_fragment().to_string()),
1538            })
1539        }
1540        None => Err(anyhow::anyhow!("No ice candidate provided")),
1541    }
1542}
1543
1544fn decode_sdp(sdp: String) -> Result<RTCSessionDescription> {
1545    let sdp = String::from_utf8(base64::decode(sdp)?)?;
1546    Ok(serde_json::from_str::<RTCSessionDescription>(&sdp)?)
1547}
1548
1549fn encode_sdp(sdp: RTCSessionDescription) -> Result<String> {
1550    let sdp = serde_json::to_vec(&sdp)?;
1551    Ok(base64::encode(sdp))
1552}
1553
1554fn infer_remote_uri_from_authority(uri: Uri, override_addr: Option<&str>) -> Uri {
1555    if let Some(addr) = override_addr {
1556        return Uri::from_parts(uri_parts_with_defaults(addr)).unwrap_or_else(|e| {
1557            log::warn!("Failed to parse signaling server override {addr:?}: {e}; falling back to original URI");
1558            uri
1559        });
1560    }
1561    let authority = uri.authority().map(Authority::as_str).unwrap_or_default();
1562    let is_local_connection = authority.contains(".local.viam.cloud")
1563        || authority.contains("localhost")
1564        || authority.contains("0.0.0.0");
1565
1566    if !is_local_connection {
1567        if let Some((new_uri, _)) = Options::infer_signaling_server_address(&uri) {
1568            return Uri::from_parts(uri_parts_with_defaults(&new_uri)).unwrap_or(uri);
1569        }
1570    }
1571    uri
1572}
1573
1574fn duplicate_uri(parts: &Parts) -> Option<Parts> {
1575    let uri = Uri::builder()
1576        .authority(parts.authority.clone()?)
1577        .path_and_query(parts.path_and_query.clone()?)
1578        .scheme(parts.scheme.clone()?);
1579    Some(uri.build().ok()?.into_parts())
1580}
1581
1582fn uri_parts_with_defaults(uri: &str) -> Parts {
1583    let mut uri_parts = uri.parse::<Uri>().unwrap().into_parts();
1584    uri_parts.scheme = Some(Scheme::HTTPS);
1585    uri_parts.path_and_query = Some(PathAndQuery::from_static(""));
1586    uri_parts
1587}
1588
1589fn metadata_from_parts(parts: &http::request::Parts) -> Metadata {
1590    let mut md = HashMap::new();
1591    for (k, v) in parts.headers.iter() {
1592        let k = k.to_string();
1593        let v = Strings {
1594            values: vec![HeaderValue::to_str(v).unwrap().to_string()],
1595        };
1596        md.insert(k, v);
1597    }
1598    Metadata { md }
1599}