battleware_client/
client.rs

1use crate::{events::Stream, Error, Result};
2use battleware_types::{
3    api::{Lookup, Pending, Submission, Summary, Update, UpdatesFilter},
4    execution::{Key, Seed, Transaction},
5    Identity,
6};
7use commonware_codec::{DecodeExt, Encode};
8use commonware_cryptography::{Hasher, Sha256};
9use commonware_utils::hex;
10use reqwest::Client as HttpClient;
11use std::time::Duration;
12use tokio::time::timeout;
13use tokio_tungstenite::connect_async;
14use tracing::{debug, info};
15use url::Url;
16
/// Upper bound on how long a single HTTP request or WebSocket dial may take.
const TIMEOUT: Duration = Duration::from_secs(30);
19
20/// Battleware API client
21#[derive(Clone)]
22pub struct Client {
23    pub base_url: Url,
24    pub ws_url: Url,
25    pub http_client: HttpClient,
26
27    pub identity: Identity,
28}
29
30impl Client {
31    /// Create a new client
32    #[allow(clippy::result_large_err)]
33    pub fn new(base_url: &str, identity: Identity) -> Self {
34        let base_url = Url::parse(base_url).unwrap();
35
36        // Convert http(s) to ws(s) for WebSocket URL
37        let ws_scheme = match base_url.scheme() {
38            "http" => "ws",
39            "https" => "wss",
40            scheme => {
41                panic!("Invalid scheme: {scheme}");
42            }
43        };
44
45        let mut ws_url = base_url.clone();
46        ws_url.set_scheme(ws_scheme).unwrap();
47
48        let http_client = HttpClient::builder().timeout(TIMEOUT).build().unwrap();
49
50        Self {
51            base_url,
52            ws_url,
53            http_client,
54            identity,
55        }
56    }
57
58    /// Submit a transaction
59    pub async fn submit_transactions(&self, txs: Vec<Transaction>) -> Result<()> {
60        let submission = Submission::Transactions(txs);
61        self.submit(submission).await
62    }
63
64    pub async fn submit_summary(&self, summary: Summary) -> Result<()> {
65        let submission = Submission::Summary(summary);
66        self.submit(submission).await
67    }
68
69    pub async fn submit_seed(&self, seed: Seed) -> Result<()> {
70        let submission = Submission::Seed(seed);
71        self.submit(submission).await
72    }
73
74    async fn submit(&self, submission: Submission) -> Result<()> {
75        let encoded = submission.encode();
76        let url = self.base_url.join("submit")?;
77        debug!("Submitting to {}", url);
78
79        let response = self
80            .http_client
81            .post(url)
82            .body(encoded.to_vec())
83            .send()
84            .await?;
85        if !response.status().is_success() {
86            return Err(Error::Failed(response.status()));
87        }
88        Ok(())
89    }
90
91    /// Query state by key
92    pub async fn query_state(&self, key: &Key) -> Result<Option<Lookup>> {
93        // Make request
94        let key_hash = Sha256::hash(&key.encode());
95        let url = self.base_url.join(&format!(
96            "state/{}",
97            commonware_utils::hex(&key_hash.encode())
98        ))?;
99        let response = self.http_client.get(url).send().await?;
100
101        // Parse response
102        match response.status() {
103            reqwest::StatusCode::OK => {
104                let buf = response.bytes().await?.to_vec();
105                let lookup = Lookup::decode(&mut buf.as_slice())?;
106
107                // Verify the lookup
108                if !lookup.verify(&self.identity) {
109                    return Err(Error::InvalidSignature);
110                }
111
112                Ok(Some(lookup))
113            }
114            reqwest::StatusCode::NOT_FOUND => Ok(None),
115            _ => Err(Error::Failed(response.status())),
116        }
117    }
118
119    /// Connect to the updates stream with the specified filter
120    pub async fn connect_updates(&self, filter: UpdatesFilter) -> Result<Stream<Update>> {
121        let filter = hex(&filter.encode());
122        let ws_url = self.ws_url.join(&format!("updates/{filter}"))?;
123        info!(
124            "Connecting to WebSocket at {} with filter {:?}",
125            ws_url, filter
126        );
127
128        let (ws_stream, _) = timeout(TIMEOUT, connect_async(ws_url.as_str()))
129            .await
130            .map_err(|_| Error::DialTimeout)??;
131        info!("WebSocket connected");
132
133        Ok(Stream::new_with_verifier(ws_stream, self.identity))
134    }
135
136    /// Connect to the mempool stream (transactions)
137    pub async fn connect_mempool(&self) -> Result<Stream<Pending>> {
138        let ws_url = self.ws_url.join("mempool")?;
139        info!("Connecting to WebSocket at {}", ws_url);
140
141        let (ws_stream, _) = timeout(TIMEOUT, connect_async(ws_url.as_str()))
142            .await
143            .map_err(|_| Error::DialTimeout)??;
144        info!("WebSocket connected");
145
146        Ok(Stream::new(ws_stream))
147    }
148}