1use crossbeam_queue::ArrayQueue;
10use std::sync::Arc;
11
/// Read-only, offset-addressed access to a contiguous run of bytes.
///
/// Implementors supply [`as_slice`](ReadBuffer::as_slice) and
/// [`len`](ReadBuffer::len); every other method is a default built on top of
/// `as_slice`. All multi-byte values are decoded as little-endian.
///
/// # Panics
///
/// Each `get_*` accessor indexes the slice returned by `as_slice` and panics
/// when the requested range is not fully contained in it.
pub trait ReadBuffer {
    /// Returns the bytes backing this buffer.
    fn as_slice(&self) -> &[u8];

    /// Returns the length of the buffer in bytes.
    fn len(&self) -> usize;

    /// Returns `true` when [`len`](ReadBuffer::len) is zero.
    #[must_use]
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Reads the byte at `offset`.
    #[inline(always)]
    fn get_u8(&self, offset: usize) -> u8 {
        let data = self.as_slice();
        data[offset]
    }

    /// Reads the byte at `offset`, reinterpreted as a signed value.
    #[inline(always)]
    fn get_i8(&self, offset: usize) -> i8 {
        i8::from_ne_bytes([self.as_slice()[offset]])
    }

    /// Reads a little-endian `u16` from the two bytes starting at `offset`.
    #[inline(always)]
    fn get_u16_le(&self, offset: usize) -> u16 {
        let mut raw = [0u8; 2];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 2]);
        u16::from_le_bytes(raw)
    }

    /// Reads a little-endian `i16` from the two bytes starting at `offset`.
    #[inline(always)]
    fn get_i16_le(&self, offset: usize) -> i16 {
        let mut raw = [0u8; 2];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 2]);
        i16::from_le_bytes(raw)
    }

    /// Reads a little-endian `u32` from the four bytes starting at `offset`.
    #[inline(always)]
    fn get_u32_le(&self, offset: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 4]);
        u32::from_le_bytes(raw)
    }

    /// Reads a little-endian `i32` from the four bytes starting at `offset`.
    #[inline(always)]
    fn get_i32_le(&self, offset: usize) -> i32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 4]);
        i32::from_le_bytes(raw)
    }

    /// Reads a little-endian `u64` from the eight bytes starting at `offset`.
    #[inline(always)]
    fn get_u64_le(&self, offset: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 8]);
        u64::from_le_bytes(raw)
    }

    /// Reads a little-endian `i64` from the eight bytes starting at `offset`.
    #[inline(always)]
    fn get_i64_le(&self, offset: usize) -> i64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&self.as_slice()[offset..offset + 8]);
        i64::from_le_bytes(raw)
    }

    /// Reads an `f32` whose bit pattern is stored little-endian at `offset`.
    #[inline(always)]
    fn get_f32_le(&self, offset: usize) -> f32 {
        let bits = self.get_u32_le(offset);
        f32::from_bits(bits)
    }

    /// Reads an `f64` whose bit pattern is stored little-endian at `offset`.
    #[inline(always)]
    fn get_f64_le(&self, offset: usize) -> f64 {
        let bits = self.get_u64_le(offset);
        f64::from_bits(bits)
    }

    /// Borrows the `len` bytes starting at `offset`.
    #[inline(always)]
    fn get_bytes(&self, offset: usize, len: usize) -> &[u8] {
        let data = self.as_slice();
        let end = offset + len;
        &data[offset..end]
    }

    /// Reads a string from the `len`-byte field starting at `offset`.
    ///
    /// The text ends at the first NUL byte, or spans the whole field when it
    /// contains none. If those bytes are not valid UTF-8 the empty string is
    /// returned instead of an error.
    #[inline]
    fn get_str(&self, offset: usize, len: usize) -> &str {
        let field = self.get_bytes(offset, len);
        // Everything from the first NUL onwards is padding, not text.
        let text = match field.iter().position(|&byte| byte == 0) {
            Some(nul_at) => &field[..nul_at],
            None => &field[..len],
        };
        std::str::from_utf8(text).unwrap_or_default()
    }
}
147
148pub trait WriteBuffer: ReadBuffer {
152 fn as_mut_slice(&mut self) -> &mut [u8];
154
155 #[inline(always)]
161 fn put_u8(&mut self, offset: usize, value: u8) {
162 self.as_mut_slice()[offset] = value;
163 }
164
165 #[inline(always)]
171 fn put_i8(&mut self, offset: usize, value: i8) {
172 self.as_mut_slice()[offset] = value as u8;
173 }
174
175 #[inline(always)]
181 fn put_u16_le(&mut self, offset: usize, value: u16) {
182 let bytes = value.to_le_bytes();
183 self.as_mut_slice()[offset..offset + 2].copy_from_slice(&bytes);
184 }
185
186 #[inline(always)]
192 fn put_i16_le(&mut self, offset: usize, value: i16) {
193 let bytes = value.to_le_bytes();
194 self.as_mut_slice()[offset..offset + 2].copy_from_slice(&bytes);
195 }
196
197 #[inline(always)]
203 fn put_u32_le(&mut self, offset: usize, value: u32) {
204 let bytes = value.to_le_bytes();
205 self.as_mut_slice()[offset..offset + 4].copy_from_slice(&bytes);
206 }
207
208 #[inline(always)]
214 fn put_i32_le(&mut self, offset: usize, value: i32) {
215 let bytes = value.to_le_bytes();
216 self.as_mut_slice()[offset..offset + 4].copy_from_slice(&bytes);
217 }
218
219 #[inline(always)]
225 fn put_u64_le(&mut self, offset: usize, value: u64) {
226 let bytes = value.to_le_bytes();
227 self.as_mut_slice()[offset..offset + 8].copy_from_slice(&bytes);
228 }
229
230 #[inline(always)]
236 fn put_i64_le(&mut self, offset: usize, value: i64) {
237 let bytes = value.to_le_bytes();
238 self.as_mut_slice()[offset..offset + 8].copy_from_slice(&bytes);
239 }
240
241 #[inline(always)]
247 fn put_f32_le(&mut self, offset: usize, value: f32) {
248 self.put_u32_le(offset, value.to_bits());
249 }
250
251 #[inline(always)]
257 fn put_f64_le(&mut self, offset: usize, value: f64) {
258 self.put_u64_le(offset, value.to_bits());
259 }
260
261 #[inline(always)]
267 fn put_bytes(&mut self, offset: usize, src: &[u8]) {
268 self.as_mut_slice()[offset..offset + src.len()].copy_from_slice(src);
269 }
270
271 #[inline]
278 fn put_str(&mut self, offset: usize, value: &str, max_len: usize) {
279 let bytes = value.as_bytes();
280 let copy_len = bytes.len().min(max_len);
281 self.as_mut_slice()[offset..offset + copy_len].copy_from_slice(&bytes[..copy_len]);
282 if copy_len < max_len {
283 self.as_mut_slice()[offset + copy_len..offset + max_len].fill(0);
284 }
285 }
286
287 #[inline]
293 fn zero(&mut self, offset: usize, len: usize) {
294 self.as_mut_slice()[offset..offset + len].fill(0);
295 }
296}
297
298impl ReadBuffer for [u8] {
300 #[inline(always)]
301 fn as_slice(&self) -> &[u8] {
302 self
303 }
304
305 #[inline(always)]
306 fn len(&self) -> usize {
307 <[u8]>::len(self)
308 }
309}
310
311impl WriteBuffer for [u8] {
313 #[inline(always)]
314 fn as_mut_slice(&mut self) -> &mut [u8] {
315 self
316 }
317}
318
319impl ReadBuffer for Vec<u8> {
321 #[inline(always)]
322 fn as_slice(&self) -> &[u8] {
323 self
324 }
325
326 #[inline(always)]
327 fn len(&self) -> usize {
328 Vec::len(self)
329 }
330}
331
332impl WriteBuffer for Vec<u8> {
334 #[inline(always)]
335 fn as_mut_slice(&mut self) -> &mut [u8] {
336 self
337 }
338}
339
/// Fixed-size byte buffer of `N` bytes whose storage is aligned to 64 bytes.
///
/// The alignment comes from `repr(align(64))`; as a consequence the size of
/// the type is rounded up to a multiple of 64 bytes.
#[derive(Clone)]
#[repr(C, align(64))]
pub struct AlignedBuffer<const N: usize> {
    /// Inline storage for the buffer contents.
    data: [u8; N],
}
352
353impl<const N: usize> AlignedBuffer<N> {
354 #[must_use]
356 pub const fn new() -> Self {
357 Self { data: [0u8; N] }
358 }
359
360 #[must_use]
362 pub const fn zeroed() -> Self {
363 Self { data: [0u8; N] }
364 }
365
366 #[must_use]
368 pub const fn capacity(&self) -> usize {
369 N
370 }
371}
372
373impl<const N: usize> Default for AlignedBuffer<N> {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
379impl<const N: usize> ReadBuffer for AlignedBuffer<N> {
380 #[inline(always)]
381 fn as_slice(&self) -> &[u8] {
382 &self.data
383 }
384
385 #[inline(always)]
386 fn len(&self) -> usize {
387 N
388 }
389}
390
391impl<const N: usize> WriteBuffer for AlignedBuffer<N> {
392 #[inline(always)]
393 fn as_mut_slice(&mut self) -> &mut [u8] {
394 &mut self.data
395 }
396}
397
398impl<const N: usize> AsRef<[u8]> for AlignedBuffer<N> {
399 fn as_ref(&self) -> &[u8] {
400 &self.data
401 }
402}
403
404impl<const N: usize> AsMut<[u8]> for AlignedBuffer<N> {
405 fn as_mut(&mut self) -> &mut [u8] {
406 &mut self.data
407 }
408}
409
410impl<const N: usize> std::fmt::Debug for AlignedBuffer<N> {
411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412 f.debug_struct("AlignedBuffer")
413 .field("capacity", &N)
414 .finish()
415 }
416}
417
/// Size in bytes (64 KiB) of every buffer handed out by `BufferPool`.
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
420
421pub struct BufferPool {
426 buffers: Arc<ArrayQueue<Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>>>,
427 capacity: usize,
428}
429
430impl BufferPool {
431 #[must_use]
436 pub fn new(capacity: usize) -> Self {
437 let buffers = ArrayQueue::new(capacity);
438 for _ in 0..capacity {
439 let _ = buffers.push(Box::new(AlignedBuffer::zeroed()));
440 }
441 Self {
442 buffers: Arc::new(buffers),
443 capacity,
444 }
445 }
446
447 #[inline]
451 #[must_use]
452 pub fn acquire(&self) -> Option<Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>> {
453 self.buffers.pop()
454 }
455
456 #[inline]
463 pub fn release(&self, mut buffer: Box<AlignedBuffer<DEFAULT_BUFFER_SIZE>>) {
464 buffer.as_mut_slice().fill(0);
465 let _ = self.buffers.push(buffer);
466 }
467
468 #[must_use]
470 pub fn capacity(&self) -> usize {
471 self.capacity
472 }
473
474 #[must_use]
476 pub fn available(&self) -> usize {
477 self.buffers.len()
478 }
479}
480
481impl Clone for BufferPool {
482 fn clone(&self) -> Self {
483 Self {
484 buffers: Arc::clone(&self.buffers),
485 capacity: self.capacity,
486 }
487 }
488}
489
490impl std::fmt::Debug for BufferPool {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 f.debug_struct("BufferPool")
493 .field("capacity", &self.capacity)
494 .field("available", &self.buffers.len())
495 .finish()
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// A new buffer reports its const-generic size and starts out zeroed.
    #[test]
    fn test_aligned_buffer_creation() {
        let buffer = AlignedBuffer::<1024>::new();
        assert_eq!(buffer.len(), 1024);
        assert!(buffer.as_slice().iter().all(|&byte| byte == 0));
    }

    /// The storage address honours the 64-byte alignment request.
    #[test]
    fn test_aligned_buffer_alignment() {
        let buffer = AlignedBuffer::<64>::new();
        let address = buffer.as_slice().as_ptr() as usize;
        assert_eq!(address % 64, 0, "Buffer should be 64-byte aligned");
    }

    /// Every primitive accessor round-trips through its matching writer.
    #[test]
    fn test_read_write_primitives() {
        let mut scratch = AlignedBuffer::<64>::new();

        scratch.put_u8(0, 0xFF);
        assert_eq!(scratch.get_u8(0), 0xFF);

        scratch.put_i8(1, -42);
        assert_eq!(scratch.get_i8(1), -42);

        scratch.put_u16_le(2, 0x1234);
        assert_eq!(scratch.get_u16_le(2), 0x1234);

        scratch.put_i16_le(4, -1000);
        assert_eq!(scratch.get_i16_le(4), -1000);

        scratch.put_u32_le(8, 0x12345678);
        assert_eq!(scratch.get_u32_le(8), 0x12345678);

        scratch.put_i32_le(12, -100000);
        assert_eq!(scratch.get_i32_le(12), -100000);

        scratch.put_u64_le(16, 0x123456789ABCDEF0);
        assert_eq!(scratch.get_u64_le(16), 0x123456789ABCDEF0);

        scratch.put_i64_le(24, -1_000_000_000_000);
        assert_eq!(scratch.get_i64_le(24), -1_000_000_000_000);

        scratch.put_f32_le(32, std::f32::consts::PI);
        let single_error = (scratch.get_f32_le(32) - std::f32::consts::PI).abs();
        assert!(single_error < 0.00001);

        scratch.put_f64_le(40, std::f64::consts::PI);
        let double_error = (scratch.get_f64_le(40) - std::f64::consts::PI).abs();
        assert!(double_error < 0.0000001);
    }

    /// Raw byte runs come back exactly as written.
    #[test]
    fn test_read_write_bytes() {
        let payload = b"Hello, SBE!";
        let mut scratch = AlignedBuffer::<64>::new();

        scratch.put_bytes(0, payload);
        assert_eq!(scratch.get_bytes(0, payload.len()), payload);
    }

    /// Short strings are NUL-padded; long ones are cut to the field width.
    #[test]
    fn test_read_write_str() {
        let mut scratch = AlignedBuffer::<64>::new();

        scratch.put_str(0, "AAPL", 8);
        assert_eq!(scratch.get_str(0, 8), "AAPL");

        scratch.put_str(8, "VERY_LONG_SYMBOL", 8);
        assert_eq!(scratch.get_str(8, 8), "VERY_LON");
    }

    /// Acquire and release move buffers out of and back into the pool.
    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(4);
        assert_eq!(pool.capacity(), 4);
        assert_eq!(pool.available(), 4);

        let first = pool.acquire().expect("Should acquire buffer");
        assert_eq!(pool.available(), 3);

        let second = pool.acquire().expect("Should acquire buffer");
        assert_eq!(pool.available(), 2);

        pool.release(first);
        assert_eq!(pool.available(), 3);

        pool.release(second);
        assert_eq!(pool.available(), 4);
    }

    /// An exhausted pool yields `None` instead of blocking or allocating.
    #[test]
    fn test_buffer_pool_empty() {
        let pool = BufferPool::new(1);
        let _held = pool.acquire().expect("Should acquire buffer");
        assert!(pool.acquire().is_none(), "Pool should be empty");
    }

    /// Plain slices decode little-endian values through `ReadBuffer`.
    #[test]
    fn test_slice_read_buffer() {
        let bytes: &[u8] = &[0x12, 0x34, 0x56, 0x78];
        assert_eq!(bytes.get_u8(0), 0x12);
        assert_eq!(bytes.get_u16_le(0), 0x3412);
        assert_eq!(bytes.get_u32_le(0), 0x78563412);
    }

    /// Vectors accept writes through `WriteBuffer`.
    #[test]
    fn test_vec_write_buffer() {
        let mut bytes = vec![0u8; 16];
        bytes.put_u32_le(0, 0xDEADBEEF);
        assert_eq!(bytes.get_u32_le(0), 0xDEADBEEF);
    }

    /// `zeroed` produces an all-zero buffer of the requested size.
    #[test]
    fn test_aligned_buffer_zeroed() {
        let buffer = AlignedBuffer::<128>::zeroed();
        assert_eq!(buffer.len(), 128);
        assert!(buffer.as_slice().iter().all(|&byte| byte == 0));
    }

    /// Writes made through `as_mut_slice` are visible to the readers.
    #[test]
    fn test_aligned_buffer_as_mut_slice() {
        let mut buffer = AlignedBuffer::<64>::new();
        {
            let writable = buffer.as_mut_slice();
            writable[0] = 0xAB;
            writable[1] = 0xCD;
        }
        assert_eq!(buffer.get_u8(0), 0xAB);
        assert_eq!(buffer.get_u8(1), 0xCD);
    }

    /// Debug output names the type and its capacity.
    #[test]
    fn test_aligned_buffer_debug() {
        let buffer = AlignedBuffer::<256>::new();
        let rendered = format!("{:?}", buffer);
        assert!(rendered.contains("AlignedBuffer"));
        assert!(rendered.contains("256"));
    }

    /// Clones of a pool observe and affect the same set of buffers.
    #[test]
    fn test_buffer_pool_clone() {
        let original = BufferPool::new(2);
        let handle = original.clone();

        let taken = original.acquire().expect("Should acquire");
        assert_eq!(original.available(), 1);
        assert_eq!(handle.available(), 1);

        handle.release(taken);
        assert_eq!(original.available(), 2);
        assert_eq!(handle.available(), 2);
    }

    /// Debug output names the type and shows its capacity.
    #[test]
    fn test_buffer_pool_debug() {
        let pool = BufferPool::new(4);
        let rendered = format!("{:?}", pool);
        assert!(rendered.contains("BufferPool"));
        assert!(rendered.contains("capacity"));
        assert!(rendered.contains("4"));
    }

    /// Signed and 64-bit accessors decode a slice correctly.
    #[test]
    fn test_slice_read_buffer_all_types() {
        let bytes: &[u8] = &[
            0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66,
            0x77, 0x88,
        ];

        assert_eq!(bytes.get_i8(0), 0x12);
        assert_eq!(bytes.get_i16_le(0), 0x3412);
        assert_eq!(bytes.get_i32_le(0), 0x78563412_u32 as i32);
        assert_eq!(bytes.get_i64_le(0), 0xF0DEBC9A78563412_u64 as i64);
        assert_eq!(bytes.get_u64_le(0), 0xF0DEBC9A78563412);
    }

    /// Every writer works on a `Vec<u8>` and round-trips through its reader.
    #[test]
    fn test_vec_write_buffer_all_types() {
        let mut bytes = vec![0u8; 64];

        bytes.put_u8(0, 0xFF);
        assert_eq!(bytes.get_u8(0), 0xFF);

        bytes.put_i8(1, -1);
        assert_eq!(bytes.get_i8(1), -1);

        bytes.put_u16_le(2, 0xABCD);
        assert_eq!(bytes.get_u16_le(2), 0xABCD);

        bytes.put_i16_le(4, -1234);
        assert_eq!(bytes.get_i16_le(4), -1234);

        bytes.put_i32_le(8, -123456);
        assert_eq!(bytes.get_i32_le(8), -123456);

        bytes.put_u64_le(16, 0x123456789ABCDEF0);
        assert_eq!(bytes.get_u64_le(16), 0x123456789ABCDEF0);

        bytes.put_i64_le(24, -9876543210);
        assert_eq!(bytes.get_i64_le(24), -9876543210);

        bytes.put_f32_le(32, 1.23456);
        let single_error = (bytes.get_f32_le(32) - 1.23456).abs();
        assert!(single_error < 0.0001);

        bytes.put_f64_le(40, 9.87654321);
        let double_error = (bytes.get_f64_le(40) - 9.87654321).abs();
        assert!(double_error < 0.0000001);

        bytes.put_bytes(48, b"test");
        assert_eq!(bytes.get_bytes(48, 4), b"test");

        bytes.put_str(52, "hello", 8);
        assert_eq!(bytes.get_str(52, 8), "hello");
    }

    /// `Default` produces a buffer of the requested size.
    #[test]
    fn test_aligned_buffer_default() {
        let buffer = AlignedBuffer::<32>::default();
        assert_eq!(buffer.len(), 32);
    }
}