1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use serde::Serialize;
use serde_json::Error as SerdeError;
use sqlx_core::{arguments::Arguments, encode::Encode, error::BoxDynError, types::Type};

use crate::{database::Exasol, error::ExaProtocolError, type_info::ExaTypeInfo};

#[derive(Debug, Default)]
pub struct ExaArguments {
    pub buf: ExaBuffer,
    pub types: Vec<ExaTypeInfo>,
}

impl<'q> Arguments<'q> for ExaArguments {
    type Database = Exasol;

    fn reserve(&mut self, additional: usize, size: usize) {
        self.types.reserve(additional);
        self.buf.inner.reserve(size);
    }

    fn add<T>(&mut self, value: T) -> Result<(), BoxDynError>
    where
        T: 'q + Encode<'q, Self::Database> + Type<Self::Database>,
    {
        let ty = value.produces().unwrap_or_else(T::type_info);

        self.buf.start_seq();
        let _ = value.encode(&mut self.buf)?;
        self.buf.end_seq();
        self.buf.add_separator();

        self.buf.check_param_count()?;

        self.types.push(ty);

        Ok(())
    }

    fn len(&self) -> usize {
        self.types.len()
    }
}

#[derive(Debug)]
pub struct ExaBuffer {
    pub(crate) inner: Vec<u8>,
    pub(crate) params_count: usize,
    pub(crate) num_param_sets: Option<usize>,
}

impl ExaBuffer {
    /// Serializes and appends a value to this buffer.
    pub fn append<T>(&mut self, value: T) -> Result<(), SerdeError>
    where
        T: Serialize,
    {
        self.params_count += 1;
        serde_json::to_writer(&mut self.inner, &value)
    }

    /// Serializes and appends an iterator of values to this buffer.
    pub fn append_iter<'q, I, T>(&mut self, iter: I) -> Result<(), BoxDynError>
    where
        I: IntoIterator<Item = T>,
        T: 'q + Encode<'q, Exasol> + Type<Exasol>,
    {
        let mut iter = iter.into_iter();

        if let Some(value) = iter.next() {
            let _ = value.encode(self)?;
        }

        for value in iter {
            self.add_separator();
            let _ = value.encode(self)?;
        }

        Ok(())
    }

    /// Outputs the numbers of parameter sets in the buffer.
    ///
    /// # Errors
    ///
    /// Will throw an error if a mismatch was recorded.
    pub(crate) fn num_param_sets(&self) -> usize {
        self.num_param_sets.unwrap_or_default()
    }

    /// Ends the main sequence serialization in the buffer.
    ///
    /// We're technically always guaranteed to have at least
    /// one byte in the buffer as this type is only created
    /// by [`sqlx`] through it's [`Default`] implementation.
    ///
    /// It will either overwrite the `[` set by default
    /// or a `,` separator automatically set when an element
    /// is encoded and added.
    pub(crate) fn finalize(&mut self) {
        let b = self.inner.last_mut().expect("buffer cannot be empty");
        *b = b']';
    }

    /// Adds the sequence serialization start to the buffer.
    fn start_seq(&mut self) {
        self.inner.push(b'[');
    }

    /// Adds the sequence serialization start to the buffer.
    fn end_seq(&mut self) {
        self.inner.push(b']');
    }

    /// Adds the sequence serialization separator to the buffer.
    fn add_separator(&mut self) {
        self.inner.push(b',');
    }

    /// Registers the number of rows we bound parameters for.
    ///
    /// The first time we add an argument, we store the number of rows
    /// we pass parameters for. This is useful for when arrays of
    /// parameters get passed for each column.
    ///
    /// All subsequent calls will check that the number of rows is the same.
    /// If it is not, the first mismatch is recorded so we can throw
    /// an error later (before sending data to the database).
    ///
    /// This is also due to `Encode` not throwing errors.
    fn check_param_count(&mut self) -> Result<(), ExaProtocolError> {
        let count = self.params_count;

        // We must reset the count in preparation for the next parameter.
        self.params_count = 0;

        match self.num_param_sets {
            Some(n) if n == count => (),
            Some(n) => Err(ExaProtocolError::ParameterLengthMismatch(count, n))?,
            None => self.num_param_sets = Some(count),
        };

        Ok(())
    }
}

impl Default for ExaBuffer {
    fn default() -> Self {
        let inner = Vec::with_capacity(1);
        let mut buffer = Self {
            inner,
            params_count: 0,
            num_param_sets: None,
        };

        buffer.start_seq();
        buffer
    }
}