1use std::fmt;
2use std::time::Duration;
3
4use reqwest::{header, StatusCode};
5
6#[cfg(not(target_arch = "wasm32"))]
8use tokio::time::sleep;
9
10use crate::{
11 decode::{build_execute_statement, decode_exec_result, decode_query_result},
12 wire::{self, PipelineRequest, Request},
13 BunnyDbError, ClientOptions, ExecResult, Params, QueryResult, Result, Statement,
14 StatementOutcome,
15};
16
/// Builds the pipeline endpoint URL for a database identifier.
///
/// Leading and trailing whitespace in `db_id` is removed before the
/// identifier is placed into the host name; no other validation is done.
pub fn db_id_to_pipeline_url(db_id: &str) -> String {
    let host_label = db_id.trim();
    ["https://", host_label, ".lite.bunnydb.net/v2/pipeline"].concat()
}
23
/// Client that talks to a database through its HTTP pipeline endpoint.
///
/// `Debug` is implemented by hand for this type so that the authorization
/// value is never printed.
#[derive(Clone)]
pub struct BunnyDbClient {
    /// HTTP client used to issue the pipeline POST requests.
    http: reqwest::Client,
    /// Full URL every pipeline request is posted to.
    pipeline_url: String,
    /// Value sent as-is in the `Authorization` header of every request.
    token: String,
    /// Request timeout and retry settings (`timeout_ms`, `max_retries`,
    /// `retry_backoff_ms`).
    options: ClientOptions,
}
32
33impl fmt::Debug for BunnyDbClient {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("BunnyDbClient")
36 .field("pipeline_url", &self.pipeline_url)
37 .field("token", &"<redacted>")
38 .field("options", &self.options)
39 .finish()
40 }
41}
42
43impl BunnyDbClient {
44 pub fn new(pipeline_url: impl Into<String>, token: impl Into<String>) -> Self {
49 Self::new_raw_auth(pipeline_url, token)
50 }
51
52 pub fn new_raw_auth(pipeline_url: impl Into<String>, authorization: impl Into<String>) -> Self {
56 Self {
57 http: reqwest::Client::new(),
58 pipeline_url: pipeline_url.into(),
59 token: authorization.into(),
60 options: ClientOptions::default(),
61 }
62 }
63
64 pub fn new_bearer(pipeline_url: impl Into<String>, token: impl AsRef<str>) -> Self {
68 let authorization = normalize_bearer_authorization(token.as_ref());
69 Self::new_raw_auth(pipeline_url, authorization)
70 }
71
72 pub fn from_db_id(db_id: impl AsRef<str>, token: impl AsRef<str>) -> Self {
87 let url = db_id_to_pipeline_url(db_id.as_ref());
88 Self::new_bearer(url, token)
89 }
90
91 #[cfg(not(target_arch = "wasm32"))]
112 pub fn from_env() -> std::result::Result<Self, String> {
113 let url = std::env::var("BUNNYDB_PIPELINE_URL")
114 .map_err(|_| "missing BUNNYDB_PIPELINE_URL environment variable".to_owned())?;
115 let token = std::env::var("BUNNYDB_TOKEN")
116 .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
117 if url.trim().is_empty() {
118 return Err("BUNNYDB_PIPELINE_URL is set but empty".to_owned());
119 }
120 if token.trim().is_empty() {
121 return Err("BUNNYDB_TOKEN is set but empty".to_owned());
122 }
123 Ok(Self::new_bearer(url, token))
124 }
125
126 #[cfg(not(target_arch = "wasm32"))]
145 pub fn from_env_db_id() -> std::result::Result<Self, String> {
146 let db_id = std::env::var("BUNNYDB_ID")
147 .map_err(|_| "missing BUNNYDB_ID environment variable".to_owned())?;
148 let token = std::env::var("BUNNYDB_TOKEN")
149 .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
150 if db_id.trim().is_empty() {
151 return Err("BUNNYDB_ID is set but empty".to_owned());
152 }
153 if token.trim().is_empty() {
154 return Err("BUNNYDB_TOKEN is set but empty".to_owned());
155 }
156 Ok(Self::from_db_id(db_id, token))
157 }
158
159 pub fn with_options(mut self, opts: ClientOptions) -> Self {
161 self.options = opts;
162 self
163 }
164
165 pub async fn query<P: Into<Params>>(&self, sql: &str, params: P) -> Result<QueryResult> {
167 let result = self.run_single(sql, params.into(), true).await?;
168 decode_query_result(result)
169 }
170
171 pub async fn execute<P: Into<Params>>(&self, sql: &str, params: P) -> Result<ExecResult> {
173 let result = self.run_single(sql, params.into(), false).await?;
174 decode_exec_result(result)
175 }
176
177 pub async fn batch<I>(&self, statements: I) -> Result<Vec<StatementOutcome>>
182 where
183 I: IntoIterator<Item = Statement>,
184 {
185 let statements: Vec<Statement> = statements.into_iter().collect();
186 let mut requests = Vec::with_capacity(statements.len() + 1);
187 let mut wants_rows = Vec::with_capacity(statements.len());
188
189 for statement in statements {
190 let stmt =
191 build_execute_statement(&statement.sql, statement.params, statement.want_rows)?;
192 requests.push(Request::Execute { stmt });
193 wants_rows.push(statement.want_rows);
194 }
195
196 requests.push(Request::Close {});
197 let payload = PipelineRequest { requests };
198 let response = self.send_pipeline_with_retry(&payload).await?;
199
200 let expected = wants_rows.len() + 1;
201 if response.results.len() != expected {
202 return Err(BunnyDbError::Decode(format!(
203 "result count mismatch: expected {expected}, got {}",
204 response.results.len()
205 )));
206 }
207
208 let mut results = response.results.into_iter();
209 let mut outcomes = Vec::with_capacity(wants_rows.len());
210
211 for (index, want_rows) in wants_rows.into_iter().enumerate() {
212 let result = results.next().ok_or_else(|| {
213 BunnyDbError::Decode(format!("missing execute result at index {index}"))
214 })?;
215 outcomes.push(Self::decode_statement_outcome(result, index, want_rows)?);
216 }
217
218 let close_index = outcomes.len();
219 let close = results.next().ok_or_else(|| {
220 BunnyDbError::Decode(format!("missing close result at index {close_index}"))
221 })?;
222 Self::ensure_close_success(close, close_index)?;
223
224 Ok(outcomes)
225 }
226
227 async fn run_single(
228 &self,
229 sql: &str,
230 params: Params,
231 want_rows: bool,
232 ) -> Result<wire::ExecuteResult> {
233 let execute_stmt = build_execute_statement(sql, params, want_rows)?;
234 let payload = PipelineRequest {
235 requests: vec![Request::Execute { stmt: execute_stmt }, Request::Close {}],
236 };
237 let response = self.send_pipeline_with_retry(&payload).await?;
238
239 if response.results.len() != 2 {
240 return Err(BunnyDbError::Decode(format!(
241 "result count mismatch: expected 2, got {}",
242 response.results.len()
243 )));
244 }
245
246 let mut iter = response.results.into_iter();
247 let execute = iter
248 .next()
249 .ok_or_else(|| BunnyDbError::Decode("missing execute result".to_owned()))?;
250 let close = iter
251 .next()
252 .ok_or_else(|| BunnyDbError::Decode("missing close result".to_owned()))?;
253
254 let execute_result = Self::into_execute_result(execute, 0)?;
255 Self::ensure_close_success(close, 1)?;
256 Ok(execute_result)
257 }
258
259 async fn send_pipeline_with_retry(
260 &self,
261 payload: &PipelineRequest,
262 ) -> Result<wire::PipelineResponse> {
263 let mut attempt = 0usize;
264 loop {
265 let response = self
268 .http
269 .post(&self.pipeline_url)
270 .header(header::AUTHORIZATION, &self.token)
271 .header(header::CONTENT_TYPE, "application/json")
272 .timeout(Duration::from_millis(self.options.timeout_ms))
273 .json(payload)
274 .send()
275 .await;
276
277 match response {
278 Ok(response) => {
279 let status = response.status();
280 let body = response.text().await.map_err(BunnyDbError::Transport)?;
281
282 if !status.is_success() {
283 if self.should_retry_status(status) && attempt < self.options.max_retries {
284 self.wait_before_retry(attempt).await;
285 attempt += 1;
286 continue;
287 }
288
289 return Err(BunnyDbError::Http {
290 status: status.as_u16(),
291 body,
292 });
293 }
294
295 return serde_json::from_str::<wire::PipelineResponse>(&body).map_err(|err| {
296 BunnyDbError::Decode(format!(
297 "invalid pipeline response JSON: {err}; body: {body}"
298 ))
299 });
300 }
301 Err(err) => {
302 if self.should_retry_transport(&err) && attempt < self.options.max_retries {
303 self.wait_before_retry(attempt).await;
304 attempt += 1;
305 continue;
306 }
307 return Err(BunnyDbError::Transport(err));
308 }
309 }
310 }
311 }
312
313 fn decode_statement_outcome(
314 result: wire::PipelineResult,
315 request_index: usize,
316 want_rows: bool,
317 ) -> Result<StatementOutcome> {
318 match result.kind.as_str() {
319 "ok" => {
320 let execute_result = Self::into_execute_result(result, request_index)?;
321 if want_rows {
322 Ok(StatementOutcome::Query(decode_query_result(
323 execute_result,
324 )?))
325 } else {
326 Ok(StatementOutcome::Exec(decode_exec_result(execute_result)?))
327 }
328 }
329 "error" => {
330 let error = result.error.ok_or_else(|| {
331 BunnyDbError::Decode(format!(
332 "missing error payload for request {request_index}"
333 ))
334 })?;
335 Ok(StatementOutcome::SqlError {
336 request_index,
337 message: error.message,
338 code: error.code,
339 })
340 }
341 other => Err(BunnyDbError::Decode(format!(
342 "unknown pipeline result type '{other}' at request {request_index}"
343 ))),
344 }
345 }
346
347 fn into_execute_result(
348 result: wire::PipelineResult,
349 request_index: usize,
350 ) -> Result<wire::ExecuteResult> {
351 match result.kind.as_str() {
352 "ok" => {
353 let response = result.response.ok_or_else(|| {
354 BunnyDbError::Decode(format!(
355 "missing response payload for request {request_index}"
356 ))
357 })?;
358 if response.kind != "execute" {
359 return Err(BunnyDbError::Decode(format!(
360 "expected execute response at request {request_index}, got '{}'",
361 response.kind
362 )));
363 }
364 response.result.ok_or_else(|| {
365 BunnyDbError::Decode(format!(
366 "missing execute result payload at request {request_index}"
367 ))
368 })
369 }
370 "error" => {
371 let error = result.error.ok_or_else(|| {
372 BunnyDbError::Decode(format!(
373 "missing error payload for request {request_index}"
374 ))
375 })?;
376 Err(BunnyDbError::Pipeline {
377 request_index,
378 message: error.message,
379 code: error.code,
380 })
381 }
382 other => Err(BunnyDbError::Decode(format!(
383 "unknown pipeline result type '{other}' at request {request_index}"
384 ))),
385 }
386 }
387
388 fn ensure_close_success(result: wire::PipelineResult, request_index: usize) -> Result<()> {
389 match result.kind.as_str() {
390 "ok" => {
391 let response = result.response.ok_or_else(|| {
392 BunnyDbError::Decode(format!(
393 "missing close response payload for request {request_index}"
394 ))
395 })?;
396 if response.kind != "close" {
397 return Err(BunnyDbError::Decode(format!(
398 "expected close response at request {request_index}, got '{}'",
399 response.kind
400 )));
401 }
402 Ok(())
403 }
404 "error" => {
405 let error = result.error.ok_or_else(|| {
406 BunnyDbError::Decode(format!(
407 "missing error payload for close request {request_index}"
408 ))
409 })?;
410 Err(BunnyDbError::Pipeline {
411 request_index,
412 message: error.message,
413 code: error.code,
414 })
415 }
416 other => Err(BunnyDbError::Decode(format!(
417 "unknown pipeline result type '{other}' at request {request_index}"
418 ))),
419 }
420 }
421
422 fn should_retry_status(&self, status: StatusCode) -> bool {
423 matches!(
424 status,
425 StatusCode::TOO_MANY_REQUESTS
426 | StatusCode::INTERNAL_SERVER_ERROR
427 | StatusCode::BAD_GATEWAY
428 | StatusCode::SERVICE_UNAVAILABLE
429 | StatusCode::GATEWAY_TIMEOUT
430 )
431 }
432
433 fn should_retry_transport(&self, err: &reqwest::Error) -> bool {
434 err.is_timeout()
435 || err.is_request()
436 || err.is_body()
437 || {
439 #[cfg(not(target_arch = "wasm32"))]
440 { err.is_connect() }
441 #[cfg(target_arch = "wasm32")]
442 { false }
443 }
444 }
445
446 async fn wait_before_retry(&self, attempt: usize) {
452 let exp = attempt.min(16) as u32;
453 let multiplier = 1u64 << exp;
454 let delay_ms = self.options.retry_backoff_ms.saturating_mul(multiplier);
455
456 #[cfg(feature = "tracing")]
457 tracing::debug!("retrying pipeline request after {} ms", delay_ms);
458
459 #[cfg(not(target_arch = "wasm32"))]
460 sleep(Duration::from_millis(delay_ms)).await;
461
462 #[cfg(target_arch = "wasm32")]
464 let _ = delay_ms;
465 }
466}
467
/// Returns an `Authorization` header value for a bearer token.
///
/// The input is trimmed. If it already starts with `bearer ` (compared
/// ASCII case-insensitively) it is returned as-is, keeping its original
/// casing; otherwise `Bearer ` is prepended.
fn normalize_bearer_authorization(token: &str) -> String {
    const SCHEME_PREFIX: &[u8] = b"bearer ";

    let trimmed = token.trim();
    // A byte-wise comparison is enough: the prefix is pure ASCII, so a match
    // can never end in the middle of a multi-byte character.
    let already_prefixed = trimmed
        .as_bytes()
        .get(..SCHEME_PREFIX.len())
        .is_some_and(|head| head.eq_ignore_ascii_case(SCHEME_PREFIX));

    match already_prefixed {
        true => trimmed.to_owned(),
        false => format!("Bearer {trimmed}"),
    }
}
477
#[cfg(test)]
mod tests {
    use super::{normalize_bearer_authorization, BunnyDbClient};

    /// A bare token gets the `Bearer ` scheme prepended.
    #[test]
    fn normalize_bearer_adds_prefix_when_missing() {
        let normalized = normalize_bearer_authorization("abc123");
        assert_eq!(normalized, "Bearer abc123");
    }

    /// An existing scheme prefix is kept, including its original casing.
    #[test]
    fn normalize_bearer_keeps_existing_prefix() {
        let normalized = normalize_bearer_authorization("bEaReR abc123");
        assert_eq!(normalized, "bEaReR abc123");
    }

    /// The `Debug` output must never contain the authorization value.
    #[test]
    fn debug_redacts_authorization_value() {
        let rendered = format!(
            "{:?}",
            BunnyDbClient::new_raw_auth("https://db/v2/pipeline", "secret-token")
        );
        assert!(rendered.contains("<redacted>"));
        assert!(!rendered.contains("secret-token"));
    }
}