Skip to main content

tidepool_server/
upstream_http.rs

1//! Production `UpstreamClient` impl over `reqwest`. Plain JSON-RPC
2//! POST to the upstream URL.
3//!
4//! Why not `solana-client`: its typed `RpcClient` doesn't surface
5//! generic method dispatch, and we need to pass unknown methods
6//! through unchanged. reqwest is the de facto Rust HTTP client and
7//! what solana-client uses internally anyway.
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17/// Max bytes we'll read from an off-chain metadata document. Metaplex
18/// JSON is a few KB; 2 MiB is a generous ceiling that still caps a
19/// hostile or runaway URI.
20const OFFCHAIN_MAX_BYTES: usize = 2 * 1024 * 1024;
21
22/// Dedicated timeout for off-chain metadata fetches. Deliberately
23/// short and independent of the (longer) RPC timeout: a slow or dead
24/// metadata URI must not stall `getAsset`. Especially matters in
25/// network-restricted CI, where the fetch fails fast and degrades to
26/// on-chain fields rather than blocking for the whole RPC timeout.
27const OFFCHAIN_TIMEOUT: Duration = Duration::from_secs(3);
28
29#[derive(Debug, Clone)]
30pub struct HttpUpstream {
31    client: Client,
32    url: String,
33    timeout: Duration,
34    /// When false, `fetch_uri` always returns `None` — disables
35    /// off-chain DAS metadata enrichment (the `--no-offchain-metadata`
36    /// flag). Useful for hermetic / fully-offline CI.
37    offchain_enabled: bool,
38}
39
40impl HttpUpstream {
41    pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
42        let client = Client::builder()
43            .timeout(timeout)
44            .build()
45            .map_err(|e| UpstreamError::Transport(e.to_string()))?;
46        Ok(Self {
47            client,
48            url: url.into(),
49            timeout,
50            offchain_enabled: true,
51        })
52    }
53
54    /// Toggle off-chain metadata fetching. Defaults to enabled.
55    #[must_use]
56    pub fn with_offchain_metadata(mut self, enabled: bool) -> Self {
57        self.offchain_enabled = enabled;
58        self
59    }
60
61    async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
62        let body = json!({
63            "jsonrpc": "2.0",
64            "id": 1,
65            "method": method,
66            "params": params,
67        });
68        let resp = self
69            .client
70            .post(&self.url)
71            .json(&body)
72            .send()
73            .await
74            .map_err(|e| {
75                if e.is_timeout() {
76                    UpstreamError::Timeout {
77                        millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
78                    }
79                } else {
80                    UpstreamError::Transport(e.to_string())
81                }
82            })?;
83        let json: Value = resp
84            .json()
85            .await
86            .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
87        if let Some(err) = json.get("error") {
88            return Err(UpstreamError::Rpc(err.to_string()));
89        }
90        Ok(json.get("result").cloned().unwrap_or(Value::Null))
91    }
92}
93
94#[async_trait]
95impl UpstreamClient for HttpUpstream {
96    async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
97        let result = self.post_rpc(method, params).await?;
98        serde_json::to_vec(&result)
99            .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
100    }
101
102    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
103        let params = json!([address, { "encoding": "base64" }]);
104        let result = self.post_rpc("getAccountInfo", params).await?;
105        // Response shape: { context: { slot }, value: AccountInfo | null }
106        let Some(value) = result.get("value") else {
107            return Ok(None);
108        };
109        if value.is_null() {
110            return Ok(None);
111        }
112
113        let owner = value
114            .get("owner")
115            .and_then(Value::as_str)
116            .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
117        let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
118        let data_array = value
119            .get("data")
120            .and_then(Value::as_array)
121            .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
122        // Shape: [base64_data, encoding].
123        let b64 = data_array
124            .first()
125            .and_then(Value::as_str)
126            .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
127
128        let data = base64_decode(b64)
129            .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
130        let owner_bytes = base58_decode_32(owner)
131            .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
132
133        Ok(Some(AccountData {
134            data,
135            owner: owner_bytes,
136            lamports,
137        }))
138    }
139
140    /// Fetch off-chain metadata. Supports `http(s)://` (via reqwest,
141    /// inheriting the client timeout, capped at `OFFCHAIN_MAX_BYTES`)
142    /// and `file://` (local read, for dev-seeded metadata). Fail-soft:
143    /// every error path returns `None` so a `getAsset` degrades to its
144    /// on-chain fields rather than failing.
145    async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
146        if !self.offchain_enabled {
147            return None;
148        }
149        if let Some(path) = uri.strip_prefix("file://") {
150            // file:///abs/path → "/abs/path"; file://host/path is rare
151            // for metadata, so we treat everything after the scheme as
152            // a filesystem path.
153            let bytes = tokio::fs::read(path).await.ok()?;
154            if bytes.len() > OFFCHAIN_MAX_BYTES {
155                return None;
156            }
157            return Some(bytes);
158        }
159        if uri.starts_with("http://") || uri.starts_with("https://") {
160            let resp = self
161                .client
162                .get(uri)
163                .timeout(OFFCHAIN_TIMEOUT)
164                .send()
165                .await
166                .ok()?;
167            if !resp.status().is_success() {
168                return None;
169            }
170            // Cap the body. content-length is advisory; enforce on the
171            // actual bytes too.
172            if let Some(len) = resp.content_length() {
173                if len > OFFCHAIN_MAX_BYTES as u64 {
174                    return None;
175                }
176            }
177            let bytes = resp.bytes().await.ok()?;
178            if bytes.len() > OFFCHAIN_MAX_BYTES {
179                return None;
180            }
181            return Some(bytes.to_vec());
182        }
183        // Unknown scheme (ipfs://, ar://, data:, …) — not resolved
184        // locally. Real Helius runs gateways for these; Tidepool
185        // leaves them to the consumer. Fail-soft.
186        None
187    }
188}
189
190// ─── small codec helpers ───────────────────────────────────────────
191
192fn base64_decode(s: &str) -> Option<Vec<u8>> {
193    // Hand-rolled base64 decoder to avoid adding `base64` crate for
194    // this single call site. Handles standard + URL-safe alphabets,
195    // ignores padding strictness.
196    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
197    const ALPHABET_URL: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
198    let mut table = [255u8; 256];
199    for (i, &b) in ALPHABET.iter().enumerate() {
200        table[b as usize] = i as u8;
201    }
202    for (i, &b) in ALPHABET_URL.iter().enumerate() {
203        table[b as usize] = i as u8;
204    }
205    let mut out: Vec<u8> = Vec::with_capacity(s.len() * 3 / 4);
206    let mut buf: u32 = 0;
207    let mut bits: u32 = 0;
208    for &b in s.as_bytes() {
209        if b == b'=' || b == b'\r' || b == b'\n' {
210            continue;
211        }
212        let v = table[b as usize];
213        if v == 255 {
214            return None;
215        }
216        buf = (buf << 6) | u32::from(v);
217        bits += 6;
218        if bits >= 8 {
219            bits -= 8;
220            out.push(((buf >> bits) & 0xff) as u8);
221        }
222    }
223    Some(out)
224}
225
226fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
227    let bytes = bs58::decode(s).into_vec().ok()?;
228    bytes.try_into().ok()
229}