1use std::io::{self, Cursor, Read, Seek, Write};
2use std::os::raw::c_char;
3
4use crate::auth::AuthMethod;
5use crate::error::Error;
6use crate::error::TarantoolError;
7use crate::index::IteratorType;
8use crate::msgpack;
9use crate::network::protocol::ProtocolError;
10use crate::tuple::{ToTupleBuffer, Tuple};
11
12use super::SyncIndex;
13
14const MP_STR_MAX_HEADER_SIZE: usize = 5;
15
16pub mod iproto_key {
21 pub const REQUEST_TYPE: u8 = 0x00;
22 pub const SYNC: u8 = 0x01;
23 pub const SCHEMA_VERSION: u8 = 0x05;
25 pub const SPACE_ID: u8 = 0x10;
27 pub const INDEX_ID: u8 = 0x11;
28 pub const LIMIT: u8 = 0x12;
29 pub const OFFSET: u8 = 0x13;
30 pub const ITERATOR: u8 = 0x14;
31 pub const INDEX_BASE: u8 = 0x15;
32 pub const KEY: u8 = 0x20;
34 pub const TUPLE: u8 = 0x21;
35 pub const FUNCTION_NAME: u8 = 0x22;
36 pub const USER_NAME: u8 = 0x23;
37 pub const EXPR: u8 = 0x27;
39 pub const OPS: u8 = 0x28;
40 pub const DATA: u8 = 0x30;
42 pub const ERROR: u8 = 0x31;
43 pub const SQL_TEXT: u8 = 0x40;
45 pub const SQL_BIND: u8 = 0x41;
46 pub const SQL_INFO: u8 = 0x42;
47 pub const STMT_ID: u8 = 0x43;
49 pub const ERROR_EXT: u8 = 0x52;
51 }
53use iproto_key::*;
54
55crate::define_enum_with_introspection! {
56 #[non_exhaustive]
61 #[repr(C)]
62 pub enum IProtoType {
63 Ok = 0,
65 Select = 1,
66 Insert = 2,
67 Replace = 3,
68 Update = 4,
69 Delete = 5,
70 LegacyCall = 6,
73 Auth = 7,
74 Eval = 8,
75 Upsert = 9,
76 Call = 10,
77 Execute = 11,
78 Nop = 12,
79 Prepare = 13,
80 Begin = 14,
81 Commit = 15,
82 Rollback = 16,
83 Ping = 64,
85 Error = 1 << 15,
89 }
90}
91
92#[inline(always)]
94pub fn encode_header(
95 stream: &mut impl Write,
96 sync: SyncIndex,
97 request_type: IProtoType,
98) -> Result<(), Error> {
99 let helper = Header {
100 sync,
101 iproto_type: request_type as _,
102 error_code: 0,
104 schema_version: 0,
106 };
107 helper.encode(stream)
108}
109
110#[inline]
112pub fn chap_sha1_prepare(password: impl AsRef<[u8]>, salt: &[u8]) -> Vec<u8> {
113 use sha1::{Digest as Sha1Digest, Sha1};
121
122 let mut hasher = Sha1::new();
123 hasher.update(password);
124 let mut step_1_and_scramble = hasher.finalize();
125
126 let mut hasher = Sha1::new();
127 hasher.update(step_1_and_scramble);
128 let step_2 = hasher.finalize();
129
130 let mut hasher = Sha1::new();
131 hasher.update(&salt[0..20]);
132 hasher.update(step_2);
133 let step_3 = hasher.finalize();
134
135 step_1_and_scramble
136 .iter_mut()
137 .zip(step_3.iter())
138 .for_each(|(a, b)| *a ^= *b);
139
140 let scramble_bytes = step_1_and_scramble.to_vec();
141 debug_assert_eq!(scramble_bytes.len(), 20);
142 scramble_bytes
143}
144
145#[inline]
148pub fn chap_sha1_auth_data(password: &str, salt: &[u8]) -> Vec<u8> {
149 let hashed_data = chap_sha1_prepare(password, salt);
150 let hashed_len = hashed_data.len();
151
152 let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
153 rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
154 res.write_all(&hashed_data).expect("Can't fail for a Vec");
155 res
156}
157
158#[cfg(feature = "picodata")]
160#[inline]
161pub fn ldap_prepare(password: impl AsRef<[u8]>) -> Vec<u8> {
162 password.as_ref().to_vec()
163}
164
165#[cfg(feature = "picodata")]
170#[inline]
171pub fn ldap_auth_data(password: &str) -> Vec<u8> {
172 let hashed_data = ldap_prepare(password);
173 let hashed_len = hashed_data.len();
174
175 let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
176 rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
177 res.write_all(&hashed_data).expect("Can't fail for a Vec");
178 res
179}
180
181#[cfg(feature = "picodata")]
183#[inline]
184pub fn md5_prepare(user: &str, password: impl AsRef<[u8]>, salt: [u8; 4]) -> Vec<u8> {
185 use md5::{Digest as Md5Digest, Md5};
193
194 let mut md5 = Md5::new();
195
196 md5.update(password);
197 md5.update(user);
198 let shadow_pass = format!("{:x}", md5.finalize_reset());
199
200 md5.update(shadow_pass);
201 md5.update(salt);
202 let client_pass = format!("md5{:x}", md5.finalize());
203
204 client_pass.into_bytes()
205}
206
207#[cfg(feature = "picodata")]
210#[inline]
211pub fn md5_auth_data(user: &str, password: &str, salt: [u8; 4]) -> Vec<u8> {
212 let hashed_data = md5_prepare(user, password, salt);
213 let hashed_len = hashed_data.len();
214
215 let mut res = Vec::with_capacity(hashed_len + MP_STR_MAX_HEADER_SIZE);
216 rmp::encode::write_str_len(&mut res, hashed_len as _).expect("Can't fail for a Vec");
217 res.write_all(&hashed_data).expect("Can't fail for a Vec");
218 res
219}
220
221pub fn encode_auth(
222 stream: &mut impl Write,
223 user: &str,
224 password: &str,
225 salt: &[u8],
226 auth_method: AuthMethod,
227) -> Result<(), Error> {
228 let auth_data;
229 match auth_method {
230 AuthMethod::ChapSha1 => {
231 auth_data = chap_sha1_auth_data(password, salt);
232 }
233 #[cfg(feature = "picodata")]
234 AuthMethod::Ldap => {
235 auth_data = ldap_auth_data(password);
236 }
237 #[cfg(feature = "picodata")]
238 AuthMethod::Md5 => {
239 let salt_part =
243 crate::util::slice_first_chunk::<4, _>(salt).expect("Can't fail for valid salt");
244 auth_data = md5_auth_data(user, password, *salt_part);
245 }
246 }
247
248 rmp::encode::write_map_len(stream, 2)?;
249
250 rmp::encode::write_pfix(stream, USER_NAME)?;
252 rmp::encode::write_str(stream, user)?;
253
254 rmp::encode::write_pfix(stream, TUPLE)?;
256 rmp::encode::write_array_len(stream, 2)?;
257 rmp::encode::write_str(stream, auth_method.as_str())?;
258 stream.write_all(&auth_data)?;
259 Ok(())
260}
261
262pub fn encode_ping(stream: &mut impl Write) -> Result<(), Error> {
263 rmp::encode::write_map_len(stream, 0)?;
264 Ok(())
265}
266
267pub fn encode_execute<P>(stream: &mut impl Write, sql: &str, bind_params: &P) -> Result<(), Error>
268where
269 P: ToTupleBuffer + ?Sized,
270{
271 rmp::encode::write_map_len(stream, 2)?;
272 rmp::encode::write_pfix(stream, SQL_TEXT)?;
273 rmp::encode::write_str(stream, sql)?;
274
275 rmp::encode::write_pfix(stream, SQL_BIND)?;
276 bind_params.write_tuple_data(stream)?;
277 Ok(())
278}
279
280pub fn encode_call<T>(stream: &mut impl Write, function_name: &str, args: &T) -> Result<(), Error>
281where
282 T: ToTupleBuffer + ?Sized,
283{
284 rmp::encode::write_map_len(stream, 2)?;
285 rmp::encode::write_pfix(stream, FUNCTION_NAME)?;
286 rmp::encode::write_str(stream, function_name)?;
287 rmp::encode::write_pfix(stream, TUPLE)?;
288 args.write_tuple_data(stream)?;
289 Ok(())
290}
291
292pub fn encode_eval<T>(stream: &mut impl Write, expression: &str, args: &T) -> Result<(), Error>
293where
294 T: ToTupleBuffer + ?Sized,
295{
296 rmp::encode::write_map_len(stream, 2)?;
297 rmp::encode::write_pfix(stream, EXPR)?;
298 rmp::encode::write_str(stream, expression)?;
299 rmp::encode::write_pfix(stream, TUPLE)?;
300 args.write_tuple_data(stream)?;
301 Ok(())
302}
303
304#[allow(clippy::too_many_arguments)]
305pub fn encode_select<K>(
306 stream: &mut impl Write,
307 space_id: u32,
308 index_id: u32,
309 limit: u32,
310 offset: u32,
311 iterator_type: IteratorType,
312 key: &K,
313) -> Result<(), Error>
314where
315 K: ToTupleBuffer + ?Sized,
316{
317 rmp::encode::write_map_len(stream, 6)?;
318 rmp::encode::write_pfix(stream, SPACE_ID)?;
319 rmp::encode::write_u32(stream, space_id)?;
320 rmp::encode::write_pfix(stream, INDEX_ID)?;
321 rmp::encode::write_u32(stream, index_id)?;
322 rmp::encode::write_pfix(stream, LIMIT)?;
323 rmp::encode::write_u32(stream, limit)?;
324 rmp::encode::write_pfix(stream, OFFSET)?;
325 rmp::encode::write_u32(stream, offset)?;
326 rmp::encode::write_pfix(stream, ITERATOR)?;
327 rmp::encode::write_u32(stream, iterator_type as u32)?;
328 rmp::encode::write_pfix(stream, KEY)?;
329 key.write_tuple_data(stream)?;
330 Ok(())
331}
332
333pub fn encode_insert<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
334where
335 T: ToTupleBuffer + ?Sized,
336{
337 rmp::encode::write_map_len(stream, 2)?;
338 rmp::encode::write_pfix(stream, SPACE_ID)?;
339 rmp::encode::write_u32(stream, space_id)?;
340 rmp::encode::write_pfix(stream, TUPLE)?;
341 value.write_tuple_data(stream)?;
342 Ok(())
343}
344
345pub fn encode_replace<T>(stream: &mut impl Write, space_id: u32, value: &T) -> Result<(), Error>
346where
347 T: ToTupleBuffer + ?Sized,
348{
349 rmp::encode::write_map_len(stream, 2)?;
350 rmp::encode::write_pfix(stream, SPACE_ID)?;
351 rmp::encode::write_u32(stream, space_id)?;
352 rmp::encode::write_pfix(stream, TUPLE)?;
353 value.write_tuple_data(stream)?;
354 Ok(())
355}
356
357pub fn encode_update<K, Op>(
358 stream: &mut impl Write,
359 space_id: u32,
360 index_id: u32,
361 key: &K,
362 ops: &Op,
363) -> Result<(), Error>
364where
365 K: ToTupleBuffer + ?Sized,
366 Op: ToTupleBuffer + ?Sized,
367{
368 rmp::encode::write_map_len(stream, 4)?;
369 rmp::encode::write_pfix(stream, SPACE_ID)?;
370 rmp::encode::write_u32(stream, space_id)?;
371 rmp::encode::write_pfix(stream, INDEX_ID)?;
372 rmp::encode::write_u32(stream, index_id)?;
373 rmp::encode::write_pfix(stream, KEY)?;
374 key.write_tuple_data(stream)?;
375 rmp::encode::write_pfix(stream, TUPLE)?;
376 ops.write_tuple_data(stream)?;
377 Ok(())
378}
379
380pub fn encode_upsert<T, Op>(
381 stream: &mut impl Write,
382 space_id: u32,
383 index_id: u32,
384 value: &T,
385 ops: &Op,
386) -> Result<(), Error>
387where
388 T: ToTupleBuffer + ?Sized,
389 Op: ToTupleBuffer + ?Sized,
390{
391 rmp::encode::write_map_len(stream, 4)?;
392 rmp::encode::write_pfix(stream, SPACE_ID)?;
393 rmp::encode::write_u32(stream, space_id)?;
394 rmp::encode::write_pfix(stream, INDEX_BASE)?;
395 rmp::encode::write_u32(stream, index_id)?;
396 rmp::encode::write_pfix(stream, OPS)?;
397 ops.write_tuple_data(stream)?;
398 rmp::encode::write_pfix(stream, TUPLE)?;
399 value.write_tuple_data(stream)?;
400 Ok(())
401}
402
403pub fn encode_delete<K>(
404 stream: &mut impl Write,
405 space_id: u32,
406 index_id: u32,
407 key: &K,
408) -> Result<(), Error>
409where
410 K: ToTupleBuffer + ?Sized,
411{
412 rmp::encode::write_map_len(stream, 3)?;
413 rmp::encode::write_pfix(stream, SPACE_ID)?;
414 rmp::encode::write_u32(stream, space_id)?;
415 rmp::encode::write_pfix(stream, INDEX_ID)?;
416 rmp::encode::write_u32(stream, index_id)?;
417 rmp::encode::write_pfix(stream, KEY)?;
418 key.write_tuple_data(stream)?;
419 Ok(())
420}
421
422#[derive(Debug)]
423pub struct Header {
424 pub sync: SyncIndex,
425 pub iproto_type: u32,
434 pub error_code: u32,
435 pub schema_version: u64,
436}
437
438impl Header {
439 pub fn encode(&self, stream: &mut impl Write) -> Result<(), Error> {
444 rmp::encode::write_map_len(stream, 2)?;
445 rmp::encode::write_pfix(stream, REQUEST_TYPE)?;
446 rmp::encode::write_uint(stream, self.iproto_type as _)?;
447 rmp::encode::write_pfix(stream, SYNC)?;
448 rmp::encode::write_uint(stream, self.sync.0)?;
449 Ok(())
450 }
451
452 #[inline(always)]
454 pub fn encode_from_parts(
455 stream: &mut impl Write,
456 sync: SyncIndex,
457 request_type: IProtoType,
458 ) -> Result<(), Error> {
459 encode_header(stream, sync, request_type)
460 }
461
462 pub fn decode(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
467 let mut sync: Option<u64> = None;
468 let mut iproto_type: Option<u32> = None;
469 let mut error_code: u32 = 0;
470 let mut schema_version: Option<u64> = None;
471
472 let map_len = rmp::decode::read_map_len(stream)?;
473 for _ in 0..map_len {
474 let key = rmp::decode::read_pfix(stream)?;
475 match key {
476 REQUEST_TYPE => {
477 let r#type: u32 = rmp::decode::read_int(stream)?;
478
479 const IPROTO_TYPE_ERROR: u32 = IProtoType::Error as _;
480 if (r#type & IPROTO_TYPE_ERROR) != 0 {
481 iproto_type = Some(IPROTO_TYPE_ERROR);
482 error_code = r#type & !IPROTO_TYPE_ERROR;
483 } else {
484 iproto_type = Some(r#type);
485 }
486 }
487 SYNC => sync = Some(rmp::decode::read_int(stream)?),
488 SCHEMA_VERSION => schema_version = Some(rmp::decode::read_int(stream)?),
489 _ => msgpack::skip_value(stream)?,
490 }
491 }
492
493 if sync.is_none() || iproto_type.is_none() || schema_version.is_none() {
494 return Err(io::Error::from(io::ErrorKind::InvalidData).into());
495 }
496
497 Ok(Header {
498 sync: SyncIndex(sync.unwrap()),
499 iproto_type: iproto_type.unwrap(),
500 error_code,
501 schema_version: schema_version.unwrap(),
502 })
503 }
504}
505
506pub struct Response<T> {
507 pub header: Header,
508 pub payload: T,
509}
510
511#[inline(always)]
513pub fn decode_header(stream: &mut (impl Read + Seek)) -> Result<Header, Error> {
514 Header::decode(stream)
515}
516
517mod extended_error_keys {
528 pub const STACK: u8 = 0;
530}
531
532mod error_field {
536 pub const TYPE: u8 = 0x00;
538
539 pub const FILE: u8 = 0x01;
541
542 pub const LINE: u8 = 0x02;
544
545 pub const MESSAGE: u8 = 0x03;
547
548 pub const ERRNO: u8 = 0x04;
550
551 pub const CODE: u8 = 0x05;
553
554 pub const FIELDS: u8 = 0x06;
557}
558
559pub fn decode_error(stream: &mut impl Read, header: &Header) -> Result<TarantoolError, Error> {
561 let mut error = TarantoolError::default();
562
563 let map_len = rmp::decode::read_map_len(stream)?;
564 for _ in 0..map_len {
565 let key = rmp::decode::read_pfix(stream)?;
566 match key {
567 ERROR => {
568 let message = decode_string(stream)?;
569 error.message = Some(message.into());
570 error.code = header.error_code;
571 }
572 ERROR_EXT => {
573 if let Some(e) = decode_extended_error(stream)? {
574 error = e;
575 } else {
576 crate::say_verbose!("empty ERROR_EXT field");
577 }
578 }
579 _ => {
580 crate::say_verbose!("unhandled iproto key {key} when decoding error");
581 }
582 }
583 }
584
585 if error.message.is_none() {
586 return Err(ProtocolError::ResponseFieldNotFound {
587 key: "ERROR",
588 context: "required for error responses",
589 }
590 .into());
591 }
592
593 Ok(error)
594}
595
596pub fn decode_extended_error(stream: &mut impl Read) -> Result<Option<TarantoolError>, Error> {
597 let extended_error_n_fields = rmp::decode::read_map_len(stream)? as usize;
598 if extended_error_n_fields == 0 {
599 return Ok(None);
600 }
601
602 let mut error_info = None;
603
604 for _ in 0..extended_error_n_fields {
605 let key = rmp::decode::read_pfix(stream)?;
606 match key {
607 extended_error_keys::STACK => {
608 if error_info.is_some() {
609 crate::say_verbose!("duplicate error stack in response");
610 }
611
612 let error_stack_len = rmp::decode::read_array_len(stream)? as usize;
613 if error_stack_len == 0 {
614 continue;
615 }
616
617 let mut stack_nodes = Vec::with_capacity(error_stack_len);
618 for _ in 0..error_stack_len {
619 stack_nodes.push(decode_error_stack_node(stream)?);
620 }
621
622 for mut node in stack_nodes.into_iter().rev() {
623 if let Some(next_node) = error_info {
624 node.cause = Some(Box::new(next_node));
625 }
626 error_info = Some(node);
627 }
628 }
629 _ => {
630 crate::say_verbose!("unknown extended error key {key}");
631 }
632 }
633 }
634
635 Ok(error_info)
636}
637
638pub fn decode_error_stack_node(mut stream: &mut impl Read) -> Result<TarantoolError, Error> {
639 let mut res = TarantoolError::default();
640
641 let map_len = rmp::decode::read_map_len(stream)? as usize;
642 for _ in 0..map_len {
643 let key = rmp::decode::read_pfix(stream)?;
644 match key {
645 error_field::TYPE => {
646 res.error_type = Some(decode_string(stream)?.into_boxed_str());
647 }
648 error_field::FILE => {
649 res.file = Some(decode_string(stream)?.into_boxed_str());
650 }
651 error_field::LINE => {
652 res.line = Some(rmp::decode::read_int(stream)?);
653 }
654 error_field::MESSAGE => {
655 res.message = Some(decode_string(stream)?.into_boxed_str());
656 }
657 error_field::ERRNO => {
658 let n = rmp::decode::read_int(stream)?;
659 if n != 0 {
660 res.errno = Some(n);
661 }
662 }
663 error_field::CODE => {
664 res.code = rmp::decode::read_int(stream)?;
665 }
666 error_field::FIELDS => match rmp_serde::from_read(&mut stream) {
667 Ok(f) => {
668 res.fields = f;
669 }
670 Err(e) => {
671 crate::say_verbose!("failed decoding error fields: {e}");
672 }
673 },
674 _ => {
675 crate::say_verbose!("unexpected error field {key}");
676 }
677 }
678 }
679
680 Ok(res)
681}
682
683pub fn decode_string(stream: &mut impl Read) -> Result<String, Error> {
688 let len = rmp::decode::read_str_len(stream)? as usize;
689 let mut str_buf = vec![0u8; len];
690 stream.read_exact(&mut str_buf)?;
691 let res = String::from_utf8(str_buf)?;
692 Ok(res)
693}
694
695pub fn decode_greeting(stream: &mut impl Read) -> Result<Vec<u8>, Error> {
696 let mut buf = [0; 128];
697 stream.read_exact(&mut buf)?;
698 let salt = base64::decode(&buf[64..108]).unwrap();
699 Ok(salt)
700}
701
702pub fn decode_call(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
703 let payload_len = rmp::decode::read_map_len(buffer)?;
704 for _ in 0..payload_len {
705 let key = rmp::decode::read_pfix(buffer)?;
706 match key {
707 DATA => {
708 return decode_tuple(buffer);
709 }
710 _ => {
711 msgpack::skip_value(buffer)?;
712 }
713 };
714 }
715 Err(ProtocolError::ResponseFieldNotFound {
716 key: "DATA",
717 context: "required for CALL/EVAL responses",
718 }
719 .into())
720}
721
722pub fn decode_multiple_rows(buffer: &mut Cursor<Vec<u8>>) -> Result<Vec<Tuple>, Error> {
723 let payload_len = rmp::decode::read_map_len(buffer)?;
724 for _ in 0..payload_len {
725 let key = rmp::decode::read_pfix(buffer)?;
726 match key {
727 DATA => {
728 let items_count = rmp::decode::read_array_len(buffer)? as usize;
729 let mut result = Vec::with_capacity(items_count);
730 for _ in 0..items_count {
731 result.push(decode_tuple(buffer)?);
732 }
733 return Ok(result);
734 }
735 _ => {
736 msgpack::skip_value(buffer)?;
737 }
738 };
739 }
740 Ok(vec![])
741}
742
743pub fn decode_single_row(buffer: &mut Cursor<Vec<u8>>) -> Result<Option<Tuple>, Error> {
744 let payload_len = rmp::decode::read_map_len(buffer)?;
745 for _ in 0..payload_len {
746 let key = rmp::decode::read_pfix(buffer)?;
747 match key {
748 DATA => {
749 let items_count = rmp::decode::read_array_len(buffer)? as usize;
750 return Ok(if items_count == 0 {
751 None
752 } else {
753 Some(decode_tuple(buffer)?)
754 });
755 }
756 _ => {
757 msgpack::skip_value(buffer)?;
758 }
759 }
760 }
761 Ok(None)
762}
763
764pub fn decode_tuple(buffer: &mut Cursor<Vec<u8>>) -> Result<Tuple, Error> {
765 let payload_offset = buffer.position();
766 msgpack::skip_value(buffer)?;
767 let payload_len = buffer.position() - payload_offset;
768 let buf = buffer.get_mut();
769 unsafe {
770 Ok(Tuple::from_raw_data(
771 buf.as_slice().as_ptr().add(payload_offset as usize) as *mut c_char,
772 payload_len as u32,
773 ))
774 }
775}
776
777pub fn value_slice(cursor: &mut Cursor<impl AsRef<[u8]>>) -> crate::Result<&[u8]> {
778 let start = cursor.position() as usize;
779 msgpack::skip_value(cursor)?;
780 Ok(&cursor.get_ref().as_ref()[start..(cursor.position() as usize)])
781}