Skip to main content

tidepool_server/
upstream_http.rs

1//! Production `UpstreamClient` impl over `reqwest`. Plain JSON-RPC
2//! POST to the upstream URL.
3//!
4//! Why not `solana-client`: its typed `RpcClient` doesn't surface
5//! generic method dispatch, and we need to pass unknown methods
6//! through unchanged. reqwest is the de facto Rust HTTP client and
7//! what solana-client uses internally anyway.
8
9use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17/// Max bytes we'll read from an off-chain metadata document. Metaplex
18/// JSON is a few KB; 2 MiB is a generous ceiling that still caps a
19/// hostile or runaway URI.
20const OFFCHAIN_MAX_BYTES: usize = 2 * 1024 * 1024;
21
22#[derive(Debug, Clone)]
23pub struct HttpUpstream {
24    client: Client,
25    url: String,
26    timeout: Duration,
27    /// When false, `fetch_uri` always returns `None` — disables
28    /// off-chain DAS metadata enrichment (the `--no-offchain-metadata`
29    /// flag). Useful for hermetic / fully-offline CI.
30    offchain_enabled: bool,
31}
32
33impl HttpUpstream {
34    pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
35        let client = Client::builder()
36            .timeout(timeout)
37            .build()
38            .map_err(|e| UpstreamError::Transport(e.to_string()))?;
39        Ok(Self {
40            client,
41            url: url.into(),
42            timeout,
43            offchain_enabled: true,
44        })
45    }
46
47    /// Toggle off-chain metadata fetching. Defaults to enabled.
48    #[must_use]
49    pub fn with_offchain_metadata(mut self, enabled: bool) -> Self {
50        self.offchain_enabled = enabled;
51        self
52    }
53
54    async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
55        let body = json!({
56            "jsonrpc": "2.0",
57            "id": 1,
58            "method": method,
59            "params": params,
60        });
61        let resp = self
62            .client
63            .post(&self.url)
64            .json(&body)
65            .send()
66            .await
67            .map_err(|e| {
68                if e.is_timeout() {
69                    UpstreamError::Timeout {
70                        millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
71                    }
72                } else {
73                    UpstreamError::Transport(e.to_string())
74                }
75            })?;
76        let json: Value = resp
77            .json()
78            .await
79            .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
80        if let Some(err) = json.get("error") {
81            return Err(UpstreamError::Rpc(err.to_string()));
82        }
83        Ok(json.get("result").cloned().unwrap_or(Value::Null))
84    }
85}
86
87#[async_trait]
88impl UpstreamClient for HttpUpstream {
89    async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
90        let result = self.post_rpc(method, params).await?;
91        serde_json::to_vec(&result)
92            .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
93    }
94
95    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
96        let params = json!([address, { "encoding": "base64" }]);
97        let result = self.post_rpc("getAccountInfo", params).await?;
98        // Response shape: { context: { slot }, value: AccountInfo | null }
99        let Some(value) = result.get("value") else {
100            return Ok(None);
101        };
102        if value.is_null() {
103            return Ok(None);
104        }
105
106        let owner = value
107            .get("owner")
108            .and_then(Value::as_str)
109            .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
110        let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
111        let data_array = value
112            .get("data")
113            .and_then(Value::as_array)
114            .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
115        // Shape: [base64_data, encoding].
116        let b64 = data_array
117            .first()
118            .and_then(Value::as_str)
119            .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
120
121        let data = base64_decode(b64)
122            .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
123        let owner_bytes = base58_decode_32(owner)
124            .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
125
126        Ok(Some(AccountData {
127            data,
128            owner: owner_bytes,
129            lamports,
130        }))
131    }
132
133    /// Fetch off-chain metadata. Supports `http(s)://` (via reqwest,
134    /// inheriting the client timeout, capped at `OFFCHAIN_MAX_BYTES`)
135    /// and `file://` (local read, for dev-seeded metadata). Fail-soft:
136    /// every error path returns `None` so a `getAsset` degrades to its
137    /// on-chain fields rather than failing.
138    async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
139        if !self.offchain_enabled {
140            return None;
141        }
142        if let Some(path) = uri.strip_prefix("file://") {
143            // file:///abs/path → "/abs/path"; file://host/path is rare
144            // for metadata, so we treat everything after the scheme as
145            // a filesystem path.
146            let bytes = tokio::fs::read(path).await.ok()?;
147            if bytes.len() > OFFCHAIN_MAX_BYTES {
148                return None;
149            }
150            return Some(bytes);
151        }
152        if uri.starts_with("http://") || uri.starts_with("https://") {
153            let resp = self.client.get(uri).send().await.ok()?;
154            if !resp.status().is_success() {
155                return None;
156            }
157            // Cap the body. content-length is advisory; enforce on the
158            // actual bytes too.
159            if let Some(len) = resp.content_length() {
160                if len > OFFCHAIN_MAX_BYTES as u64 {
161                    return None;
162                }
163            }
164            let bytes = resp.bytes().await.ok()?;
165            if bytes.len() > OFFCHAIN_MAX_BYTES {
166                return None;
167            }
168            return Some(bytes.to_vec());
169        }
170        // Unknown scheme (ipfs://, ar://, data:, …) — not resolved
171        // locally. Real Helius runs gateways for these; Tidepool
172        // leaves them to the consumer. Fail-soft.
173        None
174    }
175}
176
177// ─── small codec helpers ───────────────────────────────────────────
178
179fn base64_decode(s: &str) -> Option<Vec<u8>> {
180    // Hand-rolled base64 decoder to avoid adding `base64` crate for
181    // this single call site. Handles standard + URL-safe alphabets,
182    // ignores padding strictness.
183    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
184    const ALPHABET_URL: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
185    let mut table = [255u8; 256];
186    for (i, &b) in ALPHABET.iter().enumerate() {
187        table[b as usize] = i as u8;
188    }
189    for (i, &b) in ALPHABET_URL.iter().enumerate() {
190        table[b as usize] = i as u8;
191    }
192    let mut out: Vec<u8> = Vec::with_capacity(s.len() * 3 / 4);
193    let mut buf: u32 = 0;
194    let mut bits: u32 = 0;
195    for &b in s.as_bytes() {
196        if b == b'=' || b == b'\r' || b == b'\n' {
197            continue;
198        }
199        let v = table[b as usize];
200        if v == 255 {
201            return None;
202        }
203        buf = (buf << 6) | u32::from(v);
204        bits += 6;
205        if bits >= 8 {
206            bits -= 8;
207            out.push(((buf >> bits) & 0xff) as u8);
208        }
209    }
210    Some(out)
211}
212
213fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
214    let bytes = bs58::decode(s).into_vec().ok()?;
215    bytes.try_into().ok()
216}