1use arc_swap::ArcSwap;
2use std::error::Error as StdError;
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bytes::Bytes;
8use k256::ecdsa::{SigningKey, VerifyingKey};
9use signature::Keypair;
10use tonic::body::Body as TonicBody;
11use tonic::codegen::Service;
12use tonic::metadata::MetadataMap;
13use zeroize::Zeroizing;
14
15use crate::boxed::{BoxedTransport, TransportMetadata, boxed};
16use crate::client::AccountState;
17use crate::grpc::Context;
18use crate::signer::BoxedDocSigner;
19use crate::utils::CondSend;
20use crate::{DocSigner, GrpcClient, GrpcClientBuilderError};
21
22use imp::build_transport;
23
24#[derive(Debug, Clone)]
42pub struct Endpoint {
43 pub url: String,
45 ascii_metadata: Vec<(String, String)>,
47 binary_metadata: Vec<(String, Vec<u8>)>,
49 metadata_map: Option<MetadataMap>,
51 timeout: Option<Duration>,
53}
54
55impl Endpoint {
56 pub fn new(url: impl Into<String>) -> Self {
58 Self {
59 url: url.into(),
60 ascii_metadata: Vec::new(),
61 binary_metadata: Vec::new(),
62 metadata_map: None,
63 timeout: None,
64 }
65 }
66
67 pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
69 self.ascii_metadata.push((key.into(), value.into()));
70 self
71 }
72
73 pub fn metadata_bin(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
77 self.binary_metadata.push((key.into(), value.into()));
78 self
79 }
80
81 pub fn metadata_map(mut self, metadata: MetadataMap) -> Self {
83 self.metadata_map = Some(metadata);
84 self
85 }
86
87 pub fn timeout(mut self, timeout: Duration) -> Self {
89 self.timeout = Some(timeout);
90 self
91 }
92
93 pub(crate) fn into_parts(self) -> Result<(String, Context), GrpcClientBuilderError> {
94 let mut context = Context {
95 timeout: self.timeout,
96 ..Default::default()
97 };
98 for (key, value) in self.ascii_metadata {
99 context.append_metadata(&key, &value)?;
100 }
101 for (key, value) in self.binary_metadata {
102 context.append_metadata_bin(&key, &value)?;
103 }
104 if let Some(metadata) = self.metadata_map {
105 context.append_metadata_map(&metadata);
106 }
107 Ok((self.url, context))
108 }
109}
110
111impl<S> From<S> for Endpoint
112where
113 S: Into<String>,
114{
115 fn from(url: S) -> Self {
116 Endpoint::new(url)
117 }
118}
119
120enum TransportEntry {
121 Endpoint(Endpoint),
122 BoxedTransport(BoxedTransport),
123}
124
125#[derive(Default)]
129pub struct GrpcClientBuilder {
130 transports: Vec<TransportEntry>,
131 timeout: Option<Duration>,
132 signer_kind: Option<SignerKind>,
133}
134
135enum SignerKind {
136 Signer((VerifyingKey, BoxedDocSigner)),
137 PrivKeyBytes(Zeroizing<Vec<u8>>),
138 PrivKeyHex(Zeroizing<String>),
139}
140
141impl GrpcClientBuilder {
142 pub fn new() -> Self {
144 GrpcClientBuilder::default()
145 }
146
147 pub fn url(self, endpoint: impl Into<Endpoint>) -> Self {
165 self.endpoint(endpoint)
166 }
167
168 pub fn endpoint(mut self, endpoint: impl Into<Endpoint>) -> Self {
170 self.transports
171 .push(TransportEntry::Endpoint(endpoint.into()));
172 self
173 }
174
175 pub fn endpoints<I, E>(mut self, endpoints: I) -> Self
190 where
191 I: IntoIterator<Item = E>,
192 E: Into<Endpoint>,
193 {
194 for endpoint in endpoints {
195 self = self.endpoint(endpoint);
196 }
197 self
198 }
199
200 pub fn urls<I, E>(self, urls: I) -> Self
202 where
203 I: IntoIterator<Item = E>,
204 E: Into<Endpoint>,
205 {
206 self.endpoints(urls)
207 }
208
209 pub fn transport<B, T>(mut self, transport: T) -> Self
214 where
215 B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
216 <B as http_body::Body>::Error: StdError + Send + Sync,
217 T: Service<http::Request<TonicBody>, Response = http::Response<B>>
218 + Send
219 + Sync
220 + Clone
221 + 'static,
222 <T as Service<http::Request<TonicBody>>>::Error: StdError + Send + Sync + 'static,
223 <T as Service<http::Request<TonicBody>>>::Future: CondSend + 'static,
224 {
225 self.transports.push(TransportEntry::BoxedTransport(boxed(
226 transport,
227 TransportMetadata::default(),
228 )));
229 self
230 }
231
232 pub fn pubkey_and_signer<S>(
234 mut self,
235 account_pubkey: VerifyingKey,
236 signer: S,
237 ) -> GrpcClientBuilder
238 where
239 S: DocSigner + 'static,
240 {
241 let signer = BoxedDocSigner::new(signer);
242 self.signer_kind = Some(SignerKind::Signer((account_pubkey, signer)));
243 self
244 }
245
246 pub fn signer_keypair<S>(self, signer: S) -> GrpcClientBuilder
248 where
249 S: DocSigner + Keypair<VerifyingKey = VerifyingKey> + 'static,
250 {
251 let pubkey = signer.verifying_key();
252 self.pubkey_and_signer(pubkey, signer)
253 }
254
255 pub fn private_key(mut self, bytes: &[u8]) -> GrpcClientBuilder {
257 self.signer_kind = Some(SignerKind::PrivKeyBytes(Zeroizing::new(bytes.to_vec())));
258 self
259 }
260
261 pub fn private_key_hex(mut self, s: &str) -> GrpcClientBuilder {
263 self.signer_kind = Some(SignerKind::PrivKeyHex(Zeroizing::new(s.to_string())));
264 self
265 }
266
267 pub fn timeout(mut self, timeout: Duration) -> GrpcClientBuilder {
269 self.timeout = Some(timeout);
270 self
271 }
272
273 pub fn build(self) -> Result<GrpcClient, GrpcClientBuilderError> {
277 if self.transports.is_empty() {
278 return Err(GrpcClientBuilderError::TransportNotSet);
279 }
280
281 let base_context = Context {
282 timeout: self.timeout,
283 ..Default::default()
284 };
285
286 let transports: Vec<BoxedTransport> = self
287 .transports
288 .into_iter()
289 .map(|entry| match entry {
290 TransportEntry::Endpoint(endpoint) => {
291 let (url, endpoint_context) = endpoint.into_parts()?;
292 let mut context = base_context.clone();
293 context.extend(&endpoint_context);
294 build_transport(url, context)
295 }
296 TransportEntry::BoxedTransport(mut transport) => {
297 let mut context = base_context.clone();
298 context.extend(&transport.metadata.context);
299 transport.metadata = Arc::new(TransportMetadata {
300 url: transport.metadata.url.clone(),
301 context,
302 });
303 Ok(transport)
304 }
305 })
306 .collect::<Result<Vec<_>, _>>()?;
307
308 let transports = Arc::new(ArcSwap::from_pointee(transports));
309
310 let signer_config = self.signer_kind.map(TryInto::try_into).transpose()?;
311
312 Ok(GrpcClient::new(transports, signer_config))
313 }
314}
315
316impl TryFrom<SignerKind> for AccountState {
317 type Error = GrpcClientBuilderError;
318
319 fn try_from(value: SignerKind) -> Result<Self, Self::Error> {
320 match value {
321 SignerKind::Signer((pubkey, signer)) => Ok(AccountState::new(pubkey, signer)),
322 SignerKind::PrivKeyBytes(bytes) => priv_key_signer(&bytes),
323 SignerKind::PrivKeyHex(string) => {
324 let bytes = Zeroizing::new(
325 hex::decode(string.trim())
326 .map_err(|_| GrpcClientBuilderError::InvalidPrivateKey)?,
327 );
328 priv_key_signer(&bytes)
329 }
330 }
331 }
332}
333
334fn priv_key_signer(bytes: &[u8]) -> Result<AccountState, GrpcClientBuilderError> {
335 let signing_key =
336 SigningKey::from_slice(bytes).map_err(|_| GrpcClientBuilderError::InvalidPrivateKey)?;
337 let pubkey = signing_key.verifying_key().to_owned();
338 let signer = BoxedDocSigner::new(signing_key);
339 Ok(AccountState::new(pubkey, signer))
340}
341
342impl fmt::Debug for SignerKind {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 let s = match self {
345 SignerKind::Signer(..) => "SignerKind::Signer(..)",
346 SignerKind::PrivKeyBytes(..) => "SignerKind::PrivKeyBytes(..)",
347 SignerKind::PrivKeyHex(..) => "SignerKind::PrivKeyHex(..)",
348 };
349 f.write_str(s)
350 }
351}
352
353impl fmt::Debug for GrpcClientBuilder {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 f.write_str("GrpcClientBuilder { .. }")
356 }
357}
358
359#[cfg(not(target_arch = "wasm32"))]
360#[cfg(any(feature = "tls-native-roots", feature = "tls-webpki-roots"))]
361mod imp {
362 use super::*;
363
364 use tonic::transport::{ClientTlsConfig, Endpoint as TonicEndpoint};
365
366 pub(super) fn build_transport(
367 url: String,
368 context: Context,
369 ) -> Result<BoxedTransport, GrpcClientBuilderError> {
370 let tls_config = ClientTlsConfig::new().with_enabled_roots();
371
372 let channel = TonicEndpoint::from_shared(url.clone())?
373 .user_agent("celestia-grpc")?
374 .tls_config(tls_config)?
375 .connect_lazy();
376
377 Ok(boxed(
378 channel,
379 TransportMetadata::with_url_and_context(url, context),
380 ))
381 }
382}
383
384#[cfg(not(target_arch = "wasm32"))]
385#[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
386mod imp {
387 use super::*;
388
389 use tonic::transport::Endpoint as TonicEndpoint;
390
391 pub(super) fn build_transport(
392 url: String,
393 context: Context,
394 ) -> Result<BoxedTransport, GrpcClientBuilderError> {
395 if url
396 .split_once(':')
397 .is_some_and(|(scheme, _)| scheme == "https")
398 {
399 return Err(GrpcClientBuilderError::TlsNotSupported);
400 }
401
402 let channel = TonicEndpoint::from_shared(url.clone())?
403 .user_agent("celestia-grpc")?
404 .connect_lazy();
405
406 Ok(boxed(
407 channel,
408 TransportMetadata::with_url_and_context(url, context),
409 ))
410 }
411}
412
413#[cfg(target_arch = "wasm32")]
414mod imp {
415 use super::*;
416 pub(super) fn build_transport(
417 url: String,
418 context: Context,
419 ) -> Result<BoxedTransport, GrpcClientBuilderError> {
420 let client = tonic_web_wasm_client::Client::new(url.clone());
421 Ok(boxed(
422 client,
423 TransportMetadata::with_url_and_context(url, context),
424 ))
425 }
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431 use lumina_utils::test_utils::async_test;
432
433 #[test]
434 fn empty_builder_returns_transport_not_set() {
435 let result = GrpcClientBuilder::new().build();
436
437 assert!(matches!(
438 result,
439 Err(GrpcClientBuilderError::TransportNotSet)
440 ));
441 }
442
443 #[async_test]
444 async fn single_url_builds_successfully() {
445 let result = GrpcClientBuilder::new()
446 .url("http://localhost:9090")
447 .build();
448
449 assert!(result.is_ok());
450 }
451
452 #[async_test]
453 async fn multiple_urls_build_successfully() {
454 let result = GrpcClientBuilder::new()
455 .url("http://localhost:9090")
456 .url("http://localhost:9091")
457 .url("http://localhost:9092")
458 .build();
459
460 assert!(result.is_ok());
461 }
462
463 #[async_test]
464 async fn endpoint_with_config_builds_successfully() {
465 let result = GrpcClientBuilder::new()
466 .endpoint(
467 Endpoint::from("http://localhost:9090")
468 .metadata("authorization", "Bearer token")
469 .timeout(Duration::from_secs(30)),
470 )
471 .build();
472
473 assert!(result.is_ok());
474 }
475
476 #[async_test]
477 async fn urls_helper_builds_successfully() {
478 let result = GrpcClientBuilder::new()
479 .urls(["http://localhost:9090", "http://localhost:9091"])
480 .build();
481
482 assert!(result.is_ok());
483 }
484
485 #[async_test]
486 async fn endpoints_helper_builds_successfully() {
487 let result = GrpcClientBuilder::new()
488 .endpoints([
489 Endpoint::from("http://localhost:9090"),
490 Endpoint::from("http://localhost:9091"),
491 ])
492 .build();
493
494 assert!(result.is_ok());
495 }
496
497 #[test]
498 fn endpoint_from_str_uses_default_config() {
499 let endpoint: Endpoint = "http://localhost:9090".into();
500
501 assert_eq!(endpoint.url, "http://localhost:9090");
502 assert!(endpoint.ascii_metadata.is_empty());
503 assert!(endpoint.binary_metadata.is_empty());
504 assert!(endpoint.metadata_map.is_none());
505 assert!(endpoint.timeout.is_none());
506 }
507
508 #[test]
509 fn endpoint_from_string_uses_default_config() {
510 let endpoint: Endpoint = String::from("http://localhost:9090").into();
511
512 assert_eq!(endpoint.url, "http://localhost:9090");
513 assert!(endpoint.ascii_metadata.is_empty());
514 assert!(endpoint.binary_metadata.is_empty());
515 assert!(endpoint.metadata_map.is_none());
516 assert!(endpoint.timeout.is_none());
517 }
518}