1use super::{
2 client_channel::*,
3 log_prefixes,
4 webrtc::{webrtc_action_with_timeout, Options},
5};
6use crate::gen::google;
7use crate::gen::proto::rpc::v1::{
8 auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
9};
10use crate::gen::proto::rpc::webrtc::v1::{
11 call_response::Stage, call_update_request::Update,
12 signaling_service_client::SignalingServiceClient, CallUpdateRequest,
13 OptionalWebRtcConfigRequest, OptionalWebRtcConfigResponse,
14};
15use crate::gen::proto::rpc::webrtc::v1::{
16 CallRequest, IceCandidate, Metadata, RequestHeaders, Strings,
17};
18use crate::rpc::webrtc;
19use ::http::header::HeaderName;
20use ::http::{
21 uri::{Authority, Parts, PathAndQuery, Scheme},
22 HeaderValue, Version,
23};
24use ::viam_mdns::{discover, RecordKind, Response};
25use ::webrtc::ice_transport::{
26 ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
27 ice_connection_state::RTCIceConnectionState,
28};
29use ::webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
30use anyhow::{Context, Result};
31use core::fmt;
32use futures::stream::FuturesUnordered;
33use futures_util::{pin_mut, stream::StreamExt};
34use local_ip_address::list_afinet_netifas;
35use std::{
36 collections::HashMap,
37 net::{IpAddr, Ipv4Addr},
38 sync::{
39 atomic::{AtomicBool, Ordering},
40 Arc, Mutex, RwLock,
41 },
42 task::{Context as TaskContext, Poll},
43 time::{Duration, Instant},
44};
45use tokio::sync::{mpsc, watch};
46use tonic::body::BoxBody;
47use tonic::codegen::BoxFuture;
48use tonic::transport::{Body, Channel, Uri};
49
/// Certificate verifier that accepts every server certificate without inspecting it.
///
/// SECURITY: installing this verifier turns off TLS server authentication, so a
/// connection made with it is encrypted but gives no assurance about who the peer is.
/// In this file it is installed by `create_channel` for non-loopback mDNS addresses.
#[derive(Debug)]
struct NoCertVerification;

impl rustls::client::ServerCertVerifier for NoCertVerification {
    /// Reports the presented certificate as verified; every argument is ignored.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::Certificate,
        _intermediates: &[rustls::Certificate],
        _server_name: &rustls::ServerName,
        _scts: &mut dyn Iterator<Item = &[u8]>,
        _ocsp_response: &[u8],
        _now: std::time::SystemTime,
    ) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::ServerCertVerified::assertion())
    }
}
99use tower::{Service, ServiceBuilder};
100use tower_http::auth::AddAuthorization;
101use tower_http::auth::AddAuthorizationLayer;
102use tower_http::set_header::{SetRequestHeader, SetRequestHeaderLayer};
103
/// Value written to the `grpc-status` header check for a successful call.
const STATUS_CODE_OK: i32 = 0;
/// `grpc-status` value reported when writing to or reading from a stream fails.
const STATUS_CODE_UNKNOWN: i32 = 2;
/// `grpc-status` value reported when a new stream cannot be created.
const STATUS_CODE_RESOURCE_EXHAUSTED: i32 = 8;

/// Service name queried during mDNS discovery.
pub const VIAM_MDNS_SERVICE_NAME: &str = "_rpc._tcp.local";

/// The kind of secret carried by a set of credentials.
type SecretType = String;
112
/// A transport for gRPC requests: either a tonic channel (optionally wrapped in
/// request layers) or a WebRTC client channel.
#[derive(Clone)]
pub enum ViamChannel {
    /// A plain tonic channel with no extra request layers.
    Direct(Channel),
    /// A tonic channel wrapped in layers that set a request header and add an
    /// authorization header to every request.
    DirectPreAuthorized(AddAuthorization<SetRequestHeader<Channel, HeaderValue>>),
    /// A shared WebRTC client channel; each request is sent over a stream it creates.
    WebRTC(Arc<WebRTCClientChannel>),
}
121
122#[derive(Debug, Clone)]
123pub struct RPCCredentials {
124 entity: Option<String>,
125 credentials: Credentials,
126}
127
128impl RPCCredentials {
129 pub fn new(entity: Option<String>, r#type: SecretType, payload: String) -> Self {
130 Self {
131 credentials: Credentials { r#type, payload },
132 entity,
133 }
134 }
135}
136
137impl ViamChannel {
138 async fn create_resp(
139 channel: &mut Arc<WebRTCClientChannel>,
140 stream: crate::gen::proto::rpc::webrtc::v1::Stream,
141 request: http::Request<BoxBody>,
142 response: http::response::Builder,
143 ) -> http::Response<Body> {
144 let (parts, body) = request.into_parts();
145 let mut status_code = STATUS_CODE_OK;
146 let stream_id = stream.id;
147 let metadata = Some(metadata_from_parts(&parts));
148 let headers = RequestHeaders {
149 method: parts
150 .uri
151 .path_and_query()
152 .map(PathAndQuery::to_string)
153 .unwrap_or_default(),
154 metadata,
155 timeout: None,
156 };
157
158 if let Err(e) = channel.write_headers(&stream, headers).await {
159 log::error!("error writing headers: {e}");
160 channel.close_stream_with_recv_error(stream_id, e);
161 status_code = STATUS_CODE_UNKNOWN;
162 }
163
164 let data = hyper::body::to_bytes(body).await.unwrap().to_vec();
165 if let Err(e) = channel.write_message(Some(stream), data).await {
166 log::error!("error sending message: {e}");
167 channel.close_stream_with_recv_error(stream_id, e);
168 status_code = STATUS_CODE_UNKNOWN;
169 };
170
171 let body = match channel.resp_body_from_stream(stream_id) {
172 Ok(body) => body,
173 Err(e) => {
174 log::error!("error receiving response from stream: {e}");
175 channel.close_stream_with_recv_error(stream_id, e);
176 status_code = STATUS_CODE_UNKNOWN;
177 Body::empty()
178 }
179 };
180
181 let response = if status_code != STATUS_CODE_OK {
182 response.header("grpc-status", &status_code.to_string())
183 } else {
184 response
185 };
186
187 response.body(body).unwrap()
188 }
189}
190
191impl Service<http::Request<BoxBody>> for ViamChannel {
192 type Response = http::Response<Body>;
193 type Error = tonic::transport::Error;
194 type Future = BoxFuture<Self::Response, Self::Error>;
195
196 fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
197 match self {
198 Self::Direct(channel) => channel.poll_ready(cx),
199 Self::DirectPreAuthorized(channel) => channel.poll_ready(cx),
200 Self::WebRTC(_channel) => Poll::Ready(Ok(())),
201 }
202 }
203
204 fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
205 match self {
206 Self::Direct(channel) => Box::pin(channel.call(request)),
207 Self::DirectPreAuthorized(channel) => Box::pin(channel.call(request)),
208 Self::WebRTC(channel) => {
209 let mut channel = channel.clone();
210 let fut = async move {
211 let response = http::response::Response::builder()
212 .header("content-type", "application/grpc")
214 .version(Version::HTTP_2);
215
216 match channel.new_stream() {
217 Err(e) => {
218 log::error!("{e}");
219 let response = response
220 .header("grpc-status", &STATUS_CODE_RESOURCE_EXHAUSTED.to_string())
221 .body(Body::default())
222 .unwrap();
223
224 Ok(response)
225 }
226 Ok(stream) => {
227 Ok(Self::create_resp(&mut channel, stream, request, response).await)
228 }
229 }
230 };
231 Box::pin(fut)
232 }
233 }
234 }
235}
236
/// Options that control how a connection is dialed.
#[derive(Debug)]
pub struct DialOptions {
    /// Credentials exchanged for an auth token; `None` when dialing without credentials.
    credentials: Option<RPCCredentials>,
    /// WebRTC settings; when `None`, they are inferred from the URI at connect time.
    webrtc_options: Option<Options>,
    /// The address to dial, split into its parts.
    uri: Option<Parts>,
    /// When set, mDNS lookup is skipped.
    disable_mdns: bool,
    /// When set, a failed connection attempt is retried over plain HTTP.
    allow_downgrade: bool,
    /// When set, the dialed URI's scheme is forced to HTTP.
    insecure: bool,
    /// Optional signaling server address passed along when inferring the remote URI.
    signaling_server_override: Option<String>,
}
/// Builder state: a URI has been set and the credential choice comes next.
#[derive(Clone)]
pub struct WantsCredentials(());
/// Builder state: nothing has been set yet; a URI comes next.
#[derive(Clone)]
pub struct WantsUri(());
/// Builder state: credentials were supplied.
#[derive(Clone)]
pub struct WithCredentials(());
/// Builder state: the caller chose to dial without credentials.
#[derive(Clone)]
pub struct WithoutCredentials(());

/// Marker for builder states in which the credential choice has been made; the
/// optional settings and the connect methods are only available in these states.
pub trait AuthMethod {}
impl AuthMethod for WithCredentials {}
impl AuthMethod for WithoutCredentials {}
// NOTE(review): presumably `dead_code` is allowed because `state` is only a type-level
// marker and is never read — confirm.
#[allow(dead_code)]
/// Typestate builder for dialing; `T` records how far configuration has progressed.
pub struct DialBuilder<T> {
    /// Marker value for the current builder state.
    state: T,
    /// The options accumulated so far.
    config: DialOptions,
}
266
267impl<T> fmt::Debug for DialBuilder<T> {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 f.debug_struct("Dial")
270 .field("State", &format_args!("{}", &std::any::type_name::<T>()))
271 .field("Opt", &format_args!("{:?}", self.config))
272 .finish()
273 }
274}
275
276impl DialOptions {
277 pub fn builder() -> DialBuilder<WantsUri> {
279 DialBuilder {
280 state: WantsUri(()),
281 config: DialOptions {
282 credentials: None,
283 uri: None,
284 allow_downgrade: false,
285 disable_mdns: false,
286 insecure: false,
287 webrtc_options: None,
288 signaling_server_override: None,
289 },
290 }
291 }
292}
293
294impl DialBuilder<WantsUri> {
295 pub fn uri(self, uri: &str) -> DialBuilder<WantsCredentials> {
297 let uri_parts = uri_parts_with_defaults(uri);
298 DialBuilder {
299 state: WantsCredentials(()),
300 config: DialOptions {
301 credentials: None,
302 uri: Some(uri_parts),
303 allow_downgrade: false,
304 disable_mdns: false,
305 insecure: false,
306 webrtc_options: None,
307 signaling_server_override: None,
308 },
309 }
310 }
311}
312impl DialBuilder<WantsCredentials> {
313 pub fn without_credentials(self) -> DialBuilder<WithoutCredentials> {
315 DialBuilder {
316 state: WithoutCredentials(()),
317 config: DialOptions {
318 credentials: None,
319 uri: self.config.uri,
320 allow_downgrade: false,
321 disable_mdns: false,
322 insecure: false,
323 webrtc_options: None,
324 signaling_server_override: None,
325 },
326 }
327 }
328 pub fn with_credentials(self, creds: RPCCredentials) -> DialBuilder<WithCredentials> {
330 DialBuilder {
331 state: WithCredentials(()),
332 config: DialOptions {
333 credentials: Some(creds),
334 uri: self.config.uri,
335 allow_downgrade: false,
336 disable_mdns: false,
337 insecure: false,
338 webrtc_options: None,
339 signaling_server_override: None,
340 },
341 }
342 }
343}
344
345impl<T: AuthMethod> DialBuilder<T> {
346 pub fn insecure(mut self) -> Self {
348 self.config.insecure = true;
349 self
350 }
351 pub fn allow_downgrade(mut self) -> Self {
353 self.config.allow_downgrade = true;
354 self
355 }
356 pub fn disable_mdns(mut self) -> Self {
358 self.config.disable_mdns = true;
359 self
360 }
361
362 pub fn disable_webrtc(mut self) -> Self {
366 let webrtc_options = Options::default().disable_webrtc();
367 self.config.webrtc_options = Some(webrtc_options);
368 self
369 }
370
371 pub fn force_relay(mut self) -> Self {
374 self.config
375 .webrtc_options
376 .get_or_insert_with(Options::default)
377 .force_relay = true;
378 self
379 }
380
381 pub fn force_p2p(mut self) -> Self {
384 self.config
385 .webrtc_options
386 .get_or_insert_with(Options::default)
387 .force_p2p = true;
388 self
389 }
390
391 pub fn turn_uri(mut self, uri: String) -> Self {
395 self.config
396 .webrtc_options
397 .get_or_insert_with(Options::default)
398 .turn_uri = Some(uri);
399 self
400 }
401
402 pub fn signaling_server(mut self, address: String) -> Self {
404 self.config.signaling_server_override = Some(address);
405 self
406 }
407
408 async fn get_addr_from_interface(
409 iface: (&str, Vec<&IpAddr>),
410 candidates: &Vec<String>,
411 local_ipv4s: &std::collections::HashSet<Ipv4Addr>,
412 ) -> Option<String> {
413 let addresses: Vec<Ipv4Addr> = iface
414 .1
415 .iter()
416 .filter_map(|ip| match ip {
417 IpAddr::V4(v4) => Some(*v4),
418 IpAddr::V6(_) => None,
419 })
420 .collect();
421
422 let mut resp: Option<Response> = None;
423 for ipv4 in addresses {
424 for candidate in candidates {
425 let discovery = match discover::interface_with_loopback(
426 VIAM_MDNS_SERVICE_NAME,
427 Duration::from_millis(250),
428 ipv4,
429 ) {
430 Ok(d) => d,
431 Err(e) => {
432 log::debug!("mDNS socket error on {ipv4}: {e}");
433 continue;
434 }
435 };
436 let stream = discovery.listen();
437 pin_mut!(stream);
438 while let Some(Ok(response)) = stream.next().await {
439 if let Some(hostname) = response.hostname() {
440 let local_agnostic_candidate = candidate.as_str().split("viam").next()?;
448 log::debug!(
449 "mDNS response on {ipv4}: hostname={hostname:?}, candidate={candidate:?}, local_agnostic={local_agnostic_candidate:?}, matches={}",
450 hostname.contains(local_agnostic_candidate)
451 );
452 if hostname.contains(local_agnostic_candidate) {
453 resp = Some(response);
454 break;
455 }
456 } else {
457 log::debug!(
458 "mDNS response on {ipv4}: no hostname (no PTR record); answers={:?}",
459 response.answers
460 );
461 }
462 if resp.is_some() {
463 break;
464 }
465 }
466 }
467 }
468
469 let resp = resp?;
470 let mut has_grpc = false;
471 let mut has_webrtc = false;
472 for field in resp.txt_records() {
473 has_grpc = has_grpc || field.contains("grpc");
474 has_webrtc = has_webrtc || field.contains("webrtc");
475 }
476
477 log::debug!(
479 "mDNS matched response records: {:?}",
480 resp.records().collect::<Vec<_>>()
481 );
482
483 let ip_addr = resp
490 .records()
491 .filter_map(|r| match r.kind {
492 RecordKind::A(addr) if !addr.is_loopback() && local_ipv4s.contains(&addr) => {
493 Some(addr)
494 }
495 _ => None,
496 })
497 .next()
498 .or_else(|| {
499 resp.records().find_map(|r| match r.kind {
500 RecordKind::A(addr) => Some(addr),
501 _ => None,
502 })
503 });
504
505 if !(has_grpc || has_webrtc) || ip_addr.is_none() {
506 return None;
507 }
508 let mut local_addr = ip_addr?.to_string();
509 local_addr.push(':');
510 local_addr.push_str(&resp.port()?.to_string());
511 log::debug!("mDNS resolved address: {local_addr}");
512 Some(local_addr)
513 }
514
515 fn duplicate_uri(&self) -> Option<Parts> {
516 match &self.config.uri {
517 None => None,
518 Some(uri) => duplicate_uri(uri),
519 }
520 }
521
522 async fn get_mdns_uri(&self) -> Option<Parts> {
523 log::debug!("{}", log_prefixes::MDNS_QUERY_ATTEMPT);
524 if self.config.disable_mdns {
525 return None;
526 }
527
528 let mut uri = self.duplicate_uri()?;
529 let candidate = uri.authority.clone()?.to_string();
530
531 let candidates: Vec<String> = vec![candidate.replace('.', "-"), candidate];
532
533 let ifaces = list_afinet_netifas().ok()?;
534
535 let local_ipv4s: std::collections::HashSet<Ipv4Addr> = ifaces
541 .iter()
542 .filter_map(|(_, ip)| match ip {
543 IpAddr::V4(v4) => Some(*v4),
544 _ => None,
545 })
546 .collect();
547
548 let ifaces: HashMap<&str, Vec<&IpAddr>> =
549 ifaces.iter().fold(HashMap::new(), |mut map, (k, v)| {
550 map.entry(k).or_default().push(v);
551 map
552 });
553
554 let mut iface_futures = FuturesUnordered::new();
555 for iface in ifaces {
556 iface_futures.push(Self::get_addr_from_interface(
557 iface,
558 &candidates,
559 &local_ipv4s,
560 ));
561 }
562
563 let mut local_addr: Option<String> = None;
564 while let Some(maybe_addr) = iface_futures.next().await {
565 if maybe_addr.is_some() {
566 local_addr = maybe_addr;
567 break;
568 }
569 }
570 let local_addr = match local_addr {
571 None => {
572 log::debug!("Unable to connect via mDNS");
573 return None;
574 }
575 Some(addr) => {
576 log::debug!("{}: {addr}", log_prefixes::MDNS_ADDRESS_FOUND);
577 addr
578 }
579 };
580
581 let auth = local_addr.parse::<Authority>().ok()?;
582 uri.authority = Some(auth);
583 uri.scheme = Some(Scheme::HTTP);
584
585 Some(uri)
586 }
587
    /// Opens a tonic channel to `uri`.
    ///
    /// * `allow_downgrade` - for a non-mDNS address only: if the first attempt fails, retry
    ///   once with the scheme replaced by HTTP. It is not consulted when `for_mdns` is set.
    /// * `uri` - the address to connect to.
    /// * `for_mdns` - the address came from mDNS discovery and gets special handling
    ///   (see below).
    ///
    /// Errors carry a "Connecting to ..." context naming the URI, except for the
    /// downgrade retry, whose error is returned as is.
    async fn create_channel(allow_downgrade: bool, uri: Uri, for_mdns: bool) -> Result<Channel> {
        if for_mdns {
            // A host that is missing or not an IP literal counts as non-loopback.
            let host = uri.host().unwrap_or("");
            let is_loopback = host
                .parse::<std::net::IpAddr>()
                .map(|ip| ip.is_loopback())
                .unwrap_or(false);

            if is_loopback {
                // Loopback addresses are dialed with the URI exactly as discovered.
                log::debug!("mDNS create_channel: loopback {host}, using tonic h2c connect");
                return Channel::builder(uri.clone())
                    .connect()
                    .await
                    .with_context(|| format!("Connecting to {:?}", uri));
            }

            // SECURITY: the TLS configuration below accepts any server certificate, so
            // the peer found through mDNS is not authenticated.
            log::debug!("mDNS create_channel: LAN address {host}, using no-cert-verify TLS");
            let tls_config = rustls::ClientConfig::builder()
                .with_safe_defaults()
                .with_custom_certificate_verifier(Arc::new(NoCertVerification))
                .with_no_client_auth();

            let connector = hyper_rustls::HttpsConnectorBuilder::new()
                .with_tls_config(tls_config)
                .https_or_http()
                .enable_http2()
                .build();

            // The discovered URI is switched to HTTPS before connecting.
            let mut parts = uri.into_parts();
            parts.scheme = Some(Scheme::HTTPS);
            let uri = Uri::from_parts(parts)?;
            log::debug!("mDNS create_channel: connecting to {uri:?}");
            return Channel::builder(uri.clone())
                .connect_with_connector(connector)
                .await
                .with_context(|| format!("Connecting to {:?}", uri));
        }

        let chan = match Channel::builder(uri.clone())
            .connect()
            .await
            .with_context(|| format!("Connecting to {:?}", uri.clone()))
        {
            Ok(c) => c,
            Err(e) => {
                if allow_downgrade {
                    // Retry once over plain HTTP; a failure here is returned directly.
                    let mut uri_parts = uri.clone().into_parts();
                    uri_parts.scheme = Some(Scheme::HTTP);
                    let uri = Uri::from_parts(uri_parts)?;
                    Channel::builder(uri).connect().await?
                } else {
                    return Err(anyhow::anyhow!(e));
                }
            }
        };
        Ok(chan)
    }
661}
662
663impl DialBuilder<WithoutCredentials> {
664 fn clone(&self) -> Self {
665 DialBuilder {
666 state: WithoutCredentials(()),
667 config: DialOptions {
668 credentials: None,
669 webrtc_options: self.config.webrtc_options.clone(),
670 uri: self.duplicate_uri(),
671 disable_mdns: self.config.disable_mdns,
672 allow_downgrade: self.config.allow_downgrade,
673 insecure: self.config.insecure,
674 signaling_server_override: self.config.signaling_server_override.clone(),
675 },
676 }
677 }
678
679 async fn connect_inner(
681 self,
682 mdns_uri: Option<Parts>,
683 mut original_uri_parts: Parts,
684 ) -> Result<ViamChannel> {
685 let webrtc_options = self.config.webrtc_options;
686 let disable_webrtc = match &webrtc_options {
687 Some(options) => options.disable_webrtc,
688 None => false,
689 };
690 if self.config.insecure {
691 original_uri_parts.scheme = Some(Scheme::HTTP);
692 }
693 let original_uri = Uri::from_parts(original_uri_parts)?;
694 let uri2 = original_uri.clone();
695 let uri = infer_remote_uri_from_authority(
696 original_uri,
697 self.config.signaling_server_override.as_deref(),
698 );
699 let domain = uri2.authority().to_owned().unwrap().as_str();
700
701 let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
702 let attempting_mdns = mdns_uri.is_some();
703 if attempting_mdns {
704 log::debug!("Attempting to connect via mDNS");
705 } else {
706 log::debug!("Attempting to connect");
707 }
708
709 let channel = match mdns_uri {
710 Some(uri) => Self::create_channel(self.config.allow_downgrade, uri, true).await,
712 None => Err(anyhow::anyhow!("")),
715 };
716
717 let channel = match channel {
718 Ok(c) => {
719 log::debug!("Connected via mDNS");
720 c
721 }
722 Err(e) => {
723 if attempting_mdns {
724 log::debug!(
725 "Unable to connect via mDNS; falling back to robot URI. Error: {e:#}"
726 );
727 }
728 Self::create_channel(self.config.allow_downgrade, uri.clone(), false).await?
730 }
731 };
732
733 let intercepted_channel = ServiceBuilder::new()
736 .layer(AddAuthorizationLayer::basic(
737 "fake username",
738 "fake password",
739 ))
740 .layer(SetRequestHeaderLayer::overriding(
741 HeaderName::from_static("rpc-host"),
742 HeaderValue::from_str(domain)?,
743 ))
744 .service(channel.clone());
745
746 if disable_webrtc {
747 log::debug!("{}", log_prefixes::DIALED_GRPC);
748 Ok(ViamChannel::Direct(channel.clone()))
749 } else {
750 match maybe_connect_via_webrtc(uri, intercepted_channel.clone(), webrtc_options).await {
751 Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
752 Err(e) => {
753 log::error!("error connecting via webrtc: {e}. Attempting to connect directly");
754 log::debug!("{}", log_prefixes::DIALED_GRPC);
755 Ok(ViamChannel::Direct(channel.clone()))
756 }
757 }
758 }
759 }
760
761 async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
762 let mdns_uri =
763 webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
764 .await
765 .ok()
766 .flatten()
767 .ok_or(anyhow::anyhow!(
768 "Unable to establish connection via mDNS; uri not found"
769 ))?;
770
771 self.connect_inner(Some(mdns_uri), original_uri).await
772 }
773
774 pub async fn connect(self) -> Result<ViamChannel> {
775 log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
776 let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
777 "Attempting to connect but there was no uri"
778 ))?;
779 let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
780 "Attempting to connect but there was no uri"
781 ))?;
782
783 let skip_mdns = self.config.disable_mdns;
784
785 tokio::pin! {
795 let with_mdns = self.clone().connect_mdns(original_uri);
796 let without_mdns = self.connect_inner(None, original_uri2);
797 }
798 let mut with_mdns_err: Option<anyhow::Error> =
799 skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
800 let mut without_mdns_err: Option<anyhow::Error> = None;
801 while with_mdns_err.is_none() || without_mdns_err.is_none() {
802 tokio::select! {
803 with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
804 match with_mdns {
805 Ok(chan) => return Ok(chan),
806 Err(e) => {
807 log::debug!("Error connecting with mdns: {e}");
808 with_mdns_err = Some(e);
809 }
810 }
811 }
812 without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
813 match without_mdns {
814 Ok(chan) => return Ok(chan),
815 Err(e) => {
816 log::debug!("Error connecting without mdns: {e}");
817 without_mdns_err = Some(e);
818 }
819 }
820 }
821 }
822 }
823 Err(anyhow::anyhow!(
824 "Unable to connect with or without mdns.
825 with_mdns err: {with_mdns_err:?}
826 without_mdns err: {without_mdns_err:?}"
827 ))
828 }
829}
830
831async fn get_auth_token(
832 channel: &mut Channel,
833 creds: Credentials,
834 entity: String,
835) -> Result<String> {
836 let mut auth_service = AuthServiceClient::new(channel);
837 let req = AuthenticateRequest {
838 entity,
839 credentials: Some(creds),
840 };
841
842 let rsp = auth_service.authenticate(req).await?;
843 Ok(rsp.into_inner().access_token)
844}
845
846impl DialBuilder<WithCredentials> {
847 fn clone(&self) -> Self {
848 DialBuilder {
849 state: WithCredentials(()),
850 config: DialOptions {
851 credentials: self.config.credentials.clone(),
852 webrtc_options: self.config.webrtc_options.clone(),
853 uri: self.duplicate_uri(),
854 disable_mdns: self.config.disable_mdns,
855 allow_downgrade: self.config.allow_downgrade,
856 insecure: self.config.insecure,
857 signaling_server_override: self.config.signaling_server_override.clone(),
858 },
859 }
860 }
861
862 async fn connect_inner(
863 self,
864 mdns_uri: Option<Parts>,
865 mut original_uri_parts: Parts,
866 ) -> Result<ViamChannel> {
867 let is_insecure = self.config.insecure;
868
869 let webrtc_options = self.config.webrtc_options;
870 let disable_webrtc = match &webrtc_options {
871 Some(options) => options.disable_webrtc,
872 None => false,
873 };
874
875 if is_insecure {
876 original_uri_parts.scheme = Some(Scheme::HTTP);
877 }
878
879 let original_uri = Uri::from_parts(original_uri_parts)?;
880
881 let domain = original_uri.authority().unwrap().to_string();
882 let uri_for_auth = infer_remote_uri_from_authority(
883 original_uri.clone(),
884 self.config.signaling_server_override.as_deref(),
885 );
886
887 let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
888 let attempting_mdns = mdns_uri.is_some();
889
890 let allow_downgrade = self.config.allow_downgrade;
891 if attempting_mdns {
892 log::debug!("Attempting to connect via mDNS");
893 } else {
894 log::debug!("Attempting to connect");
895 }
896 let channel = match mdns_uri {
897 Some(uri) => Self::create_channel(allow_downgrade, uri, true).await,
899 None => Err(anyhow::anyhow!("")),
902 };
903 let real_channel = match channel {
904 Ok(c) => {
905 log::debug!("Connected via mDNS");
906 c
907 }
908 Err(e) => {
909 if attempting_mdns {
910 log::debug!(
911 "Unable to connect via mDNS; falling back to robot URI. Error: {e:#}"
912 );
913 }
914 Self::create_channel(allow_downgrade, uri_for_auth, false).await?
916 }
917 };
918
919 log::debug!("{}", log_prefixes::ACQUIRING_AUTH_TOKEN);
920 let token = get_auth_token(
921 &mut real_channel.clone(),
922 self.config
923 .credentials
924 .as_ref()
925 .unwrap()
926 .credentials
927 .clone(),
928 self.config
929 .credentials
930 .unwrap()
931 .entity
932 .unwrap_or_else(|| domain.clone()),
933 )
934 .await?;
935 log::debug!("{}", log_prefixes::ACQUIRED_AUTH_TOKEN);
936
937 let channel = ServiceBuilder::new()
938 .layer(AddAuthorizationLayer::bearer(&token))
939 .layer(SetRequestHeaderLayer::overriding(
940 HeaderName::from_static("rpc-host"),
941 HeaderValue::from_str(domain.as_str())?,
942 ))
943 .service(real_channel);
944
945 if disable_webrtc {
946 log::debug!("Connected via gRPC");
947 Ok(ViamChannel::DirectPreAuthorized(channel))
948 } else {
949 match maybe_connect_via_webrtc(original_uri, channel.clone(), webrtc_options).await {
950 Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
951 Err(e) => {
952 log::error!(
953 "Unable to establish webrtc connection due to error: [{e}]. Attempting direct connection."
954 );
955 log::debug!("Connected via gRPC");
956 Ok(ViamChannel::DirectPreAuthorized(channel))
957 }
958 }
959 }
960 }
961
962 async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
963 let mdns_uri =
968 webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
969 .await
970 .ok()
971 .flatten()
972 .ok_or(anyhow::anyhow!(
973 "Unable to establish connection via mDNS; uri not found"
974 ))?;
975
976 self.connect_inner(Some(mdns_uri), original_uri).await
977 }
978
979 pub async fn connect(self) -> Result<ViamChannel> {
981 log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
982 let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
983 "Attempting to connect but there was no uri"
984 ))?;
985 let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
986 "Attempting to connect but there was no uri"
987 ))?;
988
989 let skip_mdns = self.config.disable_mdns;
990
991 tokio::pin! {
1001 let with_mdns = self.clone().connect_mdns(original_uri);
1002 let without_mdns = self.connect_inner(None, original_uri2);
1003 }
1004 let mut with_mdns_err: Option<anyhow::Error> =
1005 skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
1006 let mut without_mdns_err: Option<anyhow::Error> = None;
1007 while with_mdns_err.is_none() || without_mdns_err.is_none() {
1008 tokio::select! {
1009 with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
1010 match with_mdns {
1011 Ok(chan) => return Ok(chan),
1012 Err(e) => {
1013 log::debug!("Error connecting with mdns: {e}");
1014 with_mdns_err = Some(e);
1015 }
1016 }
1017 }
1018 without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
1019 match without_mdns {
1020 Ok(chan) => return Ok(chan),
1021 Err(e) => {
1022 log::debug!("Error connecting without mdns: {e}");
1023 without_mdns_err = Some(e);
1024 }
1025 }
1026 }
1027 }
1028 }
1029 Err(anyhow::anyhow!(
1030 "Unable to connect with or without mdns.
1031 with_mdns err: {with_mdns_err:?}
1032 without_mdns err: {without_mdns_err:?}"
1033 ))
1034 }
1035}
1036
1037async fn send_done_or_error_update(
1038 update: CallUpdateRequest,
1039 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1040) {
1041 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1042
1043 if let Err(e) = signaling_client
1044 .call_update(update)
1045 .await
1046 .map_err(anyhow::Error::from)
1047 .map(|_| ())
1048 {
1049 log::error!("Error sending done or error update: {e}")
1050 }
1051}
1052
1053async fn send_error_once(
1054 sent_error: Arc<AtomicBool>,
1055 uuid: &String,
1056 err: &anyhow::Error,
1057 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1058) {
1059 if sent_error.load(Ordering::Acquire) {
1060 return;
1061 }
1062
1063 let err = google::rpc::Status {
1064 code: google::rpc::Code::Unknown.into(),
1065 message: err.to_string(),
1066 details: Vec::new(),
1067 };
1068 sent_error.store(true, Ordering::Release);
1069 let update_request = CallUpdateRequest {
1070 uuid: uuid.to_string(),
1071 update: Some(Update::Error(err)),
1072 };
1073
1074 send_done_or_error_update(update_request, channel).await
1075}
1076
1077async fn send_done_once(
1078 sent_done: Arc<AtomicBool>,
1079 uuid: &String,
1080 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1081) {
1082 if sent_done.load(Ordering::Acquire) {
1083 return;
1084 }
1085 sent_done.store(true, Ordering::Release);
1086 let update_request = CallUpdateRequest {
1087 uuid: uuid.to_string(),
1088 update: Some(Update::Done(true)),
1089 };
1090
1091 send_done_or_error_update(update_request, channel).await
1092}
1093
/// Running statistics about caller updates.
#[derive(Default)]
struct CallerUpdateStats {
    /// Number of updates recorded.
    count: u128,
    /// Sum of the durations of all recorded updates.
    total_duration: Duration,
    /// Longest single update duration recorded.
    max_duration: Duration,
}

impl fmt::Display for CallerUpdateStats {
    /// Writes a one-line summary (ending in a newline) with the update count, the
    /// average duration, and the maximum duration, both in whole milliseconds.
    ///
    /// With no updates recorded the average is reported as 0 rather than dividing by zero.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let average_duration = self
            .total_duration
            .as_millis()
            .checked_div(self.count)
            .unwrap_or(0);
        writeln!(
            f,
            "Caller update statistics: num_updates: {}, average_duration: {}ms, max_duration: {}ms",
            self.count,
            average_duration,
            self.max_duration.as_millis()
        )
    }
}
1114
1115async fn maybe_connect_via_webrtc(
1116 uri: Uri,
1117 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1118 webrtc_options: Option<Options>,
1119) -> Result<Arc<WebRTCClientChannel>> {
1120 let webrtc_options = webrtc_options.unwrap_or_else(|| Options::infer_from_uri(uri.clone()));
1121 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1122 let response = match signaling_client
1123 .optional_web_rtc_config(OptionalWebRtcConfigRequest::default())
1124 .await
1125 {
1126 Ok(resp) => resp,
1127 Err(e) => {
1128 if e.code() == tonic::Code::Unimplemented {
1129 tonic::Response::new(OptionalWebRtcConfigResponse::default())
1130 } else {
1131 return Err(anyhow::anyhow!(e));
1132 }
1133 }
1134 };
1135
1136 let optional_config = response.into_inner().config;
1137
1138 if webrtc_options.force_relay && webrtc_options.force_p2p {
1139 log::warn!("force_relay and force_p2p are both set; forceP2P strips TURN servers that forceRelay requires so the connection will fail");
1140 }
1141
1142 let (base_config, optional_config) = webrtc::apply_ice_policy(
1143 webrtc_options.config,
1144 optional_config,
1145 webrtc_options.force_relay,
1146 webrtc_options.force_p2p,
1147 );
1148
1149 if webrtc_options.force_relay {
1150 log::debug!("force relay enabled; using relay-only ICE transport policy");
1151 }
1152
1153 if webrtc_options.force_p2p {
1154 log::debug!("force P2P enabled; stripping TURN servers and ignoring signaling server ICE config");
1155 }
1156
1157 let mut config = webrtc::extend_webrtc_config(base_config, optional_config);
1158
1159 if webrtc_options.force_p2p && webrtc_options.turn_uri.is_some() {
1160 log::warn!("force_p2p is set alongside turn_uri; the TURN filter will have no effect since TURN servers were already stripped");
1161 }
1162 let turn_uri = webrtc_options.turn_uri.as_deref().and_then(|s| {
1163 let parsed = webrtc::TurnUri::parse(s);
1164 if parsed.is_none() {
1165 log::warn!("Failed to parse turn_uri, ignoring: {s:?}");
1166 }
1167 parsed
1168 });
1169 config = webrtc::apply_turn_options(config, turn_uri.as_ref());
1170 if let Some(ref uri) = turn_uri {
1171 log::debug!("TURN filter options set: turn_uri={uri:?}");
1172 }
1173
1174 let (peer_connection, data_channel) =
1175 webrtc::new_peer_connection_for_client(config, webrtc_options.disable_trickle_ice).await?;
1176
1177 let sent_done_or_error = Arc::new(AtomicBool::new(false));
1178 let uuid_lock = Arc::new(RwLock::new("".to_string()));
1179 let uuid_for_ice_gathering_thread = uuid_lock.clone();
1180
1181 let (is_open_s, mut is_open_r) = mpsc::channel(1);
1185 let on_open_is_open = is_open_s.clone();
1186
1187 data_channel.on_open(Box::new(move || {
1188 let _ = on_open_is_open.try_send(None); Box::pin(async move {})
1190 }));
1191
1192 let exchange_done = Arc::new(AtomicBool::new(false));
1193 let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
1194 let ice_done = Arc::new(tokio::sync::Notify::new());
1195 let ice_done2 = ice_done.clone();
1196 let caller_update_stats = Arc::new(Mutex::new(CallerUpdateStats::default()));
1197
1198 if !webrtc_options.disable_trickle_ice {
1199 let offer = peer_connection.create_offer(None).await?;
1200 let channel2 = channel.clone();
1201 let uuid_lock2 = uuid_lock.clone();
1202 let sent_done_or_error2 = sent_done_or_error.clone();
1203
1204 let exchange_done = exchange_done.clone();
1205
1206 let on_local_ice_candidate_failure = is_open_s.clone();
1207
1208 let caller_update_stats = caller_update_stats.clone();
1209 let caller_update_stats2 = caller_update_stats.clone();
1210 peer_connection.on_ice_connection_state_change(Box::new(
1211 move |state: RTCIceConnectionState| {
1212 let caller_update_stats = caller_update_stats.clone();
1213 Box::pin(async move {
1214 if state == RTCIceConnectionState::Completed {
1215 let caller_update_stats_inner = caller_update_stats.lock().unwrap();
1216 log::debug!("{}", caller_update_stats_inner);
1217 }
1218 })
1219 },
1220 ));
1221 peer_connection.on_ice_candidate(Box::new(
1222 move |ice_candidate: Option<RTCIceCandidate>| {
1223 if exchange_done.load(Ordering::Acquire) {
1224 return Box::pin(async move {});
1225 }
1226 let channel = channel2.clone();
1227 let sent_done_or_error = sent_done_or_error2.clone();
1228 let ice_done = ice_done.clone();
1229 let uuid_lock = uuid_lock2.clone();
1230 let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
1231 let mut remote_description_set_r = remote_description_set_r.clone();
1232 let caller_update_stats = caller_update_stats2.clone();
1233 Box::pin(async move {
1234 if remote_description_set_r.borrow().is_none() {
1238 match webrtc_action_with_timeout(remote_description_set_r.changed()).await {
1239 Ok(Err(e)) => {
1240 let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1241 anyhow::anyhow!(
1242 "remote description watch channel is closed with error {e}"
1243 ),
1244 )));
1245 }
1246 Err(_) => {
1247 log::info!(
1248 "timed out on_ice_candidate; remote description was never set"
1249 );
1250 let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1251 anyhow::anyhow!("timed out waiting for remote description"),
1252 )));
1253 }
1254 _ => (),
1255 }
1256 }
1257
1258 let uuid = uuid_lock.read().unwrap().to_string();
1259 if uuid.is_empty() {
1271 log::debug!(
1272 "UUID never updated. This is likely because we never received a response \
1273 from the signaling client. This happens occasionally with parallel dialing \
1274 and isn't concerning provided connection still occurs."
1275 );
1276 return;
1277 }
1278 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1279 match ice_candidate {
1280 Some(ice_candidate) => {
1281 log::debug!("Gathered local candidate of {ice_candidate}");
1282 if sent_done_or_error.load(Ordering::Acquire) {
1283 return;
1284 }
1285 let proto_candidate = ice_candidate_to_proto(ice_candidate).await;
1286 match proto_candidate {
1287 Ok(proto_candidate) => {
1288 let update_request = CallUpdateRequest {
1289 uuid: uuid.clone(),
1290 update: Some(Update::Candidate(proto_candidate)),
1291 };
1292 let call_update_start = Instant::now();
1293 if let Err(e) = webrtc_action_with_timeout(
1294 signaling_client.call_update(update_request),
1295 )
1296 .await
1297 .and_then(|resp| resp.map_err(anyhow::Error::from))
1298 {
1299 log::error!("Error sending ice candidate: {e}");
1300 let _ = on_local_ice_candidate_failure.try_send(Some(
1301 Box::new(anyhow::anyhow!(
1302 "Error sending ice candidate: {e}"
1303 )),
1304 ));
1305 }
1306 let mut caller_update_stats_inner =
1307 caller_update_stats.lock().unwrap();
1308 caller_update_stats_inner.count += 1;
1309 let call_update_duration = call_update_start.elapsed();
1310 if call_update_duration > caller_update_stats_inner.max_duration
1311 {
1312 caller_update_stats_inner.max_duration =
1313 call_update_duration;
1314 }
1315 caller_update_stats_inner.total_duration +=
1316 call_update_duration;
1317 }
1318 Err(e) => log::error!("Error parsing ice candidate: {e}"),
1319 }
1320 }
1321 None => {
1322 ice_done.notify_one();
1324 send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1325 }
1326 }
1327 })
1328 },
1329 ));
1330
1331 peer_connection.set_local_description(offer).await?;
1332 }
1333
1334 let local_description = peer_connection.local_description().await.unwrap();
1335
1336 log::debug!(
1338 "{}\n{}",
1339 log_prefixes::START_LOCAL_SESSION_DESCRIPTION,
1340 local_description.sdp
1341 );
1342 log::debug!("{}", log_prefixes::END_LOCAL_SESSION_DESCRIPTION);
1343
1344 let sdp = encode_sdp(local_description)?;
1345 let call_request = CallRequest {
1346 sdp,
1347 disable_trickle: webrtc_options.disable_trickle_ice,
1348 };
1349
1350 let client_channel = WebRTCClientChannel::new(peer_connection, data_channel).await;
1351 let client_channel_for_ice_gathering_thread = Arc::downgrade(&client_channel);
1352 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1353 let mut call_client = signaling_client.call(call_request).await?.into_inner();
1354
1355 let channel2 = channel.clone();
1356 let sent_done_or_error2 = sent_done_or_error.clone();
1357 tokio::spawn(async move {
1358 let uuid = uuid_for_ice_gathering_thread;
1359 let client_channel = client_channel_for_ice_gathering_thread;
1360 let init_received = AtomicBool::new(false);
1361 let sent_done = sent_done_or_error2;
1362
1363 loop {
1364 let response = match webrtc_action_with_timeout(call_client.message())
1365 .await
1366 .and_then(|resp| resp.map_err(anyhow::Error::from))
1367 {
1368 Ok(cr) => match cr {
1369 Some(cr) => cr,
1370 None => {
1371 let _ = webrtc_action_with_timeout(ice_done2.notified()).await;
1374 let uuid = uuid.read().unwrap().to_string();
1375 send_done_once(sent_done.clone(), &uuid, channel2.clone()).await;
1376 break;
1377 }
1378 },
1379 Err(e) => {
1380 log::error!("Error processing call response: {e}");
1381 let _ = is_open_s.try_send(Some(Box::new(e)));
1382 break;
1383 }
1384 };
1385
1386 match response.stage {
1387 Some(Stage::Init(init)) => {
1388 if init_received.load(Ordering::Acquire) {
1389 let uuid = uuid.read().unwrap().to_string();
1390 let e = anyhow::anyhow!("Init received more than once");
1391 send_error_once(sent_done.clone(), &uuid, &e, channel2.clone()).await;
1392 let _ = is_open_s.try_send(Some(Box::new(e)));
1393 break;
1394 }
1395 init_received.store(true, Ordering::Release);
1396 {
1397 let mut uuid_s = uuid.write().unwrap();
1398 uuid_s.clone_from(&response.uuid);
1399 }
1400
1401 let answer = match decode_sdp(init.sdp) {
1402 Ok(a) => a,
1403 Err(e) => {
1404 send_error_once(
1405 sent_done.clone(),
1406 &response.uuid,
1407 &e,
1408 channel2.clone(),
1409 )
1410 .await;
1411 let _ = is_open_s.try_send(Some(Box::new(e)));
1412 break;
1413 }
1414 };
1415 {
1416 let cc = match client_channel.upgrade() {
1417 Some(cc) => cc,
1418 None => {
1419 break;
1420 }
1421 };
1422 if let Err(e) = cc
1423 .base_channel
1424 .peer_connection
1425 .set_remote_description(answer)
1426 .await
1427 {
1428 let e = anyhow::Error::from(e);
1429 send_error_once(
1430 sent_done.clone(),
1431 &response.uuid,
1432 &e,
1433 channel2.clone(),
1434 )
1435 .await;
1436 let _ = is_open_s.try_send(Some(Box::new(e)));
1437 break;
1438 }
1439 }
1440 let _ = remote_description_set_s.send_replace(Some(()));
1441 if webrtc_options.disable_trickle_ice {
1442 send_done_once(sent_done.clone(), &response.uuid, channel2.clone()).await;
1443 break;
1444 }
1445 }
1446
1447 Some(Stage::Update(update)) => {
1448 let uuid_s = uuid.read().unwrap().to_string();
1449 if !init_received.load(Ordering::Acquire) {
1450 let e = anyhow::anyhow!("Got update before init stage");
1451 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1452 let _ = is_open_s.try_send(Some(Box::new(e)));
1453 break;
1454 }
1455
1456 if response.uuid != *uuid.read().unwrap() {
1457 let e = anyhow::anyhow!(
1458 "uuid mismatch: have {}, want {}",
1459 response.uuid,
1460 uuid_s,
1461 );
1462 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1463 let _ = is_open_s.try_send(Some(Box::new(e)));
1464 break;
1465 }
1466 match ice_candidate_from_proto(update.candidate) {
1467 Ok(candidate) => {
1468 let client_channel = match client_channel.upgrade() {
1469 Some(cc) => cc,
1470 None => {
1471 break;
1472 }
1473 };
1474 log::debug!("Received remote ICE candidate of {candidate:#?}");
1475 if let Err(e) = client_channel
1476 .base_channel
1477 .peer_connection
1478 .add_ice_candidate(candidate)
1479 .await
1480 {
1481 let e = anyhow::Error::from(e);
1482 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone())
1483 .await;
1484 let _ = is_open_s.try_send(Some(Box::new(e)));
1485 break;
1486 }
1487 }
1488 Err(e) => log::error!("Error parsing ice candidate: {e}"),
1489 }
1490 }
1491 None => continue,
1492 }
1493 }
1494 });
1495
1496 let is_open = webrtc_action_with_timeout(is_open_r.recv()).await;
1500 match is_open {
1501 Ok(is_open) => {
1502 if let Some(Some(e)) = is_open {
1503 return Err(anyhow::anyhow!("Couldn't connect to peer with error {e}"));
1504 }
1505 }
1506 Err(_) => {
1507 return Err(anyhow::anyhow!("Timed out opening data channel."));
1508 }
1509 }
1510
1511 exchange_done.store(true, Ordering::Release);
1512 let uuid = uuid_lock.read().unwrap().to_string();
1513 send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1514 Ok(client_channel)
1515}
1516
1517async fn ice_candidate_to_proto(ice_candidate: RTCIceCandidate) -> Result<IceCandidate> {
1518 let ice_candidate = ice_candidate.to_json()?;
1519 Ok(IceCandidate {
1520 candidate: ice_candidate.candidate,
1521 sdp_mid: ice_candidate.sdp_mid,
1522 sdpm_line_index: ice_candidate.sdp_mline_index.map(u32::from),
1523 username_fragment: ice_candidate.username_fragment,
1524 })
1525}
1526
1527fn ice_candidate_from_proto(proto: Option<IceCandidate>) -> Result<RTCIceCandidateInit> {
1528 match proto {
1529 Some(proto) => {
1530 let proto_sdpm: usize = proto.sdpm_line_index().try_into()?;
1531 let sdp_mline_index: Option<u16> = proto_sdpm.try_into().ok();
1532
1533 Ok(RTCIceCandidateInit {
1534 candidate: proto.candidate.clone(),
1535 sdp_mid: Some(proto.sdp_mid().to_string()),
1536 sdp_mline_index,
1537 username_fragment: Some(proto.username_fragment().to_string()),
1538 })
1539 }
1540 None => Err(anyhow::anyhow!("No ice candidate provided")),
1541 }
1542}
1543
1544fn decode_sdp(sdp: String) -> Result<RTCSessionDescription> {
1545 let sdp = String::from_utf8(base64::decode(sdp)?)?;
1546 Ok(serde_json::from_str::<RTCSessionDescription>(&sdp)?)
1547}
1548
1549fn encode_sdp(sdp: RTCSessionDescription) -> Result<String> {
1550 let sdp = serde_json::to_vec(&sdp)?;
1551 Ok(base64::encode(sdp))
1552}
1553
1554fn infer_remote_uri_from_authority(uri: Uri, override_addr: Option<&str>) -> Uri {
1555 if let Some(addr) = override_addr {
1556 return Uri::from_parts(uri_parts_with_defaults(addr)).unwrap_or_else(|e| {
1557 log::warn!("Failed to parse signaling server override {addr:?}: {e}; falling back to original URI");
1558 uri
1559 });
1560 }
1561 let authority = uri.authority().map(Authority::as_str).unwrap_or_default();
1562 let is_local_connection = authority.contains(".local.viam.cloud")
1563 || authority.contains("localhost")
1564 || authority.contains("0.0.0.0");
1565
1566 if !is_local_connection {
1567 if let Some((new_uri, _)) = Options::infer_signaling_server_address(&uri) {
1568 return Uri::from_parts(uri_parts_with_defaults(&new_uri)).unwrap_or(uri);
1569 }
1570 }
1571 uri
1572}
1573
1574fn duplicate_uri(parts: &Parts) -> Option<Parts> {
1575 let uri = Uri::builder()
1576 .authority(parts.authority.clone()?)
1577 .path_and_query(parts.path_and_query.clone()?)
1578 .scheme(parts.scheme.clone()?);
1579 Some(uri.build().ok()?.into_parts())
1580}
1581
1582fn uri_parts_with_defaults(uri: &str) -> Parts {
1583 let mut uri_parts = uri.parse::<Uri>().unwrap().into_parts();
1584 uri_parts.scheme = Some(Scheme::HTTPS);
1585 uri_parts.path_and_query = Some(PathAndQuery::from_static(""));
1586 uri_parts
1587}
1588
1589fn metadata_from_parts(parts: &http::request::Parts) -> Metadata {
1590 let mut md = HashMap::new();
1591 for (k, v) in parts.headers.iter() {
1592 let k = k.to_string();
1593 let v = Strings {
1594 values: vec![HeaderValue::to_str(v).unwrap().to_string()],
1595 };
1596 md.insert(k, v);
1597 }
1598 Metadata { md }
1599}