// bunnydb_http/client.rs

1use std::{fmt, time::Duration};
2
3use reqwest::{header, StatusCode};
4use tokio::time::sleep;
5
6use crate::{
7    decode::{build_execute_statement, decode_exec_result, decode_query_result},
8    wire::{self, PipelineRequest, Request},
9    BunnyDbError, ClientOptions, ExecResult, Params, QueryResult, Result, Statement,
10    StatementOutcome,
11};
12
/// Builds the pipeline endpoint URL for a database ID.
///
/// Surrounding whitespace in `db_id` is trimmed before it is placed in the
/// host name; no other validation is performed.
///
/// Example: `"abc123"` → `"https://abc123.lite.bunnydb.net/v2/pipeline"`
pub fn db_id_to_pipeline_url(db_id: &str) -> String {
    let host_label = db_id.trim();
    ["https://", host_label, ".lite.bunnydb.net/v2/pipeline"].concat()
}
19
20#[derive(Clone)]
21/// HTTP client for Bunny.net Database SQL pipeline endpoint.
22pub struct BunnyDbClient {
23    http: reqwest::Client,
24    pipeline_url: String,
25    token: String,
26    options: ClientOptions,
27}
28
29impl fmt::Debug for BunnyDbClient {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct("BunnyDbClient")
32            .field("pipeline_url", &self.pipeline_url)
33            .field("token", &"<redacted>")
34            .field("options", &self.options)
35            .finish()
36    }
37}
38
39impl BunnyDbClient {
40    /// Creates a client with a raw authorization header value.
41    ///
42    /// This is backward-compatible with previous versions where `token`
43    /// was passed directly as `Authorization: <value>`.
44    pub fn new(pipeline_url: impl Into<String>, token: impl Into<String>) -> Self {
45        Self::new_raw_auth(pipeline_url, token)
46    }
47
48    /// Creates a client with a full raw authorization value.
49    ///
50    /// Example: `"Bearer <token>"` or any custom scheme.
51    pub fn new_raw_auth(pipeline_url: impl Into<String>, authorization: impl Into<String>) -> Self {
52        Self {
53            http: reqwest::Client::new(),
54            pipeline_url: pipeline_url.into(),
55            token: authorization.into(),
56            options: ClientOptions::default(),
57        }
58    }
59
60    /// Creates a client from a bearer token.
61    ///
62    /// If the token is missing the `Bearer ` prefix, it is added automatically.
63    pub fn new_bearer(pipeline_url: impl Into<String>, token: impl AsRef<str>) -> Self {
64        let authorization = normalize_bearer_authorization(token.as_ref());
65        Self::new_raw_auth(pipeline_url, authorization)
66    }
67
68    /// Creates a client from a **Bunny Database ID** and a bearer token.
69    ///
70    /// The pipeline URL is derived automatically:
71    /// `https://<db_id>.lite.bunnydb.net/v2/pipeline`
72    ///
73    /// This is the most ergonomic constructor when you know the database ID.
74    ///
75    /// # Example
76    ///
77    /// ```no_run
78    /// use bunnydb_http::BunnyDbClient;
79    ///
80    /// let db = BunnyDbClient::from_db_id("my-db-id", "my-token");
81    /// ```
82    pub fn from_db_id(db_id: impl AsRef<str>, token: impl AsRef<str>) -> Self {
83        let url = db_id_to_pipeline_url(db_id.as_ref());
84        Self::new_bearer(url, token)
85    }
86
87    /// Creates a client from environment variables.
88    ///
89    /// Reads:
90    /// - `BUNNYDB_PIPELINE_URL` — full pipeline endpoint URL  
91    ///   (e.g. `https://<id>.lite.bunnydb.net/v2/pipeline`)
92    /// - `BUNNYDB_TOKEN` — access token (Bearer prefix optional)
93    ///
94    /// Returns an error if either variable is missing or empty.
95    ///
96    /// # Example
97    ///
98    /// ```no_run
99    /// use bunnydb_http::BunnyDbClient;
100    ///
101    /// // Set env vars via shell or .env loader, then:
102    /// let db = BunnyDbClient::from_env().expect("missing BUNNYDB_* env vars");
103    /// ```
104    pub fn from_env() -> std::result::Result<Self, String> {
105        let url = std::env::var("BUNNYDB_PIPELINE_URL")
106            .map_err(|_| "missing BUNNYDB_PIPELINE_URL environment variable".to_owned())?;
107        let token = std::env::var("BUNNYDB_TOKEN")
108            .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
109        if url.trim().is_empty() {
110            return Err("BUNNYDB_PIPELINE_URL is set but empty".to_owned());
111        }
112        if token.trim().is_empty() {
113            return Err("BUNNYDB_TOKEN is set but empty".to_owned());
114        }
115        Ok(Self::new_bearer(url, token))
116    }
117
118    /// Creates a client from a **database ID** read from the environment,
119    /// combined with an access token also read from the environment.
120    ///
121    /// Reads:
122    /// - `BUNNYDB_ID` — the database ID (e.g. `my-db-abc123`)
123    /// - `BUNNYDB_TOKEN` — access token
124    ///
125    /// The pipeline URL is derived from the database ID automatically.
126    ///
127    /// # Example
128    ///
129    /// ```no_run
130    /// use bunnydb_http::BunnyDbClient;
131    ///
132    /// let db = BunnyDbClient::from_env_db_id().expect("missing BUNNYDB_ID / BUNNYDB_TOKEN");
133    /// ```
134    pub fn from_env_db_id() -> std::result::Result<Self, String> {
135        let db_id = std::env::var("BUNNYDB_ID")
136            .map_err(|_| "missing BUNNYDB_ID environment variable".to_owned())?;
137        let token = std::env::var("BUNNYDB_TOKEN")
138            .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
139        if db_id.trim().is_empty() {
140            return Err("BUNNYDB_ID is set but empty".to_owned());
141        }
142        if token.trim().is_empty() {
143            return Err("BUNNYDB_TOKEN is set but empty".to_owned());
144        }
145        Ok(Self::from_db_id(db_id, token))
146    }
147
148    /// Applies client options such as timeout and retry behavior.
149    pub fn with_options(mut self, opts: ClientOptions) -> Self {
150        self.options = opts;
151        self
152    }
153
154    /// Executes a query statement and returns rows.
155    pub async fn query<P: Into<Params>>(&self, sql: &str, params: P) -> Result<QueryResult> {
156        let result = self.run_single(sql, params.into(), true).await?;
157        decode_query_result(result)
158    }
159
160    /// Executes a statement and returns execution metadata.
161    pub async fn execute<P: Into<Params>>(&self, sql: &str, params: P) -> Result<ExecResult> {
162        let result = self.run_single(sql, params.into(), false).await?;
163        decode_exec_result(result)
164    }
165
166    /// Sends multiple statements in one pipeline request.
167    ///
168    /// SQL errors at statement level are returned as
169    /// [`StatementOutcome::SqlError`] instead of failing the entire batch.
170    pub async fn batch<I>(&self, statements: I) -> Result<Vec<StatementOutcome>>
171    where
172        I: IntoIterator<Item = Statement>,
173    {
174        let statements: Vec<Statement> = statements.into_iter().collect();
175        let mut requests = Vec::with_capacity(statements.len() + 1);
176        let mut wants_rows = Vec::with_capacity(statements.len());
177
178        for statement in statements {
179            let stmt =
180                build_execute_statement(&statement.sql, statement.params, statement.want_rows)?;
181            requests.push(Request::Execute { stmt });
182            wants_rows.push(statement.want_rows);
183        }
184
185        requests.push(Request::Close {});
186        let payload = PipelineRequest { requests };
187        let response = self.send_pipeline_with_retry(&payload).await?;
188
189        let expected = wants_rows.len() + 1;
190        if response.results.len() != expected {
191            return Err(BunnyDbError::Decode(format!(
192                "result count mismatch: expected {expected}, got {}",
193                response.results.len()
194            )));
195        }
196
197        let mut results = response.results.into_iter();
198        let mut outcomes = Vec::with_capacity(wants_rows.len());
199
200        for (index, want_rows) in wants_rows.into_iter().enumerate() {
201            let result = results.next().ok_or_else(|| {
202                BunnyDbError::Decode(format!("missing execute result at index {index}"))
203            })?;
204            outcomes.push(Self::decode_statement_outcome(result, index, want_rows)?);
205        }
206
207        let close_index = outcomes.len();
208        let close = results.next().ok_or_else(|| {
209            BunnyDbError::Decode(format!("missing close result at index {close_index}"))
210        })?;
211        Self::ensure_close_success(close, close_index)?;
212
213        Ok(outcomes)
214    }
215
216    async fn run_single(
217        &self,
218        sql: &str,
219        params: Params,
220        want_rows: bool,
221    ) -> Result<wire::ExecuteResult> {
222        let execute_stmt = build_execute_statement(sql, params, want_rows)?;
223        let payload = PipelineRequest {
224            requests: vec![Request::Execute { stmt: execute_stmt }, Request::Close {}],
225        };
226        let response = self.send_pipeline_with_retry(&payload).await?;
227
228        if response.results.len() != 2 {
229            return Err(BunnyDbError::Decode(format!(
230                "result count mismatch: expected 2, got {}",
231                response.results.len()
232            )));
233        }
234
235        let mut iter = response.results.into_iter();
236        let execute = iter
237            .next()
238            .ok_or_else(|| BunnyDbError::Decode("missing execute result".to_owned()))?;
239        let close = iter
240            .next()
241            .ok_or_else(|| BunnyDbError::Decode("missing close result".to_owned()))?;
242
243        let execute_result = Self::into_execute_result(execute, 0)?;
244        Self::ensure_close_success(close, 1)?;
245        Ok(execute_result)
246    }
247
248    async fn send_pipeline_with_retry(
249        &self,
250        payload: &PipelineRequest,
251    ) -> Result<wire::PipelineResponse> {
252        let mut attempt = 0usize;
253        loop {
254            let response = self
255                .http
256                .post(&self.pipeline_url)
257                .header(header::AUTHORIZATION, &self.token)
258                .header(header::CONTENT_TYPE, "application/json")
259                .timeout(Duration::from_millis(self.options.timeout_ms))
260                .json(payload)
261                .send()
262                .await;
263
264            match response {
265                Ok(response) => {
266                    let status = response.status();
267                    let body = response.text().await.map_err(BunnyDbError::Transport)?;
268
269                    if !status.is_success() {
270                        if self.should_retry_status(status) && attempt < self.options.max_retries {
271                            self.wait_before_retry(attempt).await;
272                            attempt += 1;
273                            continue;
274                        }
275
276                        return Err(BunnyDbError::Http {
277                            status: status.as_u16(),
278                            body,
279                        });
280                    }
281
282                    return serde_json::from_str::<wire::PipelineResponse>(&body).map_err(|err| {
283                        BunnyDbError::Decode(format!(
284                            "invalid pipeline response JSON: {err}; body: {body}"
285                        ))
286                    });
287                }
288                Err(err) => {
289                    if self.should_retry_transport(&err) && attempt < self.options.max_retries {
290                        self.wait_before_retry(attempt).await;
291                        attempt += 1;
292                        continue;
293                    }
294                    return Err(BunnyDbError::Transport(err));
295                }
296            }
297        }
298    }
299
300    fn decode_statement_outcome(
301        result: wire::PipelineResult,
302        request_index: usize,
303        want_rows: bool,
304    ) -> Result<StatementOutcome> {
305        match result.kind.as_str() {
306            "ok" => {
307                let execute_result = Self::into_execute_result(result, request_index)?;
308                if want_rows {
309                    Ok(StatementOutcome::Query(decode_query_result(
310                        execute_result,
311                    )?))
312                } else {
313                    Ok(StatementOutcome::Exec(decode_exec_result(execute_result)?))
314                }
315            }
316            "error" => {
317                let error = result.error.ok_or_else(|| {
318                    BunnyDbError::Decode(format!(
319                        "missing error payload for request {request_index}"
320                    ))
321                })?;
322                Ok(StatementOutcome::SqlError {
323                    request_index,
324                    message: error.message,
325                    code: error.code,
326                })
327            }
328            other => Err(BunnyDbError::Decode(format!(
329                "unknown pipeline result type '{other}' at request {request_index}"
330            ))),
331        }
332    }
333
334    fn into_execute_result(
335        result: wire::PipelineResult,
336        request_index: usize,
337    ) -> Result<wire::ExecuteResult> {
338        match result.kind.as_str() {
339            "ok" => {
340                let response = result.response.ok_or_else(|| {
341                    BunnyDbError::Decode(format!(
342                        "missing response payload for request {request_index}"
343                    ))
344                })?;
345                if response.kind != "execute" {
346                    return Err(BunnyDbError::Decode(format!(
347                        "expected execute response at request {request_index}, got '{}'",
348                        response.kind
349                    )));
350                }
351                response.result.ok_or_else(|| {
352                    BunnyDbError::Decode(format!(
353                        "missing execute result payload at request {request_index}"
354                    ))
355                })
356            }
357            "error" => {
358                let error = result.error.ok_or_else(|| {
359                    BunnyDbError::Decode(format!(
360                        "missing error payload for request {request_index}"
361                    ))
362                })?;
363                Err(BunnyDbError::Pipeline {
364                    request_index,
365                    message: error.message,
366                    code: error.code,
367                })
368            }
369            other => Err(BunnyDbError::Decode(format!(
370                "unknown pipeline result type '{other}' at request {request_index}"
371            ))),
372        }
373    }
374
375    fn ensure_close_success(result: wire::PipelineResult, request_index: usize) -> Result<()> {
376        match result.kind.as_str() {
377            "ok" => {
378                let response = result.response.ok_or_else(|| {
379                    BunnyDbError::Decode(format!(
380                        "missing close response payload for request {request_index}"
381                    ))
382                })?;
383                if response.kind != "close" {
384                    return Err(BunnyDbError::Decode(format!(
385                        "expected close response at request {request_index}, got '{}'",
386                        response.kind
387                    )));
388                }
389                Ok(())
390            }
391            "error" => {
392                let error = result.error.ok_or_else(|| {
393                    BunnyDbError::Decode(format!(
394                        "missing error payload for close request {request_index}"
395                    ))
396                })?;
397                Err(BunnyDbError::Pipeline {
398                    request_index,
399                    message: error.message,
400                    code: error.code,
401                })
402            }
403            other => Err(BunnyDbError::Decode(format!(
404                "unknown pipeline result type '{other}' at request {request_index}"
405            ))),
406        }
407    }
408
409    fn should_retry_status(&self, status: StatusCode) -> bool {
410        matches!(
411            status,
412            StatusCode::TOO_MANY_REQUESTS
413                | StatusCode::INTERNAL_SERVER_ERROR
414                | StatusCode::BAD_GATEWAY
415                | StatusCode::SERVICE_UNAVAILABLE
416                | StatusCode::GATEWAY_TIMEOUT
417        )
418    }
419
420    fn should_retry_transport(&self, err: &reqwest::Error) -> bool {
421        err.is_timeout() || err.is_connect() || err.is_request() || err.is_body()
422    }
423
424    async fn wait_before_retry(&self, attempt: usize) {
425        let exp = attempt.min(16) as u32;
426        let multiplier = 1u64 << exp;
427        let delay_ms = self.options.retry_backoff_ms.saturating_mul(multiplier);
428        #[cfg(feature = "tracing")]
429        tracing::debug!("retrying pipeline request after {} ms", delay_ms);
430        sleep(Duration::from_millis(delay_ms)).await;
431    }
432}
433
/// Turns a token into a full bearer `Authorization` value.
///
/// The input is trimmed. If its first seven bytes are `bearer ` in any ASCII
/// case, the trimmed input is returned as-is; otherwise `Bearer ` is
/// prepended. Inputs shorter than seven bytes, or where byte 7 is not a
/// character boundary, are treated as having no prefix.
fn normalize_bearer_authorization(token: &str) -> String {
    let trimmed = token.trim();
    match trimmed.get(..7) {
        Some(head) if head.eq_ignore_ascii_case("bearer ") => trimmed.to_owned(),
        _ => format!("Bearer {trimmed}"),
    }
}
443
#[cfg(test)]
mod tests {
    use super::{normalize_bearer_authorization, BunnyDbClient};

    /// A bare token gains the `Bearer ` scheme prefix.
    #[test]
    fn normalize_bearer_adds_prefix_when_missing() {
        let normalized = normalize_bearer_authorization("abc123");
        assert_eq!(normalized, "Bearer abc123");
    }

    /// An existing prefix is detected case-insensitively and left untouched.
    #[test]
    fn normalize_bearer_keeps_existing_prefix() {
        let already_prefixed = "bEaReR abc123";
        assert_eq!(
            normalize_bearer_authorization(already_prefixed),
            already_prefixed
        );
    }

    /// The `Debug` output must never contain the authorization value.
    #[test]
    fn debug_redacts_authorization_value() {
        let client = BunnyDbClient::new_raw_auth("https://db/v2/pipeline", "secret-token");
        let rendered = format!("{client:?}");
        assert!(rendered.contains("<redacted>"));
        assert!(!rendered.contains("secret-token"));
    }
}