mysql_connector/connection/
result_set.rs

1use {
2    super::{
3        types::{Column, Protocol},
4        Connection, MAX_PAYLOAD_LEN,
5    },
6    crate::{
7        bitflags::CapabilityFlags,
8        model::{FromQueryResult, FromQueryResultMapping},
9        packets::{ErrPacket, OkPacket},
10        Deserialize, Error, ParseBuf,
11    },
12    std::marker::PhantomData,
13};
14
15pub struct ResultSet<'a, P, R>
16where
17    P: Protocol,
18    R: FromQueryResult,
19{
20    __phantom_data: PhantomData<P>,
21    columns: Vec<Column>,
22    mapping: R::Mapping,
23    ok_packet: Option<OkPacket>,
24    conn: &'a mut Connection,
25}
26
27impl<'a, P, R> ResultSet<'a, P, R>
28where
29    P: Protocol,
30    R: FromQueryResult,
31{
32    pub(super) async fn read(conn: &'a mut Connection) -> Result<Self, Error> {
33        let packet = conn.read_packet().await?;
34        match packet.first() {
35            Some(0x00) => {
36                let mut res = ResultSet::new(Vec::new(), conn);
37                res.ok_packet = Some(res.conn.decode_response(&packet).await??);
38                Ok(res)
39            }
40            Some(0xFB) => unimplemented!("local infile"),
41            Some(0xFF) => Err(ErrPacket::deserialize(
42                &mut ParseBuf(&packet),
43                conn.data().capabilities(),
44            )?
45            .into()),
46            _ => {
47                conn.pending_result = true;
48                let columns_len = ParseBuf(&packet).checked_eat_lenenc_int()?;
49                let columns = conn.read_column_defs(columns_len as usize).await?;
50                Ok(ResultSet::new(columns, conn))
51            }
52        }
53    }
54
55    fn new(columns: Vec<Column>, conn: &'a mut Connection) -> Self {
56        let mapping = R::Mapping::from_columns(&columns);
57        Self {
58            __phantom_data: PhantomData,
59            columns,
60            mapping,
61            ok_packet: None,
62            conn,
63        }
64    }
65
66    pub async fn next(&mut self) -> Result<Option<R>, Error> {
67        if self.ok_packet.is_some() {
68            return Ok(None);
69        }
70        let packet = self.conn.read_packet().await?;
71        let is_last_result_set_packet = if self
72            .conn
73            .data
74            .capabilities
75            .contains(CapabilityFlags::DEPRECATE_EOF)
76        {
77            packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
78        } else {
79            packet[0] == 0xFE && packet.len() < 8
80        };
81        if is_last_result_set_packet {
82            self.ok_packet = Some(OkPacket::read_eof(packet, self.conn.data.capabilities)?);
83            self.conn.pending_result = false;
84            Ok(None)
85        } else {
86            let mut row = P::read_result_set_row(&packet, &self.columns)?;
87            Ok(Some(R::from_mapping_and_row(&self.mapping, &mut row)?))
88        }
89    }
90
91    pub async fn collect(&mut self) -> Result<Vec<R>, Error> {
92        let mut rows = Vec::new();
93        while let Some(row) = self.next().await? {
94            rows.push(row);
95        }
96        Ok(rows)
97    }
98
99    pub async fn one(&mut self) -> Result<Option<R>, Error> {
100        let res = self.next().await;
101        while self.next().await?.is_some() {}
102        res
103    }
104
105    pub fn columns(&self) -> &[Column] {
106        &self.columns
107    }
108
109    pub fn into_columns(self) -> Vec<Column> {
110        self.columns
111    }
112
113    pub fn mapping(&self) -> &R::Mapping {
114        &self.mapping
115    }
116
117    pub fn into_mapping(self) -> R::Mapping {
118        self.mapping
119    }
120
121    pub fn into_inner(self) -> (Vec<Column>, R::Mapping) {
122        (self.columns, self.mapping)
123    }
124
125    pub async fn finish(mut self) -> Result<OkPacket, Error> {
126        match self.ok_packet {
127            Some(x) => Ok(x),
128            None => {
129                while self.next().await?.is_some() {}
130                // Safety: `self.next()` only returns `None` if ok packet was read
131                Ok(self.ok_packet.unwrap())
132            }
133        }
134    }
135
136    pub async fn finish_into_inner(mut self) -> Result<(OkPacket, Vec<Column>, R::Mapping), Error> {
137        match self.ok_packet {
138            Some(x) => Ok((x, self.columns, self.mapping)),
139            None => {
140                while self.next().await?.is_some() {}
141                // Safety: `self.next() only returns `None` if ok packet was read
142                Ok((self.ok_packet.unwrap(), self.columns, self.mapping))
143            }
144        }
145    }
146}
147
148impl Connection {
149    pub async fn cleanup(&mut self) -> Result<Option<OkPacket>, Error> {
150        if self.pending_result {
151            loop {
152                let packet = self.read_packet().await?;
153                let is_last_result_set_packet = if self
154                    .data
155                    .capabilities
156                    .contains(CapabilityFlags::DEPRECATE_EOF)
157                {
158                    packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
159                } else {
160                    packet[0] == 0xFE && packet.len() < 8
161                };
162                if is_last_result_set_packet {
163                    self.pending_result = false;
164                    return Ok(Some(OkPacket::read_eof(packet, self.data.capabilities)?));
165                }
166            }
167        } else {
168            Ok(None)
169        }
170    }
171}