xitca_postgres/driver/codec/
response.rs

1use fallible_iterator::FallibleIterator;
2use postgres_protocol::message::backend;
3
4use crate::{
5    column::Column,
6    error::Error,
7    pipeline::PipelineStream,
8    prepare::Prepare,
9    query::{RowSimpleStream, RowStream, RowStreamGuarded},
10    statement::Statement,
11    BoxedFuture,
12};
13
14use super::{sealed, Response};
15
16/// trait for generic over how to construct an async stream rows
17pub trait IntoResponse: sealed::Sealed + Sized {
18    type Response;
19
20    fn into_response(self, res: Response) -> Self::Response;
21}
22
23impl sealed::Sealed for &[Column] {}
24
25impl<'c> IntoResponse for &'c [Column] {
26    type Response = RowStream<'c>;
27
28    #[inline]
29    fn into_response(self, res: Response) -> Self::Response {
30        RowStream::new(res, self)
31    }
32}
33
34impl sealed::Sealed for Vec<Column> {}
35
36impl IntoResponse for Vec<Column> {
37    type Response = RowSimpleStream;
38
39    #[inline]
40    fn into_response(self, res: Response) -> Self::Response {
41        RowSimpleStream::new(res, self)
42    }
43}
44
45impl sealed::Sealed for Vec<&[Column]> {}
46
47impl<'c> IntoResponse for Vec<&'c [Column]> {
48    type Response = PipelineStream<'c>;
49
50    #[inline]
51    fn into_response(self, res: Response) -> Self::Response {
52        PipelineStream::new(res, self)
53    }
54}
55
56pub struct IntoRowStreamGuard<'a, C>(pub &'a C);
57
58impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}
59
60impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
61where
62    C: Prepare,
63{
64    type Response = RowStreamGuarded<'c, C>;
65
66    #[inline]
67    fn into_response(self, res: Response) -> Self::Response {
68        RowStreamGuarded::new(res, self.0)
69    }
70}
71
72/// type for case where no row stream can be created.
73/// the api caller should never call into_stream method from this type.
74pub struct NoOpIntoRowStream;
75
76impl sealed::Sealed for NoOpIntoRowStream {}
77
78impl IntoResponse for NoOpIntoRowStream {
79    type Response = RowStream<'static>;
80
81    fn into_response(self, _: Response) -> Self::Response {
82        unreachable!("no row stream can be generated from no op row stream constructor")
83    }
84}
85
86pub struct StatementCreateResponse<'a, C> {
87    pub(super) name: String,
88    pub(super) cli: &'a C,
89}
90
91impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}
92
93impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
94where
95    C: Prepare,
96{
97    type Response = BoxedFuture<'s, Result<Statement, Error>>;
98
99    fn into_response(self, mut res: Response) -> Self::Response {
100        Box::pin(async move {
101            let Self { name, cli } = self;
102
103            match res.recv().await? {
104                backend::Message::ParseComplete => {}
105                _ => return Err(Error::unexpected()),
106            }
107
108            let parameter_description = match res.recv().await? {
109                backend::Message::ParameterDescription(body) => body,
110                _ => return Err(Error::unexpected()),
111            };
112
113            let row_description = match res.recv().await? {
114                backend::Message::RowDescription(body) => Some(body),
115                backend::Message::NoData => None,
116                _ => return Err(Error::unexpected()),
117            };
118
119            let mut params = Vec::new();
120            let mut it = parameter_description.parameters();
121            while let Some(oid) = it.next()? {
122                let ty = cli._get_type(oid).await?;
123                params.push(ty);
124            }
125
126            let mut columns = Vec::new();
127            if let Some(row_description) = row_description {
128                let mut it = row_description.fields();
129                while let Some(field) = it.next()? {
130                    let type_ = cli._get_type(field.type_oid()).await?;
131                    let column = Column::new(field.name(), type_);
132                    columns.push(column);
133                }
134            }
135
136            Ok(Statement::new(name, params, columns))
137        })
138    }
139}
140
141pub struct StatementCreateResponseBlocking<'a, C> {
142    pub(super) name: String,
143    pub(super) cli: &'a C,
144}
145
146impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}
147
148impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
149where
150    C: Prepare,
151{
152    type Response = Result<Statement, Error>;
153
154    fn into_response(self, mut res: Response) -> Self::Response {
155        let Self { name, cli } = self;
156
157        match res.blocking_recv()? {
158            backend::Message::ParseComplete => {}
159            _ => return Err(Error::unexpected()),
160        }
161
162        let parameter_description = match res.blocking_recv()? {
163            backend::Message::ParameterDescription(body) => body,
164            _ => return Err(Error::unexpected()),
165        };
166
167        let row_description = match res.blocking_recv()? {
168            backend::Message::RowDescription(body) => Some(body),
169            backend::Message::NoData => None,
170            _ => return Err(Error::unexpected()),
171        };
172
173        let mut params = Vec::new();
174        let mut it = parameter_description.parameters();
175        while let Some(oid) = it.next()? {
176            let ty = cli._get_type_blocking(oid)?;
177            params.push(ty);
178        }
179
180        let mut columns = Vec::new();
181        if let Some(row_description) = row_description {
182            let mut it = row_description.fields();
183            while let Some(field) = it.next()? {
184                let type_ = cli._get_type_blocking(field.type_oid())?;
185                let column = Column::new(field.name(), type_);
186                columns.push(column);
187            }
188        }
189
190        Ok(Statement::new(name, params, columns))
191    }
192}