1use crossbeam_queue::ArrayQueue;
10use std::sync::Arc;
11
/// Read-only, offset-addressed access to a byte buffer, with helpers that
/// decode fixed-width primitives stored in little-endian byte order.
///
/// All `get_*` methods panic if the requested range does not fit inside the
/// buffer, mirroring slice indexing semantics.
pub trait ReadBuffer {
    /// Returns the entire buffer contents as a byte slice.
    fn as_slice(&self) -> &[u8];

    /// Returns the length of the buffer in bytes.
    fn len(&self) -> usize;

    /// Returns `true` if the buffer contains no bytes.
    #[must_use]
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Reads a `u8` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset >= self.len()`.
    #[inline(always)]
    fn get_u8(&self, offset: usize) -> u8 {
        self.as_slice()[offset]
    }

    /// Reads an `i8` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset >= self.len()`.
    #[inline(always)]
    fn get_i8(&self, offset: usize) -> i8 {
        self.as_slice()[offset] as i8
    }

    /// Reads a little-endian `u16` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 2` exceeds the buffer length.
    #[inline(always)]
    fn get_u16_le(&self, offset: usize) -> u16 {
        // `try_into` cannot fail: the slice is exactly 2 bytes long.
        u16::from_le_bytes(self.as_slice()[offset..offset + 2].try_into().unwrap())
    }

    /// Reads a little-endian `i16` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 2` exceeds the buffer length.
    #[inline(always)]
    fn get_i16_le(&self, offset: usize) -> i16 {
        i16::from_le_bytes(self.as_slice()[offset..offset + 2].try_into().unwrap())
    }

    /// Reads a little-endian `u32` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 4` exceeds the buffer length.
    #[inline(always)]
    fn get_u32_le(&self, offset: usize) -> u32 {
        u32::from_le_bytes(self.as_slice()[offset..offset + 4].try_into().unwrap())
    }

    /// Reads a little-endian `i32` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 4` exceeds the buffer length.
    #[inline(always)]
    fn get_i32_le(&self, offset: usize) -> i32 {
        i32::from_le_bytes(self.as_slice()[offset..offset + 4].try_into().unwrap())
    }

    /// Reads a little-endian `u64` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 8` exceeds the buffer length.
    #[inline(always)]
    fn get_u64_le(&self, offset: usize) -> u64 {
        u64::from_le_bytes(self.as_slice()[offset..offset + 8].try_into().unwrap())
    }

    /// Reads a little-endian `i64` at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + 8` exceeds the buffer length.
    #[inline(always)]
    fn get_i64_le(&self, offset: usize) -> i64 {
        i64::from_le_bytes(self.as_slice()[offset..offset + 8].try_into().unwrap())
    }

    /// Reads a little-endian `f32` at `offset` via its raw bit pattern.
    ///
    /// # Panics
    /// Panics if `offset + 4` exceeds the buffer length.
    #[inline(always)]
    fn get_f32_le(&self, offset: usize) -> f32 {
        f32::from_bits(self.get_u32_le(offset))
    }

    /// Reads a little-endian `f64` at `offset` via its raw bit pattern.
    ///
    /// # Panics
    /// Panics if `offset + 8` exceeds the buffer length.
    #[inline(always)]
    fn get_f64_le(&self, offset: usize) -> f64 {
        f64::from_bits(self.get_u64_le(offset))
    }

    /// Borrows `len` bytes starting at `offset`.
    ///
    /// # Panics
    /// Panics if `offset + len` exceeds the buffer length.
    #[inline(always)]
    fn get_bytes(&self, offset: usize, len: usize) -> &[u8] {
        &self.as_slice()[offset..offset + len]
    }

    /// Reads a fixed-width string field of `len` bytes at `offset`,
    /// stopping at the first NUL byte (padding terminator).
    ///
    /// Returns `""` if the bytes before the NUL are not valid UTF-8 —
    /// invalid data is silently mapped to the empty string rather than
    /// reported as an error.
    ///
    /// # Panics
    /// Panics if `offset + len` exceeds the buffer length.
    #[inline]
    fn get_str(&self, offset: usize, len: usize) -> &str {
        let bytes = self.get_bytes(offset, len);
        let end = bytes.iter().position(|&b| b == 0).unwrap_or(len);
        std::str::from_utf8(&bytes[..end]).unwrap_or("")
    }
}
147
148pub trait WriteBuffer: ReadBuffer {
152 fn as_mut_slice(&mut self) -> &mut [u8];
154
155 #[inline(always)]
161 fn put_u8(&mut self, offset: usize, value: u8) {
162 self.as_mut_slice()[offset] = value;
163 }
164
165 #[inline(always)]
171 fn put_i8(&mut self, offset: usize, value: i8) {
172 self.as_mut_slice()[offset] = value as u8;
173 }
174
175 #[inline(always)]
181 fn put_u16_le(&mut self, offset: usize, value: u16) {
182 let bytes = value.to_le_bytes();
183 self.as_mut_slice()[offset..offset + 2].copy_from_slice(&bytes);
184 }
185
186 #[inline(always)]
192 fn put_i16_le(&mut self, offset: usize, value: i16) {
193 let bytes = value.to_le_bytes();
194 self.as_mut_slice()[offset..offset + 2].copy_from_slice(&bytes);
195 }
196
197 #[inline(always)]
203 fn put_u32_le(&mut self, offset: usize, value: u32) {
204 let bytes = value.to_le_bytes();
205 self.as_mut_slice()[offset..offset + 4].copy_from_slice(&bytes);
206 }
207
208 #[inline(always)]
214 fn put_i32_le(&mut self, offset: usize, value: i32) {
215 let bytes = value.to_le_bytes();
216 self.as_mut_slice()[offset..offset + 4].copy_from_slice(&bytes);
217 }
218
219 #[inline(always)]
225 fn put_u64_le(&mut self, offset: usize, value: u64) {
226 let bytes = value.to_le_bytes();
227 self.as_mut_slice()[offset..offset + 8].copy_from_slice(&bytes);
228 }
229
230 #[inline(always)]
236 fn put_i64_le(&mut self, offset: usize, value: i64) {
237 let bytes = value.to_le_bytes();
238 self.as_mut_slice()[offset..offset + 8].copy_from_slice(&bytes);
239 }
240
241 #[inline(always)]
247 fn put_f32_le(&mut self, offset: usize, value: f32) {
248 self.put_u32_le(offset, value.to_bits());
249 }
250
251 #[inline(always)]
257 fn put_f64_le(&mut self, offset: usize, value: f64) {
258 self.put_u64_le(offset, value.to_bits());
259 }
260
261 #[inline(always)]
267 fn put_bytes(&mut self, offset: usize, src: &[u8]) {
268 self.as_mut_slice()[offset..offset + src.len()].copy_from_slice(src);
269 }
270
271 #[inline]
278 fn put_str(&mut self, offset: usize, value: &str, max_len: usize) {
279 let bytes = value.as_bytes();
280 let copy_len = bytes.len().min(max_len);
281 self.as_mut_slice()[offset..offset + copy_len].copy_from_slice(&bytes[..copy_len]);
282 if copy_len < max_len {
283 self.as_mut_slice()[offset + copy_len..offset + max_len].fill(0);
284 }
285 }
286
287 #[inline]
293 fn zero(&mut self, offset: usize, len: usize) {
294 self.as_mut_slice()[offset..offset + len].fill(0);
295 }
296}
297
/// Zero-copy `ReadBuffer` directly over any raw byte slice.
impl ReadBuffer for [u8] {
    #[inline(always)]
    fn as_slice(&self) -> &[u8] {
        self
    }

    #[inline(always)]
    fn len(&self) -> usize {
        // Fully qualified call to the inherent slice `len`, making it
        // explicit that this forwards to `[u8]::len` rather than recursing
        // into the trait method being defined here.
        <[u8]>::len(self)
    }
}
310
/// `ReadBuffer` over an owned `Vec<u8>`, delegating to its slice view.
impl ReadBuffer for Vec<u8> {
    #[inline(always)]
    fn as_slice(&self) -> &[u8] {
        self
    }

    #[inline(always)]
    fn len(&self) -> usize {
        // Fully qualified to forward to the inherent `Vec::len`, not the
        // trait method being defined here.
        Vec::len(self)
    }
}
323
/// `WriteBuffer` over an owned `Vec<u8>`.
///
/// Note: writes go through slice indexing, so they can only touch the
/// vector's existing length — the `put_*` methods never grow the vector.
impl WriteBuffer for Vec<u8> {
    #[inline(always)]
    fn as_mut_slice(&mut self) -> &mut [u8] {
        self
    }
}
331
/// Fixed-size byte buffer whose storage is aligned to a 64-byte boundary
/// via `repr(C, align(64))` (64 bytes being a common cache-line size).
///
/// The capacity `N` is a compile-time constant, so the buffer can live on
/// the stack or be boxed without a separate length/capacity header.
#[repr(C, align(64))]
#[derive(Clone)]
pub struct AlignedBuffer<const N: usize> {
    // Backing storage; zero-initialized by the constructors below.
    data: [u8; N],
}
344
345impl<const N: usize> AlignedBuffer<N> {
346 #[must_use]
348 pub const fn new() -> Self {
349 Self { data: [0u8; N] }
350 }
351
352 #[must_use]
354 pub const fn zeroed() -> Self {
355 Self { data: [0u8; N] }
356 }
357
358 #[must_use]
360 pub const fn capacity(&self) -> usize {
361 N
362 }
363}
364
/// The default buffer is zero-filled, same as `new`.
impl<const N: usize> Default for AlignedBuffer<N> {
    fn default() -> Self {
        Self::new()
    }
}
370
impl<const N: usize> ReadBuffer for AlignedBuffer<N> {
    #[inline(always)]
    fn as_slice(&self) -> &[u8] {
        &self.data
    }

    #[inline(always)]
    fn len(&self) -> usize {
        // The length is always the full compile-time capacity; there is no
        // separate "used" cursor on this buffer type.
        N
    }
}
382
impl<const N: usize> WriteBuffer for AlignedBuffer<N> {
    #[inline(always)]
    fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.data
    }
}
389
/// Cheap borrowed view for APIs that accept `impl AsRef<[u8]>`.
impl<const N: usize> AsRef<[u8]> for AlignedBuffer<N> {
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
395
/// Mutable counterpart of the `AsRef<[u8]>` impl above.
impl<const N: usize> AsMut<[u8]> for AlignedBuffer<N> {
    fn as_mut(&mut self) -> &mut [u8] {
        &mut self.data
    }
}
401
impl<const N: usize> std::fmt::Debug for AlignedBuffer<N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the capacity is printed; dumping up to N raw bytes into debug
        // output would be noise for the large buffers this type is used for.
        f.debug_struct("AlignedBuffer")
            .field("capacity", &N)
            .finish()
    }
}
409
/// Size in bytes of each buffer managed by [`BufferPool`] (64 KiB).
pub const DEFAULT_BUFFER_SIZE: usize = 65536;
412
/// Pool of pre-allocated, boxed [`AlignedBuffer`]s backed by a lock-free
/// `crossbeam` `ArrayQueue`.
///
/// Cloning a `BufferPool` clones the `Arc`, so all clones share the same
/// underlying set of buffers.
pub struct BufferPool {
    // Fixed-capacity queue holding the buffers currently available.
    buffers: Arc<ArrayQueue<Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>>>,
    // Total number of buffers created at construction time.
    capacity: usize,
}
421
422impl BufferPool {
423 #[must_use]
428 pub fn new(capacity: usize) -> Self {
429 let buffers = ArrayQueue::new(capacity);
430 for _ in 0..capacity {
431 let _ = buffers.push(Box::new(AlignedBuffer::zeroed()));
432 }
433 Self {
434 buffers: Arc::new(buffers),
435 capacity,
436 }
437 }
438
439 #[inline]
443 #[must_use]
444 pub fn acquire(&self) -> Option<Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>> {
445 self.buffers.pop()
446 }
447
448 #[inline]
455 pub fn release(&self, mut buffer: Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>) {
456 buffer.as_mut_slice().fill(0);
457 let _ = self.buffers.push(buffer);
458 }
459
460 #[must_use]
462 pub fn capacity(&self) -> usize {
463 self.capacity
464 }
465
466 #[must_use]
468 pub fn available(&self) -> usize {
469 self.buffers.len()
470 }
471}
472
impl Clone for BufferPool {
    fn clone(&self) -> Self {
        // Shallow clone: the Arc is bumped, so both handles operate on the
        // same underlying queue of buffers.
        Self {
            buffers: Arc::clone(&self.buffers),
            capacity: self.capacity,
        }
    }
}
481
impl std::fmt::Debug for BufferPool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `available` is a point-in-time snapshot; concurrent acquire/release
        // can change it immediately after it is read.
        f.debug_struct("BufferPool")
            .field("capacity", &self.capacity)
            .field("available", &self.buffers.len())
            .finish()
    }
}
490
#[cfg(test)]
mod tests {
    use super::*;

    // `new` yields a zero-filled buffer whose length equals `N`.
    #[test]
    fn test_aligned_buffer_creation() {
        let buf: AlignedBuffer<1024> = AlignedBuffer::new();
        assert_eq!(buf.len(), 1024);
        assert!(buf.as_slice().iter().all(|&b| b == 0));
    }

    // The backing array must honor the `align(64)` attribute.
    #[test]
    fn test_aligned_buffer_alignment() {
        let buf: AlignedBuffer<64> = AlignedBuffer::new();
        let ptr = buf.as_slice().as_ptr() as usize;
        assert_eq!(ptr % 64, 0, "Buffer should be 64-byte aligned");
    }

    // Round-trips every fixed-width put_*/get_* pair at distinct offsets.
    #[test]
    fn test_read_write_primitives() {
        let mut buf: AlignedBuffer<64> = AlignedBuffer::new();

        buf.put_u8(0, 0xFF);
        assert_eq!(buf.get_u8(0), 0xFF);

        buf.put_i8(1, -42);
        assert_eq!(buf.get_i8(1), -42);

        buf.put_u16_le(2, 0x1234);
        assert_eq!(buf.get_u16_le(2), 0x1234);

        buf.put_i16_le(4, -1000);
        assert_eq!(buf.get_i16_le(4), -1000);

        buf.put_u32_le(8, 0x12345678);
        assert_eq!(buf.get_u32_le(8), 0x12345678);

        buf.put_i32_le(12, -100000);
        assert_eq!(buf.get_i32_le(12), -100000);

        buf.put_u64_le(16, 0x123456789ABCDEF0);
        assert_eq!(buf.get_u64_le(16), 0x123456789ABCDEF0);

        buf.put_i64_le(24, -1_000_000_000_000);
        assert_eq!(buf.get_i64_le(24), -1_000_000_000_000);

        // Floats round-trip through their bit patterns, so an epsilon
        // comparison is more than sufficient here.
        buf.put_f32_le(32, std::f32::consts::PI);
        assert!((buf.get_f32_le(32) - std::f32::consts::PI).abs() < 0.00001);

        buf.put_f64_le(40, std::f64::consts::PI);
        assert!((buf.get_f64_le(40) - std::f64::consts::PI).abs() < 0.0000001);
    }

    // Raw byte copies round-trip unchanged.
    #[test]
    fn test_read_write_bytes() {
        let mut buf: AlignedBuffer<64> = AlignedBuffer::new();
        let data = b"Hello, SBE!";

        buf.put_bytes(0, data);
        assert_eq!(buf.get_bytes(0, data.len()), data);
    }

    // Strings shorter than the field are NUL-padded; longer ones truncate.
    #[test]
    fn test_read_write_str() {
        let mut buf: AlignedBuffer<64> = AlignedBuffer::new();

        buf.put_str(0, "AAPL", 8);
        assert_eq!(buf.get_str(0, 8), "AAPL");

        buf.put_str(8, "VERY_LONG_SYMBOL", 8);
        assert_eq!(buf.get_str(8, 8), "VERY_LON");
    }

    // acquire/release must keep the available count consistent.
    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(4);
        assert_eq!(pool.capacity(), 4);
        assert_eq!(pool.available(), 4);

        let buf1 = pool.acquire().expect("Should acquire buffer");
        assert_eq!(pool.available(), 3);

        let buf2 = pool.acquire().expect("Should acquire buffer");
        assert_eq!(pool.available(), 2);

        pool.release(buf1);
        assert_eq!(pool.available(), 3);

        pool.release(buf2);
        assert_eq!(pool.available(), 4);
    }

    // Draining the pool makes further acquires return None.
    #[test]
    fn test_buffer_pool_empty() {
        let pool = BufferPool::new(1);
        let _buf = pool.acquire().expect("Should acquire buffer");
        assert!(pool.acquire().is_none(), "Pool should be empty");
    }

    // ReadBuffer works directly on a plain byte slice.
    #[test]
    fn test_slice_read_buffer() {
        let data: &[u8] = &[0x12, 0x34, 0x56, 0x78];
        assert_eq!(data.get_u8(0), 0x12);
        assert_eq!(data.get_u16_le(0), 0x3412);
        assert_eq!(data.get_u32_le(0), 0x78563412);
    }

    // WriteBuffer works directly on Vec<u8>.
    #[test]
    fn test_vec_write_buffer() {
        let mut data = vec![0u8; 16];
        data.put_u32_le(0, 0xDEADBEEF);
        assert_eq!(data.get_u32_le(0), 0xDEADBEEF);
    }

    // `zeroed` behaves exactly like `new`.
    #[test]
    fn test_aligned_buffer_zeroed() {
        let buf: AlignedBuffer<128> = AlignedBuffer::zeroed();
        assert_eq!(buf.len(), 128);
        assert!(buf.as_slice().iter().all(|&b| b == 0));
    }

    // Writes through as_mut_slice are visible via the read API.
    #[test]
    fn test_aligned_buffer_as_mut_slice() {
        let mut buf: AlignedBuffer<64> = AlignedBuffer::new();
        let slice = buf.as_mut_slice();
        slice[0] = 0xAB;
        slice[1] = 0xCD;
        assert_eq!(buf.get_u8(0), 0xAB);
        assert_eq!(buf.get_u8(1), 0xCD);
    }

    // Debug output shows the type name and capacity (data is omitted).
    #[test]
    fn test_aligned_buffer_debug() {
        let buf: AlignedBuffer<256> = AlignedBuffer::new();
        let debug_str = format!("{:?}", buf);
        assert!(debug_str.contains("AlignedBuffer"));
        assert!(debug_str.contains("256"));
    }

    // Clones share one queue, so operations via either handle are visible
    // to both.
    #[test]
    fn test_buffer_pool_clone() {
        let pool1 = BufferPool::new(2);
        let pool2 = pool1.clone();

        let buf = pool1.acquire().expect("Should acquire");
        assert_eq!(pool1.available(), 1);
        assert_eq!(pool2.available(), 1);

        pool2.release(buf);
        assert_eq!(pool1.available(), 2);
        assert_eq!(pool2.available(), 2);
    }

    // Debug output includes capacity and availability fields.
    #[test]
    fn test_buffer_pool_debug() {
        let pool = BufferPool::new(4);
        let debug_str = format!("{:?}", pool);
        assert!(debug_str.contains("BufferPool"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("4"));
    }

    // Signed getters on a raw slice, including sign-extension of values
    // whose top byte has the high bit set.
    #[test]
    fn test_slice_read_buffer_all_types() {
        let data: &[u8] = &[
            0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
            0x77, 0x88,
        ];

        assert_eq!(data.get_i8(0), 0x12);
        assert_eq!(data.get_i16_le(0), 0x3412);
        assert_eq!(data.get_i32_le(0), 0x78563412_u32 as i32);
        assert_eq!(data.get_i64_le(0), 0xF0DEBC9A78563412_u64 as i64);
        assert_eq!(data.get_u64_le(0), 0xF0DEBC9A78563412);
    }

    // Full put_*/get_* coverage through the Vec<u8> implementation.
    #[test]
    fn test_vec_write_buffer_all_types() {
        let mut data = vec![0u8; 64];

        data.put_u8(0, 0xFF);
        assert_eq!(data.get_u8(0), 0xFF);

        data.put_i8(1, -1);
        assert_eq!(data.get_i8(1), -1);

        data.put_u16_le(2, 0xABCD);
        assert_eq!(data.get_u16_le(2), 0xABCD);

        data.put_i16_le(4, -1234);
        assert_eq!(data.get_i16_le(4), -1234);

        data.put_i32_le(8, -123456);
        assert_eq!(data.get_i32_le(8), -123456);

        data.put_u64_le(16, 0x123456789ABCDEF0);
        assert_eq!(data.get_u64_le(16), 0x123456789ABCDEF0);

        data.put_i64_le(24, -9876543210);
        assert_eq!(data.get_i64_le(24), -9876543210);

        data.put_f32_le(32, 1.23456);
        assert!((data.get_f32_le(32) - 1.23456).abs() < 0.0001);

        data.put_f64_le(40, 9.87654321);
        assert!((data.get_f64_le(40) - 9.87654321).abs() < 0.0000001);

        data.put_bytes(48, b"test");
        assert_eq!(data.get_bytes(48, 4), b"test");

        data.put_str(52, "hello", 8);
        assert_eq!(data.get_str(52, 8), "hello");
    }

    // Default defers to `new` (zero-filled, length N).
    #[test]
    fn test_aligned_buffer_default() {
        let buf: AlignedBuffer<32> = AlignedBuffer::default();
        assert_eq!(buf.len(), 32);
    }
}