xitca_postgres/driver/codec/
response.rs1use fallible_iterator::FallibleIterator;
2use postgres_protocol::message::backend;
3
4use crate::{
5 column::Column,
6 error::Error,
7 pipeline::PipelineStream,
8 prepare::Prepare,
9 query::{RowSimpleStream, RowStream, RowStreamGuarded},
10 statement::Statement,
11 BoxedFuture,
12};
13
14use super::{sealed, Response};
15
16pub trait IntoResponse: sealed::Sealed + Sized {
18 type Response;
19
20 fn into_response(self, res: Response) -> Self::Response;
21}
22
23impl sealed::Sealed for &[Column] {}
24
25impl<'c> IntoResponse for &'c [Column] {
26 type Response = RowStream<'c>;
27
28 #[inline]
29 fn into_response(self, res: Response) -> Self::Response {
30 RowStream::new(res, self)
31 }
32}
33
34impl sealed::Sealed for Vec<Column> {}
35
36impl IntoResponse for Vec<Column> {
37 type Response = RowSimpleStream;
38
39 #[inline]
40 fn into_response(self, res: Response) -> Self::Response {
41 RowSimpleStream::new(res, self)
42 }
43}
44
45impl sealed::Sealed for Vec<&[Column]> {}
46
47impl<'c> IntoResponse for Vec<&'c [Column]> {
48 type Response = PipelineStream<'c>;
49
50 #[inline]
51 fn into_response(self, res: Response) -> Self::Response {
52 PipelineStream::new(res, self)
53 }
54}
55
56pub struct IntoRowStreamGuard<'a, C>(pub &'a C);
57
58impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}
59
60impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
61where
62 C: Prepare,
63{
64 type Response = RowStreamGuarded<'c, C>;
65
66 #[inline]
67 fn into_response(self, res: Response) -> Self::Response {
68 RowStreamGuarded::new(res, self.0)
69 }
70}
71
72pub struct NoOpIntoRowStream;
75
76impl sealed::Sealed for NoOpIntoRowStream {}
77
78impl IntoResponse for NoOpIntoRowStream {
79 type Response = RowStream<'static>;
80
81 fn into_response(self, _: Response) -> Self::Response {
82 unreachable!("no row stream can be generated from no op row stream constructor")
83 }
84}
85
86pub struct StatementCreateResponse<'a, C> {
87 pub(super) name: String,
88 pub(super) cli: &'a C,
89}
90
91impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}
92
93impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
94where
95 C: Prepare,
96{
97 type Response = BoxedFuture<'s, Result<Statement, Error>>;
98
99 fn into_response(self, mut res: Response) -> Self::Response {
100 Box::pin(async move {
101 let Self { name, cli } = self;
102
103 match res.recv().await? {
104 backend::Message::ParseComplete => {}
105 _ => return Err(Error::unexpected()),
106 }
107
108 let parameter_description = match res.recv().await? {
109 backend::Message::ParameterDescription(body) => body,
110 _ => return Err(Error::unexpected()),
111 };
112
113 let row_description = match res.recv().await? {
114 backend::Message::RowDescription(body) => Some(body),
115 backend::Message::NoData => None,
116 _ => return Err(Error::unexpected()),
117 };
118
119 let mut params = Vec::new();
120 let mut it = parameter_description.parameters();
121 while let Some(oid) = it.next()? {
122 let ty = cli._get_type(oid).await?;
123 params.push(ty);
124 }
125
126 let mut columns = Vec::new();
127 if let Some(row_description) = row_description {
128 let mut it = row_description.fields();
129 while let Some(field) = it.next()? {
130 let type_ = cli._get_type(field.type_oid()).await?;
131 let column = Column::new(field.name(), type_);
132 columns.push(column);
133 }
134 }
135
136 Ok(Statement::new(name, params, columns))
137 })
138 }
139}
140
141pub struct StatementCreateResponseBlocking<'a, C> {
142 pub(super) name: String,
143 pub(super) cli: &'a C,
144}
145
146impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}
147
148impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
149where
150 C: Prepare,
151{
152 type Response = Result<Statement, Error>;
153
154 fn into_response(self, mut res: Response) -> Self::Response {
155 let Self { name, cli } = self;
156
157 match res.blocking_recv()? {
158 backend::Message::ParseComplete => {}
159 _ => return Err(Error::unexpected()),
160 }
161
162 let parameter_description = match res.blocking_recv()? {
163 backend::Message::ParameterDescription(body) => body,
164 _ => return Err(Error::unexpected()),
165 };
166
167 let row_description = match res.blocking_recv()? {
168 backend::Message::RowDescription(body) => Some(body),
169 backend::Message::NoData => None,
170 _ => return Err(Error::unexpected()),
171 };
172
173 let mut params = Vec::new();
174 let mut it = parameter_description.parameters();
175 while let Some(oid) = it.next()? {
176 let ty = cli._get_type_blocking(oid)?;
177 params.push(ty);
178 }
179
180 let mut columns = Vec::new();
181 if let Some(row_description) = row_description {
182 let mut it = row_description.fields();
183 while let Some(field) = it.next()? {
184 let type_ = cli._get_type_blocking(field.type_oid())?;
185 let column = Column::new(field.name(), type_);
186 columns.push(column);
187 }
188 }
189
190 Ok(Statement::new(name, params, columns))
191 }
192}