tokio_postgres/
copy_out.rs

1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{query, slice_iter, Error, Statement};
5use bytes::Bytes;
6use futures_util::Stream;
7use log::debug;
8use pin_project_lite::pin_project;
9use postgres_protocol::message::backend::Message;
10use std::pin::Pin;
11use std::task::{ready, Context, Poll};
12
13pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
14    debug!("executing copy out statement {}", statement.name());
15
16    let buf = query::encode(client, &statement, slice_iter(&[]))?;
17    let responses = start(client, buf).await?;
18    Ok(CopyOutStream { responses })
19}
20
21async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
22    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
23
24    match responses.next().await? {
25        Message::BindComplete => {}
26        _ => return Err(Error::unexpected_message()),
27    }
28
29    match responses.next().await? {
30        Message::CopyOutResponse(_) => {}
31        _ => return Err(Error::unexpected_message()),
32    }
33
34    Ok(responses)
35}
36
37pin_project! {
38    /// A stream of `COPY ... TO STDOUT` query data.
39    #[project(!Unpin)]
40    pub struct CopyOutStream {
41        responses: Responses,
42    }
43}
44
45impl Stream for CopyOutStream {
46    type Item = Result<Bytes, Error>;
47
48    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49        let this = self.project();
50
51        match ready!(this.responses.poll_next(cx)?) {
52            Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
53            Message::CopyDone => Poll::Ready(None),
54            _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
55        }
56    }
57}