1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::StreamExt;

use crate::{describe::Describe, executor::Executor, pool::Pool, Database};

impl<DB> Executor for Pool<DB>
where
    DB: Database,
{
    type Database = DB;

    fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
        Box::pin(async move { <&Pool<DB> as Executor>::send(&mut &*self, commands).await })
    }

    fn execute<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<u64>> {
        Box::pin(async move { <&Pool<DB> as Executor>::execute(&mut &*self, query, args).await })
    }

    fn fetch<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxStream<'e, crate::Result<DB::Row>> {
        Box::pin(async_stream::try_stream! {
            let mut self_ = &*self;
            let mut s = <&Pool<DB> as Executor>::fetch(&mut self_, query, args);

            while let Some(row) = s.next().await.transpose()? {
                yield row;
            }
        })
    }

    fn fetch_optional<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
        Box::pin(
            async move { <&Pool<DB> as Executor>::fetch_optional(&mut &*self, query, args).await },
        )
    }

    fn describe<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
    ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
        Box::pin(async move { <&Pool<DB> as Executor>::describe(&mut &*self, query).await })
    }
}

impl<DB> Executor for &'_ Pool<DB>
where
    DB: Database,
{
    type Database = DB;

    fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
        Box::pin(async move { self.acquire().await?.send(commands).await })
    }

    fn execute<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<u64>> {
        Box::pin(async move { self.acquire().await?.execute(query, args).await })
    }

    fn fetch<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxStream<'e, crate::Result<DB::Row>> {
        Box::pin(async_stream::try_stream! {
            let mut live = self.acquire().await?;
            let mut s = live.fetch(query, args);

            while let Some(row) = s.next().await.transpose()? {
                yield row;
            }
        })
    }

    fn fetch_optional<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
        args: DB::Arguments,
    ) -> BoxFuture<'e, crate::Result<Option<DB::Row>>> {
        Box::pin(async move { self.acquire().await?.fetch_optional(query, args).await })
    }

    fn describe<'e, 'q: 'e>(
        &'e mut self,
        query: &'q str,
    ) -> BoxFuture<'e, crate::Result<Describe<Self::Database>>> {
        Box::pin(async move { self.acquire().await?.describe(query).await })
    }
}