Skip to main content

bunnydb_http/
client.rs

1use std::fmt;
2use std::time::Duration;
3
4use reqwest::{header, StatusCode};
5
6// tokio::time::sleep is only available on non-WASM targets.
7#[cfg(not(target_arch = "wasm32"))]
8use tokio::time::sleep;
9
10use crate::{
11    decode::{build_execute_statement, decode_exec_result, decode_query_result},
12    wire::{self, PipelineRequest, Request},
13    BunnyDbError, ClientOptions, ExecResult, Params, QueryResult, Result, Statement,
14    StatementOutcome,
15};
16
/// Builds the pipeline endpoint URL that belongs to a database ID.
///
/// Leading and trailing whitespace in `db_id` is removed before the ID is
/// placed into the host name.
///
/// Example: `"abc123"` → `"https://abc123.lite.bunnydb.net/v2/pipeline"`
pub fn db_id_to_pipeline_url(db_id: &str) -> String {
    let id = db_id.trim();
    format!("https://{id}.lite.bunnydb.net/v2/pipeline")
}
23
24#[derive(Clone)]
25/// HTTP client for Bunny.net Database SQL pipeline endpoint.
26pub struct BunnyDbClient {
27    http: reqwest::Client,
28    pipeline_url: String,
29    token: String,
30    options: ClientOptions,
31}
32
33impl fmt::Debug for BunnyDbClient {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("BunnyDbClient")
36            .field("pipeline_url", &self.pipeline_url)
37            .field("token", &"<redacted>")
38            .field("options", &self.options)
39            .finish()
40    }
41}
42
43impl BunnyDbClient {
44    /// Creates a client with a raw authorization header value.
45    ///
46    /// This is backward-compatible with previous versions where `token`
47    /// was passed directly as `Authorization: <value>`.
48    pub fn new(pipeline_url: impl Into<String>, token: impl Into<String>) -> Self {
49        Self::new_raw_auth(pipeline_url, token)
50    }
51
52    /// Creates a client with a full raw authorization value.
53    ///
54    /// Example: `"Bearer <token>"` or any custom scheme.
55    pub fn new_raw_auth(pipeline_url: impl Into<String>, authorization: impl Into<String>) -> Self {
56        Self {
57            http: reqwest::Client::new(),
58            pipeline_url: pipeline_url.into(),
59            token: authorization.into(),
60            options: ClientOptions::default(),
61        }
62    }
63
64    /// Creates a client from a bearer token.
65    ///
66    /// If the token is missing the `Bearer ` prefix, it is added automatically.
67    pub fn new_bearer(pipeline_url: impl Into<String>, token: impl AsRef<str>) -> Self {
68        let authorization = normalize_bearer_authorization(token.as_ref());
69        Self::new_raw_auth(pipeline_url, authorization)
70    }
71
72    /// Creates a client from a **Bunny Database ID** and a bearer token.
73    ///
74    /// The pipeline URL is derived automatically:
75    /// `https://<db_id>.lite.bunnydb.net/v2/pipeline`
76    ///
77    /// This is the most ergonomic constructor when you know the database ID.
78    ///
79    /// # Example
80    ///
81    /// ```no_run
82    /// use bunnydb_http::BunnyDbClient;
83    ///
84    /// let db = BunnyDbClient::from_db_id("my-db-id", "my-token");
85    /// ```
86    pub fn from_db_id(db_id: impl AsRef<str>, token: impl AsRef<str>) -> Self {
87        let url = db_id_to_pipeline_url(db_id.as_ref());
88        Self::new_bearer(url, token)
89    }
90
91    /// Creates a client from environment variables.
92    ///
93    /// Reads:
94    /// - `BUNNYDB_PIPELINE_URL` — full pipeline endpoint URL  
95    ///   (e.g. `https://<id>.lite.bunnydb.net/v2/pipeline`)
96    /// - `BUNNYDB_TOKEN` — access token (Bearer prefix optional)
97    ///
98    /// Returns an error if either variable is missing or empty.
99    ///
100    /// **Not available on `wasm32` targets** — environment variables do not
101    /// exist in browser runtimes. Use [`BunnyDbClient::new_bearer`] or
102    /// receive credentials from JavaScript via `wasm-bindgen`.
103    ///
104    /// # Example
105    ///
106    /// ```no_run
107    /// use bunnydb_http::BunnyDbClient;
108    ///
109    /// let db = BunnyDbClient::from_env().expect("missing BUNNYDB_* env vars");
110    /// ```
111    #[cfg(not(target_arch = "wasm32"))]
112    pub fn from_env() -> std::result::Result<Self, String> {
113        let url = std::env::var("BUNNYDB_PIPELINE_URL")
114            .map_err(|_| "missing BUNNYDB_PIPELINE_URL environment variable".to_owned())?;
115        let token = std::env::var("BUNNYDB_TOKEN")
116            .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
117        if url.trim().is_empty() {
118            return Err("BUNNYDB_PIPELINE_URL is set but empty".to_owned());
119        }
120        if token.trim().is_empty() {
121            return Err("BUNNYDB_TOKEN is set but empty".to_owned());
122        }
123        Ok(Self::new_bearer(url, token))
124    }
125
126    /// Creates a client from a **database ID** read from the environment,
127    /// combined with an access token also read from the environment.
128    ///
129    /// Reads:
130    /// - `BUNNYDB_ID` — the database ID (e.g. `my-db-abc123`)
131    /// - `BUNNYDB_TOKEN` — access token
132    ///
133    /// The pipeline URL is derived from the database ID automatically.
134    ///
135    /// **Not available on `wasm32` targets** — see [`BunnyDbClient::from_env`].
136    ///
137    /// # Example
138    ///
139    /// ```no_run
140    /// use bunnydb_http::BunnyDbClient;
141    ///
142    /// let db = BunnyDbClient::from_env_db_id().expect("missing BUNNYDB_ID / BUNNYDB_TOKEN");
143    /// ```
144    #[cfg(not(target_arch = "wasm32"))]
145    pub fn from_env_db_id() -> std::result::Result<Self, String> {
146        let db_id = std::env::var("BUNNYDB_ID")
147            .map_err(|_| "missing BUNNYDB_ID environment variable".to_owned())?;
148        let token = std::env::var("BUNNYDB_TOKEN")
149            .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
150        if db_id.trim().is_empty() {
151            return Err("BUNNYDB_ID is set but empty".to_owned());
152        }
153        if token.trim().is_empty() {
154            return Err("BUNNYDB_TOKEN is set but empty".to_owned());
155        }
156        Ok(Self::from_db_id(db_id, token))
157    }
158
159    /// Applies client options such as timeout and retry behavior.
160    pub fn with_options(mut self, opts: ClientOptions) -> Self {
161        self.options = opts;
162        self
163    }
164
165    /// Executes a query statement and returns rows.
166    pub async fn query<P: Into<Params>>(&self, sql: &str, params: P) -> Result<QueryResult> {
167        let result = self.run_single(sql, params.into(), true).await?;
168        decode_query_result(result)
169    }
170
171    /// Executes a statement and returns execution metadata.
172    pub async fn execute<P: Into<Params>>(&self, sql: &str, params: P) -> Result<ExecResult> {
173        let result = self.run_single(sql, params.into(), false).await?;
174        decode_exec_result(result)
175    }
176
177    /// Sends multiple statements in one pipeline request.
178    ///
179    /// SQL errors at statement level are returned as
180    /// [`StatementOutcome::SqlError`] instead of failing the entire batch.
181    pub async fn batch<I>(&self, statements: I) -> Result<Vec<StatementOutcome>>
182    where
183        I: IntoIterator<Item = Statement>,
184    {
185        let statements: Vec<Statement> = statements.into_iter().collect();
186        let mut requests = Vec::with_capacity(statements.len() + 1);
187        let mut wants_rows = Vec::with_capacity(statements.len());
188
189        for statement in statements {
190            let stmt =
191                build_execute_statement(&statement.sql, statement.params, statement.want_rows)?;
192            requests.push(Request::Execute { stmt });
193            wants_rows.push(statement.want_rows);
194        }
195
196        requests.push(Request::Close {});
197        let payload = PipelineRequest { requests };
198        let response = self.send_pipeline_with_retry(&payload).await?;
199
200        let expected = wants_rows.len() + 1;
201        if response.results.len() != expected {
202            return Err(BunnyDbError::Decode(format!(
203                "result count mismatch: expected {expected}, got {}",
204                response.results.len()
205            )));
206        }
207
208        let mut results = response.results.into_iter();
209        let mut outcomes = Vec::with_capacity(wants_rows.len());
210
211        for (index, want_rows) in wants_rows.into_iter().enumerate() {
212            let result = results.next().ok_or_else(|| {
213                BunnyDbError::Decode(format!("missing execute result at index {index}"))
214            })?;
215            outcomes.push(Self::decode_statement_outcome(result, index, want_rows)?);
216        }
217
218        let close_index = outcomes.len();
219        let close = results.next().ok_or_else(|| {
220            BunnyDbError::Decode(format!("missing close result at index {close_index}"))
221        })?;
222        Self::ensure_close_success(close, close_index)?;
223
224        Ok(outcomes)
225    }
226
227    async fn run_single(
228        &self,
229        sql: &str,
230        params: Params,
231        want_rows: bool,
232    ) -> Result<wire::ExecuteResult> {
233        let execute_stmt = build_execute_statement(sql, params, want_rows)?;
234        let payload = PipelineRequest {
235            requests: vec![Request::Execute { stmt: execute_stmt }, Request::Close {}],
236        };
237        let response = self.send_pipeline_with_retry(&payload).await?;
238
239        if response.results.len() != 2 {
240            return Err(BunnyDbError::Decode(format!(
241                "result count mismatch: expected 2, got {}",
242                response.results.len()
243            )));
244        }
245
246        let mut iter = response.results.into_iter();
247        let execute = iter
248            .next()
249            .ok_or_else(|| BunnyDbError::Decode("missing execute result".to_owned()))?;
250        let close = iter
251            .next()
252            .ok_or_else(|| BunnyDbError::Decode("missing close result".to_owned()))?;
253
254        let execute_result = Self::into_execute_result(execute, 0)?;
255        Self::ensure_close_success(close, 1)?;
256        Ok(execute_result)
257    }
258
259    async fn send_pipeline_with_retry(
260        &self,
261        payload: &PipelineRequest,
262    ) -> Result<wire::PipelineResponse> {
263        let mut attempt = 0usize;
264        loop {
265            // Build the request. On WASM, reqwest uses AbortController for
266            // timeout; the `.timeout()` method is available on both targets.
267            let response = self
268                .http
269                .post(&self.pipeline_url)
270                .header(header::AUTHORIZATION, &self.token)
271                .header(header::CONTENT_TYPE, "application/json")
272                .timeout(Duration::from_millis(self.options.timeout_ms))
273                .json(payload)
274                .send()
275                .await;
276
277            match response {
278                Ok(response) => {
279                    let status = response.status();
280                    let body = response.text().await.map_err(BunnyDbError::Transport)?;
281
282                    if !status.is_success() {
283                        if self.should_retry_status(status) && attempt < self.options.max_retries {
284                            self.wait_before_retry(attempt).await;
285                            attempt += 1;
286                            continue;
287                        }
288
289                        return Err(BunnyDbError::Http {
290                            status: status.as_u16(),
291                            body,
292                        });
293                    }
294
295                    return serde_json::from_str::<wire::PipelineResponse>(&body).map_err(|err| {
296                        BunnyDbError::Decode(format!(
297                            "invalid pipeline response JSON: {err}; body: {body}"
298                        ))
299                    });
300                }
301                Err(err) => {
302                    if self.should_retry_transport(&err) && attempt < self.options.max_retries {
303                        self.wait_before_retry(attempt).await;
304                        attempt += 1;
305                        continue;
306                    }
307                    return Err(BunnyDbError::Transport(err));
308                }
309            }
310        }
311    }
312
313    fn decode_statement_outcome(
314        result: wire::PipelineResult,
315        request_index: usize,
316        want_rows: bool,
317    ) -> Result<StatementOutcome> {
318        match result.kind.as_str() {
319            "ok" => {
320                let execute_result = Self::into_execute_result(result, request_index)?;
321                if want_rows {
322                    Ok(StatementOutcome::Query(decode_query_result(
323                        execute_result,
324                    )?))
325                } else {
326                    Ok(StatementOutcome::Exec(decode_exec_result(execute_result)?))
327                }
328            }
329            "error" => {
330                let error = result.error.ok_or_else(|| {
331                    BunnyDbError::Decode(format!(
332                        "missing error payload for request {request_index}"
333                    ))
334                })?;
335                Ok(StatementOutcome::SqlError {
336                    request_index,
337                    message: error.message,
338                    code: error.code,
339                })
340            }
341            other => Err(BunnyDbError::Decode(format!(
342                "unknown pipeline result type '{other}' at request {request_index}"
343            ))),
344        }
345    }
346
347    fn into_execute_result(
348        result: wire::PipelineResult,
349        request_index: usize,
350    ) -> Result<wire::ExecuteResult> {
351        match result.kind.as_str() {
352            "ok" => {
353                let response = result.response.ok_or_else(|| {
354                    BunnyDbError::Decode(format!(
355                        "missing response payload for request {request_index}"
356                    ))
357                })?;
358                if response.kind != "execute" {
359                    return Err(BunnyDbError::Decode(format!(
360                        "expected execute response at request {request_index}, got '{}'",
361                        response.kind
362                    )));
363                }
364                response.result.ok_or_else(|| {
365                    BunnyDbError::Decode(format!(
366                        "missing execute result payload at request {request_index}"
367                    ))
368                })
369            }
370            "error" => {
371                let error = result.error.ok_or_else(|| {
372                    BunnyDbError::Decode(format!(
373                        "missing error payload for request {request_index}"
374                    ))
375                })?;
376                Err(BunnyDbError::Pipeline {
377                    request_index,
378                    message: error.message,
379                    code: error.code,
380                })
381            }
382            other => Err(BunnyDbError::Decode(format!(
383                "unknown pipeline result type '{other}' at request {request_index}"
384            ))),
385        }
386    }
387
388    fn ensure_close_success(result: wire::PipelineResult, request_index: usize) -> Result<()> {
389        match result.kind.as_str() {
390            "ok" => {
391                let response = result.response.ok_or_else(|| {
392                    BunnyDbError::Decode(format!(
393                        "missing close response payload for request {request_index}"
394                    ))
395                })?;
396                if response.kind != "close" {
397                    return Err(BunnyDbError::Decode(format!(
398                        "expected close response at request {request_index}, got '{}'",
399                        response.kind
400                    )));
401                }
402                Ok(())
403            }
404            "error" => {
405                let error = result.error.ok_or_else(|| {
406                    BunnyDbError::Decode(format!(
407                        "missing error payload for close request {request_index}"
408                    ))
409                })?;
410                Err(BunnyDbError::Pipeline {
411                    request_index,
412                    message: error.message,
413                    code: error.code,
414                })
415            }
416            other => Err(BunnyDbError::Decode(format!(
417                "unknown pipeline result type '{other}' at request {request_index}"
418            ))),
419        }
420    }
421
422    fn should_retry_status(&self, status: StatusCode) -> bool {
423        matches!(
424            status,
425            StatusCode::TOO_MANY_REQUESTS
426                | StatusCode::INTERNAL_SERVER_ERROR
427                | StatusCode::BAD_GATEWAY
428                | StatusCode::SERVICE_UNAVAILABLE
429                | StatusCode::GATEWAY_TIMEOUT
430        )
431    }
432
433    fn should_retry_transport(&self, err: &reqwest::Error) -> bool {
434        err.is_timeout()
435            || err.is_request()
436            || err.is_body()
437            // is_connect() is not available on wasm32 targets (no TCP)
438            || {
439                #[cfg(not(target_arch = "wasm32"))]
440                { err.is_connect() }
441                #[cfg(target_arch = "wasm32")]
442                { false }
443            }
444    }
445
446    /// Waits before the next retry attempt.
447    ///
448    /// On native targets: exponential backoff sleep via `tokio::time::sleep`.
449    /// On WASM targets: no-op — edge functions prefer fast failure over
450    /// sleeping, and `tokio::time::sleep` is not available.
451    async fn wait_before_retry(&self, attempt: usize) {
452        let exp = attempt.min(16) as u32;
453        let multiplier = 1u64 << exp;
454        let delay_ms = self.options.retry_backoff_ms.saturating_mul(multiplier);
455
456        #[cfg(feature = "tracing")]
457        tracing::debug!("retrying pipeline request after {} ms", delay_ms);
458
459        #[cfg(not(target_arch = "wasm32"))]
460        sleep(Duration::from_millis(delay_ms)).await;
461
462        // WASM: no sleep implementation — suppress unused variable warning.
463        #[cfg(target_arch = "wasm32")]
464        let _ = delay_ms;
465    }
466}
467
/// Produces an `Authorization` header value that uses the bearer scheme.
///
/// The input is trimmed first. If the trimmed text already starts with
/// `bearer ` (compared ASCII case-insensitively) it is returned as is, with
/// its original casing; otherwise `Bearer ` is put in front of it.
fn normalize_bearer_authorization(token: &str) -> String {
    let token = token.trim();
    // `get(..7)` yields `None` when the text is shorter than seven bytes or
    // byte 7 is not a character boundary; both cases count as "no prefix".
    match token.get(..7) {
        Some(scheme) if scheme.eq_ignore_ascii_case("bearer ") => token.to_owned(),
        _ => format!("Bearer {token}"),
    }
}
477
#[cfg(test)]
mod tests {
    use super::{normalize_bearer_authorization, BunnyDbClient};

    /// A bare token gets the `Bearer ` scheme put in front of it.
    #[test]
    fn normalize_bearer_adds_prefix_when_missing() {
        let normalized = normalize_bearer_authorization("abc123");
        assert_eq!(normalized, "Bearer abc123");
    }

    /// An existing prefix is detected case-insensitively and left untouched.
    #[test]
    fn normalize_bearer_keeps_existing_prefix() {
        let normalized = normalize_bearer_authorization("bEaReR abc123");
        assert_eq!(normalized, "bEaReR abc123");
    }

    /// The `Debug` output shows the placeholder and never the stored secret.
    #[test]
    fn debug_redacts_authorization_value() {
        let client = BunnyDbClient::new_raw_auth("https://db/v2/pipeline", "secret-token");
        let rendered = format!("{:?}", client);
        assert!(rendered.contains("<redacted>"));
        assert!(!rendered.contains("secret-token"));
    }
}