1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
use serde::Serialize;
use serde_json::Error as SerdeError;
use sqlx_core::{arguments::Arguments, encode::Encode, error::BoxDynError, types::Type};
use crate::{database::Exasol, error::ExaProtocolError, type_info::ExaTypeInfo};
#[derive(Debug, Default)]
pub struct ExaArguments {
pub buf: ExaBuffer,
pub types: Vec<ExaTypeInfo>,
}
impl<'q> Arguments<'q> for ExaArguments {
type Database = Exasol;
fn reserve(&mut self, additional: usize, size: usize) {
self.types.reserve(additional);
self.buf.inner.reserve(size);
}
fn add<T>(&mut self, value: T) -> Result<(), BoxDynError>
where
T: 'q + Encode<'q, Self::Database> + Type<Self::Database>,
{
let ty = value.produces().unwrap_or_else(T::type_info);
self.buf.start_seq();
let _ = value.encode(&mut self.buf)?;
self.buf.end_seq();
self.buf.add_separator();
self.buf.check_param_count()?;
self.types.push(ty);
Ok(())
}
fn len(&self) -> usize {
self.types.len()
}
}
#[derive(Debug)]
pub struct ExaBuffer {
pub(crate) inner: Vec<u8>,
pub(crate) params_count: usize,
pub(crate) num_param_sets: Option<usize>,
}
impl ExaBuffer {
/// Serializes and appends a value to this buffer.
pub fn append<T>(&mut self, value: T) -> Result<(), SerdeError>
where
T: Serialize,
{
self.params_count += 1;
serde_json::to_writer(&mut self.inner, &value)
}
/// Serializes and appends an iterator of values to this buffer.
pub fn append_iter<'q, I, T>(&mut self, iter: I) -> Result<(), BoxDynError>
where
I: IntoIterator<Item = T>,
T: 'q + Encode<'q, Exasol> + Type<Exasol>,
{
let mut iter = iter.into_iter();
if let Some(value) = iter.next() {
let _ = value.encode(self)?;
}
for value in iter {
self.add_separator();
let _ = value.encode(self)?;
}
Ok(())
}
/// Outputs the numbers of parameter sets in the buffer.
///
/// # Errors
///
/// Will throw an error if a mismatch was recorded.
pub(crate) fn num_param_sets(&self) -> usize {
self.num_param_sets.unwrap_or_default()
}
/// Ends the main sequence serialization in the buffer.
///
/// We're technically always guaranteed to have at least
/// one byte in the buffer as this type is only created
/// by [`sqlx`] through it's [`Default`] implementation.
///
/// It will either overwrite the `[` set by default
/// or a `,` separator automatically set when an element
/// is encoded and added.
pub(crate) fn finalize(&mut self) {
let b = self.inner.last_mut().expect("buffer cannot be empty");
*b = b']';
}
/// Adds the sequence serialization start to the buffer.
fn start_seq(&mut self) {
self.inner.push(b'[');
}
/// Adds the sequence serialization start to the buffer.
fn end_seq(&mut self) {
self.inner.push(b']');
}
/// Adds the sequence serialization separator to the buffer.
fn add_separator(&mut self) {
self.inner.push(b',');
}
/// Registers the number of rows we bound parameters for.
///
/// The first time we add an argument, we store the number of rows
/// we pass parameters for. This is useful for when arrays of
/// parameters get passed for each column.
///
/// All subsequent calls will check that the number of rows is the same.
/// If it is not, the first mismatch is recorded so we can throw
/// an error later (before sending data to the database).
///
/// This is also due to `Encode` not throwing errors.
fn check_param_count(&mut self) -> Result<(), ExaProtocolError> {
let count = self.params_count;
// We must reset the count in preparation for the next parameter.
self.params_count = 0;
match self.num_param_sets {
Some(n) if n == count => (),
Some(n) => Err(ExaProtocolError::ParameterLengthMismatch(count, n))?,
None => self.num_param_sets = Some(count),
};
Ok(())
}
}
impl Default for ExaBuffer {
fn default() -> Self {
let inner = Vec::with_capacity(1);
let mut buffer = Self {
inner,
params_count: 0,
num_param_sets: None,
};
buffer.start_seq();
buffer
}
}