tidepool_server/
upstream_http.rs1use std::time::Duration;
10
11use async_trait::async_trait;
12use reqwest::Client;
13use serde_json::{json, Value};
14
15use tidepool_rpc::upstream::{AccountData, UpstreamClient, UpstreamError, UpstreamResult};
16
17#[derive(Debug, Clone)]
18pub struct HttpUpstream {
19 client: Client,
20 url: String,
21 timeout: Duration,
22}
23
24impl HttpUpstream {
25 pub fn new(url: impl Into<String>, timeout: Duration) -> Result<Self, UpstreamError> {
26 let client = Client::builder()
27 .timeout(timeout)
28 .build()
29 .map_err(|e| UpstreamError::Transport(e.to_string()))?;
30 Ok(Self {
31 client,
32 url: url.into(),
33 timeout,
34 })
35 }
36
37 async fn post_rpc(&self, method: &str, params: Value) -> UpstreamResult<Value> {
38 let body = json!({
39 "jsonrpc": "2.0",
40 "id": 1,
41 "method": method,
42 "params": params,
43 });
44 let resp = self
45 .client
46 .post(&self.url)
47 .json(&body)
48 .send()
49 .await
50 .map_err(|e| {
51 if e.is_timeout() {
52 UpstreamError::Timeout {
53 millis: u64::try_from(self.timeout.as_millis()).unwrap_or(u64::MAX),
54 }
55 } else {
56 UpstreamError::Transport(e.to_string())
57 }
58 })?;
59 let json: Value = resp
60 .json()
61 .await
62 .map_err(|e| UpstreamError::Transport(format!("decode upstream body: {e}")))?;
63 if let Some(err) = json.get("error") {
64 return Err(UpstreamError::Rpc(err.to_string()));
65 }
66 Ok(json.get("result").cloned().unwrap_or(Value::Null))
67 }
68}
69
70#[async_trait]
71impl UpstreamClient for HttpUpstream {
72 async fn rpc_call(&self, method: &str, params: Value) -> UpstreamResult<Vec<u8>> {
73 let result = self.post_rpc(method, params).await?;
74 serde_json::to_vec(&result)
75 .map_err(|e| UpstreamError::Transport(format!("serialize result: {e}")))
76 }
77
78 async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
79 let params = json!([address, { "encoding": "base64" }]);
80 let result = self.post_rpc("getAccountInfo", params).await?;
81 let Some(value) = result.get("value") else {
83 return Ok(None);
84 };
85 if value.is_null() {
86 return Ok(None);
87 }
88
89 let owner = value
90 .get("owner")
91 .and_then(Value::as_str)
92 .ok_or_else(|| UpstreamError::Rpc("missing owner in getAccountInfo response".into()))?;
93 let lamports = value.get("lamports").and_then(Value::as_u64).unwrap_or(0);
94 let data_array = value
95 .get("data")
96 .and_then(Value::as_array)
97 .ok_or_else(|| UpstreamError::Rpc("missing data array in getAccountInfo".into()))?;
98 let b64 = data_array
100 .first()
101 .and_then(Value::as_str)
102 .ok_or_else(|| UpstreamError::Rpc("malformed data tuple".into()))?;
103
104 let data = base64_decode(b64)
105 .ok_or_else(|| UpstreamError::Rpc("base64-decode failed for account data".into()))?;
106 let owner_bytes = base58_decode_32(owner)
107 .ok_or_else(|| UpstreamError::Rpc("base58-decode owner failed".into()))?;
108
109 Ok(Some(AccountData {
110 data,
111 owner: owner_bytes,
112 lamports,
113 }))
114 }
115}
116
117fn base64_decode(s: &str) -> Option<Vec<u8>> {
120 const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
124 const ALPHABET_URL: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
125 let mut table = [255u8; 256];
126 for (i, &b) in ALPHABET.iter().enumerate() {
127 table[b as usize] = i as u8;
128 }
129 for (i, &b) in ALPHABET_URL.iter().enumerate() {
130 table[b as usize] = i as u8;
131 }
132 let mut out: Vec<u8> = Vec::with_capacity(s.len() * 3 / 4);
133 let mut buf: u32 = 0;
134 let mut bits: u32 = 0;
135 for &b in s.as_bytes() {
136 if b == b'=' || b == b'\r' || b == b'\n' {
137 continue;
138 }
139 let v = table[b as usize];
140 if v == 255 {
141 return None;
142 }
143 buf = (buf << 6) | u32::from(v);
144 bits += 6;
145 if bits >= 8 {
146 bits -= 8;
147 out.push(((buf >> bits) & 0xff) as u8);
148 }
149 }
150 Some(out)
151}
152
153fn base58_decode_32(s: &str) -> Option<[u8; 32]> {
154 let bytes = bs58::decode(s).into_vec().ok()?;
155 bytes.try_into().ok()
156}