1use bzip2_rs::DecoderReader;
2use std::{collections::HashMap, io::Read};
3
4use crate::error::Error;
5use base64::{engine::general_purpose, Engine as _};
6use reqwest::multipart::{Form, Part};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use spiral_rs::{
10 client::Client,
11 key_value::{extract_result_impl, row_from_key, varint_decode},
12 params::Params,
13 util::params_from_json_obj,
14};
15
16pub(crate) async fn http_get_string(url: &str, api_key: &str) -> Result<String, Error> {
18 let req = reqwest::Client::new().get(url).header("x-api-key", api_key);
19 let res = req.send().await?.error_for_status()?.text().await?;
20 Ok(res)
21}
22
23pub(crate) async fn http_post_bytes(
25 url: &str,
26 api_key: &str,
27 data: Vec<u8>,
28) -> Result<Vec<u8>, Error> {
29 let req = reqwest::Client::new()
30 .post(url)
31 .body(data)
32 .header("Content-Type", "application/octet-stream")
33 .header("x-api-key", api_key);
34 let res = req.send().await?.error_for_status()?;
35 let resp_body = res.bytes().await?;
36 Ok(resp_body.to_vec())
37}
38
39pub(crate) async fn http_post_string(
41 url: &str,
42 api_key: &str,
43 data: String,
44) -> Result<String, Error> {
45 let req = reqwest::Client::new()
46 .post(url)
47 .body(data)
48 .header("x-api-key", api_key);
49 let res = req.send().await?.error_for_status()?.text().await?;
50 Ok(res)
51}
52
53pub(crate) async fn http_post_form_data(
55 url: &str,
56 api_key: &str,
57 data: Vec<u8>,
58 fields: HashMap<String, String>,
59) -> Result<Vec<u8>, Error> {
60 let mut form_data = Form::new();
61 for (key, value) in fields {
62 form_data = form_data.text(key, value);
63 }
64 form_data = form_data.part("file", Part::bytes(data));
65
66 let req = reqwest::Client::new()
67 .post(url)
68 .multipart(form_data)
69 .header("x-api-key", api_key);
70 let res = req.send().await?.error_for_status()?;
71 let resp_body = res.bytes().await?;
72 Ok(resp_body.to_vec())
73}
74
75fn decompress(data: &[u8]) -> Result<Vec<u8>, Error> {
77 let mut decoder = DecoderReader::new(data);
78 let mut decompressed = Vec::new();
79 decoder.read_to_end(&mut decompressed)?;
80 Ok(decompressed)
81}
82
/// Packs a list of byte chunks into a single length-prefixed buffer.
///
/// Layout (all integers are little-endian `u64`):
/// the number of chunks, then for each chunk its byte length followed by its
/// bytes.
fn serialize_chunks(data: &[Vec<u8>]) -> Vec<u8> {
    const HEADER_LEN: usize = 8;

    let payload_len: usize = data.iter().map(|chunk| HEADER_LEN + chunk.len()).sum();
    let mut out = Vec::with_capacity(HEADER_LEN + payload_len);

    out.extend_from_slice(&(data.len() as u64).to_le_bytes());
    for chunk in data {
        out.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
        out.extend_from_slice(chunk);
    }
    out
}
97
/// Unpacks a length-prefixed buffer into its byte chunks.
///
/// Expected layout (all integers are little-endian `u64`): the number of
/// chunks, then for each chunk its byte length followed by its bytes.
///
/// The input is treated as untrusted: this function never panics. If the
/// buffer is empty, truncated, or declares a length that exceeds the bytes
/// actually present, decoding stops and the chunks that were fully decoded up
/// to that point are returned. Callers that require an exact chunk count must
/// check the length of the returned vector.
fn deserialize_chunks(data: &[u8]) -> Vec<Vec<u8>> {
    /// Splits `len` bytes off the front of `rest`, or returns `None` (leaving
    /// `rest` untouched) when fewer than `len` bytes remain.
    fn take<'a>(rest: &mut &'a [u8], len: u64) -> Option<&'a [u8]> {
        let current: &'a [u8] = *rest;
        // Compare in u64 so an oversized length can never be truncated by a
        // cast before it is validated.
        if len > current.len() as u64 {
            return None;
        }
        let (head, tail) = current.split_at(len as usize);
        *rest = tail;
        Some(head)
    }

    /// Reads one little-endian `u64` from the front of `rest`.
    fn take_u64(rest: &mut &[u8]) -> Option<u64> {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(take(rest, 8)?);
        Some(u64::from_le_bytes(raw))
    }

    let mut rest = data;
    let mut chunks = Vec::new();

    let num_chunks = match take_u64(&mut rest) {
        Some(count) => count,
        None => return chunks,
    };

    // Deliberately no preallocation from `num_chunks`: the count is
    // attacker-controlled and may be far larger than the data supports.
    for _ in 0..num_chunks {
        let chunk = take_u64(&mut rest).and_then(|len| take(&mut rest, len));
        match chunk {
            Some(bytes) => chunks.push(bytes.to_vec()),
            None => break,
        }
    }
    chunks
}
116
117fn split_metadata(data: &[u8]) -> (&[u8], &[u8]) {
119 let (value, bytes_used) = varint_decode(data);
120 let metadata_len = value as usize;
121 if metadata_len == 0 {
122 return (&[], data);
123 }
124 let metadata = &data[bytes_used..bytes_used + metadata_len];
125 let data = &data[bytes_used + metadata_len..];
126
127 (metadata, data)
128}
129
/// Returns `true` when every byte of `decrypted` is zero (vacuously `true`
/// for an empty slice).
fn is_all_zeros(decrypted: &[u8]) -> bool {
    !decrypted.iter().any(|&byte| byte != 0)
}
134
135pub(crate) async fn get_meta(url: &str, api_key: &str) -> Result<String, Error> {
137 http_get_string(&format!("{}/meta", url), api_key).await
138}
139
/// Reports whether `url` contains the substring `blyss.dev/`.
///
/// This is a plain substring test; the marker may appear anywhere in `url`.
fn is_blyss_url(url: &str) -> bool {
    const BLYSS_MARKER: &str = "blyss.dev/";
    url.find(BLYSS_MARKER).is_some()
}
143
144#[derive(Serialize, Deserialize)]
145struct PrelimSetupBody {
146 length: usize,
147}
148
149async fn perform_setup(url: &str, api_key: &str, setup_data: Vec<u8>) -> Result<String, Error> {
150 if !is_blyss_url(url) {
151 let setup_resp = http_post_bytes(&format!("{}/setup", url), api_key, setup_data).await?;
152 let setup_resp_str = String::from_utf8(setup_resp)?;
153 let uuid = serde_json::from_str::<Value>(&setup_resp_str)?
154 .get("uuid")
155 .ok_or(Error::Unknown)?
156 .as_str()
157 .ok_or(Error::Unknown)?
158 .to_string();
159 return Ok(uuid);
160 }
161
162 let prelim_setup_body = serde_json::to_string(&PrelimSetupBody {
163 length: setup_data.len(),
164 })?;
165 let setup_resp =
166 http_post_string(&format!("{}/setup", url), api_key, prelim_setup_body).await?;
167 let setup_resp_value: Value = serde_json::from_str(&setup_resp)?;
168 let fields: HashMap<String, String> = serde_json::from_value(
169 setup_resp_value
170 .get("fields")
171 .ok_or(Error::Unknown)?
172 .clone(),
173 )?;
174 let s3_url: String =
175 serde_json::from_value(setup_resp_value.get("url").ok_or(Error::Unknown)?.clone())?;
176
177 http_post_form_data(&s3_url, api_key, setup_data, fields).await?;
178
179 let uuid = setup_resp_value
180 .get("uuid")
181 .ok_or(Error::Unknown)?
182 .as_str()
183 .ok_or(Error::Unknown)?
184 .to_owned();
185 Ok(uuid)
186}
187
188async fn private_read<'a>(
190 client: &Client<'a>,
191 params: &Params,
192 uuid: &str,
193 url: &str,
194 api_key: &str,
195 keys: &[String],
196) -> Result<Vec<Vec<u8>>, Error> {
197 let queries: Vec<_> = keys
198 .iter()
199 .map(|key| {
200 let idx_target = row_from_key(¶ms, key);
201 let query = client.generate_query(idx_target);
202 let query_data = query.serialize();
203 let uuid_and_query_data: Vec<_> = (uuid.as_bytes().to_vec().into_iter())
204 .chain(query_data)
205 .collect();
206 uuid_and_query_data
207 })
208 .collect();
209 let full_query_data = serialize_chunks(&queries);
210
211 let resp_data_b64 =
212 http_post_bytes(&format!("{}/private-read", url), api_key, full_query_data).await?;
213 let resp_data = general_purpose::STANDARD.decode(resp_data_b64)?;
214 let resp_chunks = deserialize_chunks(&resp_data);
215
216 let mut results = Vec::new();
217 for (i, chunk) in resp_chunks.iter().enumerate() {
218 let decrypted = client.decode_response(&chunk);
219 if is_all_zeros(&decrypted) {
220 results.push(vec![]);
221 continue;
222 }
223 let decompressed = decompress(&decrypted)?;
224 let result = extract_result_impl(&keys[i], &decompressed);
225 if let Ok(result) = result {
226 let (_metadata, data) = split_metadata(&result);
227 results.push(data.to_vec());
228 } else {
229 results.push(vec![]);
230 }
231 }
232
233 Ok(results)
234}
235
236pub struct ApiClient {
238 pub url: String,
240
241 api_key: String,
242 params: &'static Params,
243 client: Client<'static>,
244 uuid: Option<String>,
245}
246
247impl ApiClient {
248 pub async fn new(url: &str, api_key: &str) -> Result<Self, Error> {
252 let metadata = get_meta(url, api_key).await?;
253 let params_value = serde_json::from_str::<Value>(&metadata)?
254 .get("pir_scheme")
255 .ok_or(Error::Unknown)?
256 .clone();
257 let params = params_from_json_obj(¶ms_value);
258 let boxed_params = Box::leak(Box::new(params)); Ok(Self {
261 url: url.to_string(),
262 api_key: api_key.to_string(),
263 params: boxed_params,
264 client: Client::init(boxed_params),
265 uuid: None,
266 })
267 }
268
269 fn has_set_up(&self) -> bool {
271 self.uuid.is_some()
272 }
273
274 pub async fn setup(&mut self) -> Result<(), Error> {
276 let setup = self.client.generate_keys();
277 let setup_data = setup.serialize();
278
279 let uuid = perform_setup(&self.url, &self.api_key, setup_data).await?;
280
281 self.uuid = Some(uuid);
282
283 Ok(())
284 }
285
286 pub async fn private_read(&self, keys: &[String]) -> Result<Vec<Vec<u8>>, Error> {
299 if !self.has_set_up() {
300 return Err(Error::NeedSetup);
301 }
302
303 private_read(
304 &self.client,
305 &self.params,
306 self.uuid.as_ref().unwrap(),
307 &self.url,
308 &self.api_key,
309 keys,
310 )
311 .await
312 }
313}