1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use bytes::{Buf, Bytes};
use futures::stream::{self, Stream};
use std::io::{self, BufRead, Cursor, Read};
use std::marker::PhantomData;
use tokio_postgres::impls;
use tokio_postgres::Error;

/// The reader returned by the `copy_out` method.
///
/// Data is pulled from the underlying copy-out stream one chunk at a time and
/// served out of the current chunk through the `Read` and `BufRead` impls.
pub struct CopyOutReader<'a> {
    // Blocking iterator over the chunks produced by the copy-out stream.
    it: stream::Wait<impls::CopyOut>,
    // The chunk currently being read; the cursor position tracks how much of
    // it has already been consumed.
    cur: Cursor<Bytes>,
    // Carries a `'a` mutable borrow without storing an actual reference.
    // NOTE(review): presumably this stands for the borrow of the client that
    // issued `copy_out` — confirm at the call site.
    _p: PhantomData<&'a mut ()>,
}

// Intentionally empty `Drop` impl. Because the type implements `Drop`, the
// borrow checker treats the `'a` borrow as still in use when the reader is
// dropped, so the borrow lasts until the reader goes out of scope instead of
// ending at its last use.
impl<'a> Drop for CopyOutReader<'a> {
    fn drop(&mut self) {}
}

impl<'a> CopyOutReader<'a> {
    /// Wraps a copy-out stream in a blocking reader.
    ///
    /// The first chunk is fetched eagerly, so an error produced by the first
    /// item of the stream is returned from the constructor rather than from
    /// the first read. A stream that ends immediately yields a reader whose
    /// current chunk is empty.
    ///
    /// # Errors
    ///
    /// Returns the stream's error if its first item is an `Err`.
    // `new` returns `Result<Self, _>` rather than `Self`, which the lint flags.
    #[allow(clippy::new_ret_no_self)]
    pub(crate) fn new(stream: impls::CopyOut) -> Result<CopyOutReader<'a>, Error> {
        let mut chunks = stream.wait();

        // Pull the first chunk up front; an exhausted stream is represented
        // by an empty buffer.
        let first = match chunks.next() {
            None => Bytes::new(),
            Some(chunk) => chunk?,
        };

        let reader = CopyOutReader {
            it: chunks,
            cur: Cursor::new(first),
            _p: PhantomData,
        };
        Ok(reader)
    }
}

impl<'a> Read for CopyOutReader<'a> {
    /// Copies bytes from the current chunk into `buf`.
    ///
    /// At most one chunk is touched per call: the number of bytes copied is
    /// the smaller of `buf.len()` and what `fill_buf` makes available, and
    /// exactly that many bytes are then consumed from the reader.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `fill_buf`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Reading from a byte slice copies `min(buf.len(), slice.len())`
        // bytes and never fails, so it performs the bounded copy for us.
        let mut available: &[u8] = self.fill_buf()?;
        let copied = available.read(buf)?;
        self.consume(copied);
        Ok(copied)
    }
}

impl<'a> BufRead for CopyOutReader<'a> {
    /// Returns the unread part of the current chunk, fetching further chunks
    /// from the stream when the current one is exhausted.
    ///
    /// An empty slice is returned only once the stream has ended. Zero-length
    /// chunks are skipped, because `BufRead` consumers interpret an empty
    /// buffer as end-of-file and would otherwise stop before the stream is
    /// actually finished.
    ///
    /// # Errors
    ///
    /// An error yielded by the stream is wrapped in an `io::Error` of kind
    /// `Other`.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        // Keep pulling until there is data to hand out or the stream is done.
        while self.cur.remaining() == 0 {
            match self.it.next() {
                Some(Ok(cur)) => self.cur = Cursor::new(cur),
                Some(Err(e)) => return Err(io::Error::new(io::ErrorKind::Other, e)),
                None => break,
            }
        }

        // Fully qualified: `Cursor` also has `Read::bytes`, which is a
        // different method.
        Ok(Buf::bytes(&self.cur))
    }

    /// Marks `amt` bytes of the current chunk as read.
    ///
    /// As required by the `BufRead` contract, `amt` must not exceed the
    /// length of the slice last returned by `fill_buf`.
    fn consume(&mut self, amt: usize) {
        self.cur.advance(amt);
    }
}