use crate::{Client, Error, IndexQuery, Query};
use alto_types::{Block, Finalized, Kind, Notarized, Seed};
use commonware_codec::{DecodeExt, Encode};
use commonware_consensus::Viewable;
use commonware_cryptography::Digestible;
use commonware_parallel::Strategy;
use futures::{channel::mpsc::unbounded, Stream, StreamExt};
use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::Message as TMessage};
fn seed_upload_path(base: String) -> String {
format!("{base}/seed")
}
fn seed_get_path(base: String, query: &IndexQuery) -> String {
format!("{base}/seed/{}", query.serialize())
}
fn notarization_upload_path(base: String) -> String {
format!("{base}/notarization")
}
fn notarization_get_path(base: String, query: &IndexQuery) -> String {
format!("{base}/notarization/{}", query.serialize())
}
fn finalization_upload_path(base: String) -> String {
format!("{base}/finalization")
}
fn finalization_get_path(base: String, query: &IndexQuery) -> String {
format!("{base}/finalization/{}", query.serialize())
}
fn block_upload_path(base: String) -> String {
format!("{base}/block")
}
fn block_get_path(base: String, query: &Query) -> String {
format!("{base}/block/{}", query.serialize())
}
fn listen_path(base: String) -> String {
format!("{base}/consensus/ws")
}
pub enum Payload {
Finalized(Box<Finalized>),
Block(Box<Block>),
}
pub enum Message {
Seed(Seed),
Notarization(Notarized),
Finalization(Finalized),
}
impl<S: Strategy> Client<S> {
pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
let result = self
.http_client
.post(seed_upload_path(self.uri.clone()))
.body(seed.encode().to_vec())
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
Ok(())
}
pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
let result = self
.http_client
.get(seed_get_path(self.uri.clone(), &query))
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
let bytes = result.bytes().await.map_err(Error::Reqwest)?;
let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if self.verify && !seed.verify(&self.certificate_verifier) {
return Err(Error::InvalidSignature);
}
match query {
IndexQuery::Latest => {}
IndexQuery::Index(index) => {
if seed.view().get() != index {
return Err(Error::UnexpectedResponse);
}
}
}
Ok(seed)
}
pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
let result = self
.http_client
.post(notarization_upload_path(self.uri.clone()))
.body(notarized.encode().to_vec())
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
Ok(())
}
pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
let result = self
.http_client
.get(notarization_get_path(self.uri.clone(), &query))
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
let bytes = result.bytes().await.map_err(Error::Reqwest)?;
let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if self.verify && !notarized.verify(&self.certificate_verifier, &self.strategy) {
return Err(Error::InvalidSignature);
}
match query {
IndexQuery::Latest => {}
IndexQuery::Index(index) => {
if notarized.proof.view().get() != index {
return Err(Error::UnexpectedResponse);
}
}
}
Ok(notarized)
}
pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
let result = self
.http_client
.post(finalization_upload_path(self.uri.clone()))
.body(finalized.encode().to_vec())
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
Ok(())
}
pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
let result = self
.http_client
.get(finalization_get_path(self.uri.clone(), &query))
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
let bytes = result.bytes().await.map_err(Error::Reqwest)?;
let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if self.verify && !finalized.verify(&self.certificate_verifier, &self.strategy) {
return Err(Error::InvalidSignature);
}
match query {
IndexQuery::Latest => {}
IndexQuery::Index(index) => {
if finalized.proof.view().get() != index {
return Err(Error::UnexpectedResponse);
}
}
}
Ok(finalized)
}
pub async fn block_upload(&self, block: Block) -> Result<(), Error> {
let result = self
.http_client
.post(block_upload_path(self.uri.clone()))
.body(block.encode().to_vec())
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
Ok(())
}
pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
let result = self
.http_client
.get(block_get_path(self.uri.clone(), &query))
.send()
.await
.map_err(Error::Reqwest)?;
if !result.status().is_success() {
return Err(Error::Failed(result.status()));
}
let bytes = result.bytes().await.map_err(Error::Reqwest)?;
let result = match query {
Query::Latest => {
let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
return Err(Error::InvalidSignature);
}
Payload::Finalized(Box::new(result))
}
Query::Index(index) => {
let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
return Err(Error::InvalidSignature);
}
if result.block.height.get() != index {
return Err(Error::UnexpectedResponse);
}
Payload::Finalized(Box::new(result))
}
Query::Digest(digest) => {
let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
if result.digest() != digest {
return Err(Error::UnexpectedResponse);
}
Payload::Block(Box::new(result))
}
};
Ok(result)
}
pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
let (stream, _) = connect_async_tls_with_config(
listen_path(self.ws_uri.clone()),
None,
false,
Some(self.ws_connector.clone()),
)
.await
.map_err(Error::from)?;
let (_, read) = stream.split();
let (sender, receiver) = unbounded();
tokio::spawn({
let certificate_verifier = self.certificate_verifier.clone();
let strategy = self.strategy.clone();
let verify = self.verify;
async move {
read.for_each(|message| async {
match message {
Ok(TMessage::Binary(data)) => {
let kind = data[0];
let Some(kind) = Kind::from_u8(kind) else {
let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
return;
};
let data = &data[1..];
match kind {
Kind::Seed => {
let result = Seed::decode(data);
match result {
Ok(seed) => {
if verify && !seed.verify(&certificate_verifier) {
let _ = sender
.unbounded_send(Err(Error::InvalidSignature));
return;
}
let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
}
Err(e) => {
let _ =
sender.unbounded_send(Err(Error::InvalidData(e)));
}
}
}
Kind::Notarization => {
let result = Notarized::decode(data);
match result {
Ok(notarized) => {
if verify
&& !notarized
.verify(&certificate_verifier, &strategy)
{
let _ = sender
.unbounded_send(Err(Error::InvalidSignature));
return;
}
let _ = sender.unbounded_send(Ok(
Message::Notarization(notarized),
));
}
Err(e) => {
let _ =
sender.unbounded_send(Err(Error::InvalidData(e)));
}
}
}
Kind::Finalization => {
let result = Finalized::decode(data);
match result {
Ok(finalized) => {
if verify
&& !finalized
.verify(&certificate_verifier, &strategy)
{
let _ = sender
.unbounded_send(Err(Error::InvalidSignature));
return;
}
let _ = sender.unbounded_send(Ok(
Message::Finalization(finalized),
));
}
Err(e) => {
let _ =
sender.unbounded_send(Err(Error::InvalidData(e)));
}
}
}
}
}
Ok(_) => {} Err(e) => {
let _ = sender.unbounded_send(Err(Error::from(e)));
}
}
})
.await;
}
});
Ok(receiver)
}
}