1use crate::error_code::ErrorCode;
2use abi_stable::StableAbi;
3use std::io::Cursor;
4use std::ptr::NonNull;
5use tarantool::error::BoxError;
6use tarantool::error::TarantoolErrorCode;
7use tarantool::ffi::tarantool as ffi;
8
9#[repr(C)]
15#[derive(StableAbi, Clone, Copy, Debug)]
16pub struct FfiSafeBytes {
17 pointer: NonNull<u8>,
18 len: usize,
19}
20
21impl FfiSafeBytes {
22 #[inline(always)]
23 pub fn len(self) -> usize {
24 self.len
25 }
26
27 #[inline(always)]
28 pub fn is_empty(self) -> bool {
29 self.len == 0
30 }
31
32 #[inline(always)]
36 pub unsafe fn from_raw_parts(pointer: NonNull<u8>, len: usize) -> Self {
37 Self { pointer, len }
38 }
39
40 #[inline(always)]
41 pub fn into_raw_parts(self) -> (*mut u8, usize) {
42 (self.pointer.as_ptr(), self.len)
43 }
44
45 pub unsafe fn as_bytes<'a>(self) -> &'a [u8] {
55 std::slice::from_raw_parts(self.pointer.as_ptr(), self.len)
56 }
57}
58
59impl Default for FfiSafeBytes {
60 #[inline(always)]
61 fn default() -> Self {
62 Self {
63 pointer: NonNull::dangling(),
64 len: 0,
65 }
66 }
67}
68
69impl<'a> From<&'a [u8]> for FfiSafeBytes {
70 #[inline(always)]
71 fn from(value: &'a [u8]) -> Self {
72 Self {
73 pointer: as_non_null_ptr(value),
74 len: value.len(),
75 }
76 }
77}
78
79impl<'a> From<&'a str> for FfiSafeBytes {
80 #[inline(always)]
81 fn from(value: &'a str) -> Self {
82 Self {
83 pointer: as_non_null_ptr(value.as_bytes()),
84 len: value.len(),
85 }
86 }
87}
88
89#[repr(C)]
98#[derive(StableAbi, Clone, Copy, Debug)]
99pub struct FfiSafeStr {
100 pointer: NonNull<u8>,
101 len: usize,
102}
103
104impl FfiSafeStr {
105 #[inline(always)]
106 pub fn len(self) -> usize {
107 self.len
108 }
109
110 #[inline(always)]
111 pub fn is_empty(self) -> bool {
112 self.len == 0
113 }
114
115 #[inline(always)]
119 pub unsafe fn from_raw_parts(pointer: NonNull<u8>, len: usize) -> Self {
120 Self { pointer, len }
121 }
122
123 pub unsafe fn from_utf8_unchecked(bytes: &[u8]) -> Self {
126 let pointer = as_non_null_ptr(bytes);
127 let len = bytes.len();
128 Self { pointer, len }
129 }
130
131 #[inline(always)]
132 pub fn into_raw_parts(self) -> (*mut u8, usize) {
133 (self.pointer.as_ptr(), self.len)
134 }
135
136 #[inline]
146 pub unsafe fn as_str<'a>(self) -> &'a str {
147 if cfg!(debug_assertions) {
148 std::str::from_utf8(self.as_bytes()).expect("should only be used with valid utf8")
149 } else {
150 std::str::from_utf8_unchecked(self.as_bytes())
151 }
152 }
153
154 #[inline(always)]
164 pub unsafe fn as_bytes<'a>(self) -> &'a [u8] {
165 std::slice::from_raw_parts(self.pointer.as_ptr(), self.len)
166 }
167}
168
169impl Default for FfiSafeStr {
170 #[inline(always)]
171 fn default() -> Self {
172 Self {
173 pointer: NonNull::dangling(),
174 len: 0,
175 }
176 }
177}
178
179impl<'a> From<&'a str> for FfiSafeStr {
180 #[inline(always)]
181 fn from(value: &'a str) -> Self {
182 Self {
183 pointer: as_non_null_ptr(value.as_bytes()),
184 len: value.len(),
185 }
186 }
187}
188
189#[derive(Debug)]
197pub struct RegionGuard {
198 save_point: usize,
199}
200
201impl RegionGuard {
202 #[inline(always)]
209 #[allow(clippy::new_without_default)]
210 pub fn new() -> Self {
211 let save_point = unsafe { ffi::box_region_used() };
214 Self { save_point }
215 }
216
217 #[inline(always)]
222 pub fn used_at_creation(&self) -> usize {
223 self.save_point
224 }
225}
226
227impl Drop for RegionGuard {
228 fn drop(&mut self) {
229 unsafe { ffi::box_region_truncate(self.save_point) }
232 }
233}
234
235#[inline]
242fn allocate_on_region(size: usize) -> Result<&'static mut [u8], BoxError> {
243 let pointer = unsafe { ffi::box_region_alloc(size).cast::<u8>() };
245 if pointer.is_null() {
246 return Err(BoxError::last());
247 }
248 let region_slice = unsafe { std::slice::from_raw_parts_mut(pointer, size) };
250 Ok(region_slice)
251}
252
253#[inline]
263pub fn copy_to_region(data: &[u8]) -> Result<&'static [u8], BoxError> {
264 let region_slice = allocate_on_region(data.len())?;
265 region_slice.copy_from_slice(data);
266 Ok(region_slice)
267}
268
269#[derive(Debug)]
283pub struct RegionBuffer {
284 guard: RegionGuard,
285
286 start: *mut u8,
287 count: usize,
288}
289
290impl RegionBuffer {
291 #[inline(always)]
295 #[allow(clippy::new_without_default)]
296 pub fn new() -> Self {
297 Self {
298 guard: RegionGuard::new(),
299 start: std::ptr::null_mut(),
300 count: 0,
301 }
302 }
303
304 #[track_caller]
312 pub fn push(&mut self, data: &[u8]) -> Result<(), BoxError> {
313 let added_count = data.len();
314 unsafe {
315 let pointer: *mut u8 = ffi::box_region_alloc(added_count) as _;
316
317 if pointer.is_null() {
318 #[rustfmt::skip]
319 return Err(BoxError::new(TarantoolErrorCode::MemoryIssue, format!("failed to allocate {added_count} bytes on the region allocator")));
320 }
321
322 memcpy(pointer, data.as_ptr(), added_count);
323 self.count += added_count;
324 if self.start.is_null() {
325 self.start = pointer;
326 }
327 }
328
329 Ok(())
330 }
331
332 #[deprecated = "no longer supported, consider using RegionBuffer::into_raw_parts instead"]
333 #[inline(always)]
334 pub fn get(&self) -> &[u8] {
335 unimplemented!("RegionBuffer::get is no longer supported")
336 }
337
338 #[inline]
354 pub fn into_raw_parts(self) -> (&'static [u8], usize) {
355 self.try_into_raw_parts().unwrap()
356 }
357
358 pub fn try_into_raw_parts(self) -> Result<(&'static [u8], usize), (BoxError, Self)> {
372 let res = unsafe { self.join() };
375 let slice = match res {
376 Ok(v) => v,
377 Err(e) => {
378 return Err((e, self));
379 }
380 };
381
382 let save_point = self.guard.used_at_creation();
383 std::mem::forget(self.guard);
384 Ok((slice, save_point))
385 }
386
387 #[inline]
391 unsafe fn join(&self) -> Result<&'static [u8], BoxError> {
392 use crate::internal::ffi;
393
394 if self.count == 0 {
395 return Ok(&[]);
396 }
397
398 if !ffi::has_box_region_join() {
399 return Err(BoxError::new(
400 TarantoolErrorCode::Unsupported,
401 "box_region_join is not supported in this version of picodata",
402 ));
403 }
404
405 let start = unsafe { ffi::box_region_join(self.count) };
408 if start.is_null() {
409 return Err(BoxError::last());
410 }
411 let slice = unsafe { std::slice::from_raw_parts(start.cast(), self.count) };
413 Ok(slice)
414 }
415
416 pub fn try_into_vec(self) -> Result<Vec<u8>, BoxError> {
419 let res = unsafe { self.join() };
422 let slice = match res {
423 Ok(v) => v,
424 Err(e) => {
425 return Err(e);
426 }
427 };
428
429 let res = Vec::from(slice);
430
431 drop(self);
433
434 Ok(res)
435 }
436}
437
438impl std::io::Write for RegionBuffer {
439 #[inline(always)]
440 fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
441 if let Err(e) = self.push(data) {
442 #[rustfmt::skip]
443 return Err(std::io::Error::new(std::io::ErrorKind::OutOfMemory, e.message()));
444 }
445
446 Ok(data.len())
447 }
448
449 #[inline(always)]
450 fn flush(&mut self) -> std::io::Result<()> {
451 Ok(())
452 }
453}
454
/// Copies `count` bytes from `source` to `destination`.
///
/// Implemented with [`std::ptr::copy_nonoverlapping`] so that no `&mut [u8]`
/// is ever created over the destination: the destination is typically freshly
/// allocated memory that has not been initialized yet, and
/// `slice::from_raw_parts_mut` requires initialized contents.
///
/// # Safety
/// - `source` must be valid for reads of `count` bytes.
/// - `destination` must be valid for writes of `count` bytes.
/// - The two ranges must not overlap.
#[inline(always)]
unsafe fn memcpy(destination: *mut u8, source: *const u8, count: usize) {
    // SAFETY: validity and non-overlap of both ranges are guaranteed by the caller.
    unsafe { std::ptr::copy_nonoverlapping(source, destination, count) }
}
461
462pub struct DisplayErrorLocation<'a>(pub &'a BoxError);
468
469impl std::fmt::Display for DisplayErrorLocation<'_> {
470 #[inline]
471 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
472 if let Some((file, line)) = self.0.file().zip(self.0.line()) {
473 write!(f, "{file}:{line}: ")?;
474 }
475 Ok(())
476 }
477}
478
479pub struct DisplayAsHexBytesLimitted<'a>(pub &'a [u8]);
485
486impl std::fmt::Display for DisplayAsHexBytesLimitted<'_> {
487 #[inline]
488 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
489 if self.0.len() > 512 {
490 f.write_str("<too-big-to-display>")
491 } else {
492 tarantool::util::DisplayAsHexBytes(self.0).fmt(f)
493 }
494 }
495}
496
497#[track_caller]
504#[inline]
505pub fn msgpack_decode_str(data: &[u8]) -> Result<&str, BoxError> {
506 let mut cursor = Cursor::new(data);
507 let length = rmp::decode::read_str_len(&mut cursor).map_err(invalid_msgpack)? as usize;
508
509 let res = str_from_cursor(length, &mut cursor)?;
510 let (_, tail) = cursor_split(&cursor);
511 if !tail.is_empty() {
512 return Err(invalid_msgpack(format!(
513 "unexpected data after msgpack value: {}",
514 DisplayAsHexBytesLimitted(tail)
515 )));
516 }
517
518 Ok(res)
519}
520
521#[track_caller]
524pub fn msgpack_read_str<'a>(cursor: &mut Cursor<&'a [u8]>) -> Result<&'a str, BoxError> {
525 let length = rmp::decode::read_str_len(cursor).map_err(invalid_msgpack)? as usize;
526
527 str_from_cursor(length, cursor)
528}
529
530#[track_caller]
540pub fn msgpack_read_rest_of_str<'a>(
541 marker: rmp::Marker,
542 cursor: &mut Cursor<&'a [u8]>,
543) -> Result<Option<&'a str>, BoxError> {
544 use rmp::decode::RmpRead as _;
545
546 let length = match marker {
547 rmp::Marker::FixStr(v) => v as usize,
548 rmp::Marker::Str8 => cursor.read_data_u8().map_err(invalid_msgpack)? as usize,
549 rmp::Marker::Str16 => cursor.read_data_u16().map_err(invalid_msgpack)? as usize,
550 rmp::Marker::Str32 => cursor.read_data_u32().map_err(invalid_msgpack)? as usize,
551 _ => return Ok(None),
552 };
553
554 str_from_cursor(length, cursor).map(Some)
555}
556
557#[inline]
558#[track_caller]
559fn str_from_cursor<'a>(length: usize, cursor: &mut Cursor<&'a [u8]>) -> Result<&'a str, BoxError> {
560 let start_index = cursor.position() as usize;
561 let data = *cursor.get_ref();
562 let remaining_length = data.len() - start_index;
563 if remaining_length < length {
564 return Err(invalid_msgpack(format!(
565 "expected a string of length {length}, got {remaining_length}"
566 )));
567 }
568
569 let end_index = start_index + length;
570 let res = std::str::from_utf8(&data[start_index..end_index]).map_err(invalid_msgpack)?;
571 cursor.set_position(end_index as _);
572 Ok(res)
573}
574
575#[track_caller]
577pub fn msgpack_decode_bin(data: &[u8]) -> Result<&[u8], BoxError> {
578 let mut cursor = Cursor::new(data);
579 let length = rmp::decode::read_bin_len(&mut cursor).map_err(invalid_msgpack)? as usize;
580
581 let res = bin_from_cursor(length, &mut cursor)?;
582 let (_, tail) = cursor_split(&cursor);
583 if !tail.is_empty() {
584 return Err(invalid_msgpack(format!(
585 "unexpected data after msgpack value: {}",
586 DisplayAsHexBytesLimitted(tail)
587 )));
588 }
589
590 Ok(res)
591}
592
593#[track_caller]
596pub fn msgpack_read_bin<'a>(cursor: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], BoxError> {
597 let length = rmp::decode::read_bin_len(cursor).map_err(invalid_msgpack)? as usize;
598
599 bin_from_cursor(length, cursor)
600}
601
602#[track_caller]
611pub fn msgpack_read_rest_of_bin<'a>(
612 marker: rmp::Marker,
613 cursor: &mut Cursor<&'a [u8]>,
614) -> Result<Option<&'a [u8]>, BoxError> {
615 use rmp::decode::RmpRead as _;
616
617 let length = match marker {
618 rmp::Marker::Bin8 => cursor.read_data_u8().map_err(invalid_msgpack)? as usize,
619 rmp::Marker::Bin16 => cursor.read_data_u16().map_err(invalid_msgpack)? as usize,
620 rmp::Marker::Bin32 => cursor.read_data_u32().map_err(invalid_msgpack)? as usize,
621 _ => return Ok(None),
622 };
623
624 bin_from_cursor(length, cursor).map(Some)
625}
626
627#[inline]
628#[track_caller]
629fn bin_from_cursor<'a>(length: usize, cursor: &mut Cursor<&'a [u8]>) -> Result<&'a [u8], BoxError> {
630 let start_index = cursor.position() as usize;
631 let data = *cursor.get_ref();
632 let remaining_length = data.len() - start_index;
633 if remaining_length < length {
634 return Err(invalid_msgpack(format!(
635 "expected binary data of length {length}, got {remaining_length}"
636 )));
637 }
638
639 let end_index = start_index + length;
640 let res = &data[start_index..end_index];
641 cursor.set_position(end_index as _);
642 Ok(res)
643}
644
/// Splits the cursor's buffer into the part before the current position and
/// the part from the current position onwards.
///
/// A position past the end of the buffer is clamped to the buffer length, so
/// the second half is empty in that case.
fn cursor_split<'a>(cursor: &Cursor<&'a [u8]>) -> (&'a [u8], &'a [u8]) {
    let bytes: &'a [u8] = *cursor.get_ref();
    let total = bytes.len() as u64;
    let mid = std::cmp::min(cursor.position(), total) as usize;
    (&bytes[..mid], &bytes[mid..])
}
651
652#[inline(always)]
653#[track_caller]
654fn invalid_msgpack(error: impl ToString) -> BoxError {
655 BoxError::new(TarantoolErrorCode::InvalidMsgpack, error.to_string())
656}
657
/// Returns the address of the first element of `data` as a [`NonNull`].
///
/// A slice reference is never null (an empty slice has a dangling but
/// non-null pointer), so the conversion cannot fail.
#[inline(always)]
fn as_non_null_ptr<T>(data: &[T]) -> NonNull<T> {
    // `NonNull<[T]>` -> `NonNull<T>` keeps the address and drops the length.
    NonNull::from(data).cast::<T>()
}
670
671pub fn tarantool_error_to_box_error(e: tarantool::error::Error) -> BoxError {
673 match e {
674 tarantool::error::Error::Tarantool(e) => e,
675 other => BoxError::new(ErrorCode::Other, other.to_string()),
676 }
677}
678
// Tests for `RegionBuffer`; compiled only with the `internal_test` feature
// and run through the `tarantool::test` harness.
#[cfg(feature = "internal_test")]
mod test {
    use super::*;

    /// Serializing into a `RegionBuffer` yields the same bytes as serializing
    /// into a `Vec`.
    #[tarantool::test]
    fn region_buffer() {
        #[derive(serde::Serialize, Debug)]
        struct S {
            name: String,
            x: f32,
            y: f32,
            array: Vec<(i32, i32, bool)>,
        }

        let s = S {
            name: String::from("foo"),
            x: 4.2,
            y: 6.9,
            array: vec![(1, 2, true), (3, 4, false)],
        };

        let expected = rmp_serde::to_vec(&s).unwrap();

        let mut buffer = RegionBuffer::new();
        rmp_serde::encode::write(&mut buffer, &s).unwrap();
        let actual = buffer.try_into_vec().unwrap();

        assert_eq!(expected, actual);
    }

    /// A single small push: the joined data is expected at the address of the
    /// first allocation, and joining repeatedly gives the same result.
    #[tarantool::test]
    fn region_buffer_tiny_allocation() {
        let _guard = RegionGuard::new();

        let mut buffer = RegionBuffer::new();
        buffer.push(&[1, 2, 3]).unwrap();

        let first = unsafe { buffer.join().unwrap() };
        assert_eq!(first, &[1, 2, 3]);
        assert_eq!(first.as_ptr(), buffer.start);

        let second = unsafe { buffer.join().unwrap() };
        assert_eq!(second, first);
        assert_eq!(second.as_ptr(), buffer.start);

        let (third, _) = buffer.into_raw_parts();
        assert_eq!(third, first);
        assert_eq!(third.as_ptr(), first.as_ptr());
    }

    /// Many pushes whose total exceeds `SLAB_SIZE`: the joined data is
    /// expected at a different address than the first allocation, and must
    /// match what the regular serializer produces.
    #[tarantool::test]
    fn region_buffer_big_allocation() {
        const N: usize = 4923;
        const M: usize = 85;
        const K: usize = 10;

        const {
            const SLAB_SIZE: usize = u16::MAX as usize + 1;
            assert!(N * M * K > SLAB_SIZE);
        };

        let t0 = std::time::Instant::now();

        // N rows of M columns of K bytes each.
        let input: Vec<Vec<Vec<u8>>> = (0..N)
            .map(|i| {
                (0..M)
                    .map(|j| (0..K).map(|k| ((1 + i + j) * (1 + k)) as u8).collect())
                    .collect()
            })
            .collect();

        tarantool::say_info!("generating data took {:?}", t0.elapsed());

        let t0 = std::time::Instant::now();

        let mut buffer = RegionBuffer::new();
        rmp_serde::encode::write(&mut buffer, &input).unwrap();
        let data = unsafe { buffer.join().unwrap() };

        tarantool::say_info!(
            "serializing data to region allocator took {:?}",
            t0.elapsed()
        );

        assert_ne!(data.as_ptr(), buffer.start);

        let t0 = std::time::Instant::now();

        let control = rmp_serde::to_vec(&input).unwrap();

        tarantool::say_info!("serializing data to rust allocator took {:?}", t0.elapsed());

        assert_eq!(control, data);
    }
}