Skip to main content

viam_rust_utils/rpc/
dial.rs

1use super::{
2    client_channel::*,
3    log_prefixes,
4    webrtc::{webrtc_action_with_timeout, Options},
5};
6use crate::gen::google;
7use crate::gen::proto::rpc::v1::{
8    auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
9};
10use crate::gen::proto::rpc::webrtc::v1::{
11    call_response::Stage, call_update_request::Update,
12    signaling_service_client::SignalingServiceClient, CallUpdateRequest,
13    OptionalWebRtcConfigRequest, OptionalWebRtcConfigResponse,
14};
15use crate::gen::proto::rpc::webrtc::v1::{
16    CallRequest, IceCandidate, Metadata, RequestHeaders, Strings,
17};
18use crate::rpc::webrtc;
19use ::http::header::HeaderName;
20use ::http::{
21    uri::{Authority, Parts, PathAndQuery, Scheme},
22    HeaderValue, Version,
23};
24use ::viam_mdns::{discover, RecordKind, Response};
25use ::webrtc::ice_transport::{
26    ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
27    ice_connection_state::RTCIceConnectionState,
28};
29use ::webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
30use anyhow::{Context, Result};
31use core::fmt;
32use futures::stream::FuturesUnordered;
33use futures_util::{pin_mut, stream::StreamExt};
34use local_ip_address::list_afinet_netifas;
35use std::{
36    collections::HashMap,
37    net::{IpAddr, Ipv4Addr},
38    sync::{
39        atomic::{AtomicBool, Ordering},
40        Arc, Mutex, RwLock,
41    },
42    task::{Context as TaskContext, Poll},
43    time::{Duration, Instant},
44};
45use tokio::sync::{mpsc, watch};
46use tonic::body::BoxBody;
47use tonic::codegen::BoxFuture;
48use tonic::transport::{Body, Channel, ClientTlsConfig, Uri};
49
50use tower::{Service, ServiceBuilder};
51use tower_http::auth::AddAuthorization;
52use tower_http::auth::AddAuthorizationLayer;
53use tower_http::set_header::{SetRequestHeader, SetRequestHeaderLayer};
54
// gRPC status codes, written into the `grpc-status` header of responses built locally.
const STATUS_CODE_OK: i32 = 0;
const STATUS_CODE_UNKNOWN: i32 = 2;
const STATUS_CODE_RESOURCE_EXHAUSTED: i32 = 8;

/// The mDNS service name queried when looking for machines on the local network.
pub const VIAM_MDNS_SERVICE_NAME: &str = "_rpc._tcp.local";

/// The credential type string stored in the `type` field of `Credentials`.
type SecretType = String;
63
#[derive(Clone)]
/// A communication channel to a given uri. The channel is either a direct tonic channel,
/// or a webRTC channel.
pub enum ViamChannel {
    /// A plain tonic [`Channel`] with no additional request layers.
    Direct(Channel),
    /// A tonic [`Channel`] wrapped in an authorization layer and a request-header layer.
    DirectPreAuthorized(AddAuthorization<SetRequestHeader<Channel, HeaderValue>>),
    /// A shared handle to a WebRTC client channel.
    WebRTC(Arc<WebRTCClientChannel>),
}
72
/// Credentials to use when connecting, together with an optional entity name.
#[derive(Debug, Clone)]
pub struct RPCCredentials {
    /// Optional entity name accompanying the credentials.
    entity: Option<String>,
    /// The credential type and payload.
    credentials: Credentials,
}
78
79impl RPCCredentials {
80    pub fn new(entity: Option<String>, r#type: SecretType, payload: String) -> Self {
81        Self {
82            credentials: Credentials { r#type, payload },
83            entity,
84        }
85    }
86}
87
88impl ViamChannel {
89    async fn create_resp(
90        channel: &mut Arc<WebRTCClientChannel>,
91        stream: crate::gen::proto::rpc::webrtc::v1::Stream,
92        request: http::Request<BoxBody>,
93        response: http::response::Builder,
94    ) -> http::Response<Body> {
95        let (parts, body) = request.into_parts();
96        let mut status_code = STATUS_CODE_OK;
97        let stream_id = stream.id;
98        let metadata = Some(metadata_from_parts(&parts));
99        let headers = RequestHeaders {
100            method: parts
101                .uri
102                .path_and_query()
103                .map(PathAndQuery::to_string)
104                .unwrap_or_default(),
105            metadata,
106            timeout: None,
107        };
108
109        if let Err(e) = channel.write_headers(&stream, headers).await {
110            log::error!("error writing headers: {e}");
111            channel.close_stream_with_recv_error(stream_id, e);
112            status_code = STATUS_CODE_UNKNOWN;
113        }
114
115        let data = hyper::body::to_bytes(body).await.unwrap().to_vec();
116        if let Err(e) = channel.write_message(Some(stream), data).await {
117            log::error!("error sending message: {e}");
118            channel.close_stream_with_recv_error(stream_id, e);
119            status_code = STATUS_CODE_UNKNOWN;
120        };
121
122        let body = match channel.resp_body_from_stream(stream_id) {
123            Ok(body) => body,
124            Err(e) => {
125                log::error!("error receiving response from stream: {e}");
126                channel.close_stream_with_recv_error(stream_id, e);
127                status_code = STATUS_CODE_UNKNOWN;
128                Body::empty()
129            }
130        };
131
132        let response = if status_code != STATUS_CODE_OK {
133            response.header("grpc-status", &status_code.to_string())
134        } else {
135            response
136        };
137
138        response.body(body).unwrap()
139    }
140}
141
142impl Service<http::Request<BoxBody>> for ViamChannel {
143    type Response = http::Response<Body>;
144    type Error = tonic::transport::Error;
145    type Future = BoxFuture<Self::Response, Self::Error>;
146
147    fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
148        match self {
149            Self::Direct(channel) => channel.poll_ready(cx),
150            Self::DirectPreAuthorized(channel) => channel.poll_ready(cx),
151            Self::WebRTC(_channel) => Poll::Ready(Ok(())),
152        }
153    }
154
155    fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
156        match self {
157            Self::Direct(channel) => Box::pin(channel.call(request)),
158            Self::DirectPreAuthorized(channel) => Box::pin(channel.call(request)),
159            Self::WebRTC(channel) => {
160                let mut channel = channel.clone();
161                let fut = async move {
162                    let response = http::response::Response::builder()
163                        // standardized gRPC headers.
164                        .header("content-type", "application/grpc")
165                        .version(Version::HTTP_2);
166
167                    match channel.new_stream() {
168                        Err(e) => {
169                            log::error!("{e}");
170                            let response = response
171                                .header("grpc-status", &STATUS_CODE_RESOURCE_EXHAUSTED.to_string())
172                                .body(Body::default())
173                                .unwrap();
174
175                            Ok(response)
176                        }
177                        Ok(stream) => {
178                            Ok(Self::create_resp(&mut channel, stream, request, response).await)
179                        }
180                    }
181                };
182                Box::pin(fut)
183            }
184        }
185    }
186}
187
/// Options for modifying the connection parameters
#[derive(Debug)]
pub struct DialOptions {
    /// Credentials to use when connecting; `None` when dialing without credentials.
    credentials: Option<RPCCredentials>,
    /// WebRTC-specific options; `None` until a WebRTC-related builder method sets them.
    webrtc_options: Option<Options>,
    /// Parsed parts of the uri to connect to.
    uri: Option<Parts>,
    /// When `true`, connecting via mDNS is disabled.
    disable_mdns: bool,
    /// When `true`, a connection may be retried over HTTP if HTTPS fails.
    allow_downgrade: bool,
    /// When `true`, the connection is attempted with a scheme of HTTP.
    insecure: bool,
    /// Signaling server address to use for WebRTC negotiation instead of the default.
    signaling_server_override: Option<String>,
}
/// Builder state: a uri has been set and the credential choice is still pending.
#[derive(Clone)]
pub struct WantsCredentials(());
/// Builder state: the initial state, before a uri has been set.
#[derive(Clone)]
pub struct WantsUri(());
/// Builder state: credentials have been supplied.
#[derive(Clone)]
pub struct WithCredentials(());
/// Builder state: the caller chose to connect without credentials.
#[derive(Clone)]
pub struct WithoutCredentials(());

/// Marker trait for the builder states in which the credential choice has been made.
/// The connection options of [`DialBuilder`] are only available on these states.
pub trait AuthMethod {}
impl AuthMethod for WithCredentials {}
impl AuthMethod for WithoutCredentials {}
/// A DialBuilder allows us to set options before establishing a connection to a server
///
/// The type parameter `T` is a marker recording which step of the build has been reached.
#[allow(dead_code)]
pub struct DialBuilder<T> {
    /// Marker value for the current build step.
    state: T,
    /// The options collected so far.
    config: DialOptions,
}
217
218impl<T> fmt::Debug for DialBuilder<T> {
219    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220        f.debug_struct("Dial")
221            .field("State", &format_args!("{}", &std::any::type_name::<T>()))
222            .field("Opt", &format_args!("{:?}", self.config))
223            .finish()
224    }
225}
226
227impl DialOptions {
228    /// Creates a new DialBuilder
229    pub fn builder() -> DialBuilder<WantsUri> {
230        DialBuilder {
231            state: WantsUri(()),
232            config: DialOptions {
233                credentials: None,
234                uri: None,
235                allow_downgrade: false,
236                disable_mdns: false,
237                insecure: false,
238                webrtc_options: None,
239                signaling_server_override: None,
240            },
241        }
242    }
243}
244
245impl DialBuilder<WantsUri> {
246    /// Sets the uri to connect to
247    pub fn uri(self, uri: &str) -> DialBuilder<WantsCredentials> {
248        let uri_parts = uri_parts_with_defaults(uri);
249        DialBuilder {
250            state: WantsCredentials(()),
251            config: DialOptions {
252                credentials: None,
253                uri: Some(uri_parts),
254                allow_downgrade: false,
255                disable_mdns: false,
256                insecure: false,
257                webrtc_options: None,
258                signaling_server_override: None,
259            },
260        }
261    }
262}
263impl DialBuilder<WantsCredentials> {
264    /// Tells connecting logic to not expect/require credentials
265    pub fn without_credentials(self) -> DialBuilder<WithoutCredentials> {
266        DialBuilder {
267            state: WithoutCredentials(()),
268            config: DialOptions {
269                credentials: None,
270                uri: self.config.uri,
271                allow_downgrade: false,
272                disable_mdns: false,
273                insecure: false,
274                webrtc_options: None,
275                signaling_server_override: None,
276            },
277        }
278    }
279    /// Sets credentials to use when connecting
280    pub fn with_credentials(self, creds: RPCCredentials) -> DialBuilder<WithCredentials> {
281        DialBuilder {
282            state: WithCredentials(()),
283            config: DialOptions {
284                credentials: Some(creds),
285                uri: self.config.uri,
286                allow_downgrade: false,
287                disable_mdns: false,
288                insecure: false,
289                webrtc_options: None,
290                signaling_server_override: None,
291            },
292        }
293    }
294}
295
296impl<T: AuthMethod> DialBuilder<T> {
297    /// Attempts to connect insecurely with scheme of HTTP as a default
298    pub fn insecure(mut self) -> Self {
299        self.config.insecure = true;
300        self
301    }
302    /// Allows for downgrading and attempting to connect via HTTP if HTTPS fails
303    pub fn allow_downgrade(mut self) -> Self {
304        self.config.allow_downgrade = true;
305        self
306    }
307    /// Disables connection via mDNS
308    pub fn disable_mdns(mut self) -> Self {
309        self.config.disable_mdns = true;
310        self
311    }
312
313    /// Overrides any default connection behavior, forcing direct connection. Note that
314    /// the connection itself will fail if it is between a client and server on separate
315    /// networks and not over webRTC
316    pub fn disable_webrtc(mut self) -> Self {
317        let webrtc_options = Options::default().disable_webrtc();
318        self.config.webrtc_options = Some(webrtc_options);
319        self
320    }
321
322    /// Forces ICE transport policy to relay-only so only TURN candidates are used.
323    /// Useful for testing relay connectivity through a TURN server.
324    pub fn force_relay(mut self) -> Self {
325        self.config
326            .webrtc_options
327            .get_or_insert_with(Options::default)
328            .force_relay = true;
329        self
330    }
331
332    /// Strips TURN servers from the ICE config so only host and server-reflexive
333    /// candidates are used. Useful for testing direct connectivity without relay fallback.
334    pub fn force_p2p(mut self) -> Self {
335        self.config
336            .webrtc_options
337            .get_or_insert_with(Options::default)
338            .force_p2p = true;
339        self
340    }
341
342    /// Filters the signaling server's TURN list to only the server whose parsed URI
343    /// matches (compared by scheme, host, port, and transport — defaulting transport
344    /// to UDP if unspecified). Example: "turn:turn.viam.com:443"
345    pub fn turn_uri(mut self, uri: String) -> Self {
346        self.config
347            .webrtc_options
348            .get_or_insert_with(Options::default)
349            .turn_uri = Some(uri);
350        self
351    }
352
353    /// Overrides the signaling server address used for WebRTC negotiation.
354    pub fn signaling_server(mut self, address: String) -> Self {
355        self.config.signaling_server_override = Some(address);
356        self
357    }
358
359    async fn get_addr_from_interface(
360        iface: (&str, Vec<&IpAddr>),
361        candidates: &Vec<String>,
362        local_ipv4s: &std::collections::HashSet<Ipv4Addr>,
363    ) -> Option<String> {
364        let addresses: Vec<Ipv4Addr> = iface
365            .1
366            .iter()
367            .filter_map(|ip| match ip {
368                IpAddr::V4(v4) => Some(*v4),
369                IpAddr::V6(_) => None,
370            })
371            .collect();
372
373        let mut resp: Option<Response> = None;
374        for ipv4 in addresses {
375            for candidate in candidates {
376                let discovery = match discover::interface_with_loopback(
377                    VIAM_MDNS_SERVICE_NAME,
378                    Duration::from_millis(250),
379                    ipv4,
380                ) {
381                    Ok(d) => d,
382                    Err(e) => {
383                        log::debug!("mDNS socket error on {ipv4}: {e}");
384                        continue;
385                    }
386                };
387                let stream = discovery.listen();
388                pin_mut!(stream);
389                while let Some(Ok(response)) = stream.next().await {
390                    if let Some(hostname) = response.hostname() {
391                        // Machine uris come in local ("my-cool-robot.abcdefg.local.viam.cloud")
392                        // and non-local ("my-cool-robot.abcdefg.viam.cloud") forms. Sometimes
393                        // (namely with micro-rdk), our mdns query can only see one (the local) version.
394                        // However, users are typically passing the non-local version. By splitting at
395                        // "viam" and taking the only the first value, we can still search for
396                        // candidates based on the actual "my-cool-robot" name without being opinionated
397                        // on whether the candidate is locally named or not.
398                        let local_agnostic_candidate = candidate.as_str().split("viam").next()?;
399                        log::debug!(
400                            "mDNS response on {ipv4}: hostname={hostname:?}, candidate={candidate:?}, local_agnostic={local_agnostic_candidate:?}, matches={}",
401                            hostname.contains(local_agnostic_candidate)
402                        );
403                        if hostname.contains(local_agnostic_candidate) {
404                            resp = Some(response);
405                            break;
406                        }
407                    } else {
408                        log::debug!(
409                            "mDNS response on {ipv4}: no hostname (no PTR record); answers={:?}",
410                            response.answers
411                        );
412                    }
413                    if resp.is_some() {
414                        break;
415                    }
416                }
417            }
418        }
419
420        let resp = resp?;
421        let mut has_grpc = false;
422        let mut has_webrtc = false;
423        for field in resp.txt_records() {
424            has_grpc = has_grpc || field.contains("grpc");
425            has_webrtc = has_webrtc || field.contains("webrtc");
426        }
427
428        // Log all records in the response for diagnostics.
429        log::debug!(
430            "mDNS matched response records: {:?}",
431            resp.records().collect::<Vec<_>>()
432        );
433
434        // Select the best IP from the mDNS response using a three-tier preference:
435        //
436        // 1. Non-loopback IP that is currently assigned to one of our own network
437        //    interfaces.  This handles the same-machine case (client and robot on
438        //    the same host) while avoiding stale IPs that were valid when
439        //    viam-server started but are now unreachable (e.g. a WiFi address after
440        //    WiFi was disconnected).
441        //
442        // 2. Any IP (including loopback) that is currently assigned to one of our
443        //    interfaces.  This catches 127.0.0.1 when offline on the same machine:
444        //    127.0.0.1 is excluded from tier 1 by the !is_loopback() guard, but it
445        //    is always in local_ipv4s, so it is correctly preferred here over a
446        //    stale non-loopback address that is no longer assigned.
447        //
448        // 3. Last resort: any advertised IPv4, for the common case of connecting to
449        //    a robot on a separate machine (its IP will never appear in local_ipv4s).
450        let ip_addr = resp
451            .records()
452            .filter_map(|r| match r.kind {
453                RecordKind::A(addr) if !addr.is_loopback() && local_ipv4s.contains(&addr) => {
454                    Some(addr)
455                }
456                _ => None,
457            })
458            .next()
459            .or_else(|| {
460                resp.records()
461                    .find_map(|r| match r.kind {
462                        RecordKind::A(addr) if local_ipv4s.contains(&addr) => Some(addr),
463                        _ => None,
464                    })
465                    .or_else(|| {
466                        resp.records().find_map(|r| match r.kind {
467                            RecordKind::A(addr) => Some(addr),
468                            _ => None,
469                        })
470                    })
471            });
472
473        if !(has_grpc || has_webrtc) || ip_addr.is_none() {
474            return None;
475        }
476        let mut local_addr = ip_addr?.to_string();
477        local_addr.push(':');
478        local_addr.push_str(&resp.port()?.to_string());
479        log::debug!("mDNS resolved address: {local_addr}");
480        Some(local_addr)
481    }
482
483    fn duplicate_uri(&self) -> Option<Parts> {
484        match &self.config.uri {
485            None => None,
486            Some(uri) => duplicate_uri(uri),
487        }
488    }
489
490    async fn get_mdns_uri(&self) -> Option<Parts> {
491        log::debug!("{}", log_prefixes::MDNS_QUERY_ATTEMPT);
492        if self.config.disable_mdns {
493            return None;
494        }
495
496        let mut uri = self.duplicate_uri()?;
497        let candidate = uri.authority.clone()?.to_string();
498
499        let candidates: Vec<String> = vec![candidate.replace('.', "-"), candidate];
500
501        let ifaces = list_afinet_netifas().ok()?;
502
503        // Collect all local IPv4 addresses for use in get_addr_from_interface, which prefers
504        // mDNS response IPs that are currently assigned to one of our own interfaces.
505        // viam-server may advertise a stale IP (e.g. a WiFi address from when it started,
506        // now unreachable because WiFi is off); filtering by current interface addresses
507        // avoids connecting to an unreachable address.
508        let local_ipv4s: std::collections::HashSet<Ipv4Addr> = ifaces
509            .iter()
510            .filter_map(|(_, ip)| match ip {
511                IpAddr::V4(v4) => Some(*v4),
512                _ => None,
513            })
514            .collect();
515
516        let ifaces: HashMap<&str, Vec<&IpAddr>> =
517            ifaces.iter().fold(HashMap::new(), |mut map, (k, v)| {
518                map.entry(k).or_default().push(v);
519                map
520            });
521
522        let mut iface_futures = FuturesUnordered::new();
523        for iface in ifaces {
524            iface_futures.push(Self::get_addr_from_interface(
525                iface,
526                &candidates,
527                &local_ipv4s,
528            ));
529        }
530
531        let mut local_addr: Option<String> = None;
532        while let Some(maybe_addr) = iface_futures.next().await {
533            if maybe_addr.is_some() {
534                local_addr = maybe_addr;
535                break;
536            }
537        }
538        let local_addr = match local_addr {
539            None => {
540                log::debug!("Unable to connect via mDNS");
541                return None;
542            }
543            Some(addr) => {
544                log::debug!("{}: {addr}", log_prefixes::MDNS_ADDRESS_FOUND);
545                addr
546            }
547        };
548
549        let auth = local_addr.parse::<Authority>().ok()?;
550        uri.authority = Some(auth);
551
552        Some(uri)
553    }
554
555    async fn create_channel(
556        allow_downgrade: bool,
557        domain: &str,
558        uri: Uri,
559        for_mdns: bool,
560    ) -> Result<Channel> {
561        if for_mdns {
562            let host = uri.host().unwrap_or("");
563            // viam-server serves TLS gRPC on the port it advertises via mDNS, including on
564            // loopback.  `domain` is the robot's canonical hostname (e.g.
565            // `my-robot.abcdefg.viam.cloud`) used for SNI and SAN verification.
566            log::debug!("mDNS create_channel: connecting to {host} with TLS");
567            let tls_config = ClientTlsConfig::new().domain_name(domain);
568            let mut parts = uri.clone().into_parts();
569            parts.scheme = Some(Scheme::HTTPS);
570            let tls_uri = Uri::from_parts(parts)?;
571            match Channel::builder(tls_uri.clone())
572                .tls_config(tls_config)?
573                .connect()
574                .await
575                .with_context(|| format!("Connecting to {:?}", tls_uri))
576            {
577                Ok(channel) => return Ok(channel),
578                // When the caller allows insecure/downgrade, retry the local connection over plaintext
579                // instead of failing. A local viam-server with no TLS serves plain gRPC on the advertised
580                // port, so the TLS handshake above fails against it.
581                Err(e) => {
582                    if allow_downgrade {
583                        let mut parts = uri.into_parts();
584                        parts.scheme = Some(Scheme::HTTP);
585                        let uri = Uri::from_parts(parts)?;
586                        log::debug!(
587                            "mDNS TLS connect failed ({e:#}); downgrading to plaintext h2c {uri:?}"
588                        );
589                        return Channel::builder(uri.clone())
590                            .connect()
591                            .await
592                            .with_context(|| format!("Connecting to {:?}", uri));
593                    }
594                    return Err(e);
595                }
596            }
597        }
598
599        let chan = match Channel::builder(uri.clone())
600            .connect()
601            .await
602            .with_context(|| format!("Connecting to {:?}", uri.clone()))
603        {
604            Ok(c) => c,
605            Err(e) => {
606                if allow_downgrade {
607                    let mut uri_parts = uri.clone().into_parts();
608                    uri_parts.scheme = Some(Scheme::HTTP);
609                    let uri = Uri::from_parts(uri_parts)?;
610                    Channel::builder(uri).connect().await?
611                } else {
612                    return Err(anyhow::anyhow!(e));
613                }
614            }
615        };
616        Ok(chan)
617    }
618}
619
620impl DialBuilder<WithoutCredentials> {
621    fn clone(&self) -> Self {
622        DialBuilder {
623            state: WithoutCredentials(()),
624            config: DialOptions {
625                credentials: None,
626                webrtc_options: self.config.webrtc_options.clone(),
627                uri: self.duplicate_uri(),
628                disable_mdns: self.config.disable_mdns,
629                allow_downgrade: self.config.allow_downgrade,
630                insecure: self.config.insecure,
631                signaling_server_override: self.config.signaling_server_override.clone(),
632            },
633        }
634    }
635
636    /// attempts to establish a connection without credentials to the DialBuilder's given uri
637    async fn connect_inner(
638        self,
639        mdns_uri: Option<Parts>,
640        mut original_uri_parts: Parts,
641    ) -> Result<ViamChannel> {
642        let webrtc_options = self.config.webrtc_options;
643        let disable_webrtc = match &webrtc_options {
644            Some(options) => options.disable_webrtc,
645            None => false,
646        };
647        if self.config.insecure {
648            original_uri_parts.scheme = Some(Scheme::HTTP);
649        }
650        let original_uri = Uri::from_parts(original_uri_parts)?;
651        let uri2 = original_uri.clone();
652        let uri = infer_remote_uri_from_authority(
653            original_uri,
654            self.config.signaling_server_override.as_deref(),
655        );
656        let domain = uri2.authority().to_owned().unwrap().as_str();
657
658        let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
659        let attempting_mdns = mdns_uri.is_some();
660        if attempting_mdns {
661            log::debug!("Attempting to connect via mDNS");
662        } else {
663            log::debug!("Attempting to connect");
664        }
665
666        let channel = match mdns_uri {
667            Some(uri) => Self::create_channel(self.config.allow_downgrade, domain, uri, true).await,
668            // not actually an error necessarily, but we want to ensure that a channel is still
669            // created with the default uri
670            None => Err(anyhow::anyhow!("")),
671        };
672
673        let channel = match channel {
674            Ok(c) => {
675                log::debug!("Connected via mDNS");
676                c
677            }
678            Err(e) => {
679                if attempting_mdns {
680                    // mDNS found the robot but the connection failed — don't fall through to
681                    // the remote/signaling URI here.  The parallel `without_mdns` branch in
682                    // `connect()` is already handling that case.  Returning an error lets the
683                    // select! loop surface whichever branch succeeds, and avoids a spurious
684                    // connection attempt to app.viam.com when the device is offline.
685                    log::debug!("Unable to connect via mDNS. Error: {e:#}");
686                    return Err(e);
687                }
688                Self::create_channel(self.config.allow_downgrade, domain, uri.clone(), false)
689                    .await?
690            }
691        };
692
693        // TODO (RSDK-517) make maybe_connect_via_webrtc take a more generic type so we don't
694        // need to add these dummy layers.
695        let intercepted_channel = ServiceBuilder::new()
696            .layer(AddAuthorizationLayer::basic(
697                "fake username",
698                "fake password",
699            ))
700            .layer(SetRequestHeaderLayer::overriding(
701                HeaderName::from_static("rpc-host"),
702                HeaderValue::from_str(domain)?,
703            ))
704            .service(channel.clone());
705
706        // TODO (RSDK-14026): support WebRTC over mDNS for offline connections (e.g. video streaming).
707        if disable_webrtc || attempting_mdns {
708            log::debug!("{}", log_prefixes::DIALED_GRPC);
709            Ok(ViamChannel::Direct(channel.clone()))
710        } else {
711            match maybe_connect_via_webrtc(uri, intercepted_channel.clone(), webrtc_options).await {
712                Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
713                Err(e) => {
714                    log::error!("error connecting via webrtc: {e}. Attempting to connect directly");
715                    log::debug!("{}", log_prefixes::DIALED_GRPC);
716                    Ok(ViamChannel::Direct(channel.clone()))
717                }
718            }
719        }
720    }
721
722    async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
723        let mdns_uri =
724            webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
725                .await
726                .ok()
727                .flatten()
728                .ok_or(anyhow::anyhow!(
729                    "Unable to establish connection via mDNS; uri not found"
730                ))?;
731
732        self.connect_inner(Some(mdns_uri), original_uri).await
733    }
734
735    pub async fn connect(self) -> Result<ViamChannel> {
736        log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
737        let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
738            "Attempting to connect but there was no uri"
739        ))?;
740        let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
741            "Attempting to connect but there was no uri"
742        ))?;
743
744        let skip_mdns = self.config.disable_mdns;
745
746        // We want to short circuit and return the first `Ok` result from our connection
747        // attempts, which `tokio::select!` does great. Buuuuut, we don't want to
748        // abandon the `Err` results, and we want to provide comprehensive logging for
749        // debugging purposes. Hence the loop and pinning. The pinning lets us reference
750        // the same future multiple times, while the loop lets us immediately return on the
751        // first `Ok` result while still seeing and logging any error results.
752        //
753        // When mDNS is skipped (disable_mdns), with_mdns_err is pre-set so
754        // the select guard disables that branch and only the direct connection is attempted.
755        tokio::pin! {
756            let with_mdns = self.clone().connect_mdns(original_uri);
757            let without_mdns = self.connect_inner(None, original_uri2);
758        }
759        let mut with_mdns_err: Option<anyhow::Error> =
760            skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
761        let mut without_mdns_err: Option<anyhow::Error> = None;
762        while with_mdns_err.is_none() || without_mdns_err.is_none() {
763            tokio::select! {
764                with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
765                    match with_mdns {
766                        Ok(chan) => return Ok(chan),
767                        Err(e) => {
768                            log::debug!("Error connecting with mdns: {e}");
769                            with_mdns_err = Some(e);
770                        }
771                    }
772                }
773                without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
774                    match without_mdns {
775                        Ok(chan) => return Ok(chan),
776                        Err(e) => {
777                            log::debug!("Error connecting without mdns: {e}");
778                            without_mdns_err = Some(e);
779                        }
780                    }
781                }
782            }
783        }
784        Err(anyhow::anyhow!(
785            "Unable to connect with or without mdns.
786                    with_mdns err: {with_mdns_err:?}
787                    without_mdns err: {without_mdns_err:?}"
788        ))
789    }
790}
791
792async fn get_auth_token(
793    channel: &mut Channel,
794    creds: Credentials,
795    entity: String,
796) -> Result<String> {
797    let mut auth_service = AuthServiceClient::new(channel);
798    let req = AuthenticateRequest {
799        entity,
800        credentials: Some(creds),
801    };
802
803    let rsp = auth_service.authenticate(req).await?;
804    Ok(rsp.into_inner().access_token)
805}
806
807impl DialBuilder<WithCredentials> {
808    fn clone(&self) -> Self {
809        DialBuilder {
810            state: WithCredentials(()),
811            config: DialOptions {
812                credentials: self.config.credentials.clone(),
813                webrtc_options: self.config.webrtc_options.clone(),
814                uri: self.duplicate_uri(),
815                disable_mdns: self.config.disable_mdns,
816                allow_downgrade: self.config.allow_downgrade,
817                insecure: self.config.insecure,
818                signaling_server_override: self.config.signaling_server_override.clone(),
819            },
820        }
821    }
822
823    async fn connect_inner(
824        self,
825        mdns_uri: Option<Parts>,
826        mut original_uri_parts: Parts,
827    ) -> Result<ViamChannel> {
828        let is_insecure = self.config.insecure;
829
830        let webrtc_options = self.config.webrtc_options;
831        let disable_webrtc = match &webrtc_options {
832            Some(options) => options.disable_webrtc,
833            None => false,
834        };
835
836        if is_insecure {
837            original_uri_parts.scheme = Some(Scheme::HTTP);
838        }
839
840        let original_uri = Uri::from_parts(original_uri_parts)?;
841
842        let domain = original_uri.authority().unwrap().to_string();
843        let uri_for_auth = infer_remote_uri_from_authority(
844            original_uri.clone(),
845            self.config.signaling_server_override.as_deref(),
846        );
847
848        let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
849        let attempting_mdns = mdns_uri.is_some();
850
851        let allow_downgrade = self.config.allow_downgrade;
852        if attempting_mdns {
853            log::debug!("Attempting to connect via mDNS");
854        } else {
855            log::debug!("Attempting to connect");
856        }
857        let channel = match mdns_uri {
858            Some(uri) => Self::create_channel(allow_downgrade, &domain, uri, true).await,
859            // not actually an error necessarily, but we want to ensure that a channel is still
860            // created with the default uri
861            None => Err(anyhow::anyhow!("")),
862        };
863        let real_channel = match channel {
864            Ok(c) => {
865                log::debug!("Connected via mDNS");
866                c
867            }
868            Err(e) => {
869                if attempting_mdns {
870                    // mDNS found the robot but the connection failed — don't fall through to
871                    // the remote/signaling URI here.  The parallel `without_mdns` branch in
872                    // `connect()` is already handling that case.  Returning an error lets the
873                    // select! loop surface whichever branch succeeds, and avoids a spurious
874                    // auth attempt against app.viam.com when the device is offline.
875                    log::debug!("Unable to connect via mDNS. Error: {e:#}");
876                    return Err(e);
877                }
878                Self::create_channel(allow_downgrade, &domain, uri_for_auth, false).await?
879            }
880        };
881
882        log::debug!("{}", log_prefixes::ACQUIRING_AUTH_TOKEN);
883        let token = get_auth_token(
884            &mut real_channel.clone(),
885            self.config
886                .credentials
887                .as_ref()
888                .unwrap()
889                .credentials
890                .clone(),
891            self.config
892                .credentials
893                .unwrap()
894                .entity
895                .unwrap_or_else(|| domain.clone()),
896        )
897        .await?;
898        log::debug!("{}", log_prefixes::ACQUIRED_AUTH_TOKEN);
899
900        let channel = ServiceBuilder::new()
901            .layer(AddAuthorizationLayer::bearer(&token))
902            .layer(SetRequestHeaderLayer::overriding(
903                HeaderName::from_static("rpc-host"),
904                HeaderValue::from_str(domain.as_str())?,
905            ))
906            .service(real_channel);
907
908        // TODO (RSDK-14026): support WebRTC over mDNS for offline connections (e.g. video streaming).
909        if disable_webrtc || attempting_mdns {
910            log::debug!("Connected via gRPC");
911            Ok(ViamChannel::DirectPreAuthorized(channel))
912        } else {
913            match maybe_connect_via_webrtc(original_uri, channel.clone(), webrtc_options).await {
914                Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
915                Err(e) => {
916                    log::error!(
917                    "Unable to establish webrtc connection due to error: [{e}]. Attempting direct connection."
918                );
919                    log::debug!("Connected via gRPC");
920                    Ok(ViamChannel::DirectPreAuthorized(channel))
921                }
922            }
923        }
924    }
925
926    async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
927        // NOTE(benjirewis): Use a duration of 1500ms for getting the mDNS URI. I've anecdotally
928        // seen times as great as 922ms to fetch a non-loopback mDNS URI. With an
929        // interface_with_loopback query interval of 250ms, 1500ms here should give us time for ~6
930        // queries.
931        let mdns_uri =
932            webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
933                .await
934                .ok()
935                .flatten()
936                .ok_or(anyhow::anyhow!(
937                    "Unable to establish connection via mDNS; uri not found"
938                ))?;
939
940        self.connect_inner(Some(mdns_uri), original_uri).await
941    }
942
943    /// attempts to establish a connection with credentials to the DialBuilder's given uri
944    pub async fn connect(self) -> Result<ViamChannel> {
945        log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
946        let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
947            "Attempting to connect but there was no uri"
948        ))?;
949        let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
950            "Attempting to connect but there was no uri"
951        ))?;
952
953        let skip_mdns = self.config.disable_mdns;
954
955        // We want to short circuit and return the first `Ok` result from our connection
956        // attempts, which `tokio::select!` does great. Buuuuut, we don't want to
957        // abandon the `Err` results, and we want to provide comprehensive logging for
958        // debugging purposes. Hence the loop and pinning. The pinning lets us reference
959        // the same future multiple times, while the loop lets us immediately return on the
960        // first `Ok` result while still seeing and logging any error results.
961        //
962        // When mDNS is skipped (disable_mdns), with_mdns_err is pre-set so
963        // the select guard disables that branch and only the direct connection is attempted.
964        tokio::pin! {
965            let with_mdns = self.clone().connect_mdns(original_uri);
966            let without_mdns = self.connect_inner(None, original_uri2);
967        }
968        let mut with_mdns_err: Option<anyhow::Error> =
969            skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
970        let mut without_mdns_err: Option<anyhow::Error> = None;
971        while with_mdns_err.is_none() || without_mdns_err.is_none() {
972            tokio::select! {
973                with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
974                    match with_mdns {
975                        Ok(chan) => return Ok(chan),
976                        Err(e) => {
977                            log::debug!("Error connecting with mdns: {e}");
978                            with_mdns_err = Some(e);
979                        }
980                    }
981                }
982                without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
983                    match without_mdns {
984                        Ok(chan) => return Ok(chan),
985                        Err(e) => {
986                            log::debug!("Error connecting without mdns: {e}");
987                            without_mdns_err = Some(e);
988                        }
989                    }
990                }
991            }
992        }
993        Err(anyhow::anyhow!(
994            "Unable to connect with or without mdns.
995                    with_mdns err: {with_mdns_err:?}
996                    without_mdns err: {without_mdns_err:?}"
997        ))
998    }
999}
1000
1001async fn send_done_or_error_update(
1002    update: CallUpdateRequest,
1003    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1004) {
1005    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1006
1007    if let Err(e) = signaling_client
1008        .call_update(update)
1009        .await
1010        .map_err(anyhow::Error::from)
1011        .map(|_| ())
1012    {
1013        log::error!("Error sending done or error update: {e}")
1014    }
1015}
1016
1017async fn send_error_once(
1018    sent_error: Arc<AtomicBool>,
1019    uuid: &String,
1020    err: &anyhow::Error,
1021    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1022) {
1023    if sent_error.load(Ordering::Acquire) {
1024        return;
1025    }
1026
1027    let err = google::rpc::Status {
1028        code: google::rpc::Code::Unknown.into(),
1029        message: err.to_string(),
1030        details: Vec::new(),
1031    };
1032    sent_error.store(true, Ordering::Release);
1033    let update_request = CallUpdateRequest {
1034        uuid: uuid.to_string(),
1035        update: Some(Update::Error(err)),
1036    };
1037
1038    send_done_or_error_update(update_request, channel).await
1039}
1040
1041async fn send_done_once(
1042    sent_done: Arc<AtomicBool>,
1043    uuid: &String,
1044    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1045) {
1046    if sent_done.load(Ordering::Acquire) {
1047        return;
1048    }
1049    sent_done.store(true, Ordering::Release);
1050    let update_request = CallUpdateRequest {
1051        uuid: uuid.to_string(),
1052        update: Some(Update::Done(true)),
1053    };
1054
1055    send_done_or_error_update(update_request, channel).await
1056}
1057
/// Running timing statistics for caller update requests.
#[derive(Default)]
struct CallerUpdateStats {
    /// Number of updates recorded so far.
    count: u128,
    /// Sum of the durations of all recorded updates.
    total_duration: Duration,
    /// Longest single duration among the recorded updates.
    max_duration: Duration,
}

impl fmt::Display for CallerUpdateStats {
    /// Writes a one-line summary (terminated by a newline) with the update count, the
    /// average duration and the maximum duration, both in milliseconds.
    ///
    /// The average is reported as 0 when no updates have been recorded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `count` is zero for freshly-defaulted stats; a plain division would panic.
        let average_duration = self
            .total_duration
            .as_millis()
            .checked_div(self.count)
            .unwrap_or(0);
        writeln!(
            f,
            "Caller update statistics: num_updates: {}, average_duration: {}ms, max_duration: {}ms",
            self.count,
            average_duration,
            self.max_duration.as_millis()
        )
    }
}
1078
1079async fn maybe_connect_via_webrtc(
1080    uri: Uri,
1081    channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1082    webrtc_options: Option<Options>,
1083) -> Result<Arc<WebRTCClientChannel>> {
1084    let webrtc_options = webrtc_options.unwrap_or_else(|| Options::infer_from_uri(uri.clone()));
1085    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1086    let response = match signaling_client
1087        .optional_web_rtc_config(OptionalWebRtcConfigRequest::default())
1088        .await
1089    {
1090        Ok(resp) => resp,
1091        Err(e) => {
1092            if e.code() == tonic::Code::Unimplemented {
1093                tonic::Response::new(OptionalWebRtcConfigResponse::default())
1094            } else {
1095                return Err(anyhow::anyhow!(e));
1096            }
1097        }
1098    };
1099
1100    let optional_config = response.into_inner().config;
1101
1102    if webrtc_options.force_relay && webrtc_options.force_p2p {
1103        log::warn!(
1104            "force_relay and force_p2p are both set; forceP2P strips TURN servers that forceRelay requires so the connection will fail");
1105    }
1106
1107    let (base_config, optional_config) = webrtc::apply_ice_policy(
1108        webrtc_options.config,
1109        optional_config,
1110        webrtc_options.force_relay,
1111        webrtc_options.force_p2p,
1112    );
1113
1114    if webrtc_options.force_relay {
1115        log::debug!("force relay enabled; using relay-only ICE transport policy");
1116    }
1117
1118    if webrtc_options.force_p2p {
1119        log::debug!(
1120            "force P2P enabled; stripping TURN servers and ignoring signaling server ICE config"
1121        );
1122    }
1123
1124    let mut config = webrtc::extend_webrtc_config(base_config, optional_config);
1125
1126    if webrtc_options.force_p2p && webrtc_options.turn_uri.is_some() {
1127        log::warn!("force_p2p is set alongside turn_uri; the TURN filter will have no effect since TURN servers were already stripped");
1128    }
1129    let turn_uri = webrtc_options.turn_uri.as_deref().and_then(|s| {
1130        let parsed = webrtc::TurnUri::parse(s);
1131        if parsed.is_none() {
1132            log::warn!("Failed to parse turn_uri, ignoring: {s:?}");
1133        }
1134        parsed
1135    });
1136    config = webrtc::apply_turn_options(config, turn_uri.as_ref());
1137    if let Some(ref uri) = turn_uri {
1138        log::debug!("TURN filter options set: turn_uri={uri:?}");
1139    }
1140
1141    let (peer_connection, data_channel) =
1142        webrtc::new_peer_connection_for_client(config, webrtc_options.disable_trickle_ice).await?;
1143
1144    let sent_done_or_error = Arc::new(AtomicBool::new(false));
1145    let uuid_lock = Arc::new(RwLock::new("".to_string()));
1146    let uuid_for_ice_gathering_thread = uuid_lock.clone();
1147
1148    // Using an mpsc channel to report unrecoverable errors during Signaling, so we
1149    // don't have to wait until the timeout expires before giving up on this attempt.
1150    // The size of the channel is set to 1 since any error (or success) should terminate the function
1151    let (is_open_s, mut is_open_r) = mpsc::channel(1);
1152    let on_open_is_open = is_open_s.clone();
1153
1154    data_channel.on_open(Box::new(move || {
1155        let _ = on_open_is_open.try_send(None); // ignore sending errors, either an error (or success) was already sent or the operation will succeed
1156        Box::pin(async move {})
1157    }));
1158
1159    let exchange_done = Arc::new(AtomicBool::new(false));
1160    let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
1161    let ice_done = Arc::new(tokio::sync::Notify::new());
1162    let ice_done2 = ice_done.clone();
1163    let caller_update_stats = Arc::new(Mutex::new(CallerUpdateStats::default()));
1164
1165    if !webrtc_options.disable_trickle_ice {
1166        let offer = peer_connection.create_offer(None).await?;
1167        let channel2 = channel.clone();
1168        let uuid_lock2 = uuid_lock.clone();
1169        let sent_done_or_error2 = sent_done_or_error.clone();
1170
1171        let exchange_done = exchange_done.clone();
1172
1173        let on_local_ice_candidate_failure = is_open_s.clone();
1174
1175        let caller_update_stats = caller_update_stats.clone();
1176        let caller_update_stats2 = caller_update_stats.clone();
1177        peer_connection.on_ice_connection_state_change(Box::new(
1178            move |state: RTCIceConnectionState| {
1179                let caller_update_stats = caller_update_stats.clone();
1180                Box::pin(async move {
1181                    if state == RTCIceConnectionState::Completed {
1182                        let caller_update_stats_inner = caller_update_stats.lock().unwrap();
1183                        log::debug!("{}", caller_update_stats_inner);
1184                    }
1185                })
1186            },
1187        ));
1188        peer_connection.on_ice_candidate(Box::new(
1189            move |ice_candidate: Option<RTCIceCandidate>| {
1190                if exchange_done.load(Ordering::Acquire) {
1191                    return Box::pin(async move {});
1192                }
1193                let channel = channel2.clone();
1194                let sent_done_or_error = sent_done_or_error2.clone();
1195                let ice_done = ice_done.clone();
1196                let uuid_lock = uuid_lock2.clone();
1197                let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
1198                let mut remote_description_set_r = remote_description_set_r.clone();
1199                let caller_update_stats = caller_update_stats2.clone();
1200                Box::pin(async move {
1201                    // If the value in the watch channel has not been set yet, we wait until it does.
1202                    // Afterwards Some(()) should be visible to all watcher and any watcher waiting  will
1203                    // return
1204                    if remote_description_set_r.borrow().is_none() {
1205                        match webrtc_action_with_timeout(remote_description_set_r.changed()).await {
1206                            Ok(Err(e)) => {
1207                                let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1208                                    anyhow::anyhow!(
1209                                        "remote description watch channel is closed with error {e}"
1210                                    ),
1211                                )));
1212                            }
1213                            Err(_) => {
1214                                log::info!(
1215                                    "timed out on_ice_candidate; remote description was never set"
1216                                );
1217                                let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1218                                    anyhow::anyhow!("timed out waiting for remote description"),
1219                                )));
1220                            }
1221                            _ => (),
1222                        }
1223                    }
1224
1225                    let uuid = uuid_lock.read().unwrap().to_string();
1226                    // Note(ethan): for reasons that aren't entirely clear to me, parallel dialing
1227                    // occasionally causes us to not receive a signaling client response when
1228                    // trying to establish a connection. This results in noisy error messages that
1229                    // fortunately are harmless (this problem seems to only ever affect one branch
1230                    // of the parallel dial, so we still end up with a successful connection).
1231                    // By checking if the `uuid` is empty, we can tell if we're in such a case and
1232                    // exit out before it results in logging noisy error messages.
1233                    //
1234                    // It would be lovely to understand this problem better, but given that it's
1235                    // not actually causing performance failures it's probably not worth the effort
1236                    // at this time.
1237                    if uuid.is_empty() {
1238                        log::debug!(
1239                            "UUID never updated. This is likely because we never received a response \
1240                            from the signaling client. This happens occasionally with parallel dialing \
1241                            and isn't concerning provided connection still occurs."
1242                        );
1243                        return;
1244                    }
1245                    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1246                    match ice_candidate {
1247                        Some(ice_candidate) => {
1248                            log::debug!("Gathered local candidate of {ice_candidate}");
1249                            if sent_done_or_error.load(Ordering::Acquire) {
1250                                return;
1251                            }
1252                            let proto_candidate = ice_candidate_to_proto(ice_candidate).await;
1253                            match proto_candidate {
1254                                Ok(proto_candidate) => {
1255                                    let update_request = CallUpdateRequest {
1256                                        uuid: uuid.clone(),
1257                                        update: Some(Update::Candidate(proto_candidate)),
1258                                    };
1259                                    let call_update_start = Instant::now();
1260                                    if let Err(e) = webrtc_action_with_timeout(
1261                                        signaling_client.call_update(update_request),
1262                                    )
1263                                    .await
1264                                    .and_then(|resp| resp.map_err(anyhow::Error::from))
1265                                    {
1266                                        log::error!("Error sending ice candidate: {e}");
1267                                        let _ = on_local_ice_candidate_failure.try_send(Some(
1268                                            Box::new(anyhow::anyhow!(
1269                                                "Error sending ice candidate: {e}"
1270                                            )),
1271                                        ));
1272                                    }
1273                                    let mut caller_update_stats_inner =
1274                                        caller_update_stats.lock().unwrap();
1275                                    caller_update_stats_inner.count += 1;
1276                                    let call_update_duration = call_update_start.elapsed();
1277                                    if call_update_duration > caller_update_stats_inner.max_duration
1278                                    {
1279                                        caller_update_stats_inner.max_duration =
1280                                            call_update_duration;
1281                                    }
1282                                    caller_update_stats_inner.total_duration +=
1283                                        call_update_duration;
1284                                }
1285                                Err(e) => log::error!("Error parsing ice candidate: {e}"),
1286                            }
1287                        }
1288                        None => {
1289                            // will only be executed once when gathering is finished
1290                            ice_done.notify_one();
1291                            send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1292                        }
1293                    }
1294                })
1295            },
1296        ));
1297
1298        peer_connection.set_local_description(offer).await?;
1299    }
1300
1301    let local_description = peer_connection.local_description().await.unwrap();
1302
1303    // Local SD will be multi-line, so use two log messages to indicate start, SD and end.
1304    log::debug!(
1305        "{}\n{}",
1306        log_prefixes::START_LOCAL_SESSION_DESCRIPTION,
1307        local_description.sdp
1308    );
1309    log::debug!("{}", log_prefixes::END_LOCAL_SESSION_DESCRIPTION);
1310
1311    let sdp = encode_sdp(local_description)?;
1312    let call_request = CallRequest {
1313        sdp,
1314        disable_trickle: webrtc_options.disable_trickle_ice,
1315    };
1316
1317    let client_channel = WebRTCClientChannel::new(peer_connection, data_channel).await;
1318    let client_channel_for_ice_gathering_thread = Arc::downgrade(&client_channel);
1319    let mut signaling_client = SignalingServiceClient::new(channel.clone());
1320    let mut call_client = signaling_client.call(call_request).await?.into_inner();
1321
1322    let channel2 = channel.clone();
1323    let sent_done_or_error2 = sent_done_or_error.clone();
1324    tokio::spawn(async move {
1325        let uuid = uuid_for_ice_gathering_thread;
1326        let client_channel = client_channel_for_ice_gathering_thread;
1327        let init_received = AtomicBool::new(false);
1328        let sent_done = sent_done_or_error2;
1329
1330        loop {
1331            let response = match webrtc_action_with_timeout(call_client.message())
1332                .await
1333                .and_then(|resp| resp.map_err(anyhow::Error::from))
1334            {
1335                Ok(cr) => match cr {
1336                    Some(cr) => cr,
1337                    None => {
1338                        // want to delay sending done until we either are actually done, or
1339                        // we hit a timeout
1340                        let _ = webrtc_action_with_timeout(ice_done2.notified()).await;
1341                        let uuid = uuid.read().unwrap().to_string();
1342                        send_done_once(sent_done.clone(), &uuid, channel2.clone()).await;
1343                        break;
1344                    }
1345                },
1346                Err(e) => {
1347                    log::error!("Error processing call response: {e}");
1348                    let _ = is_open_s.try_send(Some(Box::new(e)));
1349                    break;
1350                }
1351            };
1352
1353            match response.stage {
1354                Some(Stage::Init(init)) => {
1355                    if init_received.load(Ordering::Acquire) {
1356                        let uuid = uuid.read().unwrap().to_string();
1357                        let e = anyhow::anyhow!("Init received more than once");
1358                        send_error_once(sent_done.clone(), &uuid, &e, channel2.clone()).await;
1359                        let _ = is_open_s.try_send(Some(Box::new(e)));
1360                        break;
1361                    }
1362                    init_received.store(true, Ordering::Release);
1363                    {
1364                        let mut uuid_s = uuid.write().unwrap();
1365                        uuid_s.clone_from(&response.uuid);
1366                    }
1367
1368                    let answer = match decode_sdp(init.sdp) {
1369                        Ok(a) => a,
1370                        Err(e) => {
1371                            send_error_once(
1372                                sent_done.clone(),
1373                                &response.uuid,
1374                                &e,
1375                                channel2.clone(),
1376                            )
1377                            .await;
1378                            let _ = is_open_s.try_send(Some(Box::new(e)));
1379                            break;
1380                        }
1381                    };
1382                    {
1383                        let cc = match client_channel.upgrade() {
1384                            Some(cc) => cc,
1385                            None => {
1386                                break;
1387                            }
1388                        };
1389                        if let Err(e) = cc
1390                            .base_channel
1391                            .peer_connection
1392                            .set_remote_description(answer)
1393                            .await
1394                        {
1395                            let e = anyhow::Error::from(e);
1396                            send_error_once(
1397                                sent_done.clone(),
1398                                &response.uuid,
1399                                &e,
1400                                channel2.clone(),
1401                            )
1402                            .await;
1403                            let _ = is_open_s.try_send(Some(Box::new(e)));
1404                            break;
1405                        }
1406                    }
1407                    let _ = remote_description_set_s.send_replace(Some(()));
1408                    if webrtc_options.disable_trickle_ice {
1409                        send_done_once(sent_done.clone(), &response.uuid, channel2.clone()).await;
1410                        break;
1411                    }
1412                }
1413
1414                Some(Stage::Update(update)) => {
1415                    let uuid_s = uuid.read().unwrap().to_string();
1416                    if !init_received.load(Ordering::Acquire) {
1417                        let e = anyhow::anyhow!("Got update before init stage");
1418                        send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1419                        let _ = is_open_s.try_send(Some(Box::new(e)));
1420                        break;
1421                    }
1422
1423                    if response.uuid != *uuid.read().unwrap() {
1424                        let e = anyhow::anyhow!(
1425                            "uuid mismatch: have {}, want {}",
1426                            response.uuid,
1427                            uuid_s,
1428                        );
1429                        send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1430                        let _ = is_open_s.try_send(Some(Box::new(e)));
1431                        break;
1432                    }
1433                    match ice_candidate_from_proto(update.candidate) {
1434                        Ok(candidate) => {
1435                            let client_channel = match client_channel.upgrade() {
1436                                Some(cc) => cc,
1437                                None => {
1438                                    break;
1439                                }
1440                            };
1441                            log::debug!("Received remote ICE candidate of {candidate:#?}");
1442                            if let Err(e) = client_channel
1443                                .base_channel
1444                                .peer_connection
1445                                .add_ice_candidate(candidate)
1446                                .await
1447                            {
1448                                let e = anyhow::Error::from(e);
1449                                send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone())
1450                                    .await;
1451                                let _ = is_open_s.try_send(Some(Box::new(e)));
1452                                break;
1453                            }
1454                        }
1455                        Err(e) => log::error!("Error parsing ice candidate: {e}"),
1456                    }
1457                }
1458                None => continue,
1459            }
1460        }
1461    });
1462
1463    // TODO (GOUT-11): create separate authorization if external_auth_addr and/or creds.Type is `Some`
1464
1465    // Delay returning the client channel until data channel is open, so we don't lose messages
1466    let is_open = webrtc_action_with_timeout(is_open_r.recv()).await;
1467    match is_open {
1468        Ok(is_open) => {
1469            if let Some(Some(e)) = is_open {
1470                return Err(anyhow::anyhow!("Couldn't connect to peer with error {e}"));
1471            }
1472        }
1473        Err(_) => {
1474            return Err(anyhow::anyhow!("Timed out opening data channel."));
1475        }
1476    }
1477
1478    exchange_done.store(true, Ordering::Release);
1479    let uuid = uuid_lock.read().unwrap().to_string();
1480    send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1481    Ok(client_channel)
1482}
1483
1484async fn ice_candidate_to_proto(ice_candidate: RTCIceCandidate) -> Result<IceCandidate> {
1485    let ice_candidate = ice_candidate.to_json()?;
1486    Ok(IceCandidate {
1487        candidate: ice_candidate.candidate,
1488        sdp_mid: ice_candidate.sdp_mid,
1489        sdpm_line_index: ice_candidate.sdp_mline_index.map(u32::from),
1490        username_fragment: ice_candidate.username_fragment,
1491    })
1492}
1493
1494fn ice_candidate_from_proto(proto: Option<IceCandidate>) -> Result<RTCIceCandidateInit> {
1495    match proto {
1496        Some(proto) => {
1497            let proto_sdpm: usize = proto.sdpm_line_index().try_into()?;
1498            let sdp_mline_index: Option<u16> = proto_sdpm.try_into().ok();
1499
1500            Ok(RTCIceCandidateInit {
1501                candidate: proto.candidate.clone(),
1502                sdp_mid: Some(proto.sdp_mid().to_string()),
1503                sdp_mline_index,
1504                username_fragment: Some(proto.username_fragment().to_string()),
1505            })
1506        }
1507        None => Err(anyhow::anyhow!("No ice candidate provided")),
1508    }
1509}
1510
1511fn decode_sdp(sdp: String) -> Result<RTCSessionDescription> {
1512    let sdp = String::from_utf8(base64::decode(sdp)?)?;
1513    Ok(serde_json::from_str::<RTCSessionDescription>(&sdp)?)
1514}
1515
1516fn encode_sdp(sdp: RTCSessionDescription) -> Result<String> {
1517    let sdp = serde_json::to_vec(&sdp)?;
1518    Ok(base64::encode(sdp))
1519}
1520
1521fn infer_remote_uri_from_authority(uri: Uri, override_addr: Option<&str>) -> Uri {
1522    if let Some(addr) = override_addr {
1523        return Uri::from_parts(uri_parts_with_defaults(addr)).unwrap_or_else(|e| {
1524            log::warn!("Failed to parse signaling server override {addr:?}: {e}; falling back to original URI");
1525            uri
1526        });
1527    }
1528    let authority = uri.authority().map(Authority::as_str).unwrap_or_default();
1529    let is_local_connection = authority.contains(".local.viam.cloud")
1530        || authority.contains("localhost")
1531        || authority.contains("0.0.0.0")
1532        || authority.contains("127.0.0.1");
1533
1534    if !is_local_connection {
1535        if let Some((new_uri, _)) = Options::infer_signaling_server_address(&uri) {
1536            return Uri::from_parts(uri_parts_with_defaults(&new_uri)).unwrap_or(uri);
1537        }
1538    }
1539    uri
1540}
1541
1542fn duplicate_uri(parts: &Parts) -> Option<Parts> {
1543    let uri = Uri::builder()
1544        .authority(parts.authority.clone()?)
1545        .path_and_query(parts.path_and_query.clone()?)
1546        .scheme(parts.scheme.clone()?);
1547    Some(uri.build().ok()?.into_parts())
1548}
1549
1550fn uri_parts_with_defaults(uri: &str) -> Parts {
1551    let mut uri_parts = uri.parse::<Uri>().unwrap().into_parts();
1552    uri_parts.scheme = Some(Scheme::HTTPS);
1553    uri_parts.path_and_query = Some(PathAndQuery::from_static(""));
1554    uri_parts
1555}
1556
1557fn metadata_from_parts(parts: &http::request::Parts) -> Metadata {
1558    let mut md = HashMap::new();
1559    for (k, v) in parts.headers.iter() {
1560        let k = k.to_string();
1561        let v = Strings {
1562            values: vec![HeaderValue::to_str(v).unwrap().to_string()],
1563        };
1564        md.insert(k, v);
1565    }
1566    Metadata { md }
1567}