tokio_postgres/
simple_query.rs1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::query::extract_row_affected;
5use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
6use bytes::Bytes;
7use fallible_iterator::FallibleIterator;
8use futures_util::Stream;
9use log::debug;
10use pin_project_lite::pin_project;
11use postgres_protocol::message::backend::Message;
12use postgres_protocol::message::frontend;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{ready, Context, Poll};
16
/// Describes one column of a simple-query result; currently just its name.
#[derive(Debug)]
pub struct SimpleColumn {
    name: String,
}

impl SimpleColumn {
    /// Creates a column description that takes ownership of `name`.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }

    /// Returns the name of the column.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
33
34pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
35 debug!("executing simple query: {query}");
36
37 let buf = encode(client, query)?;
38 let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
39
40 Ok(SimpleQueryStream {
41 responses,
42 columns: None,
43 })
44}
45
46pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
47 debug!("executing statement batch: {query}");
48
49 let buf = encode(client, query)?;
50 let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
51
52 loop {
53 match responses.next().await? {
54 Message::ReadyForQuery(_) => return Ok(()),
55 Message::CommandComplete(_)
56 | Message::EmptyQueryResponse
57 | Message::RowDescription(_)
58 | Message::DataRow(_) => {}
59 _ => return Err(Error::unexpected_message()),
60 }
61 }
62}
63
64fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
65 client.with_buf(|buf| {
66 frontend::query(query, buf).map_err(Error::encode)?;
67 Ok(buf.split().freeze())
68 })
69}
70
pin_project! {
    /// A stream of the messages produced by a simple query.
    ///
    /// Its `Stream` implementation yields `Result<SimpleQueryMessage, Error>` items.
    /// The `#[project(!Unpin)]` attribute asks `pin_project!` to make the type `!Unpin`.
    #[project(!Unpin)]
    pub struct SimpleQueryStream {
        // Backend responses for the request that was sent.
        responses: Responses,
        // Columns from the most recent `RowDescription` message, if one has been seen;
        // handed to each row built from a later `DataRow` message.
        columns: Option<Arc<[SimpleColumn]>>,
    }
}
79
80impl Stream for SimpleQueryStream {
81 type Item = Result<SimpleQueryMessage, Error>;
82
83 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
84 let this = self.project();
85 match ready!(this.responses.poll_next(cx)?) {
86 Message::CommandComplete(body) => {
87 let rows = extract_row_affected(&body)?;
88 Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))))
89 }
90 Message::EmptyQueryResponse => {
91 Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))))
92 }
93 Message::RowDescription(body) => {
94 let columns: Arc<[SimpleColumn]> = body
95 .fields()
96 .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
97 .collect::<Vec<_>>()
98 .map_err(Error::parse)?
99 .into();
100
101 *this.columns = Some(columns.clone());
102 Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns))))
103 }
104 Message::DataRow(body) => {
105 let row = match &this.columns {
106 Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
107 None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
108 };
109 Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))))
110 }
111 Message::ReadyForQuery(_) => Poll::Ready(None),
112 _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
113 }
114 }
115}