xitca_postgres/driver/codec/
response.rs

1use std::sync::Arc;
2
3use fallible_iterator::FallibleIterator;
4use postgres_protocol::message::backend;
5
6use crate::{
7    BoxedFuture,
8    column::Column,
9    error::Error,
10    prepare::Prepare,
11    query::{RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
12    statement::Statement,
13};
14
15use super::{Response, sealed};
16
17/// trait for generic over how to construct an async stream rows
18pub trait IntoResponse: sealed::Sealed + Sized {
19    type Response;
20
21    fn into_response(self, res: Response) -> Self::Response;
22}
23
24impl sealed::Sealed for &[Column] {}
25
26impl<'c> IntoResponse for &'c [Column] {
27    type Response = RowStream<'c>;
28
29    #[inline]
30    fn into_response(self, res: Response) -> Self::Response {
31        RowStream::new(res, self)
32    }
33}
34
35impl sealed::Sealed for Arc<[Column]> {}
36
37impl IntoResponse for Arc<[Column]> {
38    type Response = RowStreamOwned;
39
40    #[inline]
41    fn into_response(self, res: Response) -> Self::Response {
42        RowStreamOwned::new(res, self)
43    }
44}
45
46impl sealed::Sealed for Vec<Column> {}
47
48impl IntoResponse for Vec<Column> {
49    type Response = RowSimpleStream;
50
51    #[inline]
52    fn into_response(self, res: Response) -> Self::Response {
53        RowSimpleStream::new(res, self)
54    }
55}
56
57pub struct IntoRowStreamGuard<'a, C>(pub &'a C);
58
59impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}
60
61impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
62where
63    C: Prepare,
64{
65    type Response = RowStreamGuarded<'c, C>;
66
67    #[inline]
68    fn into_response(self, res: Response) -> Self::Response {
69        RowStreamGuarded::new(res, self.0)
70    }
71}
72
73/// type for case where no row stream can be created.
74/// the api caller should never call into_stream method from this type.
75pub struct NoOpIntoRowStream;
76
77impl sealed::Sealed for NoOpIntoRowStream {}
78
79impl IntoResponse for NoOpIntoRowStream {
80    type Response = RowStream<'static>;
81
82    fn into_response(self, _: Response) -> Self::Response {
83        unreachable!("no row stream can be generated from no op row stream constructor")
84    }
85}
86
87pub struct StatementCreateResponse<'a, C> {
88    pub(super) name: String,
89    pub(super) cli: &'a C,
90}
91
92impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}
93
94impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
95where
96    C: Prepare,
97{
98    type Response = BoxedFuture<'s, Result<Statement, Error>>;
99
100    fn into_response(self, mut res: Response) -> Self::Response {
101        Box::pin(async move {
102            let Self { name, cli } = self;
103
104            match res.recv().await? {
105                backend::Message::ParseComplete => {}
106                _ => return Err(Error::unexpected()),
107            }
108
109            let parameter_description = match res.recv().await? {
110                backend::Message::ParameterDescription(body) => body,
111                _ => return Err(Error::unexpected()),
112            };
113
114            let row_description = match res.recv().await? {
115                backend::Message::RowDescription(body) => Some(body),
116                backend::Message::NoData => None,
117                _ => return Err(Error::unexpected()),
118            };
119
120            let mut it = parameter_description.parameters();
121            let mut params = Vec::with_capacity(it.size_hint().0);
122
123            while let Some(oid) = it.next()? {
124                let ty = cli._get_type(oid).await?;
125                params.push(ty);
126            }
127
128            let mut columns = Vec::new();
129            if let Some(row_description) = row_description {
130                let mut it = row_description.fields();
131                columns.reserve(it.size_hint().0);
132                while let Some(field) = it.next()? {
133                    let type_ = cli._get_type(field.type_oid()).await?;
134                    let column = Column::new(field.name(), type_);
135                    columns.push(column);
136                }
137            }
138
139            Ok(Statement::new(name, params, columns))
140        })
141    }
142}
143
144pub struct StatementCreateResponseBlocking<'a, C> {
145    pub(super) name: String,
146    pub(super) cli: &'a C,
147}
148
149impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}
150
151impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
152where
153    C: Prepare,
154{
155    type Response = Result<Statement, Error>;
156
157    fn into_response(self, mut res: Response) -> Self::Response {
158        let Self { name, cli } = self;
159
160        match res.blocking_recv()? {
161            backend::Message::ParseComplete => {}
162            _ => return Err(Error::unexpected()),
163        }
164
165        let parameter_description = match res.blocking_recv()? {
166            backend::Message::ParameterDescription(body) => body,
167            _ => return Err(Error::unexpected()),
168        };
169
170        let row_description = match res.blocking_recv()? {
171            backend::Message::RowDescription(body) => Some(body),
172            backend::Message::NoData => None,
173            _ => return Err(Error::unexpected()),
174        };
175
176        let mut it = parameter_description.parameters();
177        let mut params = Vec::with_capacity(it.size_hint().0);
178
179        while let Some(oid) = it.next()? {
180            let ty = cli._get_type_blocking(oid)?;
181            params.push(ty);
182        }
183
184        let mut columns = Vec::new();
185        if let Some(row_description) = row_description {
186            let mut it = row_description.fields();
187            columns.reserve(it.size_hint().0);
188            while let Some(field) = it.next()? {
189                let type_ = cli._get_type_blocking(field.type_oid())?;
190                let column = Column::new(field.name(), type_);
191                columns.push(column);
192            }
193        }
194
195        Ok(Statement::new(name, params, columns))
196    }
197}