msql_srv/
resultset.rs

1use crate::myc::constants::{ColumnFlags, StatusFlags};
2use crate::packet::PacketConn;
3use crate::value::ToMysqlValue;
4use crate::writers;
5use crate::{Column, ErrorKind, StatementData};
6use byteorder::WriteBytesExt;
7use std::borrow::Borrow;
8use std::collections::HashMap;
9use std::io::{self, Read, Write};
10
11/// Convenience type for responding to a client `USE <db>` command.
12pub struct InitWriter<'a, W: Read + Write> {
13    pub(crate) writer: &'a mut PacketConn<W>,
14}
15
16impl<'a, W: Read + Write + 'a> InitWriter<'a, W> {
17    /// Tell client that database context has been changed
18    pub fn ok(self) -> io::Result<()> {
19        writers::write_ok_packet(self.writer, 0, 0, StatusFlags::empty())
20    }
21
22    /// Tell client that there was a problem changing the database context.
23    /// Although you can return any valid MySQL error code you probably want
24    /// to keep it similar to the MySQL server and issue either a
25    /// `ErrorKind::ER_BAD_DB_ERROR` or a `ErrorKind::ER_DBACCESS_DENIED_ERROR`.
26    pub fn error<E>(self, kind: ErrorKind, msg: &E) -> io::Result<()>
27    where
28        E: Borrow<[u8]> + ?Sized,
29    {
30        writers::write_err(kind, msg.borrow(), self.writer)
31    }
32}
33
34/// Convenience type for responding to a client `PREPARE` command.
35///
36/// This type should not be dropped without calling
37/// [`reply`](struct.StatementMetaWriter.html#method.reply) or
38/// [`error`](struct.StatementMetaWriter.html#method.error).
39#[must_use]
40pub struct StatementMetaWriter<'a, W: Read + Write> {
41    pub(crate) writer: &'a mut PacketConn<W>,
42    pub(crate) stmts: &'a mut HashMap<u32, StatementData>,
43}
44
45impl<'a, W: Read + Write + 'a> StatementMetaWriter<'a, W> {
46    /// Reply to the client with the given meta-information.
47    ///
48    /// `id` is a statement identifier that the client should supply when it later wants to execute
49    /// this statement. `params` is a set of [`Column`](struct.Column.html) descriptors for the
50    /// parameters the client must provide when executing the prepared statement. `columns` is a
51    /// second set of [`Column`](struct.Column.html) descriptors for the values that will be
52    /// returned in each row then the statement is later executed.
53    pub fn reply<PI, CI>(self, id: u32, params: PI, columns: CI) -> io::Result<()>
54    where
55        PI: IntoIterator<Item = &'a Column>,
56        CI: IntoIterator<Item = &'a Column>,
57        <PI as IntoIterator>::IntoIter: ExactSizeIterator,
58        <CI as IntoIterator>::IntoIter: ExactSizeIterator,
59    {
60        let params = params.into_iter();
61        self.stmts.insert(
62            id,
63            StatementData {
64                params: params.len() as u16,
65                ..Default::default()
66            },
67        );
68        writers::write_prepare_ok(id, params, columns, self.writer)
69    }
70
71    /// Reply to the client's `PREPARE` with an error.
72    pub fn error<E>(self, kind: ErrorKind, msg: &E) -> io::Result<()>
73    where
74        E: Borrow<[u8]> + ?Sized,
75    {
76        writers::write_err(kind, msg.borrow(), self.writer)
77    }
78}
79
80enum Finalizer {
81    Ok { rows: u64, last_insert_id: u64 },
82    Eof,
83}
84
85/// Convenience type for providing query results to clients.
86///
87/// This type should not be dropped without calling
88/// [`start`](struct.QueryResultWriter.html#method.start),
89/// [`completed`](struct.QueryResultWriter.html#method.completed), or
90/// [`error`](struct.QueryResultWriter.html#method.error).
91///
92/// To send multiple resultsets, use
93/// [`RowWriter::finish_one`](struct.RowWriter.html#method.finish_one) and
94/// [`complete_one`](struct.QueryResultWriter.html#method.complete_one). These are similar to
95/// `RowWriter::finish` and `completed`, but both eventually yield back the `QueryResultWriter` so
96/// that another resultset can be sent. To indicate that no more resultset will be sent, call
97/// [`no_more_results`](struct.QueryResultWriter.html#method.no_more_results). All methods on
98/// `QueryResultWriter` (except `no_more_results`) automatically start a new resultset. The
99/// `QueryResultWriter` *may* be dropped without calling `no_more_results`, but in this case the
100/// program may panic if an I/O error occurs when sending the end-of-records marker to the client.
101/// To handle such errors, call `no_more_results` explicitly.
102#[must_use]
103pub struct QueryResultWriter<'a, W: Read + Write> {
104    // XXX: specialization instead?
105    pub(crate) is_bin: bool,
106    pub(crate) writer: &'a mut PacketConn<W>,
107    last_end: Option<Finalizer>,
108}
109
110impl<'a, W: Read + Write> QueryResultWriter<'a, W> {
111    pub(crate) fn new(writer: &'a mut PacketConn<W>, is_bin: bool) -> Self {
112        QueryResultWriter {
113            is_bin,
114            writer,
115            last_end: None,
116        }
117    }
118
119    fn finalize(&mut self, more_exists: bool) -> io::Result<()> {
120        let mut status = StatusFlags::empty();
121        if more_exists {
122            status.set(StatusFlags::SERVER_MORE_RESULTS_EXISTS, true);
123        }
124        match self.last_end.take() {
125            None => Ok(()),
126            Some(Finalizer::Ok {
127                rows,
128                last_insert_id,
129            }) => writers::write_ok_packet(self.writer, rows, last_insert_id, status),
130            Some(Finalizer::Eof) => writers::write_eof_packet(self.writer, status),
131        }
132    }
133
134    /// Start a resultset response to the client that conforms to the given `columns`.
135    ///
136    /// Note that if no columns are emitted, any written rows are ignored.
137    ///
138    /// See [`RowWriter`](struct.RowWriter.html).
139    pub fn start(mut self, columns: &'a [Column]) -> io::Result<RowWriter<'a, W>> {
140        self.finalize(true)?;
141        RowWriter::new(self, columns)
142    }
143
144    /// Send an empty resultset response to the client indicating that `rows` rows were affected by
145    /// the query in this resultset. `last_insert_id` may be given to communiate an identifier for
146    /// a client's most recent insertion.
147    pub fn complete_one(mut self, rows: u64, last_insert_id: u64) -> io::Result<Self> {
148        self.finalize(true)?;
149        self.last_end = Some(Finalizer::Ok {
150            rows,
151            last_insert_id,
152        });
153        Ok(self)
154    }
155
156    /// Send an empty resultset response to the client indicating that `rows` rows were affected by
157    /// the query. `last_insert_id` may be given to communiate an identifier for a client's most
158    /// recent insertion.
159    pub fn completed(self, rows: u64, last_insert_id: u64) -> io::Result<()> {
160        self.complete_one(rows, last_insert_id)?.no_more_results()
161    }
162
163    /// Reply to the client's query with an error.
164    pub fn error<E>(mut self, kind: ErrorKind, msg: &E) -> io::Result<()>
165    where
166        E: Borrow<[u8]> + ?Sized,
167    {
168        self.finalize(true)?;
169        writers::write_err(kind, msg.borrow(), self.writer)
170    }
171
172    /// Send the last bits of the last resultset to the client, and indicate that there are no more
173    /// resultsets coming.
174    pub fn no_more_results(mut self) -> io::Result<()> {
175        self.finalize(false)
176    }
177}
178
179impl<'a, W: Read + Write> Drop for QueryResultWriter<'a, W> {
180    fn drop(&mut self) {
181        self.finalize(false).unwrap();
182    }
183}
184
185/// Convenience type for sending rows of a resultset to a client.
186///
187/// Rows can either be written out one column at a time (using
188/// [`write_col`](struct.RowWriter.html#method.write_col) and
189/// [`end_row`](struct.RowWriter.html#method.end_row)), or one row at a time (using
190/// [`write_row`](struct.RowWriter.html#method.write_row)).
191///
192/// This type *may* be dropped without calling
193/// [`write_row`](struct.RowWriter.html#method.write_row) or
194/// [`finish`](struct.RowWriter.html#method.finish). However, in this case, the program may panic
195/// if an I/O error occurs when sending the end-of-records marker to the client. To avoid this,
196/// call [`finish`](struct.RowWriter.html#method.finish) explicitly.
197#[must_use]
198pub struct RowWriter<'a, W: Read + Write> {
199    result: Option<QueryResultWriter<'a, W>>,
200    bitmap_len: usize,
201    data: Vec<u8>,
202    columns: &'a [Column],
203
204    // next column to write for the current row
205    // NOTE: (ab)used to track number of *rows* for a zero-column resultset
206    col: usize,
207
208    finished: bool,
209}
210
211impl<'a, W> RowWriter<'a, W>
212where
213    W: Read + Write + 'a,
214{
215    fn new(
216        result: QueryResultWriter<'a, W>,
217        columns: &'a [Column],
218    ) -> io::Result<RowWriter<'a, W>> {
219        let bitmap_len = (columns.len() + 7 + 2) / 8;
220        let mut rw = RowWriter {
221            result: Some(result),
222            columns,
223            bitmap_len,
224            data: Vec::new(),
225
226            col: 0,
227
228            finished: false,
229        };
230        rw.start()?;
231        Ok(rw)
232    }
233
234    #[inline]
235    fn start(&mut self) -> io::Result<()> {
236        if !self.columns.is_empty() {
237            writers::column_definitions(self.columns, self.result.as_mut().unwrap().writer)?;
238        }
239        Ok(())
240    }
241
242    /// Write a value to the next column of the current row as a part of this resultset.
243    ///
244    /// If you do not call [`end_row`](struct.RowWriter.html#method.end_row) after the last row,
245    /// any errors that occur when writing out the last row will be returned by
246    /// [`finish`](struct.RowWriter.html#method.finish). If you do not call `finish` either, any
247    /// errors will cause a panic when the `RowWriter` is dropped.
248    ///
249    /// Note that the row *must* conform to the column specification provided to
250    /// [`QueryResultWriter::start`](struct.QueryResultWriter.html#method.start). If it does not,
251    /// this method will return an error indicating that an invalid value type or specification was
252    /// provided.
253    pub fn write_col<T>(&mut self, v: T) -> io::Result<()>
254    where
255        T: ToMysqlValue,
256    {
257        if self.columns.is_empty() {
258            return Ok(());
259        }
260
261        if self.result.as_mut().unwrap().is_bin {
262            if self.col == 0 {
263                self.result.as_mut().unwrap().writer.write_u8(0x00)?;
264
265                // leave space for nullmap
266                self.data.resize(self.bitmap_len, 0);
267            }
268
269            let c = self.columns.get(self.col).ok_or_else(|| {
270                io::Error::new(
271                    io::ErrorKind::InvalidData,
272                    "row has more columns than specification",
273                )
274            })?;
275            if v.is_null() {
276                if c.colflags.contains(ColumnFlags::NOT_NULL_FLAG) {
277                    return Err(io::Error::new(
278                        io::ErrorKind::InvalidData,
279                        "given NULL value for NOT NULL column",
280                    ));
281                } else {
282                    // https://web.archive.org/web/20170404144156/https://dev.mysql.com/doc/internals/en/null-bitmap.html
283                    // NULL-bitmap-byte = ((field-pos + offset) / 8)
284                    // NULL-bitmap-bit  = ((field-pos + offset) % 8)
285                    self.data[(self.col + 2) / 8] |= 1u8 << ((self.col + 2) % 8);
286                }
287            } else {
288                v.to_mysql_bin(&mut self.data, c)?;
289            }
290        } else {
291            v.to_mysql_text(self.result.as_mut().unwrap().writer)?;
292        }
293        self.col += 1;
294        Ok(())
295    }
296
297    /// Indicate that no more column data will be written for the current row.
298    pub fn end_row(&mut self) -> io::Result<()> {
299        if self.columns.is_empty() {
300            self.col += 1;
301            return Ok(());
302        }
303
304        if self.col != self.columns.len() {
305            return Err(io::Error::new(
306                io::ErrorKind::InvalidData,
307                "row has fewer columns than specification",
308            ));
309        }
310
311        if self.result.as_mut().unwrap().is_bin {
312            self.result
313                .as_mut()
314                .unwrap()
315                .writer
316                .write_all(&self.data[..])?;
317            self.data.clear();
318        }
319        self.result.as_mut().unwrap().writer.end_packet()?;
320        self.col = 0;
321
322        Ok(())
323    }
324
325    /// Write a single row as a part of this resultset.
326    ///
327    /// Note that the row *must* conform to the column specification provided to
328    /// [`QueryResultWriter::start`](struct.QueryResultWriter.html#method.start). If it does not,
329    /// this method will return an error indicating that an invalid value type or specification was
330    /// provided.
331    pub fn write_row<I, E>(&mut self, row: I) -> io::Result<()>
332    where
333        I: IntoIterator<Item = E>,
334        E: ToMysqlValue,
335    {
336        if !self.columns.is_empty() {
337            for v in row {
338                self.write_col(v)?;
339            }
340        }
341        self.end_row()
342    }
343}
344
345impl<'a, W: Read + Write + 'a> RowWriter<'a, W> {
346    fn finish_inner(&mut self, complete: bool) -> io::Result<()> {
347        if self.finished {
348            return Ok(());
349        }
350
351        self.finished = true;
352
353        if !self.columns.is_empty() && self.col != 0 {
354            self.end_row()?;
355        }
356
357        if complete {
358            if self.columns.is_empty() {
359                // response to no column query is always an OK packet
360                // we've kept track of the number of rows in col (hacky, I know)
361                self.result.as_mut().unwrap().last_end = Some(Finalizer::Ok {
362                    rows: self.col as u64,
363                    last_insert_id: 0,
364                });
365            } else {
366                // we wrote out at least one row
367                self.result.as_mut().unwrap().last_end = Some(Finalizer::Eof);
368            }
369        }
370
371        Ok(())
372    }
373
374    /// Indicate to the client that no more rows are coming.
375    pub fn finish(self) -> io::Result<()> {
376        self.finish_one()?.no_more_results()
377    }
378
379    /// End this resultset response, and indicate to the client that no more rows are coming.
380    pub fn finish_one(mut self) -> io::Result<QueryResultWriter<'a, W>> {
381        self.finish_inner(true)?;
382
383        // we know that dropping self will see self.finished == true,
384        // and so Drop won't try to use self.result.
385        Ok(self.result.take().unwrap())
386    }
387
388    /// End this resultset response, and indicate to the client there was an error.
389    pub fn finish_error<E>(mut self, kind: ErrorKind, msg: &E) -> io::Result<()>
390    where
391        E: Borrow<[u8]>,
392    {
393        self.finish_inner(false)?;
394
395        self.result.take().unwrap().error(kind, msg)
396    }
397}
398
399impl<'a, W: Read + Write + 'a> Drop for RowWriter<'a, W> {
400    fn drop(&mut self) {
401        self.finish_inner(true).unwrap();
402    }
403}