1use std::{fmt::Debug, hash, io::IoSlice, mem};
23
24use crate::{
25 rtype_dispatch, HasRType, Record, RecordHeader, RecordMut, RecordRef, RecordRefEnum,
26 RecordRefMut, MAX_RECORD_LEN,
27};
28
29#[derive(Clone)]
52#[cfg_attr(feature = "trivial_copy", derive(Copy))]
53#[repr(align(8))]
54pub struct RecordBuf<const CAP: usize = MAX_RECORD_LEN>(Repr<CAP>);
55
56#[derive(Clone, Copy)]
57union Repr<const CAP: usize> {
58 hd: RecordHeader,
59 buf: [u8; CAP],
60}
61
62impl<const CAP: usize> RecordBuf<CAP> {
63 pub const fn capacity() -> usize {
66 CAP
67 }
68
69 pub fn as_rec_ref(&self) -> RecordRef<'_> {
80 unsafe { RecordRef::new(self.as_ref()) }
82 }
83
84 pub fn as_rec_ref_mut(&mut self) -> RecordRefMut<'_> {
96 unsafe { RecordRefMut::new(self.raw_buf_mut()) }
98 }
99
100 pub fn as_enum(&self) -> crate::Result<RecordRefEnum<'_>> {
105 RecordRefEnum::try_from(self.as_rec_ref())
106 }
107
108 pub fn upgrade<F, T>(&mut self) -> crate::Result<()>
123 where
124 F: HasRType,
125 T: HasRType,
126 T: for<'a> From<&'a F>,
127 {
128 let upgraded = T::from(self.try_get::<F>()?);
129 self.set(upgraded);
130 Ok(())
131 }
132
133 pub fn set<T>(&mut self, other: T)
148 where
149 T: HasRType,
150 {
151 const {
152 assert!(
153 mem::size_of::<T>() <= CAP,
154 "record size exceeds buffer capacity",
155 );
156 }
157 let size = other.record_size();
158 debug_assert!(
159 size <= CAP,
160 "record_size ({size}) exceeds buffer capacity ({CAP})"
161 );
162 unsafe {
166 self.0.buf[..size].copy_from_slice(other.as_ref());
167 self.0.buf[size..].fill(0);
168 }
169 }
170
171 pub fn has<T: HasRType>(&self) -> bool {
173 T::has_rtype(self.header().rtype)
174 }
175
176 pub fn get<T: HasRType>(&self) -> Option<&T> {
195 if self.has::<T>() {
196 assert!(
197 self.record_size() >= mem::size_of::<T>(),
198 "Malformed `{}` record: expected length of at least {} bytes, found {} bytes. \
199 Confirm the DBN version in the Metadata header and the version upgrade policy",
200 std::any::type_name::<T>(),
201 mem::size_of::<T>(),
202 self.record_size()
203 );
204 Some(unsafe { std::mem::transmute::<&Repr<CAP>, &T>(&self.0) })
207 } else {
208 None
209 }
210 }
211
212 pub fn get_mut<T: HasRType>(&mut self) -> Option<&mut T> {
229 if self.has::<T>() {
230 assert!(
231 self.record_size() >= mem::size_of::<T>(),
232 "Malformed `{}` record: expected length of at least {} bytes, found {} bytes. \
233 Confirm the DBN version in the Metadata header and the version upgrade policy",
234 std::any::type_name::<T>(),
235 mem::size_of::<T>(),
236 self.record_size()
237 );
238 Some(unsafe { std::mem::transmute::<&mut Repr<CAP>, &mut T>(&mut self.0) })
240 } else {
241 None
242 }
243 }
244
245 pub fn try_get<T: HasRType>(&self) -> crate::Result<&T> {
265 if self.has::<T>() {
266 if self.record_size() >= mem::size_of::<T>() {
267 Ok(unsafe { std::mem::transmute::<&Repr<CAP>, &T>(&self.0) })
269 } else {
270 Err(crate::Error::conversion::<T>(format!(
271 "{self:?} has insufficient length, may be an earlier version of this record"
272 )))
273 }
274 } else {
275 Err(crate::Error::conversion::<T>(format!(
276 "{self:?} has incorrect rtype"
277 )))
278 }
279 }
280
281 pub fn try_get_mut<T: HasRType>(&mut self) -> crate::Result<&mut T> {
288 if self.has::<T>() {
289 if self.record_size() >= mem::size_of::<T>() {
290 Ok(unsafe { std::mem::transmute::<&mut Repr<CAP>, &mut T>(&mut self.0) })
292 } else {
293 Err(crate::Error::conversion::<T>(format!(
294 "{self:?} has insufficient length, may be an earlier version of this record"
295 )))
296 }
297 } else {
298 Err(crate::Error::conversion::<T>(format!(
299 "{self:?} has incorrect rtype"
300 )))
301 }
302 }
303
304 pub unsafe fn get_unchecked<T: HasRType>(&self) -> &T {
311 debug_assert!(self.record_size() >= mem::size_of::<T>());
312 self.0.buf.as_ptr().cast::<T>().as_ref().unwrap_unchecked()
315 }
316
317 pub unsafe fn get_unchecked_mut<T: HasRType>(&mut self) -> &mut T {
325 debug_assert!(self.record_size() >= mem::size_of::<T>());
326 self.0
329 .buf
330 .as_mut_ptr()
331 .cast::<T>()
332 .as_mut()
333 .unwrap_unchecked()
334 }
335}
336
337impl<const CAP: usize> Record for RecordBuf<CAP> {
338 fn header(&self) -> &RecordHeader {
339 unsafe { &self.0.hd }
342 }
343
344 fn raw_index_ts(&self) -> u64 {
345 fn raw_index_ts<T: HasRType>(t: &T) -> u64 {
346 t.raw_index_ts()
347 }
348 rtype_dispatch!(self, raw_index_ts()).unwrap_or_else(|_| self.header().ts_event)
349 }
350}
351
352impl<const CAP: usize> RecordMut for RecordBuf<CAP> {
353 fn header_mut(&mut self) -> &mut RecordHeader {
354 unsafe { &mut self.0.hd }
356 }
357}
358
359impl<const CAP: usize> AsRef<[u8]> for RecordBuf<CAP> {
360 fn as_ref(&self) -> &[u8] {
361 unsafe { std::slice::from_raw_parts(self.0.buf.as_ptr(), self.record_size()) }
365 }
366}
367
368impl<const CAP: usize> RecordBuf<CAP> {
369 pub fn raw_buf_mut(&mut self) -> &mut [u8; CAP] {
373 unsafe { &mut self.0.buf }
375 }
376}
377
378impl<T, const CAP: usize> From<T> for RecordBuf<CAP>
379where
380 T: HasRType,
381{
382 fn from(value: T) -> Self {
385 const {
386 assert!(
387 mem::size_of::<T>() <= CAP,
388 "record size exceeds buffer capacity"
389 )
390 };
391 let mut buf = [0u8; CAP];
392 buf[..value.record_size()].copy_from_slice(value.as_ref());
393 Self(Repr { buf })
394 }
395}
396
397impl<'a, const CAP: usize> From<&'a RecordBuf<CAP>> for IoSlice<'a> {
398 fn from(rec: &'a RecordBuf<CAP>) -> Self {
399 Self::new(rec.as_ref())
400 }
401}
402
403impl<const A: usize, const B: usize> PartialEq<RecordBuf<B>> for RecordBuf<A> {
404 fn eq(&self, other: &RecordBuf<B>) -> bool {
405 self.as_ref() == other.as_ref()
406 }
407}
408
409impl<const CAP: usize> Eq for RecordBuf<CAP> {}
410
411impl<const CAP: usize> hash::Hash for RecordBuf<CAP> {
412 fn hash<H: hash::Hasher>(&self, state: &mut H) {
413 self.as_ref().hash(state);
414 }
415}
416
417impl<const CAP: usize> PartialEq<RecordRef<'_>> for RecordBuf<CAP> {
418 fn eq(&self, other: &RecordRef<'_>) -> bool {
419 *self.as_ref() == *other.as_ref()
420 }
421}
422
423impl<const CAP: usize> PartialEq<RecordRefMut<'_>> for RecordBuf<CAP> {
424 fn eq(&self, other: &RecordRefMut<'_>) -> bool {
425 *self.as_ref() == *other.as_ref()
426 }
427}
428
429impl<const CAP: usize> Debug for RecordBuf<CAP> {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 fn fmt_rec<T: HasRType + Debug>(t: &T, debug: &mut std::fmt::DebugStruct) {
432 debug.field("buf", &t);
433 }
434 let mut debug = f.debug_struct("RecordBuf");
435 match rtype_dispatch!(self, fmt_rec(&mut debug)) {
436 Ok(_) => debug.finish(),
437 Err(_) => debug.field("hd", self.header()).finish_non_exhaustive(),
438 }
439 }
440}
441
442impl<const CAP: usize> TryFrom<RecordRef<'_>> for RecordBuf<CAP> {
443 type Error = crate::Error;
444
445 fn try_from(rec_ref: RecordRef<'_>) -> Result<Self, Self::Error> {
450 if rec_ref.record_size() > CAP {
451 Err(crate::Error::conversion::<Self>(format!(
452 "{rec_ref:?} is too long for the RecordBuf's capacity"
453 )))
454 } else {
455 let mut buf = [0; CAP];
456 buf[..rec_ref.record_size()].copy_from_slice(rec_ref.as_ref());
457 Ok(Self(Repr { buf }))
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use std::{ffi::c_char, io::IoSlice};
465
466 use crate::{
467 enums::rtype, v1, v3, FlagSet, MboMsg, RecordHeader, RecordRef, RecordRefEnum,
468 RecordRefMut, TradeMsg, MAX_RECORD_LEN,
469 };
470
471 use super::*;
472
473 type Buf = RecordBuf;
475
476 const SOURCE_RECORD: MboMsg = MboMsg {
477 hd: RecordHeader::new::<MboMsg>(rtype::MBO, 1, 1, 0),
478 order_id: 17,
479 price: 0,
480 size: 32,
481 flags: FlagSet::empty(),
482 channel_id: 1,
483 action: 'A' as c_char,
484 side: 'B' as c_char,
485 ts_recv: 0,
486 ts_in_delta: 160,
487 sequence: 1067,
488 };
489
490 #[test]
491 fn round_trip() {
492 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
493 let rec = buf.get::<MboMsg>().expect("should contain MboMsg");
494 assert_eq!(*rec, SOURCE_RECORD);
495 }
496
497 #[test]
498 fn wrong_type_returns_none() {
499 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
500 assert!(buf.has::<MboMsg>());
501 assert!(!buf.has::<TradeMsg>());
502 assert!(buf.get::<TradeMsg>().is_none());
503 }
504
505 #[test]
506 fn try_get_insufficient_length() {
507 let def = v1::InstrumentDefMsg::default();
508 let buf: Buf = RecordBuf::from(def);
509 let err = buf.try_get::<v3::InstrumentDefMsg>().unwrap_err();
510 assert!(
511 err.to_string().contains("has insufficient length"),
512 "unexpected error: {err}"
513 );
514 }
515
516 #[test]
517 fn try_from_record_ref_capacity_overflow() {
518 let mbo = SOURCE_RECORD;
519 let rec_ref = RecordRef::from(&mbo);
520 let result = RecordBuf::<4>::try_from(rec_ref);
521 assert!(result.is_err());
522 }
523
524 #[test]
525 fn upgrade_v1_to_v3() {
526 let def = v1::InstrumentDefMsg::default();
527 let mut buf: Buf = RecordBuf::from(def);
528 assert!(buf.has::<v1::InstrumentDefMsg>());
529 buf.upgrade::<v1::InstrumentDefMsg, v3::InstrumentDefMsg>()
530 .unwrap();
531 assert!(buf.has::<v3::InstrumentDefMsg>());
532 }
533
534 #[test]
535 fn partial_eq_same_capacity() {
536 let buf1: Buf = RecordBuf::from(SOURCE_RECORD);
537 let buf2: Buf = RecordBuf::from(SOURCE_RECORD);
538 assert_eq!(buf1, buf2);
539
540 let other: Buf = RecordBuf::from(TradeMsg::default());
541 assert_ne!(buf1, other);
542 }
543
544 #[test]
545 fn partial_eq_cross_capacity() {
546 let buf_default: Buf = RecordBuf::from(SOURCE_RECORD);
547 let buf_small = RecordBuf::<256>::from(SOURCE_RECORD);
548 assert!(buf_default == buf_small);
549 }
550
551 #[test]
552 fn partial_eq_with_record_ref() {
553 let mbo = SOURCE_RECORD;
554 let buf: Buf = RecordBuf::from(mbo);
555 let mbo2 = SOURCE_RECORD;
556 let rec_ref = RecordRef::from(&mbo2);
557 assert!(buf == rec_ref);
558 }
559
560 #[test]
561 fn set_replaces_record() {
562 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
563 assert!(buf.has::<MboMsg>());
564
565 let trade = TradeMsg::default();
566 buf.set(trade);
567 assert!(buf.has::<TradeMsg>());
568 assert!(!buf.has::<MboMsg>());
569 }
570
571 #[test]
572 fn get_mut_returns_mutable_ref() {
573 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
574 let rec = buf.get_mut::<MboMsg>().expect("should contain MboMsg");
575 rec.order_id = 42;
576 assert_eq!(buf.get::<MboMsg>().unwrap().order_id, 42);
577 }
578
579 #[test]
580 fn get_mut_wrong_type_returns_none() {
581 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
582 assert!(buf.get_mut::<TradeMsg>().is_none());
583 }
584
585 #[test]
586 fn try_get_mut_wrong_rtype() {
587 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
588 let err = buf.try_get_mut::<TradeMsg>().unwrap_err();
589 assert!(
590 err.to_string().contains("has incorrect rtype"),
591 "unexpected error: {err}"
592 );
593 }
594
595 #[test]
596 fn try_get_mut_insufficient_length() {
597 let def = v1::InstrumentDefMsg::default();
598 let mut buf: Buf = RecordBuf::from(def);
599 let err = buf.try_get_mut::<v3::InstrumentDefMsg>().unwrap_err();
600 assert!(
601 err.to_string().contains("has insufficient length"),
602 "unexpected error: {err}"
603 );
604 }
605
606 #[test]
607 fn get_unchecked_returns_correct_record() {
608 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
609 assert!(buf.has::<MboMsg>());
610 let rec = unsafe { buf.get_unchecked::<MboMsg>() };
612 assert_eq!(*rec, SOURCE_RECORD);
613 }
614
615 #[test]
616 fn get_unchecked_mut_returns_correct_record() {
617 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
618 assert!(buf.has::<MboMsg>());
619 unsafe { buf.get_unchecked_mut::<MboMsg>() }.order_id = 99;
621 assert_eq!(buf.get::<MboMsg>().unwrap().order_id, 99);
622 }
623
624 #[test]
625 fn as_rec_ref_mut_allows_mutation() {
626 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
627 buf.as_rec_ref_mut().get_mut::<MboMsg>().unwrap().order_id = 77;
628 assert_eq!(buf.get::<MboMsg>().unwrap().order_id, 77);
629 }
630
631 #[test]
632 fn io_slice_spans_record_bytes_only() {
633 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
634 let slice = IoSlice::from(&buf);
635 assert_eq!(slice.len(), buf.record_size());
636 assert!(slice.len() < MAX_RECORD_LEN);
637 }
638
639 #[test]
640 fn as_ref_returns_record_bytes_only() {
641 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
642 assert_eq!(buf.as_ref().len(), buf.record_size());
643 assert!(buf.as_ref().len() < MAX_RECORD_LEN);
644 }
645
646 #[test]
647 fn try_get_incorrect_rtype_error() {
648 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
649 let err = buf.try_get::<TradeMsg>().unwrap_err();
650 assert!(
651 err.to_string().contains("has incorrect rtype"),
652 "unexpected error: {err}"
653 );
654 }
655
656 #[test]
657 fn partial_eq_with_record_ref_mut() {
658 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
659 let mut mbo = SOURCE_RECORD;
660 let ref_mut = RecordRefMut::from(&mut mbo);
661 assert!(buf == ref_mut);
662 }
663
664 #[test]
665 fn upgrade_wrong_type_returns_error() {
666 let mut buf: Buf = RecordBuf::from(SOURCE_RECORD);
667 assert!(buf
668 .upgrade::<v1::InstrumentDefMsg, v3::InstrumentDefMsg>()
669 .is_err());
670 }
671
672 #[test]
673 fn as_enum_dispatches_correctly() {
674 let buf: Buf = RecordBuf::from(SOURCE_RECORD);
675 assert!(matches!(buf.as_enum().unwrap(), RecordRefEnum::Mbo(_)));
676 }
677
678 #[test]
679 fn set_clears_trailing_bytes() {
680 let def = v3::InstrumentDefMsg::default();
683 let mut buf: Buf = RecordBuf::from(def);
684 buf.set(SOURCE_RECORD);
685 let record_size = buf.record_size();
686 assert!(buf.raw_buf_mut()[record_size..].iter().all(|&b| b == 0));
687 }
688}