madsim_tokio_postgres/
simple_query.rs

1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
5use bytes::Bytes;
6use fallible_iterator::FallibleIterator;
7use futures::{ready, Stream};
8use log::debug;
9use pin_project_lite::pin_project;
10use postgres_protocol::message::backend::Message;
11use postgres_protocol::message::frontend;
12use std::marker::PhantomPinned;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
17/// Information about a column of a single query row.
18pub struct SimpleColumn {
19    name: String,
20}
21
22impl SimpleColumn {
23    pub(crate) fn new(name: String) -> SimpleColumn {
24        SimpleColumn { name }
25    }
26
27    /// Returns the name of the column.
28    pub fn name(&self) -> &str {
29        &self.name
30    }
31}
32
33pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
34    debug!("executing simple query: {}", query);
35
36    let buf = encode(client, query)?;
37    let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
38
39    Ok(SimpleQueryStream {
40        responses,
41        columns: None,
42        _p: PhantomPinned,
43    })
44}
45
46pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
47    debug!("executing statement batch: {}", query);
48
49    let buf = encode(client, query)?;
50    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
51
52    loop {
53        match responses.next().await? {
54            Message::ReadyForQuery(_) => return Ok(()),
55            Message::CommandComplete(_)
56            | Message::EmptyQueryResponse
57            | Message::RowDescription(_)
58            | Message::DataRow(_) => {}
59            _ => return Err(Error::unexpected_message()),
60        }
61    }
62}
63
64fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
65    client.with_buf(|buf| {
66        frontend::query(query, buf).map_err(Error::encode)?;
67        Ok(buf.split().freeze())
68    })
69}
70
71pin_project! {
72    /// A stream of simple query results.
73    pub struct SimpleQueryStream {
74        responses: Responses,
75        columns: Option<Arc<[SimpleColumn]>>,
76        #[pin]
77        _p: PhantomPinned,
78    }
79}
80
81impl Stream for SimpleQueryStream {
82    type Item = Result<SimpleQueryMessage, Error>;
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        let this = self.project();
86        loop {
87            match ready!(this.responses.poll_next(cx)?) {
88                Message::CommandComplete(body) => {
89                    let rows = body
90                        .tag()
91                        .map_err(Error::parse)?
92                        .rsplit(' ')
93                        .next()
94                        .unwrap()
95                        .parse()
96                        .unwrap_or(0);
97                    return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
98                }
99                Message::EmptyQueryResponse => {
100                    return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
101                }
102                Message::RowDescription(body) => {
103                    let columns = body
104                        .fields()
105                        .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
106                        .collect::<Vec<_>>()
107                        .map_err(Error::parse)?
108                        .into();
109
110                    *this.columns = Some(columns);
111                }
112                Message::DataRow(body) => {
113                    let row = match &this.columns {
114                        Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
115                        None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
116                    };
117                    return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
118                }
119                Message::ReadyForQuery(_) => return Poll::Ready(None),
120                _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
121            }
122        }
123    }
124}