mysql/conn/
query_result.rs

1// Copyright (c) 2020 rust-mysql-simple contributors
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9pub use mysql_common::proto::{Binary, Text};
10
11use mysql_common::{io::ParseBuf, packets::OkPacket, row::RowDeserializer, value::ServerSide};
12
13use std::{borrow::Cow, marker::PhantomData, sync::Arc};
14
15use crate::{conn::ConnMut, Column, Conn, Error, Result, Row};
16
/// A value that is one of two alternatives.
///
/// Within this module it carries the metadata of a result set: either a
/// list of columns (`A`) or an OK packet (`B`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Or<T, U> {
    /// The first alternative.
    A(T),
    /// The second alternative.
    B(U),
}
22
/// Result set kind.
///
/// Implemented in this module for the `Text` and `Binary` protocol markers;
/// each implementation decides how a row packet is deserialized.
pub trait Protocol: 'static + Send + Sync {
    /// Reads the next row of the current result set from `conn`.
    ///
    /// `columns` are the columns of the current result set. The implementations
    /// in this module return `Ok(None)` when `conn` has no further row packet.
    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>>;
}
27
28impl Protocol for Text {
29    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>> {
30        match conn.next_row_packet()? {
31            Some(pld) => {
32                let row = ParseBuf(&*pld).parse::<RowDeserializer<(), Text>>(columns)?;
33                Ok(Some(row.into()))
34            }
35            None => Ok(None),
36        }
37    }
38}
39
40impl Protocol for Binary {
41    fn next(conn: &mut Conn, columns: Arc<[Column]>) -> Result<Option<Row>> {
42        match conn.next_row_packet()? {
43            Some(pld) => {
44                let row = ParseBuf(&*pld).parse::<RowDeserializer<ServerSide, Binary>>(columns)?;
45                Ok(Some(row.into()))
46            }
47            None => Ok(None),
48        }
49    }
50}
51
/// State of a result set iterator.
#[derive(Debug)]
enum SetIteratorState {
    /// Iterator is in a non-empty set.
    ///
    /// Holds the columns of that set.
    InSet(Arc<[Column]>),
    /// Iterator is in an empty set.
    ///
    /// Holds the OK packet of that set.
    InEmptySet(OkPacket<'static>),
    /// Iterator is in an errored result set.
    ///
    /// Holds the error that has not been yielded yet.
    Errored(Error),
    /// Next result set isn't handled.
    OnBoundary,
    /// No more result sets.
    Done,
}
66
67impl SetIteratorState {
68    fn ok_packet(&self) -> Option<&OkPacket<'_>> {
69        if let Self::InEmptySet(ref ok) = self {
70            Some(ok)
71        } else {
72            None
73        }
74    }
75
76    fn columns(&self) -> Option<&Arc<[Column]>> {
77        if let Self::InSet(ref cols) = self {
78            Some(cols)
79        } else {
80            None
81        }
82    }
83}
84
85impl From<Vec<Column>> for SetIteratorState {
86    fn from(columns: Vec<Column>) -> Self {
87        Self::InSet(columns.into())
88    }
89}
90
91impl From<OkPacket<'static>> for SetIteratorState {
92    fn from(ok_packet: OkPacket<'static>) -> Self {
93        Self::InEmptySet(ok_packet)
94    }
95}
96
97impl From<Error> for SetIteratorState {
98    fn from(err: Error) -> Self {
99        Self::Errored(err)
100    }
101}
102
103impl From<Or<Vec<Column>, OkPacket<'static>>> for SetIteratorState {
104    fn from(or: Or<Vec<Column>, OkPacket<'static>>) -> Self {
105        match or {
106            Or::A(cols) => Self::from(cols),
107            Or::B(ok) => Self::from(ok),
108        }
109    }
110}
111
/// Response to a query or statement execution.
///
/// It is an iterator:
/// *   over result sets (via `Self::iter`)
/// *   over rows of a current result set (via `Iterator` impl)
#[derive(Debug)]
pub struct QueryResult<'c, 't, 'tc, T: crate::prelude::Protocol> {
    /// Connection the response is read from.
    conn: ConnMut<'c, 't, 'tc>,
    /// Current position of the iteration (in a set, on a boundary, done, ...).
    state: SetIteratorState,
    /// Starts at `0`; incremented each time a following result set is read from `conn`.
    set_index: usize,
    /// Marker for the protocol `T` used to read rows.
    protocol: PhantomData<T>,
}
124
125impl<'c, 't, 'tc, T: crate::prelude::Protocol> QueryResult<'c, 't, 'tc, T> {
126    fn from_state(
127        conn: ConnMut<'c, 't, 'tc>,
128        state: SetIteratorState,
129    ) -> QueryResult<'c, 't, 'tc, T> {
130        QueryResult {
131            conn,
132            state,
133            set_index: 0,
134            protocol: PhantomData,
135        }
136    }
137
138    pub(crate) fn new(
139        conn: ConnMut<'c, 't, 'tc>,
140        meta: Or<Vec<Column>, OkPacket<'static>>,
141    ) -> QueryResult<'c, 't, 'tc, T> {
142        Self::from_state(conn, meta.into())
143    }
144
145    /// Updates state with the next result set, if any.
146    ///
147    /// Returns `false` if there is no next result set.
148    ///
149    /// **Requires:** `self.state == OnBoundary`
150    fn handle_next(&mut self) {
151        debug_assert!(
152            matches!(self.state, SetIteratorState::OnBoundary),
153            "self.state != OnBoundary"
154        );
155
156        if self.conn.more_results_exists() {
157            match self.conn.handle_result_set() {
158                Ok(meta) => self.state = meta.into(),
159                Err(err) => self.state = err.into(),
160            }
161            self.set_index += 1;
162        } else {
163            self.state = SetIteratorState::Done;
164        }
165    }
166
    /// Returns an iterator over the current result set.
    ///
    /// Deprecated: simply forwards to `QueryResult::iter`.
    #[deprecated = "Please use QueryResult::iter"]
    pub fn next_set<'d>(&'d mut self) -> Option<ResultSet<'c, 't, 'tc, 'd, T>> {
        self.iter()
    }
172
173    /// Returns an iterator over the current result set.
174    ///
175    /// The returned iterator will be consumed either by the caller
176    /// or implicitly by the `ResultSet::drop`. This operation
177    /// will advance `self` to the next result set (if any).
178    ///
179    /// The following code describes the behavior:
180    ///
181    /// ```rust
182    /// # mysql::doctest_wrapper!(__result, {
183    /// # use mysql::*;
184    /// # use mysql::prelude::*;
185    /// # let pool = Pool::new(get_opts())?;
186    /// # let mut conn = pool.get_conn()?;
187    /// # conn.query_drop("CREATE TEMPORARY TABLE mysql.tbl(id INT NOT NULL PRIMARY KEY)")?;
188    ///
189    /// let mut query_result = conn.query_iter("\
190    ///     INSERT INTO mysql.tbl (id) VALUES (3, 4);\
191    ///     SELECT * FROM mysql.tbl;
192    ///     UPDATE mysql.tbl SET id = id + 1;")?;
193    ///
194    /// // query_result is on the first result set at the moment
195    /// {
196    ///     assert_eq!(query_result.affected_rows(), 2);
197    ///     assert_eq!(query_result.last_insert_id(), Some(4));
198    ///
199    ///     let first_result_set = query_result.iter().unwrap();
200    ///     assert_eq!(first_result_set.affected_rows(), 2);
201    ///     assert_eq!(first_result_set.last_insert_id(), Some(4));
202    /// }
203    ///
204    /// // the first result set is now dropped, so query_result is on the second result set
205    /// {
206    ///     assert_eq!(query_result.affected_rows(), 0);
207    ///     assert_eq!(query_result.last_insert_id(), None);
208    ///     
209    ///     let mut second_result_set = query_result.iter().unwrap();
210    ///
211    ///     let first_row = second_result_set.next().unwrap().unwrap();
212    ///     assert_eq!(from_row::<u8>(first_row), 3_u8);
213    ///     let second_row = second_result_set.next().unwrap().unwrap();
214    ///     assert_eq!(from_row::<u8>(second_row), 4_u8);
215    ///
216    ///     assert!(second_result_set.next().is_none());
217    ///
218    ///     // second_result_set is consumed but still represents the second result set
219    ///     assert_eq!(second_result_set.affected_rows(), 0);
220    /// }
221    ///
222    /// // the second result set is now dropped, so query_result is on the third result set
223    /// assert_eq!(query_result.affected_rows(), 2);
224    ///
225    /// // QueryResult::drop simply does the following:
226    /// while query_result.iter().is_some() {}
227    /// # });
228    /// ```
229    pub fn iter<'d>(&'d mut self) -> Option<ResultSet<'c, 't, 'tc, 'd, T>> {
230        use SetIteratorState::*;
231
232        if let OnBoundary | Done = &self.state {
233            debug_assert!(
234                !self.conn.more_results_exists(),
235                "the next state must be handled by the Iterator::next"
236            );
237
238            None
239        } else {
240            Some(ResultSet {
241                set_index: self.set_index,
242                inner: self,
243            })
244        }
245    }
246
247    /// Returns the number of affected rows for the current result set.
248    pub fn affected_rows(&self) -> u64 {
249        self.state
250            .ok_packet()
251            .map(|ok| ok.affected_rows())
252            .unwrap_or_default()
253    }
254
255    /// Returns the last insert id for the current result set.
256    pub fn last_insert_id(&self) -> Option<u64> {
257        self.state
258            .ok_packet()
259            .map(|ok| ok.last_insert_id())
260            .unwrap_or_default()
261    }
262
263    /// Returns the warnings count for the current result set.
264    pub fn warnings(&self) -> u16 {
265        self.state
266            .ok_packet()
267            .map(|ok| ok.warnings())
268            .unwrap_or_default()
269    }
270
271    /// [Info] for the current result set.
272    ///
273    /// Will be empty if not defined.
274    ///
275    /// [Info]: http://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
276    pub fn info_ref(&self) -> &[u8] {
277        self.state
278            .ok_packet()
279            .and_then(|ok| ok.info_ref())
280            .unwrap_or_default()
281    }
282
283    /// [Info] for the current result set.
284    ///
285    /// Will be empty if not defined.
286    ///
287    /// [Info]: http://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
288    pub fn info_str(&self) -> Cow<str> {
289        self.state
290            .ok_packet()
291            .and_then(|ok| ok.info_str())
292            .unwrap_or_else(|| "".into())
293    }
294
295    /// Returns columns of the current result rest.
296    pub fn columns(&self) -> SetColumns {
297        SetColumns {
298            inner: self.state.columns().map(Into::into),
299        }
300    }
301}
302
303impl<'c, 't, 'tc, T: crate::prelude::Protocol> Drop for QueryResult<'c, 't, 'tc, T> {
304    fn drop(&mut self) {
305        while self.iter().is_some() {}
306    }
307}
308
/// Iterator over the rows of a single result set of a `QueryResult`.
///
/// Rows that were not consumed by the caller are consumed on drop.
#[derive(Debug)]
pub struct ResultSet<'a, 'b, 'c, 'd, T: crate::prelude::Protocol> {
    /// Value of `inner.set_index` at the moment this set was created.
    set_index: usize,
    /// The query result this set belongs to.
    inner: &'d mut QueryResult<'a, 'b, 'c, T>,
}
314
// Lets the methods of the underlying `QueryResult` be called on a `ResultSet`.
impl<'a, 'b, 'c, T: crate::prelude::Protocol> std::ops::Deref for ResultSet<'a, 'b, 'c, '_, T> {
    type Target = QueryResult<'a, 'b, 'c, T>;

    /// Gives shared access to the `QueryResult` this set was created from.
    fn deref(&self) -> &Self::Target {
        &*self.inner
    }
}
322
323impl<T: crate::prelude::Protocol> Iterator for ResultSet<'_, '_, '_, '_, T> {
324    type Item = Result<Row>;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        if self.set_index == self.inner.set_index {
328            self.inner.next()
329        } else {
330            None
331        }
332    }
333}
334
335impl<T: crate::prelude::Protocol> Iterator for QueryResult<'_, '_, '_, T> {
336    type Item = Result<Row>;
337
338    fn next(&mut self) -> Option<Self::Item> {
339        use SetIteratorState::*;
340
341        let state = std::mem::replace(&mut self.state, OnBoundary);
342
343        match state {
344            InSet(cols) => match T::next(&mut *self.conn, cols.clone()) {
345                Ok(Some(row)) => {
346                    self.state = InSet(cols.clone());
347                    Some(Ok(row))
348                }
349                Ok(None) => {
350                    self.handle_next();
351                    None
352                }
353                Err(e) => {
354                    self.handle_next();
355                    Some(Err(e))
356                }
357            },
358            InEmptySet(_) => {
359                self.handle_next();
360                None
361            }
362            Errored(err) => {
363                self.handle_next();
364                Some(Err(err))
365            }
366            OnBoundary => None,
367            Done => {
368                self.state = Done;
369                None
370            }
371        }
372    }
373}
374
375impl<T: crate::prelude::Protocol> Drop for ResultSet<'_, '_, '_, '_, T> {
376    fn drop(&mut self) {
377        while self.next().is_some() {}
378    }
379}
380
/// Columns of a result set, as returned by `QueryResult::columns`.
#[derive(Debug, Clone, PartialEq)]
pub struct SetColumns<'a> {
    /// `None` if the state this value was taken from holds no columns.
    inner: Option<&'a Arc<[Column]>>,
}
385
386impl<'a> SetColumns<'a> {
387    /// Returns an index of a column by its name.
388    pub fn column_index<U: AsRef<str>>(&self, name: U) -> Option<usize> {
389        let name = name.as_ref().as_bytes();
390        self.inner
391            .as_ref()
392            .and_then(|cols| cols.iter().position(|col| col.name_ref() == name))
393    }
394
395    pub fn as_ref(&self) -> &[Column] {
396        self.inner
397            .as_ref()
398            .map(|cols| &(*cols)[..])
399            .unwrap_or(&[][..])
400    }
401}