bark-server-rpc 0.2.1

gRPC code for bark-server API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
//! Client-side Ark server connector.
//!
//! This module provides a managed, version-aware gRPC connection between a
//! Bark client and a paired Ark server. Its responsibilities are:
//! - Negotiating and enforcing a compatible wire protocol version via a
//!   handshake.
//! - Establishing a gRPC channel (optionally with TLS) with sensible timeouts
//!   and keepalives.
//! - Injecting the negotiated protocol version into every RPC call so the
//!   server can route/validate requests correctly.
//! - Fetching and exposing the server's runtime configuration ([ArkInfo]) so
//!   the client can adapt its behavior (e.g., network, round cadence, limits).
//!
//! Overview
//! - Version negotiation: The client first calls the server's handshake RPC,
//!   which returns the supported protocol version range. The client checks its
//!   own supported range ([MIN_PROTOCOL_VERSION]..=[MAX_PROTOCOL_VERSION]) and
//!   picks the highest mutually supported version.
//! - Metadata propagation: After negotiation, all subsequent RPCs carry the
//!   selected protocol version in the request metadata using a gRPC
//!   interceptor.
//! - TLS: If the server URI is HTTPS, a TLS configuration with the configured
//!   crate roots is set up; otherwise the connection proceeds in cleartext.
//! - Server info: Once connected, the client retrieves [ArkInfo] to validate
//!   that the selected Bitcoin [Network] matches the wallet and to learn
//!   server-side parameters that drive client behavior.
//!

use std::cmp;
use std::convert::TryFrom;
use std::ops::Deref;
use std::sync::Arc;

use bitcoin::{FeeRate, Network};
use log::warn;
use tokio::sync::RwLock;
use tonic::metadata::AsciiMetadataValue;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::service::interceptor::{InterceptedService, Interceptor};

use ark::ArkInfo;

use crate::{mailbox, protos, ArkServiceClient, ConvertError, RequestExt};


#[cfg(all(feature = "tonic-native", feature = "tonic-web"))]
compile_error!("features `tonic-native` and `tonic-web` are mutually exclusive");

#[cfg(all(feature = "socks5-proxy", not(feature = "tonic-native")))]
compile_error!("the `socks5-proxy` feature is only usable in conjunction with `tonic-native`");


/// The HTTP header used for private server access tokens
pub const ACCESS_TOKEN_HEADER: &str = "ark-access-token";
/// Error text used when no Ark RPC transport backend was compiled into the binary.
pub const NO_TRANSPORT_BACKEND_MESSAGE: &str =
	"no Ark RPC transport backend compiled in this build; enable `bark-server-rpc/tonic-native` or `bark-server-rpc/tonic-web`";


#[cfg(feature = "tonic-native")]
mod transport {
	use std::str::FromStr;
	use std::time::Duration;

	use http::Uri;
	use log::info;
	use tonic::transport::{Channel, Endpoint};

	use super::CreateEndpointError;

	pub type Transport = Channel;

	/// Build a tonic endpoint from a server address, configuring timeouts and TLS if required.
	///
	/// - Supports `http` and `https` URIs. Any other scheme results in an error.
	/// - Uses a 10-minute keep-alive and overall request timeout to accommodate long-running RPCs.
	/// - When `https` is used, the crate-configured root CAs are enabled and the SNI domain is set.
	pub async fn connect(address: &str) -> Result<Transport, CreateEndpointError> {
		Ok(create_endpoint(address)?.connect().await?)
	}

	/// Similar to [connect] but the HTTP/HTTPS connection is wrapped with a SOCKS5 proxy.
	#[cfg(feature = "socks5-proxy")]
	pub async fn connect_with_proxy(
		address: &str,
		proxy: &str,
	) -> Result<Transport, CreateEndpointError> {
		use hyper_socks2::SocksConnector;
		use hyper_util::client::legacy::connect::HttpConnector;

		let endpoint = create_endpoint(address)?;
		let proxy_uri = proxy.parse::<Uri>().map_err(CreateEndpointError::InvalidProxyUri)?;
		let connector = {
			// TLS is handled by tonic's `tls_config()` on the endpoint, so this connector only
			// needs to establish the SOCKS5 tunnel.
			let mut http = HttpConnector::new();
			http.enforce_http(false);
			SocksConnector {
				proxy_addr: proxy_uri,
				auth: None,
				connector: http,
			}
		};
		info!("Connecting to Ark server via SOCKS5 proxy {}...", proxy);
		Ok(endpoint.connect_with_connector(connector).await?)
	}

	/// Creates an endpoint for the given server address which the application can use to create a
	/// connection. Any required TLS configuration will be added so both HTTP and HTTPS are
	/// supported.
	fn create_endpoint(address: &str) -> Result<Endpoint, CreateEndpointError> {
		let uri = Uri::from_str(address)?;

		let scheme = uri.scheme_str().unwrap_or("");
		if scheme != "http" && scheme != "https" {
			return Err(CreateEndpointError::InvalidScheme(scheme.to_string()));
		}

		#[cfg_attr(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")), allow(unused_mut))]
		let mut endpoint = Channel::builder(uri.clone())
			.http2_keep_alive_interval(Duration::from_secs(20))
			.keep_alive_timeout(Duration::from_secs(600))
			.keep_alive_while_idle(true)
			.timeout(Duration::from_secs(600));

		#[cfg(any(feature = "tls-native-roots", feature = "tls-webpki-roots"))]
		if scheme == "https" {
			use tonic::transport::ClientTlsConfig;

			info!("Connecting to Ark server at {} using TLS...", address);
			let uri_auth = uri.clone().into_parts().authority
				.ok_or(CreateEndpointError::MissingAuthority)?;
			let domain = uri_auth.host();

			let tls_config = ClientTlsConfig::new()
					.with_enabled_roots()
					.domain_name(domain);
			endpoint = endpoint.tls_config(tls_config).map_err(CreateEndpointError::Transport)?;
			return Ok(endpoint);
		}
		#[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
		if scheme == "https" {
			return Err(CreateEndpointError::InvalidScheme(
				"Missing TLS roots, https is unsupported".to_owned(),
			));
		}
		info!("Connecting to Ark server at {} without TLS...", address);
		Ok(endpoint)
	}
}

#[cfg(feature = "tonic-web")]
mod transport {
	use super::CreateEndpointError;
	use tonic_web_wasm_client::Client as WasmClient;

	pub type Transport = WasmClient;

	pub async fn connect(address: &str) -> Result<Transport, CreateEndpointError> {
		Ok(tonic_web_wasm_client::Client::new(address.to_string()))
	}
}

/// Dummy transport used so the generated tonic clients still have a concrete transport type in
/// transportless builds. `connect()` rejects these builds before any RPC is attempted, but
/// if a client somehow does call into this transport we still return a clean gRPC error.
#[cfg(not(any(feature = "tonic-native", feature = "tonic-web")))]
mod transport {
	use std::convert::Infallible;
	use std::future::{ready, Ready};
	use std::task::{Context, Poll};

	use http::{Request, Response};
	use tonic::Status;
	use tonic::body::Body;
	use tonic::codegen::Service;

	use super::NO_TRANSPORT_BACKEND_MESSAGE;

	pub async fn connect(_address: &str) -> Result<Transport, crate::client::CreateEndpointError> {
		Err(crate::client::CreateEndpointError::NoTransportBackend)
	}

	#[derive(Debug, Clone, Default)]
	pub struct Transport;

	impl Service<Request<Body>> for Transport {
		type Response = Response<Body>;
		type Error = Infallible;
		type Future = Ready<Result<Self::Response, Self::Error>>;

		fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
			Poll::Ready(Ok(()))
		}

		fn call(&mut self, _req: Request<Body>) -> Self::Future {
			let status = Status::failed_precondition(NO_TRANSPORT_BACKEND_MESSAGE);
			ready(Ok(status.into_http::<Body>()))
		}
	}
}

/// The minimum protocol version supported by the client.
///
/// For info on protocol versions, see [server_rpc](crate) module documentation.
pub const MIN_PROTOCOL_VERSION: u64 = 1;

/// The maximum protocol version supported by the client.
///
/// For info on protocol versions, see [server_rpc](crate) module documentation.
pub const MAX_PROTOCOL_VERSION: u64 = 1;


#[derive(Debug, thiserror::Error)]
#[error("failed to create gRPC endpoint: {msg}")]
pub enum CreateEndpointError {
	#[error("{NO_TRANSPORT_BACKEND_MESSAGE}")]
	NoTransportBackend,
	#[error("failed to parse Ark server as a URI")]
	InvalidUri(#[from] http::uri::InvalidUri),
	#[error("Ark server scheme must be either http or https. Found: {0}")]
	InvalidScheme(String),
	#[error("Ark server URI is missing an authority part")]
	MissingAuthority,
	#[cfg(feature = "tonic-native")]
	#[error(transparent)]
	Transport(#[from] tonic::transport::Error),
	#[cfg(feature = "socks5-proxy")]
	#[error("invalid SOCKS5 proxy URI: {0:#}")]
	InvalidProxyUri(http::uri::InvalidUri),
}

#[derive(Debug, thiserror::Error)]
#[error("failed to connect to Ark server: {msg}")]
pub enum ConnectError {
	#[error("missing info '{0}' to connect")]
	MissingInfo(&'static str),
	#[error("invalid access token: {0}")]
	InvalidAccessToken(#[from] #[source] InvalidMetadataValue),
	#[error(transparent)]
	CreateEndpoint(#[from] CreateEndpointError),
	#[error("handshake request failed: {0}")]
	Handshake(tonic::Status),
	#[error("version mismatch. Client max is: {client_max}, server min is: {server_min}")]
	ProtocolVersionMismatchClientTooOld { client_max: u64, server_min: u64 },
	#[error("version mismatch. Client min is: {client_min}, server max is: {server_max}")]
	ProtocolVersionMismatchServerTooOld { client_min: u64, server_max: u64 },
	#[error("error getting ark info: {0}")]
	GetArkInfo(tonic::Status),
	#[error("invalid ark info from ark server: {0}")]
	InvalidArkInfo(#[from] ConvertError),
	#[error("network mismatch. Expected: {expected}, Got: {got}")]
	NetworkMismatch { expected: Network, got: Network },
	#[error("error getting offboard fee rate: {0}")]
	GetOffboardFeeRate(tonic::Status),
	#[error("tokio channel error: {0}")]
	Tokio(#[from] tokio::sync::oneshot::error::RecvError),
}

/// A gRPC interceptor that attaches the negotiated protocol version to each request.
///
/// After the handshake determines the mutually supported protocol version, this
/// interceptor injects it into the outgoing request metadata so the server can
/// process calls according to the agreed wire format and semantics.
#[derive(Clone)]
#[deprecated(since = "0.1.3", note = "should not be used directly")]
pub struct ProtocolVersionInterceptor {
	pver: u64,
}

#[allow(deprecated)]
impl tonic::service::Interceptor for ProtocolVersionInterceptor {
	fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
		#[allow(deprecated)]
		req.set_pver(self.pver);
		Ok(req)
	}
}

/// A gRPC interceptor that attaches ark-specific headers to each request
///
/// - pver: the negotiated protocol version
/// - access_token: the access token to use for private servers
#[derive(Clone)]
pub struct ArkServiceInterceptor {
	pver: Option<u64>,
	access_token: Option<AsciiMetadataValue>,
}

impl tonic::service::Interceptor for ArkServiceInterceptor {
	fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
		if let Some(pver) = self.pver {
			req.set_pver(pver);
		}
		if let Some(ref access_token) = self.access_token {
			req.metadata_mut().insert(ACCESS_TOKEN_HEADER, access_token.clone());
		}
		Ok(req)
	}
}

/// A handle to the Ark info.
///
/// This handle is used to wait for the Ark info to be updated, if needed.
pub struct ArkInfoHandle {
	pub info: ArkInfo,
	pub waiter: Option<tokio::sync::oneshot::Receiver<Result<ArkInfo, ConnectError>>>,
}

impl Deref for ArkInfoHandle {
	type Target = ArkInfo;

	fn deref(&self) -> &Self::Target {
		&self.info
	}
}

pub struct ServerInfo {
	/// Protocol version used for rpc protocol.
	///
	/// For info on protocol versions, see [server_rpc](crate) module documentation.
	pub pver: u64,
	/// Server-side configuration and network parameters returned after connection.
	pub info: ArkInfo,
}

impl ServerInfo {
	pub fn new(pver: u64, info: ArkInfo) -> Self {
		Self { pver, info }
	}
}

#[derive(Default)]
pub struct ServerConnectionBuilder {
	address: Option<String>,
	network: Option<Network>,
	#[cfg(feature = "socks5-proxy")]
	proxy: Option<String>,
	access_token: Option<String>,
}

impl ServerConnectionBuilder {
	pub fn address(mut self, address: impl Into<String>) -> Self {
		self.address = Some(address.into());
		self
	}

	pub fn network(mut self, network: Network) -> Self {
		self.network = Some(network);
		self
	}

	#[cfg(feature = "socks5-proxy")]
	pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
		self.proxy = Some(proxy.into());
		self
	}

	pub fn access_token(mut self, access_token: impl Into<String>) -> Self {
		self.access_token = Some(access_token.into());
		self
	}

	pub async fn connect(self) -> Result<ServerConnection, ConnectError> {
		ServerConnection::inner_connect(self).await
	}
}

/// A managed connection to the Ark server.
///
/// This type encapsulates:
/// - `pver`: The negotiated protocol version for the current session.
/// - `info`: The server's [ArkInfo] configuration snapshot retrieved at connection time.
/// - `client`: A ready-to-use gRPC client bound to the same channel used for the handshake.
#[derive(Clone)]
pub struct ServerConnection {
	info: Arc<RwLock<ServerInfo>>,
	/// The gRPC client to call Ark RPCs.
	pub client: ArkServiceClient<InterceptedService<transport::Transport, ArkServiceInterceptor>>,
	/// The mailbox gRPC client to call mailbox RPCs.
	pub mailbox_client: mailbox::MailboxServiceClient<InterceptedService<transport::Transport, ArkServiceInterceptor>>,
}

impl ServerConnection {
	fn handshake_req() -> protos::HandshakeRequest {
		protos::HandshakeRequest {
			bark_version: Some(env!("CARGO_PKG_VERSION").into()),
		}
	}

	/// Establish a connection to an Ark server and perform protocol negotiation.
	///
	/// Steps performed:
	/// 1. Build and connect a gRPC channel to `address` (with TLS for https).
	/// 2. Perform the handshake RPC, sending the Bark client version.
	/// 3. Validate the server's supported protocol range against
	///    [MIN_PROTOCOL_VERSION]..=[MAX_PROTOCOL_VERSION] and select a version.
	/// 4. Create a client with a protocol-version interceptor to tag future calls.
	/// 5. Fetch [ArkInfo] and verify it matches the provided Bitcoin [Network].
	///
	/// Returns a [ServerConnection] with:
	/// - the negotiated protocol version,
	/// - the server's configuration snapshot,
	/// - and a gRPC client bound to the established channel.
	///
	/// Errors if the server cannot be reached, handshake fails, protocol versions
	/// are incompatible, or the server's network does not match `network`.
	pub fn builder() -> ServerConnectionBuilder {
		ServerConnectionBuilder::default()
	}

	//TODO(stevenroose) can rename to connect once original removed
	async fn inner_connect(builder: ServerConnectionBuilder) -> Result<ServerConnection, ConnectError> {
		let address = builder.address.ok_or(ConnectError::MissingInfo("address"))?;
		let network = builder.network.ok_or(ConnectError::MissingInfo("network"))?;

		#[cfg(feature = "socks5-proxy")]
		let transport = if let Some(proxy) = builder.proxy {
			transport::connect_with_proxy(&address, &proxy).await?
		} else {
			transport::connect(&address).await?
		};
		#[cfg(not(feature = "socks5-proxy"))]
		let transport = transport::connect(&address).await?;

		let mut interceptor = ArkServiceInterceptor {
			pver: None,
			access_token: builder.access_token.map(|t| t.try_into()).transpose()?,
		};

		let mut handshake_client = ArkServiceClient::with_interceptor(transport.clone(), interceptor.clone());
		let handshake = handshake_client.handshake(Self::handshake_req()).await
			.map_err(ConnectError::Handshake)?.into_inner();

		let pver = check_handshake(handshake)?;
		interceptor.pver = Some(pver);

		let mut client = ArkServiceClient::with_interceptor(transport.clone(), interceptor.clone())
			.max_decoding_message_size(64 * 1024 * 1024); // 64MB limit

		let info = client.ark_info(network).await?;

		let mailbox_client = mailbox::MailboxServiceClient::with_interceptor(transport, interceptor)
			.max_decoding_message_size(64 * 1024 * 1024); // 64MB limit

		let info = Arc::new(RwLock::new(ServerInfo::new(pver, info)));
		Ok(ServerConnection {
			info,
			client,
			mailbox_client,
		})
	}

	#[deprecated(since = "0.1.3", note = "use builder() instead")]
	pub async fn connect(
		address: &str,
		network: Network,
	) -> Result<ServerConnection, ConnectError> {
		Self::builder().address(address).network(network).connect().await
	}

	#[cfg(feature = "socks5-proxy")]
	#[deprecated(since = "0.1.3", note = "use builder() instead")]
	pub async fn connect_via_proxy(
		address: &str,
		network: Network,
		proxy: &str,
	) -> Result<ServerConnection, ConnectError> {
		Self::builder().address(address).network(network).proxy(proxy).connect().await
	}

	/// Checks the connection to the Ark server by performing an handshake request.
	pub async fn check_connection(&self) -> Result<(), ConnectError> {
		let mut client = self.client.clone();
		let handshake = client.handshake(Self::handshake_req()).await
			.map_err(ConnectError::Handshake)?.into_inner();
		check_handshake(handshake)?;
		Ok(())
	}

	/// Returns the cached [ArkInfo].
	pub async fn ark_info(&self) -> ArkInfo {
		self.info.read().await.info.clone()
	}

	/// Fetches the current offboard fee rate from the server.
	pub async fn offboard_feerate(&self) -> Result<FeeRate, ConnectError> {
		let resp = self.client.clone()
			.get_offboard_fee_rate(protos::Empty {}).await
			.map_err(ConnectError::GetOffboardFeeRate)?
			.into_inner();
		Ok(FeeRate::from_sat_per_kwu(resp.sat_vkb / 4))
	}
}
trait ArkServiceClientExt {
	async fn ark_info(&mut self, network: Network) -> Result<ArkInfo, ConnectError>;
}

impl<I: Interceptor> ArkServiceClientExt for ArkServiceClient<InterceptedService<transport::Transport, I>> {
	async fn ark_info(&mut self, network: Network) -> Result<ArkInfo, ConnectError> {
		let res = self.get_ark_info(protos::Empty {}).await
			.map_err(ConnectError::GetArkInfo)?;
		let info = ArkInfo::try_from(res.into_inner())
			.map_err(ConnectError::InvalidArkInfo)?;
		if network != info.network {
			return Err(ConnectError::NetworkMismatch { expected: network, got: info.network });
		}

		Ok(info)
	}
}

fn check_handshake(handshake: protos::HandshakeResponse) -> Result<u64, ConnectError> {
	if let Some(ref msg) = handshake.psa {
		warn!("Message from Ark server: \"{}\"", msg);
	}

	if MAX_PROTOCOL_VERSION < handshake.min_protocol_version {
		return Err(ConnectError::ProtocolVersionMismatchClientTooOld {
			client_max: MAX_PROTOCOL_VERSION, server_min: handshake.min_protocol_version
		});
	}
	if MIN_PROTOCOL_VERSION > handshake.max_protocol_version {
		return Err(ConnectError::ProtocolVersionMismatchServerTooOld {
			client_min: MIN_PROTOCOL_VERSION, server_max: handshake.max_protocol_version
		});
	}

	let pver = cmp::min(MAX_PROTOCOL_VERSION, handshake.max_protocol_version);
	assert!((MIN_PROTOCOL_VERSION..=MAX_PROTOCOL_VERSION).contains(&pver));
	assert!((handshake.min_protocol_version..=handshake.max_protocol_version).contains(&pver));

	Ok(pver)
}

#[cfg(test)]
mod tests {
	use super::{CreateEndpointError, NO_TRANSPORT_BACKEND_MESSAGE};

	#[test]
	fn no_transport_backend_error_mentions_feature_selection() {
		let err = CreateEndpointError::NoTransportBackend;
		assert_eq!(err.to_string(), NO_TRANSPORT_BACKEND_MESSAGE);
		assert!(err.to_string().contains("bark-server-rpc/tonic-native"));
		assert!(err.to_string().contains("bark-server-rpc/tonic-web"));
	}
}