battleware_client/
client.rs1use crate::{events::Stream, Error, Result};
2use battleware_types::{
3 api::{Lookup, Pending, Submission, Summary, Update, UpdatesFilter},
4 execution::{Key, Seed, Transaction},
5 Identity,
6};
7use commonware_codec::{DecodeExt, Encode};
8use commonware_cryptography::{Hasher, Sha256};
9use commonware_utils::hex;
10use reqwest::Client as HttpClient;
11use std::time::Duration;
12use tokio::time::timeout;
13use tokio_tungstenite::connect_async;
14use tracing::{debug, info};
15use url::Url;
16
17const TIMEOUT: Duration = Duration::from_secs(30);
19
20#[derive(Clone)]
22pub struct Client {
23 pub base_url: Url,
24 pub ws_url: Url,
25 pub http_client: HttpClient,
26
27 pub identity: Identity,
28}
29
30impl Client {
31 #[allow(clippy::result_large_err)]
33 pub fn new(base_url: &str, identity: Identity) -> Self {
34 let base_url = Url::parse(base_url).unwrap();
35
36 let ws_scheme = match base_url.scheme() {
38 "http" => "ws",
39 "https" => "wss",
40 scheme => {
41 panic!("Invalid scheme: {scheme}");
42 }
43 };
44
45 let mut ws_url = base_url.clone();
46 ws_url.set_scheme(ws_scheme).unwrap();
47
48 let http_client = HttpClient::builder().timeout(TIMEOUT).build().unwrap();
49
50 Self {
51 base_url,
52 ws_url,
53 http_client,
54 identity,
55 }
56 }
57
58 pub async fn submit_transactions(&self, txs: Vec<Transaction>) -> Result<()> {
60 let submission = Submission::Transactions(txs);
61 self.submit(submission).await
62 }
63
64 pub async fn submit_summary(&self, summary: Summary) -> Result<()> {
65 let submission = Submission::Summary(summary);
66 self.submit(submission).await
67 }
68
69 pub async fn submit_seed(&self, seed: Seed) -> Result<()> {
70 let submission = Submission::Seed(seed);
71 self.submit(submission).await
72 }
73
74 async fn submit(&self, submission: Submission) -> Result<()> {
75 let encoded = submission.encode();
76 let url = self.base_url.join("submit")?;
77 debug!("Submitting to {}", url);
78
79 let response = self
80 .http_client
81 .post(url)
82 .body(encoded.to_vec())
83 .send()
84 .await?;
85 if !response.status().is_success() {
86 return Err(Error::Failed(response.status()));
87 }
88 Ok(())
89 }
90
91 pub async fn query_state(&self, key: &Key) -> Result<Option<Lookup>> {
93 let key_hash = Sha256::hash(&key.encode());
95 let url = self.base_url.join(&format!(
96 "state/{}",
97 commonware_utils::hex(&key_hash.encode())
98 ))?;
99 let response = self.http_client.get(url).send().await?;
100
101 match response.status() {
103 reqwest::StatusCode::OK => {
104 let buf = response.bytes().await?.to_vec();
105 let lookup = Lookup::decode(&mut buf.as_slice())?;
106
107 if !lookup.verify(&self.identity) {
109 return Err(Error::InvalidSignature);
110 }
111
112 Ok(Some(lookup))
113 }
114 reqwest::StatusCode::NOT_FOUND => Ok(None),
115 _ => Err(Error::Failed(response.status())),
116 }
117 }
118
119 pub async fn connect_updates(&self, filter: UpdatesFilter) -> Result<Stream<Update>> {
121 let filter = hex(&filter.encode());
122 let ws_url = self.ws_url.join(&format!("updates/{filter}"))?;
123 info!(
124 "Connecting to WebSocket at {} with filter {:?}",
125 ws_url, filter
126 );
127
128 let (ws_stream, _) = timeout(TIMEOUT, connect_async(ws_url.as_str()))
129 .await
130 .map_err(|_| Error::DialTimeout)??;
131 info!("WebSocket connected");
132
133 Ok(Stream::new_with_verifier(ws_stream, self.identity))
134 }
135
136 pub async fn connect_mempool(&self) -> Result<Stream<Pending>> {
138 let ws_url = self.ws_url.join("mempool")?;
139 info!("Connecting to WebSocket at {}", ws_url);
140
141 let (ws_stream, _) = timeout(TIMEOUT, connect_async(ws_url.as_str()))
142 .await
143 .map_err(|_| Error::DialTimeout)??;
144 info!("WebSocket connected");
145
146 Ok(Stream::new(ws_stream))
147 }
148}