pub mod buffer;
pub mod error;
pub mod status;
pub(crate) mod transport;
mod channel;
mod pub_node;
mod status_stream;
mod sub_fragment_node;
#[cfg(feature = "aeron")]
pub mod rusteron_backend;
#[cfg(feature = "aeron-rs-beta")]
pub mod aeron_rs_backend;
pub use buffer::{ClaimBuffer, FragmentBuffer, FragmentHeader};
pub use channel::ChannelUri;
pub use error::TransportError;
pub use pub_node::AeronPub;
pub use status::AeronStatus;
pub use status_stream::AeronStatusStream;
pub use transport::{AeronPublisherBackend, AeronSubscriberBackend};
#[cfg(feature = "aeron")]
pub use rusteron_backend::AeronHandle;
#[cfg(feature = "aeron-rs-beta")]
pub use aeron_rs_backend::AeronRsHandle;
use crate::{Burst, Element, IntoStream, Stream};
use std::cell::RefCell;
use std::rc::Rc;
#[cfg(all(test, feature = "aeron-integration-test"))]
pub(crate) mod integration_tests;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AeronMode {
Spin,
Threaded,
}
pub const DEFAULT_FRAGMENT_LIMIT: usize = 256;
#[derive(Debug, Clone, Copy)]
pub struct AeronSubOptions {
pub mode: AeronMode,
pub fragment_limit: usize,
}
impl Default for AeronSubOptions {
fn default() -> Self {
Self {
mode: AeronMode::Spin,
fragment_limit: DEFAULT_FRAGMENT_LIMIT,
}
}
}
fn effective_mode<B: transport::AeronSubscriberBackend>(
requested: AeronMode,
subscriber: &B,
) -> AeronMode {
if requested == AeronMode::Spin && !subscriber.supports_graph_thread_poll() {
log::warn!(
"aeron sub: backend cannot be polled on the graph thread (it locks on poll); \
downgrading AeronMode::Spin to Threaded to honour the no-locks-in-cycle invariant"
);
AeronMode::Threaded
} else {
requested
}
}
#[must_use]
pub fn aeron_sub_fragment<T, F, B>(
subscriber: B,
parser: F,
opts: AeronSubOptions,
) -> Rc<dyn Stream<Burst<T>>>
where
T: Element + Send,
F: FnMut(&buffer::FragmentBuffer<'_>) -> Result<Option<T>, error::TransportError>
+ Send
+ 'static,
B: transport::AeronSubscriberBackend,
{
let subscriber = subscriber.with_fragment_limit(opts.fragment_limit);
match effective_mode(opts.mode, &subscriber) {
AeronMode::Spin => {
sub_fragment_node::AeronSpinSubFragmentNode::new(subscriber, parser).into_stream()
}
AeronMode::Threaded => sub_fragment_node::build_threaded(subscriber, parser),
}
}
#[must_use]
pub fn aeron_sub_fragment_with_status<T, F, B>(
subscriber: B,
parser: F,
opts: AeronSubOptions,
) -> (Rc<dyn Stream<Burst<T>>>, Rc<dyn Stream<Burst<AeronStatus>>>)
where
T: Element + Send,
F: FnMut(&buffer::FragmentBuffer<'_>) -> Result<Option<T>, error::TransportError>
+ Send
+ 'static,
B: transport::AeronSubscriberBackend,
{
let subscriber = subscriber.with_fragment_limit(opts.fragment_limit);
match effective_mode(opts.mode, &subscriber) {
AeronMode::Spin => {
let status = Rc::new(RefCell::new(status_stream::AeronStatusStream::default()));
let status_stream: Rc<dyn Stream<Burst<AeronStatus>>> = status.clone();
let data = sub_fragment_node::AeronSpinSubFragmentNode::with_status(
subscriber,
parser,
Rc::clone(&status),
)
.into_stream();
status
.borrow_mut()
.set_producer(Rc::downgrade(&data.clone().as_node()));
(data, status_stream)
}
AeronMode::Threaded => {
let status = Rc::new(RefCell::new(status_stream::AeronStatusStream::default()));
let status_stream: Rc<dyn Stream<Burst<AeronStatus>>> = status.clone();
let data = sub_fragment_node::build_threaded_with_status(
subscriber,
parser,
Rc::clone(&status),
);
status
.borrow_mut()
.set_producer(Rc::downgrade(&data.clone().as_node()));
(data, status_stream)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn given_default_fragment_limit_constant_when_inspected_then_equals_256() {
assert_eq!(DEFAULT_FRAGMENT_LIMIT, 256);
}
#[test]
fn given_aeron_sub_options_when_default_then_fragment_limit_is_256() {
let opts = AeronSubOptions::default();
assert_eq!(opts.fragment_limit, DEFAULT_FRAGMENT_LIMIT);
assert_eq!(opts.fragment_limit, 256);
}
#[test]
fn given_aeron_sub_options_when_default_then_mode_is_spin() {
assert!(matches!(AeronSubOptions::default().mode, AeronMode::Spin));
}
struct NoGraphThreadPollSubscriber;
impl transport::AeronSubscriberBackend for NoGraphThreadPollSubscriber {
fn poll(&mut self, _handler: &mut dyn FnMut(&[u8])) -> anyhow::Result<usize> {
Ok(0)
}
fn supports_graph_thread_poll(&self) -> bool {
false
}
}
#[test]
fn given_non_graph_thread_backend_when_spin_requested_then_downgraded_to_threaded() {
let backend = NoGraphThreadPollSubscriber;
assert_eq!(
effective_mode(AeronMode::Spin, &backend),
AeronMode::Threaded,
"a backend that locks on poll must never run Spin on the graph thread"
);
}
#[test]
fn given_non_graph_thread_backend_when_threaded_requested_then_stays_threaded() {
let backend = NoGraphThreadPollSubscriber;
assert_eq!(
effective_mode(AeronMode::Threaded, &backend),
AeronMode::Threaded
);
}
#[test]
fn given_lock_free_backend_when_spin_requested_then_stays_spin() {
let backend = transport::MockSubscriber::from_messages(vec![]);
assert_eq!(effective_mode(AeronMode::Spin, &backend), AeronMode::Spin);
}
#[test]
fn given_aeron_sub_fragment_with_status_when_threaded_mode_then_disconnected_emits_no_transition()
{
use crate::adapters::aeron::buffer::FragmentBuffer;
use crate::adapters::aeron::error::TransportError;
use crate::adapters::aeron::transport::MockSubscriber;
use crate::{NodeOperators, RunFor, RunMode, StreamOperators};
use std::time::Duration;
let backend = MockSubscriber::from_messages(vec![]);
let parser = |f: &FragmentBuffer<'_>| -> Result<Option<i64>, TransportError> {
Ok(f.as_ref().try_into().ok().map(i64::from_le_bytes))
};
let (_data, status_stream) = aeron_sub_fragment_with_status(
backend,
parser,
AeronSubOptions {
mode: AeronMode::Threaded,
fragment_limit: DEFAULT_FRAGMENT_LIMIT,
},
);
let collected = status_stream.collect();
collected
.clone()
.run(
RunMode::RealTime,
RunFor::Duration(Duration::from_millis(100)),
)
.unwrap();
assert!(
collected
.peek_value()
.into_iter()
.all(|burst| burst.value.is_empty()),
"disconnected backend must not emit any status transition"
);
}
}