tidepool_server/
upstream_http.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17const OFFCHAIN_MAX_BYTES: usize = 2 * 1024 * 1024;
21
22#[derive(Debug, Clone)]
23pub struct HttpUpstream {
24 client: Client,
25 url: String,
26 timeout: Duration,
27 offchain_enabled: bool,
31}
32
33impl HttpUpstream {
34 pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
35 let client = Client::builder()
36 .timeout(timeout)
37 .build()
38 .map_err(|e| UpstreamError::Transport(e.to_string()))?;
39 Ok(Self {
40 client,
41 url: url.into(),
42 timeout,
43 offchain_enabled: true,
44 })
45 }
46
47 #[must_use]
49 pub fn with_offchain_metadata(mut self, enabled: bool) -> Self {
50 self.offchain_enabled = enabled;
51 self
52 }
53
54 async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
55 let body = json!({
56 "jsonrpc": "2.0",
57 "id": 1,
58 "method": method,
59 "params": params,
60 });
61 let resp = self
62 .client
63 .post(&self.url)
64 .json(&body)
65 .send()
66 .await
67 .map_err(|e| {
68 if e.is_timeout() {
69 UpstreamError::Timeout {
70 millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
71 }
72 } else {
73 UpstreamError::Transport(e.to_string())
74 }
75 })?;
76 let json: Value = resp
77 .json()
78 .await
79 .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
80 if let Some(err) = json.get("error") {
81 return Err(UpstreamError::Rpc(err.to_string()));
82 }
83 Ok(json.get("result").cloned().unwrap_or(Value::Null))
84 }
85}
86
87#[async_trait]
88impl UpstreamClient for HttpUpstream {
89 async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
90 let result = self.post_rpc(method, params).await?;
91 serde_json::to_vec(&result)
92 .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
93 }
94
95 async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
96 let params = json!([address, { "encoding": "base64" }]);
97 let result = self.post_rpc("getAccountInfo", params).await?;
98 let Some(value) = result.get("value") else {
100 return Ok(None);
101 };
102 if value.is_null() {
103 return Ok(None);
104 }
105
106 let owner = value
107 .get("owner")
108 .and_then(Value::as_str)
109 .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
110 let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
111 let data_array = value
112 .get("data")
113 .and_then(Value::as_array)
114 .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
115 let b64 = data_array
117 .first()
118 .and_then(Value::as_str)
119 .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
120
121 let data = base64_decode(b64)
122 .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
123 let owner_bytes = base58_decode_32(owner)
124 .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
125
126 Ok(Some(AccountData {
127 data,
128 owner: owner_bytes,
129 lamports,
130 }))
131 }
132
133 async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
139 if !self.offchain_enabled {
140 return None;
141 }
142 if let Some(path) = uri.strip_prefix("file://") {
143 let bytes = tokio::fs::read(path).await.ok()?;
147 if bytes.len() > OFFCHAIN_MAX_BYTES {
148 return None;
149 }
150 return Some(bytes);
151 }
152 if uri.starts_with("http://") || uri.starts_with("https://") {
153 let resp = self.client.get(uri).send().await.ok()?;
154 if !resp.status().is_success() {
155 return None;
156 }
157 if let Some(len) = resp.content_length() {
160 if len > OFFCHAIN_MAX_BYTES as u64 {
161 return None;
162 }
163 }
164 let bytes = resp.bytes().await.ok()?;
165 if bytes.len() > OFFCHAIN_MAX_BYTES {
166 return None;
167 }
168 return Some(bytes.to_vec());
169 }
170 None
174 }
175}
176
177fn base64_decode(s: &str) -> Option<Vec<u8>> {
180 const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
184 const ALPHABET_URL: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
185 let mut table = [255u8; 256];
186 for (i, &b) in ALPHABET.iter().enumerate() {
187 table[b as usize] = i as u8;
188 }
189 for (i, &b) in ALPHABET_URL.iter().enumerate() {
190 table[b as usize] = i as u8;
191 }
192 let mut out: Vec<u8> = Vec::with_capacity(s.len() * 3 / 4);
193 let mut buf: u32 = 0;
194 let mut bits: u32 = 0;
195 for &b in s.as_bytes() {
196 if b == b'=' || b == b'\r' || b == b'\n' {
197 continue;
198 }
199 let v = table[b as usize];
200 if v == 255 {
201 return None;
202 }
203 buf = (buf << 6) | u32::from(v);
204 bits += 6;
205 if bits >= 8 {
206 bits -= 8;
207 out.push(((buf >> bits) & 0xff) as u8);
208 }
209 }
210 Some(out)
211}
212
213fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
214 let bytes = bs58::decode(s).into_vec().ok()?;
215 bytes.try_into().ok()
216}