tidepool_server/
upstream_http.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17const OFFCHAIN_MAX_BYTES: usize = 2 * 1024 * 1024;
21
22const OFFCHAIN_TIMEOUT: Duration = Duration::from_secs(3);
28
29#[derive(Debug, Clone)]
30pub struct HttpUpstream {
31 client: Client,
32 url: String,
33 timeout: Duration,
34 offchain_enabled: bool,
38}
39
40impl HttpUpstream {
41 pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
42 let client = Client::builder()
43 .timeout(timeout)
44 .build()
45 .map_err(|e| UpstreamError::Transport(e.to_string()))?;
46 Ok(Self {
47 client,
48 url: url.into(),
49 timeout,
50 offchain_enabled: true,
51 })
52 }
53
54 #[must_use]
56 pub fn with_offchain_metadata(mut self, enabled: bool) -> Self {
57 self.offchain_enabled = enabled;
58 self
59 }
60
61 async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
62 let body = json!({
63 "jsonrpc": "2.0",
64 "id": 1,
65 "method": method,
66 "params": params,
67 });
68 let resp = self
69 .client
70 .post(&self.url)
71 .json(&body)
72 .send()
73 .await
74 .map_err(|e| {
75 if e.is_timeout() {
76 UpstreamError::Timeout {
77 millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
78 }
79 } else {
80 UpstreamError::Transport(e.to_string())
81 }
82 })?;
83 let json: Value = resp
84 .json()
85 .await
86 .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
87 if let Some(err) = json.get("error") {
88 return Err(UpstreamError::Rpc(err.to_string()));
89 }
90 Ok(json.get("result").cloned().unwrap_or(Value::Null))
91 }
92}
93
94#[async_trait]
95impl UpstreamClient for HttpUpstream {
96 async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
97 let result = self.post_rpc(method, params).await?;
98 serde_json::to_vec(&result)
99 .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
100 }
101
102 async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
103 let params = json!([address, { "encoding": "base64" }]);
104 let result = self.post_rpc("getAccountInfo", params).await?;
105 let Some(value) = result.get("value") else {
107 return Ok(None);
108 };
109 if value.is_null() {
110 return Ok(None);
111 }
112
113 let owner = value
114 .get("owner")
115 .and_then(Value::as_str)
116 .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
117 let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
118 let data_array = value
119 .get("data")
120 .and_then(Value::as_array)
121 .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
122 let b64 = data_array
124 .first()
125 .and_then(Value::as_str)
126 .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
127
128 let data = base64_decode(b64)
129 .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
130 let owner_bytes = base58_decode_32(owner)
131 .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
132
133 Ok(Some(AccountData {
134 data,
135 owner: owner_bytes,
136 lamports,
137 }))
138 }
139
140 async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
146 if !self.offchain_enabled {
147 return None;
148 }
149 if let Some(path) = uri.strip_prefix("file://") {
150 let bytes = tokio::fs::read(path).await.ok()?;
154 if bytes.len() > OFFCHAIN_MAX_BYTES {
155 return None;
156 }
157 return Some(bytes);
158 }
159 if uri.starts_with("http://") || uri.starts_with("https://") {
160 let resp = self
161 .client
162 .get(uri)
163 .timeout(OFFCHAIN_TIMEOUT)
164 .send()
165 .await
166 .ok()?;
167 if !resp.status().is_success() {
168 return None;
169 }
170 if let Some(len) = resp.content_length() {
173 if len > OFFCHAIN_MAX_BYTES as u64 {
174 return None;
175 }
176 }
177 let bytes = resp.bytes().await.ok()?;
178 if bytes.len() > OFFCHAIN_MAX_BYTES {
179 return None;
180 }
181 return Some(bytes.to_vec());
182 }
183 None
187 }
188}
189
190fn base64_decode(s: &str) -> Option<Vec<u8>> {
193 const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
197 const ALPHABET_URL: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
198 let mut table = [255u8; 256];
199 for (i, &b) in ALPHABET.iter().enumerate() {
200 table[b as usize] = i as u8;
201 }
202 for (i, &b) in ALPHABET_URL.iter().enumerate() {
203 table[b as usize] = i as u8;
204 }
205 let mut out: Vec<u8> = Vec::with_capacity(s.len() * 3 / 4);
206 let mut buf: u32 = 0;
207 let mut bits: u32 = 0;
208 for &b in s.as_bytes() {
209 if b == b'=' || b == b'\r' || b == b'\n' {
210 continue;
211 }
212 let v = table[b as usize];
213 if v == 255 {
214 return None;
215 }
216 buf = (buf << 6) | u32::from(v);
217 bits += 6;
218 if bits >= 8 {
219 bits -= 8;
220 out.push(((buf >> bits) & 0xff) as u8);
221 }
222 }
223 Some(out)
224}
225
226fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
227 let bytes = bs58::decode(s).into_vec().ok()?;
228 bytes.try_into().ok()
229}