xitca_postgres/driver/codec/
response.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend;

use crate::{
    column::Column,
    error::Error,
    pipeline::PipelineStream,
    prepare::Prepare,
    query::{RowSimpleStream, RowStream, RowStreamGuarded},
    statement::Statement,
    BoxedFuture,
};

use super::{sealed, Response};

/// trait for generic over how to construct an async stream rows
pub trait IntoResponse: sealed::Sealed + Sized {
    type Response;

    fn into_response(self, res: Response) -> Self::Response;
}

impl sealed::Sealed for &[Column] {}

impl<'c> IntoResponse for &'c [Column] {
    type Response = RowStream<'c>;

    #[inline]
    fn into_response(self, res: Response) -> Self::Response {
        RowStream::new(res, self)
    }
}

impl sealed::Sealed for Vec<Column> {}

impl IntoResponse for Vec<Column> {
    type Response = RowSimpleStream;

    #[inline]
    fn into_response(self, res: Response) -> Self::Response {
        RowSimpleStream::new(res, self)
    }
}

impl sealed::Sealed for Vec<&[Column]> {}

impl<'c> IntoResponse for Vec<&'c [Column]> {
    type Response = PipelineStream<'c>;

    #[inline]
    fn into_response(self, res: Response) -> Self::Response {
        PipelineStream::new(res, self)
    }
}

pub struct IntoRowStreamGuard<'a, C>(pub &'a C);

impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}

impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
where
    C: Prepare,
{
    type Response = RowStreamGuarded<'c, C>;

    #[inline]
    fn into_response(self, res: Response) -> Self::Response {
        RowStreamGuarded::new(res, self.0)
    }
}

/// type for case where no row stream can be created.
/// the api caller should never call into_stream method from this type.
pub struct NoOpIntoRowStream;

impl sealed::Sealed for NoOpIntoRowStream {}

impl IntoResponse for NoOpIntoRowStream {
    type Response = RowStream<'static>;

    fn into_response(self, _: Response) -> Self::Response {
        unreachable!("no row stream can be generated from no op row stream constructor")
    }
}

pub struct StatementCreateResponse<'a, C> {
    pub(super) name: String,
    pub(super) cli: &'a C,
}

impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}

impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
where
    C: Prepare,
{
    type Response = BoxedFuture<'s, Result<Statement, Error>>;

    fn into_response(self, mut res: Response) -> Self::Response {
        Box::pin(async move {
            let Self { name, cli } = self;

            match res.recv().await? {
                backend::Message::ParseComplete => {}
                _ => return Err(Error::unexpected()),
            }

            let parameter_description = match res.recv().await? {
                backend::Message::ParameterDescription(body) => body,
                _ => return Err(Error::unexpected()),
            };

            let row_description = match res.recv().await? {
                backend::Message::RowDescription(body) => Some(body),
                backend::Message::NoData => None,
                _ => return Err(Error::unexpected()),
            };

            let mut params = Vec::new();
            let mut it = parameter_description.parameters();
            while let Some(oid) = it.next()? {
                let ty = cli._get_type(oid).await?;
                params.push(ty);
            }

            let mut columns = Vec::new();
            if let Some(row_description) = row_description {
                let mut it = row_description.fields();
                while let Some(field) = it.next()? {
                    let type_ = cli._get_type(field.type_oid()).await?;
                    let column = Column::new(field.name(), type_);
                    columns.push(column);
                }
            }

            Ok(Statement::new(name, params, columns))
        })
    }
}

pub struct StatementCreateResponseBlocking<'a, C> {
    pub(super) name: String,
    pub(super) cli: &'a C,
}

impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}

impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
where
    C: Prepare,
{
    type Response = Result<Statement, Error>;

    fn into_response(self, mut res: Response) -> Self::Response {
        let Self { name, cli } = self;

        match res.blocking_recv()? {
            backend::Message::ParseComplete => {}
            _ => return Err(Error::unexpected()),
        }

        let parameter_description = match res.blocking_recv()? {
            backend::Message::ParameterDescription(body) => body,
            _ => return Err(Error::unexpected()),
        };

        let row_description = match res.blocking_recv()? {
            backend::Message::RowDescription(body) => Some(body),
            backend::Message::NoData => None,
            _ => return Err(Error::unexpected()),
        };

        let mut params = Vec::new();
        let mut it = parameter_description.parameters();
        while let Some(oid) = it.next()? {
            let ty = cli._get_type_blocking(oid)?;
            params.push(ty);
        }

        let mut columns = Vec::new();
        if let Some(row_description) = row_description {
            let mut it = row_description.fields();
            while let Some(field) = it.next()? {
                let type_ = cli._get_type_blocking(field.type_oid())?;
                let column = Column::new(field.name(), type_);
                columns.push(column);
            }
        }

        Ok(Statement::new(name, params, columns))
    }
}