Skip to main content

celestia_grpc/
builder.rs

1use arc_swap::ArcSwap;
2use std::error::Error as StdError;
3use std::fmt;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bytes::Bytes;
8use k256::ecdsa::{SigningKey, VerifyingKey};
9use signature::Keypair;
10use tonic::body::Body as TonicBody;
11use tonic::codegen::Service;
12use tonic::metadata::MetadataMap;
13use zeroize::Zeroizing;
14
15use crate::boxed::{BoxedTransport, TransportMetadata, boxed};
16use crate::client::AccountState;
17use crate::grpc::Context;
18use crate::signer::BoxedDocSigner;
19use crate::utils::CondSend;
20use crate::{DocSigner, GrpcClient, GrpcClientBuilderError};
21
22use imp::build_transport;
23
24/// A URL endpoint with per-endpoint configuration.
25///
26/// This includes HTTP/2 headers (metadata) and timeout that will be applied
27/// to all requests made to this endpoint.
28///
29/// # Example
30///
31/// ```no_run
32/// use std::time::Duration;
33/// use celestia_grpc::{Endpoint, GrpcClient};
34///
35/// let client = GrpcClient::builder()
36///     .url(Endpoint::from("http://localhost:9090")
37///         .metadata("authorization", "Bearer token")
38///         .timeout(Duration::from_secs(30)))
39///     .build();
40/// ```
41#[derive(Debug, Clone)]
42pub struct Endpoint {
43    /// The endpoint URL.
44    pub url: String,
45    /// ASCII metadata (HTTP/2 headers) as key-value pairs.
46    ascii_metadata: Vec<(String, String)>,
47    /// Binary metadata as key-value pairs (keys must have `-bin` suffix).
48    binary_metadata: Vec<(String, Vec<u8>)>,
49    /// Pre-built metadata map.
50    metadata_map: Option<MetadataMap>,
51    /// Request timeout for this endpoint.
52    timeout: Option<Duration>,
53}
54
55impl Endpoint {
56    /// Create a new endpoint with a default configuration.
57    pub fn new(url: impl Into<String>) -> Self {
58        Self {
59            url: url.into(),
60            ascii_metadata: Vec::new(),
61            binary_metadata: Vec::new(),
62            metadata_map: None,
63            timeout: None,
64        }
65    }
66
67    /// Appends ASCII metadata (HTTP/2 header) to requests made to this endpoint.
68    pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
69        self.ascii_metadata.push((key.into(), value.into()));
70        self
71    }
72
73    /// Appends binary metadata to requests made to this endpoint.
74    ///
75    /// Keys must have `-bin` suffix. Values are base64-encoded on the wire.
76    pub fn metadata_bin(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
77        self.binary_metadata.push((key.into(), value.into()));
78        self
79    }
80
81    /// Sets a metadata map for this endpoint.
82    pub fn metadata_map(mut self, metadata: MetadataMap) -> Self {
83        self.metadata_map = Some(metadata);
84        self
85    }
86
87    /// Sets the request timeout for this endpoint.
88    pub fn timeout(mut self, timeout: Duration) -> Self {
89        self.timeout = Some(timeout);
90        self
91    }
92
93    pub(crate) fn into_parts(self) -> Result<(String, Context), GrpcClientBuilderError> {
94        let mut context = Context {
95            timeout: self.timeout,
96            ..Default::default()
97        };
98        for (key, value) in self.ascii_metadata {
99            context.append_metadata(&key, &value)?;
100        }
101        for (key, value) in self.binary_metadata {
102            context.append_metadata_bin(&key, &value)?;
103        }
104        if let Some(metadata) = self.metadata_map {
105            context.append_metadata_map(&metadata);
106        }
107        Ok((self.url, context))
108    }
109}
110
111impl<S> From<S> for Endpoint
112where
113    S: Into<String>,
114{
115    fn from(url: S) -> Self {
116        Endpoint::new(url)
117    }
118}
119
120enum TransportEntry {
121    Endpoint(Endpoint),
122    BoxedTransport(BoxedTransport),
123}
124
125/// Builder for [`GrpcClient`]
126///
127/// Note that TLS configuration is governed using `tls-*-roots` feature flags.
128#[derive(Default)]
129pub struct GrpcClientBuilder {
130    transports: Vec<TransportEntry>,
131    timeout: Option<Duration>,
132    signer_kind: Option<SignerKind>,
133}
134
135enum SignerKind {
136    Signer((VerifyingKey, BoxedDocSigner)),
137    PrivKeyBytes(Zeroizing<Vec<u8>>),
138    PrivKeyHex(Zeroizing<String>),
139}
140
141impl GrpcClientBuilder {
142    /// Create a new, empty builder.
143    pub fn new() -> Self {
144        GrpcClientBuilder::default()
145    }
146
147    /// Add a URL endpoint. Multiple calls add multiple fallback endpoints.
148    ///
149    /// This is an alias of [`GrpcClientBuilder::endpoint`].
150    ///
151    /// When multiple endpoints are configured, the client will automatically
152    /// fall back to the next endpoint if a network-related error occurs.
153    ///
154    /// # Example
155    ///
156    /// ```no_run
157    /// use celestia_grpc::GrpcClient;
158    ///
159    /// let client = GrpcClient::builder()
160    ///     .url("http://primary:9090")
161    ///     .url("http://fallback:9090")
162    ///     .build();
163    /// ```
164    pub fn url(self, endpoint: impl Into<Endpoint>) -> Self {
165        self.endpoint(endpoint)
166    }
167
168    /// Add a URL endpoint. This is the primary entry point for endpoints.
169    pub fn endpoint(mut self, endpoint: impl Into<Endpoint>) -> Self {
170        self.transports
171            .push(TransportEntry::Endpoint(endpoint.into()));
172        self
173    }
174
175    /// Add multiple URL endpoints.
176    ///
177    /// When multiple endpoints are configured, the client will automatically
178    /// fall back to the next endpoint if a network-related error occurs.
179    ///
180    /// # Example
181    ///
182    /// ```no_run
183    /// use celestia_grpc::GrpcClient;
184    ///
185    /// let client = GrpcClient::builder()
186    ///     .urls(["http://primary:9090", "http://fallback:9090"])
187    ///     .build();
188    /// ```
189    pub fn endpoints<I, E>(mut self, endpoints: I) -> Self
190    where
191        I: IntoIterator<Item = E>,
192        E: Into<Endpoint>,
193    {
194        for endpoint in endpoints {
195            self = self.endpoint(endpoint);
196        }
197        self
198    }
199
200    /// Add multiple URL endpoints. This is an alias of [`GrpcClientBuilder::endpoints`].
201    pub fn urls<I, E>(self, urls: I) -> Self
202    where
203        I: IntoIterator<Item = E>,
204        E: Into<Endpoint>,
205    {
206        self.endpoints(urls)
207    }
208
209    /// Add a custom transport endpoint. Multiple calls add multiple fallback endpoints.
210    ///
211    /// When multiple endpoints are configured, the client will automatically
212    /// fall back to the next endpoint if a network-related error occurs.
213    pub fn transport<B, T>(mut self, transport: T) -> Self
214    where
215        B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
216        <B as http_body::Body>::Error: StdError + Send + Sync,
217        T: Service<http::Request<TonicBody>, Response = http::Response<B>>
218            + Send
219            + Sync
220            + Clone
221            + 'static,
222        <T as Service<http::Request<TonicBody>>>::Error: StdError + Send + Sync + 'static,
223        <T as Service<http::Request<TonicBody>>>::Future: CondSend + 'static,
224    {
225        self.transports.push(TransportEntry::BoxedTransport(boxed(
226            transport,
227            TransportMetadata::default(),
228        )));
229        self
230    }
231
232    /// Add signer and a public key
233    pub fn pubkey_and_signer<S>(
234        mut self,
235        account_pubkey: VerifyingKey,
236        signer: S,
237    ) -> GrpcClientBuilder
238    where
239        S: DocSigner + 'static,
240    {
241        let signer = BoxedDocSigner::new(signer);
242        self.signer_kind = Some(SignerKind::Signer((account_pubkey, signer)));
243        self
244    }
245
246    /// Add signer and associated public key
247    pub fn signer_keypair<S>(self, signer: S) -> GrpcClientBuilder
248    where
249        S: DocSigner + Keypair<VerifyingKey = VerifyingKey> + 'static,
250    {
251        let pubkey = signer.verifying_key();
252        self.pubkey_and_signer(pubkey, signer)
253    }
254
255    /// Set signer from a raw private key.
256    pub fn private_key(mut self, bytes: &[u8]) -> GrpcClientBuilder {
257        self.signer_kind = Some(SignerKind::PrivKeyBytes(Zeroizing::new(bytes.to_vec())));
258        self
259    }
260
261    /// Set signer from a hex formatted private key.
262    pub fn private_key_hex(mut self, s: &str) -> GrpcClientBuilder {
263        self.signer_kind = Some(SignerKind::PrivKeyHex(Zeroizing::new(s.to_string())));
264        self
265    }
266
267    /// Sets the request timeout, overriding default one from the transport.
268    pub fn timeout(mut self, timeout: Duration) -> GrpcClientBuilder {
269        self.timeout = Some(timeout);
270        self
271    }
272
273    /// Build [`GrpcClient`]
274    ///
275    /// Returns error if no transports were configured.
276    pub fn build(self) -> Result<GrpcClient, GrpcClientBuilderError> {
277        if self.transports.is_empty() {
278            return Err(GrpcClientBuilderError::TransportNotSet);
279        }
280
281        let base_context = Context {
282            timeout: self.timeout,
283            ..Default::default()
284        };
285
286        let transports: Vec<BoxedTransport> = self
287            .transports
288            .into_iter()
289            .map(|entry| match entry {
290                TransportEntry::Endpoint(endpoint) => {
291                    let (url, endpoint_context) = endpoint.into_parts()?;
292                    let mut context = base_context.clone();
293                    context.extend(&endpoint_context);
294                    build_transport(url, context)
295                }
296                TransportEntry::BoxedTransport(mut transport) => {
297                    let mut context = base_context.clone();
298                    context.extend(&transport.metadata.context);
299                    transport.metadata = Arc::new(TransportMetadata {
300                        url: transport.metadata.url.clone(),
301                        context,
302                    });
303                    Ok(transport)
304                }
305            })
306            .collect::<Result<Vec<_>, _>>()?;
307
308        let transports = Arc::new(ArcSwap::from_pointee(transports));
309
310        let signer_config = self.signer_kind.map(TryInto::try_into).transpose()?;
311
312        Ok(GrpcClient::new(transports, signer_config))
313    }
314}
315
316impl TryFrom<SignerKind> for AccountState {
317    type Error = GrpcClientBuilderError;
318
319    fn try_from(value: SignerKind) -> Result<Self, Self::Error> {
320        match value {
321            SignerKind::Signer((pubkey, signer)) => Ok(AccountState::new(pubkey, signer)),
322            SignerKind::PrivKeyBytes(bytes) => priv_key_signer(&bytes),
323            SignerKind::PrivKeyHex(string) => {
324                let bytes = Zeroizing::new(
325                    hex::decode(string.trim())
326                        .map_err(|_| GrpcClientBuilderError::InvalidPrivateKey)?,
327                );
328                priv_key_signer(&bytes)
329            }
330        }
331    }
332}
333
334fn priv_key_signer(bytes: &[u8]) -> Result<AccountState, GrpcClientBuilderError> {
335    let signing_key =
336        SigningKey::from_slice(bytes).map_err(|_| GrpcClientBuilderError::InvalidPrivateKey)?;
337    let pubkey = signing_key.verifying_key().to_owned();
338    let signer = BoxedDocSigner::new(signing_key);
339    Ok(AccountState::new(pubkey, signer))
340}
341
342impl fmt::Debug for SignerKind {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        let s = match self {
345            SignerKind::Signer(..) => "SignerKind::Signer(..)",
346            SignerKind::PrivKeyBytes(..) => "SignerKind::PrivKeyBytes(..)",
347            SignerKind::PrivKeyHex(..) => "SignerKind::PrivKeyHex(..)",
348        };
349        f.write_str(s)
350    }
351}
352
353impl fmt::Debug for GrpcClientBuilder {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        f.write_str("GrpcClientBuilder { .. }")
356    }
357}
358
359#[cfg(not(target_arch = "wasm32"))]
360#[cfg(any(feature = "tls-native-roots", feature = "tls-webpki-roots"))]
361mod imp {
362    use super::*;
363
364    use tonic::transport::{ClientTlsConfig, Endpoint as TonicEndpoint};
365
366    pub(super) fn build_transport(
367        url: String,
368        context: Context,
369    ) -> Result<BoxedTransport, GrpcClientBuilderError> {
370        let tls_config = ClientTlsConfig::new().with_enabled_roots();
371
372        let channel = TonicEndpoint::from_shared(url.clone())?
373            .user_agent("celestia-grpc")?
374            .tls_config(tls_config)?
375            .connect_lazy();
376
377        Ok(boxed(
378            channel,
379            TransportMetadata::with_url_and_context(url, context),
380        ))
381    }
382}
383
384#[cfg(not(target_arch = "wasm32"))]
385#[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
386mod imp {
387    use super::*;
388
389    use tonic::transport::Endpoint as TonicEndpoint;
390
391    pub(super) fn build_transport(
392        url: String,
393        context: Context,
394    ) -> Result<BoxedTransport, GrpcClientBuilderError> {
395        if url
396            .split_once(':')
397            .is_some_and(|(scheme, _)| scheme == "https")
398        {
399            return Err(GrpcClientBuilderError::TlsNotSupported);
400        }
401
402        let channel = TonicEndpoint::from_shared(url.clone())?
403            .user_agent("celestia-grpc")?
404            .connect_lazy();
405
406        Ok(boxed(
407            channel,
408            TransportMetadata::with_url_and_context(url, context),
409        ))
410    }
411}
412
413#[cfg(target_arch = "wasm32")]
414mod imp {
415    use super::*;
416    pub(super) fn build_transport(
417        url: String,
418        context: Context,
419    ) -> Result<BoxedTransport, GrpcClientBuilderError> {
420        let client = tonic_web_wasm_client::Client::new(url.clone());
421        Ok(boxed(
422            client,
423            TransportMetadata::with_url_and_context(url, context),
424        ))
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use lumina_utils::test_utils::async_test;
432
433    #[test]
434    fn empty_builder_returns_transport_not_set() {
435        let result = GrpcClientBuilder::new().build();
436
437        assert!(matches!(
438            result,
439            Err(GrpcClientBuilderError::TransportNotSet)
440        ));
441    }
442
443    #[async_test]
444    async fn single_url_builds_successfully() {
445        let result = GrpcClientBuilder::new()
446            .url("http://localhost:9090")
447            .build();
448
449        assert!(result.is_ok());
450    }
451
452    #[async_test]
453    async fn multiple_urls_build_successfully() {
454        let result = GrpcClientBuilder::new()
455            .url("http://localhost:9090")
456            .url("http://localhost:9091")
457            .url("http://localhost:9092")
458            .build();
459
460        assert!(result.is_ok());
461    }
462
463    #[async_test]
464    async fn endpoint_with_config_builds_successfully() {
465        let result = GrpcClientBuilder::new()
466            .endpoint(
467                Endpoint::from("http://localhost:9090")
468                    .metadata("authorization", "Bearer token")
469                    .timeout(Duration::from_secs(30)),
470            )
471            .build();
472
473        assert!(result.is_ok());
474    }
475
476    #[async_test]
477    async fn urls_helper_builds_successfully() {
478        let result = GrpcClientBuilder::new()
479            .urls(["http://localhost:9090", "http://localhost:9091"])
480            .build();
481
482        assert!(result.is_ok());
483    }
484
485    #[async_test]
486    async fn endpoints_helper_builds_successfully() {
487        let result = GrpcClientBuilder::new()
488            .endpoints([
489                Endpoint::from("http://localhost:9090"),
490                Endpoint::from("http://localhost:9091"),
491            ])
492            .build();
493
494        assert!(result.is_ok());
495    }
496
497    #[test]
498    fn endpoint_from_str_uses_default_config() {
499        let endpoint: Endpoint = "http://localhost:9090".into();
500
501        assert_eq!(endpoint.url, "http://localhost:9090");
502        assert!(endpoint.ascii_metadata.is_empty());
503        assert!(endpoint.binary_metadata.is_empty());
504        assert!(endpoint.metadata_map.is_none());
505        assert!(endpoint.timeout.is_none());
506    }
507
508    #[test]
509    fn endpoint_from_string_uses_default_config() {
510        let endpoint: Endpoint = String::from("http://localhost:9090").into();
511
512        assert_eq!(endpoint.url, "http://localhost:9090");
513        assert!(endpoint.ascii_metadata.is_empty());
514        assert!(endpoint.binary_metadata.is_empty());
515        assert!(endpoint.metadata_map.is_none());
516        assert!(endpoint.timeout.is_none());
517    }
518}