blyss_rs/
api.rs

1use bzip2_rs::DecoderReader;
2use std::{collections::HashMap, io::Read};
3
4use crate::error::Error;
5use base64::{engine::general_purpose, Engine as _};
6use reqwest::multipart::{Form, Part};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use spiral_rs::{
10    client::Client,
11    key_value::{extract_result_impl, row_from_key, varint_decode},
12    params::Params,
13    util::params_from_json_obj,
14};
15
16/// HTTP GET request to the given URL with the given API key.
17pub(crate) async fn http_get_string(url: &str, api_key: &str) -> Result<String, Error> {
18    let req = reqwest::Client::new().get(url).header("x-api-key", api_key);
19    let res = req.send().await?.error_for_status()?.text().await?;
20    Ok(res)
21}
22
23/// HTTP POST request with binary body to the given URL with the given API key.
24pub(crate) async fn http_post_bytes(
25    url: &str,
26    api_key: &str,
27    data: Vec<u8>,
28) -> Result<Vec<u8>, Error> {
29    let req = reqwest::Client::new()
30        .post(url)
31        .body(data)
32        .header("Content-Type", "application/octet-stream")
33        .header("x-api-key", api_key);
34    let res = req.send().await?.error_for_status()?;
35    let resp_body = res.bytes().await?;
36    Ok(resp_body.to_vec())
37}
38
39/// HTTP POST request with string body to the given URL with the given API key.
40pub(crate) async fn http_post_string(
41    url: &str,
42    api_key: &str,
43    data: String,
44) -> Result<String, Error> {
45    let req = reqwest::Client::new()
46        .post(url)
47        .body(data)
48        .header("x-api-key", api_key);
49    let res = req.send().await?.error_for_status()?.text().await?;
50    Ok(res)
51}
52
53/// HTTP POST request to the given URL with the given API key.
54pub(crate) async fn http_post_form_data(
55    url: &str,
56    api_key: &str,
57    data: Vec<u8>,
58    fields: HashMap<String, String>,
59) -> Result<Vec<u8>, Error> {
60    let mut form_data = Form::new();
61    for (key, value) in fields {
62        form_data = form_data.text(key, value);
63    }
64    form_data = form_data.part("file", Part::bytes(data));
65
66    let req = reqwest::Client::new()
67        .post(url)
68        .multipart(form_data)
69        .header("x-api-key", api_key);
70    let res = req.send().await?.error_for_status()?;
71    let resp_body = res.bytes().await?;
72    Ok(resp_body.to_vec())
73}
74
75/// Decompress the given data using bzip2.
76fn decompress(data: &[u8]) -> Result<Vec<u8>, Error> {
77    let mut decoder = DecoderReader::new(data);
78    let mut decompressed = Vec::new();
79    decoder.read_to_end(&mut decompressed)?;
80    Ok(decompressed)
81}
82
/// Pack a list of byte chunks into one buffer with this layout:
/// - 8 bytes: number of chunks (u64 LE)
/// - for each chunk:
///   - 8 bytes: chunk length (u64 LE)
///   - (chunk data)
///
/// This is the inverse of `deserialize_chunks`.
fn serialize_chunks(data: &[Vec<u8>]) -> Vec<u8> {
    // The exact output size is known up front: one count header, then a length
    // header plus the payload for each chunk.
    let total_len = 8 + data.iter().map(|chunk| 8 + chunk.len()).sum::<usize>();
    let mut packed = Vec::with_capacity(total_len);

    packed.extend_from_slice(&(data.len() as u64).to_le_bytes());
    for chunk in data {
        packed.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
        packed.extend_from_slice(chunk);
    }
    packed
}
97
/// Deserialize a list of chunks from a single byte array in the following format:
/// - 8 bytes: number of chunks (u64 LE)
/// - for each chunk:
///   - 8 bytes: chunk length (u64 LE)
///   - (chunk data)
///
/// Bytes after the last declared chunk are ignored.
///
/// # Panics
/// Panics if `data` is truncated, or if a declared chunk length does not fit in
/// `usize` (e.g. a length above `u32::MAX` on a 32-bit / wasm32 target). Without this
/// check, such a length would be silently truncated and the stream misparsed.
fn deserialize_chunks(data: &[u8]) -> Vec<Vec<u8>> {
    try_deserialize_chunks(data)
        .expect("malformed chunk stream: truncated data or chunk length too large")
}

/// Fallible core of `deserialize_chunks`: returns `None` on any malformed input
/// instead of indexing out of bounds or truncating lengths.
fn try_deserialize_chunks(data: &[u8]) -> Option<Vec<Vec<u8>>> {
    let mut rest = data;
    let num_chunks = take_u64_le(&mut rest)?;
    // Deliberately no `with_capacity(num_chunks)`: the count comes from the wire and
    // could be huge. Each iteration consumes at least 8 bytes, so the loop is bounded
    // by `data.len()`.
    let mut chunks = Vec::new();
    for _ in 0..num_chunks {
        let chunk_len = usize::try_from(take_u64_le(&mut rest)?).ok()?;
        if chunk_len > rest.len() {
            return None;
        }
        let (chunk, tail) = rest.split_at(chunk_len);
        chunks.push(chunk.to_vec());
        rest = tail;
    }
    Some(chunks)
}

/// Read a little-endian `u64` from the front of `buf` and advance `buf` past it.
/// Returns `None` if fewer than 8 bytes remain.
fn take_u64_le(buf: &mut &[u8]) -> Option<u64> {
    let slice: &[u8] = *buf;
    if slice.len() < 8 {
        return None;
    }
    let (head, tail) = slice.split_at(8);
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(head);
    *buf = tail;
    Some(u64::from_le_bytes(bytes))
}
116
117/// Split the given data into metadata and the rest of the data.
118fn split_metadata(data: &[u8]) -> (&[u8], &[u8]) {
119    let (value, bytes_used) = varint_decode(data);
120    let metadata_len = value as usize;
121    if metadata_len == 0 {
122        return (&[], data);
123    }
124    let metadata = &data[bytes_used..bytes_used + metadata_len];
125    let data = &data[bytes_used + metadata_len..];
126
127    (metadata, data)
128}
129
/// Return `true` when no byte of `decrypted` is non-zero (an empty slice counts as all zeros).
fn is_all_zeros(decrypted: &[u8]) -> bool {
    !decrypted.iter().any(|&byte| byte != 0)
}
134
135/// Fetch the metadata from the given URL.
136pub(crate) async fn get_meta(url: &str, api_key: &str) -> Result<String, Error> {
137    http_get_string(&format!("{}/meta", url), api_key).await
138}
139
/// Return whether `url` should use the Blyss-hosted setup flow.
///
/// This is a plain substring check for `"blyss.dev/"` anywhere in the URL. It does not
/// parse the host, so a path containing that text also matches.
fn is_blyss_url(url: &str) -> bool {
    const BLYSS_MARKER: &str = "blyss.dev/";
    url.contains(BLYSS_MARKER)
}
143
/// JSON body of the preliminary `POST {url}/setup` request used by `perform_setup`
/// for Blyss-hosted buckets.
///
/// It announces only the size of the setup data. The data itself is uploaded
/// separately as multipart form data.
#[derive(Serialize, Deserialize)]
struct PrelimSetupBody {
    /// Length in bytes of the serialized setup data.
    length: usize,
}
148
149async fn perform_setup(url: &str, api_key: &str, setup_data: Vec<u8>) -> Result<String, Error> {
150    if !is_blyss_url(url) {
151        let setup_resp = http_post_bytes(&format!("{}/setup", url), api_key, setup_data).await?;
152        let setup_resp_str = String::from_utf8(setup_resp)?;
153        let uuid = serde_json::from_str::<Value>(&setup_resp_str)?
154            .get("uuid")
155            .ok_or(Error::Unknown)?
156            .as_str()
157            .ok_or(Error::Unknown)?
158            .to_string();
159        return Ok(uuid);
160    }
161
162    let prelim_setup_body = serde_json::to_string(&PrelimSetupBody {
163        length: setup_data.len(),
164    })?;
165    let setup_resp =
166        http_post_string(&format!("{}/setup", url), api_key, prelim_setup_body).await?;
167    let setup_resp_value: Value = serde_json::from_str(&setup_resp)?;
168    let fields: HashMap<String, String> = serde_json::from_value(
169        setup_resp_value
170            .get("fields")
171            .ok_or(Error::Unknown)?
172            .clone(),
173    )?;
174    let s3_url: String =
175        serde_json::from_value(setup_resp_value.get("url").ok_or(Error::Unknown)?.clone())?;
176
177    http_post_form_data(&s3_url, api_key, setup_data, fields).await?;
178
179    let uuid = setup_resp_value
180        .get("uuid")
181        .ok_or(Error::Unknown)?
182        .as_str()
183        .ok_or(Error::Unknown)?
184        .to_owned();
185    Ok(uuid)
186}
187
188/// Privately read the given keys from the given URL, using the given API key.
189async fn private_read<'a>(
190    client: &Client<'a>,
191    params: &Params,
192    uuid: &str,
193    url: &str,
194    api_key: &str,
195    keys: &[String],
196) -> Result<Vec<Vec<u8>>, Error> {
197    let queries: Vec<_> = keys
198        .iter()
199        .map(|key| {
200            let idx_target = row_from_key(&params, key);
201            let query = client.generate_query(idx_target);
202            let query_data = query.serialize();
203            let uuid_and_query_data: Vec<_> = (uuid.as_bytes().to_vec().into_iter())
204                .chain(query_data)
205                .collect();
206            uuid_and_query_data
207        })
208        .collect();
209    let full_query_data = serialize_chunks(&queries);
210
211    let resp_data_b64 =
212        http_post_bytes(&format!("{}/private-read", url), api_key, full_query_data).await?;
213    let resp_data = general_purpose::STANDARD.decode(resp_data_b64)?;
214    let resp_chunks = deserialize_chunks(&resp_data);
215
216    let mut results = Vec::new();
217    for (i, chunk) in resp_chunks.iter().enumerate() {
218        let decrypted = client.decode_response(&chunk);
219        if is_all_zeros(&decrypted) {
220            results.push(vec![]);
221            continue;
222        }
223        let decompressed = decompress(&decrypted)?;
224        let result = extract_result_impl(&keys[i], &decompressed);
225        if let Ok(result) = result {
226            let (_metadata, data) = split_metadata(&result);
227            results.push(data.to_vec());
228        } else {
229            results.push(vec![]);
230        }
231    }
232
233    Ok(results)
234}
235
/// A client for a single, existing Blyss bucket.
///
/// Create it with `ApiClient::new`, then call `setup()` once before `private_read()`.
pub struct ApiClient {
    /// The URL for the bucket.
    pub url: String,

    /// API key sent as the `x-api-key` header on every request this client makes.
    api_key: String,
    /// PIR parameters built from the `pir_scheme` entry of the bucket's `/meta` document.
    /// Leaked in `new` so that `client` can borrow them for `'static`.
    params: &'static Params,
    /// spiral_rs client used to generate keys and queries and to decode responses.
    client: Client<'static>,
    /// Id returned by the server's setup endpoint; `None` until `setup()` has succeeded.
    uuid: Option<String>,
}
246
247impl ApiClient {
248    /// Create a new API client for the given URL and API key.
249    ///
250    /// The URL should be the URL of the bucket, e.g. `https://beta.api.blyss.dev/global.abc123`.
251    pub async fn new(url: &str, api_key: &str) -> Result<Self, Error> {
252        let metadata = get_meta(url, api_key).await?;
253        let params_value = serde_json::from_str::<Value>(&metadata)?
254            .get("pir_scheme")
255            .ok_or(Error::Unknown)?
256            .clone();
257        let params = params_from_json_obj(&params_value);
258        let boxed_params = Box::leak(Box::new(params)); // TODO: avoid this
259
260        Ok(Self {
261            url: url.to_string(),
262            api_key: api_key.to_string(),
263            params: boxed_params,
264            client: Client::init(boxed_params),
265            uuid: None,
266        })
267    }
268
269    /// Returns whether the client has been set up for private reads.
270    fn has_set_up(&self) -> bool {
271        self.uuid.is_some()
272    }
273
274    /// Prepare the client for private reads. This must be called before calling private_read().
275    pub async fn setup(&mut self) -> Result<(), Error> {
276        let setup = self.client.generate_keys();
277        let setup_data = setup.serialize();
278
279        let uuid = perform_setup(&self.url, &self.api_key, setup_data).await?;
280
281        self.uuid = Some(uuid);
282
283        Ok(())
284    }
285
286    /// Privately read the given keys from the bucket.
287    /// Must call setup() before calling this.
288    ///
289    /// # Arguments
290    /// - `keys` - The keys to read.
291    ///
292    /// # Returns
293    /// A vector of the values corresponding to the given keys.
294    /// If a key does not exist, the corresponding value will be an empty vector.
295    ///
296    /// # Errors
297    /// - `Error::NeedSetup` - If setup() has not been called.
298    pub async fn private_read(&self, keys: &[String]) -> Result<Vec<Vec<u8>>, Error> {
299        if !self.has_set_up() {
300            return Err(Error::NeedSetup);
301        }
302
303        private_read(
304            &self.client,
305            &self.params,
306            self.uuid.as_ref().unwrap(),
307            &self.url,
308            &self.api_key,
309            keys,
310        )
311        .await
312    }
313}