1use bsql_driver_postgres::arena::release_arena;
10use bsql_driver_postgres::codec::Encode;
11use bsql_driver_postgres::{Arena, QueryResult};
12
13use crate::error::{BsqlError, BsqlResult};
14use crate::pool::{Pool, PoolConnection};
15use crate::transaction::Transaction;
16
17pub struct OwnedResult {
22 pub result: QueryResult,
23 arena: Arena,
24}
25
26impl std::fmt::Debug for OwnedResult {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("OwnedResult")
29 .field("rows", &self.result.len())
30 .finish()
31 }
32}
33
34impl OwnedResult {
35 pub(crate) fn without_arena(result: QueryResult) -> Self {
38 Self {
39 result,
40 arena: Arena::empty(),
41 }
42 }
43
44 pub fn len(&self) -> usize {
46 self.result.len()
47 }
48
49 pub fn is_empty(&self) -> bool {
51 self.result.is_empty()
52 }
53
54 pub fn row(&self, idx: usize) -> bsql_driver_postgres::Row<'_> {
56 self.result.row(idx, &self.arena)
57 }
58
59 pub fn iter(&self) -> impl Iterator<Item = bsql_driver_postgres::Row<'_>> {
61 self.result.rows(&self.arena)
62 }
63}
64
65impl Drop for OwnedResult {
66 fn drop(&mut self) {
67 let arena = std::mem::take(&mut self.arena);
69 release_arena(arena);
70 if let Some(buf) = self.result.take_data_buf() {
72 bsql_driver_postgres::release_resp_buf(buf);
73 }
74 let col_offsets = self.result.take_col_offsets();
76 if col_offsets.capacity() > 0 {
77 bsql_driver_postgres::release_col_offsets(col_offsets);
78 }
79 }
80}
81
#[cfg(feature = "async")]
/// Something that can run SQL: implemented in this file for [`Pool`],
/// [`PoolConnection`] and [`Transaction`] (async flavour).
///
/// Every method takes the SQL text, a `sql_hash` for it, and the encoded
/// parameters, and returns a `Send` future borrowing all inputs for `'a`.
///
/// NOTE(review): `sql_hash` is presumably a precomputed hash of `sql` used by
/// the driver to key prepared statements — confirm against the driver crate.
pub trait Executor: Send + Sync {
    /// Runs a query and yields its rows as an [`OwnedResult`].
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a;

    /// Like [`Executor::query_raw`], for queries that do not write.
    ///
    /// Implementations may route the query elsewhere (the `Pool` impl uses its
    /// `read_pool` when one is configured); the others simply delegate to
    /// `query_raw`.
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a;

    /// Runs a statement and yields a `u64`.
    ///
    /// NOTE(review): the `u64` is presumably the affected-row count — confirm.
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a;
}
123
124#[cfg(not(feature = "async"))]
126pub trait Executor {
127 fn query_raw(
128 &self,
129 sql: &str,
130 sql_hash: u64,
131 params: &[&(dyn Encode + Sync)],
132 ) -> BsqlResult<OwnedResult>;
133
134 fn query_raw_readonly(
135 &self,
136 sql: &str,
137 sql_hash: u64,
138 params: &[&(dyn Encode + Sync)],
139 ) -> BsqlResult<OwnedResult>;
140
141 fn execute_raw(
142 &self,
143 sql: &str,
144 sql_hash: u64,
145 params: &[&(dyn Encode + Sync)],
146 ) -> BsqlResult<u64>;
147}
148
#[cfg(feature = "async")]
// The trait spells the return type as `impl Future + Send + 'a`, so the
// methods keep explicit `async move` bodies rather than `async fn`.
#[allow(clippy::manual_async_fn)]
/// Async executor backed by the pool: each call acquires a connection, runs
/// one statement on it, and lets the connection guard drop on return.
impl Executor for Pool {
    /// Acquires a connection from `self.inner` and runs the query on it.
    ///
    /// Acquire failures go through `BsqlError::from`; query failures go
    /// through `BsqlError::from_driver_query`.
    #[inline]
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            let mut conn = self.inner.acquire_async().await.map_err(BsqlError::from)?;
            conn.query_async(sql, sql_hash, params)
                .await
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// Same as `query_raw`, except the connection comes from `read_pool`
    /// when one is configured and from `inner` otherwise.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            let mut conn = self
                .read_pool
                .as_ref()
                .unwrap_or(&self.inner)
                .acquire_async()
                .await
                .map_err(BsqlError::from)?;
            conn.query_async(sql, sql_hash, params)
                .await
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// Acquires a connection from `self.inner` (never the read pool) and
    /// executes the statement, returning the driver's `u64` result.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move {
            let mut conn = self.inner.acquire_async().await.map_err(BsqlError::from)?;
            let outcome = conn.execute_async(sql, sql_hash, params).await;
            outcome.map_err(BsqlError::from_driver_query)
        }
    }
}
210
211#[cfg(not(feature = "async"))]
213impl Executor for Pool {
214 #[inline]
215 fn query_raw(
216 &self,
217 sql: &str,
218 sql_hash: u64,
219 params: &[&(dyn Encode + Sync)],
220 ) -> BsqlResult<OwnedResult> {
221 let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
222 let result = guard
223 .query(sql, sql_hash, params)
224 .map_err(BsqlError::from_driver_query)?;
225 Ok(OwnedResult::without_arena(result))
226 }
227
228 #[inline]
229 fn query_raw_readonly(
230 &self,
231 sql: &str,
232 sql_hash: u64,
233 params: &[&(dyn Encode + Sync)],
234 ) -> BsqlResult<OwnedResult> {
235 let pool = self.read_pool.as_ref().unwrap_or(&self.inner);
236 let mut guard = pool.acquire().map_err(BsqlError::from)?;
237 let result = guard
238 .query(sql, sql_hash, params)
239 .map_err(BsqlError::from_driver_query)?;
240 Ok(OwnedResult::without_arena(result))
241 }
242
243 #[inline]
244 fn execute_raw(
245 &self,
246 sql: &str,
247 sql_hash: u64,
248 params: &[&(dyn Encode + Sync)],
249 ) -> BsqlResult<u64> {
250 let mut guard = self.inner.acquire().map_err(BsqlError::from)?;
251 guard
252 .execute(sql, sql_hash, params)
253 .map_err(BsqlError::from_driver_query)
254 }
255}
256
#[cfg(feature = "async")]
// The trait spells the return type as `impl Future + Send + 'a`, so the
// methods keep explicit `async move` bodies rather than `async fn`.
#[allow(clippy::manual_async_fn)]
/// Async executor over a single held connection.
///
/// The connection sits behind `self.inner.lock()`. The calls made on it here
/// are the non-`_async` `query` / `execute` methods and the futures contain
/// no `.await`, so the lock guard never lives across a suspension point.
impl Executor for PoolConnection {
    /// Locks the connection and runs the query on it.
    #[inline]
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move {
            // If `lock()` reports an error, take the guard out of the error
            // and carry on instead of panicking.
            let mut conn = match self.inner.lock() {
                Ok(held) => held,
                Err(poisoned) => poisoned.into_inner(),
            };
            conn.query(sql, sql_hash, params)
                .map(OwnedResult::without_arena)
                .map_err(BsqlError::from_driver_query)
        }
    }

    /// A held connection has no separate read target; this delegates to
    /// `query_raw`.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        self.query_raw(sql, sql_hash, params)
    }

    /// Locks the connection and executes the statement on it.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move {
            // Same lock-error recovery as in `query_raw`.
            let mut conn = match self.inner.lock() {
                Ok(held) => held,
                Err(poisoned) => poisoned.into_inner(),
            };
            let outcome = conn.execute(sql, sql_hash, params);
            outcome.map_err(BsqlError::from_driver_query)
        }
    }
}
305
306#[cfg(not(feature = "async"))]
307impl Executor for PoolConnection {
308 #[inline]
309 fn query_raw(
310 &self,
311 sql: &str,
312 sql_hash: u64,
313 params: &[&(dyn Encode + Sync)],
314 ) -> BsqlResult<OwnedResult> {
315 let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
316 let result = guard
317 .query(sql, sql_hash, params)
318 .map_err(BsqlError::from_driver_query)?;
319 Ok(OwnedResult::without_arena(result))
320 }
321
322 #[inline]
323 fn query_raw_readonly(
324 &self,
325 sql: &str,
326 sql_hash: u64,
327 params: &[&(dyn Encode + Sync)],
328 ) -> BsqlResult<OwnedResult> {
329 self.query_raw(sql, sql_hash, params)
330 }
331
332 #[inline]
333 fn execute_raw(
334 &self,
335 sql: &str,
336 sql_hash: u64,
337 params: &[&(dyn Encode + Sync)],
338 ) -> BsqlResult<u64> {
339 let mut guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
340 guard
341 .execute(sql, sql_hash, params)
342 .map_err(BsqlError::from_driver_query)
343 }
344}
345
#[cfg(feature = "async")]
// The trait spells the return type as `impl Future + Send + 'a`, so the
// methods keep explicit `async move` bodies rather than `async fn`.
#[allow(clippy::manual_async_fn)]
/// Async executor for a transaction: a thin adapter that forwards to the
/// transaction's own `query_inner` / `execute_inner`.
///
/// Those inner calls are not awaited; the returned futures merely defer them
/// until first poll.
impl Executor for Transaction {
    /// Forwards to `Transaction::query_inner` when the future is polled.
    fn query_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        async move { self.query_inner(sql, sql_hash, params) }
    }

    /// No separate read path inside a transaction; delegates to `query_raw`.
    #[inline]
    fn query_raw_readonly<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<OwnedResult>> + Send + 'a {
        self.query_raw(sql, sql_hash, params)
    }

    /// Forwards to `Transaction::execute_inner` when the future is polled.
    #[inline]
    fn execute_raw<'a>(
        &'a self,
        sql: &'a str,
        sql_hash: u64,
        params: &'a [&'a (dyn Encode + Sync)],
    ) -> impl std::future::Future<Output = BsqlResult<u64>> + Send + 'a {
        async move { self.execute_inner(sql, sql_hash, params) }
    }
}
382
383#[cfg(not(feature = "async"))]
384impl Executor for Transaction {
385 fn query_raw(
386 &self,
387 sql: &str,
388 sql_hash: u64,
389 params: &[&(dyn Encode + Sync)],
390 ) -> BsqlResult<OwnedResult> {
391 self.query_inner(sql, sql_hash, params)
392 }
393
394 #[inline]
395 fn query_raw_readonly(
396 &self,
397 sql: &str,
398 sql_hash: u64,
399 params: &[&(dyn Encode + Sync)],
400 ) -> BsqlResult<OwnedResult> {
401 self.query_raw(sql, sql_hash, params)
402 }
403
404 #[inline]
405 fn execute_raw(
406 &self,
407 sql: &str,
408 sql_hash: u64,
409 params: &[&(dyn Encode + Sync)],
410 ) -> BsqlResult<u64> {
411 self.execute_inner(sql, sql_hash, params)
412 }
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use bsql_driver_postgres::arena::{acquire_arena, release_arena};
    use bsql_driver_postgres::{ColumnDesc, QueryResult};
    use std::sync::Arc;

    // ---- helpers ---------------------------------------------------------

    /// Compile-time probe: only instantiable for `Send + Sync` types.
    fn assert_send_sync<T: Send + Sync>() {}

    /// An empty column-descriptor list.
    fn no_columns() -> Arc<[ColumnDesc]> {
        Arc::from(Vec::new())
    }

    /// A result with no columns, no offsets and `0` as the last
    /// `from_parts` argument, wrapped without an arena.
    fn arena_less_empty() -> OwnedResult {
        let result = QueryResult::from_parts(Vec::new(), 0, no_columns(), 0);
        OwnedResult::without_arena(result)
    }

    /// Builds an `OwnedResult` with `num_rows * num_cols` placeholder cells
    /// and an arena obtained from `acquire_arena()`.
    fn make_owned_result(num_rows: usize, num_cols: usize) -> OwnedResult {
        // Acquire the arena first, before any other setup.
        let arena = acquire_arena();

        // Columns are named c0, c1, ... with type OID 23 and size 4
        // (int4 in PostgreSQL's catalog).
        let mut descs = Vec::with_capacity(num_cols);
        for i in 0..num_cols {
            descs.push(ColumnDesc {
                name: format!("c{i}").into(),
                type_oid: 23,
                type_size: 4,
                table_oid: 0,
                column_id: 0,
            });
        }
        let cols: Arc<[ColumnDesc]> = descs.into();

        // One `(0, -1)` entry per cell.
        // NOTE(review): `-1` presumably marks a NULL cell — confirm in the driver.
        let cell_count = num_rows * num_cols;
        let col_offsets: Vec<(usize, i32)> = vec![(0, -1); cell_count];

        let result = QueryResult::from_parts(col_offsets, num_cols, cols, 0);
        OwnedResult { result, arena }
    }

    // ---- len / is_empty --------------------------------------------------

    #[test]
    fn owned_result_new_zero_rows() {
        let owned = make_owned_result(0, 2);
        assert!(owned.is_empty());
        assert_eq!(owned.len(), 0);
    }

    #[test]
    fn owned_result_new_single_row() {
        let owned = make_owned_result(1, 3);
        assert!(!owned.is_empty());
        assert_eq!(owned.len(), 1);
    }

    #[test]
    fn owned_result_new_multiple_rows() {
        let owned = make_owned_result(5, 2);
        assert!(!owned.is_empty());
        assert_eq!(owned.len(), 5);
    }

    // ---- row access --------------------------------------------------------

    #[test]
    fn owned_result_row_access() {
        let owned = make_owned_result(3, 2);
        // Every in-range index must be retrievable without panicking.
        let _r0 = owned.row(0);
        let _r1 = owned.row(1);
        let _r2 = owned.row(2);
    }

    #[test]
    #[should_panic]
    fn owned_result_row_out_of_bounds_panics() {
        let owned = make_owned_result(2, 1);
        // Index 2 is one past the last valid row.
        let _r = owned.row(2);
    }

    // ---- iteration ---------------------------------------------------------

    #[test]
    fn owned_result_iter_count() {
        let owned = make_owned_result(4, 2);
        assert_eq!(owned.iter().count(), 4);
    }

    #[test]
    fn owned_result_iter_empty() {
        let owned = make_owned_result(0, 2);
        assert_eq!(owned.iter().count(), 0);
    }

    // ---- drop --------------------------------------------------------------

    #[test]
    fn owned_result_drop_releases_arena() {
        let owned = make_owned_result(1, 1);
        drop(owned);
        // After the drop, acquiring and releasing an arena must still work.
        let arena = acquire_arena();
        release_arena(arena);
    }

    // ---- edge cases --------------------------------------------------------

    #[test]
    fn owned_result_zero_columns() {
        let arena = acquire_arena();
        let cols = no_columns();
        let result = QueryResult::from_parts(vec![], 0, cols, 42);
        let owned = OwnedResult { result, arena };
        assert_eq!(owned.len(), 0);
        assert!(owned.is_empty());
        assert_eq!(owned.result.affected_rows(), 42);
    }

    // ---- without_arena -----------------------------------------------------

    #[test]
    fn owned_result_without_arena_len_zero() {
        let owned = arena_less_empty();
        assert_eq!(owned.len(), 0);
    }

    #[test]
    fn owned_result_without_arena_is_empty() {
        let owned = arena_less_empty();
        assert!(owned.is_empty());
    }

    #[test]
    fn owned_result_without_arena_with_rows() {
        let only_column = ColumnDesc {
            name: "c0".into(),
            type_oid: 23,
            type_size: 4,
            table_oid: 0,
            column_id: 0,
        };
        let cols: Arc<[ColumnDesc]> = vec![only_column].into();
        // Three cells across one column, i.e. three rows.
        let col_offsets = vec![(0, -1); 3];
        let result = QueryResult::from_parts(col_offsets, 1, cols, 0);
        let owned = OwnedResult::without_arena(result);
        assert_eq!(owned.len(), 3);
        assert!(!owned.is_empty());
    }

    // ---- Debug -------------------------------------------------------------

    #[test]
    fn owned_result_debug_format() {
        let owned = make_owned_result(5, 2);
        let dbg = format!("{owned:?}");
        assert!(
            dbg.contains("OwnedResult"),
            "Debug should contain struct name: {dbg}"
        );
        assert!(dbg.contains("5"), "Debug should contain row count: {dbg}");
    }

    #[test]
    fn owned_result_without_arena_drop_does_not_panic() {
        // Dropping a result that carries `Arena::empty()` must be harmless.
        drop(arena_less_empty());
    }

    // ---- auto-trait checks (compile-time) ------------------------------------

    #[test]
    fn pool_is_send_and_sync() {
        assert_send_sync::<crate::pool::Pool>();
    }

    #[test]
    fn pool_connection_is_send_and_sync() {
        assert_send_sync::<crate::pool::PoolConnection>();
    }

    #[test]
    fn transaction_is_send_and_sync() {
        assert_send_sync::<crate::transaction::Transaction>();
    }

    #[test]
    fn owned_result_is_send_and_sync() {
        assert_send_sync::<OwnedResult>();
    }

    #[cfg(feature = "async")]
    #[test]
    fn executor_trait_requires_send_sync() {
        // Never called: it only has to type-check, which proves that
        // `E: Executor` alone implies `E: Send + Sync`.
        fn _check_executor_bounds<E: Executor>() {
            assert_send_sync::<E>();
        }
    }
}