tarantool/network/protocol/
codec.rs

1use std::io::{self, Cursor, Read, Seek, Write};
2use std::os::raw::c_char;
3
4use crate::auth::AuthMethod;
5use crate::error::Error;
6use crate::error::TarantoolError;
7use crate::index::IteratorType;
8use crate::msgpack;
9use crate::network::protocol::ProtocolError;
10use crate::tuple::{ToTupleBuffer, Tuple};
11
12use super::SyncIndex;
13
14const MP_STR_MAX_HEADER_SIZE: usize = 5;
15
16/// Keys of the HEADER and BODY maps in the iproto packets.
17///
18/// See `enum iproto_key` in \<tarantool>/src/box/iproto_constants.h for source
19/// of truth.
20pub mod iproto_key {
21    pub const REQUEST_TYPE: u8 = 0x00;
22    pub const SYNC: u8 = 0x01;
23    // ...
24    pub const SCHEMA_VERSION: u8 = 0x05;
25    // ...
26    pub const SPACE_ID: u8 = 0x10;
27    pub const INDEX_ID: u8 = 0x11;
28    pub const LIMIT: u8 = 0x12;
29    pub const OFFSET: u8 = 0x13;
30    pub const ITERATOR: u8 = 0x14;
31    pub const INDEX_BASE: u8 = 0x15;
32    // ...
33    pub const KEY: u8 = 0x20;
34    pub const TUPLE: u8 = 0x21;
35    pub const FUNCTION_NAME: u8 = 0x22;
36    pub const USER_NAME: u8 = 0x23;
37    // ...
38    pub const EXPR: u8 = 0x27;
39    pub const OPS: u8 = 0x28;
40    // ...
41    pub const DATA: u8 = 0x30;
42    pub const ERROR: u8 = 0x31;
43    // ...
44    pub const SQL_TEXT: u8 = 0x40;
45    pub const SQL_BIND: u8 = 0x41;
46    pub const SQL_INFO: u8 = 0x42;
47    // ...
48    pub const STMT_ID: u8 = 0x43;
49    // ...
50    pub const ERROR_EXT: u8 = 0x52;
51    // ...
52}
53use iproto_key::*;
54
55crate::define_enum_with_introspection! {
56    /// Iproto packet type.
57    ///
58    /// See `enum iproto_type` in \<tarantool>/src/box/iproto_constants.h for source
59    /// of truth.
60    #[non_exhaustive]
61    #[repr(C)]
62    pub enum IProtoType {
63        /// This packet is a response with status success.
64        Ok = 0,
65        Select = 1,
66        Insert = 2,
67        Replace = 3,
68        Update = 4,
69        Delete = 5,
70        /// Deprecated in Tarantool 1.6 with name `IPROTO_CALL_16`.
71        /// Superseeded by `IPROTO_CALL`, see [`IProtoType::Call`].
72        LegacyCall = 6,
73        Auth = 7,
74        Eval = 8,
75        Upsert = 9,
76        Call = 10,
77        Execute = 11,
78        Nop = 12,
79        Prepare = 13,
80        Begin = 14,
81        Commit = 15,
82        Rollback = 16,
83        // ...
84        Ping = 64,
85        // ...
86        /// Error marker. This value will be combined with the error code in the
87        /// actual iproto response: `(IProtoType::Error | error_code)`.
88        Error = 1 << 15,
89    }
90}
91
92/// Encode an IPROTO request header.
93#[inline(always)]
94pub fn encode_header(
95    stream: &mut impl Write,
96    sync: SyncIndex,
97    request_type: IProtoType,
98) -> Result<(), Error> {
99    let helper = Header {
100        sync,
101        iproto_type: request_type as _,
102        // Not used when encoding a request
103        error_code: 0,
104        // Not used when encoding a request
105        schema_version: 0,
106    };
107    helper.encode(stream)
108}
109
110/// Prepares (hashes) password with salt according to CHAP-SHA1 algorithm.
111#[inline]
112pub fn chap_sha1_prepare(password: impl AsRef<[u8]>, salt: &[u8]) -> Vec<u8> {
113    // prepare 'chap-sha1' scramble:
114    // salt = base64_decode(encoded_salt);
115    // step_1 = sha1(password);
116    // step_2 = sha1(step_1);
117    // step_3 = sha1(first_20_bytes_of_salt, step_2);
118    // scramble = xor(step_1, step_3);
119
120    use sha1::{Digest as Sha1Digest, Sha1};
121
122    let mut hasher = Sha1::new();
123    hasher.update(password);
124    let mut step_1_and_scramble = hasher.finalize();
125
126    let mut hasher = Sha1::new();
127    hasher.update(step_1_and_scramble);
128    let step_2 = hasher.finalize();
129
130    let mut hasher = Sha1::new();
131    hasher.update(&salt[0..20]);
132    hasher.update(step_2);
133    let step_3 = hasher.finalize();
134
135    step_1_and_scramble
136        .iter_mut()
137        .zip(step_3.iter())
138        .for_each(|(a, b)| *a ^= *b);
139
140    let scramble_bytes = step_1_and_scramble.to_vec();
141    debug_assert_eq!(scramble_bytes.len(), 20);
142    scramble_bytes
143}
144
145/// Prepares (hashes) password with salt according to CHAP-SHA1 algorithm and encodes into MessagePack.
146// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
147#[inline]
148pub fn chap_sha1_auth_data(password: &str, salt: &[u8]) -> Vec<u8> {
149    let hashed_data = chap_sha1_prepare(password, salt);
150    let hashed_len = hashed_data.len();
151
152    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
153    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
154    res.write_all(&hashed_data).expect("Can't fail for a Vec");
155    res
156}
157
158/// Prepares password according to LDAP.
159#[cfg(feature = "picodata")]
160#[inline]
161pub fn ldap_prepare(password: impl AsRef<[u8]>) -> Vec<u8> {
162    password.as_ref().to_vec()
163}
164
165/// Prepares password according to LDAP and encodes into MessagePack.
166/// WARNING: data is sent without any encryption, it is recommended
167/// to use SSH tunnel/SSL/else to make communication secure.
168// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
169#[cfg(feature = "picodata")]
170#[inline]
171pub fn ldap_auth_data(password: &str) -> Vec<u8> {
172    let hashed_data = ldap_prepare(password);
173    let hashed_len = hashed_data.len();
174
175    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
176    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
177    res.write_all(&hashed_data).expect("Can't fail for a Vec");
178    res
179}
180
181/// Prepares (hashes) password with salt according to MD5.
182#[cfg(feature = "picodata")]
183#[inline]
184pub fn md5_prepare(user: &str, password: impl AsRef<[u8]>, salt: [u8; 4]) -> Vec<u8> {
185    // recv_from_db(salt)
186    // recv_from_user(name, password)
187    //   shadow_pass = md5(name + password), do not add "md5" prefix
188    //   client_pass = md5(shadow_pass + salt), only first 4 bytes of salt are used
189    //   result = encode_as_msgpack_str(client_pass)
190    // send_to_db(result)
191
192    use md5::{Digest as Md5Digest, Md5};
193
194    let mut md5 = Md5::new();
195
196    md5.update(password);
197    md5.update(user);
198    let shadow_pass = format!("{:x}", md5.finalize_reset());
199
200    md5.update(shadow_pass);
201    md5.update(salt);
202    let client_pass = format!("md5{:x}", md5.finalize());
203
204    client_pass.into_bytes()
205}
206
207/// Prepares (hashes) password with salt according to MD5 and encodes into MessagePack.
208// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
209#[cfg(feature = "picodata")]
210#[inline]
211pub fn md5_auth_data(user: &str, password: &str, salt: [u8; 4]) -> Vec<u8> {
212    let hashed_data = md5_prepare(user, password, salt);
213    let hashed_len = hashed_data.len();
214
215    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
216    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
217    res.write_all(&hashed_data).expect("Can't fail for a Vec");
218    res
219}
220
221pub fn encode_auth(
222    stream: &mut impl Write,
223    user: &str,
224    password: &str,
225    salt: &[u8],
226    auth_method: AuthMethod,
227) -> Result<(), Error> {
228    let auth_data;
229    match auth_method {
230        AuthMethod::ChapSha1 => {
231            auth_data = chap_sha1_auth_data(password, salt);
232        }
233        #[cfg(feature = "picodata")]
234        AuthMethod::Ldap => {
235            auth_data = ldap_auth_data(password);
236        }
237        #[cfg(feature = "picodata")]
238        AuthMethod::Md5 => {
239            // We only use first four bytes of a salt. To understand why,
240            // check `MD5_SALT_LEN` from `tarantool-sys/src/lib/core/md5.h:enum`,
241            // that is used in `tarantool-sys/src/lib/core/crypt.c:md5_encrypt`.
242            let salt_part =
243                crate::util::slice_first_chunk::<4, _>(salt).expect("Can't fail for valid salt");
244            auth_data = md5_auth_data(user, password, *salt_part);
245        }
246    }
247
248    rmp::encode::write_map_len(stream, 2)?;
249
250    // username:
251    rmp::encode::write_pfix(stream, USER_NAME)?;
252    rmp::encode::write_str(stream, user)?;
253
254    // encrypted password:
255    rmp::encode::write_pfix(stream, TUPLE)?;
256    rmp::encode::write_array_len(stream, 2)?;
257    rmp::encode::write_str(stream, auth_method.as_str())?;
258    stream.write_all(&auth_data)?;
259    Ok(())
260}
261
262pub fn encode_ping(stream: &mut impl Write) -> Result<(), Error> {
263    rmp::encode::write_map_len(stream, 0)?;
264    Ok(())
265}
266
267pub fn encode_execute<P>(stream: &mut impl Write, sql: &str, bind_params: &P) -> Result<(), Error>
268where
269    P: ToTupleBuffer + ?Sized,
270{
271    rmp::encode::write_map_len(stream, 2)?;
272    rmp::encode::write_pfix(stream, SQL_TEXT)?;
273    rmp::encode::write_str(stream, sql)?;
274
275    rmp::encode::write_pfix(stream, SQL_BIND)?;
276    bind_params.write_tuple_data(stream)?;
277    Ok(())
278}
279
280pub fn encode_call<T>(stream: &mut impl Write, function_name: &str, args: &T) -> Result<(), Error>
281where
282    T: ToTupleBuffer + ?Sized,
283{
284    rmp::encode::write_map_len(stream, 2)?;
285    rmp::encode::write_pfix(stream, FUNCTION_NAME)?;
286    rmp::encode::write_str(stream, function_name)?;
287    rmp::encode::write_pfix(stream, TUPLE)?;
288    args.write_tuple_data(stream)?;
289    Ok(())
290}
291
292pub fn encode_eval<T>(stream: &mut impl Write, expression: &str, args: &T) -> Result<(), Error>
293where
294    T: ToTupleBuffer + ?Sized,
295{
296    rmp::encode::write_map_len(stream, 2)?;
297    rmp::encode::write_pfix(stream, EXPR)?;
298    rmp::encode::write_str(stream, expression)?;
299    rmp::encode::write_pfix(stream, TUPLE)?;
300    args.write_tuple_data(stream)?;
301    Ok(())
302}
303
304#[allow(clippy::too_many_arguments)]
305pub fn encode_select<K>(
306    stream: &mut impl Write,
307    space_id: u32,
308    index_id: u32,
309    limit: u32,
310    offset: u32,
311    iterator_type: IteratorType,
312    key: &K,
313) -> Result<(), Error>
314where
315    K: ToTupleBuffer + ?Sized,
316{
317    rmp::encode::write_map_len(stream, 6)?;
318    rmp::encode::write_pfix(stream, SPACE_ID)?;
319    rmp::encode::write_u32(stream, space_id)?;
320    rmp::encode::write_pfix(stream, INDEX_ID)?;
321    rmp::encode::write_u32(stream, index_id)?;
322    rmp::encode::write_pfix(stream, LIMIT)?;
323    rmp::encode::write_u32(stream, limit)?;
324    rmp::encode::write_pfix(stream, OFFSET)?;
325    rmp::encode::write_u32(stream, offset)?;
326    rmp::encode::write_pfix(stream, ITERATOR)?;
327    rmp::encode::write_u32(stream, iterator_type as u32)?;
328    rmp::encode::write_pfix(stream, KEY)?;
329    key.write_tuple_data(stream)?;
330    Ok(())
331}
332
333pub fn encode_insert<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
334where
335    T: ToTupleBuffer + ?Sized,
336{
337    rmp::encode::write_map_len(stream, 2)?;
338    rmp::encode::write_pfix(stream, SPACE_ID)?;
339    rmp::encode::write_u32(stream, space_id)?;
340    rmp::encode::write_pfix(stream, TUPLE)?;
341    value.write_tuple_data(stream)?;
342    Ok(())
343}
344
345pub fn encode_replace<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
346where
347    T: ToTupleBuffer + ?Sized,
348{
349    rmp::encode::write_map_len(stream, 2)?;
350    rmp::encode::write_pfix(stream, SPACE_ID)?;
351    rmp::encode::write_u32(stream, space_id)?;
352    rmp::encode::write_pfix(stream, TUPLE)?;
353    value.write_tuple_data(stream)?;
354    Ok(())
355}
356
357pub fn encode_update<K, Op>(
358    stream: &mut impl Write,
359    space_id: u32,
360    index_id: u32,
361    key: &K,
362    ops: &Op,
363) -> Result<(), Error>
364where
365    K: ToTupleBuffer + ?Sized,
366    Op: ToTupleBuffer + ?Sized,
367{
368    rmp::encode::write_map_len(stream, 4)?;
369    rmp::encode::write_pfix(stream, SPACE_ID)?;
370    rmp::encode::write_u32(stream, space_id)?;
371    rmp::encode::write_pfix(stream, INDEX_ID)?;
372    rmp::encode::write_u32(stream, index_id)?;
373    rmp::encode::write_pfix(stream, KEY)?;
374    key.write_tuple_data(stream)?;
375    rmp::encode::write_pfix(stream, TUPLE)?;
376    ops.write_tuple_data(stream)?;
377    Ok(())
378}
379
380pub fn encode_upsert<T, Op>(
381    stream: &mut impl Write,
382    space_id: u32,
383    index_id: u32,
384    value: &T,
385    ops: &Op,
386) -> Result<(), Error>
387where
388    T: ToTupleBuffer + ?Sized,
389    Op: ToTupleBuffer + ?Sized,
390{
391    rmp::encode::write_map_len(stream, 4)?;
392    rmp::encode::write_pfix(stream, SPACE_ID)?;
393    rmp::encode::write_u32(stream, space_id)?;
394    rmp::encode::write_pfix(stream, INDEX_BASE)?;
395    rmp::encode::write_u32(stream, index_id)?;
396    rmp::encode::write_pfix(stream, OPS)?;
397    ops.write_tuple_data(stream)?;
398    rmp::encode::write_pfix(stream, TUPLE)?;
399    value.write_tuple_data(stream)?;
400    Ok(())
401}
402
403pub fn encode_delete<K>(
404    stream: &mut impl Write,
405    space_id: u32,
406    index_id: u32,
407    key: &K,
408) -> Result<(), Error>
409where
410    K: ToTupleBuffer + ?Sized,
411{
412    rmp::encode::write_map_len(stream, 3)?;
413    rmp::encode::write_pfix(stream, SPACE_ID)?;
414    rmp::encode::write_u32(stream, space_id)?;
415    rmp::encode::write_pfix(stream, INDEX_ID)?;
416    rmp::encode::write_u32(stream, index_id)?;
417    rmp::encode::write_pfix(stream, KEY)?;
418    key.write_tuple_data(stream)?;
419    Ok(())
420}
421
422#[derive(Debug)]
423pub struct Header {
424    pub sync: SyncIndex,
425    /// Type of the iproto packet.
426    ///
427    /// If the packet is an error response (see [`IProtoType::Error`]) then the
428    /// error code is removed from it and assigned to [`Header::error_code`].
429    ///
430    /// This should be a value from `enum iproto_type` from tarantool sources,
431    /// but it's practically impossible to keep our `IProtoType` up to date with
432    /// the latest version of tarantool, so we just store it as a plain integer.
433    pub iproto_type: u32,
434    pub error_code: u32,
435    pub schema_version: u64,
436}
437
438impl Header {
439    /// Encode an IPROTO request header.
440    ///
441    // FIXME: bad name, this encodes a request header, hence error_code & schema_version are ignored.
442    // This code will not work if we want to implement the server side of the protocol.
443    pub fn encode(&self, stream: &mut impl Write) -> Result<(), Error> {
444        rmp::encode::write_map_len(stream, 2)?;
445        rmp::encode::write_pfix(stream, REQUEST_TYPE)?;
446        rmp::encode::write_uint(stream, self.iproto_type as _)?;
447        rmp::encode::write_pfix(stream, SYNC)?;
448        rmp::encode::write_uint(stream, self.sync.0)?;
449        Ok(())
450    }
451
452    /// This function doesn't need to exist
453    #[inline(always)]
454    pub fn encode_from_parts(
455        stream: &mut impl Write,
456        sync: SyncIndex,
457        request_type: IProtoType,
458    ) -> Result<(), Error> {
459        encode_header(stream, sync, request_type)
460    }
461
462    /// Decode an IPROTO response header.
463    ///
464    // FIXME: bad name, this decodes only response headers.
465    // This code will not work if we want to implement the server side of the protocol.
466    pub fn decode(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
467        let mut sync: Option<u64> = None;
468        let mut iproto_type: Option<u32> = None;
469        let mut error_code: u32 = 0;
470        let mut schema_version: Option<u64> = None;
471
472        let map_len = rmp::decode::read_map_len(stream)?;
473        for _ in 0..map_len {
474            let key = rmp::decode::read_pfix(stream)?;
475            match key {
476                REQUEST_TYPE => {
477                    let r#type: u32 = rmp::decode::read_int(stream)?;
478
479                    const IPROTO_TYPE_ERROR: u32 = IProtoType::Error as _;
480                    if (r#type & IPROTO_TYPE_ERROR) != 0 {
481                        iproto_type = Some(IPROTO_TYPE_ERROR);
482                        error_code = r#type & !IPROTO_TYPE_ERROR;
483                    } else {
484                        iproto_type = Some(r#type);
485                    }
486                }
487                SYNC => sync = Some(rmp::decode::read_int(stream)?),
488                SCHEMA_VERSION => schema_version = Some(rmp::decode::read_int(stream)?),
489                _ => msgpack::skip_value(stream)?,
490            }
491        }
492
493        if sync.is_none() || iproto_type.is_none() || schema_version.is_none() {
494            return Err(io::Error::from(io::ErrorKind::InvalidData).into());
495        }
496
497        Ok(Header {
498            sync: SyncIndex(sync.unwrap()),
499            iproto_type: iproto_type.unwrap(),
500            error_code,
501            schema_version: schema_version.unwrap(),
502        })
503    }
504}
505
506pub struct Response<T> {
507    pub header: Header,
508    pub payload: T,
509}
510
511/// Decode an IPROTO response header.
512#[inline(always)]
513pub fn decode_header(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
514    Header::decode(stream)
515}
516
517////////////////////////////////////////////////////////////////////////////////
518// error decoding
519////////////////////////////////////////////////////////////////////////////////
520
521/// Constant definitions for keys of the extended error info. Currently there's
522/// only one possible key - error stack, and the value associated with it is an
523/// array of error info maps. These error info maps have fields from the
524/// [`error_field`] module defined below.
525///
526/// See enum MP_ERROR_* \<tarantool>/src/box/mp_error.cc
527mod extended_error_keys {
528    /// Stack of error infos.
529    pub const STACK: u8 = 0;
530}
531
532/// Constant definitions for extended error info fields.
533///
534/// See enum MP_ERROR_* \<tarantool>/src/box/mp_error.cc
535mod error_field {
536    /// Error type.
537    pub const TYPE: u8 = 0x00;
538
539    /// File name from trace.
540    pub const FILE: u8 = 0x01;
541
542    /// Line from trace.
543    pub const LINE: u8 = 0x02;
544
545    /// Error message.
546    pub const MESSAGE: u8 = 0x03;
547
548    /// Errno at the moment of error creation.
549    pub const ERRNO: u8 = 0x04;
550
551    /// Error code.
552    pub const CODE: u8 = 0x05;
553
554    /// Type-specific fields stored as a map
555    /// {string key = value}.
556    pub const FIELDS: u8 = 0x06;
557}
558
559/// Reads a IPROTO packet from the `stream` (i.e. a msgpack map with integer keys)
560pub fn decode_error(stream: &mut impl Read, header: &Header) -> Result<TarantoolError, Error> {
561    let mut error = TarantoolError::default();
562
563    let map_len = rmp::decode::read_map_len(stream)?;
564    for _ in 0..map_len {
565        let key = rmp::decode::read_pfix(stream)?;
566        match key {
567            ERROR => {
568                let message = decode_string(stream)?;
569                error.message = Some(message.into());
570                error.code = header.error_code;
571            }
572            ERROR_EXT => {
573                if let Some(e) = decode_extended_error(stream)? {
574                    error = e;
575                } else {
576                    crate::say_verbose!("empty ERROR_EXT field");
577                }
578            }
579            _ => {
580                crate::say_verbose!("unhandled iproto key {key} when decoding error");
581            }
582        }
583    }
584
585    if error.message.is_none() {
586        return Err(ProtocolError::ResponseFieldNotFound {
587            key: "ERROR",
588            context: "required for error responses",
589        }
590        .into());
591    }
592
593    Ok(error)
594}
595
596pub fn decode_extended_error(stream: &mut impl Read) -> Result<Option<TarantoolError>, Error> {
597    let extended_error_n_fields = rmp::decode::read_map_len(stream)? as usize;
598    if extended_error_n_fields == 0 {
599        return Ok(None);
600    }
601
602    let mut error_info = None;
603
604    for _ in 0..extended_error_n_fields {
605        let key = rmp::decode::read_pfix(stream)?;
606        match key {
607            extended_error_keys::STACK => {
608                if error_info.is_some() {
609                    crate::say_verbose!("duplicate error stack in response");
610                }
611
612                let error_stack_len = rmp::decode::read_array_len(stream)? as usize;
613                if error_stack_len == 0 {
614                    continue;
615                }
616
617                let mut stack_nodes = Vec::with_capacity(error_stack_len);
618                for _ in 0..error_stack_len {
619                    stack_nodes.push(decode_error_stack_node(stream)?);
620                }
621
622                for mut node in stack_nodes.into_iter().rev() {
623                    if let Some(next_node) = error_info {
624                        node.cause = Some(Box::new(next_node));
625                    }
626                    error_info = Some(node);
627                }
628            }
629            _ => {
630                crate::say_verbose!("unknown extended error key {key}");
631            }
632        }
633    }
634
635    Ok(error_info)
636}
637
638pub fn decode_error_stack_node(mut stream: &mut impl Read) -> Result<TarantoolError, Error> {
639    let mut res = TarantoolError::default();
640
641    let map_len = rmp::decode::read_map_len(stream)? as usize;
642    for _ in 0..map_len {
643        let key = rmp::decode::read_pfix(stream)?;
644        match key {
645            error_field::TYPE => {
646                res.error_type = Some(decode_string(stream)?.into_boxed_str());
647            }
648            error_field::FILE => {
649                res.file = Some(decode_string(stream)?.into_boxed_str());
650            }
651            error_field::LINE => {
652                res.line = Some(rmp::decode::read_int(stream)?);
653            }
654            error_field::MESSAGE => {
655                res.message = Some(decode_string(stream)?.into_boxed_str());
656            }
657            error_field::ERRNO => {
658                let n = rmp::decode::read_int(stream)?;
659                if n != 0 {
660                    res.errno = Some(n);
661                }
662            }
663            error_field::CODE => {
664                res.code = rmp::decode::read_int(stream)?;
665            }
666            error_field::FIELDS => match rmp_serde::from_read(&mut stream) {
667                Ok(f) => {
668                    res.fields = f;
669                }
670                Err(e) => {
671                    crate::say_verbose!("failed decoding error fields: {e}");
672                }
673            },
674            _ => {
675                crate::say_verbose!("unexpected error field {key}");
676            }
677        }
678    }
679
680    Ok(res)
681}
682
683////////////////////////////////////////////////////////////////////////////////
684// ...
685////////////////////////////////////////////////////////////////////////////////
686
687pub fn decode_string(stream: &mut impl Read) -> Result<String, Error> {
688    let len = rmp::decode::read_str_len(stream)? as usize;
689    let mut str_buf = vec![0u8; len];
690    stream.read_exact(&mut str_buf)?;
691    let res = String::from_utf8(str_buf)?;
692    Ok(res)
693}
694
695pub fn decode_greeting(stream: &mut impl Read) -> Result<Vec<u8>, Error> {
696    let mut buf = [0; 128];
697    stream.read_exact(&mut buf)?;
698    let salt = base64::decode(&buf[64..108]).unwrap();
699    Ok(salt)
700}
701
702pub fn decode_call(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
703    let payload_len = rmp::decode::read_map_len(buffer)?;
704    for _ in 0..payload_len {
705        let key = rmp::decode::read_pfix(buffer)?;
706        match key {
707            DATA => {
708                return decode_tuple(buffer);
709            }
710            _ => {
711                msgpack::skip_value(buffer)?;
712            }
713        };
714    }
715    Err(ProtocolError::ResponseFieldNotFound {
716        key: "DATA",
717        context: "required for CALL/EVAL responses",
718    }
719    .into())
720}
721
722pub fn decode_multiple_rows(buffer: &mut Cursor<Vec<u8>>) -> Result<Vec<Tuple>, Error> {
723    let payload_len = rmp::decode::read_map_len(buffer)?;
724    for _ in 0..payload_len {
725        let key = rmp::decode::read_pfix(buffer)?;
726        match key {
727            DATA => {
728                let items_count = rmp::decode::read_array_len(buffer)? as usize;
729                let mut result = Vec::with_capacity(items_count);
730                for _ in 0..items_count {
731                    result.push(decode_tuple(buffer)?);
732                }
733                return Ok(result);
734            }
735            _ => {
736                msgpack::skip_value(buffer)?;
737            }
738        };
739    }
740    Ok(vec![])
741}
742
743pub fn decode_single_row(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Tuple>, Error> {
744    let payload_len = rmp::decode::read_map_len(buffer)?;
745    for _ in 0..payload_len {
746        let key = rmp::decode::read_pfix(buffer)?;
747        match key {
748            DATA => {
749                let items_count = rmp::decode::read_array_len(buffer)? as usize;
750                return Ok(if items_count == 0 {
751                    None
752                } else {
753                    Some(decode_tuple(buffer)?)
754                });
755            }
756            _ => {
757                msgpack::skip_value(buffer)?;
758            }
759        }
760    }
761    Ok(None)
762}
763
764pub fn decode_tuple(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
765    let payload_offset = buffer.position();
766    msgpack::skip_value(buffer)?;
767    let payload_len = buffer.position() - payload_offset;
768    let buf = buffer.get_mut();
769    unsafe {
770        Ok(Tuple::from_raw_data(
771            buf.as_slice().as_ptr().add(payload_offset as usize) as *mut c_char,
772            payload_len as u32,
773        ))
774    }
775}
776
777pub fn value_slice(cursor: &mut Cursor<impl AsRef<[u8]>>) -> crate::Result<&[u8]> {
778    let start = cursor.position() as usize;
779    msgpack::skip_value(cursor)?;
780    Ok(&cursor.get_ref().as_ref()[start..(cursor.position() as usize)])
781}