// Source: celestia_client/client.rs

use std::error::Error as StdError;
use std::fmt::{self, Debug};
use std::sync::Arc;
use std::time::Duration;

use blockstore::cond_send::CondSend;
pub use celestia_grpc::Endpoint;
use celestia_grpc::{GrpcClient, GrpcClientBuilder};
use celestia_rpc::{Client as RpcClient, HeaderClient};
use http::Request;
use tonic::body::Body as TonicBody;
use tonic::codegen::{Bytes, Service};

use crate::blob::BlobApi;
use crate::blobstream::BlobstreamApi;
use crate::fraud::FraudApi;
use crate::header::HeaderApi;
use crate::share::ShareApi;
use crate::state::StateApi;
use crate::tx::{DocSigner, Keypair, VerifyingKey};
use crate::types::ExtendedHeader;
use crate::types::state::AccAddress;
use crate::{Error, Result};

25/// A high-level client for interacting with a Celestia node.
26///
27/// There are two modes: read-only mode and submit mode. Read-only mode requires
28/// RPC and optionally gRPC endpoint, while submit mode requires both, plus a signer.
29///
30/// # Examples
31///
32/// Read-only mode:
33///
34/// ```no_run
35/// # use celestia_client::{Client, Result};
36/// # async fn docs() -> Result<()> {
37/// let client = Client::builder()
38///     .rpc_url("ws://localhost:26658")
39///     .grpc_url("http://localhost:9090") // optional in read-only mode
40///     .build()
41///     .await?;
42///
43/// client.header().head().await?;
44/// # Ok(())
45/// # }
46/// ```
47///
48/// Submit mode:
49///
50/// ```no_run
51/// # use celestia_client::{Client, Endpoint, Result};
52/// # use celestia_client::tx::TxConfig;
53/// # use std::time::Duration;
54/// # async fn docs() -> Result<()> {
55/// let endpoint = Endpoint::from("http://localhost:9090")
56///     .metadata("x-token", "auth-token")
57///     .timeout(Duration::from_secs(30));
58///
59/// let client = Client::builder()
60///     .rpc_url("ws://localhost:26658")
61///     .grpc_endpoint(endpoint)
62///     .private_key_hex("393fdb5def075819de55756b45c9e2c8531a8c78dd6eede483d3440e9457d839")
63///     .build()
64///     .await?;
65///
66/// let to_address = "celestia169s50psyj2f4la9a2235329xz7rk6c53zhw9mm".parse().unwrap();
67/// client.state().transfer(&to_address, 12345, TxConfig::default()).await?;
68/// # Ok(())
69/// # }
70/// ```
71///
72/// [`celestia-rpc`]: celestia_rpc
73/// [`celestia-grpc`]: celestia_grpc
74pub struct Client {
75    inner: Arc<ClientInner>,
76    state: StateApi,
77    blob: BlobApi,
78    header: HeaderApi,
79    share: ShareApi,
80    fraud: FraudApi,
81    blobstream: BlobstreamApi,
82}
83
84pub(crate) struct ClientInner {
85    pub(crate) rpc: RpcClient,
86    grpc: Option<GrpcClient>,
87    pubkey: Option<VerifyingKey>,
88    chain_id: tendermint::chain::Id,
89}
90
91/// A builder for [`Client`].
92#[derive(Debug, Default)]
93pub struct ClientBuilder {
94    rpc_url: Option<String>,
95    rpc_auth_token: Option<String>,
96    timeout: Option<Duration>,
97    grpc_builder: Option<GrpcClientBuilder>,
98}
99
100impl ClientInner {
101    pub(crate) fn grpc(&self) -> Result<&GrpcClient> {
102        self.grpc.as_ref().ok_or(Error::GrpcEndpointNotSet)
103    }
104
105    pub(crate) fn pubkey(&self) -> Result<&VerifyingKey> {
106        self.pubkey.as_ref().ok_or(Error::NoAssociatedAddress)
107    }
108
109    pub(crate) fn address(&self) -> Result<AccAddress> {
110        let pubkey = self.pubkey()?.to_owned();
111        Ok(AccAddress::new(pubkey.into()))
112    }
113
114    pub(crate) async fn get_header_validated(&self, height: u64) -> Result<ExtendedHeader> {
115        let header = self.rpc.header_get_by_height(height).await?;
116        header.validate()?;
117        Ok(header)
118    }
119}
120
121impl Client {
122    /// Returns `ClientBuilder`.
123    pub fn builder() -> ClientBuilder {
124        ClientBuilder::new()
125    }
126
127    /// Returns chain id of the network.
128    pub fn chain_id(&self) -> &tendermint::chain::Id {
129        &self.inner.chain_id
130    }
131
132    /// Returns the public key of the signer.
133    pub fn pubkey(&self) -> Result<VerifyingKey> {
134        self.inner.pubkey().cloned()
135    }
136
137    /// Returns the address of signer.
138    pub fn address(&self) -> Result<AccAddress> {
139        self.inner.address()
140    }
141
142    /// Returns state API accessor.
143    pub fn state(&self) -> &StateApi {
144        &self.state
145    }
146
147    /// Returns blob API accessor.
148    pub fn blob(&self) -> &BlobApi {
149        &self.blob
150    }
151
152    /// Returns blobstream API accessor.
153    pub fn blobstream(&self) -> &BlobstreamApi {
154        &self.blobstream
155    }
156
157    /// Returns header API accessor.
158    pub fn header(&self) -> &HeaderApi {
159        &self.header
160    }
161
162    /// Returns share API accessor.
163    pub fn share(&self) -> &ShareApi {
164        &self.share
165    }
166
167    /// Returns fraud API accessor.
168    pub fn fraud(&self) -> &FraudApi {
169        &self.fraud
170    }
171}
172
173impl ClientBuilder {
174    /// Returns a new builder.
175    pub fn new() -> ClientBuilder {
176        ClientBuilder::default()
177    }
178
179    /// Set signer and its public key.
180    pub fn signer<S>(mut self, pubkey: VerifyingKey, signer: S) -> ClientBuilder
181    where
182        S: DocSigner + Sync + Send + 'static,
183    {
184        let grpc_builder = self.grpc_builder.unwrap_or_default();
185        self.grpc_builder = Some(grpc_builder.pubkey_and_signer(pubkey, signer));
186        self
187    }
188
189    /// Set signer from a keypair.
190    pub fn keypair<S>(mut self, keypair: S) -> ClientBuilder
191    where
192        S: DocSigner + Keypair<VerifyingKey = VerifyingKey> + Sync + Send + 'static,
193    {
194        let grpc_builder = self.grpc_builder.unwrap_or_default();
195        self.grpc_builder = Some(grpc_builder.signer_keypair(keypair));
196        self
197    }
198
199    /// Set signer from a raw private key.
200    pub fn private_key(mut self, bytes: &[u8]) -> ClientBuilder {
201        let grpc_builder = self.grpc_builder.unwrap_or_default();
202        self.grpc_builder = Some(grpc_builder.private_key(bytes));
203        self
204    }
205
206    /// Set signer from a hex formatted private key.
207    pub fn private_key_hex(mut self, s: &str) -> ClientBuilder {
208        let grpc_builder = self.grpc_builder.unwrap_or_default();
209        self.grpc_builder = Some(grpc_builder.private_key_hex(s));
210        self
211    }
212
213    /// Set the RPC endpoint.
214    pub fn rpc_url(mut self, url: &str) -> ClientBuilder {
215        self.rpc_url = Some(url.to_owned());
216        self
217    }
218
219    /// Set the authentication token of RPC endpoint.
220    pub fn rpc_auth_token(mut self, auth_token: &str) -> ClientBuilder {
221        self.rpc_auth_token = Some(auth_token.to_owned());
222        self
223    }
224
225    /// Set the request timeout for both RPC and gRPC endpoints.
226    pub fn timeout(mut self, timeout: Duration) -> ClientBuilder {
227        self.timeout = Some(timeout);
228        self
229    }
230
231    /// Set the gRPC endpoint.
232    ///
233    /// Alias of [`ClientBuilder::grpc_endpoint`].
234    ///
235    /// Accepts `Endpoint`, `&str`, or `String`.
236    ///
237    /// # Note
238    ///
239    /// In WASM the endpoint needs to support gRPC-Web.
240    pub fn grpc_url(self, url: impl Into<Endpoint>) -> ClientBuilder {
241        self.grpc_endpoint(url)
242    }
243
244    /// Set the gRPC endpoint.
245    pub fn grpc_endpoint(mut self, endpoint: impl Into<Endpoint>) -> ClientBuilder {
246        let grpc_builder = self.grpc_builder.unwrap_or_default();
247        self.grpc_builder = Some(grpc_builder.endpoint(endpoint));
248        self
249    }
250
251    /// Add multiple gRPC endpoints at once for fallback support.
252    ///
253    /// Accepts `Endpoint`, `&str`, or `String` items.
254    ///
255    /// When multiple endpoints are configured, the client will automatically
256    /// fall back to the next endpoint if a network-related error occurs.
257    ///
258    /// # Note
259    ///
260    /// In WASM the endpoints need to support gRPC-Web.
261    pub fn grpc_endpoints<I, E>(mut self, endpoints: I) -> ClientBuilder
262    where
263        I: IntoIterator<Item = E>,
264        E: Into<Endpoint>,
265    {
266        let grpc_builder = self.grpc_builder.unwrap_or_default();
267        self.grpc_builder = Some(grpc_builder.endpoints(endpoints));
268        self
269    }
270
271    /// Add multiple gRPC endpoints. Alias of [`ClientBuilder::grpc_endpoints`].
272    pub fn grpc_urls<I, E>(self, urls: I) -> ClientBuilder
273    where
274        I: IntoIterator<Item = E>,
275        E: Into<Endpoint>,
276    {
277        self.grpc_endpoints(urls)
278    }
279
280    /// Set manually configured gRPC transport
281    pub fn grpc_transport<B, T>(mut self, transport: T) -> Self
282    where
283        B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
284        <B as http_body::Body>::Error: StdError + Send + Sync,
285        T: Service<Request<TonicBody>, Response = http::Response<B>>
286            + Send
287            + Sync
288            + Clone
289            + 'static,
290        <T as Service<Request<TonicBody>>>::Error: StdError + Send + Sync + 'static,
291        <T as Service<Request<TonicBody>>>::Future: CondSend + 'static,
292    {
293        let grpc_builder = self.grpc_builder.unwrap_or_default();
294        self.grpc_builder = Some(grpc_builder.transport(transport));
295        self
296    }
297
298    /// Build [`Client`].
299    pub async fn build(self) -> Result<Client> {
300        let rpc_url = self.rpc_url.as_ref().ok_or(Error::RpcEndpointNotSet)?;
301        let rpc_auth_token = self.rpc_auth_token.as_deref();
302
303        let (grpc, pubkey) = if let Some(mut grpc_builder) = self.grpc_builder {
304            if let Some(timeout) = self.timeout {
305                grpc_builder = grpc_builder.timeout(timeout)
306            };
307            let client = grpc_builder.build()?;
308            let pubkey = client.get_account_pubkey();
309            (Some(client), pubkey)
310        } else {
311            (None, None)
312        };
313
314        let rpc = RpcClient::new(rpc_url, rpc_auth_token, self.timeout, self.timeout).await?;
315
316        let head = rpc.header_network_head().await?;
317        head.validate()?;
318
319        if let Some(grpc) = &grpc
320            && &grpc.chain_id().await? != head.chain_id()
321        {
322            return Err(Error::ChainIdMissmatch);
323        }
324
325        let inner = Arc::new(ClientInner {
326            rpc,
327            grpc,
328            pubkey,
329            chain_id: head.chain_id().to_owned(),
330        });
331
332        Ok(Client {
333            inner: inner.clone(),
334            blob: BlobApi::new(inner.clone()),
335            header: HeaderApi::new(inner.clone()),
336            share: ShareApi::new(inner.clone()),
337            fraud: FraudApi::new(inner.clone()),
338            blobstream: BlobstreamApi::new(inner.clone()),
339            state: StateApi::new(inner.clone()),
340        })
341    }
342}
343
344impl Debug for Client {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.write_str("Client { .. }")
347    }
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    use lumina_utils::test_utils::async_test;

    use crate::test_utils::{TEST_PRIV_KEY, TEST_RPC_URL};

    /// Setting a signer without a gRPC endpoint must make `build` fail with
    /// `Error::GrpcEndpointNotSet`.
    ///
    /// `build` constructs the gRPC client before it creates the RPC client,
    /// so the error is returned before any RPC connection is attempted.
    #[async_test]
    async fn builder() {
        let e = Client::builder()
            .rpc_url(TEST_RPC_URL)
            .private_key_hex(TEST_PRIV_KEY)
            .build()
            .await
            .unwrap_err();
        assert!(matches!(e, Error::GrpcEndpointNotSet));
    }
}
368}