Skip to main content

celestia_rpc/
blob.rs

1//! celestia-node rpc types and methods related to blobs
2
3use std::future::Future;
4use std::marker::{Send, Sync};
5use std::pin::Pin;
6
7use async_stream::try_stream;
8use celestia_types::nmt::{Namespace, NamespaceProof};
9use celestia_types::{Blob, Commitment};
10use futures_util::{Stream, StreamExt};
11use jsonrpsee::core::client::{ClientT, Error, SubscriptionClientT};
12use jsonrpsee::core::{RpcResult, SubscriptionResult};
13use jsonrpsee::proc_macros::rpc;
14use serde::{Deserialize, Serialize};
15
16use crate::{HeaderClient, TxConfig, custom_client_error};
17
18/// Response type for [`BlobClient::blob_subscribe`].
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "PascalCase")]
21pub struct BlobsAtHeight {
22    /// Blobs submitted at given height.
23    pub blobs: Option<Vec<Blob>>,
24    /// A height for which the blobs were returned.
25    pub height: u64,
26}
27
28mod rpc {
29    use super::*;
30
31    /// Blob RPC methods.
32    #[rpc(client, server, namespace = "blob", namespace_separator = ".")]
33    pub trait Blob {
34        /// See [`crate::BlobClient::blob_get`].
35        #[method(name = "Get")]
36        async fn blob_get(
37            &self,
38            height: u64,
39            namespace: Namespace,
40            commitment: Commitment,
41        ) -> RpcResult<Blob>;
42
43        /// See [`crate::BlobClient::blob_get_all`].
44        #[method(name = "GetAll")]
45        async fn blob_get_all(
46            &self,
47            height: u64,
48            namespaces: Vec<Namespace>,
49        ) -> RpcResult<Option<Vec<Blob>>>;
50
51        /// See [`crate::BlobClient::blob_get_proof`].
52        #[method(name = "GetProof")]
53        async fn blob_get_proof(
54            &self,
55            height: u64,
56            namespace: Namespace,
57            commitment: Commitment,
58        ) -> RpcResult<Vec<NamespaceProof>>;
59
60        /// See [`crate::BlobClient::blob_included`].
61        #[method(name = "Included")]
62        async fn blob_included(
63            &self,
64            height: u64,
65            namespace: Namespace,
66            proof: NamespaceProof,
67            commitment: Commitment,
68        ) -> RpcResult<bool>;
69
70        /// See [`crate::BlobClient::blob_submit`].
71        #[method(name = "Submit")]
72        async fn blob_submit(&self, blobs: Vec<Blob>, opts: TxConfig) -> RpcResult<u64>;
73    }
74
75    /// Blob subscription RPC methods.
76    #[rpc(client, server, namespace = "blob", namespace_separator = ".")]
77    pub trait BlobSubscription {
78        /// See [`crate::BlobClient::blob_subscribe`].
79        #[subscription(name = "Subscribe", unsubscribe = "Unsubscribe", item = BlobsAtHeight)]
80        async fn blob_subscribe(&self, namespace: Namespace) -> SubscriptionResult;
81    }
82}
83
84/// Client implementation for the `Blob` RPC API.
85pub trait BlobClient: ClientT {
86    /// Get retrieves the blob by commitment under the given namespace and height.
87    fn blob_get<'a, 'fut>(
88        &'a self,
89        height: u64,
90        namespace: Namespace,
91        commitment: Commitment,
92    ) -> impl Future<Output = Result<Blob, Error>> + Send + 'fut
93    where
94        'a: 'fut,
95        Self: Sized + Sync + 'fut,
96    {
97        rpc::BlobClient::blob_get(self, height, namespace, commitment)
98    }
99
100    /// GetAll returns all blobs under the given namespaces and height.
101    fn blob_get_all<'a, 'b, 'fut>(
102        &'a self,
103        height: u64,
104        namespaces: &'b [Namespace],
105    ) -> impl Future<Output = Result<Option<Vec<Blob>>, Error>> + Send + 'fut
106    where
107        'a: 'fut,
108        'b: 'fut,
109        Self: Sized + Sync + 'fut,
110    {
111        rpc::BlobClient::blob_get_all(self, height, namespaces.to_vec())
112    }
113
114    /// GetProof retrieves proofs in the given namespaces at the given height by commitment.
115    fn blob_get_proof<'a, 'fut>(
116        &'a self,
117        height: u64,
118        namespace: Namespace,
119        commitment: Commitment,
120    ) -> impl Future<Output = Result<Vec<NamespaceProof>, Error>> + Send + 'fut
121    where
122        'a: 'fut,
123        Self: Sized + Sync + 'fut,
124    {
125        rpc::BlobClient::blob_get_proof(self, height, namespace, commitment)
126    }
127
128    /// Included checks whether a blob's given commitment(Merkle subtree root) is included at given height and under the namespace.
129    fn blob_included<'a, 'b, 'fut>(
130        &'a self,
131        height: u64,
132        namespace: Namespace,
133        proof: &'b NamespaceProof,
134        commitment: Commitment,
135    ) -> impl Future<Output = Result<bool, Error>> + Send + 'fut
136    where
137        'a: 'fut,
138        'b: 'fut,
139        Self: Sized + Sync + 'fut,
140    {
141        rpc::BlobClient::blob_included(self, height, namespace, proof.clone(), commitment)
142    }
143
144    /// Submit sends Blobs and reports the height in which they were included. Allows sending multiple Blobs atomically synchronously. Uses default wallet registered on the Node.
145    fn blob_submit<'a, 'b, 'fut>(
146        &'a self,
147        blobs: &'b [Blob],
148        opts: TxConfig,
149    ) -> impl Future<Output = Result<u64, Error>> + Send + 'fut
150    where
151        'a: 'fut,
152        'b: 'fut,
153        Self: Sized + Sync + 'fut,
154    {
155        rpc::BlobClient::blob_submit(self, blobs.to_vec(), opts)
156    }
157
158    /// Subscribe to published blobs from the given namespace as they are included.
159    ///
160    /// # Notes
161    ///
162    /// If client returns [`Error::HttpNotImplemented`], the subscription will fallback to
163    /// using combination of [`HeaderClient::header_wait_for_height`] and
164    /// [`BlobClient::blob_get_all`] for streaming the blobs.
165    ///
166    /// Unsubscribe is not implemented by Celestia nodes.
167    fn blob_subscribe<'a>(
168        &'a self,
169        namespace: Namespace,
170    ) -> Pin<Box<dyn Stream<Item = Result<BlobsAtHeight, Error>> + Send + 'a>>
171    where
172        Self: SubscriptionClientT + Sized + Sync,
173    {
174        try_stream! {
175            let subscription_res = rpc::BlobSubscriptionClient::blob_subscribe(self, namespace).await;
176            let has_real_sub = !matches!(&subscription_res, Err(Error::HttpNotImplemented));
177
178            let (mut blob_sub, mut header_sub) = if has_real_sub {
179                (Some(subscription_res?), None)
180            } else {
181                (None, Some(HeaderClient::header_subscribe(self)))
182            };
183
184            // TODO: should we validate blobs? we could do it in fallback,
185            // but we don't know the app version for real sub
186            loop {
187                yield if has_real_sub {
188                    blob_sub
189                        .as_mut()
190                        .expect("must be some")
191                        .next()
192                        .await
193                        .ok_or_else(|| custom_client_error("unexpected end of stream"))??
194                } else {
195                    let header = header_sub
196                        .as_mut()
197                        .expect("must be some")
198                        .next()
199                        .await
200                        .ok_or_else(|| custom_client_error("unexpected end of stream"))??;
201                    let height = header.height();
202                    let blobs = rpc::BlobClient::blob_get_all(self, height, vec![namespace]).await?;
203
204                    BlobsAtHeight {
205                        blobs,
206                        height,
207                    }
208                };
209            }
210        }
211        .boxed()
212    }
213}
214
215/// Server trait for Blob RPC endpoints.
216pub trait BlobServer: rpc::BlobServer + rpc::BlobSubscriptionServer {}
217
218impl<T> BlobServer for T where T: rpc::BlobServer + rpc::BlobSubscriptionServer {}
219
220impl<T> BlobClient for T where T: ClientT {}
221
222pub use rpc::BlobServer as BlobRpcServer;
223pub use rpc::BlobSubscriptionServer as BlobSubscriptionRpcServer;