1use std::cmp;
30use std::convert::TryFrom;
31use std::ops::Deref;
32use std::sync::Arc;
33use std::time::Duration;
34
35use bitcoin::{FeeRate, Network};
36use log::warn;
37use tokio::sync::RwLock;
38use tonic::metadata::AsciiMetadataValue;
39use tonic::metadata::errors::InvalidMetadataValue;
40use tonic::service::interceptor::{InterceptedService, Interceptor};
41
42use ark::ArkInfo;
43
44use crate::{
45 mailbox, protos, ArkServiceClient, ConvertError, RequestExt,
46 MAX_PROTOCOL_VERSION, MIN_PROTOCOL_VERSION,
47};
48
49
// At most one transport backend may be compiled in: the native and the web
// backend both define the `transport` module.
#[cfg(all(feature = "tonic-native", feature = "tonic-web"))]
compile_error!("features `tonic-native` and `tonic-web` are mutually exclusive");

// Proxy support is implemented inside the native transport backend, so it
// cannot be enabled without it.
#[cfg(all(feature = "socks5-proxy", not(feature = "tonic-native")))]
compile_error!("the `socks5-proxy` feature is only usable in conjunction with `tonic-native`");
55
56
/// Metadata key under which the access token is attached to each request.
pub const ACCESS_TOKEN_HEADER: &str = "ark-access-token";

/// Metadata key under which the client's user agent is attached to each request.
pub const USER_AGENT_HEADER: &str = "x-user-agent";

/// Message reported when the crate was built without any RPC transport backend.
pub const NO_TRANSPORT_BACKEND_MESSAGE: &str =
    "no Ark RPC transport backend compiled in this build; enable `bark-server-rpc/tonic-native` or `bark-server-rpc/tonic-web`";

/// Default timeout (10 minutes) handed to `set_default_timeout` for every request.
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10 * 60);
75
76
/// Transport backend selected by the `tonic-native` feature, based on
/// `tonic::transport::Channel`.
#[cfg(feature = "tonic-native")]
mod transport {
    use std::str::FromStr;
    use std::time::Duration;

    use http::Uri;
    use log::info;
    use tonic::transport::{Channel, Endpoint};

    use super::CreateEndpointError;

    /// Concrete transport type handed to the generated gRPC clients.
    pub type Transport = Channel;

    /// Builds an endpoint for `address` and connects to it directly.
    ///
    /// # Errors
    /// Fails when `address` is not a valid `http`/`https` URI or when the
    /// connection cannot be established.
    pub async fn connect(address: &str) -> Result<Transport, CreateEndpointError> {
        let endpoint = create_endpoint(address)?;
        let channel = endpoint.connect().await?;
        Ok(channel)
    }

    /// Builds an endpoint for `address` and connects to it through the SOCKS5
    /// proxy at `proxy`.
    ///
    /// # Errors
    /// Fails when `address` or `proxy` cannot be parsed, or when the connection
    /// cannot be established.
    #[cfg(feature = "socks5-proxy")]
    pub async fn connect_with_proxy(
        address: &str,
        proxy: &str,
    ) -> Result<Transport, CreateEndpointError> {
        use hyper_socks2::SocksConnector;
        use hyper_util::client::legacy::connect::HttpConnector;

        let endpoint = create_endpoint(address)?;
        let proxy_addr = proxy.parse::<Uri>().map_err(CreateEndpointError::InvalidProxyUri)?;

        let mut http = HttpConnector::new();
        // The inner connector only accepts `http` URIs unless told otherwise;
        // lift that restriction so `https` endpoints can be dialed as well.
        http.enforce_http(false);
        let connector = SocksConnector { proxy_addr, auth: None, connector: http };

        info!("Connecting to Ark server via SOCKS5 proxy {}...", proxy);
        let channel = endpoint.connect_with_connector(connector).await?;
        Ok(channel)
    }

    /// Parses `address` and prepares a (not yet connected) endpoint for it.
    ///
    /// Only the `http` and `https` schemes are accepted. For `https` the TLS
    /// configuration is attached when a TLS roots feature is enabled; without
    /// one, `https` addresses are rejected.
    fn create_endpoint(address: &str) -> Result<Endpoint, CreateEndpointError> {
        let uri = Uri::from_str(address)?;

        let scheme = uri.scheme_str().unwrap_or("");
        if !matches!(scheme, "http" | "https") {
            return Err(CreateEndpointError::InvalidScheme(scheme.to_owned()));
        }

        // HTTP/2 keep-alive: 20s interval, 60s timeout, also active while idle.
        let endpoint = Channel::builder(uri.clone())
            .http2_keep_alive_interval(Duration::from_secs(20))
            .keep_alive_timeout(Duration::from_secs(60))
            .keep_alive_while_idle(true);

        #[cfg(any(feature = "tls-native-roots", feature = "tls-webpki-roots"))]
        if scheme == "https" {
            use tonic::transport::ClientTlsConfig;

            info!("Connecting to Ark server at {} using TLS...", address);
            // The host part of the authority is the name used for the TLS config.
            let domain = uri
                .authority()
                .ok_or(CreateEndpointError::MissingAuthority)?
                .host();

            let tls_config = ClientTlsConfig::new()
                .with_enabled_roots()
                .domain_name(domain);
            return endpoint.tls_config(tls_config).map_err(CreateEndpointError::Transport);
        }
        #[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
        if scheme == "https" {
            return Err(CreateEndpointError::InvalidScheme(
                "Missing TLS roots, https is unsupported".to_owned(),
            ));
        }

        info!("Connecting to Ark server at {} without TLS...", address);
        Ok(endpoint)
    }
}
169
/// Transport backend selected by the `tonic-web` feature, based on
/// `tonic_web_wasm_client::Client`.
#[cfg(feature = "tonic-web")]
mod transport {
    use tonic_web_wasm_client::Client as WasmClient;

    use super::CreateEndpointError;

    /// Concrete transport type handed to the generated gRPC clients.
    pub type Transport = WasmClient;

    /// Creates a client for `address`.
    ///
    /// This function never returns an error itself; it is fallible only so that
    /// its signature matches the other transport backends.
    pub async fn connect(address: &str) -> Result<Transport, CreateEndpointError> {
        let client = WasmClient::new(address.to_owned());
        Ok(client)
    }
}
181
182#[cfg(not(any(feature = "tonic-native", feature = "tonic-web")))]
186mod transport {
187 use std::convert::Infallible;
188 use std::future::{ready, Ready};
189 use std::task::{Context, Poll};
190
191 use http::{Request, Response};
192 use tonic::Status;
193 use tonic::body::Body;
194 use tonic::codegen::Service;
195
196 use super::NO_TRANSPORT_BACKEND_MESSAGE;
197
198 pub async fn connect(_address: &str) -> Result<Transport, crate::client::CreateEndpointError> {
199 Err(crate::client::CreateEndpointError::NoTransportBackend)
200 }
201
202 #[derive(Debug, Clone, Default)]
203 pub struct Transport;
204
205 impl Service<Request<Body>> for Transport {
206 type Response = Response<Body>;
207 type Error = Infallible;
208 type Future = Ready<Result<Self::Response, Self::Error>>;
209
210 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211 Poll::Ready(Ok(()))
212 }
213
214 fn call(&mut self, _req: Request<Body>) -> Self::Future {
215 let status = Status::failed_precondition(NO_TRANSPORT_BACKEND_MESSAGE);
216 ready(Ok(status.into_http::<Body>()))
217 }
218 }
219}
220
221
222#[derive(Debug, thiserror::Error)]
223#[error("failed to create gRPC endpoint: {msg}")]
224pub enum CreateEndpointError {
225 #[error("{NO_TRANSPORT_BACKEND_MESSAGE}")]
226 NoTransportBackend,
227 #[error("failed to parse Ark server as a URI")]
228 InvalidUri(#[from] http::uri::InvalidUri),
229 #[error("Ark server scheme must be either http or https. Found: {0}")]
230 InvalidScheme(String),
231 #[error("Ark server URI is missing an authority part")]
232 MissingAuthority,
233 #[cfg(feature = "tonic-native")]
234 #[error(transparent)]
235 Transport(#[from] tonic::transport::Error),
236 #[cfg(feature = "socks5-proxy")]
237 #[error("invalid SOCKS5 proxy URI: {0:#}")]
238 InvalidProxyUri(http::uri::InvalidUri),
239}
240
241#[derive(Debug, thiserror::Error)]
242#[error("failed to connect to Ark server: {msg}")]
243pub enum ConnectError {
244 #[error("missing info '{0}' to connect")]
245 MissingInfo(&'static str),
246 #[error("invalid access token: {0}")]
247 InvalidAccessToken(#[source] InvalidMetadataValue),
248 #[error("invalid user agent: {0}")]
249 InvalidUserAgent(#[source] InvalidMetadataValue),
250 #[error(transparent)]
251 CreateEndpoint(#[from] CreateEndpointError),
252 #[error("handshake request failed: {0}")]
253 Handshake(tonic::Status),
254 #[error("version mismatch. Client max is: {client_max}, server min is: {server_min}")]
255 ProtocolVersionMismatchClientTooOld { client_max: u64, server_min: u64 },
256 #[error("version mismatch. Client min is: {client_min}, server max is: {server_max}")]
257 ProtocolVersionMismatchServerTooOld { client_min: u64, server_max: u64 },
258 #[error("error getting ark info: {0}")]
259 GetArkInfo(tonic::Status),
260 #[error("invalid ark info from ark server: {0}")]
261 InvalidArkInfo(#[from] ConvertError),
262 #[error("network mismatch. Expected: {expected}, Got: {got}")]
263 NetworkMismatch { expected: Network, got: Network },
264 #[error("error getting offboard fee rate: {0}")]
265 GetOffboardFeeRate(tonic::Status),
266 #[error("tokio channel error: {0}")]
267 Tokio(#[from] tokio::sync::oneshot::error::RecvError),
268}
269
/// Interceptor that only carries a protocol version to stamp on requests.
#[deprecated(since = "0.1.3", note = "should not be used directly")]
#[derive(Clone)]
pub struct ProtocolVersionInterceptor {
    /// Protocol version set on every intercepted request.
    pver: u64,
}
280
281#[allow(deprecated)]
282impl tonic::service::Interceptor for ProtocolVersionInterceptor {
283 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
284 #[allow(deprecated)]
285 req.set_pver(self.pver);
286 Ok(req)
287 }
288}
289
290#[derive(Clone)]
298pub struct ArkServiceInterceptor {
299 pver: Option<u64>,
300 access_token: Option<AsciiMetadataValue>,
301 user_agent: AsciiMetadataValue,
302}
303
304impl tonic::service::Interceptor for ArkServiceInterceptor {
305 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
306 req.set_default_timeout(DEFAULT_REQUEST_TIMEOUT);
307 if let Some(pver) = self.pver {
308 req.set_pver(pver);
309 }
310 if let Some(ref access_token) = self.access_token {
311 req.metadata_mut().insert(ACCESS_TOKEN_HEADER, access_token.clone());
312 }
313 req.metadata_mut().insert(USER_AGENT_HEADER, self.user_agent.clone());
314 Ok(req)
315 }
316}
317
318pub struct ArkInfoHandle {
322 pub info: ArkInfo,
323 pub waiter: Option<tokio::sync::oneshot::Receiver<Result<ArkInfo, ConnectError>>>,
324}
325
326impl Deref for ArkInfoHandle {
327 type Target = ArkInfo;
328
329 fn deref(&self) -> &Self::Target {
330 &self.info
331 }
332}
333
334pub struct ServerInfo {
335 pub pver: u64,
339 pub info: ArkInfo,
341}
342
343impl ServerInfo {
344 pub fn new(pver: u64, info: ArkInfo) -> Self {
345 Self { pver, info }
346 }
347}
348
349#[derive(Default)]
350pub struct ServerConnectionBuilder {
351 address: Option<String>,
352 network: Option<Network>,
353 #[cfg(feature = "socks5-proxy")]
354 proxy: Option<String>,
355 access_token: Option<String>,
356 user_agent: Option<String>,
357}
358
359impl ServerConnectionBuilder {
360 pub fn address(mut self, address: impl Into<String>) -> Self {
361 self.address = Some(address.into());
362 self
363 }
364
365 pub fn network(mut self, network: Network) -> Self {
366 self.network = Some(network);
367 self
368 }
369
370 #[cfg(feature = "socks5-proxy")]
371 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
372 self.proxy = Some(proxy.into());
373 self
374 }
375
376 pub fn access_token(mut self, access_token: impl Into<String>) -> Self {
377 self.access_token = Some(access_token.into());
378 self
379 }
380
381 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
387 self.user_agent = Some(user_agent.into());
388 self
389 }
390
391 pub async fn connect(self) -> Result<ServerConnection, ConnectError> {
392 ServerConnection::inner_connect(self).await
393 }
394}
395
396#[derive(Clone)]
403pub struct ServerConnection {
404 info: Arc<RwLock<ServerInfo>>,
405 pub client: ArkServiceClient<InterceptedService<transport::Transport, ArkServiceInterceptor>>,
407 pub mailbox_client: mailbox::MailboxServiceClient<InterceptedService<transport::Transport, ArkServiceInterceptor>>,
409}
410
411impl ServerConnection {
412 fn handshake_req() -> protos::HandshakeRequest {
413 protos::HandshakeRequest {
414 bark_version: Some(env!("CARGO_PKG_VERSION").into()),
415 }
416 }
417
418 pub fn builder() -> ServerConnectionBuilder {
436 ServerConnectionBuilder::default()
437 }
438
439 async fn inner_connect(builder: ServerConnectionBuilder) -> Result<ServerConnection, ConnectError> {
441 let address = builder.address.ok_or(ConnectError::MissingInfo("address"))?;
442 let network = builder.network.ok_or(ConnectError::MissingInfo("network"))?;
443
444 #[cfg(feature = "socks5-proxy")]
445 let transport = if let Some(proxy) = builder.proxy {
446 transport::connect_with_proxy(&address, &proxy).await?
447 } else {
448 transport::connect(&address).await?
449 };
450 #[cfg(not(feature = "socks5-proxy"))]
451 let transport = transport::connect(&address).await?;
452
453 let user_agent = builder.user_agent
454 .unwrap_or_else(|| format!("bark/{}", env!("CARGO_PKG_VERSION")));
455 let user_agent: AsciiMetadataValue = user_agent.try_into()
456 .map_err(ConnectError::InvalidUserAgent)?;
457 let access_token = builder.access_token
458 .map(AsciiMetadataValue::try_from)
459 .transpose()
460 .map_err(ConnectError::InvalidAccessToken)?;
461
462 let mut interceptor = ArkServiceInterceptor {
463 pver: None,
464 access_token,
465 user_agent,
466 };
467
468 let mut handshake_client = ArkServiceClient::with_interceptor(transport.clone(), interceptor.clone());
469 let handshake = handshake_client.handshake(Self::handshake_req()).await
470 .map_err(ConnectError::Handshake)?.into_inner();
471
472 let pver = check_handshake(handshake)?;
473 interceptor.pver = Some(pver);
474
475 let mut client = ArkServiceClient::with_interceptor(transport.clone(), interceptor.clone())
476 .max_decoding_message_size(64 * 1024 * 1024); let info = client.ark_info(network).await?;
479
480 let mailbox_client = mailbox::MailboxServiceClient::with_interceptor(transport, interceptor)
481 .max_decoding_message_size(64 * 1024 * 1024); let info = Arc::new(RwLock::new(ServerInfo::new(pver, info)));
484 Ok(ServerConnection {
485 info,
486 client,
487 mailbox_client,
488 })
489 }
490
491 #[deprecated(since = "0.1.3", note = "use builder() instead")]
492 pub async fn connect(
493 address: &str,
494 network: Network,
495 ) -> Result<ServerConnection, ConnectError> {
496 Self::builder().address(address).network(network).connect().await
497 }
498
499 #[cfg(feature = "socks5-proxy")]
500 #[deprecated(since = "0.1.3", note = "use builder() instead")]
501 pub async fn connect_via_proxy(
502 address: &str,
503 network: Network,
504 proxy: &str,
505 ) -> Result<ServerConnection, ConnectError> {
506 Self::builder().address(address).network(network).proxy(proxy).connect().await
507 }
508
509 pub async fn check_connection(&self) -> Result<(), ConnectError> {
511 let mut client = self.client.clone();
512 let handshake = client.handshake(Self::handshake_req()).await
513 .map_err(ConnectError::Handshake)?.into_inner();
514 check_handshake(handshake)?;
515 Ok(())
516 }
517
518 pub async fn ark_info(&self) -> ArkInfo {
520 self.info.read().await.info.clone()
521 }
522
523 pub async fn offboard_feerate(&self) -> Result<FeeRate, ConnectError> {
525 let resp = self.client.clone()
526 .get_offboard_fee_rate(protos::Empty {}).await
527 .map_err(ConnectError::GetOffboardFeeRate)?
528 .into_inner();
529 Ok(FeeRate::from_sat_per_kwu(resp.sat_vkb / 4))
530 }
531}
532trait ArkServiceClientExt {
533 async fn ark_info(&mut self, network: Network) -> Result<ArkInfo, ConnectError>;
534}
535
536impl<I: Interceptor> ArkServiceClientExt for ArkServiceClient<InterceptedService<transport::Transport, I>> {
537 async fn ark_info(&mut self, network: Network) -> Result<ArkInfo, ConnectError> {
538 let res = self.get_ark_info(protos::Empty {}).await
539 .map_err(ConnectError::GetArkInfo)?;
540 let info = ArkInfo::try_from(res.into_inner())
541 .map_err(ConnectError::InvalidArkInfo)?;
542 if network != info.network {
543 return Err(ConnectError::NetworkMismatch { expected: network, got: info.network });
544 }
545
546 Ok(info)
547 }
548}
549
550fn check_handshake(handshake: protos::HandshakeResponse) -> Result<u64, ConnectError> {
551 if let Some(ref msg) = handshake.psa {
552 warn!("Message from Ark server: \"{}\"", msg);
553 }
554
555 if MAX_PROTOCOL_VERSION < handshake.min_protocol_version {
556 return Err(ConnectError::ProtocolVersionMismatchClientTooOld {
557 client_max: MAX_PROTOCOL_VERSION, server_min: handshake.min_protocol_version
558 });
559 }
560 if MIN_PROTOCOL_VERSION > handshake.max_protocol_version {
561 return Err(ConnectError::ProtocolVersionMismatchServerTooOld {
562 client_min: MIN_PROTOCOL_VERSION, server_max: handshake.max_protocol_version
563 });
564 }
565
566 let pver = cmp::min(MAX_PROTOCOL_VERSION, handshake.max_protocol_version);
567 assert!((MIN_PROTOCOL_VERSION..=MAX_PROTOCOL_VERSION).contains(&pver));
568 assert!((handshake.min_protocol_version..=handshake.max_protocol_version).contains(&pver));
569
570 Ok(pver)
571}
572
#[cfg(test)]
mod tests {
    use super::{CreateEndpointError, NO_TRANSPORT_BACKEND_MESSAGE};

    /// The "no backend" error must tell the user which features to enable.
    #[test]
    fn no_transport_backend_error_mentions_feature_selection() {
        let rendered = CreateEndpointError::NoTransportBackend.to_string();

        assert_eq!(rendered, NO_TRANSPORT_BACKEND_MESSAGE);
        for feature in ["bark-server-rpc/tonic-native", "bark-server-rpc/tonic-web"] {
            assert!(rendered.contains(feature));
        }
    }
}