1use super::{
2 client_channel::*,
3 log_prefixes,
4 webrtc::{webrtc_action_with_timeout, Options},
5};
6use crate::gen::google;
7use crate::gen::proto::rpc::v1::{
8 auth_service_client::AuthServiceClient, AuthenticateRequest, Credentials,
9};
10use crate::gen::proto::rpc::webrtc::v1::{
11 call_response::Stage, call_update_request::Update,
12 signaling_service_client::SignalingServiceClient, CallUpdateRequest,
13 OptionalWebRtcConfigRequest, OptionalWebRtcConfigResponse,
14};
15use crate::gen::proto::rpc::webrtc::v1::{
16 CallRequest, IceCandidate, Metadata, RequestHeaders, Strings,
17};
18use crate::rpc::webrtc;
19use ::http::header::HeaderName;
20use ::http::{
21 uri::{Authority, Parts, PathAndQuery, Scheme},
22 HeaderValue, Version,
23};
24use ::viam_mdns::{discover, RecordKind, Response};
25use ::webrtc::ice_transport::{
26 ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
27 ice_connection_state::RTCIceConnectionState,
28};
29use ::webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
30use anyhow::{Context, Result};
31use core::fmt;
32use futures::stream::FuturesUnordered;
33use futures_util::{pin_mut, stream::StreamExt};
34use local_ip_address::list_afinet_netifas;
35use std::{
36 collections::HashMap,
37 net::{IpAddr, Ipv4Addr},
38 sync::{
39 atomic::{AtomicBool, Ordering},
40 Arc, Mutex, RwLock,
41 },
42 task::{Context as TaskContext, Poll},
43 time::{Duration, Instant},
44};
45use tokio::sync::{mpsc, watch};
46use tonic::body::BoxBody;
47use tonic::codegen::BoxFuture;
48use tonic::transport::{Body, Channel, ClientTlsConfig, Uri};
49
50use tower::{Service, ServiceBuilder};
51use tower_http::auth::AddAuthorization;
52use tower_http::auth::AddAuthorizationLayer;
53use tower_http::set_header::{SetRequestHeader, SetRequestHeaderLayer};
54
// Numeric status codes used when synthesizing responses for WebRTC-backed calls:
// `STATUS_CODE_OK` marks "no error so far", the other two are written into the
// `grpc-status` response header when something goes wrong.
const STATUS_CODE_OK: i32 = 0;
const STATUS_CODE_UNKNOWN: i32 = 2;
const STATUS_CODE_RESOURCE_EXHAUSTED: i32 = 8;

/// Service name handed to mDNS discovery when looking for a machine on the local network.
pub const VIAM_MDNS_SERVICE_NAME: &'static str = "_rpc._tcp.local";

/// Type of the credential `type` argument accepted by `RPCCredentials::new`.
type SecretType = String;
63
/// A connection to a server that can be used as a tower `Service` for gRPC requests,
/// backed either by a direct tonic [`Channel`] or by a WebRTC client channel.
#[derive(Clone)]
pub enum ViamChannel {
    /// A plain tonic channel with no extra layers.
    Direct(Channel),
    /// A tonic channel wrapped in layers that set a request header and add an
    /// authorization header to every request.
    DirectPreAuthorized(AddAuthorization<SetRequestHeader<Channel, HeaderValue>>),
    /// A shared WebRTC client channel.
    WebRTC(Arc<WebRTCClientChannel>),
}
72
/// Credentials used to obtain an auth token when dialing with credentials.
#[derive(Debug, Clone)]
pub struct RPCCredentials {
    /// Entity to authenticate as; when `None`, the dialed domain is used instead.
    entity: Option<String>,
    /// Credential type and payload sent in the authenticate request.
    credentials: Credentials,
}
78
79impl RPCCredentials {
80 pub fn new(entity: Option<String>, r#type: SecretType, payload: String) -> Self {
81 Self {
82 credentials: Credentials { r#type, payload },
83 entity,
84 }
85 }
86}
87
88impl ViamChannel {
89 async fn create_resp(
90 channel: &mut Arc<WebRTCClientChannel>,
91 stream: crate::gen::proto::rpc::webrtc::v1::Stream,
92 request: http::Request<BoxBody>,
93 response: http::response::Builder,
94 ) -> http::Response<Body> {
95 let (parts, body) = request.into_parts();
96 let mut status_code = STATUS_CODE_OK;
97 let stream_id = stream.id;
98 let metadata = Some(metadata_from_parts(&parts));
99 let headers = RequestHeaders {
100 method: parts
101 .uri
102 .path_and_query()
103 .map(PathAndQuery::to_string)
104 .unwrap_or_default(),
105 metadata,
106 timeout: None,
107 };
108
109 if let Err(e) = channel.write_headers(&stream, headers).await {
110 log::error!("error writing headers: {e}");
111 channel.close_stream_with_recv_error(stream_id, e);
112 status_code = STATUS_CODE_UNKNOWN;
113 }
114
115 let data = hyper::body::to_bytes(body).await.unwrap().to_vec();
116 if let Err(e) = channel.write_message(Some(stream), data).await {
117 log::error!("error sending message: {e}");
118 channel.close_stream_with_recv_error(stream_id, e);
119 status_code = STATUS_CODE_UNKNOWN;
120 };
121
122 let body = match channel.resp_body_from_stream(stream_id) {
123 Ok(body) => body,
124 Err(e) => {
125 log::error!("error receiving response from stream: {e}");
126 channel.close_stream_with_recv_error(stream_id, e);
127 status_code = STATUS_CODE_UNKNOWN;
128 Body::empty()
129 }
130 };
131
132 let response = if status_code != STATUS_CODE_OK {
133 response.header("grpc-status", &status_code.to_string())
134 } else {
135 response
136 };
137
138 response.body(body).unwrap()
139 }
140}
141
142impl Service<http::Request<BoxBody>> for ViamChannel {
143 type Response = http::Response<Body>;
144 type Error = tonic::transport::Error;
145 type Future = BoxFuture<Self::Response, Self::Error>;
146
147 fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
148 match self {
149 Self::Direct(channel) => channel.poll_ready(cx),
150 Self::DirectPreAuthorized(channel) => channel.poll_ready(cx),
151 Self::WebRTC(_channel) => Poll::Ready(Ok(())),
152 }
153 }
154
155 fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
156 match self {
157 Self::Direct(channel) => Box::pin(channel.call(request)),
158 Self::DirectPreAuthorized(channel) => Box::pin(channel.call(request)),
159 Self::WebRTC(channel) => {
160 let mut channel = channel.clone();
161 let fut = async move {
162 let response = http::response::Response::builder()
163 .header("content-type", "application/grpc")
165 .version(Version::HTTP_2);
166
167 match channel.new_stream() {
168 Err(e) => {
169 log::error!("{e}");
170 let response = response
171 .header("grpc-status", &STATUS_CODE_RESOURCE_EXHAUSTED.to_string())
172 .body(Body::default())
173 .unwrap();
174
175 Ok(response)
176 }
177 Ok(stream) => {
178 Ok(Self::create_resp(&mut channel, stream, request, response).await)
179 }
180 }
181 };
182 Box::pin(fut)
183 }
184 }
185 }
186}
187
/// Settings collected by [`DialBuilder`] and consumed when connecting.
#[derive(Debug)]
pub struct DialOptions {
    /// Credentials used to fetch an auth token; `None` when dialing without credentials.
    credentials: Option<RPCCredentials>,
    /// WebRTC-specific options; when `None`, options are inferred from the URI at connect time.
    webrtc_options: Option<Options>,
    /// Parts of the URI to dial.
    uri: Option<Parts>,
    /// When true, no mDNS lookup is attempted.
    disable_mdns: bool,
    /// When true, a failed connection attempt is retried with the `http` scheme.
    allow_downgrade: bool,
    /// When true, the dialed URI's scheme is forced to `http`.
    insecure: bool,
    /// Optional address passed along when inferring the remote URI.
    signaling_server_override: Option<String>,
}
/// Builder state after a URI has been supplied: credentials must be chosen next.
#[derive(Clone)]
pub struct WantsCredentials(());
/// Initial builder state: a URI must be supplied next.
#[derive(Clone)]
pub struct WantsUri(());
/// Builder state for dialing with credentials.
#[derive(Clone)]
pub struct WithCredentials(());
/// Builder state for dialing without credentials.
#[derive(Clone)]
pub struct WithoutCredentials(());

/// Marker for builder states in which the authentication choice has been made;
/// the optional connection settings are only available in these states.
pub trait AuthMethod {}
impl AuthMethod for WithCredentials {}
impl AuthMethod for WithoutCredentials {}
/// Type-state builder for dialing: `T` records which configuration step has been
/// reached, `config` holds the options gathered so far.
// `state` is only a marker and is never read, hence the lint allowance.
#[allow(dead_code)]
pub struct DialBuilder<T> {
    state: T,
    config: DialOptions,
}
217
218impl<T> fmt::Debug for DialBuilder<T> {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 f.debug_struct("Dial")
221 .field("State", &format_args!("{}", &std::any::type_name::<T>()))
222 .field("Opt", &format_args!("{:?}", self.config))
223 .finish()
224 }
225}
226
227impl DialOptions {
228 pub fn builder() -> DialBuilder<WantsUri> {
230 DialBuilder {
231 state: WantsUri(()),
232 config: DialOptions {
233 credentials: None,
234 uri: None,
235 allow_downgrade: false,
236 disable_mdns: false,
237 insecure: false,
238 webrtc_options: None,
239 signaling_server_override: None,
240 },
241 }
242 }
243}
244
245impl DialBuilder<WantsUri> {
246 pub fn uri(self, uri: &str) -> DialBuilder<WantsCredentials> {
248 let uri_parts = uri_parts_with_defaults(uri);
249 DialBuilder {
250 state: WantsCredentials(()),
251 config: DialOptions {
252 credentials: None,
253 uri: Some(uri_parts),
254 allow_downgrade: false,
255 disable_mdns: false,
256 insecure: false,
257 webrtc_options: None,
258 signaling_server_override: None,
259 },
260 }
261 }
262}
263impl DialBuilder<WantsCredentials> {
264 pub fn without_credentials(self) -> DialBuilder<WithoutCredentials> {
266 DialBuilder {
267 state: WithoutCredentials(()),
268 config: DialOptions {
269 credentials: None,
270 uri: self.config.uri,
271 allow_downgrade: false,
272 disable_mdns: false,
273 insecure: false,
274 webrtc_options: None,
275 signaling_server_override: None,
276 },
277 }
278 }
279 pub fn with_credentials(self, creds: RPCCredentials) -> DialBuilder<WithCredentials> {
281 DialBuilder {
282 state: WithCredentials(()),
283 config: DialOptions {
284 credentials: Some(creds),
285 uri: self.config.uri,
286 allow_downgrade: false,
287 disable_mdns: false,
288 insecure: false,
289 webrtc_options: None,
290 signaling_server_override: None,
291 },
292 }
293 }
294}
295
296impl<T: AuthMethod> DialBuilder<T> {
297 pub fn insecure(mut self) -> Self {
299 self.config.insecure = true;
300 self
301 }
302 pub fn allow_downgrade(mut self) -> Self {
304 self.config.allow_downgrade = true;
305 self
306 }
307 pub fn disable_mdns(mut self) -> Self {
309 self.config.disable_mdns = true;
310 self
311 }
312
313 pub fn disable_webrtc(mut self) -> Self {
317 let webrtc_options = Options::default().disable_webrtc();
318 self.config.webrtc_options = Some(webrtc_options);
319 self
320 }
321
322 pub fn force_relay(mut self) -> Self {
325 self.config
326 .webrtc_options
327 .get_or_insert_with(Options::default)
328 .force_relay = true;
329 self
330 }
331
332 pub fn force_p2p(mut self) -> Self {
335 self.config
336 .webrtc_options
337 .get_or_insert_with(Options::default)
338 .force_p2p = true;
339 self
340 }
341
342 pub fn turn_uri(mut self, uri: String) -> Self {
346 self.config
347 .webrtc_options
348 .get_or_insert_with(Options::default)
349 .turn_uri = Some(uri);
350 self
351 }
352
353 pub fn signaling_server(mut self, address: String) -> Self {
355 self.config.signaling_server_override = Some(address);
356 self
357 }
358
359 async fn get_addr_from_interface(
360 iface: (&str, Vec<&IpAddr>),
361 candidates: &Vec<String>,
362 local_ipv4s: &std::collections::HashSet<Ipv4Addr>,
363 ) -> Option<String> {
364 let addresses: Vec<Ipv4Addr> = iface
365 .1
366 .iter()
367 .filter_map(|ip| match ip {
368 IpAddr::V4(v4) => Some(*v4),
369 IpAddr::V6(_) => None,
370 })
371 .collect();
372
373 let mut resp: Option<Response> = None;
374 for ipv4 in addresses {
375 for candidate in candidates {
376 let discovery = match discover::interface_with_loopback(
377 VIAM_MDNS_SERVICE_NAME,
378 Duration::from_millis(250),
379 ipv4,
380 ) {
381 Ok(d) => d,
382 Err(e) => {
383 log::debug!("mDNS socket error on {ipv4}: {e}");
384 continue;
385 }
386 };
387 let stream = discovery.listen();
388 pin_mut!(stream);
389 while let Some(Ok(response)) = stream.next().await {
390 if let Some(hostname) = response.hostname() {
391 let local_agnostic_candidate = candidate.as_str().split("viam").next()?;
399 log::debug!(
400 "mDNS response on {ipv4}: hostname={hostname:?}, candidate={candidate:?}, local_agnostic={local_agnostic_candidate:?}, matches={}",
401 hostname.contains(local_agnostic_candidate)
402 );
403 if hostname.contains(local_agnostic_candidate) {
404 resp = Some(response);
405 break;
406 }
407 } else {
408 log::debug!(
409 "mDNS response on {ipv4}: no hostname (no PTR record); answers={:?}",
410 response.answers
411 );
412 }
413 if resp.is_some() {
414 break;
415 }
416 }
417 }
418 }
419
420 let resp = resp?;
421 let mut has_grpc = false;
422 let mut has_webrtc = false;
423 for field in resp.txt_records() {
424 has_grpc = has_grpc || field.contains("grpc");
425 has_webrtc = has_webrtc || field.contains("webrtc");
426 }
427
428 log::debug!(
430 "mDNS matched response records: {:?}",
431 resp.records().collect::<Vec<_>>()
432 );
433
434 let ip_addr = resp
451 .records()
452 .filter_map(|r| match r.kind {
453 RecordKind::A(addr) if !addr.is_loopback() && local_ipv4s.contains(&addr) => {
454 Some(addr)
455 }
456 _ => None,
457 })
458 .next()
459 .or_else(|| {
460 resp.records()
461 .find_map(|r| match r.kind {
462 RecordKind::A(addr) if local_ipv4s.contains(&addr) => Some(addr),
463 _ => None,
464 })
465 .or_else(|| {
466 resp.records().find_map(|r| match r.kind {
467 RecordKind::A(addr) => Some(addr),
468 _ => None,
469 })
470 })
471 });
472
473 if !(has_grpc || has_webrtc) || ip_addr.is_none() {
474 return None;
475 }
476 let mut local_addr = ip_addr?.to_string();
477 local_addr.push(':');
478 local_addr.push_str(&resp.port()?.to_string());
479 log::debug!("mDNS resolved address: {local_addr}");
480 Some(local_addr)
481 }
482
483 fn duplicate_uri(&self) -> Option<Parts> {
484 match &self.config.uri {
485 None => None,
486 Some(uri) => duplicate_uri(uri),
487 }
488 }
489
490 async fn get_mdns_uri(&self) -> Option<Parts> {
491 log::debug!("{}", log_prefixes::MDNS_QUERY_ATTEMPT);
492 if self.config.disable_mdns {
493 return None;
494 }
495
496 let mut uri = self.duplicate_uri()?;
497 let candidate = uri.authority.clone()?.to_string();
498
499 let candidates: Vec<String> = vec![candidate.replace('.', "-"), candidate];
500
501 let ifaces = list_afinet_netifas().ok()?;
502
503 let local_ipv4s: std::collections::HashSet<Ipv4Addr> = ifaces
509 .iter()
510 .filter_map(|(_, ip)| match ip {
511 IpAddr::V4(v4) => Some(*v4),
512 _ => None,
513 })
514 .collect();
515
516 let ifaces: HashMap<&str, Vec<&IpAddr>> =
517 ifaces.iter().fold(HashMap::new(), |mut map, (k, v)| {
518 map.entry(k).or_default().push(v);
519 map
520 });
521
522 let mut iface_futures = FuturesUnordered::new();
523 for iface in ifaces {
524 iface_futures.push(Self::get_addr_from_interface(
525 iface,
526 &candidates,
527 &local_ipv4s,
528 ));
529 }
530
531 let mut local_addr: Option<String> = None;
532 while let Some(maybe_addr) = iface_futures.next().await {
533 if maybe_addr.is_some() {
534 local_addr = maybe_addr;
535 break;
536 }
537 }
538 let local_addr = match local_addr {
539 None => {
540 log::debug!("Unable to connect via mDNS");
541 return None;
542 }
543 Some(addr) => {
544 log::debug!("{}: {addr}", log_prefixes::MDNS_ADDRESS_FOUND);
545 addr
546 }
547 };
548
549 let auth = local_addr.parse::<Authority>().ok()?;
550 uri.authority = Some(auth);
551
552 Some(uri)
553 }
554
555 async fn create_channel(
556 allow_downgrade: bool,
557 domain: &str,
558 uri: Uri,
559 for_mdns: bool,
560 ) -> Result<Channel> {
561 if for_mdns {
562 let host = uri.host().unwrap_or("");
563 log::debug!("mDNS create_channel: connecting to {host} with TLS");
567 let tls_config = ClientTlsConfig::new().domain_name(domain);
568 let mut parts = uri.clone().into_parts();
569 parts.scheme = Some(Scheme::HTTPS);
570 let tls_uri = Uri::from_parts(parts)?;
571 match Channel::builder(tls_uri.clone())
572 .tls_config(tls_config)?
573 .connect()
574 .await
575 .with_context(|| format!("Connecting to {:?}", tls_uri))
576 {
577 Ok(channel) => return Ok(channel),
578 Err(e) => {
582 if allow_downgrade {
583 let mut parts = uri.into_parts();
584 parts.scheme = Some(Scheme::HTTP);
585 let uri = Uri::from_parts(parts)?;
586 log::debug!(
587 "mDNS TLS connect failed ({e:#}); downgrading to plaintext h2c {uri:?}"
588 );
589 return Channel::builder(uri.clone())
590 .connect()
591 .await
592 .with_context(|| format!("Connecting to {:?}", uri));
593 }
594 return Err(e);
595 }
596 }
597 }
598
599 let chan = match Channel::builder(uri.clone())
600 .connect()
601 .await
602 .with_context(|| format!("Connecting to {:?}", uri.clone()))
603 {
604 Ok(c) => c,
605 Err(e) => {
606 if allow_downgrade {
607 let mut uri_parts = uri.clone().into_parts();
608 uri_parts.scheme = Some(Scheme::HTTP);
609 let uri = Uri::from_parts(uri_parts)?;
610 Channel::builder(uri).connect().await?
611 } else {
612 return Err(anyhow::anyhow!(e));
613 }
614 }
615 };
616 Ok(chan)
617 }
618}
619
620impl DialBuilder<WithoutCredentials> {
621 fn clone(&self) -> Self {
622 DialBuilder {
623 state: WithoutCredentials(()),
624 config: DialOptions {
625 credentials: None,
626 webrtc_options: self.config.webrtc_options.clone(),
627 uri: self.duplicate_uri(),
628 disable_mdns: self.config.disable_mdns,
629 allow_downgrade: self.config.allow_downgrade,
630 insecure: self.config.insecure,
631 signaling_server_override: self.config.signaling_server_override.clone(),
632 },
633 }
634 }
635
636 async fn connect_inner(
638 self,
639 mdns_uri: Option<Parts>,
640 mut original_uri_parts: Parts,
641 ) -> Result<ViamChannel> {
642 let webrtc_options = self.config.webrtc_options;
643 let disable_webrtc = match &webrtc_options {
644 Some(options) => options.disable_webrtc,
645 None => false,
646 };
647 if self.config.insecure {
648 original_uri_parts.scheme = Some(Scheme::HTTP);
649 }
650 let original_uri = Uri::from_parts(original_uri_parts)?;
651 let uri2 = original_uri.clone();
652 let uri = infer_remote_uri_from_authority(
653 original_uri,
654 self.config.signaling_server_override.as_deref(),
655 );
656 let domain = uri2.authority().to_owned().unwrap().as_str();
657
658 let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
659 let attempting_mdns = mdns_uri.is_some();
660 if attempting_mdns {
661 log::debug!("Attempting to connect via mDNS");
662 } else {
663 log::debug!("Attempting to connect");
664 }
665
666 let channel = match mdns_uri {
667 Some(uri) => Self::create_channel(self.config.allow_downgrade, domain, uri, true).await,
668 None => Err(anyhow::anyhow!("")),
671 };
672
673 let channel = match channel {
674 Ok(c) => {
675 log::debug!("Connected via mDNS");
676 c
677 }
678 Err(e) => {
679 if attempting_mdns {
680 log::debug!("Unable to connect via mDNS. Error: {e:#}");
686 return Err(e);
687 }
688 Self::create_channel(self.config.allow_downgrade, domain, uri.clone(), false)
689 .await?
690 }
691 };
692
693 let intercepted_channel = ServiceBuilder::new()
696 .layer(AddAuthorizationLayer::basic(
697 "fake username",
698 "fake password",
699 ))
700 .layer(SetRequestHeaderLayer::overriding(
701 HeaderName::from_static("rpc-host"),
702 HeaderValue::from_str(domain)?,
703 ))
704 .service(channel.clone());
705
706 if disable_webrtc || attempting_mdns {
708 log::debug!("{}", log_prefixes::DIALED_GRPC);
709 Ok(ViamChannel::Direct(channel.clone()))
710 } else {
711 match maybe_connect_via_webrtc(uri, intercepted_channel.clone(), webrtc_options).await {
712 Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
713 Err(e) => {
714 log::error!("error connecting via webrtc: {e}. Attempting to connect directly");
715 log::debug!("{}", log_prefixes::DIALED_GRPC);
716 Ok(ViamChannel::Direct(channel.clone()))
717 }
718 }
719 }
720 }
721
722 async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
723 let mdns_uri =
724 webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
725 .await
726 .ok()
727 .flatten()
728 .ok_or(anyhow::anyhow!(
729 "Unable to establish connection via mDNS; uri not found"
730 ))?;
731
732 self.connect_inner(Some(mdns_uri), original_uri).await
733 }
734
735 pub async fn connect(self) -> Result<ViamChannel> {
736 log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
737 let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
738 "Attempting to connect but there was no uri"
739 ))?;
740 let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
741 "Attempting to connect but there was no uri"
742 ))?;
743
744 let skip_mdns = self.config.disable_mdns;
745
746 tokio::pin! {
756 let with_mdns = self.clone().connect_mdns(original_uri);
757 let without_mdns = self.connect_inner(None, original_uri2);
758 }
759 let mut with_mdns_err: Option<anyhow::Error> =
760 skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
761 let mut without_mdns_err: Option<anyhow::Error> = None;
762 while with_mdns_err.is_none() || without_mdns_err.is_none() {
763 tokio::select! {
764 with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
765 match with_mdns {
766 Ok(chan) => return Ok(chan),
767 Err(e) => {
768 log::debug!("Error connecting with mdns: {e}");
769 with_mdns_err = Some(e);
770 }
771 }
772 }
773 without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
774 match without_mdns {
775 Ok(chan) => return Ok(chan),
776 Err(e) => {
777 log::debug!("Error connecting without mdns: {e}");
778 without_mdns_err = Some(e);
779 }
780 }
781 }
782 }
783 }
784 Err(anyhow::anyhow!(
785 "Unable to connect with or without mdns.
786 with_mdns err: {with_mdns_err:?}
787 without_mdns err: {without_mdns_err:?}"
788 ))
789 }
790}
791
792async fn get_auth_token(
793 channel: &mut Channel,
794 creds: Credentials,
795 entity: String,
796) -> Result<String> {
797 let mut auth_service = AuthServiceClient::new(channel);
798 let req = AuthenticateRequest {
799 entity,
800 credentials: Some(creds),
801 };
802
803 let rsp = auth_service.authenticate(req).await?;
804 Ok(rsp.into_inner().access_token)
805}
806
807impl DialBuilder<WithCredentials> {
808 fn clone(&self) -> Self {
809 DialBuilder {
810 state: WithCredentials(()),
811 config: DialOptions {
812 credentials: self.config.credentials.clone(),
813 webrtc_options: self.config.webrtc_options.clone(),
814 uri: self.duplicate_uri(),
815 disable_mdns: self.config.disable_mdns,
816 allow_downgrade: self.config.allow_downgrade,
817 insecure: self.config.insecure,
818 signaling_server_override: self.config.signaling_server_override.clone(),
819 },
820 }
821 }
822
823 async fn connect_inner(
824 self,
825 mdns_uri: Option<Parts>,
826 mut original_uri_parts: Parts,
827 ) -> Result<ViamChannel> {
828 let is_insecure = self.config.insecure;
829
830 let webrtc_options = self.config.webrtc_options;
831 let disable_webrtc = match &webrtc_options {
832 Some(options) => options.disable_webrtc,
833 None => false,
834 };
835
836 if is_insecure {
837 original_uri_parts.scheme = Some(Scheme::HTTP);
838 }
839
840 let original_uri = Uri::from_parts(original_uri_parts)?;
841
842 let domain = original_uri.authority().unwrap().to_string();
843 let uri_for_auth = infer_remote_uri_from_authority(
844 original_uri.clone(),
845 self.config.signaling_server_override.as_deref(),
846 );
847
848 let mdns_uri = mdns_uri.and_then(|p| Uri::from_parts(p).ok());
849 let attempting_mdns = mdns_uri.is_some();
850
851 let allow_downgrade = self.config.allow_downgrade;
852 if attempting_mdns {
853 log::debug!("Attempting to connect via mDNS");
854 } else {
855 log::debug!("Attempting to connect");
856 }
857 let channel = match mdns_uri {
858 Some(uri) => Self::create_channel(allow_downgrade, &domain, uri, true).await,
859 None => Err(anyhow::anyhow!("")),
862 };
863 let real_channel = match channel {
864 Ok(c) => {
865 log::debug!("Connected via mDNS");
866 c
867 }
868 Err(e) => {
869 if attempting_mdns {
870 log::debug!("Unable to connect via mDNS. Error: {e:#}");
876 return Err(e);
877 }
878 Self::create_channel(allow_downgrade, &domain, uri_for_auth, false).await?
879 }
880 };
881
882 log::debug!("{}", log_prefixes::ACQUIRING_AUTH_TOKEN);
883 let token = get_auth_token(
884 &mut real_channel.clone(),
885 self.config
886 .credentials
887 .as_ref()
888 .unwrap()
889 .credentials
890 .clone(),
891 self.config
892 .credentials
893 .unwrap()
894 .entity
895 .unwrap_or_else(|| domain.clone()),
896 )
897 .await?;
898 log::debug!("{}", log_prefixes::ACQUIRED_AUTH_TOKEN);
899
900 let channel = ServiceBuilder::new()
901 .layer(AddAuthorizationLayer::bearer(&token))
902 .layer(SetRequestHeaderLayer::overriding(
903 HeaderName::from_static("rpc-host"),
904 HeaderValue::from_str(domain.as_str())?,
905 ))
906 .service(real_channel);
907
908 if disable_webrtc || attempting_mdns {
910 log::debug!("Connected via gRPC");
911 Ok(ViamChannel::DirectPreAuthorized(channel))
912 } else {
913 match maybe_connect_via_webrtc(original_uri, channel.clone(), webrtc_options).await {
914 Ok(webrtc_channel) => Ok(ViamChannel::WebRTC(webrtc_channel)),
915 Err(e) => {
916 log::error!(
917 "Unable to establish webrtc connection due to error: [{e}]. Attempting direct connection."
918 );
919 log::debug!("Connected via gRPC");
920 Ok(ViamChannel::DirectPreAuthorized(channel))
921 }
922 }
923 }
924 }
925
926 async fn connect_mdns(self, original_uri: Parts) -> Result<ViamChannel> {
927 let mdns_uri =
932 webrtc::action_with_timeout(self.get_mdns_uri(), Duration::from_millis(1500))
933 .await
934 .ok()
935 .flatten()
936 .ok_or(anyhow::anyhow!(
937 "Unable to establish connection via mDNS; uri not found"
938 ))?;
939
940 self.connect_inner(Some(mdns_uri), original_uri).await
941 }
942
943 pub async fn connect(self) -> Result<ViamChannel> {
945 log::debug!("{}", log_prefixes::DIAL_ATTEMPT);
946 let original_uri = self.duplicate_uri().ok_or(anyhow::anyhow!(
947 "Attempting to connect but there was no uri"
948 ))?;
949 let original_uri2 = duplicate_uri(&original_uri).ok_or(anyhow::anyhow!(
950 "Attempting to connect but there was no uri"
951 ))?;
952
953 let skip_mdns = self.config.disable_mdns;
954
955 tokio::pin! {
965 let with_mdns = self.clone().connect_mdns(original_uri);
966 let without_mdns = self.connect_inner(None, original_uri2);
967 }
968 let mut with_mdns_err: Option<anyhow::Error> =
969 skip_mdns.then(|| anyhow::anyhow!("mDNS skipped"));
970 let mut without_mdns_err: Option<anyhow::Error> = None;
971 while with_mdns_err.is_none() || without_mdns_err.is_none() {
972 tokio::select! {
973 with_mdns = &mut with_mdns, if with_mdns_err.is_none() => {
974 match with_mdns {
975 Ok(chan) => return Ok(chan),
976 Err(e) => {
977 log::debug!("Error connecting with mdns: {e}");
978 with_mdns_err = Some(e);
979 }
980 }
981 }
982 without_mdns = &mut without_mdns, if without_mdns_err.is_none() => {
983 match without_mdns {
984 Ok(chan) => return Ok(chan),
985 Err(e) => {
986 log::debug!("Error connecting without mdns: {e}");
987 without_mdns_err = Some(e);
988 }
989 }
990 }
991 }
992 }
993 Err(anyhow::anyhow!(
994 "Unable to connect with or without mdns.
995 with_mdns err: {with_mdns_err:?}
996 without_mdns err: {without_mdns_err:?}"
997 ))
998 }
999}
1000
1001async fn send_done_or_error_update(
1002 update: CallUpdateRequest,
1003 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1004) {
1005 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1006
1007 if let Err(e) = signaling_client
1008 .call_update(update)
1009 .await
1010 .map_err(anyhow::Error::from)
1011 .map(|_| ())
1012 {
1013 log::error!("Error sending done or error update: {e}")
1014 }
1015}
1016
1017async fn send_error_once(
1018 sent_error: Arc<AtomicBool>,
1019 uuid: &String,
1020 err: &anyhow::Error,
1021 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1022) {
1023 if sent_error.load(Ordering::Acquire) {
1024 return;
1025 }
1026
1027 let err = google::rpc::Status {
1028 code: google::rpc::Code::Unknown.into(),
1029 message: err.to_string(),
1030 details: Vec::new(),
1031 };
1032 sent_error.store(true, Ordering::Release);
1033 let update_request = CallUpdateRequest {
1034 uuid: uuid.to_string(),
1035 update: Some(Update::Error(err)),
1036 };
1037
1038 send_done_or_error_update(update_request, channel).await
1039}
1040
1041async fn send_done_once(
1042 sent_done: Arc<AtomicBool>,
1043 uuid: &String,
1044 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1045) {
1046 if sent_done.load(Ordering::Acquire) {
1047 return;
1048 }
1049 sent_done.store(true, Ordering::Release);
1050 let update_request = CallUpdateRequest {
1051 uuid: uuid.to_string(),
1052 update: Some(Update::Done(true)),
1053 };
1054
1055 send_done_or_error_update(update_request, channel).await
1056}
1057
/// Running statistics about caller update requests.
#[derive(Default)]
struct CallerUpdateStats {
    /// Number of updates recorded.
    count: u128,
    /// Sum of the durations of all recorded updates.
    total_duration: Duration,
    /// Longest single update duration recorded.
    max_duration: Duration,
}

impl fmt::Display for CallerUpdateStats {
    /// Writes a one-line summary (terminated by a newline) with the update count, the
    /// average duration and the maximum duration, both in whole milliseconds.
    ///
    /// With zero recorded updates the average is reported as `0` rather than dividing
    /// by zero.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let average_duration = self
            .total_duration
            .as_millis()
            .checked_div(self.count)
            .unwrap_or(0);
        writeln!(
            f,
            "Caller update statistics: num_updates: {}, average_duration: {}ms, max_duration: {}ms",
            self.count,
            average_duration,
            self.max_duration.as_millis()
        )
    }
}
1078
1079async fn maybe_connect_via_webrtc(
1080 uri: Uri,
1081 channel: AddAuthorization<SetRequestHeader<Channel, HeaderValue>>,
1082 webrtc_options: Option<Options>,
1083) -> Result<Arc<WebRTCClientChannel>> {
1084 let webrtc_options = webrtc_options.unwrap_or_else(|| Options::infer_from_uri(uri.clone()));
1085 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1086 let response = match signaling_client
1087 .optional_web_rtc_config(OptionalWebRtcConfigRequest::default())
1088 .await
1089 {
1090 Ok(resp) => resp,
1091 Err(e) => {
1092 if e.code() == tonic::Code::Unimplemented {
1093 tonic::Response::new(OptionalWebRtcConfigResponse::default())
1094 } else {
1095 return Err(anyhow::anyhow!(e));
1096 }
1097 }
1098 };
1099
1100 let optional_config = response.into_inner().config;
1101
1102 if webrtc_options.force_relay && webrtc_options.force_p2p {
1103 log::warn!(
1104 "force_relay and force_p2p are both set; forceP2P strips TURN servers that forceRelay requires so the connection will fail");
1105 }
1106
1107 let (base_config, optional_config) = webrtc::apply_ice_policy(
1108 webrtc_options.config,
1109 optional_config,
1110 webrtc_options.force_relay,
1111 webrtc_options.force_p2p,
1112 );
1113
1114 if webrtc_options.force_relay {
1115 log::debug!("force relay enabled; using relay-only ICE transport policy");
1116 }
1117
1118 if webrtc_options.force_p2p {
1119 log::debug!(
1120 "force P2P enabled; stripping TURN servers and ignoring signaling server ICE config"
1121 );
1122 }
1123
1124 let mut config = webrtc::extend_webrtc_config(base_config, optional_config);
1125
1126 if webrtc_options.force_p2p && webrtc_options.turn_uri.is_some() {
1127 log::warn!("force_p2p is set alongside turn_uri; the TURN filter will have no effect since TURN servers were already stripped");
1128 }
1129 let turn_uri = webrtc_options.turn_uri.as_deref().and_then(|s| {
1130 let parsed = webrtc::TurnUri::parse(s);
1131 if parsed.is_none() {
1132 log::warn!("Failed to parse turn_uri, ignoring: {s:?}");
1133 }
1134 parsed
1135 });
1136 config = webrtc::apply_turn_options(config, turn_uri.as_ref());
1137 if let Some(ref uri) = turn_uri {
1138 log::debug!("TURN filter options set: turn_uri={uri:?}");
1139 }
1140
1141 let (peer_connection, data_channel) =
1142 webrtc::new_peer_connection_for_client(config, webrtc_options.disable_trickle_ice).await?;
1143
1144 let sent_done_or_error = Arc::new(AtomicBool::new(false));
1145 let uuid_lock = Arc::new(RwLock::new("".to_string()));
1146 let uuid_for_ice_gathering_thread = uuid_lock.clone();
1147
1148 let (is_open_s, mut is_open_r) = mpsc::channel(1);
1152 let on_open_is_open = is_open_s.clone();
1153
1154 data_channel.on_open(Box::new(move || {
1155 let _ = on_open_is_open.try_send(None); Box::pin(async move {})
1157 }));
1158
1159 let exchange_done = Arc::new(AtomicBool::new(false));
1160 let (remote_description_set_s, remote_description_set_r) = watch::channel(None);
1161 let ice_done = Arc::new(tokio::sync::Notify::new());
1162 let ice_done2 = ice_done.clone();
1163 let caller_update_stats = Arc::new(Mutex::new(CallerUpdateStats::default()));
1164
1165 if !webrtc_options.disable_trickle_ice {
1166 let offer = peer_connection.create_offer(None).await?;
1167 let channel2 = channel.clone();
1168 let uuid_lock2 = uuid_lock.clone();
1169 let sent_done_or_error2 = sent_done_or_error.clone();
1170
1171 let exchange_done = exchange_done.clone();
1172
1173 let on_local_ice_candidate_failure = is_open_s.clone();
1174
1175 let caller_update_stats = caller_update_stats.clone();
1176 let caller_update_stats2 = caller_update_stats.clone();
1177 peer_connection.on_ice_connection_state_change(Box::new(
1178 move |state: RTCIceConnectionState| {
1179 let caller_update_stats = caller_update_stats.clone();
1180 Box::pin(async move {
1181 if state == RTCIceConnectionState::Completed {
1182 let caller_update_stats_inner = caller_update_stats.lock().unwrap();
1183 log::debug!("{}", caller_update_stats_inner);
1184 }
1185 })
1186 },
1187 ));
1188 peer_connection.on_ice_candidate(Box::new(
1189 move |ice_candidate: Option<RTCIceCandidate>| {
1190 if exchange_done.load(Ordering::Acquire) {
1191 return Box::pin(async move {});
1192 }
1193 let channel = channel2.clone();
1194 let sent_done_or_error = sent_done_or_error2.clone();
1195 let ice_done = ice_done.clone();
1196 let uuid_lock = uuid_lock2.clone();
1197 let on_local_ice_candidate_failure = on_local_ice_candidate_failure.clone();
1198 let mut remote_description_set_r = remote_description_set_r.clone();
1199 let caller_update_stats = caller_update_stats2.clone();
1200 Box::pin(async move {
1201 if remote_description_set_r.borrow().is_none() {
1205 match webrtc_action_with_timeout(remote_description_set_r.changed()).await {
1206 Ok(Err(e)) => {
1207 let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1208 anyhow::anyhow!(
1209 "remote description watch channel is closed with error {e}"
1210 ),
1211 )));
1212 }
1213 Err(_) => {
1214 log::info!(
1215 "timed out on_ice_candidate; remote description was never set"
1216 );
1217 let _ = on_local_ice_candidate_failure.try_send(Some(Box::new(
1218 anyhow::anyhow!("timed out waiting for remote description"),
1219 )));
1220 }
1221 _ => (),
1222 }
1223 }
1224
1225 let uuid = uuid_lock.read().unwrap().to_string();
1226 if uuid.is_empty() {
1238 log::debug!(
1239 "UUID never updated. This is likely because we never received a response \
1240 from the signaling client. This happens occasionally with parallel dialing \
1241 and isn't concerning provided connection still occurs."
1242 );
1243 return;
1244 }
1245 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1246 match ice_candidate {
1247 Some(ice_candidate) => {
1248 log::debug!("Gathered local candidate of {ice_candidate}");
1249 if sent_done_or_error.load(Ordering::Acquire) {
1250 return;
1251 }
1252 let proto_candidate = ice_candidate_to_proto(ice_candidate).await;
1253 match proto_candidate {
1254 Ok(proto_candidate) => {
1255 let update_request = CallUpdateRequest {
1256 uuid: uuid.clone(),
1257 update: Some(Update::Candidate(proto_candidate)),
1258 };
1259 let call_update_start = Instant::now();
1260 if let Err(e) = webrtc_action_with_timeout(
1261 signaling_client.call_update(update_request),
1262 )
1263 .await
1264 .and_then(|resp| resp.map_err(anyhow::Error::from))
1265 {
1266 log::error!("Error sending ice candidate: {e}");
1267 let _ = on_local_ice_candidate_failure.try_send(Some(
1268 Box::new(anyhow::anyhow!(
1269 "Error sending ice candidate: {e}"
1270 )),
1271 ));
1272 }
1273 let mut caller_update_stats_inner =
1274 caller_update_stats.lock().unwrap();
1275 caller_update_stats_inner.count += 1;
1276 let call_update_duration = call_update_start.elapsed();
1277 if call_update_duration > caller_update_stats_inner.max_duration
1278 {
1279 caller_update_stats_inner.max_duration =
1280 call_update_duration;
1281 }
1282 caller_update_stats_inner.total_duration +=
1283 call_update_duration;
1284 }
1285 Err(e) => log::error!("Error parsing ice candidate: {e}"),
1286 }
1287 }
1288 None => {
1289 ice_done.notify_one();
1291 send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1292 }
1293 }
1294 })
1295 },
1296 ));
1297
1298 peer_connection.set_local_description(offer).await?;
1299 }
1300
1301 let local_description = peer_connection.local_description().await.unwrap();
1302
1303 log::debug!(
1305 "{}\n{}",
1306 log_prefixes::START_LOCAL_SESSION_DESCRIPTION,
1307 local_description.sdp
1308 );
1309 log::debug!("{}", log_prefixes::END_LOCAL_SESSION_DESCRIPTION);
1310
1311 let sdp = encode_sdp(local_description)?;
1312 let call_request = CallRequest {
1313 sdp,
1314 disable_trickle: webrtc_options.disable_trickle_ice,
1315 };
1316
1317 let client_channel = WebRTCClientChannel::new(peer_connection, data_channel).await;
1318 let client_channel_for_ice_gathering_thread = Arc::downgrade(&client_channel);
1319 let mut signaling_client = SignalingServiceClient::new(channel.clone());
1320 let mut call_client = signaling_client.call(call_request).await?.into_inner();
1321
1322 let channel2 = channel.clone();
1323 let sent_done_or_error2 = sent_done_or_error.clone();
1324 tokio::spawn(async move {
1325 let uuid = uuid_for_ice_gathering_thread;
1326 let client_channel = client_channel_for_ice_gathering_thread;
1327 let init_received = AtomicBool::new(false);
1328 let sent_done = sent_done_or_error2;
1329
1330 loop {
1331 let response = match webrtc_action_with_timeout(call_client.message())
1332 .await
1333 .and_then(|resp| resp.map_err(anyhow::Error::from))
1334 {
1335 Ok(cr) => match cr {
1336 Some(cr) => cr,
1337 None => {
1338 let _ = webrtc_action_with_timeout(ice_done2.notified()).await;
1341 let uuid = uuid.read().unwrap().to_string();
1342 send_done_once(sent_done.clone(), &uuid, channel2.clone()).await;
1343 break;
1344 }
1345 },
1346 Err(e) => {
1347 log::error!("Error processing call response: {e}");
1348 let _ = is_open_s.try_send(Some(Box::new(e)));
1349 break;
1350 }
1351 };
1352
1353 match response.stage {
1354 Some(Stage::Init(init)) => {
1355 if init_received.load(Ordering::Acquire) {
1356 let uuid = uuid.read().unwrap().to_string();
1357 let e = anyhow::anyhow!("Init received more than once");
1358 send_error_once(sent_done.clone(), &uuid, &e, channel2.clone()).await;
1359 let _ = is_open_s.try_send(Some(Box::new(e)));
1360 break;
1361 }
1362 init_received.store(true, Ordering::Release);
1363 {
1364 let mut uuid_s = uuid.write().unwrap();
1365 uuid_s.clone_from(&response.uuid);
1366 }
1367
1368 let answer = match decode_sdp(init.sdp) {
1369 Ok(a) => a,
1370 Err(e) => {
1371 send_error_once(
1372 sent_done.clone(),
1373 &response.uuid,
1374 &e,
1375 channel2.clone(),
1376 )
1377 .await;
1378 let _ = is_open_s.try_send(Some(Box::new(e)));
1379 break;
1380 }
1381 };
1382 {
1383 let cc = match client_channel.upgrade() {
1384 Some(cc) => cc,
1385 None => {
1386 break;
1387 }
1388 };
1389 if let Err(e) = cc
1390 .base_channel
1391 .peer_connection
1392 .set_remote_description(answer)
1393 .await
1394 {
1395 let e = anyhow::Error::from(e);
1396 send_error_once(
1397 sent_done.clone(),
1398 &response.uuid,
1399 &e,
1400 channel2.clone(),
1401 )
1402 .await;
1403 let _ = is_open_s.try_send(Some(Box::new(e)));
1404 break;
1405 }
1406 }
1407 let _ = remote_description_set_s.send_replace(Some(()));
1408 if webrtc_options.disable_trickle_ice {
1409 send_done_once(sent_done.clone(), &response.uuid, channel2.clone()).await;
1410 break;
1411 }
1412 }
1413
1414 Some(Stage::Update(update)) => {
1415 let uuid_s = uuid.read().unwrap().to_string();
1416 if !init_received.load(Ordering::Acquire) {
1417 let e = anyhow::anyhow!("Got update before init stage");
1418 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1419 let _ = is_open_s.try_send(Some(Box::new(e)));
1420 break;
1421 }
1422
1423 if response.uuid != *uuid.read().unwrap() {
1424 let e = anyhow::anyhow!(
1425 "uuid mismatch: have {}, want {}",
1426 response.uuid,
1427 uuid_s,
1428 );
1429 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone()).await;
1430 let _ = is_open_s.try_send(Some(Box::new(e)));
1431 break;
1432 }
1433 match ice_candidate_from_proto(update.candidate) {
1434 Ok(candidate) => {
1435 let client_channel = match client_channel.upgrade() {
1436 Some(cc) => cc,
1437 None => {
1438 break;
1439 }
1440 };
1441 log::debug!("Received remote ICE candidate of {candidate:#?}");
1442 if let Err(e) = client_channel
1443 .base_channel
1444 .peer_connection
1445 .add_ice_candidate(candidate)
1446 .await
1447 {
1448 let e = anyhow::Error::from(e);
1449 send_error_once(sent_done.clone(), &uuid_s, &e, channel2.clone())
1450 .await;
1451 let _ = is_open_s.try_send(Some(Box::new(e)));
1452 break;
1453 }
1454 }
1455 Err(e) => log::error!("Error parsing ice candidate: {e}"),
1456 }
1457 }
1458 None => continue,
1459 }
1460 }
1461 });
1462
1463 let is_open = webrtc_action_with_timeout(is_open_r.recv()).await;
1467 match is_open {
1468 Ok(is_open) => {
1469 if let Some(Some(e)) = is_open {
1470 return Err(anyhow::anyhow!("Couldn't connect to peer with error {e}"));
1471 }
1472 }
1473 Err(_) => {
1474 return Err(anyhow::anyhow!("Timed out opening data channel."));
1475 }
1476 }
1477
1478 exchange_done.store(true, Ordering::Release);
1479 let uuid = uuid_lock.read().unwrap().to_string();
1480 send_done_once(sent_done_or_error, &uuid, channel.clone()).await;
1481 Ok(client_channel)
1482}
1483
1484async fn ice_candidate_to_proto(ice_candidate: RTCIceCandidate) -> Result<IceCandidate> {
1485 let ice_candidate = ice_candidate.to_json()?;
1486 Ok(IceCandidate {
1487 candidate: ice_candidate.candidate,
1488 sdp_mid: ice_candidate.sdp_mid,
1489 sdpm_line_index: ice_candidate.sdp_mline_index.map(u32::from),
1490 username_fragment: ice_candidate.username_fragment,
1491 })
1492}
1493
1494fn ice_candidate_from_proto(proto: Option<IceCandidate>) -> Result<RTCIceCandidateInit> {
1495 match proto {
1496 Some(proto) => {
1497 let proto_sdpm: usize = proto.sdpm_line_index().try_into()?;
1498 let sdp_mline_index: Option<u16> = proto_sdpm.try_into().ok();
1499
1500 Ok(RTCIceCandidateInit {
1501 candidate: proto.candidate.clone(),
1502 sdp_mid: Some(proto.sdp_mid().to_string()),
1503 sdp_mline_index,
1504 username_fragment: Some(proto.username_fragment().to_string()),
1505 })
1506 }
1507 None => Err(anyhow::anyhow!("No ice candidate provided")),
1508 }
1509}
1510
1511fn decode_sdp(sdp: String) -> Result<RTCSessionDescription> {
1512 let sdp = String::from_utf8(base64::decode(sdp)?)?;
1513 Ok(serde_json::from_str::<RTCSessionDescription>(&sdp)?)
1514}
1515
1516fn encode_sdp(sdp: RTCSessionDescription) -> Result<String> {
1517 let sdp = serde_json::to_vec(&sdp)?;
1518 Ok(base64::encode(sdp))
1519}
1520
1521fn infer_remote_uri_from_authority(uri: Uri, override_addr: Option<&str>) -> Uri {
1522 if let Some(addr) = override_addr {
1523 return Uri::from_parts(uri_parts_with_defaults(addr)).unwrap_or_else(|e| {
1524 log::warn!("Failed to parse signaling server override {addr:?}: {e}; falling back to original URI");
1525 uri
1526 });
1527 }
1528 let authority = uri.authority().map(Authority::as_str).unwrap_or_default();
1529 let is_local_connection = authority.contains(".local.viam.cloud")
1530 || authority.contains("localhost")
1531 || authority.contains("0.0.0.0")
1532 || authority.contains("127.0.0.1");
1533
1534 if !is_local_connection {
1535 if let Some((new_uri, _)) = Options::infer_signaling_server_address(&uri) {
1536 return Uri::from_parts(uri_parts_with_defaults(&new_uri)).unwrap_or(uri);
1537 }
1538 }
1539 uri
1540}
1541
1542fn duplicate_uri(parts: &Parts) -> Option<Parts> {
1543 let uri = Uri::builder()
1544 .authority(parts.authority.clone()?)
1545 .path_and_query(parts.path_and_query.clone()?)
1546 .scheme(parts.scheme.clone()?);
1547 Some(uri.build().ok()?.into_parts())
1548}
1549
1550fn uri_parts_with_defaults(uri: &str) -> Parts {
1551 let mut uri_parts = uri.parse::<Uri>().unwrap().into_parts();
1552 uri_parts.scheme = Some(Scheme::HTTPS);
1553 uri_parts.path_and_query = Some(PathAndQuery::from_static(""));
1554 uri_parts
1555}
1556
1557fn metadata_from_parts(parts: &http::request::Parts) -> Metadata {
1558 let mut md = HashMap::new();
1559 for (k, v) in parts.headers.iter() {
1560 let k = k.to_string();
1561 let v = Strings {
1562 values: vec![HeaderValue::to_str(v).unwrap().to_string()],
1563 };
1564 md.insert(k, v);
1565 }
1566 Metadata { md }
1567}