tokio_postgres/
simple_query.rs

1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::query::extract_row_affected;
5use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
6use bytes::Bytes;
7use fallible_iterator::FallibleIterator;
8use futures_util::Stream;
9use log::debug;
10use pin_project_lite::pin_project;
11use postgres_protocol::message::backend::Message;
12use postgres_protocol::message::frontend;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{ready, Context, Poll};
16
17/// Information about a column of a single query row.
18#[derive(Debug)]
19pub struct SimpleColumn {
20    name: String,
21}
22
23impl SimpleColumn {
24    pub(crate) fn new(name: String) -> SimpleColumn {
25        SimpleColumn { name }
26    }
27
28    /// Returns the name of the column.
29    pub fn name(&self) -> &str {
30        &self.name
31    }
32}
33
34pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
35    debug!("executing simple query: {query}");
36
37    let buf = encode(client, query)?;
38    let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
39
40    Ok(SimpleQueryStream {
41        responses,
42        columns: None,
43    })
44}
45
46pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
47    debug!("executing statement batch: {query}");
48
49    let buf = encode(client, query)?;
50    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
51
52    loop {
53        match responses.next().await? {
54            Message::ReadyForQuery(_) => return Ok(()),
55            Message::CommandComplete(_)
56            | Message::EmptyQueryResponse
57            | Message::RowDescription(_)
58            | Message::DataRow(_) => {}
59            _ => return Err(Error::unexpected_message()),
60        }
61    }
62}
63
64fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
65    client.with_buf(|buf| {
66        frontend::query(query, buf).map_err(Error::encode)?;
67        Ok(buf.split().freeze())
68    })
69}
70
71pin_project! {
72    /// A stream of simple query results.
73    #[project(!Unpin)]
74    pub struct SimpleQueryStream {
75        responses: Responses,
76        columns: Option<Arc<[SimpleColumn]>>,
77    }
78}
79
80impl Stream for SimpleQueryStream {
81    type Item = Result<SimpleQueryMessage, Error>;
82
83    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84        let this = self.project();
85        match ready!(this.responses.poll_next(cx)?) {
86            Message::CommandComplete(body) => {
87                let rows = extract_row_affected(&body)?;
88                Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))))
89            }
90            Message::EmptyQueryResponse => {
91                Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))))
92            }
93            Message::RowDescription(body) => {
94                let columns: Arc<[SimpleColumn]> = body
95                    .fields()
96                    .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
97                    .collect::<Vec<_>>()
98                    .map_err(Error::parse)?
99                    .into();
100
101                *this.columns = Some(columns.clone());
102                Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns))))
103            }
104            Message::DataRow(body) => {
105                let row = match &this.columns {
106                    Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
107                    None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
108                };
109                Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))))
110            }
111            Message::ReadyForQuery(_) => Poll::Ready(None),
112            _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
113        }
114    }
115}