1use std::future::Future;
4use std::marker::{Send, Sync};
5use std::pin::Pin;
6
7use async_stream::try_stream;
8use celestia_types::nmt::{Namespace, NamespaceProof};
9use celestia_types::{Blob, Commitment};
10use futures_util::{Stream, StreamExt};
11use jsonrpsee::core::client::{ClientT, Error, SubscriptionClientT};
12use jsonrpsee::core::{RpcResult, SubscriptionResult};
13use jsonrpsee::proc_macros::rpc;
14use serde::{Deserialize, Serialize};
15
16use crate::{HeaderClient, TxConfig, custom_client_error};
17
/// Blobs reported for a single block height.
///
/// This is the item type declared for the `Subscribe` subscription of the `blob`
/// namespace and the item type of the stream returned by
/// `BlobClient::blob_subscribe`. Field names are (de)serialized in PascalCase
/// (`Blobs`, `Height`).
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "PascalCase")]
21pub struct BlobsAtHeight {
    /// Blobs found at `height`; the value is optional and may be absent.
    // NOTE(review): presumably `None` means "no blobs for the namespace at this
    // height" — confirm against the node's behaviour.
22 pub blobs: Option<Vec<Blob>>,
    /// Block height the blobs belong to.
24 pub height: u64,
26}
27
// Private module holding the trait declarations fed to the jsonrpsee `#[rpc]`
// macro. The generated items used elsewhere in this file are `rpc::BlobClient`,
// `rpc::BlobServer`, `rpc::BlobSubscriptionClient` and
// `rpc::BlobSubscriptionServer`; keeping them in their own module leaves the names
// `BlobClient` / `BlobServer` free for the public traits defined at the top level.
28mod rpc {
29 use super::*;
30
31 #[rpc(client, server, namespace = "blob", namespace_separator = ".")]
    /// Request/response methods of the `blob` namespace.
    ///
    /// With `namespace = "blob"` and `namespace_separator = "."`, each method below
    /// is addressed on the wire as `blob.<name>` (for example `blob.Get`).
33 pub trait Blob {
34 #[method(name = "Get")]
        /// `blob.Get`: takes a height, a namespace and a commitment and returns a
        /// single `Blob`.
36 async fn blob_get(
37 &self,
38 height: u64,
39 namespace: Namespace,
40 commitment: Commitment,
41 ) -> RpcResult<Blob>;
42
43 #[method(name = "GetAll")]
        /// `blob.GetAll`: takes a height and a list of namespaces and returns an
        /// optional list of `Blob`s.
45 async fn blob_get_all(
46 &self,
47 height: u64,
48 namespaces: Vec<Namespace>,
49 ) -> RpcResult<Option<Vec<Blob>>>;
50
51 #[method(name = "GetProof")]
        /// `blob.GetProof`: takes a height, a namespace and a commitment and returns
        /// a list of `NamespaceProof`s.
53 async fn blob_get_proof(
54 &self,
55 height: u64,
56 namespace: Namespace,
57 commitment: Commitment,
58 ) -> RpcResult<Vec<NamespaceProof>>;
59
60 #[method(name = "Included")]
        /// `blob.Included`: takes a height, a namespace, a proof and a commitment and
        /// returns a boolean.
        // NOTE(review): presumably `true` means the proof shows the blob is included
        // at that height — confirm against the node API documentation.
62 async fn blob_included(
63 &self,
64 height: u64,
65 namespace: Namespace,
66 proof: NamespaceProof,
67 commitment: Commitment,
68 ) -> RpcResult<bool>;
69
70 #[method(name = "Submit")]
        /// `blob.Submit`: sends a list of `Blob`s together with a `TxConfig` and
        /// returns a `u64`.
        // NOTE(review): the returned number is presumably the height at which the
        // blobs were included — confirm.
72 async fn blob_submit(&self, blobs: Vec<Blob>, opts: TxConfig) -> RpcResult<u64>;
73 }
74
75 #[rpc(client, server, namespace = "blob", namespace_separator = ".")]
    /// Subscription part of the `blob` namespace, declared as a separate trait from
    /// the request/response methods above.
77 pub trait BlobSubscription {
78 #[subscription(name = "Subscribe", unsubscribe = "Unsubscribe", item = BlobsAtHeight)]
        /// `blob.Subscribe` / `blob.Unsubscribe`: subscription for one namespace whose
        /// items are `BlobsAtHeight` values.
80 async fn blob_subscribe(&self, namespace: Namespace) -> SubscriptionResult;
81 }
82}
83
/// Client-side API for the `blob` RPC namespace.
///
/// Every method forwards to the macro-generated `rpc::BlobClient` or
/// `rpc::BlobSubscriptionClient` trait. Where the generated methods take owned
/// collections or proofs, the wrappers here accept borrows (`&[Namespace]`,
/// `&NamespaceProof`, `&[Blob]`) and copy them into owned values before the call.
///
/// All request methods require `Self: Sized + Sync` and return a `Send` future
/// bounded by the lifetime `'fut`, which `&self` must outlive.
84pub trait BlobClient: ClientT {
    /// Requests a single [`Blob`] identified by `height`, `namespace` and
    /// `commitment`.
    ///
    /// The returned future resolves to the blob, or to the client [`Error`] reported
    /// by the underlying call.
86 fn blob_get<'a, 'fut>(
88 &'a self,
89 height: u64,
90 namespace: Namespace,
91 commitment: Commitment,
92 ) -> impl Future<Output = Result<Blob, Error>> + Send + 'fut
93 where
94 'a: 'fut,
95 Self: Sized + Sync + 'fut,
96 {
97 rpc::BlobClient::blob_get(self, height, namespace, commitment)
98 }
99
    /// Requests the blobs at `height` for the given `namespaces`.
    ///
    /// The slice is copied into a `Vec` when this method is called, before the
    /// request future is created. The result is optional: the call may succeed
    /// with `None`.
100 fn blob_get_all<'a, 'b, 'fut>(
102 &'a self,
103 height: u64,
104 namespaces: &'b [Namespace],
105 ) -> impl Future<Output = Result<Option<Vec<Blob>>, Error>> + Send + 'fut
106 where
107 'a: 'fut,
108 'b: 'fut,
109 Self: Sized + Sync + 'fut,
110 {
111 rpc::BlobClient::blob_get_all(self, height, namespaces.to_vec())
112 }
113
    /// Requests the [`NamespaceProof`]s for the blob identified by `height`,
    /// `namespace` and `commitment`.
114 fn blob_get_proof<'a, 'fut>(
116 &'a self,
117 height: u64,
118 namespace: Namespace,
119 commitment: Commitment,
120 ) -> impl Future<Output = Result<Vec<NamespaceProof>, Error>> + Send + 'fut
121 where
122 'a: 'fut,
123 Self: Sized + Sync + 'fut,
124 {
125 rpc::BlobClient::blob_get_proof(self, height, namespace, commitment)
126 }
127
    /// Sends `height`, `namespace`, `proof` and `commitment` to the `Included`
    /// method and returns its boolean answer.
    ///
    /// The proof is cloned when this method is called because the generated
    /// client method takes it by value.
    // NOTE(review): presumably `true` means the blob is proven to be included at
    // `height` — confirm against the node API documentation.
128 fn blob_included<'a, 'b, 'fut>(
130 &'a self,
131 height: u64,
132 namespace: Namespace,
133 proof: &'b NamespaceProof,
134 commitment: Commitment,
135 ) -> impl Future<Output = Result<bool, Error>> + Send + 'fut
136 where
137 'a: 'fut,
138 'b: 'fut,
139 Self: Sized + Sync + 'fut,
140 {
141 rpc::BlobClient::blob_included(self, height, namespace, proof.clone(), commitment)
142 }
143
    /// Submits `blobs` with the transaction options `opts`.
    ///
    /// The slice is copied into a `Vec` when this method is called. The future
    /// resolves to the `u64` returned by the `Submit` method.
    // NOTE(review): the returned number is presumably the inclusion height — confirm.
144 fn blob_submit<'a, 'b, 'fut>(
146 &'a self,
147 blobs: &'b [Blob],
148 opts: TxConfig,
149 ) -> impl Future<Output = Result<u64, Error>> + Send + 'fut
150 where
151 'a: 'fut,
152 'b: 'fut,
153 Self: Sized + Sync + 'fut,
154 {
155 rpc::BlobClient::blob_submit(self, blobs.to_vec(), opts)
156 }
157
    /// Returns a boxed stream of [`BlobsAtHeight`] items for `namespace`.
    ///
    /// The stream first tries to open the server-side `Subscribe` subscription.
    /// If that attempt fails with [`Error::HttpNotImplemented`], it falls back to
    /// `HeaderClient::header_subscribe` and, for every header received, issues a
    /// `GetAll` request for `namespace` at the header's height, packaging the
    /// answer as a [`BlobsAtHeight`].
    ///
    /// Unlike the other methods, this one additionally requires
    /// `Self: SubscriptionClientT`.
    ///
    /// # Errors
    ///
    /// The stream produces an `Err` item when opening the subscription fails with
    /// any error other than `HttpNotImplemented`, when an item or header from the
    /// underlying source is itself an error, when a `GetAll` request fails, or
    /// when the underlying source ends (reported as "unexpected end of stream").
158 fn blob_subscribe<'a>(
168 &'a self,
169 namespace: Namespace,
170 ) -> Pin<Box<dyn Stream<Item = Result<BlobsAtHeight, Error>> + Send + 'a>>
171 where
172 Self: SubscriptionClientT + Sized + Sync,
173 {
174 try_stream! {
            // Attempt the native subscription, but keep the `Result` so the error
            // variant can be inspected before it is propagated.
175 let subscription_res = rpc::BlobSubscriptionClient::blob_subscribe(self, namespace).await;
            // Only `HttpNotImplemented` selects the fallback; every other outcome
            // (success or a different error) stays on the native-subscription path.
176 let has_real_sub = !matches!(&subscription_res, Err(Error::HttpNotImplemented));
177
            // Exactly one of the two sources is `Some`. On the native path the `?`
            // surfaces any remaining subscription error through the stream.
178 let (mut blob_sub, mut header_sub) = if has_real_sub {
179 (Some(subscription_res?), None)
180 } else {
181 (None, Some(HeaderClient::header_subscribe(self)))
182 };
183
184 loop {
                // One `yield` whose value comes from whichever source is active.
                // NOTE(review): the single-expression shape looks deliberate; check
                // that alternatives still satisfy the borrow checker inside
                // `try_stream!` before refactoring.
187 yield if has_real_sub {
                    // `blob_sub` is `Some` whenever `has_real_sub` is true (see the
                    // tuple above), so the `expect` cannot fire. The first `?` handles
                    // the end-of-stream error, the second the item's own error.
188 blob_sub
189 .as_mut()
190 .expect("must be some")
191 .next()
192 .await
193 .ok_or_else(|| custom_client_error("unexpected end of stream"))??
194 } else {
                    // `header_sub` is `Some` whenever `has_real_sub` is false.
195 let header = header_sub
196 .as_mut()
197 .expect("must be some")
198 .next()
199 .await
200 .ok_or_else(|| custom_client_error("unexpected end of stream"))??;
                    // Fetch the blobs for this header's height with a regular request.
201 let height = header.height();
202 let blobs = rpc::BlobClient::blob_get_all(self, height, vec![namespace]).await?;
203
204 BlobsAtHeight {
205 blobs,
206 height,
207 }
208 };
209 }
210 }
        // Box and pin the generated stream so the return type can be named.
211 .boxed()
212 }
213}
214
/// Combined server trait for the `blob` namespace.
///
/// It has no methods of its own; it only requires both macro-generated server
/// traits, `rpc::BlobServer` (request/response methods) and
/// `rpc::BlobSubscriptionServer` (the subscription).
215pub trait BlobServer: rpc::BlobServer + rpc::BlobSubscriptionServer {}
217
218impl<T> BlobServer for T where T: rpc::BlobServer + rpc::BlobSubscriptionServer {}
219
220impl<T> BlobClient for T where T: ClientT {}
221
// Re-export the macro-generated server traits from the private `rpc` module.
// `rpc::BlobServer` is renamed because the name `BlobServer` is already used in
// this module by the combined server trait.
222pub use rpc::BlobServer as BlobRpcServer;
223pub use rpc::BlobSubscriptionServer as BlobSubscriptionRpcServer;