use std::pin::Pin;
use std::sync::Arc;
use async_stream::try_stream;
use celestia_rpc::BlobClient;
use futures_util::{Stream, StreamExt};
use crate::Result;
use crate::api::blob::BlobsAtHeight;
use crate::client::ClientInner;
use crate::state::AsyncGrpcCall;
use crate::tx::{TxConfig, TxInfo};
use crate::types::nmt::{Namespace, NamespaceProof};
use crate::types::{Blob, Commitment};
pub struct BlobApi {
inner: Arc<ClientInner>,
}
impl BlobApi {
pub(crate) fn new(inner: Arc<ClientInner>) -> BlobApi {
BlobApi { inner }
}
pub fn submit(&self, blobs: &[Blob], cfg: TxConfig) -> AsyncGrpcCall<TxInfo> {
let inner = self.inner.clone();
let blobs = blobs.to_vec();
AsyncGrpcCall::new(move |context| async move {
Ok(inner
.grpc()?
.submit_blobs(&blobs, cfg)
.context(&context)
.await?)
})
}
pub async fn get(
&self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> Result<Blob> {
let blob = self
.inner
.rpc
.blob_get(height, namespace, commitment)
.await?;
let app_version = self
.inner
.get_header_validated(height)
.await?
.app_version()?;
blob.validate_with_commitment(&commitment, app_version)?;
Ok(blob)
}
pub async fn get_all(
&self,
height: u64,
namespaces: &[Namespace],
) -> Result<Option<Vec<Blob>>> {
let Some(blobs) = self.inner.rpc.blob_get_all(height, namespaces).await? else {
return Ok(None);
};
let app_version = self
.inner
.get_header_validated(height)
.await?
.app_version()?;
for blob in &blobs {
blob.validate(app_version)?;
}
Ok(Some(blobs))
}
pub async fn get_proof(
&self,
height: u64,
namespace: Namespace,
commitment: Commitment,
) -> Result<Vec<NamespaceProof>> {
Ok(self
.inner
.rpc
.blob_get_proof(height, namespace, commitment)
.await?)
}
pub async fn included(
&self,
height: u64,
namespace: Namespace,
proof: &NamespaceProof,
commitment: Commitment,
) -> Result<bool> {
Ok(self
.inner
.rpc
.blob_included(height, namespace, proof, commitment)
.await?)
}
pub fn subscribe(
&self,
namespace: Namespace,
) -> Pin<Box<dyn Stream<Item = Result<BlobsAtHeight>> + Send + 'static>> {
let inner = self.inner.clone();
try_stream! {
let mut subscription = inner.rpc.blob_subscribe(namespace);
while let Some(item) = subscription.next().await {
yield item?;
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{ensure_serializable_deserializable, new_client};
use celestia_types::AppVersion;
use lumina_utils::test_utils::async_test;
#[async_test]
async fn blob_submit_and_retrieve() {
let client = new_client().await;
let ns = Namespace::new_v0(b"mydata").unwrap();
let blob = Blob::new(
ns,
b"some data to store".to_vec(),
Some(client.address().unwrap()),
AppVersion::V3,
)
.unwrap();
let submitted_commitment = blob.commitment;
let tx_info = client
.blob()
.submit(&[blob], TxConfig::default())
.await
.unwrap();
let received_blob = client
.blob()
.get(tx_info.height.value(), ns, submitted_commitment)
.await
.unwrap();
received_blob
.validate_with_commitment(&submitted_commitment, AppVersion::V3)
.unwrap();
}
#[async_test]
async fn blob_retrieve_unknown() {
let client = new_client().await;
let head = client.header().head().await.unwrap();
let ns = Namespace::new_v0(b"mydata").unwrap();
let commitment = Commitment::new(rand::random());
client
.blob()
.get(head.height().value(), ns, commitment)
.await
.unwrap_err();
}
#[allow(dead_code)]
#[allow(unused_variables)]
#[allow(unreachable_code)]
#[allow(clippy::diverging_sub_expression)]
async fn enforce_serde_bounds() {
let api = BlobApi::new(unimplemented!());
let blobs: Vec<_> = ensure_serializable_deserializable(unimplemented!());
let cfg = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(api.submit(&blobs, cfg).await.unwrap());
let namespace = ensure_serializable_deserializable(unimplemented!());
let commitment = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(api.get(0, namespace, commitment).await.unwrap());
let namespaces: Vec<_> = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(api.get_all(0, &namespaces).await.unwrap());
let namespace = ensure_serializable_deserializable(unimplemented!());
let commitment = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(api.get_proof(0, namespace, commitment).await.unwrap());
let namespace = ensure_serializable_deserializable(unimplemented!());
let proof = ensure_serializable_deserializable(unimplemented!());
let commitment = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(
api.included(0, namespace, &proof, commitment)
.await
.unwrap(),
);
let namespace = ensure_serializable_deserializable(unimplemented!());
ensure_serializable_deserializable(api.subscribe(namespace).next().await.unwrap().unwrap());
}
}