Skip to main content

tidepool_server/
upstream_http.rs

1//! Production `UpstreamClient` impl over `reqwest`. Plain JSON-RPC
2//! POST to the upstream URL.
3//!
4//! Why not `solana-client`: its typed `RpcClient` doesn't surface
5//! generic method dispatch, and we need to pass unknown methods
6//! through unchanged. reqwest is the de facto Rust HTTP client and
7//! what solana-client uses internally anyway.
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17#[derive(Debug, Clone)]
18pub struct HttpUpstream {
19    client: Client,
20    url: String,
21    timeout: Duration,
22}
23
24impl HttpUpstream {
25    pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
26        let client = Client::builder()
27            .timeout(timeout)
28            .build()
29            .map_err(|e| UpstreamError::Transport(e.to_string()))?;
30        Ok(Self {
31            client,
32            url: url.into(),
33            timeout,
34        })
35    }
36
37    async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
38        let body = json!({
39            "jsonrpc": "2.0",
40            "id": 1,
41            "method": method,
42            "params": params,
43        });
44        let resp = self
45            .client
46            .post(&self.url)
47            .json(&body)
48            .send()
49            .await
50            .map_err(|e| {
51                if e.is_timeout() {
52                    UpstreamError::Timeout {
53                        millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
54                    }
55                } else {
56                    UpstreamError::Transport(e.to_string())
57                }
58            })?;
59        let json: Value = resp
60            .json()
61            .await
62            .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
63        if let Some(err) = json.get("error") {
64            return Err(UpstreamError::Rpc(err.to_string()));
65        }
66        Ok(json.get("result").cloned().unwrap_or(Value::Null))
67    }
68}
69
70#[async_trait]
71impl UpstreamClient for HttpUpstream {
72    async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
73        let result = self.post_rpc(method, params).await?;
74        serde_json::to_vec(&result)
75            .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
76    }
77
78    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
79        let params = json!([address, { "encoding": "base64" }]);
80        let result = self.post_rpc("getAccountInfo", params).await?;
81        // Response shape: { context: { slot }, value: AccountInfo | null }
82        let Some(value) = result.get("value") else {
83            return Ok(None);
84        };
85        if value.is_null() {
86            return Ok(None);
87        }
88
89        let owner = value
90            .get("owner")
91            .and_then(Value::as_str)
92            .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
93        let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
94        let data_array = value
95            .get("data")
96            .and_then(Value::as_array)
97            .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
98        // Shape: [base64_data, encoding].
99        let b64 = data_array
100            .first()
101            .and_then(Value::as_str)
102            .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
103
104        let data = base64_decode(b64)
105            .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
106        let owner_bytes = base58_decode_32(owner)
107            .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
108
109        Ok(Some(AccountData {
110            data,
111            owner: owner_bytes,
112            lamports,
113        }))
114    }
115}
116
117// ─── small codec helpers ───────────────────────────────────────────
118
/// Leniently decodes base64 text, returning `None` for undecodable input.
///
/// Both the standard (`+`, `/`) and URL-safe (`-`, `_`) alphabets are
/// accepted, even mixed within one input. Padding (`=`) and line breaks
/// (`\r`, `\n`) are skipped wherever they appear, so padded and unpadded
/// input decode alike. Non-zero trailing bits in the final symbol are
/// tolerated and discarded.
///
/// Returns `None` when the input contains any other byte outside the two
/// alphabets, or when the number of symbols leaves a single dangling symbol
/// (count ≡ 1 mod 4). Six leftover bits cannot form a byte, so such input is
/// truncated or corrupt.
fn base64_decode(s: &str) -> Option<Vec<u8>> {
    // Hand-rolled to avoid adding the `base64` crate for this single call site.
    const INVALID: u8 = 0xff;
    const STANDARD: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const URL_SAFE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
    // Byte -> 6-bit value lookup, built at compile time instead of per call.
    static DECODE: [u8; 256] = {
        let mut table = [INVALID; 256];
        let mut i = 0;
        while i < 64 {
            table[STANDARD[i] as usize] = i as u8;
            table[URL_SAFE[i] as usize] = i as u8;
            i += 1;
        }
        table
    };

    let mut out: Vec<u8> = Vec::with_capacity(s.len() / 4 * 3 + 2);
    // `acc` collects 6 bits per symbol; its low `pending` bits have not been
    // emitted yet. Older bits simply shift off the top of the u32.
    let mut acc: u32 = 0;
    let mut pending: u32 = 0;
    for &byte in s.as_bytes() {
        if matches!(byte, b'=' | b'\r' | b'\n') {
            continue;
        }
        let sextet = DECODE[usize::from(byte)];
        if sextet == INVALID {
            return None;
        }
        acc = (acc << 6) | u32::from(sextet);
        pending += 6;
        if pending >= 8 {
            pending -= 8;
            out.push(((acc >> pending) & 0xff) as u8);
        }
    }
    // `pending` cycles 6 -> 4 -> 2 -> 0 per symbol, so 6 at the end means one
    // stray symbol that contributed no complete byte.
    if pending == 6 {
        return None;
    }
    Some(out)
}
152
153fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
154    let bytes = bs58::decode(s).into_vec().ok()?;
155    bytes.try_into().ok()
156}