tokio_postgres/
copy_out.rs1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{query, slice_iter, Error, Statement};
5use bytes::Bytes;
6use futures_util::Stream;
7use log::debug;
8use pin_project_lite::pin_project;
9use postgres_protocol::message::backend::Message;
10use std::pin::Pin;
11use std::task::{ready, Context, Poll};
12
13pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
14 debug!("executing copy out statement {}", statement.name());
15
16 let buf = query::encode(client, &statement, slice_iter(&[]))?;
17 let responses = start(client, buf).await?;
18 Ok(CopyOutStream { responses })
19}
20
21async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
22 let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
23
24 match responses.next().await? {
25 Message::BindComplete => {}
26 _ => return Err(Error::unexpected_message()),
27 }
28
29 match responses.next().await? {
30 Message::CopyOutResponse(_) => {}
31 _ => return Err(Error::unexpected_message()),
32 }
33
34 Ok(responses)
35}
36
37pin_project! {
38 #[project(!Unpin)]
40 pub struct CopyOutStream {
41 responses: Responses,
42 }
43}
44
45impl Stream for CopyOutStream {
46 type Item = Result<Bytes, Error>;
47
48 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49 let this = self.project();
50
51 match ready!(this.responses.poll_next(cx)?) {
52 Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
53 Message::CopyDone => Poll::Ready(None),
54 _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
55 }
56 }
57}