use std::future::Future;
use std::marker::{Send, Sync};
use std::pin::Pin;
use async_stream::try_stream;
use celestia_types::nmt::{Namespace, NamespaceProof};
use celestia_types::{Blob, Commitment};
use futures_util::{Stream, StreamExt};
use jsonrpsee::core::client::{ClientT, Error, SubscriptionClientT};
use jsonrpsee::core::{RpcResult, SubscriptionResult};
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};
use crate::{HeaderClient, TxConfig, custom_client_error};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct BlobsAtHeight {
pub blobs: Option<Vec<Blob>>,
pub height: u64,
}
mod rpc {
use super::*;
#[rpc(client, server, namespace = "blob", namespace_separator = ".")]
pub trait Blob {
#[method(name = "Get")]
async fn blob_get(
&self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> RpcResult<Blob>;
#[method(name = "GetAll")]
async fn blob_get_all(
&self,
height: u64,
namespaces: Vec<Namespace>,
) -> RpcResult<Option<Vec<Blob>>>;
#[method(name = "GetProof")]
async fn blob_get_proof(
&self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> RpcResult<Vec<NamespaceProof>>;
#[method(name = "Included")]
async fn blob_included(
&self,
height: u64,
namespace: Namespace,
proof: NamespaceProof,
commitment: Commitment,
) -> RpcResult<bool>;
#[method(name = "Submit")]
async fn blob_submit(&self, blobs: Vec<Blob>, opts: TxConfig) -> RpcResult<u64>;
}
#[rpc(client, server, namespace = "blob", namespace_separator = ".")]
pub trait BlobSubscription {
#[subscription(name = "Subscribe", unsubscribe = "Unsubscribe", item = BlobsAtHeight)]
async fn blob_subscribe(&self, namespace: Namespace) -> SubscriptionResult;
}
}
pub trait BlobClient: ClientT {
fn blob_get<'a, 'fut>(
&'a self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> impl Future<Output = Result<Blob, Error>> + Send + 'fut
where
'a: 'fut,
Self: Sized + Sync + 'fut,
{
rpc::BlobClient::blob_get(self, height, namespace, commitment)
}
fn blob_get_all<'a, 'b, 'fut>(
&'a self,
height: u64,
namespaces: &'b [Namespace],
) -> impl Future<Output = Result<Option<Vec<Blob>>, Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
rpc::BlobClient::blob_get_all(self, height, namespaces.to_vec())
}
fn blob_get_proof<'a, 'fut>(
&'a self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> impl Future<Output = Result<Vec<NamespaceProof>, Error>> + Send + 'fut
where
'a: 'fut,
Self: Sized + Sync + 'fut,
{
rpc::BlobClient::blob_get_proof(self, height, namespace, commitment)
}
fn blob_included<'a, 'b, 'fut>(
&'a self,
height: u64,
namespace: Namespace,
proof: &'b NamespaceProof,
commitment: Commitment,
) -> impl Future<Output = Result<bool, Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
rpc::BlobClient::blob_included(self, height, namespace, proof.clone(), commitment)
}
fn blob_submit<'a, 'b, 'fut>(
&'a self,
blobs: &'b [Blob],
opts: TxConfig,
) -> impl Future<Output = Result<u64, Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
rpc::BlobClient::blob_submit(self, blobs.to_vec(), opts)
}
fn blob_subscribe<'a>(
&'a self,
namespace: Namespace,
) -> Pin<Box<dyn Stream<Item = Result<BlobsAtHeight, Error>> + Send + 'a>>
where
Self: SubscriptionClientT + Sized + Sync,
{
try_stream! {
let subscription_res = rpc::BlobSubscriptionClient::blob_subscribe(self, namespace).await;
let has_real_sub = !matches!(&subscription_res, Err(Error::HttpNotImplemented));
let (mut blob_sub, mut header_sub) = if has_real_sub {
(Some(subscription_res?), None)
} else {
(None, Some(HeaderClient::header_subscribe(self)))
};
loop {
yield if has_real_sub {
blob_sub
.as_mut()
.expect("must be some")
.next()
.await
.ok_or_else(|| custom_client_error("unexpected end of stream"))??
} else {
let header = header_sub
.as_mut()
.expect("must be some")
.next()
.await
.ok_or_else(|| custom_client_error("unexpected end of stream"))??;
let height = header.height();
let blobs = rpc::BlobClient::blob_get_all(self, height, vec![namespace]).await?;
BlobsAtHeight {
blobs,
height,
}
};
}
}
.boxed()
}
}
pub trait BlobServer: rpc::BlobServer + rpc::BlobSubscriptionServer {}
impl<T> BlobServer for T where T: rpc::BlobServer + rpc::BlobSubscriptionServer {}
impl<T> BlobClient for T where T: ClientT {}
pub use rpc::BlobServer as BlobRpcServer;
pub use rpc::BlobSubscriptionServer as BlobSubscriptionRpcServer;