use std::marker::PhantomData;
use std::ops::Deref;
#[cfg(feature = "aeron")]
use rusteron_client::AeronBufferClaim;
#[cfg(feature = "aeron")]
use crate::adapters::aeron::error::TransportError;
pub struct ClaimBuffer<'a> {
#[cfg(feature = "aeron")]
claim: AeronBufferClaim,
#[cfg(feature = "aeron")]
position: i64,
#[cfg(feature = "aeron")]
finalised: bool,
_publisher: PhantomData<&'a mut ()>,
}
impl<'a> std::fmt::Debug for ClaimBuffer<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut dbg = f.debug_struct("ClaimBuffer");
#[cfg(feature = "aeron")]
{
dbg.field("position", &self.position)
.field("finalised", &self.finalised)
.field("len", &self.claim.length());
}
dbg.finish_non_exhaustive()
}
}
#[cfg(feature = "aeron")]
impl<'a> ClaimBuffer<'a> {
pub(crate) fn from_aeron(claim: AeronBufferClaim, position: i64) -> Self {
ClaimBuffer {
claim,
position,
finalised: false,
_publisher: PhantomData,
}
}
pub fn data(&mut self) -> &mut [u8] {
self.claim.data()
}
pub fn len(&self) -> usize {
self.claim.length()
}
pub fn is_empty(&self) -> bool {
self.claim.length() == 0
}
pub fn position(&self) -> i64 {
self.position
}
pub fn commit(mut self) -> Result<(), TransportError> {
self.finalised = true;
self.claim
.commit()
.map(|_| ())
.map_err(|e| TransportError::Backend(format!("aeron buffer commit: {:?}", e)))
}
pub fn abort(mut self) -> Result<(), TransportError> {
self.finalised = true;
self.claim
.abort()
.map(|_| ())
.map_err(|e| TransportError::Backend(format!("aeron buffer abort: {:?}", e)))
}
}
impl<'a> Drop for ClaimBuffer<'a> {
fn drop(&mut self) {
#[cfg(feature = "aeron")]
if !self.finalised {
let _ = self.claim.abort();
}
}
}
#[derive(Debug)]
pub struct FragmentBuffer<'a> {
buffer: &'a [u8],
header: FragmentHeader,
}
#[derive(Debug, Clone, Copy)]
pub struct FragmentHeader {
pub position: i64,
pub session_id: i32,
pub stream_id: i32,
}
impl<'a> FragmentBuffer<'a> {
pub fn new(buffer: &'a [u8], header: FragmentHeader) -> Self {
FragmentBuffer { buffer, header }
}
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn header(&self) -> &FragmentHeader {
&self.header
}
pub fn position(&self) -> i64 {
self.header.position
}
}
impl<'a> Deref for FragmentBuffer<'a> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buffer
}
}
impl<'a> AsRef<[u8]> for FragmentBuffer<'a> {
fn as_ref(&self) -> &[u8] {
self.buffer
}
}
#[cfg(test)]
mod tests {
use super::*;
fn header(position: i64) -> FragmentHeader {
FragmentHeader {
position,
session_id: 1,
stream_id: 1001,
}
}
#[test]
fn given_constructed_fragment_buffer_when_header_called_then_returns_constructor_header() {
let data: [u8; 4] = [1, 2, 3, 4];
let buf = FragmentBuffer::new(&data, header(4242));
let got = buf.header();
assert_eq!(got.position, 4242);
assert_eq!(got.session_id, 1);
assert_eq!(got.stream_id, 1001);
}
#[test]
fn given_constructed_fragment_buffer_when_position_called_then_returns_header_position() {
let data: [u8; 1] = [0];
let buf = FragmentBuffer::new(&data, header(99));
assert_eq!(buf.position(), 99);
}
#[test]
fn given_empty_fragment_buffer_when_is_empty_called_then_returns_true() {
let data: [u8; 0] = [];
let buf = FragmentBuffer::new(&data, header(0));
assert!(buf.is_empty());
assert_eq!(buf.len(), 0);
}
#[test]
fn given_non_empty_fragment_buffer_when_deref_called_then_yields_source_slice() {
let data: [u8; 3] = [9, 8, 7];
let buf = FragmentBuffer::new(&data, header(0));
let slice: &[u8] = &buf;
assert_eq!(slice, &data[..]);
assert_eq!(buf.len(), 3);
}
#[test]
fn given_non_empty_fragment_buffer_when_as_ref_called_then_yields_source_slice() {
let data: [u8; 3] = [10, 20, 30];
let buf = FragmentBuffer::new(&data, header(0));
assert_eq!(buf.as_ref(), &data[..]);
}
#[cfg(feature = "aeron")]
mod claim_buffer {
use super::super::*;
use rusteron_client::AeronBufferClaim;
fn defused_claim_buffer<'a>(position: i64) -> ClaimBuffer<'a> {
let mut buf = ClaimBuffer::from_aeron(AeronBufferClaim::default(), position);
buf.finalised = true;
buf
}
#[test]
fn given_defused_claim_buffer_when_position_called_then_returns_constructor_position() {
let buf = defused_claim_buffer(4242);
assert_eq!(buf.position(), 4242);
}
#[test]
fn given_defused_claim_buffer_when_len_called_then_returns_zero_for_default_claim() {
let buf = defused_claim_buffer(0);
assert_eq!(buf.len(), 0);
assert!(buf.is_empty());
}
#[test]
fn given_defused_claim_buffer_when_finalised_set_then_drop_does_not_call_ffi() {
let buf = defused_claim_buffer(7);
drop(buf);
}
}
}