mysql_connector/connection/
result_set.rs1use {
2 super::{
3 types::{Column, Protocol},
4 Connection, MAX_PAYLOAD_LEN,
5 },
6 crate::{
7 bitflags::CapabilityFlags,
8 model::{FromQueryResult, FromQueryResultMapping},
9 packets::{ErrPacket, OkPacket},
10 Deserialize, Error, ParseBuf,
11 },
12 std::marker::PhantomData,
13};
14
15pub struct ResultSet<'a, P, R>
16where
17 P: Protocol,
18 R: FromQueryResult,
19{
20 __phantom_data: PhantomData<P>,
21 columns: Vec<Column>,
22 mapping: R::Mapping,
23 ok_packet: Option<OkPacket>,
24 conn: &'a mut Connection,
25}
26
27impl<'a, P, R> ResultSet<'a, P, R>
28where
29 P: Protocol,
30 R: FromQueryResult,
31{
32 pub(super) async fn read(conn: &'a mut Connection) -> Result<Self, Error> {
33 let packet = conn.read_packet().await?;
34 match packet.first() {
35 Some(0x00) => {
36 let mut res = ResultSet::new(Vec::new(), conn);
37 res.ok_packet = Some(res.conn.decode_response(&packet).await??);
38 Ok(res)
39 }
40 Some(0xFB) => unimplemented!("local infile"),
41 Some(0xFF) => Err(ErrPacket::deserialize(
42 &mut ParseBuf(&packet),
43 conn.data().capabilities(),
44 )?
45 .into()),
46 _ => {
47 conn.pending_result = true;
48 let columns_len = ParseBuf(&packet).checked_eat_lenenc_int()?;
49 let columns = conn.read_column_defs(columns_len as usize).await?;
50 Ok(ResultSet::new(columns, conn))
51 }
52 }
53 }
54
55 fn new(columns: Vec<Column>, conn: &'a mut Connection) -> Self {
56 let mapping = R::Mapping::from_columns(&columns);
57 Self {
58 __phantom_data: PhantomData,
59 columns,
60 mapping,
61 ok_packet: None,
62 conn,
63 }
64 }
65
66 pub async fn next(&mut self) -> Result<Option<R>, Error> {
67 if self.ok_packet.is_some() {
68 return Ok(None);
69 }
70 let packet = self.conn.read_packet().await?;
71 let is_last_result_set_packet = if self
72 .conn
73 .data
74 .capabilities
75 .contains(CapabilityFlags::DEPRECATE_EOF)
76 {
77 packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
78 } else {
79 packet[0] == 0xFE && packet.len() < 8
80 };
81 if is_last_result_set_packet {
82 self.ok_packet = Some(OkPacket::read_eof(packet, self.conn.data.capabilities)?);
83 self.conn.pending_result = false;
84 Ok(None)
85 } else {
86 let mut row = P::read_result_set_row(&packet, &self.columns)?;
87 Ok(Some(R::from_mapping_and_row(&self.mapping, &mut row)?))
88 }
89 }
90
91 pub async fn collect(&mut self) -> Result<Vec<R>, Error> {
92 let mut rows = Vec::new();
93 while let Some(row) = self.next().await? {
94 rows.push(row);
95 }
96 Ok(rows)
97 }
98
99 pub async fn one(&mut self) -> Result<Option<R>, Error> {
100 let res = self.next().await;
101 while self.next().await?.is_some() {}
102 res
103 }
104
105 pub fn columns(&self) -> &[Column] {
106 &self.columns
107 }
108
109 pub fn into_columns(self) -> Vec<Column> {
110 self.columns
111 }
112
113 pub fn mapping(&self) -> &R::Mapping {
114 &self.mapping
115 }
116
117 pub fn into_mapping(self) -> R::Mapping {
118 self.mapping
119 }
120
121 pub fn into_inner(self) -> (Vec<Column>, R::Mapping) {
122 (self.columns, self.mapping)
123 }
124
125 pub async fn finish(mut self) -> Result<OkPacket, Error> {
126 match self.ok_packet {
127 Some(x) => Ok(x),
128 None => {
129 while self.next().await?.is_some() {}
130 Ok(self.ok_packet.unwrap())
132 }
133 }
134 }
135
136 pub async fn finish_into_inner(mut self) -> Result<(OkPacket, Vec<Column>, R::Mapping), Error> {
137 match self.ok_packet {
138 Some(x) => Ok((x, self.columns, self.mapping)),
139 None => {
140 while self.next().await?.is_some() {}
141 Ok((self.ok_packet.unwrap(), self.columns, self.mapping))
143 }
144 }
145 }
146}
147
148impl Connection {
149 pub async fn cleanup(&mut self) -> Result<Option<OkPacket>, Error> {
150 if self.pending_result {
151 loop {
152 let packet = self.read_packet().await?;
153 let is_last_result_set_packet = if self
154 .data
155 .capabilities
156 .contains(CapabilityFlags::DEPRECATE_EOF)
157 {
158 packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
159 } else {
160 packet[0] == 0xFE && packet.len() < 8
161 };
162 if is_last_result_set_packet {
163 self.pending_result = false;
164 return Ok(Some(OkPacket::read_eof(packet, self.data.capabilities)?));
165 }
166 }
167 } else {
168 Ok(None)
169 }
170 }
171}