madsim_tokio_postgres/
simple_query.rs1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
5use bytes::Bytes;
6use fallible_iterator::FallibleIterator;
7use futures::{ready, Stream};
8use log::debug;
9use pin_project_lite::pin_project;
10use postgres_protocol::message::backend::Message;
11use postgres_protocol::message::frontend;
12use std::marker::PhantomPinned;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16
17pub struct SimpleColumn {
19 name: String,
20}
21
22impl SimpleColumn {
23 pub(crate) fn new(name: String) -> SimpleColumn {
24 SimpleColumn { name }
25 }
26
27 pub fn name(&self) -> &str {
29 &self.name
30 }
31}
32
33pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
34 debug!("executing simple query: {}", query);
35
36 let buf = encode(client, query)?;
37 let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
38
39 Ok(SimpleQueryStream {
40 responses,
41 columns: None,
42 _p: PhantomPinned,
43 })
44}
45
46pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
47 debug!("executing statement batch: {}", query);
48
49 let buf = encode(client, query)?;
50 let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
51
52 loop {
53 match responses.next().await? {
54 Message::ReadyForQuery(_) => return Ok(()),
55 Message::CommandComplete(_)
56 | Message::EmptyQueryResponse
57 | Message::RowDescription(_)
58 | Message::DataRow(_) => {}
59 _ => return Err(Error::unexpected_message()),
60 }
61 }
62}
63
64fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
65 client.with_buf(|buf| {
66 frontend::query(query, buf).map_err(Error::encode)?;
67 Ok(buf.split().freeze())
68 })
69}
70
71pin_project! {
72 pub struct SimpleQueryStream {
74 responses: Responses,
75 columns: Option<Arc<[SimpleColumn]>>,
76 #[pin]
77 _p: PhantomPinned,
78 }
79}
80
81impl Stream for SimpleQueryStream {
82 type Item = Result<SimpleQueryMessage, Error>;
83
84 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85 let this = self.project();
86 loop {
87 match ready!(this.responses.poll_next(cx)?) {
88 Message::CommandComplete(body) => {
89 let rows = body
90 .tag()
91 .map_err(Error::parse)?
92 .rsplit(' ')
93 .next()
94 .unwrap()
95 .parse()
96 .unwrap_or(0);
97 return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
98 }
99 Message::EmptyQueryResponse => {
100 return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
101 }
102 Message::RowDescription(body) => {
103 let columns = body
104 .fields()
105 .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
106 .collect::<Vec<_>>()
107 .map_err(Error::parse)?
108 .into();
109
110 *this.columns = Some(columns);
111 }
112 Message::DataRow(body) => {
113 let row = match &this.columns {
114 Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
115 None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
116 };
117 return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
118 }
119 Message::ReadyForQuery(_) => return Poll::Ready(None),
120 _ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
121 }
122 }
123 }
124}