xitca_postgres/driver/codec/
response.rs1use std::sync::Arc;
2
3use fallible_iterator::FallibleIterator;
4use postgres_protocol::message::backend;
5
6use crate::{
7 BoxedFuture,
8 column::Column,
9 error::Error,
10 prepare::Prepare,
11 query::{RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
12 statement::Statement,
13};
14
15use super::{Response, sealed};
16
17pub trait IntoResponse: sealed::Sealed + Sized {
19 type Response;
20
21 fn into_response(self, res: Response) -> Self::Response;
22}
23
24impl sealed::Sealed for &[Column] {}
25
26impl<'c> IntoResponse for &'c [Column] {
27 type Response = RowStream<'c>;
28
29 #[inline]
30 fn into_response(self, res: Response) -> Self::Response {
31 RowStream::new(res, self)
32 }
33}
34
35impl sealed::Sealed for Arc<[Column]> {}
36
37impl IntoResponse for Arc<[Column]> {
38 type Response = RowStreamOwned;
39
40 #[inline]
41 fn into_response(self, res: Response) -> Self::Response {
42 RowStreamOwned::new(res, self)
43 }
44}
45
46impl sealed::Sealed for Vec<Column> {}
47
48impl IntoResponse for Vec<Column> {
49 type Response = RowSimpleStream;
50
51 #[inline]
52 fn into_response(self, res: Response) -> Self::Response {
53 RowSimpleStream::new(res, self)
54 }
55}
56
57pub struct IntoRowStreamGuard<'a, C>(pub &'a C);
58
59impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}
60
61impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
62where
63 C: Prepare,
64{
65 type Response = RowStreamGuarded<'c, C>;
66
67 #[inline]
68 fn into_response(self, res: Response) -> Self::Response {
69 RowStreamGuarded::new(res, self.0)
70 }
71}
72
73pub struct NoOpIntoRowStream;
76
77impl sealed::Sealed for NoOpIntoRowStream {}
78
79impl IntoResponse for NoOpIntoRowStream {
80 type Response = RowStream<'static>;
81
82 fn into_response(self, _: Response) -> Self::Response {
83 unreachable!("no row stream can be generated from no op row stream constructor")
84 }
85}
86
87pub struct StatementCreateResponse<'a, C> {
88 pub(super) name: String,
89 pub(super) cli: &'a C,
90}
91
92impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}
93
94impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
95where
96 C: Prepare,
97{
98 type Response = BoxedFuture<'s, Result<Statement, Error>>;
99
100 fn into_response(self, mut res: Response) -> Self::Response {
101 Box::pin(async move {
102 let Self { name, cli } = self;
103
104 match res.recv().await? {
105 backend::Message::ParseComplete => {}
106 _ => return Err(Error::unexpected()),
107 }
108
109 let parameter_description = match res.recv().await? {
110 backend::Message::ParameterDescription(body) => body,
111 _ => return Err(Error::unexpected()),
112 };
113
114 let row_description = match res.recv().await? {
115 backend::Message::RowDescription(body) => Some(body),
116 backend::Message::NoData => None,
117 _ => return Err(Error::unexpected()),
118 };
119
120 let mut it = parameter_description.parameters();
121 let mut params = Vec::with_capacity(it.size_hint().0);
122
123 while let Some(oid) = it.next()? {
124 let ty = cli._get_type(oid).await?;
125 params.push(ty);
126 }
127
128 let mut columns = Vec::new();
129 if let Some(row_description) = row_description {
130 let mut it = row_description.fields();
131 columns.reserve(it.size_hint().0);
132 while let Some(field) = it.next()? {
133 let type_ = cli._get_type(field.type_oid()).await?;
134 let column = Column::new(field.name(), type_);
135 columns.push(column);
136 }
137 }
138
139 Ok(Statement::new(name, params, columns))
140 })
141 }
142}
143
144pub struct StatementCreateResponseBlocking<'a, C> {
145 pub(super) name: String,
146 pub(super) cli: &'a C,
147}
148
149impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}
150
151impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
152where
153 C: Prepare,
154{
155 type Response = Result<Statement, Error>;
156
157 fn into_response(self, mut res: Response) -> Self::Response {
158 let Self { name, cli } = self;
159
160 match res.blocking_recv()? {
161 backend::Message::ParseComplete => {}
162 _ => return Err(Error::unexpected()),
163 }
164
165 let parameter_description = match res.blocking_recv()? {
166 backend::Message::ParameterDescription(body) => body,
167 _ => return Err(Error::unexpected()),
168 };
169
170 let row_description = match res.blocking_recv()? {
171 backend::Message::RowDescription(body) => Some(body),
172 backend::Message::NoData => None,
173 _ => return Err(Error::unexpected()),
174 };
175
176 let mut it = parameter_description.parameters();
177 let mut params = Vec::with_capacity(it.size_hint().0);
178
179 while let Some(oid) = it.next()? {
180 let ty = cli._get_type_blocking(oid)?;
181 params.push(ty);
182 }
183
184 let mut columns = Vec::new();
185 if let Some(row_description) = row_description {
186 let mut it = row_description.fields();
187 columns.reserve(it.size_hint().0);
188 while let Some(field) = it.next()? {
189 let type_ = cli._get_type_blocking(field.type_oid())?;
190 let column = Column::new(field.name(), type_);
191 columns.push(column);
192 }
193 }
194
195 Ok(Statement::new(name, params, columns))
196 }
197}