use crate::adapters::aeron::DEFAULT_FRAGMENT_LIMIT;
use crate::adapters::aeron::buffer::{ClaimBuffer, FragmentBuffer, FragmentHeader};
use crate::adapters::aeron::error::TransportError;
use crate::adapters::aeron::transport::{AeronPublisherBackend, AeronSubscriberBackend};
use rusteron_client::{
Aeron, AeronBufferClaim, AeronContext, AeronPublication, AeronSubscription, Handlers,
IntoCString,
};
use std::time::Duration;
fn result_to_transport_error(result: i64) -> Result<i64, TransportError> {
match result {
pos if pos >= 0 => Ok(pos),
-2 => Err(TransportError::BackPressure),
-1 => Err(TransportError::Connection("Not connected".to_string())),
-4 => Err(TransportError::Connection("Publication closed".to_string())),
code => Err(TransportError::Backend(format!(
"Rusteron error code: {}",
code
))),
}
}
pub struct AeronHandle {
_context: AeronContext,
aeron: Aeron,
}
impl AeronHandle {
pub fn connect() -> anyhow::Result<Self> {
let context = AeronContext::new()?;
let aeron = Aeron::new(&context)?;
aeron.start()?;
Ok(Self {
_context: context,
aeron,
})
}
pub fn subscription(
&self,
channel: &str,
stream_id: i32,
timeout: Duration,
) -> anyhow::Result<RusteronSubscriber> {
let sub = self
.aeron
.async_add_subscription(
&channel.into_c_string(),
stream_id,
Handlers::no_available_image_handler(),
Handlers::no_unavailable_image_handler(),
)?
.poll_blocking(timeout)?;
Ok(RusteronSubscriber {
sub,
fragment_limit: DEFAULT_FRAGMENT_LIMIT,
})
}
pub fn publication(
&self,
channel: &str,
stream_id: i32,
timeout: Duration,
) -> anyhow::Result<RusteronPublisher> {
let publication = self
.aeron
.async_add_publication(&channel.into_c_string(), stream_id)?
.poll_blocking(timeout)?;
Ok(RusteronPublisher { publication })
}
}
pub struct RusteronSubscriber {
sub: AeronSubscription,
fragment_limit: usize,
}
impl RusteronSubscriber {
#[must_use]
pub fn new(sub: AeronSubscription) -> Self {
Self {
sub,
fragment_limit: DEFAULT_FRAGMENT_LIMIT,
}
}
#[must_use]
pub fn with_fragment_limit(mut self, fragment_limit: usize) -> Self {
self.fragment_limit = fragment_limit;
self
}
#[must_use]
pub fn fragment_limit(&self) -> usize {
self.fragment_limit
}
}
impl AeronSubscriberBackend for RusteronSubscriber {
fn poll(&mut self, handler: &mut dyn FnMut(&[u8])) -> anyhow::Result<usize> {
let mut count = 0usize;
self.sub.poll_once(
|buffer, _header| {
handler(buffer);
count += 1;
},
self.fragment_limit,
)?;
Ok(count)
}
fn poll_fragments(
&mut self,
handler: &mut dyn FnMut(&FragmentBuffer<'_>),
) -> Result<usize, TransportError> {
let mut count = 0usize;
self.sub
.poll_once(
|buffer, header| {
let (session_id, stream_id) = header
.get_values()
.ok()
.map(|v| {
let frame = v.frame();
(frame.session_id(), frame.stream_id())
})
.unwrap_or((0, 0));
let frag_header = FragmentHeader {
position: header.position(),
session_id,
stream_id,
};
let frag = FragmentBuffer::new(buffer, frag_header);
handler(&frag);
count += 1;
},
self.fragment_limit,
)
.map_err(|e| TransportError::Backend(format!("{e:#}")))?;
Ok(count)
}
fn is_connected(&self) -> bool {
self.sub.is_connected()
}
fn is_closed(&self) -> bool {
self.sub.is_closed()
}
fn with_fragment_limit(self, fragment_limit: usize) -> Self {
RusteronSubscriber::with_fragment_limit(self, fragment_limit)
}
}
pub struct RusteronPublisher {
publication: AeronPublication,
}
impl RusteronPublisher {
#[must_use]
pub fn new(publication: AeronPublication) -> Self {
Self { publication }
}
}
impl AeronPublisherBackend for RusteronPublisher {
fn offer(&mut self, buffer: &[u8]) -> anyhow::Result<()> {
let position = self
.publication
.offer(buffer, Handlers::no_reserved_value_supplier_handler());
result_to_transport_error(position)?;
Ok(())
}
fn is_connected(&self) -> bool {
self.publication.is_connected()
}
fn is_closed(&self) -> bool {
self.publication.is_closed()
}
fn try_claim<'a>(&'a mut self, length: usize) -> Result<ClaimBuffer<'a>, TransportError> {
let claim = AeronBufferClaim::default();
let position = self.publication.try_claim(length, &claim);
let position = result_to_transport_error(position)?;
Ok(ClaimBuffer::from_aeron(claim, position))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn given_positive_result_when_mapped_then_returns_ok_position() {
assert!(matches!(result_to_transport_error(12345), Ok(12345)));
assert!(matches!(result_to_transport_error(0), Ok(0)));
}
#[test]
fn given_minus_two_when_mapped_then_returns_back_pressure() {
let err = result_to_transport_error(-2).expect_err("-2 maps to Err");
assert!(matches!(err, TransportError::BackPressure));
}
#[test]
fn given_minus_one_when_mapped_then_returns_connection_not_connected() {
let err = result_to_transport_error(-1).expect_err("-1 maps to Err");
match err {
TransportError::Connection(msg) => assert!(
msg.contains("Not connected"),
"expected 'Not connected', got: {msg}"
),
other => panic!("expected Connection, got {other:?}"),
}
}
#[test]
fn given_minus_four_when_mapped_then_returns_connection_publication_closed() {
let err = result_to_transport_error(-4).expect_err("-4 maps to Err");
match err {
TransportError::Connection(msg) => assert!(
msg.contains("Publication closed"),
"expected 'Publication closed', got: {msg}"
),
other => panic!("expected Connection, got {other:?}"),
}
}
#[test]
fn given_unknown_negative_code_when_mapped_then_returns_backend_with_code() {
let err = result_to_transport_error(-99).expect_err("-99 maps to Err");
match err {
TransportError::Backend(msg) => {
assert!(msg.contains("-99"), "expected code in message, got: {msg}")
}
other => panic!("expected Backend, got {other:?}"),
}
}
#[cfg(feature = "aeron-integration-test")]
mod field_tests {
use super::*;
use crate::adapters::aeron::integration_tests::{
AERON_CHANNEL, CONNECT_TIMEOUT, start_media_driver,
};
#[test]
fn given_rusteron_subscriber_when_built_default_then_fragment_limit_is_256()
-> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let sub = handle.subscription(AERON_CHANNEL, 2101, CONNECT_TIMEOUT)?;
assert_eq!(sub.fragment_limit(), 256);
Ok(())
}
#[test]
fn given_rusteron_subscriber_when_with_fragment_limit_then_field_matches()
-> anyhow::Result<()> {
let _container = start_media_driver()?;
let handle = AeronHandle::connect()?;
let sub = handle
.subscription(AERON_CHANNEL, 2102, CONNECT_TIMEOUT)?
.with_fragment_limit(32);
assert_eq!(sub.fragment_limit(), 32);
Ok(())
}
}
}