skytable 0.4.0-alpha.1

Official Rust client driver for Skytable
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
/*
 * Created on Tue May 11 2021
 *
 * Copyright (c) 2021 Sayan Nandan <nandansayan@outlook.com>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
*/

//! # The Skyhash Protocol
//!
//! ## Introduction
//! The Skyhash Protocol is a serialization protocol that is used by Skytable for client/server communication.
//! It works in a query/response fashion similar to HTTP's request/response model. Skyhash supersedes the Terrapipe
//! protocol as a simpler, more reliable, robust and scalable protocol.
//!
//! This module contains the [`Parser`] for the Skyhash protocol and it's enough to just pass a query packet as
//! a slice of unsigned 8-bit integers and the parser will do everything else. The Skyhash protocol was designed
//! by Sayan Nandan and this is the first client implementation of the protocol.
//!

use crate::RespCode;
use std::hint::unreachable_unchecked;

#[derive(Debug)]
/// # Skyhash Deserializer (Parser)
///
/// The [`Parser`] object can be used to deserialize a packet serialized by Skyhash, turning it
/// into data structures native to the Rust language (and some compound types built on top of them).
///
/// ## Evaluation
///
/// The parser is pessimistic in most cases and will readily return errors. For non-recursive types
/// there is no recursion, but nested arrays are parsed recursively (each array element is parsed
/// like a top-level element). The parser does not report an error if the bytes following a complete
/// packet look like the beginning of the next packet (a `*` byte). This is very much a possibility
/// and so has been accounted for.
///
/// ## Important note
///
/// All developers willing to modify the deserializer must keep this in mind: the cursor is always
/// Ahead-Of-Position, that is, the cursor should always point at the next byte that can be read.
///
pub(super) struct Parser<'a> {
    /// The internal cursor position: the index into `buffer` of the next byte to read
    ///
    /// Do not even think of touching this externally
    cursor: usize,
    /// The buffer slice being parsed; the parser only borrows it
    buffer: &'a [u8],
}

#[derive(Debug, PartialEq)]
#[non_exhaustive]
/// # Data Types
///
/// This enum represents the data types supported by the Skyhash Protocol. Each variant is
/// selected by a one-byte type symbol (`<tsymbol>`) that precedes the element in the packet.
pub enum Element {
    /// An array of arbitrary elements; arrays can be nested! Their `<tsymbol>` is `&`
    Array(Vec<Element>),
    /// A String value; `<tsymbol>` is `+`
    String(String),
    /// An unsigned 64-bit integer value; `<tsymbol>` is `:`
    UnsignedInt(u64),
    /// A non-recursive array that may only contain strings; `<tsymbol>` is `_`
    FlatArray(Vec<String>),
    /// A response code; `<tsymbol>` is `!`
    RespCode(RespCode),
}

#[derive(Debug, PartialEq)]
/// # Parser Errors
///
/// Several errors can arise during parsing and this enum accounts for them
pub enum ParseError {
    /// Didn't get the number of expected bytes: the buffer ended before the packet was complete
    NotEnough,
    /// The packet contains an unexpected byte (for example, a missing `*` at the start of the
    /// packet or a missing line feed after an element)
    UnexpectedByte,
    /// The packet simply contains invalid data
    ///
    /// This is rarely returned and only in the special case where the packet declares `0` as
    /// the query count
    BadPacket,
    /// A data type was given but the parser failed to parse the bytes into this type
    ///
    /// This can happen not just for elements but also for their sizes: a non-digit byte or an
    /// overflowing number in a size line is reported with this error (see `Parser::parse_into_usize`
    /// and `Parser::parse_into_u64`)
    DataTypeParseError,
    /// A data type that the client doesn't know was passed into the query, i.e. the type symbol
    /// is not one that this parser recognizes
    ///
    /// This is a frequent problem that can arise between different server editions as more data types
    /// can be added with changing server versions
    UnknownDatatype,
    /// The query is empty
    ///
    /// The **parser will never return this**; it is provided for convenience of code outside the
    /// parser (the original note names `dbnet` as the intended user)
    Empty,
}

#[derive(Debug, PartialEq)]
/// # Types of Response
///
/// A simple response carries the data for one action while a pipelined response carries data for
/// multiple actions. Which variant is produced depends on the count declared in the first line
/// of the packet (`*<n>\n`): `1` yields a simple response, anything larger a pipelined one.
pub enum RawResponse {
    /// A simple query will just hold one element
    SimpleQuery(Element),
    /// A pipelined/batch query will hold multiple elements, in the order they appeared in the packet
    PipelinedQuery(Vec<Element>),
}

/// A generic result to indicate parsing errors through the [`ParseError`] enum
pub type ParseResult<T> = Result<T, ParseError>;

impl<'a> Parser<'a> {
    /// Initialize a new parser instance
    pub const fn new(buffer: &'a [u8]) -> Self {
        Parser {
            cursor: 0usize,
            buffer,
        }
    }
    /// Read from the current cursor position to `until` number of positions ahead
    /// This **will forward the cursor itself** if the bytes exist or it will just return a `NotEnough` error
    fn read_until(&mut self, until: usize) -> ParseResult<&[u8]> {
        if let Some(b) = self.buffer.get(self.cursor..self.cursor + until) {
            self.cursor += until;
            Ok(b)
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// This returns the position at which the line parsing began and the position at which the line parsing
    /// stopped, in other words, you should be able to do self.buffer[started_at..stopped_at] to get a line
    /// and do it unchecked. This **will move the internal cursor ahead** and place it **at the `\n` byte**
    fn read_line(&mut self) -> (usize, usize) {
        let started_at = self.cursor;
        let mut stopped_at = self.cursor;
        while self.cursor < self.buffer.len() {
            if self.buffer[self.cursor] == b'\n' {
                // Oh no! Newline reached, time to break the loop
                // But before that ... we read the newline, so let's advance the cursor
                self.incr_cursor();
                break;
            }
            // So this isn't an LF, great! Let's forward the stopped_at position
            stopped_at += 1;
            self.incr_cursor();
        }
        (started_at, stopped_at)
    }
    /// Push the internal cursor ahead by one
    fn incr_cursor(&mut self) {
        self.cursor += 1;
    }
    /// This function will evaluate if the byte at the current cursor position equals the `ch` argument, i.e
    /// the expression `*v == ch` is evaluated. However, if no element is present ahead, then the function
    /// will return `Ok(_this_if_nothing_ahead_)`
    fn will_cursor_give_char(&self, ch: u8, this_if_nothing_ahead: bool) -> ParseResult<bool> {
        self.buffer.get(self.cursor).map_or(
            if this_if_nothing_ahead {
                Ok(true)
            } else {
                Err(ParseError::NotEnough)
            },
            |v| Ok(*v == ch),
        )
    }
    /// Will the current cursor position give a linefeed? This will return `ParseError::NotEnough` if
    /// the current cursor points at a non-existent index in `self.buffer`
    fn will_cursor_give_linefeed(&self) -> ParseResult<bool> {
        self.will_cursor_give_char(b'\n', false)
    }
    /// Parse a stream of bytes into [`usize`]
    fn parse_into_usize(bytes: &[u8]) -> ParseResult<usize> {
        if bytes.is_empty() {
            return Err(ParseError::NotEnough);
        }
        let byte_iter = bytes.iter();
        let mut item_usize = 0usize;
        for dig in byte_iter {
            if !dig.is_ascii_digit() {
                // dig has to be an ASCII digit
                return Err(ParseError::DataTypeParseError);
            }
            // 48 is the ASCII code for 0, and 57 is the ascii code for 9
            // so if 0 is given, the subtraction should give 0; similarly
            // if 9 is given, the subtraction should give us 9!
            let curdig: usize = dig
                .checked_sub(48)
                .unwrap_or_else(|| unsafe { unreachable_unchecked() })
                .into();
            // The usize can overflow; check that case
            let product = match item_usize.checked_mul(10) {
                Some(not_overflowed) => not_overflowed,
                None => return Err(ParseError::DataTypeParseError),
            };
            let sum = match product.checked_add(curdig) {
                Some(not_overflowed) => not_overflowed,
                None => return Err(ParseError::DataTypeParseError),
            };
            item_usize = sum;
        }
        Ok(item_usize)
    }
    /// Pasre a stream of bytes into an [`u64`]
    fn parse_into_u64(bytes: &[u8]) -> ParseResult<u64> {
        if bytes.is_empty() {
            return Err(ParseError::NotEnough);
        }
        let byte_iter = bytes.iter();
        let mut item_u64 = 0u64;
        for dig in byte_iter {
            if !dig.is_ascii_digit() {
                // dig has to be an ASCII digit
                return Err(ParseError::DataTypeParseError);
            }
            // 48 is the ASCII code for 0, and 57 is the ascii code for 9
            // so if 0 is given, the subtraction should give 0; similarly
            // if 9 is given, the subtraction should give us 9!
            let curdig: u64 = dig
                .checked_sub(48)
                .unwrap_or_else(|| unsafe { unreachable_unchecked() })
                .into();
            // Now the entire u64 can overflow, so let's attempt to check it
            let product = match item_u64.checked_mul(10) {
                Some(not_overflowed) => not_overflowed,
                None => return Err(ParseError::DataTypeParseError),
            };
            let sum = match product.checked_add(curdig) {
                Some(not_overflowed) => not_overflowed,
                None => return Err(ParseError::DataTypeParseError),
            };
            item_u64 = sum;
        }
        Ok(item_u64)
    }
    /// This will return the number of datagroups present in this query packet
    ///
    /// This **will forward the cursor itself**
    fn parse_metaframe_get_datagroup_count(&mut self) -> ParseResult<usize> {
        // the smallest query we can have is: *1\n or 3 chars
        if self.buffer.len() < 3 {
            return Err(ParseError::NotEnough);
        }
        // Now we want to read `*<n>\n`
        let (start, stop) = self.read_line();
        if let Some(our_chunk) = self.buffer.get(start..stop) {
            if our_chunk[0] == b'*' {
                // Good, this will tell us the number of actions
                // Let us attempt to read the usize from this point onwards
                // that is excluding the '*' (so 1..)
                let ret = Self::parse_into_usize(&our_chunk[1..])?;
                Ok(ret)
            } else {
                Err(ParseError::UnexpectedByte)
            }
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// Get the next element **without** the tsymbol
    ///
    /// This function **does not forward the newline**
    fn __get_next_element(&mut self) -> ParseResult<&[u8]> {
        let string_sizeline = self.read_line();
        if let Some(line) = self.buffer.get(string_sizeline.0..string_sizeline.1) {
            let string_size = Self::parse_into_usize(line)?;
            let our_chunk = self.read_until(string_size)?;
            Ok(our_chunk)
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// The cursor should have passed the `+` tsymbol
    fn parse_next_string(&mut self) -> ParseResult<String> {
        let our_string_chunk = self.__get_next_element()?;
        let our_string = String::from_utf8_lossy(&our_string_chunk).to_string();
        if self.will_cursor_give_linefeed()? {
            // there is a lf after the end of the string; great!
            // let's skip that now
            self.incr_cursor();
            // let's return our string
            Ok(our_string)
        } else {
            Err(ParseError::UnexpectedByte)
        }
    }
    /// The cursor should have passed the `:` tsymbol
    fn parse_next_u64(&mut self) -> ParseResult<u64> {
        let our_u64_chunk = self.__get_next_element()?;
        let our_u64 = Self::parse_into_u64(our_u64_chunk)?;
        if self.will_cursor_give_linefeed()? {
            // line feed after u64; heck yeah!
            self.incr_cursor();
            // return it
            Ok(our_u64)
        } else {
            Err(ParseError::UnexpectedByte)
        }
    }
    fn parse_next_respcode(&mut self) -> ParseResult<RespCode> {
        let our_respcode_chunk = self.__get_next_element()?;
        let our_respcode = RespCode::from_str(&String::from_utf8_lossy(our_respcode_chunk));
        if self.will_cursor_give_linefeed()? {
            self.incr_cursor();
            Ok(our_respcode)
        } else {
            Err(ParseError::UnexpectedByte)
        }
    }
    /// The cursor should be **at the tsymbol**
    fn parse_next_element(&mut self) -> ParseResult<Element> {
        if let Some(tsymbol) = self.buffer.get(self.cursor) {
            // so we have a tsymbol; nice, let's match it
            // but advance the cursor before doing that
            self.incr_cursor();
            let ret = match *tsymbol {
                b'+' => Element::String(self.parse_next_string()?),
                b':' => Element::UnsignedInt(self.parse_next_u64()?),
                b'&' => Element::Array(self.parse_next_array()?),
                b'!' => Element::RespCode(self.parse_next_respcode()?),
                b'_' => Element::FlatArray(self.parse_next_flat_array()?),
                _ => return Err(ParseError::UnknownDatatype),
            };
            Ok(ret)
        } else {
            // Not enough bytes to read an element
            Err(ParseError::NotEnough)
        }
    }
    /// The cursor should have passed the tsymbol
    fn parse_next_flat_array(&mut self) -> ParseResult<Vec<String>> {
        let (start, stop) = self.read_line();
        if let Some(our_size_chunk) = self.buffer.get(start..stop) {
            let array_size = Self::parse_into_usize(&our_size_chunk)?;
            let mut array = Vec::with_capacity(array_size);
            for _ in 0..array_size {
                if let Some(tsymbol) = self.buffer.get(self.cursor) {
                    // good, there is a tsymbol; move the cursor ahead
                    self.incr_cursor();
                    let ret = match *tsymbol {
                        b'+' => self.parse_next_string()?,
                        _ => return Err(ParseError::UnknownDatatype),
                    };
                    array.push(ret);
                } else {
                    return Err(ParseError::NotEnough);
                }
            }
            Ok(array)
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// The tsymbol `&` should have been passed!
    fn parse_next_array(&mut self) -> ParseResult<Vec<Element>> {
        let (start, stop) = self.read_line();
        if let Some(our_size_chunk) = self.buffer.get(start..stop) {
            let array_size = Self::parse_into_usize(our_size_chunk)?;
            let mut array = Vec::with_capacity(array_size);
            for _ in 0..array_size {
                array.push(self.parse_next_element()?);
            }
            Ok(array)
        } else {
            Err(ParseError::NotEnough)
        }
    }
    /// Parse a query and return the [`Query`] and an `usize` indicating the number of bytes that
    /// can be safely discarded from the buffer. It will otherwise return errors if they are found.
    ///
    /// This object will drop `Self`
    pub fn parse(mut self) -> Result<(RawResponse, usize), ParseError> {
        let number_of_queries = self.parse_metaframe_get_datagroup_count()?;
        if number_of_queries == 0 {
            // how on earth do you expect us to execute 0 queries? waste of bandwidth
            return Err(ParseError::BadPacket);
        }
        if number_of_queries == 1 {
            // This is a simple query
            let single_group = self.parse_next_element()?;
            // The below line defaults to false if no item is there in the buffer
            // or it checks if the next time is a \r char; if it is, then it is the beginning
            // of the next query
            #[allow(clippy::blocks_in_if_conditions)]
            // this lint is pointless here, just some optimizations
            if self
                .will_cursor_give_char(b'*', true)
                .unwrap_or_else(|_| unsafe {
                    // This will never be the case because we'll always get a result and no error value
                    // as we've passed true which will yield Ok(true) even if there is no byte ahead
                    unreachable_unchecked()
                })
            {
                Ok((RawResponse::SimpleQuery(single_group), self.cursor))
            } else {
                // the next item isn't the beginning of a query but something else?
                // that doesn't look right!
                Err(ParseError::UnexpectedByte)
            }
        } else {
            // This is a pipelined query
            // We'll first make space for all the actiongroups
            let mut queries = Vec::with_capacity(number_of_queries);
            for _ in 0..number_of_queries {
                queries.push(self.parse_next_element()?);
            }
            if self.will_cursor_give_char(b'*', true)? {
                Ok((RawResponse::PipelinedQuery(queries), self.cursor))
            } else {
                Err(ParseError::UnexpectedByte)
            }
        }
    }
}