shmipc/buffer/
slice.rs

1// Copyright 2025 CloudWeGo Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::slice;
16use std::{cmp::min, ptr::NonNull};
17
18use crate::{
19    buffer::manager::{
20        BUFFER_CAP_OFFSET, BUFFER_DATA_START_OFFSET, BUFFER_FLAG_OFFSET, BUFFER_SIZE_OFFSET,
21        HAS_NEXT_BUFFER_FLAG, NEXT_BUFFER_OFFSET, SLICE_IN_USED_FLAG,
22    },
23    error::Error,
24};
25
26#[derive(Default, Debug)]
27pub struct SliceList {
28    pub front_slice: Option<NonNull<BufferSlice>>,
29    pub write_slice: Option<NonNull<BufferSlice>>,
30    pub back_slice: Option<NonNull<BufferSlice>>,
31    len: usize,
32}
33
34impl SliceList {
35    pub fn new() -> Self {
36        Self::default()
37    }
38
39    #[inline]
40    pub fn front(&self) -> Option<&BufferSlice> {
41        unsafe { self.front_slice.map(|node| &(*node.as_ptr())) }
42    }
43
44    #[inline]
45    pub fn front_mut(&self) -> Option<&mut BufferSlice> {
46        unsafe { self.front_slice.map(|node| &mut (*node.as_ptr())) }
47    }
48
49    #[inline]
50    pub fn back(&self) -> Option<&BufferSlice> {
51        unsafe { self.back_slice.map(|node| &(*node.as_ptr())) }
52    }
53
54    #[inline]
55    pub fn back_mut(&self) -> Option<&mut BufferSlice> {
56        unsafe { self.back_slice.map(|node| &mut (*node.as_ptr())) }
57    }
58
59    #[inline]
60    pub fn write(&self) -> Option<&BufferSlice> {
61        unsafe { self.write_slice.map(|node| &(*node.as_ptr())) }
62    }
63
64    #[inline]
65    pub fn write_mut(&self) -> Option<&mut BufferSlice> {
66        unsafe { self.write_slice.map(|node| &mut (*node.as_ptr())) }
67    }
68
69    #[inline]
70    pub fn size(&self) -> usize {
71        self.len
72    }
73
74    pub fn push_back(&mut self, s: BufferSlice) {
75        unsafe {
76            let new = NonNull::new_unchecked(Box::into_raw(Box::new(s)));
77            if self.len > 0 {
78                self.back_slice.unwrap().as_mut().next_slice = Some(new);
79            } else {
80                self.front_slice = Some(new);
81            }
82            self.back_slice = Some(new);
83            self.len += 1;
84        }
85    }
86
87    pub fn pop_front(&mut self) -> Option<BufferSlice> {
88        unsafe {
89            let r = self.front_slice;
90            if self.len > 0 {
91                self.len -= 1;
92                self.front_slice = self.front_slice.unwrap().as_ref().next_slice;
93            }
94            if self.len == 0 {
95                self.front_slice = None;
96                self.back_slice = None;
97            }
98            r.map(|node| *Box::from_raw(node.as_ptr()))
99        }
100    }
101
102    pub fn split_from_write(&mut self) -> Option<BufferSlice> {
103        unsafe {
104            self.write_slice.and_then(|slice| {
105                let next_list_head = (*slice.as_ptr()).next_slice;
106                self.back_slice = Some(slice);
107                (*slice.as_ptr()).next_slice = None;
108                let mut next_list_size = 0;
109                let mut s = next_list_head;
110                while s.is_some() {
111                    next_list_size += 1;
112                    s = (*s.unwrap_unchecked().as_ptr()).next_slice;
113                }
114                self.len -= next_list_size;
115                next_list_head.map(|head| *Box::from_raw(head.as_ptr()))
116            })
117        }
118    }
119}
120
121#[derive(Debug, Eq, PartialEq, Clone)]
122pub struct BufferSlice {
123    /// BufferHeader layout: cap 4 byte | size 4 byte | start 4 byte | next 4 byte | flag 2 byte |
124    /// unused 2 byte
125    pub buffer_header: Option<BufferHeader>,
126    pub data: *mut u8,
127    pub cap: u32,
128    /// use for prepend
129    pub start: u32,
130    pub offset_in_shm: u32,
131    pub read_index: usize,
132    pub write_index: usize,
133    pub is_from_shm: bool,
134    pub next_slice: Option<NonNull<BufferSlice>>,
135}
136
137unsafe impl Send for BufferSlice {}
138unsafe impl Sync for BufferSlice {}
139
140impl BufferSlice {
141    pub fn new(
142        header: Option<BufferHeader>,
143        data: &mut [u8],
144        offset_in_shm: u32,
145        is_from_shm: bool,
146    ) -> Self {
147        debug_assert!(!data.is_empty());
148
149        let len = data.len() as u32;
150        let mut s = Self {
151            buffer_header: None,
152            data: data.as_mut_ptr(),
153            cap: 0,
154            start: 0,
155            offset_in_shm,
156            read_index: 0,
157            write_index: 0,
158            is_from_shm,
159            next_slice: None,
160        };
161        if is_from_shm && header.is_some() {
162            let buffer_header = header.unwrap();
163            s.cap = buffer_header.cap();
164            s.start = buffer_header.start();
165            s.write_index = (s.start + buffer_header.size()) as usize;
166            s.buffer_header = Some(buffer_header);
167        } else {
168            s.cap = len;
169        }
170        s
171    }
172
173    pub fn update(&self) {
174        if let Some(buffer_header) = &self.buffer_header {
175            buffer_header.set_size(self.size() as u32);
176            buffer_header.set_start(self.start);
177
178            if let Some(next_slice) = &self.next_slice {
179                unsafe {
180                    buffer_header.link_next((*next_slice.as_ptr()).offset_in_shm);
181                }
182            }
183        }
184    }
185
186    pub fn reset(&mut self) {
187        if let Some(buffer_header) = &self.buffer_header {
188            buffer_header.set_size(0);
189            buffer_header.set_start(0);
190            buffer_header.clear_flag()
191        }
192        self.write_index = 0;
193        self.read_index = 0;
194        self.next_slice = None;
195    }
196
197    pub fn size(&self) -> usize {
198        self.write_index - self.read_index
199    }
200
201    pub fn remain(&self) -> usize {
202        self.cap as usize - self.write_index
203    }
204
205    pub fn capacity(&self) -> usize {
206        self.cap as usize
207    }
208
209    pub fn reserve(&mut self, size: usize) -> Result<&mut [u8], Error> {
210        let start = self.write_index;
211        let remain = self.remain();
212        if remain >= size {
213            self.write_index += size;
214            return Ok(unsafe { slice::from_raw_parts_mut(self.data.add(start), size) });
215        }
216        Err(Error::NoMoreBuffer)
217    }
218
219    pub fn append(&mut self, data: &[u8]) -> usize {
220        if data.is_empty() {
221            return 0;
222        }
223        let copy_size = min(data.len(), self.remain());
224        unsafe {
225            self.data
226                .add(self.write_index)
227                .copy_from_nonoverlapping(data.as_ptr(), copy_size)
228        };
229        self.write_index += copy_size;
230        copy_size
231    }
232
233    #[must_use]
234    pub fn read(&mut self, mut size: usize) -> &[u8] {
235        size = min(size, self.size());
236        let data = unsafe { slice::from_raw_parts(self.data.add(self.read_index), size) };
237        self.read_index += size;
238        data
239    }
240
241    pub fn peek(&mut self, mut size: usize) -> &[u8] {
242        size = min(size, self.size());
243        unsafe { slice::from_raw_parts(self.data.add(self.read_index), size) }
244    }
245
246    pub fn skip(&mut self, size: usize) -> usize {
247        let un_read = self.size();
248        if un_read > size {
249            self.read_index += size;
250            return size;
251        }
252        self.read_index += un_read;
253        un_read
254    }
255
256    #[inline]
257    pub fn next(&self) -> Option<&BufferSlice> {
258        unsafe { self.next_slice.map(|node| &(*node.as_ptr())) }
259    }
260
261    #[inline]
262    pub fn next_mut(&self) -> Option<&mut BufferSlice> {
263        unsafe { self.next_slice.map(|node| &mut (*node.as_ptr())) }
264    }
265}
266
267/// BufferHeader is the header of a buffer slice.
268///
269/// Layout: cap 4 byte | size 4 byte | start 4 byte | next 4 byte | flag 2 byte | unused 2 byte
270///
271/// # Safety
272///
273/// Make sure it is well initialized before use and see ptr.offset method safety requirements.
274#[derive(Eq, PartialEq, Debug, Clone)]
275pub struct BufferHeader(pub *mut u8);
276
277impl BufferHeader {
278    #[inline]
279    pub fn next_buffer_offset(&self) -> u32 {
280        unsafe { *(self.0.offset(NEXT_BUFFER_OFFSET as isize) as *const u32) }
281    }
282
283    #[inline]
284    pub fn has_next(&self) -> bool {
285        unsafe { *self.0.offset(BUFFER_FLAG_OFFSET as isize) & HAS_NEXT_BUFFER_FLAG > 0 }
286    }
287
288    #[inline]
289    pub fn clear_flag(&self) {
290        unsafe {
291            *self.0.offset(BUFFER_FLAG_OFFSET as isize) = 0u8;
292        }
293    }
294
295    #[inline]
296    pub fn set_in_used(&self) {
297        unsafe {
298            *self.0.offset(BUFFER_FLAG_OFFSET as isize) |= SLICE_IN_USED_FLAG;
299        }
300    }
301
302    #[inline]
303    pub fn is_in_used(&self) -> bool {
304        unsafe { *self.0.offset(BUFFER_FLAG_OFFSET as isize) & SLICE_IN_USED_FLAG > 0 }
305    }
306
307    #[inline]
308    pub fn link_next(&self, next: u32) {
309        unsafe {
310            *(self.0.offset(NEXT_BUFFER_OFFSET as isize) as *mut u32) = next;
311            *self.0.offset(BUFFER_FLAG_OFFSET as isize) |= HAS_NEXT_BUFFER_FLAG;
312        }
313    }
314
315    #[inline]
316    pub fn cap(&self) -> u32 {
317        unsafe { *(self.0.offset(BUFFER_CAP_OFFSET as isize) as *const u32) }
318    }
319
320    #[inline]
321    pub fn size(&self) -> u32 {
322        unsafe { *(self.0.offset(BUFFER_SIZE_OFFSET as isize) as *const u32) }
323    }
324
325    #[inline]
326    pub fn set_size(&self, size: u32) {
327        unsafe {
328            *(self.0.offset(BUFFER_SIZE_OFFSET as isize) as *mut u32) = size;
329        }
330    }
331
332    #[inline]
333    pub fn start(&self) -> u32 {
334        unsafe { *(self.0.offset(BUFFER_DATA_START_OFFSET as isize) as *const u32) }
335    }
336
337    #[inline]
338    pub fn set_start(&self, start: u32) {
339        unsafe {
340            *(self.0.offset(BUFFER_DATA_START_OFFSET as isize) as *mut u32) = start;
341        }
342    }
343}
344
345#[cfg(test)]
346mod tests {
347    use core::slice;
348
349    use memmap2::MmapOptions;
350    use rand::Rng;
351
352    use super::{BufferSlice, SliceList};
353    use crate::{
354        buffer::{
355            manager::{BUFFER_CAP_OFFSET, BUFFER_HEADER_SIZE, BufferManager},
356            slice::BufferHeader,
357        },
358        config::SizePercentPair,
359    };
360
361    #[test]
362    fn test_buffer_slice_read_write() {
363        const SIZE: usize = 8192;
364
365        let mut buf = [0u8; SIZE];
366        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
367        for i in 0..SIZE {
368            let n = slice.append(&[i as u8]);
369            assert_eq!(n, 1);
370        }
371        let n = slice.append(&[10]);
372        assert_eq!(n, 0);
373
374        let data = slice.read(SIZE * 10);
375        assert_eq!(data.len(), SIZE);
376
377        // vertfy read data.
378        (0..SIZE).for_each(|i| {
379            assert_eq!(data[i], i as u8);
380        });
381    }
382
383    #[test]
384    fn test_buffer_slice_skip() {
385        const SIZE: usize = 8192;
386
387        let mut buf = [0u8; SIZE];
388        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
389        slice.append(&[0u8; SIZE]);
390        let mut remain = slice.capacity();
391
392        let n = slice.skip(10);
393        remain -= n;
394        assert_eq!(remain, slice.size());
395
396        let n = slice.skip(100);
397        remain -= n;
398        assert_eq!(remain, slice.size());
399
400        _ = slice.skip(10000);
401        assert_eq!(0, slice.size());
402    }
403
404    #[test]
405    fn test_buffer_slice_reserve() {
406        const SIZE: usize = 8192;
407
408        let mut buf = [0u8; SIZE];
409        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
410        let data1 = slice.reserve(100).unwrap();
411        assert_eq!(100, data1.len());
412
413        (0..data1.len()).for_each(|i| data1[i] = i as u8);
414        let data1 = unsafe { slice::from_raw_parts(data1.as_ptr(), data1.len()) };
415
416        let data2 = slice.reserve(SIZE);
417        assert!(data2.is_err());
418
419        let read_data = slice.read(100);
420        assert_eq!(100, read_data.len());
421
422        (0..100).for_each(|i| {
423            assert_eq!(read_data[i], data1[i]);
424        });
425
426        let read_data = slice.read(10000);
427        assert_eq!(read_data.len(), 0);
428    }
429
430    #[test]
431    fn test_buffer_slice_update() {
432        const SIZE: usize = 8192;
433
434        let mut buf = [0u8; SIZE];
435        let mut header = [0u8; BUFFER_HEADER_SIZE as usize];
436        unsafe {
437            *(header.as_mut_ptr().offset(BUFFER_CAP_OFFSET as isize) as *mut u32) = 8192u32;
438        }
439        let mut slice =
440            BufferSlice::new(Some(BufferHeader(header.as_mut_ptr())), &mut buf, 0, true);
441
442        let n = slice.append(&[0u8; SIZE]);
443        assert_eq!(n, SIZE);
444        slice.update();
445        assert_eq!(SIZE, slice.buffer_header.as_ref().unwrap().size() as usize);
446    }
447
448    #[test]
449    fn test_buffer_slice_linked_next() {
450        const SIZE: usize = 8192;
451        const SLICE_NUM: usize = 100;
452
453        let mut slices = Vec::with_capacity(SLICE_NUM);
454        let mem = MmapOptions::new().len(10 << 20).map_anon().unwrap();
455        let bm = BufferManager::create(
456            &[SizePercentPair {
457                size: 8192,
458                percent: 100,
459            }],
460            "",
461            mem,
462            0,
463        )
464        .unwrap();
465
466        let mut write_data_array = Vec::with_capacity(100);
467
468        for _ in 0..SLICE_NUM {
469            let mut s = bm.alloc_shm_buffer(SIZE as u32).unwrap();
470            let mut rng = rand::rng();
471            let data: Vec<u8> = (0..SIZE).map(|_| rng.random()).collect();
472            assert_eq!(s.append(&data), SIZE);
473            s.update();
474            slices.push(s);
475            write_data_array.push(data);
476        }
477
478        for i in 0..slices.len() - 1 {
479            slices[i]
480                .buffer_header
481                .as_ref()
482                .unwrap()
483                .link_next(slices[i + 1].offset_in_shm);
484        }
485
486        let mut next = slices[0].offset_in_shm;
487        (0..SLICE_NUM).for_each(|i| {
488            let mut s = bm.read_buffer_slice(next).unwrap();
489            assert_eq!(s.capacity(), SIZE);
490            assert_eq!(s.size(), SIZE);
491            let read_data = s.read(SIZE);
492            assert_eq!(read_data.len(), SIZE);
493            (0..SIZE).for_each(|j| {
494                assert_eq!(read_data[j], write_data_array[i][j]);
495            });
496            let is_last_slice = i == SLICE_NUM - 1;
497            assert_eq!(s.buffer_header.as_ref().unwrap().has_next(), !is_last_slice);
498            next = s.buffer_header.as_ref().unwrap().next_buffer_offset();
499        });
500    }
501
502    #[test]
503    fn test_slice_list_push_pop() {
504        // 1. twice push, twice pop
505        let mut l = SliceList::new();
506        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
507        assert_eq!(l.front(), l.back());
508        assert_eq!(1, l.size());
509
510        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
511        assert_eq!(2, l.size());
512        assert_ne!(l.front(), l.back());
513
514        l.pop_front();
515        assert_eq!(1, l.size());
516        assert_eq!(l.front(), l.back());
517
518        l.pop_front();
519        assert_eq!(0, l.size());
520        assert!(l.front().is_none());
521        assert!(l.back().is_none());
522
523        // multi push and pop
524        for i in 0..100 {
525            l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
526            assert_eq!(i + 1, l.size());
527        }
528        for i in 0..100 {
529            l.pop_front();
530            assert_eq!(100 - (i + 1), l.size());
531        }
532        assert_eq!(0, l.size());
533        assert!(l.front().is_none());
534        assert!(l.back().is_none());
535    }
536
537    #[test]
538    fn test_slice_list_split_from_write() {
539        // 1. sliceList's size == 1
540        let mut l = SliceList::new();
541        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
542        l.write_slice = l.front_slice;
543        assert!(l.split_from_write().is_none());
544        assert_eq!(1, l.size());
545        assert_eq!(l.front(), l.back());
546        assert_eq!(l.back(), l.write());
547
548        // 2. sliceList's size == 2, writeSlice's index is 0
549        let mut l = SliceList::new();
550        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
551        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
552        l.write_slice = l.front_slice;
553        l.split_from_write();
554
555        assert_eq!(1, l.size());
556        assert_eq!(l.front(), l.back());
557        assert_eq!(l.back(), l.write());
558
559        // 2. sliceList's size == 2, writeSlice's index is 1
560        let mut l = SliceList::new();
561        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
562        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
563        l.write_slice = l.back_slice;
564        assert!(l.split_from_write().is_none());
565        assert_eq!(2, l.size());
566        assert_eq!(l.back(), l.write());
567
568        // 3. sliceList's size == 3, writeSlice's index is 50
569        let mut l = SliceList::new();
570        for i in 0..100 {
571            l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
572            if i == 50 {
573                l.write_slice = l.back_slice;
574            }
575        }
576        l.split_from_write();
577        assert_eq!(l.back(), l.write());
578
579        assert_eq!(51, l.size());
580    }
581}