1use std::{fmt, time::Duration};
2
3use reqwest::{header, StatusCode};
4use tokio::time::sleep;
5
6use crate::{
7 decode::{build_execute_statement, decode_exec_result, decode_query_result},
8 wire::{self, PipelineRequest, Request},
9 BunnyDbError, ClientOptions, ExecResult, Params, QueryResult, Result, Statement,
10 StatementOutcome,
11};
12
13pub fn db_id_to_pipeline_url(db_id: &str) -> String {
17 format!("https://{}.lite.bunnydb.net/v2/pipeline", db_id.trim())
18}
19
20#[derive(Clone)]
21pub struct BunnyDbClient {
23 http: reqwest::Client,
24 pipeline_url: String,
25 token: String,
26 options: ClientOptions,
27}
28
29impl fmt::Debug for BunnyDbClient {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 f.debug_struct("BunnyDbClient")
32 .field("pipeline_url", &self.pipeline_url)
33 .field("token", &"<redacted>")
34 .field("options", &self.options)
35 .finish()
36 }
37}
38
39impl BunnyDbClient {
40 pub fn new(pipeline_url: impl Into<String>, token: impl Into<String>) -> Self {
45 Self::new_raw_auth(pipeline_url, token)
46 }
47
48 pub fn new_raw_auth(pipeline_url: impl Into<String>, authorization: impl Into<String>) -> Self {
52 Self {
53 http: reqwest::Client::new(),
54 pipeline_url: pipeline_url.into(),
55 token: authorization.into(),
56 options: ClientOptions::default(),
57 }
58 }
59
60 pub fn new_bearer(pipeline_url: impl Into<String>, token: impl AsRef<str>) -> Self {
64 let authorization = normalize_bearer_authorization(token.as_ref());
65 Self::new_raw_auth(pipeline_url, authorization)
66 }
67
68 pub fn from_db_id(db_id: impl AsRef<str>, token: impl AsRef<str>) -> Self {
83 let url = db_id_to_pipeline_url(db_id.as_ref());
84 Self::new_bearer(url, token)
85 }
86
87 pub fn from_env() -> std::result::Result<Self, String> {
105 let url = std::env::var("BUNNYDB_PIPELINE_URL")
106 .map_err(|_| "missing BUNNYDB_PIPELINE_URL environment variable".to_owned())?;
107 let token = std::env::var("BUNNYDB_TOKEN")
108 .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
109 if url.trim().is_empty() {
110 return Err("BUNNYDB_PIPELINE_URL is set but empty".to_owned());
111 }
112 if token.trim().is_empty() {
113 return Err("BUNNYDB_TOKEN is set but empty".to_owned());
114 }
115 Ok(Self::new_bearer(url, token))
116 }
117
118 pub fn from_env_db_id() -> std::result::Result<Self, String> {
135 let db_id = std::env::var("BUNNYDB_ID")
136 .map_err(|_| "missing BUNNYDB_ID environment variable".to_owned())?;
137 let token = std::env::var("BUNNYDB_TOKEN")
138 .map_err(|_| "missing BUNNYDB_TOKEN environment variable".to_owned())?;
139 if db_id.trim().is_empty() {
140 return Err("BUNNYDB_ID is set but empty".to_owned());
141 }
142 if token.trim().is_empty() {
143 return Err("BUNNYDB_TOKEN is set but empty".to_owned());
144 }
145 Ok(Self::from_db_id(db_id, token))
146 }
147
148 pub fn with_options(mut self, opts: ClientOptions) -> Self {
150 self.options = opts;
151 self
152 }
153
154 pub async fn query<P: Into<Params>>(&self, sql: &str, params: P) -> Result<QueryResult> {
156 let result = self.run_single(sql, params.into(), true).await?;
157 decode_query_result(result)
158 }
159
160 pub async fn execute<P: Into<Params>>(&self, sql: &str, params: P) -> Result<ExecResult> {
162 let result = self.run_single(sql, params.into(), false).await?;
163 decode_exec_result(result)
164 }
165
166 pub async fn batch<I>(&self, statements: I) -> Result<Vec<StatementOutcome>>
171 where
172 I: IntoIterator<Item = Statement>,
173 {
174 let statements: Vec<Statement> = statements.into_iter().collect();
175 let mut requests = Vec::with_capacity(statements.len() + 1);
176 let mut wants_rows = Vec::with_capacity(statements.len());
177
178 for statement in statements {
179 let stmt =
180 build_execute_statement(&statement.sql, statement.params, statement.want_rows)?;
181 requests.push(Request::Execute { stmt });
182 wants_rows.push(statement.want_rows);
183 }
184
185 requests.push(Request::Close {});
186 let payload = PipelineRequest { requests };
187 let response = self.send_pipeline_with_retry(&payload).await?;
188
189 let expected = wants_rows.len() + 1;
190 if response.results.len() != expected {
191 return Err(BunnyDbError::Decode(format!(
192 "result count mismatch: expected {expected}, got {}",
193 response.results.len()
194 )));
195 }
196
197 let mut results = response.results.into_iter();
198 let mut outcomes = Vec::with_capacity(wants_rows.len());
199
200 for (index, want_rows) in wants_rows.into_iter().enumerate() {
201 let result = results.next().ok_or_else(|| {
202 BunnyDbError::Decode(format!("missing execute result at index {index}"))
203 })?;
204 outcomes.push(Self::decode_statement_outcome(result, index, want_rows)?);
205 }
206
207 let close_index = outcomes.len();
208 let close = results.next().ok_or_else(|| {
209 BunnyDbError::Decode(format!("missing close result at index {close_index}"))
210 })?;
211 Self::ensure_close_success(close, close_index)?;
212
213 Ok(outcomes)
214 }
215
216 async fn run_single(
217 &self,
218 sql: &str,
219 params: Params,
220 want_rows: bool,
221 ) -> Result<wire::ExecuteResult> {
222 let execute_stmt = build_execute_statement(sql, params, want_rows)?;
223 let payload = PipelineRequest {
224 requests: vec![Request::Execute { stmt: execute_stmt }, Request::Close {}],
225 };
226 let response = self.send_pipeline_with_retry(&payload).await?;
227
228 if response.results.len() != 2 {
229 return Err(BunnyDbError::Decode(format!(
230 "result count mismatch: expected 2, got {}",
231 response.results.len()
232 )));
233 }
234
235 let mut iter = response.results.into_iter();
236 let execute = iter
237 .next()
238 .ok_or_else(|| BunnyDbError::Decode("missing execute result".to_owned()))?;
239 let close = iter
240 .next()
241 .ok_or_else(|| BunnyDbError::Decode("missing close result".to_owned()))?;
242
243 let execute_result = Self::into_execute_result(execute, 0)?;
244 Self::ensure_close_success(close, 1)?;
245 Ok(execute_result)
246 }
247
248 async fn send_pipeline_with_retry(
249 &self,
250 payload: &PipelineRequest,
251 ) -> Result<wire::PipelineResponse> {
252 let mut attempt = 0usize;
253 loop {
254 let response = self
255 .http
256 .post(&self.pipeline_url)
257 .header(header::AUTHORIZATION, &self.token)
258 .header(header::CONTENT_TYPE, "application/json")
259 .timeout(Duration::from_millis(self.options.timeout_ms))
260 .json(payload)
261 .send()
262 .await;
263
264 match response {
265 Ok(response) => {
266 let status = response.status();
267 let body = response.text().await.map_err(BunnyDbError::Transport)?;
268
269 if !status.is_success() {
270 if self.should_retry_status(status) && attempt < self.options.max_retries {
271 self.wait_before_retry(attempt).await;
272 attempt += 1;
273 continue;
274 }
275
276 return Err(BunnyDbError::Http {
277 status: status.as_u16(),
278 body,
279 });
280 }
281
282 return serde_json::from_str::<wire::PipelineResponse>(&body).map_err(|err| {
283 BunnyDbError::Decode(format!(
284 "invalid pipeline response JSON: {err}; body: {body}"
285 ))
286 });
287 }
288 Err(err) => {
289 if self.should_retry_transport(&err) && attempt < self.options.max_retries {
290 self.wait_before_retry(attempt).await;
291 attempt += 1;
292 continue;
293 }
294 return Err(BunnyDbError::Transport(err));
295 }
296 }
297 }
298 }
299
300 fn decode_statement_outcome(
301 result: wire::PipelineResult,
302 request_index: usize,
303 want_rows: bool,
304 ) -> Result<StatementOutcome> {
305 match result.kind.as_str() {
306 "ok" => {
307 let execute_result = Self::into_execute_result(result, request_index)?;
308 if want_rows {
309 Ok(StatementOutcome::Query(decode_query_result(
310 execute_result,
311 )?))
312 } else {
313 Ok(StatementOutcome::Exec(decode_exec_result(execute_result)?))
314 }
315 }
316 "error" => {
317 let error = result.error.ok_or_else(|| {
318 BunnyDbError::Decode(format!(
319 "missing error payload for request {request_index}"
320 ))
321 })?;
322 Ok(StatementOutcome::SqlError {
323 request_index,
324 message: error.message,
325 code: error.code,
326 })
327 }
328 other => Err(BunnyDbError::Decode(format!(
329 "unknown pipeline result type '{other}' at request {request_index}"
330 ))),
331 }
332 }
333
334 fn into_execute_result(
335 result: wire::PipelineResult,
336 request_index: usize,
337 ) -> Result<wire::ExecuteResult> {
338 match result.kind.as_str() {
339 "ok" => {
340 let response = result.response.ok_or_else(|| {
341 BunnyDbError::Decode(format!(
342 "missing response payload for request {request_index}"
343 ))
344 })?;
345 if response.kind != "execute" {
346 return Err(BunnyDbError::Decode(format!(
347 "expected execute response at request {request_index}, got '{}'",
348 response.kind
349 )));
350 }
351 response.result.ok_or_else(|| {
352 BunnyDbError::Decode(format!(
353 "missing execute result payload at request {request_index}"
354 ))
355 })
356 }
357 "error" => {
358 let error = result.error.ok_or_else(|| {
359 BunnyDbError::Decode(format!(
360 "missing error payload for request {request_index}"
361 ))
362 })?;
363 Err(BunnyDbError::Pipeline {
364 request_index,
365 message: error.message,
366 code: error.code,
367 })
368 }
369 other => Err(BunnyDbError::Decode(format!(
370 "unknown pipeline result type '{other}' at request {request_index}"
371 ))),
372 }
373 }
374
375 fn ensure_close_success(result: wire::PipelineResult, request_index: usize) -> Result<()> {
376 match result.kind.as_str() {
377 "ok" => {
378 let response = result.response.ok_or_else(|| {
379 BunnyDbError::Decode(format!(
380 "missing close response payload for request {request_index}"
381 ))
382 })?;
383 if response.kind != "close" {
384 return Err(BunnyDbError::Decode(format!(
385 "expected close response at request {request_index}, got '{}'",
386 response.kind
387 )));
388 }
389 Ok(())
390 }
391 "error" => {
392 let error = result.error.ok_or_else(|| {
393 BunnyDbError::Decode(format!(
394 "missing error payload for close request {request_index}"
395 ))
396 })?;
397 Err(BunnyDbError::Pipeline {
398 request_index,
399 message: error.message,
400 code: error.code,
401 })
402 }
403 other => Err(BunnyDbError::Decode(format!(
404 "unknown pipeline result type '{other}' at request {request_index}"
405 ))),
406 }
407 }
408
409 fn should_retry_status(&self, status: StatusCode) -> bool {
410 matches!(
411 status,
412 StatusCode::TOO_MANY_REQUESTS
413 | StatusCode::INTERNAL_SERVER_ERROR
414 | StatusCode::BAD_GATEWAY
415 | StatusCode::SERVICE_UNAVAILABLE
416 | StatusCode::GATEWAY_TIMEOUT
417 )
418 }
419
420 fn should_retry_transport(&self, err: &reqwest::Error) -> bool {
421 err.is_timeout() || err.is_connect() || err.is_request() || err.is_body()
422 }
423
424 async fn wait_before_retry(&self, attempt: usize) {
425 let exp = attempt.min(16) as u32;
426 let multiplier = 1u64 << exp;
427 let delay_ms = self.options.retry_backoff_ms.saturating_mul(multiplier);
428 #[cfg(feature = "tracing")]
429 tracing::debug!("retrying pipeline request after {} ms", delay_ms);
430 sleep(Duration::from_millis(delay_ms)).await;
431 }
432}
433
434fn normalize_bearer_authorization(token: &str) -> String {
435 let trimmed = token.trim();
436 let prefix = trimmed.get(..7);
437 if prefix.is_some_and(|value| value.eq_ignore_ascii_case("bearer ")) {
438 trimmed.to_owned()
439 } else {
440 format!("Bearer {trimmed}")
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::{normalize_bearer_authorization, BunnyDbClient};
447
448 #[test]
449 fn normalize_bearer_adds_prefix_when_missing() {
450 assert_eq!(
451 normalize_bearer_authorization("abc123"),
452 "Bearer abc123".to_owned()
453 );
454 }
455
456 #[test]
457 fn normalize_bearer_keeps_existing_prefix() {
458 assert_eq!(
459 normalize_bearer_authorization("bEaReR abc123"),
460 "bEaReR abc123".to_owned()
461 );
462 }
463
464 #[test]
465 fn debug_redacts_authorization_value() {
466 let client = BunnyDbClient::new_raw_auth("https://db/v2/pipeline", "secret-token");
467 let debug = format!("{client:?}");
468 assert!(debug.contains("<redacted>"));
469 assert!(!debug.contains("secret-token"));
470 }
471}