tarantool/network/protocol/
codec.rs

1use std::io::{self, Cursor, Read, Seek, Write};
2use std::os::raw::c_char;
3
4use crate::auth::AuthMethod;
5use crate::error::Error;
6use crate::error::TarantoolError;
7use crate::index::IteratorType;
8use crate::msgpack;
9use crate::network::protocol::ProtocolError;
10use crate::tuple::{ToTupleBuffer, Tuple};
11
12use super::SyncIndex;
13
14const MP_STR_MAX_HEADER_SIZE: usize = 5;
15
16/// Keys of the HEADER and BODY maps in the iproto packets.
17///
18/// See `enum iproto_key` in \<tarantool>/src/box/iproto_constants.h for source
19/// of truth.
20pub mod iproto_key {
21    pub const REQUEST_TYPE: u8 = 0x00;
22    pub const SYNC: u8 = 0x01;
23    // ...
24    pub const SCHEMA_VERSION: u8 = 0x05;
25    // ...
26    pub const SPACE_ID: u8 = 0x10;
27    pub const INDEX_ID: u8 = 0x11;
28    pub const LIMIT: u8 = 0x12;
29    pub const OFFSET: u8 = 0x13;
30    pub const ITERATOR: u8 = 0x14;
31    pub const INDEX_BASE: u8 = 0x15;
32    // ...
33    pub const KEY: u8 = 0x20;
34    pub const TUPLE: u8 = 0x21;
35    pub const FUNCTION_NAME: u8 = 0x22;
36    pub const USER_NAME: u8 = 0x23;
37    // ...
38    pub const EXPR: u8 = 0x27;
39    pub const OPS: u8 = 0x28;
40    // ...
41    pub const DATA: u8 = 0x30;
42    pub const ERROR: u8 = 0x31;
43    // ...
44    pub const SQL_TEXT: u8 = 0x40;
45    pub const SQL_BIND: u8 = 0x41;
46    pub const SQL_INFO: u8 = 0x42;
47    // ...
48    pub const STMT_ID: u8 = 0x43;
49    // ...
50    pub const ERROR_EXT: u8 = 0x52;
51    // ...
52    pub const CLUSTER_UUID: u8 = 0x5c;
53}
54use iproto_key::*;
55
56crate::define_enum_with_introspection! {
57    /// Iproto packet type.
58    ///
59    /// See `enum iproto_type` in \<tarantool>/src/box/iproto_constants.h for source
60    /// of truth.
61    #[non_exhaustive]
62    #[repr(C)]
63    pub enum IProtoType {
64        /// This packet is a response with status success.
65        Ok = 0,
66        Select = 1,
67        Insert = 2,
68        Replace = 3,
69        Update = 4,
70        Delete = 5,
71        /// Deprecated in Tarantool 1.6 with name `IPROTO_CALL_16`.
72        /// Superseeded by `IPROTO_CALL`, see [`IProtoType::Call`].
73        LegacyCall = 6,
74        Auth = 7,
75        Eval = 8,
76        Upsert = 9,
77        Call = 10,
78        Execute = 11,
79        Nop = 12,
80        Prepare = 13,
81        Begin = 14,
82        Commit = 15,
83        Rollback = 16,
84        // ...
85        Id = 73,
86        Ping = 64,
87        // ...
88        /// Error marker. This value will be combined with the error code in the
89        /// actual iproto response: `(IProtoType::Error | error_code)`.
90        Error = 1 << 15,
91    }
92}
93
94/// Encode an IPROTO request header.
95#[inline(always)]
96pub fn encode_header(
97    stream: &mut impl Write,
98    sync: SyncIndex,
99    request_type: IProtoType,
100) -> Result<(), Error> {
101    let helper = Header {
102        sync,
103        iproto_type: request_type as _,
104        // Not used when encoding a request
105        error_code: 0,
106        // Not used when encoding a request
107        schema_version: 0,
108    };
109    helper.encode(stream)
110}
111
112/// Prepares (hashes) password with salt according to CHAP-SHA1 algorithm.
113#[inline]
114pub fn chap_sha1_prepare(password: impl AsRef<[u8]>, salt: &[u8; 20]) -> Vec<u8> {
115    // prepare 'chap-sha1' scramble:
116    // salt = base64_decode(encoded_salt);
117    // step_1 = sha1(password);
118    // step_2 = sha1(step_1);
119    // step_3 = sha1(first_20_bytes_of_salt, step_2);
120    // scramble = xor(step_1, step_3);
121
122    use sha1::{Digest as Sha1Digest, Sha1};
123
124    let mut hasher = Sha1::new();
125    hasher.update(password);
126    let mut step_1_and_scramble = hasher.finalize();
127
128    let mut hasher = Sha1::new();
129    hasher.update(step_1_and_scramble);
130    let step_2 = hasher.finalize();
131
132    let mut hasher = Sha1::new();
133    hasher.update(salt);
134    hasher.update(step_2);
135    let step_3 = hasher.finalize();
136
137    step_1_and_scramble
138        .iter_mut()
139        .zip(step_3.iter())
140        .for_each(|(a, b)| *a ^= *b);
141
142    let scramble_bytes = step_1_and_scramble.to_vec();
143    debug_assert_eq!(scramble_bytes.len(), 20);
144    scramble_bytes
145}
146
147/// Prepares (hashes) password with salt according to CHAP-SHA1 algorithm and encodes into MessagePack.
148// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
149#[inline]
150pub fn chap_sha1_auth_data(password: &str, salt: &[u8; 20]) -> Vec<u8> {
151    let hashed_data = chap_sha1_prepare(password, salt);
152    let hashed_len = hashed_data.len();
153
154    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
155    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
156    res.write_all(&hashed_data).expect("Can't fail for a Vec");
157    res
158}
159
160/// Prepares password according to LDAP.
161#[cfg(feature = "picodata")]
162#[inline]
163pub fn ldap_prepare(password: impl AsRef<[u8]>) -> Vec<u8> {
164    password.as_ref().to_vec()
165}
166
167/// Prepares password according to LDAP and encodes into MessagePack.
168/// WARNING: data is sent without any encryption, it is recommended
169/// to use SSH tunnel/SSL/else to make communication secure.
170// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
171#[cfg(feature = "picodata")]
172#[inline]
173pub fn ldap_auth_data(password: &str) -> Vec<u8> {
174    let hashed_data = ldap_prepare(password);
175    let hashed_len = hashed_data.len();
176
177    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
178    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
179    res.write_all(&hashed_data).expect("Can't fail for a Vec");
180    res
181}
182
183/// Prepares (hashes) password with salt according to MD5.
184#[cfg(feature = "picodata")]
185#[inline]
186pub fn md5_prepare(user: &str, password: impl AsRef<[u8]>, salt: &[u8; 4]) -> Vec<u8> {
187    // recv_from_db(salt)
188    // recv_from_user(name, password)
189    //   shadow_pass = md5(name + password), do not add "md5" prefix
190    //   client_pass = md5(shadow_pass + salt)
191    //   result = encode_as_msgpack_str(client_pass)
192    // send_to_db(result)
193
194    use md5::{Digest as Md5Digest, Md5};
195
196    let mut md5 = Md5::new();
197
198    md5.update(password);
199    md5.update(user);
200    let shadow_pass = format!("{:x}", md5.finalize_reset());
201
202    md5.update(shadow_pass);
203    md5.update(salt);
204    let client_pass = format!("md5{:x}", md5.finalize());
205
206    client_pass.into_bytes()
207}
208
209/// Prepares (hashes) password with salt according to MD5 and encodes into MessagePack.
210// TODO(kbezuglyi): password should be `impl AsRef<[u8]>`, not `&str`.
211#[cfg(feature = "picodata")]
212#[inline]
213pub fn md5_auth_data(user: &str, password: &str, salt: &[u8; 4]) -> Vec<u8> {
214    let hashed_data = md5_prepare(user, password, salt);
215    let hashed_len = hashed_data.len();
216
217    let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
218    rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
219    res.write_all(&hashed_data).expect("Can't fail for a Vec");
220    res
221}
222
223pub fn encode_auth(
224    stream: &mut impl Write,
225    user: &str,
226    password: &str,
227    salt: &[u8],
228    method: AuthMethod,
229) -> Result<(), Error> {
230    let auth_data;
231    match method {
232        AuthMethod::ChapSha1 => {
233            let salt = salt
234                .first_chunk()
235                .ok_or_else(|| std::io::Error::other("bad salt length (expect 20)"))?;
236            auth_data = chap_sha1_auth_data(password, salt);
237        }
238        #[cfg(feature = "picodata")]
239        AuthMethod::Ldap => {
240            auth_data = ldap_auth_data(password);
241        }
242        #[cfg(feature = "picodata")]
243        AuthMethod::Md5 => {
244            // We only use first four bytes of a salt. To understand why,
245            // check `MD5_SALT_LEN` from `tarantool-sys/src/lib/core/md5.h:enum`,
246            // that is used in `tarantool-sys/src/lib/core/crypt.c:md5_encrypt`.
247            let salt = salt
248                .first_chunk()
249                .ok_or_else(|| std::io::Error::other("bad salt length (expect >= 4)"))?;
250            auth_data = md5_auth_data(user, password, salt);
251        }
252        #[cfg(feature = "picodata")]
253        AuthMethod::ScramSha256 => {
254            use crate::error::{BoxError, TarantoolErrorCode};
255
256            return Err(BoxError::new(
257                TarantoolErrorCode::UnknownAuthMethod,
258                "scram-sha256 over iproto is not supported",
259            )
260            .into());
261        }
262    }
263
264    rmp::encode::write_map_len(stream, 2)?;
265
266    // username:
267    rmp::encode::write_pfix(stream, USER_NAME)?;
268    rmp::encode::write_str(stream, user)?;
269
270    // encrypted password:
271    rmp::encode::write_pfix(stream, TUPLE)?;
272    rmp::encode::write_array_len(stream, 2)?;
273    rmp::encode::write_str(stream, method.as_str())?;
274    stream.write_all(&auth_data)?;
275    Ok(())
276}
277
278pub fn encode_ping(stream: &mut impl Write) -> Result<(), Error> {
279    rmp::encode::write_map_len(stream, 0)?;
280    Ok(())
281}
282
283pub fn encode_id(stream: &mut impl Write, cluster_uuid: Option<&str>) -> Result<(), Error> {
284    use iproto_key::CLUSTER_UUID;
285
286    if let Some(uuid) = cluster_uuid {
287        rmp::encode::write_map_len(stream, 1)?;
288        rmp::encode::write_pfix(stream, CLUSTER_UUID)?;
289        rmp::encode::write_str(stream, uuid)?;
290    } else {
291        rmp::encode::write_map_len(stream, 0)?;
292    }
293    Ok(())
294}
295
296pub fn encode_execute<P>(stream: &mut impl Write, sql: &str, bind_params: &P) -> Result<(), Error>
297where
298    P: ToTupleBuffer + ?Sized,
299{
300    rmp::encode::write_map_len(stream, 2)?;
301    rmp::encode::write_pfix(stream, SQL_TEXT)?;
302    rmp::encode::write_str(stream, sql)?;
303
304    rmp::encode::write_pfix(stream, SQL_BIND)?;
305    bind_params.write_tuple_data(stream)?;
306    Ok(())
307}
308
309pub fn encode_call<T>(stream: &mut impl Write, function_name: &str, args: &T) -> Result<(), Error>
310where
311    T: ToTupleBuffer + ?Sized,
312{
313    rmp::encode::write_map_len(stream, 2)?;
314    rmp::encode::write_pfix(stream, FUNCTION_NAME)?;
315    rmp::encode::write_str(stream, function_name)?;
316    rmp::encode::write_pfix(stream, TUPLE)?;
317    args.write_tuple_data(stream)?;
318    Ok(())
319}
320
321pub fn encode_eval<T>(stream: &mut impl Write, expression: &str, args: &T) -> Result<(), Error>
322where
323    T: ToTupleBuffer + ?Sized,
324{
325    rmp::encode::write_map_len(stream, 2)?;
326    rmp::encode::write_pfix(stream, EXPR)?;
327    rmp::encode::write_str(stream, expression)?;
328    rmp::encode::write_pfix(stream, TUPLE)?;
329    args.write_tuple_data(stream)?;
330    Ok(())
331}
332
333#[allow(clippy::too_many_arguments)]
334pub fn encode_select<K>(
335    stream: &mut impl Write,
336    space_id: u32,
337    index_id: u32,
338    limit: u32,
339    offset: u32,
340    iterator_type: IteratorType,
341    key: &K,
342) -> Result<(), Error>
343where
344    K: ToTupleBuffer + ?Sized,
345{
346    rmp::encode::write_map_len(stream, 6)?;
347    rmp::encode::write_pfix(stream, SPACE_ID)?;
348    rmp::encode::write_u32(stream, space_id)?;
349    rmp::encode::write_pfix(stream, INDEX_ID)?;
350    rmp::encode::write_u32(stream, index_id)?;
351    rmp::encode::write_pfix(stream, LIMIT)?;
352    rmp::encode::write_u32(stream, limit)?;
353    rmp::encode::write_pfix(stream, OFFSET)?;
354    rmp::encode::write_u32(stream, offset)?;
355    rmp::encode::write_pfix(stream, ITERATOR)?;
356    rmp::encode::write_u32(stream, iterator_type as u32)?;
357    rmp::encode::write_pfix(stream, KEY)?;
358    key.write_tuple_data(stream)?;
359    Ok(())
360}
361
362pub fn encode_insert<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
363where
364    T: ToTupleBuffer + ?Sized,
365{
366    rmp::encode::write_map_len(stream, 2)?;
367    rmp::encode::write_pfix(stream, SPACE_ID)?;
368    rmp::encode::write_u32(stream, space_id)?;
369    rmp::encode::write_pfix(stream, TUPLE)?;
370    value.write_tuple_data(stream)?;
371    Ok(())
372}
373
374pub fn encode_replace<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
375where
376    T: ToTupleBuffer + ?Sized,
377{
378    rmp::encode::write_map_len(stream, 2)?;
379    rmp::encode::write_pfix(stream, SPACE_ID)?;
380    rmp::encode::write_u32(stream, space_id)?;
381    rmp::encode::write_pfix(stream, TUPLE)?;
382    value.write_tuple_data(stream)?;
383    Ok(())
384}
385
386pub fn encode_update<K, Op>(
387    stream: &mut impl Write,
388    space_id: u32,
389    index_id: u32,
390    key: &K,
391    ops: &Op,
392) -> Result<(), Error>
393where
394    K: ToTupleBuffer + ?Sized,
395    Op: ToTupleBuffer + ?Sized,
396{
397    rmp::encode::write_map_len(stream, 4)?;
398    rmp::encode::write_pfix(stream, SPACE_ID)?;
399    rmp::encode::write_u32(stream, space_id)?;
400    rmp::encode::write_pfix(stream, INDEX_ID)?;
401    rmp::encode::write_u32(stream, index_id)?;
402    rmp::encode::write_pfix(stream, KEY)?;
403    key.write_tuple_data(stream)?;
404    rmp::encode::write_pfix(stream, TUPLE)?;
405    ops.write_tuple_data(stream)?;
406    Ok(())
407}
408
409pub fn encode_upsert<T, Op>(
410    stream: &mut impl Write,
411    space_id: u32,
412    index_id: u32,
413    value: &T,
414    ops: &Op,
415) -> Result<(), Error>
416where
417    T: ToTupleBuffer + ?Sized,
418    Op: ToTupleBuffer + ?Sized,
419{
420    rmp::encode::write_map_len(stream, 4)?;
421    rmp::encode::write_pfix(stream, SPACE_ID)?;
422    rmp::encode::write_u32(stream, space_id)?;
423    rmp::encode::write_pfix(stream, INDEX_BASE)?;
424    rmp::encode::write_u32(stream, index_id)?;
425    rmp::encode::write_pfix(stream, OPS)?;
426    ops.write_tuple_data(stream)?;
427    rmp::encode::write_pfix(stream, TUPLE)?;
428    value.write_tuple_data(stream)?;
429    Ok(())
430}
431
432pub fn encode_delete<K>(
433    stream: &mut impl Write,
434    space_id: u32,
435    index_id: u32,
436    key: &K,
437) -> Result<(), Error>
438where
439    K: ToTupleBuffer + ?Sized,
440{
441    rmp::encode::write_map_len(stream, 3)?;
442    rmp::encode::write_pfix(stream, SPACE_ID)?;
443    rmp::encode::write_u32(stream, space_id)?;
444    rmp::encode::write_pfix(stream, INDEX_ID)?;
445    rmp::encode::write_u32(stream, index_id)?;
446    rmp::encode::write_pfix(stream, KEY)?;
447    key.write_tuple_data(stream)?;
448    Ok(())
449}
450
451#[derive(Debug)]
452pub struct Header {
453    pub sync: SyncIndex,
454    /// Type of the iproto packet.
455    ///
456    /// If the packet is an error response (see [`IProtoType::Error`]) then the
457    /// error code is removed from it and assigned to [`Header::error_code`].
458    ///
459    /// This should be a value from `enum iproto_type` from tarantool sources,
460    /// but it's practically impossible to keep our `IProtoType` up to date with
461    /// the latest version of tarantool, so we just store it as a plain integer.
462    pub iproto_type: u32,
463    pub error_code: u32,
464    pub schema_version: u64,
465}
466
467impl Header {
468    /// Encode an IPROTO request header.
469    ///
470    // FIXME: bad name, this encodes a request header, hence error_code & schema_version are ignored.
471    // This code will not work if we want to implement the server side of the protocol.
472    pub fn encode(&self, stream: &mut impl Write) -> Result<(), Error> {
473        rmp::encode::write_map_len(stream, 2)?;
474        rmp::encode::write_pfix(stream, REQUEST_TYPE)?;
475        rmp::encode::write_uint(stream, self.iproto_type as _)?;
476        rmp::encode::write_pfix(stream, SYNC)?;
477        rmp::encode::write_uint(stream, self.sync.0)?;
478        Ok(())
479    }
480
481    /// This function doesn't need to exist
482    #[inline(always)]
483    pub fn encode_from_parts(
484        stream: &mut impl Write,
485        sync: SyncIndex,
486        request_type: IProtoType,
487    ) -> Result<(), Error> {
488        encode_header(stream, sync, request_type)
489    }
490
491    /// Decode an IPROTO response header.
492    ///
493    // FIXME: bad name, this decodes only response headers.
494    // This code will not work if we want to implement the server side of the protocol.
495    pub fn decode(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
496        let mut sync: Option<u64> = None;
497        let mut iproto_type: Option<u32> = None;
498        let mut error_code: u32 = 0;
499        let mut schema_version: Option<u64> = None;
500
501        let map_len = rmp::decode::read_map_len(stream)?;
502        for _ in 0..map_len {
503            let key = rmp::decode::read_pfix(stream)?;
504            match key {
505                REQUEST_TYPE => {
506                    let r#type: u32 = rmp::decode::read_int(stream)?;
507
508                    const IPROTO_TYPE_ERROR: u32 = IProtoType::Error as _;
509                    if (r#type & IPROTO_TYPE_ERROR) != 0 {
510                        iproto_type = Some(IPROTO_TYPE_ERROR);
511                        error_code = r#type & !IPROTO_TYPE_ERROR;
512                    } else {
513                        iproto_type = Some(r#type);
514                    }
515                }
516                SYNC => sync = Some(rmp::decode::read_int(stream)?),
517                SCHEMA_VERSION => schema_version = Some(rmp::decode::read_int(stream)?),
518                _ => msgpack::skip_value(stream)?,
519            }
520        }
521
522        if sync.is_none() || iproto_type.is_none() || schema_version.is_none() {
523            return Err(io::Error::from(io::ErrorKind::InvalidData).into());
524        }
525
526        Ok(Header {
527            sync: SyncIndex(sync.unwrap()),
528            iproto_type: iproto_type.unwrap(),
529            error_code,
530            schema_version: schema_version.unwrap(),
531        })
532    }
533}
534
535pub struct Response<T> {
536    pub header: Header,
537    pub payload: T,
538}
539
540/// Decode an IPROTO response header.
541#[inline(always)]
542pub fn decode_header(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
543    Header::decode(stream)
544}
545
546////////////////////////////////////////////////////////////////////////////////
547// error decoding
548////////////////////////////////////////////////////////////////////////////////
549
550/// Constant definitions for keys of the extended error info. Currently there's
551/// only one possible key - error stack, and the value associated with it is an
552/// array of error info maps. These error info maps have fields from the
553/// [`error_field`] module defined below.
554///
555/// See enum MP_ERROR_* \<tarantool>/src/box/mp_error.cc
556mod extended_error_keys {
557    /// Stack of error infos.
558    pub const STACK: u8 = 0;
559}
560
561/// Constant definitions for extended error info fields.
562///
563/// See enum MP_ERROR_* \<tarantool>/src/box/mp_error.cc
564mod error_field {
565    /// Error type.
566    pub const TYPE: u8 = 0x00;
567
568    /// File name from trace.
569    pub const FILE: u8 = 0x01;
570
571    /// Line from trace.
572    pub const LINE: u8 = 0x02;
573
574    /// Error message.
575    pub const MESSAGE: u8 = 0x03;
576
577    /// Errno at the moment of error creation.
578    pub const ERRNO: u8 = 0x04;
579
580    /// Error code.
581    pub const CODE: u8 = 0x05;
582
583    /// Type-specific fields stored as a map
584    /// {string key = value}.
585    pub const FIELDS: u8 = 0x06;
586}
587
588/// Reads a IPROTO packet from the `stream` (i.e. a msgpack map with integer keys)
589pub fn decode_error(stream: &mut impl Read, header: &Header) -> Result<TarantoolError, Error> {
590    let mut error = TarantoolError::default();
591
592    let map_len = rmp::decode::read_map_len(stream)?;
593    for _ in 0..map_len {
594        let key = rmp::decode::read_pfix(stream)?;
595        match key {
596            ERROR => {
597                let message = decode_string(stream)?;
598                error.message = Some(message.into());
599                error.code = header.error_code;
600            }
601            ERROR_EXT => {
602                if let Some(e) = decode_extended_error(stream)? {
603                    error = e;
604                } else {
605                    crate::say_verbose!("empty ERROR_EXT field");
606                }
607            }
608            _ => {
609                crate::say_verbose!("unhandled iproto key {key} when decoding error");
610            }
611        }
612    }
613
614    if error.message.is_none() {
615        return Err(ProtocolError::ResponseFieldNotFound {
616            key: "ERROR",
617            context: "required for error responses",
618        }
619        .into());
620    }
621
622    Ok(error)
623}
624
625pub fn decode_extended_error(stream: &mut impl Read) -> Result<Option<TarantoolError>, Error> {
626    let extended_error_n_fields = rmp::decode::read_map_len(stream)? as usize;
627    if extended_error_n_fields == 0 {
628        return Ok(None);
629    }
630
631    let mut error_info = None;
632
633    for _ in 0..extended_error_n_fields {
634        let key = rmp::decode::read_pfix(stream)?;
635        match key {
636            extended_error_keys::STACK => {
637                if error_info.is_some() {
638                    crate::say_verbose!("duplicate error stack in response");
639                }
640
641                let error_stack_len = rmp::decode::read_array_len(stream)? as usize;
642                if error_stack_len == 0 {
643                    continue;
644                }
645
646                let mut stack_nodes = Vec::with_capacity(error_stack_len);
647                for _ in 0..error_stack_len {
648                    stack_nodes.push(decode_error_stack_node(stream)?);
649                }
650
651                for mut node in stack_nodes.into_iter().rev() {
652                    if let Some(next_node) = error_info {
653                        node.cause = Some(Box::new(next_node));
654                    }
655                    error_info = Some(node);
656                }
657            }
658            _ => {
659                crate::say_verbose!("unknown extended error key {key}");
660            }
661        }
662    }
663
664    Ok(error_info)
665}
666
667pub fn decode_error_stack_node(mut stream: &mut impl Read) -> Result<TarantoolError, Error> {
668    let mut res = TarantoolError::default();
669
670    let map_len = rmp::decode::read_map_len(stream)? as usize;
671    for _ in 0..map_len {
672        let key = rmp::decode::read_pfix(stream)?;
673        match key {
674            error_field::TYPE => {
675                res.error_type = Some(decode_string(stream)?.into_boxed_str());
676            }
677            error_field::FILE => {
678                res.file = Some(decode_string(stream)?.into_boxed_str());
679            }
680            error_field::LINE => {
681                res.line = Some(rmp::decode::read_int(stream)?);
682            }
683            error_field::MESSAGE => {
684                res.message = Some(decode_string(stream)?.into_boxed_str());
685            }
686            error_field::ERRNO => {
687                let n = rmp::decode::read_int(stream)?;
688                if n != 0 {
689                    res.errno = Some(n);
690                }
691            }
692            error_field::CODE => {
693                res.code = rmp::decode::read_int(stream)?;
694            }
695            error_field::FIELDS => match rmp_serde::from_read(&mut stream) {
696                Ok(f) => {
697                    res.fields = f;
698                }
699                Err(e) => {
700                    crate::say_verbose!("failed decoding error fields: {e}");
701                }
702            },
703            _ => {
704                crate::say_verbose!("unexpected error field {key}");
705            }
706        }
707    }
708
709    Ok(res)
710}
711
712////////////////////////////////////////////////////////////////////////////////
713// ...
714////////////////////////////////////////////////////////////////////////////////
715
716pub fn decode_string(stream: &mut impl Read) -> Result<String, Error> {
717    let len = rmp::decode::read_str_len(stream)? as usize;
718    let mut str_buf = vec![0u8; len];
719    stream.read_exact(&mut str_buf)?;
720    let res = String::from_utf8(str_buf)?;
721    Ok(res)
722}
723
724pub fn decode_greeting(stream: &mut impl Read) -> Result<Vec<u8>, Error> {
725    let mut buf = [0; 128];
726    stream.read_exact(&mut buf)?;
727    let salt = base64::decode(&buf[64..108]).unwrap();
728    Ok(salt)
729}
730
731pub fn decode_call(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
732    let payload_len = rmp::decode::read_map_len(buffer)?;
733    for _ in 0..payload_len {
734        let key = rmp::decode::read_pfix(buffer)?;
735        match key {
736            DATA => {
737                return decode_tuple(buffer);
738            }
739            _ => {
740                msgpack::skip_value(buffer)?;
741            }
742        };
743    }
744    Err(ProtocolError::ResponseFieldNotFound {
745        key: "DATA",
746        context: "required for CALL/EVAL responses",
747    }
748    .into())
749}
750
751pub fn decode_multiple_rows(buffer: &mut Cursor<Vec<u8>>) -> Result<Vec<Tuple>, Error> {
752    let payload_len = rmp::decode::read_map_len(buffer)?;
753    for _ in 0..payload_len {
754        let key = rmp::decode::read_pfix(buffer)?;
755        match key {
756            DATA => {
757                let items_count = rmp::decode::read_array_len(buffer)? as usize;
758                let mut result = Vec::with_capacity(items_count);
759                for _ in 0..items_count {
760                    result.push(decode_tuple(buffer)?);
761                }
762                return Ok(result);
763            }
764            _ => {
765                msgpack::skip_value(buffer)?;
766            }
767        };
768    }
769    Ok(vec![])
770}
771
772pub fn decode_single_row(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Tuple>, Error> {
773    let payload_len = rmp::decode::read_map_len(buffer)?;
774    for _ in 0..payload_len {
775        let key = rmp::decode::read_pfix(buffer)?;
776        match key {
777            DATA => {
778                let items_count = rmp::decode::read_array_len(buffer)? as usize;
779                return Ok(if items_count == 0 {
780                    None
781                } else {
782                    Some(decode_tuple(buffer)?)
783                });
784            }
785            _ => {
786                msgpack::skip_value(buffer)?;
787            }
788        }
789    }
790    Ok(None)
791}
792
793pub fn decode_tuple(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
794    let payload_offset = buffer.position();
795    msgpack::skip_value(buffer)?;
796    let payload_len = buffer.position() - payload_offset;
797    let buf = buffer.get_mut();
798    unsafe {
799        Ok(Tuple::from_raw_data(
800            buf.as_slice().as_ptr().add(payload_offset as usize) as *mut c_char,
801            payload_len as u32,
802        ))
803    }
804}
805
806pub fn value_slice(cursor: &mut Cursor<impl AsRef<[u8]>>) -> crate::Result<&[u8]> {
807    let start = cursor.position() as usize;
808    msgpack::skip_value(cursor)?;
809    Ok(&cursor.get_ref().as_ref()[start..(cursor.position() as usize)])
810}