use core::slice;
use std::{cmp::min, ptr::NonNull};

use crate::{
    buffer::manager::{
        BUFFER_CAP_OFFSET, BUFFER_DATA_START_OFFSET, BUFFER_FLAG_OFFSET, BUFFER_SIZE_OFFSET,
        HAS_NEXT_BUFFER_FLAG, NEXT_BUFFER_OFFSET, SLICE_IN_USED_FLAG,
    },
    error::Error,
};

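/// An intrusive singly linked list of heap-allocated [`BufferSlice`] nodes.
/// Nodes are linked through raw `NonNull` pointers so the list can keep a
/// separate write cursor (`write_slice`) into its middle in addition to the
/// head and tail pointers.
///
/// A minimal usage sketch (illustrative only; it mirrors
/// `test_slice_list_push_pop` at the bottom of this file):
///
/// ```ignore
/// let mut l = SliceList::new();
/// let mut buf = [0u8; 1024];
/// l.push_back(BufferSlice::new(None, &mut buf, 0, false));
/// assert_eq!(l.size(), 1);
/// let s = l.pop_front().unwrap(); // reclaims the heap node, returns it by value
/// assert_eq!(s.capacity(), 1024);
/// ```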
#[derive(Default, Debug)]
pub struct SliceList {
    pub front_slice: Option<NonNull<BufferSlice>>,
    pub write_slice: Option<NonNull<BufferSlice>>,
    pub back_slice: Option<NonNull<BufferSlice>>,
    len: usize,
}

impl SliceList {
    pub fn new() -> Self {
        Self::default()
    }

    #[inline]
    pub fn front(&self) -> Option<&BufferSlice> {
        unsafe { self.front_slice.map(|node| &(*node.as_ptr())) }
    }

    #[inline]
    pub fn front_mut(&self) -> Option<&mut BufferSlice> {
        unsafe { self.front_slice.map(|node| &mut (*node.as_ptr())) }
    }

    #[inline]
    pub fn back(&self) -> Option<&BufferSlice> {
        unsafe { self.back_slice.map(|node| &(*node.as_ptr())) }
    }

    #[inline]
    pub fn back_mut(&self) -> Option<&mut BufferSlice> {
        unsafe { self.back_slice.map(|node| &mut (*node.as_ptr())) }
    }

    #[inline]
    pub fn write(&self) -> Option<&BufferSlice> {
        unsafe { self.write_slice.map(|node| &(*node.as_ptr())) }
    }

    #[inline]
    pub fn write_mut(&self) -> Option<&mut BufferSlice> {
        unsafe { self.write_slice.map(|node| &mut (*node.as_ptr())) }
    }

    #[inline]
    pub fn size(&self) -> usize {
        self.len
    }

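    /// Appends `s` at the tail. The slice is boxed onto the heap and the list
    /// keeps it as a raw pointer; ownership returns to the caller only via
    /// `pop_front` (or is handed off through `split_from_write`).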
    pub fn push_back(&mut self, s: BufferSlice) {
        unsafe {
            let new = NonNull::new_unchecked(Box::into_raw(Box::new(s)));
            if self.len > 0 {
                self.back_slice.unwrap().as_mut().next_slice = Some(new);
            } else {
                self.front_slice = Some(new);
            }
            self.back_slice = Some(new);
            self.len += 1;
        }
    }

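    /// Detaches the head node and returns it by value, or `None` when the
    /// list is empty. This is the counterpart of `push_back`: it re-boxes the
    /// raw pointer, moves the slice out, and frees the node.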
    pub fn pop_front(&mut self) -> Option<BufferSlice> {
        unsafe {
            let r = self.front_slice;
            if self.len > 0 {
                self.len -= 1;
                self.front_slice = self.front_slice.unwrap().as_ref().next_slice;
            }
            if self.len == 0 {
                self.front_slice = None;
                self.back_slice = None;
            }
            r.map(|node| *Box::from_raw(node.as_ptr()))
        }
    }

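    /// Truncates the list right after `write_slice` and returns the head of
    /// the detached tail, or `None` if there is no write slice or nothing
    /// follows it. `len` is reduced by the number of detached nodes.
    ///
    /// An illustrative sketch (see `test_slice_list_split_from_write`):
    ///
    /// ```ignore
    /// // list: a -> b -> c, write_slice = a
    /// let detached = l.split_from_write(); // Some(b); the list is now just a
    /// assert_eq!(l.size(), 1);
    /// ```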
    pub fn split_from_write(&mut self) -> Option<BufferSlice> {
        unsafe {
            self.write_slice.and_then(|slice| {
                // Everything after the write slice is detached; the write
                // slice becomes the new tail.
                let next_list_head = (*slice.as_ptr()).next_slice;
                self.back_slice = Some(slice);
                (*slice.as_ptr()).next_slice = None;
                // Count the detached nodes so `len` stays accurate.
                let mut next_list_size = 0;
                let mut s = next_list_head;
                while s.is_some() {
                    next_list_size += 1;
                    s = (*s.unwrap_unchecked().as_ptr()).next_slice;
                }
                self.len -= next_list_size;
                next_list_head.map(|head| *Box::from_raw(head.as_ptr()))
            })
        }
    }
}

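/// A readable/writable view over one contiguous chunk of memory, either a
/// buffer inside shared memory (`is_from_shm == true`, described by an
/// on-shm [`BufferHeader`]) or an ordinary in-process buffer. `read_index`
/// and `write_index` track how many of the `cap` bytes have been consumed
/// and produced; `next_slice` chains slices into a [`SliceList`].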
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct BufferSlice {
    pub buffer_header: Option<BufferHeader>,
    pub data: *mut u8,
    pub cap: u32,
    pub start: u32,
    pub offset_in_shm: u32,
    pub read_index: usize,
    pub write_index: usize,
    pub is_from_shm: bool,
    pub next_slice: Option<NonNull<BufferSlice>>,
}

unsafe impl Send for BufferSlice {}
unsafe impl Sync for BufferSlice {}

impl BufferSlice {
    pub fn new(
        header: Option<BufferHeader>,
        data: &mut [u8],
        offset_in_shm: u32,
        is_from_shm: bool,
    ) -> Self {
        debug_assert!(!data.is_empty());

        let len = data.len() as u32;
        let mut s = Self {
            buffer_header: None,
            data: data.as_mut_ptr(),
            cap: 0,
            start: 0,
            offset_in_shm,
            read_index: 0,
            write_index: 0,
            is_from_shm,
            next_slice: None,
        };
        if is_from_shm && header.is_some() {
            // A shared-memory slice takes its geometry from the on-shm header.
            let buffer_header = header.unwrap();
            s.cap = buffer_header.cap();
            s.start = buffer_header.start();
            s.write_index = (s.start + buffer_header.size()) as usize;
            s.buffer_header = Some(buffer_header);
        } else {
            s.cap = len;
        }
        s
    }

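    /// Persists the slice's in-memory state back into the shared-memory
    /// header: its current size and start offset, plus a link to the next
    /// slice's shm offset when one is chained. A no-op for slices without a
    /// header.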
    pub fn update(&self) {
        if let Some(buffer_header) = &self.buffer_header {
            buffer_header.set_size(self.size() as u32);
            buffer_header.set_start(self.start);

            if let Some(next_slice) = &self.next_slice {
                unsafe {
                    buffer_header.link_next((*next_slice.as_ptr()).offset_in_shm);
                }
            }
        }
    }

    pub fn reset(&mut self) {
        if let Some(buffer_header) = &self.buffer_header {
            buffer_header.set_size(0);
            buffer_header.set_start(0);
            buffer_header.clear_flag();
        }
        self.write_index = 0;
        self.read_index = 0;
        self.next_slice = None;
    }

    pub fn size(&self) -> usize {
        self.write_index - self.read_index
    }

    pub fn remain(&self) -> usize {
        self.cap as usize - self.write_index
    }

    pub fn capacity(&self) -> usize {
        self.cap as usize
    }

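    /// Reserves `size` bytes of the unwritten region, advancing the write
    /// index immediately and handing back the window for the caller to fill
    /// in afterwards. Returns `Error::NoMoreBuffer` if fewer than `size`
    /// bytes remain.
    ///
    /// An illustrative sketch (mirrors `test_buffer_slice_reserve`):
    ///
    /// ```ignore
    /// let window = slice.reserve(4)?; // the 4 bytes already count as written
    /// window.copy_from_slice(&42u32.to_le_bytes());
    /// ```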
    pub fn reserve(&mut self, size: usize) -> Result<&mut [u8], Error> {
        let start = self.write_index;
        let remain = self.remain();
        if remain >= size {
            self.write_index += size;
            return Ok(unsafe { slice::from_raw_parts_mut(self.data.add(start), size) });
        }
        Err(Error::NoMoreBuffer)
    }

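    /// Copies as much of `data` as fits into the remaining capacity and
    /// returns the number of bytes copied, which may be less than
    /// `data.len()` (or 0 when the slice is full).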
    pub fn append(&mut self, data: &[u8]) -> usize {
        if data.is_empty() {
            return 0;
        }
        let copy_size = min(data.len(), self.remain());
        unsafe {
            self.data
                .add(self.write_index)
                .copy_from_nonoverlapping(data.as_ptr(), copy_size)
        };
        self.write_index += copy_size;
        copy_size
    }

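    /// Consumes up to `size` unread bytes and returns them as a borrowed
    /// slice; the request is clamped to `self.size()`. Use `peek` for the
    /// non-consuming variant and `skip` to discard bytes without looking at
    /// them.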
    #[must_use]
    pub fn read(&mut self, mut size: usize) -> &[u8] {
        size = min(size, self.size());
        let data = unsafe { slice::from_raw_parts(self.data.add(self.read_index), size) };
        self.read_index += size;
        data
    }

    pub fn peek(&mut self, mut size: usize) -> &[u8] {
        size = min(size, self.size());
        unsafe { slice::from_raw_parts(self.data.add(self.read_index), size) }
    }

    pub fn skip(&mut self, size: usize) -> usize {
        let un_read = self.size();
        if un_read > size {
            self.read_index += size;
            return size;
        }
        self.read_index += un_read;
        un_read
    }

    #[inline]
    pub fn next(&self) -> Option<&BufferSlice> {
        unsafe { self.next_slice.map(|node| &(*node.as_ptr())) }
    }

    #[inline]
    pub fn next_mut(&self) -> Option<&mut BufferSlice> {
        unsafe { self.next_slice.map(|node| &mut (*node.as_ptr())) }
    }
}

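/// A raw pointer to a buffer header laid out in shared memory. Each accessor
/// reads or writes a single field at a fixed offset (`BUFFER_CAP_OFFSET`,
/// `BUFFER_SIZE_OFFSET`, `BUFFER_DATA_START_OFFSET`, `BUFFER_FLAG_OFFSET`,
/// `NEXT_BUFFER_OFFSET`, all defined in `buffer::manager`): `u32` values for
/// cap/size/start/next and a one-byte flag set for the rest.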
#[derive(Eq, PartialEq, Debug, Clone)]
pub struct BufferHeader(pub *mut u8);

impl BufferHeader {
    #[inline]
    pub fn next_buffer_offset(&self) -> u32 {
        unsafe { *(self.0.offset(NEXT_BUFFER_OFFSET as isize) as *const u32) }
    }

    #[inline]
    pub fn has_next(&self) -> bool {
        unsafe { *self.0.offset(BUFFER_FLAG_OFFSET as isize) & HAS_NEXT_BUFFER_FLAG > 0 }
    }

    #[inline]
    pub fn clear_flag(&self) {
        unsafe {
            *self.0.offset(BUFFER_FLAG_OFFSET as isize) = 0u8;
        }
    }

    #[inline]
    pub fn set_in_used(&self) {
        unsafe {
            *self.0.offset(BUFFER_FLAG_OFFSET as isize) |= SLICE_IN_USED_FLAG;
        }
    }

    #[inline]
    pub fn is_in_used(&self) -> bool {
        unsafe { *self.0.offset(BUFFER_FLAG_OFFSET as isize) & SLICE_IN_USED_FLAG > 0 }
    }

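    /// Records `next` (the shm offset of the following buffer) and raises
    /// `HAS_NEXT_BUFFER_FLAG`, chaining buffers so a reader can walk them via
    /// `has_next`/`next_buffer_offset`, as exercised in
    /// `test_buffer_slice_linked_next`.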
    #[inline]
    pub fn link_next(&self, next: u32) {
        unsafe {
            *(self.0.offset(NEXT_BUFFER_OFFSET as isize) as *mut u32) = next;
            *self.0.offset(BUFFER_FLAG_OFFSET as isize) |= HAS_NEXT_BUFFER_FLAG;
        }
    }

    #[inline]
    pub fn cap(&self) -> u32 {
        unsafe { *(self.0.offset(BUFFER_CAP_OFFSET as isize) as *const u32) }
    }

    #[inline]
    pub fn size(&self) -> u32 {
        unsafe { *(self.0.offset(BUFFER_SIZE_OFFSET as isize) as *const u32) }
    }

    #[inline]
    pub fn set_size(&self, size: u32) {
        unsafe {
            *(self.0.offset(BUFFER_SIZE_OFFSET as isize) as *mut u32) = size;
        }
    }

    #[inline]
    pub fn start(&self) -> u32 {
        unsafe { *(self.0.offset(BUFFER_DATA_START_OFFSET as isize) as *const u32) }
    }

    #[inline]
    pub fn set_start(&self, start: u32) {
        unsafe {
            *(self.0.offset(BUFFER_DATA_START_OFFSET as isize) as *mut u32) = start;
        }
    }
}

#[cfg(test)]
mod tests {
    use core::slice;

    use memmap2::MmapOptions;
    use rand::Rng;

    use super::{BufferSlice, SliceList};
    use crate::{
        buffer::{
            manager::{BUFFER_CAP_OFFSET, BUFFER_HEADER_SIZE, BufferManager},
            slice::BufferHeader,
        },
        config::SizePercentPair,
    };

    #[test]
    fn test_buffer_slice_read_write() {
        const SIZE: usize = 8192;

        let mut buf = [0u8; SIZE];
        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
        for i in 0..SIZE {
            let n = slice.append(&[i as u8]);
            assert_eq!(n, 1);
        }
        // The slice is full, so a further append copies nothing.
        let n = slice.append(&[10]);
        assert_eq!(n, 0);

        // Oversized reads are clamped to the number of unread bytes.
        let data = slice.read(SIZE * 10);
        assert_eq!(data.len(), SIZE);

        (0..SIZE).for_each(|i| {
            assert_eq!(data[i], i as u8);
        });
    }

    #[test]
    fn test_buffer_slice_skip() {
        const SIZE: usize = 8192;

        let mut buf = [0u8; SIZE];
        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
        slice.append(&[0u8; SIZE]);
        let mut remain = slice.capacity();

        let n = slice.skip(10);
        remain -= n;
        assert_eq!(remain, slice.size());

        let n = slice.skip(100);
        remain -= n;
        assert_eq!(remain, slice.size());

        _ = slice.skip(10000);
        assert_eq!(0, slice.size());
    }

    #[test]
    fn test_buffer_slice_reserve() {
        const SIZE: usize = 8192;

        let mut buf = [0u8; SIZE];
        let mut slice = BufferSlice::new(None, &mut buf, 0, false);
        let data1 = slice.reserve(100).unwrap();
        assert_eq!(100, data1.len());

        (0..data1.len()).for_each(|i| data1[i] = i as u8);
        let data1 = unsafe { slice::from_raw_parts(data1.as_ptr(), data1.len()) };

        let data2 = slice.reserve(SIZE);
        assert!(data2.is_err());

        let read_data = slice.read(100);
        assert_eq!(100, read_data.len());

        (0..100).for_each(|i| {
            assert_eq!(read_data[i], data1[i]);
        });

        let read_data = slice.read(10000);
        assert_eq!(read_data.len(), 0);
    }

    #[test]
    fn test_buffer_slice_update() {
        const SIZE: usize = 8192;

        let mut buf = [0u8; SIZE];
        let mut header = [0u8; BUFFER_HEADER_SIZE as usize];
        unsafe {
            *(header.as_mut_ptr().offset(BUFFER_CAP_OFFSET as isize) as *mut u32) = 8192u32;
        }
        let mut slice =
            BufferSlice::new(Some(BufferHeader(header.as_mut_ptr())), &mut buf, 0, true);

        let n = slice.append(&[0u8; SIZE]);
        assert_eq!(n, SIZE);
        slice.update();
        assert_eq!(SIZE, slice.buffer_header.as_ref().unwrap().size() as usize);
    }

    #[test]
    fn test_buffer_slice_linked_next() {
        const SIZE: usize = 8192;
        const SLICE_NUM: usize = 100;

        let mut slices = Vec::with_capacity(SLICE_NUM);
        let mem = MmapOptions::new().len(10 << 20).map_anon().unwrap();
        let bm = BufferManager::create(
            &[SizePercentPair {
                size: 8192,
                percent: 100,
            }],
            "",
            mem,
            0,
        )
        .unwrap();

        let mut write_data_array = Vec::with_capacity(SLICE_NUM);

        for _ in 0..SLICE_NUM {
            let mut s = bm.alloc_shm_buffer(SIZE as u32).unwrap();
            let mut rng = rand::rng();
            let data: Vec<u8> = (0..SIZE).map(|_| rng.random()).collect();
            assert_eq!(s.append(&data), SIZE);
            s.update();
            slices.push(s);
            write_data_array.push(data);
        }

        // Chain the slices through their shared-memory headers.
        for i in 0..slices.len() - 1 {
            slices[i]
                .buffer_header
                .as_ref()
                .unwrap()
                .link_next(slices[i + 1].offset_in_shm);
        }

        // Follow the chain from the first slice and verify each payload.
        let mut next = slices[0].offset_in_shm;
        (0..SLICE_NUM).for_each(|i| {
            let mut s = bm.read_buffer_slice(next).unwrap();
            assert_eq!(s.capacity(), SIZE);
            assert_eq!(s.size(), SIZE);
            let read_data = s.read(SIZE);
            assert_eq!(read_data.len(), SIZE);
            (0..SIZE).for_each(|j| {
                assert_eq!(read_data[j], write_data_array[i][j]);
            });
            let is_last_slice = i == SLICE_NUM - 1;
            assert_eq!(s.buffer_header.as_ref().unwrap().has_next(), !is_last_slice);
            next = s.buffer_header.as_ref().unwrap().next_buffer_offset();
        });
    }

    #[test]
    fn test_slice_list_push_pop() {
        let mut l = SliceList::new();
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        assert_eq!(l.front(), l.back());
        assert_eq!(1, l.size());

        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        assert_eq!(2, l.size());
        assert_ne!(l.front(), l.back());

        l.pop_front();
        assert_eq!(1, l.size());
        assert_eq!(l.front(), l.back());

        l.pop_front();
        assert_eq!(0, l.size());
        assert!(l.front().is_none());
        assert!(l.back().is_none());

        for i in 0..100 {
            l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
            assert_eq!(i + 1, l.size());
        }
        for i in 0..100 {
            l.pop_front();
            assert_eq!(100 - (i + 1), l.size());
        }
        assert_eq!(0, l.size());
        assert!(l.front().is_none());
        assert!(l.back().is_none());
    }

    #[test]
    fn test_slice_list_split_from_write() {
        // Write slice is the only node: nothing follows it, nothing detaches.
        let mut l = SliceList::new();
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        l.write_slice = l.front_slice;
        assert!(l.split_from_write().is_none());
        assert_eq!(1, l.size());
        assert_eq!(l.front(), l.back());
        assert_eq!(l.back(), l.write());

        // Write slice at the head of two nodes: the second node is detached.
        let mut l = SliceList::new();
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        l.write_slice = l.front_slice;
        l.split_from_write();

        assert_eq!(1, l.size());
        assert_eq!(l.front(), l.back());
        assert_eq!(l.back(), l.write());

        // Write slice already at the tail: nothing to split off.
        let mut l = SliceList::new();
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
        l.write_slice = l.back_slice;
        assert!(l.split_from_write().is_none());
        assert_eq!(2, l.size());
        assert_eq!(l.back(), l.write());

        // Write slice at node 51 of 100: the remaining 49 nodes are detached.
        let mut l = SliceList::new();
        for i in 0..100 {
            l.push_back(BufferSlice::new(None, &mut [0; 1024], 0, false));
            if i == 50 {
                l.write_slice = l.back_slice;
            }
        }
        l.split_from_write();
        assert_eq!(l.back(), l.write());

        assert_eq!(51, l.size());
    }
}