use crate::adapters::aeron::DEFAULT_FRAGMENT_LIMIT;
use crate::adapters::aeron::buffer::{FragmentBuffer, FragmentHeader};
use crate::adapters::aeron::error::TransportError;
use crate::adapters::aeron::transport::{AeronPublisherBackend, AeronSubscriberBackend};
use aeron_rs::aeron::Aeron;
use aeron_rs::concurrent::atomic_buffer::{AlignedBuffer, AtomicBuffer};
use aeron_rs::concurrent::logbuffer::header::Header;
use aeron_rs::context::Context;
use aeron_rs::publication::Publication;
use aeron_rs::subscription::Subscription;
use aeron_rs::utils::types::Index;
use std::ffi::CString;
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Repeatedly invokes `check` until it yields a value or `timeout` elapses.
///
/// `check` is always called at least once before the deadline is consulted. Between
/// attempts the calling thread sleeps for one millisecond.
///
/// A `timeout` so large that `now + timeout` cannot be represented as an
/// [`std::time::Instant`] (for example [`Duration::MAX`]) is treated as "no deadline"
/// instead of panicking in the addition.
///
/// # Errors
///
/// Returns `timed out waiting for {what}` once the deadline has passed without `check`
/// producing a value.
fn poll_until_found<T, F>(mut check: F, timeout: Duration, what: &str) -> anyhow::Result<T>
where
    F: FnMut() -> Option<T>,
{
    // `Instant + Duration` panics on overflow; `checked_add` reports it as `None`, which
    // is interpreted below as "never time out".
    let deadline = std::time::Instant::now().checked_add(timeout);
    loop {
        if let Some(item) = check() {
            return Ok(item);
        }
        if let Some(limit) = deadline {
            if std::time::Instant::now() > limit {
                anyhow::bail!("timed out waiting for {what}");
            }
        }
        std::thread::sleep(Duration::from_millis(1));
    }
}
pub struct AeronRsHandle {
aeron: Arc<Mutex<Aeron>>,
}
impl AeronRsHandle {
#[allow(clippy::arc_with_non_send_sync)]
pub fn connect() -> anyhow::Result<Self> {
let mut ctx = Context::new();
if let Ok(dir) = std::env::var("AERON_DIR") {
ctx.set_aeron_dir(dir);
}
let aeron = Aeron::new(ctx)?;
Ok(Self {
aeron: Arc::new(Mutex::new(aeron)),
})
}
pub fn subscription(
&self,
channel: &str,
stream_id: i32,
timeout: Duration,
) -> anyhow::Result<AeronRsSubscriber> {
let mut aeron = self
.aeron
.lock()
.map_err(|e| anyhow::anyhow!("aeron-rs client mutex poisoned: {e}"))?;
let id = aeron.add_subscription(CString::new(channel)?, stream_id)?;
let sub = poll_until_found(
|| aeron.find_subscription(id).ok(),
timeout,
&format!("subscription on {channel}:{stream_id}"),
)?;
Ok(AeronRsSubscriber {
sub,
fragment_limit: DEFAULT_FRAGMENT_LIMIT,
})
}
pub fn publication(
&self,
channel: &str,
stream_id: i32,
timeout: Duration,
) -> anyhow::Result<AeronRsPublisher> {
let mut aeron = self
.aeron
.lock()
.map_err(|e| anyhow::anyhow!("aeron-rs client mutex poisoned: {e}"))?;
let id = aeron.add_publication(CString::new(channel)?, stream_id)?;
let publication = poll_until_found(
|| aeron.find_publication(id).ok(),
timeout,
&format!("publication on {channel}:{stream_id}"),
)?;
Ok(AeronRsPublisher { publication })
}
}
/// Subscriber backend wrapping a mutex-guarded aeron-rs [`Subscription`].
pub struct AeronRsSubscriber {
    /// Shared subscription that is locked for each poll.
    sub: Arc<Mutex<Subscription>>,
    /// Limit handed to `Subscription::poll` on every poll call.
    fragment_limit: usize,
}

impl AeronRsSubscriber {
    /// Consumes the subscriber and returns it with `fragment_limit` replaced.
    #[must_use]
    pub fn with_fragment_limit(self, fragment_limit: usize) -> Self {
        Self {
            fragment_limit,
            ..self
        }
    }

    /// Returns the currently configured fragment limit.
    #[must_use]
    pub fn fragment_limit(&self) -> usize {
        let Self { fragment_limit, .. } = self;
        *fragment_limit
    }
}
impl AeronSubscriberBackend for AeronRsSubscriber {
fn poll(&mut self, handler: &mut dyn FnMut(&[u8])) -> anyhow::Result<usize> {
let mut count = 0usize;
let mut sub = self
.sub
.lock()
.map_err(|e| anyhow::anyhow!("aeron-rs subscription mutex poisoned: {e}"))?;
sub.poll(
&mut |buffer: &AtomicBuffer, offset: Index, length: Index, _header: &Header| {
handler(buffer.as_sub_slice(offset, length));
count += 1;
},
self.fragment_limit as Index,
);
Ok(count)
}
fn poll_fragments(
&mut self,
handler: &mut dyn FnMut(&FragmentBuffer<'_>),
) -> Result<usize, TransportError> {
let mut count = 0usize;
let mut sub = self
.sub
.lock()
.map_err(|e| TransportError::Backend(format!("aeron-rs subscription mutex: {e}")))?;
sub.poll(
&mut |buffer: &AtomicBuffer, offset: Index, length: Index, header: &Header| {
let frag_header = FragmentHeader {
position: header.position(),
session_id: header.session_id(),
stream_id: header.stream_id(),
};
let bytes = buffer.as_sub_slice(offset, length);
let frag = FragmentBuffer::new(bytes, frag_header);
handler(&frag);
count += 1;
},
self.fragment_limit as Index,
);
Ok(count)
}
fn with_fragment_limit(self, fragment_limit: usize) -> Self {
AeronRsSubscriber::with_fragment_limit(self, fragment_limit)
}
fn supports_graph_thread_poll(&self) -> bool {
false
}
}
#[cfg(all(test, feature = "aeron-integration-test"))]
mod tests {
    // Only compiled for test builds with the `aeron-integration-test` feature enabled.
    use super::*;
    use crate::adapters::aeron::integration_tests::{
        AERON_CHANNEL, CONNECT_TIMEOUT, start_media_driver,
    };

    /// A freshly created subscriber reports a fragment limit of 256.
    #[test]
    fn given_aeron_rs_subscriber_when_built_default_then_fragment_limit_is_256()
    -> anyhow::Result<()> {
        // Bound to a name so whatever `start_media_driver` returns lives until the test ends.
        let _driver = start_media_driver()?;
        let client = AeronRsHandle::connect()?;
        let subscriber = client.subscription(AERON_CHANNEL, 3101, CONNECT_TIMEOUT)?;
        assert_eq!(subscriber.fragment_limit(), 256);
        Ok(())
    }

    /// `with_fragment_limit` overrides the value reported by `fragment_limit`.
    #[test]
    fn given_aeron_rs_subscriber_when_with_fragment_limit_then_field_matches() -> anyhow::Result<()>
    {
        // Bound to a name so whatever `start_media_driver` returns lives until the test ends.
        let _driver = start_media_driver()?;
        let client = AeronRsHandle::connect()?;
        let subscriber = client.subscription(AERON_CHANNEL, 3102, CONNECT_TIMEOUT)?;
        let subscriber = subscriber.with_fragment_limit(64);
        assert_eq!(subscriber.fragment_limit(), 64);
        Ok(())
    }
}
pub struct AeronRsPublisher {
publication: Arc<Mutex<Publication>>,
}
impl AeronPublisherBackend for AeronRsPublisher {
fn offer(&mut self, buffer: &[u8]) -> anyhow::Result<()> {
let publication = self
.publication
.lock()
.map_err(|e| anyhow::anyhow!("aeron-rs publication mutex poisoned: {e}"))?;
let len = buffer.len() as Index;
let aligned = AlignedBuffer::with_capacity(len);
let ab = AtomicBuffer::from_aligned(&aligned);
ab.put_bytes(0, buffer);
publication.offer_part(ab, 0, len)?;
Ok(())
}
}