1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::StreamExt;

use crate::{describe::Describe, executor::Executor, pool::Pool, Database};

/// `Executor` for a pool accessed through `&mut Pool<DB>`.
///
/// Every method reborrows the pool as `&Pool<DB>` and forwards the call to the
/// `Executor` implementation for `&Pool<DB>`; no work is done here beyond that
/// delegation.
impl<DB> Executor for Pool<DB>
where
    DB: Database,
{
    type Database = DB;

    /// Forwards `commands` to `<&Pool<DB> as Executor>::send`.
    fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
        Box::pin(async move {
            // The shared-reference impl takes `&mut &Pool<DB>`, so a mutable
            // binding holding the shared reborrow is needed.
            let mut pool: &Pool<DB> = &*self;

            <&Pool<DB> as Executor>::send(&mut pool, commands).await
        })
    }

    /// Forwards `query` and `args` to `<&Pool<DB> as Executor>::execute`.
    fn execute<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<u64>> {
        Box::pin(async move {
            let mut pool: &Pool<DB> = &*self;

            <&Pool<DB> as Executor>::execute(&mut pool, query, args).await
        })
    }

    /// Streams the rows produced by `<&Pool<DB> as Executor>::fetch`.
    ///
    /// Each row of the inner stream is re-yielded; the first error ends the
    /// stream after being reported to the consumer.
    fn fetch<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxStream<'e, crate::Result<DB::Row>> {
        Box::pin(async_stream::try_stream! {
            let mut pool: &Pool<DB> = &*self;
            let mut rows = <&Pool<DB> as Executor>::fetch(&mut pool, query, args);

            while let Some(row) = rows.next().await.transpose()? {
                yield row;
            }
        })
    }

    /// Forwards `query` and `args` to `<&Pool<DB> as Executor>::fetch_optional`.
    fn fetch_optional<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
        Box::pin(async move {
            let mut pool: &Pool<DB> = &*self;

            <&Pool<DB> as Executor>::fetch_optional(&mut pool, query, args).await
        })
    }

    /// Forwards `query` to `<&Pool<DB> as Executor>::describe`.
    fn describe<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
    ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
        Box::pin(async move {
            let mut pool: &Pool<DB> = &*self;

            <&Pool<DB> as Executor>::describe(&mut pool, query).await
        })
    }
}

/// `Executor` for a shared reference to a pool.
///
/// Every method first awaits `Pool::acquire`, propagating its error with `?`,
/// and then invokes the same-named operation on the value it returned.
impl<DB> Executor for &'_ Pool<DB>
where
    DB: Database,
{
    type Database = DB;

    /// Acquires from the pool, then calls `send(commands)` on the result.
    fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
        Box::pin(async move {
            let mut conn = self.acquire().await?;

            conn.send(commands).await
        })
    }

    /// Acquires from the pool, then calls `execute(query, args)` on the result.
    fn execute<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<u64>> {
        Box::pin(async move {
            let mut conn = self.acquire().await?;

            conn.execute(query, args).await
        })
    }

    /// Acquires from the pool and streams the rows of `fetch(query, args)`.
    ///
    /// The acquired value is a local of the stream body, so it is kept for as
    /// long as the inner row stream is being driven.
    fn fetch<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxStream<'e, crate::Result<DB::Row>> {
        Box::pin(async_stream::try_stream! {
            let mut conn = self.acquire().await?;
            let mut rows = conn.fetch(query, args);

            while let Some(row) = rows.next().await.transpose()? {
                yield row;
            }
        })
    }

    /// Acquires from the pool, then calls `fetch_optional(query, args)` on the
    /// result.
    fn fetch_optional<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
        Box::pin(async move {
            let mut conn = self.acquire().await?;

            conn.fetch_optional(query, args).await
        })
    }

    /// Acquires from the pool, then calls `describe(query)` on the result.
    fn describe<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
    ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
        Box::pin(async move {
            let mut conn = self.acquire().await?;

            conn.describe(query).await
        })
    }
}